feat(backend): skip corrupted batch

This commit is contained in:
Alexander Zavorotynskiy 2022-09-06 14:35:16 +02:00
parent fb64706282
commit bef74d5284
3 changed files with 10 additions and 2 deletions

View file

@ -53,9 +53,16 @@ func main() {
cfg.TopicRawWeb,
},
func(sessionID uint64, iter messages.Iterator, meta *types.Meta) {
var lastMessageID uint64
for iter.Next() {
statsLogger.Collect(sessionID, meta)
builderMap.HandleMessage(sessionID, iter.Message().Decode(), iter.Message().Meta().Index)
msg := iter.Message().Decode()
if msg == nil {
log.Printf("failed batch, sess: %d, lastIndex: %d", sessionID, lastMessageID)
continue
}
lastMessageID = msg.Meta().Index
builderMap.HandleMessage(sessionID, msg, iter.Message().Meta().Index)
}
},
false,

View file

@ -90,7 +90,7 @@ func (i *iteratorImpl) Next() bool {
switch i.msgType {
case MsgBatchMetadata:
if i.index != 0 { // Might be several 0-0 BatchMeta in a row without an error though
log.Printf("Batch Meta found at the end of the batch")
log.Printf("Batch Metadata found at the end of the batch")
return false
}
m := i.msg.Decode().(*BatchMetadata)

View file

@ -54,6 +54,7 @@ func (m *RawMessage) Decode() Message {
msg, err := ReadMessage(m.tp, bytes.NewReader(m.data[1:]))
if err != nil {
log.Printf("decode err: %s", err)
return nil
}
msg.Meta().SetMeta(m.meta)
return msg