diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index dbe2fa212..5d82b67db 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -3,7 +3,7 @@ package main import ( "log" "openreplay/backend/internal/config/ender" - builder "openreplay/backend/internal/ender" + "openreplay/backend/internal/sessionender" "time" "os" @@ -20,10 +20,12 @@ import ( func main() { log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) + // Load service configuration cfg := ender.New() - builderMap := builder.NewBuilderMap() + // Init all modules statsLogger := logger.NewQueueStats(cfg.LoggerTimeout) + sessions := sessionender.New(intervals.EVENTS_SESSION_END_TIMEOUT) producer := queue.NewProducer() consumer := queue.NewMessageConsumer( cfg.GroupEvents, @@ -33,31 +35,41 @@ func main() { }, func(sessionID uint64, msg messages.Message, meta *types.Meta) { statsLogger.Collect(sessionID, meta) - builderMap.HandleMessage(sessionID, msg, msg.Meta().Index) + sessions.UpdateSession(sessionID, messages.GetTimestamp(msg)) }, false, ) - tick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond) + log.Printf("Ender service started\n") sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - log.Printf("Ender service started\n") + tick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond) for { select { case sig := <-sigchan: log.Printf("Caught signal %v: terminating\n", sig) producer.Close(cfg.ProducerTimeout) - consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP) + if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil { + log.Printf("can't commit messages with offset: %s", err) + } consumer.Close() os.Exit(0) case <-tick: - builderMap.IterateReadyMessages(time.Now().UnixMilli(), func(sessionID uint64, readyMsg messages.Message) { - producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(readyMsg)) + // Find ended sessions and send notification to other services + sessions.HandleEndedSessions(func(sessionID uint64, timestamp int64) bool { + msg := &messages.SessionEnd{Timestamp: uint64(timestamp)} + if err := producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(msg)); err != nil { + log.Printf("can't send message to queue: %s", err) + return false + } + return true }) producer.Flush(cfg.ProducerTimeout) - consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP) + if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil { + log.Printf("can't commit messages with offset: %s", err) + } default: if err := consumer.ConsumeNext(); err != nil { log.Fatalf("Error on consuming: %v", err) diff --git a/backend/internal/ender/builder.go b/backend/internal/ender/builder.go deleted file mode 100644 index 0389f74d1..000000000 --- a/backend/internal/ender/builder.go +++ /dev/null @@ -1,56 +0,0 @@ -package builder - -import ( - "log" - "openreplay/backend/pkg/intervals" - . "openreplay/backend/pkg/messages" -) - -type builder struct { - readyMsgs []Message - timestamp uint64 - sid uint64 -} - -func NewBuilder() *builder { - return &builder{} -} - -func (b *builder) appendReadyMessage(msg Message) { // interface is never nil even if it holds nil value - b.readyMsgs = append(b.readyMsgs, msg) -} - -func (b *builder) buildSessionEnd() { - if b.timestamp == 0 { - return - } - sessionEnd := &SessionEnd{ - Timestamp: b.timestamp, - } - b.appendReadyMessage(sessionEnd) -} - -func (b *builder) handleMessage(message Message, messageID uint64) { - timestamp := GetTimestamp(message) - if b.timestamp < timestamp { - b.timestamp = timestamp - } - - if b.timestamp == 0 { - log.Printf("Empty timestamp, sessionID: %d, messageID: %d", b.sid, messageID) - return - } -} - -func (b *builder) checkTimeouts(ts int64) bool { - if b.timestamp == 0 { - return false // There was no timestamp events yet - } - - lastTsGap := ts - int64(b.timestamp) - if lastTsGap > intervals.EVENTS_SESSION_END_TIMEOUT { - b.buildSessionEnd() - return true - } - return false -} diff --git a/backend/internal/ender/builderMap.go b/backend/internal/ender/builderMap.go deleted file mode 100644 index 6eba1f9ad..000000000 --- a/backend/internal/ender/builderMap.go +++ /dev/null @@ -1,36 +0,0 @@ -package builder - -import ( - . "openreplay/backend/pkg/messages" -) - -type builderMap map[uint64]*builder - -func NewBuilderMap() builderMap { - return make(builderMap) -} - -func (m builderMap) GetBuilder(sessionID uint64) *builder { - b := m[sessionID] - if b == nil { - b = NewBuilder() - m[sessionID] = b - b.sid = sessionID - - } - return b -} - -func (m builderMap) HandleMessage(sessionID uint64, msg Message, messageID uint64) { - b := m.GetBuilder(sessionID) - b.handleMessage(msg, messageID) -} - -func (m builderMap) IterateReadyMessages(operatingTs int64, iter func(sessionID uint64, msg Message)) { - for sessionID, b := range m { - sessionEnded := b.checkTimeouts(operatingTs) - if sessionEnded { - delete(m, sessionID) - } - } -} diff --git a/backend/internal/sessionender/ender.go b/backend/internal/sessionender/ender.go new file mode 100644 index 000000000..54bd399ac --- /dev/null +++ b/backend/internal/sessionender/ender.go @@ -0,0 +1,62 @@ +package sessionender + +import ( + "log" + "time" +) + +// EndedSessionHandler handler for ended sessions +type EndedSessionHandler func(sessionID uint64, timestamp int64) bool + +// session holds information about user's session live status +type session struct { + lastTimestamp int64 + isEnded bool +} + +// SessionEnder updates timestamp of last message for each session +type SessionEnder struct { + timeout int64 + sessions map[uint64]*session // map[sessionID]session +} + +func New(timeout int64) *SessionEnder { + return &SessionEnder{ + timeout: timeout, + sessions: make(map[uint64]*session), + } +} + +// UpdateSession save timestamp for new sessions and update for existing sessions +func (se *SessionEnder) UpdateSession(sessionID, timestamp uint64) { + currTS := int64(timestamp) + if currTS == 0 { + log.Printf("got empty timestamp for sessionID: %d", sessionID) + return + } + sess, ok := se.sessions[sessionID] + if !ok { + se.sessions[sessionID] = &session{ + lastTimestamp: currTS, + isEnded: false, + } + return + } + if currTS > sess.lastTimestamp { + sess.lastTimestamp = currTS + sess.isEnded = false + } +} + +// HandleEndedSessions runs handler for each ended session and delete information about session in successful case +func (se *SessionEnder) HandleEndedSessions(handler EndedSessionHandler) { + deadLine := time.Now().UnixMilli() - se.timeout + for sessID, sess := range se.sessions { + if sess.isEnded || sess.lastTimestamp < deadLine { + sess.isEnded = true + if handler(sessID, sess.lastTimestamp) { + delete(se.sessions, sessID) + } + } + } +}