feat(backend): fixed broken logs in failover module

This commit is contained in:
Alexander 2024-03-14 16:40:14 +01:00
parent b1ca164449
commit ee75b2795a
2 changed files with 17 additions and 10 deletions

View file

@ -34,7 +34,7 @@ func main() {
}
counter := storage.NewLogCounter()
sessionFinder, err := failover.NewSessionFinder(cfg, srv)
sessionFinder, err := failover.NewSessionFinder(log, cfg, srv)
if err != nil {
log.Fatal(ctx, "can't init sessionFinder module: %s", err)
}

View file

@ -1,10 +1,12 @@
package failover
import (
"context"
"fmt"
"log"
config "openreplay/backend/internal/config/storage"
"openreplay/backend/internal/storage"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
@ -25,6 +27,7 @@ func (s *sessionFinderMock) Stop() {}
// Finder implementation
type sessionFinderImpl struct {
log logger.Logger
topicName string
producerTimeout int
producer types.Producer
@ -34,7 +37,7 @@ type sessionFinderImpl struct {
done chan struct{}
}
func NewSessionFinder(cfg *config.Config, stg *storage.Storage) (SessionFinder, error) {
func NewSessionFinder(log logger.Logger, cfg *config.Config, stg *storage.Storage) (SessionFinder, error) {
switch {
case cfg == nil:
return nil, fmt.Errorf("config is empty")
@ -47,6 +50,7 @@ func NewSessionFinder(cfg *config.Config, stg *storage.Storage) (SessionFinder,
}
finder := &sessionFinderImpl{
log: log,
topicName: cfg.TopicFailover,
producerTimeout: cfg.ProducerCloseTimeout,
notFoundSessions: make(map[uint64]struct{}),
@ -60,6 +64,7 @@ func NewSessionFinder(cfg *config.Config, stg *storage.Storage) (SessionFinder,
cfg.TopicFailover,
},
messages.NewMessageIterator(
log,
func(msg messages.Message) {
m := msg.(*messages.SessionSearch)
finder.findSession(m.SessionID(), m.Timestamp, m.Partition)
@ -82,7 +87,7 @@ func (s *sessionFinderImpl) worker() {
default:
err := s.consumer.ConsumeNext()
if err != nil {
log.Fatalf("Error on consumption: %v", err)
s.log.Fatal(context.Background(), "error on consumption: %s", err)
}
}
}
@ -91,24 +96,25 @@ func (s *sessionFinderImpl) worker() {
func (s *sessionFinderImpl) findSession(sessionID, timestamp, partition uint64) {
sessEnd := &messages.SessionEnd{Timestamp: timestamp}
sessEnd.SetSessionID(sessionID)
err := s.storage.Process(sessEnd)
ctx := context.WithValue(context.Background(), "sessionID", sessionID)
err := s.storage.Process(ctx, sessEnd)
if err == nil {
log.Printf("found session: %d in partition: %d, original: %d",
sessionID, partition, sessionID%numberOfPartitions)
s.log.Info(ctx, "found session in partition: %d, original: %d",
partition, sessionID%numberOfPartitions)
if _, ok := s.notFoundSessions[sessionID]; ok {
delete(s.notFoundSessions, sessionID)
}
return
}
if _, ok := s.notFoundSessions[sessionID]; ok {
log.Printf("skip previously not found session: %d", sessionID)
s.log.Info(ctx, "skip previously not found session")
return
}
// Stop session search process if next partition is the same as original one
nextPartition := s.nextPartition(partition)
if nextPartition == sessionID%numberOfPartitions {
log.Printf("failover mechanism didn't help; sessID: %d", sessionID)
s.log.Info(ctx, "failover mechanism didn't help")
s.notFoundSessions[sessionID] = struct{}{}
return
}
@ -127,7 +133,8 @@ func (s *sessionFinderImpl) nextPartition(partition uint64) uint64 {
func (s *sessionFinderImpl) sendSearchMessage(sessionID, timestamp, partition uint64) {
msg := &messages.SessionSearch{Timestamp: timestamp, Partition: partition}
if err := s.producer.ProduceToPartition(s.topicName, partition, sessionID, msg.Encode()); err != nil {
log.Printf("can't send SessionSearch to failover topic: %s; sessID: %d", err, sessionID)
ctx := context.WithValue(context.Background(), "sessionID", sessionID)
s.log.Error(ctx, "can't send SessionSearch to failover topic: %s", err)
}
}