feat(backend): added es connector

This commit is contained in:
Alexander 2024-04-09 14:58:47 +02:00
parent 5421aedfe6
commit bdb3f37c56
5 changed files with 188 additions and 5 deletions

View file

@ -1,6 +1,8 @@
module openreplay/backend
go 1.20
go 1.21
toolchain go1.21.1
require (
cloud.google.com/go/logging v1.7.0
@ -14,6 +16,7 @@ require (
github.com/confluentinc/confluent-kafka-go/v2 v2.2.0
github.com/docker/distribution v2.8.1+incompatible
github.com/elastic/go-elasticsearch/v7 v7.13.1
github.com/elastic/go-elasticsearch/v8 v8.13.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
@ -45,9 +48,12 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/internal v1.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.5.0 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.11.0 // indirect
@ -67,14 +73,15 @@ require (
github.com/shopspring/decimal v1.3.1 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/oauth2 v0.10.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect

View file

@ -58,3 +58,14 @@ type Clickhouse struct {
UserName string `env:"CLICKHOUSE_USERNAME,default=default"`
Password string `env:"CLICKHOUSE_PASSWORD,default="`
}
// ElasticSearch config
type ElasticSearch struct {
URLs string `env:"ELASTICSEARCH_URLS"`
UseAWS bool `env:"ELASTICSEARCH_IN_AWS,default=false"`
}
func (cfg *ElasticSearch) GetURLs() []string {
return strings.Split(cfg.URLs, ",")
}

View file

@ -15,6 +15,7 @@ type Config struct {
redis.Redis
common.Redshift
common.Clickhouse
common.ElasticSearch
objectstorage.ObjectsConfig
ConnectorType string `env:"CONNECTOR_TYPE,default=redshift"`
SessionsTableName string `env:"SESSIONS_TABLE_NAME,default=connector_user_sessions"`

View file

@ -0,0 +1,160 @@
package connector
import (
"bytes"
"context"
"encoding/json"
"io"
"log"
"net/http"
"time"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
v4 "github.com/aws/aws-sdk-go/aws/signer/v4"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v8/esapi"
"openreplay/backend/internal/config/connector"
"openreplay/backend/pkg/logger"
)
const (
sessionsIndex = "sessions"
eventsIndex = "events"
)
type awsSignerTransport struct {
HTTPClient *http.Client
AWSSession *session.Session
AWSService string
AWSRegion string
Credentials *credentials.Credentials
signer *v4.Signer
}
func (a *awsSignerTransport) RoundTrip(req *http.Request) (*http.Response, error) {
var (
err error
body io.ReadSeeker = nil
)
if req.Body != nil {
bodyBytes, err := io.ReadAll(req.Body)
if err != nil {
log.Println("Error reading request body: ", err)
return nil, err
}
req.Body.Close()
body = bytes.NewReader(bodyBytes)
}
_, err = a.signer.Sign(req, body, a.AWSService, a.AWSRegion, time.Now())
if err != nil {
return nil, err
}
return a.HTTPClient.Do(req)
}
type ElasticSearch struct {
log logger.Logger
cfg *connector.Config
conn *elasticsearch.Client
}
func NewElasticSearch(log logger.Logger, cfg *connector.Config) (*ElasticSearch, error) {
esConf := elasticsearch.Config{
Addresses: cfg.GetURLs(),
}
if cfg.UseAWS {
sess, err := session.NewSession()
if err != nil {
return nil, err
}
esConf.Transport = &awsSignerTransport{
HTTPClient: &http.Client{},
AWSSession: sess,
AWSService: "es",
AWSRegion: cfg.AWSRegion,
Credentials: sess.Config.Credentials,
signer: v4.NewSigner(sess.Config.Credentials),
}
}
es, err := elasticsearch.NewClient(esConf)
if err != nil {
return nil, err
}
// Test the connection to the Elasticsearch server with the Info() API
res, err := es.Info()
if err != nil {
return nil, err
}
defer res.Body.Close()
return &ElasticSearch{
log: log,
cfg: cfg,
conn: es,
}, nil
}
func (e *ElasticSearch) InsertEvents(batch []map[string]string) error {
e.log.Info(context.Background(), "Inserting events into ElasticSearch, events: %d", len(batch))
var buf bytes.Buffer
noIndexStr := `{"index":{}}\n`
for _, doc := range batch {
data, _ := json.Marshal(doc)
buf.Grow(len(noIndexStr) + len(data) + 1)
buf.WriteString(noIndexStr)
buf.Write(data)
buf.WriteByte('\n')
}
return e.doRequest(eventsIndex, buf)
}
func (e *ElasticSearch) InsertSessions(batch []map[string]string) error {
e.log.Info(context.Background(), "Inserting sessions into ElasticSearch, sessions: %d", len(batch))
var buf bytes.Buffer
for _, doc := range batch {
meta := map[string]interface{}{
"index": map[string]interface{}{
"_index": sessionsIndex,
"_id": doc["sessionid"],
},
}
metadata, _ := json.Marshal(meta)
data, _ := json.Marshal(doc)
buf.Grow(len(metadata) + len(data) + 2)
buf.Write(metadata)
buf.WriteByte('\n')
buf.Write(data)
buf.WriteByte('\n')
}
return e.doRequest(sessionsIndex, buf)
}
func (e *ElasticSearch) doRequest(index string, data bytes.Buffer) error {
req := esapi.BulkRequest{
Index: index,
Body: &data,
Refresh: "false",
}
res, err := req.Do(context.Background(), e.conn)
if err != nil || res.IsError() {
if err != nil {
e.log.Fatal(context.Background(), "Error getting response: %s", err)
} else {
e.log.Fatal(context.Background(), "Error indexing batch: %s", res.String())
}
}
res.Body.Close()
return nil
}
func (e *ElasticSearch) Close() error {
return nil
}

View file

@ -38,6 +38,10 @@ func main() {
if db, err = saver.NewClickHouse(log, cfg); err != nil {
log.Fatal(ctx, "can't init clickhouse connection: %s", err)
}
case "elasticsearch":
if db, err = saver.NewElasticSearch(log, cfg); err != nil {
log.Fatal(ctx, "can't init elasticsearch connection: %s", err)
}
default:
log.Fatal(ctx, "unknown connector type: %s", cfg.ConnectorType)
}