diff --git a/relay-server/elasticsearch/adapter.go b/relay-server/elasticsearch/adapter.go index 28db4d6..459154f 100644 --- a/relay-server/elasticsearch/adapter.go +++ b/relay-server/elasticsearch/adapter.go @@ -5,22 +5,17 @@ import ( "context" "encoding/json" "fmt" - "log" - "strings" - "sync/atomic" - "time" - "github.com/cenkalti/backoff/v4" "github.com/dustin/go-humanize" "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esutil" "github.com/google/uuid" kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log" + "log" + "strings" + "sync/atomic" + "time" - pb "github.com/kubearmor/KubeArmor/protobuf" - kl "github.com/kubearmor/kubearmor-relay-server/relay-server/common" - - // kif "github.com/kubearmor/kubearmor-relay-server/relay-server/informers" "github.com/kubearmor/kubearmor-relay-server/relay-server/server" ) @@ -32,14 +27,10 @@ var ( // ElasticsearchClient Structure type ElasticsearchClient struct { - kaClient *server.LogClient esClient *elasticsearch.Client cancel context.CancelFunc bulkIndexer esutil.BulkIndexer ctx context.Context - alertCh chan *pb.Alert - logCh chan *pb.Log - // client *kif.Client } // NewElasticsearchClient creates a new Elasticsearch client with the given Elasticsearch URL @@ -75,27 +66,7 @@ func NewElasticsearchClient(esURL, Endpoint string) (*ElasticsearchClient, error if err != nil { log.Fatalf("Error creating the indexer: %s", err) } - alertCh := make(chan *pb.Alert, 10000) - logCh := make(chan *pb.Log, 10000) - kaClient := server.NewClient(Endpoint) - - // k8sClient := kif.GetK8sClient() - // cc := &kif.ClusterCache{ - // - // mu: &sync.RWMutex{}, - // - // ipPodCache: make(map[string]PodServiceInfo), - // } - // client := &kif.Client{ - // k8sClient: k8sClient, - // ClusterIPCache: cc, - // } - - // client := kif.InitializeClient() - // go kif.StartInformers(client) - - // TODO: remove this informers - return &ElasticsearchClient{kaClient: kaClient, bulkIndexer: bi, esClient: esClient, alertCh: alertCh, logCh: logCh}, nil + return &ElasticsearchClient{bulkIndexer: bi, esClient: esClient}, nil } // bulkIndex takes an interface and index name and adds the data to the Elasticsearch bulk indexer. @@ -139,115 +110,29 @@ func (ecl *ElasticsearchClient) bulkIndex(a interface{}, index string) { // Additional goroutines consume alert from the alert channel and bulk index them. func (ecl *ElasticsearchClient) Start() error { start = time.Now() - client := ecl.kaClient + // client := ecl.kaClient ecl.ctx, ecl.cancel = context.WithCancel(context.Background()) - // do healthcheck - if ok := client.DoHealthCheck(); !ok { - return fmt.Errorf("failed to check the liveness of the gRPC server") - } - kg.Printf("Checked the liveness of the gRPC server") - - // var wg sync.WaitGroup - - // stop := make(chan struct{}) - // errCh := make(chan error, 1) - - client.WgServer.Add(1) - go func() error { - defer client.WgServer.Done() - for client.Running { - res, err := client.AlertStream.Recv() - if err != nil { - return fmt.Errorf("failed to receive an alert (%s) %s", client.Server, err) - } - tel, _ := json.Marshal(res) - fmt.Printf("%s\n", string(tel)) - - select { - case ecl.alertCh <- res: - case <-client.Context.Done(): - // The context is over, stop processing results - return nil - default: - //not able to add it to Log buffer - } - } - - return nil - }() - for i := 0; i < 5; i++ { go func() { for { select { - case alert := <-ecl.alertCh: + case alert := <-server.ESAlertChannel: ecl.bulkIndex(alert, "alert") case <-ecl.ctx.Done(): - close(ecl.alertCh) + close(server.ESAlertChannel) return } } }() - } - client.WgServer.Add(1) - go func() error { - - defer client.WgServer.Done() - // client.WatchLogs(&wg, stop, errCh, ecl.logCh) - - for client.Running { - res, err := client.LogStream.Recv() - if err != nil { - kg.Warnf("Failed to receive an log (%s)", client.Server) - break - } - - if containsKprobe := strings.Contains(res.Data, "kprobe"); containsKprobe { - - resourceMap := kl.Extractdata(res.GetResource()) - remoteIP := resourceMap["remoteip"] - podserviceInfo, found := ecl.kaClient.Ifclient.ClusterIPCache.Get(remoteIP) - - if found { - switch podserviceInfo.Type { - case "POD": - resource := res.GetResource() + fmt.Sprintf(" hostname=%s podname=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.PodName, podserviceInfo.NamespaceName) - data := res.GetData() + fmt.Sprintf(" ownertype=pod") - res.Data = data - res.Resource = resource - // kg.Printf("logData:%s", res.Data) - break - case "SERVICE": - resource := res.GetResource() + fmt.Sprintf(" hostname=%s servicename=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.ServiceName, podserviceInfo.NamespaceName) - - data := res.GetData() + fmt.Sprintf(" ownertype=service") - res.Data = data - res.Resource = resource - // kg.Printf("logData:%s", res.Data) - - break - } - } - - } - tel, _ := json.Marshal(res) - fmt.Printf("%s\n", string(tel)) - ecl.logCh <- res - } - - return nil - }() - - for i := 0; i < 5; i++ { go func() { for { select { - case log := <-ecl.logCh: + case log := <-server.ESLogChannel: ecl.bulkIndex(log, "log") case <-ecl.ctx.Done(): - close(ecl.logCh) + close(server.ESLogChannel) return } } @@ -259,14 +144,10 @@ func (ecl *ElasticsearchClient) Start() error { // Stop stops the Elasticsearch client and performs necessary cleanup operations. // It stops the Kubearmor Relay client, closes the BulkIndexer and cancels the context. func (ecl *ElasticsearchClient) Stop() error { - logClient := ecl.kaClient - logClient.Running = false + // logClient := ecl.kaClient + server.Running = false time.Sleep(2 * time.Second) - //Destoy KubeArmor Relay Client - if err := logClient.DestroyClient(); err != nil { - return fmt.Errorf("failed to destroy the kubearmor relay gRPC client (%s)", err.Error()) - } kg.Printf("Destroyed kubearmor relay gRPC client") //Close BulkIndexer diff --git a/relay-server/server/k8sHandler.go b/relay-server/server/k8sHandler.go index da73177..a056ded 100644 --- a/relay-server/server/k8sHandler.go +++ b/relay-server/server/k8sHandler.go @@ -54,6 +54,8 @@ var stdoutlogs = false var stdoutalerts = false var stdoutmsg = false +var enableEsDashboards = os.Getenv("ENABLE_DASHBOARDS") == "true" + // NewK8sHandler Function func NewK8sHandler() *K8sHandler { kh := &K8sHandler{} diff --git a/relay-server/server/relayServer.go b/relay-server/server/relayServer.go index a782a60..cc8227a 100644 --- a/relay-server/server/relayServer.go +++ b/relay-server/server/relayServer.go @@ -297,11 +297,8 @@ type LogClient struct { // logs LogStream pb.LogService_WatchLogsClient - //Informerclient - Ifclient *kif.Client - - // wait group - WgServer sync.WaitGroup + // // wait group + // WgServer sync.WaitGroup Context context.Context } @@ -377,15 +374,6 @@ func NewClient(server string) *LogClient { return nil } - // start informers - Informerclient := kif.InitializeClient() - go kif.StartInformers(Informerclient) - - lc.Ifclient = Informerclient - - // var g errgroup.Group - lc.WgServer = sync.WaitGroup{} - return lc } @@ -523,6 +511,11 @@ func (rs *RelayServer) AddAlertFromBuffChan() { fmt.Printf("%s\n", string(tel)) } AlertLock.RLock() + + if enableEsDashboards { + ESAlertChannel <- (&alert) + } + for uid := range AlertStructs { select { case AlertStructs[uid].Broadcast <- (&alert): @@ -553,36 +546,6 @@ func (lc *LogClient) WatchLogs(wg *sync.WaitGroup, stop chan struct{}, errCh cha default: if res, err = lc.LogStream.Recv(); err != nil { errCh <- fmt.Errorf("failed to receive a log (%s) %s", lc.Server, err.Error()) - - if containsKprobe := strings.Contains(res.Data, "kprobe"); containsKprobe && res.GetOperation() == "Network" { - - resourceMap := kl.Extractdata(res.GetResource()) - remoteIP := resourceMap["remoteip"] - podserviceInfo, found := lc.Ifclient.ClusterIPCache.Get(remoteIP) - - if found { - switch podserviceInfo.Type { - case "POD": - resource := res.GetResource() + fmt.Sprintf(" hostname=%s podname=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.PodName, podserviceInfo.NamespaceName) - data := res.GetData() + fmt.Sprintf(" ownertype=pod") - res.Data = data - - res.Resource = resource - // kg.Printf("logData:%s", res.Data) - break - case "SERVICE": - resource := res.GetResource() + fmt.Sprintf(" hostname=%s servicename=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.ServiceName, podserviceInfo.NamespaceName) - - data := res.GetData() + fmt.Sprintf(" ownertype=service") - res.Data = data - res.Resource = resource - // kg.Printf("logData:%s", res.Data) - - break - } - } - - } } select { @@ -608,10 +571,45 @@ func (rs *RelayServer) AddLogFromBuffChan() { if err := kl.Clone(*res, &log); err != nil { kg.Warnf("Failed to clone a log (%v)", *res) } + + if containsKprobe := strings.Contains(log.Data, "kprobe"); containsKprobe && log.GetOperation() == "Network" { + + resourceMap := kl.Extractdata(log.GetResource()) + remoteIP := resourceMap["remoteip"] + podserviceInfo, found := rs.Ifclient.ClusterIPCache.Get(remoteIP) + + if found { + switch podserviceInfo.Type { + case "POD": + resource := log.GetResource() + fmt.Sprintf(" hostname=%s podname=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.PodName, podserviceInfo.NamespaceName) + data := log.GetData() + fmt.Sprintf(" ownertype=pod") + log.Data = data + + log.Resource = resource + kg.Printf("logData:%s", log.Data) + break + case "SERVICE": + resource := log.GetResource() + fmt.Sprintf(" hostname=%s servicename=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.ServiceName, podserviceInfo.NamespaceName) + + data := log.GetData() + fmt.Sprintf(" ownertype=service") + log.Data = data + log.Resource = resource + kg.Printf("logData:%s", log.Data) + + break + } + } + + } + if stdoutlogs { tel, _ := json.Marshal(log) fmt.Printf("%s\n", string(tel)) } + if enableEsDashboards { + ESLogChannel <- (&log) + } + for uid := range LogStructs { select { case LogStructs[uid].Broadcast <- (&log): @@ -651,17 +649,26 @@ type RelayServer struct { // wait group WgServer sync.WaitGroup + + //Informerclient + Ifclient *kif.Client } // LogBufferChannel store incoming data from log stream in buffer var LogBufferChannel chan *pb.Log +// ESLogChannel send logs to ES adapter +var ESLogChannel chan *pb.Log + // MsgBufferChannel store incoming data from Alert stream in buffer var MsgBufferChannel chan *pb.Message // AlertBufferChannel store incoming data from msg stream in buffer var AlertBufferChannel chan *pb.Alert +// ESAlertChannel send alerts to ES adapter +var ESAlertChannel chan *pb.Alert + // NewRelayServer Function func NewRelayServer(port string) *RelayServer { rs := &RelayServer{} @@ -672,6 +679,11 @@ func NewRelayServer(port string) *RelayServer { AlertBufferChannel = make(chan *pb.Alert, 1000) MsgBufferChannel = make(chan *pb.Message, 100) + if enableEsDashboards { + ESLogChannel = make(chan *pb.Log, 10000) + AlertBufferChannel = make(chan *pb.Alert, 1000) + } + // listen to gRPC port listener, err := net.Listen("tcp", ":"+rs.Port) if err != nil { @@ -717,6 +729,12 @@ func NewRelayServer(port string) *RelayServer { LogStructs = make(map[string]LogStruct) LogLock = &sync.RWMutex{} + // start informers + Informerclient := kif.InitializeClient() + go kif.StartInformers(Informerclient) + + rs.Ifclient = Informerclient + // set wait group rs.WgServer = sync.WaitGroup{}