From 1997d326d6c70b3f303eddf6a102c78ae2df0d23 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 21 Jul 2022 10:55:20 +0200 Subject: [PATCH] Fix duplicates (#628) * feat(backend/ender): added check for sessionEnd duplicates --- backend/cmd/ender/main.go | 12 +++++++++++- backend/pkg/db/cache/messages-common.go | 8 ++------ backend/pkg/db/cache/messages-ios.go | 3 ++- backend/pkg/db/cache/messages-web.go | 3 ++- backend/pkg/db/postgres/messages-common.go | 8 ++++++++ 5 files changed, 25 insertions(+), 9 deletions(-) diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index c0613fca0..1fd2f4e64 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -82,10 +82,20 @@ func main() { // Find ended sessions and send notification to other services sessions.HandleEndedSessions(func(sessionID uint64, timestamp int64) bool { msg := &messages.SessionEnd{Timestamp: uint64(timestamp)} - if err := pg.InsertSessionEnd(sessionID, msg.Timestamp); err != nil { + currDuration, err := pg.GetSessionDuration(sessionID) + if err != nil { + log.Printf("getSessionDuration failed, sessID: %d, err: %s", sessionID, err) + } + newDuration, err := pg.InsertSessionEnd(sessionID, msg.Timestamp) + if err != nil { log.Printf("can't save sessionEnd to database, sessID: %d, err: %s", sessionID, err) return false } + if currDuration == newDuration { + log.Printf("sessionEnd duplicate, sessID: %d, prevDur: %d, newDur: %d", sessionID, + currDuration, newDuration) + return true + } if err := producer.Produce(cfg.TopicRawWeb, sessionID, messages.Encode(msg)); err != nil { log.Printf("can't send sessionEnd to topic: %s; sessID: %d", err, sessionID) return false diff --git a/backend/pkg/db/cache/messages-common.go b/backend/pkg/db/cache/messages-common.go index cebdaf5e7..41cdb1895 100644 --- a/backend/pkg/db/cache/messages-common.go +++ b/backend/pkg/db/cache/messages-common.go @@ -7,12 +7,8 @@ import ( // . "openreplay/backend/pkg/db/types" ) -func (c *PGCache) InsertSessionEnd(sessionID uint64, timestamp uint64) error { - _, err := c.Conn.InsertSessionEnd(sessionID, timestamp) - if err != nil { - return err - } - return nil +func (c *PGCache) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, error) { + return c.Conn.InsertSessionEnd(sessionID, timestamp) } func (c *PGCache) HandleSessionEnd(sessionID uint64) error { diff --git a/backend/pkg/db/cache/messages-ios.go b/backend/pkg/db/cache/messages-ios.go index 87cfef13a..e0463c431 100644 --- a/backend/pkg/db/cache/messages-ios.go +++ b/backend/pkg/db/cache/messages-ios.go @@ -32,7 +32,8 @@ func (c *PGCache) InsertIOSSessionStart(sessionID uint64, s *IOSSessionStart) er } func (c *PGCache) InsertIOSSessionEnd(sessionID uint64, e *IOSSessionEnd) error { - return c.InsertSessionEnd(sessionID, e.Timestamp) + _, err := c.InsertSessionEnd(sessionID, e.Timestamp) + return err } func (c *PGCache) InsertIOSScreenEnter(sessionID uint64, screenEnter *IOSScreenEnter) error { diff --git a/backend/pkg/db/cache/messages-web.go b/backend/pkg/db/cache/messages-web.go index e704f6c9f..0a864e6d3 100644 --- a/backend/pkg/db/cache/messages-web.go +++ b/backend/pkg/db/cache/messages-web.go @@ -63,7 +63,8 @@ func (c *PGCache) HandleWebSessionStart(sessionID uint64, s *SessionStart) error } func (c *PGCache) InsertWebSessionEnd(sessionID uint64, e *SessionEnd) error { - return c.InsertSessionEnd(sessionID, e.Timestamp) + _, err := c.InsertSessionEnd(sessionID, e.Timestamp) + return err } func (c *PGCache) HandleWebSessionEnd(sessionID uint64, e *SessionEnd) error { diff --git a/backend/pkg/db/postgres/messages-common.go b/backend/pkg/db/postgres/messages-common.go index 8409d38e7..2925acde3 100644 --- a/backend/pkg/db/postgres/messages-common.go +++ b/backend/pkg/db/postgres/messages-common.go @@ -60,6 +60,14 @@ func (conn *Conn) HandleSessionStart(sessionID uint64, s *types.Session) error { return nil } +func (conn *Conn) GetSessionDuration(sessionID uint64) (uint64, error) { + var dur uint64 + if err := conn.c.QueryRow("SELECT COALESCE( duration, 0 ) FROM sessions WHERE session_id=$1", sessionID).Scan(&dur); err != nil { + return 0, err + } + return dur, nil +} + func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, error) { var dur uint64 if err := conn.c.QueryRow(`