From 2fb57962b88bdeb8a4b51602c1b6f0a7eb7ae50c Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Thu, 2 Jun 2022 10:50:14 +0200 Subject: [PATCH] feat(backend/sink): added last session ts in sink logs --- backend/cmd/sink/main.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go index a4c76d3f4..14bc485ce 100644 --- a/backend/cmd/sink/main.go +++ b/backend/cmd/sink/main.go @@ -3,6 +3,8 @@ package main import ( "encoding/binary" "log" + "openreplay/backend/internal/storage" + "openreplay/backend/pkg/flakeid" "time" "os" @@ -34,7 +36,7 @@ func main() { rewriter := assets.NewRewriter(cfg.AssetsOrigin) assetMessageHandler := assetscache.New(cfg, rewriter, producer) - count := 0 + counter := storage.NewLogCounter() consumer := queue.NewMessageConsumer( cfg.GroupSink, @@ -43,7 +45,13 @@ func main() { cfg.TopicRawWeb, }, func(sessionID uint64, message Message, _ *types.Meta) { - count++ + // If message timestamp is empty, use at least ts of session start + ts := message.Meta().Timestamp + if ts == 0 { + ts = int64(flakeid.ExtractTimestamp(sessionID)) + } + // Log ts of last processed message + counter.Update(time.UnixMilli(ts)) typeID := message.TypeID() if !IsReplayerType(typeID) { @@ -86,8 +94,7 @@ func main() { if err := writer.SyncAll(); err != nil { log.Fatalf("Sync error: %v\n", err) } - log.Printf("%v messages during 30 sec", count) - count = 0 + counter.Print() if err := consumer.Commit(); err != nil { log.Printf("can't commit messages: %s", err) }