diff --git a/ee/backend/pkg/db/redis/consumer.go b/ee/backend/pkg/db/redis/consumer.go index df377adc7..237b24850 100644 --- a/ee/backend/pkg/db/redis/consumer.go +++ b/ee/backend/pkg/db/redis/consumer.go @@ -1,6 +1,7 @@ package redis import ( + "context" "errors" "fmt" "log" @@ -50,7 +51,7 @@ func NewConsumer(client *Client, group string, streams []string) types.Consumer idsPending := make(streamPendingIDsMap) streamsCount := len(streams) for i := 0; i < streamsCount; i++ { - err := client.Redis.XGroupCreateMkStream(streams[i], group, "0").Err() + err := client.Redis.XGroupCreateMkStream(context.Background(), streams[i], group, "0").Err() if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { log.Fatalln(err) } @@ -72,7 +73,7 @@ func NewConsumer(client *Client, group string, streams []string) types.Consumer } func (c *consumerImpl) ConsumeNextOld() (*QueueMessage, error) { - res, err := c.client.Redis.XReadGroup(&redis.XReadGroupArgs{ + res, err := c.client.Redis.XReadGroup(context.Background(), &redis.XReadGroupArgs{ Group: c.group, Consumer: c.group, Streams: c.streams, @@ -118,7 +119,7 @@ func (c *consumerImpl) ConsumeNextOld() (*QueueMessage, error) { Info: messages.NewBatchInfo(sessionID, r.Stream, bID, 0, int64(ts)), } if c.autoCommit { - if err = c.client.Redis.XAck(r.Stream, c.group, m.ID).Err(); err != nil { + if err = c.client.Redis.XAck(context.Background(), r.Stream, c.group, m.ID).Err(); err != nil { log.Printf("Acknoledgment error for messageID %s, err: %s", m.ID, err.Error()) } } else { @@ -146,7 +147,7 @@ func (c *consumerImpl) CommitBack(gap int64) error { maxI := sort.Search(len(idsInfo.ts), func(i int) bool { return idsInfo.ts[i] > maxTs }) - if err := c.client.Redis.XAck(stream, c.group, idsInfo.id[:maxI]...).Err(); err != nil { + if err := c.client.Redis.XAck(context.Background(), stream, c.group, idsInfo.id[:maxI]...).Err(); err != nil { return fmt.Errorf("RedisStreams: Acknoledgment error on commit %v", err) } c.idsPending[stream].id = idsInfo.id[maxI:] @@ -160,7 +161,7 @@ func (c *consumerImpl) Commit() error { if len(idsInfo.id) == 0 { continue } - if err := c.client.Redis.XAck(stream, c.group, idsInfo.id...).Err(); err != nil { + if err := c.client.Redis.XAck(context.Background(), stream, c.group, idsInfo.id...).Err(); err != nil { return fmt.Errorf("RedisStreams: Acknoledgment error on commit %v", err) } c.idsPending[stream].id = nil