diff --git a/backend/services/db/main.go b/backend/services/db/main.go index 2ad6e4aa8..50e160342 100644 --- a/backend/services/db/main.go +++ b/backend/services/db/main.go @@ -18,16 +18,17 @@ import ( "openreplay/backend/services/db/heuristics" ) -var pg *cache.PGCache - func main() { log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) - initStats() - pg = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20) + // Init database + pg := cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20) defer pg.Close() + // Init modules heurFinder := heuristics.NewHandler() + mi := NewMessageInserter(pg) + si := NewStatsInserter(pg) statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC")) @@ -40,7 +41,7 @@ func main() { func(sessionID uint64, msg messages.Message, meta *types.Meta) { statsLogger.HandleAndLog(sessionID, meta) - if err := insertMessage(sessionID, msg); err != nil { + if err := mi.insertMessage(sessionID, msg); err != nil { if !postgres.IsPkeyViolation(err) { log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg) } @@ -54,7 +55,7 @@ func main() { return } - err = insertStats(session, msg) + err = si.insertStats(session, msg) if err != nil { log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg) } @@ -62,14 +63,14 @@ func main() { heurFinder.HandleMessage(session, msg) heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) { // TODO: DRY code (carefully with the return statement logic) - if err := insertMessage(sessionID, msg); err != nil { + if err := mi.insertMessage(sessionID, msg); err != nil { if !postgres.IsPkeyViolation(err) { log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg) } return } - if err := insertStats(session, msg); err != nil { + if err := si.insertStats(session, msg); err != nil { log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg) } }) diff --git a/backend/services/db/messages.go b/backend/services/db/messages.go index d3e4ae1ed..2adca61c8 100644 --- a/backend/services/db/messages.go +++ b/backend/services/db/messages.go @@ -1,68 +1,77 @@ package main import ( + "openreplay/backend/pkg/db/cache" . "openreplay/backend/pkg/messages" ) -func insertMessage(sessionID uint64, msg Message) error { +type MessageInserter struct { + pg *cache.PGCache +} + +func NewMessageInserter(pg *cache.PGCache) *MessageInserter { + return &MessageInserter{pg: pg} +} + +func (mi *MessageInserter) insertMessage(sessionID uint64, msg Message) error { switch m := msg.(type) { // Common case *Metadata: - return pg.InsertMetadata(sessionID, m) + return mi.pg.InsertMetadata(sessionID, m) case *IssueEvent: - return pg.InsertIssueEvent(sessionID, m) + return mi.pg.InsertIssueEvent(sessionID, m) //TODO: message adapter (transformer) (at the level of pkg/message) for types: // case *IOSMetadata, *IOSIssueEvent and others // Web case *SessionStart: - return pg.InsertWebSessionStart(sessionID, m) + return mi.pg.InsertWebSessionStart(sessionID, m) case *SessionEnd: - return pg.InsertWebSessionEnd(sessionID, m) + return mi.pg.InsertWebSessionEnd(sessionID, m) case *UserID: - return pg.InsertWebUserID(sessionID, m) + return mi.pg.InsertWebUserID(sessionID, m) case *UserAnonymousID: - return pg.InsertWebUserAnonymousID(sessionID, m) + return mi.pg.InsertWebUserAnonymousID(sessionID, m) case *CustomEvent: - return pg.InsertWebCustomEvent(sessionID, m) + return mi.pg.InsertWebCustomEvent(sessionID, m) case *ClickEvent: - return pg.InsertWebClickEvent(sessionID, m) + return mi.pg.InsertWebClickEvent(sessionID, m) case *InputEvent: - return pg.InsertWebInputEvent(sessionID, m) + return mi.pg.InsertWebInputEvent(sessionID, m) // Unique Web messages // case *ResourceEvent: // return pg.InsertWebResourceEvent(sessionID, m) case *PageEvent: - return pg.InsertWebPageEvent(sessionID, m) + return mi.pg.InsertWebPageEvent(sessionID, m) case *ErrorEvent: - return pg.InsertWebErrorEvent(sessionID, m) + return mi.pg.InsertWebErrorEvent(sessionID, m) case *FetchEvent: - return pg.InsertWebFetchEvent(sessionID, m) + return mi.pg.InsertWebFetchEvent(sessionID, m) case *GraphQLEvent: - return pg.InsertWebGraphQLEvent(sessionID, m) + return mi.pg.InsertWebGraphQLEvent(sessionID, m) // IOS case *IOSSessionStart: - return pg.InsertIOSSessionStart(sessionID, m) + return mi.pg.InsertIOSSessionStart(sessionID, m) case *IOSSessionEnd: - return pg.InsertIOSSessionEnd(sessionID, m) + return mi.pg.InsertIOSSessionEnd(sessionID, m) case *IOSUserID: - return pg.InsertIOSUserID(sessionID, m) + return mi.pg.InsertIOSUserID(sessionID, m) case *IOSUserAnonymousID: - return pg.InsertIOSUserAnonymousID(sessionID, m) + return mi.pg.InsertIOSUserAnonymousID(sessionID, m) case *IOSCustomEvent: - return pg.InsertIOSCustomEvent(sessionID, m) + return mi.pg.InsertIOSCustomEvent(sessionID, m) case *IOSClickEvent: - return pg.InsertIOSClickEvent(sessionID, m) + return mi.pg.InsertIOSClickEvent(sessionID, m) case *IOSInputEvent: - return pg.InsertIOSInputEvent(sessionID, m) + return mi.pg.InsertIOSInputEvent(sessionID, m) // Unique IOS messages case *IOSNetworkCall: - return pg.InsertIOSNetworkCall(sessionID, m) + return mi.pg.InsertIOSNetworkCall(sessionID, m) case *IOSScreenEnter: - return pg.InsertIOSScreenEnter(sessionID, m) + return mi.pg.InsertIOSScreenEnter(sessionID, m) case *IOSCrash: - return pg.InsertIOSCrash(sessionID, m) + return mi.pg.InsertIOSCrash(sessionID, m) } return nil // "Not implemented" } diff --git a/backend/services/db/stats.go b/backend/services/db/stats.go index 2c3a5da38..7069b8656 100644 --- a/backend/services/db/stats.go +++ b/backend/services/db/stats.go @@ -1,23 +1,28 @@ package main import ( + "openreplay/backend/pkg/db/cache" . "openreplay/backend/pkg/db/types" . "openreplay/backend/pkg/messages" ) -func initStats() { - // noop +type StatsInserter struct { + pg *cache.PGCache } -func insertStats(session *Session, msg Message) error { +func NewStatsInserter(pg *cache.PGCache) *StatsInserter { + return &StatsInserter{pg: pg} +} + +func (si *StatsInserter) insertStats(session *Session, msg Message) error { switch m := msg.(type) { // Web case *PerformanceTrackAggr: - return pg.InsertWebStatsPerformance(session.SessionID, m) + return si.pg.InsertWebStatsPerformance(session.SessionID, m) case *ResourceEvent: - return pg.InsertWebStatsResourceEvent(session.SessionID, m) + return si.pg.InsertWebStatsResourceEvent(session.SessionID, m) case *LongTask: - return pg.InsertWebStatsLongtask(session.SessionID, m) + return si.pg.InsertWebStatsLongtask(session.SessionID, m) // IOS // case *IOSPerformanceAggregated: