feat(backend): set seek pointer at the end of batch for avoiding memory leaks

This commit is contained in:
Alexander Zavorotynskiy 2022-09-19 10:59:00 +02:00
parent 8a6dc3df24
commit 9ff555cc64
6 changed files with 13 additions and 0 deletions

View file

@ -63,6 +63,7 @@ func main() {
}
}
}
iter.Close()
},
true,
cfg.MessageSizeLimit,

View file

@ -112,6 +112,7 @@ func main() {
}
})
}
iter.Close()
}
// Init consumer

View file

@ -54,6 +54,7 @@ func main() {
statsLogger.Collect(sessionID, meta)
sessions.UpdateSession(sessionID, meta.Timestamp, iter.Message().Meta().Timestamp)
}
iter.Close()
},
false,
cfg.MessageSizeLimit,

View file

@ -64,6 +64,7 @@ func main() {
lastMessageID = msg.Meta().Index
builderMap.HandleMessage(sessionID, msg, iter.Message().Meta().Index)
}
iter.Close()
},
false,
cfg.MessageSizeLimit,

View file

@ -107,6 +107,7 @@ func main() {
messageSize.Record(context.Background(), float64(len(data)))
savedMessages.Add(context.Background(), 1)
}
iter.Close()
},
false,
cfg.MessageSizeLimit,

View file

@ -11,6 +11,7 @@ type Iterator interface {
Next() bool // Return true if we have next message
Type() int // Return type of the next message
Message() Message // Return raw or decoded message
Close()
}
type iteratorImpl struct {
@ -184,6 +185,13 @@ func (i *iteratorImpl) Message() Message {
return i.msg
}
func (i *iteratorImpl) Close() {
_, err := i.data.Seek(0, io.SeekEnd)
if err != nil {
log.Printf("can't set seek pointer at the end: %s", err)
}
}
func messageHasSize(msgType uint64) bool {
return !(msgType == 80 || msgType == 81 || msgType == 82)
}