From 1495f3bc5d7208eff9053181665050543709412a Mon Sep 17 00:00:00 2001 From: Alex Kaminskii Date: Fri, 3 Jun 2022 13:52:31 +0200 Subject: [PATCH] fix(backend/ee/kafka): Partition-wise back-commit --- ee/backend/pkg/kafka/consumer.go | 57 +++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/ee/backend/pkg/kafka/consumer.go b/ee/backend/pkg/kafka/consumer.go index 65d2cd830..e252d7aa3 100644 --- a/ee/backend/pkg/kafka/consumer.go +++ b/ee/backend/pkg/kafka/consumer.go @@ -22,7 +22,7 @@ type Consumer struct { commitTicker *time.Ticker pollTimeout uint - lastKafkaEventTs int64 + lastReceivedPrtTs map[int32]int64 } func NewConsumer( @@ -77,7 +77,10 @@ func (consumer *Consumer) Commit() error { return nil } -func (consumer *Consumer) CommitAtTimestamp(commitTs int64) error { +func (consumer *Consumer) commitAtTimestamps( + getPartitionTime func(kafka.TopicPartition) (bool, int64), + limitToCommitted bool, +) error { assigned, err := consumer.c.Assignment() if err != nil { return err @@ -85,7 +88,11 @@ func (consumer *Consumer) CommitAtTimestamp(commitTs int64) error { logPartitions("Actually assigned:", assigned) var timestamps []kafka.TopicPartition - for _, p := range assigned { // p is a copy here sinse partition is not a pointer + for _, p := range assigned { // p is a copy here since it is not a pointer + shouldCommit, commitTs := getPartitionTime(p) + if !shouldCommit { + continue + } // didn't receive anything yet p.Offset = kafka.Offset(commitTs) timestamps = append(timestamps, p) } @@ -94,13 +101,13 @@ func (consumer *Consumer) CommitAtTimestamp(commitTs int64) error { return errors.Wrap(err, "Kafka Consumer back commit error") } - // Limiting to already committed - committed, err := consumer.c.Committed(assigned, 2000) // memorise? - logPartitions("Actually committed:", committed) - if err != nil { - return errors.Wrap(err, "Kafka Consumer retrieving committed error") - } - for _, offs := range offsets { + if limitToCommitted { + // Limiting to already committed + committed, err := consumer.c.Committed(assigned, 2000) // memorise? + if err != nil { + return errors.Wrap(err, "Kafka Consumer retrieving committed error") + } + logPartitions("Actually committed:", committed) for _, comm := range committed { if comm.Offset == kafka.OffsetStored || comm.Offset == kafka.OffsetInvalid || @@ -108,10 +115,12 @@ func (consumer *Consumer) CommitAtTimestamp(commitTs int64) error { comm.Offset == kafka.OffsetEnd { continue } - if comm.Partition == offs.Partition && - (comm.Topic != nil && offs.Topic != nil && *comm.Topic == *offs.Topic) && - comm.Offset > offs.Offset { - offs.Offset = comm.Offset + for _, offs := range offsets { + if offs.Partition == comm.Partition && + (comm.Topic != nil && offs.Topic != nil && *comm.Topic == *offs.Topic) && + comm.Offset > offs.Offset { + offs.Offset = comm.Offset + } } } } @@ -122,11 +131,19 @@ func (consumer *Consumer) CommitAtTimestamp(commitTs int64) error { } func (consumer *Consumer) CommitBack(gap int64) error { - if consumer.lastKafkaEventTs == 0 { - return nil - } - commitTs := consumer.lastKafkaEventTs - gap - return consumer.CommitAtTimestamp(commitTs) + return consumer.commitAtTimestamps(func(p kafka.TopicPartition) (bool, int64) { + lastTs, ok := consumer.lastReceivedPrtTs[p.Partition] + if !ok { + return false, 0 + } + return true, lastTs - gap + }, true) +} + +func (consumer *Consumer) CommitAtTimestamp(commitTs int64) error { + return consumer.commitAtTimestamps(func(p kafka.TopicPartition) (bool, int64) { + return true, commitTs + }, false) } func (consumer *Consumer) ConsumeNext() error { @@ -154,7 +171,7 @@ func (consumer *Consumer) ConsumeNext() error { ID: uint64(e.TopicPartition.Offset), Timestamp: ts, }) - consumer.lastKafkaEventTs = ts + consumer.lastReceivedPrtTs[e.TopicPartition.Partition] = ts // case kafka.AssignedPartitions: // logPartitions("Kafka Consumer: Partitions Assigned", e.Partitions) // consumer.partitions = e.Partitions