feat(backend): additional logs in messageHandler
This commit is contained in:
parent
ca78bca3d1
commit
a32ac65f35
2 changed files with 8 additions and 7 deletions
|
|
@ -1,6 +1,7 @@
|
|||
package builder
|
||||
|
||||
import (
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"openreplay/backend/internal/handlers"
|
||||
|
|
@ -8,6 +9,7 @@ import (
|
|||
)
|
||||
|
||||
type builder struct {
|
||||
sessionID uint64
|
||||
readyMsgs []Message
|
||||
timestamp uint64
|
||||
lastMessageID uint64
|
||||
|
|
@ -16,8 +18,9 @@ type builder struct {
|
|||
ended bool
|
||||
}
|
||||
|
||||
func NewBuilder(handlers ...handlers.MessageProcessor) *builder {
|
||||
func NewBuilder(sessionID uint64, handlers ...handlers.MessageProcessor) *builder {
|
||||
return &builder{
|
||||
sessionID: sessionID,
|
||||
processors: handlers,
|
||||
}
|
||||
}
|
||||
|
|
@ -41,18 +44,16 @@ func (b *builder) checkSessionEnd(message Message) {
|
|||
func (b *builder) handleMessage(message Message, messageID uint64) {
|
||||
if messageID < b.lastMessageID {
|
||||
// May happen in case of duplicated messages in kafka (if `idempotence: false`)
|
||||
log.Printf("skip message with wrong msgID, sessID: %d, msgID: %d, lastID: %d", b.sessionID, messageID, b.lastMessageID)
|
||||
return
|
||||
}
|
||||
timestamp := GetTimestamp(message)
|
||||
if timestamp == 0 {
|
||||
// May happen in case of messages that are single-in-batch,
|
||||
// e.g. SessionStart or RawErrorEvent (emitted by `integrations`).
|
||||
|
||||
// TODO: make timestamp system transparent;
|
||||
log.Printf("skip message with empty timestamp, sessID: %d, msgID: %d, msgType: %d", b.sessionID, messageID, message.TypeID())
|
||||
return
|
||||
}
|
||||
if timestamp < b.timestamp {
|
||||
// Shouldn't happen after messageID check which is done above. TODO: log this case.
|
||||
log.Printf("skip message with wrong timestamp, sessID: %d, msgID: %d, type: %d", b.sessionID, messageID, message.TypeID())
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ func NewBuilderMap(handlersFabric func() []handlers.MessageProcessor) *builderMa
|
|||
func (m *builderMap) GetBuilder(sessionID uint64) *builder {
|
||||
b := m.sessions[sessionID]
|
||||
if b == nil {
|
||||
b = NewBuilder(m.handlersFabric()...) // Should create new instances
|
||||
b = NewBuilder(sessionID, m.handlersFabric()...) // Should create new instances
|
||||
m.sessions[sessionID] = b
|
||||
}
|
||||
return b
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue