feat(assist): redis consumer fix
This commit is contained in:
parent
6c44970666
commit
27f3ec5585
1 changed files with 6 additions and 5 deletions
|
|
@ -1,6 +1,7 @@
|
|||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
|
|
@ -50,7 +51,7 @@ func NewConsumer(client *Client, group string, streams []string) types.Consumer
|
|||
idsPending := make(streamPendingIDsMap)
|
||||
streamsCount := len(streams)
|
||||
for i := 0; i < streamsCount; i++ {
|
||||
err := client.Redis.XGroupCreateMkStream(streams[i], group, "0").Err()
|
||||
err := client.Redis.XGroupCreateMkStream(context.Background(), streams[i], group, "0").Err()
|
||||
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
|
|
@ -72,7 +73,7 @@ func NewConsumer(client *Client, group string, streams []string) types.Consumer
|
|||
}
|
||||
|
||||
func (c *consumerImpl) ConsumeNextOld() (*QueueMessage, error) {
|
||||
res, err := c.client.Redis.XReadGroup(&redis.XReadGroupArgs{
|
||||
res, err := c.client.Redis.XReadGroup(context.Background(), &redis.XReadGroupArgs{
|
||||
Group: c.group,
|
||||
Consumer: c.group,
|
||||
Streams: c.streams,
|
||||
|
|
@ -118,7 +119,7 @@ func (c *consumerImpl) ConsumeNextOld() (*QueueMessage, error) {
|
|||
Info: messages.NewBatchInfo(sessionID, r.Stream, bID, 0, int64(ts)),
|
||||
}
|
||||
if c.autoCommit {
|
||||
if err = c.client.Redis.XAck(r.Stream, c.group, m.ID).Err(); err != nil {
|
||||
if err = c.client.Redis.XAck(context.Background(), r.Stream, c.group, m.ID).Err(); err != nil {
|
||||
log.Printf("Acknoledgment error for messageID %s, err: %s", m.ID, err.Error())
|
||||
}
|
||||
} else {
|
||||
|
|
@ -146,7 +147,7 @@ func (c *consumerImpl) CommitBack(gap int64) error {
|
|||
maxI := sort.Search(len(idsInfo.ts), func(i int) bool {
|
||||
return idsInfo.ts[i] > maxTs
|
||||
})
|
||||
if err := c.client.Redis.XAck(stream, c.group, idsInfo.id[:maxI]...).Err(); err != nil {
|
||||
if err := c.client.Redis.XAck(context.Background(), stream, c.group, idsInfo.id[:maxI]...).Err(); err != nil {
|
||||
return fmt.Errorf("RedisStreams: Acknoledgment error on commit %v", err)
|
||||
}
|
||||
c.idsPending[stream].id = idsInfo.id[maxI:]
|
||||
|
|
@ -160,7 +161,7 @@ func (c *consumerImpl) Commit() error {
|
|||
if len(idsInfo.id) == 0 {
|
||||
continue
|
||||
}
|
||||
if err := c.client.Redis.XAck(stream, c.group, idsInfo.id...).Err(); err != nil {
|
||||
if err := c.client.Redis.XAck(context.Background(), stream, c.group, idsInfo.id...).Err(); err != nil {
|
||||
return fmt.Errorf("RedisStreams: Acknoledgment error on commit %v", err)
|
||||
}
|
||||
c.idsPending[stream].id = nil
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue