fix(backend): added error log in kafka producer
This commit is contained in:
parent
acdd3596bc
commit
94c56205b9
1 changed files with 16 additions and 2 deletions
|
|
@ -1,10 +1,11 @@
|
|||
package kafka
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"openreplay/backend/pkg/env"
|
||||
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
|
||||
"openreplay/backend/pkg/env"
|
||||
)
|
||||
|
||||
type Producer struct {
|
||||
|
|
@ -27,7 +28,20 @@ func NewProducer() *Producer {
|
|||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
return &Producer{producer}
|
||||
newProducer := &Producer{producer}
|
||||
go newProducer.errorHandler()
|
||||
return newProducer
|
||||
}
|
||||
|
||||
func (p *Producer) errorHandler() {
|
||||
for e := range p.producer.Events() {
|
||||
switch ev := e.(type) {
|
||||
case *kafka.Message:
|
||||
if ev.TopicPartition.Error != nil {
|
||||
fmt.Printf("Delivery failed: topicPartition: %v, key: %d\n", ev.TopicPartition, decodeKey(ev.Key))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Producer) Produce(topic string, key uint64, value []byte) error {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue