feat(product_analytics): timeseries filter fixes

This commit is contained in:
Shekar Siri 2025-05-23 17:00:44 +02:00
parent 9df909d112
commit c750de6946
3 changed files with 98 additions and 186 deletions

View file

@ -52,6 +52,7 @@ var mainColumns = map[string]string{
"userBrowser": "$browser",
"userDevice": "sessions.user_device",
"referrer": "$referrer",
"fetchDuration": "$duration_s",
"ISSUE": "issue_type",
}

View file

@ -11,236 +11,137 @@ import (
type TimeSeriesQueryBuilder struct{}
func (t TimeSeriesQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) {
consolidated := map[uint64]map[string]uint64{}
for _, s := range p.Series {
query, err := t.buildQuery(p, s)
data := make(map[uint64]map[string]uint64)
for _, series := range p.Series {
query, err := t.buildQuery(p, series)
if err != nil {
log.Printf("Error building query for series %s: %v", s.Name, err)
return nil, fmt.Errorf("error building query for series %s: %v", s.Name, err)
log.Printf("buildQuery %s: %v", series.Name, err)
return nil, fmt.Errorf("series %s: %v", series.Name, err)
}
rows, err := conn.Query(query)
if err != nil {
log.Printf("Error executing query for series %s: %v", s.Name, err)
return nil, fmt.Errorf("error executing query for series %s: %v", s.Name, err)
log.Printf("exec %s: %v", series.Name, err)
return nil, fmt.Errorf("series %s: %v", series.Name, err)
}
var results []DataPoint
var pts []DataPoint
for rows.Next() {
var res DataPoint
if err := rows.Scan(&res.Timestamp, &res.Count); err != nil {
var dp DataPoint
if err := rows.Scan(&dp.Timestamp, &dp.Count); err != nil {
rows.Close()
return nil, err
}
results = append(results, res)
pts = append(pts, dp)
}
rows.Close()
filled := FillMissingDataPoints(p.StartTimestamp, p.EndTimestamp, p.Density, DataPoint{}, results, 1000)
filled := FillMissingDataPoints(p.StartTimestamp, p.EndTimestamp, p.Density, DataPoint{}, pts, 1000)
for _, dp := range filled {
if _, ok := consolidated[dp.Timestamp]; !ok {
consolidated[dp.Timestamp] = map[string]uint64{}
if data[dp.Timestamp] == nil {
data[dp.Timestamp] = map[string]uint64{}
}
consolidated[dp.Timestamp][s.Name] = dp.Count
data[dp.Timestamp][series.Name] = dp.Count
}
}
var timestamps []uint64
for ts := range consolidated {
for ts := range data {
timestamps = append(timestamps, ts)
}
sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] < timestamps[j] })
var finalResults []map[string]interface{}
var result []map[string]interface{}
for _, ts := range timestamps {
row := map[string]interface{}{"timestamp": ts}
for _, s := range p.Series {
if count, ok := consolidated[ts][s.Name]; ok {
row[s.Name] = count
} else {
row[s.Name] = uint64(0)
for _, series := range p.Series {
row[series.Name] = data[ts][series.Name]
}
result = append(result, row)
}
finalResults = append(finalResults, row)
}
return finalResults, nil
return result, nil
}
func (t TimeSeriesQueryBuilder) buildQuery(p Payload, s Series) (string, error) {
var query string
switch p.MetricOf {
case "sessionCount":
query = t.buildSessionCountQuery(p, s)
return t.buildTimeSeriesQuery(p, s, "sessionCount", "session_id"), nil
case "userCount":
query = t.buildUserCountQuery(p, s)
return t.buildTimeSeriesQuery(p, s, "userCount", "user_id"), nil
default:
query = ""
return "", fmt.Errorf("unsupported metric %q", p.MetricOf)
}
return query, nil
}
func (TimeSeriesQueryBuilder) buildSessionCountQuery(p Payload, s Series) string {
//eventConds, eventNames := buildEventConditions(s.Filter.Filters)
eventConds, eventNames := buildEventConditions(s.Filter.Filters, BuildConditionsOptions{
func (t TimeSeriesQueryBuilder) buildTimeSeriesQuery(p Payload, s Series, metric, idField string) string {
sub := t.buildSubQuery(p, s, metric)
step := int(getStepSize(p.StartTimestamp, p.EndTimestamp, p.Density, false, 1000)) * 1000
return fmt.Sprintf(
"SELECT gs.generate_series AS timestamp, COALESCE(COUNT(DISTINCT ps.%s),0) AS count "+
"FROM generate_series(%d,%d,%d) AS gs "+
"LEFT JOIN (%s) AS ps ON TRUE "+
"WHERE ps.datetime >= toDateTime(timestamp/1000) AND ps.datetime < toDateTime((timestamp+%d)/1000) "+
"GROUP BY timestamp ORDER BY timestamp;",
idField, p.StartTimestamp, p.EndTimestamp, step, sub, step,
)
}
func (t TimeSeriesQueryBuilder) buildSubQuery(p Payload, s Series, metric string) string {
evConds, evNames := buildEventConditions(s.Filter.Filters, BuildConditionsOptions{
DefinedColumns: mainColumns,
MainTableAlias: "main",
PropertiesColumnName: "$properties",
})
sessionConds := buildSessionConditions(s.Filter.Filters)
sessConds := buildSessionConditions(s.Filter.Filters)
staticEvt := buildStaticEventWhere(p)
sessWhere, sessJoin := buildStaticSessionWhere(p, sessionConds)
eventsSubQuery := buildEventsSubQuery("sessionCount", eventConds, eventNames, staticEvt, sessWhere, sessJoin)
mainQuery := buildMainQuery(p, eventsSubQuery, "sessionCount")
return mainQuery
}
sessWhere, sessJoin := buildStaticSessionWhere(p, sessConds)
func (TimeSeriesQueryBuilder) buildUserCountQuery(p Payload, s Series) string {
eventConds, eventNames := buildEventConditions(s.Filter.Filters, BuildConditionsOptions{
DefinedColumns: mainColumns,
MainTableAlias: "main",
PropertiesColumnName: "$properties",
})
sessionConds := buildSessionConditions(s.Filter.Filters)
staticEvt := buildStaticEventWhere(p)
sessWhere, sessJoin := buildStaticSessionWhere(p, sessionConds)
eventsSubQuery := buildEventsSubQuery("userCount", eventConds, eventNames, staticEvt, sessWhere, sessJoin)
mainQuery := buildMainQuery(p, eventsSubQuery, "userCount")
return mainQuery
}
func buildEventsSubQuery(metric string, eventConds, eventNames []string, staticEvt, sessWhere, sessJoin string) string {
if len(eventConds) == 0 && len(eventNames) == 0 {
if len(evConds) == 0 && len(evNames) == 0 {
if metric == "sessionCount" {
return fmt.Sprintf(sessionNoFiltersSubQueryTpl, sessJoin)
return fmt.Sprintf(
"SELECT s.session_id AS session_id, s.datetime AS datetime "+
"FROM experimental.sessions AS s WHERE %s",
sessJoin,
)
}
return fmt.Sprintf(noFiltersSubQueryTpl, sessJoin)
return fmt.Sprintf(
"SELECT multiIf(s.user_id!='',s.user_id,s.user_anonymous_id!='',s.user_anonymous_id,toString(s.user_uuid)) AS user_id, s.datetime AS datetime "+
"FROM experimental.sessions AS s WHERE %s",
sessJoin,
)
}
var evtNameClause string
var unique []string
for _, name := range eventNames {
if !contains(unique, name) {
unique = append(unique, name)
uniq := make([]string, 0, len(evNames))
for _, name := range evNames {
if !contains(uniq, name) {
uniq = append(uniq, name)
}
}
if len(unique) > 0 {
evtNameClause = fmt.Sprintf("AND main.`$event_name` IN (%s)", buildInClause(unique))
nameClause := ""
if len(uniq) > 0 {
nameClause = fmt.Sprintf("AND main.`$event_name` IN (%s) ", buildInClause(uniq))
}
having := ""
if len(eventConds) > 0 {
having = buildHavingClause(eventConds)
}
evtWhere := staticEvt
if len(eventConds) > 0 {
evtWhere += " AND " + strings.Join(eventConds, " AND ")
}
if metric == "sessionCount" {
return fmt.Sprintf(sessionSubQueryTpl, evtWhere, sessWhere, evtNameClause, having, sessJoin)
}
return fmt.Sprintf(subQueryTpl, evtWhere, sessWhere, evtNameClause, having, sessJoin)
if len(evConds) > 0 {
having = buildHavingClause(evConds)
}
func buildMainQuery(p Payload, subQuery, metric string) string {
step := int(getStepSize(p.StartTimestamp, p.EndTimestamp, p.Density, false, 1000))
step = step * 1000
if metric == "sessionCount" {
return fmt.Sprintf(sessionMainQueryTpl, p.StartTimestamp, p.EndTimestamp, step, subQuery, step)
}
return fmt.Sprintf(mainQueryTpl, p.StartTimestamp, p.EndTimestamp, step, subQuery, step)
whereEvt := staticEvt
if len(evConds) > 0 {
whereEvt += " AND " + strings.Join(evConds, " AND ")
}
var subQueryTpl = `
SELECT multiIf(
s.user_id IS NOT NULL AND s.user_id != '', s.user_id,
s.user_anonymous_id IS NOT NULL AND s.user_anonymous_id != '', s.user_anonymous_id,
toString(s.user_uuid)) AS user_id,
s.datetime AS datetime
FROM (
SELECT main.session_id,
MIN(main.created_at) AS first_event_ts,
MAX(main.created_at) AS last_event_ts
FROM product_analytics.events AS main
WHERE %s
AND main.session_id IN (
SELECT s.session_id
FROM experimental.sessions AS s
WHERE %s
proj := map[string]string{
"sessionCount": "s.session_id AS session_id",
"userCount": "multiIf(s.user_id!='',s.user_id,s.user_anonymous_id!='',s.user_anonymous_id,toString(s.user_uuid)) AS user_id",
}[metric] + ", s.datetime AS datetime"
return fmt.Sprintf(
"SELECT %s FROM (SELECT main.session_id, MIN(main.created_at) AS first_event_ts, MAX(main.created_at) AS last_event_ts "+
"FROM product_analytics.events AS main "+
"WHERE %s AND main.session_id IN (SELECT s.session_id FROM experimental.sessions AS s WHERE %s) %s "+
"GROUP BY main.session_id %s "+
"INNER JOIN (SELECT * FROM experimental.sessions AS s WHERE %s) AS s ON s.session_id=f.session_id",
proj, whereEvt, sessWhere, nameClause, having, sessJoin,
)
%s
GROUP BY session_id
%s
INNER JOIN (
SELECT *
FROM experimental.sessions AS s
WHERE %s
) AS s ON (s.session_id = f.session_id)
`
var noFiltersSubQueryTpl = `
SELECT multiIf(
s.user_id IS NOT NULL AND s.user_id != '', s.user_id,
s.user_anonymous_id IS NOT NULL AND s.user_anonymous_id != '', s.user_anonymous_id,
toString(s.user_uuid)) AS user_id,
s.datetime AS datetime
FROM experimental.sessions AS s
WHERE %s
`
var sessionSubQueryTpl = `
SELECT s.session_id AS session_id,
s.datetime AS datetime
FROM (
SELECT main.session_id,
MIN(main.created_at) AS first_event_ts,
MAX(main.created_at) AS last_event_ts
FROM product_analytics.events AS main
WHERE %s
AND main.session_id IN (
SELECT s.session_id
FROM experimental.sessions AS s
WHERE %s
)
%s
GROUP BY session_id
%s
INNER JOIN (
SELECT *
FROM experimental.sessions AS s
WHERE %s
) AS s ON (s.session_id = f.session_id)
`
var sessionNoFiltersSubQueryTpl = `
SELECT s.session_id AS session_id,
s.datetime AS datetime
FROM experimental.sessions AS s
WHERE %s
`
var mainQueryTpl = `
SELECT gs.generate_series AS timestamp,
COALESCE(COUNT(DISTINCT processed_sessions.user_id), 0) AS count
FROM generate_series(%d, %d, %d) AS gs
LEFT JOIN (
%s
) AS processed_sessions ON (TRUE)
WHERE processed_sessions.datetime >= toDateTime(timestamp / 1000)
AND processed_sessions.datetime < toDateTime((timestamp + %d) / 1000)
GROUP BY timestamp
ORDER BY timestamp;
`
var sessionMainQueryTpl = `
SELECT gs.generate_series AS timestamp,
COALESCE(COUNT(DISTINCT processed_sessions.session_id), 0) AS count
FROM generate_series(%d, %d, %d) AS gs
LEFT JOIN (
%s
) AS processed_sessions ON (TRUE)
WHERE processed_sessions.datetime >= toDateTime(timestamp / 1000)
AND processed_sessions.datetime < toDateTime((timestamp + %d) / 1000)
GROUP BY timestamp
ORDER BY timestamp;
`
}

View file

@ -55,6 +55,7 @@ var propertyKeyMap = map[string]filterConfig{
"INPUT": {LogicalProperty: "label"},
"fetchUrl": {LogicalProperty: "url_path"},
"fetchStatusCode": {LogicalProperty: "status", IsNumeric: true},
//"fetchDuration": {LogicalProperty: "duration", IsNumeric: true},
//"ISSUE": {LogicalProperty: "issue_type"},
// TODO add more mappings as needed
}
@ -68,6 +69,13 @@ type filterConfig struct {
func getColumnAccessor(logical string, isNumeric bool, opts BuildConditionsOptions) string {
// helper: wrap names starting with $ in quotes
quote := func(name string) string {
prefix := opts.MainTableAlias + "."
if strings.HasPrefix(name, prefix) {
suffix := strings.TrimPrefix(name, prefix)
if strings.HasPrefix(suffix, "$") {
return fmt.Sprintf("%s.\"%s\"", opts.MainTableAlias, suffix)
}
}
if strings.HasPrefix(name, "$") {
return fmt.Sprintf("\"%s\"", name)
}
@ -101,7 +109,7 @@ func getColumnAccessor(logical string, isNumeric bool, opts BuildConditionsOptio
// JSON extraction
if isNumeric {
return fmt.Sprintf("toFloat64(JSONExtractString(toString(%s), '%s'))", colName, propKey)
return fmt.Sprintf("JSONExtractFloat(toString(%s), '%s')", colName, propKey)
}
return fmt.Sprintf("JSONExtractString(toString(%s), '%s')", colName, propKey)
}
@ -231,6 +239,8 @@ func buildCond(expr string, values []string, operator string, isNumeric bool) st
case "in", "notIn":
neg := operator == "notIn"
return inClause(expr, values, neg, isNumeric)
case ">=", ">", "<=", "<":
return multiValCond(expr, values, "%s "+operator+" %s", isNumeric)
default:
if op, ok := compOps[operator]; ok {
tmpl := "%s " + op + " %s"