feat(backend): get fresh session from db only on SessionEnd message
This commit is contained in:
parent
125cf9c102
commit
df8aed3464
5 changed files with 13 additions and 32 deletions
|
|
@ -3,6 +3,7 @@ package main
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"log"
|
"log"
|
||||||
|
types2 "openreplay/backend/pkg/db/types"
|
||||||
"openreplay/backend/pkg/queue/types"
|
"openreplay/backend/pkg/queue/types"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
|
@ -77,7 +78,15 @@ func main() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
session, err := pg.Cache.GetSession(msg.SessionID())
|
var (
|
||||||
|
session *types2.Session
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
if msg.TypeID() == messages.MsgSessionEnd {
|
||||||
|
session, err = pg.GetSession(msg.SessionID())
|
||||||
|
} else {
|
||||||
|
session, err = pg.Cache.GetSession(msg.SessionID())
|
||||||
|
}
|
||||||
if session == nil {
|
if session == nil {
|
||||||
if err != nil && !errors.Is(err, cache.NilSessionInCacheError) {
|
if err != nil && !errors.Is(err, cache.NilSessionInCacheError) {
|
||||||
log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg)
|
log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg)
|
||||||
|
|
|
||||||
1
backend/pkg/db/cache/cache.go
vendored
1
backend/pkg/db/cache/cache.go
vendored
|
|
@ -22,7 +22,6 @@ type Cache interface {
|
||||||
SetSession(sess *types.Session)
|
SetSession(sess *types.Session)
|
||||||
HasSession(sessID uint64) bool
|
HasSession(sessID uint64) bool
|
||||||
GetSession(sessID uint64) (*types.Session, error)
|
GetSession(sessID uint64) (*types.Session, error)
|
||||||
SetSessionDuration(sessID, duration uint64) error
|
|
||||||
GetProject(projectID uint32) (*types.Project, error)
|
GetProject(projectID uint32) (*types.Project, error)
|
||||||
GetProjectByKey(projectKey string) (*types.Project, error)
|
GetProjectByKey(projectKey string) (*types.Project, error)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
7
backend/pkg/db/cache/messages-common.go
vendored
7
backend/pkg/db/cache/messages-common.go
vendored
|
|
@ -15,13 +15,8 @@ func (c *PGCache) InsertSessionEncryptionKey(sessionID uint64, key []byte) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *PGCache) HandleSessionEnd(sessionID uint64) error {
|
func (c *PGCache) HandleSessionEnd(sessionID uint64) error {
|
||||||
dur, err := c.Conn.HandleSessionEnd(sessionID)
|
if err := c.Conn.HandleSessionEnd(sessionID); err != nil {
|
||||||
if err != nil {
|
|
||||||
log.Printf("can't handle session end: %s", err)
|
log.Printf("can't handle session end: %s", err)
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if err := c.Cache.SetSessionDuration(sessionID, dur); err != nil {
|
|
||||||
log.Printf("can't update session duration: %s", err)
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
17
backend/pkg/db/cache/session.go
vendored
17
backend/pkg/db/cache/session.go
vendored
|
|
@ -2,7 +2,6 @@ package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"github.com/jackc/pgx/v4"
|
"github.com/jackc/pgx/v4"
|
||||||
. "openreplay/backend/pkg/db/types"
|
. "openreplay/backend/pkg/db/types"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -50,19 +49,3 @@ func (c *cacheImpl) GetSession(sessionID uint64) (*Session, error) {
|
||||||
c.sessions[sessionID] = &SessionMeta{s, time.Now()}
|
c.sessions[sessionID] = &SessionMeta{s, time.Now()}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cacheImpl) SetSessionDuration(sessID, duration uint64) error {
|
|
||||||
if duration <= 0 {
|
|
||||||
return fmt.Errorf("session duration wrong value, val: %d", duration)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.mutex.Lock()
|
|
||||||
defer c.mutex.Unlock()
|
|
||||||
|
|
||||||
// Updating session duration to avoid insert errors in CH
|
|
||||||
sess, ok := c.sessions[sessID]
|
|
||||||
if ok && sess.Session != nil {
|
|
||||||
sess.Session.Duration = &duration
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -86,8 +86,7 @@ func (conn *Conn) InsertSessionEncryptionKey(sessionID uint64, key []byte) error
|
||||||
return conn.c.Exec(`UPDATE sessions SET file_key = $2 WHERE session_id = $1`, sessionID, string(key))
|
return conn.c.Exec(`UPDATE sessions SET file_key = $2 WHERE session_id = $1`, sessionID, string(key))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) HandleSessionEnd(sessionID uint64) (uint64, error) {
|
func (conn *Conn) HandleSessionEnd(sessionID uint64) error {
|
||||||
var dur uint64
|
|
||||||
sqlRequest := `
|
sqlRequest := `
|
||||||
UPDATE sessions
|
UPDATE sessions
|
||||||
SET issue_types=(SELECT
|
SET issue_types=(SELECT
|
||||||
|
|
@ -100,12 +99,8 @@ func (conn *Conn) HandleSessionEnd(sessionID uint64) (uint64, error) {
|
||||||
INNER JOIN issues AS ps USING (issue_id)
|
INNER JOIN issues AS ps USING (issue_id)
|
||||||
WHERE session_id = $1)
|
WHERE session_id = $1)
|
||||||
WHERE session_id = $1
|
WHERE session_id = $1
|
||||||
RETURNING duration
|
|
||||||
`
|
`
|
||||||
if err := conn.c.QueryRow(sqlRequest, sessionID).Scan(&dur); err != nil {
|
return conn.c.Exec(sqlRequest, sessionID)
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return dur, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) InsertRequest(sessionID uint64, timestamp uint64, index uint32, url string, duration uint64, success bool) error {
|
func (conn *Conn) InsertRequest(sessionID uint64, timestamp uint64, index uint32, url string, duration uint64, success bool) error {
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue