fix(backend): consimer.CommitBack limit to already committed
This commit is contained in:
parent
1ca1774225
commit
3a90bddaab
1 changed files with 16 additions and 2 deletions
|
|
@ -68,8 +68,8 @@ func (consumer *Consumer) DisableAutoCommit() {
|
|||
|
||||
|
||||
func (consumer *Consumer) Commit() error {
|
||||
consumer.c.Commit() // TODO: return error if it is not "No offset stored"
|
||||
return nil
|
||||
_, err := consumer.c.Commit()
|
||||
return err
|
||||
}
|
||||
|
||||
func (consumer *Consumer) CommitBack(gap int64) error {
|
||||
|
|
@ -86,6 +86,20 @@ func (consumer *Consumer) CommitBack(gap int64) error {
|
|||
if err != nil {
|
||||
return errors.Wrap(err, "Kafka Consumer back commit error")
|
||||
}
|
||||
|
||||
// Limiting to already committed
|
||||
committedOffsets, err := consumer.c.Committed(consumer.partitions, 2000) // memorise?
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Kafka Consumer retrieving committed error")
|
||||
}
|
||||
for _, ofs := range offsets {
|
||||
for _, commOfs := range committedOffsets {
|
||||
if commOfs.Partition == ofs.Partition && commOfs.Offset > ofs.Offset {
|
||||
ofs.Offset = commOfs.Offset
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: check per-partition errors: offsets[i].Error
|
||||
// As an option: can store offsets and enable autocommit instead
|
||||
_, err = consumer.c.CommitOffsets(offsets)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue