diff --git a/ee/backend/pkg/kafka/consumer.go b/ee/backend/pkg/kafka/consumer.go index a9d0c2a3a..1483c2ccf 100644 --- a/ee/backend/pkg/kafka/consumer.go +++ b/ee/backend/pkg/kafka/consumer.go @@ -23,7 +23,6 @@ type Consumer struct { pollTimeout uint lastKafkaEventTs int64 - partitions []kafka.TopicPartition } func NewConsumer(group string, topics []string, messageHandler types.MessageHandler) *Consumer { @@ -68,17 +67,19 @@ func (consumer *Consumer) DisableAutoCommit() { func (consumer *Consumer) Commit() error { - _, err := consumer.c.Commit() - return err + consumer.c.Commit() // TODO: return error if it is not "No offset stored" + return nil } -func (consumer *Consumer) CommitBack(gap int64) error { - if consumer.lastKafkaEventTs == 0 || consumer.partitions == nil { - return nil +func (consumer *Consumer) CommitAtTimestamp(commitTs int64) error { + assigned, err := consumer.c.Assignment() + if err != nil { + return err } - commitTs := consumer.lastKafkaEventTs - gap + logPartitions("Actually assigned:", assigned) + var timestamps []kafka.TopicPartition - for _, p := range consumer.partitions { // p is a copy here sinse partition is not a pointer + for _, p := range assigned { // p is a copy here sinse partition is not a pointer p.Offset = kafka.Offset(commitTs) timestamps = append(timestamps, p) } @@ -88,25 +89,39 @@ func (consumer *Consumer) CommitBack(gap int64) error { } // Limiting to already committed - committedOffsets, err := consumer.c.Committed(consumer.partitions, 2000) // memorise? + committed, err := consumer.c.Committed(assigned, 2000) // memorise? + logPartitions("Actually committed:",committed) if err != nil { return errors.Wrap(err, "Kafka Consumer retrieving committed error") } - for _, ofs := range offsets { - for _, commOfs := range committedOffsets { - if commOfs.Partition == ofs.Partition && commOfs.Offset > ofs.Offset { - ofs.Offset = commOfs.Offset + for _, offs := range offsets { + for _, comm := range committed { + if comm.Offset == kafka.OffsetStored || + comm.Offset == kafka.OffsetInvalid || + comm.Offset == kafka.OffsetBeginning || + comm.Offset == kafka.OffsetEnd { continue } + if comm.Partition == offs.Partition && + (comm.Topic != nil && offs.Topic != nil && *comm.Topic == *offs.Topic) && + comm.Offset > offs.Offset { + offs.Offset = comm.Offset } } - } + } // TODO: check per-partition errors: offsets[i].Error - // As an option: can store offsets and enable autocommit instead _, err = consumer.c.CommitOffsets(offsets) return errors.Wrap(err, "Kafka Consumer back commit error") } +func (consumer *Consumer) CommitBack(gap int64) error { + if consumer.lastKafkaEventTs == 0 { + return nil + } + commitTs := consumer.lastKafkaEventTs - gap + return consumer.CommitAtTimestamp(commitTs) +} + func (consumer *Consumer) ConsumeNext() error { ev := consumer.c.Poll(int(consumer.pollTimeout)) if ev == nil { @@ -131,14 +146,15 @@ func (consumer *Consumer) ConsumeNext() error { Timestamp: ts, }) consumer.lastKafkaEventTs = ts - case kafka.AssignedPartitions: - logPartitions("Kafka Consumer: Partitions Assigned", e.Partitions) - consumer.partitions = e.Partitions - consumer.c.Assign(e.Partitions) - case kafka.RevokedPartitions: - log.Println("Kafka Cosumer: Partitions Revoked") - consumer.partitions = nil - consumer.c.Unassign() + // case kafka.AssignedPartitions: + // logPartitions("Kafka Consumer: Partitions Assigned", e.Partitions) + // consumer.partitions = e.Partitions + // consumer.c.Assign(e.Partitions) + // log.Printf("Actually partitions assigned!") + // case kafka.RevokedPartitions: + // log.Println("Kafka Cosumer: Partitions Revoked") + // consumer.partitions = nil + // consumer.c.Unassign() case kafka.Error: if e.Code() == kafka.ErrAllBrokersDown { os.Exit(1)