feat(integrations): fixed response parsing

This commit is contained in:
Taha Yassine Kraiem 2022-01-28 15:05:29 +01:00
parent 2167853fc9
commit 93be4b4ff1

View file

@ -8,7 +8,6 @@ import (
"fmt"
elasticlib "github.com/elastic/go-elasticsearch/v7"
"log"
"reflect"
"strconv"
"time"
@ -29,19 +28,6 @@ type elasticsearchLog struct {
Time time.Time `json:"utc_time"` // Should be parsed automatically from RFC3339
}
//map[string]interface{}
// SearchResult represents the result of the search operation
//type elasticResponse struct {
// Hits struct {
// Hits []struct {
// Id string `json:"_id"`
// Source json.RawMessage `json:"_source"`
// } `json:"hits"`
// } `json:"hits"`
// ScrollId string `json:"_scroll_id"`
//}
func (es *elasticsearch) Request(c *client) error {
address := es.Host + ":" + es.Port.String()
apiKey := b64.StdEncoding.EncodeToString([]byte(es.ApiKeyId + ":" + es.ApiKey))
@ -60,17 +46,6 @@ func (es *elasticsearch) Request(c *client) error {
log.Println(err)
return err
}
// TODO: ping/versions/ client host check
// res0, err := esC.Info()
// if err != nil {
// log.Printf("ELASTIC Error getting info: %s", err)
// }
// defer res0.Body.Close()
// // Check response status
// if res0.IsError() {
// log.Printf("ELASTIC Error: %s", res0.String())
// }
// log.Printf("ELASTIC Info: %v ", res0.String())
gteTs := c.getLastMessageTimestamp() + 1000 // Sec or millisec to add ?
log.Printf("gteTs: %v ", gteTs)
@ -90,7 +65,6 @@ func (es *elasticsearch) Request(c *client) error {
"range": map[string]interface{}{
"utc_time": map[string]interface{}{
"gte": strconv.FormatUint(gteTs, 10),
//"gte": gteTs,
"lte": "now",
},
},
@ -129,13 +103,13 @@ func (es *elasticsearch) Request(c *client) error {
} else {
log.Printf("Elasticsearch Error [%s] %s: %s\n",
res.Status(),
e["error"], //.(map[string]interface{})["type"],
e["error"], //.(map[string]interface{})["reason"],
e["error"],
e["error"],
)
return fmt.Errorf("Elasticsearch Error [%s] %s: %s",
res.Status(),
e["error"], //.(map[string]interface{})["type"],
e["error"], //.(map[string]interface{})["reason"],
e["error"],
e["error"],
)
}
}
@ -143,65 +117,52 @@ func (es *elasticsearch) Request(c *client) error {
for {
var esResp map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil {
log.Fatalf("Error parsing raw response body: %s", err)
return fmt.Errorf("Error parsing the response body: %s", err)
// If no error, then convert response to a map[string]interface
} else {
log.Println("mapResp TYPE:", reflect.TypeOf(esResp), "\n")
log.Println(esResp)
}
//var esResp elasticResponse
//if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil {
// log.Printf("Error parsing the response body: %s\n", err)
// return fmt.Errorf("Error parsing the response body: %s", err)
//}
log.Printf("parsed response: %v\n", esResp)
if _, ok := esResp["hits"]; !ok {
log.Println("Hits not found")
log.Printf("Hits not found in \n%v\n", esResp)
break
}
hits:=esResp["hits"].(map[string]interface{})["hits"].([]interface{})
for _, hit := range hits{
hits := esResp["hits"].(map[string]interface{})["hits"].([]interface{})
if len(hits) == 0 {
log.Println("No hits found")
break
}
for _, hit := range hits {
// Parse the attributes/fields of the document
doc := hit.(map[string]interface{})
source := doc["_source"]
source := doc["_source"].(map[string]interface{})
var esLog elasticsearchLog
esLog=source.(elasticsearchLog)
log.Println("doc _source:", reflect.TypeOf(source))
if _, ok := esResp["message"]; !ok {
log.Printf("message not found in doc \n%v\n", doc)
c.errChan <- fmt.Errorf("message not found in doc '%v' ", doc)
continue
}
// Get the document's _id and print it out along with _source data
if _, ok := esResp["utc_time"]; !ok {
log.Printf("utc_time not found in doc \n%v\n", doc)
c.errChan <- fmt.Errorf("utc_time not found in doc '%v' ", doc)
continue
}
parsedTime, err := time.Parse(time.RFC3339, source["utc_time"].(string))
if err != nil {
log.Println("cannot parse time")
c.errChan <- fmt.Errorf("cannot parse RFC3339 time of doc '%v' ", doc)
continue
}
esLog := elasticsearchLog{Message: source["message"].(string), Time: parsedTime}
docID := doc["_id"]
log.Println("docID:", docID)
log.Println("_source:", source, "\n")
//}
////if len(esResp.Hits.Hits) == 0 {
//if len(hits) == 0 {
// break
//}
//
//for _, hit := range esResp.Hits.Hits {
// var esLog elasticsearchLog
//if err = json.Unmarshal(hit.Source, &esLog); err != nil {
//if err = json.Unmarshal(source, &esLog); err != nil {
// log.Printf("Error unmarshalling the response source: %s\n", err)
// c.errChan <- err
// continue
//}
token, err := GetToken(esLog.Message)
if err != nil {
log.Printf("Error generating token: %s\n", err)
c.errChan <- err
continue
}
//parsedTime, err := time.Parse(time.RFC3339, esLog.Timestamp)
//if err != nil {
// c.errChan <- err
// continue
//}
timestamp := uint64(utime.ToMilliseconds(esLog.Time))
c.setLastMessageTimestamp(timestamp)
c.evChan <- &SessionErrorEvent{
@ -210,10 +171,8 @@ func (es *elasticsearch) Request(c *client) error {
RawErrorEvent: &messages.RawErrorEvent{
Source: "elasticsearch",
Timestamp: timestamp,
//Name: hit.Id, // sure?
Name: fmt.Sprintf("%v", docID), // sure?
//Payload: string(hit.Source),
Payload: fmt.Sprintf("%v", source),
Name: fmt.Sprintf("%v", docID),
Payload: fmt.Sprintf("%v", source),
},
}
}
@ -221,10 +180,10 @@ func (es *elasticsearch) Request(c *client) error {
log.Println("_scroll_id not found")
break
}
log.Println("Scrolling")
scrollId := esResp["_scroll_id"]
res, err = esC.Scroll(
esC.Scroll.WithContext(context.Background()),
//esC.Scroll.WithScrollID(esResp.ScrollId),
esC.Scroll.WithScrollID(fmt.Sprintf("%v", scrollId)),
esC.Scroll.WithScroll(time.Minute*2),
)