Refactoring in stats logger

This commit is contained in:
Alexander Zavorotynskiy 2022-05-04 16:17:57 +02:00
parent 74672d4321
commit 5b7c479f4d
3 changed files with 68 additions and 58 deletions

View file

@ -5,7 +5,6 @@ import (
"log"
"time"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue/types"
//"openreplay/backend/pkg/env"
)
@ -18,6 +17,19 @@ type partitionStats struct {
count int
}
// Update partition statistic
func (prt *partitionStats) update(m *types.Meta) {
if prt.maxts < m.Timestamp {
prt.maxts = m.Timestamp
}
if prt.mints > m.Timestamp || prt.mints == 0 {
prt.mints = m.Timestamp
}
prt.lastts = m.Timestamp
prt.lastID = m.ID
prt.count += 1
}
type queueStats struct {
prts map[int32]*partitionStats
tick <-chan time.Time
@ -30,43 +42,35 @@ func NewQueueStats(sec int) *queueStats {
}
}
func (qs *queueStats) HandleAndLog(sessionID uint64, m *types.Meta) {
// Collect writes new data to partition statistic
func (qs *queueStats) Collect(sessionID uint64, m *types.Meta) {
prti := int32(sessionID % 16) // TODO use GetKeyPartition from kafka/key.go
prt, ok := qs.prts[prti]
if !ok {
qs.prts[prti] = &partitionStats{}
prt = qs.prts[prti]
}
if prt.maxts < m.Timestamp {
prt.maxts = m.Timestamp
}
if prt.mints > m.Timestamp || prt.mints == 0 {
prt.mints = m.Timestamp
}
prt.lastts = m.Timestamp
prt.lastID = m.ID
prt.count += 1
prt.update(m)
select {
case <-qs.tick:
qs.LogThenReset()
qs.log()
qs.reset()
default:
}
}
func (qs *queueStats) LogThenReset() {
// Print to console collected statistics
func (qs *queueStats) log() {
s := "Queue Statistics: "
for i, p := range qs.prts {
s = fmt.Sprintf("%v | %v:: lastTS %v, lastID %v, count %v, maxTS %v, minTS %v",
s, i, p.lastts, p.lastID, p.count, p.maxts, p.mints)
}
log.Println(s)
// reset
qs.prts = make(map[int32]*partitionStats)
}
// TODO: list of message id to log (mb filter function with callback in messages/utils.go or something)
func LogMessage(s string, sessionID uint64, msg messages.Message, m *types.Meta) {
log.Printf("%v | SessionID: %v, Queue info: %v, Message: %v", s, sessionID, m, msg)
// Clear all queue partitions
func (qs *queueStats) reset() {
qs.prts = make(map[int32]*partitionStats)
}

View file

@ -29,52 +29,58 @@ func main() {
heurFinder := heuristics.NewHandler()
mi := NewMessageInserter(pg)
si := NewStatsInserter(pg)
statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"))
// Handler logic
handler := func(sessionID uint64, msg messages.Message, meta *types.Meta) {
statsLogger.Collect(sessionID, meta)
// Just insert message into db without additional checks
if err := mi.insertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg)
}
return
}
// Try to get session from db
session, err := pg.GetSession(sessionID)
if err != nil {
// Might happen due to the assets-related message TODO: log only if session is necessary for this kind of message
log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg)
return
}
// Insert statistics
err = si.insertStats(session, msg)
if err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg)
}
heurFinder.HandleMessage(session, msg)
heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) {
// TODO: DRY code (carefully with the return statement logic)
if err := mi.insertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
return
}
if err := si.insertStats(session, msg); err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
})
}
// Init consumer
consumer := queue.NewMessageConsumer(
env.String("GROUP_DB"),
[]string{
env.String("TOPIC_RAW_IOS"),
env.String("TOPIC_TRIGGER"),
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
statsLogger.HandleAndLog(sessionID, meta)
if err := mi.insertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg)
}
return
}
session, err := pg.GetSession(sessionID)
if err != nil {
// Might happen due to the assets-related message TODO: log only if session is necessary for this kind of message
log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg)
return
}
err = si.insertStats(session, msg)
if err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg)
}
heurFinder.HandleMessage(session, msg)
heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) {
// TODO: DRY code (carefully with the return statement logic)
if err := mi.insertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
return
}
if err := si.insertStats(session, msg); err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
})
},
handler,
false,
)

View file

@ -35,7 +35,7 @@ func main() {
env.String("TOPIC_RAW_IOS"),
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
statsLogger.HandleAndLog(sessionID, meta)
statsLogger.Collect(sessionID, meta)
builderMap.HandleMessage(sessionID, msg, msg.Meta().Index)
},
false,