From 9cdb1e8ab753a4fea695bd13e4348f88c32b027f Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Wed, 4 May 2022 14:21:15 +0200 Subject: [PATCH 1/7] Removed global pg connection --- backend/services/db/main.go | 17 +++++----- backend/services/db/messages.go | 57 +++++++++++++++++++-------------- backend/services/db/stats.go | 17 ++++++---- 3 files changed, 53 insertions(+), 38 deletions(-) diff --git a/backend/services/db/main.go b/backend/services/db/main.go index 2ad6e4aa8..50e160342 100644 --- a/backend/services/db/main.go +++ b/backend/services/db/main.go @@ -18,16 +18,17 @@ import ( "openreplay/backend/services/db/heuristics" ) -var pg *cache.PGCache - func main() { log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) - initStats() - pg = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20) + // Init database + pg := cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20) defer pg.Close() + // Init modules heurFinder := heuristics.NewHandler() + mi := NewMessageInserter(pg) + si := NewStatsInserter(pg) statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC")) @@ -40,7 +41,7 @@ func main() { func(sessionID uint64, msg messages.Message, meta *types.Meta) { statsLogger.HandleAndLog(sessionID, meta) - if err := insertMessage(sessionID, msg); err != nil { + if err := mi.insertMessage(sessionID, msg); err != nil { if !postgres.IsPkeyViolation(err) { log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg) } @@ -54,7 +55,7 @@ func main() { return } - err = insertStats(session, msg) + err = si.insertStats(session, msg) if err != nil { log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg) } @@ -62,14 +63,14 @@ func main() { heurFinder.HandleMessage(session, msg) heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) { // TODO: DRY code (carefully with the return statement logic) - if err := insertMessage(sessionID, msg); err != nil { + if err := mi.insertMessage(sessionID, msg); err != nil { if !postgres.IsPkeyViolation(err) { log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg) } return } - if err := insertStats(session, msg); err != nil { + if err := si.insertStats(session, msg); err != nil { log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg) } }) diff --git a/backend/services/db/messages.go b/backend/services/db/messages.go index d3e4ae1ed..2adca61c8 100644 --- a/backend/services/db/messages.go +++ b/backend/services/db/messages.go @@ -1,68 +1,77 @@ package main import ( + "openreplay/backend/pkg/db/cache" . "openreplay/backend/pkg/messages" ) -func insertMessage(sessionID uint64, msg Message) error { +type MessageInserter struct { + pg *cache.PGCache +} + +func NewMessageInserter(pg *cache.PGCache) *MessageInserter { + return &MessageInserter{pg: pg} +} + +func (mi *MessageInserter) insertMessage(sessionID uint64, msg Message) error { switch m := msg.(type) { // Common case *Metadata: - return pg.InsertMetadata(sessionID, m) + return mi.pg.InsertMetadata(sessionID, m) case *IssueEvent: - return pg.InsertIssueEvent(sessionID, m) + return mi.pg.InsertIssueEvent(sessionID, m) //TODO: message adapter (transformer) (at the level of pkg/message) for types: // case *IOSMetadata, *IOSIssueEvent and others // Web case *SessionStart: - return pg.InsertWebSessionStart(sessionID, m) + return mi.pg.InsertWebSessionStart(sessionID, m) case *SessionEnd: - return pg.InsertWebSessionEnd(sessionID, m) + return mi.pg.InsertWebSessionEnd(sessionID, m) case *UserID: - return pg.InsertWebUserID(sessionID, m) + return mi.pg.InsertWebUserID(sessionID, m) case *UserAnonymousID: - return pg.InsertWebUserAnonymousID(sessionID, m) + return mi.pg.InsertWebUserAnonymousID(sessionID, m) case *CustomEvent: - return pg.InsertWebCustomEvent(sessionID, m) + return mi.pg.InsertWebCustomEvent(sessionID, m) case *ClickEvent: - return pg.InsertWebClickEvent(sessionID, m) + return mi.pg.InsertWebClickEvent(sessionID, m) case *InputEvent: - return pg.InsertWebInputEvent(sessionID, m) + return mi.pg.InsertWebInputEvent(sessionID, m) // Unique Web messages // case *ResourceEvent: // return pg.InsertWebResourceEvent(sessionID, m) case *PageEvent: - return pg.InsertWebPageEvent(sessionID, m) + return mi.pg.InsertWebPageEvent(sessionID, m) case *ErrorEvent: - return pg.InsertWebErrorEvent(sessionID, m) + return mi.pg.InsertWebErrorEvent(sessionID, m) case *FetchEvent: - return pg.InsertWebFetchEvent(sessionID, m) + return mi.pg.InsertWebFetchEvent(sessionID, m) case *GraphQLEvent: - return pg.InsertWebGraphQLEvent(sessionID, m) + return mi.pg.InsertWebGraphQLEvent(sessionID, m) // IOS case *IOSSessionStart: - return pg.InsertIOSSessionStart(sessionID, m) + return mi.pg.InsertIOSSessionStart(sessionID, m) case *IOSSessionEnd: - return pg.InsertIOSSessionEnd(sessionID, m) + return mi.pg.InsertIOSSessionEnd(sessionID, m) case *IOSUserID: - return pg.InsertIOSUserID(sessionID, m) + return mi.pg.InsertIOSUserID(sessionID, m) case *IOSUserAnonymousID: - return pg.InsertIOSUserAnonymousID(sessionID, m) + return mi.pg.InsertIOSUserAnonymousID(sessionID, m) case *IOSCustomEvent: - return pg.InsertIOSCustomEvent(sessionID, m) + return mi.pg.InsertIOSCustomEvent(sessionID, m) case *IOSClickEvent: - return pg.InsertIOSClickEvent(sessionID, m) + return mi.pg.InsertIOSClickEvent(sessionID, m) case *IOSInputEvent: - return pg.InsertIOSInputEvent(sessionID, m) + return mi.pg.InsertIOSInputEvent(sessionID, m) // Unique IOS messages case *IOSNetworkCall: - return pg.InsertIOSNetworkCall(sessionID, m) + return mi.pg.InsertIOSNetworkCall(sessionID, m) case *IOSScreenEnter: - return pg.InsertIOSScreenEnter(sessionID, m) + return mi.pg.InsertIOSScreenEnter(sessionID, m) case *IOSCrash: - return pg.InsertIOSCrash(sessionID, m) + return mi.pg.InsertIOSCrash(sessionID, m) } return nil // "Not implemented" } diff --git a/backend/services/db/stats.go b/backend/services/db/stats.go index 2c3a5da38..7069b8656 100644 --- a/backend/services/db/stats.go +++ b/backend/services/db/stats.go @@ -1,23 +1,28 @@ package main import ( + "openreplay/backend/pkg/db/cache" . "openreplay/backend/pkg/db/types" . "openreplay/backend/pkg/messages" ) -func initStats() { - // noop +type StatsInserter struct { + pg *cache.PGCache } -func insertStats(session *Session, msg Message) error { +func NewStatsInserter(pg *cache.PGCache) *StatsInserter { + return &StatsInserter{pg: pg} +} + +func (si *StatsInserter) insertStats(session *Session, msg Message) error { switch m := msg.(type) { // Web case *PerformanceTrackAggr: - return pg.InsertWebStatsPerformance(session.SessionID, m) + return si.pg.InsertWebStatsPerformance(session.SessionID, m) case *ResourceEvent: - return pg.InsertWebStatsResourceEvent(session.SessionID, m) + return si.pg.InsertWebStatsResourceEvent(session.SessionID, m) case *LongTask: - return pg.InsertWebStatsLongtask(session.SessionID, m) + return si.pg.InsertWebStatsLongtask(session.SessionID, m) // IOS // case *IOSPerformanceAggregated: From 74672d4321cbaa67abc52cacd1bf9a11f27e52e0 Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Wed, 4 May 2022 14:36:42 +0200 Subject: [PATCH 2/7] Removed unused code --- backend/services/db/heuristics/anr.go | 2 -- backend/services/db/heuristics/clickrage.go | 2 -- backend/services/db/main.go | 3 --- backend/services/db/messages.go | 8 +++----- backend/services/db/stats.go | 10 ---------- 5 files changed, 3 insertions(+), 22 deletions(-) diff --git a/backend/services/db/heuristics/anr.go b/backend/services/db/heuristics/anr.go index 0475b00be..7cec8fc97 100644 --- a/backend/services/db/heuristics/anr.go +++ b/backend/services/db/heuristics/anr.go @@ -18,8 +18,6 @@ func (h *anr) buildIf(timestamp uint64) { m := &IOSIssueEvent{ Type: "anr", ContextString: h.lastLabel, - //Context: "{}", - //Payload: fmt.SPrint } m.Timestamp = h.lastHeartbeatTimestamp m.Index = h.lastHeartbeatIndex // Associated Index/ MessageID ? diff --git a/backend/services/db/heuristics/clickrage.go b/backend/services/db/heuristics/clickrage.go index 9a1db92d5..4d19bf92e 100644 --- a/backend/services/db/heuristics/clickrage.go +++ b/backend/services/db/heuristics/clickrage.go @@ -21,8 +21,6 @@ func (h *clickrage) build() { m := &IOSIssueEvent{ Type: "click_rage", ContextString: h.lastLabel, - //Context: "{}", - //Payload: fmt.SPrint } m.Timestamp = h.firstInARawTimestamp m.Index = h.firstInARawSeqIndex // Associated Index/ MessageID ? diff --git a/backend/services/db/main.go b/backend/services/db/main.go index 50e160342..600913120 100644 --- a/backend/services/db/main.go +++ b/backend/services/db/main.go @@ -92,9 +92,6 @@ func main() { os.Exit(0) case <-tick: pg.CommitBatches() - if err := commitStats(); err != nil { - log.Printf("Error on stats commit: %v", err) - } // TODO?: separate stats & regular messages if err := consumer.Commit(); err != nil { log.Printf("Error on consumer commit: %v", err) diff --git a/backend/services/db/messages.go b/backend/services/db/messages.go index 2adca61c8..1a583ae6a 100644 --- a/backend/services/db/messages.go +++ b/backend/services/db/messages.go @@ -20,8 +20,7 @@ func (mi *MessageInserter) insertMessage(sessionID uint64, msg Message) error { return mi.pg.InsertMetadata(sessionID, m) case *IssueEvent: return mi.pg.InsertIssueEvent(sessionID, m) - //TODO: message adapter (transformer) (at the level of pkg/message) for types: - // case *IOSMetadata, *IOSIssueEvent and others + //TODO: message adapter (transformer) (at the level of pkg/message) for types: *IOSMetadata, *IOSIssueEvent and others // Web case *SessionStart: @@ -38,9 +37,8 @@ func (mi *MessageInserter) insertMessage(sessionID uint64, msg Message) error { return mi.pg.InsertWebClickEvent(sessionID, m) case *InputEvent: return mi.pg.InsertWebInputEvent(sessionID, m) - // Unique Web messages - // case *ResourceEvent: - // return pg.InsertWebResourceEvent(sessionID, m) + + // Unique Web messages case *PageEvent: return mi.pg.InsertWebPageEvent(sessionID, m) case *ErrorEvent: diff --git a/backend/services/db/stats.go b/backend/services/db/stats.go index 7069b8656..a2c2024fb 100644 --- a/backend/services/db/stats.go +++ b/backend/services/db/stats.go @@ -23,16 +23,6 @@ func (si *StatsInserter) insertStats(session *Session, msg Message) error { return si.pg.InsertWebStatsResourceEvent(session.SessionID, m) case *LongTask: return si.pg.InsertWebStatsLongtask(session.SessionID, m) - - // IOS - // case *IOSPerformanceAggregated: - // return pg.InsertIOSPerformanceAggregated(session, m) - // case *IOSNetworkCall: - // return pg.InsertIOSNetworkCall(session, m) } return nil } - -func commitStats() error { - return nil -} From 5b7c479f4dfc5f5b2517d31baab86a2d06c15c10 Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Wed, 4 May 2022 16:17:57 +0200 Subject: [PATCH 3/7] Refactoring in stats logger --- backend/pkg/log/queue.go | 42 +++++++++-------- backend/services/db/main.go | 82 ++++++++++++++++++---------------- backend/services/ender/main.go | 2 +- 3 files changed, 68 insertions(+), 58 deletions(-) diff --git a/backend/pkg/log/queue.go b/backend/pkg/log/queue.go index 62e47cbbe..ced815bd2 100644 --- a/backend/pkg/log/queue.go +++ b/backend/pkg/log/queue.go @@ -5,7 +5,6 @@ import ( "log" "time" - "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue/types" //"openreplay/backend/pkg/env" ) @@ -18,6 +17,19 @@ type partitionStats struct { count int } +// Update partition statistic +func (prt *partitionStats) update(m *types.Meta) { + if prt.maxts < m.Timestamp { + prt.maxts = m.Timestamp + } + if prt.mints > m.Timestamp || prt.mints == 0 { + prt.mints = m.Timestamp + } + prt.lastts = m.Timestamp + prt.lastID = m.ID + prt.count += 1 +} + type queueStats struct { prts map[int32]*partitionStats tick <-chan time.Time @@ -30,43 +42,35 @@ func NewQueueStats(sec int) *queueStats { } } -func (qs *queueStats) HandleAndLog(sessionID uint64, m *types.Meta) { +// Collect writes new data to partition statistic +func (qs *queueStats) Collect(sessionID uint64, m *types.Meta) { prti := int32(sessionID % 16) // TODO use GetKeyPartition from kafka/key.go prt, ok := qs.prts[prti] if !ok { qs.prts[prti] = &partitionStats{} prt = qs.prts[prti] } - - if prt.maxts < m.Timestamp { - prt.maxts = m.Timestamp - } - if prt.mints > m.Timestamp || prt.mints == 0 { - prt.mints = m.Timestamp - } - prt.lastts = m.Timestamp - prt.lastID = m.ID - prt.count += 1 + prt.update(m) select { case <-qs.tick: - qs.LogThenReset() + qs.log() + qs.reset() default: } } -func (qs *queueStats) LogThenReset() { +// Print to console collected statistics +func (qs *queueStats) log() { s := "Queue Statistics: " for i, p := range qs.prts { s = fmt.Sprintf("%v | %v:: lastTS %v, lastID %v, count %v, maxTS %v, minTS %v", s, i, p.lastts, p.lastID, p.count, p.maxts, p.mints) } log.Println(s) - // reset - qs.prts = make(map[int32]*partitionStats) } -// TODO: list of message id to log (mb filter function with callback in messages/utils.go or something) -func LogMessage(s string, sessionID uint64, msg messages.Message, m *types.Meta) { - log.Printf("%v | SessionID: %v, Queue info: %v, Message: %v", s, sessionID, m, msg) +// Clear all queue partitions +func (qs *queueStats) reset() { + qs.prts = make(map[int32]*partitionStats) } diff --git a/backend/services/db/main.go b/backend/services/db/main.go index 600913120..619e6816e 100644 --- a/backend/services/db/main.go +++ b/backend/services/db/main.go @@ -29,52 +29,58 @@ func main() { heurFinder := heuristics.NewHandler() mi := NewMessageInserter(pg) si := NewStatsInserter(pg) - statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC")) + // Handler logic + handler := func(sessionID uint64, msg messages.Message, meta *types.Meta) { + statsLogger.Collect(sessionID, meta) + + // Just insert message into db without additional checks + if err := mi.insertMessage(sessionID, msg); err != nil { + if !postgres.IsPkeyViolation(err) { + log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg) + } + return + } + + // Try to get session from db + session, err := pg.GetSession(sessionID) + if err != nil { + // Might happen due to the assets-related message TODO: log only if session is necessary for this kind of message + log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg) + return + } + + // Insert statistics + err = si.insertStats(session, msg) + if err != nil { + log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg) + } + + heurFinder.HandleMessage(session, msg) + heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) { + // TODO: DRY code (carefully with the return statement logic) + if err := mi.insertMessage(sessionID, msg); err != nil { + if !postgres.IsPkeyViolation(err) { + log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg) + } + return + } + + if err := si.insertStats(session, msg); err != nil { + log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg) + } + }) + } + + // Init consumer consumer := queue.NewMessageConsumer( env.String("GROUP_DB"), []string{ env.String("TOPIC_RAW_IOS"), env.String("TOPIC_TRIGGER"), }, - func(sessionID uint64, msg messages.Message, meta *types.Meta) { - statsLogger.HandleAndLog(sessionID, meta) - - if err := mi.insertMessage(sessionID, msg); err != nil { - if !postgres.IsPkeyViolation(err) { - log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg) - } - return - } - - session, err := pg.GetSession(sessionID) - if err != nil { - // Might happen due to the assets-related message TODO: log only if session is necessary for this kind of message - log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg) - return - } - - err = si.insertStats(session, msg) - if err != nil { - log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg) - } - - heurFinder.HandleMessage(session, msg) - heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) { - // TODO: DRY code (carefully with the return statement logic) - if err := mi.insertMessage(sessionID, msg); err != nil { - if !postgres.IsPkeyViolation(err) { - log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg) - } - return - } - - if err := si.insertStats(session, msg); err != nil { - log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg) - } - }) - }, + handler, false, ) diff --git a/backend/services/ender/main.go b/backend/services/ender/main.go index f2430f3a0..4170a178e 100644 --- a/backend/services/ender/main.go +++ b/backend/services/ender/main.go @@ -35,7 +35,7 @@ func main() { env.String("TOPIC_RAW_IOS"), }, func(sessionID uint64, msg messages.Message, meta *types.Meta) { - statsLogger.HandleAndLog(sessionID, meta) + statsLogger.Collect(sessionID, meta) builderMap.HandleMessage(sessionID, msg, msg.Meta().Index) }, false, From 167d1e117e96c285f68611d4866246061b759f03 Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Thu, 5 May 2022 09:45:38 +0200 Subject: [PATCH 4/7] Made correct project layout --- backend/{services => cmd}/db/main.go | 20 +++++++++++-------- .../db => internal/datasaver}/messages.go | 4 ++-- .../db => internal/datasaver}/stats.go | 4 ++-- .../db => internal}/heuristics/anr.go | 0 .../db => internal}/heuristics/clickrage.go | 0 .../db => internal}/heuristics/heuristics.go | 0 .../db => internal}/heuristics/performance.go | 0 .../heuristics/readyMessageStore.go | 0 .../db => internal}/heuristics/session.go | 0 backend/services/db/build_hack | 0 10 files changed, 16 insertions(+), 12 deletions(-) rename backend/{services => cmd}/db/main.go (85%) rename backend/{services/db => internal/datasaver}/messages.go (96%) rename backend/{services/db => internal/datasaver}/stats.go (87%) rename backend/{services/db => internal}/heuristics/anr.go (100%) rename backend/{services/db => internal}/heuristics/clickrage.go (100%) rename backend/{services/db => internal}/heuristics/heuristics.go (100%) rename backend/{services/db => internal}/heuristics/performance.go (100%) rename backend/{services/db => internal}/heuristics/readyMessageStore.go (100%) rename backend/{services/db => internal}/heuristics/session.go (100%) create mode 100644 backend/services/db/build_hack diff --git a/backend/services/db/main.go b/backend/cmd/db/main.go similarity index 85% rename from backend/services/db/main.go rename to backend/cmd/db/main.go index 619e6816e..675c9f975 100644 --- a/backend/services/db/main.go +++ b/backend/cmd/db/main.go @@ -2,6 +2,8 @@ package main import ( "log" + "openreplay/backend/internal/datasaver" + "openreplay/backend/internal/heuristics" "time" "os" @@ -15,7 +17,6 @@ import ( "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/queue/types" - "openreplay/backend/services/db/heuristics" ) func main() { @@ -27,8 +28,8 @@ func main() { // Init modules heurFinder := heuristics.NewHandler() - mi := NewMessageInserter(pg) - si := NewStatsInserter(pg) + mi := datasaver.NewMessageInserter(pg) + si := datasaver.NewStatsInserter(pg) statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC")) // Handler logic @@ -36,14 +37,14 @@ func main() { statsLogger.Collect(sessionID, meta) // Just insert message into db without additional checks - if err := mi.insertMessage(sessionID, msg); err != nil { + if err := mi.InsertMessage(sessionID, msg); err != nil { if !postgres.IsPkeyViolation(err) { log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg) } return } - // Try to get session from db + // Try to get session from db for the following handlers session, err := pg.GetSession(sessionID) if err != nil { // Might happen due to the assets-related message TODO: log only if session is necessary for this kind of message @@ -52,22 +53,25 @@ func main() { } // Insert statistics - err = si.insertStats(session, msg) + err = si.InsertStats(session, msg) if err != nil { log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg) } + // Insert heuristics heurFinder.HandleMessage(session, msg) + + // TODO: ??? heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) { // TODO: DRY code (carefully with the return statement logic) - if err := mi.insertMessage(sessionID, msg); err != nil { + if err := mi.InsertMessage(sessionID, msg); err != nil { if !postgres.IsPkeyViolation(err) { log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg) } return } - if err := si.insertStats(session, msg); err != nil { + if err := si.InsertStats(session, msg); err != nil { log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg) } }) diff --git a/backend/services/db/messages.go b/backend/internal/datasaver/messages.go similarity index 96% rename from backend/services/db/messages.go rename to backend/internal/datasaver/messages.go index 1a583ae6a..34a7b8dd4 100644 --- a/backend/services/db/messages.go +++ b/backend/internal/datasaver/messages.go @@ -1,4 +1,4 @@ -package main +package datasaver import ( "openreplay/backend/pkg/db/cache" @@ -13,7 +13,7 @@ func NewMessageInserter(pg *cache.PGCache) *MessageInserter { return &MessageInserter{pg: pg} } -func (mi *MessageInserter) insertMessage(sessionID uint64, msg Message) error { +func (mi *MessageInserter) InsertMessage(sessionID uint64, msg Message) error { switch m := msg.(type) { // Common case *Metadata: diff --git a/backend/services/db/stats.go b/backend/internal/datasaver/stats.go similarity index 87% rename from backend/services/db/stats.go rename to backend/internal/datasaver/stats.go index a2c2024fb..250c01775 100644 --- a/backend/services/db/stats.go +++ b/backend/internal/datasaver/stats.go @@ -1,4 +1,4 @@ -package main +package datasaver import ( "openreplay/backend/pkg/db/cache" @@ -14,7 +14,7 @@ func NewStatsInserter(pg *cache.PGCache) *StatsInserter { return &StatsInserter{pg: pg} } -func (si *StatsInserter) insertStats(session *Session, msg Message) error { +func (si *StatsInserter) InsertStats(session *Session, msg Message) error { switch m := msg.(type) { // Web case *PerformanceTrackAggr: diff --git a/backend/services/db/heuristics/anr.go b/backend/internal/heuristics/anr.go similarity index 100% rename from backend/services/db/heuristics/anr.go rename to backend/internal/heuristics/anr.go diff --git a/backend/services/db/heuristics/clickrage.go b/backend/internal/heuristics/clickrage.go similarity index 100% rename from backend/services/db/heuristics/clickrage.go rename to backend/internal/heuristics/clickrage.go diff --git a/backend/services/db/heuristics/heuristics.go b/backend/internal/heuristics/heuristics.go similarity index 100% rename from backend/services/db/heuristics/heuristics.go rename to backend/internal/heuristics/heuristics.go diff --git a/backend/services/db/heuristics/performance.go b/backend/internal/heuristics/performance.go similarity index 100% rename from backend/services/db/heuristics/performance.go rename to backend/internal/heuristics/performance.go diff --git a/backend/services/db/heuristics/readyMessageStore.go b/backend/internal/heuristics/readyMessageStore.go similarity index 100% rename from backend/services/db/heuristics/readyMessageStore.go rename to backend/internal/heuristics/readyMessageStore.go diff --git a/backend/services/db/heuristics/session.go b/backend/internal/heuristics/session.go similarity index 100% rename from backend/services/db/heuristics/session.go rename to backend/internal/heuristics/session.go diff --git a/backend/services/db/build_hack b/backend/services/db/build_hack new file mode 100644 index 000000000..e69de29bb From c0503941167c06cb8c1331840857167326a25c68 Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Thu, 5 May 2022 10:23:36 +0200 Subject: [PATCH 5/7] Moved service configs to config module --- backend/cmd/db/main.go | 40 +++++++++++++------------- backend/internal/config/db/config.go | 28 ++++++++++++++++++ backend/internal/datasaver/messages.go | 11 +------ backend/internal/datasaver/saver.go | 11 +++++++ backend/internal/datasaver/stats.go | 11 +------ 5 files changed, 61 insertions(+), 40 deletions(-) create mode 100644 backend/internal/config/db/config.go create mode 100644 backend/internal/datasaver/saver.go diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index 675c9f975..962057213 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -2,6 +2,7 @@ package main import ( "log" + "openreplay/backend/internal/config/db" "openreplay/backend/internal/datasaver" "openreplay/backend/internal/heuristics" "time" @@ -12,7 +13,6 @@ import ( "openreplay/backend/pkg/db/cache" "openreplay/backend/pkg/db/postgres" - "openreplay/backend/pkg/env" logger "openreplay/backend/pkg/log" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue" @@ -22,22 +22,23 @@ import ( func main() { log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) + cfg := db.New() + // Init database - pg := cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20) + pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres), cfg.ProjectExpirationTimeoutMs) defer pg.Close() // Init modules heurFinder := heuristics.NewHandler() - mi := datasaver.NewMessageInserter(pg) - si := datasaver.NewStatsInserter(pg) - statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC")) + saver := datasaver.New(pg) + statsLogger := logger.NewQueueStats(cfg.LoggerTimeout) // Handler logic handler := func(sessionID uint64, msg messages.Message, meta *types.Meta) { statsLogger.Collect(sessionID, meta) - // Just insert message into db without additional checks - if err := mi.InsertMessage(sessionID, msg); err != nil { + // Just save session data into db without additional checks + if err := saver.InsertMessage(sessionID, msg); err != nil { if !postgres.IsPkeyViolation(err) { log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg) } @@ -52,26 +53,26 @@ func main() { return } - // Insert statistics - err = si.InsertStats(session, msg) + // Save statistics to db + err = saver.InsertStats(session, msg) if err != nil { log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg) } - // Insert heuristics + // Handle heuristics and save to temporary queue in memory heurFinder.HandleMessage(session, msg) - // TODO: ??? + // Process saved heuristics messages as usual messages above in the code heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) { // TODO: DRY code (carefully with the return statement logic) - if err := mi.InsertMessage(sessionID, msg); err != nil { + if err := saver.InsertMessage(sessionID, msg); err != nil { if !postgres.IsPkeyViolation(err) { log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg) } return } - if err := si.InsertStats(session, msg); err != nil { + if err := saver.InsertStats(session, msg); err != nil { log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg) } }) @@ -79,21 +80,21 @@ func main() { // Init consumer consumer := queue.NewMessageConsumer( - env.String("GROUP_DB"), + cfg.GroupDB, []string{ - env.String("TOPIC_RAW_IOS"), - env.String("TOPIC_TRIGGER"), + cfg.TopicRawIOS, + cfg.TopicTrigger, }, handler, false, ) + log.Printf("Db service started\n") + sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - tick := time.Tick(15 * time.Second) - - log.Printf("Db service started\n") + tick := time.Tick(cfg.CommitBatchTimeout) for { select { case sig := <-sigchan: @@ -113,5 +114,4 @@ func main() { } } } - } diff --git a/backend/internal/config/db/config.go b/backend/internal/config/db/config.go new file mode 100644 index 000000000..fb35a199c --- /dev/null +++ b/backend/internal/config/db/config.go @@ -0,0 +1,28 @@ +package db + +import ( + "openreplay/backend/pkg/env" + "time" +) + +type Config struct { + Postgres string + ProjectExpirationTimeoutMs int64 + LoggerTimeout int + GroupDB string + TopicRawIOS string + TopicTrigger string + CommitBatchTimeout time.Duration +} + +func New() *Config { + return &Config{ + Postgres: env.String("POSTGRES_STRING"), + ProjectExpirationTimeoutMs: 1000 * 60 * 20, + LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"), + GroupDB: env.String("GROUP_DB"), + TopicRawIOS: env.String("TOPIC_RAW_IOS"), + TopicTrigger: env.String("TOPIC_TRIGGER"), + CommitBatchTimeout: 15 * time.Second, + } +} diff --git a/backend/internal/datasaver/messages.go b/backend/internal/datasaver/messages.go index 34a7b8dd4..1e774888d 100644 --- a/backend/internal/datasaver/messages.go +++ b/backend/internal/datasaver/messages.go @@ -1,19 +1,10 @@ package datasaver import ( - "openreplay/backend/pkg/db/cache" . "openreplay/backend/pkg/messages" ) -type MessageInserter struct { - pg *cache.PGCache -} - -func NewMessageInserter(pg *cache.PGCache) *MessageInserter { - return &MessageInserter{pg: pg} -} - -func (mi *MessageInserter) InsertMessage(sessionID uint64, msg Message) error { +func (mi *Saver) InsertMessage(sessionID uint64, msg Message) error { switch m := msg.(type) { // Common case *Metadata: diff --git a/backend/internal/datasaver/saver.go b/backend/internal/datasaver/saver.go new file mode 100644 index 000000000..4cd742718 --- /dev/null +++ b/backend/internal/datasaver/saver.go @@ -0,0 +1,11 @@ +package datasaver + +import "openreplay/backend/pkg/db/cache" + +type Saver struct { + pg *cache.PGCache +} + +func New(pg *cache.PGCache) *Saver { + return &Saver{pg: pg} +} diff --git a/backend/internal/datasaver/stats.go b/backend/internal/datasaver/stats.go index 250c01775..a57d91824 100644 --- a/backend/internal/datasaver/stats.go +++ b/backend/internal/datasaver/stats.go @@ -1,20 +1,11 @@ package datasaver import ( - "openreplay/backend/pkg/db/cache" . "openreplay/backend/pkg/db/types" . "openreplay/backend/pkg/messages" ) -type StatsInserter struct { - pg *cache.PGCache -} - -func NewStatsInserter(pg *cache.PGCache) *StatsInserter { - return &StatsInserter{pg: pg} -} - -func (si *StatsInserter) InsertStats(session *Session, msg Message) error { +func (si *Saver) InsertStats(session *Session, msg Message) error { switch m := msg.(type) { // Web case *PerformanceTrackAggr: From 74756b240917bfb4a7faef391c621f87f049542d Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Thu, 5 May 2022 10:46:48 +0200 Subject: [PATCH 6/7] Refactoring of the db service --- backend/build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/build.sh b/backend/build.sh index 8e3bea86a..b4de3c2de 100755 --- a/backend/build.sh +++ b/backend/build.sh @@ -23,7 +23,7 @@ function build_service() { image="$1" echo "BUILDING $image" case "$image" in - http) + http | db) echo build http docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --build-arg SERVICE_NAME=$image -f ./cmd/Dockerfile . [[ $PUSH_IMAGE -eq 1 ]] && { From b6d57b45abda237147c37216d8a6607a201de0ae Mon Sep 17 00:00:00 2001 From: Alex Kaminskii Date: Thu, 5 May 2022 12:49:44 +0200 Subject: [PATCH 7/7] chore(github-workflow): backend --- .github/workflows/workers-ee.yaml | 8 +++++++- .github/workflows/workers.yaml | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/.github/workflows/workers-ee.yaml b/.github/workflows/workers-ee.yaml index a61d75160..18fca8872 100644 --- a/.github/workflows/workers-ee.yaml +++ b/.github/workflows/workers-ee.yaml @@ -47,7 +47,13 @@ jobs: # # Getting the images to build # - git diff --name-only HEAD HEAD~1 | grep backend/services | cut -d '/' -f3 | uniq > backend/images_to_build.txt + { + git diff --name-only HEAD HEAD~1 | grep -E "backend/cmd|backend/services" | grep -vE ^ee/ | cut -d '/' -f3 + + git diff --name-only HEAD HEAD~1 | grep -E "backend/pkg|backend/internal" | grep -vE ^ee/ | cut -d '/' -f3 | uniq | while read -r pkg_name ; do + grep -rl "pkg/$pkg_name" backend/services backend/cmd | cut -d '/' -f3 + done + } | uniq > backend/images_to_build.txt [[ $(cat backend/images_to_build.txt) != "" ]] || (echo "Nothing to build here"; exit 0) # # Pushing image to registry diff --git a/.github/workflows/workers.yaml b/.github/workflows/workers.yaml index 7ce78ad6f..e6a91647d 100644 --- a/.github/workflows/workers.yaml +++ b/.github/workflows/workers.yaml @@ -51,8 +51,8 @@ jobs: { git diff --name-only HEAD HEAD~1 | grep -E "backend/cmd|backend/services" | grep -vE ^ee/ | cut -d '/' -f3 - git diff --name-only HEAD HEAD~1 | grep backend/pkg | grep -vE ^ee/ | cut -d '/' -f3 | uniq | while read -r pkg_name ; do - grep -rl "pkg/$pkg_name" backend/services | cut -d '/' -f3 + git diff --name-only HEAD HEAD~1 | grep -E "backend/pkg|backend/internal" | grep -vE ^ee/ | cut -d '/' -f3 | uniq | while read -r pkg_name ; do + grep -rl "pkg/$pkg_name" backend/services backend/cmd | cut -d '/' -f3 done } | uniq > backend/images_to_build.txt