feat(product_analytics): heatmaps wip

This commit is contained in:
Shekar Siri 2025-04-29 16:05:07 +02:00
parent c6076c5e7e
commit 5d6d94ed4d
6 changed files with 337 additions and 154 deletions

View file

@ -3,7 +3,6 @@ package charts
import (
"fmt"
"openreplay/backend/pkg/analytics/db"
"strconv"
"strings"
)
@ -49,33 +48,52 @@ func (f FunnelQueryBuilder) buildQuery(p Payload) (string, error) {
s := p.MetricPayload.Series[0]
metricFormat := p.MetricPayload.MetricFormat
// separate global vs step filters based on IsEvent flag
var globalFilters, eventFilters []Filter
// Separate global vs step filters
var globalFilters, stepFilters []Filter
for _, flt := range s.Filter.Filters {
if flt.IsEvent {
eventFilters = append(eventFilters, flt)
stepFilters = append(stepFilters, flt)
} else {
globalFilters = append(globalFilters, flt)
}
}
// extract duration filter
var minDur, maxDur int64
for i := len(globalFilters) - 1; i >= 0; i-- {
if globalFilters[i].Type == "duration" {
vals := globalFilters[i].Value // []string
if len(vals) == 2 {
minDur, _ = strconv.ParseInt(vals[0], 10, 64)
maxDur, _ = strconv.ParseInt(vals[1], 10, 64)
// 1. Collect required mainColumns from all filters (including nested)
requiredColumns := make(map[string]struct{})
var collectColumns func([]Filter)
collectColumns = func(filters []Filter) {
for _, flt := range filters {
if col, ok := mainColumns[string(flt.Type)]; ok {
requiredColumns[col] = struct{}{}
}
globalFilters = append(globalFilters[:i], globalFilters[i+1:]...)
collectColumns(flt.Filters)
}
}
collectColumns(globalFilters)
collectColumns(stepFilters)
// Global filters
globalConds, globalNames := buildEventConditions(globalFilters, BuildConditionsOptions{
DefinedColumns: mainColumns,
MainTableAlias: "e",
// 2. Build SELECT clause for CTE
selectCols := []string{
`e.created_at`,
`e."$event_name" AS event_name`,
`e."$properties" AS properties`,
}
for col := range requiredColumns {
logical := reverseLookup(mainColumns, col)
selectCols = append(selectCols, fmt.Sprintf(`e."%s" AS %s`, col, logical))
}
selectCols = append(selectCols,
`e.session_id`,
`e.distinct_id`,
`s.user_id AS session_user_id`,
fmt.Sprintf("if('%s' = 'sessionCount', toString(e.session_id), coalesce(nullif(s.user_id,''),e.distinct_id)) AS entity_id", metricFormat),
)
// 3. Global conditions
globalConds, _ := buildEventConditions(globalFilters, BuildConditionsOptions{
DefinedColumns: cteColumnAliases(), // logical -> logical (CTE alias)
MainTableAlias: "e",
PropertiesColumnName: "$properties",
})
base := []string{
fmt.Sprintf("e.created_at >= toDateTime(%d/1000)", p.MetricPayload.StartTimestamp),
@ -83,55 +101,37 @@ func (f FunnelQueryBuilder) buildQuery(p Payload) (string, error) {
"s.duration > 0",
fmt.Sprintf("e.project_id = %d", p.ProjectId),
}
if maxDur > 0 {
base = append(base, fmt.Sprintf("s.duration BETWEEN %d AND %d", minDur, maxDur))
}
base = append(base, globalConds...)
if len(globalNames) > 0 {
base = append(base, "e.`$event_name` IN ("+buildInClause(globalNames)+")")
}
// Build steps and per-step conditions only for eventFilters
var stepNames []string
var stepExprs []string
for i, filter := range eventFilters {
stepNames = append(stepNames, fmt.Sprintf("'%s'", filter.Type))
exprs, _ := buildEventConditions([]Filter{filter}, BuildConditionsOptions{DefinedColumns: mainColumns})
for j, c := range exprs {
c = strings.ReplaceAll(c, "toString(main.`$properties`)", "properties")
c = strings.ReplaceAll(c, "main.`$properties`", "properties")
c = strings.ReplaceAll(c, "JSONExtractString(properties", "JSONExtractString(toString(properties)")
exprs[j] = c
}
var expr string
if len(exprs) > 0 {
expr = fmt.Sprintf("(event_name = funnel_steps[%d] AND %s)", i+1, strings.Join(exprs, " AND "))
} else {
expr = fmt.Sprintf("(event_name = funnel_steps[%d])", i+1)
}
stepExprs = append(stepExprs, expr)
}
stepsArr := "[" + strings.Join(stepNames, ",") + "]"
windowArgs := strings.Join(stepExprs, ",")
// Compose WHERE clause
where := strings.Join(base, " AND ")
// Final query
// 4. Step conditions
var stepNames []string
var stepExprs []string
for i, filter := range stepFilters {
stepNames = append(stepNames, fmt.Sprintf("'%s'", filter.Type))
stepConds, _ := buildEventConditions([]Filter{filter}, BuildConditionsOptions{
DefinedColumns: cteColumnAliases(), // logical -> logical (CTE alias)
PropertiesColumnName: "properties",
MainTableAlias: "",
})
stepCondExprs := []string{fmt.Sprintf("event_name = funnel_steps[%d]", i+1)}
if len(stepConds) > 0 {
stepCondExprs = append(stepCondExprs, stepConds...)
}
stepExprs = append(stepExprs, fmt.Sprintf("(%s)", strings.Join(stepCondExprs, " AND ")))
}
stepsArr := "[" + strings.Join(stepNames, ",") + "]"
windowArgs := strings.Join(stepExprs, ",\n ")
q := fmt.Sprintf(`
WITH
%s AS funnel_steps,
86400 AS funnel_window_seconds,
events_for_funnel AS (
SELECT
e.created_at,
e."$event_name" AS event_name,
e."$properties" AS properties,
e.session_id,
e.distinct_id,
s.user_id AS session_user_id,
if('%s' = 'sessionCount', toString(e.session_id), coalesce(nullif(s.user_id,''),e.distinct_id)) AS entity_id
%s
FROM product_analytics.events AS e
JOIN experimental.sessions AS s USING(session_id)
WHERE %s
@ -167,7 +167,7 @@ SELECT
FROM step_list AS s
LEFT JOIN counts_by_level AS c ON s.level_number = c.level_number
ORDER BY s.level_number;
`, stepsArr, metricFormat, where, windowArgs)
`, stepsArr, strings.Join(selectCols, ",\n "), where, windowArgs)
return q, nil
}

View file

@ -0,0 +1,99 @@
package charts
import (
"fmt"
"openreplay/backend/pkg/analytics/db"
"strings"
)
type HeatmapPoint struct {
NormalizedX float64 `json:"normalized_x"`
NormalizedY float64 `json:"normalized_y"`
}
type HeatmapResponse struct {
Points []HeatmapPoint `json:"points"`
}
type HeatmapQueryBuilder struct{}
func (h HeatmapQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) {
q, err := h.buildQuery(p)
if err != nil {
return nil, err
}
rows, err := conn.Query(q)
if err != nil {
return nil, err
}
defer rows.Close()
var pts []HeatmapPoint
for rows.Next() {
var x, y float64
if err := rows.Scan(&x, &y); err != nil {
return nil, err
}
pts = append(pts, HeatmapPoint{x, y})
}
return HeatmapResponse{
Points: pts,
}, nil
}
func (h HeatmapQueryBuilder) buildQuery(p Payload) (string, error) {
if len(p.MetricPayload.Series) == 0 {
return "", fmt.Errorf("series empty")
}
s := p.MetricPayload.Series[0]
var globalFilters, eventFilters []Filter
for _, flt := range s.Filter.Filters {
if flt.IsEvent {
eventFilters = append(eventFilters, flt)
} else {
globalFilters = append(globalFilters, flt)
}
}
globalConds, globalNames := buildEventConditions(globalFilters, BuildConditionsOptions{
DefinedColumns: mainColumns,
MainTableAlias: "e",
})
eventConds, eventNames := buildEventConditions(eventFilters, BuildConditionsOptions{
DefinedColumns: mainColumns,
MainTableAlias: "e",
})
base := []string{
fmt.Sprintf("e.created_at >= toDateTime(%d/1000)", p.MetricPayload.StartTimestamp),
fmt.Sprintf("e.created_at < toDateTime(%d/1000)", p.MetricPayload.EndTimestamp+86400000),
fmt.Sprintf("e.project_id = %d", p.ProjectId),
}
base = append(base, globalConds...)
if len(globalNames) > 0 {
base = append(base, "e.`$event_name` IN ("+buildInClause(globalNames)+")")
}
if len(eventNames) > 0 {
base = append(base, "e.`$event_name` IN ("+buildInClause(eventNames)+")")
}
base = append(base, eventConds...)
where := strings.Join(base, " AND ")
q := fmt.Sprintf(`
SELECT
JSONExtractFloat(toString(e."$properties"), 'normalized_x') AS normalized_x,
JSONExtractFloat(toString(e."$properties"), 'normalized_y') AS normalized_y
FROM product_analytics.events AS e
JOIN experimental.sessions AS s USING(session_id)
WHERE %s;`, where)
return q, nil
}

View file

@ -0,0 +1,82 @@
package charts
import (
"fmt"
"openreplay/backend/pkg/analytics/db"
"strings"
)
type HeatmapSessionResponse struct {
//Points []HeatmapPoint `json:"points"`
SessionID uint64 `json:"session_id"`
}
type HeatmapSessionQueryBuilder struct{}
func (h HeatmapSessionQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) {
shortestQ, err := h.buildQuery(p)
if err != nil {
return nil, err
}
var sid uint64
row, err := conn.QueryRow(shortestQ)
if err != nil {
return nil, err
}
if err := row.Scan(&sid); err != nil {
return nil, err
}
return HeatmapSessionResponse{
SessionID: sid,
}, nil
}
func (h HeatmapSessionQueryBuilder) buildQuery(p Payload) (string, error) {
if len(p.MetricPayload.Series) == 0 {
return "", fmt.Errorf("series empty")
}
s := p.MetricPayload.Series[0]
var globalFilters, eventFilters []Filter
for _, flt := range s.Filter.Filters {
if flt.IsEvent {
eventFilters = append(eventFilters, flt)
} else {
globalFilters = append(globalFilters, flt)
}
}
globalConds, globalNames := buildEventConditions(globalFilters, BuildConditionsOptions{
DefinedColumns: mainColumns,
MainTableAlias: "e",
})
eventConds, _ := buildEventConditions(eventFilters, BuildConditionsOptions{
DefinedColumns: mainColumns,
MainTableAlias: "e",
})
base := []string{
fmt.Sprintf("e.created_at >= toDateTime(%d/1000)", p.MetricPayload.StartTimestamp),
fmt.Sprintf("e.created_at < toDateTime(%d/1000)", p.MetricPayload.EndTimestamp+86400000),
fmt.Sprintf("e.project_id = %d", p.ProjectId),
"e.\"$event_name\" = 'CLICK'",
}
base = append(base, globalConds...)
if len(globalNames) > 0 {
base = append(base, "e.`$event_name` IN ("+buildInClause(globalNames)+")")
}
base = append(base, eventConds...)
where := strings.Join(base, " AND ")
return fmt.Sprintf(`
SELECT
s.session_id
FROM product_analytics.events AS e
JOIN experimental.sessions AS s USING(session_id)
WHERE %s
ORDER BY s.duration ASC
LIMIT 1;`, where), nil
}

View file

@ -48,6 +48,7 @@ const (
MetricTypeTimeseries MetricType = "timeseries"
MetricTypeTable MetricType = "table"
MetricTypeFunnel MetricType = "funnel"
MetricTypeHeatmap MetricType = "heatmaps"
)
const (

View file

@ -25,22 +25,13 @@ func NewQueryBuilder(p Payload) (QueryBuilder, error) {
return FunnelQueryBuilder{}, nil
case MetricTypeTable:
return TableQueryBuilder{}, nil
case MetricTypeHeatmap:
return HeatmapQueryBuilder{}, nil
default:
return nil, fmt.Errorf("unknown metric type: %s", p.MetricType)
}
}
func partitionFilters(filters []Filter) (sessionFilters []Filter, eventFilters []Filter) {
for _, f := range filters {
if f.IsEvent {
eventFilters = append(eventFilters, f)
} else {
sessionFilters = append(sessionFilters, f)
}
}
return
}
var validFilterTypes = map[FilterType]struct{}{
"LOCATION": {},
"CLICK": {},
@ -90,17 +81,16 @@ var propertyKeyMap = map[string]filterConfig{
}
func getColumnAccessor(logicalProp string, isNumeric bool, opts BuildConditionsOptions) string {
// Use CTE alias if present in DefinedColumns
if actualCol, ok := opts.DefinedColumns[logicalProp]; ok && actualCol != "" {
return fmt.Sprintf("%s.`%s`", opts.MainTableAlias, actualCol)
return actualCol
}
// Otherwise, extract from $properties JSON
jsonFunc := "JSONExtractString"
if isNumeric {
jsonFunc = "JSONExtractFloat" // Or JSONExtractInt, etc.
jsonFunc = "JSONExtractFloat"
}
return fmt.Sprintf("%s(toString(%s.`%s`), '%s')",
jsonFunc, opts.MainTableAlias, opts.PropertiesColumnName, logicalProp)
return fmt.Sprintf("%s(toString(%s), '%s')", jsonFunc, opts.PropertiesColumnName, logicalProp)
}
func buildEventConditions(filters []Filter, options ...BuildConditionsOptions) (conds, names []string) {
@ -109,7 +99,6 @@ func buildEventConditions(filters []Filter, options ...BuildConditionsOptions) (
PropertiesColumnName: "$properties",
DefinedColumns: make(map[string]string),
}
if len(options) > 0 {
if options[0].MainTableAlias != "" {
opts.MainTableAlias = options[0].MainTableAlias
@ -121,24 +110,21 @@ func buildEventConditions(filters []Filter, options ...BuildConditionsOptions) (
opts.DefinedColumns = options[0].DefinedColumns
}
}
for _, f := range filters {
_, isValidType := validFilterTypes[f.Type]
if !isValidType {
_, okType := validFilterTypes[f.Type]
if !okType {
continue
}
// process main filter
if f.Type == FilterFetch {
var fetchConds []string
for _, nf := range f.Filters {
nestedConfig, ok := propertyKeyMap[string(nf.Type)]
cfg, ok := propertyKeyMap[string(nf.Type)]
if !ok {
continue
}
accessor := getColumnAccessor(nestedConfig.LogicalProperty, nestedConfig.IsNumeric, opts)
c := buildCond(accessor, nf.Value, f.Operator) // Uses parent filter's operator
if c != "" {
acc := getColumnAccessor(cfg.LogicalProperty, cfg.IsNumeric, opts)
if c := buildCond(acc, nf.Value, f.Operator); c != "" {
fetchConds = append(fetchConds, c)
}
}
@ -147,80 +133,35 @@ func buildEventConditions(filters []Filter, options ...BuildConditionsOptions) (
names = append(names, "REQUEST")
}
} else {
config, ok := propertyKeyMap[string(f.Type)]
cfg, ok := propertyKeyMap[string(f.Type)]
if !ok {
config = filterConfig{
LogicalProperty: string(f.Type),
cfg = filterConfig{LogicalProperty: string(f.Type)}
}
acc := getColumnAccessor(cfg.LogicalProperty, cfg.IsNumeric, opts)
// when the Operator isAny or onAny just add the event name to the list
if f.Operator == "isAny" || f.Operator == "onAny" {
if f.IsEvent {
names = append(names, string(f.Type))
}
continue
}
accessor := getColumnAccessor(config.LogicalProperty, config.IsNumeric, opts)
c := buildCond(accessor, f.Value, f.Operator)
if c != "" {
if c := buildCond(acc, f.Value, f.Operator); c != "" {
conds = append(conds, c)
if f.IsEvent {
names = append(names, string(f.Type))
}
}
}
}
return
}
func buildEventConditionsX(filters []Filter) (conds, names []string) {
for _, f := range filters {
if f.IsEvent {
switch f.Type {
case FilterClick:
c := buildCond("JSONExtractString(toString(main.`$properties`), 'label')", f.Value, "is")
if c != "" {
conds = append(conds, c)
}
names = append(names, "CLICK")
case FilterInput:
c := buildCond("JSONExtractString(toString(main.`$properties`), 'label')", f.Value, f.Operator)
if c != "" {
conds = append(conds, c)
}
names = append(names, "INPUT")
case FilterLocation:
c := buildCond("JSONExtractString(toString(main.`$properties`), 'url_path')", f.Value, f.Operator)
if c != "" {
conds = append(conds, c)
}
names = append(names, "LOCATION")
case FilterCustom:
c := buildCond("JSONExtractString(toString(main.`$properties`), 'name')", f.Value, f.Operator)
if c != "" {
conds = append(conds, c)
}
names = append(names, "CUSTOM")
case FilterFetch:
var fetchConds []string
for _, nf := range f.Filters {
switch nf.Type {
case "fetchUrl":
c := buildCond("JSONExtractString(toString(main.`$properties`), 'url_path')", nf.Value, f.Operator)
if c != "" {
fetchConds = append(fetchConds, c)
}
case "fetchStatusCode":
c := buildCond("JSONExtractFloat(toString(main.`$properties`), 'status')", nf.Value, f.Operator)
if c != "" {
fetchConds = append(fetchConds, c)
}
}
}
if len(fetchConds) > 0 {
conds = append(conds, strings.Join(fetchConds, " AND "))
}
names = append(names, "REQUEST")
case FilterTag:
c := buildCond("JSONExtractString(toString(main.`$properties`), 'tag')", f.Value, f.Operator)
if c != "" {
conds = append(conds, c)
}
names = append(names, "TAG")
// process sub-filters
if len(f.Filters) > 0 && f.Type != FilterFetch {
subOpts := opts // Inherit parent's options
subConds, subNames := buildEventConditions(f.Filters, subOpts)
if len(subConds) > 0 {
conds = append(conds, strings.Join(subConds, " AND "))
names = append(names, subNames...)
}
}
}
@ -285,6 +226,16 @@ func buildCond(expr string, values []string, operator string) string {
for _, v := range values {
conds = append(conds, fmt.Sprintf("%s ILIKE '%%%s%%'", expr, v))
}
if len(conds) > 1 {
return "(" + strings.Join(conds, " OR ") + ")"
}
return conds[0]
case "regex":
var conds []string
for _, v := range values {
conds = append(conds, fmt.Sprintf("match(%s, '%s')", expr, v))
}
if len(conds) > 1 {
return "(" + strings.Join(conds, " OR ") + ")"
}
@ -316,12 +267,12 @@ func buildCond(expr string, values []string, operator string) string {
return "(" + strings.Join(conds, " OR ") + ")"
}
return conds[0]
case "notEquals":
case "notEquals", "not", "off":
if len(values) > 1 {
return fmt.Sprintf("%s NOT IN (%s)", expr, buildInClause(values))
}
return fmt.Sprintf("%s <> '%s'", expr, values[0])
case "greaterThan":
case "greaterThan", "gt":
var conds []string
for _, v := range values {
conds = append(conds, fmt.Sprintf("%s > '%s'", expr, v))
@ -330,7 +281,7 @@ func buildCond(expr string, values []string, operator string) string {
return "(" + strings.Join(conds, " OR ") + ")"
}
return conds[0]
case "greaterThanOrEqual":
case "greaterThanOrEqual", "gte":
var conds []string
for _, v := range values {
conds = append(conds, fmt.Sprintf("%s >= '%s'", expr, v))
@ -339,7 +290,7 @@ func buildCond(expr string, values []string, operator string) string {
return "(" + strings.Join(conds, " OR ") + ")"
}
return conds[0]
case "lessThan":
case "lessThan", "lt":
var conds []string
for _, v := range values {
conds = append(conds, fmt.Sprintf("%s < '%s'", expr, v))
@ -348,7 +299,7 @@ func buildCond(expr string, values []string, operator string) string {
return "(" + strings.Join(conds, " OR ") + ")"
}
return conds[0]
case "lessThanOrEqual":
case "lessThanOrEqual", "lte":
var conds []string
for _, v := range values {
conds = append(conds, fmt.Sprintf("%s <= '%s'", expr, v))
@ -367,7 +318,7 @@ func buildCond(expr string, values []string, operator string) string {
return fmt.Sprintf("%s NOT IN (%s)", expr, buildInClause(values))
}
return fmt.Sprintf("%s <> '%s'", expr, values[0])
case "equals", "is":
case "equals", "is", "on":
if len(values) > 1 {
return fmt.Sprintf("%s IN (%s)", expr, buildInClause(values))
}
@ -471,3 +422,42 @@ func FillMissingDataPoints(
}
return results
}
func partitionFilters(filters []Filter) (sessionFilters []Filter, eventFilters []Filter) {
for _, f := range filters {
if f.IsEvent {
eventFilters = append(eventFilters, f)
} else {
sessionFilters = append(sessionFilters, f)
}
}
return
}
// Returns a map: logical property -> CTE alias (e.g., "userBrowser" -> "userBrowser")
func cteColumnAliases() map[string]string {
aliases := make(map[string]string)
for logical := range mainColumns {
aliases[logical] = logical
}
return aliases
}
// Returns a map: logical property -> source column (e.g., "userBrowser" -> "$browser")
func cteSourceColumns() map[string]string {
cols := make(map[string]string)
for logical, col := range mainColumns {
cols[logical] = col
}
return cols
}
// Helper for reverse lookup (used for dynamic SELECT)
func reverseLookup(m map[string]string, value string) string {
for k, v := range m {
if v == value {
return k
}
}
return ""
}

View file

@ -22,6 +22,7 @@ type TableResponse struct {
type Connector interface {
Stop() error
Query(query string) (driver.Rows, error)
QueryRow(query string) (driver.Row, error)
QueryArgs(query string, args map[string]interface{}) (driver.Rows, error)
}
@ -64,6 +65,16 @@ func (c *connectorImpl) Query(query string) (driver.Rows, error) {
return rows, nil
}
func (c *connectorImpl) QueryRow(query string) (driver.Row, error) {
row := c.conn.QueryRow(context.Background(), query)
if err := row.Err(); err != nil {
return nil, err
}
//defer row.Close()
return row, nil
}
func (c *connectorImpl) QueryArgs(query string, args map[string]interface{}) (driver.Rows, error) {
rows, err := c.conn.Query(context.Background(), query, args)
if err != nil {