feat(backend): increased kafka message size
This commit is contained in:
parent
7224dfdefb
commit
242c6cfc57
3 changed files with 7 additions and 5 deletions
|
|
@ -25,7 +25,7 @@ ENV TZ=UTC \
|
||||||
MAXMINDDB_FILE=/root/geoip.mmdb \
|
MAXMINDDB_FILE=/root/geoip.mmdb \
|
||||||
UAPARSER_FILE=/root/regexes.yaml \
|
UAPARSER_FILE=/root/regexes.yaml \
|
||||||
HTTP_PORT=80 \
|
HTTP_PORT=80 \
|
||||||
BEACON_SIZE_LIMIT=7000000 \
|
BEACON_SIZE_LIMIT=3000000 \
|
||||||
KAFKA_USE_SSL=true \
|
KAFKA_USE_SSL=true \
|
||||||
KAFKA_MAX_POLL_INTERVAL_MS=400000 \
|
KAFKA_MAX_POLL_INTERVAL_MS=400000 \
|
||||||
REDIS_STREAMS_MAX_LEN=3000 \
|
REDIS_STREAMS_MAX_LEN=3000 \
|
||||||
|
|
@ -45,7 +45,8 @@ ENV TZ=UTC \
|
||||||
CACHE_ASSETS=true \
|
CACHE_ASSETS=true \
|
||||||
ASSETS_SIZE_LIMIT=6291456 \
|
ASSETS_SIZE_LIMIT=6291456 \
|
||||||
FS_CLEAN_HRS=72 \
|
FS_CLEAN_HRS=72 \
|
||||||
LOG_QUEUE_STATS_INTERVAL_SEC=60
|
LOG_QUEUE_STATS_INTERVAL_SEC=60 \
|
||||||
|
QUEUE_MESSAGE_SIZE_LIMIT=1048576
|
||||||
|
|
||||||
|
|
||||||
ARG SERVICE_NAME
|
ARG SERVICE_NAME
|
||||||
|
|
|
||||||
|
|
@ -43,6 +43,7 @@ func NewConsumer(
|
||||||
"security.protocol": protocol,
|
"security.protocol": protocol,
|
||||||
"go.application.rebalance.enable": true,
|
"go.application.rebalance.enable": true,
|
||||||
"max.poll.interval.ms": env.Int("KAFKA_MAX_POLL_INTERVAL_MS"),
|
"max.poll.interval.ms": env.Int("KAFKA_MAX_POLL_INTERVAL_MS"),
|
||||||
|
"max.partition.fetch.bytes": env.Int("QUEUE_MESSAGE_SIZE_LIMIT"),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalln(err)
|
log.Fatalln(err)
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,9 @@
|
||||||
package kafka
|
package kafka
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
|
||||||
|
|
||||||
"openreplay/backend/pkg/env"
|
|
||||||
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
|
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
|
||||||
|
"log"
|
||||||
|
"openreplay/backend/pkg/env"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Producer struct {
|
type Producer struct {
|
||||||
|
|
@ -23,6 +22,7 @@ func NewProducer() *Producer {
|
||||||
"security.protocol": protocol,
|
"security.protocol": protocol,
|
||||||
"go.batch.producer": true,
|
"go.batch.producer": true,
|
||||||
"queue.buffering.max.ms": 100,
|
"queue.buffering.max.ms": 100,
|
||||||
|
"message.max.bytes": env.Int("QUEUE_MESSAGE_SIZE_LIMIT"),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalln(err)
|
log.Fatalln(err)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue