diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index bbdf8f7b8..bc25191c8 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -77,7 +77,7 @@ func main() { return } - session, err := pg.GetSession(msg.SessionID()) + session, err := pg.Cache.GetSession(msg.SessionID()) if session == nil { if err != nil && !errors.Is(err, cache.NilSessionInCacheError) { log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg) diff --git a/backend/pkg/db/cache/cache.go b/backend/pkg/db/cache/cache.go index 9075ae73f..7913547a6 100644 --- a/backend/pkg/db/cache/cache.go +++ b/backend/pkg/db/cache/cache.go @@ -21,7 +21,8 @@ type ProjectMeta struct { type Cache interface { SetSession(sess *types.Session) HasSession(sessID uint64) bool - GetSession(sessionID uint64) (*types.Session, error) + GetSession(sessID uint64) (*types.Session, error) + SetSessionDuration(sessID, duration uint64) error GetProject(projectID uint32) (*types.Project, error) GetProjectByKey(projectKey string) (*types.Project, error) } diff --git a/backend/pkg/db/cache/messages-common.go b/backend/pkg/db/cache/messages-common.go index 46fc64790..717922756 100644 --- a/backend/pkg/db/cache/messages-common.go +++ b/backend/pkg/db/cache/messages-common.go @@ -4,7 +4,6 @@ import ( "log" . "openreplay/backend/pkg/messages" "time" - // . "openreplay/backend/pkg/db/types" ) func (c *PGCache) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, error) { @@ -16,14 +15,19 @@ func (c *PGCache) InsertSessionEncryptionKey(sessionID uint64, key []byte) error } func (c *PGCache) HandleSessionEnd(sessionID uint64) error { - if err := c.Conn.HandleSessionEnd(sessionID); err != nil { + dur, err := c.Conn.HandleSessionEnd(sessionID) + if err != nil { log.Printf("can't handle session end: %s", err) + return nil + } + if err := c.Cache.SetSessionDuration(sessionID, dur); err != nil { + log.Printf("can't update session duration: %s", err) } return nil } func (c *PGCache) InsertIssueEvent(sessionID uint64, crash *IssueEvent) error { - session, err := c.GetSession(sessionID) + session, err := c.Cache.GetSession(sessionID) if err != nil { return err } @@ -31,11 +35,11 @@ func (c *PGCache) InsertIssueEvent(sessionID uint64, crash *IssueEvent) error { } func (c *PGCache) InsertMetadata(sessionID uint64, metadata *Metadata) error { - session, err := c.GetSession(sessionID) + session, err := c.Cache.GetSession(sessionID) if err != nil { return err } - project, err := c.GetProject(session.ProjectID) + project, err := c.Cache.GetProject(session.ProjectID) if err != nil { return err } diff --git a/backend/pkg/db/cache/messages-ios.go b/backend/pkg/db/cache/messages-ios.go index e65051f33..961b78dad 100644 --- a/backend/pkg/db/cache/messages-ios.go +++ b/backend/pkg/db/cache/messages-ios.go @@ -7,7 +7,7 @@ import ( ) func (c *PGCache) InsertIOSSessionStart(sessionID uint64, s *IOSSessionStart) error { - if c.cache.HasSession(sessionID) { + if c.Cache.HasSession(sessionID) { return fmt.Errorf("session %d already in cache", sessionID) } newSess := &Session{ @@ -24,10 +24,10 @@ func (c *PGCache) InsertIOSSessionStart(sessionID uint64, s *IOSSessionStart) er UserCountry: s.UserCountry, UserDeviceType: s.UserDeviceType, } - c.cache.SetSession(newSess) + c.Cache.SetSession(newSess) if err := c.Conn.InsertSessionStart(sessionID, newSess); err != nil { // don't know why? - c.cache.SetSession(nil) + c.Cache.SetSession(nil) return err } return nil @@ -42,7 +42,7 @@ func (c *PGCache) InsertIOSScreenEnter(sessionID uint64, screenEnter *IOSScreenE if err := c.Conn.InsertIOSScreenEnter(sessionID, screenEnter); err != nil { return err } - session, err := c.GetSession(sessionID) + session, err := c.Cache.GetSession(sessionID) if err != nil { return err } @@ -54,7 +54,7 @@ func (c *PGCache) InsertIOSClickEvent(sessionID uint64, clickEvent *IOSClickEven if err := c.Conn.InsertIOSClickEvent(sessionID, clickEvent); err != nil { return err } - session, err := c.GetSession(sessionID) + session, err := c.Cache.GetSession(sessionID) if err != nil { return err } @@ -66,7 +66,7 @@ func (c *PGCache) InsertIOSInputEvent(sessionID uint64, inputEvent *IOSInputEven if err := c.Conn.InsertIOSInputEvent(sessionID, inputEvent); err != nil { return err } - session, err := c.GetSession(sessionID) + session, err := c.Cache.GetSession(sessionID) if err != nil { return err } @@ -75,7 +75,7 @@ func (c *PGCache) InsertIOSInputEvent(sessionID uint64, inputEvent *IOSInputEven } func (c *PGCache) InsertIOSCrash(sessionID uint64, crash *IOSCrash) error { - session, err := c.GetSession(sessionID) + session, err := c.Cache.GetSession(sessionID) if err != nil { return err } diff --git a/backend/pkg/db/cache/messages-web.go b/backend/pkg/db/cache/messages-web.go index de97ef42a..931d1f639 100644 --- a/backend/pkg/db/cache/messages-web.go +++ b/backend/pkg/db/cache/messages-web.go @@ -31,7 +31,7 @@ func (c *PGCache) InsertWebSessionStart(sessionID uint64, s *SessionStart) error } func (c *PGCache) HandleWebSessionStart(sessionID uint64, s *SessionStart) error { - if c.cache.HasSession(sessionID) { + if c.Cache.HasSession(sessionID) { return fmt.Errorf("session %d already in cache", sessionID) } newSess := &Session{ @@ -55,10 +55,10 @@ func (c *PGCache) HandleWebSessionStart(sessionID uint64, s *SessionStart) error UserDeviceHeapSize: s.UserDeviceHeapSize, UserID: &s.UserID, } - c.cache.SetSession(newSess) + c.Cache.SetSession(newSess) if err := c.Conn.HandleSessionStart(sessionID, newSess); err != nil { // don't know why? - c.cache.SetSession(nil) + c.Cache.SetSession(nil) return err } return nil @@ -80,7 +80,7 @@ func (c *PGCache) InsertWebIntegrationEvent(e *IntegrationEvent) error { return c.InsertWebErrorEvent(e.SessionID(), WrapIntegrationEvent(e)) } func (c *PGCache) InsertWebErrorEvent(sessionID uint64, e *ErrorEvent) error { - session, err := c.GetSession(sessionID) + session, err := c.Cache.GetSession(sessionID) if err != nil { return err } @@ -92,7 +92,7 @@ func (c *PGCache) InsertWebErrorEvent(sessionID uint64, e *ErrorEvent) error { } func (c *PGCache) InsertSessionReferrer(sessionID uint64, referrer string) error { - _, err := c.GetSession(sessionID) + _, err := c.Cache.GetSession(sessionID) if err != nil { return err } @@ -100,11 +100,11 @@ func (c *PGCache) InsertSessionReferrer(sessionID uint64, referrer string) error } func (c *PGCache) InsertWebFetchEvent(sessionID uint64, e *FetchEvent) error { - session, err := c.GetSession(sessionID) + session, err := c.Cache.GetSession(sessionID) if err != nil { return err } - project, err := c.GetProject(session.ProjectID) + project, err := c.Cache.GetProject(session.ProjectID) if err != nil { return err } @@ -112,11 +112,11 @@ func (c *PGCache) InsertWebFetchEvent(sessionID uint64, e *FetchEvent) error { } func (c *PGCache) InsertWebGraphQLEvent(sessionID uint64, e *GraphQLEvent) error { - session, err := c.GetSession(sessionID) + session, err := c.Cache.GetSession(sessionID) if err != nil { return err } - project, err := c.GetProject(session.ProjectID) + project, err := c.Cache.GetProject(session.ProjectID) if err != nil { return err } @@ -124,7 +124,7 @@ func (c *PGCache) InsertWebGraphQLEvent(sessionID uint64, e *GraphQLEvent) error } func (c *PGCache) InsertWebCustomEvent(sessionID uint64, e *CustomEvent) error { - session, err := c.GetSession(sessionID) + session, err := c.Cache.GetSession(sessionID) if err != nil { return err } @@ -132,7 +132,7 @@ func (c *PGCache) InsertWebCustomEvent(sessionID uint64, e *CustomEvent) error { } func (c *PGCache) InsertWebUserID(sessionID uint64, userID *UserID) error { - session, err := c.GetSession(sessionID) + session, err := c.Cache.GetSession(sessionID) if err != nil { return err } @@ -140,7 +140,7 @@ func (c *PGCache) InsertWebUserID(sessionID uint64, userID *UserID) error { } func (c *PGCache) InsertWebUserAnonymousID(sessionID uint64, userAnonymousID *UserAnonymousID) error { - session, err := c.GetSession(sessionID) + session, err := c.Cache.GetSession(sessionID) if err != nil { return err } @@ -148,7 +148,7 @@ func (c *PGCache) InsertWebUserAnonymousID(sessionID uint64, userAnonymousID *Us } func (c *PGCache) InsertWebPageEvent(sessionID uint64, e *PageEvent) error { - session, err := c.GetSession(sessionID) + session, err := c.Cache.GetSession(sessionID) if err != nil { return err } @@ -156,7 +156,7 @@ func (c *PGCache) InsertWebPageEvent(sessionID uint64, e *PageEvent) error { } func (c *PGCache) InsertWebClickEvent(sessionID uint64, e *ClickEvent) error { - session, err := c.GetSession(sessionID) + session, err := c.Cache.GetSession(sessionID) if err != nil { return err } @@ -164,7 +164,7 @@ func (c *PGCache) InsertWebClickEvent(sessionID uint64, e *ClickEvent) error { } func (c *PGCache) InsertWebInputEvent(sessionID uint64, e *InputEvent) error { - session, err := c.GetSession(sessionID) + session, err := c.Cache.GetSession(sessionID) if err != nil { return err } diff --git a/backend/pkg/db/cache/pg-cache.go b/backend/pkg/db/cache/pg-cache.go index 75e468d35..1b7a6710d 100644 --- a/backend/pkg/db/cache/pg-cache.go +++ b/backend/pkg/db/cache/pg-cache.go @@ -6,7 +6,7 @@ import ( type PGCache struct { *postgres.Conn - cache Cache + Cache Cache } func NewPGCache(conn *postgres.Conn, projectExpirationTimeoutMs int64) *PGCache { @@ -15,6 +15,6 @@ func NewPGCache(conn *postgres.Conn, projectExpirationTimeoutMs int64) *PGCache // Return PG wrapper with integrated cache layer return &PGCache{ Conn: conn, - cache: c, + Cache: c, } } diff --git a/backend/pkg/db/cache/session.go b/backend/pkg/db/cache/session.go index f03f1e955..0647531d6 100644 --- a/backend/pkg/db/cache/session.go +++ b/backend/pkg/db/cache/session.go @@ -2,6 +2,7 @@ package cache import ( "errors" + "fmt" "github.com/jackc/pgx/v4" . "openreplay/backend/pkg/db/types" "time" @@ -49,3 +50,19 @@ func (c *cacheImpl) GetSession(sessionID uint64) (*Session, error) { c.sessions[sessionID] = &SessionMeta{s, time.Now()} return s, nil } + +func (c *cacheImpl) SetSessionDuration(sessID, duration uint64) error { + if duration <= 0 { + return fmt.Errorf("session duration wrong value, val: %d", duration) + } + + c.mutex.Lock() + defer c.mutex.Unlock() + + // Updating session duration to avoid insert errors in CH + sess, ok := c.sessions[sessID] + if ok && sess.Session != nil { + sess.Session.Duration = &duration + } + return nil +} diff --git a/backend/pkg/db/postgres/messages-common.go b/backend/pkg/db/postgres/messages-common.go index 1742c18e6..e2bb73241 100644 --- a/backend/pkg/db/postgres/messages-common.go +++ b/backend/pkg/db/postgres/messages-common.go @@ -86,7 +86,8 @@ func (conn *Conn) InsertSessionEncryptionKey(sessionID uint64, key []byte) error return conn.c.Exec(`UPDATE sessions SET file_key = $2 WHERE session_id = $1`, sessionID, string(key)) } -func (conn *Conn) HandleSessionEnd(sessionID uint64) error { +func (conn *Conn) HandleSessionEnd(sessionID uint64) (uint64, error) { + var dur uint64 sqlRequest := ` UPDATE sessions SET issue_types=(SELECT @@ -95,11 +96,16 @@ func (conn *Conn) HandleSessionEnd(sessionID uint64) error { ELSE (COALESCE(ARRAY_AGG(DISTINCT ps.type), '{}'))::issue_type[] END - FROM events_common.issues - INNER JOIN issues AS ps USING (issue_id) - WHERE session_id = $1) - WHERE session_id = $1` - return conn.c.Exec(sqlRequest, sessionID) + FROM events_common.issues + INNER JOIN issues AS ps USING (issue_id) + WHERE session_id = $1) + WHERE session_id = $1 + RETURNING duration + ` + if err := conn.c.QueryRow(sqlRequest, sessionID).Scan(&dur); err != nil { + return 0, err + } + return dur, nil } func (conn *Conn) InsertRequest(sessionID uint64, timestamp uint64, index uint32, url string, duration uint64, success bool) error {