feat(backend/heuristics): message handlers refactoring

This commit is contained in:
Alexander Zavorotynskiy 2022-05-09 16:51:10 +02:00
parent 6ab6d342c0
commit ca9d76624b
16 changed files with 223 additions and 131 deletions

View file

@ -4,6 +4,10 @@ import (
"log"
"openreplay/backend/internal/builder"
"openreplay/backend/internal/config/ender"
"openreplay/backend/internal/handlers"
"openreplay/backend/internal/handlers/custom"
"openreplay/backend/internal/handlers/ios"
"openreplay/backend/internal/handlers/web"
"openreplay/backend/pkg/intervals"
logger "openreplay/backend/pkg/log"
"openreplay/backend/pkg/messages"
@ -18,10 +22,32 @@ import (
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
// Load service configuration
cfg := ender.New()
builderMap := builder.NewBuilderMap()
// Declare message handlers we want to apply for each incoming message
msgHandlers := []handlers.MessageProcessor{
// web handlers
&web.ClickRageDetector{},
&web.CpuIssueDetector{},
&web.DeadClickDetector{},
&web.MemoryIssueDetector{},
&web.PerformanceAggregator{},
// iOS handlers
&ios.AppNotResponding{},
&ios.ClickRageDetector{},
&ios.PerformanceAggregator{},
// Other handlers (you can add your custom handlers here)
&custom.CustomHandler{},
}
// Create handler's aggregator
builderMap := builder.NewBuilderMap(msgHandlers...)
// Init logger
statsLogger := logger.NewQueueStats(cfg.LoggerTimeout)
// Init producer and consumer for data bus
producer := queue.NewProducer()
consumer := queue.NewMessageConsumer(
cfg.GroupEvents,
@ -36,7 +62,7 @@ func main() {
false,
)
log.Printf("Ender service started\n")
log.Printf("Heuristics service started\n")
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

View file

@ -1,31 +1,20 @@
package builder
import (
"openreplay/backend/internal/handlers"
"openreplay/backend/pkg/intervals"
. "openreplay/backend/pkg/messages"
)
type messageProcessor interface {
Handle(message Message, messageID uint64, timestamp uint64) Message
Build() Message
}
type builder struct {
readyMsgs []Message
timestamp uint64
processors []messageProcessor
processors []handlers.MessageProcessor
}
func NewBuilder() *builder {
func NewBuilder(handlers ...handlers.MessageProcessor) *builder {
return &builder{
processors: []messageProcessor{
&performanceTrackAggrBuilder{},
&cpuIssueFinder{},
&memoryIssueFinder{},
// &domDropDetector{},
&clickRageDetector{},
&deadClickDetector{},
},
processors: handlers,
}
}

View file

@ -1,37 +1,44 @@
package builder
import (
"openreplay/backend/internal/handlers"
. "openreplay/backend/pkg/messages"
)
type builderMap map[uint64]*builder
func NewBuilderMap() builderMap {
return make(builderMap)
type builderMap struct {
handlers []handlers.MessageProcessor
sessions map[uint64]*builder
}
func (m builderMap) GetBuilder(sessionID uint64) *builder {
b := m[sessionID]
func NewBuilderMap(handlers ...handlers.MessageProcessor) *builderMap {
return &builderMap{
handlers: handlers,
sessions: make(map[uint64]*builder),
}
}
func (m *builderMap) GetBuilder(sessionID uint64) *builder {
b := m.sessions[sessionID]
if b == nil {
b = NewBuilder()
m[sessionID] = b
b = NewBuilder(m.handlers...)
m.sessions[sessionID] = b
}
return b
}
func (m builderMap) HandleMessage(sessionID uint64, msg Message, messageID uint64) {
func (m *builderMap) HandleMessage(sessionID uint64, msg Message, messageID uint64) {
b := m.GetBuilder(sessionID)
b.handleMessage(msg, messageID)
}
func (m builderMap) IterateReadyMessages(operatingTs int64, iter func(sessionID uint64, msg Message)) {
for sessionID, b := range m {
func (m *builderMap) IterateReadyMessages(operatingTs int64, iter func(sessionID uint64, msg Message)) {
for sessionID, b := range m.sessions {
sessionEnded := b.checkTimeouts(operatingTs)
b.iterateReadyMessage(func(msg Message) {
iter(sessionID, msg)
})
if sessionEnded {
delete(m, sessionID)
delete(m.sessions, sessionID)
}
}
}

View file

@ -0,0 +1,16 @@
package custom
import . "openreplay/backend/pkg/messages"
type CustomHandler struct {
lastTimestamp uint64
}
func (h *CustomHandler) Handle(message Message, messageID uint64, timestamp uint64) Message {
h.lastTimestamp = timestamp
return nil
}
func (h *CustomHandler) Build() Message {
return nil
}

View file

@ -1,34 +1,23 @@
package heuristics
package ios
import (
"openreplay/backend/internal/handlers"
. "openreplay/backend/pkg/messages"
)
// app is not responding detector
const MIN_TIME_AFTER_LAST_HEARTBEAT = 60 * 1000
type anr struct {
readyMessageStore
type AppNotResponding struct {
handlers.ReadyMessageStore
lastLabel string
lastHeartbeatTimestamp uint64
lastHeartbeatIndex uint64
}
func (h *anr) buildIf(timestamp uint64) {
if h.lastHeartbeatTimestamp != 0 && h.lastHeartbeatTimestamp+MIN_TIME_AFTER_LAST_HEARTBEAT <= timestamp {
m := &IOSIssueEvent{
Type: "anr",
ContextString: h.lastLabel,
}
m.Timestamp = h.lastHeartbeatTimestamp
m.Index = h.lastHeartbeatIndex // Associated Index/ MessageID ?
h.append(m)
h.lastHeartbeatTimestamp = 0
h.lastHeartbeatIndex = 0
}
}
func (h *anr) HandleMessage(msg Message) {
switch m := msg.(type) {
func (h *AppNotResponding) Handle(message Message, messageID uint64, timestamp uint64) Message {
switch m := message.(type) {
case *IOSClickEvent:
h.buildIf(m.Timestamp)
h.lastLabel = m.Label
@ -46,4 +35,28 @@ func (h *anr) HandleMessage(msg Message) {
case *IOSSessionEnd:
h.buildIf(m.Timestamp)
}
return nil
}
func (h *AppNotResponding) Build() Message {
//TODO implement me
panic("implement me")
}
func (h *AppNotResponding) buildIf(timestamp uint64) {
if h.lastHeartbeatTimestamp != 0 && h.lastHeartbeatTimestamp+MIN_TIME_AFTER_LAST_HEARTBEAT <= timestamp {
m := &IOSIssueEvent{
Type: "anr",
ContextString: h.lastLabel,
}
m.Timestamp = h.lastHeartbeatTimestamp
m.Index = h.lastHeartbeatIndex // Associated Index/ MessageID ?
h.Append(m)
h.lastHeartbeatTimestamp = 0
h.lastHeartbeatIndex = 0
}
}
func (h *AppNotResponding) HandleMessage(msg Message) {
// TODO: delete it
}

View file

@ -1,14 +1,17 @@
package heuristics
package ios
import (
"openreplay/backend/internal/handlers"
"openreplay/backend/internal/handlers/web"
. "openreplay/backend/pkg/messages"
)
const CLICK_TIME_DIFF = 200
const MIN_CLICKS_IN_A_ROW = 3
type clickrage struct {
readyMessageStore
//const MIN_CLICKS_IN_A_ROW = 3
type ClickRageDetector struct {
handlers.ReadyMessageStore
lastTimestamp uint64
lastLabel string
firstInARawTimestamp uint64
@ -16,30 +19,13 @@ type clickrage struct {
countsInARow int
}
func (h *clickrage) build() {
if h.countsInARow >= MIN_CLICKS_IN_A_ROW {
m := &IOSIssueEvent{
Type: "click_rage",
ContextString: h.lastLabel,
}
m.Timestamp = h.firstInARawTimestamp
m.Index = h.firstInARawSeqIndex // Associated Index/ MessageID ?
h.append(m)
}
h.lastTimestamp = 0
h.lastLabel = ""
h.firstInARawTimestamp = 0
h.firstInARawSeqIndex = 0
h.countsInARow = 0
}
func (h *clickrage) HandleMessage(msg Message) {
switch m := msg.(type) {
func (h *ClickRageDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
switch m := message.(type) {
case *IOSClickEvent:
if h.lastTimestamp+CLICK_TIME_DIFF < m.Timestamp && h.lastLabel == m.Label {
h.lastTimestamp = m.Timestamp
h.countsInARow += 1
return
return nil
}
h.build()
if m.Label != "" {
@ -52,4 +38,31 @@ func (h *clickrage) HandleMessage(msg Message) {
case *IOSSessionEnd:
h.build()
}
return nil
}
func (h *ClickRageDetector) Build() Message {
//TODO implement me
panic("implement me")
}
func (h *ClickRageDetector) build() {
if h.countsInARow >= web.MIN_CLICKS_IN_A_ROW {
m := &IOSIssueEvent{
Type: "click_rage",
ContextString: h.lastLabel,
}
m.Timestamp = h.firstInARawTimestamp
m.Index = h.firstInARawSeqIndex // Associated Index/ MessageID ?
h.Append(m)
}
h.lastTimestamp = 0
h.lastLabel = ""
h.firstInARawTimestamp = 0
h.firstInARawSeqIndex = 0
h.countsInARow = 0
}
func (h *ClickRageDetector) HandleMessage(msg Message) {
// TODO: delete it
}

View file

@ -1,6 +1,7 @@
package heuristics
package ios
import (
"openreplay/backend/internal/handlers"
. "openreplay/backend/pkg/messages"
)
@ -18,8 +19,8 @@ func (va *valueAggregator) aggregate() uint64 {
return uint64(va.sum / va.count)
}
type performanceAggregator struct {
readyMessageStore
type PerformanceAggregator struct {
handlers.ReadyMessageStore
pa *IOSPerformanceAggregated
fps valueAggregator
cpu valueAggregator
@ -27,30 +28,11 @@ type performanceAggregator struct {
battery valueAggregator
}
func (h *performanceAggregator) build(timestamp uint64) {
if h.pa == nil {
return
}
h.pa.TimestampEnd = timestamp
h.pa.AvgFPS = h.fps.aggregate()
h.pa.AvgCPU = h.cpu.aggregate()
h.pa.AvgMemory = h.memory.aggregate()
h.pa.AvgBattery = h.battery.aggregate()
h.append(h.pa)
h.pa = &IOSPerformanceAggregated{}
for _, agg := range []valueAggregator{h.fps, h.cpu, h.memory, h.battery} {
agg.sum = 0
agg.count = 0
}
}
func (h *performanceAggregator) HandleMessage(msg Message) {
func (h *PerformanceAggregator) Handle(message Message, messageID uint64, timestamp uint64) Message {
if h.pa == nil {
h.pa = &IOSPerformanceAggregated{} // TODO: struct type in messages
}
switch m := msg.(type) { // TODO: All Timestampe messages
switch m := message.(type) { // TODO: All Timestampe messages
case *IOSPerformanceEvent:
if h.pa.TimestampStart == 0 {
h.pa.TimestampStart = m.Timestamp
@ -99,4 +81,33 @@ func (h *performanceAggregator) HandleMessage(msg Message) {
case *IOSSessionEnd:
h.build(m.Timestamp)
}
return nil
}
func (h *PerformanceAggregator) Build() Message {
//TODO implement me
panic("implement me")
}
func (h *PerformanceAggregator) build(timestamp uint64) {
if h.pa == nil {
return
}
h.pa.TimestampEnd = timestamp
h.pa.AvgFPS = h.fps.aggregate()
h.pa.AvgCPU = h.cpu.aggregate()
h.pa.AvgMemory = h.memory.aggregate()
h.pa.AvgBattery = h.battery.aggregate()
h.Append(h.pa)
h.pa = &IOSPerformanceAggregated{}
for _, agg := range []valueAggregator{h.fps, h.cpu, h.memory, h.battery} {
agg.sum = 0
agg.count = 0
}
}
func (h *PerformanceAggregator) HandleMessage(msg Message) {
// TODO: delete it
}

View file

@ -0,0 +1,11 @@
package handlers
import . "openreplay/backend/pkg/messages"
// Heuristic interface - common interface for user's realisations
// U can create your own message handler and easily connect to heuristics service
type MessageProcessor interface {
Handle(message Message, messageID uint64, timestamp uint64) Message
Build() Message
}

View file

@ -1,18 +1,18 @@
package heuristics
package handlers
import (
. "openreplay/backend/pkg/messages"
)
type readyMessageStore struct {
type ReadyMessageStore struct {
store []Message
}
func (s *readyMessageStore) append(msg Message) {
func (s *ReadyMessageStore) Append(msg Message) {
s.store = append(s.store, msg)
}
func (s *readyMessageStore) IterateReadyMessages(cb func(msg Message)) {
func (s *ReadyMessageStore) IterateReadyMessages(cb func(msg Message)) {
for _, msg := range s.store {
cb(msg)
}

View file

@ -1,4 +1,4 @@
package builder
package web
import (
"encoding/json"
@ -6,10 +6,12 @@ import (
. "openreplay/backend/pkg/messages"
)
// TODO: Description of click rage detector
const MAX_TIME_DIFF = 300
const MIN_CLICKS_IN_A_ROW = 3
type clickRageDetector struct {
type ClickRageDetector struct {
lastTimestamp uint64
lastLabel string
firstInARawTimestamp uint64
@ -17,7 +19,7 @@ type clickRageDetector struct {
countsInARow int
}
func (crd *clickRageDetector) reset() {
func (crd *ClickRageDetector) reset() {
crd.lastTimestamp = 0
crd.lastLabel = ""
crd.firstInARawTimestamp = 0
@ -25,7 +27,7 @@ func (crd *clickRageDetector) reset() {
crd.countsInARow = 0
}
func (crd *clickRageDetector) Build() Message {
func (crd *ClickRageDetector) Build() Message {
if crd.countsInARow >= MIN_CLICKS_IN_A_ROW {
payload, _ := json.Marshal(struct{ Count int }{crd.countsInARow})
i := &IssueEvent{
@ -42,7 +44,7 @@ func (crd *clickRageDetector) Build() Message {
return nil
}
func (crd *clickRageDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
func (crd *ClickRageDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
switch msg := message.(type) {
case *MouseClick:
// TODO: check if we it is ok to capture clickrages without the connected CleckEvent in db.

View file

@ -1,4 +1,4 @@
package builder
package web
import (
"encoding/json"
@ -7,10 +7,12 @@ import (
"openreplay/backend/pkg/messages/performance"
)
// TODO: Description of cpu issue detector
const CPU_THRESHOLD = 70 // % out of 100
const CPU_MIN_DURATION_TRIGGER = 6 * 1000
type cpuIssueFinder struct {
type CpuIssueDetector struct {
startTimestamp uint64
startMessageID uint64
lastTimestamp uint64
@ -18,7 +20,7 @@ type cpuIssueFinder struct {
contextString string
}
func (f *cpuIssueFinder) Build() Message {
func (f *CpuIssueDetector) Build() Message {
if f.startTimestamp == 0 {
return nil
}
@ -47,7 +49,7 @@ func (f *cpuIssueFinder) Build() Message {
}
}
func (f *cpuIssueFinder) Handle(message Message, messageID uint64, timestamp uint64) Message {
func (f *CpuIssueDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
switch msg := message.(type) {
case *PerformanceTrack:
dt := performance.TimeDiff(timestamp, f.lastTimestamp)

View file

@ -1,12 +1,14 @@
package builder
package web
import (
. "openreplay/backend/pkg/messages"
)
// TODO: Description of dead click detector
const CLICK_RELATION_TIME = 1400
type deadClickDetector struct {
type DeadClickDetector struct {
lastTimestamp uint64
lastMouseClick *MouseClick
lastClickTimestamp uint64
@ -14,14 +16,14 @@ type deadClickDetector struct {
inputIDSet map[uint64]bool
}
func (d *deadClickDetector) reset() {
func (d *DeadClickDetector) reset() {
d.inputIDSet = nil
d.lastMouseClick = nil
d.lastClickTimestamp = 0
d.lastMessageID = 0
}
func (d *deadClickDetector) handleReaction(timestamp uint64) Message {
func (d *DeadClickDetector) handleReaction(timestamp uint64) Message {
if d.lastMouseClick == nil || d.lastClickTimestamp+CLICK_RELATION_TIME > timestamp { // riaction is instant
d.reset()
return nil
@ -36,11 +38,11 @@ func (d *deadClickDetector) handleReaction(timestamp uint64) Message {
return i
}
func (d *deadClickDetector) Build() Message {
func (d *DeadClickDetector) Build() Message {
return d.handleReaction(d.lastTimestamp)
}
func (d *deadClickDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
func (d *DeadClickDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
d.lastTimestamp = timestamp
switch msg := message.(type) {
case *SetInputTarget:

View file

@ -1,9 +1,11 @@
package builder
package web
import (
. "openreplay/backend/pkg/messages"
)
// TODO: Description of dom drop detector
const DROP_WINDOW = 200 //ms
const CRITICAL_COUNT = 1 // Our login page contains 20. But on crush it removes only roots (1-3 nodes).
// TODO: smart detection (making whole DOM tree would eat all memory)

View file

@ -1,4 +1,4 @@
package builder
package web
import (
"encoding/json"
@ -7,10 +7,12 @@ import (
. "openreplay/backend/pkg/messages"
)
// TODO: Description of memory issue detector
const MIN_COUNT = 3
const MEM_RATE_THRESHOLD = 300 // % to average
type memoryIssueFinder struct {
type MemoryIssueDetector struct {
startMessageID uint64
startTimestamp uint64
rate int
@ -19,7 +21,7 @@ type memoryIssueFinder struct {
contextString string
}
func (f *memoryIssueFinder) Build() Message {
func (f *MemoryIssueDetector) Build() Message {
if f.startTimestamp == 0 {
return nil
}
@ -37,7 +39,7 @@ func (f *memoryIssueFinder) Build() Message {
return i
}
func (f *memoryIssueFinder) Handle(message Message, messageID uint64, timestamp uint64) Message {
func (f *MemoryIssueDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
switch msg := message.(type) {
case *PerformanceTrack:
if f.count < MIN_COUNT {

View file

@ -1,4 +1,4 @@
package builder
package web
import (
"math"
@ -9,7 +9,7 @@ import (
const AGGREGATION_WINDOW = 2 * 60 * 1000
type performanceTrackAggrBuilder struct {
type PerformanceAggregator struct {
*PerformanceTrackAggr
lastTimestamp uint64
count float64
@ -19,14 +19,14 @@ type performanceTrackAggrBuilder struct {
sumUsedJSHeapSize float64
}
func (b *performanceTrackAggrBuilder) start(timestamp uint64) {
func (b *PerformanceAggregator) start(timestamp uint64) {
b.PerformanceTrackAggr = &PerformanceTrackAggr{
TimestampStart: timestamp,
}
b.lastTimestamp = timestamp
}
func (b *performanceTrackAggrBuilder) reset() {
func (b *PerformanceAggregator) reset() {
b.PerformanceTrackAggr = nil
b.count = 0
b.sumFrameRate = 0
@ -36,7 +36,7 @@ func (b *performanceTrackAggrBuilder) reset() {
b.lastTimestamp = 0
}
func (b *performanceTrackAggrBuilder) Handle(message Message, _ uint64, timestamp uint64) Message {
func (b *PerformanceAggregator) Handle(message Message, _ uint64, timestamp uint64) Message {
switch msg := message.(type) {
case *PerformanceTrack:
if b.PerformanceTrackAggr == nil || msg.Frames == -1 || msg.Ticks == -1 {
@ -93,7 +93,7 @@ func (b *performanceTrackAggrBuilder) Handle(message Message, _ uint64, timestam
return nil
}
func (b *performanceTrackAggrBuilder) Build() Message {
func (b *PerformanceAggregator) Build() Message {
if b.PerformanceTrackAggr == nil {
return nil
}

View file

@ -16,11 +16,7 @@ type sessHandler struct {
func newSessHandler() *sessHandler {
return &sessHandler{
handlers: []Handler{
new(clickrage),
new(performanceAggregator),
new(anr),
},
handlers: []Handler{},
}
}