From 5b7c479f4dfc5f5b2517d31baab86a2d06c15c10 Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Wed, 4 May 2022 16:17:57 +0200 Subject: [PATCH] Refactoring in stats logger --- backend/pkg/log/queue.go | 42 +++++++++-------- backend/services/db/main.go | 82 ++++++++++++++++++---------------- backend/services/ender/main.go | 2 +- 3 files changed, 68 insertions(+), 58 deletions(-) diff --git a/backend/pkg/log/queue.go b/backend/pkg/log/queue.go index 62e47cbbe..ced815bd2 100644 --- a/backend/pkg/log/queue.go +++ b/backend/pkg/log/queue.go @@ -5,7 +5,6 @@ import ( "log" "time" - "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue/types" //"openreplay/backend/pkg/env" ) @@ -18,6 +17,19 @@ type partitionStats struct { count int } +// Update partition statistic +func (prt *partitionStats) update(m *types.Meta) { + if prt.maxts < m.Timestamp { + prt.maxts = m.Timestamp + } + if prt.mints > m.Timestamp || prt.mints == 0 { + prt.mints = m.Timestamp + } + prt.lastts = m.Timestamp + prt.lastID = m.ID + prt.count += 1 +} + type queueStats struct { prts map[int32]*partitionStats tick <-chan time.Time @@ -30,43 +42,35 @@ func NewQueueStats(sec int) *queueStats { } } -func (qs *queueStats) HandleAndLog(sessionID uint64, m *types.Meta) { +// Collect writes new data to partition statistic +func (qs *queueStats) Collect(sessionID uint64, m *types.Meta) { prti := int32(sessionID % 16) // TODO use GetKeyPartition from kafka/key.go prt, ok := qs.prts[prti] if !ok { qs.prts[prti] = &partitionStats{} prt = qs.prts[prti] } - - if prt.maxts < m.Timestamp { - prt.maxts = m.Timestamp - } - if prt.mints > m.Timestamp || prt.mints == 0 { - prt.mints = m.Timestamp - } - prt.lastts = m.Timestamp - prt.lastID = m.ID - prt.count += 1 + prt.update(m) select { case <-qs.tick: - qs.LogThenReset() + qs.log() + qs.reset() default: } } -func (qs *queueStats) LogThenReset() { +// Print to console collected statistics +func (qs *queueStats) log() { s := "Queue Statistics: " for i, p := range qs.prts { s = fmt.Sprintf("%v | %v:: lastTS %v, lastID %v, count %v, maxTS %v, minTS %v", s, i, p.lastts, p.lastID, p.count, p.maxts, p.mints) } log.Println(s) - // reset - qs.prts = make(map[int32]*partitionStats) } -// TODO: list of message id to log (mb filter function with callback in messages/utils.go or something) -func LogMessage(s string, sessionID uint64, msg messages.Message, m *types.Meta) { - log.Printf("%v | SessionID: %v, Queue info: %v, Message: %v", s, sessionID, m, msg) +// Clear all queue partitions +func (qs *queueStats) reset() { + qs.prts = make(map[int32]*partitionStats) } diff --git a/backend/services/db/main.go b/backend/services/db/main.go index 600913120..619e6816e 100644 --- a/backend/services/db/main.go +++ b/backend/services/db/main.go @@ -29,52 +29,58 @@ func main() { heurFinder := heuristics.NewHandler() mi := NewMessageInserter(pg) si := NewStatsInserter(pg) - statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC")) + // Handler logic + handler := func(sessionID uint64, msg messages.Message, meta *types.Meta) { + statsLogger.Collect(sessionID, meta) + + // Just insert message into db without additional checks + if err := mi.insertMessage(sessionID, msg); err != nil { + if !postgres.IsPkeyViolation(err) { + log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg) + } + return + } + + // Try to get session from db + session, err := pg.GetSession(sessionID) + if err != nil { + // Might happen due to the assets-related message TODO: log only if session is necessary for this kind of message + log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg) + return + } + + // Insert statistics + err = si.insertStats(session, msg) + if err != nil { + log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg) + } + + heurFinder.HandleMessage(session, msg) + heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) { + // TODO: DRY code (carefully with the return statement logic) + if err := mi.insertMessage(sessionID, msg); err != nil { + if !postgres.IsPkeyViolation(err) { + log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg) + } + return + } + + if err := si.insertStats(session, msg); err != nil { + log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg) + } + }) + } + + // Init consumer consumer := queue.NewMessageConsumer( env.String("GROUP_DB"), []string{ env.String("TOPIC_RAW_IOS"), env.String("TOPIC_TRIGGER"), }, - func(sessionID uint64, msg messages.Message, meta *types.Meta) { - statsLogger.HandleAndLog(sessionID, meta) - - if err := mi.insertMessage(sessionID, msg); err != nil { - if !postgres.IsPkeyViolation(err) { - log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg) - } - return - } - - session, err := pg.GetSession(sessionID) - if err != nil { - // Might happen due to the assets-related message TODO: log only if session is necessary for this kind of message - log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg) - return - } - - err = si.insertStats(session, msg) - if err != nil { - log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg) - } - - heurFinder.HandleMessage(session, msg) - heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) { - // TODO: DRY code (carefully with the return statement logic) - if err := mi.insertMessage(sessionID, msg); err != nil { - if !postgres.IsPkeyViolation(err) { - log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg) - } - return - } - - if err := si.insertStats(session, msg); err != nil { - log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg) - } - }) - }, + handler, false, ) diff --git a/backend/services/ender/main.go b/backend/services/ender/main.go index f2430f3a0..4170a178e 100644 --- a/backend/services/ender/main.go +++ b/backend/services/ender/main.go @@ -35,7 +35,7 @@ func main() { env.String("TOPIC_RAW_IOS"), }, func(sessionID uint64, msg messages.Message, meta *types.Meta) { - statsLogger.HandleAndLog(sessionID, meta) + statsLogger.Collect(sessionID, meta) builderMap.HandleMessage(sessionID, msg, msg.Meta().Index) }, false,