feat(ender): grab all sessions per tick (#3163)

This commit is contained in:
Alexander 2025-03-14 17:16:56 +01:00 committed by GitHub
parent e92bfe3cfe
commit 23820b7ea5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 947 additions and 1 deletions

View file

@ -3,13 +3,13 @@ package projects
import (
"context"
"errors"
"openreplay/backend/pkg/metrics/database"
"time"
"openreplay/backend/pkg/cache"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/db/redis"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/metrics/database"
)
type Projects interface {

View file

@ -0,0 +1,306 @@
package main
import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"time"
"openreplay/backend/internal/config/ender"
"openreplay/backend/internal/sessionender"
"openreplay/backend/internal/storage"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/db/redis"
"openreplay/backend/pkg/intervals"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/memory"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/metrics"
"openreplay/backend/pkg/metrics/database"
enderMetrics "openreplay/backend/pkg/metrics/ender"
"openreplay/backend/pkg/projects"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/sessions"
)
func main() {
ctx := context.Background()
log := logger.New()
cfg := ender.New(log)
// Observability
dbMetric := database.New("ender")
enderMetric := enderMetrics.New("ender")
metrics.New(log, append(enderMetric.List(), dbMetric.List()...))
pgConn, err := pool.New(dbMetric, cfg.Postgres.String())
if err != nil {
log.Fatal(ctx, "can't init postgres connection: %s", err)
}
defer pgConn.Close()
redisClient, err := redis.New(&cfg.Redis)
if err != nil {
log.Warn(ctx, "can't init redis connection: %s", err)
}
defer redisClient.Close()
projManager := projects.New(log, pgConn, redisClient, dbMetric)
sessManager := sessions.New(log, pgConn, projManager, redisClient, dbMetric)
sessionEndGenerator, err := sessionender.New(enderMetric, intervals.EVENTS_SESSION_END_TIMEOUT, cfg.PartitionsNumber)
if err != nil {
log.Fatal(ctx, "can't init ender service: %s", err)
}
mobileMessages := []int{90, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 107, 110, 111}
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
consumer := queue.NewConsumer(
cfg.GroupEnder,
[]string{
cfg.TopicRawWeb,
cfg.TopicRawMobile,
},
messages.NewEnderMessageIterator(
log,
func(msg messages.Message) { sessionEndGenerator.UpdateSession(msg) },
append([]int{messages.MsgTimestamp}, mobileMessages...),
false),
false,
cfg.MessageSizeLimit,
)
memoryManager, err := memory.NewManager(log, cfg.MemoryLimitMB, cfg.MaxMemoryUsage)
if err != nil {
log.Fatal(ctx, "can't init memory manager: %s", err)
}
log.Info(ctx, "Ender service started")
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
tick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond)
for {
select {
case sig := <-sigchan:
log.Info(ctx, "Caught signal %v: terminating", sig)
producer.Close(cfg.ProducerTimeout)
if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil {
log.Error(ctx, "can't commit messages with offset: %s", err)
}
consumer.Close()
os.Exit(0)
case <-tick:
details := newDetails()
// Find ended sessions and send notification to other services
sessionEndGenerator.HandleEndedSessions(func(sessions map[uint64]uint64) map[uint64]bool {
// Load all sessions from DB
sessionsList := make([]uint64, 0, len(sessions))
for sessionID := range sessions {
sessionsList = append(sessionsList, sessionID)
}
completedSessions := make(map[uint64]bool)
sessionsData, err := sessManager.GetManySessions(sessionsList)
if err != nil {
log.Error(ctx, "can't get sessions from database: %s", err)
return completedSessions
}
// Check if each session was ended
for sessionID, sess := range sessionsData {
sessCtx := context.WithValue(context.Background(), "sessionID", fmt.Sprintf("%d", sessionID))
timestamp := sessions[sessionID]
currDuration := *sess.Duration
newDur := timestamp - sess.Timestamp
// Skip if session was ended before with same duration
if currDuration == newDur {
details.Duplicated[sessionID] = currDuration
completedSessions[sessionID] = true
continue
}
if currDuration > newDur {
details.Shorter[sessionID] = int64(currDuration) - int64(newDur)
completedSessions[sessionID] = true
continue
}
newDuration, err := sessManager.UpdateDuration(sessionID, timestamp)
if err != nil {
if strings.Contains(err.Error(), "integer out of range") {
// Skip session with broken duration
details.Failed[sessionID] = timestamp
completedSessions[sessionID] = true
continue
}
if strings.Contains(err.Error(), "is less than zero for uint64") {
details.Negative[sessionID] = timestamp
completedSessions[sessionID] = true
continue
}
if strings.Contains(err.Error(), "no rows in result set") {
details.NotFound[sessionID] = timestamp
completedSessions[sessionID] = true
continue
}
log.Error(sessCtx, "can't update session duration, err: %s", err)
continue
}
// Check one more time just in case
if currDuration == newDuration {
details.Duplicated[sessionID] = currDuration
completedSessions[sessionID] = true
continue
}
msg := &messages.SessionEnd{Timestamp: timestamp}
if cfg.UseEncryption {
if key := storage.GenerateEncryptionKey(); key != nil {
if err := sessManager.UpdateEncryptionKey(sessionID, key); err != nil {
log.Warn(sessCtx, "can't save session encryption key: %s, session will not be encrypted", err)
} else {
msg.EncryptionKey = string(key)
}
}
}
if sess != nil && (sess.Platform == "ios" || sess.Platform == "android") {
msg := &messages.MobileSessionEnd{Timestamp: timestamp}
if err := producer.Produce(cfg.TopicRawMobile, sessionID, msg.Encode()); err != nil {
log.Error(sessCtx, "can't send MobileSessionEnd to mobile topic: %s", err)
continue
}
if err := producer.Produce(cfg.TopicRawImages, sessionID, msg.Encode()); err != nil {
log.Error(sessCtx, "can't send MobileSessionEnd signal to canvas topic: %s", err)
}
} else {
if err := producer.Produce(cfg.TopicRawWeb, sessionID, msg.Encode()); err != nil {
log.Error(sessCtx, "can't send sessionEnd to raw topic: %s", err)
continue
}
if err := producer.Produce(cfg.TopicCanvasImages, sessionID, msg.Encode()); err != nil {
log.Error(sessCtx, "can't send sessionEnd signal to canvas topic: %s", err)
}
}
if currDuration != 0 {
details.Diff[sessionID] = int64(newDuration) - int64(currDuration)
details.Updated++
} else {
details.New++
}
completedSessions[sessionID] = true
}
return completedSessions
})
details.Log(log, ctx)
producer.Flush(cfg.ProducerTimeout)
if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil {
log.Error(ctx, "can't commit messages with offset: %s", err)
}
case msg := <-consumer.Rebalanced():
if msg.Type == types.RebalanceTypeRevoke {
sessionEndGenerator.Disable()
} else {
sessionEndGenerator.ActivePartitions(msg.Partitions)
sessionEndGenerator.Enable()
}
default:
if !memoryManager.HasFreeMemory() {
continue
}
if err := consumer.ConsumeNext(); err != nil {
log.Fatal(ctx, "error on consuming: %s", err)
}
}
}
}
type logDetails struct {
Failed map[uint64]uint64
Duplicated map[uint64]uint64
Negative map[uint64]uint64
Shorter map[uint64]int64
NotFound map[uint64]uint64
Diff map[uint64]int64
Updated int
New int
}
func newDetails() *logDetails {
return &logDetails{
Failed: make(map[uint64]uint64),
Duplicated: make(map[uint64]uint64),
Negative: make(map[uint64]uint64),
Shorter: make(map[uint64]int64),
NotFound: make(map[uint64]uint64),
Diff: make(map[uint64]int64),
Updated: 0,
New: 0,
}
}
func (l *logDetails) Log(log logger.Logger, ctx context.Context) {
if n := len(l.Failed); n > 0 {
log.Debug(ctx, "sessions with wrong duration: %d, %v", n, l.Failed)
}
if n := len(l.Negative); n > 0 {
log.Debug(ctx, "sessions with negative duration: %d, %v", n, l.Negative)
}
if n := len(l.NotFound); n > 0 {
log.Debug(ctx, "sessions without info in DB: %d, %v", n, l.NotFound)
}
var logBuilder strings.Builder
logValues := []interface{}{}
if len(l.Failed) > 0 {
logBuilder.WriteString("failed: %d, ")
logValues = append(logValues, len(l.Failed))
}
if len(l.Negative) > 0 {
logBuilder.WriteString("negative: %d, ")
logValues = append(logValues, len(l.Negative))
}
if len(l.Shorter) > 0 {
logBuilder.WriteString("shorter: %d, ")
logValues = append(logValues, len(l.Shorter))
}
if len(l.Duplicated) > 0 {
logBuilder.WriteString("same: %d, ")
logValues = append(logValues, len(l.Duplicated))
}
if l.Updated > 0 {
logBuilder.WriteString("updated: %d, ")
logValues = append(logValues, l.Updated)
}
if l.New > 0 {
logBuilder.WriteString("new: %d, ")
logValues = append(logValues, l.New)
}
if len(l.NotFound) > 0 {
logBuilder.WriteString("not found: %d, ")
logValues = append(logValues, len(l.NotFound))
}
if logBuilder.Len() > 0 {
logMessage := logBuilder.String()
logMessage = logMessage[:len(logMessage)-2]
log.Info(ctx, logMessage, logValues...)
}
}
type SessionEndType int
const (
FailedSessionEnd SessionEndType = iota + 1
DuplicatedSessionEnd
NegativeDuration
ShorterDuration
NewSessionEnd
NoSessionInDB
)

View file

@ -0,0 +1,153 @@
package sessionender
import (
"time"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/metrics/ender"
)
// EndedSessionHandler handler for ended sessions
type EndedSessionHandler func(map[uint64]uint64) map[uint64]bool
// session holds information about user's session live status
type session struct {
lastTimestamp int64 // timestamp from message broker
lastUpdate int64 // local timestamp
lastUserTime uint64
isEnded bool
isMobile bool
}
// SessionEnder updates timestamp of last message for each session
type SessionEnder struct {
metrics ender.Ender
timeout int64
sessions map[uint64]*session // map[sessionID]session
timeCtrl *timeController
parts uint64
enabled bool
}
func New(metrics ender.Ender, timeout int64, parts int) (*SessionEnder, error) {
return &SessionEnder{
metrics: metrics,
timeout: timeout,
sessions: make(map[uint64]*session),
timeCtrl: NewTimeController(parts),
parts: uint64(parts), // ender uses all partitions by default
enabled: true,
}, nil
}
func (se *SessionEnder) Enable() {
se.enabled = true
}
func (se *SessionEnder) Disable() {
se.enabled = false
}
func (se *SessionEnder) ActivePartitions(parts []uint64) {
activeParts := make(map[uint64]bool, 0)
for _, p := range parts {
activeParts[p] = true
}
removedSessions := 0
activeSessions := 0
for sessID, _ := range se.sessions {
if !activeParts[sessID%se.parts] {
delete(se.sessions, sessID)
se.metrics.DecreaseActiveSessions()
removedSessions++
} else {
activeSessions++
}
}
}
// UpdateSession save timestamp for new sessions and update for existing sessions
func (se *SessionEnder) UpdateSession(msg messages.Message) {
var (
sessionID = msg.Meta().SessionID()
batchTimestamp = msg.Meta().Batch().Timestamp()
msgTimestamp = msg.Meta().Timestamp
localTimestamp = time.Now().UnixMilli()
)
if messages.IsMobileType(msg.TypeID()) {
msgTimestamp = messages.GetTimestamp(msg)
}
if batchTimestamp == 0 {
return
}
se.timeCtrl.UpdateTime(sessionID, batchTimestamp, localTimestamp)
sess, ok := se.sessions[sessionID]
if !ok {
// Register new session
se.sessions[sessionID] = &session{
lastTimestamp: batchTimestamp,
lastUpdate: localTimestamp,
lastUserTime: msgTimestamp, // last timestamp from user's machine
isEnded: false,
isMobile: messages.IsMobileType(msg.TypeID()),
}
se.metrics.IncreaseActiveSessions()
se.metrics.IncreaseTotalSessions()
return
}
// Keep the highest user's timestamp for correct session duration value
if msgTimestamp > sess.lastUserTime {
sess.lastUserTime = msgTimestamp
}
// Keep information about the latest message for generating sessionEnd trigger
if batchTimestamp > sess.lastTimestamp {
sess.lastTimestamp = batchTimestamp
sess.lastUpdate = localTimestamp
sess.isEnded = false
}
}
// HandleEndedSessions runs handler for each ended session and delete information about session in successful case
func (se *SessionEnder) HandleEndedSessions(handler EndedSessionHandler) {
if !se.enabled {
return
}
currTime := time.Now().UnixMilli()
isSessionEnded := func(sessID uint64, sess *session) (bool, int) {
// Has been finished already
if sess.isEnded {
return true, 1
}
batchTimeDiff := se.timeCtrl.LastBatchTimestamp(sessID) - sess.lastTimestamp
// Has been finished according to batch timestamp and hasn't been updated for a long time
if (batchTimeDiff >= se.timeout) && (currTime-sess.lastUpdate >= se.timeout) {
return true, 2
}
// Hasn't been finished according to batch timestamp but hasn't been read from partition for a long time
if (batchTimeDiff < se.timeout) && (currTime-se.timeCtrl.LastUpdateTimestamp(sessID) >= se.timeout) {
return true, 3
}
return false, 0
}
// Find ended sessions
endedCandidates := make(map[uint64]uint64, len(se.sessions)/2) // [sessionID]lastUserTime
for sessID, sess := range se.sessions {
if ended, _ := isSessionEnded(sessID, sess); ended {
sess.isEnded = true
endedCandidates[sessID] = sess.lastUserTime
}
}
// Process ended sessions
for sessID, completed := range handler(endedCandidates) {
if completed {
delete(se.sessions, sessID)
se.metrics.DecreaseActiveSessions()
se.metrics.IncreaseClosedSessions()
}
}
}

View file

@ -0,0 +1,287 @@
package sessions
import (
"context"
"fmt"
"openreplay/backend/pkg/metrics/database"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/db/redis"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/projects"
"openreplay/backend/pkg/url"
)
type Sessions interface {
Add(session *Session) error
AddCached(sessionID uint64, data map[string]string) error
Get(sessionID uint64) (*Session, error)
GetUpdated(sessionID uint64, keepInCache bool) (*Session, error)
GetCached(sessionID uint64) (map[string]string, error)
GetDuration(sessionID uint64) (uint64, error)
GetManySessions(sessionIDs []uint64) (map[uint64]*Session, error)
UpdateDuration(sessionID uint64, timestamp uint64) (uint64, error)
UpdateEncryptionKey(sessionID uint64, key []byte) error
UpdateUserID(sessionID uint64, userID string) error
UpdateAnonymousID(sessionID uint64, userAnonymousID string) error
UpdateReferrer(sessionID uint64, referrer string) error
UpdateUTM(sessionID uint64, url string) error
UpdateMetadata(sessionID uint64, key, value string) error
UpdateEventsStats(sessionID uint64, events, pages int) error
UpdateIssuesStats(sessionID uint64, errors, issueScore int) error
Commit()
}
type sessionsImpl struct {
log logger.Logger
cache Cache
storage Storage
updates Updates
projects projects.Projects
}
func New(log logger.Logger, db pool.Pool, proj projects.Projects, redis *redis.Client, metrics database.Database) Sessions {
return &sessionsImpl{
log: log,
cache: NewInMemoryCache(log, NewCache(redis, metrics)),
storage: NewStorage(db),
updates: NewSessionUpdates(log, db, metrics),
projects: proj,
}
}
// Add usage: /start endpoint in http service
func (s *sessionsImpl) Add(session *Session) error {
ctx := context.WithValue(context.Background(), "sessionID", session.SessionID)
if cachedSession, err := s.cache.Get(session.SessionID); err == nil {
s.log.Info(ctx, "[!] Session already exists in cache, new: %+v, cached: %+v", session, cachedSession)
}
err := s.storage.Add(session)
if err != nil {
return err
}
proj, err := s.projects.GetProject(session.ProjectID)
if err != nil {
return err
}
session.SaveRequestPayload = proj.SaveRequestPayloads
if err := s.cache.Set(session); err != nil {
s.log.Warn(ctx, "failed to cache session: %s", err)
}
return nil
}
func (s *sessionsImpl) getFromDB(sessionID uint64) (*Session, error) {
session, err := s.storage.Get(sessionID)
if err != nil {
return nil, fmt.Errorf("failed to get session from postgres: %s", err)
}
proj, err := s.projects.GetProject(session.ProjectID)
if err != nil {
return nil, fmt.Errorf("failed to get active project: %d, err: %s", session.ProjectID, err)
}
session.SaveRequestPayload = proj.SaveRequestPayloads
return session, nil
}
// Get usage: db message processor + connectors in feature
func (s *sessionsImpl) Get(sessionID uint64) (*Session, error) {
if sess, err := s.cache.Get(sessionID); err == nil {
return sess, nil
}
// Get from postgres and update in-memory and redis caches
session, err := s.getFromDB(sessionID)
if err != nil {
return nil, err
}
s.cache.Set(session)
return session, nil
}
// Special method for clickhouse connector
func (s *sessionsImpl) GetUpdated(sessionID uint64, keepInCache bool) (*Session, error) {
session, err := s.getFromDB(sessionID)
if err != nil {
return nil, err
}
if !keepInCache {
return session, nil
}
if err := s.cache.Set(session); err != nil {
ctx := context.WithValue(context.Background(), "sessionID", sessionID)
s.log.Warn(ctx, "failed to cache session: %s", err)
}
return session, nil
}
func (s *sessionsImpl) AddCached(sessionID uint64, data map[string]string) error {
return s.cache.SetCache(sessionID, data)
}
func (s *sessionsImpl) GetCached(sessionID uint64) (map[string]string, error) {
return s.cache.GetCache(sessionID)
}
// GetDuration usage: in ender to check current and new duration to avoid duplicates
func (s *sessionsImpl) GetDuration(sessionID uint64) (uint64, error) {
if sess, err := s.cache.Get(sessionID); err == nil {
if sess.Duration != nil {
return *sess.Duration, nil
}
return 0, nil
}
session, err := s.getFromDB(sessionID)
if err != nil {
return 0, err
}
if err := s.cache.Set(session); err != nil {
ctx := context.WithValue(context.Background(), "sessionID", sessionID)
s.log.Warn(ctx, "failed to cache session: %s", err)
}
if session.Duration != nil {
return *session.Duration, nil
}
return 0, nil
}
// GetManySessions is useful for the ender service only (grab session's startTs and duration)
func (s *sessionsImpl) GetManySessions(sessionIDs []uint64) (map[uint64]*Session, error) {
res := make(map[uint64]*Session, len(sessionIDs))
toRequest := make([]uint64, 0, len(sessionIDs))
// Grab sessions from the cache
for _, sessionID := range sessionIDs {
if sess, err := s.cache.Get(sessionID); err == nil {
res[sessionID] = sess
} else {
toRequest = append(toRequest, sessionID)
}
}
if len(toRequest) == 0 {
return res, nil
}
// Grab the rest from the database
sessionFromDB, err := s.storage.GetMany(toRequest)
if err != nil {
return nil, err
}
for _, sess := range sessionFromDB {
res[sess.SessionID] = sess
}
return res, nil
}
// UpdateDuration usage: in ender to update session duration
func (s *sessionsImpl) UpdateDuration(sessionID uint64, timestamp uint64) (uint64, error) {
newDuration, err := s.storage.UpdateDuration(sessionID, timestamp)
if err != nil {
return 0, err
}
// Update session info in cache for future usage (for example in connectors)
session, err := s.getFromDB(sessionID)
if err != nil {
return 0, err
}
session.Duration = &newDuration
if err := s.cache.Set(session); err != nil {
ctx := context.WithValue(context.Background(), "sessionID", sessionID)
s.log.Warn(ctx, "failed to cache session: %s", err)
}
return newDuration, nil
}
// UpdateEncryptionKey usage: in ender to update session encryption key if encryption is enabled
func (s *sessionsImpl) UpdateEncryptionKey(sessionID uint64, key []byte) error {
ctx := context.WithValue(context.Background(), "sessionID", sessionID)
if err := s.storage.InsertEncryptionKey(sessionID, key); err != nil {
return err
}
if session, err := s.cache.Get(sessionID); err != nil {
session.EncryptionKey = string(key)
if err := s.cache.Set(session); err != nil {
s.log.Warn(ctx, "failed to cache session: %s", err)
}
return nil
}
session, err := s.getFromDB(sessionID)
if err != nil {
s.log.Error(ctx, "failed to get session from postgres: %s", err)
return nil
}
if err := s.cache.Set(session); err != nil {
s.log.Warn(ctx, "failed to cache session: %s", err)
}
return nil
}
// UpdateUserID usage: in db handler
func (s *sessionsImpl) UpdateUserID(sessionID uint64, userID string) error {
s.updates.AddUserID(sessionID, userID)
return nil
}
// UpdateAnonymousID usage: in db handler
func (s *sessionsImpl) UpdateAnonymousID(sessionID uint64, userAnonymousID string) error {
s.updates.AddUserID(sessionID, userAnonymousID)
return nil
}
// UpdateReferrer usage: in db handler on each page event
func (s *sessionsImpl) UpdateReferrer(sessionID uint64, referrer string) error {
if referrer == "" {
return nil
}
baseReferrer := url.DiscardURLQuery(referrer)
s.updates.SetReferrer(sessionID, referrer, baseReferrer)
return nil
}
func (s *sessionsImpl) UpdateUTM(sessionID uint64, pageUrl string) error {
params, err := url.GetURLQueryParams(pageUrl)
if err != nil {
return err
}
utmSource := params["utm_source"]
utmMedium := params["utm_medium"]
utmCampaign := params["utm_campaign"]
if utmSource == "" && utmMedium == "" && utmCampaign == "" {
return nil
}
s.updates.SetUTM(sessionID, utmSource, utmMedium, utmCampaign)
return nil
}
// UpdateMetadata usage: in db handler on each metadata event
func (s *sessionsImpl) UpdateMetadata(sessionID uint64, key, value string) error {
session, err := s.Get(sessionID)
if err != nil {
return err
}
project, err := s.projects.GetProject(session.ProjectID)
if err != nil {
return err
}
keyNo := project.GetMetadataNo(key)
if keyNo == 0 {
return nil
}
s.updates.SetMetadata(sessionID, keyNo, value)
return nil
}
func (s *sessionsImpl) UpdateEventsStats(sessionID uint64, events, pages int) error {
s.updates.AddEvents(sessionID, events, pages)
return nil
}
func (s *sessionsImpl) UpdateIssuesStats(sessionID uint64, errors, issueScore int) error {
s.updates.AddIssues(sessionID, errors, issueScore)
return nil
}
func (s *sessionsImpl) Commit() {
s.updates.Commit()
}

View file

@ -0,0 +1,200 @@
package sessions
import (
"fmt"
"github.com/jackc/pgtype"
"github.com/lib/pq"
"openreplay/backend/pkg/db/postgres/pool"
)
type Storage interface {
Add(sess *Session) error
Get(sessionID uint64) (*Session, error)
GetMany(sessionIDs []uint64) ([]*Session, error)
GetDuration(sessionID uint64) (uint64, error)
UpdateDuration(sessionID uint64, timestamp uint64) (uint64, error)
InsertEncryptionKey(sessionID uint64, key []byte) error
InsertUserID(sessionID uint64, userID string) error
InsertUserAnonymousID(sessionID uint64, userAnonymousID string) error
InsertReferrer(sessionID uint64, referrer, baseReferrer string) error
InsertMetadata(sessionID uint64, keyNo uint, value string) error
}
type storageImpl struct {
db pool.Pool
}
func NewStorage(db pool.Pool) Storage {
return &storageImpl{
db: db,
}
}
func (s *storageImpl) Add(sess *Session) error {
return s.db.Exec(`
INSERT INTO sessions (
session_id, project_id, start_ts,
user_uuid, user_device, user_device_type, user_country,
user_os, user_os_version,
rev_id,
tracker_version, issue_score,
platform,
user_browser, user_browser_version, user_device_memory_size, user_device_heap_size,
user_id, user_state, user_city, timezone, screen_width, screen_height
) VALUES (
$1, $2, $3,
$4, $5, $6, $7,
$8, NULLIF($9, ''),
NULLIF($10, ''),
$11, $12,
$13,
NULLIF($14, ''), NULLIF($15, ''), NULLIF($16, 0), NULLIF($17, 0::bigint),
NULLIF(LEFT($18, 8000), ''), NULLIF($19, ''), NULLIF($20, ''), $21, $22, $23
)`,
sess.SessionID, sess.ProjectID, sess.Timestamp,
sess.UserUUID, sess.UserDevice, sess.UserDeviceType, sess.UserCountry,
sess.UserOS, sess.UserOSVersion,
sess.RevID,
sess.TrackerVersion, sess.Timestamp/1000,
sess.Platform,
sess.UserBrowser, sess.UserBrowserVersion, sess.UserDeviceMemorySize, sess.UserDeviceHeapSize,
sess.UserID, sess.UserState, sess.UserCity, sess.Timezone, sess.ScreenWidth, sess.ScreenHeight,
)
}
func (s *storageImpl) Get(sessionID uint64) (*Session, error) {
sess := &Session{SessionID: sessionID}
var revID, userOSVersion, userBrowser, userBrowserVersion, userState, userCity *string
var issueTypes pgtype.EnumArray
if err := s.db.QueryRow(`
SELECT platform,
duration, project_id, start_ts, timezone,
user_uuid, user_os, user_os_version,
user_device, user_device_type, user_country, user_state, user_city,
rev_id, tracker_version,
user_id, user_anonymous_id, referrer,
pages_count, events_count, errors_count, issue_types,
user_browser, user_browser_version, issue_score,
metadata_1, metadata_2, metadata_3, metadata_4, metadata_5,
metadata_6, metadata_7, metadata_8, metadata_9, metadata_10,
utm_source, utm_medium, utm_campaign
FROM sessions
WHERE session_id=$1
`,
sessionID,
).Scan(&sess.Platform,
&sess.Duration, &sess.ProjectID, &sess.Timestamp, &sess.Timezone,
&sess.UserUUID, &sess.UserOS, &userOSVersion,
&sess.UserDevice, &sess.UserDeviceType, &sess.UserCountry, &userState, &userCity,
&revID, &sess.TrackerVersion,
&sess.UserID, &sess.UserAnonymousID, &sess.Referrer,
&sess.PagesCount, &sess.EventsCount, &sess.ErrorsCount, &issueTypes,
&userBrowser, &userBrowserVersion, &sess.IssueScore,
&sess.Metadata1, &sess.Metadata2, &sess.Metadata3, &sess.Metadata4, &sess.Metadata5,
&sess.Metadata6, &sess.Metadata7, &sess.Metadata8, &sess.Metadata9, &sess.Metadata10,
&sess.UtmSource, &sess.UtmMedium, &sess.UtmCampaign); err != nil {
return nil, err
}
if userOSVersion != nil {
sess.UserOSVersion = *userOSVersion
}
if userBrowser != nil {
sess.UserBrowser = *userBrowser
}
if userBrowserVersion != nil {
sess.UserBrowserVersion = *userBrowserVersion
}
if revID != nil {
sess.RevID = *revID
}
issueTypes.AssignTo(&sess.IssueTypes)
if userState != nil {
sess.UserState = *userState
}
if userCity != nil {
sess.UserCity = *userCity
}
return sess, nil
}
// For the ender service only
func (s *storageImpl) GetMany(sessionIDs []uint64) ([]*Session, error) {
rows, err := s.db.Query("SELECT session_id, COALESCE( duration, 0 ), start_ts FROM sessions WHERE session_id = ANY($1)", pq.Array(sessionIDs))
if err != nil {
return nil, err
}
defer rows.Close()
sessions := make([]*Session, 0, len(sessionIDs))
for rows.Next() {
sess := &Session{}
if err := rows.Scan(&sess.SessionID, &sess.Duration, &sess.Timestamp); err != nil {
return nil, err
}
sessions = append(sessions, sess)
}
return sessions, nil
}
func (s *storageImpl) GetDuration(sessionID uint64) (uint64, error) {
var dur uint64
if err := s.db.QueryRow("SELECT COALESCE( duration, 0 ) FROM sessions WHERE session_id=$1", sessionID).Scan(&dur); err != nil {
return 0, err
}
return dur, nil
}
func (s *storageImpl) UpdateDuration(sessionID uint64, timestamp uint64) (uint64, error) {
var dur uint64
if err := s.db.QueryRow(`
UPDATE sessions SET duration=$2 - start_ts
WHERE session_id=$1
RETURNING duration
`,
sessionID, timestamp,
).Scan(&dur); err != nil {
return 0, err
}
return dur, nil
}
func (s *storageImpl) InsertEncryptionKey(sessionID uint64, key []byte) error {
sqlRequest := `
UPDATE sessions
SET file_key = $2
WHERE session_id = $1`
return s.db.Exec(sqlRequest, sessionID, string(key))
}
func (s *storageImpl) InsertUserID(sessionID uint64, userID string) error {
sqlRequest := `
UPDATE sessions
SET user_id = LEFT($1, 8000)
WHERE session_id = $2`
return s.db.Exec(sqlRequest, userID, sessionID)
}
func (s *storageImpl) InsertUserAnonymousID(sessionID uint64, userAnonymousID string) error {
sqlRequest := `
UPDATE sessions
SET user_anonymous_id = LEFT($1, 8000)
WHERE session_id = $2`
return s.db.Exec(sqlRequest, userAnonymousID, sessionID)
}
func (s *storageImpl) InsertReferrer(sessionID uint64, referrer, baseReferrer string) error {
sqlRequest := `
UPDATE sessions
SET referrer = LEFT($1, 8000), base_referrer = LEFT($2, 8000)
WHERE session_id = $3 AND referrer IS NULL`
return s.db.Exec(sqlRequest, referrer, baseReferrer, sessionID)
}
func (s *storageImpl) InsertMetadata(sessionID uint64, keyNo uint, value string) error {
sqlRequest := `
UPDATE sessions
SET metadata_%v = LEFT($1, 8000)
WHERE session_id = $2`
return s.db.Exec(fmt.Sprintf(sqlRequest, keyNo), value, sessionID)
}