Removed global pg connection

This commit is contained in:
Alexander Zavorotynskiy 2022-05-04 14:21:15 +02:00
parent a4278aec23
commit 9cdb1e8ab7
3 changed files with 53 additions and 38 deletions

View file

@ -18,16 +18,17 @@ import (
"openreplay/backend/services/db/heuristics"
)
var pg *cache.PGCache
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
initStats()
pg = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20)
// Init database
pg := cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20)
defer pg.Close()
// Init modules
heurFinder := heuristics.NewHandler()
mi := NewMessageInserter(pg)
si := NewStatsInserter(pg)
statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"))
@ -40,7 +41,7 @@ func main() {
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
statsLogger.HandleAndLog(sessionID, meta)
if err := insertMessage(sessionID, msg); err != nil {
if err := mi.insertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg)
}
@ -54,7 +55,7 @@ func main() {
return
}
err = insertStats(session, msg)
err = si.insertStats(session, msg)
if err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg)
}
@ -62,14 +63,14 @@ func main() {
heurFinder.HandleMessage(session, msg)
heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) {
// TODO: DRY code (carefully with the return statement logic)
if err := insertMessage(sessionID, msg); err != nil {
if err := mi.insertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
return
}
if err := insertStats(session, msg); err != nil {
if err := si.insertStats(session, msg); err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
})

View file

@ -1,68 +1,77 @@
package main
import (
"openreplay/backend/pkg/db/cache"
. "openreplay/backend/pkg/messages"
)
func insertMessage(sessionID uint64, msg Message) error {
type MessageInserter struct {
pg *cache.PGCache
}
func NewMessageInserter(pg *cache.PGCache) *MessageInserter {
return &MessageInserter{pg: pg}
}
func (mi *MessageInserter) insertMessage(sessionID uint64, msg Message) error {
switch m := msg.(type) {
// Common
case *Metadata:
return pg.InsertMetadata(sessionID, m)
return mi.pg.InsertMetadata(sessionID, m)
case *IssueEvent:
return pg.InsertIssueEvent(sessionID, m)
return mi.pg.InsertIssueEvent(sessionID, m)
//TODO: message adapter (transformer) (at the level of pkg/message) for types:
// case *IOSMetadata, *IOSIssueEvent and others
// Web
case *SessionStart:
return pg.InsertWebSessionStart(sessionID, m)
return mi.pg.InsertWebSessionStart(sessionID, m)
case *SessionEnd:
return pg.InsertWebSessionEnd(sessionID, m)
return mi.pg.InsertWebSessionEnd(sessionID, m)
case *UserID:
return pg.InsertWebUserID(sessionID, m)
return mi.pg.InsertWebUserID(sessionID, m)
case *UserAnonymousID:
return pg.InsertWebUserAnonymousID(sessionID, m)
return mi.pg.InsertWebUserAnonymousID(sessionID, m)
case *CustomEvent:
return pg.InsertWebCustomEvent(sessionID, m)
return mi.pg.InsertWebCustomEvent(sessionID, m)
case *ClickEvent:
return pg.InsertWebClickEvent(sessionID, m)
return mi.pg.InsertWebClickEvent(sessionID, m)
case *InputEvent:
return pg.InsertWebInputEvent(sessionID, m)
return mi.pg.InsertWebInputEvent(sessionID, m)
// Unique Web messages
// case *ResourceEvent:
// return pg.InsertWebResourceEvent(sessionID, m)
case *PageEvent:
return pg.InsertWebPageEvent(sessionID, m)
return mi.pg.InsertWebPageEvent(sessionID, m)
case *ErrorEvent:
return pg.InsertWebErrorEvent(sessionID, m)
return mi.pg.InsertWebErrorEvent(sessionID, m)
case *FetchEvent:
return pg.InsertWebFetchEvent(sessionID, m)
return mi.pg.InsertWebFetchEvent(sessionID, m)
case *GraphQLEvent:
return pg.InsertWebGraphQLEvent(sessionID, m)
return mi.pg.InsertWebGraphQLEvent(sessionID, m)
// IOS
case *IOSSessionStart:
return pg.InsertIOSSessionStart(sessionID, m)
return mi.pg.InsertIOSSessionStart(sessionID, m)
case *IOSSessionEnd:
return pg.InsertIOSSessionEnd(sessionID, m)
return mi.pg.InsertIOSSessionEnd(sessionID, m)
case *IOSUserID:
return pg.InsertIOSUserID(sessionID, m)
return mi.pg.InsertIOSUserID(sessionID, m)
case *IOSUserAnonymousID:
return pg.InsertIOSUserAnonymousID(sessionID, m)
return mi.pg.InsertIOSUserAnonymousID(sessionID, m)
case *IOSCustomEvent:
return pg.InsertIOSCustomEvent(sessionID, m)
return mi.pg.InsertIOSCustomEvent(sessionID, m)
case *IOSClickEvent:
return pg.InsertIOSClickEvent(sessionID, m)
return mi.pg.InsertIOSClickEvent(sessionID, m)
case *IOSInputEvent:
return pg.InsertIOSInputEvent(sessionID, m)
return mi.pg.InsertIOSInputEvent(sessionID, m)
// Unique IOS messages
case *IOSNetworkCall:
return pg.InsertIOSNetworkCall(sessionID, m)
return mi.pg.InsertIOSNetworkCall(sessionID, m)
case *IOSScreenEnter:
return pg.InsertIOSScreenEnter(sessionID, m)
return mi.pg.InsertIOSScreenEnter(sessionID, m)
case *IOSCrash:
return pg.InsertIOSCrash(sessionID, m)
return mi.pg.InsertIOSCrash(sessionID, m)
}
return nil // "Not implemented"
}

View file

@ -1,23 +1,28 @@
package main
import (
"openreplay/backend/pkg/db/cache"
. "openreplay/backend/pkg/db/types"
. "openreplay/backend/pkg/messages"
)
func initStats() {
// noop
type StatsInserter struct {
pg *cache.PGCache
}
func insertStats(session *Session, msg Message) error {
func NewStatsInserter(pg *cache.PGCache) *StatsInserter {
return &StatsInserter{pg: pg}
}
func (si *StatsInserter) insertStats(session *Session, msg Message) error {
switch m := msg.(type) {
// Web
case *PerformanceTrackAggr:
return pg.InsertWebStatsPerformance(session.SessionID, m)
return si.pg.InsertWebStatsPerformance(session.SessionID, m)
case *ResourceEvent:
return pg.InsertWebStatsResourceEvent(session.SessionID, m)
return si.pg.InsertWebStatsResourceEvent(session.SessionID, m)
case *LongTask:
return pg.InsertWebStatsLongtask(session.SessionID, m)
return si.pg.InsertWebStatsLongtask(session.SessionID, m)
// IOS
// case *IOSPerformanceAggregated: