From 94c56205b98a3d2175567a9df6b180a5e6de45a9 Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Mon, 16 May 2022 18:56:43 +0200 Subject: [PATCH] fix(backend): added error log in kafka producer --- ee/backend/pkg/kafka/producer.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/ee/backend/pkg/kafka/producer.go b/ee/backend/pkg/kafka/producer.go index 3b6bd3927..7f59d971f 100644 --- a/ee/backend/pkg/kafka/producer.go +++ b/ee/backend/pkg/kafka/producer.go @@ -1,10 +1,11 @@ package kafka import ( + "fmt" "log" - "openreplay/backend/pkg/env" "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" + "openreplay/backend/pkg/env" ) type Producer struct { @@ -27,7 +28,20 @@ func NewProducer() *Producer { if err != nil { log.Fatalln(err) } - return &Producer{producer} + newProducer := &Producer{producer} + go newProducer.errorHandler() + return newProducer +} + +func (p *Producer) errorHandler() { + for e := range p.producer.Events() { + switch ev := e.(type) { + case *kafka.Message: + if ev.TopicPartition.Error != nil { + fmt.Printf("Delivery failed: topicPartition: %v, key: %d\n", ev.TopicPartition, decodeKey(ev.Key)) + } + } + } } func (p *Producer) Produce(topic string, key uint64, value []byte) error {