feat(backend): added projects filter to connector logic (#2130)
This commit is contained in:
parent
905953f899
commit
fcc0195528
2 changed files with 47 additions and 0 deletions
|
|
@ -5,6 +5,8 @@ import (
|
|||
"openreplay/backend/internal/config/configurator"
|
||||
"openreplay/backend/internal/config/objectstorage"
|
||||
"openreplay/backend/internal/config/redis"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
|
@ -24,6 +26,7 @@ type Config struct {
|
|||
TopicAnalytics string `env:"TOPIC_ANALYTICS,required"`
|
||||
CommitBatchTimeout time.Duration `env:"COMMIT_BATCH_TIMEOUT,default=5s"`
|
||||
UseProfiler bool `env:"PROFILER_ENABLED,default=false"`
|
||||
ProjectIDs string `env:"PROJECT_IDS"`
|
||||
}
|
||||
|
||||
func New() *Config {
|
||||
|
|
@ -31,3 +34,22 @@ func New() *Config {
|
|||
configurator.Process(cfg)
|
||||
return cfg
|
||||
}
|
||||
|
||||
func (c *Config) GetAllowedProjectIDs() []int {
|
||||
stringIDs := strings.Split(c.ProjectIDs, ",")
|
||||
if len(stringIDs) == 0 {
|
||||
return nil
|
||||
}
|
||||
ids := make([]int, 0, len(stringIDs))
|
||||
for _, id := range stringIDs {
|
||||
intID, err := strconv.Atoi(id)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
ids = append(ids, intID)
|
||||
}
|
||||
if len(ids) == 0 {
|
||||
return nil
|
||||
}
|
||||
return ids
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ type Saver struct {
|
|||
lastUpdate map[uint64]time.Time
|
||||
finishedSessions []uint64
|
||||
events []map[string]string
|
||||
projectIDs map[uint32]bool
|
||||
}
|
||||
|
||||
func New(log logger.Logger, cfg *config.Config, db Database, sessions sessions.Sessions, projects projects.Projects) *Saver {
|
||||
|
|
@ -41,6 +42,16 @@ func New(log logger.Logger, cfg *config.Config, db Database, sessions sessions.S
|
|||
if err := validateColumnNames(eventColumns); err != nil {
|
||||
log.Error(ctx, "can't validate events column names: %s", err)
|
||||
}
|
||||
// Parse project IDs
|
||||
projectIDs := make(map[uint32]bool, len(cfg.ProjectIDs))
|
||||
if len(cfg.GetAllowedProjectIDs()) == 0 {
|
||||
log.Info(ctx, "empty project IDs white list")
|
||||
projectIDs = nil
|
||||
} else {
|
||||
for _, id := range cfg.GetAllowedProjectIDs() {
|
||||
projectIDs[uint32(id)] = true
|
||||
}
|
||||
}
|
||||
return &Saver{
|
||||
log: log,
|
||||
cfg: cfg,
|
||||
|
|
@ -49,6 +60,7 @@ func New(log logger.Logger, cfg *config.Config, db Database, sessions sessions.S
|
|||
projModule: projects,
|
||||
updatedSessions: make(map[uint64]bool, 0),
|
||||
lastUpdate: make(map[uint64]time.Time, 0),
|
||||
projectIDs: projectIDs,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -410,6 +422,19 @@ func (s *Saver) handleSession(msg messages.Message) {
|
|||
}
|
||||
|
||||
func (s *Saver) Handle(msg messages.Message) {
|
||||
if s.projectIDs != nil {
|
||||
// Check if project ID is allowed
|
||||
sessInfo, err := s.sessModule.Get(msg.SessionID())
|
||||
if err != nil {
|
||||
s.log.Error(context.Background(), "can't get session info: %s, skipping message", err)
|
||||
return
|
||||
}
|
||||
if !s.projectIDs[sessInfo.ProjectID] {
|
||||
s.log.Debug(context.Background(), "project ID %d is not allowed, skipping message", sessInfo.ProjectID)
|
||||
return
|
||||
}
|
||||
s.log.Debug(context.Background(), "project ID %d is allowed", sessInfo.ProjectID)
|
||||
}
|
||||
newEvent := handleEvent(msg)
|
||||
if newEvent != nil {
|
||||
if s.events == nil {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue