From 86eb724f2768fb9743555074a260f21354ab87d3 Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Fri, 28 Jan 2022 12:40:50 +0100 Subject: [PATCH] feat(integrations): changed all structures --- .../integrations/integration/elasticsearch.go | 94 +++++++++++-------- 1 file changed, 57 insertions(+), 37 deletions(-) diff --git a/backend/services/integrations/integration/elasticsearch.go b/backend/services/integrations/integration/elasticsearch.go index 51201ffdb..3cc64cc77 100644 --- a/backend/services/integrations/integration/elasticsearch.go +++ b/backend/services/integrations/integration/elasticsearch.go @@ -8,6 +8,7 @@ import ( "fmt" elasticlib "github.com/elastic/go-elasticsearch/v7" "log" + "reflect" "strconv" "time" @@ -27,20 +28,9 @@ type elasticsearchLog struct { Message string Time time.Time `json:"utc_time"` // Should be parsed automatically from RFC3339 } + //map[string]interface{} // SearchResult represents the result of the search operation -type elasticResponse struct { - Hits ResultHits `json:"hits"` - ScrollId string `json:"_scroll_id"` -} - -// ResultHits represents the result of the search hits -type ResultHits struct { - Hits []struct { - Id string `json:"_id"` - Source json.RawMessage `json:"_source"` - } `json:"hits"` -} //type elasticResponse struct { // Hits struct { // Hits []struct { @@ -51,6 +41,7 @@ type ResultHits struct { // ScrollId string `json:"_scroll_id"` //} + func (es *elasticsearch) Request(c *client) error { address := es.Host + ":" + es.Port.String() apiKey := b64.StdEncoding.EncodeToString([]byte(es.ApiKeyId + ":" + es.ApiKey)) @@ -150,34 +141,56 @@ func (es *elasticsearch) Request(c *client) error { } for { - //var mapResp map[string]interface{} - //if err := json.NewDecoder(res.Body).Decode(&mapResp); err != nil { - // log.Fatalf("Error parsing raw response body: %s", err) - // - // // If no error, then convert response to a map[string]interface - //} else { - // log.Println("mapResp TYPE:", reflect.TypeOf(mapResp), "\n") - // log.Println(mapResp) + var esResp map[string]interface{} + if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil { + log.Fatalf("Error parsing raw response body: %s", err) + + // If no error, then convert response to a map[string]interface + } else { + log.Println("mapResp TYPE:", reflect.TypeOf(esResp), "\n") + log.Println(esResp) + } + + //var esResp elasticResponse + //if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil { + // log.Printf("Error parsing the response body: %s\n", err) + // return fmt.Errorf("Error parsing the response body: %s", err) //} - var esResp elasticResponse - if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil { - log.Printf("Error parsing the response body: %s\n", err) - return fmt.Errorf("Error parsing the response body: %s", err) - } - log.Printf("parsed response: %v\n", esResp) - if len(esResp.Hits.Hits) == 0 { + if _, ok := esResp["hits"]; ok { + log.Println("Hits not found") break } + hits:=esResp["hits"].(map[string]interface{})["hits"].([]interface{}) + for _, hit := range hits{ + + // Parse the attributes/fields of the document + doc := hit.(map[string]interface{}) + source := doc["_source"] - for _, hit := range esResp.Hits.Hits { var esLog elasticsearchLog - if err = json.Unmarshal(hit.Source, &esLog); err != nil { - log.Printf("Error unmarshalling the response source: %s\n", err) - c.errChan <- err - continue - } + esLog=source.(elasticsearchLog) + log.Println("doc _source:", reflect.TypeOf(source)) + + // Get the document's _id and print it out along with _source data + docID := doc["_id"] + log.Println("docID:", docID) + log.Println("_source:", source, "\n") + //} + ////if len(esResp.Hits.Hits) == 0 { + //if len(hits) == 0 { + // break + //} + // + //for _, hit := range esResp.Hits.Hits { + // var esLog elasticsearchLog + //if err = json.Unmarshal(hit.Source, &esLog); err != nil { + //if err = json.Unmarshal(source, &esLog); err != nil { + // log.Printf("Error unmarshalling the response source: %s\n", err) + // c.errChan <- err + // continue + //} token, err := GetToken(esLog.Message) if err != nil { log.Printf("Error generating token: %s\n", err) @@ -197,15 +210,22 @@ func (es *elasticsearch) Request(c *client) error { RawErrorEvent: &messages.RawErrorEvent{ Source: "elasticsearch", Timestamp: timestamp, - Name: hit.Id, // sure? - Payload: string(hit.Source), + //Name: hit.Id, // sure? + Name: fmt.Sprintf("%v", docID), // sure? + //Payload: string(hit.Source), + Payload: fmt.Sprintf("%v", source), }, } } - + if _, ok := esResp["_scroll_id"]; ok { + log.Println("_scroll_id not found") + break + } + scrollId := esResp["_scroll_id"] res, err = esC.Scroll( esC.Scroll.WithContext(context.Background()), - esC.Scroll.WithScrollID(esResp.ScrollId), + //esC.Scroll.WithScrollID(esResp.ScrollId), + esC.Scroll.WithScrollID(fmt.Sprintf("%v", scrollId)), esC.Scroll.WithScroll(time.Minute*2), ) if err != nil {