openreplay/backend/pkg/connector/elasticsearch.go
2024-04-09 14:58:47 +02:00

160 lines
3.6 KiB
Go

package connector
import (
"bytes"
"context"
"encoding/json"
"io"
"log"
"net/http"
"time"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
v4 "github.com/aws/aws-sdk-go/aws/signer/v4"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v8/esapi"
"openreplay/backend/internal/config/connector"
"openreplay/backend/pkg/logger"
)
const (
sessionsIndex = "sessions"
eventsIndex = "events"
)
type awsSignerTransport struct {
HTTPClient *http.Client
AWSSession *session.Session
AWSService string
AWSRegion string
Credentials *credentials.Credentials
signer *v4.Signer
}
func (a *awsSignerTransport) RoundTrip(req *http.Request) (*http.Response, error) {
var (
err error
body io.ReadSeeker = nil
)
if req.Body != nil {
bodyBytes, err := io.ReadAll(req.Body)
if err != nil {
log.Println("Error reading request body: ", err)
return nil, err
}
req.Body.Close()
body = bytes.NewReader(bodyBytes)
}
_, err = a.signer.Sign(req, body, a.AWSService, a.AWSRegion, time.Now())
if err != nil {
return nil, err
}
return a.HTTPClient.Do(req)
}
type ElasticSearch struct {
log logger.Logger
cfg *connector.Config
conn *elasticsearch.Client
}
func NewElasticSearch(log logger.Logger, cfg *connector.Config) (*ElasticSearch, error) {
esConf := elasticsearch.Config{
Addresses: cfg.GetURLs(),
}
if cfg.UseAWS {
sess, err := session.NewSession()
if err != nil {
return nil, err
}
esConf.Transport = &awsSignerTransport{
HTTPClient: &http.Client{},
AWSSession: sess,
AWSService: "es",
AWSRegion: cfg.AWSRegion,
Credentials: sess.Config.Credentials,
signer: v4.NewSigner(sess.Config.Credentials),
}
}
es, err := elasticsearch.NewClient(esConf)
if err != nil {
return nil, err
}
// Test the connection to the Elasticsearch server with the Info() API
res, err := es.Info()
if err != nil {
return nil, err
}
defer res.Body.Close()
return &ElasticSearch{
log: log,
cfg: cfg,
conn: es,
}, nil
}
func (e *ElasticSearch) InsertEvents(batch []map[string]string) error {
e.log.Info(context.Background(), "Inserting events into ElasticSearch, events: %d", len(batch))
var buf bytes.Buffer
noIndexStr := `{"index":{}}\n`
for _, doc := range batch {
data, _ := json.Marshal(doc)
buf.Grow(len(noIndexStr) + len(data) + 1)
buf.WriteString(noIndexStr)
buf.Write(data)
buf.WriteByte('\n')
}
return e.doRequest(eventsIndex, buf)
}
func (e *ElasticSearch) InsertSessions(batch []map[string]string) error {
e.log.Info(context.Background(), "Inserting sessions into ElasticSearch, sessions: %d", len(batch))
var buf bytes.Buffer
for _, doc := range batch {
meta := map[string]interface{}{
"index": map[string]interface{}{
"_index": sessionsIndex,
"_id": doc["sessionid"],
},
}
metadata, _ := json.Marshal(meta)
data, _ := json.Marshal(doc)
buf.Grow(len(metadata) + len(data) + 2)
buf.Write(metadata)
buf.WriteByte('\n')
buf.Write(data)
buf.WriteByte('\n')
}
return e.doRequest(sessionsIndex, buf)
}
func (e *ElasticSearch) doRequest(index string, data bytes.Buffer) error {
req := esapi.BulkRequest{
Index: index,
Body: &data,
Refresh: "false",
}
res, err := req.Do(context.Background(), e.conn)
if err != nil || res.IsError() {
if err != nil {
e.log.Fatal(context.Background(), "Error getting response: %s", err)
} else {
e.log.Fatal(context.Background(), "Error indexing batch: %s", res.String())
}
}
res.Body.Close()
return nil
}
func (e *ElasticSearch) Close() error {
return nil
}