Fix duplicates (#628)
* feat(backend/ender): added check for sessionEnd duplicates
This commit is contained in:
parent
7e92ac29bf
commit
1997d326d6
5 changed files with 25 additions and 9 deletions
|
|
@ -82,10 +82,20 @@ func main() {
|
|||
// Find ended sessions and send notification to other services
|
||||
sessions.HandleEndedSessions(func(sessionID uint64, timestamp int64) bool {
|
||||
msg := &messages.SessionEnd{Timestamp: uint64(timestamp)}
|
||||
if err := pg.InsertSessionEnd(sessionID, msg.Timestamp); err != nil {
|
||||
currDuration, err := pg.GetSessionDuration(sessionID)
|
||||
if err != nil {
|
||||
log.Printf("getSessionDuration failed, sessID: %d, err: %s", sessionID, err)
|
||||
}
|
||||
newDuration, err := pg.InsertSessionEnd(sessionID, msg.Timestamp)
|
||||
if err != nil {
|
||||
log.Printf("can't save sessionEnd to database, sessID: %d, err: %s", sessionID, err)
|
||||
return false
|
||||
}
|
||||
if currDuration == newDuration {
|
||||
log.Printf("sessionEnd duplicate, sessID: %d, prevDur: %d, newDur: %d", sessionID,
|
||||
currDuration, newDuration)
|
||||
return true
|
||||
}
|
||||
if err := producer.Produce(cfg.TopicRawWeb, sessionID, messages.Encode(msg)); err != nil {
|
||||
log.Printf("can't send sessionEnd to topic: %s; sessID: %d", err, sessionID)
|
||||
return false
|
||||
|
|
|
|||
8
backend/pkg/db/cache/messages-common.go
vendored
8
backend/pkg/db/cache/messages-common.go
vendored
|
|
@ -7,12 +7,8 @@ import (
|
|||
// . "openreplay/backend/pkg/db/types"
|
||||
)
|
||||
|
||||
func (c *PGCache) InsertSessionEnd(sessionID uint64, timestamp uint64) error {
|
||||
_, err := c.Conn.InsertSessionEnd(sessionID, timestamp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
func (c *PGCache) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, error) {
|
||||
return c.Conn.InsertSessionEnd(sessionID, timestamp)
|
||||
}
|
||||
|
||||
func (c *PGCache) HandleSessionEnd(sessionID uint64) error {
|
||||
|
|
|
|||
3
backend/pkg/db/cache/messages-ios.go
vendored
3
backend/pkg/db/cache/messages-ios.go
vendored
|
|
@ -32,7 +32,8 @@ func (c *PGCache) InsertIOSSessionStart(sessionID uint64, s *IOSSessionStart) er
|
|||
}
|
||||
|
||||
func (c *PGCache) InsertIOSSessionEnd(sessionID uint64, e *IOSSessionEnd) error {
|
||||
return c.InsertSessionEnd(sessionID, e.Timestamp)
|
||||
_, err := c.InsertSessionEnd(sessionID, e.Timestamp)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *PGCache) InsertIOSScreenEnter(sessionID uint64, screenEnter *IOSScreenEnter) error {
|
||||
|
|
|
|||
3
backend/pkg/db/cache/messages-web.go
vendored
3
backend/pkg/db/cache/messages-web.go
vendored
|
|
@ -63,7 +63,8 @@ func (c *PGCache) HandleWebSessionStart(sessionID uint64, s *SessionStart) error
|
|||
}
|
||||
|
||||
func (c *PGCache) InsertWebSessionEnd(sessionID uint64, e *SessionEnd) error {
|
||||
return c.InsertSessionEnd(sessionID, e.Timestamp)
|
||||
_, err := c.InsertSessionEnd(sessionID, e.Timestamp)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *PGCache) HandleWebSessionEnd(sessionID uint64, e *SessionEnd) error {
|
||||
|
|
|
|||
|
|
@ -60,6 +60,14 @@ func (conn *Conn) HandleSessionStart(sessionID uint64, s *types.Session) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (conn *Conn) GetSessionDuration(sessionID uint64) (uint64, error) {
|
||||
var dur uint64
|
||||
if err := conn.c.QueryRow("SELECT COALESCE( duration, 0 ) FROM sessions WHERE session_id=$1", sessionID).Scan(&dur); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return dur, nil
|
||||
}
|
||||
|
||||
func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, error) {
|
||||
var dur uint64
|
||||
if err := conn.c.QueryRow(`
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue