From c0503941167c06cb8c1331840857167326a25c68 Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Thu, 5 May 2022 10:23:36 +0200 Subject: [PATCH] Moved service configs to config module --- backend/cmd/db/main.go | 40 +++++++++++++------------- backend/internal/config/db/config.go | 28 ++++++++++++++++++ backend/internal/datasaver/messages.go | 11 +------ backend/internal/datasaver/saver.go | 11 +++++++ backend/internal/datasaver/stats.go | 11 +------ 5 files changed, 61 insertions(+), 40 deletions(-) create mode 100644 backend/internal/config/db/config.go create mode 100644 backend/internal/datasaver/saver.go diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index 675c9f975..962057213 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -2,6 +2,7 @@ package main import ( "log" + "openreplay/backend/internal/config/db" "openreplay/backend/internal/datasaver" "openreplay/backend/internal/heuristics" "time" @@ -12,7 +13,6 @@ import ( "openreplay/backend/pkg/db/cache" "openreplay/backend/pkg/db/postgres" - "openreplay/backend/pkg/env" logger "openreplay/backend/pkg/log" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue" @@ -22,22 +22,23 @@ import ( func main() { log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) + cfg := db.New() + // Init database - pg := cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20) + pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres), cfg.ProjectExpirationTimeoutMs) defer pg.Close() // Init modules heurFinder := heuristics.NewHandler() - mi := datasaver.NewMessageInserter(pg) - si := datasaver.NewStatsInserter(pg) - statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC")) + saver := datasaver.New(pg) + statsLogger := logger.NewQueueStats(cfg.LoggerTimeout) // Handler logic handler := func(sessionID uint64, msg messages.Message, meta *types.Meta) { statsLogger.Collect(sessionID, meta) - // Just insert message into db without additional checks - if err := mi.InsertMessage(sessionID, msg); err != nil { + // Just save session data into db without additional checks + if err := saver.InsertMessage(sessionID, msg); err != nil { if !postgres.IsPkeyViolation(err) { log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg) } @@ -52,26 +53,26 @@ func main() { return } - // Insert statistics - err = si.InsertStats(session, msg) + // Save statistics to db + err = saver.InsertStats(session, msg) if err != nil { log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg) } - // Insert heuristics + // Handle heuristics and save to temporary queue in memory heurFinder.HandleMessage(session, msg) - // TODO: ??? + // Process saved heuristics messages as usual messages above in the code heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) { // TODO: DRY code (carefully with the return statement logic) - if err := mi.InsertMessage(sessionID, msg); err != nil { + if err := saver.InsertMessage(sessionID, msg); err != nil { if !postgres.IsPkeyViolation(err) { log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg) } return } - if err := si.InsertStats(session, msg); err != nil { + if err := saver.InsertStats(session, msg); err != nil { log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg) } }) @@ -79,21 +80,21 @@ func main() { // Init consumer consumer := queue.NewMessageConsumer( - env.String("GROUP_DB"), + cfg.GroupDB, []string{ - env.String("TOPIC_RAW_IOS"), - env.String("TOPIC_TRIGGER"), + cfg.TopicRawIOS, + cfg.TopicTrigger, }, handler, false, ) + log.Printf("Db service started\n") + sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - tick := time.Tick(15 * time.Second) - - log.Printf("Db service started\n") + tick := time.Tick(cfg.CommitBatchTimeout) for { select { case sig := <-sigchan: @@ -113,5 +114,4 @@ func main() { } } } - } diff --git a/backend/internal/config/db/config.go b/backend/internal/config/db/config.go new file mode 100644 index 000000000..fb35a199c --- /dev/null +++ b/backend/internal/config/db/config.go @@ -0,0 +1,28 @@ +package db + +import ( + "openreplay/backend/pkg/env" + "time" +) + +type Config struct { + Postgres string + ProjectExpirationTimeoutMs int64 + LoggerTimeout int + GroupDB string + TopicRawIOS string + TopicTrigger string + CommitBatchTimeout time.Duration +} + +func New() *Config { + return &Config{ + Postgres: env.String("POSTGRES_STRING"), + ProjectExpirationTimeoutMs: 1000 * 60 * 20, + LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"), + GroupDB: env.String("GROUP_DB"), + TopicRawIOS: env.String("TOPIC_RAW_IOS"), + TopicTrigger: env.String("TOPIC_TRIGGER"), + CommitBatchTimeout: 15 * time.Second, + } +} diff --git a/backend/internal/datasaver/messages.go b/backend/internal/datasaver/messages.go index 34a7b8dd4..1e774888d 100644 --- a/backend/internal/datasaver/messages.go +++ b/backend/internal/datasaver/messages.go @@ -1,19 +1,10 @@ package datasaver import ( - "openreplay/backend/pkg/db/cache" . "openreplay/backend/pkg/messages" ) -type MessageInserter struct { - pg *cache.PGCache -} - -func NewMessageInserter(pg *cache.PGCache) *MessageInserter { - return &MessageInserter{pg: pg} -} - -func (mi *MessageInserter) InsertMessage(sessionID uint64, msg Message) error { +func (mi *Saver) InsertMessage(sessionID uint64, msg Message) error { switch m := msg.(type) { // Common case *Metadata: diff --git a/backend/internal/datasaver/saver.go b/backend/internal/datasaver/saver.go new file mode 100644 index 000000000..4cd742718 --- /dev/null +++ b/backend/internal/datasaver/saver.go @@ -0,0 +1,11 @@ +package datasaver + +import "openreplay/backend/pkg/db/cache" + +type Saver struct { + pg *cache.PGCache +} + +func New(pg *cache.PGCache) *Saver { + return &Saver{pg: pg} +} diff --git a/backend/internal/datasaver/stats.go b/backend/internal/datasaver/stats.go index 250c01775..a57d91824 100644 --- a/backend/internal/datasaver/stats.go +++ b/backend/internal/datasaver/stats.go @@ -1,20 +1,11 @@ package datasaver import ( - "openreplay/backend/pkg/db/cache" . "openreplay/backend/pkg/db/types" . "openreplay/backend/pkg/messages" ) -type StatsInserter struct { - pg *cache.PGCache -} - -func NewStatsInserter(pg *cache.PGCache) *StatsInserter { - return &StatsInserter{pg: pg} -} - -func (si *StatsInserter) InsertStats(session *Session, msg Message) error { +func (si *Saver) InsertStats(session *Session, msg Message) error { switch m := msg.(type) { // Web case *PerformanceTrackAggr: