fix(backend): fixed bug in sessions cache layer and in sessionEnd handler
This commit is contained in:
parent
e62aa0a3c8
commit
920c2523d6
8 changed files with 65 additions and 37 deletions
|
|
@ -77,7 +77,7 @@ func main() {
|
|||
return
|
||||
}
|
||||
|
||||
session, err := pg.GetSession(msg.SessionID())
|
||||
session, err := pg.Cache.GetSession(msg.SessionID())
|
||||
if session == nil {
|
||||
if err != nil && !errors.Is(err, cache.NilSessionInCacheError) {
|
||||
log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg)
|
||||
|
|
|
|||
3
backend/pkg/db/cache/cache.go
vendored
3
backend/pkg/db/cache/cache.go
vendored
|
|
@ -21,7 +21,8 @@ type ProjectMeta struct {
|
|||
type Cache interface {
|
||||
SetSession(sess *types.Session)
|
||||
HasSession(sessID uint64) bool
|
||||
GetSession(sessionID uint64) (*types.Session, error)
|
||||
GetSession(sessID uint64) (*types.Session, error)
|
||||
SetSessionDuration(sessID, duration uint64) error
|
||||
GetProject(projectID uint32) (*types.Project, error)
|
||||
GetProjectByKey(projectKey string) (*types.Project, error)
|
||||
}
|
||||
|
|
|
|||
14
backend/pkg/db/cache/messages-common.go
vendored
14
backend/pkg/db/cache/messages-common.go
vendored
|
|
@ -4,7 +4,6 @@ import (
|
|||
"log"
|
||||
. "openreplay/backend/pkg/messages"
|
||||
"time"
|
||||
// . "openreplay/backend/pkg/db/types"
|
||||
)
|
||||
|
||||
func (c *PGCache) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, error) {
|
||||
|
|
@ -16,14 +15,19 @@ func (c *PGCache) InsertSessionEncryptionKey(sessionID uint64, key []byte) error
|
|||
}
|
||||
|
||||
func (c *PGCache) HandleSessionEnd(sessionID uint64) error {
|
||||
if err := c.Conn.HandleSessionEnd(sessionID); err != nil {
|
||||
dur, err := c.Conn.HandleSessionEnd(sessionID)
|
||||
if err != nil {
|
||||
log.Printf("can't handle session end: %s", err)
|
||||
return nil
|
||||
}
|
||||
if err := c.Cache.SetSessionDuration(sessionID, dur); err != nil {
|
||||
log.Printf("can't update session duration: %s", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *PGCache) InsertIssueEvent(sessionID uint64, crash *IssueEvent) error {
|
||||
session, err := c.GetSession(sessionID)
|
||||
session, err := c.Cache.GetSession(sessionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -31,11 +35,11 @@ func (c *PGCache) InsertIssueEvent(sessionID uint64, crash *IssueEvent) error {
|
|||
}
|
||||
|
||||
func (c *PGCache) InsertMetadata(sessionID uint64, metadata *Metadata) error {
|
||||
session, err := c.GetSession(sessionID)
|
||||
session, err := c.Cache.GetSession(sessionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
project, err := c.GetProject(session.ProjectID)
|
||||
project, err := c.Cache.GetProject(session.ProjectID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
14
backend/pkg/db/cache/messages-ios.go
vendored
14
backend/pkg/db/cache/messages-ios.go
vendored
|
|
@ -7,7 +7,7 @@ import (
|
|||
)
|
||||
|
||||
func (c *PGCache) InsertIOSSessionStart(sessionID uint64, s *IOSSessionStart) error {
|
||||
if c.cache.HasSession(sessionID) {
|
||||
if c.Cache.HasSession(sessionID) {
|
||||
return fmt.Errorf("session %d already in cache", sessionID)
|
||||
}
|
||||
newSess := &Session{
|
||||
|
|
@ -24,10 +24,10 @@ func (c *PGCache) InsertIOSSessionStart(sessionID uint64, s *IOSSessionStart) er
|
|||
UserCountry: s.UserCountry,
|
||||
UserDeviceType: s.UserDeviceType,
|
||||
}
|
||||
c.cache.SetSession(newSess)
|
||||
c.Cache.SetSession(newSess)
|
||||
if err := c.Conn.InsertSessionStart(sessionID, newSess); err != nil {
|
||||
// don't know why?
|
||||
c.cache.SetSession(nil)
|
||||
c.Cache.SetSession(nil)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
@ -42,7 +42,7 @@ func (c *PGCache) InsertIOSScreenEnter(sessionID uint64, screenEnter *IOSScreenE
|
|||
if err := c.Conn.InsertIOSScreenEnter(sessionID, screenEnter); err != nil {
|
||||
return err
|
||||
}
|
||||
session, err := c.GetSession(sessionID)
|
||||
session, err := c.Cache.GetSession(sessionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -54,7 +54,7 @@ func (c *PGCache) InsertIOSClickEvent(sessionID uint64, clickEvent *IOSClickEven
|
|||
if err := c.Conn.InsertIOSClickEvent(sessionID, clickEvent); err != nil {
|
||||
return err
|
||||
}
|
||||
session, err := c.GetSession(sessionID)
|
||||
session, err := c.Cache.GetSession(sessionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -66,7 +66,7 @@ func (c *PGCache) InsertIOSInputEvent(sessionID uint64, inputEvent *IOSInputEven
|
|||
if err := c.Conn.InsertIOSInputEvent(sessionID, inputEvent); err != nil {
|
||||
return err
|
||||
}
|
||||
session, err := c.GetSession(sessionID)
|
||||
session, err := c.Cache.GetSession(sessionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -75,7 +75,7 @@ func (c *PGCache) InsertIOSInputEvent(sessionID uint64, inputEvent *IOSInputEven
|
|||
}
|
||||
|
||||
func (c *PGCache) InsertIOSCrash(sessionID uint64, crash *IOSCrash) error {
|
||||
session, err := c.GetSession(sessionID)
|
||||
session, err := c.Cache.GetSession(sessionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
30
backend/pkg/db/cache/messages-web.go
vendored
30
backend/pkg/db/cache/messages-web.go
vendored
|
|
@ -31,7 +31,7 @@ func (c *PGCache) InsertWebSessionStart(sessionID uint64, s *SessionStart) error
|
|||
}
|
||||
|
||||
func (c *PGCache) HandleWebSessionStart(sessionID uint64, s *SessionStart) error {
|
||||
if c.cache.HasSession(sessionID) {
|
||||
if c.Cache.HasSession(sessionID) {
|
||||
return fmt.Errorf("session %d already in cache", sessionID)
|
||||
}
|
||||
newSess := &Session{
|
||||
|
|
@ -55,10 +55,10 @@ func (c *PGCache) HandleWebSessionStart(sessionID uint64, s *SessionStart) error
|
|||
UserDeviceHeapSize: s.UserDeviceHeapSize,
|
||||
UserID: &s.UserID,
|
||||
}
|
||||
c.cache.SetSession(newSess)
|
||||
c.Cache.SetSession(newSess)
|
||||
if err := c.Conn.HandleSessionStart(sessionID, newSess); err != nil {
|
||||
// don't know why?
|
||||
c.cache.SetSession(nil)
|
||||
c.Cache.SetSession(nil)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
@ -80,7 +80,7 @@ func (c *PGCache) InsertWebIntegrationEvent(e *IntegrationEvent) error {
|
|||
return c.InsertWebErrorEvent(e.SessionID(), WrapIntegrationEvent(e))
|
||||
}
|
||||
func (c *PGCache) InsertWebErrorEvent(sessionID uint64, e *ErrorEvent) error {
|
||||
session, err := c.GetSession(sessionID)
|
||||
session, err := c.Cache.GetSession(sessionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -92,7 +92,7 @@ func (c *PGCache) InsertWebErrorEvent(sessionID uint64, e *ErrorEvent) error {
|
|||
}
|
||||
|
||||
func (c *PGCache) InsertSessionReferrer(sessionID uint64, referrer string) error {
|
||||
_, err := c.GetSession(sessionID)
|
||||
_, err := c.Cache.GetSession(sessionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -100,11 +100,11 @@ func (c *PGCache) InsertSessionReferrer(sessionID uint64, referrer string) error
|
|||
}
|
||||
|
||||
func (c *PGCache) InsertWebFetchEvent(sessionID uint64, e *FetchEvent) error {
|
||||
session, err := c.GetSession(sessionID)
|
||||
session, err := c.Cache.GetSession(sessionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
project, err := c.GetProject(session.ProjectID)
|
||||
project, err := c.Cache.GetProject(session.ProjectID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -112,11 +112,11 @@ func (c *PGCache) InsertWebFetchEvent(sessionID uint64, e *FetchEvent) error {
|
|||
}
|
||||
|
||||
func (c *PGCache) InsertWebGraphQLEvent(sessionID uint64, e *GraphQLEvent) error {
|
||||
session, err := c.GetSession(sessionID)
|
||||
session, err := c.Cache.GetSession(sessionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
project, err := c.GetProject(session.ProjectID)
|
||||
project, err := c.Cache.GetProject(session.ProjectID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -124,7 +124,7 @@ func (c *PGCache) InsertWebGraphQLEvent(sessionID uint64, e *GraphQLEvent) error
|
|||
}
|
||||
|
||||
func (c *PGCache) InsertWebCustomEvent(sessionID uint64, e *CustomEvent) error {
|
||||
session, err := c.GetSession(sessionID)
|
||||
session, err := c.Cache.GetSession(sessionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -132,7 +132,7 @@ func (c *PGCache) InsertWebCustomEvent(sessionID uint64, e *CustomEvent) error {
|
|||
}
|
||||
|
||||
func (c *PGCache) InsertWebUserID(sessionID uint64, userID *UserID) error {
|
||||
session, err := c.GetSession(sessionID)
|
||||
session, err := c.Cache.GetSession(sessionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -140,7 +140,7 @@ func (c *PGCache) InsertWebUserID(sessionID uint64, userID *UserID) error {
|
|||
}
|
||||
|
||||
func (c *PGCache) InsertWebUserAnonymousID(sessionID uint64, userAnonymousID *UserAnonymousID) error {
|
||||
session, err := c.GetSession(sessionID)
|
||||
session, err := c.Cache.GetSession(sessionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -148,7 +148,7 @@ func (c *PGCache) InsertWebUserAnonymousID(sessionID uint64, userAnonymousID *Us
|
|||
}
|
||||
|
||||
func (c *PGCache) InsertWebPageEvent(sessionID uint64, e *PageEvent) error {
|
||||
session, err := c.GetSession(sessionID)
|
||||
session, err := c.Cache.GetSession(sessionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -156,7 +156,7 @@ func (c *PGCache) InsertWebPageEvent(sessionID uint64, e *PageEvent) error {
|
|||
}
|
||||
|
||||
func (c *PGCache) InsertWebClickEvent(sessionID uint64, e *ClickEvent) error {
|
||||
session, err := c.GetSession(sessionID)
|
||||
session, err := c.Cache.GetSession(sessionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -164,7 +164,7 @@ func (c *PGCache) InsertWebClickEvent(sessionID uint64, e *ClickEvent) error {
|
|||
}
|
||||
|
||||
func (c *PGCache) InsertWebInputEvent(sessionID uint64, e *InputEvent) error {
|
||||
session, err := c.GetSession(sessionID)
|
||||
session, err := c.Cache.GetSession(sessionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
4
backend/pkg/db/cache/pg-cache.go
vendored
4
backend/pkg/db/cache/pg-cache.go
vendored
|
|
@ -6,7 +6,7 @@ import (
|
|||
|
||||
type PGCache struct {
|
||||
*postgres.Conn
|
||||
cache Cache
|
||||
Cache Cache
|
||||
}
|
||||
|
||||
func NewPGCache(conn *postgres.Conn, projectExpirationTimeoutMs int64) *PGCache {
|
||||
|
|
@ -15,6 +15,6 @@ func NewPGCache(conn *postgres.Conn, projectExpirationTimeoutMs int64) *PGCache
|
|||
// Return PG wrapper with integrated cache layer
|
||||
return &PGCache{
|
||||
Conn: conn,
|
||||
cache: c,
|
||||
Cache: c,
|
||||
}
|
||||
}
|
||||
|
|
|
|||
17
backend/pkg/db/cache/session.go
vendored
17
backend/pkg/db/cache/session.go
vendored
|
|
@ -2,6 +2,7 @@ package cache
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/jackc/pgx/v4"
|
||||
. "openreplay/backend/pkg/db/types"
|
||||
"time"
|
||||
|
|
@ -49,3 +50,19 @@ func (c *cacheImpl) GetSession(sessionID uint64) (*Session, error) {
|
|||
c.sessions[sessionID] = &SessionMeta{s, time.Now()}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (c *cacheImpl) SetSessionDuration(sessID, duration uint64) error {
|
||||
if duration <= 0 {
|
||||
return fmt.Errorf("session duration wrong value, val: %d", duration)
|
||||
}
|
||||
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
// Updating session duration to avoid insert errors in CH
|
||||
sess, ok := c.sessions[sessID]
|
||||
if ok && sess.Session != nil {
|
||||
sess.Session.Duration = &duration
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -86,7 +86,8 @@ func (conn *Conn) InsertSessionEncryptionKey(sessionID uint64, key []byte) error
|
|||
return conn.c.Exec(`UPDATE sessions SET file_key = $2 WHERE session_id = $1`, sessionID, string(key))
|
||||
}
|
||||
|
||||
func (conn *Conn) HandleSessionEnd(sessionID uint64) error {
|
||||
func (conn *Conn) HandleSessionEnd(sessionID uint64) (uint64, error) {
|
||||
var dur uint64
|
||||
sqlRequest := `
|
||||
UPDATE sessions
|
||||
SET issue_types=(SELECT
|
||||
|
|
@ -95,11 +96,16 @@ func (conn *Conn) HandleSessionEnd(sessionID uint64) error {
|
|||
ELSE
|
||||
(COALESCE(ARRAY_AGG(DISTINCT ps.type), '{}'))::issue_type[]
|
||||
END
|
||||
FROM events_common.issues
|
||||
INNER JOIN issues AS ps USING (issue_id)
|
||||
WHERE session_id = $1)
|
||||
WHERE session_id = $1`
|
||||
return conn.c.Exec(sqlRequest, sessionID)
|
||||
FROM events_common.issues
|
||||
INNER JOIN issues AS ps USING (issue_id)
|
||||
WHERE session_id = $1)
|
||||
WHERE session_id = $1
|
||||
RETURNING duration
|
||||
`
|
||||
if err := conn.c.QueryRow(sqlRequest, sessionID).Scan(&dur); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return dur, nil
|
||||
}
|
||||
|
||||
func (conn *Conn) InsertRequest(sessionID uint64, timestamp uint64, index uint32, url string, duration uint64, success bool) error {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue