diff --git a/backend/services/integrations/integration/elasticsearch.go b/backend/services/integrations/integration/elasticsearch.go index 915ecfc1e..f19b28563 100644 --- a/backend/services/integrations/integration/elasticsearch.go +++ b/backend/services/integrations/integration/elasticsearch.go @@ -8,7 +8,6 @@ import ( "fmt" elasticlib "github.com/elastic/go-elasticsearch/v7" "log" - "reflect" "strconv" "time" @@ -29,19 +28,6 @@ type elasticsearchLog struct { Time time.Time `json:"utc_time"` // Should be parsed automatically from RFC3339 } -//map[string]interface{} -// SearchResult represents the result of the search operation -//type elasticResponse struct { -// Hits struct { -// Hits []struct { -// Id string `json:"_id"` -// Source json.RawMessage `json:"_source"` -// } `json:"hits"` -// } `json:"hits"` -// ScrollId string `json:"_scroll_id"` -//} - - func (es *elasticsearch) Request(c *client) error { address := es.Host + ":" + es.Port.String() apiKey := b64.StdEncoding.EncodeToString([]byte(es.ApiKeyId + ":" + es.ApiKey)) @@ -60,17 +46,6 @@ func (es *elasticsearch) Request(c *client) error { log.Println(err) return err } - // TODO: ping/versions/ client host check - // res0, err := esC.Info() - // if err != nil { - // log.Printf("ELASTIC Error getting info: %s", err) - // } - // defer res0.Body.Close() - // // Check response status - // if res0.IsError() { - // log.Printf("ELASTIC Error: %s", res0.String()) - // } - // log.Printf("ELASTIC Info: %v ", res0.String()) gteTs := c.getLastMessageTimestamp() + 1000 // Sec or millisec to add ? log.Printf("gteTs: %v ", gteTs) @@ -90,7 +65,6 @@ func (es *elasticsearch) Request(c *client) error { "range": map[string]interface{}{ "utc_time": map[string]interface{}{ "gte": strconv.FormatUint(gteTs, 10), - //"gte": gteTs, "lte": "now", }, }, @@ -129,13 +103,13 @@ func (es *elasticsearch) Request(c *client) error { } else { log.Printf("Elasticsearch Error [%s] %s: %s\n", res.Status(), - e["error"], //.(map[string]interface{})["type"], - e["error"], //.(map[string]interface{})["reason"], + e["error"], + e["error"], ) return fmt.Errorf("Elasticsearch Error [%s] %s: %s", res.Status(), - e["error"], //.(map[string]interface{})["type"], - e["error"], //.(map[string]interface{})["reason"], + e["error"], + e["error"], ) } } @@ -143,65 +117,52 @@ func (es *elasticsearch) Request(c *client) error { for { var esResp map[string]interface{} if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil { - log.Fatalf("Error parsing raw response body: %s", err) - + return fmt.Errorf("Error parsing the response body: %s", err) // If no error, then convert response to a map[string]interface - } else { - log.Println("mapResp TYPE:", reflect.TypeOf(esResp), "\n") - log.Println(esResp) } - //var esResp elasticResponse - //if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil { - // log.Printf("Error parsing the response body: %s\n", err) - // return fmt.Errorf("Error parsing the response body: %s", err) - //} - - log.Printf("parsed response: %v\n", esResp) if _, ok := esResp["hits"]; !ok { - log.Println("Hits not found") + log.Printf("Hits not found in \n%v\n", esResp) break } - hits:=esResp["hits"].(map[string]interface{})["hits"].([]interface{}) - for _, hit := range hits{ + hits := esResp["hits"].(map[string]interface{})["hits"].([]interface{}) + if len(hits) == 0 { + log.Println("No hits found") + break + } + for _, hit := range hits { // Parse the attributes/fields of the document doc := hit.(map[string]interface{}) - source := doc["_source"] + source := doc["_source"].(map[string]interface{}) - var esLog elasticsearchLog - esLog=source.(elasticsearchLog) - log.Println("doc _source:", reflect.TypeOf(source)) + if _, ok := esResp["message"]; !ok { + log.Printf("message not found in doc \n%v\n", doc) + c.errChan <- fmt.Errorf("message not found in doc '%v' ", doc) + continue + } - // Get the document's _id and print it out along with _source data + if _, ok := esResp["utc_time"]; !ok { + log.Printf("utc_time not found in doc \n%v\n", doc) + c.errChan <- fmt.Errorf("utc_time not found in doc '%v' ", doc) + continue + } + + parsedTime, err := time.Parse(time.RFC3339, source["utc_time"].(string)) + if err != nil { + log.Println("cannot parse time") + c.errChan <- fmt.Errorf("cannot parse RFC3339 time of doc '%v' ", doc) + continue + } + esLog := elasticsearchLog{Message: source["message"].(string), Time: parsedTime} docID := doc["_id"] - log.Println("docID:", docID) - log.Println("_source:", source, "\n") - //} - ////if len(esResp.Hits.Hits) == 0 { - //if len(hits) == 0 { - // break - //} - // - //for _, hit := range esResp.Hits.Hits { - // var esLog elasticsearchLog - //if err = json.Unmarshal(hit.Source, &esLog); err != nil { - //if err = json.Unmarshal(source, &esLog); err != nil { - // log.Printf("Error unmarshalling the response source: %s\n", err) - // c.errChan <- err - // continue - //} + token, err := GetToken(esLog.Message) if err != nil { log.Printf("Error generating token: %s\n", err) c.errChan <- err continue } - //parsedTime, err := time.Parse(time.RFC3339, esLog.Timestamp) - //if err != nil { - // c.errChan <- err - // continue - //} timestamp := uint64(utime.ToMilliseconds(esLog.Time)) c.setLastMessageTimestamp(timestamp) c.evChan <- &SessionErrorEvent{ @@ -210,10 +171,8 @@ func (es *elasticsearch) Request(c *client) error { RawErrorEvent: &messages.RawErrorEvent{ Source: "elasticsearch", Timestamp: timestamp, - //Name: hit.Id, // sure? - Name: fmt.Sprintf("%v", docID), // sure? - //Payload: string(hit.Source), - Payload: fmt.Sprintf("%v", source), + Name: fmt.Sprintf("%v", docID), + Payload: fmt.Sprintf("%v", source), }, } } @@ -221,10 +180,10 @@ func (es *elasticsearch) Request(c *client) error { log.Println("_scroll_id not found") break } + log.Println("Scrolling") scrollId := esResp["_scroll_id"] res, err = esC.Scroll( esC.Scroll.WithContext(context.Background()), - //esC.Scroll.WithScrollID(esResp.ScrollId), esC.Scroll.WithScrollID(fmt.Sprintf("%v", scrollId)), esC.Scroll.WithScroll(time.Minute*2), )