feat(product_analytics): funnel query and response fixes
This commit is contained in:
parent
adb88fd9fc
commit
65ee3bcbb6
4 changed files with 97 additions and 33 deletions
|
|
@ -46,5 +46,6 @@ func (s *chartsImpl) GetData(projectId int, userID uint64, req *MetricPayload) (
|
||||||
return nil, fmt.Errorf("error executing query: %v", err)
|
return nil, fmt.Errorf("error executing query: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return resp, nil
|
//return resp, nil
|
||||||
|
return map[string]interface{}{"data": resp}, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,9 +7,12 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type FunnelStepResult struct {
|
type FunnelStepResult struct {
|
||||||
LevelNumber uint64 `json:"step"`
|
LevelNumber uint64 `json:"step"`
|
||||||
StepName string `json:"type"`
|
StepName string `json:"type"`
|
||||||
CountAtLevel uint64 `json:"count"`
|
CountAtLevel uint64 `json:"count"`
|
||||||
|
Operator string `json:"operator"`
|
||||||
|
Value []string `json:"value"`
|
||||||
|
DropPct float64 `json:"dropPct"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type FunnelResponse struct {
|
type FunnelResponse struct {
|
||||||
|
|
@ -29,14 +32,44 @@ func (f FunnelQueryBuilder) Execute(p Payload, conn db.Connector) (interface{},
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
|
||||||
|
// extract step filters
|
||||||
|
s := p.MetricPayload.Series[0]
|
||||||
|
var stepFilters []Filter
|
||||||
|
for _, flt := range s.Filter.Filters {
|
||||||
|
if flt.IsEvent {
|
||||||
|
stepFilters = append(stepFilters, flt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var steps []FunnelStepResult
|
var steps []FunnelStepResult
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var r FunnelStepResult
|
var r FunnelStepResult
|
||||||
if err := rows.Scan(&r.LevelNumber, &r.StepName, &r.CountAtLevel); err != nil {
|
if err := rows.Scan(&r.LevelNumber, &r.StepName, &r.CountAtLevel); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
idx := int(r.LevelNumber) - 1
|
||||||
|
if idx >= 0 && idx < len(stepFilters) {
|
||||||
|
r.Operator = stepFilters[idx].Operator
|
||||||
|
r.Value = stepFilters[idx].Value
|
||||||
|
}
|
||||||
steps = append(steps, r)
|
steps = append(steps, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// compute drop percentages
|
||||||
|
if len(steps) > 0 {
|
||||||
|
prev := steps[0].CountAtLevel
|
||||||
|
steps[0].DropPct = 0
|
||||||
|
for i := 1; i < len(steps); i++ {
|
||||||
|
curr := steps[i].CountAtLevel
|
||||||
|
if prev > 0 {
|
||||||
|
steps[i].DropPct = (float64(prev-curr) / float64(prev)) * 100
|
||||||
|
} else {
|
||||||
|
steps[i].DropPct = 0
|
||||||
|
}
|
||||||
|
prev = curr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return FunnelResponse{Steps: steps}, nil
|
return FunnelResponse{Steps: steps}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -44,21 +77,24 @@ func (f FunnelQueryBuilder) buildQuery(p Payload) (string, error) {
|
||||||
if len(p.MetricPayload.Series) == 0 {
|
if len(p.MetricPayload.Series) == 0 {
|
||||||
return "", fmt.Errorf("series empty")
|
return "", fmt.Errorf("series empty")
|
||||||
}
|
}
|
||||||
|
|
||||||
s := p.MetricPayload.Series[0]
|
s := p.MetricPayload.Series[0]
|
||||||
metricFormat := p.MetricPayload.MetricFormat
|
metricFormat := p.MetricPayload.MetricFormat
|
||||||
|
|
||||||
// Separate global vs step filters
|
var (
|
||||||
var globalFilters, stepFilters []Filter
|
globalFilters []Filter
|
||||||
|
stepFilters []Filter
|
||||||
|
sessionDurationFilter *Filter
|
||||||
|
)
|
||||||
for _, flt := range s.Filter.Filters {
|
for _, flt := range s.Filter.Filters {
|
||||||
if flt.IsEvent {
|
if flt.IsEvent {
|
||||||
stepFilters = append(stepFilters, flt)
|
stepFilters = append(stepFilters, flt)
|
||||||
|
} else if flt.Type == "duration" {
|
||||||
|
sessionDurationFilter = &flt
|
||||||
} else {
|
} else {
|
||||||
globalFilters = append(globalFilters, flt)
|
globalFilters = append(globalFilters, flt)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 1. Collect required mainColumns from all filters (including nested)
|
|
||||||
requiredColumns := make(map[string]struct{})
|
requiredColumns := make(map[string]struct{})
|
||||||
var collectColumns func([]Filter)
|
var collectColumns func([]Filter)
|
||||||
collectColumns = func(filters []Filter) {
|
collectColumns = func(filters []Filter) {
|
||||||
|
|
@ -72,7 +108,6 @@ func (f FunnelQueryBuilder) buildQuery(p Payload) (string, error) {
|
||||||
collectColumns(globalFilters)
|
collectColumns(globalFilters)
|
||||||
collectColumns(stepFilters)
|
collectColumns(stepFilters)
|
||||||
|
|
||||||
// 2. Build SELECT clause for CTE
|
|
||||||
selectCols := []string{
|
selectCols := []string{
|
||||||
`e.created_at`,
|
`e.created_at`,
|
||||||
`e."$event_name" AS event_name`,
|
`e."$event_name" AS event_name`,
|
||||||
|
|
@ -89,40 +124,52 @@ func (f FunnelQueryBuilder) buildQuery(p Payload) (string, error) {
|
||||||
fmt.Sprintf("if('%s' = 'sessionCount', toString(e.session_id), coalesce(nullif(s.user_id,''),e.distinct_id)) AS entity_id", metricFormat),
|
fmt.Sprintf("if('%s' = 'sessionCount', toString(e.session_id), coalesce(nullif(s.user_id,''),e.distinct_id)) AS entity_id", metricFormat),
|
||||||
)
|
)
|
||||||
|
|
||||||
// 3. Global conditions
|
|
||||||
globalConds, _ := buildEventConditions(globalFilters, BuildConditionsOptions{
|
globalConds, _ := buildEventConditions(globalFilters, BuildConditionsOptions{
|
||||||
DefinedColumns: mainColumns,
|
DefinedColumns: mainColumns,
|
||||||
MainTableAlias: "e",
|
MainTableAlias: "e",
|
||||||
PropertiesColumnName: "$properties",
|
PropertiesColumnName: "$properties",
|
||||||
})
|
})
|
||||||
|
|
||||||
base := []string{
|
base := []string{
|
||||||
fmt.Sprintf("e.created_at >= toDateTime(%d/1000)", p.MetricPayload.StartTimestamp),
|
fmt.Sprintf("e.created_at >= toDateTime(%d/1000)", p.MetricPayload.StartTimestamp),
|
||||||
fmt.Sprintf("e.created_at < toDateTime(%d/1000)", p.MetricPayload.EndTimestamp+86400000),
|
fmt.Sprintf("e.created_at < toDateTime(%d/1000)", p.MetricPayload.EndTimestamp+86400000),
|
||||||
"s.duration > 0",
|
|
||||||
fmt.Sprintf("e.project_id = %d", p.ProjectId),
|
fmt.Sprintf("e.project_id = %d", p.ProjectId),
|
||||||
}
|
}
|
||||||
base = append(base, globalConds...)
|
base = append(base, globalConds...)
|
||||||
|
if sessionDurationFilter != nil {
|
||||||
|
vals := sessionDurationFilter.Value
|
||||||
|
if len(vals) > 0 && vals[0] != "" {
|
||||||
|
base = append(base, fmt.Sprintf("s.duration >= %s", vals[0]))
|
||||||
|
}
|
||||||
|
if len(vals) > 1 && vals[1] != "" {
|
||||||
|
base = append(base, fmt.Sprintf("s.duration <= %s", vals[1]))
|
||||||
|
}
|
||||||
|
}
|
||||||
where := strings.Join(base, " AND ")
|
where := strings.Join(base, " AND ")
|
||||||
|
|
||||||
// 4. Step conditions
|
var (
|
||||||
var stepNames []string
|
stepNames []string
|
||||||
var stepExprs []string
|
stepExprs []string
|
||||||
for i, filter := range stepFilters {
|
clickCount int
|
||||||
stepNames = append(stepNames, fmt.Sprintf("'%s'", filter.Type))
|
)
|
||||||
stepConds, _ := buildEventConditions([]Filter{filter}, BuildConditionsOptions{
|
for i, flt := range stepFilters {
|
||||||
DefinedColumns: cteColumnAliases(), // logical -> logical (CTE alias)
|
stepNames = append(stepNames, fmt.Sprintf("'%s'", flt.Type))
|
||||||
|
conds, _ := buildEventConditions([]Filter{flt}, BuildConditionsOptions{
|
||||||
|
DefinedColumns: cteColumnAliases(),
|
||||||
PropertiesColumnName: "properties",
|
PropertiesColumnName: "properties",
|
||||||
MainTableAlias: "",
|
MainTableAlias: "",
|
||||||
})
|
})
|
||||||
|
var exprParts []string
|
||||||
stepCondExprs := []string{fmt.Sprintf("event_name = funnel_steps[%d]", i+1)}
|
exprParts = append(exprParts, fmt.Sprintf("event_name = funnel_steps[%d]", i+1))
|
||||||
if len(stepConds) > 0 {
|
if flt.Type == "CLICK" {
|
||||||
stepCondExprs = append(stepCondExprs, stepConds...)
|
clickCount++
|
||||||
|
exprParts = append(exprParts, fmt.Sprintf("click_idx = %d", clickCount))
|
||||||
}
|
}
|
||||||
stepExprs = append(stepExprs, fmt.Sprintf("(%s)", strings.Join(stepCondExprs, " AND ")))
|
exprParts = append(exprParts, conds...)
|
||||||
|
stepExprs = append(stepExprs, fmt.Sprintf("(%s)", strings.Join(exprParts, " AND ")))
|
||||||
}
|
}
|
||||||
|
|
||||||
stepsArr := "[" + strings.Join(stepNames, ",") + "]"
|
stepsArr := fmt.Sprintf("[%s]", strings.Join(stepNames, ","))
|
||||||
windowArgs := strings.Join(stepExprs, ",\n ")
|
windowArgs := strings.Join(stepExprs, ",\n ")
|
||||||
|
|
||||||
q := fmt.Sprintf(`
|
q := fmt.Sprintf(`
|
||||||
|
|
@ -135,16 +182,28 @@ WITH
|
||||||
FROM product_analytics.events AS e
|
FROM product_analytics.events AS e
|
||||||
JOIN experimental.sessions AS s USING(session_id)
|
JOIN experimental.sessions AS s USING(session_id)
|
||||||
WHERE %s
|
WHERE %s
|
||||||
|
ORDER BY e.session_id, e.created_at
|
||||||
|
),
|
||||||
|
numbered_clicks AS (
|
||||||
|
SELECT
|
||||||
|
entity_id,
|
||||||
|
created_at,
|
||||||
|
row_number() OVER (PARTITION BY entity_id ORDER BY created_at) AS click_idx
|
||||||
|
FROM events_for_funnel
|
||||||
|
WHERE event_name = 'CLICK'
|
||||||
),
|
),
|
||||||
funnel_levels_reached AS (
|
funnel_levels_reached AS (
|
||||||
SELECT
|
SELECT
|
||||||
entity_id,
|
ef.entity_id,
|
||||||
windowFunnel(funnel_window_seconds)(
|
windowFunnel(funnel_window_seconds)(
|
||||||
toDateTime(created_at),
|
toDateTime(ef.created_at),
|
||||||
%s
|
%s
|
||||||
) AS max_level
|
) AS max_level
|
||||||
FROM events_for_funnel
|
FROM events_for_funnel ef
|
||||||
GROUP BY entity_id
|
LEFT JOIN numbered_clicks nc
|
||||||
|
ON ef.entity_id = nc.entity_id
|
||||||
|
AND ef.created_at = nc.created_at
|
||||||
|
GROUP BY ef.entity_id
|
||||||
),
|
),
|
||||||
counts_by_level AS (
|
counts_by_level AS (
|
||||||
SELECT
|
SELECT
|
||||||
|
|
@ -166,8 +225,12 @@ SELECT
|
||||||
ifNull(c.cnt, 0) AS count_at_level
|
ifNull(c.cnt, 0) AS count_at_level
|
||||||
FROM step_list AS s
|
FROM step_list AS s
|
||||||
LEFT JOIN counts_by_level AS c ON s.level_number = c.level_number
|
LEFT JOIN counts_by_level AS c ON s.level_number = c.level_number
|
||||||
ORDER BY s.level_number;
|
ORDER BY s.level_number;`,
|
||||||
`, stepsArr, strings.Join(selectCols, ",\n "), where, windowArgs)
|
stepsArr,
|
||||||
|
strings.Join(selectCols, ",\n "),
|
||||||
|
where,
|
||||||
|
windowArgs,
|
||||||
|
)
|
||||||
|
|
||||||
return q, nil
|
return q, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -532,10 +532,10 @@ func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interfac
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return JourneyResponse{Data: JourneyData{
|
return JourneyData{
|
||||||
Nodes: finalNodes,
|
Nodes: finalNodes,
|
||||||
Links: finalLinks,
|
Links: finalLinks,
|
||||||
}}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h UserJourneyQueryBuilder) buildQuery(p Payload) (string, error) {
|
func (h UserJourneyQueryBuilder) buildQuery(p Payload) (string, error) {
|
||||||
|
|
|
||||||
|
|
@ -48,7 +48,7 @@ const (
|
||||||
MetricTypeTimeseries MetricType = "timeseries"
|
MetricTypeTimeseries MetricType = "timeseries"
|
||||||
MetricTypeTable MetricType = "table"
|
MetricTypeTable MetricType = "table"
|
||||||
MetricTypeFunnel MetricType = "funnel"
|
MetricTypeFunnel MetricType = "funnel"
|
||||||
MetricTypeHeatmap MetricType = "heatmaps"
|
MetricTypeHeatmap MetricType = "heatMap"
|
||||||
MetricTypeSession MetricType = "heatmaps_session"
|
MetricTypeSession MetricType = "heatmaps_session"
|
||||||
MetricUserJourney MetricType = "pathAnalysis"
|
MetricUserJourney MetricType = "pathAnalysis"
|
||||||
)
|
)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue