diff --git a/backend/internal/config/common/config.go b/backend/internal/config/common/config.go index 460c46789..073981351 100644 --- a/backend/internal/config/common/config.go +++ b/backend/internal/config/common/config.go @@ -48,6 +48,7 @@ type Redshift struct { User string `env:"REDSHIFT_USER"` Password string `env:"REDSHIFT_PASSWORD"` Database string `env:"REDSHIFT_DATABASE"` + Bucket string `env:"REDSHIFT_BUCKET,default=rdshftbucket"` } // Clickhouse config diff --git a/backend/internal/config/objectstorage/config.go b/backend/internal/config/objectstorage/config.go index a1ca6568d..7ffab5f70 100644 --- a/backend/internal/config/objectstorage/config.go +++ b/backend/internal/config/objectstorage/config.go @@ -14,6 +14,7 @@ type ObjectsConfig struct { AzureAccountName string `env:"AZURE_ACCOUNT_NAME"` AzureAccountKey string `env:"AZURE_ACCOUNT_KEY"` UseS3Tags bool `env:"USE_S3_TAGS,default=true"` + AWSIAMRole string `env:"AWS_IAM_ROLE"` } func (c *ObjectsConfig) UseFileTags() bool { diff --git a/backend/pkg/logger/logger.go b/backend/pkg/logger/logger.go index 7f9b43862..d30d7a42c 100644 --- a/backend/pkg/logger/logger.go +++ b/backend/pkg/logger/logger.go @@ -9,6 +9,7 @@ import ( ) type Logger interface { + Debug(ctx context.Context, message string, args ...interface{}) Info(ctx context.Context, message string, args ...interface{}) Warn(ctx context.Context, message string, args ...interface{}) Error(ctx context.Context, message string, args ...interface{}) @@ -51,6 +52,10 @@ func (l *loggerImpl) prepare(ctx context.Context, logger *zap.Logger) *zap.Logge return logger } +func (l *loggerImpl) Debug(ctx context.Context, message string, args ...interface{}) { + l.prepare(ctx, l.l.With(zap.String("level", "debug"))).Debug(fmt.Sprintf(message, args...)) +} + func (l *loggerImpl) Info(ctx context.Context, message string, args ...interface{}) { l.prepare(ctx, l.l.With(zap.String("level", "info"))).Info(fmt.Sprintf(message, args...)) } diff --git a/ee/backend/cmd/connector/main.go b/ee/backend/cmd/connector/main.go index c0886d69f..5e9f6d3de 100644 --- a/ee/backend/cmd/connector/main.go +++ b/ee/backend/cmd/connector/main.go @@ -27,21 +27,29 @@ func main() { if err != nil { log.Fatal(ctx, "can't init object storage: %s", err) } + batches, err := saver.NewBatches(cfg, objStore) + if err != nil { + log.Fatal(ctx, "can't init s3 buckets: %s", err) + } var db saver.Database switch cfg.ConnectorType { case "redshift": - if db, err = saver.NewRedshift(log, cfg, objStore); err != nil { + if db, err = saver.NewRedshift(log, cfg, batches); err != nil { log.Fatal(ctx, "can't init redshift connection: %s", err) } case "clickhouse": - if db, err = saver.NewClickHouse(log, cfg); err != nil { + if db, err = saver.NewClickHouse(log, cfg, batches); err != nil { log.Fatal(ctx, "can't init clickhouse connection: %s", err) } case "elasticsearch": if db, err = saver.NewElasticSearch(log, cfg); err != nil { log.Fatal(ctx, "can't init elasticsearch connection: %s", err) } + case "s3": + if db, err = saver.NewS3Storage(log, cfg, batches); err != nil { + log.Fatal(ctx, "can't init s3 connection: %s", err) + } default: log.Fatal(ctx, "unknown connector type: %s", cfg.ConnectorType) } @@ -73,7 +81,13 @@ func main() { messages.MsgSessionStart, messages.MsgSessionEnd, messages.MsgConnectionInformation, messages.MsgMetadata, messages.MsgPageEvent, messages.MsgPerformanceTrackAggr, messages.MsgUserID, messages.MsgUserAnonymousID, messages.MsgJSException, messages.MsgJSExceptionDeprecated, - messages.MsgInputEvent, messages.MsgMouseClick, messages.MsgIssueEventDeprecated} + messages.MsgInputEvent, messages.MsgMouseClick, messages.MsgIssueEventDeprecated, + // Mobile messages + messages.MsgMobileSessionStart, messages.MsgMobileSessionEnd, messages.MsgMobileUserID, messages.MsgMobileUserAnonymousID, + messages.MsgMobileMetadata, messages.MsgMobileEvent, messages.MsgMobileNetworkCall, + messages.MsgMobileClickEvent, messages.MsgMobileSwipeEvent, messages.MsgMobileInputEvent, + messages.MsgMobileCrash, messages.MsgMobileIssueEvent, + } // Init consumer consumer := queue.NewConsumer( diff --git a/ee/backend/pkg/connector/clickhouse.go b/ee/backend/pkg/connector/clickhouse.go index 283bc2fbf..0c66c11cc 100644 --- a/ee/backend/pkg/connector/clickhouse.go +++ b/ee/backend/pkg/connector/clickhouse.go @@ -13,12 +13,13 @@ import ( ) type ClickHouse struct { - log logger.Logger - cfg *connector.Config - conn driver.Conn + log logger.Logger + cfg *connector.Config + conn driver.Conn + batches *Batches } -func NewClickHouse(log logger.Logger, cfg *connector.Config) (*ClickHouse, error) { +func NewClickHouse(log logger.Logger, cfg *connector.Config, batches *Batches) (*ClickHouse, error) { url := cfg.Clickhouse.URL url = strings.TrimPrefix(url, "tcp://") url = strings.TrimSuffix(url, "/default") @@ -44,16 +45,21 @@ func NewClickHouse(log logger.Logger, cfg *connector.Config) (*ClickHouse, error return nil, err } c := &ClickHouse{ - log: log, - cfg: cfg, - conn: conn, + log: log, + cfg: cfg, + conn: conn, + batches: batches, } return c, nil } +func (c *ClickHouse) InsertEvents(batch []map[string]string) error { + return c.insertEventsUsingBuffer(batch) +} + const eventsSQL = "INSERT INTO connector_events_buffer (sessionid, consolelog_level, consolelog_value, customevent_name, customevent_payload, jsexception_message, jsexception_name, jsexception_payload, jsexception_metadata, networkrequest_type, networkrequest_method, networkrequest_url, networkrequest_request, networkrequest_response, networkrequest_status, networkrequest_timestamp, networkrequest_duration, issueevent_message_id, issueevent_timestamp, issueevent_type, issueevent_context_string, issueevent_context, issueevent_payload, issueevent_url, customissue_name, customissue_payload, received_at, batch_order_number) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" -func (c *ClickHouse) InsertEvents(batch []map[string]string) error { +func (c *ClickHouse) insertEventsUsingBuffer(batch []map[string]string) error { bulk, err := c.conn.PrepareBatch(context.Background(), eventsSQL) if err != nil { return err @@ -97,9 +103,13 @@ func (c *ClickHouse) InsertEvents(batch []map[string]string) error { return bulk.Send() } +func (c *ClickHouse) InsertSessions(batch []map[string]string) error { + return c.insertSessionsUsingBuffer(batch) +} + const sessionsSQL = "INSERT INTO connector_user_sessions_buffer (sessionid, user_agent, user_browser, user_browser_version, user_country, user_device, user_device_heap_size, user_device_memory_size, user_device_type, user_os, user_os_version, user_uuid, connection_effective_bandwidth, connection_type, referrer, user_anonymous_id, user_id, session_start_timestamp, session_end_timestamp, session_duration, first_contentful_paint, speed_index, visually_complete, timing_time_to_interactive, avg_cpu, avg_fps, max_cpu, max_fps, max_total_js_heap_size, max_used_js_heap_size, js_exceptions_count, inputs_count, clicks_count, issues_count, pages_count, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" -func (c *ClickHouse) InsertSessions(batch []map[string]string) error { +func (c *ClickHouse) insertSessionsUsingBuffer(batch []map[string]string) error { bulk, err := c.conn.PrepareBatch(context.Background(), sessionsSQL) if err != nil { return err diff --git a/ee/backend/pkg/connector/redshift.go b/ee/backend/pkg/connector/redshift.go index 2ce1fad56..0d58ab2da 100644 --- a/ee/backend/pkg/connector/redshift.go +++ b/ee/backend/pkg/connector/redshift.go @@ -1,28 +1,25 @@ package connector import ( - "bytes" "context" "database/sql" "fmt" - "github.com/google/uuid" _ "github.com/lib/pq" "openreplay/backend/internal/config/connector" "openreplay/backend/pkg/logger" - "openreplay/backend/pkg/objectstorage" ) type Redshift struct { - log logger.Logger - cfg *connector.Config - ctx context.Context - db *sql.DB - objStorage objectstorage.ObjectStorage + log logger.Logger + cfg *connector.Config + ctx context.Context + db *sql.DB + batches *Batches } -func NewRedshift(log logger.Logger, cfg *connector.Config, objStorage objectstorage.ObjectStorage) (*Redshift, error) { +func NewRedshift(log logger.Logger, cfg *connector.Config, batches *Batches) (*Redshift, error) { var source string if cfg.ConnectionString != "" { source = cfg.ConnectionString @@ -39,117 +36,68 @@ func NewRedshift(log logger.Logger, cfg *connector.Config, objStorage objectstor return nil, err } return &Redshift{ - log: log, - cfg: cfg, - ctx: context.Background(), - db: sqldb, - objStorage: objStorage, + log: log, + cfg: cfg, + ctx: context.Background(), + db: sqldb, + batches: batches, }, nil } -func eventsToBuffer(batch []map[string]string) *bytes.Buffer { - buf := bytes.NewBuffer(nil) - - // Write header - for _, column := range eventColumns { - buf.WriteString(column + "|") - } - buf.Truncate(buf.Len() - 1) - - // Write data - for _, event := range batch { - buf.WriteString("\n") - for _, column := range eventColumns { - buf.WriteString(event[column] + "|") - } - buf.Truncate(buf.Len() - 1) - } - return buf -} - -func (r *Redshift) InsertEvents(batch []map[string]string) error { - // Send data to S3 - fileName := fmt.Sprintf("connector_data/%s-%s.csv", r.cfg.EventsTableName, uuid.New().String()) - // Create csv file - buf := eventsToBuffer(batch) - - reader := bytes.NewReader(buf.Bytes()) - if err := r.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil { - return fmt.Errorf("can't upload file to s3: %s", err) - } - // Copy data from s3 bucket to redshift - if err := r.Copy(r.cfg.EventsTableName, fileName, "|", true, false); err != nil { - return fmt.Errorf("can't copy data from s3 to redshift: %s", err) - } - r.log.Info(context.Background(), "events batch of %d events is successfully saved", len(batch)) - return nil -} - -func sessionsToBuffer(batch []map[string]string) *bytes.Buffer { - buf := bytes.NewBuffer(nil) - - // Write header - for _, column := range sessionColumns { - buf.WriteString(column + "|") - } - buf.Truncate(buf.Len() - 1) - - // Write data - for _, sess := range batch { - buf.WriteString("\n") - for _, column := range sessionColumns { - buf.WriteString(sess[column] + "|") - } - buf.Truncate(buf.Len() - 1) - } - return buf -} - func (r *Redshift) InsertSessions(batch []map[string]string) error { - // Send data to S3 - fileName := fmt.Sprintf("connector_data/%s-%s.csv", r.cfg.SessionsTableName, uuid.New().String()) - // Create csv file - buf := sessionsToBuffer(batch) - - reader := bytes.NewReader(buf.Bytes()) - if err := r.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil { - return fmt.Errorf("can't upload file to s3: %s", err) + fileName := generateName(r.cfg.SessionsTableName) + if err := r.batches.Insert(batch, fileName, sessionColumns); err != nil { + return fmt.Errorf("can't insert sessions batch: %s", err) } // Copy data from s3 bucket to redshift - if err := r.Copy(r.cfg.SessionsTableName, fileName, "|", true, false); err != nil { + if err := r.copy(r.cfg.SessionsTableName, fileName, "|", true, false); err != nil { return fmt.Errorf("can't copy data from s3 to redshift: %s", err) } r.log.Info(context.Background(), "sessions batch of %d sessions is successfully saved", len(batch)) return nil } -func (r *Redshift) Copy(tableName, fileName, delimiter string, creds, gzip bool) error { +func (r *Redshift) InsertEvents(batch []map[string]string) error { + fileName := generateName(r.cfg.EventsTableName) + if err := r.batches.Insert(batch, fileName, eventColumns); err != nil { + return fmt.Errorf("can't insert events batch: %s", err) + } + // Copy data from s3 bucket to redshift + if err := r.copy(r.cfg.EventsTableName, fileName, "|", true, false); err != nil { + return fmt.Errorf("can't copy data from s3 to redshift: %s", err) + } + r.log.Info(context.Background(), "events batch of %d events is successfully saved", len(batch)) + return nil +} + +func (r *Redshift) copy(tableName, fileName, delimiter string, creds, gzip bool) error { var ( credentials string gzipSQL string ) if creds { - credentials = fmt.Sprintf(`ACCESS_KEY_ID '%s' SECRET_ACCESS_KEY '%s'`, r.cfg.AWSAccessKeyID, r.cfg.AWSSecretAccessKey) + if r.cfg.AWSAccessKeyID != "" && r.cfg.AWSSecretAccessKey != "" { + credentials = fmt.Sprintf(`ACCESS_KEY_ID '%s' SECRET_ACCESS_KEY '%s'`, r.cfg.AWSAccessKeyID, r.cfg.AWSSecretAccessKey) + } else if r.cfg.AWSIAMRole != "" { + credentials = fmt.Sprintf(`IAM_ROLE '%s'`, r.cfg.AWSIAMRole) + } else { + credentials = "IAM_ROLE default" + } } if gzip { gzipSQL = "GZIP" } - bucketName := "rdshftbucket" - filePath := fmt.Sprintf("s3://%s/%s", bucketName, fileName) + filePath := fmt.Sprintf("s3://%s/%s", r.cfg.Redshift.Bucket, fileName) - copySQL := fmt.Sprintf(`COPY "%s" FROM '%s' WITH %s TIMEFORMAT 'auto' DATEFORMAT 'auto' TRUNCATECOLUMNS - STATUPDATE ON %s DELIMITER AS '%s' IGNOREHEADER 1 REMOVEQUOTES ESCAPE TRIMBLANKS EMPTYASNULL ACCEPTANYDATE`, + copySQL := fmt.Sprintf(`COPY "%s" FROM '%s' WITH %s TIMEFORMAT 'auto' DATEFORMAT 'auto' TRUNCATECOLUMNS STATUPDATE ON %s DELIMITER AS '%s' IGNOREHEADER 1 REMOVEQUOTES ESCAPE TRIMBLANKS EMPTYASNULL ACCEPTANYDATE`, tableName, filePath, gzipSQL, credentials, delimiter) + r.log.Debug(context.Background(), "Executing COPY SQL: %s", copySQL) _, err := r.db.ExecContext(r.ctx, copySQL) return err } -func (r *Redshift) ExecutionDuration(fileName string) (int, error) { - return 0, nil -} - func (r *Redshift) Close() error { return r.db.Close() } diff --git a/ee/backend/pkg/connector/s3.go b/ee/backend/pkg/connector/s3.go new file mode 100644 index 000000000..7b41266e2 --- /dev/null +++ b/ee/backend/pkg/connector/s3.go @@ -0,0 +1,47 @@ +package connector + +import ( + "context" + "fmt" + + "openreplay/backend/internal/config/connector" + "openreplay/backend/pkg/logger" +) + +type S3Storage struct { + log logger.Logger + cfg *connector.Config + ctx context.Context + batches *Batches +} + +func NewS3Storage(log logger.Logger, cfg *connector.Config, buckets *Batches) (*S3Storage, error) { + return &S3Storage{ + log: log, + cfg: cfg, + ctx: context.Background(), + batches: buckets, + }, nil +} + +func (ds *S3Storage) InsertSessions(batch []map[string]string) error { + fileName := generateName(ds.cfg.SessionsTableName) + if err := ds.batches.Insert(batch, fileName, sessionColumns); err != nil { + return fmt.Errorf("can't insert sessions batch: %s", err) + } + ds.log.Info(context.Background(), "sessions batch of %d sessions is successfully saved", len(batch)) + return nil +} + +func (ds *S3Storage) InsertEvents(batch []map[string]string) error { + fileName := generateName(ds.cfg.EventsTableName) + if err := ds.batches.Insert(batch, fileName, eventColumns); err != nil { + return fmt.Errorf("can't insert events batch: %s", err) + } + ds.log.Info(context.Background(), "events batch of %d events is successfully saved", len(batch)) + return nil +} + +func (ds *S3Storage) Close() error { + return nil +} diff --git a/ee/backend/pkg/connector/s3batches.go b/ee/backend/pkg/connector/s3batches.go new file mode 100644 index 000000000..38669ff7b --- /dev/null +++ b/ee/backend/pkg/connector/s3batches.go @@ -0,0 +1,56 @@ +package connector + +import ( + "bytes" + "fmt" + + "github.com/google/uuid" + + "openreplay/backend/internal/config/connector" + "openreplay/backend/pkg/objectstorage" +) + +type Batches struct { + cfg *connector.Config + objStorage objectstorage.ObjectStorage +} + +func NewBatches(cfg *connector.Config, objStorage objectstorage.ObjectStorage) (*Batches, error) { + return &Batches{ + cfg: cfg, + objStorage: objStorage, + }, nil +} + +func (ds *Batches) Insert(batch []map[string]string, fileName string, columns []string) error { + buf := dataToCSV(batch, columns) + reader := bytes.NewReader(buf.Bytes()) + if err := ds.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil { + return fmt.Errorf("can't upload file to s3: %s", err) + } + return nil +} + +func generateName(table string) string { + return fmt.Sprintf("connector_data/%s-%s.csv", table, uuid.New().String()) +} + +func dataToCSV(batch []map[string]string, columns []string) *bytes.Buffer { + buf := bytes.NewBuffer(nil) + + // Write header (column names) + for _, column := range columns { + buf.WriteString(column + "|") + } + buf.Truncate(buf.Len() - 1) + + // Write data (rows) + for _, data := range batch { + buf.WriteString("\n") + for _, column := range columns { + buf.WriteString(data[column] + "|") + } + buf.Truncate(buf.Len() - 1) + } + return buf +} diff --git a/ee/backend/pkg/connector/saver.go b/ee/backend/pkg/connector/saver.go index 24dffe2d2..c218f1303 100644 --- a/ee/backend/pkg/connector/saver.go +++ b/ee/backend/pkg/connector/saver.go @@ -87,14 +87,48 @@ func handleEvent(msg messages.Message) map[string]string { case *messages.CustomIssue: event["customissue_name"] = QUOTES(m.Name) event["customissue_payload"] = QUOTES(m.Payload) + // Mobile events + case *messages.MobileEvent: + event["mobile_event_name"] = QUOTES(m.Name) + event["mobile_event_payload"] = QUOTES(m.Payload) + case *messages.MobileNetworkCall: + event["mobile_networkcall_type"] = QUOTES(m.Type) + event["mobile_networkcall_method"] = QUOTES(m.Method) + event["mobile_networkcall_url"] = QUOTES(m.URL) + event["mobile_networkcall_request"] = QUOTES(m.Request) + event["mobile_networkcall_response"] = QUOTES(m.Response) + event["mobile_networkcall_status"] = fmt.Sprintf("%d", m.Status) + event["mobile_networkcall_timestamp"] = fmt.Sprintf("%d", m.Timestamp) + event["mobile_networkcall_duration"] = fmt.Sprintf("%d", m.Duration) + case *messages.MobileClickEvent: + event["mobile_clickevent_x"] = fmt.Sprintf("%d", m.X) + event["mobile_clickevent_y"] = fmt.Sprintf("%d", m.Y) + event["mobile_clickevent_timestamp"] = fmt.Sprintf("%d", m.Timestamp) + event["mobile_clickevent_label"] = QUOTES(m.Label) + case *messages.MobileSwipeEvent: + event["mobile_swipeevent_x"] = fmt.Sprintf("%d", m.X) + event["mobile_swipeevent_y"] = fmt.Sprintf("%d", m.Y) + event["mobile_swipeevent_timestamp"] = fmt.Sprintf("%d", m.Timestamp) + event["mobile_swipeevent_label"] = QUOTES(m.Label) + case *messages.MobileInputEvent: + event["mobile_inputevent_label"] = QUOTES(m.Label) + event["mobile_inputevent_value"] = QUOTES(m.Value) + case *messages.MobileCrash: + event["mobile_crash_name"] = QUOTES(m.Name) + event["mobile_crash_reason"] = QUOTES(m.Reason) + event["mobile_crash_stacktrace"] = QUOTES(m.Stacktrace) + case *messages.MobileIssueEvent: + event["mobile_issueevent_timestamp"] = fmt.Sprintf("%d", m.Timestamp) + event["mobile_issueevent_type"] = QUOTES(m.Type) + event["mobile_issueevent_context_string"] = QUOTES(m.ContextString) + event["mobile_issueevent_context"] = QUOTES(m.Context) + event["mobile_issueevent_payload"] = QUOTES(m.Payload) } if len(event) == 0 { return nil } event["sessionid"] = fmt.Sprintf("%d", msg.SessionID()) - event["received_at"] = fmt.Sprintf("%d", uint64(time.Now().UnixMilli())) - event["batch_order_number"] = fmt.Sprintf("%d", 0) return event } @@ -127,9 +161,6 @@ func (s *Saver) updateSessionInfoFromCache(sessID uint64, sess map[string]string sess["session_duration"] = fmt.Sprintf("%d", end-start) } } - if sess["user_agent"] == "" && info.UserAgent != "" { - sess["user_agent"] = QUOTES(info.UserAgent) - } if sess["user_browser"] == "" && info.UserBrowser != "" { sess["user_browser"] = QUOTES(info.UserBrowser) } @@ -205,7 +236,10 @@ func (s *Saver) handleSession(msg messages.Message) { case *messages.SessionStart, *messages.SessionEnd, *messages.ConnectionInformation, *messages.Metadata, *messages.PageEvent, *messages.PerformanceTrackAggr, *messages.UserID, *messages.UserAnonymousID, *messages.JSException, *messages.JSExceptionDeprecated, *messages.InputEvent, *messages.MouseClick, - *messages.IssueEvent, *messages.IssueEventDeprecated: + *messages.IssueEvent, *messages.IssueEventDeprecated, + // Mobile messages + *messages.MobileSessionStart, *messages.MobileSessionEnd, *messages.MobileUserID, *messages.MobileUserAnonymousID, + *messages.MobileMetadata: default: return } @@ -239,7 +273,6 @@ func (s *Saver) handleSession(msg messages.Message) { case *messages.SessionStart: sess["session_start_timestamp"] = fmt.Sprintf("%d", m.Timestamp) sess["user_uuid"] = QUOTES(m.UserUUID) - sess["user_agent"] = QUOTES(m.UserAgent) sess["user_os"] = QUOTES(m.UserOS) sess["user_os_version"] = QUOTES(m.UserOSVersion) sess["user_browser"] = QUOTES(m.UserBrowser) @@ -326,6 +359,43 @@ func (s *Saver) handleSession(msg messages.Message) { currIssuesCount = 0 } sess["issues_count"] = fmt.Sprintf("%d", currIssuesCount+1) + // Mobile messages + case *messages.MobileSessionStart: + sess["session_start_timestamp"] = fmt.Sprintf("%d", m.Timestamp) + sess["user_uuid"] = QUOTES(m.UserUUID) + sess["user_os"] = QUOTES(m.UserOS) + sess["user_os_version"] = QUOTES(m.UserOSVersion) + sess["user_device"] = QUOTES(m.UserDevice) + sess["user_device_type"] = QUOTES(m.UserDeviceType) + sess["tracker_version"] = QUOTES(m.TrackerVersion) + sess["rev_id"] = QUOTES(m.RevID) + case *messages.MobileSessionEnd: + sess["session_end_timestamp"] = fmt.Sprintf("%d", m.Timestamp) + if err := s.updateSessionInfoFromCache(msg.SessionID(), sess); err != nil { + s.log.Warn(ctx, "failed to update session info from cache: %s", err) + } + case *messages.MobileMetadata: + session, err := s.sessModule.Get(msg.SessionID()) + if err != nil { + s.log.Error(ctx, "error getting session info: %s", err) + break + } + project, err := s.projModule.GetProject(session.ProjectID) + if err != nil { + s.log.Error(ctx, "error getting project info: %s", err) + break + } + keyNo := project.GetMetadataNo(m.Key) + if keyNo == 0 { + break + } + sess[fmt.Sprintf("metadata_%d", keyNo)] = QUOTES(m.Value) + case *messages.MobileUserID: + if m.ID != "" { + sess["user_id"] = QUOTES(m.ID) + } + case *messages.MobileUserAnonymousID: + sess["user_anonymous_id"] = QUOTES(m.ID) default: updated = false } @@ -348,7 +418,7 @@ func (s *Saver) Handle(msg messages.Message) { s.events = append(s.events, newEvent) } s.handleSession(msg) - if msg.TypeID() == messages.MsgSessionEnd { + if msg.TypeID() == messages.MsgSessionEnd || msg.TypeID() == messages.MsgMobileSessionEnd { if s.finishedSessions == nil { s.finishedSessions = make([]uint64, 0) } @@ -386,6 +456,10 @@ func (s *Saver) commitSessions() { toSend = append(toSend, sessionID) } } + if len(sessions) == 0 { + s.log.Info(context.Background(), "empty sessions batch to send") + return + } if err := s.db.InsertSessions(sessions); err != nil { s.log.Error(context.Background(), "can't insert sessions: %s", err) } @@ -441,7 +515,7 @@ func (s *Saver) checkZombieSessions() { s.log.Warn(ctx, "failed to update zombie session info from cache: %s", err) } else { s.sessions[sessionID] = zombieSession - s.log.Info(ctx, "updated zombie session info from cache: %v", zombieSession) + s.log.Debug(ctx, "updated zombie session info from cache: %v", zombieSession) } } if zombieSession["session_start_timestamp"] == "" || zombieSession["session_end_timestamp"] == "" {