diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index 564fcbab5..8236586c2 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -7,7 +7,6 @@ import ( "openreplay/backend/internal/datasaver" "openreplay/backend/internal/handlers" "openreplay/backend/internal/handlers/custom" - "openreplay/backend/pkg/intervals" "time" "os" @@ -33,9 +32,9 @@ func main() { // Declare message handlers we want to apply for each incoming message msgHandlers := []handlers.MessageProcessor{ - custom.NewMainHandler(), - custom.NewInputEventBuilder(), - custom.NewPageEventBuilder(), + custom.NewMainHandler(), // TODO: separate to several handler + //custom.NewInputEventBuilder(), + //custom.NewPageEventBuilder(), } // Create handler's aggregator @@ -54,7 +53,6 @@ func main() { if !postgres.IsPkeyViolation(err) { log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg) } - // TODO: can we lose data here because of db error? return } @@ -95,7 +93,7 @@ func main() { consumer := queue.NewMessageConsumer( cfg.GroupDB, []string{ - cfg.TopicRawWeb, // TODO: is it necessary or not? + cfg.TopicRawWeb, cfg.TopicRawIOS, cfg.TopicTrigger, // to receive SessionEnd events }, @@ -109,7 +107,6 @@ func main() { signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) commitTick := time.Tick(cfg.CommitBatchTimeout) - checkTick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond) for { select { case sig := <-sigchan: @@ -118,12 +115,14 @@ func main() { os.Exit(0) case <-commitTick: pg.CommitBatches() + // TODO: ee commit stats !!! + //if err := commitStats(); err != nil { + // log.Printf("Error on stats commit: %v", err) + //} // TODO?: separate stats & regular messages if err := consumer.Commit(); err != nil { log.Printf("Error on consumer commit: %v", err) } - case <-checkTick: - // checkTimeout default: err := consumer.ConsumeNext() if err != nil { diff --git a/backend/cmd/heuristics/main.go b/backend/cmd/heuristics/main.go index ddfd29095..9c77cb4ba 100644 --- a/backend/cmd/heuristics/main.go +++ b/backend/cmd/heuristics/main.go @@ -78,7 +78,7 @@ func main() { consumer.Close() os.Exit(0) case <-tick: - builderMap.IterateReadyMessages(time.Now().UnixMilli(), func(sessionID uint64, readyMsg messages.Message) { + builderMap.IterateReadyMessages(func(sessionID uint64, readyMsg messages.Message) { producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(readyMsg)) }) producer.Flush(cfg.ProducerTimeout) diff --git a/backend/internal/builder/builder.go b/backend/internal/builder/builder.go index a00ad194a..4916764cd 100644 --- a/backend/internal/builder/builder.go +++ b/backend/internal/builder/builder.go @@ -2,7 +2,6 @@ package builder import ( "openreplay/backend/internal/handlers" - "openreplay/backend/pkg/intervals" . "openreplay/backend/pkg/messages" ) @@ -26,6 +25,15 @@ func (b *builder) iterateReadyMessage(iter func(msg Message)) { b.readyMsgs = nil } +func (b *builder) checkSessionEnd(message Message) { + if _, isEnd := message.(*IOSSessionEnd); isEnd { + b.ended = true + } + if _, isEnd := message.(*SessionEnd); isEnd { + b.ended = true + } +} + func (b *builder) handleMessage(message Message, messageID uint64) { timestamp := GetTimestamp(message) if b.timestamp < timestamp { @@ -36,42 +44,10 @@ func (b *builder) handleMessage(message Message, messageID uint64) { return } - if _, isEnd := message.(*IOSSessionEnd); isEnd { - b.ended = true - } - if _, isEnd := message.(*SessionEnd); isEnd { - b.ended = true - } - + b.checkSessionEnd(message) for _, p := range b.processors { - /* If nil is not returned explicitely by Handle, but as the typed nil - ("var i *IssueEvent; return i;") - The `rm != nil` will be true. - TODO: enforce nil to be nil(?) or add `isNil() bool` to the Message types - because this part is expected to be etendable by user with custom messageProcessor's. - Use of reflrction will be probably bad on millions of messages? - */ if rm := p.Handle(message, messageID, b.timestamp); rm != nil { b.readyMsgs = append(b.readyMsgs, rm) } } } - -func (b *builder) checkTimeouts(ts int64) bool { - if b.timestamp == 0 { - return false // SessionStart happened only - } - - lastTsGap := ts - int64(b.timestamp) - // Maybe listen for `trigger` and react on SessionEnd instead (less reliable) - if lastTsGap > intervals.EVENTS_SESSION_END_TIMEOUT { - for _, p := range b.processors { - // TODO: same as above - if rm := p.Build(); rm != nil { - b.readyMsgs = append(b.readyMsgs, rm) - } - } - return true - } - return false -} diff --git a/backend/internal/builder/builderMap.go b/backend/internal/builder/builderMap.go index 6b2c22bec..b393bcd28 100644 --- a/backend/internal/builder/builderMap.go +++ b/backend/internal/builder/builderMap.go @@ -31,13 +31,19 @@ func (m *builderMap) HandleMessage(sessionID uint64, msg Message, messageID uint b.handleMessage(msg, messageID) } -func (m *builderMap) IterateReadyMessages(operatingTs int64, iter func(sessionID uint64, msg Message)) { +func (m *builderMap) IterateReadyMessages(iter func(sessionID uint64, msg Message)) { for sessionID, b := range m.sessions { - sessionEnded := b.checkTimeouts(operatingTs) + if b.ended { + for _, p := range b.processors { + if rm := p.Build(); rm != nil { + b.readyMsgs = append(b.readyMsgs, rm) + } + } + } b.iterateReadyMessage(func(msg Message) { iter(sessionID, msg) }) - if sessionEnded { + if b.ended { delete(m.sessions, sessionID) } } diff --git a/backend/internal/handlers/custom/mainHandler.go b/backend/internal/handlers/custom/mainHandler.go index 7e653c250..52a6278c0 100644 --- a/backend/internal/handlers/custom/mainHandler.go +++ b/backend/internal/handlers/custom/mainHandler.go @@ -125,9 +125,6 @@ func (b *builder) Handle(message Message, messageID uint64, timestamp uint64) Me b.buildInputEvent() b.ieBuilder.ClearLabels() b.peBuilder.HandleSetPageLocation(msg, messageID, b.timestamp) - // TODO: what to do with this code? - //b.miFinder.HandleSetPageLocation(msg) - //b.ciFinder.HandleSetPageLocation(msg) } case *PageLoadTiming: if rm := b.peBuilder.HandlePageLoadTiming(msg); rm != nil { @@ -137,17 +134,6 @@ func (b *builder) Handle(message Message, messageID uint64, timestamp uint64) Me if rm := b.peBuilder.HandlePageRenderTiming(msg); rm != nil { b.appendReadyMessage(rm) } - case *PerformanceTrack: - // TODO: what to do with this code? - //if rm := b.ptaBuilder.HandlePerformanceTrack(msg, b.timestamp); rm != nil { - // b.appendReadyMessage(rm) - //} - //if rm := b.ciFinder.HandlePerformanceTrack(msg, messageID, b.timestamp); rm != nil { - // b.appendReadyMessage(rm) - //} - //if rm := b.miFinder.HandlePerformanceTrack(msg, messageID, b.timestamp); rm != nil { - // b.appendReadyMessage(rm) - //} case *SetInputTarget: if rm := b.ieBuilder.HandleSetInputTarget(msg); rm != nil { b.appendReadyMessage(rm) @@ -158,10 +144,6 @@ func (b *builder) Handle(message Message, messageID uint64, timestamp uint64) Me } case *MouseClick: b.buildInputEvent() - // TODO: what to do with this code? - //if rm := b.crDetector.HandleMouseClick(msg, messageID, b.timestamp); rm != nil { - // b.appendReadyMessage(rm) - //} if msg.Label != "" { b.appendReadyMessage(&ClickEvent{ MessageID: messageID, @@ -256,20 +238,7 @@ func (b *builder) Handle(message Message, messageID uint64, timestamp uint64) Me Timestamp: b.timestamp, Type: msg.Type, }) - // TODO: what to do with this code? - //case *CreateElementNode, *CreateTextNode: - // b.ddDetector.HandleNodeCreation() - //case *RemoveNode: - // b.ddDetector.HandleNodeRemoval(b.timestamp) - //case *CreateDocument: - // if rm := b.ddDetector.Build(); rm != nil { - // b.appendReadyMessage(rm) - // } } - // TODO: what to do with this code? - //if rm := b.dcDetector.HandleMessage(message, messageID, b.timestamp); rm != nil { - // b.appendReadyMessage(rm) - //} return nil } diff --git a/backend/internal/handlers/ios/appNotResponding.go b/backend/internal/handlers/ios/appNotResponding.go index 097361c00..b5f6cd2f0 100644 --- a/backend/internal/handlers/ios/appNotResponding.go +++ b/backend/internal/handlers/ios/appNotResponding.go @@ -3,7 +3,6 @@ package ios import ( "openreplay/backend/internal/handlers" . "openreplay/backend/pkg/messages" - "time" ) /* @@ -22,9 +21,11 @@ type AppNotResponding struct { lastLabel string lastHeartbeatTimestamp uint64 lastHeartbeatIndex uint64 + lastTimestamp uint64 } func (h *AppNotResponding) Handle(message Message, messageID uint64, timestamp uint64) Message { + h.lastTimestamp = timestamp var event Message = nil switch m := message.(type) { case *IOSClickEvent: @@ -48,7 +49,7 @@ func (h *AppNotResponding) Handle(message Message, messageID uint64, timestamp u } func (h *AppNotResponding) Build() Message { - return h.build(uint64(time.Now().Unix())) + return h.build(h.lastTimestamp) } func (h *AppNotResponding) build(timestamp uint64) Message { diff --git a/backend/internal/handlers/ios/performanceAggregator.go b/backend/internal/handlers/ios/performanceAggregator.go index b4bc812c7..2a9401748 100644 --- a/backend/internal/handlers/ios/performanceAggregator.go +++ b/backend/internal/handlers/ios/performanceAggregator.go @@ -3,7 +3,6 @@ package ios import ( "openreplay/backend/internal/handlers" . "openreplay/backend/pkg/messages" - "time" ) /* @@ -29,14 +28,16 @@ func (va *valueAggregator) aggregate() uint64 { type PerformanceAggregator struct { handlers.ReadyMessageStore - pa *IOSPerformanceAggregated - fps valueAggregator - cpu valueAggregator - memory valueAggregator - battery valueAggregator + pa *IOSPerformanceAggregated + fps valueAggregator + cpu valueAggregator + memory valueAggregator + battery valueAggregator + lastTimestamp uint64 } func (h *PerformanceAggregator) Handle(message Message, messageID uint64, timestamp uint64) Message { + h.lastTimestamp = timestamp if h.pa == nil { h.pa = &IOSPerformanceAggregated{} // TODO: struct type in messages } @@ -94,7 +95,7 @@ func (h *PerformanceAggregator) Handle(message Message, messageID uint64, timest } func (h *PerformanceAggregator) Build() Message { - return h.build(uint64(time.Now().Unix())) + return h.build(h.lastTimestamp) } func (h *PerformanceAggregator) build(timestamp uint64) Message {