diff --git a/backend/Dockerfile b/backend/Dockerfile index b7a494f86..710430306 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -25,7 +25,7 @@ ENV TZ=UTC \ MAXMINDDB_FILE=/root/geoip.mmdb \ UAPARSER_FILE=/root/regexes.yaml \ HTTP_PORT=80 \ - BEACON_SIZE_LIMIT=7000000 \ + BEACON_SIZE_LIMIT=3000000 \ KAFKA_USE_SSL=true \ KAFKA_MAX_POLL_INTERVAL_MS=400000 \ REDIS_STREAMS_MAX_LEN=3000 \ @@ -45,7 +45,8 @@ ENV TZ=UTC \ CACHE_ASSETS=true \ ASSETS_SIZE_LIMIT=6291456 \ FS_CLEAN_HRS=72 \ - LOG_QUEUE_STATS_INTERVAL_SEC=60 + LOG_QUEUE_STATS_INTERVAL_SEC=60 \ + QUEUE_MESSAGE_SIZE_LIMIT=1048576 ARG SERVICE_NAME diff --git a/ee/backend/pkg/kafka/consumer.go b/ee/backend/pkg/kafka/consumer.go index 65d2cd830..30210206a 100644 --- a/ee/backend/pkg/kafka/consumer.go +++ b/ee/backend/pkg/kafka/consumer.go @@ -43,6 +43,7 @@ func NewConsumer( "security.protocol": protocol, "go.application.rebalance.enable": true, "max.poll.interval.ms": env.Int("KAFKA_MAX_POLL_INTERVAL_MS"), + "max.partition.fetch.bytes": env.Int("QUEUE_MESSAGE_SIZE_LIMIT"), }) if err != nil { log.Fatalln(err) diff --git a/ee/backend/pkg/kafka/producer.go b/ee/backend/pkg/kafka/producer.go index 3b6bd3927..5946cc721 100644 --- a/ee/backend/pkg/kafka/producer.go +++ b/ee/backend/pkg/kafka/producer.go @@ -1,10 +1,10 @@ package kafka import ( "log" - "openreplay/backend/pkg/env" "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" + "openreplay/backend/pkg/env" ) type Producer struct { @@ -23,6 +23,7 @@ func NewProducer() *Producer { "security.protocol": protocol, "go.batch.producer": true, "queue.buffering.max.ms": 100, + "message.max.bytes": env.Int("QUEUE_MESSAGE_SIZE_LIMIT"), }) if err != nil { log.Fatalln(err)