Moved service configs to config module

This commit is contained in:
Alexander Zavorotynskiy 2022-05-05 10:23:36 +02:00
parent 167d1e117e
commit c050394116
5 changed files with 61 additions and 40 deletions

View file

@ -2,6 +2,7 @@ package main
import (
"log"
"openreplay/backend/internal/config/db"
"openreplay/backend/internal/datasaver"
"openreplay/backend/internal/heuristics"
"time"
@ -12,7 +13,6 @@ import (
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/env"
logger "openreplay/backend/pkg/log"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
@ -22,22 +22,23 @@ import (
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := db.New()
// Init database
pg := cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20)
pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres), cfg.ProjectExpirationTimeoutMs)
defer pg.Close()
// Init modules
heurFinder := heuristics.NewHandler()
mi := datasaver.NewMessageInserter(pg)
si := datasaver.NewStatsInserter(pg)
statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"))
saver := datasaver.New(pg)
statsLogger := logger.NewQueueStats(cfg.LoggerTimeout)
// Handler logic
handler := func(sessionID uint64, msg messages.Message, meta *types.Meta) {
statsLogger.Collect(sessionID, meta)
// Just insert message into db without additional checks
if err := mi.InsertMessage(sessionID, msg); err != nil {
// Just save session data into db without additional checks
if err := saver.InsertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg)
}
@ -52,26 +53,26 @@ func main() {
return
}
// Insert statistics
err = si.InsertStats(session, msg)
// Save statistics to db
err = saver.InsertStats(session, msg)
if err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg)
}
// Insert heuristics
// Handle heuristics and save to temporary queue in memory
heurFinder.HandleMessage(session, msg)
// TODO: ???
// Process saved heuristics messages as usual messages above in the code
heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) {
// TODO: DRY code (carefully with the return statement logic)
if err := mi.InsertMessage(sessionID, msg); err != nil {
if err := saver.InsertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
return
}
if err := si.InsertStats(session, msg); err != nil {
if err := saver.InsertStats(session, msg); err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
})
@ -79,21 +80,21 @@ func main() {
// Init consumer
consumer := queue.NewMessageConsumer(
env.String("GROUP_DB"),
cfg.GroupDB,
[]string{
env.String("TOPIC_RAW_IOS"),
env.String("TOPIC_TRIGGER"),
cfg.TopicRawIOS,
cfg.TopicTrigger,
},
handler,
false,
)
log.Printf("Db service started\n")
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
tick := time.Tick(15 * time.Second)
log.Printf("Db service started\n")
tick := time.Tick(cfg.CommitBatchTimeout)
for {
select {
case sig := <-sigchan:
@ -113,5 +114,4 @@ func main() {
}
}
}
}

View file

@ -0,0 +1,28 @@
package db
import (
"openreplay/backend/pkg/env"
"time"
)
type Config struct {
Postgres string
ProjectExpirationTimeoutMs int64
LoggerTimeout int
GroupDB string
TopicRawIOS string
TopicTrigger string
CommitBatchTimeout time.Duration
}
func New() *Config {
return &Config{
Postgres: env.String("POSTGRES_STRING"),
ProjectExpirationTimeoutMs: 1000 * 60 * 20,
LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"),
GroupDB: env.String("GROUP_DB"),
TopicRawIOS: env.String("TOPIC_RAW_IOS"),
TopicTrigger: env.String("TOPIC_TRIGGER"),
CommitBatchTimeout: 15 * time.Second,
}
}

View file

@ -1,19 +1,10 @@
package datasaver
import (
"openreplay/backend/pkg/db/cache"
. "openreplay/backend/pkg/messages"
)
type MessageInserter struct {
pg *cache.PGCache
}
func NewMessageInserter(pg *cache.PGCache) *MessageInserter {
return &MessageInserter{pg: pg}
}
func (mi *MessageInserter) InsertMessage(sessionID uint64, msg Message) error {
func (mi *Saver) InsertMessage(sessionID uint64, msg Message) error {
switch m := msg.(type) {
// Common
case *Metadata:

View file

@ -0,0 +1,11 @@
package datasaver
import "openreplay/backend/pkg/db/cache"
type Saver struct {
pg *cache.PGCache
}
func New(pg *cache.PGCache) *Saver {
return &Saver{pg: pg}
}

View file

@ -1,20 +1,11 @@
package datasaver
import (
"openreplay/backend/pkg/db/cache"
. "openreplay/backend/pkg/db/types"
. "openreplay/backend/pkg/messages"
)
type StatsInserter struct {
pg *cache.PGCache
}
func NewStatsInserter(pg *cache.PGCache) *StatsInserter {
return &StatsInserter{pg: pg}
}
func (si *StatsInserter) InsertStats(session *Session, msg Message) error {
func (si *Saver) InsertStats(session *Session, msg Message) error {
switch m := msg.(type) {
// Web
case *PerformanceTrackAggr: