From 81f70ddd4d81fce51db4437c6238bfadd60dbef7 Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Thu, 3 Aug 2023 18:05:36 +0400 Subject: [PATCH] feat(backend): logic improvements in sessionEnder --- backend/cmd/ender/main.go | 77 ++++++++++++++--- backend/internal/sessionender/ender.go | 85 ++++++++++++++++--- .../internal/sessionender/timecontroller.go | 23 +++-- 3 files changed, 154 insertions(+), 31 deletions(-) diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index f93b05f66..a9e90ab57 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -77,28 +77,69 @@ func main() { case <-tick: failedSessionEnds := make(map[uint64]uint64) duplicatedSessionEnds := make(map[uint64]uint64) + negativeDuration := make(map[uint64]uint64) + shorterDuration := make(map[uint64]int64) + diffDuration := make(map[uint64]int64) + noSessionInDB := make(map[uint64]uint64) + updatedDurations := 0 + newSessionEnds := 0 + + type SessionEndType int + const ( + FailedSessionEnd SessionEndType = iota + 1 + DuplicatedSessionEnd + NegativeDuration + ShorterDuration + DiffDuration + NewSessionEnd + NoSessionInDB + ) // Find ended sessions and send notification to other services - sessions.HandleEndedSessions(func(sessionID uint64, timestamp uint64) bool { + sessions.HandleEndedSessions(func(sessionID uint64, timestamp uint64) (bool, int) { msg := &messages.SessionEnd{Timestamp: timestamp} currDuration, err := pg.GetSessionDuration(sessionID) if err != nil { log.Printf("getSessionDuration failed, sessID: %d, err: %s", sessionID, err) } + sess, err := pg.Cache.GetSession(sessionID) + if err != nil { + log.Printf("can't get session from database to compare durations, sessID: %d, err: %s", sessionID, err) + } else { + newDur := timestamp - sess.Timestamp + // Skip if session was ended before with same duration + if currDuration == newDur { + duplicatedSessionEnds[sessionID] = currDuration + return true, int(DuplicatedSessionEnd) + } + // Skip if session was ended before with longer duration + if currDuration > newDur { + shorterDuration[sessionID] = int64(currDuration) - int64(newDur) + return true, int(ShorterDuration) + } + } newDuration, err := pg.InsertSessionEnd(sessionID, msg.Timestamp) if err != nil { if strings.Contains(err.Error(), "integer out of range") { // Skip session with broken duration failedSessionEnds[sessionID] = timestamp - return true + return true, int(FailedSessionEnd) + } + if strings.Contains(err.Error(), "is less than zero for uint64") { + negativeDuration[sessionID] = timestamp + return true, int(NegativeDuration) + } + if strings.Contains(err.Error(), "no rows in result set") { + noSessionInDB[sessionID] = timestamp + return true, int(NoSessionInDB) } log.Printf("can't save sessionEnd to database, sessID: %d, err: %s", sessionID, err) - return false + return false, 0 } + // Check one more time just in case if currDuration == newDuration { - // Skip session end duplicate duplicatedSessionEnds[sessionID] = currDuration - return true + return true, int(DuplicatedSessionEnd) } if cfg.UseEncryption { if key := storage.GenerateEncryptionKey(); key != nil { @@ -111,22 +152,34 @@ func main() { } if err := producer.Produce(cfg.TopicRawWeb, sessionID, msg.Encode()); err != nil { log.Printf("can't send sessionEnd to topic: %s; sessID: %d", err, sessionID) - return false + return false, 0 } - return true + if currDuration != 0 { + diffDuration[sessionID] = int64(newDuration) - int64(currDuration) + updatedDurations++ + } else { + newSessionEnds++ + } + return true, int(NewSessionEnd) }) - if len(failedSessionEnds) > 0 { - log.Println("sessions with wrong duration:", failedSessionEnds) + if n := len(failedSessionEnds); n > 0 { + log.Println("sessions with wrong duration:", n, failedSessionEnds) } - if len(duplicatedSessionEnds) > 0 { - log.Println("session end duplicates:", duplicatedSessionEnds) + if n := len(negativeDuration); n > 0 { + log.Println("sessions with negative duration:", n, negativeDuration) } + if n := len(noSessionInDB); n > 0 { + log.Printf("sessions without info in DB: %d, %v", n, noSessionInDB) + } + log.Printf("[INFO] failed: %d, negative: %d, shorter: %d, same: %d, updated: %d, new: %d, not found: %d", + len(failedSessionEnds), len(negativeDuration), len(shorterDuration), len(duplicatedSessionEnds), + updatedDurations, newSessionEnds, len(noSessionInDB)) producer.Flush(cfg.ProducerTimeout) if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil { log.Printf("can't commit messages with offset: %s", err) } case msg := <-consumer.Rebalanced(): - log.Println(msg) + log.Printf("Rebalanced: %+v", msg) default: if !memoryManager.HasFreeMemory() { continue diff --git a/backend/internal/sessionender/ender.go b/backend/internal/sessionender/ender.go index 26fcf850e..999f68b75 100644 --- a/backend/internal/sessionender/ender.go +++ b/backend/internal/sessionender/ender.go @@ -9,12 +9,12 @@ import ( ) // EndedSessionHandler handler for ended sessions -type EndedSessionHandler func(sessionID uint64, timestamp uint64) bool +type EndedSessionHandler func(sessionID uint64, timestamp uint64) (bool, int) // session holds information about user's session live status type session struct { - lastTimestamp int64 - lastUpdate int64 + lastTimestamp int64 // timestamp from message broker + lastUpdate int64 // local timestamp lastUserTime uint64 isEnded bool } @@ -24,6 +24,8 @@ type SessionEnder struct { timeout int64 sessions map[uint64]*session // map[sessionID]session timeCtrl *timeController + parts uint64 + enabled bool } func New(timeout int64, parts int) (*SessionEnder, error) { @@ -31,9 +33,38 @@ func New(timeout int64, parts int) (*SessionEnder, error) { timeout: timeout, sessions: make(map[uint64]*session), timeCtrl: NewTimeController(parts), + parts: uint64(parts), // ender uses all partitions by default + enabled: true, }, nil } +func (se *SessionEnder) Enable() { + se.enabled = true +} + +func (se *SessionEnder) Disable() { + se.enabled = false +} + +func (se *SessionEnder) ActivePartitions(parts []uint64) { + activeParts := make(map[uint64]bool, 0) + for _, p := range parts { + activeParts[p] = true + } + removedSessions := 0 + activeSessions := 0 + for sessID, _ := range se.sessions { + if !activeParts[sessID%se.parts] { + delete(se.sessions, sessID) + removedSessions++ + } else { + activeSessions++ + } + } + log.Printf("SessionEnder: %d sessions left in active partitions: %+v, removed %d sessions", + activeSessions, parts, removedSessions) +} + // UpdateSession save timestamp for new sessions and update for existing sessions func (se *SessionEnder) UpdateSession(msg messages.Message) { var ( @@ -46,14 +77,14 @@ func (se *SessionEnder) UpdateSession(msg messages.Message) { log.Printf("got empty timestamp for sessionID: %d", sessionID) return } - se.timeCtrl.UpdateTime(sessionID, batchTimestamp) + se.timeCtrl.UpdateTime(sessionID, batchTimestamp, localTimestamp) sess, ok := se.sessions[sessionID] if !ok { // Register new session se.sessions[sessionID] = &session{ - lastTimestamp: batchTimestamp, // timestamp from message broker - lastUpdate: localTimestamp, // local timestamp - lastUserTime: msgTimestamp, // last timestamp from user's machine + lastTimestamp: batchTimestamp, + lastUpdate: localTimestamp, + lastUserTime: msgTimestamp, // last timestamp from user's machine isEnded: false, } ender.IncreaseActiveSessions() @@ -74,21 +105,53 @@ func (se *SessionEnder) UpdateSession(msg messages.Message) { // HandleEndedSessions runs handler for each ended session and delete information about session in successful case func (se *SessionEnder) HandleEndedSessions(handler EndedSessionHandler) { + if !se.enabled { + log.Printf("SessionEnder is disabled") + return + } currTime := time.Now().UnixMilli() allSessions, removedSessions := len(se.sessions), 0 + brokerTime := make(map[int]int, 0) + serverTime := make(map[int]int, 0) + + isSessionEnded := func(sessID uint64, sess *session) (bool, int) { + // Has been finished already + if sess.isEnded { + return true, 1 + } + batchTimeDiff := se.timeCtrl.LastBatchTimestamp(sessID) - sess.lastTimestamp + + // Has been finished according to batch timestamp and hasn't been updated for a long time + if (batchTimeDiff >= se.timeout) && (currTime-sess.lastUpdate >= se.timeout) { + return true, 2 + } + + // Hasn't been finished according to batch timestamp but hasn't been read from partition for a long time + if (batchTimeDiff < se.timeout) && (currTime-se.timeCtrl.LastUpdateTimestamp(sessID) >= se.timeout) { + return true, 3 + } + return false, 0 + } + for sessID, sess := range se.sessions { - if sess.isEnded || (se.timeCtrl.LastTimestamp(sessID)-sess.lastTimestamp > se.timeout) || - (currTime-sess.lastUpdate > se.timeout) { + if ended, endCase := isSessionEnded(sessID, sess); ended { sess.isEnded = true - if handler(sessID, sess.lastUserTime) { + if res, _ := handler(sessID, sess.lastUserTime); res { delete(se.sessions, sessID) ender.DecreaseActiveSessions() ender.IncreaseClosedSessions() removedSessions++ + if endCase == 2 { + brokerTime[1]++ + } + if endCase == 3 { + serverTime[1]++ + } } else { log.Printf("sessID: %d, userTime: %d", sessID, sess.lastUserTime) } } } - log.Printf("Removed %d of %d sessions", removedSessions, allSessions) + log.Printf("Removed %d of %d sessions; brokerTime: %d, serverTime: %d", + removedSessions, allSessions, brokerTime, serverTime) } diff --git a/backend/internal/sessionender/timecontroller.go b/backend/internal/sessionender/timecontroller.go index 0126692c7..31b45482e 100644 --- a/backend/internal/sessionender/timecontroller.go +++ b/backend/internal/sessionender/timecontroller.go @@ -1,21 +1,28 @@ package sessionender type timeController struct { - parts uint64 - lastTimestamp map[uint64]int64 // map[partition]consumerTimeOfLastMessage + parts uint64 + lastBatchTimestamp map[uint64]int64 // map[partition]consumerTimeOfLastMessage + lastUpdateTimestamp map[uint64]int64 // map[partition]systemTimeOfLastMessage } func NewTimeController(parts int) *timeController { return &timeController{ - parts: uint64(parts), - lastTimestamp: make(map[uint64]int64), + parts: uint64(parts), + lastBatchTimestamp: make(map[uint64]int64), + lastUpdateTimestamp: make(map[uint64]int64), } } -func (tc *timeController) UpdateTime(sessionID uint64, timestamp int64) { - tc.lastTimestamp[sessionID%tc.parts] = timestamp +func (tc *timeController) UpdateTime(sessionID uint64, batchTimestamp, updateTimestamp int64) { + tc.lastBatchTimestamp[sessionID%tc.parts] = batchTimestamp + tc.lastUpdateTimestamp[sessionID%tc.parts] = updateTimestamp } -func (tc *timeController) LastTimestamp(sessionID uint64) int64 { - return tc.lastTimestamp[sessionID%tc.parts] +func (tc *timeController) LastBatchTimestamp(sessionID uint64) int64 { + return tc.lastBatchTimestamp[sessionID%tc.parts] +} + +func (tc *timeController) LastUpdateTimestamp(sessionID uint64) int64 { + return tc.lastUpdateTimestamp[sessionID%tc.parts] }