diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index 84b0d81ed..3cdd7d186 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -36,7 +36,7 @@ func main() { // Init database pg := cache.NewPGCache( - postgres.NewConn(cfg.Postgres.String(), cfg.BatchQueueLimit, cfg.BatchSizeLimit), cfg.ProjectExpirationTimeoutMs) + postgres.NewConn(cfg.Postgres.String(), cfg.BatchQueueLimit, cfg.BatchSizeLimit), cfg.ProjectExpiration) defer pg.Close() // HandlersFabric returns the list of message handlers we want to be applied to each incoming message. diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index da7ca9b89..5537c41d3 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -34,7 +34,7 @@ func main() { pprof.StartProfilingServer() } - pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres.String(), 0, 0), cfg.ProjectExpirationTimeoutMs) + pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres.String(), 0, 0), cfg.ProjectExpiration) defer pg.Close() sessions, err := sessionender.New(intervals.EVENTS_SESSION_END_TIMEOUT, cfg.PartitionsNumber) diff --git a/backend/cmd/http/main.go b/backend/cmd/http/main.go index 83eedaf29..244604ae4 100644 --- a/backend/cmd/http/main.go +++ b/backend/cmd/http/main.go @@ -36,7 +36,7 @@ func main() { defer producer.Close(15000) // Connect to database - dbConn := cache.NewPGCache(postgres.NewConn(cfg.Postgres.String(), 0, 0), 1000*60*20) + dbConn := cache.NewPGCache(postgres.NewConn(cfg.Postgres.String(), 0, 0), cfg.ProjectExpiration) defer dbConn.Close() // Build all services diff --git a/backend/internal/config/db/config.go b/backend/internal/config/db/config.go index d56137e77..f8fc73b64 100644 --- a/backend/internal/config/db/config.go +++ b/backend/internal/config/db/config.go @@ -9,17 +9,17 @@ import ( type Config struct { common.Config common.Postgres - ProjectExpirationTimeoutMs int64 `env:"PROJECT_EXPIRATION_TIMEOUT_MS,default=1200000"` - LoggerTimeout int `env:"LOG_QUEUE_STATS_INTERVAL_SEC,required"` - GroupDB string `env:"GROUP_DB,required"` - TopicRawWeb string `env:"TOPIC_RAW_WEB,required"` - TopicAnalytics string `env:"TOPIC_ANALYTICS,required"` - CommitBatchTimeout time.Duration `env:"COMMIT_BATCH_TIMEOUT,default=15s"` - BatchQueueLimit int `env:"DB_BATCH_QUEUE_LIMIT,required"` - BatchSizeLimit int `env:"DB_BATCH_SIZE_LIMIT,required"` - UseQuickwit bool `env:"QUICKWIT_ENABLED,default=false"` - QuickwitTopic string `env:"QUICKWIT_TOPIC,default=saas-quickwit"` - UseProfiler bool `env:"PROFILER_ENABLED,default=false"` + ProjectExpiration time.Duration `env:"PROJECT_EXPIRATION,default=10m"` + LoggerTimeout int `env:"LOG_QUEUE_STATS_INTERVAL_SEC,required"` + GroupDB string `env:"GROUP_DB,required"` + TopicRawWeb string `env:"TOPIC_RAW_WEB,required"` + TopicAnalytics string `env:"TOPIC_ANALYTICS,required"` + CommitBatchTimeout time.Duration `env:"COMMIT_BATCH_TIMEOUT,default=15s"` + BatchQueueLimit int `env:"DB_BATCH_QUEUE_LIMIT,required"` + BatchSizeLimit int `env:"DB_BATCH_SIZE_LIMIT,required"` + UseQuickwit bool `env:"QUICKWIT_ENABLED,default=false"` + QuickwitTopic string `env:"QUICKWIT_TOPIC,default=saas-quickwit"` + UseProfiler bool `env:"PROFILER_ENABLED,default=false"` } func New() *Config { diff --git a/backend/internal/config/ender/config.go b/backend/internal/config/ender/config.go index 23e927270..7819d3e65 100644 --- a/backend/internal/config/ender/config.go +++ b/backend/internal/config/ender/config.go @@ -3,19 +3,20 @@ package ender import ( "openreplay/backend/internal/config/common" "openreplay/backend/internal/config/configurator" + "time" ) type Config struct { common.Config common.Postgres - ProjectExpirationTimeoutMs int64 `env:"PROJECT_EXPIRATION_TIMEOUT_MS,default=1200000"` - GroupEnder string `env:"GROUP_ENDER,required"` - LoggerTimeout int `env:"LOG_QUEUE_STATS_INTERVAL_SEC,required"` - TopicRawWeb string `env:"TOPIC_RAW_WEB,required"` - ProducerTimeout int `env:"PRODUCER_TIMEOUT,default=2000"` - PartitionsNumber int `env:"PARTITIONS_NUMBER,required"` - UseEncryption bool `env:"USE_ENCRYPTION,default=false"` - UseProfiler bool `env:"PROFILER_ENABLED,default=false"` + ProjectExpiration time.Duration `env:"PROJECT_EXPIRATION,default=10m"` + GroupEnder string `env:"GROUP_ENDER,required"` + LoggerTimeout int `env:"LOG_QUEUE_STATS_INTERVAL_SEC,required"` + TopicRawWeb string `env:"TOPIC_RAW_WEB,required"` + ProducerTimeout int `env:"PRODUCER_TIMEOUT,default=2000"` + PartitionsNumber int `env:"PARTITIONS_NUMBER,required"` + UseEncryption bool `env:"USE_ENCRYPTION,default=false"` + UseProfiler bool `env:"PROFILER_ENABLED,default=false"` } func New() *Config { diff --git a/backend/internal/config/http/config.go b/backend/internal/config/http/config.go index a4af87bd3..ad2efe9fa 100644 --- a/backend/internal/config/http/config.go +++ b/backend/internal/config/http/config.go @@ -10,6 +10,7 @@ import ( type Config struct { common.Config common.Postgres + ProjectExpiration time.Duration `env:"PROJECT_EXPIRATION,default=10m"` HTTPHost string `env:"HTTP_HOST,default="` HTTPPort string `env:"HTTP_PORT,required"` HTTPTimeout time.Duration `env:"HTTP_TIMEOUT,default=60s"` diff --git a/backend/pkg/db/cache/cache.go b/backend/pkg/db/cache/cache.go index 99ec50724..8f4cfce25 100644 --- a/backend/pkg/db/cache/cache.go +++ b/backend/pkg/db/cache/cache.go @@ -35,12 +35,12 @@ type cacheImpl struct { projectExpirationTimeout time.Duration } -func NewCache(conn *postgres.Conn, projectExpirationTimeoutMs int64) Cache { +func NewCache(conn *postgres.Conn, projectExpiration time.Duration) Cache { newCache := &cacheImpl{ conn: conn, sessions: make(map[uint64]*SessionMeta), projects: make(map[uint32]*ProjectMeta), - projectExpirationTimeout: time.Duration(1000 * projectExpirationTimeoutMs), + projectExpirationTimeout: projectExpiration, } go newCache.cleaner() return newCache diff --git a/backend/pkg/db/cache/pg-cache.go b/backend/pkg/db/cache/pg-cache.go index 1b7a6710d..d11c5f9cd 100644 --- a/backend/pkg/db/cache/pg-cache.go +++ b/backend/pkg/db/cache/pg-cache.go @@ -2,6 +2,7 @@ package cache import ( "openreplay/backend/pkg/db/postgres" + "time" ) type PGCache struct { @@ -9,9 +10,9 @@ type PGCache struct { Cache Cache } -func NewPGCache(conn *postgres.Conn, projectExpirationTimeoutMs int64) *PGCache { +func NewPGCache(conn *postgres.Conn, projectExpiration time.Duration) *PGCache { // Create in-memory cache layer for sessions and projects - c := NewCache(conn, projectExpirationTimeoutMs) + c := NewCache(conn, projectExpiration) // Return PG wrapper with integrated cache layer return &PGCache{ Conn: conn, diff --git a/ee/backend/internal/db/datasaver/messages.go b/ee/backend/internal/db/datasaver/messages.go index 0a729ee63..931803723 100644 --- a/ee/backend/internal/db/datasaver/messages.go +++ b/ee/backend/internal/db/datasaver/messages.go @@ -63,7 +63,7 @@ func (mi *Saver) InsertMessage(msg Message) error { if err != nil { log.Printf("can't get session info for CH: %s", err) } else { - project, err := mi.pg.GetProject(session.ProjectID) + project, err := mi.pg.Cache.GetProject(session.ProjectID) if err != nil { log.Printf("can't get project: %s", err) } else {