feat(backend/sink): added last session ts in sink logs

This commit is contained in:
Alexander Zavorotynskiy 2022-06-02 10:50:14 +02:00
parent 485865f704
commit 2fb57962b8

View file

@ -3,6 +3,8 @@ package main
import (
"encoding/binary"
"log"
"openreplay/backend/internal/storage"
"openreplay/backend/pkg/flakeid"
"time"
"os"
@ -34,7 +36,7 @@ func main() {
rewriter := assets.NewRewriter(cfg.AssetsOrigin)
assetMessageHandler := assetscache.New(cfg, rewriter, producer)
count := 0
counter := storage.NewLogCounter()
consumer := queue.NewMessageConsumer(
cfg.GroupSink,
@ -43,7 +45,13 @@ func main() {
cfg.TopicRawWeb,
},
func(sessionID uint64, message Message, _ *types.Meta) {
count++
// If message timestamp is empty, use at least ts of session start
ts := message.Meta().Timestamp
if ts == 0 {
ts = int64(flakeid.ExtractTimestamp(sessionID))
}
// Log ts of last processed message
counter.Update(time.UnixMilli(ts))
typeID := message.TypeID()
if !IsReplayerType(typeID) {
@ -86,8 +94,7 @@ func main() {
if err := writer.SyncAll(); err != nil {
log.Fatalf("Sync error: %v\n", err)
}
log.Printf("%v messages during 30 sec", count)
count = 0
counter.Print()
if err := consumer.Commit(); err != nil {
log.Printf("can't commit messages: %s", err)
}