diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index b54a8dc15..dbe2fa212 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -2,8 +2,8 @@ package main import ( "log" - "openreplay/backend/internal/builder" "openreplay/backend/internal/config/ender" + builder "openreplay/backend/internal/ender" "time" "os" @@ -49,16 +49,15 @@ func main() { case sig := <-sigchan: log.Printf("Caught signal %v: terminating\n", sig) producer.Close(cfg.ProducerTimeout) - consumer.Commit() + consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP) consumer.Close() os.Exit(0) case <-tick: builderMap.IterateReadyMessages(time.Now().UnixMilli(), func(sessionID uint64, readyMsg messages.Message) { producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(readyMsg)) }) - // TODO: why exactly do we need Flush here and not in any other place? producer.Flush(cfg.ProducerTimeout) - consumer.Commit() + consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP) default: if err := consumer.ConsumeNext(); err != nil { log.Fatalf("Error on consuming: %v", err) diff --git a/backend/cmd/heuristics/main.go b/backend/cmd/heuristics/main.go new file mode 100644 index 000000000..b33511bd1 --- /dev/null +++ b/backend/cmd/heuristics/main.go @@ -0,0 +1,68 @@ +package main + +import ( + "log" + "openreplay/backend/internal/builder" + "openreplay/backend/internal/config/ender" + "openreplay/backend/pkg/intervals" + logger "openreplay/backend/pkg/log" + "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/queue" + "openreplay/backend/pkg/queue/types" + "os" + "os/signal" + "syscall" + "time" +) + +func main() { + log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) + + cfg := ender.New() + + builderMap := builder.NewBuilderMap() + statsLogger := logger.NewQueueStats(cfg.LoggerTimeout) + producer := queue.NewProducer() + consumer := queue.NewMessageConsumer( + cfg.GroupEvents, + []string{ + cfg.TopicRawWeb, + cfg.TopicRawIOS, + }, + func(sessionID uint64, msg messages.Message, meta *types.Meta) { + statsLogger.Collect(sessionID, meta) + builderMap.HandleMessage(sessionID, msg, msg.Meta().Index) + }, + false, + ) + + tick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond) + + sigchan := make(chan os.Signal, 1) + signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) + + log.Printf("Ender service started\n") + for { + select { + case sig := <-sigchan: + log.Printf("Caught signal %v: terminating\n", sig) + producer.Close(cfg.ProducerTimeout) + consumer.Commit() + consumer.Close() + os.Exit(0) + case <-tick: + builderMap.IterateReadyMessages(time.Now().UnixMilli(), func(sessionID uint64, readyMsg messages.Message) { + producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(readyMsg)) + }) + producer.Flush(cfg.ProducerTimeout) + consumer.Commit() + default: + if err := consumer.ConsumeNext(); err != nil { + log.Fatalf("Error on consuming: %v", err) + } + } + } + + // Config + +} diff --git a/backend/internal/builder/builder.go b/backend/internal/builder/builder.go index 1a89f67b6..bd9f26b19 100644 --- a/backend/internal/builder/builder.go +++ b/backend/internal/builder/builder.go @@ -44,9 +44,9 @@ type builder struct { readyMsgs []Message timestamp uint64 lastProcessedTimestamp int64 - peBuilder *pageEventBuilder + peBuilder *pageEventBuilder // TODO: DB ptaBuilder *performanceTrackAggrBuilder - ieBuilder *inputEventBuilder + ieBuilder *inputEventBuilder // TODO: DB ciFinder *cpuIssueFinder miFinder *memoryIssueFinder ddDetector *domDropDetector @@ -117,6 +117,7 @@ func (b *builder) handleMessage(message Message, messageID uint64) { b.lastProcessedTimestamp = time.Now().UnixMilli() // Might happen before the first timestamp. + // TODO: to DB switch msg := message.(type) { case *SessionStart, *Metadata, @@ -137,7 +138,7 @@ func (b *builder) handleMessage(message Message, messageID uint64) { return } switch msg := message.(type) { - case *SetPageLocation: + case *SetPageLocation: // TODO: DB if msg.NavigationStart == 0 { b.appendReadyMessage(&PageEvent{ URL: msg.URL, @@ -154,11 +155,11 @@ func (b *builder) handleMessage(message Message, messageID uint64) { b.miFinder.HandleSetPageLocation(msg) b.ciFinder.HandleSetPageLocation(msg) } - case *PageLoadTiming: + case *PageLoadTiming: // TODO: DB if rm := b.peBuilder.HandlePageLoadTiming(msg); rm != nil { b.appendReadyMessage(rm) } - case *PageRenderTiming: + case *PageRenderTiming: // TODO: DB if rm := b.peBuilder.HandlePageRenderTiming(msg); rm != nil { b.appendReadyMessage(rm) } @@ -172,20 +173,20 @@ func (b *builder) handleMessage(message Message, messageID uint64) { if rm := b.miFinder.HandlePerformanceTrack(msg, messageID, b.timestamp); rm != nil { b.appendReadyMessage(rm) } - case *SetInputTarget: + case *SetInputTarget: // TODO: DB if rm := b.ieBuilder.HandleSetInputTarget(msg); rm != nil { b.appendReadyMessage(rm) } - case *SetInputValue: + case *SetInputValue: // TODO: DB if rm := b.ieBuilder.HandleSetInputValue(msg, messageID, b.timestamp); rm != nil { b.appendReadyMessage(rm) } - case *MouseClick: + case *MouseClick: // TODO: DB b.buildInputEvent() if rm := b.crDetector.HandleMouseClick(msg, messageID, b.timestamp); rm != nil { b.appendReadyMessage(rm) } - if msg.Label != "" { + if msg.Label != "" { // TODO: DB b.appendReadyMessage(&ClickEvent{ MessageID: messageID, Label: msg.Label, @@ -195,7 +196,7 @@ func (b *builder) handleMessage(message Message, messageID uint64) { }) } case *JSException: - b.appendReadyMessage(&ErrorEvent{ + b.appendReadyMessage(&ErrorEvent{ // TODO: DB MessageID: messageID, Timestamp: b.timestamp, Source: "js_exception", @@ -206,7 +207,7 @@ func (b *builder) handleMessage(message Message, messageID uint64) { case *ResourceTiming: tp := getResourceType(msg.Initiator, msg.URL) success := msg.Duration != 0 - b.appendReadyMessage(&ResourceEvent{ + b.appendReadyMessage(&ResourceEvent{ // TODO: DB MessageID: messageID, Timestamp: msg.Timestamp, Duration: msg.Duration, @@ -231,14 +232,14 @@ func (b *builder) handleMessage(message Message, messageID uint64) { }) } case *RawCustomEvent: - b.appendReadyMessage(&CustomEvent{ + b.appendReadyMessage(&CustomEvent{ // TODO: DB MessageID: messageID, Timestamp: b.timestamp, Name: msg.Name, Payload: msg.Payload, }) case *CustomIssue: - b.appendReadyMessage(&IssueEvent{ + b.appendReadyMessage(&IssueEvent{ // TODO: DB Type: "custom", Timestamp: b.timestamp, MessageID: messageID, @@ -246,7 +247,7 @@ func (b *builder) handleMessage(message Message, messageID uint64) { Payload: msg.Payload, }) case *Fetch: - b.appendReadyMessage(&FetchEvent{ + b.appendReadyMessage(&FetchEvent{ // TODO: DB MessageID: messageID, Timestamp: msg.Timestamp, Method: msg.Method, @@ -265,7 +266,7 @@ func (b *builder) handleMessage(message Message, messageID uint64) { }) } case *GraphQL: - b.appendReadyMessage(&GraphQLEvent{ + b.appendReadyMessage(&GraphQLEvent{ // TODO: DB MessageID: messageID, Timestamp: b.timestamp, OperationKind: msg.OperationKind, @@ -274,7 +275,7 @@ func (b *builder) handleMessage(message Message, messageID uint64) { Response: msg.Response, }) case *StateAction: - b.appendReadyMessage(&StateActionEvent{ + b.appendReadyMessage(&StateActionEvent{ // TODO: DB MessageID: messageID, Timestamp: b.timestamp, Type: msg.Type, diff --git a/backend/internal/builder/builderMap.go b/backend/internal/builder/builderMap.go index 3f3e4d6e3..b7885da92 100644 --- a/backend/internal/builder/builderMap.go +++ b/backend/internal/builder/builderMap.go @@ -26,18 +26,6 @@ func (m builderMap) HandleMessage(sessionID uint64, msg Message, messageID uint6 b.handleMessage(msg, messageID) } -func (m builderMap) IterateSessionReadyMessages(sessionID uint64, operatingTs int64, iter func(msg Message)) { - b, ok := m[sessionID] - if !ok { - return - } - sessionEnded := b.checkTimeouts(operatingTs) - b.iterateReadyMessage(iter) - if sessionEnded { - delete(m, sessionID) - } -} - func (m builderMap) IterateReadyMessages(operatingTs int64, iter func(sessionID uint64, msg Message)) { for sessionID, b := range m { sessionEnded := b.checkTimeouts(operatingTs) diff --git a/backend/internal/ender/builder.go b/backend/internal/ender/builder.go new file mode 100644 index 000000000..0389f74d1 --- /dev/null +++ b/backend/internal/ender/builder.go @@ -0,0 +1,56 @@ +package builder + +import ( + "log" + "openreplay/backend/pkg/intervals" + . "openreplay/backend/pkg/messages" +) + +type builder struct { + readyMsgs []Message + timestamp uint64 + sid uint64 +} + +func NewBuilder() *builder { + return &builder{} +} + +func (b *builder) appendReadyMessage(msg Message) { // interface is never nil even if it holds nil value + b.readyMsgs = append(b.readyMsgs, msg) +} + +func (b *builder) buildSessionEnd() { + if b.timestamp == 0 { + return + } + sessionEnd := &SessionEnd{ + Timestamp: b.timestamp, + } + b.appendReadyMessage(sessionEnd) +} + +func (b *builder) handleMessage(message Message, messageID uint64) { + timestamp := GetTimestamp(message) + if b.timestamp < timestamp { + b.timestamp = timestamp + } + + if b.timestamp == 0 { + log.Printf("Empty timestamp, sessionID: %d, messageID: %d", b.sid, messageID) + return + } +} + +func (b *builder) checkTimeouts(ts int64) bool { + if b.timestamp == 0 { + return false // There was no timestamp events yet + } + + lastTsGap := ts - int64(b.timestamp) + if lastTsGap > intervals.EVENTS_SESSION_END_TIMEOUT { + b.buildSessionEnd() + return true + } + return false +} diff --git a/backend/internal/ender/builderMap.go b/backend/internal/ender/builderMap.go new file mode 100644 index 000000000..6eba1f9ad --- /dev/null +++ b/backend/internal/ender/builderMap.go @@ -0,0 +1,36 @@ +package builder + +import ( + . "openreplay/backend/pkg/messages" +) + +type builderMap map[uint64]*builder + +func NewBuilderMap() builderMap { + return make(builderMap) +} + +func (m builderMap) GetBuilder(sessionID uint64) *builder { + b := m[sessionID] + if b == nil { + b = NewBuilder() + m[sessionID] = b + b.sid = sessionID + + } + return b +} + +func (m builderMap) HandleMessage(sessionID uint64, msg Message, messageID uint64) { + b := m.GetBuilder(sessionID) + b.handleMessage(msg, messageID) +} + +func (m builderMap) IterateReadyMessages(operatingTs int64, iter func(sessionID uint64, msg Message)) { + for sessionID, b := range m { + sessionEnded := b.checkTimeouts(operatingTs) + if sessionEnded { + delete(m, sessionID) + } + } +} diff --git a/backend/pkg/intervals/intervals.go b/backend/pkg/intervals/intervals.go index c4dfbc835..2ce13ed5e 100644 --- a/backend/pkg/intervals/intervals.go +++ b/backend/pkg/intervals/intervals.go @@ -1,11 +1,11 @@ package intervals -const EVENTS_COMMIT_INTERVAL = 30 * 1000 -const HEARTBEAT_INTERVAL = 2 * 60 * 1000 -const INTEGRATIONS_REQUEST_INTERVAL = 1 * 60 * 1000 -const EVENTS_PAGE_EVENT_TIMEOUT = 2 * 60 * 1000 -const EVENTS_INPUT_EVENT_TIMEOUT = 2 * 60 * 1000 +const EVENTS_COMMIT_INTERVAL = 30 * 1000 // как часто комитим сообщения в кафке (ender) +const HEARTBEAT_INTERVAL = 2 * 60 * 1000 // максимальный таймаут от трекера в рамках сессии +const INTEGRATIONS_REQUEST_INTERVAL = 1 * 60 * 1000 // интеграции +const EVENTS_PAGE_EVENT_TIMEOUT = 2 * 60 * 1000 // таймаут пейдж ивента +const EVENTS_INPUT_EVENT_TIMEOUT = 2 * 60 * 1000 // const EVENTS_PERFORMANCE_AGGREGATION_TIMEOUT = 2 * 60 * 1000 const EVENTS_SESSION_END_TIMEOUT = HEARTBEAT_INTERVAL + 30*1000 const EVENTS_SESSION_END_TIMEOUT_WITH_INTEGRATIONS = HEARTBEAT_INTERVAL + 3*60*1000 -const EVENTS_BACK_COMMIT_GAP = EVENTS_SESSION_END_TIMEOUT_WITH_INTEGRATIONS + 1*60*1000 +const EVENTS_BACK_COMMIT_GAP = EVENTS_SESSION_END_TIMEOUT_WITH_INTEGRATIONS + 1*60*1000 // для бэк коммита