diff --git a/backend/pkg/projects/projects.go b/backend/pkg/projects/projects.go index 162a83408..a178841c7 100644 --- a/backend/pkg/projects/projects.go +++ b/backend/pkg/projects/projects.go @@ -3,13 +3,13 @@ package projects import ( "context" "errors" - "openreplay/backend/pkg/metrics/database" "time" "openreplay/backend/pkg/cache" "openreplay/backend/pkg/db/postgres/pool" "openreplay/backend/pkg/db/redis" "openreplay/backend/pkg/logger" + "openreplay/backend/pkg/metrics/database" ) type Projects interface { diff --git a/ee/backend/cmd/ender/main.go b/ee/backend/cmd/ender/main.go new file mode 100644 index 000000000..17f4447e6 --- /dev/null +++ b/ee/backend/cmd/ender/main.go @@ -0,0 +1,306 @@ +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "openreplay/backend/internal/config/ender" + "openreplay/backend/internal/sessionender" + "openreplay/backend/internal/storage" + "openreplay/backend/pkg/db/postgres/pool" + "openreplay/backend/pkg/db/redis" + "openreplay/backend/pkg/intervals" + "openreplay/backend/pkg/logger" + "openreplay/backend/pkg/memory" + "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/metrics" + "openreplay/backend/pkg/metrics/database" + enderMetrics "openreplay/backend/pkg/metrics/ender" + "openreplay/backend/pkg/projects" + "openreplay/backend/pkg/queue" + "openreplay/backend/pkg/queue/types" + "openreplay/backend/pkg/sessions" +) + +func main() { + ctx := context.Background() + log := logger.New() + cfg := ender.New(log) + // Observability + dbMetric := database.New("ender") + enderMetric := enderMetrics.New("ender") + metrics.New(log, append(enderMetric.List(), dbMetric.List()...)) + + pgConn, err := pool.New(dbMetric, cfg.Postgres.String()) + if err != nil { + log.Fatal(ctx, "can't init postgres connection: %s", err) + } + defer pgConn.Close() + + redisClient, err := redis.New(&cfg.Redis) + if err != nil { + log.Warn(ctx, "can't init redis connection: %s", err) + } + defer redisClient.Close() + + projManager := projects.New(log, pgConn, redisClient, dbMetric) + sessManager := sessions.New(log, pgConn, projManager, redisClient, dbMetric) + + sessionEndGenerator, err := sessionender.New(enderMetric, intervals.EVENTS_SESSION_END_TIMEOUT, cfg.PartitionsNumber) + if err != nil { + log.Fatal(ctx, "can't init ender service: %s", err) + } + + mobileMessages := []int{90, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 107, 110, 111} + + producer := queue.NewProducer(cfg.MessageSizeLimit, true) + consumer := queue.NewConsumer( + cfg.GroupEnder, + []string{ + cfg.TopicRawWeb, + cfg.TopicRawMobile, + }, + messages.NewEnderMessageIterator( + log, + func(msg messages.Message) { sessionEndGenerator.UpdateSession(msg) }, + append([]int{messages.MsgTimestamp}, mobileMessages...), + false), + false, + cfg.MessageSizeLimit, + ) + + memoryManager, err := memory.NewManager(log, cfg.MemoryLimitMB, cfg.MaxMemoryUsage) + if err != nil { + log.Fatal(ctx, "can't init memory manager: %s", err) + } + + log.Info(ctx, "Ender service started") + + sigchan := make(chan os.Signal, 1) + signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) + + tick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond) + for { + select { + case sig := <-sigchan: + log.Info(ctx, "Caught signal %v: terminating", sig) + producer.Close(cfg.ProducerTimeout) + if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil { + log.Error(ctx, "can't commit messages with offset: %s", err) + } + consumer.Close() + os.Exit(0) + case <-tick: + details := newDetails() + + // Find ended sessions and send notification to other services + sessionEndGenerator.HandleEndedSessions(func(sessions map[uint64]uint64) map[uint64]bool { + // Load all sessions from DB + sessionsList := make([]uint64, 0, len(sessions)) + for sessionID := range sessions { + sessionsList = append(sessionsList, sessionID) + } + completedSessions := make(map[uint64]bool) + sessionsData, err := sessManager.GetManySessions(sessionsList) + if err != nil { + log.Error(ctx, "can't get sessions from database: %s", err) + return completedSessions + } + + // Check if each session was ended + for sessionID, sess := range sessionsData { + sessCtx := context.WithValue(context.Background(), "sessionID", fmt.Sprintf("%d", sessionID)) + + timestamp := sessions[sessionID] + currDuration := *sess.Duration + newDur := timestamp - sess.Timestamp + + // Skip if session was ended before with same duration + if currDuration == newDur { + details.Duplicated[sessionID] = currDuration + completedSessions[sessionID] = true + continue + } + if currDuration > newDur { + details.Shorter[sessionID] = int64(currDuration) - int64(newDur) + completedSessions[sessionID] = true + continue + } + + newDuration, err := sessManager.UpdateDuration(sessionID, timestamp) + if err != nil { + if strings.Contains(err.Error(), "integer out of range") { + // Skip session with broken duration + details.Failed[sessionID] = timestamp + completedSessions[sessionID] = true + continue + } + if strings.Contains(err.Error(), "is less than zero for uint64") { + details.Negative[sessionID] = timestamp + completedSessions[sessionID] = true + continue + } + if strings.Contains(err.Error(), "no rows in result set") { + details.NotFound[sessionID] = timestamp + completedSessions[sessionID] = true + continue + } + log.Error(sessCtx, "can't update session duration, err: %s", err) + continue + } + // Check one more time just in case + if currDuration == newDuration { + details.Duplicated[sessionID] = currDuration + completedSessions[sessionID] = true + continue + } + msg := &messages.SessionEnd{Timestamp: timestamp} + if cfg.UseEncryption { + if key := storage.GenerateEncryptionKey(); key != nil { + if err := sessManager.UpdateEncryptionKey(sessionID, key); err != nil { + log.Warn(sessCtx, "can't save session encryption key: %s, session will not be encrypted", err) + } else { + msg.EncryptionKey = string(key) + } + } + } + if sess != nil && (sess.Platform == "ios" || sess.Platform == "android") { + msg := &messages.MobileSessionEnd{Timestamp: timestamp} + if err := producer.Produce(cfg.TopicRawMobile, sessionID, msg.Encode()); err != nil { + log.Error(sessCtx, "can't send MobileSessionEnd to mobile topic: %s", err) + continue + } + if err := producer.Produce(cfg.TopicRawImages, sessionID, msg.Encode()); err != nil { + log.Error(sessCtx, "can't send MobileSessionEnd signal to canvas topic: %s", err) + } + } else { + if err := producer.Produce(cfg.TopicRawWeb, sessionID, msg.Encode()); err != nil { + log.Error(sessCtx, "can't send sessionEnd to raw topic: %s", err) + continue + } + if err := producer.Produce(cfg.TopicCanvasImages, sessionID, msg.Encode()); err != nil { + log.Error(sessCtx, "can't send sessionEnd signal to canvas topic: %s", err) + } + } + + if currDuration != 0 { + details.Diff[sessionID] = int64(newDuration) - int64(currDuration) + details.Updated++ + } else { + details.New++ + } + completedSessions[sessionID] = true + } + return completedSessions + }) + details.Log(log, ctx) + producer.Flush(cfg.ProducerTimeout) + if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil { + log.Error(ctx, "can't commit messages with offset: %s", err) + } + case msg := <-consumer.Rebalanced(): + if msg.Type == types.RebalanceTypeRevoke { + sessionEndGenerator.Disable() + } else { + sessionEndGenerator.ActivePartitions(msg.Partitions) + sessionEndGenerator.Enable() + } + default: + if !memoryManager.HasFreeMemory() { + continue + } + if err := consumer.ConsumeNext(); err != nil { + log.Fatal(ctx, "error on consuming: %s", err) + } + } + } +} + +type logDetails struct { + Failed map[uint64]uint64 + Duplicated map[uint64]uint64 + Negative map[uint64]uint64 + Shorter map[uint64]int64 + NotFound map[uint64]uint64 + Diff map[uint64]int64 + Updated int + New int +} + +func newDetails() *logDetails { + return &logDetails{ + Failed: make(map[uint64]uint64), + Duplicated: make(map[uint64]uint64), + Negative: make(map[uint64]uint64), + Shorter: make(map[uint64]int64), + NotFound: make(map[uint64]uint64), + Diff: make(map[uint64]int64), + Updated: 0, + New: 0, + } +} + +func (l *logDetails) Log(log logger.Logger, ctx context.Context) { + if n := len(l.Failed); n > 0 { + log.Debug(ctx, "sessions with wrong duration: %d, %v", n, l.Failed) + } + if n := len(l.Negative); n > 0 { + log.Debug(ctx, "sessions with negative duration: %d, %v", n, l.Negative) + } + if n := len(l.NotFound); n > 0 { + log.Debug(ctx, "sessions without info in DB: %d, %v", n, l.NotFound) + } + var logBuilder strings.Builder + logValues := []interface{}{} + + if len(l.Failed) > 0 { + logBuilder.WriteString("failed: %d, ") + logValues = append(logValues, len(l.Failed)) + } + if len(l.Negative) > 0 { + logBuilder.WriteString("negative: %d, ") + logValues = append(logValues, len(l.Negative)) + } + if len(l.Shorter) > 0 { + logBuilder.WriteString("shorter: %d, ") + logValues = append(logValues, len(l.Shorter)) + } + if len(l.Duplicated) > 0 { + logBuilder.WriteString("same: %d, ") + logValues = append(logValues, len(l.Duplicated)) + } + if l.Updated > 0 { + logBuilder.WriteString("updated: %d, ") + logValues = append(logValues, l.Updated) + } + if l.New > 0 { + logBuilder.WriteString("new: %d, ") + logValues = append(logValues, l.New) + } + if len(l.NotFound) > 0 { + logBuilder.WriteString("not found: %d, ") + logValues = append(logValues, len(l.NotFound)) + } + + if logBuilder.Len() > 0 { + logMessage := logBuilder.String() + logMessage = logMessage[:len(logMessage)-2] + log.Info(ctx, logMessage, logValues...) + } +} + +type SessionEndType int + +const ( + FailedSessionEnd SessionEndType = iota + 1 + DuplicatedSessionEnd + NegativeDuration + ShorterDuration + NewSessionEnd + NoSessionInDB +) diff --git a/ee/backend/internal/sessionender/ender.go b/ee/backend/internal/sessionender/ender.go new file mode 100644 index 000000000..41ea44e8d --- /dev/null +++ b/ee/backend/internal/sessionender/ender.go @@ -0,0 +1,153 @@ +package sessionender + +import ( + "time" + + "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/metrics/ender" +) + +// EndedSessionHandler handler for ended sessions +type EndedSessionHandler func(map[uint64]uint64) map[uint64]bool + +// session holds information about user's session live status +type session struct { + lastTimestamp int64 // timestamp from message broker + lastUpdate int64 // local timestamp + lastUserTime uint64 + isEnded bool + isMobile bool +} + +// SessionEnder updates timestamp of last message for each session +type SessionEnder struct { + metrics ender.Ender + timeout int64 + sessions map[uint64]*session // map[sessionID]session + timeCtrl *timeController + parts uint64 + enabled bool +} + +func New(metrics ender.Ender, timeout int64, parts int) (*SessionEnder, error) { + return &SessionEnder{ + metrics: metrics, + timeout: timeout, + sessions: make(map[uint64]*session), + timeCtrl: NewTimeController(parts), + parts: uint64(parts), // ender uses all partitions by default + enabled: true, + }, nil +} + +func (se *SessionEnder) Enable() { + se.enabled = true +} + +func (se *SessionEnder) Disable() { + se.enabled = false +} + +func (se *SessionEnder) ActivePartitions(parts []uint64) { + activeParts := make(map[uint64]bool, 0) + for _, p := range parts { + activeParts[p] = true + } + removedSessions := 0 + activeSessions := 0 + for sessID, _ := range se.sessions { + if !activeParts[sessID%se.parts] { + delete(se.sessions, sessID) + se.metrics.DecreaseActiveSessions() + removedSessions++ + } else { + activeSessions++ + } + } +} + +// UpdateSession save timestamp for new sessions and update for existing sessions +func (se *SessionEnder) UpdateSession(msg messages.Message) { + var ( + sessionID = msg.Meta().SessionID() + batchTimestamp = msg.Meta().Batch().Timestamp() + msgTimestamp = msg.Meta().Timestamp + localTimestamp = time.Now().UnixMilli() + ) + if messages.IsMobileType(msg.TypeID()) { + msgTimestamp = messages.GetTimestamp(msg) + } + if batchTimestamp == 0 { + return + } + se.timeCtrl.UpdateTime(sessionID, batchTimestamp, localTimestamp) + sess, ok := se.sessions[sessionID] + if !ok { + // Register new session + se.sessions[sessionID] = &session{ + lastTimestamp: batchTimestamp, + lastUpdate: localTimestamp, + lastUserTime: msgTimestamp, // last timestamp from user's machine + isEnded: false, + isMobile: messages.IsMobileType(msg.TypeID()), + } + se.metrics.IncreaseActiveSessions() + se.metrics.IncreaseTotalSessions() + return + } + // Keep the highest user's timestamp for correct session duration value + if msgTimestamp > sess.lastUserTime { + sess.lastUserTime = msgTimestamp + } + // Keep information about the latest message for generating sessionEnd trigger + if batchTimestamp > sess.lastTimestamp { + sess.lastTimestamp = batchTimestamp + sess.lastUpdate = localTimestamp + sess.isEnded = false + } +} + +// HandleEndedSessions runs handler for each ended session and delete information about session in successful case +func (se *SessionEnder) HandleEndedSessions(handler EndedSessionHandler) { + if !se.enabled { + return + } + currTime := time.Now().UnixMilli() + + isSessionEnded := func(sessID uint64, sess *session) (bool, int) { + // Has been finished already + if sess.isEnded { + return true, 1 + } + batchTimeDiff := se.timeCtrl.LastBatchTimestamp(sessID) - sess.lastTimestamp + + // Has been finished according to batch timestamp and hasn't been updated for a long time + if (batchTimeDiff >= se.timeout) && (currTime-sess.lastUpdate >= se.timeout) { + return true, 2 + } + + // Hasn't been finished according to batch timestamp but hasn't been read from partition for a long time + if (batchTimeDiff < se.timeout) && (currTime-se.timeCtrl.LastUpdateTimestamp(sessID) >= se.timeout) { + return true, 3 + } + return false, 0 + } + + // Find ended sessions + endedCandidates := make(map[uint64]uint64, len(se.sessions)/2) // [sessionID]lastUserTime + for sessID, sess := range se.sessions { + if ended, _ := isSessionEnded(sessID, sess); ended { + sess.isEnded = true + endedCandidates[sessID] = sess.lastUserTime + } + } + + // Process ended sessions + for sessID, completed := range handler(endedCandidates) { + if completed { + delete(se.sessions, sessID) + se.metrics.DecreaseActiveSessions() + se.metrics.IncreaseClosedSessions() + } + } +} diff --git a/ee/backend/pkg/sessions/sessions.go b/ee/backend/pkg/sessions/sessions.go new file mode 100644 index 000000000..d44d7768c --- /dev/null +++ b/ee/backend/pkg/sessions/sessions.go @@ -0,0 +1,287 @@ +package sessions + +import ( + "context" + "fmt" + "openreplay/backend/pkg/metrics/database" + + "openreplay/backend/pkg/db/postgres/pool" + "openreplay/backend/pkg/db/redis" + "openreplay/backend/pkg/logger" + "openreplay/backend/pkg/projects" + "openreplay/backend/pkg/url" +) + +type Sessions interface { + Add(session *Session) error + AddCached(sessionID uint64, data map[string]string) error + Get(sessionID uint64) (*Session, error) + GetUpdated(sessionID uint64, keepInCache bool) (*Session, error) + GetCached(sessionID uint64) (map[string]string, error) + GetDuration(sessionID uint64) (uint64, error) + GetManySessions(sessionIDs []uint64) (map[uint64]*Session, error) + UpdateDuration(sessionID uint64, timestamp uint64) (uint64, error) + UpdateEncryptionKey(sessionID uint64, key []byte) error + UpdateUserID(sessionID uint64, userID string) error + UpdateAnonymousID(sessionID uint64, userAnonymousID string) error + UpdateReferrer(sessionID uint64, referrer string) error + UpdateUTM(sessionID uint64, url string) error + UpdateMetadata(sessionID uint64, key, value string) error + UpdateEventsStats(sessionID uint64, events, pages int) error + UpdateIssuesStats(sessionID uint64, errors, issueScore int) error + Commit() +} + +type sessionsImpl struct { + log logger.Logger + cache Cache + storage Storage + updates Updates + projects projects.Projects +} + +func New(log logger.Logger, db pool.Pool, proj projects.Projects, redis *redis.Client, metrics database.Database) Sessions { + return &sessionsImpl{ + log: log, + cache: NewInMemoryCache(log, NewCache(redis, metrics)), + storage: NewStorage(db), + updates: NewSessionUpdates(log, db, metrics), + projects: proj, + } +} + +// Add usage: /start endpoint in http service +func (s *sessionsImpl) Add(session *Session) error { + ctx := context.WithValue(context.Background(), "sessionID", session.SessionID) + if cachedSession, err := s.cache.Get(session.SessionID); err == nil { + s.log.Info(ctx, "[!] Session already exists in cache, new: %+v, cached: %+v", session, cachedSession) + } + err := s.storage.Add(session) + if err != nil { + return err + } + proj, err := s.projects.GetProject(session.ProjectID) + if err != nil { + return err + } + session.SaveRequestPayload = proj.SaveRequestPayloads + if err := s.cache.Set(session); err != nil { + s.log.Warn(ctx, "failed to cache session: %s", err) + } + return nil +} + +func (s *sessionsImpl) getFromDB(sessionID uint64) (*Session, error) { + session, err := s.storage.Get(sessionID) + if err != nil { + return nil, fmt.Errorf("failed to get session from postgres: %s", err) + } + proj, err := s.projects.GetProject(session.ProjectID) + if err != nil { + return nil, fmt.Errorf("failed to get active project: %d, err: %s", session.ProjectID, err) + } + session.SaveRequestPayload = proj.SaveRequestPayloads + return session, nil +} + +// Get usage: db message processor + connectors in feature +func (s *sessionsImpl) Get(sessionID uint64) (*Session, error) { + if sess, err := s.cache.Get(sessionID); err == nil { + return sess, nil + } + + // Get from postgres and update in-memory and redis caches + session, err := s.getFromDB(sessionID) + if err != nil { + return nil, err + } + s.cache.Set(session) + return session, nil +} + +// Special method for clickhouse connector +func (s *sessionsImpl) GetUpdated(sessionID uint64, keepInCache bool) (*Session, error) { + session, err := s.getFromDB(sessionID) + if err != nil { + return nil, err + } + if !keepInCache { + return session, nil + } + if err := s.cache.Set(session); err != nil { + ctx := context.WithValue(context.Background(), "sessionID", sessionID) + s.log.Warn(ctx, "failed to cache session: %s", err) + } + return session, nil +} + +func (s *sessionsImpl) AddCached(sessionID uint64, data map[string]string) error { + return s.cache.SetCache(sessionID, data) +} + +func (s *sessionsImpl) GetCached(sessionID uint64) (map[string]string, error) { + return s.cache.GetCache(sessionID) +} + +// GetDuration usage: in ender to check current and new duration to avoid duplicates +func (s *sessionsImpl) GetDuration(sessionID uint64) (uint64, error) { + if sess, err := s.cache.Get(sessionID); err == nil { + if sess.Duration != nil { + return *sess.Duration, nil + } + return 0, nil + } + session, err := s.getFromDB(sessionID) + if err != nil { + return 0, err + } + if err := s.cache.Set(session); err != nil { + ctx := context.WithValue(context.Background(), "sessionID", sessionID) + s.log.Warn(ctx, "failed to cache session: %s", err) + } + if session.Duration != nil { + return *session.Duration, nil + } + return 0, nil +} + +// GetManySessions is useful for the ender service only (grab session's startTs and duration) +func (s *sessionsImpl) GetManySessions(sessionIDs []uint64) (map[uint64]*Session, error) { + res := make(map[uint64]*Session, len(sessionIDs)) + toRequest := make([]uint64, 0, len(sessionIDs)) + // Grab sessions from the cache + for _, sessionID := range sessionIDs { + if sess, err := s.cache.Get(sessionID); err == nil { + res[sessionID] = sess + } else { + toRequest = append(toRequest, sessionID) + } + } + if len(toRequest) == 0 { + return res, nil + } + // Grab the rest from the database + sessionFromDB, err := s.storage.GetMany(toRequest) + if err != nil { + return nil, err + } + for _, sess := range sessionFromDB { + res[sess.SessionID] = sess + } + return res, nil +} + +// UpdateDuration usage: in ender to update session duration +func (s *sessionsImpl) UpdateDuration(sessionID uint64, timestamp uint64) (uint64, error) { + newDuration, err := s.storage.UpdateDuration(sessionID, timestamp) + if err != nil { + return 0, err + } + // Update session info in cache for future usage (for example in connectors) + session, err := s.getFromDB(sessionID) + if err != nil { + return 0, err + } + + session.Duration = &newDuration + if err := s.cache.Set(session); err != nil { + ctx := context.WithValue(context.Background(), "sessionID", sessionID) + s.log.Warn(ctx, "failed to cache session: %s", err) + } + return newDuration, nil +} + +// UpdateEncryptionKey usage: in ender to update session encryption key if encryption is enabled +func (s *sessionsImpl) UpdateEncryptionKey(sessionID uint64, key []byte) error { + ctx := context.WithValue(context.Background(), "sessionID", sessionID) + if err := s.storage.InsertEncryptionKey(sessionID, key); err != nil { + return err + } + if session, err := s.cache.Get(sessionID); err != nil { + session.EncryptionKey = string(key) + if err := s.cache.Set(session); err != nil { + s.log.Warn(ctx, "failed to cache session: %s", err) + } + return nil + } + session, err := s.getFromDB(sessionID) + if err != nil { + s.log.Error(ctx, "failed to get session from postgres: %s", err) + return nil + } + if err := s.cache.Set(session); err != nil { + s.log.Warn(ctx, "failed to cache session: %s", err) + } + return nil +} + +// UpdateUserID usage: in db handler +func (s *sessionsImpl) UpdateUserID(sessionID uint64, userID string) error { + s.updates.AddUserID(sessionID, userID) + return nil +} + +// UpdateAnonymousID usage: in db handler +func (s *sessionsImpl) UpdateAnonymousID(sessionID uint64, userAnonymousID string) error { + s.updates.AddUserID(sessionID, userAnonymousID) + return nil +} + +// UpdateReferrer usage: in db handler on each page event +func (s *sessionsImpl) UpdateReferrer(sessionID uint64, referrer string) error { + if referrer == "" { + return nil + } + baseReferrer := url.DiscardURLQuery(referrer) + s.updates.SetReferrer(sessionID, referrer, baseReferrer) + return nil +} + +func (s *sessionsImpl) UpdateUTM(sessionID uint64, pageUrl string) error { + params, err := url.GetURLQueryParams(pageUrl) + if err != nil { + return err + } + utmSource := params["utm_source"] + utmMedium := params["utm_medium"] + utmCampaign := params["utm_campaign"] + if utmSource == "" && utmMedium == "" && utmCampaign == "" { + return nil + } + s.updates.SetUTM(sessionID, utmSource, utmMedium, utmCampaign) + return nil +} + +// UpdateMetadata usage: in db handler on each metadata event +func (s *sessionsImpl) UpdateMetadata(sessionID uint64, key, value string) error { + session, err := s.Get(sessionID) + if err != nil { + return err + } + project, err := s.projects.GetProject(session.ProjectID) + if err != nil { + return err + } + + keyNo := project.GetMetadataNo(key) + if keyNo == 0 { + return nil + } + + s.updates.SetMetadata(sessionID, keyNo, value) + return nil +} + +func (s *sessionsImpl) UpdateEventsStats(sessionID uint64, events, pages int) error { + s.updates.AddEvents(sessionID, events, pages) + return nil +} + +func (s *sessionsImpl) UpdateIssuesStats(sessionID uint64, errors, issueScore int) error { + s.updates.AddIssues(sessionID, errors, issueScore) + return nil +} + +func (s *sessionsImpl) Commit() { + s.updates.Commit() +} diff --git a/ee/backend/pkg/sessions/storage.go b/ee/backend/pkg/sessions/storage.go new file mode 100644 index 000000000..41602c42a --- /dev/null +++ b/ee/backend/pkg/sessions/storage.go @@ -0,0 +1,200 @@ +package sessions + +import ( + "fmt" + + "github.com/jackc/pgtype" + "github.com/lib/pq" + + "openreplay/backend/pkg/db/postgres/pool" +) + +type Storage interface { + Add(sess *Session) error + Get(sessionID uint64) (*Session, error) + GetMany(sessionIDs []uint64) ([]*Session, error) + GetDuration(sessionID uint64) (uint64, error) + UpdateDuration(sessionID uint64, timestamp uint64) (uint64, error) + InsertEncryptionKey(sessionID uint64, key []byte) error + InsertUserID(sessionID uint64, userID string) error + InsertUserAnonymousID(sessionID uint64, userAnonymousID string) error + InsertReferrer(sessionID uint64, referrer, baseReferrer string) error + InsertMetadata(sessionID uint64, keyNo uint, value string) error +} + +type storageImpl struct { + db pool.Pool +} + +func NewStorage(db pool.Pool) Storage { + return &storageImpl{ + db: db, + } +} + +func (s *storageImpl) Add(sess *Session) error { + return s.db.Exec(` + INSERT INTO sessions ( + session_id, project_id, start_ts, + user_uuid, user_device, user_device_type, user_country, + user_os, user_os_version, + rev_id, + tracker_version, issue_score, + platform, + user_browser, user_browser_version, user_device_memory_size, user_device_heap_size, + user_id, user_state, user_city, timezone, screen_width, screen_height + ) VALUES ( + $1, $2, $3, + $4, $5, $6, $7, + $8, NULLIF($9, ''), + NULLIF($10, ''), + $11, $12, + $13, + NULLIF($14, ''), NULLIF($15, ''), NULLIF($16, 0), NULLIF($17, 0::bigint), + NULLIF(LEFT($18, 8000), ''), NULLIF($19, ''), NULLIF($20, ''), $21, $22, $23 + )`, + sess.SessionID, sess.ProjectID, sess.Timestamp, + sess.UserUUID, sess.UserDevice, sess.UserDeviceType, sess.UserCountry, + sess.UserOS, sess.UserOSVersion, + sess.RevID, + sess.TrackerVersion, sess.Timestamp/1000, + sess.Platform, + sess.UserBrowser, sess.UserBrowserVersion, sess.UserDeviceMemorySize, sess.UserDeviceHeapSize, + sess.UserID, sess.UserState, sess.UserCity, sess.Timezone, sess.ScreenWidth, sess.ScreenHeight, + ) +} + +func (s *storageImpl) Get(sessionID uint64) (*Session, error) { + sess := &Session{SessionID: sessionID} + var revID, userOSVersion, userBrowser, userBrowserVersion, userState, userCity *string + var issueTypes pgtype.EnumArray + if err := s.db.QueryRow(` + SELECT platform, + duration, project_id, start_ts, timezone, + user_uuid, user_os, user_os_version, + user_device, user_device_type, user_country, user_state, user_city, + rev_id, tracker_version, + user_id, user_anonymous_id, referrer, + pages_count, events_count, errors_count, issue_types, + user_browser, user_browser_version, issue_score, + metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, + metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, + utm_source, utm_medium, utm_campaign + FROM sessions + WHERE session_id=$1 + `, + sessionID, + ).Scan(&sess.Platform, + &sess.Duration, &sess.ProjectID, &sess.Timestamp, &sess.Timezone, + &sess.UserUUID, &sess.UserOS, &userOSVersion, + &sess.UserDevice, &sess.UserDeviceType, &sess.UserCountry, &userState, &userCity, + &revID, &sess.TrackerVersion, + &sess.UserID, &sess.UserAnonymousID, &sess.Referrer, + &sess.PagesCount, &sess.EventsCount, &sess.ErrorsCount, &issueTypes, + &userBrowser, &userBrowserVersion, &sess.IssueScore, + &sess.Metadata1, &sess.Metadata2, &sess.Metadata3, &sess.Metadata4, &sess.Metadata5, + &sess.Metadata6, &sess.Metadata7, &sess.Metadata8, &sess.Metadata9, &sess.Metadata10, + &sess.UtmSource, &sess.UtmMedium, &sess.UtmCampaign); err != nil { + return nil, err + } + if userOSVersion != nil { + sess.UserOSVersion = *userOSVersion + } + if userBrowser != nil { + sess.UserBrowser = *userBrowser + } + if userBrowserVersion != nil { + sess.UserBrowserVersion = *userBrowserVersion + } + if revID != nil { + sess.RevID = *revID + } + issueTypes.AssignTo(&sess.IssueTypes) + if userState != nil { + sess.UserState = *userState + } + if userCity != nil { + sess.UserCity = *userCity + } + return sess, nil +} + +// For the ender service only +func (s *storageImpl) GetMany(sessionIDs []uint64) ([]*Session, error) { + rows, err := s.db.Query("SELECT session_id, COALESCE( duration, 0 ), start_ts FROM sessions WHERE session_id = ANY($1)", pq.Array(sessionIDs)) + if err != nil { + return nil, err + } + defer rows.Close() + sessions := make([]*Session, 0, len(sessionIDs)) + for rows.Next() { + sess := &Session{} + if err := rows.Scan(&sess.SessionID, &sess.Duration, &sess.Timestamp); err != nil { + return nil, err + } + sessions = append(sessions, sess) + } + return sessions, nil +} + +func (s *storageImpl) GetDuration(sessionID uint64) (uint64, error) { + var dur uint64 + if err := s.db.QueryRow("SELECT COALESCE( duration, 0 ) FROM sessions WHERE session_id=$1", sessionID).Scan(&dur); err != nil { + return 0, err + } + return dur, nil +} + +func (s *storageImpl) UpdateDuration(sessionID uint64, timestamp uint64) (uint64, error) { + var dur uint64 + if err := s.db.QueryRow(` + UPDATE sessions SET duration=$2 - start_ts + WHERE session_id=$1 + RETURNING duration + `, + sessionID, timestamp, + ).Scan(&dur); err != nil { + return 0, err + } + return dur, nil +} + +func (s *storageImpl) InsertEncryptionKey(sessionID uint64, key []byte) error { + sqlRequest := ` + UPDATE sessions + SET file_key = $2 + WHERE session_id = $1` + return s.db.Exec(sqlRequest, sessionID, string(key)) +} + +func (s *storageImpl) InsertUserID(sessionID uint64, userID string) error { + sqlRequest := ` + UPDATE sessions + SET user_id = LEFT($1, 8000) + WHERE session_id = $2` + return s.db.Exec(sqlRequest, userID, sessionID) +} + +func (s *storageImpl) InsertUserAnonymousID(sessionID uint64, userAnonymousID string) error { + sqlRequest := ` + UPDATE sessions + SET user_anonymous_id = LEFT($1, 8000) + WHERE session_id = $2` + return s.db.Exec(sqlRequest, userAnonymousID, sessionID) +} + +func (s *storageImpl) InsertReferrer(sessionID uint64, referrer, baseReferrer string) error { + sqlRequest := ` + UPDATE sessions + SET referrer = LEFT($1, 8000), base_referrer = LEFT($2, 8000) + WHERE session_id = $3 AND referrer IS NULL` + return s.db.Exec(sqlRequest, referrer, baseReferrer, sessionID) +} + +func (s *storageImpl) InsertMetadata(sessionID uint64, keyNo uint, value string) error { + sqlRequest := ` + UPDATE sessions + SET metadata_%v = LEFT($1, 8000) + WHERE session_id = $2` + return s.db.Exec(fmt.Sprintf(sqlRequest, keyNo), value, sessionID) +}