feat(product_analytics): funnels card

This commit is contained in:
Shekar Siri 2025-04-25 18:04:10 +02:00
parent 6e57d2105d
commit 4204b41dbd
3 changed files with 173 additions and 20 deletions

View file

@ -1,9 +1,159 @@
package charts package charts
import "openreplay/backend/pkg/analytics/db" import (
"fmt"
"openreplay/backend/pkg/analytics/db"
"strings"
)
type FunnelStepResult struct {
LevelNumber uint64 `json:"step"`
StepName string `json:"type"`
CountAtLevel uint64 `json:"count"`
}
type FunnelResponse struct {
Steps []FunnelStepResult `json:"stages"`
}
type FunnelQueryBuilder struct{} type FunnelQueryBuilder struct{}
func (f FunnelQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) { func (f FunnelQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) {
return "-- Funnel query placeholder", nil q, err := f.buildQuery(p)
if err != nil {
return nil, err
}
rows, err := conn.Query(q)
if err != nil {
return nil, err
}
defer rows.Close()
var steps []FunnelStepResult
for rows.Next() {
var r FunnelStepResult
if err := rows.Scan(&r.LevelNumber, &r.StepName, &r.CountAtLevel); err != nil {
return nil, err
}
steps = append(steps, r)
}
return FunnelResponse{Steps: steps}, nil
}
func (f FunnelQueryBuilder) buildQuery(p Payload) (string, error) {
if len(p.MetricPayload.Series) == 0 {
return "", fmt.Errorf("series empty")
}
s := p.MetricPayload.Series[0]
metricFormat := p.MetricPayload.MetricFormat
// separate global vs step filters based on IsEvent flag
var globalFilters []Filter
var eventFilters []Filter
for _, flt := range s.Filter.Filters {
if flt.IsEvent {
eventFilters = append(eventFilters, flt)
} else {
globalFilters = append(globalFilters, flt)
}
}
// Global filters
globalConds, globalNames := buildEventConditions(globalFilters, BuildConditionsOptions{
DefinedColumns: mainColumns,
MainTableAlias: "e",
})
base := []string{
fmt.Sprintf("e.created_at >= toDateTime(%d/1000)", p.MetricPayload.StartTimestamp),
fmt.Sprintf("e.created_at < toDateTime(%d/1000)", p.MetricPayload.EndTimestamp+86400000),
"s.duration > 0",
fmt.Sprintf("e.project_id = %d", p.ProjectId),
}
base = append(base, globalConds...)
if len(globalNames) > 0 {
base = append(base, "e.`$event_name` IN ("+buildInClause(globalNames)+")")
}
// Build steps and per-step conditions only for eventFilters
var stepNames []string
var stepExprs []string
for i, filter := range eventFilters {
// Step name from filter type
stepNames = append(stepNames, fmt.Sprintf("'%s'", filter.Type))
exprs, _ := buildEventConditions([]Filter{filter}, BuildConditionsOptions{DefinedColumns: mainColumns})
// replace main.$properties references
for j, c := range exprs {
c = strings.ReplaceAll(c, "toString(main.`$properties`)", "properties")
c = strings.ReplaceAll(c, "main.`$properties`", "properties")
// wrap JSON for JSONExtractString
c = strings.ReplaceAll(c, "JSONExtractString(properties", "JSONExtractString(toString(properties)")
exprs[j] = c
}
var expr string
if len(exprs) > 0 {
expr = fmt.Sprintf("(event_name = funnel_steps[%d] AND %s)", i+1, strings.Join(exprs, " AND "))
} else {
expr = fmt.Sprintf("(event_name = funnel_steps[%d])", i+1)
}
stepExprs = append(stepExprs, expr)
}
stepsArr := "[" + strings.Join(stepNames, ",") + "]"
windowArgs := strings.Join(stepExprs, ",")
// Compose WHERE clause
where := strings.Join(base, " AND ")
// Final query
q := fmt.Sprintf(`
WITH
%s AS funnel_steps,
86400 AS funnel_window_seconds,
events_for_funnel AS (
SELECT
e.created_at,
e."$event_name" AS event_name,
e."$properties" AS properties,
e.session_id,
e.distinct_id,
s.user_id AS session_user_id,
if('%s' = 'sessionCount', toString(e.session_id), coalesce(nullif(s.user_id,''),e.distinct_id)) AS entity_id
FROM product_analytics.events AS e
JOIN experimental.sessions AS s USING(session_id)
WHERE %s
),
funnel_levels_reached AS (
SELECT
entity_id,
windowFunnel(funnel_window_seconds)(
toDateTime(created_at),
%s
) AS max_level
FROM events_for_funnel
GROUP BY entity_id
),
counts_by_level AS (
SELECT
seq.number + 1 AS level_number,
countDistinctIf(entity_id, max_level >= seq.number + 1) AS cnt
FROM funnel_levels_reached
CROSS JOIN numbers(length(funnel_steps)) AS seq
GROUP BY seq.number
),
step_list AS (
SELECT
seq.number + 1 AS level_number,
funnel_steps[seq.number + 1] AS step_name
FROM numbers(length(funnel_steps)) AS seq
)
SELECT
s.level_number,
s.step_name,
ifNull(c.cnt, 0) AS count_at_level
FROM step_list AS s
LEFT JOIN counts_by_level AS c ON s.level_number = c.level_number
ORDER BY s.level_number;
`, stepsArr, metricFormat, where, windowArgs)
return q, nil
} }

View file

@ -44,10 +44,11 @@ var propertySelectorMap = map[string]string{
} }
var mainColumns = map[string]string{ var mainColumns = map[string]string{
"user_browser": "$browser", "userBrowser": "$browser",
"user_device": "$device_type", "userDevice": "$device_type",
"user_country": "$country", "userCountry": "$country",
"referrer": "$referrer", "referrer": "$referrer",
// TODO add more columns if needed
} }
func (t TableQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) { func (t TableQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) {

View file

@ -42,6 +42,8 @@ func partitionFilters(filters []Filter) (sessionFilters []Filter, eventFilters [
} }
var validFilterTypes = map[FilterType]struct{}{ var validFilterTypes = map[FilterType]struct{}{
"LOCATION": {},
"CLICK": {},
FilterClick: {}, FilterClick: {},
FilterInput: {}, FilterInput: {},
FilterLocation: {}, FilterLocation: {},
@ -78,17 +80,13 @@ type filterConfig struct {
IsNumeric bool IsNumeric bool
} }
var filterTypeConfigs = map[FilterType]filterConfig{ var propertyKeyMap = map[string]filterConfig{
FilterClick: {LogicalProperty: "label", EventName: "CLICK"}, "LOCATION": {LogicalProperty: "url_path"},
FilterInput: {LogicalProperty: "label", EventName: "INPUT"}, "CLICK": {LogicalProperty: "label"},
FilterLocation: {LogicalProperty: "url_path", EventName: "LOCATION"}, "INPUT": {LogicalProperty: "label"},
FilterCustom: {LogicalProperty: "name", EventName: "CUSTOM"},
FilterTag: {LogicalProperty: "tag", EventName: "TAG"},
}
var nestedFilterTypeConfigs = map[string]filterConfig{
"fetchUrl": {LogicalProperty: "url_path"}, "fetchUrl": {LogicalProperty: "url_path"},
"fetchStatusCode": {LogicalProperty: "status", IsNumeric: true}, "fetchStatusCode": {LogicalProperty: "status", IsNumeric: true},
// TODO add more mappings as needed
} }
func getColumnAccessor(logicalProp string, isNumeric bool, opts BuildConditionsOptions) string { func getColumnAccessor(logicalProp string, isNumeric bool, opts BuildConditionsOptions) string {
@ -126,14 +124,14 @@ func buildEventConditions(filters []Filter, options ...BuildConditionsOptions) (
for _, f := range filters { for _, f := range filters {
_, isValidType := validFilterTypes[f.Type] _, isValidType := validFilterTypes[f.Type]
if !isValidType || !f.IsEvent { if !isValidType {
continue continue
} }
if f.Type == FilterFetch { if f.Type == FilterFetch {
var fetchConds []string var fetchConds []string
for _, nf := range f.Filters { for _, nf := range f.Filters {
nestedConfig, ok := nestedFilterTypeConfigs[string(nf.Type)] nestedConfig, ok := propertyKeyMap[string(nf.Type)]
if !ok { if !ok {
continue continue
} }
@ -149,16 +147,20 @@ func buildEventConditions(filters []Filter, options ...BuildConditionsOptions) (
names = append(names, "REQUEST") names = append(names, "REQUEST")
} }
} else { } else {
config, ok := filterTypeConfigs[f.Type] config, ok := propertyKeyMap[string(f.Type)]
if !ok { if !ok {
continue config = filterConfig{
LogicalProperty: string(f.Type),
}
} }
accessor := getColumnAccessor(config.LogicalProperty, config.IsNumeric, opts) accessor := getColumnAccessor(config.LogicalProperty, config.IsNumeric, opts)
c := buildCond(accessor, f.Value, f.Operator) c := buildCond(accessor, f.Value, f.Operator)
if c != "" { if c != "" {
conds = append(conds, c) conds = append(conds, c)
names = append(names, config.EventName) if f.IsEvent {
names = append(names, string(f.Type))
}
} }
} }
} }