From f789ee1bda10320182bc1ec632e6d94767943a62 Mon Sep 17 00:00:00 2001 From: Shekar Siri Date: Mon, 12 May 2025 12:07:32 +0200 Subject: [PATCH] feat(product_analytics): user journey - wip --- .../analytics/charts/metric_user_journey.go | 365 +++++++++++++----- backend/pkg/analytics/charts/model.go | 33 +- 2 files changed, 287 insertions(+), 111 deletions(-) diff --git a/backend/pkg/analytics/charts/metric_user_journey.go b/backend/pkg/analytics/charts/metric_user_journey.go index e4acec780..82332ca6b 100644 --- a/backend/pkg/analytics/charts/metric_user_journey.go +++ b/backend/pkg/analytics/charts/metric_user_journey.go @@ -11,10 +11,11 @@ import ( // Node represents a point in the journey diagram. type Node struct { - Depth int `json:"depth"` - Name string `json:"name"` - EventType string `json:"eventType"` - ID int `json:"id"` + Depth int `json:"depth"` + Name string `json:"name"` + EventType string `json:"eventType"` + ID int `json:"id"` + StartingNode bool `json:"startingNode"` } // Link represents a transition between nodes. @@ -52,7 +53,7 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac defer rows.Close() type row struct { - Stage uint64 + Stage int64 CurrentEventName string CurrentEventProperty string PrevEventName string @@ -82,24 +83,47 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac rawData = append(rawData, r) } - // Group data by stage and determine max stage - dataByStage := make(map[uint64][]row) - var maxStage uint64 = 0 + // Group data by stage + dataByStage := make(map[int64][]row) + var minStage int64 = 0 + var maxStage int64 = 0 + for _, r := range rawData { dataByStage[r.Stage] = append(dataByStage[r.Stage], r) if r.Stage > maxStage { maxStage = r.Stage } + if r.Stage < minStage { + minStage = r.Stage + } } // Calculate total sessions per stage - stageTotals := make(map[uint64]uint64) + stageTotals := make(map[int64]uint64) for stage, stageRows := range dataByStage { for _, r := range stageRows { stageTotals[stage] += r.SessionsCount } } - initialCount := stageTotals[1] + + // Determine base count for percentage calculations + // We'll use the starting point (usually stage 1) as our base + var baseSessionsCount uint64 + if count, exists := stageTotals[1]; exists { + baseSessionsCount = count + } else { + // If stage 1 doesn't exist, use the first available positive stage + for stage := int64(0); stage <= maxStage; stage++ { + if count, exists := stageTotals[stage]; exists { + baseSessionsCount = count + break + } + } + } + + if baseSessionsCount == 0 { + baseSessionsCount = 1 // Prevent division by zero + } // Number of top nodes to display per stage topLimit := int(p.Rows) @@ -114,10 +138,20 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac } // Map to store top paths for each stage - topPathsByStage := make(map[uint64]map[pathKey]bool) - pathCountsByStage := make(map[uint64]map[pathKey]uint64) + topPathsByStage := make(map[int64]map[pathKey]bool) + pathCountsByStage := make(map[int64]map[pathKey]uint64) + + for stage := minStage; stage <= maxStage; stage++ { + // Skip if this stage has no data + if _, exists := dataByStage[stage]; !exists { + continue + } + + // Sort rows within each stage by session count (descending) + sort.Slice(dataByStage[stage], func(i, j int) bool { + return dataByStage[stage][i].SessionsCount > dataByStage[stage][j].SessionsCount + }) - for stage := uint64(1); stage <= maxStage; stage++ { // Initialize maps for this stage topPathsByStage[stage] = make(map[pathKey]bool) pathCountsByStage[stage] = make(map[pathKey]uint64) @@ -144,7 +178,7 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac return paths[i].count > paths[j].count }) - // Mark top paths + // Mark top paths - take exactly topLimit or all if fewer available for i, pc := range paths { if i < topLimit { topPathsByStage[stage][pc.path] = true @@ -152,35 +186,78 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac } } - // Step 2: Create nodes and track sessions + // Step 2: Create a normalized sequential depth mapping + // First, gather all stages that have data + var stagesWithData []int64 + for stage := range dataByStage { + stagesWithData = append(stagesWithData, stage) + } + + // Sort stages + sort.Slice(stagesWithData, func(i, j int) bool { + return stagesWithData[i] < stagesWithData[j] + }) + + var startingStage int64 + for _, s := range stagesWithData { + if s > 0 { + startingStage = s + break + } + } + + // Create a mapping from logical stage to display depth (ensuring no gaps) + stageToDepth := make(map[int64]int) + for i, stage := range stagesWithData { + stageToDepth[stage] = i + } + + // Determine depth of central node (stage 1 or equivalent) + var centralDepth int + if depth, exists := stageToDepth[1]; exists { + centralDepth = depth + } else { + // If stage 1 doesn't exist, use the first positive stage + for _, stage := range stagesWithData { + if stage > 0 { + centralDepth = stageToDepth[stage] + break + } + } + } + + // Step 3: Create nodes with normalized depths var nodes []Node var links []Link nodeID := 0 // Maps to track nodes and sessions - nodeMap := make(map[string]int) // Stage|EventName|EventProp → nodeID - othersNodes := make(map[uint64]int) // stage → "Others" nodeID - dropNodes := make(map[uint64]int) // stage → "Drop" nodeID + nodeMap := make(map[string]int) // Stage|EventName|EventProp → nodeID + othersNodes := make(map[int64]int) // stage → "Others" nodeID + dropNodes := make(map[int64]int) // stage → "Drop" nodeID incomingSessions := make(map[int]uint64) // nodeID → incoming sessions outgoingSessions := make(map[int]uint64) // nodeID → outgoing sessions - // Create all nodes first - for stage := uint64(1); stage <= maxStage; stage++ { + // Create all nodes using normalized depths + for _, stage := range stagesWithData { + displayDepth := stageToDepth[stage] + // Create regular nodes for top paths for path := range topPathsByStage[stage] { nodeKey := fmt.Sprintf("%d|%s|%s", stage, path.eventName, path.eventProp) nodeMap[nodeKey] = nodeID nodes = append(nodes, Node{ - ID: nodeID, - Depth: int(stage) - 1, - Name: path.eventProp, - EventType: path.eventName, + ID: nodeID, + Depth: displayDepth, + Name: path.eventProp, + EventType: path.eventName, + StartingNode: stage == startingStage, }) - // For stage 1, set incoming sessions - if stage == 1 { + // For the central stage (usually stage 1) or first stage, set incoming sessions + if (stage == 1) || (stage == minStage && minStage != 1) { incomingSessions[nodeID] = pathCountsByStage[stage][path] } @@ -203,14 +280,15 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac othersNodes[stage] = nodeID nodes = append(nodes, Node{ - ID: nodeID, - Depth: int(stage) - 1, - Name: "other", - EventType: "OTHER", + ID: nodeID, + Depth: displayDepth, + Name: "other", + EventType: "OTHER", + StartingNode: stage == startingStage, }) - // For stage 1, set incoming sessions for Others - if stage == 1 { + // For the central stage or first stage, set incoming sessions for Others + if (stage == 1) || (stage == minStage && minStage != 1) { incomingSessions[nodeID] = othersCount } @@ -219,7 +297,7 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac } } - // Step 3: Create links between nodes + // Step 4: Create links between adjacent nodes only // Use a map to deduplicate links type linkKey struct { src int @@ -228,10 +306,18 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac linkSessions := make(map[linkKey]uint64) linkTypes := make(map[linkKey]string) - for stage := uint64(2); stage <= maxStage; stage++ { - for _, r := range dataByStage[stage] { + // For each stage (except the first), create links from the previous stage + for i := 1; i < len(stagesWithData); i++ { + currentStage := stagesWithData[i] + prevStage := stagesWithData[i-1] + + for _, r := range dataByStage[currentStage] { + // Skip if previous stage doesn't match expected + if r.Stage != currentStage { + continue + } + // Determine source node - prevStage := stage - 1 prevPathKey := fmt.Sprintf("%d|%s|%s", prevStage, r.PrevEventName, r.PrevEventProperty) srcID, hasSrc := nodeMap[prevPathKey] @@ -252,14 +338,14 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac var hasTgt bool // Check if this path is in the top paths for this stage - if topPathsByStage[stage][curPath] { + if topPathsByStage[currentStage][curPath] { // It's a top node - curPathKey := fmt.Sprintf("%d|%s|%s", stage, r.CurrentEventName, r.CurrentEventProperty) + curPathKey := fmt.Sprintf("%d|%s|%s", currentStage, r.CurrentEventName, r.CurrentEventProperty) tgtID = nodeMap[curPathKey] hasTgt = true } else { // It's part of Others - if othersID, hasOthers := othersNodes[stage]; hasOthers { + if othersID, hasOthers := othersNodes[currentStage]; hasOthers { tgtID = othersID hasTgt = true } @@ -284,9 +370,11 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac } } - // Create deduplicated links + // Create deduplicated links with proper percentages for lk, count := range linkSessions { - percent := math.Round(float64(count)*10000/float64(initialCount)) / 100 + // Calculate percentage based on baseSessionsCount + percent := math.Round(float64(count)*10000/float64(baseSessionsCount)) / 100 + links = append(links, Link{ Source: lk.src, Target: lk.tgt, @@ -296,17 +384,25 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac }) } - // Step 4: Calculate drops and create drop nodes - cumulativeDrops := make(map[uint64]uint64) + // Step 5: Calculate drops and create drop nodes (only for stages ≥ 0) + // Process forward drops (positive stages only) + for i := 0; i < len(stagesWithData)-1; i++ { + stage := stagesWithData[i] + + // Skip negative stages for drops + if stage < 0 { + continue + } - for stage := uint64(1); stage < maxStage; stage++ { // Calculate new drops at this stage stageDrops := uint64(0) dropsFromNode := make(map[int]uint64) // nodeID -> drop count for _, node := range nodes { - nodeStage := uint64(node.Depth) + 1 - if nodeStage != stage { + nodeDepth := node.Depth + + // Skip if this node isn't in the current stage + if nodeDepth != stageToDepth[stage] { continue } @@ -320,65 +416,91 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac } } - // Calculate cumulative drops - if stage == 1 { - cumulativeDrops[stage] = stageDrops - } else { - cumulativeDrops[stage] = cumulativeDrops[stage-1] + stageDrops + // Skip if no drops + if stageDrops == 0 { + continue } - // Create drop node if there are drops - if cumulativeDrops[stage] > 0 { - dropNodes[stage] = nodeID + // Determine next stage depth for drop node positioning + var dropDepth int + if i+1 < len(stagesWithData) { + dropDepth = stageToDepth[stagesWithData[i+1]] + } else { + dropDepth = stageToDepth[stage] + 1 + } - nodes = append(nodes, Node{ - ID: nodeID, - Depth: int(stage), // Drop nodes appear at the next depth - Name: "drop", - EventType: "DROP", - }) + // Create drop node + dropNodes[stage] = nodeID - // Create links from nodes with drops to the drop node - for nid, dropCount := range dropsFromNode { - if dropCount == 0 { - continue - } + nodes = append(nodes, Node{ + ID: nodeID, + Depth: dropDepth, + Name: "drop", + EventType: "DROP", + }) - percent := math.Round(float64(dropCount)*10000/float64(initialCount)) / 100 - links = append(links, Link{ - Source: nid, - Target: nodeID, - SessionsCount: int(dropCount), - Value: percent, - EventType: "DROP", - }) + // Create links from nodes with drops to the drop node + for nid, dropCount := range dropsFromNode { + if dropCount == 0 { + continue } - // Link previous drop node to carry forward drops - if stage > 1 && cumulativeDrops[stage-1] > 0 { - if prevDropID, hasPrevDrop := dropNodes[stage-1]; hasPrevDrop { - percent := math.Round(float64(cumulativeDrops[stage-1])*10000/float64(initialCount)) / 100 + // Calculate percentage based on baseSessionsCount + percent := math.Round(float64(dropCount)*10000/float64(baseSessionsCount)) / 100 + + links = append(links, Link{ + Source: nid, + Target: nodeID, + SessionsCount: int(dropCount), + Value: percent, + EventType: "DROP", + }) + } + + // Link previous drop node to current drop node to show accumulation + if i > 0 { + for j := i - 1; j >= 0; j-- { + prevStage := stagesWithData[j] + if prevDropID, hasPrevDrop := dropNodes[prevStage]; hasPrevDrop { + // Link previous drop to current drop to show accumulation + prevDropCount := uint64(0) + for _, link := range links { + if link.Target == prevDropID && link.EventType == "DROP" { + prevDropCount += uint64(link.SessionsCount) + } + } + + percent := math.Round(float64(prevDropCount)*10000/float64(baseSessionsCount)) / 100 + links = append(links, Link{ Source: prevDropID, Target: nodeID, - SessionsCount: int(cumulativeDrops[stage-1]), + SessionsCount: int(prevDropCount), Value: percent, EventType: "DROP", }) + break } } - - nodeID++ } + + nodeID++ } - // Filter and reindex + // Filter out nodes with no connections nodeHasConnection := make(map[int]bool) for _, link := range links { nodeHasConnection[link.Source] = true nodeHasConnection[link.Target] = true } + // Make sure central nodes are included even if they don't have links + for _, node := range nodes { + if node.Depth == centralDepth { + nodeHasConnection[node.ID] = true + } + } + var filteredNodes []Node for _, node := range nodes { if nodeHasConnection[node.ID] { @@ -386,7 +508,7 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac } } - // Reassign IDs + // Reassign IDs to be sequential nodeIDMap := make(map[int]int) var finalNodes []Node @@ -396,7 +518,7 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac finalNodes = append(finalNodes, node) } - // Update links + // Update link references var finalLinks []Link for _, link := range links { srcID, srcExists := nodeIDMap[link.Source] @@ -428,7 +550,15 @@ func (h UserJourneyQueryBuilder) buildQuery(p Payload) (string, error) { startConds, _ := buildEventConditions(p.StartPoint, BuildConditionsOptions{DefinedColumns: mainColumns, MainTableAlias: "e"}) excludeConds, _ := buildEventConditions(p.Exclude, BuildConditionsOptions{DefinedColumns: mainColumns, MainTableAlias: "e"}) - firstBase := []string{`e."$event_name" = 'LOCATION'`} + // use first element of StartPoint array for starting event + var startEvent string + if len(p.StartPoint) > 0 { + startEvent = string(p.StartPoint[0].Type) + } else { + startEvent = events[0] + } + + firstBase := []string{fmt.Sprintf("e.\"$event_name\" = '%s'", startEvent)} if len(startConds) > 0 { firstBase = append(firstBase, startConds...) } @@ -449,8 +579,19 @@ func (h UserJourneyQueryBuilder) buildQuery(p Payload) (string, error) { fmt.Sprintf("e.project_id = %d", p.ProjectId), ) + startTime := time.Unix(p.StartTimestamp/1000, 0).UTC().Format("2006-01-02 15:04:05") endTime := time.Unix(p.EndTimestamp/1000, 0).UTC().Format("2006-01-02 15:04:05") + previousColumns := p.PreviousColumns + if previousColumns <= 0 { + previousColumns = 0 + } + + maxCols := p.Columns + if maxCols > 0 { + maxCols++ + } + q := fmt.Sprintf(`WITH first_hits AS ( SELECT session_id, MIN(created_at) AS start_time @@ -458,18 +599,18 @@ func (h UserJourneyQueryBuilder) buildQuery(p Payload) (string, error) { WHERE %s GROUP BY session_id ), - journey_events AS ( + journey_events_after AS ( SELECT e.session_id, e.distinct_id, e."$event_name" AS event_name, e.created_at, - multiIf( - e."$event_name" = 'LOCATION', JSONExtractString(toString(e."$properties"), 'url_path'), - e."$event_name" = 'CLICK', JSONExtractString(toString(e."$properties"), 'label'), - e."$event_name" = 'INPUT', JSONExtractString(toString(e."$properties"), 'label'), - NULL - ) AS event_property + CASE + WHEN e."$event_name" = 'LOCATION' THEN JSONExtractString(toString(e."$properties"), 'url_path') + WHEN e."$event_name" = 'CLICK' THEN JSONExtractString(toString(e."$properties"), 'label') + WHEN e."$event_name" = 'INPUT' THEN JSONExtractString(toString(e."$properties"), 'label') + ELSE NULL + END AS event_property FROM product_analytics.events AS e JOIN first_hits AS f USING(session_id) WHERE @@ -477,6 +618,31 @@ func (h UserJourneyQueryBuilder) buildQuery(p Payload) (string, error) { AND e.created_at <= toDateTime('%s') AND %s ), + journey_events_before AS ( + SELECT + e.session_id, + e.distinct_id, + e."$event_name" AS event_name, + e.created_at, + CASE + WHEN e."$event_name" = 'LOCATION' THEN JSONExtractString(toString(e."$properties"), 'url_path') + WHEN e."$event_name" = 'CLICK' THEN JSONExtractString(toString(e."$properties"), 'label') + WHEN e."$event_name" = 'INPUT' THEN JSONExtractString(toString(e."$properties"), 'label') + ELSE NULL + END AS event_property + FROM product_analytics.events AS e + JOIN first_hits AS f USING(session_id) + WHERE + e.created_at < f.start_time + AND e.created_at >= toDateTime('%s') + AND %s + AND %d > 0 -- Only fetch previous events if PreviousColumns > 0 + ), + journey_events_combined AS ( + SELECT *, 1 AS direction FROM journey_events_after + UNION ALL + SELECT *, -1 AS direction FROM journey_events_before + ), event_with_prev AS ( SELECT session_id, @@ -484,14 +650,19 @@ func (h UserJourneyQueryBuilder) buildQuery(p Payload) (string, error) { event_name, event_property, created_at, + direction, any(event_name) OVER (PARTITION BY session_id ORDER BY created_at ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) AS previous_event_name, any(event_property) OVER (PARTITION BY session_id ORDER BY created_at ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) AS previous_event_property - FROM journey_events + FROM journey_events_combined ), staged AS ( SELECT *, - sumIf(1, true) OVER (PARTITION BY session_id ORDER BY created_at ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS stage + CASE + WHEN direction = 1 THEN toInt64(sumIf(1, true) OVER (PARTITION BY session_id, direction ORDER BY created_at ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) + WHEN direction = -1 THEN -1 * toInt64(sumIf(1, true) OVER (PARTITION BY session_id, direction ORDER BY created_at DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) + ELSE 0 + END AS stage FROM event_with_prev ) SELECT @@ -502,7 +673,7 @@ SELECT COALESCE(previous_event_property, '') AS previous_event_property, COUNT(DISTINCT session_id) AS sessions_count FROM staged -WHERE stage <= %d +WHERE stage <= %d AND stage >= -%d GROUP BY stage, event_name, @@ -513,7 +684,11 @@ ORDER BY stage, COUNT(DISTINCT session_id) DESC;`, strings.Join(firstBase, " AND "), endTime, strings.Join(journeyBase, " AND "), - p.Columns, + startTime, + strings.Join(journeyBase, " AND "), + previousColumns, + maxCols, + previousColumns, ) return q, nil } diff --git a/backend/pkg/analytics/charts/model.go b/backend/pkg/analytics/charts/model.go index 0bde5e62f..959fa2537 100644 --- a/backend/pkg/analytics/charts/model.go +++ b/backend/pkg/analytics/charts/model.go @@ -60,22 +60,23 @@ const ( ) type MetricPayload struct { - StartTimestamp int64 `json:"startTimestamp"` - EndTimestamp int64 `json:"endTimestamp"` - Density int `json:"density"` - MetricOf string `json:"metricOf"` - MetricType MetricType `json:"metricType"` - MetricValue []string `json:"metricValue"` - MetricFormat string `json:"metricFormat"` - ViewType string `json:"viewType"` - Name string `json:"name"` - Series []Series `json:"series"` - Limit int `json:"limit"` - Page int `json:"page"` - StartPoint []Filter `json:"startPoint"` - Exclude []Filter `json:"exclude"` - Rows uint64 `json:"rows"` - Columns uint64 `json:"columns"` + StartTimestamp int64 `json:"startTimestamp"` + EndTimestamp int64 `json:"endTimestamp"` + Density int `json:"density"` + MetricOf string `json:"metricOf"` + MetricType MetricType `json:"metricType"` + MetricValue []string `json:"metricValue"` + MetricFormat string `json:"metricFormat"` + ViewType string `json:"viewType"` + Name string `json:"name"` + Series []Series `json:"series"` + Limit int `json:"limit"` + Page int `json:"page"` + StartPoint []Filter `json:"startPoint"` + Exclude []Filter `json:"exclude"` + Rows uint64 `json:"rows"` + Columns uint64 `json:"columns"` + PreviousColumns uint64 `json:"previousColumns"` } type MetricOfTable string