diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index f6cd481a7..564fcbab5 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -2,9 +2,12 @@ package main import ( "log" + "openreplay/backend/internal/builder" "openreplay/backend/internal/config/db" "openreplay/backend/internal/datasaver" - "openreplay/backend/internal/heuristics" + "openreplay/backend/internal/handlers" + "openreplay/backend/internal/handlers/custom" + "openreplay/backend/pkg/intervals" "time" "os" @@ -28,8 +31,17 @@ func main() { pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres), cfg.ProjectExpirationTimeoutMs) defer pg.Close() + // Declare message handlers we want to apply for each incoming message + msgHandlers := []handlers.MessageProcessor{ + custom.NewMainHandler(), + custom.NewInputEventBuilder(), + custom.NewPageEventBuilder(), + } + + // Create handler's aggregator + builderMap := builder.NewBuilderMap(msgHandlers...) + // Init modules - heurFinder := heuristics.NewHandler() saver := datasaver.New(pg) statsLogger := logger.NewQueueStats(cfg.LoggerTimeout) @@ -42,6 +54,7 @@ func main() { if !postgres.IsPkeyViolation(err) { log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg) } + // TODO: can we lose data here because of db error? return } @@ -60,10 +73,10 @@ func main() { } // Handle heuristics and save to temporary queue in memory - heurFinder.HandleMessage(sessionID, msg) + builderMap.HandleMessage(sessionID, msg, msg.Meta().Index) // Process saved heuristics messages as usual messages above in the code - heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) { + builderMap.IterateSessionReadyMessages(sessionID, func(msg messages.Message) { // TODO: DRY code (carefully with the return statement logic) if err := saver.InsertMessage(sessionID, msg); err != nil { if !postgres.IsPkeyViolation(err) { @@ -82,8 +95,9 @@ func main() { consumer := queue.NewMessageConsumer( cfg.GroupDB, []string{ + cfg.TopicRawWeb, // TODO: is it necessary or not? cfg.TopicRawIOS, - cfg.TopicTrigger, + cfg.TopicTrigger, // to receive SessionEnd events }, handler, false, @@ -94,19 +108,22 @@ func main() { sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - tick := time.Tick(cfg.CommitBatchTimeout) + commitTick := time.Tick(cfg.CommitBatchTimeout) + checkTick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond) for { select { case sig := <-sigchan: log.Printf("Caught signal %v: terminating\n", sig) consumer.Close() os.Exit(0) - case <-tick: + case <-commitTick: pg.CommitBatches() // TODO?: separate stats & regular messages if err := consumer.Commit(); err != nil { log.Printf("Error on consumer commit: %v", err) } + case <-checkTick: + // checkTimeout default: err := consumer.ConsumeNext() if err != nil { diff --git a/backend/cmd/heuristics/main.go b/backend/cmd/heuristics/main.go index f5e3b675f..ddfd29095 100644 --- a/backend/cmd/heuristics/main.go +++ b/backend/cmd/heuristics/main.go @@ -54,6 +54,7 @@ func main() { []string{ cfg.TopicRawWeb, cfg.TopicRawIOS, + cfg.TopicTrigger, // to receive SessionEnd events }, func(sessionID uint64, msg messages.Message, meta *types.Meta) { statsLogger.Collect(sessionID, meta) diff --git a/backend/internal/builder/builder.go b/backend/internal/builder/builder.go index c35457d62..a00ad194a 100644 --- a/backend/internal/builder/builder.go +++ b/backend/internal/builder/builder.go @@ -10,6 +10,7 @@ type builder struct { readyMsgs []Message timestamp uint64 processors []handlers.MessageProcessor + ended bool } func NewBuilder(handlers ...handlers.MessageProcessor) *builder { @@ -35,6 +36,13 @@ func (b *builder) handleMessage(message Message, messageID uint64) { return } + if _, isEnd := message.(*IOSSessionEnd); isEnd { + b.ended = true + } + if _, isEnd := message.(*SessionEnd); isEnd { + b.ended = true + } + for _, p := range b.processors { /* If nil is not returned explicitely by Handle, but as the typed nil ("var i *IssueEvent; return i;") diff --git a/backend/internal/builder/builderMap.go b/backend/internal/builder/builderMap.go index 5bc01e78d..6b2c22bec 100644 --- a/backend/internal/builder/builderMap.go +++ b/backend/internal/builder/builderMap.go @@ -42,3 +42,14 @@ func (m *builderMap) IterateReadyMessages(operatingTs int64, iter func(sessionID } } } + +func (m *builderMap) IterateSessionReadyMessages(sessionID uint64, iter func(msg Message)) { + session, ok := m.sessions[sessionID] + if !ok { + return + } + session.iterateReadyMessage(iter) + if session.ended { + delete(m.sessions, sessionID) + } +} diff --git a/backend/internal/config/db/config.go b/backend/internal/config/db/config.go index fb35a199c..e074399dc 100644 --- a/backend/internal/config/db/config.go +++ b/backend/internal/config/db/config.go @@ -10,6 +10,7 @@ type Config struct { ProjectExpirationTimeoutMs int64 LoggerTimeout int GroupDB string + TopicRawWeb string TopicRawIOS string TopicTrigger string CommitBatchTimeout time.Duration @@ -21,6 +22,7 @@ func New() *Config { ProjectExpirationTimeoutMs: 1000 * 60 * 20, LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"), GroupDB: env.String("GROUP_DB"), + TopicRawWeb: env.String("TOPIC_RAW_WEB"), TopicRawIOS: env.String("TOPIC_RAW_IOS"), TopicTrigger: env.String("TOPIC_TRIGGER"), CommitBatchTimeout: 15 * time.Second, diff --git a/backend/internal/heuristics/inputEventBuilder.go b/backend/internal/handlers/custom/inputEventBuilder.go similarity index 80% rename from backend/internal/heuristics/inputEventBuilder.go rename to backend/internal/handlers/custom/inputEventBuilder.go index 624e15e47..770e714af 100644 --- a/backend/internal/heuristics/inputEventBuilder.go +++ b/backend/internal/handlers/custom/inputEventBuilder.go @@ -1,4 +1,4 @@ -package heuristics +package custom import ( . "openreplay/backend/pkg/messages" @@ -12,6 +12,17 @@ type inputEventBuilder struct { inputID uint64 } +func (b *inputEventBuilder) Handle(message Message, messageID uint64, timestamp uint64) Message { + //TODO implement me + panic("implement me") +} + +func (b *inputEventBuilder) Build() Message { + // b.build() + //TODO implement me + panic("implement me") +} + func NewInputEventBuilder() *inputEventBuilder { ieBuilder := &inputEventBuilder{} ieBuilder.ClearLabels() @@ -25,7 +36,7 @@ func (b *inputEventBuilder) ClearLabels() { func (b *inputEventBuilder) HandleSetInputTarget(msg *SetInputTarget) *InputEvent { var inputEvent *InputEvent if b.inputID != msg.ID { - inputEvent = b.Build() + inputEvent = b.build() b.inputID = msg.ID } b.inputLabels[msg.ID] = msg.Label @@ -35,7 +46,7 @@ func (b *inputEventBuilder) HandleSetInputTarget(msg *SetInputTarget) *InputEven func (b *inputEventBuilder) HandleSetInputValue(msg *SetInputValue, messageID uint64, timestamp uint64) *InputEvent { var inputEvent *InputEvent if b.inputID != msg.ID { - inputEvent = b.Build() + inputEvent = b.build() b.inputID = msg.ID } if b.inputEvent == nil { @@ -63,7 +74,7 @@ func (b *inputEventBuilder) GetTimestamp() uint64 { return b.inputEvent.Timestamp } -func (b *inputEventBuilder) Build() *InputEvent { +func (b *inputEventBuilder) build() *InputEvent { if b.inputEvent == nil { return nil } diff --git a/backend/internal/handlers/custom/mainHandler.go b/backend/internal/handlers/custom/mainHandler.go new file mode 100644 index 000000000..7e653c250 --- /dev/null +++ b/backend/internal/handlers/custom/mainHandler.go @@ -0,0 +1,288 @@ +package custom + +import ( + "net/url" + "openreplay/backend/pkg/intervals" + "strings" + "time" + + . "openreplay/backend/pkg/messages" +) + +func getURLExtention(URL string) string { + u, err := url.Parse(URL) + if err != nil { + return "" + } + i := strings.LastIndex(u.Path, ".") + return u.Path[i+1:] +} + +func getResourceType(initiator string, URL string) string { + switch initiator { + case "xmlhttprequest", "fetch": + return "fetch" + case "img": + return "img" + default: + switch getURLExtention(URL) { + case "css": + return "stylesheet" + case "js": + return "script" + case "png", "gif", "jpg", "jpeg", "svg": + return "img" + case "mp4", "mkv", "ogg", "webm", "avi", "mp3": + return "media" + default: + return "other" + } + } +} + +type builder struct { + readyMsgs []Message + timestamp uint64 + lastProcessedTimestamp int64 + peBuilder *pageEventBuilder + ieBuilder *inputEventBuilder + integrationsWaiting bool + sid uint64 +} + +func (b *builder) Build() Message { + //TODO implement me + panic("implement me") +} + +func NewMainHandler() *builder { + return &builder{ + peBuilder: &pageEventBuilder{}, + ieBuilder: NewInputEventBuilder(), + integrationsWaiting: true, + } +} + +func (b *builder) appendReadyMessage(msg Message) { // interface is never nil even if it holds nil value + b.readyMsgs = append(b.readyMsgs, msg) +} + +func (b *builder) iterateReadyMessage(iter func(msg Message)) { + for _, readyMsg := range b.readyMsgs { + iter(readyMsg) + } + b.readyMsgs = nil +} + +func (b *builder) buildPageEvent() { + if msg := b.peBuilder.Build(); msg != nil { + b.appendReadyMessage(msg) + } +} + +func (b *builder) buildInputEvent() { + if msg := b.ieBuilder.Build(); msg != nil { + b.appendReadyMessage(msg) + } +} + +func (b *builder) Handle(message Message, messageID uint64, timestamp uint64) Message { + b.timestamp = timestamp + b.lastProcessedTimestamp = time.Now().UnixMilli() + + // Might happen before the first timestamp. + switch msg := message.(type) { + case *SessionStart, + *Metadata, + *UserID, + *UserAnonymousID: + b.appendReadyMessage(msg) + case *RawErrorEvent: + b.appendReadyMessage(&ErrorEvent{ + MessageID: messageID, + Timestamp: msg.Timestamp, + Source: msg.Source, + Name: msg.Name, + Message: msg.Message, + Payload: msg.Payload, + }) + } + if b.timestamp == 0 { + return nil + } + switch msg := message.(type) { + case *SetPageLocation: + if msg.NavigationStart == 0 { + b.appendReadyMessage(&PageEvent{ + URL: msg.URL, + Referrer: msg.Referrer, + Loaded: false, + MessageID: messageID, + Timestamp: b.timestamp, + }) + } else { + b.buildPageEvent() + b.buildInputEvent() + b.ieBuilder.ClearLabels() + b.peBuilder.HandleSetPageLocation(msg, messageID, b.timestamp) + // TODO: what to do with this code? + //b.miFinder.HandleSetPageLocation(msg) + //b.ciFinder.HandleSetPageLocation(msg) + } + case *PageLoadTiming: + if rm := b.peBuilder.HandlePageLoadTiming(msg); rm != nil { + b.appendReadyMessage(rm) + } + case *PageRenderTiming: + if rm := b.peBuilder.HandlePageRenderTiming(msg); rm != nil { + b.appendReadyMessage(rm) + } + case *PerformanceTrack: + // TODO: what to do with this code? + //if rm := b.ptaBuilder.HandlePerformanceTrack(msg, b.timestamp); rm != nil { + // b.appendReadyMessage(rm) + //} + //if rm := b.ciFinder.HandlePerformanceTrack(msg, messageID, b.timestamp); rm != nil { + // b.appendReadyMessage(rm) + //} + //if rm := b.miFinder.HandlePerformanceTrack(msg, messageID, b.timestamp); rm != nil { + // b.appendReadyMessage(rm) + //} + case *SetInputTarget: + if rm := b.ieBuilder.HandleSetInputTarget(msg); rm != nil { + b.appendReadyMessage(rm) + } + case *SetInputValue: + if rm := b.ieBuilder.HandleSetInputValue(msg, messageID, b.timestamp); rm != nil { + b.appendReadyMessage(rm) + } + case *MouseClick: + b.buildInputEvent() + // TODO: what to do with this code? + //if rm := b.crDetector.HandleMouseClick(msg, messageID, b.timestamp); rm != nil { + // b.appendReadyMessage(rm) + //} + if msg.Label != "" { + b.appendReadyMessage(&ClickEvent{ + MessageID: messageID, + Label: msg.Label, + HesitationTime: msg.HesitationTime, + Timestamp: b.timestamp, + Selector: msg.Selector, + }) + } + case *JSException: + b.appendReadyMessage(&ErrorEvent{ + MessageID: messageID, + Timestamp: b.timestamp, + Source: "js_exception", + Name: msg.Name, + Message: msg.Message, + Payload: msg.Payload, + }) + case *ResourceTiming: + tp := getResourceType(msg.Initiator, msg.URL) + success := msg.Duration != 0 + b.appendReadyMessage(&ResourceEvent{ + MessageID: messageID, + Timestamp: msg.Timestamp, + Duration: msg.Duration, + TTFB: msg.TTFB, + HeaderSize: msg.HeaderSize, + EncodedBodySize: msg.EncodedBodySize, + DecodedBodySize: msg.DecodedBodySize, + URL: msg.URL, + Type: tp, + Success: success, + }) + if !success { + issueType := "missing_resource" + if tp == "fetch" { + issueType = "bad_request" + } + b.appendReadyMessage(&IssueEvent{ + Type: issueType, + MessageID: messageID, + Timestamp: msg.Timestamp, + ContextString: msg.URL, + }) + } + case *RawCustomEvent: + b.appendReadyMessage(&CustomEvent{ + MessageID: messageID, + Timestamp: b.timestamp, + Name: msg.Name, + Payload: msg.Payload, + }) + case *CustomIssue: + b.appendReadyMessage(&IssueEvent{ + Type: "custom", + Timestamp: b.timestamp, + MessageID: messageID, + ContextString: msg.Name, + Payload: msg.Payload, + }) + case *Fetch: + b.appendReadyMessage(&FetchEvent{ + MessageID: messageID, + Timestamp: msg.Timestamp, + Method: msg.Method, + URL: msg.URL, + Request: msg.Request, + Response: msg.Response, + Status: msg.Status, + Duration: msg.Duration, + }) + if msg.Status >= 400 { + b.appendReadyMessage(&IssueEvent{ + Type: "bad_request", + MessageID: messageID, + Timestamp: msg.Timestamp, + ContextString: msg.URL, + }) + } + case *GraphQL: + b.appendReadyMessage(&GraphQLEvent{ + MessageID: messageID, + Timestamp: b.timestamp, + OperationKind: msg.OperationKind, + OperationName: msg.OperationName, + Variables: msg.Variables, + Response: msg.Response, + }) + case *StateAction: + b.appendReadyMessage(&StateActionEvent{ + MessageID: messageID, + Timestamp: b.timestamp, + Type: msg.Type, + }) + // TODO: what to do with this code? + //case *CreateElementNode, *CreateTextNode: + // b.ddDetector.HandleNodeCreation() + //case *RemoveNode: + // b.ddDetector.HandleNodeRemoval(b.timestamp) + //case *CreateDocument: + // if rm := b.ddDetector.Build(); rm != nil { + // b.appendReadyMessage(rm) + // } + } + // TODO: what to do with this code? + //if rm := b.dcDetector.HandleMessage(message, messageID, b.timestamp); rm != nil { + // b.appendReadyMessage(rm) + //} + return nil +} + +func (b *builder) checkTimeouts(ts int64) bool { + if b.timestamp == 0 { + return false // There was no timestamp events yet + } + + if b.peBuilder.HasInstance() && int64(b.peBuilder.GetTimestamp())+intervals.EVENTS_PAGE_EVENT_TIMEOUT < ts { + b.buildPageEvent() + } + if b.ieBuilder.HasInstance() && int64(b.ieBuilder.GetTimestamp())+intervals.EVENTS_INPUT_EVENT_TIMEOUT < ts { + b.buildInputEvent() + } + return false +} diff --git a/backend/internal/heuristics/pageEventBuilder.go b/backend/internal/handlers/custom/pageEventBuilder.go similarity index 83% rename from backend/internal/heuristics/pageEventBuilder.go rename to backend/internal/handlers/custom/pageEventBuilder.go index 96a1b287e..765fd31a2 100644 --- a/backend/internal/heuristics/pageEventBuilder.go +++ b/backend/internal/handlers/custom/pageEventBuilder.go @@ -1,4 +1,4 @@ -package heuristics +package custom import ( . "openreplay/backend/pkg/messages" @@ -9,9 +9,25 @@ type pageEventBuilder struct { firstTimingHandled bool } +func (b *pageEventBuilder) Handle(message Message, messageID uint64, timestamp uint64) Message { + //TODO implement me + panic("implement me") +} + +func (b *pageEventBuilder) Build() Message { + // b.build() + //TODO implement me + panic("implement me") +} + +func NewPageEventBuilder() *pageEventBuilder { + ieBuilder := &pageEventBuilder{} + return ieBuilder +} + func (b *pageEventBuilder) buildIfTimingsComplete() *PageEvent { if b.firstTimingHandled { - return b.Build() + return b.build() } b.firstTimingHandled = true return nil @@ -83,7 +99,7 @@ func (b *pageEventBuilder) GetTimestamp() uint64 { return b.pageEvent.Timestamp } -func (b *pageEventBuilder) Build() *PageEvent { +func (b *pageEventBuilder) build() *PageEvent { pageEvent := b.pageEvent b.pageEvent = nil b.firstTimingHandled = false diff --git a/backend/internal/heuristics/heuristics.go b/backend/internal/heuristics/heuristics.go deleted file mode 100644 index c55ad33b3..000000000 --- a/backend/internal/heuristics/heuristics.go +++ /dev/null @@ -1,36 +0,0 @@ -package heuristics - -import ( - . "openreplay/backend/pkg/messages" -) - -type mainHandler map[uint64]*sessHandler - -func NewHandler() mainHandler { - return make(mainHandler) -} - -func (m mainHandler) getSessHandler(sessionID uint64) *sessHandler { - s := m[sessionID] - if s == nil { - s = newSessHandler() - m[sessionID] = s - } - return s -} - -func (m mainHandler) HandleMessage(sessionID uint64, msg Message) { - s := m.getSessHandler(sessionID) - s.HandleMessage(msg) -} - -func (m mainHandler) IterateSessionReadyMessages(sessionID uint64, iter func(msg Message)) { - s, ok := m[sessionID] - if !ok { - return - } - s.IterateReadyMessages(iter) - if s.IsEnded() { - delete(m, sessionID) - } -} diff --git a/backend/internal/heuristics/session.go b/backend/internal/heuristics/session.go deleted file mode 100644 index 3c7951750..000000000 --- a/backend/internal/heuristics/session.go +++ /dev/null @@ -1,43 +0,0 @@ -package heuristics - -import ( - . "openreplay/backend/pkg/messages" -) - -type Handler interface { - HandleMessage(Message) - IterateReadyMessages(func(Message)) -} - -type sessHandler struct { - handlers []Handler - ended bool -} - -func newSessHandler() *sessHandler { - return &sessHandler{ - handlers: []Handler{}, - } -} - -func (s *sessHandler) HandleMessage(msg Message) { - for _, h := range s.handlers { - h.HandleMessage(msg) - } - if _, isEnd := msg.(*IOSSessionEnd); isEnd { - s.ended = true - } - if _, isEnd := msg.(*SessionEnd); isEnd { - s.ended = true - } -} - -func (s *sessHandler) IterateReadyMessages(cb func(msg Message)) { - for _, h := range s.handlers { - h.IterateReadyMessages(cb) - } -} - -func (s *sessHandler) IsEnded() bool { - return s.ended -}