feat(connector): added new events + projectID filter

This commit is contained in:
Alexander 2024-05-23 12:09:59 +02:00
parent a4577300a2
commit 4c96cbcef3
3 changed files with 143 additions and 3 deletions

View file

@ -6,6 +6,8 @@ import (
"openreplay/backend/internal/config/objectstorage"
"openreplay/backend/internal/config/redis"
"openreplay/backend/pkg/logger"
"strconv"
"strings"
"time"
)
@ -26,6 +28,7 @@ type Config struct {
TopicAnalytics string `env:"TOPIC_ANALYTICS,required"`
CommitBatchTimeout time.Duration `env:"COMMIT_BATCH_TIMEOUT,default=5s"`
UseProfiler bool `env:"PROFILER_ENABLED,default=false"`
ProjectIDs string `env:"PROJECT_IDS"`
}
func New(log logger.Logger) *Config {
@ -33,3 +36,22 @@ func New(log logger.Logger) *Config {
configurator.Process(log, cfg)
return cfg
}
func (c *Config) GetAllowedProjectIDs() []int {
stringIDs := strings.Split(c.ProjectIDs, ",")
if len(stringIDs) == 0 {
return nil
}
ids := make([]int, 0, len(stringIDs))
for _, id := range stringIDs {
intID, err := strconv.Atoi(id)
if err != nil {
continue
}
ids = append(ids, intID)
}
if len(ids) == 0 {
return nil
}
return ids
}

View file

@ -4,10 +4,11 @@ import "strconv"
var sessionColumns = []string{
"sessionid",
"user_agent",
"user_browser",
"user_browser_version",
"user_country",
"user_city",
"user_state",
"user_device",
"user_device_heap_size",
"user_device_memory_size",
@ -20,6 +21,8 @@ var sessionColumns = []string{
"referrer",
"user_anonymous_id",
"user_id",
"tracker_version",
"rev_id",
"session_start_timestamp",
"session_end_timestamp",
"session_duration",
@ -98,8 +101,77 @@ var eventColumns = []string{
"issueevent_url",
"customissue_name",
"customissue_payload",
"received_at",
"batch_order_number",
"mouseclick_label",
"mouseclick_selector",
"mouseclick_url",
"mouseclick_hesitation_time",
"mouseclick_timestamp",
"pageevent_url",
"pageevent_referrer",
"pageevent_speed_index",
"pageevent_timestamp",
"inputevent_label",
"inputevent_hesitation_time",
"inputevent_input_duration",
"inputevent_timestamp",
"mobile_event_name",
"mobile_event_payload",
"mobile_networkcall_type",
"mobile_networkcall_method",
"mobile_networkcall_url",
"mobile_networkcall_request",
"mobile_networkcall_response",
"mobile_networkcall_status",
"mobile_networkcall_timestamp",
"mobile_networkcall_duration",
"mobile_clickevent_x",
"mobile_clickevent_y",
"mobile_clickevent_timestamp",
"mobile_clickevent_label",
"mobile_swipeevent_x",
"mobile_swipeevent_y",
"mobile_swipeevent_timestamp",
"mobile_swipeevent_label",
"mobile_inputevent_label",
"mobile_inputevent_value",
"mobile_crash_name",
"mobile_crash_reason",
"mobile_crash_stacktrace",
"mobile_issueevent_timestamp",
"mobile_issueevent_type",
"mobile_issueevent_context_string",
"mobile_issueevent_context",
"mobile_issueevent_payload",
"mobile_viewcomponentevent_screen_name",
"mobile_viewcomponentevent_view_name",
"mobile_viewcomponentevent_visible",
"mobile_viewcomponentevent_timestamp",
}
var eventInts = []string{
"networkrequest_status",
"networkrequest_timestamp",
"networkrequest_duration",
"issueevent_message_id",
"issueevent_timestamp",
"mouseclick_hesitation_time",
"mouseclick_timestamp",
"pageevent_speed_index",
"pageevent_timestamp",
"inputevent_hesitation_time",
"inputevent_input_duration",
"inputevent_timestamp",
"mobile_networkcall_status",
"mobile_networkcall_timestamp",
"mobile_networkcall_duration",
"mobile_clickevent_x",
"mobile_clickevent_y",
"mobile_clickevent_timestamp",
"mobile_swipeevent_x",
"mobile_swipeevent_y",
"mobile_swipeevent_timestamp",
"mobile_issueevent_timestamp",
"mobile_viewcomponentevent_timestamp",
}
func QUOTES(s string) string {

View file

@ -26,6 +26,7 @@ type Saver struct {
lastUpdate map[uint64]time.Time
finishedSessions []uint64
events []map[string]string
projectIDs map[uint32]bool
}
func New(log logger.Logger, cfg *config.Config, db Database, sessions sessions.Sessions, projects projects.Projects) *Saver {
@ -41,6 +42,16 @@ func New(log logger.Logger, cfg *config.Config, db Database, sessions sessions.S
if err := validateColumnNames(eventColumns); err != nil {
log.Error(ctx, "can't validate events column names: %s", err)
}
// Parse project IDs
projectIDs := make(map[uint32]bool, len(cfg.ProjectIDs))
if len(cfg.GetAllowedProjectIDs()) == 0 {
log.Info(ctx, "empty project IDs white list")
projectIDs = nil
} else {
for _, id := range cfg.GetAllowedProjectIDs() {
projectIDs[uint32(id)] = true
}
}
return &Saver{
log: log,
cfg: cfg,
@ -49,6 +60,7 @@ func New(log logger.Logger, cfg *config.Config, db Database, sessions sessions.S
projModule: projects,
updatedSessions: make(map[uint64]bool, 0),
lastUpdate: make(map[uint64]time.Time, 0),
projectIDs: projectIDs,
}
}
@ -87,6 +99,22 @@ func handleEvent(msg messages.Message) map[string]string {
case *messages.CustomIssue:
event["customissue_name"] = QUOTES(m.Name)
event["customissue_payload"] = QUOTES(m.Payload)
case *messages.MouseClick:
event["mouseclick_label"] = QUOTES(m.Label)
event["mouseclick_selector"] = QUOTES(m.Selector)
event["mouseclick_url"] = QUOTES(msg.Meta().Url)
event["mouseclick_hesitation_time"] = fmt.Sprintf("%d", m.HesitationTime)
event["mouseclick_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
case *messages.PageEvent:
event["pageevent_url"] = QUOTES(m.URL)
event["pageevent_referrer"] = QUOTES(m.Referrer)
event["pageevent_speed_index"] = fmt.Sprintf("%d", m.SpeedIndex)
event["pageevent_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
case *messages.InputChange:
event["inputevent_label"] = QUOTES(m.Label)
event["inputevent_hesitation_time"] = fmt.Sprintf("%d", m.HesitationTime)
event["inputevent_input_duration"] = fmt.Sprintf("%d", m.InputDuration)
event["inputevent_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
// Mobile events
case *messages.MobileEvent:
event["mobile_event_name"] = QUOTES(m.Name)
@ -123,6 +151,11 @@ func handleEvent(msg messages.Message) map[string]string {
event["mobile_issueevent_context_string"] = QUOTES(m.ContextString)
event["mobile_issueevent_context"] = QUOTES(m.Context)
event["mobile_issueevent_payload"] = QUOTES(m.Payload)
case *messages.MobileViewComponentEvent:
event["mobile_viewcomponentevent_screen_name"] = QUOTES(m.ScreenName)
event["mobile_viewcomponentevent_view_name"] = QUOTES(m.ViewName)
event["mobile_viewcomponentevent_visible"] = fmt.Sprintf("%t", m.Visible)
event["mobile_viewcomponentevent_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
}
if len(event) == 0 {
@ -410,6 +443,19 @@ func (s *Saver) handleSession(msg messages.Message) {
}
func (s *Saver) Handle(msg messages.Message) {
if s.projectIDs != nil {
// Check if project ID is allowed
sessInfo, err := s.sessModule.Get(msg.SessionID())
if err != nil {
s.log.Error(context.Background(), "can't get session info: %s, skipping message", err)
return
}
if !s.projectIDs[sessInfo.ProjectID] {
s.log.Debug(context.Background(), "project ID %d is not allowed, skipping message", sessInfo.ProjectID)
return
}
s.log.Info(context.Background(), "project ID %d is allowed", sessInfo.ProjectID)
}
newEvent := handleEvent(msg)
if newEvent != nil {
if s.events == nil {