Split ender into 2 services (ender and heuristics)

Alexander Zavorotynskiy 2022-05-05 17:37:05 +02:00
parent 700ef0dcc6
commit f4212d6eaa
7 changed files with 186 additions and 38 deletions

View file

@@ -2,8 +2,8 @@ package main
import (
"log"
"openreplay/backend/internal/builder"
"openreplay/backend/internal/config/ender"
builder "openreplay/backend/internal/ender"
"time"
"os"
@@ -49,16 +49,15 @@ func main() {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
producer.Close(cfg.ProducerTimeout)
consumer.Commit()
consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP)
consumer.Close()
os.Exit(0)
case <-tick:
builderMap.IterateReadyMessages(time.Now().UnixMilli(), func(sessionID uint64, readyMsg messages.Message) {
producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(readyMsg))
})
// TODO: why exactly do we need Flush here and not in any other place?
producer.Flush(cfg.ProducerTimeout)
consumer.Commit()
consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP)
default:
if err := consumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consuming: %v", err)
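The TODO about Flush in the hunk above has a likely answer in the produce-then-commit ordering: the SessionEnd messages handed to the producer must actually be delivered to the trigger topic before the consumer offsets advance, otherwise a crash between the commit and delivery could drop them. Below is the same tick branch annotated with that reasoning; the comments (and the guess about why CommitBack keeps a gap) are editorial, not part of the commit.

case <-tick:
	// 1. hand every ready message (SessionEnd) to the producer
	builderMap.IterateReadyMessages(time.Now().UnixMilli(), func(sessionID uint64, readyMsg messages.Message) {
		producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(readyMsg))
	})
	// 2. block until the produced messages are delivered (or the timeout expires)
	producer.Flush(cfg.ProducerTimeout)
	// 3. only then move the consumer forward, keeping the committed offset a fixed
	//    time gap behind, presumably so a restarted service can re-read recent
	//    messages and rebuild its in-memory session state
	consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP)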

View file

@@ -0,0 +1,68 @@
package main
import (
"log"
"openreplay/backend/internal/builder"
"openreplay/backend/internal/config/ender"
"openreplay/backend/pkg/intervals"
logger "openreplay/backend/pkg/log"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := ender.New()
builderMap := builder.NewBuilderMap()
statsLogger := logger.NewQueueStats(cfg.LoggerTimeout)
producer := queue.NewProducer()
consumer := queue.NewMessageConsumer(
cfg.GroupEvents,
[]string{
cfg.TopicRawWeb,
cfg.TopicRawIOS,
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
statsLogger.Collect(sessionID, meta)
builderMap.HandleMessage(sessionID, msg, msg.Meta().Index)
},
false,
)
tick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond)
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
log.Printf("Ender service started\n")
for {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
producer.Close(cfg.ProducerTimeout)
consumer.Commit()
consumer.Close()
os.Exit(0)
case <-tick:
builderMap.IterateReadyMessages(time.Now().UnixMilli(), func(sessionID uint64, readyMsg messages.Message) {
producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(readyMsg))
})
producer.Flush(cfg.ProducerTimeout)
consumer.Commit()
default:
if err := consumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consuming: %v", err)
}
}
}
// Config
}

View file

@@ -44,9 +44,9 @@ type builder struct {
readyMsgs []Message
timestamp uint64
lastProcessedTimestamp int64
peBuilder *pageEventBuilder
peBuilder *pageEventBuilder // TODO: DB
ptaBuilder *performanceTrackAggrBuilder
ieBuilder *inputEventBuilder
ieBuilder *inputEventBuilder // TODO: DB
ciFinder *cpuIssueFinder
miFinder *memoryIssueFinder
ddDetector *domDropDetector
@@ -117,6 +117,7 @@ func (b *builder) handleMessage(message Message, messageID uint64) {
b.lastProcessedTimestamp = time.Now().UnixMilli()
// Might happen before the first timestamp.
// TODO: to DB
switch msg := message.(type) {
case *SessionStart,
*Metadata,
@@ -137,7 +138,7 @@ func (b *builder) handleMessage(message Message, messageID uint64) {
return
}
switch msg := message.(type) {
case *SetPageLocation:
case *SetPageLocation: // TODO: DB
if msg.NavigationStart == 0 {
b.appendReadyMessage(&PageEvent{
URL: msg.URL,
@@ -154,11 +155,11 @@ func (b *builder) handleMessage(message Message, messageID uint64) {
b.miFinder.HandleSetPageLocation(msg)
b.ciFinder.HandleSetPageLocation(msg)
}
case *PageLoadTiming:
case *PageLoadTiming: // TODO: DB
if rm := b.peBuilder.HandlePageLoadTiming(msg); rm != nil {
b.appendReadyMessage(rm)
}
case *PageRenderTiming:
case *PageRenderTiming: // TODO: DB
if rm := b.peBuilder.HandlePageRenderTiming(msg); rm != nil {
b.appendReadyMessage(rm)
}
@@ -172,20 +173,20 @@ func (b *builder) handleMessage(message Message, messageID uint64) {
if rm := b.miFinder.HandlePerformanceTrack(msg, messageID, b.timestamp); rm != nil {
b.appendReadyMessage(rm)
}
case *SetInputTarget:
case *SetInputTarget: // TODO: DB
if rm := b.ieBuilder.HandleSetInputTarget(msg); rm != nil {
b.appendReadyMessage(rm)
}
case *SetInputValue:
case *SetInputValue: // TODO: DB
if rm := b.ieBuilder.HandleSetInputValue(msg, messageID, b.timestamp); rm != nil {
b.appendReadyMessage(rm)
}
case *MouseClick:
case *MouseClick: // TODO: DB
b.buildInputEvent()
if rm := b.crDetector.HandleMouseClick(msg, messageID, b.timestamp); rm != nil {
b.appendReadyMessage(rm)
}
if msg.Label != "" {
if msg.Label != "" { // TODO: DB
b.appendReadyMessage(&ClickEvent{
MessageID: messageID,
Label: msg.Label,
@@ -195,7 +196,7 @@ func (b *builder) handleMessage(message Message, messageID uint64) {
})
}
case *JSException:
b.appendReadyMessage(&ErrorEvent{
b.appendReadyMessage(&ErrorEvent{ // TODO: DB
MessageID: messageID,
Timestamp: b.timestamp,
Source: "js_exception",
@@ -206,7 +207,7 @@ func (b *builder) handleMessage(message Message, messageID uint64) {
case *ResourceTiming:
tp := getResourceType(msg.Initiator, msg.URL)
success := msg.Duration != 0
b.appendReadyMessage(&ResourceEvent{
b.appendReadyMessage(&ResourceEvent{ // TODO: DB
MessageID: messageID,
Timestamp: msg.Timestamp,
Duration: msg.Duration,
@@ -231,14 +232,14 @@ func (b *builder) handleMessage(message Message, messageID uint64) {
})
}
case *RawCustomEvent:
b.appendReadyMessage(&CustomEvent{
b.appendReadyMessage(&CustomEvent{ // TODO: DB
MessageID: messageID,
Timestamp: b.timestamp,
Name: msg.Name,
Payload: msg.Payload,
})
case *CustomIssue:
b.appendReadyMessage(&IssueEvent{
b.appendReadyMessage(&IssueEvent{ // TODO: DB
Type: "custom",
Timestamp: b.timestamp,
MessageID: messageID,
@@ -246,7 +247,7 @@ func (b *builder) handleMessage(message Message, messageID uint64) {
Payload: msg.Payload,
})
case *Fetch:
b.appendReadyMessage(&FetchEvent{
b.appendReadyMessage(&FetchEvent{ // TODO: DB
MessageID: messageID,
Timestamp: msg.Timestamp,
Method: msg.Method,
@@ -265,7 +266,7 @@ func (b *builder) handleMessage(message Message, messageID uint64) {
})
}
case *GraphQL:
b.appendReadyMessage(&GraphQLEvent{
b.appendReadyMessage(&GraphQLEvent{ // TODO: DB
MessageID: messageID,
Timestamp: b.timestamp,
OperationKind: msg.OperationKind,
@@ -274,7 +275,7 @@ func (b *builder) handleMessage(message Message, messageID uint64) {
Response: msg.Response,
})
case *StateAction:
b.appendReadyMessage(&StateActionEvent{
b.appendReadyMessage(&StateActionEvent{ // TODO: DB
MessageID: messageID,
Timestamp: b.timestamp,
Type: msg.Type,

View file

@@ -26,18 +26,6 @@ func (m builderMap) HandleMessage(sessionID uint64, msg Message, messageID uint64) {
b.handleMessage(msg, messageID)
}
func (m builderMap) IterateSessionReadyMessages(sessionID uint64, operatingTs int64, iter func(msg Message)) {
b, ok := m[sessionID]
if !ok {
return
}
sessionEnded := b.checkTimeouts(operatingTs)
b.iterateReadyMessage(iter)
if sessionEnded {
delete(m, sessionID)
}
}
func (m builderMap) IterateReadyMessages(operatingTs int64, iter func(sessionID uint64, msg Message)) {
for sessionID, b := range m {
sessionEnded := b.checkTimeouts(operatingTs)

View file

@@ -0,0 +1,56 @@
package builder
import (
"log"
"openreplay/backend/pkg/intervals"
. "openreplay/backend/pkg/messages"
)
type builder struct {
readyMsgs []Message
timestamp uint64
sid uint64
}
func NewBuilder() *builder {
return &builder{}
}
func (b *builder) appendReadyMessage(msg Message) { // an interface value is never nil even if it holds a nil pointer
b.readyMsgs = append(b.readyMsgs, msg)
}
func (b *builder) buildSessionEnd() {
if b.timestamp == 0 {
return
}
sessionEnd := &SessionEnd{
Timestamp: b.timestamp,
}
b.appendReadyMessage(sessionEnd)
}
func (b *builder) handleMessage(message Message, messageID uint64) {
timestamp := GetTimestamp(message)
if b.timestamp < timestamp {
b.timestamp = timestamp
}
if b.timestamp == 0 {
log.Printf("Empty timestamp, sessionID: %d, messageID: %d", b.sid, messageID)
return
}
}
func (b *builder) checkTimeouts(ts int64) bool {
if b.timestamp == 0 {
return false // no timestamp events yet
}
lastTsGap := ts - int64(b.timestamp)
if lastTsGap > intervals.EVENTS_SESSION_END_TIMEOUT {
b.buildSessionEnd()
return true
}
return false
}
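To make the timeout logic above concrete, here is a rough test-style sketch. It assumes *Timestamp is the plain timestamp message from pkg/messages and that GetTimestamp returns its value; EVENTS_SESSION_END_TIMEOUT works out to 150000 ms (see the intervals file at the end of this diff).

package builder

import (
	"testing"

	. "openreplay/backend/pkg/messages"
)

// Sketch only: exercises the inactivity timeout of the session-end builder.
func TestSessionEndAfterInactivity(t *testing.T) {
	b := NewBuilder()
	b.handleMessage(&Timestamp{Timestamp: 1_000_000}, 1) // b.timestamp becomes 1_000_000 (assumed message type)

	if b.checkTimeouts(1_100_000) { // gap of 100_000 ms is below the 150_000 ms timeout
		t.Fatal("session should still be considered alive")
	}
	if !b.checkTimeouts(1_160_000) { // gap of 160_000 ms exceeds the timeout
		t.Fatal("session should be ended and a SessionEnd queued in readyMsgs")
	}
}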

View file

@@ -0,0 +1,36 @@
package builder
import (
. "openreplay/backend/pkg/messages"
)
type builderMap map[uint64]*builder
func NewBuilderMap() builderMap {
return make(builderMap)
}
func (m builderMap) GetBuilder(sessionID uint64) *builder {
b := m[sessionID]
if b == nil {
b = NewBuilder()
m[sessionID] = b
b.sid = sessionID
}
return b
}
func (m builderMap) HandleMessage(sessionID uint64, msg Message, messageID uint64) {
b := m.GetBuilder(sessionID)
b.handleMessage(msg, messageID)
}
func (m builderMap) IterateReadyMessages(operatingTs int64, iter func(sessionID uint64, msg Message)) {
for sessionID, b := range m {
sessionEnded := b.checkTimeouts(operatingTs)
if sessionEnded {
delete(m, sessionID)
}
}
}
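Note that, as rendered here, IterateReadyMessages never invokes iter, so a queued SessionEnd would be dropped when the ended session is deleted from the map. The removed IterateSessionReadyMessages in internal/builder (earlier in this diff) drains the builder through b.iterateReadyMessage(iter) first, so presumably the same drain belongs here. A sketch of that shape, with iterateReadyMessage on the new builder assumed by analogy with the old package:

func (m builderMap) IterateReadyMessages(operatingTs int64, iter func(sessionID uint64, msg Message)) {
	for sessionID, b := range m {
		sessionEnded := b.checkTimeouts(operatingTs)
		// Drain whatever the builder queued (at most a SessionEnd) before it may be dropped.
		b.iterateReadyMessage(func(msg Message) {
			iter(sessionID, msg)
		})
		if sessionEnded {
			delete(m, sessionID)
		}
	}
}

// Hypothetical counterpart on *builder, mirroring internal/builder; not shown in this diff.
func (b *builder) iterateReadyMessage(iter func(msg Message)) {
	for _, msg := range b.readyMsgs {
		iter(msg)
	}
	b.readyMsgs = nil
}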

View file

@@ -1,11 +1,11 @@
package intervals
const EVENTS_COMMIT_INTERVAL = 30 * 1000
const HEARTBEAT_INTERVAL = 2 * 60 * 1000
const INTEGRATIONS_REQUEST_INTERVAL = 1 * 60 * 1000
const EVENTS_PAGE_EVENT_TIMEOUT = 2 * 60 * 1000
const EVENTS_INPUT_EVENT_TIMEOUT = 2 * 60 * 1000
const EVENTS_COMMIT_INTERVAL = 30 * 1000 // how often the ender commits messages in Kafka
const HEARTBEAT_INTERVAL = 2 * 60 * 1000 // maximum timeout from the tracker within a session
const INTEGRATIONS_REQUEST_INTERVAL = 1 * 60 * 1000 // integrations
const EVENTS_PAGE_EVENT_TIMEOUT = 2 * 60 * 1000 // page event timeout
const EVENTS_INPUT_EVENT_TIMEOUT = 2 * 60 * 1000 // input event timeout
const EVENTS_PERFORMANCE_AGGREGATION_TIMEOUT = 2 * 60 * 1000
const EVENTS_SESSION_END_TIMEOUT = HEARTBEAT_INTERVAL + 30*1000
const EVENTS_SESSION_END_TIMEOUT_WITH_INTEGRATIONS = HEARTBEAT_INTERVAL + 3*60*1000
const EVENTS_BACK_COMMIT_GAP = EVENTS_SESSION_END_TIMEOUT_WITH_INTEGRATIONS + 1*60*1000
const EVENTS_BACK_COMMIT_GAP = EVENTS_SESSION_END_TIMEOUT_WITH_INTEGRATIONS + 1*60*1000 // for the back commit
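For reference, the derived constants above resolve to the following values (plain arithmetic on the numbers shown, not part of the diff):

EVENTS_SESSION_END_TIMEOUT                   = 120000 + 30000  = 150000 ms (2.5 min)
EVENTS_SESSION_END_TIMEOUT_WITH_INTEGRATIONS = 120000 + 180000 = 300000 ms (5 min)
EVENTS_BACK_COMMIT_GAP                       = 300000 + 60000  = 360000 ms (6 min)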