diff --git a/backend/cmd/storage/main.go b/backend/cmd/storage/main.go index 57fbd1776..718704e87 100644 --- a/backend/cmd/storage/main.go +++ b/backend/cmd/storage/main.go @@ -28,6 +28,8 @@ func main() { return } + counter := storage.NewLogCounter() + consumer := queue.NewMessageConsumer( cfg.GroupStorage, []string{ @@ -37,6 +39,8 @@ func main() { switch msg.(type) { case *messages.SessionEnd: srv.UploadKey(strconv.FormatUint(sessionID, 10), 5) + // Log timestamp of last processed session + counter.Update(time.UnixMilli(msg.Meta().Timestamp)) } }, true, @@ -48,6 +52,7 @@ func main() { signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) cleanTick := time.Tick(time.Duration(cfg.FSCleanHRS) * time.Hour) + counterTick := time.Tick(time.Second * 30) for { select { case sig := <-sigchan: @@ -56,6 +61,8 @@ func main() { os.Exit(0) case <-cleanTick: go srv.CleanDir(cfg.FSDir) + case <-counterTick: + counter.Print() default: err := consumer.ConsumeNext() if err != nil { diff --git a/backend/internal/storage/counter.go b/backend/internal/storage/counter.go new file mode 100644 index 000000000..b1f0e2647 --- /dev/null +++ b/backend/internal/storage/counter.go @@ -0,0 +1,45 @@ +package storage + +import ( + "log" + "sync" + "time" +) + +type logCounter struct { + mu sync.Mutex + counter int + timestamp time.Time + lastTS time.Time +} + +func NewLogCounter() *logCounter { + nlc := &logCounter{} + nlc.init() + return nlc +} + +func (c *logCounter) init() { + c.mu.Lock() + c.counter = 0 + c.timestamp = time.Now() + c.mu.Unlock() +} + +func (c *logCounter) Update(ts time.Time) { + c.mu.Lock() + c.counter++ + c.lastTS = ts + c.mu.Unlock() +} + +func (c *logCounter) Print() { + c.mu.Lock() + log.Printf("counter: %d, duration: %s, lastSessionTS: %s", + c.counter, + time.Now().Sub(c.timestamp).String(), + c.lastTS.String(), + ) + c.mu.Unlock() + c.init() +}