feat(backend/db): prepared db service for refactoring

This commit is contained in:
Alexander Zavorotynskiy 2022-05-10 14:11:41 +02:00
parent ca9d76624b
commit 47007eb9d7
10 changed files with 368 additions and 93 deletions

View file

@ -2,9 +2,12 @@ package main
import (
"log"
"openreplay/backend/internal/builder"
"openreplay/backend/internal/config/db"
"openreplay/backend/internal/datasaver"
"openreplay/backend/internal/heuristics"
"openreplay/backend/internal/handlers"
"openreplay/backend/internal/handlers/custom"
"openreplay/backend/pkg/intervals"
"time"
"os"
@ -28,8 +31,17 @@ func main() {
pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres), cfg.ProjectExpirationTimeoutMs)
defer pg.Close()
// Declare message handlers we want to apply for each incoming message
msgHandlers := []handlers.MessageProcessor{
custom.NewMainHandler(),
custom.NewInputEventBuilder(),
custom.NewPageEventBuilder(),
}
// Create handler's aggregator
builderMap := builder.NewBuilderMap(msgHandlers...)
// Init modules
heurFinder := heuristics.NewHandler()
saver := datasaver.New(pg)
statsLogger := logger.NewQueueStats(cfg.LoggerTimeout)
@ -42,6 +54,7 @@ func main() {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg)
}
// TODO: can we lose data here because of db error?
return
}
@ -60,10 +73,10 @@ func main() {
}
// Handle heuristics and save to temporary queue in memory
heurFinder.HandleMessage(sessionID, msg)
builderMap.HandleMessage(sessionID, msg, msg.Meta().Index)
// Process saved heuristics messages as usual messages above in the code
heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) {
builderMap.IterateSessionReadyMessages(sessionID, func(msg messages.Message) {
// TODO: DRY code (carefully with the return statement logic)
if err := saver.InsertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
@ -82,8 +95,9 @@ func main() {
consumer := queue.NewMessageConsumer(
cfg.GroupDB,
[]string{
cfg.TopicRawWeb, // TODO: is it necessary or not?
cfg.TopicRawIOS,
cfg.TopicTrigger,
cfg.TopicTrigger, // to receive SessionEnd events
},
handler,
false,
@ -94,19 +108,22 @@ func main() {
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
tick := time.Tick(cfg.CommitBatchTimeout)
commitTick := time.Tick(cfg.CommitBatchTimeout)
checkTick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond)
for {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
consumer.Close()
os.Exit(0)
case <-tick:
case <-commitTick:
pg.CommitBatches()
// TODO?: separate stats & regular messages
if err := consumer.Commit(); err != nil {
log.Printf("Error on consumer commit: %v", err)
}
case <-checkTick:
// checkTimeout
default:
err := consumer.ConsumeNext()
if err != nil {

View file

@ -54,6 +54,7 @@ func main() {
[]string{
cfg.TopicRawWeb,
cfg.TopicRawIOS,
cfg.TopicTrigger, // to receive SessionEnd events
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
statsLogger.Collect(sessionID, meta)

View file

@ -10,6 +10,7 @@ type builder struct {
readyMsgs []Message
timestamp uint64
processors []handlers.MessageProcessor
ended bool
}
func NewBuilder(handlers ...handlers.MessageProcessor) *builder {
@ -35,6 +36,13 @@ func (b *builder) handleMessage(message Message, messageID uint64) {
return
}
if _, isEnd := message.(*IOSSessionEnd); isEnd {
b.ended = true
}
if _, isEnd := message.(*SessionEnd); isEnd {
b.ended = true
}
for _, p := range b.processors {
/* If nil is not returned explicitely by Handle, but as the typed nil
("var i *IssueEvent; return i;")

View file

@ -42,3 +42,14 @@ func (m *builderMap) IterateReadyMessages(operatingTs int64, iter func(sessionID
}
}
}
func (m *builderMap) IterateSessionReadyMessages(sessionID uint64, iter func(msg Message)) {
session, ok := m.sessions[sessionID]
if !ok {
return
}
session.iterateReadyMessage(iter)
if session.ended {
delete(m.sessions, sessionID)
}
}

View file

@ -10,6 +10,7 @@ type Config struct {
ProjectExpirationTimeoutMs int64
LoggerTimeout int
GroupDB string
TopicRawWeb string
TopicRawIOS string
TopicTrigger string
CommitBatchTimeout time.Duration
@ -21,6 +22,7 @@ func New() *Config {
ProjectExpirationTimeoutMs: 1000 * 60 * 20,
LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"),
GroupDB: env.String("GROUP_DB"),
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
TopicRawIOS: env.String("TOPIC_RAW_IOS"),
TopicTrigger: env.String("TOPIC_TRIGGER"),
CommitBatchTimeout: 15 * time.Second,

View file

@ -1,4 +1,4 @@
package heuristics
package custom
import (
. "openreplay/backend/pkg/messages"
@ -12,6 +12,17 @@ type inputEventBuilder struct {
inputID uint64
}
func (b *inputEventBuilder) Handle(message Message, messageID uint64, timestamp uint64) Message {
//TODO implement me
panic("implement me")
}
func (b *inputEventBuilder) Build() Message {
// b.build()
//TODO implement me
panic("implement me")
}
func NewInputEventBuilder() *inputEventBuilder {
ieBuilder := &inputEventBuilder{}
ieBuilder.ClearLabels()
@ -25,7 +36,7 @@ func (b *inputEventBuilder) ClearLabels() {
func (b *inputEventBuilder) HandleSetInputTarget(msg *SetInputTarget) *InputEvent {
var inputEvent *InputEvent
if b.inputID != msg.ID {
inputEvent = b.Build()
inputEvent = b.build()
b.inputID = msg.ID
}
b.inputLabels[msg.ID] = msg.Label
@ -35,7 +46,7 @@ func (b *inputEventBuilder) HandleSetInputTarget(msg *SetInputTarget) *InputEven
func (b *inputEventBuilder) HandleSetInputValue(msg *SetInputValue, messageID uint64, timestamp uint64) *InputEvent {
var inputEvent *InputEvent
if b.inputID != msg.ID {
inputEvent = b.Build()
inputEvent = b.build()
b.inputID = msg.ID
}
if b.inputEvent == nil {
@ -63,7 +74,7 @@ func (b *inputEventBuilder) GetTimestamp() uint64 {
return b.inputEvent.Timestamp
}
func (b *inputEventBuilder) Build() *InputEvent {
func (b *inputEventBuilder) build() *InputEvent {
if b.inputEvent == nil {
return nil
}

View file

@ -0,0 +1,288 @@
package custom
import (
"net/url"
"openreplay/backend/pkg/intervals"
"strings"
"time"
. "openreplay/backend/pkg/messages"
)
func getURLExtention(URL string) string {
u, err := url.Parse(URL)
if err != nil {
return ""
}
i := strings.LastIndex(u.Path, ".")
return u.Path[i+1:]
}
func getResourceType(initiator string, URL string) string {
switch initiator {
case "xmlhttprequest", "fetch":
return "fetch"
case "img":
return "img"
default:
switch getURLExtention(URL) {
case "css":
return "stylesheet"
case "js":
return "script"
case "png", "gif", "jpg", "jpeg", "svg":
return "img"
case "mp4", "mkv", "ogg", "webm", "avi", "mp3":
return "media"
default:
return "other"
}
}
}
type builder struct {
readyMsgs []Message
timestamp uint64
lastProcessedTimestamp int64
peBuilder *pageEventBuilder
ieBuilder *inputEventBuilder
integrationsWaiting bool
sid uint64
}
func (b *builder) Build() Message {
//TODO implement me
panic("implement me")
}
func NewMainHandler() *builder {
return &builder{
peBuilder: &pageEventBuilder{},
ieBuilder: NewInputEventBuilder(),
integrationsWaiting: true,
}
}
func (b *builder) appendReadyMessage(msg Message) { // interface is never nil even if it holds nil value
b.readyMsgs = append(b.readyMsgs, msg)
}
func (b *builder) iterateReadyMessage(iter func(msg Message)) {
for _, readyMsg := range b.readyMsgs {
iter(readyMsg)
}
b.readyMsgs = nil
}
func (b *builder) buildPageEvent() {
if msg := b.peBuilder.Build(); msg != nil {
b.appendReadyMessage(msg)
}
}
func (b *builder) buildInputEvent() {
if msg := b.ieBuilder.Build(); msg != nil {
b.appendReadyMessage(msg)
}
}
func (b *builder) Handle(message Message, messageID uint64, timestamp uint64) Message {
b.timestamp = timestamp
b.lastProcessedTimestamp = time.Now().UnixMilli()
// Might happen before the first timestamp.
switch msg := message.(type) {
case *SessionStart,
*Metadata,
*UserID,
*UserAnonymousID:
b.appendReadyMessage(msg)
case *RawErrorEvent:
b.appendReadyMessage(&ErrorEvent{
MessageID: messageID,
Timestamp: msg.Timestamp,
Source: msg.Source,
Name: msg.Name,
Message: msg.Message,
Payload: msg.Payload,
})
}
if b.timestamp == 0 {
return nil
}
switch msg := message.(type) {
case *SetPageLocation:
if msg.NavigationStart == 0 {
b.appendReadyMessage(&PageEvent{
URL: msg.URL,
Referrer: msg.Referrer,
Loaded: false,
MessageID: messageID,
Timestamp: b.timestamp,
})
} else {
b.buildPageEvent()
b.buildInputEvent()
b.ieBuilder.ClearLabels()
b.peBuilder.HandleSetPageLocation(msg, messageID, b.timestamp)
// TODO: what to do with this code?
//b.miFinder.HandleSetPageLocation(msg)
//b.ciFinder.HandleSetPageLocation(msg)
}
case *PageLoadTiming:
if rm := b.peBuilder.HandlePageLoadTiming(msg); rm != nil {
b.appendReadyMessage(rm)
}
case *PageRenderTiming:
if rm := b.peBuilder.HandlePageRenderTiming(msg); rm != nil {
b.appendReadyMessage(rm)
}
case *PerformanceTrack:
// TODO: what to do with this code?
//if rm := b.ptaBuilder.HandlePerformanceTrack(msg, b.timestamp); rm != nil {
// b.appendReadyMessage(rm)
//}
//if rm := b.ciFinder.HandlePerformanceTrack(msg, messageID, b.timestamp); rm != nil {
// b.appendReadyMessage(rm)
//}
//if rm := b.miFinder.HandlePerformanceTrack(msg, messageID, b.timestamp); rm != nil {
// b.appendReadyMessage(rm)
//}
case *SetInputTarget:
if rm := b.ieBuilder.HandleSetInputTarget(msg); rm != nil {
b.appendReadyMessage(rm)
}
case *SetInputValue:
if rm := b.ieBuilder.HandleSetInputValue(msg, messageID, b.timestamp); rm != nil {
b.appendReadyMessage(rm)
}
case *MouseClick:
b.buildInputEvent()
// TODO: what to do with this code?
//if rm := b.crDetector.HandleMouseClick(msg, messageID, b.timestamp); rm != nil {
// b.appendReadyMessage(rm)
//}
if msg.Label != "" {
b.appendReadyMessage(&ClickEvent{
MessageID: messageID,
Label: msg.Label,
HesitationTime: msg.HesitationTime,
Timestamp: b.timestamp,
Selector: msg.Selector,
})
}
case *JSException:
b.appendReadyMessage(&ErrorEvent{
MessageID: messageID,
Timestamp: b.timestamp,
Source: "js_exception",
Name: msg.Name,
Message: msg.Message,
Payload: msg.Payload,
})
case *ResourceTiming:
tp := getResourceType(msg.Initiator, msg.URL)
success := msg.Duration != 0
b.appendReadyMessage(&ResourceEvent{
MessageID: messageID,
Timestamp: msg.Timestamp,
Duration: msg.Duration,
TTFB: msg.TTFB,
HeaderSize: msg.HeaderSize,
EncodedBodySize: msg.EncodedBodySize,
DecodedBodySize: msg.DecodedBodySize,
URL: msg.URL,
Type: tp,
Success: success,
})
if !success {
issueType := "missing_resource"
if tp == "fetch" {
issueType = "bad_request"
}
b.appendReadyMessage(&IssueEvent{
Type: issueType,
MessageID: messageID,
Timestamp: msg.Timestamp,
ContextString: msg.URL,
})
}
case *RawCustomEvent:
b.appendReadyMessage(&CustomEvent{
MessageID: messageID,
Timestamp: b.timestamp,
Name: msg.Name,
Payload: msg.Payload,
})
case *CustomIssue:
b.appendReadyMessage(&IssueEvent{
Type: "custom",
Timestamp: b.timestamp,
MessageID: messageID,
ContextString: msg.Name,
Payload: msg.Payload,
})
case *Fetch:
b.appendReadyMessage(&FetchEvent{
MessageID: messageID,
Timestamp: msg.Timestamp,
Method: msg.Method,
URL: msg.URL,
Request: msg.Request,
Response: msg.Response,
Status: msg.Status,
Duration: msg.Duration,
})
if msg.Status >= 400 {
b.appendReadyMessage(&IssueEvent{
Type: "bad_request",
MessageID: messageID,
Timestamp: msg.Timestamp,
ContextString: msg.URL,
})
}
case *GraphQL:
b.appendReadyMessage(&GraphQLEvent{
MessageID: messageID,
Timestamp: b.timestamp,
OperationKind: msg.OperationKind,
OperationName: msg.OperationName,
Variables: msg.Variables,
Response: msg.Response,
})
case *StateAction:
b.appendReadyMessage(&StateActionEvent{
MessageID: messageID,
Timestamp: b.timestamp,
Type: msg.Type,
})
// TODO: what to do with this code?
//case *CreateElementNode, *CreateTextNode:
// b.ddDetector.HandleNodeCreation()
//case *RemoveNode:
// b.ddDetector.HandleNodeRemoval(b.timestamp)
//case *CreateDocument:
// if rm := b.ddDetector.Build(); rm != nil {
// b.appendReadyMessage(rm)
// }
}
// TODO: what to do with this code?
//if rm := b.dcDetector.HandleMessage(message, messageID, b.timestamp); rm != nil {
// b.appendReadyMessage(rm)
//}
return nil
}
func (b *builder) checkTimeouts(ts int64) bool {
if b.timestamp == 0 {
return false // There was no timestamp events yet
}
if b.peBuilder.HasInstance() && int64(b.peBuilder.GetTimestamp())+intervals.EVENTS_PAGE_EVENT_TIMEOUT < ts {
b.buildPageEvent()
}
if b.ieBuilder.HasInstance() && int64(b.ieBuilder.GetTimestamp())+intervals.EVENTS_INPUT_EVENT_TIMEOUT < ts {
b.buildInputEvent()
}
return false
}

View file

@ -1,4 +1,4 @@
package heuristics
package custom
import (
. "openreplay/backend/pkg/messages"
@ -9,9 +9,25 @@ type pageEventBuilder struct {
firstTimingHandled bool
}
func (b *pageEventBuilder) Handle(message Message, messageID uint64, timestamp uint64) Message {
//TODO implement me
panic("implement me")
}
func (b *pageEventBuilder) Build() Message {
// b.build()
//TODO implement me
panic("implement me")
}
func NewPageEventBuilder() *pageEventBuilder {
ieBuilder := &pageEventBuilder{}
return ieBuilder
}
func (b *pageEventBuilder) buildIfTimingsComplete() *PageEvent {
if b.firstTimingHandled {
return b.Build()
return b.build()
}
b.firstTimingHandled = true
return nil
@ -83,7 +99,7 @@ func (b *pageEventBuilder) GetTimestamp() uint64 {
return b.pageEvent.Timestamp
}
func (b *pageEventBuilder) Build() *PageEvent {
func (b *pageEventBuilder) build() *PageEvent {
pageEvent := b.pageEvent
b.pageEvent = nil
b.firstTimingHandled = false

View file

@ -1,36 +0,0 @@
package heuristics
import (
. "openreplay/backend/pkg/messages"
)
type mainHandler map[uint64]*sessHandler
func NewHandler() mainHandler {
return make(mainHandler)
}
func (m mainHandler) getSessHandler(sessionID uint64) *sessHandler {
s := m[sessionID]
if s == nil {
s = newSessHandler()
m[sessionID] = s
}
return s
}
func (m mainHandler) HandleMessage(sessionID uint64, msg Message) {
s := m.getSessHandler(sessionID)
s.HandleMessage(msg)
}
func (m mainHandler) IterateSessionReadyMessages(sessionID uint64, iter func(msg Message)) {
s, ok := m[sessionID]
if !ok {
return
}
s.IterateReadyMessages(iter)
if s.IsEnded() {
delete(m, sessionID)
}
}

View file

@ -1,43 +0,0 @@
package heuristics
import (
. "openreplay/backend/pkg/messages"
)
type Handler interface {
HandleMessage(Message)
IterateReadyMessages(func(Message))
}
type sessHandler struct {
handlers []Handler
ended bool
}
func newSessHandler() *sessHandler {
return &sessHandler{
handlers: []Handler{},
}
}
func (s *sessHandler) HandleMessage(msg Message) {
for _, h := range s.handlers {
h.HandleMessage(msg)
}
if _, isEnd := msg.(*IOSSessionEnd); isEnd {
s.ended = true
}
if _, isEnd := msg.(*SessionEnd); isEnd {
s.ended = true
}
}
func (s *sessHandler) IterateReadyMessages(cb func(msg Message)) {
for _, h := range s.handlers {
h.IterateReadyMessages(cb)
}
}
func (s *sessHandler) IsEnded() bool {
return s.ended
}