From 623e241afbb18ed09d05f03035bbe290fba06d27 Mon Sep 17 00:00:00 2001 From: Alexander Date: Mon, 20 Jun 2022 09:26:05 +0200 Subject: [PATCH] feat(backend): moved recording sessionStart to db into http service and sessionEnd into ender service (#545) Co-authored-by: Alexander Zavorotynskiy --- backend/cmd/ender/main.go | 11 +++++- backend/internal/config/ender/config.go | 24 +++++++------ backend/internal/db/datasaver/messages.go | 4 +-- backend/internal/http/router/handlers-web.go | 10 ++++-- backend/pkg/db/cache/messages-common.go | 12 +++---- backend/pkg/db/cache/messages-ios.go | 2 +- backend/pkg/db/cache/messages-web.go | 32 +++++++++++++++-- backend/pkg/db/postgres/messages-common.go | 36 +++++++++++--------- 8 files changed, 90 insertions(+), 41 deletions(-) diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index 82f9b51d0..9751f26f3 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -4,6 +4,8 @@ import ( "log" "openreplay/backend/internal/config/ender" "openreplay/backend/internal/sessionender" + "openreplay/backend/pkg/db/cache" + "openreplay/backend/pkg/db/postgres" "openreplay/backend/pkg/monitoring" "time" @@ -30,6 +32,9 @@ func main() { // Load service configuration cfg := ender.New() + pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres), cfg.ProjectExpirationTimeoutMs) + defer pg.Close() + // Init all modules statsLogger := logger.NewQueueStats(cfg.LoggerTimeout) sessions, err := sessionender.New(metrics, intervals.EVENTS_SESSION_END_TIMEOUT, cfg.PartitionsNumber) @@ -70,8 +75,12 @@ func main() { // Find ended sessions and send notification to other services sessions.HandleEndedSessions(func(sessionID uint64, timestamp int64) bool { msg := &messages.SessionEnd{Timestamp: uint64(timestamp)} + if err := pg.InsertSessionEnd(sessionID, msg.Timestamp); err != nil { + log.Printf("can't save sessionEnd to database, sessID: %d", sessionID) + return false + } if err := producer.Produce(cfg.TopicRawWeb, sessionID, messages.Encode(msg)); err != nil { - log.Printf("can't send SessionEnd to trigger topic: %s; sessID: %d", err, sessionID) + log.Printf("can't send sessionEnd to topic: %s; sessID: %d", err, sessionID) return false } return true diff --git a/backend/internal/config/ender/config.go b/backend/internal/config/ender/config.go index 5898c69ec..7203eae33 100644 --- a/backend/internal/config/ender/config.go +++ b/backend/internal/config/ender/config.go @@ -5,19 +5,23 @@ import ( ) type Config struct { - GroupEnder string - LoggerTimeout int - TopicRawWeb string - ProducerTimeout int - PartitionsNumber int + Postgres string + ProjectExpirationTimeoutMs int64 + GroupEnder string + LoggerTimeout int + TopicRawWeb string + ProducerTimeout int + PartitionsNumber int } func New() *Config { return &Config{ - GroupEnder: env.String("GROUP_ENDER"), - LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"), - TopicRawWeb: env.String("TOPIC_RAW_WEB"), - ProducerTimeout: 2000, - PartitionsNumber: env.Int("PARTITIONS_NUMBER"), + Postgres: env.String("POSTGRES_STRING"), + ProjectExpirationTimeoutMs: 1000 * 60 * 20, + GroupEnder: env.String("GROUP_ENDER"), + LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"), + TopicRawWeb: env.String("TOPIC_RAW_WEB"), + ProducerTimeout: 2000, + PartitionsNumber: env.Int("PARTITIONS_NUMBER"), } } diff --git a/backend/internal/db/datasaver/messages.go b/backend/internal/db/datasaver/messages.go index 0f24d237e..112559fb1 100644 --- a/backend/internal/db/datasaver/messages.go +++ b/backend/internal/db/datasaver/messages.go @@ -19,9 +19,9 @@ func (mi *Saver) InsertMessage(sessionID uint64, msg Message) error { // Web case *SessionStart: - return mi.pg.InsertWebSessionStart(sessionID, m) + return mi.pg.HandleWebSessionStart(sessionID, m) case *SessionEnd: - return mi.pg.InsertWebSessionEnd(sessionID, m) + return mi.pg.HandleWebSessionEnd(sessionID, m) case *UserID: return mi.pg.InsertWebUserID(sessionID, m) case *UserAnonymousID: diff --git a/backend/internal/http/router/handlers-web.go b/backend/internal/http/router/handlers-web.go index 7514a78fc..69cbe4675 100644 --- a/backend/internal/http/router/handlers-web.go +++ b/backend/internal/http/router/handlers-web.go @@ -98,7 +98,7 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond) tokenData = &token.TokenData{ID: sessionID, ExpTime: expTime.UnixMilli()} - e.services.Producer.Produce(e.cfg.TopicRawWeb, tokenData.ID, Encode(&SessionStart{ + sessionStart := &SessionStart{ Timestamp: req.Timestamp, ProjectID: uint64(p.ProjectID), TrackerVersion: req.TrackerVersion, @@ -115,7 +115,13 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) UserDeviceMemorySize: req.DeviceMemory, UserDeviceHeapSize: req.JsHeapSizeLimit, UserID: req.UserID, - })) + } + + // Save sessionStart to db + e.services.Database.InsertWebSessionStart(sessionID, sessionStart) + + // Send sessionStart message to kafka + e.services.Producer.Produce(e.cfg.TopicRawWeb, tokenData.ID, Encode(sessionStart)) } ResponseWithJSON(w, &StartSessionResponse{ diff --git a/backend/pkg/db/cache/messages-common.go b/backend/pkg/db/cache/messages-common.go index 90b97efbf..eb4365a61 100644 --- a/backend/pkg/db/cache/messages-common.go +++ b/backend/pkg/db/cache/messages-common.go @@ -7,18 +7,16 @@ import ( // . "openreplay/backend/pkg/db/types" ) -func (c *PGCache) insertSessionEnd(sessionID uint64, timestamp uint64) error { - //duration, err := c.Conn.InsertSessionEnd(sessionID, timestamp) +func (c *PGCache) InsertSessionEnd(sessionID uint64, timestamp uint64) error { _, err := c.Conn.InsertSessionEnd(sessionID, timestamp) if err != nil { return err } + return nil +} + +func (c *PGCache) HandleSessionEnd(sessionID uint64) error { c.DeleteSession(sessionID) - // session, err := c.GetSession(sessionID) - // if err != nil { - // return err - // } - // session.Duration = &duration return nil } diff --git a/backend/pkg/db/cache/messages-ios.go b/backend/pkg/db/cache/messages-ios.go index 4bbc8c1f5..4195976c3 100644 --- a/backend/pkg/db/cache/messages-ios.go +++ b/backend/pkg/db/cache/messages-ios.go @@ -32,7 +32,7 @@ func (c *PGCache) InsertIOSSessionStart(sessionID uint64, s *IOSSessionStart) er } func (c *PGCache) InsertIOSSessionEnd(sessionID uint64, e *IOSSessionEnd) error { - return c.insertSessionEnd(sessionID, e.Timestamp) + return c.InsertSessionEnd(sessionID, e.Timestamp) } func (c *PGCache) InsertIOSScreenEnter(sessionID uint64, screenEnter *IOSScreenEnter) error { diff --git a/backend/pkg/db/cache/messages-web.go b/backend/pkg/db/cache/messages-web.go index 71f2c38d0..7da7006af 100644 --- a/backend/pkg/db/cache/messages-web.go +++ b/backend/pkg/db/cache/messages-web.go @@ -7,6 +7,30 @@ import ( ) func (c *PGCache) InsertWebSessionStart(sessionID uint64, s *SessionStart) error { + return c.Conn.InsertSessionStart(sessionID, &Session{ + SessionID: sessionID, + Platform: "web", + Timestamp: s.Timestamp, + ProjectID: uint32(s.ProjectID), + TrackerVersion: s.TrackerVersion, + RevID: s.RevID, + UserUUID: s.UserUUID, + UserOS: s.UserOS, + UserOSVersion: s.UserOSVersion, + UserDevice: s.UserDevice, + UserCountry: s.UserCountry, + // web properties (TODO: unite different platform types) + UserAgent: s.UserAgent, + UserBrowser: s.UserBrowser, + UserBrowserVersion: s.UserBrowserVersion, + UserDeviceType: s.UserDeviceType, + UserDeviceMemorySize: s.UserDeviceMemorySize, + UserDeviceHeapSize: s.UserDeviceHeapSize, + UserID: &s.UserID, + }) +} + +func (c *PGCache) HandleWebSessionStart(sessionID uint64, s *SessionStart) error { if c.sessions[sessionID] != nil { return errors.New("This session already in cache!") } @@ -31,7 +55,7 @@ func (c *PGCache) InsertWebSessionStart(sessionID uint64, s *SessionStart) error UserDeviceHeapSize: s.UserDeviceHeapSize, UserID: &s.UserID, } - if err := c.Conn.InsertSessionStart(sessionID, c.sessions[sessionID]); err != nil { + if err := c.Conn.HandleSessionStart(sessionID, c.sessions[sessionID]); err != nil { c.sessions[sessionID] = nil return err } @@ -39,7 +63,11 @@ func (c *PGCache) InsertWebSessionStart(sessionID uint64, s *SessionStart) error } func (c *PGCache) InsertWebSessionEnd(sessionID uint64, e *SessionEnd) error { - return c.insertSessionEnd(sessionID, e.Timestamp) + return c.InsertSessionEnd(sessionID, e.Timestamp) +} + +func (c *PGCache) HandleWebSessionEnd(sessionID uint64, e *SessionEnd) error { + return c.HandleSessionEnd(sessionID) } func (c *PGCache) InsertWebErrorEvent(sessionID uint64, e *ErrorEvent) error { diff --git a/backend/pkg/db/postgres/messages-common.go b/backend/pkg/db/postgres/messages-common.go index 0b3039a45..b92e4fd6f 100644 --- a/backend/pkg/db/postgres/messages-common.go +++ b/backend/pkg/db/postgres/messages-common.go @@ -38,7 +38,7 @@ func (conn *Conn) insertAutocompleteValue(sessionID uint64, tp string, value str } func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error { - if err := conn.exec(` + return conn.exec(` INSERT INTO sessions ( session_id, project_id, start_ts, user_uuid, user_device, user_device_type, user_country, @@ -66,9 +66,10 @@ func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error { s.Platform, s.UserAgent, s.UserBrowser, s.UserBrowserVersion, s.UserDeviceMemorySize, s.UserDeviceHeapSize, s.UserID, - ); err != nil { - return err - } + ) +} + +func (conn *Conn) HandleSessionStart(sessionID uint64, s *types.Session) error { conn.insertAutocompleteValue(sessionID, getAutocompleteType("USEROS", s.Platform), s.UserOS) conn.insertAutocompleteValue(sessionID, getAutocompleteType("USERDEVICE", s.Platform), s.UserDevice) conn.insertAutocompleteValue(sessionID, getAutocompleteType("USERCOUNTRY", s.Platform), s.UserCountry) @@ -79,6 +80,20 @@ func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error { } func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, error) { + var dur uint64 + if err := conn.queryRow(` + UPDATE sessions SET duration=$2 - start_ts + WHERE session_id=$1 + RETURNING duration + `, + sessionID, timestamp, + ).Scan(&dur); err != nil { + return 0, err + } + return dur, nil +} + +func (conn *Conn) HandleSessionEnd(sessionID uint64, timestamp uint64) error { // TODO: search acceleration? sqlRequest := ` UPDATE sessions @@ -96,18 +111,7 @@ func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, // Record approximate message size conn.updateBatchSize(sessionID, len(sqlRequest)+8) - - var dur uint64 - if err := conn.queryRow(` - UPDATE sessions SET duration=$2 - start_ts - WHERE session_id=$1 - RETURNING duration - `, - sessionID, timestamp, - ).Scan(&dur); err != nil { - return 0, err - } - return dur, nil + return nil } func (conn *Conn) InsertRequest(sessionID uint64, timestamp uint64, index uint64, url string, duration uint64, success bool) error {