feat(product_analytics): user journey - handle duration filter

This commit is contained in:
Shekar Siri 2025-05-12 15:06:25 +02:00
parent bf62be2a4a
commit adb88fd9fc

View file

@ -5,6 +5,7 @@ import (
"math"
"openreplay/backend/pkg/analytics/db"
"sort"
"strconv"
"strings"
"time"
)
@ -563,10 +564,30 @@ func (h UserJourneyQueryBuilder) buildQuery(p Payload) (string, error) {
startConds = fixProps(startConds)
excludeConds = fixProps(excludeConds)
// extract global filters from first series
// extract global filters and duration from first series
s := p.MetricPayload.Series[0]
var durationMin, durationMax int64
var okMin, okMax bool
var err error
var globalFilters []Filter
for _, flt := range s.Filter.Filters {
if flt.Type == "duration" {
if len(flt.Value) > 0 && flt.Value[0] != "" {
durationMin, err = strconv.ParseInt(flt.Value[0], 10, 64)
if err != nil {
return "", err
}
okMin = true
}
if len(flt.Value) > 1 && flt.Value[1] != "" {
durationMax, err = strconv.ParseInt(flt.Value[1], 10, 64)
if err != nil {
return "", err
}
okMax = true
}
continue
}
if flt.IsEvent {
continue
}
@ -575,6 +596,16 @@ func (h UserJourneyQueryBuilder) buildQuery(p Payload) (string, error) {
globalConds, _ := buildEventConditions(globalFilters, BuildConditionsOptions{DefinedColumns: mainColumns, MainTableAlias: "e"})
globalConds = fixProps(globalConds)
// assemble duration condition
var durCond string
if okMin && okMax {
durCond = fmt.Sprintf("ss.duration BETWEEN %d AND %d", durationMin, durationMax)
} else if okMin {
durCond = fmt.Sprintf("ss.duration >= %d", durationMin)
} else if okMax {
durCond = fmt.Sprintf("ss.duration <= %d", durationMax)
}
// determine starting event
var startEvent string
if len(p.StartPoint) > 0 {
@ -583,7 +614,7 @@ func (h UserJourneyQueryBuilder) buildQuery(p Payload) (string, error) {
startEvent = events[0]
}
// assemble first_hits WHERE clause
// assemble first_hits WHERE clause with optional duration
firstBase := []string{fmt.Sprintf("e.\"$event_name\" = '%s'", startEvent)}
if len(startConds) > 0 {
firstBase = append(firstBase, startConds...)
@ -599,6 +630,9 @@ func (h UserJourneyQueryBuilder) buildQuery(p Payload) (string, error) {
time.Unix(p.EndTimestamp/1000, 0).UTC().Format("2006-01-02 15:04:05"),
),
)
if durCond != "" {
firstBase = append(firstBase, durCond)
}
// assemble journey WHERE clause
journeyBase := []string{laterCond}
@ -629,10 +663,11 @@ func (h UserJourneyQueryBuilder) buildQuery(p Payload) (string, error) {
// build final query
q := fmt.Sprintf(`WITH
first_hits AS (
SELECT session_id, MIN(created_at) AS start_time
SELECT e.session_id, MIN(e.created_at) AS start_time
FROM product_analytics.events AS e
JOIN experimental.sessions AS ss USING(session_id)
WHERE %s
GROUP BY session_id
GROUP BY e.session_id
),
journey_events_after AS (
SELECT
@ -671,7 +706,7 @@ func (h UserJourneyQueryBuilder) buildQuery(p Payload) (string, error) {
e.created_at < f.start_time
AND e.created_at >= toDateTime('%s')
AND %s
AND %d > 0 -- Only fetch previous events if PreviousColumns > 0
AND %d > 0
),
journey_events_combined AS (
SELECT *, 1 AS direction FROM journey_events_after