feat(integrations): changed all structures

This commit is contained in:
Taha Yassine Kraiem 2022-01-28 12:40:50 +01:00
parent 2e227f6584
commit 86eb724f27

View file

@ -8,6 +8,7 @@ import (
"fmt"
elasticlib "github.com/elastic/go-elasticsearch/v7"
"log"
"reflect"
"strconv"
"time"
@ -27,20 +28,9 @@ type elasticsearchLog struct {
Message string
Time time.Time `json:"utc_time"` // Should be parsed automatically from RFC3339
}
//map[string]interface{}
// SearchResult represents the result of the search operation
type elasticResponse struct {
Hits ResultHits `json:"hits"`
ScrollId string `json:"_scroll_id"`
}
// ResultHits represents the result of the search hits
type ResultHits struct {
Hits []struct {
Id string `json:"_id"`
Source json.RawMessage `json:"_source"`
} `json:"hits"`
}
//type elasticResponse struct {
// Hits struct {
// Hits []struct {
@ -51,6 +41,7 @@ type ResultHits struct {
// ScrollId string `json:"_scroll_id"`
//}
func (es *elasticsearch) Request(c *client) error {
address := es.Host + ":" + es.Port.String()
apiKey := b64.StdEncoding.EncodeToString([]byte(es.ApiKeyId + ":" + es.ApiKey))
@ -150,34 +141,56 @@ func (es *elasticsearch) Request(c *client) error {
}
for {
//var mapResp map[string]interface{}
//if err := json.NewDecoder(res.Body).Decode(&mapResp); err != nil {
// log.Fatalf("Error parsing raw response body: %s", err)
//
// // If no error, then convert response to a map[string]interface
//} else {
// log.Println("mapResp TYPE:", reflect.TypeOf(mapResp), "\n")
// log.Println(mapResp)
var esResp map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil {
log.Fatalf("Error parsing raw response body: %s", err)
// If no error, then convert response to a map[string]interface
} else {
log.Println("mapResp TYPE:", reflect.TypeOf(esResp), "\n")
log.Println(esResp)
}
//var esResp elasticResponse
//if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil {
// log.Printf("Error parsing the response body: %s\n", err)
// return fmt.Errorf("Error parsing the response body: %s", err)
//}
var esResp elasticResponse
if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil {
log.Printf("Error parsing the response body: %s\n", err)
return fmt.Errorf("Error parsing the response body: %s", err)
}
log.Printf("parsed response: %v\n", esResp)
if len(esResp.Hits.Hits) == 0 {
if _, ok := esResp["hits"]; ok {
log.Println("Hits not found")
break
}
hits:=esResp["hits"].(map[string]interface{})["hits"].([]interface{})
for _, hit := range hits{
// Parse the attributes/fields of the document
doc := hit.(map[string]interface{})
source := doc["_source"]
for _, hit := range esResp.Hits.Hits {
var esLog elasticsearchLog
if err = json.Unmarshal(hit.Source, &esLog); err != nil {
log.Printf("Error unmarshalling the response source: %s\n", err)
c.errChan <- err
continue
}
esLog=source.(elasticsearchLog)
log.Println("doc _source:", reflect.TypeOf(source))
// Get the document's _id and print it out along with _source data
docID := doc["_id"]
log.Println("docID:", docID)
log.Println("_source:", source, "\n")
//}
////if len(esResp.Hits.Hits) == 0 {
//if len(hits) == 0 {
// break
//}
//
//for _, hit := range esResp.Hits.Hits {
// var esLog elasticsearchLog
//if err = json.Unmarshal(hit.Source, &esLog); err != nil {
//if err = json.Unmarshal(source, &esLog); err != nil {
// log.Printf("Error unmarshalling the response source: %s\n", err)
// c.errChan <- err
// continue
//}
token, err := GetToken(esLog.Message)
if err != nil {
log.Printf("Error generating token: %s\n", err)
@ -197,15 +210,22 @@ func (es *elasticsearch) Request(c *client) error {
RawErrorEvent: &messages.RawErrorEvent{
Source: "elasticsearch",
Timestamp: timestamp,
Name: hit.Id, // sure?
Payload: string(hit.Source),
//Name: hit.Id, // sure?
Name: fmt.Sprintf("%v", docID), // sure?
//Payload: string(hit.Source),
Payload: fmt.Sprintf("%v", source),
},
}
}
if _, ok := esResp["_scroll_id"]; ok {
log.Println("_scroll_id not found")
break
}
scrollId := esResp["_scroll_id"]
res, err = esC.Scroll(
esC.Scroll.WithContext(context.Background()),
esC.Scroll.WithScrollID(esResp.ScrollId),
//esC.Scroll.WithScrollID(esResp.ScrollId),
esC.Scroll.WithScrollID(fmt.Sprintf("%v", scrollId)),
esC.Scroll.WithScroll(time.Minute*2),
)
if err != nil {