diff --git a/backend/go.mod b/backend/go.mod
index c4de88211..0592b68d4 100644
--- a/backend/go.mod
+++ b/backend/go.mod
@@ -1,6 +1,8 @@
 module openreplay/backend
 
-go 1.20
+go 1.21
+
+toolchain go1.21.1
 
 require (
 	cloud.google.com/go/logging v1.7.0
@@ -14,6 +16,7 @@ require (
 	github.com/confluentinc/confluent-kafka-go/v2 v2.2.0
 	github.com/docker/distribution v2.8.1+incompatible
 	github.com/elastic/go-elasticsearch/v7 v7.13.1
+	github.com/elastic/go-elasticsearch/v8 v8.13.0
 	github.com/go-redis/redis v6.15.9+incompatible
 	github.com/google/uuid v1.3.0
 	github.com/gorilla/mux v1.8.0
@@ -45,9 +48,12 @@ require (
 	github.com/Azure/azure-sdk-for-go/sdk/internal v1.2.0 // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
+	github.com/elastic/elastic-transport-go/v8 v8.5.0 // indirect
+	github.com/go-logr/logr v1.3.0 // indirect
+	github.com/go-logr/stdr v1.2.2 // indirect
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
 	github.com/golang/protobuf v1.5.3 // indirect
-	github.com/google/go-cmp v0.5.9 // indirect
+	github.com/google/go-cmp v0.6.0 // indirect
 	github.com/google/s2a-go v0.1.4 // indirect
 	github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
 	github.com/googleapis/gax-go/v2 v2.11.0 // indirect
@@ -67,14 +73,15 @@ require (
 	github.com/shopspring/decimal v1.3.1 // indirect
 	github.com/sirupsen/logrus v1.8.1 // indirect
 	go.opencensus.io v0.24.0 // indirect
-	go.opentelemetry.io/otel v1.7.0 // indirect
-	go.opentelemetry.io/otel/trace v1.7.0 // indirect
+	go.opentelemetry.io/otel v1.21.0 // indirect
+	go.opentelemetry.io/otel/metric v1.21.0 // indirect
+	go.opentelemetry.io/otel/trace v1.21.0 // indirect
 	go.uber.org/atomic v1.7.0 // indirect
 	go.uber.org/multierr v1.6.0 // indirect
 	golang.org/x/crypto v0.14.0 // indirect
 	golang.org/x/oauth2 v0.10.0 // indirect
 	golang.org/x/sync v0.3.0 // indirect
-	golang.org/x/sys v0.13.0 // indirect
+	golang.org/x/sys v0.14.0 // indirect
 	golang.org/x/text v0.13.0 // indirect
 	golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
 	google.golang.org/appengine v1.6.7 // indirect
diff --git a/backend/internal/config/common/config.go b/backend/internal/config/common/config.go
index 5fd98f292..460c46789 100644
--- a/backend/internal/config/common/config.go
+++ b/backend/internal/config/common/config.go
@@ -58,3 +58,22 @@ type Clickhouse struct {
 	UserName string `env:"CLICKHOUSE_USERNAME,default=default"`
 	Password string `env:"CLICKHOUSE_PASSWORD,default="`
 }
+
+// ElasticSearch config
+
+type ElasticSearch struct {
+	URLs   string `env:"ELASTICSEARCH_URLS"`
+	UseAWS bool   `env:"ELASTICSEARCH_IN_AWS,default=false"`
+}
+
+// GetURLs splits the comma-separated URLs value into a list, trimming
+// surrounding whitespace and dropping empty entries.
+func (cfg *ElasticSearch) GetURLs() []string {
+	var urls []string
+	for _, u := range strings.Split(cfg.URLs, ",") {
+		if u = strings.TrimSpace(u); u != "" {
+			urls = append(urls, u)
+		}
+	}
+	return urls
+}
diff --git a/backend/internal/config/connector/config.go b/backend/internal/config/connector/config.go
index b31030fa5..e90463e12 100644
--- a/backend/internal/config/connector/config.go
+++ b/backend/internal/config/connector/config.go
@@ -15,6 +15,7 @@ type Config struct {
 	redis.Redis
 	common.Redshift
 	common.Clickhouse
+	common.ElasticSearch
 	objectstorage.ObjectsConfig
 	ConnectorType string `env:"CONNECTOR_TYPE,default=redshift"`
 	SessionsTableName string `env:"SESSIONS_TABLE_NAME,default=connector_user_sessions"`
diff --git a/backend/pkg/connector/elasticsearch.go b/backend/pkg/connector/elasticsearch.go
new file mode 100644
index 000000000..d19d83d34
--- /dev/null
+++ b/backend/pkg/connector/elasticsearch.go
@@ -0,0 +1,211 @@
+package connector
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"log"
+	"net/http"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws/credentials"
+	"github.com/aws/aws-sdk-go/aws/session"
+	v4 "github.com/aws/aws-sdk-go/aws/signer/v4"
+	"github.com/elastic/go-elasticsearch/v7"
+	"github.com/elastic/go-elasticsearch/v8/esapi"
+
+	"openreplay/backend/internal/config/connector"
+	"openreplay/backend/pkg/logger"
+)
+
+// Names of the indices the connector writes to.
+const (
+	sessionsIndex = "sessions"
+	eventsIndex   = "events"
+)
+
+// awsSignerTransport is an http.RoundTripper that signs each outgoing request
+// with the AWS v4 signer before sending it through HTTPClient.
+type awsSignerTransport struct {
+	HTTPClient  *http.Client
+	AWSSession  *session.Session
+	AWSService  string
+	AWSRegion   string
+	Credentials *credentials.Credentials
+	signer      *v4.Signer
+}
+
+// RoundTrip buffers the request body, signs the request and sends it with
+// the wrapped HTTP client.
+func (a *awsSignerTransport) RoundTrip(req *http.Request) (*http.Response, error) {
+	// Sign takes an io.ReadSeeker, so the body is read fully into memory.
+	var body io.ReadSeeker
+	if req.Body != nil {
+		bodyBytes, err := io.ReadAll(req.Body)
+		// Close before checking the error so a failed read does not leak the body.
+		req.Body.Close()
+		if err != nil {
+			log.Println("Error reading request body: ", err)
+			return nil, err
+		}
+		body = bytes.NewReader(bodyBytes)
+	}
+
+	if _, err := a.signer.Sign(req, body, a.AWSService, a.AWSRegion, time.Now()); err != nil {
+		return nil, err
+	}
+	return a.HTTPClient.Do(req)
+}
+
+// ElasticSearch writes session and event batches to Elasticsearch through the
+// bulk API.
+//
+// NOTE(review): the client type comes from go-elasticsearch/v7 while requests
+// are built with go-elasticsearch/v8/esapi; confirm this mix is intentional.
+type ElasticSearch struct {
+	log  logger.Logger
+	cfg  *connector.Config
+	conn *elasticsearch.Client
+}
+
+// NewElasticSearch creates a client for the configured URLs, optionally
+// signing requests for AWS, and verifies connectivity with the Info API.
+func NewElasticSearch(log logger.Logger, cfg *connector.Config) (*ElasticSearch, error) {
+	esConf := elasticsearch.Config{
+		Addresses: cfg.GetURLs(),
+	}
+	if cfg.UseAWS {
+		sess, err := session.NewSession()
+		if err != nil {
+			return nil, err
+		}
+		esConf.Transport = &awsSignerTransport{
+			HTTPClient:  &http.Client{},
+			AWSSession:  sess,
+			AWSService:  "es",
+			AWSRegion:   cfg.AWSRegion,
+			Credentials: sess.Config.Credentials,
+			signer:      v4.NewSigner(sess.Config.Credentials),
+		}
+	}
+	es, err := elasticsearch.NewClient(esConf)
+	if err != nil {
+		return nil, err
+	}
+
+	// Test the connection to the Elasticsearch server with the Info() API
+	res, err := es.Info()
+	if err != nil {
+		return nil, err
+	}
+	defer res.Body.Close()
+	// A transport-level success can still carry an HTTP error status.
+	if res.IsError() {
+		return nil, fmt.Errorf("elasticsearch info request failed: %s", res.String())
+	}
+
+	return &ElasticSearch{
+		log:  log,
+		cfg:  cfg,
+		conn: es,
+	}, nil
+}
+
+// InsertEvents indexes the batch into the events index with one bulk request.
+// An empty batch is a no-op.
+func (e *ElasticSearch) InsertEvents(batch []map[string]string) error {
+	e.log.Info(context.Background(), "Inserting events into ElasticSearch, events: %d", len(batch))
+	if len(batch) == 0 {
+		return nil
+	}
+	// Must be an interpreted string: inside backquotes `\n` is a backslash and
+	// an 'n', which would glue the action and the document onto one line.
+	const action = "{\"index\":{}}\n"
+
+	var buf bytes.Buffer
+	for _, doc := range batch {
+		data, err := json.Marshal(doc)
+		if err != nil {
+			return fmt.Errorf("marshalling event: %w", err)
+		}
+		buf.Grow(len(action) + len(data) + 1)
+		buf.WriteString(action)
+		buf.Write(data)
+		buf.WriteByte('\n')
+	}
+	return e.doRequest(eventsIndex, &buf)
+}
+
+// InsertSessions indexes the batch into the sessions index with one bulk
+// request, using each document's "sessionid" value as the document ID.
+// An empty batch is a no-op.
+func (e *ElasticSearch) InsertSessions(batch []map[string]string) error {
+	e.log.Info(context.Background(), "Inserting sessions into ElasticSearch, sessions: %d", len(batch))
+	if len(batch) == 0 {
+		return nil
+	}
+
+	var buf bytes.Buffer
+	for _, doc := range batch {
+		meta := map[string]interface{}{
+			"index": map[string]interface{}{
+				"_index": sessionsIndex,
+				"_id":    doc["sessionid"],
+			},
+		}
+
+		metadata, err := json.Marshal(meta)
+		if err != nil {
+			return fmt.Errorf("marshalling session bulk action: %w", err)
+		}
+		data, err := json.Marshal(doc)
+		if err != nil {
+			return fmt.Errorf("marshalling session: %w", err)
+		}
+		buf.Grow(len(metadata) + len(data) + 2)
+		buf.Write(metadata)
+		buf.WriteByte('\n')
+		buf.Write(data)
+		buf.WriteByte('\n')
+	}
+	return e.doRequest(sessionsIndex, &buf)
+}
+
+// doRequest sends data as a bulk request against index. It returns an error
+// when the request cannot be sent, when the server answers with an error
+// status, or when the response reports failed items.
+func (e *ElasticSearch) doRequest(index string, data *bytes.Buffer) error {
+	req := esapi.BulkRequest{
+		Index:   index,
+		Body:    data,
+		Refresh: "false",
+	}
+	res, err := req.Do(context.Background(), e.conn)
+	if err != nil {
+		return fmt.Errorf("sending bulk request to index %q: %w", index, err)
+	}
+	defer res.Body.Close()
+	if res.IsError() {
+		return fmt.Errorf("bulk request to index %q failed: %s", index, res.String())
+	}
+
+	// The bulk API answers 200 even when individual items fail; the top-level
+	// "errors" flag is the only signal.
+	var result struct {
+		Errors bool `json:"errors"`
+	}
+	if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
+		return fmt.Errorf("decoding bulk response for index %q: %w", index, err)
+	}
+	if result.Errors {
+		return fmt.Errorf("bulk request to index %q reported failed items", index)
+	}
+	return nil
+}
+
+// Close is a no-op; it always returns nil.
+func (e *ElasticSearch) Close() error {
+	return nil
+}
diff --git a/ee/backend/cmd/connector/main.go b/ee/backend/cmd/connector/main.go
index 72e878023..c0886d69f 100644
--- a/ee/backend/cmd/connector/main.go
+++ b/ee/backend/cmd/connector/main.go
@@ -38,6 +38,10 @@ func main() {
 		if db, err = saver.NewClickHouse(log, cfg); err != nil {
 			log.Fatal(ctx, "can't init clickhouse connection: %s", err)
 		}
+	case "elasticsearch":
+		if db, err = saver.NewElasticSearch(log, cfg); err != nil {
+			log.Fatal(ctx, "can't init elasticsearch connection: %s", err)
+		}
 	default:
 		log.Fatal(ctx, "unknown connector type: %s", cfg.ConnectorType)
 	}