feat(assist-api): moved some vars to env
This commit is contained in:
parent
59251af8c6
commit
27ed4ee6b4
2 changed files with 31 additions and 26 deletions
|
|
@ -26,7 +26,7 @@ type ServicesBuilder struct {
|
|||
|
||||
func NewServiceBuilder(log logger.Logger, cfg *assist.Config, webMetrics web.Web, dbMetrics database.Database, pgconn pool.Pool, redis *redis.Client, prefix string) (*ServicesBuilder, error) {
|
||||
projectsManager := projects.New(log, pgconn, redis, dbMetrics)
|
||||
sessManager, err := sessionmanager.New(log, redis.Redis)
|
||||
sessManager, err := sessionmanager.New(log, cfg, redis.Redis)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,15 +12,14 @@ import (
|
|||
|
||||
"github.com/go-redis/redis"
|
||||
|
||||
"openreplay/backend/internal/config/assist"
|
||||
"openreplay/backend/pkg/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
TickInterval = 10 * time.Second
|
||||
NodeKeyPattern = "assist:nodes:*"
|
||||
ActiveSessionPrefix = "assist:online_sessions:"
|
||||
RecentlyUpdatedSessions = "assist:updated_sessions"
|
||||
BatchSize = 1000
|
||||
)
|
||||
|
||||
type SessionData struct {
|
||||
|
|
@ -49,34 +48,40 @@ type SessionManager interface {
|
|||
}
|
||||
|
||||
type sessionManagerImpl struct {
|
||||
ctx context.Context
|
||||
log logger.Logger
|
||||
client *redis.Client
|
||||
ticker *time.Ticker
|
||||
wg *sync.WaitGroup
|
||||
stopChan chan struct{}
|
||||
mutex *sync.RWMutex
|
||||
cache map[string]*SessionData
|
||||
sorted []*SessionData
|
||||
ctx context.Context
|
||||
log logger.Logger
|
||||
client *redis.Client
|
||||
ticker *time.Ticker
|
||||
wg *sync.WaitGroup
|
||||
stopChan chan struct{}
|
||||
mutex *sync.RWMutex
|
||||
cache map[string]*SessionData
|
||||
sorted []*SessionData
|
||||
batchSize int
|
||||
scanSize int64
|
||||
}
|
||||
|
||||
func New(log logger.Logger, redis *redis.Client) (SessionManager, error) {
|
||||
func New(log logger.Logger, cfg *assist.Config, redis *redis.Client) (SessionManager, error) {
|
||||
switch {
|
||||
case cfg == nil:
|
||||
return nil, fmt.Errorf("config is required")
|
||||
case log == nil:
|
||||
return nil, fmt.Errorf("logger is required")
|
||||
case redis == nil:
|
||||
return nil, fmt.Errorf("redis client is required")
|
||||
}
|
||||
sm := &sessionManagerImpl{
|
||||
ctx: context.Background(),
|
||||
log: log,
|
||||
client: redis,
|
||||
ticker: time.NewTicker(TickInterval),
|
||||
wg: &sync.WaitGroup{},
|
||||
stopChan: make(chan struct{}),
|
||||
mutex: &sync.RWMutex{},
|
||||
cache: make(map[string]*SessionData),
|
||||
sorted: make([]*SessionData, 0),
|
||||
ctx: context.Background(),
|
||||
log: log,
|
||||
client: redis,
|
||||
ticker: time.NewTicker(cfg.CacheTTL),
|
||||
wg: &sync.WaitGroup{},
|
||||
stopChan: make(chan struct{}),
|
||||
mutex: &sync.RWMutex{},
|
||||
cache: make(map[string]*SessionData),
|
||||
sorted: make([]*SessionData, 0),
|
||||
batchSize: cfg.BatchSize,
|
||||
scanSize: cfg.ScanSize,
|
||||
}
|
||||
return sm, nil
|
||||
}
|
||||
|
|
@ -180,8 +185,8 @@ func (sm *sessionManagerImpl) getOnlineSessionIDs() (map[string]struct{}, error)
|
|||
func (sm *sessionManagerImpl) getSessionData(sessionIDs []string) map[string]*SessionData {
|
||||
sessionData := make(map[string]*SessionData, len(sessionIDs))
|
||||
|
||||
for i := 0; i < len(sessionIDs); i += BatchSize {
|
||||
end := i + BatchSize
|
||||
for i := 0; i < len(sessionIDs); i += sm.batchSize {
|
||||
end := i + sm.batchSize
|
||||
if end > len(sessionIDs) {
|
||||
end = len(sessionIDs)
|
||||
}
|
||||
|
|
@ -222,7 +227,7 @@ func (sm *sessionManagerImpl) getSessionData(sessionIDs []string) map[string]*Se
|
|||
data.Raw = raw
|
||||
sessionData[batch[j]] = &data
|
||||
}
|
||||
sm.log.Debug(sm.ctx, "Collected %d sessions", len(results))
|
||||
sm.log.Debug(sm.ctx, "Collected %d new sessions", len(results))
|
||||
}
|
||||
|
||||
sm.wg.Wait()
|
||||
|
|
@ -288,7 +293,7 @@ func (sm *sessionManagerImpl) getAllRecentlyUpdatedSessions() (map[string]struct
|
|||
)
|
||||
|
||||
for {
|
||||
batchIDs, cursor, err = sm.client.SScan(RecentlyUpdatedSessions, cursor, "*", 1000).Result()
|
||||
batchIDs, cursor, err = sm.client.SScan(RecentlyUpdatedSessions, cursor, "*", sm.scanSize).Result()
|
||||
if err != nil {
|
||||
sm.log.Debug(sm.ctx, "Error scanning updated session IDs: %v", err)
|
||||
return nil, err
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue