Merge pull request #459 from openreplay/ender_refactoring

Ender refactoring
This commit is contained in:
Alexander 2022-05-13 17:32:14 +02:00 committed by GitHub
commit a69f3f0e83
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
46 changed files with 1588 additions and 1363 deletions

View file

@ -23,7 +23,7 @@ function build_service() {
image="$1"
echo "BUILDING $image"
case "$image" in
http | db | sink)
http | db | sink | ender | heuristics)
echo build http
docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --platform linux/amd64 --build-arg SERVICE_NAME=$image -f ./cmd/Dockerfile .
[[ $PUSH_IMAGE -eq 1 ]] && {

View file

@ -2,9 +2,11 @@ package main
import (
"log"
"openreplay/backend/internal/builder"
"openreplay/backend/internal/config/db"
"openreplay/backend/internal/datasaver"
"openreplay/backend/internal/heuristics"
"openreplay/backend/internal/handlers"
"openreplay/backend/internal/handlers/custom"
"time"
"os"
@ -28,9 +30,21 @@ func main() {
pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres), cfg.ProjectExpirationTimeoutMs)
defer pg.Close()
// HandlersFabric returns the list of message handlers we want to be applied to each incoming message.
handlersFabric := func() []handlers.MessageProcessor {
return []handlers.MessageProcessor{
&custom.EventMapper{},
custom.NewInputEventBuilder(),
custom.NewPageEventBuilder(),
}
}
// Create handler's aggregator
builderMap := builder.NewBuilderMap(handlersFabric)
// Init modules
heurFinder := heuristics.NewHandler()
saver := datasaver.New(pg)
saver.InitStats()
statsLogger := logger.NewQueueStats(cfg.LoggerTimeout)
// Handler logic
@ -60,10 +74,10 @@ func main() {
}
// Handle heuristics and save to temporary queue in memory
heurFinder.HandleMessage(session, msg)
builderMap.HandleMessage(sessionID, msg, msg.Meta().Index)
// Process saved heuristics messages as usual messages above in the code
heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) {
builderMap.IterateSessionReadyMessages(sessionID, func(msg messages.Message) {
// TODO: DRY code (carefully with the return statement logic)
if err := saver.InsertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
@ -82,8 +96,9 @@ func main() {
consumer := queue.NewMessageConsumer(
cfg.GroupDB,
[]string{
cfg.TopicRawWeb,
cfg.TopicRawIOS,
cfg.TopicTrigger,
cfg.TopicTrigger, // to receive SessionEnd events
},
handler,
false,
@ -94,15 +109,18 @@ func main() {
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
tick := time.Tick(cfg.CommitBatchTimeout)
commitTick := time.Tick(cfg.CommitBatchTimeout)
for {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
consumer.Close()
os.Exit(0)
case <-tick:
case <-commitTick:
pg.CommitBatches()
if err := saver.CommitStats(); err != nil {
log.Printf("Error on stats commit: %v", err)
}
// TODO?: separate stats & regular messages
if err := consumer.Commit(); err != nil {
log.Printf("Error on consumer commit: %v", err)

79
backend/cmd/ender/main.go Normal file
View file

@ -0,0 +1,79 @@
package main
import (
"log"
"openreplay/backend/internal/config/ender"
"openreplay/backend/internal/sessionender"
"time"
"os"
"os/signal"
"syscall"
"openreplay/backend/pkg/intervals"
logger "openreplay/backend/pkg/log"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
)
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
// Load service configuration
cfg := ender.New()
// Init all modules
statsLogger := logger.NewQueueStats(cfg.LoggerTimeout)
sessions := sessionender.New(intervals.EVENTS_SESSION_END_TIMEOUT)
producer := queue.NewProducer()
consumer := queue.NewMessageConsumer(
cfg.GroupEvents,
[]string{
cfg.TopicRawWeb,
cfg.TopicRawIOS,
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
statsLogger.Collect(sessionID, meta)
sessions.UpdateSession(sessionID, messages.GetTimestamp(msg))
},
false,
)
log.Printf("Ender service started\n")
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
tick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond)
for {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
producer.Close(cfg.ProducerTimeout)
if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil {
log.Printf("can't commit messages with offset: %s", err)
}
consumer.Close()
os.Exit(0)
case <-tick:
// Find ended sessions and send notification to other services
sessions.HandleEndedSessions(func(sessionID uint64, timestamp int64) bool {
msg := &messages.SessionEnd{Timestamp: uint64(timestamp)}
if err := producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(msg)); err != nil {
log.Printf("can't send message to queue: %s", err)
return false
}
return true
})
producer.Flush(cfg.ProducerTimeout)
if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil {
log.Printf("can't commit messages with offset: %s", err)
}
default:
if err := consumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consuming: %v", err)
}
}
}
}

View file

@ -0,0 +1,95 @@
package main
import (
"log"
"openreplay/backend/internal/builder"
"openreplay/backend/internal/config/ender"
"openreplay/backend/internal/handlers"
"openreplay/backend/internal/handlers/custom"
"openreplay/backend/internal/handlers/ios"
"openreplay/backend/internal/handlers/web"
"openreplay/backend/pkg/intervals"
logger "openreplay/backend/pkg/log"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
// Load service configuration
cfg := ender.New()
// HandlersFabric returns the list of message handlers we want to be applied to each incoming message.
handlersFabric := func() []handlers.MessageProcessor {
return []handlers.MessageProcessor{
// web handlers
&web.ClickRageDetector{},
&web.CpuIssueDetector{},
&web.DeadClickDetector{},
&web.MemoryIssueDetector{},
&web.NetworkIssueDetector{},
&web.PerformanceAggregator{},
// iOS handlers
&ios.AppNotResponding{},
&ios.ClickRageDetector{},
&ios.PerformanceAggregator{},
// Other handlers (you can add your custom handlers here)
&custom.CustomHandler{},
}
}
// Create handler's aggregator
builderMap := builder.NewBuilderMap(handlersFabric)
// Init logger
statsLogger := logger.NewQueueStats(cfg.LoggerTimeout)
// Init producer and consumer for data bus
producer := queue.NewProducer()
consumer := queue.NewMessageConsumer(
cfg.GroupEvents,
[]string{
cfg.TopicRawWeb,
cfg.TopicRawIOS,
cfg.TopicTrigger, // to receive SessionEnd events
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
statsLogger.Collect(sessionID, meta)
builderMap.HandleMessage(sessionID, msg, msg.Meta().Index)
},
false,
)
log.Printf("Heuristics service started\n")
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
tick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond)
for {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
producer.Close(cfg.ProducerTimeout)
consumer.Commit()
consumer.Close()
os.Exit(0)
case <-tick:
builderMap.IterateReadyMessages(func(sessionID uint64, readyMsg messages.Message) {
producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(readyMsg))
})
producer.Flush(cfg.ProducerTimeout)
consumer.Commit()
default:
if err := consumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consuming: %v", err)
}
}
}
}

View file

@ -0,0 +1,67 @@
package builder
import (
"time"
"openreplay/backend/internal/handlers"
. "openreplay/backend/pkg/messages"
)
type builder struct {
readyMsgs []Message
timestamp uint64
lastMessageID uint64
lastSystemTime time.Time
processors []handlers.MessageProcessor
ended bool
}
func NewBuilder(handlers ...handlers.MessageProcessor) *builder {
return &builder{
processors: handlers,
}
}
func (b *builder) iterateReadyMessages(iter func(msg Message)) {
for _, readyMsg := range b.readyMsgs {
iter(readyMsg)
}
b.readyMsgs = nil
}
func (b *builder) checkSessionEnd(message Message) {
if _, isEnd := message.(*IOSSessionEnd); isEnd {
b.ended = true
}
if _, isEnd := message.(*SessionEnd); isEnd {
b.ended = true
}
}
func (b *builder) handleMessage(message Message, messageID uint64) {
if messageID < b.lastMessageID {
// May happen in case of duplicated messages in kafka (if `idempotence: false`)
return
}
timestamp := GetTimestamp(message)
if timestamp == 0 {
// May happen in case of messages that are single-in-batch,
// e.g. SessionStart or RawErrorEvent (emitted by `integrations`).
// TODO: make timestamp system transparent;
return
}
if timestamp < b.timestamp {
// Shouldn't happen after messageID check which is done above. TODO: log this case.
return
}
b.timestamp = timestamp
b.lastSystemTime = time.Now()
for _, p := range b.processors {
if rm := p.Handle(message, messageID, b.timestamp); rm != nil {
b.readyMsgs = append(b.readyMsgs, rm)
}
}
b.checkSessionEnd(message)
}

View file

@ -0,0 +1,74 @@
package builder
import (
"time"
"openreplay/backend/internal/handlers"
. "openreplay/backend/pkg/messages"
)
const FORCE_DELETE_TIMEOUT = 4 * time.Hour
type builderMap struct {
handlersFabric func() []handlers.MessageProcessor
sessions map[uint64]*builder
}
func NewBuilderMap(handlersFabric func() []handlers.MessageProcessor) *builderMap {
return &builderMap{
handlersFabric: handlersFabric,
sessions: make(map[uint64]*builder),
}
}
func (m *builderMap) GetBuilder(sessionID uint64) *builder {
b := m.sessions[sessionID]
if b == nil {
b = NewBuilder(m.handlersFabric()...) // Should create new instances
m.sessions[sessionID] = b
}
return b
}
func (m *builderMap) HandleMessage(sessionID uint64, msg Message, messageID uint64) {
b := m.GetBuilder(sessionID)
b.handleMessage(msg, messageID)
}
func (m *builderMap) iterateSessionReadyMessages(sessionID uint64, b *builder, iter func(msg Message)) {
if b.ended || b.lastSystemTime.Add(FORCE_DELETE_TIMEOUT).Before(time.Now()) {
for _, p := range b.processors {
if rm := p.Build(); rm != nil {
b.readyMsgs = append(b.readyMsgs, rm)
}
}
}
b.iterateReadyMessages(iter)
if b.ended {
delete(m.sessions, sessionID)
}
}
func (m *builderMap) IterateReadyMessages(iter func(sessionID uint64, msg Message)) {
for sessionID, session := range m.sessions {
m.iterateSessionReadyMessages(
sessionID,
session,
func(msg Message) {
iter(sessionID, msg)
},
)
}
}
func (m *builderMap) IterateSessionReadyMessages(sessionID uint64, iter func(msg Message)) {
session, ok := m.sessions[sessionID]
if !ok {
return
}
m.iterateSessionReadyMessages(
sessionID,
session,
iter,
)
}

View file

@ -10,6 +10,7 @@ type Config struct {
ProjectExpirationTimeoutMs int64
LoggerTimeout int
GroupDB string
TopicRawWeb string
TopicRawIOS string
TopicTrigger string
CommitBatchTimeout time.Duration
@ -21,6 +22,7 @@ func New() *Config {
ProjectExpirationTimeoutMs: 1000 * 60 * 20,
LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"),
GroupDB: env.String("GROUP_DB"),
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
TopicRawIOS: env.String("TOPIC_RAW_IOS"),
TopicTrigger: env.String("TOPIC_TRIGGER"),
CommitBatchTimeout: 15 * time.Second,

View file

@ -0,0 +1,25 @@
package ender
import (
"openreplay/backend/pkg/env"
)
type Config struct {
GroupEvents string
TopicTrigger string
LoggerTimeout int
TopicRawWeb string
TopicRawIOS string
ProducerTimeout int
}
func New() *Config {
return &Config{
GroupEvents: env.String("GROUP_ENDER"),
TopicTrigger: env.String("TOPIC_TRIGGER"),
LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"),
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
TopicRawIOS: env.String("TOPIC_RAW_IOS"),
ProducerTimeout: 2000,
}
}

View file

@ -5,6 +5,10 @@ import (
. "openreplay/backend/pkg/messages"
)
func (si *Saver) InitStats() {
// noop
}
func (si *Saver) InsertStats(session *Session, msg Message) error {
switch m := msg.(type) {
// Web
@ -17,3 +21,7 @@ func (si *Saver) InsertStats(session *Session, msg Message) error {
}
return nil
}
func (si *Saver) CommitStats() error {
return nil
}

View file

@ -0,0 +1,16 @@
package custom
import . "openreplay/backend/pkg/messages"
type CustomHandler struct {
lastTimestamp uint64
}
func (h *CustomHandler) Handle(message Message, messageID uint64, timestamp uint64) Message {
h.lastTimestamp = timestamp
return nil
}
func (h *CustomHandler) Build() Message {
return nil
}

View file

@ -0,0 +1,135 @@
package custom
import (
"net/url"
"strings"
. "openreplay/backend/pkg/messages"
)
func getURLExtention(URL string) string {
u, err := url.Parse(URL)
if err != nil {
return ""
}
i := strings.LastIndex(u.Path, ".")
return u.Path[i+1:]
}
func getResourceType(initiator string, URL string) string {
switch initiator {
case "xmlhttprequest", "fetch":
return "fetch"
case "img":
return "img"
default:
switch getURLExtention(URL) {
case "css":
return "stylesheet"
case "js":
return "script"
case "png", "gif", "jpg", "jpeg", "svg":
return "img"
case "mp4", "mkv", "ogg", "webm", "avi", "mp3":
return "media"
default:
return "other"
}
}
}
type EventMapper struct{}
func (b *EventMapper) Build() Message {
return nil
}
func (b *EventMapper) Handle(message Message, messageID uint64, timestamp uint64) Message {
switch msg := message.(type) {
case *RawErrorEvent:
// !!! This won't be handled because the Meta() timestamp emitted by `integrations` will be 0
// TODO: move to db directly
return &ErrorEvent{
MessageID: messageID,
Timestamp: msg.Timestamp,
Source: msg.Source,
Name: msg.Name,
Message: msg.Message,
Payload: msg.Payload,
}
case *MouseClick:
if msg.Label != "" {
return &ClickEvent{
MessageID: messageID,
Label: msg.Label,
HesitationTime: msg.HesitationTime,
Timestamp: timestamp,
Selector: msg.Selector,
}
}
case *JSException:
return &ErrorEvent{
MessageID: messageID,
Timestamp: timestamp,
Source: "js_exception",
Name: msg.Name,
Message: msg.Message,
Payload: msg.Payload,
}
case *ResourceTiming:
return &ResourceEvent{
MessageID: messageID,
Timestamp: msg.Timestamp,
Duration: msg.Duration,
TTFB: msg.TTFB,
HeaderSize: msg.HeaderSize,
EncodedBodySize: msg.EncodedBodySize,
DecodedBodySize: msg.DecodedBodySize,
URL: msg.URL,
Type: getResourceType(msg.Initiator, msg.URL),
Success: msg.Duration != 0,
}
case *RawCustomEvent:
return &CustomEvent{
MessageID: messageID,
Timestamp: timestamp,
Name: msg.Name,
Payload: msg.Payload,
}
case *CustomIssue:
return &IssueEvent{
Type: "custom",
Timestamp: timestamp,
MessageID: messageID,
ContextString: msg.Name,
Payload: msg.Payload,
}
case *Fetch:
return &FetchEvent{
MessageID: messageID,
Timestamp: msg.Timestamp,
Method: msg.Method,
URL: msg.URL,
Request: msg.Request,
Response: msg.Response,
Status: msg.Status,
Duration: msg.Duration,
}
case *GraphQL:
return &GraphQLEvent{
MessageID: messageID,
Timestamp: timestamp,
OperationKind: msg.OperationKind,
OperationName: msg.OperationName,
Variables: msg.Variables,
Response: msg.Response,
}
case *StateAction:
return &StateActionEvent{
MessageID: messageID,
Timestamp: timestamp,
Type: msg.Type,
}
}
return nil
}

View file

@ -0,0 +1,77 @@
package custom
import (
. "openreplay/backend/pkg/messages"
)
const INPUT_EVENT_TIMEOUT = 1 * 60 * 1000
type inputLabels map[uint64]string
type inputEventBuilder struct {
inputEvent *InputEvent
inputLabels inputLabels
inputID uint64
}
func NewInputEventBuilder() *inputEventBuilder {
ieBuilder := &inputEventBuilder{}
ieBuilder.clearLabels()
return ieBuilder
}
func (b *inputEventBuilder) clearLabels() {
b.inputLabels = make(inputLabels)
}
func (b *inputEventBuilder) Handle(message Message, messageID uint64, timestamp uint64) Message {
var inputEvent Message = nil
switch msg := message.(type) {
case *SetInputTarget:
if b.inputID != msg.ID {
inputEvent = b.Build()
b.inputID = msg.ID
}
b.inputLabels[msg.ID] = msg.Label
return inputEvent
case *SetInputValue:
if b.inputID != msg.ID {
inputEvent = b.Build()
b.inputID = msg.ID
}
if b.inputEvent == nil {
b.inputEvent = &InputEvent{
MessageID: messageID,
Timestamp: timestamp,
Value: msg.Value,
ValueMasked: msg.Mask > 0,
}
} else {
b.inputEvent.Value = msg.Value
b.inputEvent.ValueMasked = msg.Mask > 0
}
return inputEvent
case *CreateDocument:
inputEvent = b.Build()
b.clearLabels()
return inputEvent
case *MouseClick:
return b.Build()
}
if b.inputEvent != nil && b.inputEvent.Timestamp+INPUT_EVENT_TIMEOUT < timestamp {
return b.Build()
}
return nil
}
func (b *inputEventBuilder) Build() Message {
if b.inputEvent == nil {
return nil
}
inputEvent := b.inputEvent
inputEvent.Label = b.inputLabels[b.inputID] // might be empty string
b.inputEvent = nil
return inputEvent
}

View file

@ -0,0 +1,106 @@
package custom
import (
. "openreplay/backend/pkg/messages"
)
const PAGE_EVENT_TIMEOUT = 1 * 60 * 1000
type pageEventBuilder struct {
pageEvent *PageEvent
firstTimingHandled bool
}
func NewPageEventBuilder() *pageEventBuilder {
ieBuilder := &pageEventBuilder{}
return ieBuilder
}
func (b *pageEventBuilder) Handle(message Message, messageID uint64, timestamp uint64) Message {
switch msg := message.(type) {
case *SetPageLocation:
if msg.NavigationStart == 0 { // routing without new page loading
return &PageEvent{
URL: msg.URL,
Referrer: msg.Referrer,
Loaded: false,
MessageID: messageID,
Timestamp: timestamp,
}
} else {
pageEvent := b.Build()
b.pageEvent = &PageEvent{
URL: msg.URL,
Referrer: msg.Referrer,
Loaded: true,
MessageID: messageID,
Timestamp: timestamp,
}
return pageEvent
}
case *PageLoadTiming:
if b.pageEvent == nil {
break
}
if msg.RequestStart <= 30000 {
b.pageEvent.RequestStart = msg.RequestStart
}
if msg.ResponseStart <= 30000 {
b.pageEvent.ResponseStart = msg.ResponseStart
}
if msg.ResponseEnd <= 30000 {
b.pageEvent.ResponseEnd = msg.ResponseEnd
}
if msg.DomContentLoadedEventStart <= 30000 {
b.pageEvent.DomContentLoadedEventStart = msg.DomContentLoadedEventStart
}
if msg.DomContentLoadedEventEnd <= 30000 {
b.pageEvent.DomContentLoadedEventEnd = msg.DomContentLoadedEventEnd
}
if msg.LoadEventStart <= 30000 {
b.pageEvent.LoadEventStart = msg.LoadEventStart
}
if msg.LoadEventEnd <= 30000 {
b.pageEvent.LoadEventEnd = msg.LoadEventEnd
}
if msg.FirstPaint <= 30000 {
b.pageEvent.FirstPaint = msg.FirstPaint
}
if msg.FirstContentfulPaint <= 30000 {
b.pageEvent.FirstContentfulPaint = msg.FirstContentfulPaint
}
return b.buildIfTimingsComplete()
case *PageRenderTiming:
if b.pageEvent == nil {
break
}
b.pageEvent.SpeedIndex = msg.SpeedIndex
b.pageEvent.VisuallyComplete = msg.VisuallyComplete
b.pageEvent.TimeToInteractive = msg.TimeToInteractive
return b.buildIfTimingsComplete()
}
if b.pageEvent != nil && b.pageEvent.Timestamp+PAGE_EVENT_TIMEOUT < timestamp {
return b.Build()
}
return nil
}
func (b *pageEventBuilder) Build() Message {
if b.pageEvent == nil {
return nil
}
pageEvent := b.pageEvent
b.pageEvent = nil
b.firstTimingHandled = false
return pageEvent
}
func (b *pageEventBuilder) buildIfTimingsComplete() Message {
if b.firstTimingHandled {
return b.Build()
}
b.firstTimingHandled = true
return nil
}

View file

@ -0,0 +1,69 @@
package ios
import (
"openreplay/backend/internal/handlers"
. "openreplay/backend/pkg/messages"
)
/*
Handler name: AppNotResponding
Input events: IOSClickEvent,
IOSInputEvent,
IOSPerformanceEvent,
IOSSessionEnd
Output event: IOSIssueEvent
*/
const MIN_TIME_AFTER_LAST_HEARTBEAT = 60 * 1000
type AppNotResponding struct {
handlers.ReadyMessageStore
lastLabel string
lastHeartbeatTimestamp uint64
lastHeartbeatIndex uint64
lastTimestamp uint64
}
func (h *AppNotResponding) Handle(message Message, messageID uint64, timestamp uint64) Message {
h.lastTimestamp = timestamp
var event Message = nil
switch m := message.(type) {
case *IOSClickEvent:
event = h.build(m.Timestamp)
h.lastLabel = m.Label
h.lastHeartbeatTimestamp = m.Timestamp
h.lastHeartbeatIndex = m.Index
case *IOSInputEvent:
event = h.build(m.Timestamp)
h.lastLabel = m.Label
h.lastHeartbeatTimestamp = m.Timestamp
h.lastHeartbeatIndex = m.Index
case *IOSPerformanceEvent:
event = h.build(m.Timestamp)
h.lastHeartbeatTimestamp = m.Timestamp
h.lastHeartbeatIndex = m.Index
case *IOSSessionEnd:
event = h.build(m.Timestamp)
}
return event
}
func (h *AppNotResponding) Build() Message {
return h.build(h.lastTimestamp)
}
func (h *AppNotResponding) build(timestamp uint64) Message {
if h.lastHeartbeatTimestamp != 0 && h.lastHeartbeatTimestamp+MIN_TIME_AFTER_LAST_HEARTBEAT <= timestamp {
event := &IOSIssueEvent{
Type: "anr",
ContextString: h.lastLabel,
Timestamp: h.lastHeartbeatTimestamp,
}
event.Index = h.lastHeartbeatIndex // Associated Index/ MessageID ?
// Reset
h.lastHeartbeatTimestamp = 0
h.lastHeartbeatIndex = 0
return event
}
return nil
}

View file

@ -1,14 +1,22 @@
package heuristics
package ios
import (
"openreplay/backend/internal/handlers"
"openreplay/backend/internal/handlers/web"
. "openreplay/backend/pkg/messages"
)
const CLICK_TIME_DIFF = 200
const MIN_CLICKS_IN_A_ROW = 3
/*
Handler name: ClickRage
Input events: IOSClickEvent,
IOSSessionEnd
Output event: IOSIssueEvent
*/
type clickrage struct {
readyMessageStore
const CLICK_TIME_DIFF = 200
type ClickRageDetector struct {
handlers.ReadyMessageStore
lastTimestamp uint64
lastLabel string
firstInARawTimestamp uint64
@ -16,32 +24,16 @@ type clickrage struct {
countsInARow int
}
func (h *clickrage) build() {
if h.countsInARow >= MIN_CLICKS_IN_A_ROW {
m := &IOSIssueEvent{
Type: "click_rage",
ContextString: h.lastLabel,
}
m.Timestamp = h.firstInARawTimestamp
m.Index = h.firstInARawSeqIndex // Associated Index/ MessageID ?
h.append(m)
}
h.lastTimestamp = 0
h.lastLabel = ""
h.firstInARawTimestamp = 0
h.firstInARawSeqIndex = 0
h.countsInARow = 0
}
func (h *clickrage) HandleMessage(msg Message) {
switch m := msg.(type) {
func (h *ClickRageDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
var event Message = nil
switch m := message.(type) {
case *IOSClickEvent:
if h.lastTimestamp+CLICK_TIME_DIFF < m.Timestamp && h.lastLabel == m.Label {
h.lastTimestamp = m.Timestamp
h.countsInARow += 1
return
return nil
}
h.build()
event = h.Build()
if m.Label != "" {
h.lastTimestamp = m.Timestamp
h.lastLabel = m.Label
@ -50,6 +42,25 @@ func (h *clickrage) HandleMessage(msg Message) {
h.countsInARow = 1
}
case *IOSSessionEnd:
h.build()
event = h.Build()
}
return event
}
func (h *ClickRageDetector) Build() Message {
if h.countsInARow >= web.MIN_CLICKS_IN_A_ROW {
event := &IOSIssueEvent{
Type: "click_rage",
ContextString: h.lastLabel,
}
event.Timestamp = h.firstInARawTimestamp
event.Index = h.firstInARawSeqIndex // Associated Index/ MessageID ?
return event
}
h.lastTimestamp = 0
h.lastLabel = ""
h.firstInARawTimestamp = 0
h.firstInARawSeqIndex = 0
h.countsInARow = 0
return nil
}

View file

@ -1,9 +1,17 @@
package heuristics
package ios
import (
"openreplay/backend/internal/handlers"
. "openreplay/backend/pkg/messages"
)
/*
Handler name: PerformanceAggregator
Input events: IOSPerformanceEvent,
IOSSessionEnd
Output event: IssueEvent
*/
const AGGR_TIME = 15 * 60 * 1000
type valueAggregator struct {
@ -18,45 +26,29 @@ func (va *valueAggregator) aggregate() uint64 {
return uint64(va.sum / va.count)
}
type performanceAggregator struct {
readyMessageStore
pa *IOSPerformanceAggregated
fps valueAggregator
cpu valueAggregator
memory valueAggregator
battery valueAggregator
type PerformanceAggregator struct {
handlers.ReadyMessageStore
pa *IOSPerformanceAggregated
fps valueAggregator
cpu valueAggregator
memory valueAggregator
battery valueAggregator
lastTimestamp uint64
}
func (h *performanceAggregator) build(timestamp uint64) {
if h.pa == nil {
return
}
h.pa.TimestampEnd = timestamp
h.pa.AvgFPS = h.fps.aggregate()
h.pa.AvgCPU = h.cpu.aggregate()
h.pa.AvgMemory = h.memory.aggregate()
h.pa.AvgBattery = h.battery.aggregate()
h.append(h.pa)
h.pa = &IOSPerformanceAggregated{}
for _, agg := range []valueAggregator{h.fps, h.cpu, h.memory, h.battery} {
agg.sum = 0
agg.count = 0
}
}
func (h *performanceAggregator) HandleMessage(msg Message) {
func (h *PerformanceAggregator) Handle(message Message, messageID uint64, timestamp uint64) Message {
h.lastTimestamp = timestamp
if h.pa == nil {
h.pa = &IOSPerformanceAggregated{} // TODO: struct type in messages
}
switch m := msg.(type) { // TODO: All Timestampe messages
var event Message = nil
switch m := message.(type) { // TODO: All Timestamp messages
case *IOSPerformanceEvent:
if h.pa.TimestampStart == 0 {
h.pa.TimestampStart = m.Timestamp
}
if h.pa.TimestampStart+AGGR_TIME <= m.Timestamp {
h.build(m.Timestamp)
event = h.Build()
}
switch m.Name {
case "fps":
@ -97,6 +89,28 @@ func (h *performanceAggregator) HandleMessage(msg Message) {
}
}
case *IOSSessionEnd:
h.build(m.Timestamp)
event = h.Build()
}
return event
}
func (h *PerformanceAggregator) Build() Message {
if h.pa == nil {
return nil
}
h.pa.TimestampEnd = h.lastTimestamp
h.pa.AvgFPS = h.fps.aggregate()
h.pa.AvgCPU = h.cpu.aggregate()
h.pa.AvgMemory = h.memory.aggregate()
h.pa.AvgBattery = h.battery.aggregate()
event := h.pa
h.pa = &IOSPerformanceAggregated{}
for _, agg := range []valueAggregator{h.fps, h.cpu, h.memory, h.battery} {
agg.sum = 0
agg.count = 0
}
return event
}

View file

@ -0,0 +1,11 @@
package handlers
import . "openreplay/backend/pkg/messages"
// Heuristic interface - common interface for user's realisations
// U can create your own message handler and easily connect to heuristics service
type MessageProcessor interface {
Handle(message Message, messageID uint64, timestamp uint64) Message
Build() Message
}

View file

@ -1,18 +1,18 @@
package heuristics
package handlers
import (
. "openreplay/backend/pkg/messages"
)
type readyMessageStore struct {
type ReadyMessageStore struct {
store []Message
}
func (s *readyMessageStore) append(msg Message) {
func (s *ReadyMessageStore) Append(msg Message) {
s.store = append(s.store, msg)
}
func (s *readyMessageStore) IterateReadyMessages(cb func(msg Message)) {
func (s *ReadyMessageStore) IterateReadyMessages(cb func(msg Message)) {
for _, msg := range s.store {
cb(msg)
}

View file

@ -0,0 +1,75 @@
package web
import (
"encoding/json"
"log"
. "openreplay/backend/pkg/messages"
)
/*
Handler name: ClickRage
Input event: MouseClick
Output event: IssueEvent
*/
const MAX_TIME_DIFF = 300
const MIN_CLICKS_IN_A_ROW = 3
type ClickRageDetector struct {
lastTimestamp uint64
lastLabel string
firstInARawTimestamp uint64
firstInARawMessageId uint64
countsInARow int
}
func (crd *ClickRageDetector) reset() {
crd.lastTimestamp = 0
crd.lastLabel = ""
crd.firstInARawTimestamp = 0
crd.firstInARawMessageId = 0
crd.countsInARow = 0
}
func (crd *ClickRageDetector) Build() Message {
defer crd.reset()
if crd.countsInARow >= MIN_CLICKS_IN_A_ROW {
payload, err := json.Marshal(struct{ Count int }{crd.countsInARow})
if err != nil {
log.Printf("can't marshal ClickRage payload to json: %s", err)
}
event := &IssueEvent{
Type: "click_rage",
ContextString: crd.lastLabel,
Payload: string(payload),
Timestamp: crd.firstInARawTimestamp,
MessageID: crd.firstInARawMessageId,
}
return event
}
return nil
}
func (crd *ClickRageDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
switch msg := message.(type) {
case *MouseClick:
// TODO: check if we it is ok to capture clickRage event without the connected ClickEvent in db.
if msg.Label == "" {
return crd.Build()
}
if crd.lastLabel == msg.Label && timestamp-crd.lastTimestamp < MAX_TIME_DIFF {
crd.lastTimestamp = timestamp
crd.countsInARow += 1
return nil
}
event := crd.Build()
crd.lastTimestamp = timestamp
crd.lastLabel = msg.Label
crd.firstInARawTimestamp = timestamp
crd.firstInARawMessageId = messageID
crd.countsInARow = 1
return event
}
return nil
}

View file

@ -0,0 +1,93 @@
package web
import (
"encoding/json"
"log"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/messages/performance"
)
/*
Handler name: CpuIssue
Input events: PerformanceTrack,
SetPageLocation
Output event: IssueEvent
*/
const CPU_THRESHOLD = 70 // % out of 100
const CPU_MIN_DURATION_TRIGGER = 6 * 1000
type CpuIssueDetector struct {
startTimestamp uint64
startMessageID uint64
lastTimestamp uint64
maxRate uint64
contextString string
}
func (f *CpuIssueDetector) Build() Message {
if f.startTimestamp == 0 {
return nil
}
duration := f.lastTimestamp - f.startTimestamp
timestamp := f.startTimestamp
messageID := f.startMessageID
maxRate := f.maxRate
f.startTimestamp = 0
f.startMessageID = 0
f.maxRate = 0
if duration < CPU_MIN_DURATION_TRIGGER {
return nil
}
payload, err := json.Marshal(struct {
Duration uint64
Rate uint64
}{duration, maxRate})
if err != nil {
log.Printf("can't marshal CpuIssue payload to json: %s", err)
}
return &IssueEvent{
Type: "cpu",
Timestamp: timestamp,
MessageID: messageID,
ContextString: f.contextString,
Payload: string(payload),
}
}
func (f *CpuIssueDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
switch msg := message.(type) {
case *PerformanceTrack:
dt := performance.TimeDiff(timestamp, f.lastTimestamp)
if dt == 0 {
return nil // TODO: handle error
}
f.lastTimestamp = timestamp
if msg.Frames == -1 || msg.Ticks == -1 {
return f.Build()
}
cpuRate := performance.CPURate(msg.Ticks, dt)
if cpuRate >= CPU_THRESHOLD {
if f.startTimestamp == 0 {
f.startTimestamp = timestamp
f.startMessageID = messageID
}
if f.maxRate < cpuRate {
f.maxRate = cpuRate
}
} else {
return f.Build()
}
case *SetPageLocation:
f.contextString = msg.URL
}
return nil
}

View file

@ -0,0 +1,93 @@
package web
import (
. "openreplay/backend/pkg/messages"
)
/*
Handler name: DeadClick
Input events: SetInputTarget,
CreateDocument,
MouseClick,
SetNodeAttribute,
RemoveNodeAttribute,
CreateElementNode,
CreateTextNode,
MoveNode,
RemoveNode,
SetCSSData,
CSSInsertRule,
CSSDeleteRule
Output event: IssueEvent
*/
const CLICK_RELATION_TIME = 1400
type DeadClickDetector struct {
lastTimestamp uint64
lastMouseClick *MouseClick
lastClickTimestamp uint64
lastMessageID uint64
inputIDSet map[uint64]bool
}
func (d *DeadClickDetector) reset() {
d.inputIDSet = nil
d.lastMouseClick = nil
d.lastClickTimestamp = 0
d.lastMessageID = 0
}
func (d *DeadClickDetector) build(timestamp uint64) Message {
defer d.reset()
if d.lastMouseClick == nil || d.lastClickTimestamp+CLICK_RELATION_TIME > timestamp { // reaction is instant
return nil
}
event := &IssueEvent{
Type: "dead_click",
ContextString: d.lastMouseClick.Label,
Timestamp: d.lastClickTimestamp,
MessageID: d.lastMessageID,
}
return event
}
func (d *DeadClickDetector) Build() Message {
return d.build(d.lastTimestamp)
}
func (d *DeadClickDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
d.lastTimestamp = timestamp
switch msg := message.(type) {
case *SetInputTarget:
if d.inputIDSet == nil {
d.inputIDSet = make(map[uint64]bool)
}
d.inputIDSet[msg.ID] = true
case *CreateDocument:
d.inputIDSet = nil
case *MouseClick:
if msg.Label == "" {
return nil
}
event := d.build(timestamp)
if d.inputIDSet[msg.ID] { // ignore if input
return event
}
d.lastMouseClick = msg
d.lastClickTimestamp = timestamp
d.lastMessageID = messageID
return event
case *SetNodeAttribute,
*RemoveNodeAttribute,
*CreateElementNode,
*CreateTextNode,
*MoveNode,
*RemoveNode,
*SetCSSData,
*CSSInsertRule,
*CSSDeleteRule:
return d.build(timestamp)
}
return nil
}

View file

@ -0,0 +1,55 @@
package web
import (
. "openreplay/backend/pkg/messages"
)
/*
Handler name: DomDrop
Input events: CreateElementNode,
CreateTextNode,
RemoveNode
Output event: DOMDrop
*/
const DROP_WINDOW = 200 //ms
const CRITICAL_COUNT = 1 // Our login page contains 20. But on crush it removes only roots (1-3 nodes).
// TODO: smart detection (making whole DOM tree would eat all memory)
type domDropDetector struct {
removedCount int
lastDropTimestamp uint64
}
func (dd *domDropDetector) reset() {
dd.removedCount = 0
dd.lastDropTimestamp = 0
}
func (dd *domDropDetector) Handle(message Message, _ uint64, timestamp uint64) Message {
switch message.(type) {
case *CreateElementNode,
*CreateTextNode:
dd.removedCount = 0
dd.lastDropTimestamp = 0
case *RemoveNode:
if dd.lastDropTimestamp+DROP_WINDOW > timestamp {
dd.removedCount += 1
} else {
dd.removedCount = 1
}
dd.lastDropTimestamp = timestamp
}
return nil
}
func (dd *domDropDetector) Build() Message {
defer dd.reset()
if dd.removedCount >= CRITICAL_COUNT {
domDrop := &DOMDrop{
Timestamp: dd.lastDropTimestamp,
}
return domDrop
}
return nil
}

View file

@ -0,0 +1,85 @@
package web
import (
"encoding/json"
"log"
"math"
. "openreplay/backend/pkg/messages"
)
/*
Handler name: MemoryIssue
Input events: PerformanceTrack,
SetPageLocation
Output event: IssueEvent
*/
const MIN_COUNT = 3
const MEM_RATE_THRESHOLD = 300 // % to average
type MemoryIssueDetector struct {
startMessageID uint64
startTimestamp uint64
rate int
count float64
sum float64
contextString string
}
func (f *MemoryIssueDetector) reset() {
f.startTimestamp = 0
f.startMessageID = 0
f.rate = 0
}
func (f *MemoryIssueDetector) Build() Message {
if f.startTimestamp == 0 {
return nil
}
payload, err := json.Marshal(struct{ Rate int }{f.rate - 100})
if err != nil {
log.Printf("can't marshal MemoryIssue payload to json: %s", err)
}
event := &IssueEvent{
Type: "memory",
Timestamp: f.startTimestamp,
MessageID: f.startMessageID,
ContextString: f.contextString,
Payload: string(payload),
}
f.reset()
return event
}
func (f *MemoryIssueDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
switch msg := message.(type) {
case *PerformanceTrack:
if f.count < MIN_COUNT {
f.sum += float64(msg.UsedJSHeapSize)
f.count++
return nil
}
average := f.sum / f.count
rate := int(math.Round(float64(msg.UsedJSHeapSize) / average * 100))
f.sum += float64(msg.UsedJSHeapSize)
f.count++
if rate >= MEM_RATE_THRESHOLD {
if f.startTimestamp == 0 {
f.startTimestamp = timestamp
f.startMessageID = messageID
}
if f.rate < rate {
f.rate = rate
}
} else {
return f.Build()
}
case *SetPageLocation:
f.contextString = msg.URL
}
return nil
}

View file

@ -0,0 +1,47 @@
package web
import (
. "openreplay/backend/pkg/messages"
)
/*
Handler name: NetworkIssue
Input events: ResourceTiming,
Fetch
Output event: IssueEvent
*/
type NetworkIssueDetector struct{}
func (f *NetworkIssueDetector) Build() Message {
return nil
}
func (f *NetworkIssueDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
switch msg := message.(type) {
case *ResourceTiming:
success := msg.Duration != 0 // The only available way here
if !success {
issueType := "missing_resource"
if msg.Initiator == "fetch" || msg.Initiator == "xmlhttprequest" {
issueType = "bad_request"
}
return &IssueEvent{
Type: issueType,
MessageID: messageID,
Timestamp: msg.Timestamp,
ContextString: msg.URL,
}
}
case *Fetch:
if msg.Status >= 400 {
return &IssueEvent{
Type: "bad_request",
MessageID: messageID,
Timestamp: msg.Timestamp,
ContextString: msg.URL,
}
}
}
return nil
}

View file

@ -0,0 +1,117 @@
package web
import (
"math"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/messages/performance"
)
/*
Handler name: PerformanceAggregator
Input event: PerformanceTrack
Output event: PerformanceTrackAggr
*/
const AGGREGATION_WINDOW = 2 * 60 * 1000
type PerformanceAggregator struct {
*PerformanceTrackAggr
lastTimestamp uint64
count float64
sumFrameRate float64
sumTickRate float64
sumTotalJSHeapSize float64
sumUsedJSHeapSize float64
}
func (b *PerformanceAggregator) start(timestamp uint64) {
b.PerformanceTrackAggr = &PerformanceTrackAggr{
TimestampStart: timestamp,
}
b.lastTimestamp = timestamp
}
func (b *PerformanceAggregator) reset() {
b.PerformanceTrackAggr = nil
b.count = 0
b.sumFrameRate = 0
b.sumTickRate = 0
b.sumTotalJSHeapSize = 0
b.sumUsedJSHeapSize = 0
b.lastTimestamp = 0
}
func (b *PerformanceAggregator) Handle(message Message, _ uint64, timestamp uint64) Message {
switch msg := message.(type) {
case *PerformanceTrack:
if b.PerformanceTrackAggr == nil || msg.Frames == -1 || msg.Ticks == -1 {
pta := b.Build()
b.start(timestamp)
return pta
}
dt := performance.TimeDiff(timestamp, b.lastTimestamp)
if dt == 0 {
return nil // shouldn't happen
}
frameRate := performance.FrameRate(msg.Frames, dt)
tickRate := performance.TickRate(msg.Ticks, dt)
fps := uint64(math.Round(frameRate))
cpu := performance.CPURateFromTickRate(tickRate)
if fps < b.MinFPS || b.MinFPS == 0 {
b.MinFPS = fps
}
if fps > b.MaxFPS {
b.MaxFPS = fps
}
if cpu < b.MinCPU || b.MinCPU == 0 {
b.MinCPU = cpu
}
if cpu > b.MaxCPU {
b.MaxCPU = cpu
}
if msg.TotalJSHeapSize < b.MinTotalJSHeapSize || b.MinTotalJSHeapSize == 0 {
b.MinTotalJSHeapSize = msg.TotalJSHeapSize
}
if msg.TotalJSHeapSize > b.MaxTotalJSHeapSize {
b.MaxTotalJSHeapSize = msg.TotalJSHeapSize
}
if msg.UsedJSHeapSize < b.MinUsedJSHeapSize || b.MinUsedJSHeapSize == 0 {
b.MinUsedJSHeapSize = msg.UsedJSHeapSize
}
if msg.UsedJSHeapSize > b.MaxUsedJSHeapSize {
b.MaxUsedJSHeapSize = msg.UsedJSHeapSize
}
b.sumFrameRate += frameRate
b.sumTickRate += tickRate
b.sumTotalJSHeapSize += float64(msg.TotalJSHeapSize)
b.sumUsedJSHeapSize += float64(msg.UsedJSHeapSize)
b.count += 1
b.lastTimestamp = timestamp
}
if b.PerformanceTrackAggr != nil &&
timestamp-b.PerformanceTrackAggr.TimestampStart >= AGGREGATION_WINDOW {
return b.Build()
}
return nil
}
func (b *PerformanceAggregator) Build() Message {
if b.PerformanceTrackAggr == nil {
return nil
}
if b.count != 0 && b.PerformanceTrackAggr.TimestampStart < b.lastTimestamp { // the last one shouldn't happen
b.PerformanceTrackAggr.TimestampEnd = b.lastTimestamp
b.PerformanceTrackAggr.AvgFPS = uint64(math.Round(b.sumFrameRate / b.count))
b.PerformanceTrackAggr.AvgCPU = 100 - uint64(math.Round(b.sumTickRate*100/b.count))
b.PerformanceTrackAggr.AvgTotalJSHeapSize = uint64(math.Round(b.sumTotalJSHeapSize / b.count))
b.PerformanceTrackAggr.AvgUsedJSHeapSize = uint64(math.Round(b.sumUsedJSHeapSize / b.count))
b.reset()
return b.PerformanceTrackAggr
}
b.reset()
return nil
}

View file

@ -1,49 +0,0 @@
package heuristics
import (
. "openreplay/backend/pkg/messages"
)
const MIN_TIME_AFTER_LAST_HEARTBEAT = 60 * 1000
type anr struct {
readyMessageStore
lastLabel string
lastHeartbeatTimestamp uint64
lastHeartbeatIndex uint64
}
func (h *anr) buildIf(timestamp uint64) {
if h.lastHeartbeatTimestamp != 0 && h.lastHeartbeatTimestamp+MIN_TIME_AFTER_LAST_HEARTBEAT <= timestamp {
m := &IOSIssueEvent{
Type: "anr",
ContextString: h.lastLabel,
}
m.Timestamp = h.lastHeartbeatTimestamp
m.Index = h.lastHeartbeatIndex // Associated Index/ MessageID ?
h.append(m)
h.lastHeartbeatTimestamp = 0
h.lastHeartbeatIndex = 0
}
}
func (h *anr) HandleMessage(msg Message) {
switch m := msg.(type) {
case *IOSClickEvent:
h.buildIf(m.Timestamp)
h.lastLabel = m.Label
h.lastHeartbeatTimestamp = m.Timestamp
h.lastHeartbeatIndex = m.Index
case *IOSInputEvent:
h.buildIf(m.Timestamp)
h.lastLabel = m.Label
h.lastHeartbeatTimestamp = m.Timestamp
h.lastHeartbeatIndex = m.Index
case *IOSPerformanceEvent:
h.buildIf(m.Timestamp)
h.lastHeartbeatTimestamp = m.Timestamp
h.lastHeartbeatIndex = m.Index
case *IOSSessionEnd:
h.buildIf(m.Timestamp)
}
}

View file

@ -1,64 +0,0 @@
package heuristics
import (
. "openreplay/backend/pkg/db/types"
. "openreplay/backend/pkg/messages"
)
type MessageHandler interface {
HandleMessage(Message)
}
type ReadyMessagesIterator interface {
IterateReadyMessages(func(Message))
}
type Handler interface {
MessageHandler
ReadyMessagesIterator
}
type mainHandler map[uint64]*sessHandler
func NewHandler() mainHandler {
return make(mainHandler)
}
func (m mainHandler) getSessHandler(session *Session) *sessHandler {
if session == nil {
//AAAA
return nil
}
s := m[session.SessionID]
if s == nil {
s = newSessHandler(session)
m[session.SessionID] = s
}
return s
}
func (m mainHandler) HandleMessage(session *Session, msg Message) {
s := m.getSessHandler(session)
s.HandleMessage(msg)
}
func (m mainHandler) IterateSessionReadyMessages(sessionID uint64, iter func(msg Message)) {
s, ok := m[sessionID]
if !ok {
return
}
s.IterateReadyMessages(iter)
if s.IsEnded() {
delete(m, sessionID)
}
}
func (m mainHandler) IterateReadyMessages(iter func(sessionID uint64, msg Message)) {
for sessionID, s := range m {
s.IterateReadyMessages(func(msg Message) {
iter(sessionID, msg)
})
if s.IsEnded() {
delete(m, sessionID)
}
}
}

View file

@ -1,45 +0,0 @@
package heuristics
import (
. "openreplay/backend/pkg/db/types"
. "openreplay/backend/pkg/messages"
)
type sessHandler struct {
session *Session
handlers []Handler
ended bool
}
func newSessHandler(session *Session) *sessHandler {
return &sessHandler{
session: session,
handlers: []Handler{
new(clickrage),
new(performanceAggregator),
new(anr),
},
}
}
func (s *sessHandler) HandleMessage(msg Message) {
for _, h := range s.handlers {
h.HandleMessage(msg)
}
if _, isEnd := msg.(*IOSSessionEnd); isEnd {
s.ended = true
}
if _, isEnd := msg.(*SessionEnd); isEnd {
s.ended = true
}
}
func (s *sessHandler) IterateReadyMessages(cb func(msg Message)) {
for _, h := range s.handlers {
h.IterateReadyMessages(cb)
}
}
func (s *sessHandler) IsEnded() bool {
return s.ended
}

View file

@ -0,0 +1,62 @@
package sessionender
import (
"log"
"time"
)
// EndedSessionHandler handler for ended sessions
type EndedSessionHandler func(sessionID uint64, timestamp int64) bool
// session holds information about user's session live status
type session struct {
lastTimestamp int64
isEnded bool
}
// SessionEnder updates timestamp of last message for each session
type SessionEnder struct {
timeout int64
sessions map[uint64]*session // map[sessionID]session
}
func New(timeout int64) *SessionEnder {
return &SessionEnder{
timeout: timeout,
sessions: make(map[uint64]*session),
}
}
// UpdateSession save timestamp for new sessions and update for existing sessions
func (se *SessionEnder) UpdateSession(sessionID, timestamp uint64) {
currTS := int64(timestamp)
if currTS == 0 {
log.Printf("got empty timestamp for sessionID: %d", sessionID)
return
}
sess, ok := se.sessions[sessionID]
if !ok {
se.sessions[sessionID] = &session{
lastTimestamp: currTS,
isEnded: false,
}
return
}
if currTS > sess.lastTimestamp {
sess.lastTimestamp = currTS
sess.isEnded = false
}
}
// HandleEndedSessions runs handler for each ended session and delete information about session in successful case
func (se *SessionEnder) HandleEndedSessions(handler EndedSessionHandler) {
deadLine := time.Now().UnixMilli() - se.timeout
for sessID, sess := range se.sessions {
if sess.isEnded || sess.lastTimestamp < deadLine {
sess.isEnded = true
if handler(sessID, sess.lastTimestamp) {
delete(se.sessions, sessID)
}
}
}
}

View file

@ -1,11 +1,8 @@
package intervals
const EVENTS_COMMIT_INTERVAL = 30 * 1000
const HEARTBEAT_INTERVAL = 2 * 60 * 1000
const INTEGRATIONS_REQUEST_INTERVAL = 1 * 60 * 1000
const EVENTS_PAGE_EVENT_TIMEOUT = 2 * 60 * 1000
const EVENTS_INPUT_EVENT_TIMEOUT = 2 * 60 * 1000
const EVENTS_PERFORMANCE_AGGREGATION_TIMEOUT = 2 * 60 * 1000
const EVENTS_COMMIT_INTERVAL = 30 * 1000 // как часто комитим сообщения в кафке (ender)
const HEARTBEAT_INTERVAL = 2 * 60 * 1000 // максимальный таймаут от трекера в рамках сессии
const INTEGRATIONS_REQUEST_INTERVAL = 1 * 60 * 1000 // интеграции
const EVENTS_SESSION_END_TIMEOUT = HEARTBEAT_INTERVAL + 30*1000
const EVENTS_SESSION_END_TIMEOUT_WITH_INTEGRATIONS = HEARTBEAT_INTERVAL + 3*60*1000
const EVENTS_BACK_COMMIT_GAP = EVENTS_SESSION_END_TIMEOUT_WITH_INTEGRATIONS + 1*60*1000
const EVENTS_BACK_COMMIT_GAP = EVENTS_SESSION_END_TIMEOUT_WITH_INTEGRATIONS + 1*60*1000 // для бэк коммита

View file

@ -1396,7 +1396,7 @@ type IssueEvent struct {
Type string
ContextString string
Context string
Payload string
Payload string // TODO: check, maybe it's better to use empty interface here
}
func (msg *IssueEvent) Encode() []byte {

View file

View file

@ -1,335 +0,0 @@
package builder
import (
"net/url"
"strings"
"time"
"openreplay/backend/pkg/intervals"
. "openreplay/backend/pkg/messages"
)
func getURLExtention(URL string) string {
u, err := url.Parse(URL)
if err != nil {
return ""
}
i := strings.LastIndex(u.Path, ".")
return u.Path[i+1:]
}
func getResourceType(initiator string, URL string) string {
switch initiator {
case "xmlhttprequest", "fetch":
return "fetch"
case "img":
return "img"
default:
switch getURLExtention(URL) {
case "css":
return "stylesheet"
case "js":
return "script"
case "png", "gif", "jpg", "jpeg", "svg":
return "img"
case "mp4", "mkv", "ogg", "webm", "avi", "mp3":
return "media"
default:
return "other"
}
}
}
type builder struct {
readyMsgs []Message
timestamp uint64
lastProcessedTimestamp int64
peBuilder *pageEventBuilder
ptaBuilder *performanceTrackAggrBuilder
ieBuilder *inputEventBuilder
ciFinder *cpuIssueFinder
miFinder *memoryIssueFinder
ddDetector *domDropDetector
crDetector *clickRageDetector
dcDetector *deadClickDetector
integrationsWaiting bool
sid uint64
}
func NewBuilder() *builder {
return &builder{
peBuilder: &pageEventBuilder{},
ptaBuilder: &performanceTrackAggrBuilder{},
ieBuilder: NewInputEventBuilder(),
ciFinder: &cpuIssueFinder{},
miFinder: &memoryIssueFinder{},
ddDetector: &domDropDetector{},
crDetector: &clickRageDetector{},
dcDetector: &deadClickDetector{},
integrationsWaiting: true,
}
}
func (b *builder) appendReadyMessage(msg Message) { // interface is never nil even if it holds nil value
b.readyMsgs = append(b.readyMsgs, msg)
}
func (b *builder) iterateReadyMessage(iter func(msg Message)) {
for _, readyMsg := range b.readyMsgs {
iter(readyMsg)
}
b.readyMsgs = nil
}
func (b *builder) buildSessionEnd() {
if b.timestamp == 0 {
return
}
sessionEnd := &SessionEnd{
Timestamp: b.timestamp, // + delay?
}
b.appendReadyMessage(sessionEnd)
}
func (b *builder) buildPageEvent() {
if msg := b.peBuilder.Build(); msg != nil {
b.appendReadyMessage(msg)
}
}
func (b *builder) buildPerformanceTrackAggr() {
if msg := b.ptaBuilder.Build(); msg != nil {
b.appendReadyMessage(msg)
}
}
func (b *builder) buildInputEvent() {
if msg := b.ieBuilder.Build(); msg != nil {
b.appendReadyMessage(msg)
}
}
func (b *builder) handleMessage(message Message, messageID uint64) {
timestamp := GetTimestamp(message)
if b.timestamp < timestamp { // unnecessary? TODO: test and remove
b.timestamp = timestamp
}
b.lastProcessedTimestamp = time.Now().UnixMilli()
// Might happen before the first timestamp.
switch msg := message.(type) {
case *SessionStart,
*Metadata,
*UserID,
*UserAnonymousID:
b.appendReadyMessage(msg)
case *RawErrorEvent:
b.appendReadyMessage(&ErrorEvent{
MessageID: messageID,
Timestamp: msg.Timestamp,
Source: msg.Source,
Name: msg.Name,
Message: msg.Message,
Payload: msg.Payload,
})
}
if b.timestamp == 0 {
return
}
switch msg := message.(type) {
case *SetPageLocation:
if msg.NavigationStart == 0 {
b.appendReadyMessage(&PageEvent{
URL: msg.URL,
Referrer: msg.Referrer,
Loaded: false,
MessageID: messageID,
Timestamp: b.timestamp,
})
} else {
b.buildPageEvent()
b.buildInputEvent()
b.ieBuilder.ClearLabels()
b.peBuilder.HandleSetPageLocation(msg, messageID, b.timestamp)
b.miFinder.HandleSetPageLocation(msg)
b.ciFinder.HandleSetPageLocation(msg)
}
case *PageLoadTiming:
if rm := b.peBuilder.HandlePageLoadTiming(msg); rm != nil {
b.appendReadyMessage(rm)
}
case *PageRenderTiming:
if rm := b.peBuilder.HandlePageRenderTiming(msg); rm != nil {
b.appendReadyMessage(rm)
}
case *PerformanceTrack:
if rm := b.ptaBuilder.HandlePerformanceTrack(msg, b.timestamp); rm != nil {
b.appendReadyMessage(rm)
}
if rm := b.ciFinder.HandlePerformanceTrack(msg, messageID, b.timestamp); rm != nil {
b.appendReadyMessage(rm)
}
if rm := b.miFinder.HandlePerformanceTrack(msg, messageID, b.timestamp); rm != nil {
b.appendReadyMessage(rm)
}
case *SetInputTarget:
if rm := b.ieBuilder.HandleSetInputTarget(msg); rm != nil {
b.appendReadyMessage(rm)
}
case *SetInputValue:
if rm := b.ieBuilder.HandleSetInputValue(msg, messageID, b.timestamp); rm != nil {
b.appendReadyMessage(rm)
}
case *MouseClick:
b.buildInputEvent()
if rm := b.crDetector.HandleMouseClick(msg, messageID, b.timestamp); rm != nil {
b.appendReadyMessage(rm)
}
if msg.Label != "" {
b.appendReadyMessage(&ClickEvent{
MessageID: messageID,
Label: msg.Label,
HesitationTime: msg.HesitationTime,
Timestamp: b.timestamp,
Selector: msg.Selector,
})
}
case *JSException:
b.appendReadyMessage(&ErrorEvent{
MessageID: messageID,
Timestamp: b.timestamp,
Source: "js_exception",
Name: msg.Name,
Message: msg.Message,
Payload: msg.Payload,
})
case *ResourceTiming:
tp := getResourceType(msg.Initiator, msg.URL)
success := msg.Duration != 0
b.appendReadyMessage(&ResourceEvent{
MessageID: messageID,
Timestamp: msg.Timestamp,
Duration: msg.Duration,
TTFB: msg.TTFB,
HeaderSize: msg.HeaderSize,
EncodedBodySize: msg.EncodedBodySize,
DecodedBodySize: msg.DecodedBodySize,
URL: msg.URL,
Type: tp,
Success: success,
})
if !success {
issueType := "missing_resource"
if tp == "fetch" {
issueType = "bad_request"
}
b.appendReadyMessage(&IssueEvent{
Type: issueType,
MessageID: messageID,
Timestamp: msg.Timestamp,
ContextString: msg.URL,
})
}
case *RawCustomEvent:
b.appendReadyMessage(&CustomEvent{
MessageID: messageID,
Timestamp: b.timestamp,
Name: msg.Name,
Payload: msg.Payload,
})
case *CustomIssue:
b.appendReadyMessage(&IssueEvent{
Type: "custom",
Timestamp: b.timestamp,
MessageID: messageID,
ContextString: msg.Name,
Payload: msg.Payload,
})
case *Fetch:
b.appendReadyMessage(&FetchEvent{
MessageID: messageID,
Timestamp: msg.Timestamp,
Method: msg.Method,
URL: msg.URL,
Request: msg.Request,
Response: msg.Response,
Status: msg.Status,
Duration: msg.Duration,
})
if msg.Status >= 400 {
b.appendReadyMessage(&IssueEvent{
Type: "bad_request",
MessageID: messageID,
Timestamp: msg.Timestamp,
ContextString: msg.URL,
})
}
case *GraphQL:
b.appendReadyMessage(&GraphQLEvent{
MessageID: messageID,
Timestamp: b.timestamp,
OperationKind: msg.OperationKind,
OperationName: msg.OperationName,
Variables: msg.Variables,
Response: msg.Response,
})
case *StateAction:
b.appendReadyMessage(&StateActionEvent{
MessageID: messageID,
Timestamp: b.timestamp,
Type: msg.Type,
})
case *CreateElementNode,
*CreateTextNode:
b.ddDetector.HandleNodeCreation()
case *RemoveNode:
b.ddDetector.HandleNodeRemoval(b.timestamp)
case *CreateDocument:
if rm := b.ddDetector.Build(); rm != nil {
b.appendReadyMessage(rm)
}
}
if rm := b.dcDetector.HandleMessage(message, messageID, b.timestamp); rm != nil {
b.appendReadyMessage(rm)
}
}
func (b *builder) checkTimeouts(ts int64) bool {
if b.timestamp == 0 {
return false // There was no timestamp events yet
}
if b.peBuilder.HasInstance() && int64(b.peBuilder.GetTimestamp())+intervals.EVENTS_PAGE_EVENT_TIMEOUT < ts {
b.buildPageEvent()
}
if b.ieBuilder.HasInstance() && int64(b.ieBuilder.GetTimestamp())+intervals.EVENTS_INPUT_EVENT_TIMEOUT < ts {
b.buildInputEvent()
}
if b.ptaBuilder.HasInstance() && int64(b.ptaBuilder.GetStartTimestamp())+intervals.EVENTS_PERFORMANCE_AGGREGATION_TIMEOUT < ts {
b.buildPerformanceTrackAggr()
}
lastTsGap := ts - int64(b.timestamp)
//b.lastProcessedTimestamp
//log.Printf("checking timeouts for sess %v: %v now, %v sesstime; gap %v",b.sid, ts, b.timestamp, lastTsGap)
if lastTsGap > intervals.EVENTS_SESSION_END_TIMEOUT {
if rm := b.ddDetector.Build(); rm != nil {
b.appendReadyMessage(rm)
}
if rm := b.ciFinder.Build(); rm != nil {
b.appendReadyMessage(rm)
}
if rm := b.miFinder.Build(); rm != nil {
b.appendReadyMessage(rm)
}
if rm := b.crDetector.Build(); rm != nil {
b.appendReadyMessage(rm)
}
if rm := b.dcDetector.HandleReaction(b.timestamp); rm != nil {
b.appendReadyMessage(rm)
}
b.buildSessionEnd()
return true
}
return false
}

View file

@ -1,51 +0,0 @@
package builder
import (
. "openreplay/backend/pkg/messages"
)
type builderMap map[uint64]*builder
func NewBuilderMap() builderMap {
return make(builderMap)
}
func (m builderMap) GetBuilder(sessionID uint64) *builder {
b := m[sessionID]
if b == nil {
b = NewBuilder()
m[sessionID] = b
b.sid = sessionID
}
return b
}
func (m builderMap) HandleMessage(sessionID uint64, msg Message, messageID uint64) {
b := m.GetBuilder(sessionID)
b.handleMessage(msg, messageID)
}
func (m builderMap) IterateSessionReadyMessages(sessionID uint64, operatingTs int64, iter func(msg Message)) {
b, ok := m[sessionID]
if !ok {
return
}
sessionEnded := b.checkTimeouts(operatingTs)
b.iterateReadyMessage(iter)
if sessionEnded {
delete(m, sessionID)
}
}
func (m builderMap) IterateReadyMessages(operatingTs int64, iter func(sessionID uint64, msg Message)) {
for sessionID, b := range m {
sessionEnded := b.checkTimeouts(operatingTs)
b.iterateReadyMessage(func(msg Message) {
iter(sessionID, msg)
})
if sessionEnded {
delete(m, sessionID)
}
}
}

View file

@ -1,55 +0,0 @@
package builder
import (
"encoding/json"
. "openreplay/backend/pkg/messages"
)
const CLICK_TIME_DIFF = 300
const MIN_CLICKS_IN_A_ROW = 3
type clickRageDetector struct {
lastTimestamp uint64
lastLabel string
firstInARawTimestamp uint64
firstInARawMessageId uint64
countsInARow int
}
func (crd *clickRageDetector) Build() *IssueEvent {
var i *IssueEvent
if crd.countsInARow >= MIN_CLICKS_IN_A_ROW {
payload, _ := json.Marshal(struct{ Count int }{crd.countsInARow})
i = &IssueEvent{
Type: "click_rage",
ContextString: crd.lastLabel,
Payload: string(payload), // TODO: json encoder
Timestamp: crd.firstInARawTimestamp,
MessageID: crd.firstInARawMessageId,
}
}
crd.lastTimestamp = 0
crd.lastLabel = ""
crd.firstInARawTimestamp = 0
crd.firstInARawMessageId = 0
crd.countsInARow = 0
return i
}
func (crd *clickRageDetector) HandleMouseClick(msg *MouseClick, messageID uint64, timestamp uint64) *IssueEvent {
if crd.lastTimestamp+CLICK_TIME_DIFF > timestamp && crd.lastLabel == msg.Label {
crd.lastTimestamp = timestamp
crd.countsInARow += 1
return nil
}
i := crd.Build()
if msg.Label != "" {
crd.lastTimestamp = timestamp
crd.lastLabel = msg.Label
crd.firstInARawTimestamp = timestamp
crd.firstInARawMessageId = messageID
crd.countsInARow = 1
}
return i
}

View file

@ -1,81 +0,0 @@
package builder
import (
"encoding/json"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/messages/performance"
)
const CPU_THRESHOLD = 70 // % out of 100
const CPU_MIN_DURATION_TRIGGER = 6 * 1000
type cpuIssueFinder struct {
startTimestamp uint64
startMessageID uint64
lastTimestamp uint64
maxRate uint64
contextString string
}
func (f *cpuIssueFinder) Build() *IssueEvent {
if f.startTimestamp == 0 {
return nil
}
duration := f.lastTimestamp - f.startTimestamp
timestamp := f.startTimestamp
messageID := f.startMessageID
maxRate := f.maxRate
f.startTimestamp = 0
f.startMessageID = 0
f.maxRate = 0
if duration < CPU_MIN_DURATION_TRIGGER {
return nil
}
payload, _ := json.Marshal(struct {
Duration uint64
Rate uint64
}{duration, maxRate})
return &IssueEvent{
Type: "cpu",
Timestamp: timestamp,
MessageID: messageID,
ContextString: f.contextString,
Payload: string(payload),
}
}
func (f *cpuIssueFinder) HandleSetPageLocation(msg *SetPageLocation) {
f.contextString = msg.URL
}
func (f *cpuIssueFinder) HandlePerformanceTrack(msg *PerformanceTrack, messageID uint64, timestamp uint64) *IssueEvent {
dt := performance.TimeDiff(timestamp, f.lastTimestamp)
if dt == 0 {
return nil // TODO: handle error
}
f.lastTimestamp = timestamp
if msg.Frames == -1 || msg.Ticks == -1 {
return f.Build()
}
cpuRate := performance.CPURate(msg.Ticks, dt)
if cpuRate >= CPU_THRESHOLD {
if f.startTimestamp == 0 {
f.startTimestamp = timestamp
f.startMessageID = messageID
}
if f.maxRate < cpuRate {
f.maxRate = cpuRate
}
} else {
return f.Build()
}
return nil
}

View file

@ -1,66 +0,0 @@
package builder
import (
. "openreplay/backend/pkg/messages"
)
const CLICK_RELATION_TIME = 1400
type deadClickDetector struct {
lastMouseClick *MouseClick
lastTimestamp uint64
lastMessageID uint64
inputIDSet map[uint64]bool
}
func (d *deadClickDetector) HandleReaction(timestamp uint64) *IssueEvent {
var i *IssueEvent
if d.lastMouseClick != nil && d.lastTimestamp+CLICK_RELATION_TIME < timestamp {
i = &IssueEvent{
Type: "dead_click",
ContextString: d.lastMouseClick.Label,
Timestamp: d.lastTimestamp,
MessageID: d.lastMessageID,
}
}
d.inputIDSet = nil
d.lastMouseClick = nil
d.lastTimestamp = 0
d.lastMessageID = 0
return i
}
func (d *deadClickDetector) HandleMessage(msg Message, messageID uint64, timestamp uint64) *IssueEvent {
var i *IssueEvent
switch m := msg.(type) {
case *SetInputTarget:
if d.inputIDSet == nil {
d.inputIDSet = make(map[uint64]bool)
}
d.inputIDSet[m.ID] = true
case *CreateDocument:
d.inputIDSet = nil
case *MouseClick:
if m.Label == "" {
return nil
}
i = d.HandleReaction(timestamp)
if d.inputIDSet[m.ID] { // ignore if input
return i
}
d.lastMouseClick = m
d.lastTimestamp = timestamp
d.lastMessageID = messageID
case *SetNodeAttribute,
*RemoveNodeAttribute,
*CreateElementNode,
*CreateTextNode,
*MoveNode,
*RemoveNode,
*SetCSSData,
*CSSInsertRule,
*CSSDeleteRule:
i = d.HandleReaction(timestamp)
}
return i
}

View file

@ -1,39 +0,0 @@
package builder
import (
. "openreplay/backend/pkg/messages"
)
type domDropDetector struct {
removedCount int
lastDropTimestamp uint64
}
const DROP_WINDOW = 200 //ms
const CRITICAL_COUNT = 1 // Our login page contains 20. But on crush it removes only roots (1-3 nodes).
func (dd *domDropDetector) HandleNodeCreation() {
dd.removedCount = 0
dd.lastDropTimestamp = 0
}
func (dd *domDropDetector) HandleNodeRemoval(ts uint64) {
if dd.lastDropTimestamp+DROP_WINDOW > ts {
dd.removedCount += 1
} else {
dd.removedCount = 1
}
dd.lastDropTimestamp = ts
}
func (dd *domDropDetector) Build() *DOMDrop {
var domDrop *DOMDrop
if dd.removedCount >= CRITICAL_COUNT {
domDrop = &DOMDrop{
Timestamp: dd.lastDropTimestamp,
}
}
dd.removedCount = 0
dd.lastDropTimestamp = 0
return domDrop
}

View file

@ -1,79 +0,0 @@
package builder
import (
. "openreplay/backend/pkg/messages"
)
type inputLabels map[uint64]string
type inputEventBuilder struct {
inputEvent *InputEvent
inputLabels inputLabels
inputID uint64
}
func NewInputEventBuilder() *inputEventBuilder {
ieBuilder := &inputEventBuilder{}
ieBuilder.ClearLabels()
return ieBuilder
}
func (b *inputEventBuilder) ClearLabels() {
b.inputLabels = make(inputLabels)
}
func (b *inputEventBuilder) HandleSetInputTarget(msg *SetInputTarget) *InputEvent {
var inputEvent *InputEvent
if b.inputID != msg.ID {
inputEvent = b.Build()
b.inputID = msg.ID
}
b.inputLabels[msg.ID] = msg.Label
return inputEvent
}
func (b *inputEventBuilder) HandleSetInputValue(msg *SetInputValue, messageID uint64, timestamp uint64) *InputEvent {
var inputEvent *InputEvent
if b.inputID != msg.ID {
inputEvent = b.Build()
b.inputID = msg.ID
}
if b.inputEvent == nil {
b.inputEvent = &InputEvent{
MessageID: messageID,
Timestamp: timestamp,
Value: msg.Value,
ValueMasked: msg.Mask > 0,
}
} else {
b.inputEvent.Value = msg.Value
b.inputEvent.ValueMasked = msg.Mask > 0
}
return inputEvent
}
func (b *inputEventBuilder) HasInstance() bool {
return b.inputEvent != nil
}
func (b *inputEventBuilder) GetTimestamp() uint64 {
if b.inputEvent == nil {
return 0
}
return b.inputEvent.Timestamp
}
func (b *inputEventBuilder) Build() *InputEvent {
if b.inputEvent == nil {
return nil
}
inputEvent := b.inputEvent
label, exists := b.inputLabels[b.inputID]
if !exists {
return nil
}
inputEvent.Label = label
b.inputEvent = nil
return inputEvent
}

View file

@ -1,70 +0,0 @@
package builder
import (
"encoding/json"
"math"
. "openreplay/backend/pkg/messages"
)
const MIN_COUNT = 3
const MEM_RATE_THRESHOLD = 300 // % to average
type memoryIssueFinder struct {
startMessageID uint64
startTimestamp uint64
rate int
count float64
sum float64
contextString string
}
func (f *memoryIssueFinder) Build() *IssueEvent {
if f.startTimestamp == 0 {
return nil
}
payload, _ := json.Marshal(struct{ Rate int }{f.rate - 100})
i := &IssueEvent{
Type: "memory",
Timestamp: f.startTimestamp,
MessageID: f.startMessageID,
ContextString: f.contextString,
Payload: string(payload),
}
f.startTimestamp = 0
f.startMessageID = 0
f.rate = 0
return i
}
func (f *memoryIssueFinder) HandleSetPageLocation(msg *SetPageLocation) {
f.contextString = msg.URL
}
func (f *memoryIssueFinder) HandlePerformanceTrack(msg *PerformanceTrack, messageID uint64, timestamp uint64) *IssueEvent {
if f.count < MIN_COUNT {
f.sum += float64(msg.UsedJSHeapSize)
f.count++
return nil
}
average := f.sum / f.count
rate := int(math.Round(float64(msg.UsedJSHeapSize) / average * 100))
f.sum += float64(msg.UsedJSHeapSize)
f.count++
if rate >= MEM_RATE_THRESHOLD {
if f.startTimestamp == 0 {
f.startTimestamp = timestamp
f.startMessageID = messageID
}
if f.rate < rate {
f.rate = rate
}
} else {
return f.Build()
}
return nil
}

View file

@ -1,91 +0,0 @@
package builder
import (
. "openreplay/backend/pkg/messages"
)
type pageEventBuilder struct {
pageEvent *PageEvent
firstTimingHandled bool
}
func (b *pageEventBuilder) buildIfTimingsComplete() *PageEvent {
if b.firstTimingHandled {
return b.Build()
}
b.firstTimingHandled = true
return nil
}
// Only for Loaded: true
func (b *pageEventBuilder) HandleSetPageLocation(msg *SetPageLocation, messageID uint64, timestamp uint64) {
b.pageEvent = &PageEvent{
URL: msg.URL,
Referrer: msg.Referrer,
Loaded: true,
MessageID: messageID,
Timestamp: timestamp,
}
}
func (b *pageEventBuilder) HandlePageLoadTiming(msg *PageLoadTiming) *PageEvent {
if !b.HasInstance() {
return nil
}
if msg.RequestStart <= 30000 {
b.pageEvent.RequestStart = msg.RequestStart
}
if msg.ResponseStart <= 30000 {
b.pageEvent.ResponseStart = msg.ResponseStart
}
if msg.ResponseEnd <= 30000 {
b.pageEvent.ResponseEnd = msg.ResponseEnd
}
if msg.DomContentLoadedEventStart <= 30000 {
b.pageEvent.DomContentLoadedEventStart = msg.DomContentLoadedEventStart
}
if msg.DomContentLoadedEventEnd <= 30000 {
b.pageEvent.DomContentLoadedEventEnd = msg.DomContentLoadedEventEnd
}
if msg.LoadEventStart <= 30000 {
b.pageEvent.LoadEventStart = msg.LoadEventStart
}
if msg.LoadEventEnd <= 30000 {
b.pageEvent.LoadEventEnd = msg.LoadEventEnd
}
if msg.FirstPaint <= 30000 {
b.pageEvent.FirstPaint = msg.FirstPaint
}
if msg.FirstContentfulPaint <= 30000 {
b.pageEvent.FirstContentfulPaint = msg.FirstContentfulPaint
}
return b.buildIfTimingsComplete()
}
func (b *pageEventBuilder) HandlePageRenderTiming(msg *PageRenderTiming) *PageEvent {
if !b.HasInstance() {
return nil
}
b.pageEvent.SpeedIndex = msg.SpeedIndex
b.pageEvent.VisuallyComplete = msg.VisuallyComplete
b.pageEvent.TimeToInteractive = msg.TimeToInteractive
return b.buildIfTimingsComplete()
}
func (b *pageEventBuilder) HasInstance() bool {
return b.pageEvent != nil
}
func (b *pageEventBuilder) GetTimestamp() uint64 {
if b.pageEvent == nil {
return 0
}
return b.pageEvent.Timestamp
}
func (b *pageEventBuilder) Build() *PageEvent {
pageEvent := b.pageEvent
b.pageEvent = nil
b.firstTimingHandled = false
return pageEvent
}

View file

@ -1,106 +0,0 @@
package builder
import (
"math"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/messages/performance"
)
type performanceTrackAggrBuilder struct {
performanceTrackAggr *PerformanceTrackAggr
lastTimestamp uint64
count float64
sumFrameRate float64
sumTickRate float64
sumTotalJSHeapSize float64
sumUsedJSHeapSize float64
}
func (b *performanceTrackAggrBuilder) start(timestamp uint64) {
b.performanceTrackAggr = &PerformanceTrackAggr{
TimestampStart: timestamp,
}
b.lastTimestamp = timestamp
}
func (b *performanceTrackAggrBuilder) HandlePerformanceTrack(msg *PerformanceTrack, timestamp uint64) *PerformanceTrackAggr {
if msg.Frames == -1 || msg.Ticks == -1 || !b.HasInstance() {
performanceTrackAggr := b.Build()
b.start(timestamp)
return performanceTrackAggr
}
dt := performance.TimeDiff(timestamp, b.lastTimestamp)
if dt == 0 {
return nil // TODO: handle error
}
frameRate := performance.FrameRate(msg.Frames, dt)
tickRate := performance.TickRate(msg.Ticks, dt)
fps := uint64(math.Round(frameRate))
cpu := performance.CPURateFromTickRate(tickRate)
if fps < b.performanceTrackAggr.MinFPS || b.performanceTrackAggr.MinFPS == 0 {
b.performanceTrackAggr.MinFPS = fps
}
if fps > b.performanceTrackAggr.MaxFPS {
b.performanceTrackAggr.MaxFPS = fps
}
if cpu < b.performanceTrackAggr.MinCPU || b.performanceTrackAggr.MinCPU == 0 {
b.performanceTrackAggr.MinCPU = cpu
}
if cpu > b.performanceTrackAggr.MaxCPU {
b.performanceTrackAggr.MaxCPU = cpu
}
if msg.TotalJSHeapSize < b.performanceTrackAggr.MinTotalJSHeapSize || b.performanceTrackAggr.MinTotalJSHeapSize == 0 {
b.performanceTrackAggr.MinTotalJSHeapSize = msg.TotalJSHeapSize
}
if msg.TotalJSHeapSize > b.performanceTrackAggr.MaxTotalJSHeapSize {
b.performanceTrackAggr.MaxTotalJSHeapSize = msg.TotalJSHeapSize
}
if msg.UsedJSHeapSize < b.performanceTrackAggr.MinUsedJSHeapSize || b.performanceTrackAggr.MinUsedJSHeapSize == 0 {
b.performanceTrackAggr.MinUsedJSHeapSize = msg.UsedJSHeapSize
}
if msg.UsedJSHeapSize > b.performanceTrackAggr.MaxUsedJSHeapSize {
b.performanceTrackAggr.MaxUsedJSHeapSize = msg.UsedJSHeapSize
}
b.sumFrameRate += frameRate
b.sumTickRate += tickRate
b.sumTotalJSHeapSize += float64(msg.TotalJSHeapSize)
b.sumUsedJSHeapSize += float64(msg.UsedJSHeapSize)
b.count += 1
b.lastTimestamp = timestamp
return nil
}
func (b *performanceTrackAggrBuilder) HasInstance() bool {
return b.performanceTrackAggr != nil
}
func (b *performanceTrackAggrBuilder) GetStartTimestamp() uint64 {
if b.performanceTrackAggr == nil {
return 0
}
return b.performanceTrackAggr.TimestampStart
}
func (b *performanceTrackAggrBuilder) Build() *PerformanceTrackAggr {
var performanceTrackAggr *PerformanceTrackAggr
if b.HasInstance() && b.GetStartTimestamp() != b.lastTimestamp && b.count != 0 {
performanceTrackAggr = b.performanceTrackAggr
performanceTrackAggr.TimestampEnd = b.lastTimestamp
performanceTrackAggr.AvgFPS = uint64(math.Round(b.sumFrameRate / b.count))
performanceTrackAggr.AvgCPU = 100 - uint64(math.Round(b.sumTickRate*100/b.count))
performanceTrackAggr.AvgTotalJSHeapSize = uint64(math.Round(b.sumTotalJSHeapSize / b.count))
performanceTrackAggr.AvgUsedJSHeapSize = uint64(math.Round(b.sumUsedJSHeapSize / b.count))
}
b.performanceTrackAggr = nil
b.count = 0
b.sumFrameRate = 0
b.sumTickRate = 0
b.sumTotalJSHeapSize = 0
b.sumUsedJSHeapSize = 0
b.lastTimestamp = 0
return performanceTrackAggr
}

View file

@ -1,71 +0,0 @@
package main
import (
"log"
"time"
"os"
"os/signal"
"syscall"
"openreplay/backend/pkg/env"
"openreplay/backend/pkg/intervals"
logger "openreplay/backend/pkg/log"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/services/ender/builder"
)
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
GROUP_EVENTS := env.String("GROUP_ENDER")
TOPIC_TRIGGER := env.String("TOPIC_TRIGGER")
builderMap := builder.NewBuilderMap()
statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"))
producer := queue.NewProducer()
consumer := queue.NewMessageConsumer(
GROUP_EVENTS,
[]string{
env.String("TOPIC_RAW_WEB"),
env.String("TOPIC_RAW_IOS"),
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
statsLogger.Collect(sessionID, meta)
builderMap.HandleMessage(sessionID, msg, msg.Meta().Index)
},
false,
)
tick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond)
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
log.Printf("Ender service started\n")
for {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
producer.Close(2000)
consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP)
consumer.Close()
os.Exit(0)
case <-tick:
builderMap.IterateReadyMessages(time.Now().UnixMilli(), func(sessionID uint64, readyMsg messages.Message) {
producer.Produce(TOPIC_TRIGGER, sessionID, messages.Encode(readyMsg))
})
// TODO: why exactly do we need Flush here and not in any other place?
producer.Flush(2000)
consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP)
default:
if err := consumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consuming: %v", err)
}
}
}
}

View file

View file

@ -0,0 +1,79 @@
package datasaver
import (
"log"
"time"
"openreplay/backend/pkg/db/clickhouse"
"openreplay/backend/pkg/env"
)
var ch *clickhouse.Connector
var finalizeTicker <-chan time.Time
func (si *Saver) InitStats() {
ch = clickhouse.NewConnector(env.String("CLICKHOUSE_STRING"))
if err := ch.Prepare(); err != nil {
log.Fatalf("Clickhouse prepare error: %v\n", err)
}
finalizeTicker = time.Tick(20 * time.Minute)
}
func (si *Saver) InsertStats(session *Session, msg Message) error {
switch m := msg.(type) {
// Web
case *SessionEnd:
return si.pg.InsertWebSession(session)
case *PerformanceTrackAggr:
return si.pg.InsertWebPerformanceTrackAggr(session, m)
case *ClickEvent:
return si.pg.InsertWebClickEvent(session, m)
case *InputEvent:
return si.pg.InsertWebInputEvent(session, m)
// Unique for Web
case *PageEvent:
si.pg.InsertWebPageEvent(session, m)
case *ResourceEvent:
return si.pg.InsertWebResourceEvent(session, m)
case *ErrorEvent:
return si.pg.InsertWebErrorEvent(session, m)
case *LongTask:
return si.pg.InsertLongtask(session, m)
// IOS
case *IOSSessionEnd:
return si.pg.InsertIOSSession(session)
case *IOSPerformanceAggregated:
return si.pg.InsertIOSPerformanceAggregated(session, m)
case *IOSClickEvent:
return si.pg.InsertIOSClickEvent(session, m)
case *IOSInputEvent:
return si.pg.InsertIOSInputEvent(session, m)
// Unique for Web
case *IOSScreenEnter:
//ch.InsertIOSView(session, m)
case *IOSCrash:
return si.pg.InsertIOSCrash(session, m)
case *IOSNetworkCall:
return si.pg.InsertIOSNetworkCall(session, m)
}
return nil
}
func (si *Saver) CommitStats() error {
select {
case <-finalizeTicker:
if err := ch.FinaliseSessionsTable(); err != nil {
log.Printf("Stats: FinaliseSessionsTable returned an error. %v", err)
}
default:
}
errCommit := ch.Commit()
errPrepare := ch.Prepare()
if errCommit != nil {
return errCommit
}
return errPrepare
}

View file

@ -1,83 +0,0 @@
package main
import (
"log"
"time"
. "openreplay/backend/pkg/messages"
. "openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/db/clickhouse"
"openreplay/backend/pkg/env"
)
var ch *clickhouse.Connector
var finalizeTicker <-chan time.Time
func initStats() {
ch = clickhouse.NewConnector(env.String("CLICKHOUSE_STRING"))
if err := ch.Prepare(); err != nil {
log.Fatalf("Clickhouse prepare error: %v\n", err)
}
finalizeTicker = time.Tick(20 * time.Minute)
}
func insertStats(session *Session, msg Message) error {
switch m := msg.(type) {
// Web
case *SessionEnd:
return ch.InsertWebSession(session)
case *PerformanceTrackAggr:
return ch.InsertWebPerformanceTrackAggr(session, m)
case *ClickEvent:
return ch.InsertWebClickEvent(session, m)
case *InputEvent:
return ch.InsertWebInputEvent(session, m)
// Unique for Web
case *PageEvent:
ch.InsertWebPageEvent(session, m)
case *ResourceEvent:
return ch.InsertWebResourceEvent(session, m)
case *ErrorEvent:
return ch.InsertWebErrorEvent(session, m)
case *LongTask:
return ch.InsertLongtask(session, m)
// IOS
case *IOSSessionEnd:
return ch.InsertIOSSession(session)
case *IOSPerformanceAggregated:
return ch.InsertIOSPerformanceAggregated(session, m)
case *IOSClickEvent:
return ch.InsertIOSClickEvent(session, m)
case *IOSInputEvent:
return ch.InsertIOSInputEvent(session, m)
// Unique for Web
case *IOSScreenEnter:
//ch.InsertIOSView(session, m)
case *IOSCrash:
return ch.InsertIOSCrash(session, m)
case *IOSNetworkCall:
return ch.InsertIOSNetworkCall(session, m)
}
return nil
}
func commitStats() error {
select {
case <-finalizeTicker:
if err := ch.FinaliseSessionsTable(); err != nil {
log.Printf("Stats: FinaliseSessionsTable returned an error. %v", err)
}
default:
}
errCommit := ch.Commit()
errPrepare := ch.Prepare()
if errCommit != nil {
return errCommit
}
return errPrepare
}