diff --git a/backend/build.sh b/backend/build.sh index a4d95341a..e67178c8c 100755 --- a/backend/build.sh +++ b/backend/build.sh @@ -23,7 +23,7 @@ function build_service() { image="$1" echo "BUILDING $image" case "$image" in - http | db | sink) + http | db | sink | ender | heuristics) echo build http docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --platform linux/amd64 --build-arg SERVICE_NAME=$image -f ./cmd/Dockerfile . [[ $PUSH_IMAGE -eq 1 ]] && { diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index 962057213..1c1d3bf0e 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -2,9 +2,11 @@ package main import ( "log" + "openreplay/backend/internal/builder" "openreplay/backend/internal/config/db" "openreplay/backend/internal/datasaver" - "openreplay/backend/internal/heuristics" + "openreplay/backend/internal/handlers" + "openreplay/backend/internal/handlers/custom" "time" "os" @@ -28,9 +30,21 @@ func main() { pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres), cfg.ProjectExpirationTimeoutMs) defer pg.Close() + // HandlersFabric returns the list of message handlers we want to be applied to each incoming message. + handlersFabric := func() []handlers.MessageProcessor { + return []handlers.MessageProcessor{ + &custom.EventMapper{}, + custom.NewInputEventBuilder(), + custom.NewPageEventBuilder(), + } + } + + // Create handler's aggregator + builderMap := builder.NewBuilderMap(handlersFabric) + // Init modules - heurFinder := heuristics.NewHandler() saver := datasaver.New(pg) + saver.InitStats() statsLogger := logger.NewQueueStats(cfg.LoggerTimeout) // Handler logic @@ -60,10 +74,10 @@ func main() { } // Handle heuristics and save to temporary queue in memory - heurFinder.HandleMessage(session, msg) + builderMap.HandleMessage(sessionID, msg, msg.Meta().Index) // Process saved heuristics messages as usual messages above in the code - heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) { + builderMap.IterateSessionReadyMessages(sessionID, func(msg messages.Message) { // TODO: DRY code (carefully with the return statement logic) if err := saver.InsertMessage(sessionID, msg); err != nil { if !postgres.IsPkeyViolation(err) { @@ -82,8 +96,9 @@ func main() { consumer := queue.NewMessageConsumer( cfg.GroupDB, []string{ + cfg.TopicRawWeb, cfg.TopicRawIOS, - cfg.TopicTrigger, + cfg.TopicTrigger, // to receive SessionEnd events }, handler, false, @@ -94,15 +109,18 @@ func main() { sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - tick := time.Tick(cfg.CommitBatchTimeout) + commitTick := time.Tick(cfg.CommitBatchTimeout) for { select { case sig := <-sigchan: log.Printf("Caught signal %v: terminating\n", sig) consumer.Close() os.Exit(0) - case <-tick: + case <-commitTick: pg.CommitBatches() + if err := saver.CommitStats(); err != nil { + log.Printf("Error on stats commit: %v", err) + } // TODO?: separate stats & regular messages if err := consumer.Commit(); err != nil { log.Printf("Error on consumer commit: %v", err) diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go new file mode 100644 index 000000000..5d82b67db --- /dev/null +++ b/backend/cmd/ender/main.go @@ -0,0 +1,79 @@ +package main + +import ( + "log" + "openreplay/backend/internal/config/ender" + "openreplay/backend/internal/sessionender" + "time" + + "os" + "os/signal" + "syscall" + + "openreplay/backend/pkg/intervals" + logger "openreplay/backend/pkg/log" + "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/queue" + "openreplay/backend/pkg/queue/types" +) + +func main() { + log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) + + // Load service configuration + cfg := ender.New() + + // Init all modules + statsLogger := logger.NewQueueStats(cfg.LoggerTimeout) + sessions := sessionender.New(intervals.EVENTS_SESSION_END_TIMEOUT) + producer := queue.NewProducer() + consumer := queue.NewMessageConsumer( + cfg.GroupEvents, + []string{ + cfg.TopicRawWeb, + cfg.TopicRawIOS, + }, + func(sessionID uint64, msg messages.Message, meta *types.Meta) { + statsLogger.Collect(sessionID, meta) + sessions.UpdateSession(sessionID, messages.GetTimestamp(msg)) + }, + false, + ) + + log.Printf("Ender service started\n") + + sigchan := make(chan os.Signal, 1) + signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) + + tick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond) + for { + select { + case sig := <-sigchan: + log.Printf("Caught signal %v: terminating\n", sig) + producer.Close(cfg.ProducerTimeout) + if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil { + log.Printf("can't commit messages with offset: %s", err) + } + consumer.Close() + os.Exit(0) + case <-tick: + // Find ended sessions and send notification to other services + sessions.HandleEndedSessions(func(sessionID uint64, timestamp int64) bool { + msg := &messages.SessionEnd{Timestamp: uint64(timestamp)} + if err := producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(msg)); err != nil { + log.Printf("can't send message to queue: %s", err) + return false + } + return true + }) + producer.Flush(cfg.ProducerTimeout) + if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil { + log.Printf("can't commit messages with offset: %s", err) + } + default: + if err := consumer.ConsumeNext(); err != nil { + log.Fatalf("Error on consuming: %v", err) + } + } + } +} diff --git a/backend/cmd/heuristics/main.go b/backend/cmd/heuristics/main.go new file mode 100644 index 000000000..6edf01a92 --- /dev/null +++ b/backend/cmd/heuristics/main.go @@ -0,0 +1,95 @@ +package main + +import ( + "log" + "openreplay/backend/internal/builder" + "openreplay/backend/internal/config/ender" + "openreplay/backend/internal/handlers" + "openreplay/backend/internal/handlers/custom" + "openreplay/backend/internal/handlers/ios" + "openreplay/backend/internal/handlers/web" + "openreplay/backend/pkg/intervals" + logger "openreplay/backend/pkg/log" + "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/queue" + "openreplay/backend/pkg/queue/types" + "os" + "os/signal" + "syscall" + "time" +) + +func main() { + log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) + + // Load service configuration + cfg := ender.New() + + // HandlersFabric returns the list of message handlers we want to be applied to each incoming message. + handlersFabric := func() []handlers.MessageProcessor { + return []handlers.MessageProcessor{ + // web handlers + &web.ClickRageDetector{}, + &web.CpuIssueDetector{}, + &web.DeadClickDetector{}, + &web.MemoryIssueDetector{}, + &web.NetworkIssueDetector{}, + &web.PerformanceAggregator{}, + // iOS handlers + &ios.AppNotResponding{}, + &ios.ClickRageDetector{}, + &ios.PerformanceAggregator{}, + // Other handlers (you can add your custom handlers here) + &custom.CustomHandler{}, + } + } + + // Create handler's aggregator + builderMap := builder.NewBuilderMap(handlersFabric) + + // Init logger + statsLogger := logger.NewQueueStats(cfg.LoggerTimeout) + + // Init producer and consumer for data bus + producer := queue.NewProducer() + consumer := queue.NewMessageConsumer( + cfg.GroupEvents, + []string{ + cfg.TopicRawWeb, + cfg.TopicRawIOS, + cfg.TopicTrigger, // to receive SessionEnd events + }, + func(sessionID uint64, msg messages.Message, meta *types.Meta) { + statsLogger.Collect(sessionID, meta) + builderMap.HandleMessage(sessionID, msg, msg.Meta().Index) + }, + false, + ) + + log.Printf("Heuristics service started\n") + + sigchan := make(chan os.Signal, 1) + signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) + + tick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond) + for { + select { + case sig := <-sigchan: + log.Printf("Caught signal %v: terminating\n", sig) + producer.Close(cfg.ProducerTimeout) + consumer.Commit() + consumer.Close() + os.Exit(0) + case <-tick: + builderMap.IterateReadyMessages(func(sessionID uint64, readyMsg messages.Message) { + producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(readyMsg)) + }) + producer.Flush(cfg.ProducerTimeout) + consumer.Commit() + default: + if err := consumer.ConsumeNext(); err != nil { + log.Fatalf("Error on consuming: %v", err) + } + } + } +} diff --git a/backend/internal/builder/builder.go b/backend/internal/builder/builder.go new file mode 100644 index 000000000..dd7bb675a --- /dev/null +++ b/backend/internal/builder/builder.go @@ -0,0 +1,67 @@ +package builder + +import ( + "time" + + "openreplay/backend/internal/handlers" + . "openreplay/backend/pkg/messages" +) + +type builder struct { + readyMsgs []Message + timestamp uint64 + lastMessageID uint64 + lastSystemTime time.Time + processors []handlers.MessageProcessor + ended bool +} + +func NewBuilder(handlers ...handlers.MessageProcessor) *builder { + return &builder{ + processors: handlers, + } +} + +func (b *builder) iterateReadyMessages(iter func(msg Message)) { + for _, readyMsg := range b.readyMsgs { + iter(readyMsg) + } + b.readyMsgs = nil +} + +func (b *builder) checkSessionEnd(message Message) { + if _, isEnd := message.(*IOSSessionEnd); isEnd { + b.ended = true + } + if _, isEnd := message.(*SessionEnd); isEnd { + b.ended = true + } +} + +func (b *builder) handleMessage(message Message, messageID uint64) { + if messageID < b.lastMessageID { + // May happen in case of duplicated messages in kafka (if `idempotence: false`) + return + } + timestamp := GetTimestamp(message) + if timestamp == 0 { + // May happen in case of messages that are single-in-batch, + // e.g. SessionStart or RawErrorEvent (emitted by `integrations`). + + // TODO: make timestamp system transparent; + return + } + if timestamp < b.timestamp { + // Shouldn't happen after messageID check which is done above. TODO: log this case. + return + } + + b.timestamp = timestamp + b.lastSystemTime = time.Now() + for _, p := range b.processors { + if rm := p.Handle(message, messageID, b.timestamp); rm != nil { + b.readyMsgs = append(b.readyMsgs, rm) + } + } + b.checkSessionEnd(message) +} diff --git a/backend/internal/builder/builderMap.go b/backend/internal/builder/builderMap.go new file mode 100644 index 000000000..af2ecf0d5 --- /dev/null +++ b/backend/internal/builder/builderMap.go @@ -0,0 +1,74 @@ +package builder + +import ( + "time" + + "openreplay/backend/internal/handlers" + . "openreplay/backend/pkg/messages" +) + +const FORCE_DELETE_TIMEOUT = 4 * time.Hour + +type builderMap struct { + handlersFabric func() []handlers.MessageProcessor + sessions map[uint64]*builder +} + +func NewBuilderMap(handlersFabric func() []handlers.MessageProcessor) *builderMap { + return &builderMap{ + handlersFabric: handlersFabric, + sessions: make(map[uint64]*builder), + } +} + +func (m *builderMap) GetBuilder(sessionID uint64) *builder { + b := m.sessions[sessionID] + if b == nil { + b = NewBuilder(m.handlersFabric()...) // Should create new instances + m.sessions[sessionID] = b + } + return b +} + +func (m *builderMap) HandleMessage(sessionID uint64, msg Message, messageID uint64) { + b := m.GetBuilder(sessionID) + b.handleMessage(msg, messageID) +} + +func (m *builderMap) iterateSessionReadyMessages(sessionID uint64, b *builder, iter func(msg Message)) { + if b.ended || b.lastSystemTime.Add(FORCE_DELETE_TIMEOUT).Before(time.Now()) { + for _, p := range b.processors { + if rm := p.Build(); rm != nil { + b.readyMsgs = append(b.readyMsgs, rm) + } + } + } + b.iterateReadyMessages(iter) + if b.ended { + delete(m.sessions, sessionID) + } +} + +func (m *builderMap) IterateReadyMessages(iter func(sessionID uint64, msg Message)) { + for sessionID, session := range m.sessions { + m.iterateSessionReadyMessages( + sessionID, + session, + func(msg Message) { + iter(sessionID, msg) + }, + ) + } +} + +func (m *builderMap) IterateSessionReadyMessages(sessionID uint64, iter func(msg Message)) { + session, ok := m.sessions[sessionID] + if !ok { + return + } + m.iterateSessionReadyMessages( + sessionID, + session, + iter, + ) +} diff --git a/backend/internal/config/db/config.go b/backend/internal/config/db/config.go index fb35a199c..e074399dc 100644 --- a/backend/internal/config/db/config.go +++ b/backend/internal/config/db/config.go @@ -10,6 +10,7 @@ type Config struct { ProjectExpirationTimeoutMs int64 LoggerTimeout int GroupDB string + TopicRawWeb string TopicRawIOS string TopicTrigger string CommitBatchTimeout time.Duration @@ -21,6 +22,7 @@ func New() *Config { ProjectExpirationTimeoutMs: 1000 * 60 * 20, LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"), GroupDB: env.String("GROUP_DB"), + TopicRawWeb: env.String("TOPIC_RAW_WEB"), TopicRawIOS: env.String("TOPIC_RAW_IOS"), TopicTrigger: env.String("TOPIC_TRIGGER"), CommitBatchTimeout: 15 * time.Second, diff --git a/backend/internal/config/ender/config.go b/backend/internal/config/ender/config.go new file mode 100644 index 000000000..e39fbc240 --- /dev/null +++ b/backend/internal/config/ender/config.go @@ -0,0 +1,25 @@ +package ender + +import ( + "openreplay/backend/pkg/env" +) + +type Config struct { + GroupEvents string + TopicTrigger string + LoggerTimeout int + TopicRawWeb string + TopicRawIOS string + ProducerTimeout int +} + +func New() *Config { + return &Config{ + GroupEvents: env.String("GROUP_ENDER"), + TopicTrigger: env.String("TOPIC_TRIGGER"), + LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"), + TopicRawWeb: env.String("TOPIC_RAW_WEB"), + TopicRawIOS: env.String("TOPIC_RAW_IOS"), + ProducerTimeout: 2000, + } +} diff --git a/backend/internal/datasaver/stats.go b/backend/internal/datasaver/stats.go index a57d91824..26efe51b5 100644 --- a/backend/internal/datasaver/stats.go +++ b/backend/internal/datasaver/stats.go @@ -5,6 +5,10 @@ import ( . "openreplay/backend/pkg/messages" ) +func (si *Saver) InitStats() { + // noop +} + func (si *Saver) InsertStats(session *Session, msg Message) error { switch m := msg.(type) { // Web @@ -17,3 +21,7 @@ func (si *Saver) InsertStats(session *Session, msg Message) error { } return nil } + +func (si *Saver) CommitStats() error { + return nil +} diff --git a/backend/internal/handlers/custom/customHandler.go b/backend/internal/handlers/custom/customHandler.go new file mode 100644 index 000000000..9b191189e --- /dev/null +++ b/backend/internal/handlers/custom/customHandler.go @@ -0,0 +1,16 @@ +package custom + +import . "openreplay/backend/pkg/messages" + +type CustomHandler struct { + lastTimestamp uint64 +} + +func (h *CustomHandler) Handle(message Message, messageID uint64, timestamp uint64) Message { + h.lastTimestamp = timestamp + return nil +} + +func (h *CustomHandler) Build() Message { + return nil +} diff --git a/backend/internal/handlers/custom/eventMapper.go b/backend/internal/handlers/custom/eventMapper.go new file mode 100644 index 000000000..5d118ff7d --- /dev/null +++ b/backend/internal/handlers/custom/eventMapper.go @@ -0,0 +1,135 @@ +package custom + +import ( + "net/url" + "strings" + + . "openreplay/backend/pkg/messages" +) + +func getURLExtention(URL string) string { + u, err := url.Parse(URL) + if err != nil { + return "" + } + i := strings.LastIndex(u.Path, ".") + return u.Path[i+1:] +} + +func getResourceType(initiator string, URL string) string { + switch initiator { + case "xmlhttprequest", "fetch": + return "fetch" + case "img": + return "img" + default: + switch getURLExtention(URL) { + case "css": + return "stylesheet" + case "js": + return "script" + case "png", "gif", "jpg", "jpeg", "svg": + return "img" + case "mp4", "mkv", "ogg", "webm", "avi", "mp3": + return "media" + default: + return "other" + } + } +} + +type EventMapper struct{} + +func (b *EventMapper) Build() Message { + return nil +} + +func (b *EventMapper) Handle(message Message, messageID uint64, timestamp uint64) Message { + switch msg := message.(type) { + case *RawErrorEvent: + // !!! This won't be handled because the Meta() timestamp emitted by `integrations` will be 0 + // TODO: move to db directly + return &ErrorEvent{ + MessageID: messageID, + Timestamp: msg.Timestamp, + Source: msg.Source, + Name: msg.Name, + Message: msg.Message, + Payload: msg.Payload, + } + case *MouseClick: + if msg.Label != "" { + return &ClickEvent{ + MessageID: messageID, + Label: msg.Label, + HesitationTime: msg.HesitationTime, + Timestamp: timestamp, + Selector: msg.Selector, + } + } + case *JSException: + return &ErrorEvent{ + MessageID: messageID, + Timestamp: timestamp, + Source: "js_exception", + Name: msg.Name, + Message: msg.Message, + Payload: msg.Payload, + } + case *ResourceTiming: + return &ResourceEvent{ + MessageID: messageID, + Timestamp: msg.Timestamp, + Duration: msg.Duration, + TTFB: msg.TTFB, + HeaderSize: msg.HeaderSize, + EncodedBodySize: msg.EncodedBodySize, + DecodedBodySize: msg.DecodedBodySize, + URL: msg.URL, + Type: getResourceType(msg.Initiator, msg.URL), + Success: msg.Duration != 0, + } + case *RawCustomEvent: + return &CustomEvent{ + MessageID: messageID, + Timestamp: timestamp, + Name: msg.Name, + Payload: msg.Payload, + } + case *CustomIssue: + return &IssueEvent{ + Type: "custom", + Timestamp: timestamp, + MessageID: messageID, + ContextString: msg.Name, + Payload: msg.Payload, + } + case *Fetch: + return &FetchEvent{ + MessageID: messageID, + Timestamp: msg.Timestamp, + Method: msg.Method, + URL: msg.URL, + Request: msg.Request, + Response: msg.Response, + Status: msg.Status, + Duration: msg.Duration, + } + case *GraphQL: + return &GraphQLEvent{ + MessageID: messageID, + Timestamp: timestamp, + OperationKind: msg.OperationKind, + OperationName: msg.OperationName, + Variables: msg.Variables, + Response: msg.Response, + } + case *StateAction: + return &StateActionEvent{ + MessageID: messageID, + Timestamp: timestamp, + Type: msg.Type, + } + } + return nil +} diff --git a/backend/internal/handlers/custom/inputEventBuilder.go b/backend/internal/handlers/custom/inputEventBuilder.go new file mode 100644 index 000000000..e07470f37 --- /dev/null +++ b/backend/internal/handlers/custom/inputEventBuilder.go @@ -0,0 +1,77 @@ +package custom + +import ( + . "openreplay/backend/pkg/messages" +) + +const INPUT_EVENT_TIMEOUT = 1 * 60 * 1000 + +type inputLabels map[uint64]string + +type inputEventBuilder struct { + inputEvent *InputEvent + inputLabels inputLabels + inputID uint64 +} + +func NewInputEventBuilder() *inputEventBuilder { + ieBuilder := &inputEventBuilder{} + ieBuilder.clearLabels() + return ieBuilder +} + +func (b *inputEventBuilder) clearLabels() { + b.inputLabels = make(inputLabels) +} + +func (b *inputEventBuilder) Handle(message Message, messageID uint64, timestamp uint64) Message { + var inputEvent Message = nil + switch msg := message.(type) { + case *SetInputTarget: + if b.inputID != msg.ID { + inputEvent = b.Build() + b.inputID = msg.ID + } + b.inputLabels[msg.ID] = msg.Label + return inputEvent + case *SetInputValue: + if b.inputID != msg.ID { + inputEvent = b.Build() + b.inputID = msg.ID + } + if b.inputEvent == nil { + b.inputEvent = &InputEvent{ + MessageID: messageID, + Timestamp: timestamp, + Value: msg.Value, + ValueMasked: msg.Mask > 0, + } + } else { + b.inputEvent.Value = msg.Value + b.inputEvent.ValueMasked = msg.Mask > 0 + } + return inputEvent + case *CreateDocument: + inputEvent = b.Build() + b.clearLabels() + return inputEvent + case *MouseClick: + return b.Build() + } + + if b.inputEvent != nil && b.inputEvent.Timestamp+INPUT_EVENT_TIMEOUT < timestamp { + return b.Build() + } + return nil +} + +func (b *inputEventBuilder) Build() Message { + if b.inputEvent == nil { + return nil + } + inputEvent := b.inputEvent + inputEvent.Label = b.inputLabels[b.inputID] // might be empty string + + b.inputEvent = nil + return inputEvent +} diff --git a/backend/internal/handlers/custom/pageEventBuilder.go b/backend/internal/handlers/custom/pageEventBuilder.go new file mode 100644 index 000000000..d95768983 --- /dev/null +++ b/backend/internal/handlers/custom/pageEventBuilder.go @@ -0,0 +1,106 @@ +package custom + +import ( + . "openreplay/backend/pkg/messages" +) + +const PAGE_EVENT_TIMEOUT = 1 * 60 * 1000 + +type pageEventBuilder struct { + pageEvent *PageEvent + firstTimingHandled bool +} + +func NewPageEventBuilder() *pageEventBuilder { + ieBuilder := &pageEventBuilder{} + return ieBuilder +} + +func (b *pageEventBuilder) Handle(message Message, messageID uint64, timestamp uint64) Message { + switch msg := message.(type) { + case *SetPageLocation: + if msg.NavigationStart == 0 { // routing without new page loading + return &PageEvent{ + URL: msg.URL, + Referrer: msg.Referrer, + Loaded: false, + MessageID: messageID, + Timestamp: timestamp, + } + } else { + pageEvent := b.Build() + b.pageEvent = &PageEvent{ + URL: msg.URL, + Referrer: msg.Referrer, + Loaded: true, + MessageID: messageID, + Timestamp: timestamp, + } + return pageEvent + } + case *PageLoadTiming: + if b.pageEvent == nil { + break + } + if msg.RequestStart <= 30000 { + b.pageEvent.RequestStart = msg.RequestStart + } + if msg.ResponseStart <= 30000 { + b.pageEvent.ResponseStart = msg.ResponseStart + } + if msg.ResponseEnd <= 30000 { + b.pageEvent.ResponseEnd = msg.ResponseEnd + } + if msg.DomContentLoadedEventStart <= 30000 { + b.pageEvent.DomContentLoadedEventStart = msg.DomContentLoadedEventStart + } + if msg.DomContentLoadedEventEnd <= 30000 { + b.pageEvent.DomContentLoadedEventEnd = msg.DomContentLoadedEventEnd + } + if msg.LoadEventStart <= 30000 { + b.pageEvent.LoadEventStart = msg.LoadEventStart + } + if msg.LoadEventEnd <= 30000 { + b.pageEvent.LoadEventEnd = msg.LoadEventEnd + } + if msg.FirstPaint <= 30000 { + b.pageEvent.FirstPaint = msg.FirstPaint + } + if msg.FirstContentfulPaint <= 30000 { + b.pageEvent.FirstContentfulPaint = msg.FirstContentfulPaint + } + return b.buildIfTimingsComplete() + case *PageRenderTiming: + if b.pageEvent == nil { + break + } + b.pageEvent.SpeedIndex = msg.SpeedIndex + b.pageEvent.VisuallyComplete = msg.VisuallyComplete + b.pageEvent.TimeToInteractive = msg.TimeToInteractive + return b.buildIfTimingsComplete() + + } + + if b.pageEvent != nil && b.pageEvent.Timestamp+PAGE_EVENT_TIMEOUT < timestamp { + return b.Build() + } + return nil +} + +func (b *pageEventBuilder) Build() Message { + if b.pageEvent == nil { + return nil + } + pageEvent := b.pageEvent + b.pageEvent = nil + b.firstTimingHandled = false + return pageEvent +} + +func (b *pageEventBuilder) buildIfTimingsComplete() Message { + if b.firstTimingHandled { + return b.Build() + } + b.firstTimingHandled = true + return nil +} diff --git a/backend/internal/handlers/ios/appNotResponding.go b/backend/internal/handlers/ios/appNotResponding.go new file mode 100644 index 000000000..b5f6cd2f0 --- /dev/null +++ b/backend/internal/handlers/ios/appNotResponding.go @@ -0,0 +1,69 @@ +package ios + +import ( + "openreplay/backend/internal/handlers" + . "openreplay/backend/pkg/messages" +) + +/* + Handler name: AppNotResponding + Input events: IOSClickEvent, + IOSInputEvent, + IOSPerformanceEvent, + IOSSessionEnd + Output event: IOSIssueEvent +*/ + +const MIN_TIME_AFTER_LAST_HEARTBEAT = 60 * 1000 + +type AppNotResponding struct { + handlers.ReadyMessageStore + lastLabel string + lastHeartbeatTimestamp uint64 + lastHeartbeatIndex uint64 + lastTimestamp uint64 +} + +func (h *AppNotResponding) Handle(message Message, messageID uint64, timestamp uint64) Message { + h.lastTimestamp = timestamp + var event Message = nil + switch m := message.(type) { + case *IOSClickEvent: + event = h.build(m.Timestamp) + h.lastLabel = m.Label + h.lastHeartbeatTimestamp = m.Timestamp + h.lastHeartbeatIndex = m.Index + case *IOSInputEvent: + event = h.build(m.Timestamp) + h.lastLabel = m.Label + h.lastHeartbeatTimestamp = m.Timestamp + h.lastHeartbeatIndex = m.Index + case *IOSPerformanceEvent: + event = h.build(m.Timestamp) + h.lastHeartbeatTimestamp = m.Timestamp + h.lastHeartbeatIndex = m.Index + case *IOSSessionEnd: + event = h.build(m.Timestamp) + } + return event +} + +func (h *AppNotResponding) Build() Message { + return h.build(h.lastTimestamp) +} + +func (h *AppNotResponding) build(timestamp uint64) Message { + if h.lastHeartbeatTimestamp != 0 && h.lastHeartbeatTimestamp+MIN_TIME_AFTER_LAST_HEARTBEAT <= timestamp { + event := &IOSIssueEvent{ + Type: "anr", + ContextString: h.lastLabel, + Timestamp: h.lastHeartbeatTimestamp, + } + event.Index = h.lastHeartbeatIndex // Associated Index/ MessageID ? + // Reset + h.lastHeartbeatTimestamp = 0 + h.lastHeartbeatIndex = 0 + return event + } + return nil +} diff --git a/backend/internal/heuristics/clickrage.go b/backend/internal/handlers/ios/clickRage.go similarity index 50% rename from backend/internal/heuristics/clickrage.go rename to backend/internal/handlers/ios/clickRage.go index 4d19bf92e..6562e05c1 100644 --- a/backend/internal/heuristics/clickrage.go +++ b/backend/internal/handlers/ios/clickRage.go @@ -1,14 +1,22 @@ -package heuristics +package ios import ( + "openreplay/backend/internal/handlers" + "openreplay/backend/internal/handlers/web" . "openreplay/backend/pkg/messages" ) -const CLICK_TIME_DIFF = 200 -const MIN_CLICKS_IN_A_ROW = 3 +/* + Handler name: ClickRage + Input events: IOSClickEvent, + IOSSessionEnd + Output event: IOSIssueEvent +*/ -type clickrage struct { - readyMessageStore +const CLICK_TIME_DIFF = 200 + +type ClickRageDetector struct { + handlers.ReadyMessageStore lastTimestamp uint64 lastLabel string firstInARawTimestamp uint64 @@ -16,32 +24,16 @@ type clickrage struct { countsInARow int } -func (h *clickrage) build() { - if h.countsInARow >= MIN_CLICKS_IN_A_ROW { - m := &IOSIssueEvent{ - Type: "click_rage", - ContextString: h.lastLabel, - } - m.Timestamp = h.firstInARawTimestamp - m.Index = h.firstInARawSeqIndex // Associated Index/ MessageID ? - h.append(m) - } - h.lastTimestamp = 0 - h.lastLabel = "" - h.firstInARawTimestamp = 0 - h.firstInARawSeqIndex = 0 - h.countsInARow = 0 -} - -func (h *clickrage) HandleMessage(msg Message) { - switch m := msg.(type) { +func (h *ClickRageDetector) Handle(message Message, messageID uint64, timestamp uint64) Message { + var event Message = nil + switch m := message.(type) { case *IOSClickEvent: if h.lastTimestamp+CLICK_TIME_DIFF < m.Timestamp && h.lastLabel == m.Label { h.lastTimestamp = m.Timestamp h.countsInARow += 1 - return + return nil } - h.build() + event = h.Build() if m.Label != "" { h.lastTimestamp = m.Timestamp h.lastLabel = m.Label @@ -50,6 +42,25 @@ func (h *clickrage) HandleMessage(msg Message) { h.countsInARow = 1 } case *IOSSessionEnd: - h.build() + event = h.Build() } + return event +} + +func (h *ClickRageDetector) Build() Message { + if h.countsInARow >= web.MIN_CLICKS_IN_A_ROW { + event := &IOSIssueEvent{ + Type: "click_rage", + ContextString: h.lastLabel, + } + event.Timestamp = h.firstInARawTimestamp + event.Index = h.firstInARawSeqIndex // Associated Index/ MessageID ? + return event + } + h.lastTimestamp = 0 + h.lastLabel = "" + h.firstInARawTimestamp = 0 + h.firstInARawSeqIndex = 0 + h.countsInARow = 0 + return nil } diff --git a/backend/internal/heuristics/performance.go b/backend/internal/handlers/ios/performanceAggregator.go similarity index 68% rename from backend/internal/heuristics/performance.go rename to backend/internal/handlers/ios/performanceAggregator.go index c7494a793..df87298bd 100644 --- a/backend/internal/heuristics/performance.go +++ b/backend/internal/handlers/ios/performanceAggregator.go @@ -1,9 +1,17 @@ -package heuristics +package ios import ( + "openreplay/backend/internal/handlers" . "openreplay/backend/pkg/messages" ) +/* + Handler name: PerformanceAggregator + Input events: IOSPerformanceEvent, + IOSSessionEnd + Output event: IssueEvent +*/ + const AGGR_TIME = 15 * 60 * 1000 type valueAggregator struct { @@ -18,45 +26,29 @@ func (va *valueAggregator) aggregate() uint64 { return uint64(va.sum / va.count) } -type performanceAggregator struct { - readyMessageStore - pa *IOSPerformanceAggregated - fps valueAggregator - cpu valueAggregator - memory valueAggregator - battery valueAggregator +type PerformanceAggregator struct { + handlers.ReadyMessageStore + pa *IOSPerformanceAggregated + fps valueAggregator + cpu valueAggregator + memory valueAggregator + battery valueAggregator + lastTimestamp uint64 } -func (h *performanceAggregator) build(timestamp uint64) { - if h.pa == nil { - return - } - h.pa.TimestampEnd = timestamp - h.pa.AvgFPS = h.fps.aggregate() - h.pa.AvgCPU = h.cpu.aggregate() - h.pa.AvgMemory = h.memory.aggregate() - h.pa.AvgBattery = h.battery.aggregate() - - h.append(h.pa) - - h.pa = &IOSPerformanceAggregated{} - for _, agg := range []valueAggregator{h.fps, h.cpu, h.memory, h.battery} { - agg.sum = 0 - agg.count = 0 - } -} - -func (h *performanceAggregator) HandleMessage(msg Message) { +func (h *PerformanceAggregator) Handle(message Message, messageID uint64, timestamp uint64) Message { + h.lastTimestamp = timestamp if h.pa == nil { h.pa = &IOSPerformanceAggregated{} // TODO: struct type in messages } - switch m := msg.(type) { // TODO: All Timestampe messages + var event Message = nil + switch m := message.(type) { // TODO: All Timestamp messages case *IOSPerformanceEvent: if h.pa.TimestampStart == 0 { h.pa.TimestampStart = m.Timestamp } if h.pa.TimestampStart+AGGR_TIME <= m.Timestamp { - h.build(m.Timestamp) + event = h.Build() } switch m.Name { case "fps": @@ -97,6 +89,28 @@ func (h *performanceAggregator) HandleMessage(msg Message) { } } case *IOSSessionEnd: - h.build(m.Timestamp) + event = h.Build() } + return event +} + +func (h *PerformanceAggregator) Build() Message { + if h.pa == nil { + return nil + } + + h.pa.TimestampEnd = h.lastTimestamp + h.pa.AvgFPS = h.fps.aggregate() + h.pa.AvgCPU = h.cpu.aggregate() + h.pa.AvgMemory = h.memory.aggregate() + h.pa.AvgBattery = h.battery.aggregate() + + event := h.pa + + h.pa = &IOSPerformanceAggregated{} + for _, agg := range []valueAggregator{h.fps, h.cpu, h.memory, h.battery} { + agg.sum = 0 + agg.count = 0 + } + return event } diff --git a/backend/internal/handlers/messageProcessor.go b/backend/internal/handlers/messageProcessor.go new file mode 100644 index 000000000..c4235c18b --- /dev/null +++ b/backend/internal/handlers/messageProcessor.go @@ -0,0 +1,11 @@ +package handlers + +import . "openreplay/backend/pkg/messages" + +// Heuristic interface - common interface for user's realisations +// U can create your own message handler and easily connect to heuristics service + +type MessageProcessor interface { + Handle(message Message, messageID uint64, timestamp uint64) Message + Build() Message +} diff --git a/backend/internal/heuristics/readyMessageStore.go b/backend/internal/handlers/readyMessageStore.go similarity index 51% rename from backend/internal/heuristics/readyMessageStore.go rename to backend/internal/handlers/readyMessageStore.go index bbe77585d..c0c386571 100644 --- a/backend/internal/heuristics/readyMessageStore.go +++ b/backend/internal/handlers/readyMessageStore.go @@ -1,18 +1,18 @@ -package heuristics +package handlers import ( . "openreplay/backend/pkg/messages" ) -type readyMessageStore struct { +type ReadyMessageStore struct { store []Message } -func (s *readyMessageStore) append(msg Message) { +func (s *ReadyMessageStore) Append(msg Message) { s.store = append(s.store, msg) } -func (s *readyMessageStore) IterateReadyMessages(cb func(msg Message)) { +func (s *ReadyMessageStore) IterateReadyMessages(cb func(msg Message)) { for _, msg := range s.store { cb(msg) } diff --git a/backend/internal/handlers/web/clickRage.go b/backend/internal/handlers/web/clickRage.go new file mode 100644 index 000000000..e22eb6454 --- /dev/null +++ b/backend/internal/handlers/web/clickRage.go @@ -0,0 +1,75 @@ +package web + +import ( + "encoding/json" + "log" + + . "openreplay/backend/pkg/messages" +) + +/* + Handler name: ClickRage + Input event: MouseClick + Output event: IssueEvent +*/ + +const MAX_TIME_DIFF = 300 +const MIN_CLICKS_IN_A_ROW = 3 + +type ClickRageDetector struct { + lastTimestamp uint64 + lastLabel string + firstInARawTimestamp uint64 + firstInARawMessageId uint64 + countsInARow int +} + +func (crd *ClickRageDetector) reset() { + crd.lastTimestamp = 0 + crd.lastLabel = "" + crd.firstInARawTimestamp = 0 + crd.firstInARawMessageId = 0 + crd.countsInARow = 0 +} + +func (crd *ClickRageDetector) Build() Message { + defer crd.reset() + if crd.countsInARow >= MIN_CLICKS_IN_A_ROW { + payload, err := json.Marshal(struct{ Count int }{crd.countsInARow}) + if err != nil { + log.Printf("can't marshal ClickRage payload to json: %s", err) + } + event := &IssueEvent{ + Type: "click_rage", + ContextString: crd.lastLabel, + Payload: string(payload), + Timestamp: crd.firstInARawTimestamp, + MessageID: crd.firstInARawMessageId, + } + return event + } + return nil +} + +func (crd *ClickRageDetector) Handle(message Message, messageID uint64, timestamp uint64) Message { + switch msg := message.(type) { + case *MouseClick: + // TODO: check if we it is ok to capture clickRage event without the connected ClickEvent in db. + if msg.Label == "" { + return crd.Build() + } + if crd.lastLabel == msg.Label && timestamp-crd.lastTimestamp < MAX_TIME_DIFF { + crd.lastTimestamp = timestamp + crd.countsInARow += 1 + return nil + } + event := crd.Build() + crd.lastTimestamp = timestamp + crd.lastLabel = msg.Label + crd.firstInARawTimestamp = timestamp + crd.firstInARawMessageId = messageID + crd.countsInARow = 1 + return event + } + return nil +} diff --git a/backend/internal/handlers/web/cpuIssue.go b/backend/internal/handlers/web/cpuIssue.go new file mode 100644 index 000000000..56f483e8b --- /dev/null +++ b/backend/internal/handlers/web/cpuIssue.go @@ -0,0 +1,93 @@ +package web + +import ( + "encoding/json" + "log" + + . "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/messages/performance" +) + +/* + Handler name: CpuIssue + Input events: PerformanceTrack, + SetPageLocation + Output event: IssueEvent +*/ + +const CPU_THRESHOLD = 70 // % out of 100 +const CPU_MIN_DURATION_TRIGGER = 6 * 1000 + +type CpuIssueDetector struct { + startTimestamp uint64 + startMessageID uint64 + lastTimestamp uint64 + maxRate uint64 + contextString string +} + +func (f *CpuIssueDetector) Build() Message { + if f.startTimestamp == 0 { + return nil + } + duration := f.lastTimestamp - f.startTimestamp + timestamp := f.startTimestamp + messageID := f.startMessageID + maxRate := f.maxRate + + f.startTimestamp = 0 + f.startMessageID = 0 + f.maxRate = 0 + if duration < CPU_MIN_DURATION_TRIGGER { + return nil + } + + payload, err := json.Marshal(struct { + Duration uint64 + Rate uint64 + }{duration, maxRate}) + if err != nil { + log.Printf("can't marshal CpuIssue payload to json: %s", err) + } + + return &IssueEvent{ + Type: "cpu", + Timestamp: timestamp, + MessageID: messageID, + ContextString: f.contextString, + Payload: string(payload), + } +} + +func (f *CpuIssueDetector) Handle(message Message, messageID uint64, timestamp uint64) Message { + switch msg := message.(type) { + case *PerformanceTrack: + dt := performance.TimeDiff(timestamp, f.lastTimestamp) + if dt == 0 { + return nil // TODO: handle error + } + + f.lastTimestamp = timestamp + + if msg.Frames == -1 || msg.Ticks == -1 { + return f.Build() + } + + cpuRate := performance.CPURate(msg.Ticks, dt) + + if cpuRate >= CPU_THRESHOLD { + if f.startTimestamp == 0 { + f.startTimestamp = timestamp + f.startMessageID = messageID + } + if f.maxRate < cpuRate { + f.maxRate = cpuRate + } + } else { + return f.Build() + } + case *SetPageLocation: + f.contextString = msg.URL + } + return nil +} diff --git a/backend/internal/handlers/web/deadClick.go b/backend/internal/handlers/web/deadClick.go new file mode 100644 index 000000000..6377b074e --- /dev/null +++ b/backend/internal/handlers/web/deadClick.go @@ -0,0 +1,93 @@ +package web + +import ( + . "openreplay/backend/pkg/messages" +) + +/* + Handler name: DeadClick + Input events: SetInputTarget, + CreateDocument, + MouseClick, + SetNodeAttribute, + RemoveNodeAttribute, + CreateElementNode, + CreateTextNode, + MoveNode, + RemoveNode, + SetCSSData, + CSSInsertRule, + CSSDeleteRule + Output event: IssueEvent +*/ + +const CLICK_RELATION_TIME = 1400 + +type DeadClickDetector struct { + lastTimestamp uint64 + lastMouseClick *MouseClick + lastClickTimestamp uint64 + lastMessageID uint64 + inputIDSet map[uint64]bool +} + +func (d *DeadClickDetector) reset() { + d.inputIDSet = nil + d.lastMouseClick = nil + d.lastClickTimestamp = 0 + d.lastMessageID = 0 +} + +func (d *DeadClickDetector) build(timestamp uint64) Message { + defer d.reset() + if d.lastMouseClick == nil || d.lastClickTimestamp+CLICK_RELATION_TIME > timestamp { // reaction is instant + return nil + } + event := &IssueEvent{ + Type: "dead_click", + ContextString: d.lastMouseClick.Label, + Timestamp: d.lastClickTimestamp, + MessageID: d.lastMessageID, + } + return event +} + +func (d *DeadClickDetector) Build() Message { + return d.build(d.lastTimestamp) +} + +func (d *DeadClickDetector) Handle(message Message, messageID uint64, timestamp uint64) Message { + d.lastTimestamp = timestamp + switch msg := message.(type) { + case *SetInputTarget: + if d.inputIDSet == nil { + d.inputIDSet = make(map[uint64]bool) + } + d.inputIDSet[msg.ID] = true + case *CreateDocument: + d.inputIDSet = nil + case *MouseClick: + if msg.Label == "" { + return nil + } + event := d.build(timestamp) + if d.inputIDSet[msg.ID] { // ignore if input + return event + } + d.lastMouseClick = msg + d.lastClickTimestamp = timestamp + d.lastMessageID = messageID + return event + case *SetNodeAttribute, + *RemoveNodeAttribute, + *CreateElementNode, + *CreateTextNode, + *MoveNode, + *RemoveNode, + *SetCSSData, + *CSSInsertRule, + *CSSDeleteRule: + return d.build(timestamp) + } + return nil +} diff --git a/backend/internal/handlers/web/domDrop.go b/backend/internal/handlers/web/domDrop.go new file mode 100644 index 000000000..4a3ec2065 --- /dev/null +++ b/backend/internal/handlers/web/domDrop.go @@ -0,0 +1,55 @@ +package web + +import ( + . "openreplay/backend/pkg/messages" +) + +/* + Handler name: DomDrop + Input events: CreateElementNode, + CreateTextNode, + RemoveNode + Output event: DOMDrop +*/ + +const DROP_WINDOW = 200 //ms +const CRITICAL_COUNT = 1 // Our login page contains 20. But on crush it removes only roots (1-3 nodes). +// TODO: smart detection (making whole DOM tree would eat all memory) + +type domDropDetector struct { + removedCount int + lastDropTimestamp uint64 +} + +func (dd *domDropDetector) reset() { + dd.removedCount = 0 + dd.lastDropTimestamp = 0 +} + +func (dd *domDropDetector) Handle(message Message, _ uint64, timestamp uint64) Message { + switch message.(type) { + case *CreateElementNode, + *CreateTextNode: + dd.removedCount = 0 + dd.lastDropTimestamp = 0 + case *RemoveNode: + if dd.lastDropTimestamp+DROP_WINDOW > timestamp { + dd.removedCount += 1 + } else { + dd.removedCount = 1 + } + dd.lastDropTimestamp = timestamp + } + return nil +} + +func (dd *domDropDetector) Build() Message { + defer dd.reset() + if dd.removedCount >= CRITICAL_COUNT { + domDrop := &DOMDrop{ + Timestamp: dd.lastDropTimestamp, + } + return domDrop + } + return nil +} diff --git a/backend/internal/handlers/web/memoryIssue.go b/backend/internal/handlers/web/memoryIssue.go new file mode 100644 index 000000000..487c396a9 --- /dev/null +++ b/backend/internal/handlers/web/memoryIssue.go @@ -0,0 +1,85 @@ +package web + +import ( + "encoding/json" + "log" + "math" + + . "openreplay/backend/pkg/messages" +) + +/* + Handler name: MemoryIssue + Input events: PerformanceTrack, + SetPageLocation + Output event: IssueEvent +*/ + +const MIN_COUNT = 3 +const MEM_RATE_THRESHOLD = 300 // % to average + +type MemoryIssueDetector struct { + startMessageID uint64 + startTimestamp uint64 + rate int + count float64 + sum float64 + contextString string +} + +func (f *MemoryIssueDetector) reset() { + f.startTimestamp = 0 + f.startMessageID = 0 + f.rate = 0 +} + +func (f *MemoryIssueDetector) Build() Message { + if f.startTimestamp == 0 { + return nil + } + payload, err := json.Marshal(struct{ Rate int }{f.rate - 100}) + if err != nil { + log.Printf("can't marshal MemoryIssue payload to json: %s", err) + } + event := &IssueEvent{ + Type: "memory", + Timestamp: f.startTimestamp, + MessageID: f.startMessageID, + ContextString: f.contextString, + Payload: string(payload), + } + f.reset() + return event +} + +func (f *MemoryIssueDetector) Handle(message Message, messageID uint64, timestamp uint64) Message { + switch msg := message.(type) { + case *PerformanceTrack: + if f.count < MIN_COUNT { + f.sum += float64(msg.UsedJSHeapSize) + f.count++ + return nil + } + + average := f.sum / f.count + rate := int(math.Round(float64(msg.UsedJSHeapSize) / average * 100)) + + f.sum += float64(msg.UsedJSHeapSize) + f.count++ + + if rate >= MEM_RATE_THRESHOLD { + if f.startTimestamp == 0 { + f.startTimestamp = timestamp + f.startMessageID = messageID + } + if f.rate < rate { + f.rate = rate + } + } else { + return f.Build() + } + case *SetPageLocation: + f.contextString = msg.URL + } + return nil +} diff --git a/backend/internal/handlers/web/networkIssue.go b/backend/internal/handlers/web/networkIssue.go new file mode 100644 index 000000000..ed51351e5 --- /dev/null +++ b/backend/internal/handlers/web/networkIssue.go @@ -0,0 +1,47 @@ +package web + +import ( + . "openreplay/backend/pkg/messages" +) + +/* + Handler name: NetworkIssue + Input events: ResourceTiming, + Fetch + Output event: IssueEvent +*/ + +type NetworkIssueDetector struct{} + +func (f *NetworkIssueDetector) Build() Message { + return nil +} + +func (f *NetworkIssueDetector) Handle(message Message, messageID uint64, timestamp uint64) Message { + switch msg := message.(type) { + case *ResourceTiming: + success := msg.Duration != 0 // The only available way here + if !success { + issueType := "missing_resource" + if msg.Initiator == "fetch" || msg.Initiator == "xmlhttprequest" { + issueType = "bad_request" + } + return &IssueEvent{ + Type: issueType, + MessageID: messageID, + Timestamp: msg.Timestamp, + ContextString: msg.URL, + } + } + case *Fetch: + if msg.Status >= 400 { + return &IssueEvent{ + Type: "bad_request", + MessageID: messageID, + Timestamp: msg.Timestamp, + ContextString: msg.URL, + } + } + } + return nil +} diff --git a/backend/internal/handlers/web/performanceAggregator.go b/backend/internal/handlers/web/performanceAggregator.go new file mode 100644 index 000000000..928cedeb9 --- /dev/null +++ b/backend/internal/handlers/web/performanceAggregator.go @@ -0,0 +1,117 @@ +package web + +import ( + "math" + + . "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/messages/performance" +) + +/* + Handler name: PerformanceAggregator + Input event: PerformanceTrack + Output event: PerformanceTrackAggr +*/ + +const AGGREGATION_WINDOW = 2 * 60 * 1000 + +type PerformanceAggregator struct { + *PerformanceTrackAggr + lastTimestamp uint64 + count float64 + sumFrameRate float64 + sumTickRate float64 + sumTotalJSHeapSize float64 + sumUsedJSHeapSize float64 +} + +func (b *PerformanceAggregator) start(timestamp uint64) { + b.PerformanceTrackAggr = &PerformanceTrackAggr{ + TimestampStart: timestamp, + } + b.lastTimestamp = timestamp +} + +func (b *PerformanceAggregator) reset() { + b.PerformanceTrackAggr = nil + b.count = 0 + b.sumFrameRate = 0 + b.sumTickRate = 0 + b.sumTotalJSHeapSize = 0 + b.sumUsedJSHeapSize = 0 + b.lastTimestamp = 0 +} + +func (b *PerformanceAggregator) Handle(message Message, _ uint64, timestamp uint64) Message { + switch msg := message.(type) { + case *PerformanceTrack: + if b.PerformanceTrackAggr == nil || msg.Frames == -1 || msg.Ticks == -1 { + pta := b.Build() + b.start(timestamp) + return pta + } + + dt := performance.TimeDiff(timestamp, b.lastTimestamp) + if dt == 0 { + return nil // shouldn't happen + } + + frameRate := performance.FrameRate(msg.Frames, dt) + tickRate := performance.TickRate(msg.Ticks, dt) + + fps := uint64(math.Round(frameRate)) + cpu := performance.CPURateFromTickRate(tickRate) + if fps < b.MinFPS || b.MinFPS == 0 { + b.MinFPS = fps + } + if fps > b.MaxFPS { + b.MaxFPS = fps + } + if cpu < b.MinCPU || b.MinCPU == 0 { + b.MinCPU = cpu + } + if cpu > b.MaxCPU { + b.MaxCPU = cpu + } + if msg.TotalJSHeapSize < b.MinTotalJSHeapSize || b.MinTotalJSHeapSize == 0 { + b.MinTotalJSHeapSize = msg.TotalJSHeapSize + } + if msg.TotalJSHeapSize > b.MaxTotalJSHeapSize { + b.MaxTotalJSHeapSize = msg.TotalJSHeapSize + } + if msg.UsedJSHeapSize < b.MinUsedJSHeapSize || b.MinUsedJSHeapSize == 0 { + b.MinUsedJSHeapSize = msg.UsedJSHeapSize + } + if msg.UsedJSHeapSize > b.MaxUsedJSHeapSize { + b.MaxUsedJSHeapSize = msg.UsedJSHeapSize + } + b.sumFrameRate += frameRate + b.sumTickRate += tickRate + b.sumTotalJSHeapSize += float64(msg.TotalJSHeapSize) + b.sumUsedJSHeapSize += float64(msg.UsedJSHeapSize) + b.count += 1 + b.lastTimestamp = timestamp + } + if b.PerformanceTrackAggr != nil && + timestamp-b.PerformanceTrackAggr.TimestampStart >= AGGREGATION_WINDOW { + return b.Build() + } + return nil +} + +func (b *PerformanceAggregator) Build() Message { + if b.PerformanceTrackAggr == nil { + return nil + } + if b.count != 0 && b.PerformanceTrackAggr.TimestampStart < b.lastTimestamp { // the last one shouldn't happen + b.PerformanceTrackAggr.TimestampEnd = b.lastTimestamp + b.PerformanceTrackAggr.AvgFPS = uint64(math.Round(b.sumFrameRate / b.count)) + b.PerformanceTrackAggr.AvgCPU = 100 - uint64(math.Round(b.sumTickRate*100/b.count)) + b.PerformanceTrackAggr.AvgTotalJSHeapSize = uint64(math.Round(b.sumTotalJSHeapSize / b.count)) + b.PerformanceTrackAggr.AvgUsedJSHeapSize = uint64(math.Round(b.sumUsedJSHeapSize / b.count)) + b.reset() + return b.PerformanceTrackAggr + } + b.reset() + return nil +} diff --git a/backend/internal/heuristics/anr.go b/backend/internal/heuristics/anr.go deleted file mode 100644 index 7cec8fc97..000000000 --- a/backend/internal/heuristics/anr.go +++ /dev/null @@ -1,49 +0,0 @@ -package heuristics - -import ( - . "openreplay/backend/pkg/messages" -) - -const MIN_TIME_AFTER_LAST_HEARTBEAT = 60 * 1000 - -type anr struct { - readyMessageStore - lastLabel string - lastHeartbeatTimestamp uint64 - lastHeartbeatIndex uint64 -} - -func (h *anr) buildIf(timestamp uint64) { - if h.lastHeartbeatTimestamp != 0 && h.lastHeartbeatTimestamp+MIN_TIME_AFTER_LAST_HEARTBEAT <= timestamp { - m := &IOSIssueEvent{ - Type: "anr", - ContextString: h.lastLabel, - } - m.Timestamp = h.lastHeartbeatTimestamp - m.Index = h.lastHeartbeatIndex // Associated Index/ MessageID ? - h.append(m) - h.lastHeartbeatTimestamp = 0 - h.lastHeartbeatIndex = 0 - } -} - -func (h *anr) HandleMessage(msg Message) { - switch m := msg.(type) { - case *IOSClickEvent: - h.buildIf(m.Timestamp) - h.lastLabel = m.Label - h.lastHeartbeatTimestamp = m.Timestamp - h.lastHeartbeatIndex = m.Index - case *IOSInputEvent: - h.buildIf(m.Timestamp) - h.lastLabel = m.Label - h.lastHeartbeatTimestamp = m.Timestamp - h.lastHeartbeatIndex = m.Index - case *IOSPerformanceEvent: - h.buildIf(m.Timestamp) - h.lastHeartbeatTimestamp = m.Timestamp - h.lastHeartbeatIndex = m.Index - case *IOSSessionEnd: - h.buildIf(m.Timestamp) - } -} diff --git a/backend/internal/heuristics/heuristics.go b/backend/internal/heuristics/heuristics.go deleted file mode 100644 index 677574951..000000000 --- a/backend/internal/heuristics/heuristics.go +++ /dev/null @@ -1,64 +0,0 @@ -package heuristics - -import ( - . "openreplay/backend/pkg/db/types" - . "openreplay/backend/pkg/messages" -) - -type MessageHandler interface { - HandleMessage(Message) -} -type ReadyMessagesIterator interface { - IterateReadyMessages(func(Message)) -} - -type Handler interface { - MessageHandler - ReadyMessagesIterator -} - -type mainHandler map[uint64]*sessHandler - -func NewHandler() mainHandler { - return make(mainHandler) -} - -func (m mainHandler) getSessHandler(session *Session) *sessHandler { - if session == nil { - //AAAA - return nil - } - s := m[session.SessionID] - if s == nil { - s = newSessHandler(session) - m[session.SessionID] = s - } - return s -} - -func (m mainHandler) HandleMessage(session *Session, msg Message) { - s := m.getSessHandler(session) - s.HandleMessage(msg) -} - -func (m mainHandler) IterateSessionReadyMessages(sessionID uint64, iter func(msg Message)) { - s, ok := m[sessionID] - if !ok { - return - } - s.IterateReadyMessages(iter) - if s.IsEnded() { - delete(m, sessionID) - } -} - -func (m mainHandler) IterateReadyMessages(iter func(sessionID uint64, msg Message)) { - for sessionID, s := range m { - s.IterateReadyMessages(func(msg Message) { - iter(sessionID, msg) - }) - if s.IsEnded() { - delete(m, sessionID) - } - } -} diff --git a/backend/internal/heuristics/session.go b/backend/internal/heuristics/session.go deleted file mode 100644 index 3946bf918..000000000 --- a/backend/internal/heuristics/session.go +++ /dev/null @@ -1,45 +0,0 @@ -package heuristics - -import ( - . "openreplay/backend/pkg/db/types" - . "openreplay/backend/pkg/messages" -) - -type sessHandler struct { - session *Session - handlers []Handler - ended bool -} - -func newSessHandler(session *Session) *sessHandler { - return &sessHandler{ - session: session, - handlers: []Handler{ - new(clickrage), - new(performanceAggregator), - new(anr), - }, - } -} - -func (s *sessHandler) HandleMessage(msg Message) { - for _, h := range s.handlers { - h.HandleMessage(msg) - } - if _, isEnd := msg.(*IOSSessionEnd); isEnd { - s.ended = true - } - if _, isEnd := msg.(*SessionEnd); isEnd { - s.ended = true - } -} - -func (s *sessHandler) IterateReadyMessages(cb func(msg Message)) { - for _, h := range s.handlers { - h.IterateReadyMessages(cb) - } -} - -func (s *sessHandler) IsEnded() bool { - return s.ended -} diff --git a/backend/internal/sessionender/ender.go b/backend/internal/sessionender/ender.go new file mode 100644 index 000000000..54bd399ac --- /dev/null +++ b/backend/internal/sessionender/ender.go @@ -0,0 +1,62 @@ +package sessionender + +import ( + "log" + "time" +) + +// EndedSessionHandler handler for ended sessions +type EndedSessionHandler func(sessionID uint64, timestamp int64) bool + +// session holds information about user's session live status +type session struct { + lastTimestamp int64 + isEnded bool +} + +// SessionEnder updates timestamp of last message for each session +type SessionEnder struct { + timeout int64 + sessions map[uint64]*session // map[sessionID]session +} + +func New(timeout int64) *SessionEnder { + return &SessionEnder{ + timeout: timeout, + sessions: make(map[uint64]*session), + } +} + +// UpdateSession save timestamp for new sessions and update for existing sessions +func (se *SessionEnder) UpdateSession(sessionID, timestamp uint64) { + currTS := int64(timestamp) + if currTS == 0 { + log.Printf("got empty timestamp for sessionID: %d", sessionID) + return + } + sess, ok := se.sessions[sessionID] + if !ok { + se.sessions[sessionID] = &session{ + lastTimestamp: currTS, + isEnded: false, + } + return + } + if currTS > sess.lastTimestamp { + sess.lastTimestamp = currTS + sess.isEnded = false + } +} + +// HandleEndedSessions runs handler for each ended session and delete information about session in successful case +func (se *SessionEnder) HandleEndedSessions(handler EndedSessionHandler) { + deadLine := time.Now().UnixMilli() - se.timeout + for sessID, sess := range se.sessions { + if sess.isEnded || sess.lastTimestamp < deadLine { + sess.isEnded = true + if handler(sessID, sess.lastTimestamp) { + delete(se.sessions, sessID) + } + } + } +} diff --git a/backend/pkg/intervals/intervals.go b/backend/pkg/intervals/intervals.go index c4dfbc835..226d79d35 100644 --- a/backend/pkg/intervals/intervals.go +++ b/backend/pkg/intervals/intervals.go @@ -1,11 +1,8 @@ package intervals -const EVENTS_COMMIT_INTERVAL = 30 * 1000 -const HEARTBEAT_INTERVAL = 2 * 60 * 1000 -const INTEGRATIONS_REQUEST_INTERVAL = 1 * 60 * 1000 -const EVENTS_PAGE_EVENT_TIMEOUT = 2 * 60 * 1000 -const EVENTS_INPUT_EVENT_TIMEOUT = 2 * 60 * 1000 -const EVENTS_PERFORMANCE_AGGREGATION_TIMEOUT = 2 * 60 * 1000 +const EVENTS_COMMIT_INTERVAL = 30 * 1000 // как часто комитим сообщения в кафке (ender) +const HEARTBEAT_INTERVAL = 2 * 60 * 1000 // максимальный таймаут от трекера в рамках сессии +const INTEGRATIONS_REQUEST_INTERVAL = 1 * 60 * 1000 // интеграции const EVENTS_SESSION_END_TIMEOUT = HEARTBEAT_INTERVAL + 30*1000 const EVENTS_SESSION_END_TIMEOUT_WITH_INTEGRATIONS = HEARTBEAT_INTERVAL + 3*60*1000 -const EVENTS_BACK_COMMIT_GAP = EVENTS_SESSION_END_TIMEOUT_WITH_INTEGRATIONS + 1*60*1000 +const EVENTS_BACK_COMMIT_GAP = EVENTS_SESSION_END_TIMEOUT_WITH_INTEGRATIONS + 1*60*1000 // для бэк коммита diff --git a/backend/pkg/messages/messages.go b/backend/pkg/messages/messages.go index 82a0f4f97..ba10eb026 100644 --- a/backend/pkg/messages/messages.go +++ b/backend/pkg/messages/messages.go @@ -1396,7 +1396,7 @@ type IssueEvent struct { Type string ContextString string Context string - Payload string + Payload string // TODO: check, maybe it's better to use empty interface here } func (msg *IssueEvent) Encode() []byte { diff --git a/backend/services/ender/build_hack b/backend/services/ender/build_hack new file mode 100644 index 000000000..e69de29bb diff --git a/backend/services/ender/builder/builder.go b/backend/services/ender/builder/builder.go deleted file mode 100644 index 1a89f67b6..000000000 --- a/backend/services/ender/builder/builder.go +++ /dev/null @@ -1,335 +0,0 @@ -package builder - -import ( - "net/url" - "strings" - "time" - - "openreplay/backend/pkg/intervals" - . "openreplay/backend/pkg/messages" -) - -func getURLExtention(URL string) string { - u, err := url.Parse(URL) - if err != nil { - return "" - } - i := strings.LastIndex(u.Path, ".") - return u.Path[i+1:] -} - -func getResourceType(initiator string, URL string) string { - switch initiator { - case "xmlhttprequest", "fetch": - return "fetch" - case "img": - return "img" - default: - switch getURLExtention(URL) { - case "css": - return "stylesheet" - case "js": - return "script" - case "png", "gif", "jpg", "jpeg", "svg": - return "img" - case "mp4", "mkv", "ogg", "webm", "avi", "mp3": - return "media" - default: - return "other" - } - } -} - -type builder struct { - readyMsgs []Message - timestamp uint64 - lastProcessedTimestamp int64 - peBuilder *pageEventBuilder - ptaBuilder *performanceTrackAggrBuilder - ieBuilder *inputEventBuilder - ciFinder *cpuIssueFinder - miFinder *memoryIssueFinder - ddDetector *domDropDetector - crDetector *clickRageDetector - dcDetector *deadClickDetector - integrationsWaiting bool - - sid uint64 -} - -func NewBuilder() *builder { - return &builder{ - peBuilder: &pageEventBuilder{}, - ptaBuilder: &performanceTrackAggrBuilder{}, - ieBuilder: NewInputEventBuilder(), - ciFinder: &cpuIssueFinder{}, - miFinder: &memoryIssueFinder{}, - ddDetector: &domDropDetector{}, - crDetector: &clickRageDetector{}, - dcDetector: &deadClickDetector{}, - integrationsWaiting: true, - } -} - -func (b *builder) appendReadyMessage(msg Message) { // interface is never nil even if it holds nil value - b.readyMsgs = append(b.readyMsgs, msg) -} - -func (b *builder) iterateReadyMessage(iter func(msg Message)) { - for _, readyMsg := range b.readyMsgs { - iter(readyMsg) - } - b.readyMsgs = nil -} - -func (b *builder) buildSessionEnd() { - if b.timestamp == 0 { - return - } - sessionEnd := &SessionEnd{ - Timestamp: b.timestamp, // + delay? - } - b.appendReadyMessage(sessionEnd) -} - -func (b *builder) buildPageEvent() { - if msg := b.peBuilder.Build(); msg != nil { - b.appendReadyMessage(msg) - } -} -func (b *builder) buildPerformanceTrackAggr() { - if msg := b.ptaBuilder.Build(); msg != nil { - b.appendReadyMessage(msg) - } -} -func (b *builder) buildInputEvent() { - if msg := b.ieBuilder.Build(); msg != nil { - b.appendReadyMessage(msg) - } -} - -func (b *builder) handleMessage(message Message, messageID uint64) { - timestamp := GetTimestamp(message) - if b.timestamp < timestamp { // unnecessary? TODO: test and remove - b.timestamp = timestamp - } - - b.lastProcessedTimestamp = time.Now().UnixMilli() - - // Might happen before the first timestamp. - switch msg := message.(type) { - case *SessionStart, - *Metadata, - *UserID, - *UserAnonymousID: - b.appendReadyMessage(msg) - case *RawErrorEvent: - b.appendReadyMessage(&ErrorEvent{ - MessageID: messageID, - Timestamp: msg.Timestamp, - Source: msg.Source, - Name: msg.Name, - Message: msg.Message, - Payload: msg.Payload, - }) - } - if b.timestamp == 0 { - return - } - switch msg := message.(type) { - case *SetPageLocation: - if msg.NavigationStart == 0 { - b.appendReadyMessage(&PageEvent{ - URL: msg.URL, - Referrer: msg.Referrer, - Loaded: false, - MessageID: messageID, - Timestamp: b.timestamp, - }) - } else { - b.buildPageEvent() - b.buildInputEvent() - b.ieBuilder.ClearLabels() - b.peBuilder.HandleSetPageLocation(msg, messageID, b.timestamp) - b.miFinder.HandleSetPageLocation(msg) - b.ciFinder.HandleSetPageLocation(msg) - } - case *PageLoadTiming: - if rm := b.peBuilder.HandlePageLoadTiming(msg); rm != nil { - b.appendReadyMessage(rm) - } - case *PageRenderTiming: - if rm := b.peBuilder.HandlePageRenderTiming(msg); rm != nil { - b.appendReadyMessage(rm) - } - case *PerformanceTrack: - if rm := b.ptaBuilder.HandlePerformanceTrack(msg, b.timestamp); rm != nil { - b.appendReadyMessage(rm) - } - if rm := b.ciFinder.HandlePerformanceTrack(msg, messageID, b.timestamp); rm != nil { - b.appendReadyMessage(rm) - } - if rm := b.miFinder.HandlePerformanceTrack(msg, messageID, b.timestamp); rm != nil { - b.appendReadyMessage(rm) - } - case *SetInputTarget: - if rm := b.ieBuilder.HandleSetInputTarget(msg); rm != nil { - b.appendReadyMessage(rm) - } - case *SetInputValue: - if rm := b.ieBuilder.HandleSetInputValue(msg, messageID, b.timestamp); rm != nil { - b.appendReadyMessage(rm) - } - case *MouseClick: - b.buildInputEvent() - if rm := b.crDetector.HandleMouseClick(msg, messageID, b.timestamp); rm != nil { - b.appendReadyMessage(rm) - } - if msg.Label != "" { - b.appendReadyMessage(&ClickEvent{ - MessageID: messageID, - Label: msg.Label, - HesitationTime: msg.HesitationTime, - Timestamp: b.timestamp, - Selector: msg.Selector, - }) - } - case *JSException: - b.appendReadyMessage(&ErrorEvent{ - MessageID: messageID, - Timestamp: b.timestamp, - Source: "js_exception", - Name: msg.Name, - Message: msg.Message, - Payload: msg.Payload, - }) - case *ResourceTiming: - tp := getResourceType(msg.Initiator, msg.URL) - success := msg.Duration != 0 - b.appendReadyMessage(&ResourceEvent{ - MessageID: messageID, - Timestamp: msg.Timestamp, - Duration: msg.Duration, - TTFB: msg.TTFB, - HeaderSize: msg.HeaderSize, - EncodedBodySize: msg.EncodedBodySize, - DecodedBodySize: msg.DecodedBodySize, - URL: msg.URL, - Type: tp, - Success: success, - }) - if !success { - issueType := "missing_resource" - if tp == "fetch" { - issueType = "bad_request" - } - b.appendReadyMessage(&IssueEvent{ - Type: issueType, - MessageID: messageID, - Timestamp: msg.Timestamp, - ContextString: msg.URL, - }) - } - case *RawCustomEvent: - b.appendReadyMessage(&CustomEvent{ - MessageID: messageID, - Timestamp: b.timestamp, - Name: msg.Name, - Payload: msg.Payload, - }) - case *CustomIssue: - b.appendReadyMessage(&IssueEvent{ - Type: "custom", - Timestamp: b.timestamp, - MessageID: messageID, - ContextString: msg.Name, - Payload: msg.Payload, - }) - case *Fetch: - b.appendReadyMessage(&FetchEvent{ - MessageID: messageID, - Timestamp: msg.Timestamp, - Method: msg.Method, - URL: msg.URL, - Request: msg.Request, - Response: msg.Response, - Status: msg.Status, - Duration: msg.Duration, - }) - if msg.Status >= 400 { - b.appendReadyMessage(&IssueEvent{ - Type: "bad_request", - MessageID: messageID, - Timestamp: msg.Timestamp, - ContextString: msg.URL, - }) - } - case *GraphQL: - b.appendReadyMessage(&GraphQLEvent{ - MessageID: messageID, - Timestamp: b.timestamp, - OperationKind: msg.OperationKind, - OperationName: msg.OperationName, - Variables: msg.Variables, - Response: msg.Response, - }) - case *StateAction: - b.appendReadyMessage(&StateActionEvent{ - MessageID: messageID, - Timestamp: b.timestamp, - Type: msg.Type, - }) - case *CreateElementNode, - *CreateTextNode: - b.ddDetector.HandleNodeCreation() - case *RemoveNode: - b.ddDetector.HandleNodeRemoval(b.timestamp) - case *CreateDocument: - if rm := b.ddDetector.Build(); rm != nil { - b.appendReadyMessage(rm) - } - } - if rm := b.dcDetector.HandleMessage(message, messageID, b.timestamp); rm != nil { - b.appendReadyMessage(rm) - } -} - -func (b *builder) checkTimeouts(ts int64) bool { - if b.timestamp == 0 { - return false // There was no timestamp events yet - } - - if b.peBuilder.HasInstance() && int64(b.peBuilder.GetTimestamp())+intervals.EVENTS_PAGE_EVENT_TIMEOUT < ts { - b.buildPageEvent() - } - if b.ieBuilder.HasInstance() && int64(b.ieBuilder.GetTimestamp())+intervals.EVENTS_INPUT_EVENT_TIMEOUT < ts { - b.buildInputEvent() - } - if b.ptaBuilder.HasInstance() && int64(b.ptaBuilder.GetStartTimestamp())+intervals.EVENTS_PERFORMANCE_AGGREGATION_TIMEOUT < ts { - b.buildPerformanceTrackAggr() - } - - lastTsGap := ts - int64(b.timestamp) - //b.lastProcessedTimestamp - //log.Printf("checking timeouts for sess %v: %v now, %v sesstime; gap %v",b.sid, ts, b.timestamp, lastTsGap) - if lastTsGap > intervals.EVENTS_SESSION_END_TIMEOUT { - if rm := b.ddDetector.Build(); rm != nil { - b.appendReadyMessage(rm) - } - if rm := b.ciFinder.Build(); rm != nil { - b.appendReadyMessage(rm) - } - if rm := b.miFinder.Build(); rm != nil { - b.appendReadyMessage(rm) - } - if rm := b.crDetector.Build(); rm != nil { - b.appendReadyMessage(rm) - } - if rm := b.dcDetector.HandleReaction(b.timestamp); rm != nil { - b.appendReadyMessage(rm) - } - b.buildSessionEnd() - return true - } - return false -} diff --git a/backend/services/ender/builder/builderMap.go b/backend/services/ender/builder/builderMap.go deleted file mode 100644 index 3f3e4d6e3..000000000 --- a/backend/services/ender/builder/builderMap.go +++ /dev/null @@ -1,51 +0,0 @@ -package builder - -import ( - . "openreplay/backend/pkg/messages" -) - -type builderMap map[uint64]*builder - -func NewBuilderMap() builderMap { - return make(builderMap) -} - -func (m builderMap) GetBuilder(sessionID uint64) *builder { - b := m[sessionID] - if b == nil { - b = NewBuilder() - m[sessionID] = b - b.sid = sessionID - - } - return b -} - -func (m builderMap) HandleMessage(sessionID uint64, msg Message, messageID uint64) { - b := m.GetBuilder(sessionID) - b.handleMessage(msg, messageID) -} - -func (m builderMap) IterateSessionReadyMessages(sessionID uint64, operatingTs int64, iter func(msg Message)) { - b, ok := m[sessionID] - if !ok { - return - } - sessionEnded := b.checkTimeouts(operatingTs) - b.iterateReadyMessage(iter) - if sessionEnded { - delete(m, sessionID) - } -} - -func (m builderMap) IterateReadyMessages(operatingTs int64, iter func(sessionID uint64, msg Message)) { - for sessionID, b := range m { - sessionEnded := b.checkTimeouts(operatingTs) - b.iterateReadyMessage(func(msg Message) { - iter(sessionID, msg) - }) - if sessionEnded { - delete(m, sessionID) - } - } -} diff --git a/backend/services/ender/builder/clikRageDetector.go b/backend/services/ender/builder/clikRageDetector.go deleted file mode 100644 index f25efbcd9..000000000 --- a/backend/services/ender/builder/clikRageDetector.go +++ /dev/null @@ -1,55 +0,0 @@ -package builder - -import ( - "encoding/json" - - . "openreplay/backend/pkg/messages" -) - -const CLICK_TIME_DIFF = 300 -const MIN_CLICKS_IN_A_ROW = 3 - -type clickRageDetector struct { - lastTimestamp uint64 - lastLabel string - firstInARawTimestamp uint64 - firstInARawMessageId uint64 - countsInARow int -} - -func (crd *clickRageDetector) Build() *IssueEvent { - var i *IssueEvent - if crd.countsInARow >= MIN_CLICKS_IN_A_ROW { - payload, _ := json.Marshal(struct{ Count int }{crd.countsInARow}) - i = &IssueEvent{ - Type: "click_rage", - ContextString: crd.lastLabel, - Payload: string(payload), // TODO: json encoder - Timestamp: crd.firstInARawTimestamp, - MessageID: crd.firstInARawMessageId, - } - } - crd.lastTimestamp = 0 - crd.lastLabel = "" - crd.firstInARawTimestamp = 0 - crd.firstInARawMessageId = 0 - crd.countsInARow = 0 - return i -} - -func (crd *clickRageDetector) HandleMouseClick(msg *MouseClick, messageID uint64, timestamp uint64) *IssueEvent { - if crd.lastTimestamp+CLICK_TIME_DIFF > timestamp && crd.lastLabel == msg.Label { - crd.lastTimestamp = timestamp - crd.countsInARow += 1 - return nil - } - i := crd.Build() - if msg.Label != "" { - crd.lastTimestamp = timestamp - crd.lastLabel = msg.Label - crd.firstInARawTimestamp = timestamp - crd.firstInARawMessageId = messageID - crd.countsInARow = 1 - } - return i -} diff --git a/backend/services/ender/builder/cpuIssueFinder.go b/backend/services/ender/builder/cpuIssueFinder.go deleted file mode 100644 index 1af867ea3..000000000 --- a/backend/services/ender/builder/cpuIssueFinder.go +++ /dev/null @@ -1,81 +0,0 @@ -package builder - -import ( - "encoding/json" - - . "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/messages/performance" -) - -const CPU_THRESHOLD = 70 // % out of 100 -const CPU_MIN_DURATION_TRIGGER = 6 * 1000 - -type cpuIssueFinder struct { - startTimestamp uint64 - startMessageID uint64 - lastTimestamp uint64 - maxRate uint64 - contextString string -} - -func (f *cpuIssueFinder) Build() *IssueEvent { - if f.startTimestamp == 0 { - return nil - } - duration := f.lastTimestamp - f.startTimestamp - timestamp := f.startTimestamp - messageID := f.startMessageID - maxRate := f.maxRate - - f.startTimestamp = 0 - f.startMessageID = 0 - f.maxRate = 0 - if duration < CPU_MIN_DURATION_TRIGGER { - return nil - } - - payload, _ := json.Marshal(struct { - Duration uint64 - Rate uint64 - }{duration, maxRate}) - return &IssueEvent{ - Type: "cpu", - Timestamp: timestamp, - MessageID: messageID, - ContextString: f.contextString, - Payload: string(payload), - } -} - -func (f *cpuIssueFinder) HandleSetPageLocation(msg *SetPageLocation) { - f.contextString = msg.URL -} - -func (f *cpuIssueFinder) HandlePerformanceTrack(msg *PerformanceTrack, messageID uint64, timestamp uint64) *IssueEvent { - dt := performance.TimeDiff(timestamp, f.lastTimestamp) - if dt == 0 { - return nil // TODO: handle error - } - - f.lastTimestamp = timestamp - - if msg.Frames == -1 || msg.Ticks == -1 { - return f.Build() - } - - cpuRate := performance.CPURate(msg.Ticks, dt) - - if cpuRate >= CPU_THRESHOLD { - if f.startTimestamp == 0 { - f.startTimestamp = timestamp - f.startMessageID = messageID - } - if f.maxRate < cpuRate { - f.maxRate = cpuRate - } - } else { - return f.Build() - } - - return nil -} diff --git a/backend/services/ender/builder/deadClickDetector.go b/backend/services/ender/builder/deadClickDetector.go deleted file mode 100644 index de977b7bd..000000000 --- a/backend/services/ender/builder/deadClickDetector.go +++ /dev/null @@ -1,66 +0,0 @@ -package builder - -import ( - . "openreplay/backend/pkg/messages" -) - -const CLICK_RELATION_TIME = 1400 - -type deadClickDetector struct { - lastMouseClick *MouseClick - lastTimestamp uint64 - lastMessageID uint64 - inputIDSet map[uint64]bool -} - -func (d *deadClickDetector) HandleReaction(timestamp uint64) *IssueEvent { - var i *IssueEvent - if d.lastMouseClick != nil && d.lastTimestamp+CLICK_RELATION_TIME < timestamp { - i = &IssueEvent{ - Type: "dead_click", - ContextString: d.lastMouseClick.Label, - Timestamp: d.lastTimestamp, - MessageID: d.lastMessageID, - } - } - d.inputIDSet = nil - d.lastMouseClick = nil - d.lastTimestamp = 0 - d.lastMessageID = 0 - return i -} - -func (d *deadClickDetector) HandleMessage(msg Message, messageID uint64, timestamp uint64) *IssueEvent { - var i *IssueEvent - switch m := msg.(type) { - case *SetInputTarget: - if d.inputIDSet == nil { - d.inputIDSet = make(map[uint64]bool) - } - d.inputIDSet[m.ID] = true - case *CreateDocument: - d.inputIDSet = nil - case *MouseClick: - if m.Label == "" { - return nil - } - i = d.HandleReaction(timestamp) - if d.inputIDSet[m.ID] { // ignore if input - return i - } - d.lastMouseClick = m - d.lastTimestamp = timestamp - d.lastMessageID = messageID - case *SetNodeAttribute, - *RemoveNodeAttribute, - *CreateElementNode, - *CreateTextNode, - *MoveNode, - *RemoveNode, - *SetCSSData, - *CSSInsertRule, - *CSSDeleteRule: - i = d.HandleReaction(timestamp) - } - return i -} diff --git a/backend/services/ender/builder/domDropDetector.go b/backend/services/ender/builder/domDropDetector.go deleted file mode 100644 index 3643038c1..000000000 --- a/backend/services/ender/builder/domDropDetector.go +++ /dev/null @@ -1,39 +0,0 @@ -package builder - -import ( - . "openreplay/backend/pkg/messages" -) - -type domDropDetector struct { - removedCount int - lastDropTimestamp uint64 -} - -const DROP_WINDOW = 200 //ms -const CRITICAL_COUNT = 1 // Our login page contains 20. But on crush it removes only roots (1-3 nodes). - -func (dd *domDropDetector) HandleNodeCreation() { - dd.removedCount = 0 - dd.lastDropTimestamp = 0 -} - -func (dd *domDropDetector) HandleNodeRemoval(ts uint64) { - if dd.lastDropTimestamp+DROP_WINDOW > ts { - dd.removedCount += 1 - } else { - dd.removedCount = 1 - } - dd.lastDropTimestamp = ts -} - -func (dd *domDropDetector) Build() *DOMDrop { - var domDrop *DOMDrop - if dd.removedCount >= CRITICAL_COUNT { - domDrop = &DOMDrop{ - Timestamp: dd.lastDropTimestamp, - } - } - dd.removedCount = 0 - dd.lastDropTimestamp = 0 - return domDrop -} diff --git a/backend/services/ender/builder/inputEventBuilder.go b/backend/services/ender/builder/inputEventBuilder.go deleted file mode 100644 index ce1b710ca..000000000 --- a/backend/services/ender/builder/inputEventBuilder.go +++ /dev/null @@ -1,79 +0,0 @@ -package builder - -import ( - . "openreplay/backend/pkg/messages" -) - -type inputLabels map[uint64]string - -type inputEventBuilder struct { - inputEvent *InputEvent - inputLabels inputLabels - inputID uint64 -} - -func NewInputEventBuilder() *inputEventBuilder { - ieBuilder := &inputEventBuilder{} - ieBuilder.ClearLabels() - return ieBuilder -} - -func (b *inputEventBuilder) ClearLabels() { - b.inputLabels = make(inputLabels) -} - -func (b *inputEventBuilder) HandleSetInputTarget(msg *SetInputTarget) *InputEvent { - var inputEvent *InputEvent - if b.inputID != msg.ID { - inputEvent = b.Build() - b.inputID = msg.ID - } - b.inputLabels[msg.ID] = msg.Label - return inputEvent -} - -func (b *inputEventBuilder) HandleSetInputValue(msg *SetInputValue, messageID uint64, timestamp uint64) *InputEvent { - var inputEvent *InputEvent - if b.inputID != msg.ID { - inputEvent = b.Build() - b.inputID = msg.ID - } - if b.inputEvent == nil { - b.inputEvent = &InputEvent{ - MessageID: messageID, - Timestamp: timestamp, - Value: msg.Value, - ValueMasked: msg.Mask > 0, - } - } else { - b.inputEvent.Value = msg.Value - b.inputEvent.ValueMasked = msg.Mask > 0 - } - return inputEvent -} - -func (b *inputEventBuilder) HasInstance() bool { - return b.inputEvent != nil -} - -func (b *inputEventBuilder) GetTimestamp() uint64 { - if b.inputEvent == nil { - return 0 - } - return b.inputEvent.Timestamp -} - -func (b *inputEventBuilder) Build() *InputEvent { - if b.inputEvent == nil { - return nil - } - inputEvent := b.inputEvent - label, exists := b.inputLabels[b.inputID] - if !exists { - return nil - } - inputEvent.Label = label - - b.inputEvent = nil - return inputEvent -} diff --git a/backend/services/ender/builder/memoryIssueFinder.go b/backend/services/ender/builder/memoryIssueFinder.go deleted file mode 100644 index 0d6d71420..000000000 --- a/backend/services/ender/builder/memoryIssueFinder.go +++ /dev/null @@ -1,70 +0,0 @@ -package builder - -import ( - "encoding/json" - "math" - - . "openreplay/backend/pkg/messages" -) - -const MIN_COUNT = 3 -const MEM_RATE_THRESHOLD = 300 // % to average - -type memoryIssueFinder struct { - startMessageID uint64 - startTimestamp uint64 - rate int - count float64 - sum float64 - contextString string -} - -func (f *memoryIssueFinder) Build() *IssueEvent { - if f.startTimestamp == 0 { - return nil - } - payload, _ := json.Marshal(struct{ Rate int }{f.rate - 100}) - i := &IssueEvent{ - Type: "memory", - Timestamp: f.startTimestamp, - MessageID: f.startMessageID, - ContextString: f.contextString, - Payload: string(payload), - } - f.startTimestamp = 0 - f.startMessageID = 0 - f.rate = 0 - return i -} - -func (f *memoryIssueFinder) HandleSetPageLocation(msg *SetPageLocation) { - f.contextString = msg.URL -} - -func (f *memoryIssueFinder) HandlePerformanceTrack(msg *PerformanceTrack, messageID uint64, timestamp uint64) *IssueEvent { - if f.count < MIN_COUNT { - f.sum += float64(msg.UsedJSHeapSize) - f.count++ - return nil - } - - average := f.sum / f.count - rate := int(math.Round(float64(msg.UsedJSHeapSize) / average * 100)) - - f.sum += float64(msg.UsedJSHeapSize) - f.count++ - - if rate >= MEM_RATE_THRESHOLD { - if f.startTimestamp == 0 { - f.startTimestamp = timestamp - f.startMessageID = messageID - } - if f.rate < rate { - f.rate = rate - } - } else { - return f.Build() - } - - return nil -} diff --git a/backend/services/ender/builder/pageEventBuilder.go b/backend/services/ender/builder/pageEventBuilder.go deleted file mode 100644 index 2b0665894..000000000 --- a/backend/services/ender/builder/pageEventBuilder.go +++ /dev/null @@ -1,91 +0,0 @@ -package builder - -import ( - . "openreplay/backend/pkg/messages" -) - -type pageEventBuilder struct { - pageEvent *PageEvent - firstTimingHandled bool -} - -func (b *pageEventBuilder) buildIfTimingsComplete() *PageEvent { - if b.firstTimingHandled { - return b.Build() - } - b.firstTimingHandled = true - return nil -} - -// Only for Loaded: true -func (b *pageEventBuilder) HandleSetPageLocation(msg *SetPageLocation, messageID uint64, timestamp uint64) { - b.pageEvent = &PageEvent{ - URL: msg.URL, - Referrer: msg.Referrer, - Loaded: true, - MessageID: messageID, - Timestamp: timestamp, - } -} - -func (b *pageEventBuilder) HandlePageLoadTiming(msg *PageLoadTiming) *PageEvent { - if !b.HasInstance() { - return nil - } - if msg.RequestStart <= 30000 { - b.pageEvent.RequestStart = msg.RequestStart - } - if msg.ResponseStart <= 30000 { - b.pageEvent.ResponseStart = msg.ResponseStart - } - if msg.ResponseEnd <= 30000 { - b.pageEvent.ResponseEnd = msg.ResponseEnd - } - if msg.DomContentLoadedEventStart <= 30000 { - b.pageEvent.DomContentLoadedEventStart = msg.DomContentLoadedEventStart - } - if msg.DomContentLoadedEventEnd <= 30000 { - b.pageEvent.DomContentLoadedEventEnd = msg.DomContentLoadedEventEnd - } - if msg.LoadEventStart <= 30000 { - b.pageEvent.LoadEventStart = msg.LoadEventStart - } - if msg.LoadEventEnd <= 30000 { - b.pageEvent.LoadEventEnd = msg.LoadEventEnd - } - if msg.FirstPaint <= 30000 { - b.pageEvent.FirstPaint = msg.FirstPaint - } - if msg.FirstContentfulPaint <= 30000 { - b.pageEvent.FirstContentfulPaint = msg.FirstContentfulPaint - } - return b.buildIfTimingsComplete() -} - -func (b *pageEventBuilder) HandlePageRenderTiming(msg *PageRenderTiming) *PageEvent { - if !b.HasInstance() { - return nil - } - b.pageEvent.SpeedIndex = msg.SpeedIndex - b.pageEvent.VisuallyComplete = msg.VisuallyComplete - b.pageEvent.TimeToInteractive = msg.TimeToInteractive - return b.buildIfTimingsComplete() -} - -func (b *pageEventBuilder) HasInstance() bool { - return b.pageEvent != nil -} - -func (b *pageEventBuilder) GetTimestamp() uint64 { - if b.pageEvent == nil { - return 0 - } - return b.pageEvent.Timestamp -} - -func (b *pageEventBuilder) Build() *PageEvent { - pageEvent := b.pageEvent - b.pageEvent = nil - b.firstTimingHandled = false - return pageEvent -} diff --git a/backend/services/ender/builder/performanceTrackAggrBuilder.go b/backend/services/ender/builder/performanceTrackAggrBuilder.go deleted file mode 100644 index 70b751f55..000000000 --- a/backend/services/ender/builder/performanceTrackAggrBuilder.go +++ /dev/null @@ -1,106 +0,0 @@ -package builder - -import ( - "math" - - . "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/messages/performance" -) - -type performanceTrackAggrBuilder struct { - performanceTrackAggr *PerformanceTrackAggr - lastTimestamp uint64 - count float64 - sumFrameRate float64 - sumTickRate float64 - sumTotalJSHeapSize float64 - sumUsedJSHeapSize float64 -} - -func (b *performanceTrackAggrBuilder) start(timestamp uint64) { - b.performanceTrackAggr = &PerformanceTrackAggr{ - TimestampStart: timestamp, - } - b.lastTimestamp = timestamp -} - -func (b *performanceTrackAggrBuilder) HandlePerformanceTrack(msg *PerformanceTrack, timestamp uint64) *PerformanceTrackAggr { - if msg.Frames == -1 || msg.Ticks == -1 || !b.HasInstance() { - performanceTrackAggr := b.Build() - b.start(timestamp) - return performanceTrackAggr - } - - dt := performance.TimeDiff(timestamp, b.lastTimestamp) - if dt == 0 { - return nil // TODO: handle error - } - - frameRate := performance.FrameRate(msg.Frames, dt) - tickRate := performance.TickRate(msg.Ticks, dt) - - fps := uint64(math.Round(frameRate)) - cpu := performance.CPURateFromTickRate(tickRate) - if fps < b.performanceTrackAggr.MinFPS || b.performanceTrackAggr.MinFPS == 0 { - b.performanceTrackAggr.MinFPS = fps - } - if fps > b.performanceTrackAggr.MaxFPS { - b.performanceTrackAggr.MaxFPS = fps - } - if cpu < b.performanceTrackAggr.MinCPU || b.performanceTrackAggr.MinCPU == 0 { - b.performanceTrackAggr.MinCPU = cpu - } - if cpu > b.performanceTrackAggr.MaxCPU { - b.performanceTrackAggr.MaxCPU = cpu - } - if msg.TotalJSHeapSize < b.performanceTrackAggr.MinTotalJSHeapSize || b.performanceTrackAggr.MinTotalJSHeapSize == 0 { - b.performanceTrackAggr.MinTotalJSHeapSize = msg.TotalJSHeapSize - } - if msg.TotalJSHeapSize > b.performanceTrackAggr.MaxTotalJSHeapSize { - b.performanceTrackAggr.MaxTotalJSHeapSize = msg.TotalJSHeapSize - } - if msg.UsedJSHeapSize < b.performanceTrackAggr.MinUsedJSHeapSize || b.performanceTrackAggr.MinUsedJSHeapSize == 0 { - b.performanceTrackAggr.MinUsedJSHeapSize = msg.UsedJSHeapSize - } - if msg.UsedJSHeapSize > b.performanceTrackAggr.MaxUsedJSHeapSize { - b.performanceTrackAggr.MaxUsedJSHeapSize = msg.UsedJSHeapSize - } - b.sumFrameRate += frameRate - b.sumTickRate += tickRate - b.sumTotalJSHeapSize += float64(msg.TotalJSHeapSize) - b.sumUsedJSHeapSize += float64(msg.UsedJSHeapSize) - b.count += 1 - b.lastTimestamp = timestamp - return nil -} - -func (b *performanceTrackAggrBuilder) HasInstance() bool { - return b.performanceTrackAggr != nil -} - -func (b *performanceTrackAggrBuilder) GetStartTimestamp() uint64 { - if b.performanceTrackAggr == nil { - return 0 - } - return b.performanceTrackAggr.TimestampStart -} - -func (b *performanceTrackAggrBuilder) Build() *PerformanceTrackAggr { - var performanceTrackAggr *PerformanceTrackAggr - if b.HasInstance() && b.GetStartTimestamp() != b.lastTimestamp && b.count != 0 { - performanceTrackAggr = b.performanceTrackAggr - performanceTrackAggr.TimestampEnd = b.lastTimestamp - performanceTrackAggr.AvgFPS = uint64(math.Round(b.sumFrameRate / b.count)) - performanceTrackAggr.AvgCPU = 100 - uint64(math.Round(b.sumTickRate*100/b.count)) - performanceTrackAggr.AvgTotalJSHeapSize = uint64(math.Round(b.sumTotalJSHeapSize / b.count)) - performanceTrackAggr.AvgUsedJSHeapSize = uint64(math.Round(b.sumUsedJSHeapSize / b.count)) - } - b.performanceTrackAggr = nil - b.count = 0 - b.sumFrameRate = 0 - b.sumTickRate = 0 - b.sumTotalJSHeapSize = 0 - b.sumUsedJSHeapSize = 0 - b.lastTimestamp = 0 - return performanceTrackAggr -} diff --git a/backend/services/ender/main.go b/backend/services/ender/main.go deleted file mode 100644 index 4170a178e..000000000 --- a/backend/services/ender/main.go +++ /dev/null @@ -1,71 +0,0 @@ -package main - -import ( - "log" - "time" - - "os" - "os/signal" - "syscall" - - "openreplay/backend/pkg/env" - "openreplay/backend/pkg/intervals" - logger "openreplay/backend/pkg/log" - "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/queue" - "openreplay/backend/pkg/queue/types" - "openreplay/backend/services/ender/builder" -) - -func main() { - log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) - - GROUP_EVENTS := env.String("GROUP_ENDER") - TOPIC_TRIGGER := env.String("TOPIC_TRIGGER") - - builderMap := builder.NewBuilderMap() - - statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC")) - - producer := queue.NewProducer() - consumer := queue.NewMessageConsumer( - GROUP_EVENTS, - []string{ - env.String("TOPIC_RAW_WEB"), - env.String("TOPIC_RAW_IOS"), - }, - func(sessionID uint64, msg messages.Message, meta *types.Meta) { - statsLogger.Collect(sessionID, meta) - builderMap.HandleMessage(sessionID, msg, msg.Meta().Index) - }, - false, - ) - - tick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond) - - sigchan := make(chan os.Signal, 1) - signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - - log.Printf("Ender service started\n") - for { - select { - case sig := <-sigchan: - log.Printf("Caught signal %v: terminating\n", sig) - producer.Close(2000) - consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP) - consumer.Close() - os.Exit(0) - case <-tick: - builderMap.IterateReadyMessages(time.Now().UnixMilli(), func(sessionID uint64, readyMsg messages.Message) { - producer.Produce(TOPIC_TRIGGER, sessionID, messages.Encode(readyMsg)) - }) - // TODO: why exactly do we need Flush here and not in any other place? - producer.Flush(2000) - consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP) - default: - if err := consumer.ConsumeNext(); err != nil { - log.Fatalf("Error on consuming: %v", err) - } - } - } -} diff --git a/backend/services/heuristics/build_hack b/backend/services/heuristics/build_hack new file mode 100644 index 000000000..e69de29bb diff --git a/ee/backend/internal/datasaver/stats.go b/ee/backend/internal/datasaver/stats.go new file mode 100644 index 000000000..501a861aa --- /dev/null +++ b/ee/backend/internal/datasaver/stats.go @@ -0,0 +1,79 @@ +package datasaver + +import ( + "log" + "time" + + "openreplay/backend/pkg/db/clickhouse" + "openreplay/backend/pkg/env" +) + +var ch *clickhouse.Connector +var finalizeTicker <-chan time.Time + +func (si *Saver) InitStats() { + ch = clickhouse.NewConnector(env.String("CLICKHOUSE_STRING")) + if err := ch.Prepare(); err != nil { + log.Fatalf("Clickhouse prepare error: %v\n", err) + } + + finalizeTicker = time.Tick(20 * time.Minute) + +} + +func (si *Saver) InsertStats(session *Session, msg Message) error { + switch m := msg.(type) { + // Web + case *SessionEnd: + return si.pg.InsertWebSession(session) + case *PerformanceTrackAggr: + return si.pg.InsertWebPerformanceTrackAggr(session, m) + case *ClickEvent: + return si.pg.InsertWebClickEvent(session, m) + case *InputEvent: + return si.pg.InsertWebInputEvent(session, m) + // Unique for Web + case *PageEvent: + si.pg.InsertWebPageEvent(session, m) + case *ResourceEvent: + return si.pg.InsertWebResourceEvent(session, m) + case *ErrorEvent: + return si.pg.InsertWebErrorEvent(session, m) + case *LongTask: + return si.pg.InsertLongtask(session, m) + + // IOS + case *IOSSessionEnd: + return si.pg.InsertIOSSession(session) + case *IOSPerformanceAggregated: + return si.pg.InsertIOSPerformanceAggregated(session, m) + case *IOSClickEvent: + return si.pg.InsertIOSClickEvent(session, m) + case *IOSInputEvent: + return si.pg.InsertIOSInputEvent(session, m) + // Unique for Web + case *IOSScreenEnter: + //ch.InsertIOSView(session, m) + case *IOSCrash: + return si.pg.InsertIOSCrash(session, m) + case *IOSNetworkCall: + return si.pg.InsertIOSNetworkCall(session, m) + } + return nil +} + +func (si *Saver) CommitStats() error { + select { + case <-finalizeTicker: + if err := ch.FinaliseSessionsTable(); err != nil { + log.Printf("Stats: FinaliseSessionsTable returned an error. %v", err) + } + default: + } + errCommit := ch.Commit() + errPrepare := ch.Prepare() + if errCommit != nil { + return errCommit + } + return errPrepare +} diff --git a/ee/backend/services/db/stats.go b/ee/backend/services/db/stats.go deleted file mode 100644 index 9d250fc51..000000000 --- a/ee/backend/services/db/stats.go +++ /dev/null @@ -1,83 +0,0 @@ -package main - -import ( - "log" - "time" - - - . "openreplay/backend/pkg/messages" - . "openreplay/backend/pkg/db/types" - "openreplay/backend/pkg/db/clickhouse" - "openreplay/backend/pkg/env" -) - -var ch *clickhouse.Connector -var finalizeTicker <-chan time.Time - -func initStats() { - ch = clickhouse.NewConnector(env.String("CLICKHOUSE_STRING")) - if err := ch.Prepare(); err != nil { - log.Fatalf("Clickhouse prepare error: %v\n", err) - } - - finalizeTicker = time.Tick(20 * time.Minute) - -} - -func insertStats(session *Session, msg Message) error { - switch m := msg.(type) { - // Web - case *SessionEnd: - return ch.InsertWebSession(session) - case *PerformanceTrackAggr: - return ch.InsertWebPerformanceTrackAggr(session, m) - case *ClickEvent: - return ch.InsertWebClickEvent(session, m) - case *InputEvent: - return ch.InsertWebInputEvent(session, m) - // Unique for Web - case *PageEvent: - ch.InsertWebPageEvent(session, m) - case *ResourceEvent: - return ch.InsertWebResourceEvent(session, m) - case *ErrorEvent: - return ch.InsertWebErrorEvent(session, m) - case *LongTask: - return ch.InsertLongtask(session, m) - - // IOS - case *IOSSessionEnd: - return ch.InsertIOSSession(session) - case *IOSPerformanceAggregated: - return ch.InsertIOSPerformanceAggregated(session, m) - case *IOSClickEvent: - return ch.InsertIOSClickEvent(session, m) - case *IOSInputEvent: - return ch.InsertIOSInputEvent(session, m) - // Unique for Web - case *IOSScreenEnter: - //ch.InsertIOSView(session, m) - case *IOSCrash: - return ch.InsertIOSCrash(session, m) - case *IOSNetworkCall: - return ch.InsertIOSNetworkCall(session, m) - } - return nil -} - -func commitStats() error { - select { - case <-finalizeTicker: - if err := ch.FinaliseSessionsTable(); err != nil { - log.Printf("Stats: FinaliseSessionsTable returned an error. %v", err) - } - default: - } - errCommit := ch.Commit() - errPrepare := ch.Prepare() - if errCommit != nil { - return errCommit - } - return errPrepare -} -