From 5d6d94ed4d5636f3d89fd47c7d99bb37d6973d3e Mon Sep 17 00:00:00 2001 From: Shekar Siri Date: Tue, 29 Apr 2025 16:05:07 +0200 Subject: [PATCH] feat(product_analytics): heatmaps wip --- backend/pkg/analytics/charts/metric_funnel.go | 116 +++++------ .../pkg/analytics/charts/metric_heatmaps.go | 99 ++++++++++ .../charts/metric_heatmaps_session.go | 82 ++++++++ backend/pkg/analytics/charts/model.go | 1 + backend/pkg/analytics/charts/query.go | 182 +++++++++--------- backend/pkg/analytics/db/connector.go | 11 ++ 6 files changed, 337 insertions(+), 154 deletions(-) create mode 100644 backend/pkg/analytics/charts/metric_heatmaps.go create mode 100644 backend/pkg/analytics/charts/metric_heatmaps_session.go diff --git a/backend/pkg/analytics/charts/metric_funnel.go b/backend/pkg/analytics/charts/metric_funnel.go index 9a562e352..f4e857f3f 100644 --- a/backend/pkg/analytics/charts/metric_funnel.go +++ b/backend/pkg/analytics/charts/metric_funnel.go @@ -3,7 +3,6 @@ package charts import ( "fmt" "openreplay/backend/pkg/analytics/db" - "strconv" "strings" ) @@ -49,33 +48,52 @@ func (f FunnelQueryBuilder) buildQuery(p Payload) (string, error) { s := p.MetricPayload.Series[0] metricFormat := p.MetricPayload.MetricFormat - // separate global vs step filters based on IsEvent flag - var globalFilters, eventFilters []Filter + // Separate global vs step filters + var globalFilters, stepFilters []Filter for _, flt := range s.Filter.Filters { if flt.IsEvent { - eventFilters = append(eventFilters, flt) + stepFilters = append(stepFilters, flt) } else { globalFilters = append(globalFilters, flt) } } - // extract duration filter - var minDur, maxDur int64 - for i := len(globalFilters) - 1; i >= 0; i-- { - if globalFilters[i].Type == "duration" { - vals := globalFilters[i].Value // []string - if len(vals) == 2 { - minDur, _ = strconv.ParseInt(vals[0], 10, 64) - maxDur, _ = strconv.ParseInt(vals[1], 10, 64) + // 1. Collect required mainColumns from all filters (including nested) + requiredColumns := make(map[string]struct{}) + var collectColumns func([]Filter) + collectColumns = func(filters []Filter) { + for _, flt := range filters { + if col, ok := mainColumns[string(flt.Type)]; ok { + requiredColumns[col] = struct{}{} } - globalFilters = append(globalFilters[:i], globalFilters[i+1:]...) + collectColumns(flt.Filters) } } + collectColumns(globalFilters) + collectColumns(stepFilters) - // Global filters - globalConds, globalNames := buildEventConditions(globalFilters, BuildConditionsOptions{ - DefinedColumns: mainColumns, - MainTableAlias: "e", + // 2. Build SELECT clause for CTE + selectCols := []string{ + `e.created_at`, + `e."$event_name" AS event_name`, + `e."$properties" AS properties`, + } + for col := range requiredColumns { + logical := reverseLookup(mainColumns, col) + selectCols = append(selectCols, fmt.Sprintf(`e."%s" AS %s`, col, logical)) + } + selectCols = append(selectCols, + `e.session_id`, + `e.distinct_id`, + `s.user_id AS session_user_id`, + fmt.Sprintf("if('%s' = 'sessionCount', toString(e.session_id), coalesce(nullif(s.user_id,''),e.distinct_id)) AS entity_id", metricFormat), + ) + + // 3. Global conditions + globalConds, _ := buildEventConditions(globalFilters, BuildConditionsOptions{ + DefinedColumns: cteColumnAliases(), // logical -> logical (CTE alias) + MainTableAlias: "e", + PropertiesColumnName: "$properties", }) base := []string{ fmt.Sprintf("e.created_at >= toDateTime(%d/1000)", p.MetricPayload.StartTimestamp), @@ -83,55 +101,37 @@ func (f FunnelQueryBuilder) buildQuery(p Payload) (string, error) { "s.duration > 0", fmt.Sprintf("e.project_id = %d", p.ProjectId), } - if maxDur > 0 { - base = append(base, fmt.Sprintf("s.duration BETWEEN %d AND %d", minDur, maxDur)) - } - base = append(base, globalConds...) - if len(globalNames) > 0 { - base = append(base, "e.`$event_name` IN ("+buildInClause(globalNames)+")") - } - - // Build steps and per-step conditions only for eventFilters - var stepNames []string - var stepExprs []string - for i, filter := range eventFilters { - stepNames = append(stepNames, fmt.Sprintf("'%s'", filter.Type)) - exprs, _ := buildEventConditions([]Filter{filter}, BuildConditionsOptions{DefinedColumns: mainColumns}) - for j, c := range exprs { - c = strings.ReplaceAll(c, "toString(main.`$properties`)", "properties") - c = strings.ReplaceAll(c, "main.`$properties`", "properties") - c = strings.ReplaceAll(c, "JSONExtractString(properties", "JSONExtractString(toString(properties)") - exprs[j] = c - } - var expr string - if len(exprs) > 0 { - expr = fmt.Sprintf("(event_name = funnel_steps[%d] AND %s)", i+1, strings.Join(exprs, " AND ")) - } else { - expr = fmt.Sprintf("(event_name = funnel_steps[%d])", i+1) - } - stepExprs = append(stepExprs, expr) - } - stepsArr := "[" + strings.Join(stepNames, ",") + "]" - windowArgs := strings.Join(stepExprs, ",") - - // Compose WHERE clause where := strings.Join(base, " AND ") - // Final query + // 4. Step conditions + var stepNames []string + var stepExprs []string + for i, filter := range stepFilters { + stepNames = append(stepNames, fmt.Sprintf("'%s'", filter.Type)) + stepConds, _ := buildEventConditions([]Filter{filter}, BuildConditionsOptions{ + DefinedColumns: cteColumnAliases(), // logical -> logical (CTE alias) + PropertiesColumnName: "properties", + MainTableAlias: "", + }) + + stepCondExprs := []string{fmt.Sprintf("event_name = funnel_steps[%d]", i+1)} + if len(stepConds) > 0 { + stepCondExprs = append(stepCondExprs, stepConds...) + } + stepExprs = append(stepExprs, fmt.Sprintf("(%s)", strings.Join(stepCondExprs, " AND "))) + } + + stepsArr := "[" + strings.Join(stepNames, ",") + "]" + windowArgs := strings.Join(stepExprs, ",\n ") + q := fmt.Sprintf(` WITH %s AS funnel_steps, 86400 AS funnel_window_seconds, events_for_funnel AS ( SELECT - e.created_at, - e."$event_name" AS event_name, - e."$properties" AS properties, - e.session_id, - e.distinct_id, - s.user_id AS session_user_id, - if('%s' = 'sessionCount', toString(e.session_id), coalesce(nullif(s.user_id,''),e.distinct_id)) AS entity_id + %s FROM product_analytics.events AS e JOIN experimental.sessions AS s USING(session_id) WHERE %s @@ -167,7 +167,7 @@ SELECT FROM step_list AS s LEFT JOIN counts_by_level AS c ON s.level_number = c.level_number ORDER BY s.level_number; -`, stepsArr, metricFormat, where, windowArgs) +`, stepsArr, strings.Join(selectCols, ",\n "), where, windowArgs) return q, nil } diff --git a/backend/pkg/analytics/charts/metric_heatmaps.go b/backend/pkg/analytics/charts/metric_heatmaps.go new file mode 100644 index 000000000..1acd4cd4c --- /dev/null +++ b/backend/pkg/analytics/charts/metric_heatmaps.go @@ -0,0 +1,99 @@ +package charts + +import ( + "fmt" + "openreplay/backend/pkg/analytics/db" + "strings" +) + +type HeatmapPoint struct { + NormalizedX float64 `json:"normalized_x"` + NormalizedY float64 `json:"normalized_y"` +} + +type HeatmapResponse struct { + Points []HeatmapPoint `json:"points"` +} + +type HeatmapQueryBuilder struct{} + +func (h HeatmapQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) { + q, err := h.buildQuery(p) + if err != nil { + return nil, err + } + rows, err := conn.Query(q) + if err != nil { + return nil, err + } + defer rows.Close() + + var pts []HeatmapPoint + for rows.Next() { + var x, y float64 + if err := rows.Scan(&x, &y); err != nil { + return nil, err + } + pts = append(pts, HeatmapPoint{x, y}) + } + + return HeatmapResponse{ + Points: pts, + }, nil +} + +func (h HeatmapQueryBuilder) buildQuery(p Payload) (string, error) { + if len(p.MetricPayload.Series) == 0 { + return "", fmt.Errorf("series empty") + } + s := p.MetricPayload.Series[0] + + var globalFilters, eventFilters []Filter + for _, flt := range s.Filter.Filters { + if flt.IsEvent { + eventFilters = append(eventFilters, flt) + } else { + globalFilters = append(globalFilters, flt) + } + } + + globalConds, globalNames := buildEventConditions(globalFilters, BuildConditionsOptions{ + DefinedColumns: mainColumns, + MainTableAlias: "e", + }) + + eventConds, eventNames := buildEventConditions(eventFilters, BuildConditionsOptions{ + DefinedColumns: mainColumns, + MainTableAlias: "e", + }) + + base := []string{ + fmt.Sprintf("e.created_at >= toDateTime(%d/1000)", p.MetricPayload.StartTimestamp), + fmt.Sprintf("e.created_at < toDateTime(%d/1000)", p.MetricPayload.EndTimestamp+86400000), + fmt.Sprintf("e.project_id = %d", p.ProjectId), + } + base = append(base, globalConds...) + if len(globalNames) > 0 { + base = append(base, "e.`$event_name` IN ("+buildInClause(globalNames)+")") + } + + if len(eventNames) > 0 { + base = append(base, "e.`$event_name` IN ("+buildInClause(eventNames)+")") + } + + base = append(base, eventConds...) + + where := strings.Join(base, " AND ") + + q := fmt.Sprintf(` +SELECT + + + JSONExtractFloat(toString(e."$properties"), 'normalized_x') AS normalized_x, + JSONExtractFloat(toString(e."$properties"), 'normalized_y') AS normalized_y +FROM product_analytics.events AS e +JOIN experimental.sessions AS s USING(session_id) +WHERE %s;`, where) + + return q, nil +} diff --git a/backend/pkg/analytics/charts/metric_heatmaps_session.go b/backend/pkg/analytics/charts/metric_heatmaps_session.go new file mode 100644 index 000000000..049a775e9 --- /dev/null +++ b/backend/pkg/analytics/charts/metric_heatmaps_session.go @@ -0,0 +1,82 @@ +package charts + +import ( + "fmt" + "openreplay/backend/pkg/analytics/db" + "strings" +) + +type HeatmapSessionResponse struct { + //Points []HeatmapPoint `json:"points"` + SessionID uint64 `json:"session_id"` +} + +type HeatmapSessionQueryBuilder struct{} + +func (h HeatmapSessionQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) { + shortestQ, err := h.buildQuery(p) + if err != nil { + return nil, err + } + var sid uint64 + row, err := conn.QueryRow(shortestQ) + if err != nil { + return nil, err + } + + if err := row.Scan(&sid); err != nil { + return nil, err + } + + return HeatmapSessionResponse{ + SessionID: sid, + }, nil +} + +func (h HeatmapSessionQueryBuilder) buildQuery(p Payload) (string, error) { + if len(p.MetricPayload.Series) == 0 { + return "", fmt.Errorf("series empty") + } + s := p.MetricPayload.Series[0] + + var globalFilters, eventFilters []Filter + for _, flt := range s.Filter.Filters { + if flt.IsEvent { + eventFilters = append(eventFilters, flt) + } else { + globalFilters = append(globalFilters, flt) + } + } + + globalConds, globalNames := buildEventConditions(globalFilters, BuildConditionsOptions{ + DefinedColumns: mainColumns, + MainTableAlias: "e", + }) + eventConds, _ := buildEventConditions(eventFilters, BuildConditionsOptions{ + DefinedColumns: mainColumns, + MainTableAlias: "e", + }) + + base := []string{ + fmt.Sprintf("e.created_at >= toDateTime(%d/1000)", p.MetricPayload.StartTimestamp), + fmt.Sprintf("e.created_at < toDateTime(%d/1000)", p.MetricPayload.EndTimestamp+86400000), + fmt.Sprintf("e.project_id = %d", p.ProjectId), + "e.\"$event_name\" = 'CLICK'", + } + base = append(base, globalConds...) + if len(globalNames) > 0 { + base = append(base, "e.`$event_name` IN ("+buildInClause(globalNames)+")") + } + base = append(base, eventConds...) + + where := strings.Join(base, " AND ") + + return fmt.Sprintf(` + SELECT + s.session_id + FROM product_analytics.events AS e + JOIN experimental.sessions AS s USING(session_id) + WHERE %s + ORDER BY s.duration ASC + LIMIT 1;`, where), nil +} diff --git a/backend/pkg/analytics/charts/model.go b/backend/pkg/analytics/charts/model.go index e22abcbe5..15425897b 100644 --- a/backend/pkg/analytics/charts/model.go +++ b/backend/pkg/analytics/charts/model.go @@ -48,6 +48,7 @@ const ( MetricTypeTimeseries MetricType = "timeseries" MetricTypeTable MetricType = "table" MetricTypeFunnel MetricType = "funnel" + MetricTypeHeatmap MetricType = "heatmaps" ) const ( diff --git a/backend/pkg/analytics/charts/query.go b/backend/pkg/analytics/charts/query.go index 5b59001df..1e62a3a8e 100644 --- a/backend/pkg/analytics/charts/query.go +++ b/backend/pkg/analytics/charts/query.go @@ -25,22 +25,13 @@ func NewQueryBuilder(p Payload) (QueryBuilder, error) { return FunnelQueryBuilder{}, nil case MetricTypeTable: return TableQueryBuilder{}, nil + case MetricTypeHeatmap: + return HeatmapQueryBuilder{}, nil default: return nil, fmt.Errorf("unknown metric type: %s", p.MetricType) } } -func partitionFilters(filters []Filter) (sessionFilters []Filter, eventFilters []Filter) { - for _, f := range filters { - if f.IsEvent { - eventFilters = append(eventFilters, f) - } else { - sessionFilters = append(sessionFilters, f) - } - } - return -} - var validFilterTypes = map[FilterType]struct{}{ "LOCATION": {}, "CLICK": {}, @@ -90,17 +81,16 @@ var propertyKeyMap = map[string]filterConfig{ } func getColumnAccessor(logicalProp string, isNumeric bool, opts BuildConditionsOptions) string { + // Use CTE alias if present in DefinedColumns if actualCol, ok := opts.DefinedColumns[logicalProp]; ok && actualCol != "" { - return fmt.Sprintf("%s.`%s`", opts.MainTableAlias, actualCol) + return actualCol } - + // Otherwise, extract from $properties JSON jsonFunc := "JSONExtractString" if isNumeric { - jsonFunc = "JSONExtractFloat" // Or JSONExtractInt, etc. + jsonFunc = "JSONExtractFloat" } - - return fmt.Sprintf("%s(toString(%s.`%s`), '%s')", - jsonFunc, opts.MainTableAlias, opts.PropertiesColumnName, logicalProp) + return fmt.Sprintf("%s(toString(%s), '%s')", jsonFunc, opts.PropertiesColumnName, logicalProp) } func buildEventConditions(filters []Filter, options ...BuildConditionsOptions) (conds, names []string) { @@ -109,7 +99,6 @@ func buildEventConditions(filters []Filter, options ...BuildConditionsOptions) ( PropertiesColumnName: "$properties", DefinedColumns: make(map[string]string), } - if len(options) > 0 { if options[0].MainTableAlias != "" { opts.MainTableAlias = options[0].MainTableAlias @@ -121,24 +110,21 @@ func buildEventConditions(filters []Filter, options ...BuildConditionsOptions) ( opts.DefinedColumns = options[0].DefinedColumns } } - for _, f := range filters { - _, isValidType := validFilterTypes[f.Type] - if !isValidType { + _, okType := validFilterTypes[f.Type] + if !okType { continue } - + // process main filter if f.Type == FilterFetch { var fetchConds []string for _, nf := range f.Filters { - nestedConfig, ok := propertyKeyMap[string(nf.Type)] + cfg, ok := propertyKeyMap[string(nf.Type)] if !ok { continue } - - accessor := getColumnAccessor(nestedConfig.LogicalProperty, nestedConfig.IsNumeric, opts) - c := buildCond(accessor, nf.Value, f.Operator) // Uses parent filter's operator - if c != "" { + acc := getColumnAccessor(cfg.LogicalProperty, cfg.IsNumeric, opts) + if c := buildCond(acc, nf.Value, f.Operator); c != "" { fetchConds = append(fetchConds, c) } } @@ -147,80 +133,35 @@ func buildEventConditions(filters []Filter, options ...BuildConditionsOptions) ( names = append(names, "REQUEST") } } else { - config, ok := propertyKeyMap[string(f.Type)] + cfg, ok := propertyKeyMap[string(f.Type)] if !ok { - config = filterConfig{ - LogicalProperty: string(f.Type), + cfg = filterConfig{LogicalProperty: string(f.Type)} + } + acc := getColumnAccessor(cfg.LogicalProperty, cfg.IsNumeric, opts) + + // when the Operator isAny or onAny just add the event name to the list + if f.Operator == "isAny" || f.Operator == "onAny" { + if f.IsEvent { + names = append(names, string(f.Type)) } + continue } - accessor := getColumnAccessor(config.LogicalProperty, config.IsNumeric, opts) - c := buildCond(accessor, f.Value, f.Operator) - if c != "" { + if c := buildCond(acc, f.Value, f.Operator); c != "" { conds = append(conds, c) if f.IsEvent { names = append(names, string(f.Type)) } } } - } - return -} -func buildEventConditionsX(filters []Filter) (conds, names []string) { - for _, f := range filters { - if f.IsEvent { - switch f.Type { - case FilterClick: - c := buildCond("JSONExtractString(toString(main.`$properties`), 'label')", f.Value, "is") - if c != "" { - conds = append(conds, c) - } - names = append(names, "CLICK") - case FilterInput: - c := buildCond("JSONExtractString(toString(main.`$properties`), 'label')", f.Value, f.Operator) - if c != "" { - conds = append(conds, c) - } - names = append(names, "INPUT") - case FilterLocation: - c := buildCond("JSONExtractString(toString(main.`$properties`), 'url_path')", f.Value, f.Operator) - if c != "" { - conds = append(conds, c) - } - names = append(names, "LOCATION") - case FilterCustom: - c := buildCond("JSONExtractString(toString(main.`$properties`), 'name')", f.Value, f.Operator) - if c != "" { - conds = append(conds, c) - } - names = append(names, "CUSTOM") - case FilterFetch: - var fetchConds []string - for _, nf := range f.Filters { - switch nf.Type { - case "fetchUrl": - c := buildCond("JSONExtractString(toString(main.`$properties`), 'url_path')", nf.Value, f.Operator) - if c != "" { - fetchConds = append(fetchConds, c) - } - case "fetchStatusCode": - c := buildCond("JSONExtractFloat(toString(main.`$properties`), 'status')", nf.Value, f.Operator) - if c != "" { - fetchConds = append(fetchConds, c) - } - } - } - if len(fetchConds) > 0 { - conds = append(conds, strings.Join(fetchConds, " AND ")) - } - names = append(names, "REQUEST") - case FilterTag: - c := buildCond("JSONExtractString(toString(main.`$properties`), 'tag')", f.Value, f.Operator) - if c != "" { - conds = append(conds, c) - } - names = append(names, "TAG") + // process sub-filters + if len(f.Filters) > 0 && f.Type != FilterFetch { + subOpts := opts // Inherit parent's options + subConds, subNames := buildEventConditions(f.Filters, subOpts) + if len(subConds) > 0 { + conds = append(conds, strings.Join(subConds, " AND ")) + names = append(names, subNames...) } } } @@ -285,6 +226,16 @@ func buildCond(expr string, values []string, operator string) string { for _, v := range values { conds = append(conds, fmt.Sprintf("%s ILIKE '%%%s%%'", expr, v)) } + if len(conds) > 1 { + return "(" + strings.Join(conds, " OR ") + ")" + } + return conds[0] + case "regex": + var conds []string + for _, v := range values { + conds = append(conds, fmt.Sprintf("match(%s, '%s')", expr, v)) + } + if len(conds) > 1 { return "(" + strings.Join(conds, " OR ") + ")" } @@ -316,12 +267,12 @@ func buildCond(expr string, values []string, operator string) string { return "(" + strings.Join(conds, " OR ") + ")" } return conds[0] - case "notEquals": + case "notEquals", "not", "off": if len(values) > 1 { return fmt.Sprintf("%s NOT IN (%s)", expr, buildInClause(values)) } return fmt.Sprintf("%s <> '%s'", expr, values[0]) - case "greaterThan": + case "greaterThan", "gt": var conds []string for _, v := range values { conds = append(conds, fmt.Sprintf("%s > '%s'", expr, v)) @@ -330,7 +281,7 @@ func buildCond(expr string, values []string, operator string) string { return "(" + strings.Join(conds, " OR ") + ")" } return conds[0] - case "greaterThanOrEqual": + case "greaterThanOrEqual", "gte": var conds []string for _, v := range values { conds = append(conds, fmt.Sprintf("%s >= '%s'", expr, v)) @@ -339,7 +290,7 @@ func buildCond(expr string, values []string, operator string) string { return "(" + strings.Join(conds, " OR ") + ")" } return conds[0] - case "lessThan": + case "lessThan", "lt": var conds []string for _, v := range values { conds = append(conds, fmt.Sprintf("%s < '%s'", expr, v)) @@ -348,7 +299,7 @@ func buildCond(expr string, values []string, operator string) string { return "(" + strings.Join(conds, " OR ") + ")" } return conds[0] - case "lessThanOrEqual": + case "lessThanOrEqual", "lte": var conds []string for _, v := range values { conds = append(conds, fmt.Sprintf("%s <= '%s'", expr, v)) @@ -367,7 +318,7 @@ func buildCond(expr string, values []string, operator string) string { return fmt.Sprintf("%s NOT IN (%s)", expr, buildInClause(values)) } return fmt.Sprintf("%s <> '%s'", expr, values[0]) - case "equals", "is": + case "equals", "is", "on": if len(values) > 1 { return fmt.Sprintf("%s IN (%s)", expr, buildInClause(values)) } @@ -471,3 +422,42 @@ func FillMissingDataPoints( } return results } + +func partitionFilters(filters []Filter) (sessionFilters []Filter, eventFilters []Filter) { + for _, f := range filters { + if f.IsEvent { + eventFilters = append(eventFilters, f) + } else { + sessionFilters = append(sessionFilters, f) + } + } + return +} + +// Returns a map: logical property -> CTE alias (e.g., "userBrowser" -> "userBrowser") +func cteColumnAliases() map[string]string { + aliases := make(map[string]string) + for logical := range mainColumns { + aliases[logical] = logical + } + return aliases +} + +// Returns a map: logical property -> source column (e.g., "userBrowser" -> "$browser") +func cteSourceColumns() map[string]string { + cols := make(map[string]string) + for logical, col := range mainColumns { + cols[logical] = col + } + return cols +} + +// Helper for reverse lookup (used for dynamic SELECT) +func reverseLookup(m map[string]string, value string) string { + for k, v := range m { + if v == value { + return k + } + } + return "" +} diff --git a/backend/pkg/analytics/db/connector.go b/backend/pkg/analytics/db/connector.go index 62b31068c..45983ee16 100644 --- a/backend/pkg/analytics/db/connector.go +++ b/backend/pkg/analytics/db/connector.go @@ -22,6 +22,7 @@ type TableResponse struct { type Connector interface { Stop() error Query(query string) (driver.Rows, error) + QueryRow(query string) (driver.Row, error) QueryArgs(query string, args map[string]interface{}) (driver.Rows, error) } @@ -64,6 +65,16 @@ func (c *connectorImpl) Query(query string) (driver.Rows, error) { return rows, nil } +func (c *connectorImpl) QueryRow(query string) (driver.Row, error) { + row := c.conn.QueryRow(context.Background(), query) + if err := row.Err(); err != nil { + return nil, err + } + //defer row.Close() + + return row, nil +} + func (c *connectorImpl) QueryArgs(query string, args map[string]interface{}) (driver.Rows, error) { rows, err := c.conn.Query(context.Background(), query, args) if err != nil {