diff --git a/ee/backend/pkg/kafka/consumer.go b/ee/backend/pkg/kafka/consumer.go index f86912938..b951fcd9c 100644 --- a/ee/backend/pkg/kafka/consumer.go +++ b/ee/backend/pkg/kafka/consumer.go @@ -177,6 +177,7 @@ func (consumer *Consumer) ConsumeNext() error { decodeKey(e.Key), *(e.TopicPartition.Topic), uint64(e.TopicPartition.Offset), + uint64(e.TopicPartition.Partition), ts)) consumer.lastReceivedPrtTs[e.TopicPartition.Partition] = ts case kafka.Error: