From bef74d5284675dbda36608ac590d8f012db008b9 Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Tue, 6 Sep 2022 14:35:16 +0200 Subject: [PATCH] feat(backend): skip corrupted batch --- backend/cmd/heuristics/main.go | 9 ++++++++- backend/pkg/messages/batch.go | 2 +- backend/pkg/messages/raw.go | 1 + 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/backend/cmd/heuristics/main.go b/backend/cmd/heuristics/main.go index 977cbda9d..49b3326bb 100644 --- a/backend/cmd/heuristics/main.go +++ b/backend/cmd/heuristics/main.go @@ -53,9 +53,16 @@ func main() { cfg.TopicRawWeb, }, func(sessionID uint64, iter messages.Iterator, meta *types.Meta) { + var lastMessageID uint64 for iter.Next() { statsLogger.Collect(sessionID, meta) - builderMap.HandleMessage(sessionID, iter.Message().Decode(), iter.Message().Meta().Index) + msg := iter.Message().Decode() + if msg == nil { + log.Printf("failed batch, sess: %d, lastIndex: %d", sessionID, lastMessageID) + continue + } + lastMessageID = msg.Meta().Index + builderMap.HandleMessage(sessionID, msg, iter.Message().Meta().Index) } }, false, diff --git a/backend/pkg/messages/batch.go b/backend/pkg/messages/batch.go index 955d0cfc0..7dd6172bd 100644 --- a/backend/pkg/messages/batch.go +++ b/backend/pkg/messages/batch.go @@ -90,7 +90,7 @@ func (i *iteratorImpl) Next() bool { switch i.msgType { case MsgBatchMetadata: if i.index != 0 { // Might be several 0-0 BatchMeta in a row without an error though - log.Printf("Batch Meta found at the end of the batch") + log.Printf("Batch Metadata found at the end of the batch") return false } m := i.msg.Decode().(*BatchMetadata) diff --git a/backend/pkg/messages/raw.go b/backend/pkg/messages/raw.go index daa59accd..b9dba5de2 100644 --- a/backend/pkg/messages/raw.go +++ b/backend/pkg/messages/raw.go @@ -54,6 +54,7 @@ func (m *RawMessage) Decode() Message { msg, err := ReadMessage(m.tp, bytes.NewReader(m.data[1:])) if err != nil { log.Printf("decode err: %s", err) + return nil } msg.Meta().SetMeta(m.meta) return msg