From caf66b305a21afa79ef817059f4a85e8b1641dca Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Tue, 21 Jun 2022 16:19:22 +0200 Subject: [PATCH] fix(backend): fixed bug when ender triggered on sessionEnd message --- backend/Dockerfile | 6 +++--- backend/cmd/db/main.go | 2 +- backend/cmd/ender/main.go | 6 +++++- backend/cmd/http/main.go | 2 +- backend/cmd/integrations/main.go | 2 +- backend/internal/config/db/config.go | 4 ++-- backend/pkg/db/postgres/connector.go | 10 ++++++---- 7 files changed, 19 insertions(+), 13 deletions(-) diff --git a/backend/Dockerfile b/backend/Dockerfile index 11f0100e9..40377b6fe 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -28,7 +28,7 @@ ENV TZ=UTC \ MAXMINDDB_FILE=/root/geoip.mmdb \ UAPARSER_FILE=/root/regexes.yaml \ HTTP_PORT=80 \ - BEACON_SIZE_LIMIT=7000000 \ + BEACON_SIZE_LIMIT=1000000 \ KAFKA_USE_SSL=true \ KAFKA_MAX_POLL_INTERVAL_MS=400000 \ REDIS_STREAMS_MAX_LEN=10000 \ @@ -51,8 +51,8 @@ ENV TZ=UTC \ FS_CLEAN_HRS=72 \ FILE_SPLIT_SIZE=300000 \ LOG_QUEUE_STATS_INTERVAL_SEC=60 \ - BATCH_QUEUE_LIMIT=20 \ - BATCH_SIZE_LIMIT=10000000 \ + DB_BATCH_QUEUE_LIMIT=20 \ + DB_BATCH_SIZE_LIMIT=10000000 \ PARTITIONS_NUMBER=1 diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index c863fdbeb..98bddef45 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -32,7 +32,7 @@ func main() { cfg := db.New() // Init database - pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres), cfg.ProjectExpirationTimeoutMs) + pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres, cfg.BatchQueueLimit, cfg.BatchSizeLimit), cfg.ProjectExpirationTimeoutMs) defer pg.Close() // HandlersFabric returns the list of message handlers we want to be applied to each incoming message. diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index 9751f26f3..9513dc16c 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -32,7 +32,7 @@ func main() { // Load service configuration cfg := ender.New() - pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres), cfg.ProjectExpirationTimeoutMs) + pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres, 0, 0), cfg.ProjectExpirationTimeoutMs) defer pg.Close() // Init all modules @@ -49,6 +49,10 @@ func main() { cfg.TopicRawWeb, }, func(sessionID uint64, msg messages.Message, meta *types.Meta) { + if msg.TypeID() == 3 { + // Skip message end + return + } statsLogger.Collect(sessionID, meta) sessions.UpdateSession(sessionID, meta.Timestamp) }, diff --git a/backend/cmd/http/main.go b/backend/cmd/http/main.go index 0c9603601..96562aee1 100644 --- a/backend/cmd/http/main.go +++ b/backend/cmd/http/main.go @@ -35,7 +35,7 @@ func main() { defer producer.Close(15000) // Connect to database - dbConn := cache.NewPGCache(postgres.NewConn(cfg.Postgres), 1000*60*20) + dbConn := cache.NewPGCache(postgres.NewConn(cfg.Postgres, 0, 0), 1000*60*20) defer dbConn.Close() // Build all services diff --git a/backend/cmd/integrations/main.go b/backend/cmd/integrations/main.go index 246e2c1fc..e25c1690f 100644 --- a/backend/cmd/integrations/main.go +++ b/backend/cmd/integrations/main.go @@ -26,7 +26,7 @@ func main() { cfg := config.New() - pg := postgres.NewConn(cfg.PostgresURI) + pg := postgres.NewConn(cfg.PostgresURI, 0, 0) defer pg.Close() tokenizer := token.NewTokenizer(cfg.TokenSecret) diff --git a/backend/internal/config/db/config.go b/backend/internal/config/db/config.go index f467b0c5c..25767afbf 100644 --- a/backend/internal/config/db/config.go +++ b/backend/internal/config/db/config.go @@ -26,7 +26,7 @@ func New() *Config { TopicRawWeb: env.String("TOPIC_RAW_WEB"), TopicAnalytics: env.String("TOPIC_ANALYTICS"), CommitBatchTimeout: 15 * time.Second, - BatchQueueLimit: env.Int("BATCH_QUEUE_LIMIT"), - BatchSizeLimit: env.Int("BATCH_SIZE_LIMIT"), + BatchQueueLimit: env.Int("DB_BATCH_QUEUE_LIMIT"), + BatchSizeLimit: env.Int("DB_BATCH_SIZE_LIMIT"), } } diff --git a/backend/pkg/db/postgres/connector.go b/backend/pkg/db/postgres/connector.go index 9b9724e58..532db3336 100644 --- a/backend/pkg/db/postgres/connector.go +++ b/backend/pkg/db/postgres/connector.go @@ -22,16 +22,18 @@ type Conn struct { batchSizeLimit int } -func NewConn(url string) *Conn { +func NewConn(url string, queueLimit, sizeLimit int) *Conn { c, err := pgxpool.Connect(context.Background(), url) if err != nil { log.Println(err) log.Fatalln("pgxpool.Connect Error") } return &Conn{ - c: c, - batches: make(map[uint64]*pgx.Batch), - batchSizes: make(map[uint64]int), + c: c, + batches: make(map[uint64]*pgx.Batch), + batchSizes: make(map[uint64]int), + batchQueueLimit: queueLimit, + batchSizeLimit: sizeLimit, } }