From d837c14be414f1342e33489e1db611d7cd97dd60 Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Thu, 16 Jun 2022 14:00:50 +0200 Subject: [PATCH] feat(backend): start using analytics topic for heuristics and trigger topic only for sessionEnd between sink and storage --- backend/cmd/db/main.go | 3 +-- backend/cmd/ender/main.go | 3 +-- backend/cmd/heuristics/main.go | 4 +--- backend/cmd/sink/main.go | 11 ++++------- backend/cmd/storage/main.go | 2 +- backend/internal/config/db/config.go | 6 ++---- backend/internal/config/ender/config.go | 4 ---- backend/internal/config/heuristics/config.go | 4 ++-- 8 files changed, 12 insertions(+), 25 deletions(-) diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index f64b9f5d5..c863fdbeb 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -102,8 +102,7 @@ func main() { cfg.GroupDB, []string{ cfg.TopicRawWeb, - cfg.TopicRawIOS, - cfg.TopicTrigger, // to receive SessionEnd events + cfg.TopicAnalytics, }, handler, false, diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index 825046e3d..82f9b51d0 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -42,7 +42,6 @@ func main() { cfg.GroupEnder, []string{ cfg.TopicRawWeb, - cfg.TopicRawIOS, }, func(sessionID uint64, msg messages.Message, meta *types.Meta) { statsLogger.Collect(sessionID, meta) @@ -71,7 +70,7 @@ func main() { // Find ended sessions and send notification to other services sessions.HandleEndedSessions(func(sessionID uint64, timestamp int64) bool { msg := &messages.SessionEnd{Timestamp: uint64(timestamp)} - if err := producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(msg)); err != nil { + if err := producer.Produce(cfg.TopicRawWeb, sessionID, messages.Encode(msg)); err != nil { log.Printf("can't send SessionEnd to trigger topic: %s; sessID: %d", err, sessionID) return false } diff --git a/backend/cmd/heuristics/main.go b/backend/cmd/heuristics/main.go index 80134116d..0aa0415c4 100644 --- a/backend/cmd/heuristics/main.go +++ b/backend/cmd/heuristics/main.go @@ -60,8 +60,6 @@ func main() { cfg.GroupHeuristics, []string{ cfg.TopicRawWeb, - cfg.TopicRawIOS, - cfg.TopicTrigger, // to receive SessionEnd events }, func(sessionID uint64, msg messages.Message, meta *types.Meta) { statsLogger.Collect(sessionID, meta) @@ -86,7 +84,7 @@ func main() { os.Exit(0) case <-tick: builderMap.IterateReadyMessages(func(sessionID uint64, readyMsg messages.Message) { - producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(readyMsg)) + producer.Produce(cfg.TopicAnalytics, sessionID, messages.Encode(readyMsg)) }) producer.Flush(cfg.ProducerTimeout) consumer.Commit() diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go index 0dff365b0..b1d3d21a0 100644 --- a/backend/cmd/sink/main.go +++ b/backend/cmd/sink/main.go @@ -60,9 +60,7 @@ func main() { consumer := queue.NewMessageConsumer( cfg.GroupSink, []string{ - cfg.TopicRawIOS, cfg.TopicRawWeb, - cfg.TopicTrigger, }, func(sessionID uint64, message Message, _ *types.Meta) { // Process assets @@ -73,12 +71,11 @@ func main() { // Filter message typeID := message.TypeID() - // Send SessionFinished trigger to storage service - switch m := message.(type) { + // Send SessionEnd trigger to storage service + switch message.(type) { case *SessionEnd: - msg := &SessionFinished{Timestamp: m.Timestamp} - if err := producer.Produce(cfg.TopicTrigger, sessionID, Encode(msg)); err != nil { - log.Printf("can't send SessionFinished to trigger topic: %s; sessID: %d", err, sessionID) + if err := producer.Produce(cfg.TopicTrigger, sessionID, Encode(message)); err != nil { + log.Printf("can't send SessionEnd to trigger topic: %s; sessID: %d", err, sessionID) } return } diff --git a/backend/cmd/storage/main.go b/backend/cmd/storage/main.go index db2305d53..2f92d0259 100644 --- a/backend/cmd/storage/main.go +++ b/backend/cmd/storage/main.go @@ -44,7 +44,7 @@ func main() { }, func(sessionID uint64, msg messages.Message, meta *types.Meta) { switch msg.(type) { - case *messages.SessionFinished: + case *messages.SessionEnd: srv.UploadKey(strconv.FormatUint(sessionID, 10), 5) // Log timestamp of last processed session counter.Update(sessionID, time.UnixMilli(meta.Timestamp)) diff --git a/backend/internal/config/db/config.go b/backend/internal/config/db/config.go index 7dc262855..f467b0c5c 100644 --- a/backend/internal/config/db/config.go +++ b/backend/internal/config/db/config.go @@ -11,8 +11,7 @@ type Config struct { LoggerTimeout int GroupDB string TopicRawWeb string - TopicRawIOS string - TopicTrigger string + TopicAnalytics string CommitBatchTimeout time.Duration BatchQueueLimit int BatchSizeLimit int @@ -25,8 +24,7 @@ func New() *Config { LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"), GroupDB: env.String("GROUP_DB"), TopicRawWeb: env.String("TOPIC_RAW_WEB"), - TopicRawIOS: env.String("TOPIC_RAW_IOS"), - TopicTrigger: env.String("TOPIC_TRIGGER"), + TopicAnalytics: env.String("TOPIC_ANALYTICS"), CommitBatchTimeout: 15 * time.Second, BatchQueueLimit: env.Int("BATCH_QUEUE_LIMIT"), BatchSizeLimit: env.Int("BATCH_SIZE_LIMIT"), diff --git a/backend/internal/config/ender/config.go b/backend/internal/config/ender/config.go index 4aec62c19..5898c69ec 100644 --- a/backend/internal/config/ender/config.go +++ b/backend/internal/config/ender/config.go @@ -6,10 +6,8 @@ import ( type Config struct { GroupEnder string - TopicTrigger string LoggerTimeout int TopicRawWeb string - TopicRawIOS string ProducerTimeout int PartitionsNumber int } @@ -17,10 +15,8 @@ type Config struct { func New() *Config { return &Config{ GroupEnder: env.String("GROUP_ENDER"), - TopicTrigger: env.String("TOPIC_TRIGGER"), LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"), TopicRawWeb: env.String("TOPIC_RAW_WEB"), - TopicRawIOS: env.String("TOPIC_RAW_IOS"), ProducerTimeout: 2000, PartitionsNumber: env.Int("PARTITIONS_NUMBER"), } diff --git a/backend/internal/config/heuristics/config.go b/backend/internal/config/heuristics/config.go index 1b9b30639..e4d5005ad 100644 --- a/backend/internal/config/heuristics/config.go +++ b/backend/internal/config/heuristics/config.go @@ -6,7 +6,7 @@ import ( type Config struct { GroupHeuristics string - TopicTrigger string + TopicAnalytics string LoggerTimeout int TopicRawWeb string TopicRawIOS string @@ -16,7 +16,7 @@ type Config struct { func New() *Config { return &Config{ GroupHeuristics: env.String("GROUP_HEURISTICS"), - TopicTrigger: env.String("TOPIC_TRIGGER"), + TopicAnalytics: env.String("TOPIC_ANALYTICS"), LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"), TopicRawWeb: env.String("TOPIC_RAW_WEB"), TopicRawIOS: env.String("TOPIC_RAW_IOS"),