feat(backend): moved recording sessionStart to db into http service and sessionEnd into ender service (#545)

Co-authored-by: Alexander Zavorotynskiy <alexander@openreplay.com>
This commit is contained in:
Alexander 2022-06-20 09:26:05 +02:00 committed by GitHub
parent 3da78cfe62
commit 623e241afb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 90 additions and 41 deletions

View file

@ -4,6 +4,8 @@ import (
"log"
"openreplay/backend/internal/config/ender"
"openreplay/backend/internal/sessionender"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/monitoring"
"time"
@ -30,6 +32,9 @@ func main() {
// Load service configuration
cfg := ender.New()
pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres), cfg.ProjectExpirationTimeoutMs)
defer pg.Close()
// Init all modules
statsLogger := logger.NewQueueStats(cfg.LoggerTimeout)
sessions, err := sessionender.New(metrics, intervals.EVENTS_SESSION_END_TIMEOUT, cfg.PartitionsNumber)
@ -70,8 +75,12 @@ func main() {
// Find ended sessions and send notification to other services
sessions.HandleEndedSessions(func(sessionID uint64, timestamp int64) bool {
msg := &messages.SessionEnd{Timestamp: uint64(timestamp)}
if err := pg.InsertSessionEnd(sessionID, msg.Timestamp); err != nil {
log.Printf("can't save sessionEnd to database, sessID: %d", sessionID)
return false
}
if err := producer.Produce(cfg.TopicRawWeb, sessionID, messages.Encode(msg)); err != nil {
log.Printf("can't send SessionEnd to trigger topic: %s; sessID: %d", err, sessionID)
log.Printf("can't send sessionEnd to topic: %s; sessID: %d", err, sessionID)
return false
}
return true

View file

@ -5,19 +5,23 @@ import (
)
type Config struct {
GroupEnder string
LoggerTimeout int
TopicRawWeb string
ProducerTimeout int
PartitionsNumber int
Postgres string
ProjectExpirationTimeoutMs int64
GroupEnder string
LoggerTimeout int
TopicRawWeb string
ProducerTimeout int
PartitionsNumber int
}
func New() *Config {
return &Config{
GroupEnder: env.String("GROUP_ENDER"),
LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"),
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
ProducerTimeout: 2000,
PartitionsNumber: env.Int("PARTITIONS_NUMBER"),
Postgres: env.String("POSTGRES_STRING"),
ProjectExpirationTimeoutMs: 1000 * 60 * 20,
GroupEnder: env.String("GROUP_ENDER"),
LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"),
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
ProducerTimeout: 2000,
PartitionsNumber: env.Int("PARTITIONS_NUMBER"),
}
}

View file

@ -19,9 +19,9 @@ func (mi *Saver) InsertMessage(sessionID uint64, msg Message) error {
// Web
case *SessionStart:
return mi.pg.InsertWebSessionStart(sessionID, m)
return mi.pg.HandleWebSessionStart(sessionID, m)
case *SessionEnd:
return mi.pg.InsertWebSessionEnd(sessionID, m)
return mi.pg.HandleWebSessionEnd(sessionID, m)
case *UserID:
return mi.pg.InsertWebUserID(sessionID, m)
case *UserAnonymousID:

View file

@ -98,7 +98,7 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request)
expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond)
tokenData = &token.TokenData{ID: sessionID, ExpTime: expTime.UnixMilli()}
e.services.Producer.Produce(e.cfg.TopicRawWeb, tokenData.ID, Encode(&SessionStart{
sessionStart := &SessionStart{
Timestamp: req.Timestamp,
ProjectID: uint64(p.ProjectID),
TrackerVersion: req.TrackerVersion,
@ -115,7 +115,13 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request)
UserDeviceMemorySize: req.DeviceMemory,
UserDeviceHeapSize: req.JsHeapSizeLimit,
UserID: req.UserID,
}))
}
// Save sessionStart to db
e.services.Database.InsertWebSessionStart(sessionID, sessionStart)
// Send sessionStart message to kafka
e.services.Producer.Produce(e.cfg.TopicRawWeb, tokenData.ID, Encode(sessionStart))
}
ResponseWithJSON(w, &StartSessionResponse{

View file

@ -7,18 +7,16 @@ import (
// . "openreplay/backend/pkg/db/types"
)
func (c *PGCache) insertSessionEnd(sessionID uint64, timestamp uint64) error {
//duration, err := c.Conn.InsertSessionEnd(sessionID, timestamp)
func (c *PGCache) InsertSessionEnd(sessionID uint64, timestamp uint64) error {
_, err := c.Conn.InsertSessionEnd(sessionID, timestamp)
if err != nil {
return err
}
return nil
}
func (c *PGCache) HandleSessionEnd(sessionID uint64) error {
c.DeleteSession(sessionID)
// session, err := c.GetSession(sessionID)
// if err != nil {
// return err
// }
// session.Duration = &duration
return nil
}

View file

@ -32,7 +32,7 @@ func (c *PGCache) InsertIOSSessionStart(sessionID uint64, s *IOSSessionStart) er
}
func (c *PGCache) InsertIOSSessionEnd(sessionID uint64, e *IOSSessionEnd) error {
return c.insertSessionEnd(sessionID, e.Timestamp)
return c.InsertSessionEnd(sessionID, e.Timestamp)
}
func (c *PGCache) InsertIOSScreenEnter(sessionID uint64, screenEnter *IOSScreenEnter) error {

View file

@ -7,6 +7,30 @@ import (
)
func (c *PGCache) InsertWebSessionStart(sessionID uint64, s *SessionStart) error {
return c.Conn.InsertSessionStart(sessionID, &Session{
SessionID: sessionID,
Platform: "web",
Timestamp: s.Timestamp,
ProjectID: uint32(s.ProjectID),
TrackerVersion: s.TrackerVersion,
RevID: s.RevID,
UserUUID: s.UserUUID,
UserOS: s.UserOS,
UserOSVersion: s.UserOSVersion,
UserDevice: s.UserDevice,
UserCountry: s.UserCountry,
// web properties (TODO: unite different platform types)
UserAgent: s.UserAgent,
UserBrowser: s.UserBrowser,
UserBrowserVersion: s.UserBrowserVersion,
UserDeviceType: s.UserDeviceType,
UserDeviceMemorySize: s.UserDeviceMemorySize,
UserDeviceHeapSize: s.UserDeviceHeapSize,
UserID: &s.UserID,
})
}
func (c *PGCache) HandleWebSessionStart(sessionID uint64, s *SessionStart) error {
if c.sessions[sessionID] != nil {
return errors.New("This session already in cache!")
}
@ -31,7 +55,7 @@ func (c *PGCache) InsertWebSessionStart(sessionID uint64, s *SessionStart) error
UserDeviceHeapSize: s.UserDeviceHeapSize,
UserID: &s.UserID,
}
if err := c.Conn.InsertSessionStart(sessionID, c.sessions[sessionID]); err != nil {
if err := c.Conn.HandleSessionStart(sessionID, c.sessions[sessionID]); err != nil {
c.sessions[sessionID] = nil
return err
}
@ -39,7 +63,11 @@ func (c *PGCache) InsertWebSessionStart(sessionID uint64, s *SessionStart) error
}
func (c *PGCache) InsertWebSessionEnd(sessionID uint64, e *SessionEnd) error {
return c.insertSessionEnd(sessionID, e.Timestamp)
return c.InsertSessionEnd(sessionID, e.Timestamp)
}
func (c *PGCache) HandleWebSessionEnd(sessionID uint64, e *SessionEnd) error {
return c.HandleSessionEnd(sessionID)
}
func (c *PGCache) InsertWebErrorEvent(sessionID uint64, e *ErrorEvent) error {

View file

@ -38,7 +38,7 @@ func (conn *Conn) insertAutocompleteValue(sessionID uint64, tp string, value str
}
func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error {
if err := conn.exec(`
return conn.exec(`
INSERT INTO sessions (
session_id, project_id, start_ts,
user_uuid, user_device, user_device_type, user_country,
@ -66,9 +66,10 @@ func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error {
s.Platform,
s.UserAgent, s.UserBrowser, s.UserBrowserVersion, s.UserDeviceMemorySize, s.UserDeviceHeapSize,
s.UserID,
); err != nil {
return err
}
)
}
func (conn *Conn) HandleSessionStart(sessionID uint64, s *types.Session) error {
conn.insertAutocompleteValue(sessionID, getAutocompleteType("USEROS", s.Platform), s.UserOS)
conn.insertAutocompleteValue(sessionID, getAutocompleteType("USERDEVICE", s.Platform), s.UserDevice)
conn.insertAutocompleteValue(sessionID, getAutocompleteType("USERCOUNTRY", s.Platform), s.UserCountry)
@ -79,6 +80,20 @@ func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error {
}
func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, error) {
var dur uint64
if err := conn.queryRow(`
UPDATE sessions SET duration=$2 - start_ts
WHERE session_id=$1
RETURNING duration
`,
sessionID, timestamp,
).Scan(&dur); err != nil {
return 0, err
}
return dur, nil
}
func (conn *Conn) HandleSessionEnd(sessionID uint64, timestamp uint64) error {
// TODO: search acceleration?
sqlRequest := `
UPDATE sessions
@ -96,18 +111,7 @@ func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64,
// Record approximate message size
conn.updateBatchSize(sessionID, len(sqlRequest)+8)
var dur uint64
if err := conn.queryRow(`
UPDATE sessions SET duration=$2 - start_ts
WHERE session_id=$1
RETURNING duration
`,
sessionID, timestamp,
).Scan(&dur); err != nil {
return 0, err
}
return dur, nil
return nil
}
func (conn *Conn) InsertRequest(sessionID uint64, timestamp uint64, index uint64, url string, duration uint64, success bool) error {