feat(backend): added batch and compression configuration for kafka producer

This commit is contained in:
Alexander Zavorotynskiy 2023-01-11 16:52:32 +01:00
parent 44f9e4c120
commit 6972f84275
2 changed files with 16 additions and 9 deletions

View file

@ -76,8 +76,8 @@ ENV TZ=UTC \
USE_FAILOVER=false \
GROUP_STORAGE_FAILOVER=failover \
TOPIC_STORAGE_FAILOVER=storage-failover \
PROFILER_ENABLED=false
PROFILER_ENABLED=false \
COMPRESSION_TYPE=zstd
ARG SERVICE_NAME

View file

@ -15,13 +15,20 @@ type Producer struct {
func NewProducer(messageSizeLimit int, useBatch bool) *Producer {
kafkaConfig := &kafka.ConfigMap{
"enable.idempotence": true,
"bootstrap.servers": env.String("KAFKA_SERVERS"),
"go.delivery.reports": true,
"security.protocol": "plaintext",
"go.batch.producer": useBatch,
"queue.buffering.max.ms": 100,
"message.max.bytes": messageSizeLimit,
"enable.idempotence": true,
"bootstrap.servers": env.String("KAFKA_SERVERS"),
"go.delivery.reports": true,
"security.protocol": "plaintext",
"go.batch.producer": useBatch,
"message.max.bytes": messageSizeLimit, // should be synced with broker config
"linger.ms": 1000,
"queue.buffering.max.ms": 1000,
"batch.num.messages": 1000,
"queue.buffering.max.messages": 1000,
"retries": 3,
"retry.backoff.ms": 100,
"max.in.flight.requests.per.connection": 1,
"compression.type": env.String("COMPRESSION_TYPE"),
}
// Apply ssl configuration
if env.Bool("KAFKA_USE_SSL") {