diff --git a/backend/Dockerfile b/backend/Dockerfile index 6ca305ca1..5cefd4cb4 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -43,7 +43,8 @@ ENV TZ=UTC \ AWS_REGION_ASSETS=eu-central-1 \ CACHE_ASSETS=true \ ASSETS_SIZE_LIMIT=6291456 \ - FS_CLEAN_HRS=72 + FS_CLEAN_HRS=72 \ + LOG_QUEUE_STATS_INTERVAL_SEC=60 ARG SERVICE_NAME diff --git a/backend/pkg/log/queue.go b/backend/pkg/log/queue.go new file mode 100644 index 000000000..a49f38c0d --- /dev/null +++ b/backend/pkg/log/queue.go @@ -0,0 +1,77 @@ +package log + +import ( + "time" + "fmt" + "log" + + "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/queue/types" + //"openreplay/backend/pkg/env" +) + + +type partitionStats struct { + maxts int64 + mints int64 + lastts int64 + lastID uint64 + count int +} + +type queueStats struct { + prts map[int32]*partitionStats + tick <-chan time.Time +} + +func NewQueueStats(sec int)*queueStats { + return &queueStats{ + prts: make(map[int32]*partitionStats), + tick: time.Tick(time.Duration(sec) * time.Second), + } +} + +func (qs *queueStats) HandleAndLog(sessionID uint64, m *types.Meta) { + prti := int32(sessionID % 16) // TODO use GetKeyPartition from kafka/key.go + prt, ok := qs.prts[prti] + if !ok { + qs.prts[prti] = &partitionStats{} + prt = qs.prts[prti] + } + + if prt.maxts < m.Timestamp { + prt.maxts = m.Timestamp + } + if prt.mints > m.Timestamp || prt.mints == 0 { + prt.mints = m.Timestamp + } + prt.lastts = m.Timestamp + prt.lastID = m.ID + prt.count += 1 + + + select { + case <-qs.tick: + qs.LogThenReset() + default: + } +} + + +func (qs *queueStats) LogThenReset() { + s := "Queue Statistics: " + for i, p := range qs.prts { + s = fmt.Sprintf("%v | %v:: lastTS %v, lastID %v, count %v, maxTS %v, minTS %v", + s, i, p.lastts, p.lastID, p.count, p.maxts, p.mints) + } + log.Println(s) + // reset + qs.prts = make(map[int32]*partitionStats) +} + + +// TODO: list of message id to log (mb filter function with callback in messages/utils.go or something) +func LogMessage(s string, sessionID uint64, msg messages.Message, m *types.Meta) { + log.Printf("%v | SessionID: %v, Queue info: %v, Message: %v", s, sessionID, m, msg) +} + diff --git a/backend/services/db/main.go b/backend/services/db/main.go index 5d2759c90..b60f7e7db 100644 --- a/backend/services/db/main.go +++ b/backend/services/db/main.go @@ -15,6 +15,7 @@ import ( "openreplay/backend/pkg/queue" "openreplay/backend/pkg/queue/types" "openreplay/backend/services/db/heuristics" + logger "openreplay/backend/pkg/log" ) var pg *cache.PGCache @@ -28,13 +29,18 @@ func main() { heurFinder := heuristics.NewHandler() + + statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC")) + consumer := queue.NewMessageConsumer( env.String("GROUP_DB"), []string{ env.String("TOPIC_RAW_IOS"), env.String("TOPIC_TRIGGER"), }, - func(sessionID uint64, msg messages.Message, _ *types.Meta) { + func(sessionID uint64, msg messages.Message, meta *types.Meta) { + statsLogger.HandleAndLog(sessionID, meta) + if err := insertMessage(sessionID, msg); err != nil { if !postgres.IsPkeyViolation(err) { log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg) @@ -64,8 +70,7 @@ func main() { return } - err = insertStats(session, msg) - if err != nil { + if err := insertStats(session, msg); err != nil { log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg) } }) diff --git a/backend/services/ender/main.go b/backend/services/ender/main.go index 9c62d14b0..e8d739f0e 100644 --- a/backend/services/ender/main.go +++ b/backend/services/ender/main.go @@ -13,6 +13,7 @@ import ( "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/queue/types" + logger "openreplay/backend/pkg/log" "openreplay/backend/services/ender/builder" ) @@ -23,7 +24,8 @@ func main() { TOPIC_TRIGGER := env.String("TOPIC_TRIGGER") builderMap := builder.NewBuilderMap() - var lastTs int64 = 0 + + statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC")) producer := queue.NewProducer() consumer := queue.NewMessageConsumer( @@ -33,11 +35,8 @@ func main() { env.String("TOPIC_RAW_IOS"), }, func(sessionID uint64, msg messages.Message, meta *types.Meta) { - lastTs = meta.Timestamp + statsLogger.HandleAndLog(sessionID, meta) builderMap.HandleMessage(sessionID, msg, msg.Meta().Index) - // builderMap.IterateSessionReadyMessages(sessionID, lastTs, func(readyMsg messages.Message) { - // producer.Produce(TOPIC_TRIGGER, sessionID, messages.Encode(readyMsg)) - // }) }, ) consumer.DisableAutoCommit()