Skip to content

Commit

Permalink
reset adapter.go
Browse files Browse the repository at this point in the history
Signed-off-by: Harisudarsan <[email protected]>
  • Loading branch information
harisudarsan1 committed Jul 4, 2024
1 parent 900f188 commit 47e722b
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 16 deletions.
10 changes: 5 additions & 5 deletions relay-server/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ WORKDIR /usr/src/kubearmor-relay-server

COPY . .

RUN make
RUN cd relay-server && make

### Copy executable image

Expand All @@ -26,14 +26,14 @@ LABEL name="kubearmor-relay-server" \
alerts, and system logs generated by KubeArmor in each node, streamlining log integration with other systems."

RUN microdnf -y update && \
microdnf -y install --nodocs --setopt=install_weak_deps=0 --setopt=keepcache=0 shadow-utils && \
microdnf clean all
microdnf -y install --nodocs --setopt=install_weak_deps=0 --setopt=keepcache=0 shadow-utils && \
microdnf clean all

RUN groupadd --gid 1000 default \
&& useradd --uid 1000 --gid default --shell /bin/bash --create-home default
&& useradd --uid 1000 --gid default --shell /bin/bash --create-home default

COPY LICENSE /licenses/license.txt
COPY --from=builder /usr/src/kubearmor-relay-server/kubearmor-relay-server /KubeArmor/kubearmor-relay-server
COPY --from=builder /usr/src/kubearmor-relay-server/relay-server/kubearmor-relay-server /KubeArmor/kubearmor-relay-server

USER default

Expand Down
30 changes: 22 additions & 8 deletions relay-server/elasticsearch/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/google/uuid"
kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log"

pb "github.com/kubearmor/KubeArmor/protobuf"
kl "github.com/kubearmor/kubearmor-relay-server/relay-server/common"

// kif "github.com/kubearmor/kubearmor-relay-server/relay-server/informers"
Expand All @@ -36,8 +37,8 @@ type ElasticsearchClient struct {
cancel context.CancelFunc
bulkIndexer esutil.BulkIndexer
ctx context.Context
alertCh chan interface{}
logCh chan interface{}
alertCh chan *pb.Alert
logCh chan *pb.Log
// client *kif.Client
}

Expand Down Expand Up @@ -74,8 +75,8 @@ func NewElasticsearchClient(esURL, Endpoint string) (*ElasticsearchClient, error
if err != nil {
log.Fatalf("Error creating the indexer: %s", err)
}
alertCh := make(chan interface{}, 10000)
logCh := make(chan interface{}, 10000)
alertCh := make(chan *pb.Alert, 10000)
logCh := make(chan *pb.Log, 10000)
kaClient := server.NewClient(Endpoint)

// k8sClient := kif.GetK8sClient()
Expand Down Expand Up @@ -147,7 +148,14 @@ func (ecl *ElasticsearchClient) Start() error {
}
kg.Printf("Checked the liveness of the gRPC server")

client.WgServer.Go(func() error {
// var wg sync.WaitGroup

// stop := make(chan struct{})
// errCh := make(chan error, 1)

client.WgServer.Add(1)
go func() error {
defer client.WgServer.Done()
for client.Running {
res, err := client.AlertStream.Recv()
if err != nil {
Expand All @@ -165,8 +173,9 @@ func (ecl *ElasticsearchClient) Start() error {
//not able to add it to Log buffer
}
}

return nil
})
}()

for i := 0; i < 5; i++ {
go func() {
Expand All @@ -181,8 +190,12 @@ func (ecl *ElasticsearchClient) Start() error {
}
}()
}
client.WgServer.Add(1)
go func() error {

defer client.WgServer.Done()
// client.WatchLogs(&wg, stop, errCh, ecl.logCh)

client.WgServer.Go(func() error {
for client.Running {
res, err := client.LogStream.Recv()
if err != nil {
Expand Down Expand Up @@ -223,8 +236,9 @@ func (ecl *ElasticsearchClient) Start() error {
fmt.Printf("%s\n", string(tel))
ecl.logCh <- res
}

return nil
})
}()

for i := 0; i < 5; i++ {
go func() {
Expand Down
7 changes: 4 additions & 3 deletions relay-server/server/relayServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/google/uuid"
pb "github.com/kubearmor/KubeArmor/protobuf"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -302,7 +301,7 @@ type LogClient struct {
Ifclient *kif.Client

// wait group
WgServer *errgroup.Group
WgServer sync.WaitGroup

Context context.Context
}
Expand Down Expand Up @@ -384,6 +383,9 @@ func NewClient(server string) *LogClient {

lc.Ifclient = Informerclient

// var g errgroup.Group
lc.WgServer = sync.WaitGroup{}

return lc
}

Expand Down Expand Up @@ -581,7 +583,6 @@ func (lc *LogClient) WatchLogs(wg *sync.WaitGroup, stop chan struct{}, errCh cha
}

}
return
}

select {
Expand Down

0 comments on commit 47e722b

Please sign in to comment.