From efe1ae1dd7e5a215299755958596df78ddf0b004 Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Tue, 20 Dec 2022 13:51:53 +0100 Subject: [PATCH] feat(backend): removed old queue logger --- backend/cmd/db/main.go | 4 -- backend/cmd/ender/main.go | 3 +- backend/cmd/heuristics/main.go | 5 -- backend/internal/sessionender/ender.go | 6 +- backend/pkg/log/queue.go | 79 -------------------------- 5 files changed, 2 insertions(+), 95 deletions(-) delete mode 100644 backend/pkg/log/queue.go diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index c8dd60796..74e6149bc 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -17,7 +17,6 @@ import ( "openreplay/backend/pkg/db/postgres" "openreplay/backend/pkg/handlers" custom2 "openreplay/backend/pkg/handlers/custom" - logger "openreplay/backend/pkg/log" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/monitoring" "openreplay/backend/pkg/queue" @@ -59,7 +58,6 @@ func main() { // Init modules saver := datasaver.New(pg, producer) saver.InitStats() - statsLogger := logger.NewQueueStats(cfg.LoggerTimeout) msgFilter := []int{messages.MsgMetadata, messages.MsgIssueEvent, messages.MsgSessionStart, messages.MsgSessionEnd, messages.MsgUserID, messages.MsgUserAnonymousID, messages.MsgClickEvent, @@ -71,8 +69,6 @@ func main() { // Handler logic msgHandler := func(msg messages.Message) { - statsLogger.Collect(msg) - // Just save session data into db without additional checks if err := saver.InsertMessage(msg); err != nil { if !postgres.IsPkeyViolation(err) { diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index 3190b07d4..be582c2fd 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -15,7 +15,6 @@ import ( "openreplay/backend/pkg/db/cache" "openreplay/backend/pkg/db/postgres" "openreplay/backend/pkg/intervals" - logger "openreplay/backend/pkg/log" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/monitoring" "openreplay/backend/pkg/queue" @@ -33,7 +32,7 @@ func main() { pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres, 0, 0, metrics), cfg.ProjectExpirationTimeoutMs) defer pg.Close() - sessions, err := sessionender.New(metrics, intervals.EVENTS_SESSION_END_TIMEOUT, cfg.PartitionsNumber, logger.NewQueueStats(cfg.LoggerTimeout)) + sessions, err := sessionender.New(metrics, intervals.EVENTS_SESSION_END_TIMEOUT, cfg.PartitionsNumber) if err != nil { log.Printf("can't init ender service: %s", err) return diff --git a/backend/cmd/heuristics/main.go b/backend/cmd/heuristics/main.go index fdbb6552e..ac55b83bc 100644 --- a/backend/cmd/heuristics/main.go +++ b/backend/cmd/heuristics/main.go @@ -12,7 +12,6 @@ import ( "openreplay/backend/pkg/handlers" web2 "openreplay/backend/pkg/handlers/web" "openreplay/backend/pkg/intervals" - logger "openreplay/backend/pkg/log" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/sessions" @@ -44,14 +43,10 @@ func main() { // Create handler's aggregator builderMap := sessions.NewBuilderMap(handlersFabric) - // Init logger - statsLogger := logger.NewQueueStats(cfg.LoggerTimeout) - // Init producer and consumer for data bus producer := queue.NewProducer(cfg.MessageSizeLimit, true) msgHandler := func(msg messages.Message) { - statsLogger.Collect(msg) builderMap.HandleMessage(msg) } diff --git a/backend/internal/sessionender/ender.go b/backend/internal/sessionender/ender.go index dbd3eb901..c1c2c9b7f 100644 --- a/backend/internal/sessionender/ender.go +++ b/backend/internal/sessionender/ender.go @@ -5,7 +5,6 @@ import ( "fmt" "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "log" - log2 "openreplay/backend/pkg/log" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/monitoring" "time" @@ -29,10 +28,9 @@ type SessionEnder struct { timeCtrl *timeController activeSessions syncfloat64.UpDownCounter totalSessions syncfloat64.Counter - stats log2.QueueStats } -func New(metrics *monitoring.Metrics, timeout int64, parts int, stats log2.QueueStats) (*SessionEnder, error) { +func New(metrics *monitoring.Metrics, timeout int64, parts int) (*SessionEnder, error) { if metrics == nil { return nil, fmt.Errorf("metrics module is empty") } @@ -51,13 +49,11 @@ func New(metrics *monitoring.Metrics, timeout int64, parts int, stats log2.Queue timeCtrl: NewTimeController(parts), activeSessions: activeSessions, totalSessions: totalSessions, - stats: stats, }, nil } // UpdateSession save timestamp for new sessions and update for existing sessions func (se *SessionEnder) UpdateSession(msg messages.Message) { - se.stats.Collect(msg) var ( sessionID = msg.Meta().SessionID() batchTimestamp = msg.Meta().Batch().Timestamp() diff --git a/backend/pkg/log/queue.go b/backend/pkg/log/queue.go deleted file mode 100644 index e9ac5dc1f..000000000 --- a/backend/pkg/log/queue.go +++ /dev/null @@ -1,79 +0,0 @@ -package log - -import ( - "fmt" - "log" - "time" - - "openreplay/backend/pkg/messages" -) - -type partitionStats struct { - maxts int64 - mints int64 - lastts int64 - lastID uint64 - count int -} - -// Update partition statistic -func (prt *partitionStats) update(m *messages.BatchInfo) { - if prt.maxts < m.Timestamp() { - prt.maxts = m.Timestamp() - } - if prt.mints > m.Timestamp() || prt.mints == 0 { - prt.mints = m.Timestamp() - } - prt.lastts = m.Timestamp() - prt.lastID = m.ID() - prt.count += 1 -} - -type queueStats struct { - prts map[int32]*partitionStats - tick <-chan time.Time -} - -type QueueStats interface { - Collect(msg messages.Message) -} - -func NewQueueStats(sec int) *queueStats { - return &queueStats{ - prts: make(map[int32]*partitionStats), - tick: time.Tick(time.Duration(sec) * time.Second), - } -} - -// Collect writes new data to partition statistic -func (qs *queueStats) Collect(msg messages.Message) { - prti := int32(msg.SessionID() % 16) // TODO use GetKeyPartition from kafka/key.go - prt, ok := qs.prts[prti] - if !ok { - qs.prts[prti] = &partitionStats{} - prt = qs.prts[prti] - } - prt.update(msg.Meta().Batch()) - - select { - case <-qs.tick: - qs.log() - qs.reset() - default: - } -} - -// Print to console collected statistics -func (qs *queueStats) log() { - s := "Queue Statistics: " - for i, p := range qs.prts { - s = fmt.Sprintf("%v | %v:: lastTS %v, lastID %v, count %v, maxTS %v, minTS %v", - s, i, p.lastts, p.lastID, p.count, p.maxts, p.mints) - } - log.Println(s) -} - -// Clear all queue partitions -func (qs *queueStats) reset() { - qs.prts = make(map[int32]*partitionStats) -}