fix(backend/ee/kafka): Partition-wise back-commit

This commit is contained in:
Alex Kaminskii 2022-06-03 13:52:31 +02:00
parent f626636ed7
commit 1495f3bc5d

View file

@ -22,7 +22,7 @@ type Consumer struct {
commitTicker *time.Ticker
pollTimeout uint
lastKafkaEventTs int64
lastReceivedPrtTs map[int32]int64
}
func NewConsumer(
@ -77,7 +77,10 @@ func (consumer *Consumer) Commit() error {
return nil
}
func (consumer *Consumer) CommitAtTimestamp(commitTs int64) error {
func (consumer *Consumer) commitAtTimestamps(
getPartitionTime func(kafka.TopicPartition) (bool, int64),
limitToCommitted bool,
) error {
assigned, err := consumer.c.Assignment()
if err != nil {
return err
@ -85,7 +88,11 @@ func (consumer *Consumer) CommitAtTimestamp(commitTs int64) error {
logPartitions("Actually assigned:", assigned)
var timestamps []kafka.TopicPartition
for _, p := range assigned { // p is a copy here sinse partition is not a pointer
for _, p := range assigned { // p is a copy here since it is not a pointer
shouldCommit, commitTs := getPartitionTime(p)
if !shouldCommit {
continue
} // didn't receive anything yet
p.Offset = kafka.Offset(commitTs)
timestamps = append(timestamps, p)
}
@ -94,13 +101,13 @@ func (consumer *Consumer) CommitAtTimestamp(commitTs int64) error {
return errors.Wrap(err, "Kafka Consumer back commit error")
}
// Limiting to already committed
committed, err := consumer.c.Committed(assigned, 2000) // memorise?
logPartitions("Actually committed:", committed)
if err != nil {
return errors.Wrap(err, "Kafka Consumer retrieving committed error")
}
for _, offs := range offsets {
if limitToCommitted {
// Limiting to already committed
committed, err := consumer.c.Committed(assigned, 2000) // memorise?
if err != nil {
return errors.Wrap(err, "Kafka Consumer retrieving committed error")
}
logPartitions("Actually committed:", committed)
for _, comm := range committed {
if comm.Offset == kafka.OffsetStored ||
comm.Offset == kafka.OffsetInvalid ||
@ -108,10 +115,12 @@ func (consumer *Consumer) CommitAtTimestamp(commitTs int64) error {
comm.Offset == kafka.OffsetEnd {
continue
}
if comm.Partition == offs.Partition &&
(comm.Topic != nil && offs.Topic != nil && *comm.Topic == *offs.Topic) &&
comm.Offset > offs.Offset {
offs.Offset = comm.Offset
for _, offs := range offsets {
if offs.Partition == comm.Partition &&
(comm.Topic != nil && offs.Topic != nil && *comm.Topic == *offs.Topic) &&
comm.Offset > offs.Offset {
offs.Offset = comm.Offset
}
}
}
}
@ -122,11 +131,19 @@ func (consumer *Consumer) CommitAtTimestamp(commitTs int64) error {
}
func (consumer *Consumer) CommitBack(gap int64) error {
if consumer.lastKafkaEventTs == 0 {
return nil
}
commitTs := consumer.lastKafkaEventTs - gap
return consumer.CommitAtTimestamp(commitTs)
return consumer.commitAtTimestamps(func(p kafka.TopicPartition) (bool, int64) {
lastTs, ok := consumer.lastReceivedPrtTs[p.Partition]
if !ok {
return false, 0
}
return true, lastTs - gap
}, true)
}
func (consumer *Consumer) CommitAtTimestamp(commitTs int64) error {
return consumer.commitAtTimestamps(func(p kafka.TopicPartition) (bool, int64) {
return true, commitTs
}, false)
}
func (consumer *Consumer) ConsumeNext() error {
@ -154,7 +171,7 @@ func (consumer *Consumer) ConsumeNext() error {
ID: uint64(e.TopicPartition.Offset),
Timestamp: ts,
})
consumer.lastKafkaEventTs = ts
consumer.lastReceivedPrtTs[e.TopicPartition.Partition] = ts
// case kafka.AssignedPartitions:
// logPartitions("Kafka Consumer: Partitions Assigned", e.Partitions)
// consumer.partitions = e.Partitions