feat(integrations): fixed elasticsearch auth

Taha Yassine Kraiem 2022-01-27 16:12:43 +01:00
parent fea33ca3a1
commit a1280d7ba2


@@ -1,30 +1,30 @@
package integration

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    elasticlib "github.com/elastic/go-elasticsearch/v7"
    "strconv"
    "time"

    b64 "encoding/base64"
    "openreplay/backend/pkg/messages"
    "openreplay/backend/pkg/utime"
)

type elasticsearch struct {
    Host     string
    Port     json.Number
    ApiKeyId string //`json:"api_key_id"`
    ApiKey   string //`json:"api_key"`
    Indexes  string
}

type elasticsearchLog struct {
    Message string
    Time    time.Time `json:"utc_time"` // Should be parsed automatically from RFC3339
}

type elasticResponce struct {
@@ -33,22 +33,22 @@ type elasticResponce struct {
        // Value int
        //}
        Hits []struct {
            Id     string          `json:"_id"`
            Source json.RawMessage `json:"_source"`
        }
    }
    ScrollId string `json:"_scroll_id"`
}

func (es *elasticsearch) Request(c *client) error {
    address := es.Host + ":" + es.Port.String()
    cfg := elasticlib.Config{
        Addresses: []string{
            address,
        },
        //Username: es.ApiKeyId,
        //Password: es.ApiKey,
        APIKey: b64.StdEncoding.EncodeToString([]byte(es.ApiKeyId + ":" + es.ApiKey)),
    }
    esC, err := elasticlib.NewClient(cfg)
@@ -57,88 +57,88 @@ func (es *elasticsearch) Request(c* client) error {
    }

    // TODO: ping/versions/ client host check
    // res0, err := esC.Info()
    // if err != nil {
    // log.Printf("ELASTIC Error getting info: %s", err)
    // }
    // defer res0.Body.Close()
    // // Check response status
    // if res0.IsError() {
    // log.Printf("ELASTIC Error: %s", res0.String())
    // }
    // log.Printf("ELASTIC Info: %v ", res0.String())

    gteTs := c.getLastMessageTimestamp() + 1000 // Sec or millisec to add ?
    var buf bytes.Buffer
    query := map[string]interface{}{
        "query": map[string]interface{}{
            "bool": map[string]interface{}{
                "filter": []map[string]interface{}{
                    map[string]interface{}{
                        "match": map[string]interface{}{
                            "message": map[string]interface{}{
                                "query": "openReplaySessionToken=", // asayer_session_id=
                            },
                        },
                    },
                    map[string]interface{}{
                        "range": map[string]interface{}{
                            "utc_time": map[string]interface{}{
                                "gte": strconv.FormatUint(gteTs, 10),
                                "lte": "now",
                            },
                        },
                    },
                    map[string]interface{}{
                        "term": map[string]interface{}{
                            "tags": "error",
                        },
                    },
                },
            },
        },
    }
    if err := json.NewEncoder(&buf).Encode(query); err != nil {
        return fmt.Errorf("Error encoding the query: %s", err)
    }

    res, err := esC.Search(
        esC.Search.WithContext(context.Background()),
        esC.Search.WithIndex(es.Indexes),
        esC.Search.WithSize(1000),
        esC.Search.WithScroll(time.Minute*2),
        esC.Search.WithBody(&buf),
        esC.Search.WithSort("timestamp:asc"),
    )
    if err != nil {
        return fmt.Errorf("Error getting response: %s", err)
    }
    defer res.Body.Close()
    if res.IsError() {
        var e map[string]interface{}
        if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
            return fmt.Errorf("Error parsing the response body: %v", err)
        } else {
            return fmt.Errorf("Elasticsearch [%s] %s: %s",
                res.Status(),
                e["error"], //.(map[string]interface{})["type"],
                e["error"], //.(map[string]interface{})["reason"],
            )
        }
    }

    for {
        var esResp elasticResponce
        if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil {
            return fmt.Errorf("Error parsing the response body: %s", err)
        }
        if len(esResp.Hits.Hits) == 0 {
            break
        }
        for _, hit := range esResp.Hits.Hits {
            var esLog elasticsearchLog
            if err = json.Unmarshal(hit.Source, &esLog); err != nil {
                c.errChan <- err
                continue
@@ -159,35 +159,35 @@ func (es *elasticsearch) Request(c* client) error {
                //SessionID: sessionID,
                Token: token,
                RawErrorEvent: &messages.RawErrorEvent{
                    Source:    "elasticsearch",
                    Timestamp: timestamp,
                    Name:      hit.Id, // sure?
                    Payload:   string(hit.Source),
                },
            }
        }
        res, err = esC.Scroll(
            esC.Scroll.WithContext(context.Background()),
            esC.Scroll.WithScrollID(esResp.ScrollId),
            esC.Scroll.WithScroll(time.Minute*2),
        )
        if err != nil {
            return fmt.Errorf("Error getting scroll response: %s", err)
        }
        defer res.Body.Close()
        if res.IsError() {
            var e map[string]interface{}
            if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
                return fmt.Errorf("Error parsing the response body: %v", err)
            } else {
                return fmt.Errorf("Elasticsearch [%s] %s: %s",
                    res.Status(),
                    e["error"], //.(map[string]interface{})["type"],
                    e["error"], //.(map[string]interface{})["reason"],
                )
            }
        }
    }
    return nil
}
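
For reference, the effect of the change is to switch the go-elasticsearch v7 client from basic auth (Username/Password) to Elasticsearch API-key auth: Config.APIKey is expected to hold base64("id:api_key"), which the client then sends in the Authorization header (ApiKey scheme) on every request. Below is a minimal, self-contained sketch of that client setup; the credentials, address, and package name are placeholders, not values from this commit.

package main

import (
    b64 "encoding/base64"
    "log"

    elasticlib "github.com/elastic/go-elasticsearch/v7"
)

func main() {
    // Placeholder credentials: an API key id and secret created via the
    // Elasticsearch security API (in the integration they come from the stored config).
    apiKeyId := "example-key-id"
    apiKey := "example-key-secret"

    // go-elasticsearch expects Config.APIKey to be base64("id:secret");
    // setting it replaces the Username/Password basic auth used previously.
    cfg := elasticlib.Config{
        Addresses: []string{"http://localhost:9200"}, // placeholder address
        APIKey:    b64.StdEncoding.EncodeToString([]byte(apiKeyId + ":" + apiKey)),
    }

    esC, err := elasticlib.NewClient(cfg)
    if err != nil {
        log.Fatalf("error creating the Elasticsearch client: %s", err)
    }

    // Quick check that the key is accepted by the cluster.
    res, err := esC.Info()
    if err != nil {
        log.Fatalf("error getting cluster info: %s", err)
    }
    defer res.Body.Close()
    log.Printf("cluster info: %s", res.String())
}

The patched Request method builds exactly this kind of Config, deriving the APIKey value from its ApiKeyId and ApiKey fields.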