diff --git a/backend/internal/builder/builder.go b/backend/internal/builder/builder.go index dd7bb675a..f7c0d2f91 100644 --- a/backend/internal/builder/builder.go +++ b/backend/internal/builder/builder.go @@ -1,6 +1,7 @@ package builder import ( + "log" "time" "openreplay/backend/internal/handlers" @@ -8,6 +9,7 @@ import ( ) type builder struct { + sessionID uint64 readyMsgs []Message timestamp uint64 lastMessageID uint64 @@ -16,8 +18,9 @@ type builder struct { ended bool } -func NewBuilder(handlers ...handlers.MessageProcessor) *builder { +func NewBuilder(sessionID uint64, handlers ...handlers.MessageProcessor) *builder { return &builder{ + sessionID: sessionID, processors: handlers, } } @@ -41,18 +44,16 @@ func (b *builder) checkSessionEnd(message Message) { func (b *builder) handleMessage(message Message, messageID uint64) { if messageID < b.lastMessageID { // May happen in case of duplicated messages in kafka (if `idempotence: false`) + log.Printf("skip message with wrong msgID, sessID: %d, msgID: %d, lastID: %d", b.sessionID, messageID, b.lastMessageID) return } timestamp := GetTimestamp(message) if timestamp == 0 { - // May happen in case of messages that are single-in-batch, - // e.g. SessionStart or RawErrorEvent (emitted by `integrations`). - - // TODO: make timestamp system transparent; + log.Printf("skip message with empty timestamp, sessID: %d, msgID: %d, msgType: %d", b.sessionID, messageID, message.TypeID()) return } if timestamp < b.timestamp { - // Shouldn't happen after messageID check which is done above. TODO: log this case. + log.Printf("skip message with wrong timestamp, sessID: %d, msgID: %d, type: %d", b.sessionID, messageID, message.TypeID()) return } diff --git a/backend/internal/builder/builderMap.go b/backend/internal/builder/builderMap.go index af2ecf0d5..d905cdb45 100644 --- a/backend/internal/builder/builderMap.go +++ b/backend/internal/builder/builderMap.go @@ -24,7 +24,7 @@ func NewBuilderMap(handlersFabric func() []handlers.MessageProcessor) *builderMa func (m *builderMap) GetBuilder(sessionID uint64) *builder { b := m.sessions[sessionID] if b == nil { - b = NewBuilder(m.handlersFabric()...) // Should create new instances + b = NewBuilder(sessionID, m.handlersFabric()...) // Should create new instances m.sessions[sessionID] = b } return b