From a1280d7ba234698cec60f10f472e93a21c102c48 Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Thu, 27 Jan 2022 16:12:43 +0100 Subject: [PATCH] feat(integrations): fixed elasticsearch auth --- .../integrations/integration/elasticsearch.go | 236 +++++++++--------- 1 file changed, 118 insertions(+), 118 deletions(-) diff --git a/backend/services/integrations/integration/elasticsearch.go b/backend/services/integrations/integration/elasticsearch.go index 66389f7b9..f66b50b6c 100644 --- a/backend/services/integrations/integration/elasticsearch.go +++ b/backend/services/integrations/integration/elasticsearch.go @@ -1,30 +1,30 @@ package integration import ( - elasticlib "github.com/elastic/go-elasticsearch/v7" + "bytes" "context" - "time" "encoding/json" "fmt" - "bytes" + elasticlib "github.com/elastic/go-elasticsearch/v7" "strconv" + "time" - "openreplay/backend/pkg/utime" + b64 "encoding/base64" "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/utime" ) - type elasticsearch struct { - Host string - Port json.Number - ApiKeyId string //`json:"api_key_id"` - ApiKey string //`json:"api_key"` - Indexes string + Host string + Port json.Number + ApiKeyId string //`json:"api_key_id"` + ApiKey string //`json:"api_key"` + Indexes string } type elasticsearchLog struct { Message string - Time time.Time `json:"utc_time"` // Should be parsed automatically from RFC3339 + Time time.Time `json:"utc_time"` // Should be parsed automatically from RFC3339 } type elasticResponce struct { @@ -33,22 +33,22 @@ type elasticResponce struct { // Value int //} Hits []struct { - Id string `json:"_id"` + Id string `json:"_id"` Source json.RawMessage `json:"_source"` } } ScrollId string `json:"_scroll_id"` } - -func (es *elasticsearch) Request(c* client) error { +func (es *elasticsearch) Request(c *client) error { address := es.Host + ":" + es.Port.String() cfg := elasticlib.Config{ - Addresses: []string{ - address, - }, - Username: es.ApiKeyId, - Password: es.ApiKey, + Addresses: []string{ + address, + }, + //Username: es.ApiKeyId, + //Password: es.ApiKey, + APIKey: b64.StdEncoding.EncodeToString([]byte(es.ApiKeyId + ":" + es.ApiKey)), } esC, err := elasticlib.NewClient(cfg) @@ -57,88 +57,88 @@ func (es *elasticsearch) Request(c* client) error { } // TODO: ping/versions/ client host check - // res0, err := esC.Info() + // res0, err := esC.Info() // if err != nil { - // log.Printf("ELASTIC Error getting info: %s", err) - // } - // defer res0.Body.Close() - // // Check response status - // if res0.IsError() { - // log.Printf("ELASTIC Error: %s", res0.String()) - // } - // log.Printf("ELASTIC Info: %v ", res0.String()) + // log.Printf("ELASTIC Error getting info: %s", err) + // } + // defer res0.Body.Close() + // // Check response status + // if res0.IsError() { + // log.Printf("ELASTIC Error: %s", res0.String()) + // } + // log.Printf("ELASTIC Info: %v ", res0.String()) gteTs := c.getLastMessageTimestamp() + 1000 // Sec or millisec to add ? var buf bytes.Buffer - query := map[string]interface{}{ - "query": map[string]interface{}{ - "bool": map[string]interface{}{ - "filter": []map[string]interface{}{ - map[string]interface{}{ - "match": map[string]interface{} { - "message": map[string]interface{}{ - "query": "openReplaySessionToken=", // asayer_session_id= - }, - }, - }, - map[string]interface{}{ - "range": map[string]interface{} { - "utc_time": map[string]interface{}{ - "gte": strconv.FormatUint(gteTs, 10), - "lte": "now", - }, - }, - }, - map[string]interface{}{ - "term": map[string]interface{}{ - "tags": "error", - }, - }, - }, - }, - }, - } - if err := json.NewEncoder(&buf).Encode(query); err != nil { - return fmt.Errorf("Error encoding the query: %s", err) - } + query := map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "filter": []map[string]interface{}{ + map[string]interface{}{ + "match": map[string]interface{}{ + "message": map[string]interface{}{ + "query": "openReplaySessionToken=", // asayer_session_id= + }, + }, + }, + map[string]interface{}{ + "range": map[string]interface{}{ + "utc_time": map[string]interface{}{ + "gte": strconv.FormatUint(gteTs, 10), + "lte": "now", + }, + }, + }, + map[string]interface{}{ + "term": map[string]interface{}{ + "tags": "error", + }, + }, + }, + }, + }, + } + if err := json.NewEncoder(&buf).Encode(query); err != nil { + return fmt.Errorf("Error encoding the query: %s", err) + } res, err := esC.Search( - esC.Search.WithContext(context.Background()), - esC.Search.WithIndex(es.Indexes), - esC.Search.WithSize(1000), - esC.Search.WithScroll(time.Minute * 2), - esC.Search.WithBody(&buf), - esC.Search.WithSort("timestamp:asc"), - ) - if err != nil { - return fmt.Errorf("Error getting response: %s", err) - } - defer res.Body.Close() - if res.IsError() { - var e map[string]interface{} - if err := json.NewDecoder(res.Body).Decode(&e); err != nil { - return fmt.Errorf("Error parsing the response body: %v", err) - } else { - return fmt.Errorf("Elasticsearch [%s] %s: %s", - res.Status(), - e["error"],//.(map[string]interface{})["type"], - e["error"],//.(map[string]interface{})["reason"], - ) - } - } + esC.Search.WithContext(context.Background()), + esC.Search.WithIndex(es.Indexes), + esC.Search.WithSize(1000), + esC.Search.WithScroll(time.Minute*2), + esC.Search.WithBody(&buf), + esC.Search.WithSort("timestamp:asc"), + ) + if err != nil { + return fmt.Errorf("Error getting response: %s", err) + } + defer res.Body.Close() + if res.IsError() { + var e map[string]interface{} + if err := json.NewDecoder(res.Body).Decode(&e); err != nil { + return fmt.Errorf("Error parsing the response body: %v", err) + } else { + return fmt.Errorf("Elasticsearch [%s] %s: %s", + res.Status(), + e["error"], //.(map[string]interface{})["type"], + e["error"], //.(map[string]interface{})["reason"], + ) + } + } - for { - var esResp elasticResponce - if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil { - return fmt.Errorf("Error parsing the response body: %s", err) - } - if len(esResp.Hits.Hits) == 0 { - break - } + for { + var esResp elasticResponce + if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil { + return fmt.Errorf("Error parsing the response body: %s", err) + } + if len(esResp.Hits.Hits) == 0 { + break + } - for _, hit := range esResp.Hits.Hits { - var esLog elasticsearchLog + for _, hit := range esResp.Hits.Hits { + var esLog elasticsearchLog if err = json.Unmarshal(hit.Source, &esLog); err != nil { c.errChan <- err continue @@ -159,35 +159,35 @@ func (es *elasticsearch) Request(c* client) error { //SessionID: sessionID, Token: token, RawErrorEvent: &messages.RawErrorEvent{ - Source: "elasticsearch", + Source: "elasticsearch", Timestamp: timestamp, - Name: hit.Id, // sure? - Payload: string(hit.Source), + Name: hit.Id, // sure? + Payload: string(hit.Source), }, } - } + } - res, err = esC.Scroll( - esC.Scroll.WithContext(context.Background()), - esC.Scroll.WithScrollID(esResp.ScrollId), - esC.Scroll.WithScroll(time.Minute * 2), - ) - if err != nil { - return fmt.Errorf("Error getting scroll response: %s", err) - } - defer res.Body.Close() - if res.IsError() { - var e map[string]interface{} - if err := json.NewDecoder(res.Body).Decode(&e); err != nil { - return fmt.Errorf("Error parsing the response body: %v", err) - } else { - return fmt.Errorf("Elasticsearch [%s] %s: %s", - res.Status(), - e["error"],//.(map[string]interface{})["type"], - e["error"],//.(map[string]interface{})["reason"], - ) - } - } + res, err = esC.Scroll( + esC.Scroll.WithContext(context.Background()), + esC.Scroll.WithScrollID(esResp.ScrollId), + esC.Scroll.WithScroll(time.Minute*2), + ) + if err != nil { + return fmt.Errorf("Error getting scroll response: %s", err) + } + defer res.Body.Close() + if res.IsError() { + var e map[string]interface{} + if err := json.NewDecoder(res.Body).Decode(&e); err != nil { + return fmt.Errorf("Error parsing the response body: %v", err) + } else { + return fmt.Errorf("Elasticsearch [%s] %s: %s", + res.Status(), + e["error"], //.(map[string]interface{})["type"], + e["error"], //.(map[string]interface{})["reason"], + ) + } + } } return nil -} \ No newline at end of file +}