diff --git a/.github/workflows/workers-ee.yaml b/.github/workflows/workers-ee.yaml index a61d75160..18fca8872 100644 --- a/.github/workflows/workers-ee.yaml +++ b/.github/workflows/workers-ee.yaml @@ -47,7 +47,13 @@ jobs: # # Getting the images to build # - git diff --name-only HEAD HEAD~1 | grep backend/services | cut -d '/' -f3 | uniq > backend/images_to_build.txt + { + git diff --name-only HEAD HEAD~1 | grep -E "backend/cmd|backend/services" | grep -vE ^ee/ | cut -d '/' -f3 + + git diff --name-only HEAD HEAD~1 | grep -E "backend/pkg|backend/internal" | grep -vE ^ee/ | cut -d '/' -f3 | uniq | while read -r pkg_name ; do + grep -rl "pkg/$pkg_name" backend/services backend/cmd | cut -d '/' -f3 + done + } | uniq > backend/images_to_build.txt [[ $(cat backend/images_to_build.txt) != "" ]] || (echo "Nothing to build here"; exit 0) # # Pushing image to registry diff --git a/.github/workflows/workers.yaml b/.github/workflows/workers.yaml index 7ce78ad6f..e6a91647d 100644 --- a/.github/workflows/workers.yaml +++ b/.github/workflows/workers.yaml @@ -51,8 +51,8 @@ jobs: { git diff --name-only HEAD HEAD~1 | grep -E "backend/cmd|backend/services" | grep -vE ^ee/ | cut -d '/' -f3 - git diff --name-only HEAD HEAD~1 | grep backend/pkg | grep -vE ^ee/ | cut -d '/' -f3 | uniq | while read -r pkg_name ; do - grep -rl "pkg/$pkg_name" backend/services | cut -d '/' -f3 + git diff --name-only HEAD HEAD~1 | grep -E "backend/pkg|backend/internal" | grep -vE ^ee/ | cut -d '/' -f3 | uniq | while read -r pkg_name ; do + grep -rl "pkg/$pkg_name" backend/services backend/cmd | cut -d '/' -f3 done } | uniq > backend/images_to_build.txt diff --git a/backend/build.sh b/backend/build.sh index 8e3bea86a..b4de3c2de 100755 --- a/backend/build.sh +++ b/backend/build.sh @@ -23,7 +23,7 @@ function build_service() { image="$1" echo "BUILDING $image" case "$image" in - http) + http | db) echo build http docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --build-arg SERVICE_NAME=$image -f ./cmd/Dockerfile . [[ $PUSH_IMAGE -eq 1 ]] && { diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go new file mode 100644 index 000000000..962057213 --- /dev/null +++ b/backend/cmd/db/main.go @@ -0,0 +1,117 @@ +package main + +import ( + "log" + "openreplay/backend/internal/config/db" + "openreplay/backend/internal/datasaver" + "openreplay/backend/internal/heuristics" + "time" + + "os" + "os/signal" + "syscall" + + "openreplay/backend/pkg/db/cache" + "openreplay/backend/pkg/db/postgres" + logger "openreplay/backend/pkg/log" + "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/queue" + "openreplay/backend/pkg/queue/types" +) + +func main() { + log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) + + cfg := db.New() + + // Init database + pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres), cfg.ProjectExpirationTimeoutMs) + defer pg.Close() + + // Init modules + heurFinder := heuristics.NewHandler() + saver := datasaver.New(pg) + statsLogger := logger.NewQueueStats(cfg.LoggerTimeout) + + // Handler logic + handler := func(sessionID uint64, msg messages.Message, meta *types.Meta) { + statsLogger.Collect(sessionID, meta) + + // Just save session data into db without additional checks + if err := saver.InsertMessage(sessionID, msg); err != nil { + if !postgres.IsPkeyViolation(err) { + log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg) + } + return + } + + // Try to get session from db for the following handlers + session, err := pg.GetSession(sessionID) + if err != nil { + // Might happen due to the assets-related message TODO: log only if session is necessary for this kind of message + log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg) + return + } + + // Save statistics to db + err = saver.InsertStats(session, msg) + if err != nil { + log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg) + } + + // Handle heuristics and save to temporary queue in memory + heurFinder.HandleMessage(session, msg) + + // Process saved heuristics messages as usual messages above in the code + heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) { + // TODO: DRY code (carefully with the return statement logic) + if err := saver.InsertMessage(sessionID, msg); err != nil { + if !postgres.IsPkeyViolation(err) { + log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg) + } + return + } + + if err := saver.InsertStats(session, msg); err != nil { + log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg) + } + }) + } + + // Init consumer + consumer := queue.NewMessageConsumer( + cfg.GroupDB, + []string{ + cfg.TopicRawIOS, + cfg.TopicTrigger, + }, + handler, + false, + ) + + log.Printf("Db service started\n") + + sigchan := make(chan os.Signal, 1) + signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) + + tick := time.Tick(cfg.CommitBatchTimeout) + for { + select { + case sig := <-sigchan: + log.Printf("Caught signal %v: terminating\n", sig) + consumer.Close() + os.Exit(0) + case <-tick: + pg.CommitBatches() + // TODO?: separate stats & regular messages + if err := consumer.Commit(); err != nil { + log.Printf("Error on consumer commit: %v", err) + } + default: + err := consumer.ConsumeNext() + if err != nil { + log.Fatalf("Error on consumption: %v", err) // TODO: is always fatal? + } + } + } +} diff --git a/backend/internal/config/db/config.go b/backend/internal/config/db/config.go new file mode 100644 index 000000000..fb35a199c --- /dev/null +++ b/backend/internal/config/db/config.go @@ -0,0 +1,28 @@ +package db + +import ( + "openreplay/backend/pkg/env" + "time" +) + +type Config struct { + Postgres string + ProjectExpirationTimeoutMs int64 + LoggerTimeout int + GroupDB string + TopicRawIOS string + TopicTrigger string + CommitBatchTimeout time.Duration +} + +func New() *Config { + return &Config{ + Postgres: env.String("POSTGRES_STRING"), + ProjectExpirationTimeoutMs: 1000 * 60 * 20, + LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"), + GroupDB: env.String("GROUP_DB"), + TopicRawIOS: env.String("TOPIC_RAW_IOS"), + TopicTrigger: env.String("TOPIC_TRIGGER"), + CommitBatchTimeout: 15 * time.Second, + } +} diff --git a/backend/internal/datasaver/messages.go b/backend/internal/datasaver/messages.go new file mode 100644 index 000000000..1e774888d --- /dev/null +++ b/backend/internal/datasaver/messages.go @@ -0,0 +1,66 @@ +package datasaver + +import ( + . "openreplay/backend/pkg/messages" +) + +func (mi *Saver) InsertMessage(sessionID uint64, msg Message) error { + switch m := msg.(type) { + // Common + case *Metadata: + return mi.pg.InsertMetadata(sessionID, m) + case *IssueEvent: + return mi.pg.InsertIssueEvent(sessionID, m) + //TODO: message adapter (transformer) (at the level of pkg/message) for types: *IOSMetadata, *IOSIssueEvent and others + + // Web + case *SessionStart: + return mi.pg.InsertWebSessionStart(sessionID, m) + case *SessionEnd: + return mi.pg.InsertWebSessionEnd(sessionID, m) + case *UserID: + return mi.pg.InsertWebUserID(sessionID, m) + case *UserAnonymousID: + return mi.pg.InsertWebUserAnonymousID(sessionID, m) + case *CustomEvent: + return mi.pg.InsertWebCustomEvent(sessionID, m) + case *ClickEvent: + return mi.pg.InsertWebClickEvent(sessionID, m) + case *InputEvent: + return mi.pg.InsertWebInputEvent(sessionID, m) + + // Unique Web messages + case *PageEvent: + return mi.pg.InsertWebPageEvent(sessionID, m) + case *ErrorEvent: + return mi.pg.InsertWebErrorEvent(sessionID, m) + case *FetchEvent: + return mi.pg.InsertWebFetchEvent(sessionID, m) + case *GraphQLEvent: + return mi.pg.InsertWebGraphQLEvent(sessionID, m) + + // IOS + case *IOSSessionStart: + return mi.pg.InsertIOSSessionStart(sessionID, m) + case *IOSSessionEnd: + return mi.pg.InsertIOSSessionEnd(sessionID, m) + case *IOSUserID: + return mi.pg.InsertIOSUserID(sessionID, m) + case *IOSUserAnonymousID: + return mi.pg.InsertIOSUserAnonymousID(sessionID, m) + case *IOSCustomEvent: + return mi.pg.InsertIOSCustomEvent(sessionID, m) + case *IOSClickEvent: + return mi.pg.InsertIOSClickEvent(sessionID, m) + case *IOSInputEvent: + return mi.pg.InsertIOSInputEvent(sessionID, m) + // Unique IOS messages + case *IOSNetworkCall: + return mi.pg.InsertIOSNetworkCall(sessionID, m) + case *IOSScreenEnter: + return mi.pg.InsertIOSScreenEnter(sessionID, m) + case *IOSCrash: + return mi.pg.InsertIOSCrash(sessionID, m) + } + return nil // "Not implemented" +} diff --git a/backend/internal/datasaver/saver.go b/backend/internal/datasaver/saver.go new file mode 100644 index 000000000..4cd742718 --- /dev/null +++ b/backend/internal/datasaver/saver.go @@ -0,0 +1,11 @@ +package datasaver + +import "openreplay/backend/pkg/db/cache" + +type Saver struct { + pg *cache.PGCache +} + +func New(pg *cache.PGCache) *Saver { + return &Saver{pg: pg} +} diff --git a/backend/internal/datasaver/stats.go b/backend/internal/datasaver/stats.go new file mode 100644 index 000000000..a57d91824 --- /dev/null +++ b/backend/internal/datasaver/stats.go @@ -0,0 +1,19 @@ +package datasaver + +import ( + . "openreplay/backend/pkg/db/types" + . "openreplay/backend/pkg/messages" +) + +func (si *Saver) InsertStats(session *Session, msg Message) error { + switch m := msg.(type) { + // Web + case *PerformanceTrackAggr: + return si.pg.InsertWebStatsPerformance(session.SessionID, m) + case *ResourceEvent: + return si.pg.InsertWebStatsResourceEvent(session.SessionID, m) + case *LongTask: + return si.pg.InsertWebStatsLongtask(session.SessionID, m) + } + return nil +} diff --git a/backend/services/db/heuristics/anr.go b/backend/internal/heuristics/anr.go similarity index 96% rename from backend/services/db/heuristics/anr.go rename to backend/internal/heuristics/anr.go index 0475b00be..7cec8fc97 100644 --- a/backend/services/db/heuristics/anr.go +++ b/backend/internal/heuristics/anr.go @@ -18,8 +18,6 @@ func (h *anr) buildIf(timestamp uint64) { m := &IOSIssueEvent{ Type: "anr", ContextString: h.lastLabel, - //Context: "{}", - //Payload: fmt.SPrint } m.Timestamp = h.lastHeartbeatTimestamp m.Index = h.lastHeartbeatIndex // Associated Index/ MessageID ? diff --git a/backend/services/db/heuristics/clickrage.go b/backend/internal/heuristics/clickrage.go similarity index 96% rename from backend/services/db/heuristics/clickrage.go rename to backend/internal/heuristics/clickrage.go index 9a1db92d5..4d19bf92e 100644 --- a/backend/services/db/heuristics/clickrage.go +++ b/backend/internal/heuristics/clickrage.go @@ -21,8 +21,6 @@ func (h *clickrage) build() { m := &IOSIssueEvent{ Type: "click_rage", ContextString: h.lastLabel, - //Context: "{}", - //Payload: fmt.SPrint } m.Timestamp = h.firstInARawTimestamp m.Index = h.firstInARawSeqIndex // Associated Index/ MessageID ? diff --git a/backend/services/db/heuristics/heuristics.go b/backend/internal/heuristics/heuristics.go similarity index 100% rename from backend/services/db/heuristics/heuristics.go rename to backend/internal/heuristics/heuristics.go diff --git a/backend/services/db/heuristics/performance.go b/backend/internal/heuristics/performance.go similarity index 100% rename from backend/services/db/heuristics/performance.go rename to backend/internal/heuristics/performance.go diff --git a/backend/services/db/heuristics/readyMessageStore.go b/backend/internal/heuristics/readyMessageStore.go similarity index 100% rename from backend/services/db/heuristics/readyMessageStore.go rename to backend/internal/heuristics/readyMessageStore.go diff --git a/backend/services/db/heuristics/session.go b/backend/internal/heuristics/session.go similarity index 100% rename from backend/services/db/heuristics/session.go rename to backend/internal/heuristics/session.go diff --git a/backend/pkg/log/queue.go b/backend/pkg/log/queue.go index 62e47cbbe..ced815bd2 100644 --- a/backend/pkg/log/queue.go +++ b/backend/pkg/log/queue.go @@ -5,7 +5,6 @@ import ( "log" "time" - "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue/types" //"openreplay/backend/pkg/env" ) @@ -18,6 +17,19 @@ type partitionStats struct { count int } +// Update partition statistic +func (prt *partitionStats) update(m *types.Meta) { + if prt.maxts < m.Timestamp { + prt.maxts = m.Timestamp + } + if prt.mints > m.Timestamp || prt.mints == 0 { + prt.mints = m.Timestamp + } + prt.lastts = m.Timestamp + prt.lastID = m.ID + prt.count += 1 +} + type queueStats struct { prts map[int32]*partitionStats tick <-chan time.Time @@ -30,43 +42,35 @@ func NewQueueStats(sec int) *queueStats { } } -func (qs *queueStats) HandleAndLog(sessionID uint64, m *types.Meta) { +// Collect writes new data to partition statistic +func (qs *queueStats) Collect(sessionID uint64, m *types.Meta) { prti := int32(sessionID % 16) // TODO use GetKeyPartition from kafka/key.go prt, ok := qs.prts[prti] if !ok { qs.prts[prti] = &partitionStats{} prt = qs.prts[prti] } - - if prt.maxts < m.Timestamp { - prt.maxts = m.Timestamp - } - if prt.mints > m.Timestamp || prt.mints == 0 { - prt.mints = m.Timestamp - } - prt.lastts = m.Timestamp - prt.lastID = m.ID - prt.count += 1 + prt.update(m) select { case <-qs.tick: - qs.LogThenReset() + qs.log() + qs.reset() default: } } -func (qs *queueStats) LogThenReset() { +// Print to console collected statistics +func (qs *queueStats) log() { s := "Queue Statistics: " for i, p := range qs.prts { s = fmt.Sprintf("%v | %v:: lastTS %v, lastID %v, count %v, maxTS %v, minTS %v", s, i, p.lastts, p.lastID, p.count, p.maxts, p.mints) } log.Println(s) - // reset - qs.prts = make(map[int32]*partitionStats) } -// TODO: list of message id to log (mb filter function with callback in messages/utils.go or something) -func LogMessage(s string, sessionID uint64, msg messages.Message, m *types.Meta) { - log.Printf("%v | SessionID: %v, Queue info: %v, Message: %v", s, sessionID, m, msg) +// Clear all queue partitions +func (qs *queueStats) reset() { + qs.prts = make(map[int32]*partitionStats) } diff --git a/backend/services/db/build_hack b/backend/services/db/build_hack new file mode 100644 index 000000000..e69de29bb diff --git a/backend/services/db/main.go b/backend/services/db/main.go deleted file mode 100644 index 2ad6e4aa8..000000000 --- a/backend/services/db/main.go +++ /dev/null @@ -1,109 +0,0 @@ -package main - -import ( - "log" - "time" - - "os" - "os/signal" - "syscall" - - "openreplay/backend/pkg/db/cache" - "openreplay/backend/pkg/db/postgres" - "openreplay/backend/pkg/env" - logger "openreplay/backend/pkg/log" - "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/queue" - "openreplay/backend/pkg/queue/types" - "openreplay/backend/services/db/heuristics" -) - -var pg *cache.PGCache - -func main() { - log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) - - initStats() - pg = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20) - defer pg.Close() - - heurFinder := heuristics.NewHandler() - - statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC")) - - consumer := queue.NewMessageConsumer( - env.String("GROUP_DB"), - []string{ - env.String("TOPIC_RAW_IOS"), - env.String("TOPIC_TRIGGER"), - }, - func(sessionID uint64, msg messages.Message, meta *types.Meta) { - statsLogger.HandleAndLog(sessionID, meta) - - if err := insertMessage(sessionID, msg); err != nil { - if !postgres.IsPkeyViolation(err) { - log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg) - } - return - } - - session, err := pg.GetSession(sessionID) - if err != nil { - // Might happen due to the assets-related message TODO: log only if session is necessary for this kind of message - log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg) - return - } - - err = insertStats(session, msg) - if err != nil { - log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg) - } - - heurFinder.HandleMessage(session, msg) - heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) { - // TODO: DRY code (carefully with the return statement logic) - if err := insertMessage(sessionID, msg); err != nil { - if !postgres.IsPkeyViolation(err) { - log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg) - } - return - } - - if err := insertStats(session, msg); err != nil { - log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg) - } - }) - }, - false, - ) - - sigchan := make(chan os.Signal, 1) - signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - - tick := time.Tick(15 * time.Second) - - log.Printf("Db service started\n") - for { - select { - case sig := <-sigchan: - log.Printf("Caught signal %v: terminating\n", sig) - consumer.Close() - os.Exit(0) - case <-tick: - pg.CommitBatches() - if err := commitStats(); err != nil { - log.Printf("Error on stats commit: %v", err) - } - // TODO?: separate stats & regular messages - if err := consumer.Commit(); err != nil { - log.Printf("Error on consumer commit: %v", err) - } - default: - err := consumer.ConsumeNext() - if err != nil { - log.Fatalf("Error on consumption: %v", err) // TODO: is always fatal? - } - } - } - -} diff --git a/backend/services/db/messages.go b/backend/services/db/messages.go deleted file mode 100644 index d3e4ae1ed..000000000 --- a/backend/services/db/messages.go +++ /dev/null @@ -1,68 +0,0 @@ -package main - -import ( - . "openreplay/backend/pkg/messages" -) - -func insertMessage(sessionID uint64, msg Message) error { - switch m := msg.(type) { - // Common - case *Metadata: - return pg.InsertMetadata(sessionID, m) - case *IssueEvent: - return pg.InsertIssueEvent(sessionID, m) - //TODO: message adapter (transformer) (at the level of pkg/message) for types: - // case *IOSMetadata, *IOSIssueEvent and others - - // Web - case *SessionStart: - return pg.InsertWebSessionStart(sessionID, m) - case *SessionEnd: - return pg.InsertWebSessionEnd(sessionID, m) - case *UserID: - return pg.InsertWebUserID(sessionID, m) - case *UserAnonymousID: - return pg.InsertWebUserAnonymousID(sessionID, m) - case *CustomEvent: - return pg.InsertWebCustomEvent(sessionID, m) - case *ClickEvent: - return pg.InsertWebClickEvent(sessionID, m) - case *InputEvent: - return pg.InsertWebInputEvent(sessionID, m) - // Unique Web messages - // case *ResourceEvent: - // return pg.InsertWebResourceEvent(sessionID, m) - case *PageEvent: - return pg.InsertWebPageEvent(sessionID, m) - case *ErrorEvent: - return pg.InsertWebErrorEvent(sessionID, m) - case *FetchEvent: - return pg.InsertWebFetchEvent(sessionID, m) - case *GraphQLEvent: - return pg.InsertWebGraphQLEvent(sessionID, m) - - // IOS - case *IOSSessionStart: - return pg.InsertIOSSessionStart(sessionID, m) - case *IOSSessionEnd: - return pg.InsertIOSSessionEnd(sessionID, m) - case *IOSUserID: - return pg.InsertIOSUserID(sessionID, m) - case *IOSUserAnonymousID: - return pg.InsertIOSUserAnonymousID(sessionID, m) - case *IOSCustomEvent: - return pg.InsertIOSCustomEvent(sessionID, m) - case *IOSClickEvent: - return pg.InsertIOSClickEvent(sessionID, m) - case *IOSInputEvent: - return pg.InsertIOSInputEvent(sessionID, m) - // Unique IOS messages - case *IOSNetworkCall: - return pg.InsertIOSNetworkCall(sessionID, m) - case *IOSScreenEnter: - return pg.InsertIOSScreenEnter(sessionID, m) - case *IOSCrash: - return pg.InsertIOSCrash(sessionID, m) - } - return nil // "Not implemented" -} diff --git a/backend/services/db/stats.go b/backend/services/db/stats.go deleted file mode 100644 index 2c3a5da38..000000000 --- a/backend/services/db/stats.go +++ /dev/null @@ -1,33 +0,0 @@ -package main - -import ( - . "openreplay/backend/pkg/db/types" - . "openreplay/backend/pkg/messages" -) - -func initStats() { - // noop -} - -func insertStats(session *Session, msg Message) error { - switch m := msg.(type) { - // Web - case *PerformanceTrackAggr: - return pg.InsertWebStatsPerformance(session.SessionID, m) - case *ResourceEvent: - return pg.InsertWebStatsResourceEvent(session.SessionID, m) - case *LongTask: - return pg.InsertWebStatsLongtask(session.SessionID, m) - - // IOS - // case *IOSPerformanceAggregated: - // return pg.InsertIOSPerformanceAggregated(session, m) - // case *IOSNetworkCall: - // return pg.InsertIOSNetworkCall(session, m) - } - return nil -} - -func commitStats() error { - return nil -} diff --git a/backend/services/ender/main.go b/backend/services/ender/main.go index f2430f3a0..4170a178e 100644 --- a/backend/services/ender/main.go +++ b/backend/services/ender/main.go @@ -35,7 +35,7 @@ func main() { env.String("TOPIC_RAW_IOS"), }, func(sessionID uint64, msg messages.Message, meta *types.Meta) { - statsLogger.HandleAndLog(sessionID, meta) + statsLogger.Collect(sessionID, meta) builderMap.HandleMessage(sessionID, msg, msg.Meta().Index) }, false,