feat(backend): start using analytics topic for heuristics and trigger topic only for sessionEnd between sink and storage

This commit is contained in:
Alexander Zavorotynskiy 2022-06-16 14:00:50 +02:00
parent 75504409e7
commit d837c14be4
8 changed files with 12 additions and 25 deletions

View file

@ -102,8 +102,7 @@ func main() {
cfg.GroupDB,
[]string{
cfg.TopicRawWeb,
cfg.TopicRawIOS,
cfg.TopicTrigger, // to receive SessionEnd events
cfg.TopicAnalytics,
},
handler,
false,

View file

@ -42,7 +42,6 @@ func main() {
cfg.GroupEnder,
[]string{
cfg.TopicRawWeb,
cfg.TopicRawIOS,
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
statsLogger.Collect(sessionID, meta)
@ -71,7 +70,7 @@ func main() {
// Find ended sessions and send notification to other services
sessions.HandleEndedSessions(func(sessionID uint64, timestamp int64) bool {
msg := &messages.SessionEnd{Timestamp: uint64(timestamp)}
if err := producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(msg)); err != nil {
if err := producer.Produce(cfg.TopicRawWeb, sessionID, messages.Encode(msg)); err != nil {
log.Printf("can't send SessionEnd to trigger topic: %s; sessID: %d", err, sessionID)
return false
}

View file

@ -60,8 +60,6 @@ func main() {
cfg.GroupHeuristics,
[]string{
cfg.TopicRawWeb,
cfg.TopicRawIOS,
cfg.TopicTrigger, // to receive SessionEnd events
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
statsLogger.Collect(sessionID, meta)
@ -86,7 +84,7 @@ func main() {
os.Exit(0)
case <-tick:
builderMap.IterateReadyMessages(func(sessionID uint64, readyMsg messages.Message) {
producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(readyMsg))
producer.Produce(cfg.TopicAnalytics, sessionID, messages.Encode(readyMsg))
})
producer.Flush(cfg.ProducerTimeout)
consumer.Commit()

View file

@ -60,9 +60,7 @@ func main() {
consumer := queue.NewMessageConsumer(
cfg.GroupSink,
[]string{
cfg.TopicRawIOS,
cfg.TopicRawWeb,
cfg.TopicTrigger,
},
func(sessionID uint64, message Message, _ *types.Meta) {
// Process assets
@ -73,12 +71,11 @@ func main() {
// Filter message
typeID := message.TypeID()
// Send SessionFinished trigger to storage service
switch m := message.(type) {
// Send SessionEnd trigger to storage service
switch message.(type) {
case *SessionEnd:
msg := &SessionFinished{Timestamp: m.Timestamp}
if err := producer.Produce(cfg.TopicTrigger, sessionID, Encode(msg)); err != nil {
log.Printf("can't send SessionFinished to trigger topic: %s; sessID: %d", err, sessionID)
if err := producer.Produce(cfg.TopicTrigger, sessionID, Encode(message)); err != nil {
log.Printf("can't send SessionEnd to trigger topic: %s; sessID: %d", err, sessionID)
}
return
}

View file

@ -44,7 +44,7 @@ func main() {
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
switch msg.(type) {
case *messages.SessionFinished:
case *messages.SessionEnd:
srv.UploadKey(strconv.FormatUint(sessionID, 10), 5)
// Log timestamp of last processed session
counter.Update(sessionID, time.UnixMilli(meta.Timestamp))

View file

@ -11,8 +11,7 @@ type Config struct {
LoggerTimeout int
GroupDB string
TopicRawWeb string
TopicRawIOS string
TopicTrigger string
TopicAnalytics string
CommitBatchTimeout time.Duration
BatchQueueLimit int
BatchSizeLimit int
@ -25,8 +24,7 @@ func New() *Config {
LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"),
GroupDB: env.String("GROUP_DB"),
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
TopicRawIOS: env.String("TOPIC_RAW_IOS"),
TopicTrigger: env.String("TOPIC_TRIGGER"),
TopicAnalytics: env.String("TOPIC_ANALYTICS"),
CommitBatchTimeout: 15 * time.Second,
BatchQueueLimit: env.Int("BATCH_QUEUE_LIMIT"),
BatchSizeLimit: env.Int("BATCH_SIZE_LIMIT"),

View file

@ -6,10 +6,8 @@ import (
type Config struct {
GroupEnder string
TopicTrigger string
LoggerTimeout int
TopicRawWeb string
TopicRawIOS string
ProducerTimeout int
PartitionsNumber int
}
@ -17,10 +15,8 @@ type Config struct {
func New() *Config {
return &Config{
GroupEnder: env.String("GROUP_ENDER"),
TopicTrigger: env.String("TOPIC_TRIGGER"),
LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"),
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
TopicRawIOS: env.String("TOPIC_RAW_IOS"),
ProducerTimeout: 2000,
PartitionsNumber: env.Int("PARTITIONS_NUMBER"),
}

View file

@ -6,7 +6,7 @@ import (
type Config struct {
GroupHeuristics string
TopicTrigger string
TopicAnalytics string
LoggerTimeout int
TopicRawWeb string
TopicRawIOS string
@ -16,7 +16,7 @@ type Config struct {
func New() *Config {
return &Config{
GroupHeuristics: env.String("GROUP_HEURISTICS"),
TopicTrigger: env.String("TOPIC_TRIGGER"),
TopicAnalytics: env.String("TOPIC_ANALYTICS"),
LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"),
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
TopicRawIOS: env.String("TOPIC_RAW_IOS"),