fix(backend): fix bug where ender was triggered by the sessionEnd message

Alexander Zavorotynskiy 2022-06-21 16:19:22 +02:00
parent 26daf936c5
commit caf66b305a
7 changed files with 19 additions and 13 deletions
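
In short: the ender service refreshed a session's last-activity timestamp on every incoming message, including the SessionEnd message itself, so the end marker apparently counted as fresh activity. This commit skips SessionEnd (TypeID 3) in ender's handler, threads the batch queue/size limits into postgres.NewConn as parameters, and renames the BATCH_QUEUE_LIMIT / BATCH_SIZE_LIMIT environment variables to DB_BATCH_QUEUE_LIMIT / DB_BATCH_SIZE_LIMIT. The Dockerfile also lowers BEACON_SIZE_LIMIT from 7000000 to 1000000.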

View file

@@ -28,7 +28,7 @@ ENV TZ=UTC \
     MAXMINDDB_FILE=/root/geoip.mmdb \
     UAPARSER_FILE=/root/regexes.yaml \
     HTTP_PORT=80 \
-    BEACON_SIZE_LIMIT=7000000 \
+    BEACON_SIZE_LIMIT=1000000 \
     KAFKA_USE_SSL=true \
     KAFKA_MAX_POLL_INTERVAL_MS=400000 \
     REDIS_STREAMS_MAX_LEN=10000 \
@@ -51,8 +51,8 @@ ENV TZ=UTC \
     FS_CLEAN_HRS=72 \
     FILE_SPLIT_SIZE=300000 \
     LOG_QUEUE_STATS_INTERVAL_SEC=60 \
-    BATCH_QUEUE_LIMIT=20 \
-    BATCH_SIZE_LIMIT=10000000 \
+    DB_BATCH_QUEUE_LIMIT=20 \
+    DB_BATCH_SIZE_LIMIT=10000000 \
     PARTITIONS_NUMBER=1

View file

@@ -32,7 +32,7 @@ func main() {
 	cfg := db.New()
 	// Init database
-	pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres), cfg.ProjectExpirationTimeoutMs)
+	pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres, cfg.BatchQueueLimit, cfg.BatchSizeLimit), cfg.ProjectExpirationTimeoutMs)
 	defer pg.Close()
 	// HandlersFabric returns the list of message handlers we want to be applied to each incoming message.
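
Note that only the db service passes configured limits into the connector; the call sites below pass 0, 0, which presumably leaves batch flushing disabled for services that don't write in bulk (see the connector change at the end of this diff).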

View file

@@ -32,7 +32,7 @@ func main() {
 	// Load service configuration
 	cfg := ender.New()
-	pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres), cfg.ProjectExpirationTimeoutMs)
+	pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres, 0, 0), cfg.ProjectExpirationTimeoutMs)
 	defer pg.Close()
 	// Init all modules
@@ -49,6 +49,10 @@ func main() {
 			cfg.TopicRawWeb,
 		},
 		func(sessionID uint64, msg messages.Message, meta *types.Meta) {
+			if msg.TypeID() == 3 {
+				// Skip SessionEnd: the end marker must not count as session activity
+				return
+			}
 			statsLogger.Collect(sessionID, meta)
 			sessions.UpdateSession(sessionID, meta.Timestamp)
 		},
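
To see why the guard matters: every message reaching this callback bumps the session's last-seen timestamp via sessions.UpdateSession, so without the early return the SessionEnd marker itself would keep the session alive. Below is a minimal, self-contained sketch of the guard logic; Message, SessionEnd, MouseMove and its type ID are stand-ins, and only TypeID() == 3 for SessionEnd mirrors the real messages package:

package main

import "fmt"

// Stand-in for the real messages.Message interface.
type Message interface{ TypeID() int }

// SessionEnd mirrors the one fact the fix relies on: its type ID is 3.
type SessionEnd struct{}

func (SessionEnd) TypeID() int { return 3 }

// MouseMove is an arbitrary non-end message for the demo.
type MouseMove struct{}

func (MouseMove) TypeID() int { return 4 }

// handle mirrors the ender callback: only non-SessionEnd messages
// refresh the session's last-activity timestamp.
func handle(msg Message, updateSession func()) {
	if msg.TypeID() == 3 {
		return // the end marker must not count as activity
	}
	updateSession()
}

func main() {
	updates := 0
	bump := func() { updates++ }
	handle(MouseMove{}, bump)
	handle(SessionEnd{}, bump)
	fmt.Println("updates:", updates) // prints 1: the SessionEnd was ignored
}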

View file

@@ -35,7 +35,7 @@ func main() {
 	defer producer.Close(15000)
 	// Connect to database
-	dbConn := cache.NewPGCache(postgres.NewConn(cfg.Postgres), 1000*60*20)
+	dbConn := cache.NewPGCache(postgres.NewConn(cfg.Postgres, 0, 0), 1000*60*20)
 	defer dbConn.Close()
 	// Build all services

View file

@@ -26,7 +26,7 @@ func main() {
 	cfg := config.New()
-	pg := postgres.NewConn(cfg.PostgresURI)
+	pg := postgres.NewConn(cfg.PostgresURI, 0, 0)
 	defer pg.Close()
 	tokenizer := token.NewTokenizer(cfg.TokenSecret)

View file

@@ -26,7 +26,7 @@ func New() *Config {
 		TopicRawWeb:        env.String("TOPIC_RAW_WEB"),
 		TopicAnalytics:     env.String("TOPIC_ANALYTICS"),
 		CommitBatchTimeout: 15 * time.Second,
-		BatchQueueLimit:    env.Int("BATCH_QUEUE_LIMIT"),
-		BatchSizeLimit:     env.Int("BATCH_SIZE_LIMIT"),
+		BatchQueueLimit:    env.Int("DB_BATCH_QUEUE_LIMIT"),
+		BatchSizeLimit:     env.Int("DB_BATCH_SIZE_LIMIT"),
 	}
 }
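
env.Int is OpenReplay's internal helper, so its exact behavior is an assumption here; a plausible minimal equivalent reads a required integer variable and fails fast, which is also why a deployment still exporting the old BATCH_* names would surface the rename at startup:

package env

import (
	"log"
	"os"
	"strconv"
)

// Int returns the named environment variable parsed as an int,
// aborting on a missing or malformed value (assumed semantics).
func Int(key string) int {
	v, ok := os.LookupEnv(key)
	if !ok {
		log.Fatalf("env: %s is not set", key)
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("env: %s=%q is not an integer", key, v)
	}
	return n
}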

View file

@@ -22,16 +22,18 @@ type Conn struct {
 	batchSizeLimit int
 }

-func NewConn(url string) *Conn {
+func NewConn(url string, queueLimit, sizeLimit int) *Conn {
 	c, err := pgxpool.Connect(context.Background(), url)
 	if err != nil {
 		log.Println(err)
 		log.Fatalln("pgxpool.Connect Error")
 	}
 	return &Conn{
-		c:          c,
-		batches:    make(map[uint64]*pgx.Batch),
-		batchSizes: make(map[uint64]int),
+		c:               c,
+		batches:         make(map[uint64]*pgx.Batch),
+		batchSizes:      make(map[uint64]int),
+		batchQueueLimit: queueLimit,
+		batchSizeLimit:  sizeLimit,
 	}
 }
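
The connector now owns both limits, but this hunk doesn't show where they're consumed. Here is a hedged sketch of one way a per-session batcher could use them; sendInBatch and the byte-size accounting are hypothetical, while pgx.Batch.Queue and Batch.Len are real pgx v4 API. Under this reading, the 0, 0 passed by the other services above simply never triggers a flush:

package postgres

import "github.com/jackc/pgx/v4"

type Conn struct {
	batches         map[uint64]*pgx.Batch
	batchSizes      map[uint64]int
	batchQueueLimit int
	batchSizeLimit  int
}

// sendInBatch queues one statement for a session and reports whether
// that session's batch has crossed either configured limit and should
// be committed. A limit of 0 never triggers a flush here.
func (c *Conn) sendInBatch(sessionID uint64, sql string, args ...interface{}) bool {
	b, ok := c.batches[sessionID]
	if !ok {
		b = &pgx.Batch{}
		c.batches[sessionID] = b
	}
	b.Queue(sql, args...)
	c.batchSizes[sessionID] += len(sql) // rough size proxy, an assumption
	return (c.batchQueueLimit > 0 && b.Len() >= c.batchQueueLimit) ||
		(c.batchSizeLimit > 0 && c.batchSizes[sessionID] >= c.batchSizeLimit)
}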