fix(ee/kafka): restart worker after consumer timeout error

ShiKhu 2022-03-30 13:13:50 +02:00
parent a034f97ad4
commit 5fea1fb785

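The functional change in this commit is in ConsumeNext: the kafka.Error branch used to call os.Exit(1) only on kafka.ErrAllBrokersDown, and now also exits on kafka.ErrMaxPollExceeded, the error librdkafka reports when the consumer exceeds max.poll.interval.ms and is kicked out of its group. Exiting with a non-zero status lets whatever supervises the process restart the worker so it rejoins the group with a fresh session; the other hunks are gofmt-style clean-ups (import order, whitespace, blank lines). As a rough illustration of that restart-on-timeout pattern, here is a minimal, self-contained sketch against confluent-kafka-go; the broker address, group id and topic name are placeholder assumptions, not values taken from the OpenReplay code.

package main

import (
    "log"
    "os"

    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
    // Placeholder configuration: broker address, group id and topic are assumptions
    // for this sketch, not values from the OpenReplay codebase.
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "group.id":          "example-group",
    })
    if err != nil {
        log.Fatalln(err)
    }
    if err := c.SubscribeTopics([]string{"example-topic"}, nil); err != nil {
        log.Fatalln(err)
    }

    for {
        ev := c.Poll(200)
        switch e := ev.(type) {
        case *kafka.Message:
            log.Printf("message at offset %v", e.TopicPartition.Offset)
        case kafka.Error:
            // ErrMaxPollExceeded: the poll interval was exceeded and the consumer
            // was evicted from the group. ErrAllBrokersDown: no broker is reachable.
            // In both cases exit non-zero so the supervisor restarts the worker.
            if e.Code() == kafka.ErrMaxPollExceeded || e.Code() == kafka.ErrAllBrokersDown {
                log.Printf("fatal consumer error: %v", e)
                os.Exit(1)
            }
            log.Printf("consumer error: %v", e)
        }
    }
}

The assumption here is that something above the process (systemd, Kubernetes, a container restart policy) notices the exit and starts a fresh consumer, which is what the commit title means by restarting the worker.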

@@ -9,9 +9,9 @@ import (
     "github.com/pkg/errors"
+    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
     "openreplay/backend/pkg/env"
     "openreplay/backend/pkg/queue/types"
-    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
 )

 type Message = kafka.Message
@@ -19,7 +19,7 @@ type Message = kafka.Message
 type Consumer struct {
     c *kafka.Consumer
     messageHandler types.MessageHandler
-    commitTicker *time.Ticker
+    commitTicker *time.Ticker
     pollTimeout uint
     lastKafkaEventTs int64
@@ -56,7 +56,7 @@ func NewConsumer(group string, topics []string, messageHandler types.MessageHand
     return &Consumer{
         c: c,
         messageHandler: messageHandler,
-        commitTicker: time.NewTicker(2 * time.Minute),
+        commitTicker: time.NewTicker(2 * time.Minute),
         pollTimeout: 200,
     }
 }
@@ -65,13 +65,12 @@ func (consumer *Consumer) DisableAutoCommit() {
     consumer.commitTicker.Stop()
 }

 func (consumer *Consumer) Commit() error {
     consumer.c.Commit() // TODO: return error if it is not "No offset stored"
     return nil
 }

-
-func (consumer *Consumer) CommitAtTimestamp(commitTs int64) error {
+func (consumer *Consumer) CommitAtTimestamp(commitTs int64) error {
     assigned, err := consumer.c.Assignment()
     if err != nil {
         return err
@@ -84,37 +83,38 @@ func (consumer *Consumer) CommitAtTimestamp(commitTs int64) error {
         timestamps = append(timestamps, p)
     }
     offsets, err := consumer.c.OffsetsForTimes(timestamps, 2000)
-    if err != nil {
+    if err != nil {
         return errors.Wrap(err, "Kafka Consumer back commit error")
     }
     // Limiting to already committed
     committed, err := consumer.c.Committed(assigned, 2000) // memorise?
-    logPartitions("Actually committed:",committed)
+    logPartitions("Actually committed:", committed)
     if err != nil {
         return errors.Wrap(err, "Kafka Consumer retrieving committed error")
     }
     for _, offs := range offsets {
         for _, comm := range committed {
-            if comm.Offset == kafka.OffsetStored ||
+            if comm.Offset == kafka.OffsetStored ||
                 comm.Offset == kafka.OffsetInvalid ||
-                comm.Offset == kafka.OffsetBeginning ||
-                comm.Offset == kafka.OffsetEnd { continue }
-            if comm.Partition == offs.Partition &&
+                comm.Offset == kafka.OffsetBeginning ||
+                comm.Offset == kafka.OffsetEnd {
+                continue
+            }
+            if comm.Partition == offs.Partition &&
                 (comm.Topic != nil && offs.Topic != nil && *comm.Topic == *offs.Topic) &&
-                comm.Offset > offs.Offset {
+                comm.Offset > offs.Offset {
                 offs.Offset = comm.Offset
             }
         }
     }
-    // TODO: check per-partition errors: offsets[i].Error
+    // TODO: check per-partition errors: offsets[i].Error
     _, err = consumer.c.CommitOffsets(offsets)
     return errors.Wrap(err, "Kafka Consumer back commit error")
 }

-
-func (consumer *Consumer) CommitBack(gap int64) error {
+func (consumer *Consumer) CommitBack(gap int64) error {
     if consumer.lastKafkaEventTs == 0 {
         return nil
     }
@@ -135,31 +135,31 @@ func (consumer *Consumer) ConsumeNext() error {
     }
     switch e := ev.(type) {
-    case *kafka.Message:
-        if e.TopicPartition.Error != nil {
-            return errors.Wrap(e.TopicPartition.Error, "Consumer Partition Error")
-        }
-        ts := e.Timestamp.UnixNano()/ 1e6
-        consumer.messageHandler(decodeKey(e.Key), e.Value, &types.Meta{
-            Topic: *(e.TopicPartition.Topic),
-            ID: uint64(e.TopicPartition.Offset),
-            Timestamp: ts,
-        })
-        consumer.lastKafkaEventTs = ts
-    // case kafka.AssignedPartitions:
-    //     logPartitions("Kafka Consumer: Partitions Assigned", e.Partitions)
-    //     consumer.partitions = e.Partitions
-    //     consumer.c.Assign(e.Partitions)
-    //     log.Printf("Actually partitions assigned!")
-    // case kafka.RevokedPartitions:
-    //     log.Println("Kafka Cosumer: Partitions Revoked")
-    //     consumer.partitions = nil
-    //     consumer.c.Unassign()
-    case kafka.Error:
-        if e.Code() == kafka.ErrAllBrokersDown {
-            os.Exit(1)
-        }
-        log.Printf("Consumer error: %v\n", e)
+    case *kafka.Message:
+        if e.TopicPartition.Error != nil {
+            return errors.Wrap(e.TopicPartition.Error, "Consumer Partition Error")
+        }
+        ts := e.Timestamp.UnixNano() / 1e6
+        consumer.messageHandler(decodeKey(e.Key), e.Value, &types.Meta{
+            Topic: *(e.TopicPartition.Topic),
+            ID: uint64(e.TopicPartition.Offset),
+            Timestamp: ts,
+        })
+        consumer.lastKafkaEventTs = ts
+    // case kafka.AssignedPartitions:
+    //     logPartitions("Kafka Consumer: Partitions Assigned", e.Partitions)
+    //     consumer.partitions = e.Partitions
+    //     consumer.c.Assign(e.Partitions)
+    //     log.Printf("Actually partitions assigned!")
+    // case kafka.RevokedPartitions:
+    //     log.Println("Kafka Cosumer: Partitions Revoked")
+    //     consumer.partitions = nil
+    //     consumer.c.Unassign()
+    case kafka.Error:
+        if e.Code() == kafka.ErrAllBrokersDown || e.Code() == kafka.ErrMaxPollExceeded {
+            os.Exit(1)
+        }
+        log.Printf("Consumer error: %v\n", e)
     }
     return nil
 }
@@ -173,8 +173,6 @@ func (consumer *Consumer) Close() {
     }
 }

-
-
 // func (consumer *Consumer) consume(
 //     message func(m *kafka.Message) error,
 //     commit func(c *kafka.Consumer) error,
@@ -230,7 +228,6 @@ func (consumer *Consumer) Close() {
 //     }
 // }

-
 // func (consumer *Consumer) Consume(
 //     message func(key uint64, value []byte) error,
 // ) error {