From f6123c1c0814d056e19fe34d323f8081b2825ea1 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 3 Aug 2023 15:26:23 +0400 Subject: [PATCH] Ender logic update (#1435) (#1436) * Ender logic update (#1435) * feat(backend): new session end detection logic + several fixes * feat(backend): support partitions managment in ender * feat(backend): added new consumer support to redis * feat(backend): added support for new consumer in kafka * feat(backend): added new consumer support to redis (ee) * feat(backend): small refactoring in ender * fix(backend): fixed missing js_exception issue type in sessions table --- backend/cmd/ender/main.go | 84 +++++++++++++++--- backend/internal/db/datasaver/saver.go | 2 +- backend/internal/sessionender/ender.go | 85 ++++++++++++++++--- .../internal/sessionender/timecontroller.go | 23 +++-- backend/pkg/db/types/error-event.go | 2 +- backend/pkg/queue/types/types.go | 14 ++- backend/pkg/redisstream/consumer.go | 7 +- ee/backend/pkg/db/redis/consumer.go | 6 +- ee/backend/pkg/kafka/consumer.go | 21 ++++- 9 files changed, 200 insertions(+), 44 deletions(-) diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index 5f84e0983..23af6a05f 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -6,6 +6,7 @@ import ( "openreplay/backend/pkg/db/redis" "openreplay/backend/pkg/memory" "openreplay/backend/pkg/projects" + "openreplay/backend/pkg/queue/types" "openreplay/backend/pkg/sessions" "os" "os/signal" @@ -94,28 +95,69 @@ func main() { case <-tick: failedSessionEnds := make(map[uint64]uint64) duplicatedSessionEnds := make(map[uint64]uint64) + negativeDuration := make(map[uint64]uint64) + shorterDuration := make(map[uint64]int64) + diffDuration := make(map[uint64]int64) + noSessionInDB := make(map[uint64]uint64) + updatedDurations := 0 + newSessionEnds := 0 + + type SessionEndType int + const ( + FailedSessionEnd SessionEndType = iota + 1 + DuplicatedSessionEnd + NegativeDuration + ShorterDuration + DiffDuration + NewSessionEnd + NoSessionInDB + ) // Find ended sessions and send notification to other services - sessionEndGenerator.HandleEndedSessions(func(sessionID uint64, timestamp uint64) bool { + sessionEndGenerator.HandleEndedSessions(func(sessionID uint64, timestamp uint64) (bool, int) { msg := &messages.SessionEnd{Timestamp: timestamp} currDuration, err := sessManager.GetDuration(sessionID) if err != nil { log.Printf("getSessionDuration failed, sessID: %d, err: %s", sessionID, err) } + sess, err := sessManager.Get(sessionID) + if err != nil { + log.Printf("can't get session from database to compare durations, sessID: %d, err: %s", sessionID, err) + } else { + newDur := timestamp - sess.Timestamp + // Skip if session was ended before with same duration + if currDuration == newDur { + duplicatedSessionEnds[sessionID] = currDuration + return true, int(DuplicatedSessionEnd) + } + // Skip if session was ended before with longer duration + if currDuration > newDur { + shorterDuration[sessionID] = int64(currDuration) - int64(newDur) + return true, int(ShorterDuration) + } + } newDuration, err := sessManager.UpdateDuration(sessionID, msg.Timestamp) if err != nil { if strings.Contains(err.Error(), "integer out of range") { // Skip session with broken duration failedSessionEnds[sessionID] = timestamp - return true + return true, int(FailedSessionEnd) + } + if strings.Contains(err.Error(), "is less than zero for uint64") { + negativeDuration[sessionID] = timestamp + return true, int(NegativeDuration) + } + if strings.Contains(err.Error(), "no rows in result set") { + noSessionInDB[sessionID] = timestamp + return true, int(NoSessionInDB) } log.Printf("can't save sessionEnd to database, sessID: %d, err: %s", sessionID, err) - return false + return false, 0 } + // Check one more time just in case if currDuration == newDuration { - // Skip session end duplicate duplicatedSessionEnds[sessionID] = currDuration - return true + return true, int(DuplicatedSessionEnd) } if cfg.UseEncryption { if key := storage.GenerateEncryptionKey(); key != nil { @@ -128,22 +170,40 @@ func main() { } if err := producer.Produce(cfg.TopicRawWeb, sessionID, msg.Encode()); err != nil { log.Printf("can't send sessionEnd to topic: %s; sessID: %d", err, sessionID) - return false + return false, 0 } - return true + if currDuration != 0 { + diffDuration[sessionID] = int64(newDuration) - int64(currDuration) + updatedDurations++ + } else { + newSessionEnds++ + } + return true, int(NewSessionEnd) }) - if len(failedSessionEnds) > 0 { - log.Println("sessions with wrong duration:", failedSessionEnds) + if n := len(failedSessionEnds); n > 0 { + log.Println("sessions with wrong duration:", n, failedSessionEnds) } - if len(duplicatedSessionEnds) > 0 { - log.Println("session end duplicates:", duplicatedSessionEnds) + if n := len(negativeDuration); n > 0 { + log.Println("sessions with negative duration:", n, negativeDuration) } + if n := len(noSessionInDB); n > 0 { + log.Printf("sessions without info in DB: %d, %v", n, noSessionInDB) + } + log.Printf("[INFO] failed: %d, negative: %d, shorter: %d, same: %d, updated: %d, new: %d, not found: %d", + len(failedSessionEnds), len(negativeDuration), len(shorterDuration), len(duplicatedSessionEnds), + updatedDurations, newSessionEnds, len(noSessionInDB)) producer.Flush(cfg.ProducerTimeout) if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil { log.Printf("can't commit messages with offset: %s", err) } case msg := <-consumer.Rebalanced(): - log.Println(msg) + log.Printf("Rebalanced event, type: %s, partitions: %+v", msg.Type, msg.Partitions) + if msg.Type == types.RebalanceTypeRevoke { + sessionEndGenerator.Disable() + } else { + sessionEndGenerator.ActivePartitions(msg.Partitions) + sessionEndGenerator.Enable() + } default: if !memoryManager.HasFreeMemory() { continue diff --git a/backend/internal/db/datasaver/saver.go b/backend/internal/db/datasaver/saver.go index fdd74c47c..a1247c424 100644 --- a/backend/internal/db/datasaver/saver.go +++ b/backend/internal/db/datasaver/saver.go @@ -123,7 +123,7 @@ func (s *saverImpl) handleMessage(msg Message) error { if err = s.pg.InsertWebErrorEvent(session, types.WrapJSException(m)); err != nil { return err } - return s.sessions.UpdateIssuesStats(session.SessionID, 0, 1000) + return s.sessions.UpdateIssuesStats(session.SessionID, 1, 1000) case *IntegrationEvent: return s.pg.InsertWebErrorEvent(session, types.WrapIntegrationEvent(m)) case *InputChange: diff --git a/backend/internal/sessionender/ender.go b/backend/internal/sessionender/ender.go index 26fcf850e..999f68b75 100644 --- a/backend/internal/sessionender/ender.go +++ b/backend/internal/sessionender/ender.go @@ -9,12 +9,12 @@ import ( ) // EndedSessionHandler handler for ended sessions -type EndedSessionHandler func(sessionID uint64, timestamp uint64) bool +type EndedSessionHandler func(sessionID uint64, timestamp uint64) (bool, int) // session holds information about user's session live status type session struct { - lastTimestamp int64 - lastUpdate int64 + lastTimestamp int64 // timestamp from message broker + lastUpdate int64 // local timestamp lastUserTime uint64 isEnded bool } @@ -24,6 +24,8 @@ type SessionEnder struct { timeout int64 sessions map[uint64]*session // map[sessionID]session timeCtrl *timeController + parts uint64 + enabled bool } func New(timeout int64, parts int) (*SessionEnder, error) { @@ -31,9 +33,38 @@ func New(timeout int64, parts int) (*SessionEnder, error) { timeout: timeout, sessions: make(map[uint64]*session), timeCtrl: NewTimeController(parts), + parts: uint64(parts), // ender uses all partitions by default + enabled: true, }, nil } +func (se *SessionEnder) Enable() { + se.enabled = true +} + +func (se *SessionEnder) Disable() { + se.enabled = false +} + +func (se *SessionEnder) ActivePartitions(parts []uint64) { + activeParts := make(map[uint64]bool, 0) + for _, p := range parts { + activeParts[p] = true + } + removedSessions := 0 + activeSessions := 0 + for sessID, _ := range se.sessions { + if !activeParts[sessID%se.parts] { + delete(se.sessions, sessID) + removedSessions++ + } else { + activeSessions++ + } + } + log.Printf("SessionEnder: %d sessions left in active partitions: %+v, removed %d sessions", + activeSessions, parts, removedSessions) +} + // UpdateSession save timestamp for new sessions and update for existing sessions func (se *SessionEnder) UpdateSession(msg messages.Message) { var ( @@ -46,14 +77,14 @@ func (se *SessionEnder) UpdateSession(msg messages.Message) { log.Printf("got empty timestamp for sessionID: %d", sessionID) return } - se.timeCtrl.UpdateTime(sessionID, batchTimestamp) + se.timeCtrl.UpdateTime(sessionID, batchTimestamp, localTimestamp) sess, ok := se.sessions[sessionID] if !ok { // Register new session se.sessions[sessionID] = &session{ - lastTimestamp: batchTimestamp, // timestamp from message broker - lastUpdate: localTimestamp, // local timestamp - lastUserTime: msgTimestamp, // last timestamp from user's machine + lastTimestamp: batchTimestamp, + lastUpdate: localTimestamp, + lastUserTime: msgTimestamp, // last timestamp from user's machine isEnded: false, } ender.IncreaseActiveSessions() @@ -74,21 +105,53 @@ func (se *SessionEnder) UpdateSession(msg messages.Message) { // HandleEndedSessions runs handler for each ended session and delete information about session in successful case func (se *SessionEnder) HandleEndedSessions(handler EndedSessionHandler) { + if !se.enabled { + log.Printf("SessionEnder is disabled") + return + } currTime := time.Now().UnixMilli() allSessions, removedSessions := len(se.sessions), 0 + brokerTime := make(map[int]int, 0) + serverTime := make(map[int]int, 0) + + isSessionEnded := func(sessID uint64, sess *session) (bool, int) { + // Has been finished already + if sess.isEnded { + return true, 1 + } + batchTimeDiff := se.timeCtrl.LastBatchTimestamp(sessID) - sess.lastTimestamp + + // Has been finished according to batch timestamp and hasn't been updated for a long time + if (batchTimeDiff >= se.timeout) && (currTime-sess.lastUpdate >= se.timeout) { + return true, 2 + } + + // Hasn't been finished according to batch timestamp but hasn't been read from partition for a long time + if (batchTimeDiff < se.timeout) && (currTime-se.timeCtrl.LastUpdateTimestamp(sessID) >= se.timeout) { + return true, 3 + } + return false, 0 + } + for sessID, sess := range se.sessions { - if sess.isEnded || (se.timeCtrl.LastTimestamp(sessID)-sess.lastTimestamp > se.timeout) || - (currTime-sess.lastUpdate > se.timeout) { + if ended, endCase := isSessionEnded(sessID, sess); ended { sess.isEnded = true - if handler(sessID, sess.lastUserTime) { + if res, _ := handler(sessID, sess.lastUserTime); res { delete(se.sessions, sessID) ender.DecreaseActiveSessions() ender.IncreaseClosedSessions() removedSessions++ + if endCase == 2 { + brokerTime[1]++ + } + if endCase == 3 { + serverTime[1]++ + } } else { log.Printf("sessID: %d, userTime: %d", sessID, sess.lastUserTime) } } } - log.Printf("Removed %d of %d sessions", removedSessions, allSessions) + log.Printf("Removed %d of %d sessions; brokerTime: %d, serverTime: %d", + removedSessions, allSessions, brokerTime, serverTime) } diff --git a/backend/internal/sessionender/timecontroller.go b/backend/internal/sessionender/timecontroller.go index 0126692c7..31b45482e 100644 --- a/backend/internal/sessionender/timecontroller.go +++ b/backend/internal/sessionender/timecontroller.go @@ -1,21 +1,28 @@ package sessionender type timeController struct { - parts uint64 - lastTimestamp map[uint64]int64 // map[partition]consumerTimeOfLastMessage + parts uint64 + lastBatchTimestamp map[uint64]int64 // map[partition]consumerTimeOfLastMessage + lastUpdateTimestamp map[uint64]int64 // map[partition]systemTimeOfLastMessage } func NewTimeController(parts int) *timeController { return &timeController{ - parts: uint64(parts), - lastTimestamp: make(map[uint64]int64), + parts: uint64(parts), + lastBatchTimestamp: make(map[uint64]int64), + lastUpdateTimestamp: make(map[uint64]int64), } } -func (tc *timeController) UpdateTime(sessionID uint64, timestamp int64) { - tc.lastTimestamp[sessionID%tc.parts] = timestamp +func (tc *timeController) UpdateTime(sessionID uint64, batchTimestamp, updateTimestamp int64) { + tc.lastBatchTimestamp[sessionID%tc.parts] = batchTimestamp + tc.lastUpdateTimestamp[sessionID%tc.parts] = updateTimestamp } -func (tc *timeController) LastTimestamp(sessionID uint64) int64 { - return tc.lastTimestamp[sessionID%tc.parts] +func (tc *timeController) LastBatchTimestamp(sessionID uint64) int64 { + return tc.lastBatchTimestamp[sessionID%tc.parts] +} + +func (tc *timeController) LastUpdateTimestamp(sessionID uint64) int64 { + return tc.lastUpdateTimestamp[sessionID%tc.parts] } diff --git a/backend/pkg/db/types/error-event.go b/backend/pkg/db/types/error-event.go index 9f2f1a886..55dae8872 100644 --- a/backend/pkg/db/types/error-event.go +++ b/backend/pkg/db/types/error-event.go @@ -65,7 +65,7 @@ func WrapJSException(m *JSException) *ErrorEvent { } return &ErrorEvent{ MessageID: m.Meta().Index, - Timestamp: uint64(m.Meta().Timestamp), + Timestamp: m.Meta().Timestamp, Source: SOURCE_JS, Name: m.Name, Message: m.Message, diff --git a/backend/pkg/queue/types/types.go b/backend/pkg/queue/types/types.go index 21ee49d60..1f789d793 100644 --- a/backend/pkg/queue/types/types.go +++ b/backend/pkg/queue/types/types.go @@ -1,12 +1,24 @@ package types +type RebalanceType string + +const ( + RebalanceTypeAssign RebalanceType = "assign" + RebalanceTypeRevoke RebalanceType = "revoke" +) + +type PartitionsRebalancedEvent struct { + Type RebalanceType + Partitions []uint64 +} + // Consumer reads batches of session data from queue (redis or kafka) type Consumer interface { ConsumeNext() error CommitBack(gap int64) error Commit() error Close() - Rebalanced() <-chan interface{} + Rebalanced() <-chan *PartitionsRebalancedEvent } // Producer sends batches of session data to queue (redis or kafka) diff --git a/backend/pkg/redisstream/consumer.go b/backend/pkg/redisstream/consumer.go index 3c5b6d0a4..fcbd15db5 100644 --- a/backend/pkg/redisstream/consumer.go +++ b/backend/pkg/redisstream/consumer.go @@ -4,6 +4,7 @@ import ( "log" "net" "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/queue/types" "sort" "strconv" "strings" @@ -27,7 +28,7 @@ type Consumer struct { idsPending streamPendingIDsMap lastTs int64 autoCommit bool - event chan interface{} + event chan *types.PartitionsRebalancedEvent } func NewConsumer(group string, streams []string, messageIterator messages.MessageIterator) *Consumer { @@ -58,13 +59,13 @@ func NewConsumer(group string, streams []string, messageIterator messages.Messag group: group, autoCommit: true, idsPending: idsPending, - event: make(chan interface{}, 4), + event: make(chan *types.PartitionsRebalancedEvent, 4), } } const READ_COUNT = 10 -func (c *Consumer) Rebalanced() <-chan interface{} { +func (c *Consumer) Rebalanced() <-chan *types.PartitionsRebalancedEvent { return c.event } diff --git a/ee/backend/pkg/db/redis/consumer.go b/ee/backend/pkg/db/redis/consumer.go index eb89b48c1..2d9c68f2a 100644 --- a/ee/backend/pkg/db/redis/consumer.go +++ b/ee/backend/pkg/db/redis/consumer.go @@ -28,7 +28,7 @@ type consumerImpl struct { idsPending streamPendingIDsMap lastTs int64 autoCommit bool - event chan interface{} + event chan *types.PartitionsRebalancedEvent } type QueueMessage struct { @@ -67,7 +67,7 @@ func NewConsumer(client *Client, group string, streams []string) types.Consumer group: group, autoCommit: true, idsPending: idsPending, - event: make(chan interface{}, 4), + event: make(chan *types.PartitionsRebalancedEvent, 4), } } @@ -169,6 +169,6 @@ func (c *consumerImpl) Commit() error { return nil } -func (c *consumerImpl) Rebalanced() <-chan interface{} { +func (c *consumerImpl) Rebalanced() <-chan *types.PartitionsRebalancedEvent { return c.event } diff --git a/ee/backend/pkg/kafka/consumer.go b/ee/backend/pkg/kafka/consumer.go index fc8c98eaa..403d8b94e 100644 --- a/ee/backend/pkg/kafka/consumer.go +++ b/ee/backend/pkg/kafka/consumer.go @@ -7,6 +7,7 @@ import ( "openreplay/backend/pkg/env" "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/queue/types" "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/pkg/errors" @@ -20,6 +21,7 @@ type Consumer struct { commitTicker *time.Ticker pollTimeout uint events chan interface{} + rebalanced chan *types.PartitionsRebalancedEvent lastReceivedPrtTs map[int32]int64 } @@ -72,7 +74,8 @@ func NewConsumer( messageIterator: messageIterator, commitTicker: commitTicker, pollTimeout: 200, - events: make(chan interface{}, 4), + events: make(chan interface{}, 32), + rebalanced: make(chan *types.PartitionsRebalancedEvent, 32), lastReceivedPrtTs: make(map[int32]int64, 16), } @@ -96,15 +99,25 @@ func (consumer *Consumer) reBalanceCallback(_ *kafka.Consumer, e kafka.Event) er case kafka.RevokedPartitions: // receive before re-balancing partitions; stop consuming messages and commit current state consumer.events <- evt.String() + parts := make([]uint64, len(evt.Partitions)) + for i, p := range evt.Partitions { + parts[i] = uint64(p.Partition) + } + consumer.rebalanced <- &types.PartitionsRebalancedEvent{Type: types.RebalanceTypeRevoke, Partitions: parts} case kafka.AssignedPartitions: // receive after re-balancing partitions; continue consuming messages - //consumer.events <- evt.String() + consumer.events <- evt.String() + parts := make([]uint64, len(evt.Partitions)) + for i, p := range evt.Partitions { + parts[i] = uint64(p.Partition) + } + consumer.rebalanced <- &types.PartitionsRebalancedEvent{Type: types.RebalanceTypeAssign, Partitions: parts} } return nil } -func (consumer *Consumer) Rebalanced() <-chan interface{} { - return consumer.events +func (consumer *Consumer) Rebalanced() <-chan *types.PartitionsRebalancedEvent { + return consumer.rebalanced } func (consumer *Consumer) Commit() error {