openreplay/ee/backend/pkg/db/redis/consumer.go
Alexander 4b8f3bee25
Sessions refactoring (#1371)
* feat(backend): moved sql requests related to sessions table to one place

* feat(backend): refactoring in db.Saver handler

* feat(backend): huge refactoring in db/postgres module

* fix(backend): workable feature flags

* fix(backend): workable integrations

* fix(backend): workable sessions and projects modules

* fix(backend): added missed projects module to sessions

* feat(backend): renaming

* feat(backend): moved session struct to sessions module and split methods into interface, cache and storage levels

* feat(backend): moved project struct to projects module

* feat(backend): added projects model

* feat(backend): implemented new in memory cache for sessions and projects

* feat(backend): implemented new cache in projects

* feat(backend): there are 2 methods in cache module now: Get() and GetAndRefresh()

* feat(backend): added cache update operations

* fix(backend): fixed import cycle

* fix(backend): fixed panic in db message handler

* fix(backend): fixed panic in projects module

* fix(backend): fixed panic in sessions.GetDuration

* feat(backend): added direct call to get session duration if session is already in cache

* feat(backend): used pg pool everywhere except db service

* fix(backend): added missing part after rebase

* fix(backend): removed old sessions file

* feat(backend): added refactored redis client with produce/consume options

* feat(backend): added cache layer for projects

* fix(backend): added missing redis config

* fix(backend): added missing method for producer

* feat(backend): cache integration for sessions

* feat(backend): temporary method to get session directly from db

* feat(backend): adapt EE version of message handler

* fix(backend): fixed issue in fts realisation

* fix(backend): added redis cache to sessions module

* fix(backend): set 0 duration or hesitation time for inputs without focus event

* feat(backend): added cache for session updates and failover mechanism for batch.Insert() operation

* feat(backend): debug log

* feat(backend): more debug log

* feat(backend): removed debug log

* fix(backend): fixed an issue of tracking input events with empty label

* fix(backend): disabled debug log in projects cache

* fix(backend): renamed session updater

* fix(backend): fixed closed pool issue in DB service

* fix(backend): fixed dead lock in db Stop() method

* fix(backend): fixed panic in heuristics service

* feat(backend): enabled redis cache in projects

* feat(backend): clear cache on each update operation

* feat(backend): fully integrated cache layer with auto switch

* feat(backend): small refactoring in session updates

* fix(backend): fixed wrong events counter issue

* feat(backend): enabled full cache support in ender and http services

* fix(backend/ee): added missed import

* feat(backend): added second cache layer for db to speed up the service

* feat(backend): disable redis cache

* feat(backend): moved redis cache to ee
2023-07-06 10:55:43 +02:00

173 lines
4.5 KiB
Go

package redis
import (
"errors"
"fmt"
"github.com/go-redis/redis"
"log"
"net"
redis2 "openreplay/backend/pkg/db/redis"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue/types"
"sort"
"strconv"
"strings"
)
// idsInfo tracks messages that were consumed but not yet acknowledged.
// The two slices are parallel and kept in delivery order: id[i] is the
// Redis stream entry ID and ts[i] its millisecond timestamp component.
type idsInfo struct {
	id []string
	ts []int64
}
// streamPendingIDsMap maps a stream name to its pending (unacknowledged) message IDs.
type streamPendingIDsMap map[string]*idsInfo
// consumerImpl reads messages from Redis streams through a consumer group
// and implements types.Consumer.
type consumerImpl struct {
	client     *redis2.Client      // shared Redis connection + read config
	group      string              // consumer group name (also used as consumer name)
	streams    []string            // XREADGROUP argument: stream names followed by one ">" per stream
	idsPending streamPendingIDsMap // per-stream IDs awaiting ack when autoCommit is off
	lastTs     int64               // timestamp (ms) of the most recently consumed message
	autoCommit bool                // when true, messages are XACK-ed immediately on read
	event      chan interface{}    // exposed via Rebalanced(); no sender visible in this file
}
// QueueMessage is a single payload read from a Redis stream together with
// its batch metadata.
type QueueMessage struct {
	Data []byte              // raw payload (the entry's "value" field)
	Info *messages.BatchInfo // session ID, stream name, packed batch ID and timestamp
}
// ConsumeNext is part of the types.Consumer interface.
// NOTE: not implemented yet — calling it panics; use ConsumeNextOld instead.
func (c *consumerImpl) ConsumeNext() error {
	//TODO implement me
	panic("implement me")
}
// Close is part of the types.Consumer interface.
// NOTE: not implemented yet — calling it panics, so callers must not invoke
// it on shutdown until a real implementation lands.
func (c *consumerImpl) Close() {
	//TODO implement me
	panic("implement me")
}
// NewConsumer creates (or joins) the consumer group on every given stream and
// returns a consumer that reads from all of them. A stream that does not exist
// yet is created via XGROUP CREATE ... MKSTREAM; any error other than the
// group already existing is fatal.
func NewConsumer(client *redis2.Client, group string, streams []string) types.Consumer {
	idsPending := make(streamPendingIDsMap, len(streams))
	// XREADGROUP takes its Streams argument as [name1 .. nameN, id1 .. idN].
	// Build that list in a fresh slice: the original code appended directly to
	// the caller's slice, which could mutate the caller's backing array.
	readStreams := make([]string, len(streams), 2*len(streams))
	copy(readStreams, streams)
	for _, stream := range streams {
		err := client.Redis.XGroupCreateMkStream(stream, group, "0").Err()
		if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
			log.Fatalln(err)
		}
		idsPending[stream] = new(idsInfo)
		// ">" asks for never-delivered messages only.
		// Otherwise - never acknowledged only
		// TODO: understand why in case of "0" it eats 100% cpu
		readStreams = append(readStreams, ">")
	}
	return &consumerImpl{
		client:     client,
		streams:    readStreams,
		group:      group,
		autoCommit: true,
		idsPending: idsPending,
		event:      make(chan interface{}, 4),
	}
}
// ConsumeNextOld blocks (up to Cfg.ReadBlockDuration) on XREADGROUP and
// converts the first received entry into a QueueMessage. With autoCommit the
// entry is XACK-ed immediately; otherwise its ID is remembered in idsPending
// for a later Commit/CommitBack. Returns an error on timeout, on malformed
// entries, or when no message arrived.
//
// NOTE(review): only the FIRST message of a potentially multi-message read is
// returned; the remaining entries of the batch stay pending in the group until
// redelivered — confirm this is intended when Cfg.ReadCount > 1.
func (c *consumerImpl) ConsumeNextOld() (*QueueMessage, error) {
	res, err := c.client.Redis.XReadGroup(&redis.XReadGroupArgs{
		Group:    c.group,
		Consumer: c.group,
		Streams:  c.streams,
		Count:    c.client.Cfg.ReadCount,
		Block:    c.client.Cfg.ReadBlockDuration,
	}).Result()
	if err != nil {
		if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
			return nil, netErr
		}
		if err == redis.Nil {
			return nil, errors.New("key does not exist")
		}
		return nil, err
	}
	for _, r := range res {
		for _, m := range r.Messages {
			sessionIDString, ok := m.Values["sessionID"].(string)
			if !ok {
				return nil, fmt.Errorf("can't cast sessionID value for messageID %s", m.ID)
			}
			sessionID, err := strconv.ParseUint(sessionIDString, 10, 64)
			if err != nil {
				return nil, fmt.Errorf("can't parse sessionID %s for messageID %s", sessionIDString, m.ID)
			}
			valueString, ok := m.Values["value"].(string)
			if !ok {
				return nil, fmt.Errorf("can't cast value for messageID %s", m.ID)
			}
			// Stream entry IDs have the form "<ms-timestamp>-<sequence>";
			// assuming that ID has a correct format.
			idParts := strings.Split(m.ID, "-")
			ts, _ := strconv.ParseUint(idParts[0], 10, 64)
			idx, _ := strconv.ParseUint(idParts[1], 10, 64)
			if idx > 0x1FFF {
				return nil, errors.New("too many messages per ms in redis")
			}
			// Pack timestamp and per-ms sequence into one batch ID:
			// 13 sequence bits (max 8192 messages/ms) leave ~69 years of ms timestamps.
			bID := ts<<13 | (idx & 0x1FFF)
			result := &QueueMessage{
				Data: []byte(valueString),
				Info: messages.NewBatchInfo(sessionID, r.Stream, bID, 0, int64(ts)),
			}
			if c.autoCommit {
				if err = c.client.Redis.XAck(r.Stream, c.group, m.ID).Err(); err != nil {
					log.Printf("Acknowledgment error for messageID %s, err: %s", m.ID, err.Error())
				}
			} else {
				// Defer the ack: record the ID and timestamp for Commit/CommitBack.
				c.lastTs = int64(ts)
				c.idsPending[r.Stream].id = append(c.idsPending[r.Stream].id, m.ID)
				c.idsPending[r.Stream].ts = append(c.idsPending[r.Stream].ts, int64(ts))
			}
			return result, nil
		}
	}
	return nil, errors.New("no messages")
}
// CommitBack acknowledges every pending message whose timestamp is at least
// `gap` milliseconds older than the most recently consumed one, keeping the
// newer entries pending. A no-op until the first message has been consumed.
func (c *consumerImpl) CommitBack(gap int64) error {
	if c.lastTs == 0 {
		return nil
	}
	threshold := c.lastTs - gap
	for stream, pending := range c.idsPending {
		if len(pending.id) == 0 {
			continue
		}
		// Entries are appended in timestamp order, so binary-search for the
		// first one strictly newer than the threshold; everything before it
		// can be acknowledged.
		cut := sort.Search(len(pending.ts), func(i int) bool {
			return pending.ts[i] > threshold
		})
		if err := c.client.Redis.XAck(stream, c.group, pending.id[:cut]...).Err(); err != nil {
			return fmt.Errorf("RedisStreams: Acknoledgment error on commit %v", err)
		}
		pending.id = pending.id[cut:]
		pending.ts = pending.ts[cut:]
	}
	return nil
}
// Commit acknowledges all pending message IDs on every stream and clears the
// per-stream bookkeeping, stopping at the first XACK failure.
func (c *consumerImpl) Commit() error {
	for stream, pending := range c.idsPending {
		if len(pending.id) == 0 {
			continue
		}
		if err := c.client.Redis.XAck(stream, c.group, pending.id...).Err(); err != nil {
			return fmt.Errorf("RedisStreams: Acknoledgment error on commit %v", err)
		}
		pending.id = nil
		pending.ts = nil
	}
	return nil
}
// Rebalanced returns the consumer's event channel (buffered, capacity 4).
// NOTE(review): no sender is visible in this file, so the channel appears to
// never fire here — presumably kept for types.Consumer interface parity.
func (c *consumerImpl) Rebalanced() <-chan interface{} {
	return c.event
}