From 27ed4ee6b4e9ac952ef6c6c91253bf71a6fb1ec5 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 17 Apr 2025 09:00:43 +0200 Subject: [PATCH] feat(assist-api): moved some vars to env --- ee/backend/pkg/assist/builder.go | 2 +- ee/backend/pkg/sessionmanager/manager.go | 55 +++++++++++++----------- 2 files changed, 31 insertions(+), 26 deletions(-) diff --git a/ee/backend/pkg/assist/builder.go b/ee/backend/pkg/assist/builder.go index 3165fee79..83918a09e 100644 --- a/ee/backend/pkg/assist/builder.go +++ b/ee/backend/pkg/assist/builder.go @@ -26,7 +26,7 @@ type ServicesBuilder struct { func NewServiceBuilder(log logger.Logger, cfg *assist.Config, webMetrics web.Web, dbMetrics database.Database, pgconn pool.Pool, redis *redis.Client, prefix string) (*ServicesBuilder, error) { projectsManager := projects.New(log, pgconn, redis, dbMetrics) - sessManager, err := sessionmanager.New(log, redis.Redis) + sessManager, err := sessionmanager.New(log, cfg, redis.Redis) if err != nil { return nil, err } diff --git a/ee/backend/pkg/sessionmanager/manager.go b/ee/backend/pkg/sessionmanager/manager.go index e94162ca5..030ae7916 100644 --- a/ee/backend/pkg/sessionmanager/manager.go +++ b/ee/backend/pkg/sessionmanager/manager.go @@ -12,15 +12,14 @@ import ( "github.com/go-redis/redis" + "openreplay/backend/internal/config/assist" "openreplay/backend/pkg/logger" ) const ( - TickInterval = 10 * time.Second NodeKeyPattern = "assist:nodes:*" ActiveSessionPrefix = "assist:online_sessions:" RecentlyUpdatedSessions = "assist:updated_sessions" - BatchSize = 1000 ) type SessionData struct { @@ -49,34 +48,40 @@ type SessionManager interface { } type sessionManagerImpl struct { - ctx context.Context - log logger.Logger - client *redis.Client - ticker *time.Ticker - wg *sync.WaitGroup - stopChan chan struct{} - mutex *sync.RWMutex - cache map[string]*SessionData - sorted []*SessionData + ctx context.Context + log logger.Logger + client *redis.Client + ticker *time.Ticker + wg *sync.WaitGroup + stopChan chan struct{} + mutex *sync.RWMutex + cache map[string]*SessionData + sorted []*SessionData + batchSize int + scanSize int64 } -func New(log logger.Logger, redis *redis.Client) (SessionManager, error) { +func New(log logger.Logger, cfg *assist.Config, redis *redis.Client) (SessionManager, error) { switch { + case cfg == nil: + return nil, fmt.Errorf("config is required") case log == nil: return nil, fmt.Errorf("logger is required") case redis == nil: return nil, fmt.Errorf("redis client is required") } sm := &sessionManagerImpl{ - ctx: context.Background(), - log: log, - client: redis, - ticker: time.NewTicker(TickInterval), - wg: &sync.WaitGroup{}, - stopChan: make(chan struct{}), - mutex: &sync.RWMutex{}, - cache: make(map[string]*SessionData), - sorted: make([]*SessionData, 0), + ctx: context.Background(), + log: log, + client: redis, + ticker: time.NewTicker(cfg.CacheTTL), + wg: &sync.WaitGroup{}, + stopChan: make(chan struct{}), + mutex: &sync.RWMutex{}, + cache: make(map[string]*SessionData), + sorted: make([]*SessionData, 0), + batchSize: cfg.BatchSize, + scanSize: cfg.ScanSize, } return sm, nil } @@ -180,8 +185,8 @@ func (sm *sessionManagerImpl) getOnlineSessionIDs() (map[string]struct{}, error) func (sm *sessionManagerImpl) getSessionData(sessionIDs []string) map[string]*SessionData { sessionData := make(map[string]*SessionData, len(sessionIDs)) - for i := 0; i < len(sessionIDs); i += BatchSize { - end := i + BatchSize + for i := 0; i < len(sessionIDs); i += sm.batchSize { + end := i + sm.batchSize if end > len(sessionIDs) { end = len(sessionIDs) } @@ -222,7 +227,7 @@ func (sm *sessionManagerImpl) getSessionData(sessionIDs []string) map[string]*Se data.Raw = raw sessionData[batch[j]] = &data } - sm.log.Debug(sm.ctx, "Collected %d sessions", len(results)) + sm.log.Debug(sm.ctx, "Collected %d new sessions", len(results)) } sm.wg.Wait() @@ -288,7 +293,7 @@ func (sm *sessionManagerImpl) getAllRecentlyUpdatedSessions() (map[string]struct ) for { - batchIDs, cursor, err = sm.client.SScan(RecentlyUpdatedSessions, cursor, "*", 1000).Result() + batchIDs, cursor, err = sm.client.SScan(RecentlyUpdatedSessions, cursor, "*", sm.scanSize).Result() if err != nil { sm.log.Debug(sm.ctx, "Error scanning updated session IDs: %v", err) return nil, err