From 9ff555cc64fc82098ee009b7150c91aa568f656f Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Mon, 19 Sep 2022 10:59:00 +0200 Subject: [PATCH] feat(backend): set seek pointer at the end of batch for avoiding memory leaks --- backend/cmd/assets/main.go | 1 + backend/cmd/db/main.go | 1 + backend/cmd/ender/main.go | 1 + backend/cmd/heuristics/main.go | 1 + backend/cmd/sink/main.go | 1 + backend/pkg/messages/batch.go | 8 ++++++++ 6 files changed, 13 insertions(+) diff --git a/backend/cmd/assets/main.go b/backend/cmd/assets/main.go index d62a25876..b81ff9b5a 100644 --- a/backend/cmd/assets/main.go +++ b/backend/cmd/assets/main.go @@ -63,6 +63,7 @@ func main() { } } } + iter.Close() }, true, cfg.MessageSizeLimit, diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index 6dc4b09cb..a807cc253 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -112,6 +112,7 @@ func main() { } }) } + iter.Close() } // Init consumer diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index 524af0894..a2dafa689 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -54,6 +54,7 @@ func main() { statsLogger.Collect(sessionID, meta) sessions.UpdateSession(sessionID, meta.Timestamp, iter.Message().Meta().Timestamp) } + iter.Close() }, false, cfg.MessageSizeLimit, diff --git a/backend/cmd/heuristics/main.go b/backend/cmd/heuristics/main.go index 49b3326bb..be27a86bd 100644 --- a/backend/cmd/heuristics/main.go +++ b/backend/cmd/heuristics/main.go @@ -64,6 +64,7 @@ func main() { lastMessageID = msg.Meta().Index builderMap.HandleMessage(sessionID, msg, iter.Message().Meta().Index) } + iter.Close() }, false, cfg.MessageSizeLimit, diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go index f0b145fae..bd7fddf20 100644 --- a/backend/cmd/sink/main.go +++ b/backend/cmd/sink/main.go @@ -107,6 +107,7 @@ func main() { messageSize.Record(context.Background(), float64(len(data))) savedMessages.Add(context.Background(), 1) } + iter.Close() }, false, cfg.MessageSizeLimit, diff --git a/backend/pkg/messages/batch.go b/backend/pkg/messages/batch.go index 58b177aea..887e5ddb3 100644 --- a/backend/pkg/messages/batch.go +++ b/backend/pkg/messages/batch.go @@ -11,6 +11,7 @@ type Iterator interface { Next() bool // Return true if we have next message Type() int // Return type of the next message Message() Message // Return raw or decoded message + Close() } type iteratorImpl struct { @@ -184,6 +185,13 @@ func (i *iteratorImpl) Message() Message { return i.msg } +func (i *iteratorImpl) Close() { + _, err := i.data.Seek(0, io.SeekEnd) + if err != nil { + log.Printf("can't set seek pointer at the end: %s", err) + } +} + func messageHasSize(msgType uint64) bool { return !(msgType == 80 || msgType == 81 || msgType == 82) }