From 22a3dc9f8e05b09f60760f2b83fa61da5743e148 Mon Sep 17 00:00:00 2001 From: Alexander Date: Mon, 22 Apr 2024 13:34:23 +0200 Subject: [PATCH] feat(backend): patched redshift connector (#2117) --- backend/internal/config/common/config.go | 14 ++ .../internal/config/objectstorage/config.go | 1 + backend/pkg/logger/logger.go | 73 +++++++++ ee/backend/cmd/connector/main.go | 47 +++--- ee/backend/pkg/connector/clickhouse.go | 82 +++++----- ee/backend/pkg/connector/model.go | 53 ++++++- ee/backend/pkg/connector/redshift.go | 46 +++--- ee/backend/pkg/connector/saver.go | 148 ++++++++++++++---- 8 files changed, 347 insertions(+), 117 deletions(-) create mode 100644 backend/pkg/logger/logger.go diff --git a/backend/internal/config/common/config.go b/backend/internal/config/common/config.go index 5fd98f292..0dee014d4 100644 --- a/backend/internal/config/common/config.go +++ b/backend/internal/config/common/config.go @@ -48,6 +48,7 @@ type Redshift struct { User string `env:"REDSHIFT_USER"` Password string `env:"REDSHIFT_PASSWORD"` Database string `env:"REDSHIFT_DATABASE"` + Bucket string `env:"REDSHIFT_BUCKET,default=rdshftbucket"` } // Clickhouse config @@ -58,3 +59,16 @@ type Clickhouse struct { UserName string `env:"CLICKHOUSE_USERNAME,default=default"` Password string `env:"CLICKHOUSE_PASSWORD,default="` } + +// ElasticSearch config + +type ElasticSearch struct { + URLs string `env:"ELASTICSEARCH_URLS"` + UseAWS bool `env:"ELASTICSEARCH_IN_AWS,default=false"` + User string `env:"ELASTICSEARCH_USER"` + Password string `env:"ELASTICSEARCH_PASSWORD"` +} + +func (cfg *ElasticSearch) GetURLs() []string { + return strings.Split(cfg.URLs, ",") +} diff --git a/backend/internal/config/objectstorage/config.go b/backend/internal/config/objectstorage/config.go index a1ca6568d..7ffab5f70 100644 --- a/backend/internal/config/objectstorage/config.go +++ b/backend/internal/config/objectstorage/config.go @@ -14,6 +14,7 @@ type ObjectsConfig struct { AzureAccountName string 
`env:"AZURE_ACCOUNT_NAME"` AzureAccountKey string `env:"AZURE_ACCOUNT_KEY"` UseS3Tags bool `env:"USE_S3_TAGS,default=true"` + AWSIAMRole string `env:"AWS_IAM_ROLE"` } func (c *ObjectsConfig) UseFileTags() bool { diff --git a/backend/pkg/logger/logger.go b/backend/pkg/logger/logger.go new file mode 100644 index 000000000..d30d7a42c --- /dev/null +++ b/backend/pkg/logger/logger.go @@ -0,0 +1,73 @@ +package logger + +import ( + "context" + "fmt" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "os" +) + +type Logger interface { + Debug(ctx context.Context, message string, args ...interface{}) + Info(ctx context.Context, message string, args ...interface{}) + Warn(ctx context.Context, message string, args ...interface{}) + Error(ctx context.Context, message string, args ...interface{}) + Fatal(ctx context.Context, message string, args ...interface{}) +} + +type loggerImpl struct { + l *zap.Logger +} + +func New() Logger { + encoderConfig := zap.NewProductionEncoderConfig() + encoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05.000") + jsonEncoder := zapcore.NewJSONEncoder(encoderConfig) + core := zapcore.NewCore(jsonEncoder, zapcore.AddSync(os.Stdout), zap.InfoLevel) + baseLogger := zap.New(core, zap.AddCaller()) + logger := baseLogger.WithOptions(zap.AddCallerSkip(1)) + return &loggerImpl{l: logger} +} + +func (l *loggerImpl) prepare(ctx context.Context, logger *zap.Logger) *zap.Logger { + if sID, ok := ctx.Value("sessionID").(string); ok { + logger = logger.With(zap.String("sessionID", sID)) + } + if pID, ok := ctx.Value("projectID").(string); ok { + logger = logger.With(zap.String("projectID", pID)) + } + if tVer, ok := ctx.Value("tracker").(string); ok { + logger = logger.With(zap.String("tracker", tVer)) + } + if httpMethod, ok := ctx.Value("httpMethod").(string); ok { + logger = logger.With(zap.String("httpMethod", httpMethod)) + } + if urlPath, ok := ctx.Value("url").(string); ok { + logger = logger.With(zap.String("url", urlPath)) + } + 
if batch, ok := ctx.Value("batch").(string); ok { + logger = logger.With(zap.String("batch", batch)) + } + return logger +} + +func (l *loggerImpl) Debug(ctx context.Context, message string, args ...interface{}) { + l.prepare(ctx, l.l.With(zap.String("level", "debug"))).Debug(fmt.Sprintf(message, args...)) +} + +func (l *loggerImpl) Info(ctx context.Context, message string, args ...interface{}) { + l.prepare(ctx, l.l.With(zap.String("level", "info"))).Info(fmt.Sprintf(message, args...)) +} + +func (l *loggerImpl) Warn(ctx context.Context, message string, args ...interface{}) { + l.prepare(ctx, l.l.With(zap.String("level", "warn"))).Warn(fmt.Sprintf(message, args...)) +} + +func (l *loggerImpl) Error(ctx context.Context, message string, args ...interface{}) { + l.prepare(ctx, l.l.With(zap.String("level", "error"))).Error(fmt.Sprintf(message, args...)) +} + +func (l *loggerImpl) Fatal(ctx context.Context, message string, args ...interface{}) { + l.prepare(ctx, l.l.With(zap.String("level", "fatal"))).Fatal(fmt.Sprintf(message, args...)) +} diff --git a/ee/backend/cmd/connector/main.go b/ee/backend/cmd/connector/main.go index c854a6fba..8710b9579 100644 --- a/ee/backend/cmd/connector/main.go +++ b/ee/backend/cmd/connector/main.go @@ -1,59 +1,59 @@ package main import ( - "log" - "openreplay/backend/pkg/db/postgres/pool" - "openreplay/backend/pkg/db/redis" - "openreplay/backend/pkg/projects" - "openreplay/backend/pkg/sessions" + "context" config "openreplay/backend/internal/config/connector" "openreplay/backend/internal/connector" saver "openreplay/backend/pkg/connector" + "openreplay/backend/pkg/db/postgres/pool" + "openreplay/backend/pkg/db/redis" + "openreplay/backend/pkg/logger" "openreplay/backend/pkg/memory" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/objectstorage/store" + "openreplay/backend/pkg/projects" "openreplay/backend/pkg/queue" + "openreplay/backend/pkg/sessions" "openreplay/backend/pkg/terminator" ) func main() { - 
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) - + ctx := context.Background() + log := logger.New() cfg := config.New() objStore, err := store.NewStore(&cfg.ObjectsConfig) if err != nil { - log.Fatalf("can't init object storage: %s", err) + log.Fatal(ctx, "can't init object storage: %s", err) } var db saver.Database switch cfg.ConnectorType { case "redshift": - if db, err = saver.NewRedshift(cfg, objStore); err != nil { - log.Fatalf("can't init redshift connection: %s", err) + if db, err = saver.NewRedshift(log, cfg, objStore); err != nil { + log.Fatal(ctx, "can't init redshift connection: %s", err) } case "clickhouse": - if db, err = saver.NewClickHouse(cfg); err != nil { - log.Fatalf("can't init clickhouse connection: %s", err) + if db, err = saver.NewClickHouse(log, cfg); err != nil { + log.Fatal(ctx, "can't init clickhouse connection: %s", err) } default: - log.Fatalf("unknown connector type: %s", cfg.ConnectorType) + log.Fatal(ctx, "unknown connector type: %s", cfg.ConnectorType) } defer db.Close() // Init postgres connection pgConn, err := pool.New(cfg.Postgres.String()) if err != nil { - log.Printf("can't init postgres connection: %s", err) - return + log.Fatal(ctx, "can't init postgres connection: %s", err) } defer pgConn.Close() // Init redis connection redisClient, err := redis.New(&cfg.Redis) if err != nil { - log.Printf("can't init redis connection: %s", err) + log.Warn(ctx, "can't init redis connection: %s", err) } defer redisClient.Close() @@ -61,7 +61,7 @@ func main() { sessManager := sessions.New(pgConn, projManager, redisClient) // Saves messages to Redshift - dataSaver := saver.New(cfg, db, sessManager, projManager) + dataSaver := saver.New(log, cfg, db, sessManager, projManager) // Message filter msgFilter := []int{messages.MsgConsoleLog, messages.MsgCustomEvent, messages.MsgJSException, @@ -69,7 +69,13 @@ func main() { messages.MsgSessionStart, messages.MsgSessionEnd, messages.MsgConnectionInformation, messages.MsgMetadata, 
messages.MsgPageEvent, messages.MsgPerformanceTrackAggr, messages.MsgUserID, messages.MsgUserAnonymousID, messages.MsgJSException, messages.MsgJSExceptionDeprecated, - messages.MsgInputEvent, messages.MsgMouseClick, messages.MsgIssueEventDeprecated} + messages.MsgInputEvent, messages.MsgMouseClick, messages.MsgIssueEventDeprecated, + // Mobile messages + messages.MsgIOSSessionStart, messages.MsgIOSSessionEnd, messages.MsgIOSUserID, messages.MsgIOSUserAnonymousID, + messages.MsgIOSMetadata, messages.MsgIOSEvent, messages.MsgIOSNetworkCall, + messages.MsgIOSClickEvent, messages.MsgIOSSwipeEvent, messages.MsgIOSInputEvent, + messages.MsgIOSCrash, messages.MsgIOSIssueEvent, + } // Init consumer consumer := queue.NewConsumer( @@ -86,12 +92,11 @@ func main() { // Init memory manager memoryManager, err := memory.NewManager(cfg.MemoryLimitMB, cfg.MaxMemoryUsage) if err != nil { - log.Printf("can't init memory manager: %s", err) - return + log.Fatal(ctx, "can't init memory manager: %s", err) } // Run service and wait for TERM signal service := connector.New(cfg, consumer, dataSaver, memoryManager) - log.Printf("Connector service started\n") + log.Info(ctx, "Connector service started") terminator.Wait(service) } diff --git a/ee/backend/pkg/connector/clickhouse.go b/ee/backend/pkg/connector/clickhouse.go index f5a4c286c..283bc2fbf 100644 --- a/ee/backend/pkg/connector/clickhouse.go +++ b/ee/backend/pkg/connector/clickhouse.go @@ -2,22 +2,23 @@ package connector import ( "context" - "log" "strconv" "strings" "time" - "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "openreplay/backend/internal/config/connector" + "openreplay/backend/pkg/logger" ) type ClickHouse struct { + log logger.Logger cfg *connector.Config conn driver.Conn } -func NewClickHouse(cfg *connector.Config) (*ClickHouse, error) { +func NewClickHouse(log logger.Logger, cfg *connector.Config) (*ClickHouse, error) { url := cfg.Clickhouse.URL url = 
strings.TrimPrefix(url, "tcp://") url = strings.TrimSuffix(url, "/default") @@ -43,6 +44,7 @@ func NewClickHouse(cfg *connector.Config) (*ClickHouse, error) { return nil, err } c := &ClickHouse{ + log: log, cfg: cfg, conn: conn, } @@ -57,8 +59,10 @@ func (c *ClickHouse) InsertEvents(batch []map[string]string) error { return err } for _, event := range batch { + ctx := context.Background() + ctx = context.WithValue(ctx, "sessionID", c.Uint64(ctx, event["sessionid"])) if err := bulk.Append( - Uint64(event["sessionid"]), + c.Uint64(ctx, event["sessionid"]), nullableString(event["consolelog_level"]), nullableString(event["consolelog_value"]), nullableString(event["customevent_name"]), @@ -72,11 +76,11 @@ func (c *ClickHouse) InsertEvents(batch []map[string]string) error { nullableString(event["networkrequest_url"]), nullableString(event["networkrequest_request"]), nullableString(event["networkrequest_response"]), - nullableUint64(event["networkrequest_status"]), - nullableUint64(event["networkrequest_timestamp"]), - nullableUint64(event["networkrequest_duration"]), + c.nullableUint64(ctx, event["networkrequest_status"]), + c.nullableUint64(ctx, event["networkrequest_timestamp"]), + c.nullableUint64(ctx, event["networkrequest_duration"]), nullableString(event["issueevent_message_id"]), - nullableUint64(event["issueevent_timestamp"]), + c.nullableUint64(ctx, event["issueevent_timestamp"]), nullableString(event["issueevent_type"]), nullableString(event["issueevent_context_string"]), nullableString(event["issueevent_context"]), @@ -84,10 +88,10 @@ func (c *ClickHouse) InsertEvents(batch []map[string]string) error { nullableString(event["issueevent_url"]), nullableString(event["customissue_name"]), nullableString(event["customissue_payload"]), - nullableUint64(event["received_at"]), - nullableUint64(event["batch_order_number"]), + c.nullableUint64(ctx, event["received_at"]), + c.nullableUint64(ctx, event["batch_order_number"]), ); err != nil { - log.Printf("can't append 
value set to batch, err: %s", err) + c.log.Error(ctx, "can't append value set to batch, err: ", err) } } return bulk.Send() @@ -101,42 +105,44 @@ func (c *ClickHouse) InsertSessions(batch []map[string]string) error { return err } for _, sess := range batch { + ctx := context.Background() + ctx = context.WithValue(ctx, "sessionID", c.Uint64(ctx, sess["sessionid"])) if err := bulk.Append( - Uint64(sess["sessionid"]), + c.Uint64(ctx, sess["sessionid"]), nullableString(sess["user_agent"]), nullableString(sess["user_browser"]), nullableString(sess["user_browser_version"]), nullableString(sess["user_country"]), nullableString(sess["user_device"]), - nullableUint64(sess["user_device_heap_size"]), - nullableUint64(sess["user_device_memory_size"]), + c.nullableUint64(ctx, sess["user_device_heap_size"]), + c.nullableUint64(ctx, sess["user_device_memory_size"]), nullableString(sess["user_device_type"]), nullableString(sess["user_os"]), nullableString(sess["user_os_version"]), nullableString(sess["user_uuid"]), - nullableUint64(sess["connection_effective_bandwidth"]), + c.nullableUint64(ctx, sess["connection_effective_bandwidth"]), nullableString(sess["connection_type"]), nullableString(sess["referrer"]), nullableString(sess["user_anonymous_id"]), nullableString(sess["user_id"]), - nullableUint64(sess["session_start_timestamp"]), - nullableUint64(sess["session_end_timestamp"]), - nullableUint64(sess["session_duration"]), - nullableUint64(sess["first_contentful_paint"]), - nullableUint64(sess["speed_index"]), - nullableUint64(sess["visually_complete"]), - nullableUint64(sess["timing_time_to_interactive"]), - nullableUint64(sess["avg_cpu"]), - nullableUint64(sess["avg_fps"]), - nullableUint64(sess["max_cpu"]), - nullableUint64(sess["max_fps"]), - nullableUint64(sess["max_total_js_heap_size"]), - nullableUint64(sess["max_used_js_heap_size"]), - nullableUint64(sess["js_exceptions_count"]), - nullableUint64(sess["inputs_count"]), - nullableUint64(sess["clicks_count"]), - 
nullableUint64(sess["issues_count"]), - nullableUint64(sess["pages_count"]), + c.nullableUint64(ctx, sess["session_start_timestamp"]), + c.nullableUint64(ctx, sess["session_end_timestamp"]), + c.nullableUint64(ctx, sess["session_duration"]), + c.nullableUint64(ctx, sess["first_contentful_paint"]), + c.nullableUint64(ctx, sess["speed_index"]), + c.nullableUint64(ctx, sess["visually_complete"]), + c.nullableUint64(ctx, sess["timing_time_to_interactive"]), + c.nullableUint64(ctx, sess["avg_cpu"]), + c.nullableUint64(ctx, sess["avg_fps"]), + c.nullableUint64(ctx, sess["max_cpu"]), + c.nullableUint64(ctx, sess["max_fps"]), + c.nullableUint64(ctx, sess["max_total_js_heap_size"]), + c.nullableUint64(ctx, sess["max_used_js_heap_size"]), + c.nullableUint64(ctx, sess["js_exceptions_count"]), + c.nullableUint64(ctx, sess["inputs_count"]), + c.nullableUint64(ctx, sess["clicks_count"]), + c.nullableUint64(ctx, sess["issues_count"]), + c.nullableUint64(ctx, sess["pages_count"]), nullableString(sess["metadata_1"]), nullableString(sess["metadata_2"]), nullableString(sess["metadata_3"]), @@ -148,7 +154,7 @@ func (c *ClickHouse) InsertSessions(batch []map[string]string) error { nullableString(sess["metadata_9"]), nullableString(sess["metadata_10"]), ); err != nil { - log.Printf("can't append value set to batch, err: %s", err) + c.log.Error(ctx, "can't append value set to batch, err: ", err) } } return bulk.Send() @@ -158,13 +164,13 @@ func (c *ClickHouse) Close() error { return c.conn.Close() } -func Uint64(v string) uint64 { +func (c *ClickHouse) Uint64(ctx context.Context, v string) uint64 { if v == "" { return 0 } res, err := strconv.Atoi(v) if err != nil { - log.Printf("can't convert string to uint64, err: %s", err) + c.log.Error(ctx, "can't convert string to uint64, err: %s", err) return 0 } return uint64(res) @@ -178,12 +184,12 @@ func nullableString(v string) *string { return p } -func nullableUint64(v string) *uint64 { +func (c *ClickHouse) nullableUint64(ctx 
context.Context, v string) *uint64 { var p *uint64 = nil if v != "" { res, err := strconv.Atoi(v) if err != nil { - log.Printf("can't convert string to uint64, err: %s", err) + c.log.Error(ctx, "can't convert string to uint64, err: %s", err) return nil } a := uint64(res) diff --git a/ee/backend/pkg/connector/model.go b/ee/backend/pkg/connector/model.go index 0bbbaa5af..eff9d8fac 100644 --- a/ee/backend/pkg/connector/model.go +++ b/ee/backend/pkg/connector/model.go @@ -4,10 +4,11 @@ import "strconv" var sessionColumns = []string{ "sessionid", - "user_agent", "user_browser", "user_browser_version", "user_country", + "user_city", + "user_state", "user_device", "user_device_heap_size", "user_device_memory_size", @@ -20,6 +21,8 @@ var sessionColumns = []string{ "referrer", "user_anonymous_id", "user_id", + "tracker_version", + "rev_id", "session_start_timestamp", "session_end_timestamp", "session_duration", @@ -98,8 +101,52 @@ var eventColumns = []string{ "issueevent_url", "customissue_name", "customissue_payload", - "received_at", - "batch_order_number", + "mobile_event_name", + "mobile_event_payload", + "mobile_networkcall_type", + "mobile_networkcall_method", + "mobile_networkcall_url", + "mobile_networkcall_request", + "mobile_networkcall_response", + "mobile_networkcall_status", + "mobile_networkcall_timestamp", + "mobile_networkcall_duration", + "mobile_clickevent_x", + "mobile_clickevent_y", + "mobile_clickevent_timestamp", + "mobile_clickevent_label", + "mobile_swipeevent_x", + "mobile_swipeevent_y", + "mobile_swipeevent_timestamp", + "mobile_swipeevent_label", + "mobile_inputevent_label", + "mobile_inputevent_value", + "mobile_crash_name", + "mobile_crash_reason", + "mobile_crash_stacktrace", + "mobile_issueevent_timestamp", + "mobile_issueevent_type", + "mobile_issueevent_context_string", + "mobile_issueevent_context", + "mobile_issueevent_payload", +} + +var eventInts = []string{ + "networkrequest_status", + "networkrequest_timestamp", + 
"networkrequest_duration", + "issueevent_message_id", + "issueevent_timestamp", + "mobile_networkcall_status", + "mobile_networkcall_timestamp", + "mobile_networkcall_duration", + "mobile_clickevent_x", + "mobile_clickevent_y", + "mobile_clickevent_timestamp", + "mobile_swipeevent_x", + "mobile_swipeevent_y", + "mobile_swipeevent_timestamp", + "mobile_issueevent_timestamp", } func QUOTES(s string) string { diff --git a/ee/backend/pkg/connector/redshift.go b/ee/backend/pkg/connector/redshift.go index 54aba0037..96f628a3c 100644 --- a/ee/backend/pkg/connector/redshift.go +++ b/ee/backend/pkg/connector/redshift.go @@ -5,23 +5,24 @@ import ( "context" "database/sql" "fmt" + "github.com/google/uuid" - "log" - "openreplay/backend/pkg/objectstorage" + _ "github.com/lib/pq" "openreplay/backend/internal/config/connector" - - _ "github.com/lib/pq" + "openreplay/backend/pkg/logger" + "openreplay/backend/pkg/objectstorage" ) type Redshift struct { + log logger.Logger cfg *connector.Config ctx context.Context db *sql.DB objStorage objectstorage.ObjectStorage } -func NewRedshift(cfg *connector.Config, objStorage objectstorage.ObjectStorage) (*Redshift, error) { +func NewRedshift(log logger.Logger, cfg *connector.Config, objStorage objectstorage.ObjectStorage) (*Redshift, error) { var source string if cfg.ConnectionString != "" { source = cfg.ConnectionString @@ -29,7 +30,7 @@ func NewRedshift(cfg *connector.Config, objStorage objectstorage.ObjectStorage) source = fmt.Sprintf("postgres://%s:%s@%s:%d/%s", cfg.Redshift.User, cfg.Redshift.Password, cfg.Redshift.Host, cfg.Redshift.Port, cfg.Redshift.Database) } - log.Println("Connecting to Redshift Source: ", source) + log.Info(context.Background(), "Connecting to Redshift Source: %s", source) sqldb, err := sql.Open("postgres", source) if err != nil { return nil, err @@ -38,6 +39,7 @@ func NewRedshift(cfg *connector.Config, objStorage objectstorage.ObjectStorage) return nil, err } return &Redshift{ + log: log, cfg: cfg, ctx: 
context.Background(), db: sqldb, @@ -73,15 +75,13 @@ func (r *Redshift) InsertEvents(batch []map[string]string) error { reader := bytes.NewReader(buf.Bytes()) if err := r.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil { - log.Printf("can't upload file to s3: %s", err) - return err + return fmt.Errorf("can't upload file to s3: %s", err) } // Copy data from s3 bucket to redshift if err := r.Copy(r.cfg.EventsTableName, fileName, "|", true, false); err != nil { - log.Printf("can't copy data from s3 to redshift: %s", err) - return err + return fmt.Errorf("can't copy data from s3 to redshift: %s", err) } - log.Printf("events batch of %d events is successfully saved", len(batch)) + r.log.Info(context.Background(), "events batch of %d events is successfully saved", len(batch)) return nil } @@ -113,15 +113,13 @@ func (r *Redshift) InsertSessions(batch []map[string]string) error { reader := bytes.NewReader(buf.Bytes()) if err := r.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil { - log.Printf("can't upload file to s3: %s", err) - return err + return fmt.Errorf("can't upload file to s3: %s", err) } // Copy data from s3 bucket to redshift if err := r.Copy(r.cfg.SessionsTableName, fileName, "|", true, false); err != nil { - log.Printf("can't copy data from s3 to redshift: %s", err) - return err + return fmt.Errorf("can't copy data from s3 to redshift: %s", err) } - log.Printf("sessions batch of %d sessions is successfully saved", len(batch)) + r.log.Info(context.Background(), "sessions batch of %d sessions is successfully saved", len(batch)) return nil } @@ -131,19 +129,23 @@ func (r *Redshift) Copy(tableName, fileName, delimiter string, creds, gzip bool) gzipSQL string ) if creds { - credentials = fmt.Sprintf(`ACCESS_KEY_ID '%s' SECRET_ACCESS_KEY '%s'`, r.cfg.AWSAccessKeyID, r.cfg.AWSSecretAccessKey) + if r.cfg.AWSAccessKeyID != "" && r.cfg.AWSSecretAccessKey != "" { + credentials = 
fmt.Sprintf(`ACCESS_KEY_ID '%s' SECRET_ACCESS_KEY '%s'`, r.cfg.AWSAccessKeyID, r.cfg.AWSSecretAccessKey) + } else if r.cfg.AWSIAMRole != "" { + credentials = fmt.Sprintf(`IAM_ROLE '%s'`, r.cfg.AWSIAMRole) + } else { + credentials = "IAM_ROLE default" + } } if gzip { gzipSQL = "GZIP" } - bucketName := "rdshftbucket" - filePath := fmt.Sprintf("s3://%s/%s", bucketName, fileName) + filePath := fmt.Sprintf("s3://%s/%s", r.cfg.Redshift.Bucket, fileName) - copySQL := fmt.Sprintf(`COPY "%s" FROM '%s' WITH %s TIMEFORMAT 'auto' DATEFORMAT 'auto' TRUNCATECOLUMNS - STATUPDATE ON %s DELIMITER AS '%s' IGNOREHEADER 1 REMOVEQUOTES ESCAPE TRIMBLANKS EMPTYASNULL ACCEPTANYDATE`, + copySQL := fmt.Sprintf(`COPY "%s" FROM '%s' WITH %s TIMEFORMAT 'auto' DATEFORMAT 'auto' TRUNCATECOLUMNS STATUPDATE ON %s DELIMITER AS '%s' IGNOREHEADER 1 REMOVEQUOTES ESCAPE TRIMBLANKS EMPTYASNULL ACCEPTANYDATE`, tableName, filePath, gzipSQL, credentials, delimiter) - log.Printf("Running command: %s", copySQL) + r.log.Debug(context.Background(), "Executing COPY SQL: %s", copySQL) _, err := r.db.ExecContext(r.ctx, copySQL) return err diff --git a/ee/backend/pkg/connector/saver.go b/ee/backend/pkg/connector/saver.go index b6c835249..315d89e0e 100644 --- a/ee/backend/pkg/connector/saver.go +++ b/ee/backend/pkg/connector/saver.go @@ -1,20 +1,22 @@ package connector import ( + "context" "fmt" - "log" - "openreplay/backend/internal/http/geoip" - "openreplay/backend/pkg/projects" - "openreplay/backend/pkg/sessions" "strconv" "time" config "openreplay/backend/internal/config/connector" + "openreplay/backend/internal/http/geoip" + "openreplay/backend/pkg/logger" "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/projects" + "openreplay/backend/pkg/sessions" ) // Saver collect sessions and events and saves them to Redshift type Saver struct { + log logger.Logger cfg *config.Config db Database sessModule sessions.Sessions @@ -26,19 +28,21 @@ type Saver struct { events []map[string]string } -func New(cfg 
*config.Config, db Database, sessions sessions.Sessions, projects projects.Projects) *Saver { +func New(log logger.Logger, cfg *config.Config, db Database, sessions sessions.Sessions, projects projects.Projects) *Saver { + ctx := context.Background() if cfg == nil { - log.Fatal("connector config is empty") + log.Fatal(ctx, "connector config is empty") } // Validate column names in sessions table if err := validateColumnNames(sessionColumns); err != nil { - log.Printf("can't validate column names: %s", err) + log.Error(ctx, "can't validate sessions column names: %s", err) } // Validate column names in events table if err := validateColumnNames(eventColumns); err != nil { - log.Printf("can't validate column names: %s", err) + log.Error(ctx, "can't validate events column names: %s", err) } return &Saver{ + log: log, cfg: cfg, db: db, sessModule: sessions, @@ -83,14 +87,48 @@ func handleEvent(msg messages.Message) map[string]string { case *messages.CustomIssue: event["customissue_name"] = QUOTES(m.Name) event["customissue_payload"] = QUOTES(m.Payload) + // Mobile events + case *messages.IOSEvent: + event["mobile_event_name"] = QUOTES(m.Name) + event["mobile_event_payload"] = QUOTES(m.Payload) + case *messages.IOSNetworkCall: + event["mobile_networkcall_type"] = QUOTES(m.Type) + event["mobile_networkcall_method"] = QUOTES(m.Method) + event["mobile_networkcall_url"] = QUOTES(m.URL) + event["mobile_networkcall_request"] = QUOTES(m.Request) + event["mobile_networkcall_response"] = QUOTES(m.Response) + event["mobile_networkcall_status"] = fmt.Sprintf("%d", m.Status) + event["mobile_networkcall_timestamp"] = fmt.Sprintf("%d", m.Timestamp) + event["mobile_networkcall_duration"] = fmt.Sprintf("%d", m.Duration) + case *messages.IOSClickEvent: + event["mobile_clickevent_x"] = fmt.Sprintf("%d", m.X) + event["mobile_clickevent_y"] = fmt.Sprintf("%d", m.Y) + event["mobile_clickevent_timestamp"] = fmt.Sprintf("%d", m.Timestamp) + event["mobile_clickevent_label"] = QUOTES(m.Label) + 
case *messages.IOSSwipeEvent: + event["mobile_swipeevent_x"] = fmt.Sprintf("%d", m.X) + event["mobile_swipeevent_y"] = fmt.Sprintf("%d", m.Y) + event["mobile_swipeevent_timestamp"] = fmt.Sprintf("%d", m.Timestamp) + event["mobile_swipeevent_label"] = QUOTES(m.Label) + case *messages.IOSInputEvent: + event["mobile_inputevent_label"] = QUOTES(m.Label) + event["mobile_inputevent_value"] = QUOTES(m.Value) + case *messages.IOSCrash: + event["mobile_crash_name"] = QUOTES(m.Name) + event["mobile_crash_reason"] = QUOTES(m.Reason) + event["mobile_crash_stacktrace"] = QUOTES(m.Stacktrace) + case *messages.IOSIssueEvent: + event["mobile_issueevent_timestamp"] = fmt.Sprintf("%d", m.Timestamp) + event["mobile_issueevent_type"] = QUOTES(m.Type) + event["mobile_issueevent_context_string"] = QUOTES(m.ContextString) + event["mobile_issueevent_context"] = QUOTES(m.Context) + event["mobile_issueevent_payload"] = QUOTES(m.Payload) } if len(event) == 0 { return nil } event["sessionid"] = fmt.Sprintf("%d", msg.SessionID()) - event["received_at"] = fmt.Sprintf("%d", uint64(time.Now().UnixMilli())) - event["batch_order_number"] = fmt.Sprintf("%d", 0) return event } @@ -110,21 +148,19 @@ func (s *Saver) updateSessionInfoFromCache(sessID uint64, sess map[string]string sess["session_end_timestamp"] = fmt.Sprintf("%d", info.Timestamp+*info.Duration) } if sess["session_duration"] == "" && sess["session_start_timestamp"] != "" && sess["session_end_timestamp"] != "" { + ctx := context.WithValue(context.Background(), "sessionID", sessID) start, err := strconv.Atoi(sess["session_start_timestamp"]) if err != nil { - log.Printf("Error parsing session_start_timestamp: %v", err) + s.log.Error(ctx, "error parsing session_start_timestamp: %s", err) } end, err := strconv.Atoi(sess["session_end_timestamp"]) if err != nil { - log.Printf("Error parsing session_end_timestamp: %v", err) + s.log.Error(ctx, "error parsing session_end_timestamp: %s", err) } if start != 0 && end != 0 { sess["session_duration"] = 
fmt.Sprintf("%d", end-start) } } - if sess["user_agent"] == "" && info.UserAgent != "" { - sess["user_agent"] = QUOTES(info.UserAgent) - } if sess["user_browser"] == "" && info.UserBrowser != "" { sess["user_browser"] = QUOTES(info.UserBrowser) } @@ -200,19 +236,23 @@ func (s *Saver) handleSession(msg messages.Message) { case *messages.SessionStart, *messages.SessionEnd, *messages.ConnectionInformation, *messages.Metadata, *messages.PageEvent, *messages.PerformanceTrackAggr, *messages.UserID, *messages.UserAnonymousID, *messages.JSException, *messages.JSExceptionDeprecated, *messages.InputEvent, *messages.MouseClick, - *messages.IssueEvent, *messages.IssueEventDeprecated: + *messages.IssueEvent, *messages.IssueEventDeprecated, + // Mobile messages + *messages.IOSSessionStart, *messages.IOSSessionEnd, *messages.IOSUserID, *messages.IOSUserAnonymousID, + *messages.IOSMetadata: default: return } if s.sessions == nil { s.sessions = make(map[uint64]map[string]string) } + ctx := context.WithValue(context.Background(), "sessionID", msg.SessionID()) sess, ok := s.sessions[msg.SessionID()] if !ok { // Try to load session from cache cached, err := s.sessModule.GetCached(msg.SessionID()) if err != nil && err != sessions.ErrSessionNotFound { - log.Printf("Failed to get cached session: %v", err) + s.log.Warn(ctx, "failed to get cached session: %s", err) } if cached != nil { sess = cached @@ -233,7 +273,6 @@ func (s *Saver) handleSession(msg messages.Message) { case *messages.SessionStart: sess["session_start_timestamp"] = fmt.Sprintf("%d", m.Timestamp) sess["user_uuid"] = QUOTES(m.UserUUID) - sess["user_agent"] = QUOTES(m.UserAgent) sess["user_os"] = QUOTES(m.UserOS) sess["user_os_version"] = QUOTES(m.UserOSVersion) sess["user_browser"] = QUOTES(m.UserBrowser) @@ -251,7 +290,7 @@ func (s *Saver) handleSession(msg messages.Message) { case *messages.SessionEnd: sess["session_end_timestamp"] = fmt.Sprintf("%d", m.Timestamp) if err := s.updateSessionInfoFromCache(msg.SessionID(), 
sess); err != nil { - log.Printf("Error updating session info from cache: %v", err) + s.log.Warn(ctx, "failed to update session info from cache: %s", err) } case *messages.ConnectionInformation: sess["connection_effective_bandwidth"] = fmt.Sprintf("%d", m.Downlink) @@ -259,12 +298,12 @@ func (s *Saver) handleSession(msg messages.Message) { case *messages.Metadata: session, err := s.sessModule.Get(msg.SessionID()) if err != nil { - log.Printf("Error getting session info: %v", err) + s.log.Error(ctx, "error getting session info: %s", err) break } project, err := s.projModule.GetProject(session.ProjectID) if err != nil { - log.Printf("Error getting project info: %v", err) + s.log.Error(ctx, "error getting project info: %s", err) break } keyNo := project.GetMetadataNo(m.Key) @@ -320,6 +359,43 @@ func (s *Saver) handleSession(msg messages.Message) { currIssuesCount = 0 } sess["issues_count"] = fmt.Sprintf("%d", currIssuesCount+1) + // Mobile messages + case *messages.IOSSessionStart: + sess["session_start_timestamp"] = fmt.Sprintf("%d", m.Timestamp) + sess["user_uuid"] = QUOTES(m.UserUUID) + sess["user_os"] = QUOTES(m.UserOS) + sess["user_os_version"] = QUOTES(m.UserOSVersion) + sess["user_device"] = QUOTES(m.UserDevice) + sess["user_device_type"] = QUOTES(m.UserDeviceType) + sess["tracker_version"] = QUOTES(m.TrackerVersion) + sess["rev_id"] = QUOTES(m.RevID) + case *messages.IOSSessionEnd: + sess["session_end_timestamp"] = fmt.Sprintf("%d", m.Timestamp) + if err := s.updateSessionInfoFromCache(msg.SessionID(), sess); err != nil { + s.log.Warn(ctx, "failed to update session info from cache: %s", err) + } + case *messages.IOSMetadata: + session, err := s.sessModule.Get(msg.SessionID()) + if err != nil { + s.log.Error(ctx, "error getting session info: %s", err) + break + } + project, err := s.projModule.GetProject(session.ProjectID) + if err != nil { + s.log.Error(ctx, "error getting project info: %s", err) + break + } + keyNo := project.GetMetadataNo(m.Key) + if keyNo 
== 0 { + break + } + sess[fmt.Sprintf("metadata_%d", keyNo)] = QUOTES(m.Value) + case *messages.IOSUserID: + if m.ID != "" { + sess["user_id"] = QUOTES(m.ID) + } + case *messages.IOSUserAnonymousID: + sess["user_anonymous_id"] = QUOTES(m.ID) default: updated = false } @@ -342,7 +418,7 @@ func (s *Saver) Handle(msg messages.Message) { s.events = append(s.events, newEvent) } s.handleSession(msg) - if msg.TypeID() == messages.MsgSessionEnd { + if msg.TypeID() == messages.MsgSessionEnd || msg.TypeID() == messages.MsgIOSSessionEnd { if s.finishedSessions == nil { s.finishedSessions = make([]uint64, 0) } @@ -353,18 +429,18 @@ func (s *Saver) Handle(msg messages.Message) { func (s *Saver) commitEvents() { if len(s.events) == 0 { - log.Printf("empty events batch") + s.log.Info(context.Background(), "empty events batch") return } if err := s.db.InsertEvents(s.events); err != nil { - log.Printf("can't insert events: %s", err) + s.log.Error(context.Background(), "can't insert events: %s", err) } s.events = nil } func (s *Saver) commitSessions() { if len(s.finishedSessions) == 0 { - log.Printf("empty sessions batch") + s.log.Info(context.Background(), "empty sessions batch") return } l := len(s.finishedSessions) @@ -380,10 +456,14 @@ func (s *Saver) commitSessions() { toSend = append(toSend, sessionID) } } - if err := s.db.InsertSessions(sessions); err != nil { - log.Printf("can't insert sessions: %s", err) + if len(sessions) == 0 { + s.log.Info(context.Background(), "empty sessions batch to send") + return } - log.Printf("finished: %d, to keep: %d, to send: %d", l, len(toKeep), len(toSend)) + if err := s.db.InsertSessions(sessions); err != nil { + s.log.Error(context.Background(), "can't insert sessions: %s", err) + } + s.log.Info(context.Background(), "finished: %d, to keep: %d, to send: %d", l, len(toKeep), len(toSend)) // Clear current list of finished sessions for _, sessionID := range toSend { delete(s.sessions, sessionID) // delete session info @@ -398,10 +478,11 @@ 
func (s *Saver) Commit() { start := time.Now() for sessionID, _ := range s.updatedSessions { if err := s.sessModule.AddCached(sessionID, s.sessions[sessionID]); err != nil { - log.Printf("Error adding session to cache: %v", err) + ctx := context.WithValue(context.Background(), "sessionID", sessionID) + s.log.Error(ctx, "can't add session to cache: %s", err) } } - log.Printf("Cached %d sessions in %s", len(s.updatedSessions), time.Since(start)) + s.log.Info(context.Background(), "cached %d sessions in %s", len(s.updatedSessions), time.Since(start)) s.updatedSessions = nil // Commit events and sessions (send to Redshift) s.commitEvents() @@ -428,12 +509,13 @@ func (s *Saver) checkZombieSessions() { // Do that several times (save attempts number) after last attempt delete session from memory to avoid sessions with not filled fields zombieSession := s.sessions[sessionID] if zombieSession["session_start_timestamp"] == "" || zombieSession["session_end_timestamp"] == "" { + ctx := context.WithValue(context.Background(), "sessionID", sessionID) // Let's try to load session from cache if err := s.updateSessionInfoFromCache(sessionID, zombieSession); err != nil { - log.Printf("Error updating zombie session info from cache: %v", err) + s.log.Warn(ctx, "failed to update zombie session info from cache: %s", err) } else { s.sessions[sessionID] = zombieSession - log.Printf("Updated zombie session info from cache: %v", zombieSession) + s.log.Debug(ctx, "updated zombie session info from cache: %v", zombieSession) } } if zombieSession["session_start_timestamp"] == "" || zombieSession["session_end_timestamp"] == "" { @@ -445,7 +527,7 @@ func (s *Saver) checkZombieSessions() { } } if zombieSessionsCount > 0 { - log.Printf("Found %d zombie sessions", zombieSessionsCount) + s.log.Info(context.Background(), "found %d zombie sessions", zombieSessionsCount) } }