From 85b87e17dfbf91eab8e3ebe239fd3e668f224cf1 Mon Sep 17 00:00:00 2001 From: Alex Kaminskii Date: Wed, 11 May 2022 21:14:23 +0200 Subject: [PATCH] refactor(backend/internals): builder: message order & timestamps check --- backend/internal/builder/builder.go | 30 ++++++++++++++++++-------- backend/internal/builder/builderMap.go | 6 +++++- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/backend/internal/builder/builder.go b/backend/internal/builder/builder.go index ff3d91e1b..38a64ab54 100644 --- a/backend/internal/builder/builder.go +++ b/backend/internal/builder/builder.go @@ -6,10 +6,12 @@ import ( ) type builder struct { - readyMsgs []Message - timestamp uint64 - processors []handlers.MessageProcessor - ended bool + readyMsgs []Message + timestamp uint64 + lastMessageID uint64 + lastSystemTimestamp int64 + processors []handlers.MessageProcessor + ended bool } func NewBuilder(handlers ...handlers.MessageProcessor) *builder { @@ -35,15 +37,25 @@ func (b *builder) checkSessionEnd(message Message) { } func (b *builder) handleMessage(message Message, messageID uint64) { - timestamp := GetTimestamp(message) - if b.timestamp < timestamp { - b.timestamp = timestamp + if messageID < b.lastMessageID { + // May happen in case of duplicated messages in kafka (if `idempotence: false`) + return } - if b.timestamp == 0 { - // in case of SessionStart. TODO: make timestamp system transparent + timestamp := GetTimestamp(message) + if timestamp == 0 { + // May happen in case of messages that are single-in-batch, + // e.g. SessionStart or RawErrorEvent (emitted by `integrations`). + + // TODO: make timestamp system transparent; + return + } + if timestamp < b.timestamp { + // Shouldn't happen after messageID check which is done above. TODO: log this case. return } + b.timestamp = timestamp + b.lastSystemTimestamp = time.Now().UnixMilli() for _, p := range b.processors { if rm := p.Handle(message, messageID, b.timestamp); rm != nil { b.readyMsgs = append(b.readyMsgs, rm) diff --git a/backend/internal/builder/builderMap.go b/backend/internal/builder/builderMap.go index ed47abdce..fcd52b0cc 100644 --- a/backend/internal/builder/builderMap.go +++ b/backend/internal/builder/builderMap.go @@ -1,10 +1,14 @@ package builder import ( + "time" + "openreplay/backend/internal/handlers" . "openreplay/backend/pkg/messages" ) +const FORCE_DELETE_TIMEOUT = 4 * time.Hour + type builderMap struct { handlersFabric func() []handlers.MessageProcessor sessions map[uint64]*builder @@ -32,7 +36,7 @@ func (m *builderMap) HandleMessage(sessionID uint64, msg Message, messageID uint } func (m *builderMap) iterateSessionReadyMessages(sessionID uint64, b *builder, iter func(msg Message)) { - if b.ended { + if b.ended || b.lastSystemTimestamp+FORCE_DELETE_TIMEOUT < time.Now().UnixMilli() { for _, p := range b.processors { if rm := p.Build(); rm != nil { b.readyMsgs = append(b.readyMsgs, rm)