openreplay/ee/backend/pkg/db/redis/consumer.go
Alexander 676041d90b
Ender logic update (#1435)
* feat(backend): new session end detection logic + several fixes

* feat(backend): support partitions management in ender

* feat(backend): added new consumer support to redis

* feat(backend): added support for new consumer in kafka

* feat(backend): added new consumer support to redis (ee)

* feat(backend): small refactoring in ender
2023-08-03 14:57:15 +04:00

174 lines
4.5 KiB
Go

package redis
import (
"errors"
"fmt"
"log"
"net"
"sort"
"strconv"
"strings"
"github.com/go-redis/redis"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue/types"
)
// idsInfo tracks the messages of a single stream that have been read but not
// yet acknowledged. The two slices are parallel: ts[i] belongs to id[i].
type idsInfo struct {
// id holds the raw Redis message IDs in the order they were read.
id []string
// ts holds, for each ID, the number parsed from the part before the "-".
ts []int64
}
// streamPendingIDsMap maps a stream name to its not-yet-acknowledged messages.
type streamPendingIDsMap map[string]*idsInfo
// consumerImpl is the Redis Streams consumer returned by NewConsumer.
type consumerImpl struct {
// client gives access to the Redis connection and the read configuration.
client *Client
// group is the consumer group name; it is also passed as the consumer name
// when reading.
group string
// streams is the argument list handed to XReadGroup: the stream names
// followed by one ">" ID per stream.
streams []string
// idsPending collects, per stream, the messages read while autoCommit is
// false, until Commit or CommitBack acknowledges them.
idsPending streamPendingIDsMap
// lastTs is the timestamp of the most recent message read while autoCommit
// is false; CommitBack measures its gap from this value.
lastTs int64
// autoCommit makes every message get acknowledged right after it is read.
autoCommit bool
// event is the channel exposed by Rebalanced.
// NOTE(review): no sender is visible in this file - confirm whether
// anything ever publishes to it.
event chan *types.PartitionsRebalancedEvent
}
// QueueMessage is a single message read from a Redis stream.
type QueueMessage struct {
// Data is the message payload, taken from the "value" field of the entry.
Data []byte
// Info is the batch metadata built from the session ID, stream name and
// message ID of the entry.
Info *messages.BatchInfo
}
// ConsumeNext is not implemented for the Redis consumer yet: every call panics
// with "implement me". Reading is currently done by ConsumeNextOld.
func (c *consumerImpl) ConsumeNext() error {
// TODO: implement me
panic("implement me")
}
// Close is not implemented for the Redis consumer yet: every call panics with
// "implement me", so callers must not invoke it (including via defer).
func (c *consumerImpl) Close() {
// TODO: implement me
panic("implement me")
}
// NewConsumer returns a consumer that reads the given streams as a member of
// the consumer group `group`.
//
// For every stream the group is created if needed (the stream itself is
// created too when it does not exist yet). An already existing group is not an
// error; any other failure terminates the process via log.Fatalln.
//
// The returned consumer starts with autoCommit enabled, i.e. each message is
// acknowledged as soon as it has been read.
func NewConsumer(client *Client, group string, streams []string) types.Consumer {
	idsPending := make(streamPendingIDsMap, len(streams))

	// XREADGROUP takes all stream names first and then one ID per stream.
	// Build that list in a fresh slice: appending to the caller's slice could
	// overwrite elements of its backing array when it has spare capacity.
	readArgs := make([]string, 0, 2*len(streams))
	readArgs = append(readArgs, streams...)

	for _, stream := range streams {
		err := client.Redis.XGroupCreateMkStream(stream, group, "0").Err()
		// Match on the error code only, not on the full server message.
		if err != nil && !strings.HasPrefix(err.Error(), "BUSYGROUP") {
			log.Fatalln(err)
		}
		idsPending[stream] = new(idsInfo)

		// ">" is for never-delivered messages.
		// Otherwise - never acknowledged only
		// TODO: understand why in case of "0" it eats 100% cpu
		// NOTE(review): the Redis XREADGROUP documentation states that BLOCK is
		// ignored for any ID other than ">", so a read with "0" returns
		// immediately and the polling loop spins - likely the cause; confirm.
		readArgs = append(readArgs, ">")
	}

	return &consumerImpl{
		client:     client,
		streams:    readArgs,
		group:      group,
		autoCommit: true,
		idsPending: idsPending,
		event:      make(chan *types.PartitionsRebalancedEvent, 4),
	}
}
// ConsumeNextOld performs one XREADGROUP call on the configured streams and
// returns the first message of the reply.
//
// With autoCommit enabled the message is acknowledged immediately (a failed
// acknowledgment is only logged). Otherwise its ID and timestamp are recorded
// in idsPending so that Commit or CommitBack can acknowledge it later.
//
// Errors are returned when the read fails, when the reply contains no
// messages, when the "sessionID" or "value" field is missing or malformed,
// when the message ID is not of the form "<timestamp>-<index>", or when the
// index does not fit into 13 bits.
//
// NOTE(review): the function returns on the first message of the reply. If
// the configured read count is greater than 1, the remaining messages of that
// reply are neither returned nor acknowledged here, and reading with ">" will
// not deliver them again - confirm the read count is 1 or add buffering.
func (c *consumerImpl) ConsumeNextOld() (*QueueMessage, error) {
	res, err := c.client.Redis.XReadGroup(&redis.XReadGroupArgs{
		Group:    c.group,
		Consumer: c.group,
		Streams:  c.streams,
		Count:    c.client.Cfg.ReadCount,
		Block:    c.client.Cfg.ReadBlockDuration,
	}).Result()
	if err != nil {
		if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
			return nil, netErr
		}
		if err == redis.Nil {
			return nil, errors.New("key does not exist")
		}
		return nil, err
	}

	// TODO: remove debug logs
	log.Printf("info: res.size: %d", len(res))
	for _, r := range res {
		log.Printf("info: messages.size: %d", len(r.Messages))
		for _, m := range r.Messages {
			sessionIDString, ok := m.Values["sessionID"].(string)
			if !ok {
				return nil, fmt.Errorf("can't cast sessionID value for messageID %s", m.ID)
			}
			sessionID, err := strconv.ParseUint(sessionIDString, 10, 64)
			if err != nil {
				return nil, fmt.Errorf("can't parse sessionID %s for messageID %s", sessionIDString, m.ID)
			}
			valueString, ok := m.Values["value"].(string)
			if !ok {
				return nil, fmt.Errorf("can't cast value for messageID %s", m.ID)
			}

			// A message ID is expected to look like "<timestamp>-<index>".
			// Validate it instead of indexing blindly, which would panic on
			// an ID without "-" and silently yield zeros on non-numeric parts.
			idParts := strings.SplitN(m.ID, "-", 2)
			if len(idParts) != 2 {
				return nil, fmt.Errorf("malformed messageID %s", m.ID)
			}
			ts, err := strconv.ParseUint(idParts[0], 10, 64)
			if err != nil {
				return nil, fmt.Errorf("can't parse timestamp of messageID %s: %w", m.ID, err)
			}
			idx, err := strconv.ParseUint(idParts[1], 10, 64)
			if err != nil {
				return nil, fmt.Errorf("can't parse index of messageID %s: %w", m.ID, err)
			}
			if idx > 0x1FFF {
				return nil, errors.New("too many messages per ms in redis")
			}
			// Pack timestamp and index into one number: the low 13 bits carry
			// the index (at most 8192 messages per timestamp value), the
			// remaining 51 bits carry the timestamp.
			bID := ts<<13 | (idx & 0x1FFF)

			result := &QueueMessage{
				Data: []byte(valueString),
				Info: messages.NewBatchInfo(sessionID, r.Stream, bID, 0, int64(ts)),
			}
			if c.autoCommit {
				// Best effort: a failed acknowledgment does not fail the read.
				if err = c.client.Redis.XAck(r.Stream, c.group, m.ID).Err(); err != nil {
					log.Printf("Acknoledgment error for messageID %s, err: %s", m.ID, err.Error())
				}
			} else {
				c.lastTs = int64(ts)
				c.idsPending[r.Stream].id = append(c.idsPending[r.Stream].id, m.ID)
				c.idsPending[r.Stream].ts = append(c.idsPending[r.Stream].ts, int64(ts))
			}
			return result, nil
		}
	}
	return nil, errors.New("no messages")
}
// CommitBack acknowledges, on every stream, the pending messages whose
// timestamp is not newer than lastTs-gap, and keeps the rest pending.
//
// gap is expressed in the same unit as the timestamps taken from the message
// IDs (presumably milliseconds - confirm against callers). Nothing happens
// when no message has been recorded yet (lastTs == 0).
//
// It returns an error as soon as an acknowledgment fails; streams processed
// before the failure stay committed.
func (c *consumerImpl) CommitBack(gap int64) error {
	if c.lastTs == 0 {
		return nil
	}
	maxTs := c.lastTs - gap
	for stream, pending := range c.idsPending {
		if len(pending.id) == 0 {
			continue
		}
		// pending.ts is filled in read order; the binary search relies on
		// that order being ascending.
		maxI := sort.Search(len(pending.ts), func(i int) bool {
			return pending.ts[i] > maxTs
		})
		// Nothing is old enough on this stream. XACK requires at least one
		// ID, so calling it with none would produce a spurious error.
		if maxI == 0 {
			continue
		}
		if err := c.client.Redis.XAck(stream, c.group, pending.id[:maxI]...).Err(); err != nil {
			return fmt.Errorf("RedisStreams: Acknoledgment error on commit %w", err)
		}
		pending.id = pending.id[maxI:]
		pending.ts = pending.ts[maxI:]
	}
	return nil
}
// Commit acknowledges every pending message on every stream and empties the
// pending lists. Streams without pending messages are skipped. The first
// failed acknowledgment aborts the loop and is returned to the caller.
func (c *consumerImpl) Commit() error {
	for name, pending := range c.idsPending {
		if len(pending.id) > 0 {
			ackErr := c.client.Redis.XAck(name, c.group, pending.id...).Err()
			if ackErr != nil {
				return fmt.Errorf("RedisStreams: Acknoledgment error on commit %v", ackErr)
			}
			// pending is the pointer stored in the map, so this resets the
			// stream's entry in place.
			pending.id, pending.ts = nil, nil
		}
	}
	return nil
}
// Rebalanced returns the receive-only side of the consumer's event channel.
// NOTE(review): nothing in this file sends on that channel - confirm whether
// the Redis consumer ever emits rebalance events.
func (c *consumerImpl) Rebalanced() <-chan *types.PartitionsRebalancedEvent {
return c.event
}