feat(backend): incoming queue stats logging for ender & db
This commit is contained in:
parent
e8e87c5ff5
commit
95c5190b07
4 changed files with 91 additions and 9 deletions
|
|
@ -43,7 +43,8 @@ ENV TZ=UTC \
|
|||
AWS_REGION_ASSETS=eu-central-1 \
|
||||
CACHE_ASSETS=true \
|
||||
ASSETS_SIZE_LIMIT=6291456 \
|
||||
FS_CLEAN_HRS=72
|
||||
FS_CLEAN_HRS=72 \
|
||||
LOG_QUEUE_STATS_INTERVAL_SEC=60
|
||||
|
||||
|
||||
ARG SERVICE_NAME
|
||||
|
|
|
|||
77
backend/pkg/log/queue.go
Normal file
77
backend/pkg/log/queue.go
Normal file
|
|
@ -0,0 +1,77 @@
|
|||
package log
|
||||
|
||||
import (
|
||||
"time"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"openreplay/backend/pkg/messages"
|
||||
"openreplay/backend/pkg/queue/types"
|
||||
//"openreplay/backend/pkg/env"
|
||||
)
|
||||
|
||||
|
||||
type partitionStats struct {
|
||||
maxts int64
|
||||
mints int64
|
||||
lastts int64
|
||||
lastID uint64
|
||||
count int
|
||||
}
|
||||
|
||||
type queueStats struct {
|
||||
prts map[int32]*partitionStats
|
||||
tick <-chan time.Time
|
||||
}
|
||||
|
||||
func NewQueueStats(sec int)*queueStats {
|
||||
return &queueStats{
|
||||
prts: make(map[int32]*partitionStats),
|
||||
tick: time.Tick(time.Duration(sec) * time.Second),
|
||||
}
|
||||
}
|
||||
|
||||
func (qs *queueStats) HandleAndLog(sessionID uint64, m *types.Meta) {
|
||||
prti := int32(sessionID % 16) // TODO use GetKeyPartition from kafka/key.go
|
||||
prt, ok := qs.prts[prti]
|
||||
if !ok {
|
||||
qs.prts[prti] = &partitionStats{}
|
||||
prt = qs.prts[prti]
|
||||
}
|
||||
|
||||
if prt.maxts < m.Timestamp {
|
||||
prt.maxts = m.Timestamp
|
||||
}
|
||||
if prt.mints > m.Timestamp || prt.mints == 0 {
|
||||
prt.mints = m.Timestamp
|
||||
}
|
||||
prt.lastts = m.Timestamp
|
||||
prt.lastID = m.ID
|
||||
prt.count += 1
|
||||
|
||||
|
||||
select {
|
||||
case <-qs.tick:
|
||||
qs.LogThenReset()
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
func (qs *queueStats) LogThenReset() {
|
||||
s := "Queue Statistics: "
|
||||
for i, p := range qs.prts {
|
||||
s = fmt.Sprintf("%v | %v:: lastTS %v, lastID %v, count %v, maxTS %v, minTS %v",
|
||||
s, i, p.lastts, p.lastID, p.count, p.maxts, p.mints)
|
||||
}
|
||||
log.Println(s)
|
||||
// reset
|
||||
qs.prts = make(map[int32]*partitionStats)
|
||||
}
|
||||
|
||||
|
||||
// TODO: list of message id to log (mb filter function with callback in messages/utils.go or something)
|
||||
func LogMessage(s string, sessionID uint64, msg messages.Message, m *types.Meta) {
|
||||
log.Printf("%v | SessionID: %v, Queue info: %v, Message: %v", s, sessionID, m, msg)
|
||||
}
|
||||
|
||||
|
|
@ -15,6 +15,7 @@ import (
|
|||
"openreplay/backend/pkg/queue"
|
||||
"openreplay/backend/pkg/queue/types"
|
||||
"openreplay/backend/services/db/heuristics"
|
||||
logger "openreplay/backend/pkg/log"
|
||||
)
|
||||
|
||||
var pg *cache.PGCache
|
||||
|
|
@ -28,13 +29,18 @@ func main() {
|
|||
|
||||
heurFinder := heuristics.NewHandler()
|
||||
|
||||
|
||||
statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"))
|
||||
|
||||
consumer := queue.NewMessageConsumer(
|
||||
env.String("GROUP_DB"),
|
||||
[]string{
|
||||
env.String("TOPIC_RAW_IOS"),
|
||||
env.String("TOPIC_TRIGGER"),
|
||||
},
|
||||
func(sessionID uint64, msg messages.Message, _ *types.Meta) {
|
||||
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
|
||||
statsLogger.HandleAndLog(sessionID, meta)
|
||||
|
||||
if err := insertMessage(sessionID, msg); err != nil {
|
||||
if !postgres.IsPkeyViolation(err) {
|
||||
log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg)
|
||||
|
|
@ -64,8 +70,7 @@ func main() {
|
|||
return
|
||||
}
|
||||
|
||||
err = insertStats(session, msg)
|
||||
if err != nil {
|
||||
if err := insertStats(session, msg); err != nil {
|
||||
log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg)
|
||||
}
|
||||
})
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import (
|
|||
"openreplay/backend/pkg/messages"
|
||||
"openreplay/backend/pkg/queue"
|
||||
"openreplay/backend/pkg/queue/types"
|
||||
logger "openreplay/backend/pkg/log"
|
||||
"openreplay/backend/services/ender/builder"
|
||||
)
|
||||
|
||||
|
|
@ -23,7 +24,8 @@ func main() {
|
|||
TOPIC_TRIGGER := env.String("TOPIC_TRIGGER")
|
||||
|
||||
builderMap := builder.NewBuilderMap()
|
||||
var lastTs int64 = 0
|
||||
|
||||
statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"))
|
||||
|
||||
producer := queue.NewProducer()
|
||||
consumer := queue.NewMessageConsumer(
|
||||
|
|
@ -33,11 +35,8 @@ func main() {
|
|||
env.String("TOPIC_RAW_IOS"),
|
||||
},
|
||||
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
|
||||
lastTs = meta.Timestamp
|
||||
statsLogger.HandleAndLog(sessionID, meta)
|
||||
builderMap.HandleMessage(sessionID, msg, msg.Meta().Index)
|
||||
// builderMap.IterateSessionReadyMessages(sessionID, lastTs, func(readyMsg messages.Message) {
|
||||
// producer.Produce(TOPIC_TRIGGER, sessionID, messages.Encode(readyMsg))
|
||||
// })
|
||||
},
|
||||
)
|
||||
consumer.DisableAutoCommit()
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue