feat(backend/handlers): removed unix timestamp from header builders

This commit is contained in:
Alexander Zavorotynskiy 2022-05-11 16:45:31 +02:00
parent 26e23d594f
commit c77966a789
7 changed files with 39 additions and 87 deletions

View file

@ -7,7 +7,6 @@ import (
"openreplay/backend/internal/datasaver"
"openreplay/backend/internal/handlers"
"openreplay/backend/internal/handlers/custom"
"openreplay/backend/pkg/intervals"
"time"
"os"
@ -33,9 +32,9 @@ func main() {
// Declare message handlers we want to apply for each incoming message
msgHandlers := []handlers.MessageProcessor{
custom.NewMainHandler(),
custom.NewInputEventBuilder(),
custom.NewPageEventBuilder(),
custom.NewMainHandler(), // TODO: separate to several handler
//custom.NewInputEventBuilder(),
//custom.NewPageEventBuilder(),
}
// Create handler's aggregator
@ -54,7 +53,6 @@ func main() {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg)
}
// TODO: can we lose data here because of db error?
return
}
@ -95,7 +93,7 @@ func main() {
consumer := queue.NewMessageConsumer(
cfg.GroupDB,
[]string{
cfg.TopicRawWeb, // TODO: is it necessary or not?
cfg.TopicRawWeb,
cfg.TopicRawIOS,
cfg.TopicTrigger, // to receive SessionEnd events
},
@ -109,7 +107,6 @@ func main() {
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
commitTick := time.Tick(cfg.CommitBatchTimeout)
checkTick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond)
for {
select {
case sig := <-sigchan:
@ -118,12 +115,14 @@ func main() {
os.Exit(0)
case <-commitTick:
pg.CommitBatches()
// TODO: ee commit stats !!!
//if err := commitStats(); err != nil {
// log.Printf("Error on stats commit: %v", err)
//}
// TODO?: separate stats & regular messages
if err := consumer.Commit(); err != nil {
log.Printf("Error on consumer commit: %v", err)
}
case <-checkTick:
// checkTimeout
default:
err := consumer.ConsumeNext()
if err != nil {

View file

@ -78,7 +78,7 @@ func main() {
consumer.Close()
os.Exit(0)
case <-tick:
builderMap.IterateReadyMessages(time.Now().UnixMilli(), func(sessionID uint64, readyMsg messages.Message) {
builderMap.IterateReadyMessages(func(sessionID uint64, readyMsg messages.Message) {
producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(readyMsg))
})
producer.Flush(cfg.ProducerTimeout)

View file

@ -2,7 +2,6 @@ package builder
import (
"openreplay/backend/internal/handlers"
"openreplay/backend/pkg/intervals"
. "openreplay/backend/pkg/messages"
)
@ -26,6 +25,15 @@ func (b *builder) iterateReadyMessage(iter func(msg Message)) {
b.readyMsgs = nil
}
func (b *builder) checkSessionEnd(message Message) {
if _, isEnd := message.(*IOSSessionEnd); isEnd {
b.ended = true
}
if _, isEnd := message.(*SessionEnd); isEnd {
b.ended = true
}
}
func (b *builder) handleMessage(message Message, messageID uint64) {
timestamp := GetTimestamp(message)
if b.timestamp < timestamp {
@ -36,42 +44,10 @@ func (b *builder) handleMessage(message Message, messageID uint64) {
return
}
if _, isEnd := message.(*IOSSessionEnd); isEnd {
b.ended = true
}
if _, isEnd := message.(*SessionEnd); isEnd {
b.ended = true
}
b.checkSessionEnd(message)
for _, p := range b.processors {
/* If nil is not returned explicitely by Handle, but as the typed nil
("var i *IssueEvent; return i;")
The `rm != nil` will be true.
TODO: enforce nil to be nil(?) or add `isNil() bool` to the Message types
because this part is expected to be etendable by user with custom messageProcessor's.
Use of reflrction will be probably bad on millions of messages?
*/
if rm := p.Handle(message, messageID, b.timestamp); rm != nil {
b.readyMsgs = append(b.readyMsgs, rm)
}
}
}
func (b *builder) checkTimeouts(ts int64) bool {
if b.timestamp == 0 {
return false // SessionStart happened only
}
lastTsGap := ts - int64(b.timestamp)
// Maybe listen for `trigger` and react on SessionEnd instead (less reliable)
if lastTsGap > intervals.EVENTS_SESSION_END_TIMEOUT {
for _, p := range b.processors {
// TODO: same as above
if rm := p.Build(); rm != nil {
b.readyMsgs = append(b.readyMsgs, rm)
}
}
return true
}
return false
}

View file

@ -31,13 +31,19 @@ func (m *builderMap) HandleMessage(sessionID uint64, msg Message, messageID uint
b.handleMessage(msg, messageID)
}
func (m *builderMap) IterateReadyMessages(operatingTs int64, iter func(sessionID uint64, msg Message)) {
func (m *builderMap) IterateReadyMessages(iter func(sessionID uint64, msg Message)) {
for sessionID, b := range m.sessions {
sessionEnded := b.checkTimeouts(operatingTs)
if b.ended {
for _, p := range b.processors {
if rm := p.Build(); rm != nil {
b.readyMsgs = append(b.readyMsgs, rm)
}
}
}
b.iterateReadyMessage(func(msg Message) {
iter(sessionID, msg)
})
if sessionEnded {
if b.ended {
delete(m.sessions, sessionID)
}
}

View file

@ -125,9 +125,6 @@ func (b *builder) Handle(message Message, messageID uint64, timestamp uint64) Me
b.buildInputEvent()
b.ieBuilder.ClearLabels()
b.peBuilder.HandleSetPageLocation(msg, messageID, b.timestamp)
// TODO: what to do with this code?
//b.miFinder.HandleSetPageLocation(msg)
//b.ciFinder.HandleSetPageLocation(msg)
}
case *PageLoadTiming:
if rm := b.peBuilder.HandlePageLoadTiming(msg); rm != nil {
@ -137,17 +134,6 @@ func (b *builder) Handle(message Message, messageID uint64, timestamp uint64) Me
if rm := b.peBuilder.HandlePageRenderTiming(msg); rm != nil {
b.appendReadyMessage(rm)
}
case *PerformanceTrack:
// TODO: what to do with this code?
//if rm := b.ptaBuilder.HandlePerformanceTrack(msg, b.timestamp); rm != nil {
// b.appendReadyMessage(rm)
//}
//if rm := b.ciFinder.HandlePerformanceTrack(msg, messageID, b.timestamp); rm != nil {
// b.appendReadyMessage(rm)
//}
//if rm := b.miFinder.HandlePerformanceTrack(msg, messageID, b.timestamp); rm != nil {
// b.appendReadyMessage(rm)
//}
case *SetInputTarget:
if rm := b.ieBuilder.HandleSetInputTarget(msg); rm != nil {
b.appendReadyMessage(rm)
@ -158,10 +144,6 @@ func (b *builder) Handle(message Message, messageID uint64, timestamp uint64) Me
}
case *MouseClick:
b.buildInputEvent()
// TODO: what to do with this code?
//if rm := b.crDetector.HandleMouseClick(msg, messageID, b.timestamp); rm != nil {
// b.appendReadyMessage(rm)
//}
if msg.Label != "" {
b.appendReadyMessage(&ClickEvent{
MessageID: messageID,
@ -256,20 +238,7 @@ func (b *builder) Handle(message Message, messageID uint64, timestamp uint64) Me
Timestamp: b.timestamp,
Type: msg.Type,
})
// TODO: what to do with this code?
//case *CreateElementNode, *CreateTextNode:
// b.ddDetector.HandleNodeCreation()
//case *RemoveNode:
// b.ddDetector.HandleNodeRemoval(b.timestamp)
//case *CreateDocument:
// if rm := b.ddDetector.Build(); rm != nil {
// b.appendReadyMessage(rm)
// }
}
// TODO: what to do with this code?
//if rm := b.dcDetector.HandleMessage(message, messageID, b.timestamp); rm != nil {
// b.appendReadyMessage(rm)
//}
return nil
}

View file

@ -3,7 +3,6 @@ package ios
import (
"openreplay/backend/internal/handlers"
. "openreplay/backend/pkg/messages"
"time"
)
/*
@ -22,9 +21,11 @@ type AppNotResponding struct {
lastLabel string
lastHeartbeatTimestamp uint64
lastHeartbeatIndex uint64
lastTimestamp uint64
}
func (h *AppNotResponding) Handle(message Message, messageID uint64, timestamp uint64) Message {
h.lastTimestamp = timestamp
var event Message = nil
switch m := message.(type) {
case *IOSClickEvent:
@ -48,7 +49,7 @@ func (h *AppNotResponding) Handle(message Message, messageID uint64, timestamp u
}
func (h *AppNotResponding) Build() Message {
return h.build(uint64(time.Now().Unix()))
return h.build(h.lastTimestamp)
}
func (h *AppNotResponding) build(timestamp uint64) Message {

View file

@ -3,7 +3,6 @@ package ios
import (
"openreplay/backend/internal/handlers"
. "openreplay/backend/pkg/messages"
"time"
)
/*
@ -29,14 +28,16 @@ func (va *valueAggregator) aggregate() uint64 {
type PerformanceAggregator struct {
handlers.ReadyMessageStore
pa *IOSPerformanceAggregated
fps valueAggregator
cpu valueAggregator
memory valueAggregator
battery valueAggregator
pa *IOSPerformanceAggregated
fps valueAggregator
cpu valueAggregator
memory valueAggregator
battery valueAggregator
lastTimestamp uint64
}
func (h *PerformanceAggregator) Handle(message Message, messageID uint64, timestamp uint64) Message {
h.lastTimestamp = timestamp
if h.pa == nil {
h.pa = &IOSPerformanceAggregated{} // TODO: struct type in messages
}
@ -94,7 +95,7 @@ func (h *PerformanceAggregator) Handle(message Message, messageID uint64, timest
}
func (h *PerformanceAggregator) Build() Message {
return h.build(uint64(time.Now().Unix()))
return h.build(h.lastTimestamp)
}
func (h *PerformanceAggregator) build(timestamp uint64) Message {