From ee75b2795a9f479b59aa37c57e8a172c7be5d852 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 14 Mar 2024 16:40:14 +0100 Subject: [PATCH] feat(backend): fixed broken logs in failover module --- backend/cmd/storage/main.go | 2 +- ee/backend/pkg/failover/failover.go | 25 ++++++++++++++++--------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/backend/cmd/storage/main.go b/backend/cmd/storage/main.go index d1595a290..2209f9058 100644 --- a/backend/cmd/storage/main.go +++ b/backend/cmd/storage/main.go @@ -34,7 +34,7 @@ func main() { } counter := storage.NewLogCounter() - sessionFinder, err := failover.NewSessionFinder(cfg, srv) + sessionFinder, err := failover.NewSessionFinder(log, cfg, srv) if err != nil { log.Fatal(ctx, "can't init sessionFinder module: %s", err) } diff --git a/ee/backend/pkg/failover/failover.go b/ee/backend/pkg/failover/failover.go index 857b5381b..2815bd70b 100644 --- a/ee/backend/pkg/failover/failover.go +++ b/ee/backend/pkg/failover/failover.go @@ -1,10 +1,12 @@ package failover import ( + "context" "fmt" - "log" + config "openreplay/backend/internal/config/storage" "openreplay/backend/internal/storage" + "openreplay/backend/pkg/logger" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/queue/types" @@ -25,6 +27,7 @@ func (s *sessionFinderMock) Stop() {} // Finder implementation type sessionFinderImpl struct { + log logger.Logger topicName string producerTimeout int producer types.Producer @@ -34,7 +37,7 @@ type sessionFinderImpl struct { done chan struct{} } -func NewSessionFinder(cfg *config.Config, stg *storage.Storage) (SessionFinder, error) { +func NewSessionFinder(log logger.Logger, cfg *config.Config, stg *storage.Storage) (SessionFinder, error) { switch { case cfg == nil: return nil, fmt.Errorf("config is empty") @@ -47,6 +50,7 @@ func NewSessionFinder(cfg *config.Config, stg *storage.Storage) (SessionFinder, } finder := &sessionFinderImpl{ + log: log, topicName: cfg.TopicFailover, producerTimeout: cfg.ProducerCloseTimeout, notFoundSessions: make(map[uint64]struct{}), @@ -60,6 +64,7 @@ func NewSessionFinder(cfg *config.Config, stg *storage.Storage) (SessionFinder, cfg.TopicFailover, }, messages.NewMessageIterator( + log, func(msg messages.Message) { m := msg.(*messages.SessionSearch) finder.findSession(m.SessionID(), m.Timestamp, m.Partition) @@ -82,7 +87,7 @@ func (s *sessionFinderImpl) worker() { default: err := s.consumer.ConsumeNext() if err != nil { - log.Fatalf("Error on consumption: %v", err) + s.log.Fatal(context.Background(), "error on consumption: %s", err) } } } @@ -91,24 +96,25 @@ func (s *sessionFinderImpl) worker() { func (s *sessionFinderImpl) findSession(sessionID, timestamp, partition uint64) { sessEnd := &messages.SessionEnd{Timestamp: timestamp} sessEnd.SetSessionID(sessionID) - err := s.storage.Process(sessEnd) + ctx := context.WithValue(context.Background(), "sessionID", sessionID) + err := s.storage.Process(ctx, sessEnd) if err == nil { - log.Printf("found session: %d in partition: %d, original: %d", - sessionID, partition, sessionID%numberOfPartitions) + s.log.Info(ctx, "found session in partition: %d, original: %d", + partition, sessionID%numberOfPartitions) if _, ok := s.notFoundSessions[sessionID]; ok { delete(s.notFoundSessions, sessionID) } return } if _, ok := s.notFoundSessions[sessionID]; ok { - log.Printf("skip previously not found session: %d", sessionID) + s.log.Info(ctx, "skip previously not found session") return } // Stop session search process if next partition is the same as original one nextPartition := s.nextPartition(partition) if nextPartition == sessionID%numberOfPartitions { - log.Printf("failover mechanism didn't help; sessID: %d", sessionID) + s.log.Info(ctx, "failover mechanism didn't help") s.notFoundSessions[sessionID] = struct{}{} return } @@ -127,7 +133,8 @@ func (s *sessionFinderImpl) nextPartition(partition uint64) uint64 { func (s *sessionFinderImpl) sendSearchMessage(sessionID, timestamp, partition uint64) { msg := &messages.SessionSearch{Timestamp: timestamp, Partition: partition} if err := s.producer.ProduceToPartition(s.topicName, partition, sessionID, msg.Encode()); err != nil { - log.Printf("can't send SessionSearch to failover topic: %s; sessID: %d", err, sessionID) + ctx := context.WithValue(context.Background(), "sessionID", sessionID) + s.log.Error(ctx, "can't send SessionSearch to failover topic: %s", err) } }