feat(product_analytics): user journey - wip

This commit is contained in:
Shekar Siri 2025-05-12 12:07:32 +02:00
parent 1d30b4d4cb
commit f789ee1bda
2 changed files with 287 additions and 111 deletions

View file

@ -11,10 +11,11 @@ import (
// Node represents a point in the journey diagram.
type Node struct {
Depth int `json:"depth"`
Name string `json:"name"`
EventType string `json:"eventType"`
ID int `json:"id"`
Depth int `json:"depth"`
Name string `json:"name"`
EventType string `json:"eventType"`
ID int `json:"id"`
StartingNode bool `json:"startingNode"`
}
// Link represents a transition between nodes.
@ -52,7 +53,7 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac
defer rows.Close()
type row struct {
Stage uint64
Stage int64
CurrentEventName string
CurrentEventProperty string
PrevEventName string
@ -82,24 +83,47 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac
rawData = append(rawData, r)
}
// Group data by stage and determine max stage
dataByStage := make(map[uint64][]row)
var maxStage uint64 = 0
// Group data by stage
dataByStage := make(map[int64][]row)
var minStage int64 = 0
var maxStage int64 = 0
for _, r := range rawData {
dataByStage[r.Stage] = append(dataByStage[r.Stage], r)
if r.Stage > maxStage {
maxStage = r.Stage
}
if r.Stage < minStage {
minStage = r.Stage
}
}
// Calculate total sessions per stage
stageTotals := make(map[uint64]uint64)
stageTotals := make(map[int64]uint64)
for stage, stageRows := range dataByStage {
for _, r := range stageRows {
stageTotals[stage] += r.SessionsCount
}
}
initialCount := stageTotals[1]
// Determine base count for percentage calculations
// We'll use the starting point (usually stage 1) as our base
var baseSessionsCount uint64
if count, exists := stageTotals[1]; exists {
baseSessionsCount = count
} else {
// If stage 1 doesn't exist, use the first available positive stage
for stage := int64(0); stage <= maxStage; stage++ {
if count, exists := stageTotals[stage]; exists {
baseSessionsCount = count
break
}
}
}
if baseSessionsCount == 0 {
baseSessionsCount = 1 // Prevent division by zero
}
// Number of top nodes to display per stage
topLimit := int(p.Rows)
@ -114,10 +138,20 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac
}
// Map to store top paths for each stage
topPathsByStage := make(map[uint64]map[pathKey]bool)
pathCountsByStage := make(map[uint64]map[pathKey]uint64)
topPathsByStage := make(map[int64]map[pathKey]bool)
pathCountsByStage := make(map[int64]map[pathKey]uint64)
for stage := minStage; stage <= maxStage; stage++ {
// Skip if this stage has no data
if _, exists := dataByStage[stage]; !exists {
continue
}
// Sort rows within each stage by session count (descending)
sort.Slice(dataByStage[stage], func(i, j int) bool {
return dataByStage[stage][i].SessionsCount > dataByStage[stage][j].SessionsCount
})
for stage := uint64(1); stage <= maxStage; stage++ {
// Initialize maps for this stage
topPathsByStage[stage] = make(map[pathKey]bool)
pathCountsByStage[stage] = make(map[pathKey]uint64)
@ -144,7 +178,7 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac
return paths[i].count > paths[j].count
})
// Mark top paths
// Mark top paths - take exactly topLimit or all if fewer available
for i, pc := range paths {
if i < topLimit {
topPathsByStage[stage][pc.path] = true
@ -152,35 +186,78 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac
}
}
// Step 2: Create nodes and track sessions
// Step 2: Create a normalized sequential depth mapping
// First, gather all stages that have data
var stagesWithData []int64
for stage := range dataByStage {
stagesWithData = append(stagesWithData, stage)
}
// Sort stages
sort.Slice(stagesWithData, func(i, j int) bool {
return stagesWithData[i] < stagesWithData[j]
})
var startingStage int64
for _, s := range stagesWithData {
if s > 0 {
startingStage = s
break
}
}
// Create a mapping from logical stage to display depth (ensuring no gaps)
stageToDepth := make(map[int64]int)
for i, stage := range stagesWithData {
stageToDepth[stage] = i
}
// Determine depth of central node (stage 1 or equivalent)
var centralDepth int
if depth, exists := stageToDepth[1]; exists {
centralDepth = depth
} else {
// If stage 1 doesn't exist, use the first positive stage
for _, stage := range stagesWithData {
if stage > 0 {
centralDepth = stageToDepth[stage]
break
}
}
}
// Step 3: Create nodes with normalized depths
var nodes []Node
var links []Link
nodeID := 0
// Maps to track nodes and sessions
nodeMap := make(map[string]int) // Stage|EventName|EventProp → nodeID
othersNodes := make(map[uint64]int) // stage → "Others" nodeID
dropNodes := make(map[uint64]int) // stage → "Drop" nodeID
nodeMap := make(map[string]int) // Stage|EventName|EventProp → nodeID
othersNodes := make(map[int64]int) // stage → "Others" nodeID
dropNodes := make(map[int64]int) // stage → "Drop" nodeID
incomingSessions := make(map[int]uint64) // nodeID → incoming sessions
outgoingSessions := make(map[int]uint64) // nodeID → outgoing sessions
// Create all nodes first
for stage := uint64(1); stage <= maxStage; stage++ {
// Create all nodes using normalized depths
for _, stage := range stagesWithData {
displayDepth := stageToDepth[stage]
// Create regular nodes for top paths
for path := range topPathsByStage[stage] {
nodeKey := fmt.Sprintf("%d|%s|%s", stage, path.eventName, path.eventProp)
nodeMap[nodeKey] = nodeID
nodes = append(nodes, Node{
ID: nodeID,
Depth: int(stage) - 1,
Name: path.eventProp,
EventType: path.eventName,
ID: nodeID,
Depth: displayDepth,
Name: path.eventProp,
EventType: path.eventName,
StartingNode: stage == startingStage,
})
// For stage 1, set incoming sessions
if stage == 1 {
// For the central stage (usually stage 1) or first stage, set incoming sessions
if (stage == 1) || (stage == minStage && minStage != 1) {
incomingSessions[nodeID] = pathCountsByStage[stage][path]
}
@ -203,14 +280,15 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac
othersNodes[stage] = nodeID
nodes = append(nodes, Node{
ID: nodeID,
Depth: int(stage) - 1,
Name: "other",
EventType: "OTHER",
ID: nodeID,
Depth: displayDepth,
Name: "other",
EventType: "OTHER",
StartingNode: stage == startingStage,
})
// For stage 1, set incoming sessions for Others
if stage == 1 {
// For the central stage or first stage, set incoming sessions for Others
if (stage == 1) || (stage == minStage && minStage != 1) {
incomingSessions[nodeID] = othersCount
}
@ -219,7 +297,7 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac
}
}
// Step 3: Create links between nodes
// Step 4: Create links between adjacent nodes only
// Use a map to deduplicate links
type linkKey struct {
src int
@ -228,10 +306,18 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac
linkSessions := make(map[linkKey]uint64)
linkTypes := make(map[linkKey]string)
for stage := uint64(2); stage <= maxStage; stage++ {
for _, r := range dataByStage[stage] {
// For each stage (except the first), create links from the previous stage
for i := 1; i < len(stagesWithData); i++ {
currentStage := stagesWithData[i]
prevStage := stagesWithData[i-1]
for _, r := range dataByStage[currentStage] {
// Skip if previous stage doesn't match expected
if r.Stage != currentStage {
continue
}
// Determine source node
prevStage := stage - 1
prevPathKey := fmt.Sprintf("%d|%s|%s", prevStage, r.PrevEventName, r.PrevEventProperty)
srcID, hasSrc := nodeMap[prevPathKey]
@ -252,14 +338,14 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac
var hasTgt bool
// Check if this path is in the top paths for this stage
if topPathsByStage[stage][curPath] {
if topPathsByStage[currentStage][curPath] {
// It's a top node
curPathKey := fmt.Sprintf("%d|%s|%s", stage, r.CurrentEventName, r.CurrentEventProperty)
curPathKey := fmt.Sprintf("%d|%s|%s", currentStage, r.CurrentEventName, r.CurrentEventProperty)
tgtID = nodeMap[curPathKey]
hasTgt = true
} else {
// It's part of Others
if othersID, hasOthers := othersNodes[stage]; hasOthers {
if othersID, hasOthers := othersNodes[currentStage]; hasOthers {
tgtID = othersID
hasTgt = true
}
@ -284,9 +370,11 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac
}
}
// Create deduplicated links
// Create deduplicated links with proper percentages
for lk, count := range linkSessions {
percent := math.Round(float64(count)*10000/float64(initialCount)) / 100
// Calculate percentage based on baseSessionsCount
percent := math.Round(float64(count)*10000/float64(baseSessionsCount)) / 100
links = append(links, Link{
Source: lk.src,
Target: lk.tgt,
@ -296,17 +384,25 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac
})
}
// Step 4: Calculate drops and create drop nodes
cumulativeDrops := make(map[uint64]uint64)
// Step 5: Calculate drops and create drop nodes (only for stages ≥ 0)
// Process forward drops (positive stages only)
for i := 0; i < len(stagesWithData)-1; i++ {
stage := stagesWithData[i]
// Skip negative stages for drops
if stage < 0 {
continue
}
for stage := uint64(1); stage < maxStage; stage++ {
// Calculate new drops at this stage
stageDrops := uint64(0)
dropsFromNode := make(map[int]uint64) // nodeID -> drop count
for _, node := range nodes {
nodeStage := uint64(node.Depth) + 1
if nodeStage != stage {
nodeDepth := node.Depth
// Skip if this node isn't in the current stage
if nodeDepth != stageToDepth[stage] {
continue
}
@ -320,65 +416,91 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac
}
}
// Calculate cumulative drops
if stage == 1 {
cumulativeDrops[stage] = stageDrops
} else {
cumulativeDrops[stage] = cumulativeDrops[stage-1] + stageDrops
// Skip if no drops
if stageDrops == 0 {
continue
}
// Create drop node if there are drops
if cumulativeDrops[stage] > 0 {
dropNodes[stage] = nodeID
// Determine next stage depth for drop node positioning
var dropDepth int
if i+1 < len(stagesWithData) {
dropDepth = stageToDepth[stagesWithData[i+1]]
} else {
dropDepth = stageToDepth[stage] + 1
}
nodes = append(nodes, Node{
ID: nodeID,
Depth: int(stage), // Drop nodes appear at the next depth
Name: "drop",
EventType: "DROP",
})
// Create drop node
dropNodes[stage] = nodeID
// Create links from nodes with drops to the drop node
for nid, dropCount := range dropsFromNode {
if dropCount == 0 {
continue
}
nodes = append(nodes, Node{
ID: nodeID,
Depth: dropDepth,
Name: "drop",
EventType: "DROP",
})
percent := math.Round(float64(dropCount)*10000/float64(initialCount)) / 100
links = append(links, Link{
Source: nid,
Target: nodeID,
SessionsCount: int(dropCount),
Value: percent,
EventType: "DROP",
})
// Create links from nodes with drops to the drop node
for nid, dropCount := range dropsFromNode {
if dropCount == 0 {
continue
}
// Link previous drop node to carry forward drops
if stage > 1 && cumulativeDrops[stage-1] > 0 {
if prevDropID, hasPrevDrop := dropNodes[stage-1]; hasPrevDrop {
percent := math.Round(float64(cumulativeDrops[stage-1])*10000/float64(initialCount)) / 100
// Calculate percentage based on baseSessionsCount
percent := math.Round(float64(dropCount)*10000/float64(baseSessionsCount)) / 100
links = append(links, Link{
Source: nid,
Target: nodeID,
SessionsCount: int(dropCount),
Value: percent,
EventType: "DROP",
})
}
// Link previous drop node to current drop node to show accumulation
if i > 0 {
for j := i - 1; j >= 0; j-- {
prevStage := stagesWithData[j]
if prevDropID, hasPrevDrop := dropNodes[prevStage]; hasPrevDrop {
// Link previous drop to current drop to show accumulation
prevDropCount := uint64(0)
for _, link := range links {
if link.Target == prevDropID && link.EventType == "DROP" {
prevDropCount += uint64(link.SessionsCount)
}
}
percent := math.Round(float64(prevDropCount)*10000/float64(baseSessionsCount)) / 100
links = append(links, Link{
Source: prevDropID,
Target: nodeID,
SessionsCount: int(cumulativeDrops[stage-1]),
SessionsCount: int(prevDropCount),
Value: percent,
EventType: "DROP",
})
break
}
}
nodeID++
}
nodeID++
}
// Filter and reindex
// Filter out nodes with no connections
nodeHasConnection := make(map[int]bool)
for _, link := range links {
nodeHasConnection[link.Source] = true
nodeHasConnection[link.Target] = true
}
// Make sure central nodes are included even if they don't have links
for _, node := range nodes {
if node.Depth == centralDepth {
nodeHasConnection[node.ID] = true
}
}
var filteredNodes []Node
for _, node := range nodes {
if nodeHasConnection[node.ID] {
@ -386,7 +508,7 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac
}
}
// Reassign IDs
// Reassign IDs to be sequential
nodeIDMap := make(map[int]int)
var finalNodes []Node
@ -396,7 +518,7 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac
finalNodes = append(finalNodes, node)
}
// Update links
// Update link references
var finalLinks []Link
for _, link := range links {
srcID, srcExists := nodeIDMap[link.Source]
@ -428,7 +550,15 @@ func (h UserJourneyQueryBuilder) buildQuery(p Payload) (string, error) {
startConds, _ := buildEventConditions(p.StartPoint, BuildConditionsOptions{DefinedColumns: mainColumns, MainTableAlias: "e"})
excludeConds, _ := buildEventConditions(p.Exclude, BuildConditionsOptions{DefinedColumns: mainColumns, MainTableAlias: "e"})
firstBase := []string{`e."$event_name" = 'LOCATION'`}
// use first element of StartPoint array for starting event
var startEvent string
if len(p.StartPoint) > 0 {
startEvent = string(p.StartPoint[0].Type)
} else {
startEvent = events[0]
}
firstBase := []string{fmt.Sprintf("e.\"$event_name\" = '%s'", startEvent)}
if len(startConds) > 0 {
firstBase = append(firstBase, startConds...)
}
@ -449,8 +579,19 @@ func (h UserJourneyQueryBuilder) buildQuery(p Payload) (string, error) {
fmt.Sprintf("e.project_id = %d", p.ProjectId),
)
startTime := time.Unix(p.StartTimestamp/1000, 0).UTC().Format("2006-01-02 15:04:05")
endTime := time.Unix(p.EndTimestamp/1000, 0).UTC().Format("2006-01-02 15:04:05")
previousColumns := p.PreviousColumns
if previousColumns <= 0 {
previousColumns = 0
}
maxCols := p.Columns
if maxCols > 0 {
maxCols++
}
q := fmt.Sprintf(`WITH
first_hits AS (
SELECT session_id, MIN(created_at) AS start_time
@ -458,18 +599,18 @@ func (h UserJourneyQueryBuilder) buildQuery(p Payload) (string, error) {
WHERE %s
GROUP BY session_id
),
journey_events AS (
journey_events_after AS (
SELECT
e.session_id,
e.distinct_id,
e."$event_name" AS event_name,
e.created_at,
multiIf(
e."$event_name" = 'LOCATION', JSONExtractString(toString(e."$properties"), 'url_path'),
e."$event_name" = 'CLICK', JSONExtractString(toString(e."$properties"), 'label'),
e."$event_name" = 'INPUT', JSONExtractString(toString(e."$properties"), 'label'),
NULL
) AS event_property
CASE
WHEN e."$event_name" = 'LOCATION' THEN JSONExtractString(toString(e."$properties"), 'url_path')
WHEN e."$event_name" = 'CLICK' THEN JSONExtractString(toString(e."$properties"), 'label')
WHEN e."$event_name" = 'INPUT' THEN JSONExtractString(toString(e."$properties"), 'label')
ELSE NULL
END AS event_property
FROM product_analytics.events AS e
JOIN first_hits AS f USING(session_id)
WHERE
@ -477,6 +618,31 @@ func (h UserJourneyQueryBuilder) buildQuery(p Payload) (string, error) {
AND e.created_at <= toDateTime('%s')
AND %s
),
journey_events_before AS (
SELECT
e.session_id,
e.distinct_id,
e."$event_name" AS event_name,
e.created_at,
CASE
WHEN e."$event_name" = 'LOCATION' THEN JSONExtractString(toString(e."$properties"), 'url_path')
WHEN e."$event_name" = 'CLICK' THEN JSONExtractString(toString(e."$properties"), 'label')
WHEN e."$event_name" = 'INPUT' THEN JSONExtractString(toString(e."$properties"), 'label')
ELSE NULL
END AS event_property
FROM product_analytics.events AS e
JOIN first_hits AS f USING(session_id)
WHERE
e.created_at < f.start_time
AND e.created_at >= toDateTime('%s')
AND %s
AND %d > 0 -- Only fetch previous events if PreviousColumns > 0
),
journey_events_combined AS (
SELECT *, 1 AS direction FROM journey_events_after
UNION ALL
SELECT *, -1 AS direction FROM journey_events_before
),
event_with_prev AS (
SELECT
session_id,
@ -484,14 +650,19 @@ func (h UserJourneyQueryBuilder) buildQuery(p Payload) (string, error) {
event_name,
event_property,
created_at,
direction,
any(event_name) OVER (PARTITION BY session_id ORDER BY created_at ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) AS previous_event_name,
any(event_property) OVER (PARTITION BY session_id ORDER BY created_at ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) AS previous_event_property
FROM journey_events
FROM journey_events_combined
),
staged AS (
SELECT
*,
sumIf(1, true) OVER (PARTITION BY session_id ORDER BY created_at ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS stage
CASE
WHEN direction = 1 THEN toInt64(sumIf(1, true) OVER (PARTITION BY session_id, direction ORDER BY created_at ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW))
WHEN direction = -1 THEN -1 * toInt64(sumIf(1, true) OVER (PARTITION BY session_id, direction ORDER BY created_at DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW))
ELSE 0
END AS stage
FROM event_with_prev
)
SELECT
@ -502,7 +673,7 @@ SELECT
COALESCE(previous_event_property, '') AS previous_event_property,
COUNT(DISTINCT session_id) AS sessions_count
FROM staged
WHERE stage <= %d
WHERE stage <= %d AND stage >= -%d
GROUP BY
stage,
event_name,
@ -513,7 +684,11 @@ ORDER BY stage, COUNT(DISTINCT session_id) DESC;`,
strings.Join(firstBase, " AND "),
endTime,
strings.Join(journeyBase, " AND "),
p.Columns,
startTime,
strings.Join(journeyBase, " AND "),
previousColumns,
maxCols,
previousColumns,
)
return q, nil
}

View file

@ -60,22 +60,23 @@ const (
)
type MetricPayload struct {
StartTimestamp int64 `json:"startTimestamp"`
EndTimestamp int64 `json:"endTimestamp"`
Density int `json:"density"`
MetricOf string `json:"metricOf"`
MetricType MetricType `json:"metricType"`
MetricValue []string `json:"metricValue"`
MetricFormat string `json:"metricFormat"`
ViewType string `json:"viewType"`
Name string `json:"name"`
Series []Series `json:"series"`
Limit int `json:"limit"`
Page int `json:"page"`
StartPoint []Filter `json:"startPoint"`
Exclude []Filter `json:"exclude"`
Rows uint64 `json:"rows"`
Columns uint64 `json:"columns"`
StartTimestamp int64 `json:"startTimestamp"`
EndTimestamp int64 `json:"endTimestamp"`
Density int `json:"density"`
MetricOf string `json:"metricOf"`
MetricType MetricType `json:"metricType"`
MetricValue []string `json:"metricValue"`
MetricFormat string `json:"metricFormat"`
ViewType string `json:"viewType"`
Name string `json:"name"`
Series []Series `json:"series"`
Limit int `json:"limit"`
Page int `json:"page"`
StartPoint []Filter `json:"startPoint"`
Exclude []Filter `json:"exclude"`
Rows uint64 `json:"rows"`
Columns uint64 `json:"columns"`
PreviousColumns uint64 `json:"previousColumns"`
}
type MetricOfTable string