diff --git a/backend/pkg/analytics/charts/charts.go b/backend/pkg/analytics/charts/charts.go index 46a40d364..c460974cc 100644 --- a/backend/pkg/analytics/charts/charts.go +++ b/backend/pkg/analytics/charts/charts.go @@ -32,7 +32,7 @@ func (s *chartsImpl) GetData(projectId int, userID uint64, req *MetricPayload) ( return nil, fmt.Errorf("request is empty") } - payload := &Payload{ + payload := Payload{ ProjectId: projectId, UserId: userID, MetricPayload: req, diff --git a/backend/pkg/analytics/charts/metric_funnel.go b/backend/pkg/analytics/charts/metric_funnel.go index 0f58c886d..9de3a9dee 100644 --- a/backend/pkg/analytics/charts/metric_funnel.go +++ b/backend/pkg/analytics/charts/metric_funnel.go @@ -4,6 +4,6 @@ import "openreplay/backend/pkg/analytics/db" type FunnelQueryBuilder struct{} -func (f FunnelQueryBuilder) Execute(p *Payload, conn db.Connector) (interface{}, error) { +func (f FunnelQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) { return "-- Funnel query placeholder", nil } diff --git a/backend/pkg/analytics/charts/metric_table.go b/backend/pkg/analytics/charts/metric_table.go index 46135fd55..bf0fe279c 100644 --- a/backend/pkg/analytics/charts/metric_table.go +++ b/backend/pkg/analytics/charts/metric_table.go @@ -8,11 +8,11 @@ import ( type TableQueryBuilder struct{} -func (t TableQueryBuilder) Execute(p *Payload, conn db.Connector) (interface{}, error) { +func (t TableQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) { return t.buildQuery(p) } -func (t TableQueryBuilder) buildQuery(r *Payload) (string, error) { +func (t TableQueryBuilder) buildQuery(r Payload) (string, error) { s := r.Series[0] sessionFilters, eventFilters := partitionFilters(s.Filter.Filters) sessionWhere := buildSessionWhere(sessionFilters) @@ -86,168 +86,3 @@ func (t TableQueryBuilder) buildQuery(r *Payload) (string, error) { return final, nil } - -func partitionFilters(filters []Filter) (sessionFilters []Filter, eventFilters []Filter) { - for _, f := range filters { - if f.IsEvent { - eventFilters = append(eventFilters, f) - } else { - sessionFilters = append(sessionFilters, f) - } - } - return -} - -func buildSessionWhere(filters []Filter) []string { - var conds []string - for _, f := range filters { - switch f.Type { - case FilterUserCountry: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCountry, concatValues(f.Value))) - case FilterUserCity: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCity, concatValues(f.Value))) - case FilterUserState: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserState, concatValues(f.Value))) - case FilterUserId: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserID, concatValues(f.Value))) - case FilterUserAnonymousId: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserAnonymousID, concatValues(f.Value))) - case FilterUserOs: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserOS, concatValues(f.Value))) - case FilterUserBrowser: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserBrowser, concatValues(f.Value))) - case FilterUserDevice: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDevice, concatValues(f.Value))) - case FilterPlatform: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDeviceType, concatValues(f.Value))) - case FilterRevId: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColRevID, concatValues(f.Value))) - case FilterReferrer: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColBaseReferrer, concatValues(f.Value))) - case FilterDuration: - if len(f.Value) == 2 { - conds = append(conds, fmt.Sprintf("%s >= '%s'", ColDuration, f.Value[0])) - conds = append(conds, fmt.Sprintf("%s <= '%s'", ColDuration, f.Value[1])) - } - case FilterUtmSource: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmSource, concatValues(f.Value))) - case FilterUtmMedium: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmMedium, concatValues(f.Value))) - case FilterUtmCampaign: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmCampaign, concatValues(f.Value))) - case FilterMetadata: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColMetadata1, concatValues(f.Value))) - } - } - // adding /n to each condition for better readability, can be removed. - for i := range conds { - conds[i] += "\n" - } - return conds -} - -func concatValues(v []string) string { - return strings.Join(v, "") -} - -func buildEventsWhere(filters []Filter, order EventOrder) (eventConditions []string, having string) { - basicEventTypes := "(" + - strings.Join([]string{ - fmt.Sprintf("%s = 'CLICK'", ColEventName), - fmt.Sprintf("%s = 'INPUT'", ColEventName), - fmt.Sprintf("%s = 'LOCATION'", ColEventName), - fmt.Sprintf("%s = 'CUSTOM'", ColEventName), - fmt.Sprintf("%s = 'REQUEST'", ColEventName), - }, " OR ") + ")" - - var seq []string - for _, f := range filters { - switch f.Type { - case FilterClick: - seq = append(seq, seqCond("CLICK", "selector", f)) - case FilterInput: - seq = append(seq, seqCond("INPUT", "label", f)) - case FilterLocation: - seq = append(seq, seqCond("LOCATION", "url_path", f)) - case FilterCustom: - seq = append(seq, seqCond("CUSTOM", "name", f)) - case FilterFetch: - seq = append(seq, seqFetchCond("REQUEST", f)) - case FilterFetchStatusCode: - seq = append(seq, seqCond("REQUEST", "status", f)) - default: - seq = append(seq, fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(string(f.Type)))) - } - } - eventConditions = []string{basicEventTypes} - - // then => sequenceMatch - // or => OR - // and => AND - switch order { - case EventOrderThen: - var pattern []string - for i := range seq { - pattern = append(pattern, fmt.Sprintf("(?%d)", i+1)) - } - having = fmt.Sprintf("sequenceMatch('%s')(\n%s,\n%s)", - strings.Join(pattern, ""), fmt.Sprintf("toUnixTimestamp(%s)", ColEventTime), strings.Join(seq, ",\n")) - case EventOrderAnd: - // build AND - having = strings.Join(seq, " AND ") - default: - // default => OR - var orParts []string - for _, p := range seq { - orParts = append(orParts, "("+p+")") - } - having = strings.Join(orParts, " OR ") - } - return -} - -func seqCond(eventName, key string, f Filter) string { - op := parseOperator(f.Operator) - return fmt.Sprintf("(%s = '%s' AND JSONExtractString(toString(%s), '%s') %s '%s')", - ColEventName, strings.ToUpper(eventName), ColEventProperties, key, op, concatValues(f.Value)) -} - -func seqFetchCond(eventName string, f Filter) string { - w := []string{fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(eventName))} - var extras []string - for _, c := range f.Filters { - switch c.Type { - case FilterFetch: - if len(c.Value) > 0 { - extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventURLPath, concatValues(c.Value))) - } - case FilterFetchStatusCode: - if len(c.Value) > 0 { - extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventStatus, concatValues(c.Value))) - } - default: - // placeholder if needed - } - } - if len(extras) > 0 { - w = append(w, strings.Join(extras, " AND ")) - } - return "(" + strings.Join(w, " AND ") + ")" -} - -func parseOperator(op string) string { - // TODO implement this properly - switch strings.ToLower(op) { - case OperatorStringContains: - return "LIKE" - case OperatorStringIs, OperatorStringOn, "=", OperatorStringOnAny: - return "=" - case OperatorStringStartsWith: - return "LIKE" - case OperatorStringEndsWith: - // might interpret differently in real impl - return "=" - default: - return "=" - } -} diff --git a/backend/pkg/analytics/charts/metric_timeseries.go b/backend/pkg/analytics/charts/metric_timeseries.go index cc1df46c3..976b42b1b 100644 --- a/backend/pkg/analytics/charts/metric_timeseries.go +++ b/backend/pkg/analytics/charts/metric_timeseries.go @@ -4,11 +4,12 @@ import ( "fmt" "log" "openreplay/backend/pkg/analytics/db" + "strings" ) type TimeSeriesQueryBuilder struct{} -func (t TimeSeriesQueryBuilder) Execute(p *Payload, conn db.Connector) (interface{}, error) { +func (t TimeSeriesQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) { query, err := t.buildQuery(p) if err != nil { log.Fatalf("Error building query: %v", err) @@ -23,7 +24,6 @@ func (t TimeSeriesQueryBuilder) Execute(p *Payload, conn db.Connector) (interfac defer rows.Close() var results []DataPoint - for rows.Next() { var res DataPoint if err := rows.Scan(&res.Timestamp, &res.Count); err != nil { @@ -37,7 +37,7 @@ func (t TimeSeriesQueryBuilder) Execute(p *Payload, conn db.Connector) (interfac return filled, nil } -func (t TimeSeriesQueryBuilder) buildQuery(p *Payload) (string, error) { +func (t TimeSeriesQueryBuilder) buildQuery(p Payload) (string, error) { query := "" switch p.MetricOf { case "sessionCount": @@ -50,67 +50,102 @@ func (t TimeSeriesQueryBuilder) buildQuery(p *Payload) (string, error) { return query, nil } -func (TimeSeriesQueryBuilder) buildSessionCountQuery(p *Payload) string { - stepSize := int(getStepSize(p.StartTimestamp, p.EndTimestamp, p.Density, false, 1000)) - subquery := buildEventSubquery(p) - return fmt.Sprintf(`SELECT toUnixTimestamp( - toStartOfInterval(processed_sessions.datetime, INTERVAL %d second) -) * 1000 AS timestamp, -COUNT(processed_sessions.session_id) AS count -FROM ( - %s -) AS processed_sessions -GROUP BY timestamp -ORDER BY timestamp;`, stepSize, subquery) +func (TimeSeriesQueryBuilder) buildSessionCountQuery(p Payload) string { + eventConds, eventNames := buildEventConditions(p.Series[0].Filter.Filters) + sessionConds := buildSessionConditions(p.Series[0].Filter.Filters) + staticEvt := buildStaticEventWhere(p) + sessWhere, sessJoin := buildStaticSessionWhere(p, sessionConds) + eventsSubQuery := buildEventsSubQuery(eventConds, eventNames, staticEvt, sessWhere, sessJoin) + mainQuery := buildMainQuery(p, eventsSubQuery) + return mainQuery } -func (TimeSeriesQueryBuilder) buildUserCountQuery(p *Payload) string { - stepSize := int(getStepSize(p.StartTimestamp, p.EndTimestamp, p.Density, false, 1000)) - subquery := buildEventSubquery(p) - return fmt.Sprintf(`SELECT toUnixTimestamp( - toStartOfInterval(processed_sessions.datetime, INTERVAL %d second) -) * 1000 AS timestamp, -COUNT(DISTINCT processed_sessions.user_id) AS count -FROM ( - %s -) AS processed_sessions -GROUP BY timestamp -ORDER BY timestamp;`, stepSize, subquery) +func (TimeSeriesQueryBuilder) buildUserCountQuery(p Payload) string { + eventConds, eventNames := buildEventConditions(p.Series[0].Filter.Filters) + sessionConds := buildSessionConditions(p.Series[0].Filter.Filters) + staticEvt := buildStaticEventWhere(p) + sessWhere, sessJoin := buildStaticSessionWhere(p, sessionConds) + eventsSubQuery := buildEventsSubQuery(eventConds, eventNames, staticEvt, sessWhere, sessJoin) + mainQuery := buildMainQuery(p, eventsSubQuery) + return mainQuery } -func FillMissingDataPoints( - startTime, endTime int64, - density int, - neutral DataPoint, - rows []DataPoint, - timeCoefficient int64, -) []DataPoint { - if density <= 1 { - return rows +func buildEventsSubQuery(eventConds, eventNames []string, staticEvt, sessWhere, sessJoin string) string { + if len(eventConds) == 0 && len(eventNames) == 0 { + return fmt.Sprintf(noEventSubQueryTpl, sessJoin) } - - stepSize := uint64(getStepSize(startTime, endTime, density, false, 1000)) - bucketSize := stepSize * uint64(timeCoefficient) - - lookup := make(map[uint64]DataPoint) - for _, dp := range rows { - if dp.Timestamp < uint64(startTime) { - continue - } - bucket := uint64(startTime) + (((dp.Timestamp - uint64(startTime)) / bucketSize) * bucketSize) - lookup[bucket] = dp - } - - results := make([]DataPoint, 0, density) - for i := 0; i < density; i++ { - ts := uint64(startTime) + uint64(i)*bucketSize - if dp, ok := lookup[ts]; ok { - results = append(results, dp) - } else { - nd := neutral - nd.Timestamp = ts - results = append(results, nd) + var evtNameClause string + var unique []string + for _, name := range eventNames { + if !contains(unique, name) { + unique = append(unique, name) } } - return results + if len(unique) > 0 { + evtNameClause = fmt.Sprintf("AND main.`$event_name` IN (%s)", buildInClause(unique)) + } + having := "" + if len(eventConds) > 0 { + having = buildHavingClause(eventConds) + } + evtWhere := staticEvt + if len(eventConds) > 0 { + evtWhere += " AND " + strings.Join(eventConds, " AND ") + } + return fmt.Sprintf(eventsSubQueryTpl, evtWhere, sessWhere, evtNameClause, having, sessJoin) } + +func buildMainQuery(p Payload, subQuery string) string { + step := int(getStepSize(p.StartTimestamp, p.EndTimestamp, p.Density, false, 1000)) + return fmt.Sprintf(mainQueryTpl, p.StartTimestamp, p.EndTimestamp, step, subQuery, step) +} + +var eventsSubQueryTpl = ` +SELECT multiIf( + s.user_id IS NOT NULL AND s.user_id != '', s.user_id, + s.user_anonymous_id IS NOT NULL AND s.user_anonymous_id != '', s.user_anonymous_id, + toString(s.user_uuid)) AS user_id, + s.datetime AS datetime +FROM ( + SELECT main.session_id, + MIN(main.created_at) AS first_event_ts, + MAX(main.created_at) AS last_event_ts + FROM product_analytics.events AS main + WHERE %s + AND main.session_id IN ( + SELECT s.session_id + FROM experimental.sessions AS s + WHERE %s + ) + %s + GROUP BY session_id + %s + INNER JOIN ( + SELECT * + FROM experimental.sessions AS s + WHERE %s + ) AS s ON (s.session_id = f.session_id) +` + +var noEventSubQueryTpl = ` +SELECT multiIf( + s.user_id IS NOT NULL AND s.user_id != '', s.user_id, + s.user_anonymous_id IS NOT NULL AND s.user_anonymous_id != '', s.user_anonymous_id, + toString(s.user_uuid)) AS user_id, + s.datetime AS datetime +FROM experimental.sessions AS s +WHERE %s +` + +var mainQueryTpl = ` +SELECT gs.generate_series AS timestamp, + COALESCE(COUNT(DISTINCT processed_sessions.user_id), 0) AS count +FROM generate_series(%d, %d, %d) AS gs +LEFT JOIN ( + %s +) AS processed_sessions ON (TRUE) +WHERE processed_sessions.datetime >= toDateTime(timestamp / 1000) + AND processed_sessions.datetime < toDateTime((timestamp + %d) / 1000) +GROUP BY timestamp +ORDER BY timestamp; +` diff --git a/backend/pkg/analytics/charts/model.go b/backend/pkg/analytics/charts/model.go index 721008ea8..b83790578 100644 --- a/backend/pkg/analytics/charts/model.go +++ b/backend/pkg/analytics/charts/model.go @@ -68,12 +68,14 @@ type MetricPayload struct { Series []Series `json:"series"` } +type FilterGroup struct { + Filters []Filter `json:"filters"` + EventsOrder EventOrder `json:"eventsOrder"` +} + type Series struct { - Name string `json:"name"` - Filter struct { - Filters []Filter `json:"filters"` - EventsOrder EventOrder `json:"eventsOrder"` - } `json:"filter"` + Name string `json:"name"` + Filter FilterGroup `json:"filter"` } type Filter struct { @@ -81,6 +83,7 @@ type Filter struct { IsEvent bool `json:"isEvent"` Value []string `json:"value"` Operator string `json:"operator"` + Source string `json:"source,omitempty"` Filters []Filter `json:"filters"` } @@ -109,11 +112,10 @@ const ( FilterClick FilterType = "click" FilterInput FilterType = "input" FilterLocation FilterType = "location" + FilterTag FilterType = "tag" FilterCustom FilterType = "customEvent" FilterFetch FilterType = "fetch" - FilterFetchStatusCode FilterType = "status" - FilterTag FilterType = "tag" - FilterNetworkRequest FilterType = "fetch" + FilterFetchStatusCode FilterType = "fetchStatusCode" // Subfilter FilterGraphQLRequest FilterType = "graphql" FilterStateAction FilterType = "stateAction" FilterError FilterType = "error" @@ -139,7 +141,7 @@ const ( OperatorStringIsNot = "isNot" OperatorStringIsUndefined = "isUndefined" OperatorStringNotOn = "notOn" - OperatorStringContains = "contains" + OperatorContains = "contains" OperatorStringNotContains = "notContains" OperatorStringStartsWith = "startsWith" OperatorStringEndsWith = "endsWith" diff --git a/backend/pkg/analytics/charts/query.go b/backend/pkg/analytics/charts/query.go index 1d82bd5da..b018d2ce9 100644 --- a/backend/pkg/analytics/charts/query.go +++ b/backend/pkg/analytics/charts/query.go @@ -13,10 +13,10 @@ type Payload struct { } type QueryBuilder interface { - Execute(p *Payload, conn db.Connector) (interface{}, error) + Execute(p Payload, conn db.Connector) (interface{}, error) } -func NewQueryBuilder(p *Payload) (QueryBuilder, error) { +func NewQueryBuilder(p Payload) (QueryBuilder, error) { switch p.MetricType { case MetricTypeTimeseries: return TimeSeriesQueryBuilder{}, nil @@ -29,122 +29,496 @@ func NewQueryBuilder(p *Payload) (QueryBuilder, error) { } } -func buildEventSubquery(p *Payload) string { - baseEventsWhere := buildBaseEventsWhere(p) - sequenceCond := buildSequenceCondition(p.Series) - sessionsWhere := buildSessionsWhere(p) +//func pickIDField(p Payload) string { +// if p.MetricOf == "userCount" { +// return "user_id" +// } +// return "session_id" +//} - if sequenceCond.seqPattern == "" { - return fmt.Sprintf(` -SELECT s.%[1]s AS %[1]s, - s.datetime AS datetime -FROM ( - SELECT main.session_id, - MIN(main.created_at) AS first_event_ts, - MAX(main.created_at) AS last_event_ts - FROM product_analytics.events AS main - WHERE %[2]s - GROUP BY session_id -) AS f -INNER JOIN ( - SELECT * - FROM experimental.sessions AS s - WHERE %[3]s -) AS s ON (s.session_id = f.session_id) -`, pickIDField(p), baseEventsWhere, sessionsWhere) - } +//func buildBaseEventsWhere(p Payload) string { +// ts := fmt.Sprintf( +// `(main.created_at >= toDateTime(%d / 1000) AND main.created_at <= toDateTime(%d / 1000))`, +// p.StartTimestamp, +// p.EndTimestamp, +// ) +// return fmt.Sprintf(`main.project_id = %d AND %s`, p.ProjectId, ts) +//} - return fmt.Sprintf(` -SELECT s.%[1]s AS %[1]s, - s.datetime AS datetime -FROM ( - SELECT main.session_id, - MIN(main.created_at) AS first_event_ts, - MAX(main.created_at) AS last_event_ts - FROM product_analytics.events AS main - WHERE %[2]s - GROUP BY session_id - HAVING sequenceMatch('%[3]s')(toDateTime(main.created_at), %[4]s) -) AS f -INNER JOIN ( - SELECT * - FROM experimental.sessions AS s - WHERE %[5]s -) AS s ON (s.session_id = f.session_id) -`, pickIDField(p), baseEventsWhere, sequenceCond.seqPattern, sequenceCond.seqEvents, sessionsWhere) -} +//func buildSessionsWhere(p Payload) string { +// ts := fmt.Sprintf( +// `(s.datetime >= toDateTime(%d / 1000) AND s.datetime <= toDateTime(%d / 1000))`, +// p.StartTimestamp, +// p.EndTimestamp, +// ) +// return fmt.Sprintf(`s.project_id = %d AND isNotNull(s.duration) AND %s`, p.ProjectId, ts) +//} -func pickIDField(p *Payload) string { - if p.MetricOf == "userCount" { - return "user_id" - } - return "session_id" -} +//type sequenceParts struct { +// seqPattern string +// seqEvents string +//} -func buildBaseEventsWhere(p *Payload) string { - ts := fmt.Sprintf( - `(main.created_at >= toDateTime(%d / 1000) AND main.created_at <= toDateTime(%d / 1000))`, - p.StartTimestamp, - p.EndTimestamp, - ) - return fmt.Sprintf(`main.project_id = %d AND %s`, p.ProjectId, ts) -} +//func buildSequenceCondition(series []Series) sequenceParts { +// var events []string +// for _, s := range series { +// if len(s.Filter.Filters) > 0 { +// events = append(events, buildOneSeriesSequence(s.Filter.Filters)) +// } +// } +// if len(events) < 2 { +// return sequenceParts{"", ""} +// } +// pattern := "" +// for i := 1; i <= len(events); i++ { +// pattern += fmt.Sprintf("(?%d)", i) +// } +// return sequenceParts{ +// seqPattern: pattern, +// seqEvents: strings.Join(events, ", "), +// } +//} -func buildSessionsWhere(p *Payload) string { - ts := fmt.Sprintf( - `(s.datetime >= toDateTime(%d / 1000) AND s.datetime <= toDateTime(%d / 1000))`, - p.StartTimestamp, - p.EndTimestamp, - ) - return fmt.Sprintf(`s.project_id = %d AND isNotNull(s.duration) AND %s`, p.ProjectId, ts) -} +//func buildOneSeriesSequence(filters []Filter) string { +// return strings.Join(buildFilterConditions(filters), " AND ") +//} +// +//func buildFilterConditions(filters []Filter) []string { +// var out []string +// for _, f := range filters { +// switch f.Type { +// case FilterClick: +// out = append(out, +// fmt.Sprintf(`(main."$event_name" = 'CLICK' AND JSONExtractString(toString(main."$properties"), 'label') IN ('%s'))`, +// strings.Join(f.Value, "','"))) +// case FilterInput: +// out = append(out, +// fmt.Sprintf(`(main."$event_name" = 'INPUT' AND JSONExtractString(toString(main."$properties"), 'label') IN ('%s'))`, +// strings.Join(f.Value, "','"))) +// +// default: +// out = append(out, +// fmt.Sprintf(`(main."$event_name" = '%s')`, strings.ToUpper(string(f.Type)))) +// } +// } +// return out +//} -type sequenceParts struct { - seqPattern string - seqEvents string -} - -func buildSequenceCondition(series []Series) sequenceParts { - var events []string - for _, s := range series { - if len(s.Filter.Filters) > 0 { - events = append(events, buildOneSeriesSequence(s.Filter.Filters)) +func partitionFilters(filters []Filter) (sessionFilters []Filter, eventFilters []Filter) { + for _, f := range filters { + if f.IsEvent { + eventFilters = append(eventFilters, f) + } else { + sessionFilters = append(sessionFilters, f) } } - if len(events) < 2 { - return sequenceParts{"", ""} - } - pattern := "" - for i := 1; i <= len(events); i++ { - pattern += fmt.Sprintf("(?%d)", i) - } - return sequenceParts{ - seqPattern: pattern, - seqEvents: strings.Join(events, ", "), - } + return } -func buildOneSeriesSequence(filters []Filter) string { - return strings.Join(buildFilterConditions(filters), " AND ") -} +func buildEventsWhere(filters []Filter, order EventOrder) (eventConditions []string, having string) { + basicEventTypes := "(" + + strings.Join([]string{ + fmt.Sprintf("%s = 'CLICK'", ColEventName), + fmt.Sprintf("%s = 'INPUT'", ColEventName), + fmt.Sprintf("%s = 'LOCATION'", ColEventName), + fmt.Sprintf("%s = 'CUSTOM'", ColEventName), + fmt.Sprintf("%s = 'REQUEST'", ColEventName), + }, " OR ") + ")" -func buildFilterConditions(filters []Filter) []string { - var out []string + var seq []string for _, f := range filters { switch f.Type { case FilterClick: - out = append(out, - fmt.Sprintf(`(main."$event_name" = 'CLICK' AND JSONExtractString(toString(main."$properties"), 'label') IN ('%s'))`, - strings.Join(f.Value, "','"))) + seq = append(seq, seqCond("CLICK", "selector", f)) case FilterInput: - out = append(out, - fmt.Sprintf(`(main."$event_name" = 'INPUT' AND JSONExtractString(toString(main."$properties"), 'label') IN ('%s'))`, - strings.Join(f.Value, "','"))) - + seq = append(seq, seqCond("INPUT", "label", f)) + case FilterLocation: + seq = append(seq, seqCond("LOCATION", "url_path", f)) + case FilterCustom: + seq = append(seq, seqCond("CUSTOM", "name", f)) + case FilterFetch: + seq = append(seq, seqFetchCond("REQUEST", f)) + case FilterFetchStatusCode: + seq = append(seq, seqCond("REQUEST", "status", f)) default: - out = append(out, - fmt.Sprintf(`(main."$event_name" = '%s')`, strings.ToUpper(string(f.Type)))) + seq = append(seq, fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(string(f.Type)))) } } - return out + eventConditions = []string{basicEventTypes} + + // then => sequenceMatch + // or => OR + // and => AND + switch order { + case EventOrderThen: + var pattern []string + for i := range seq { + pattern = append(pattern, fmt.Sprintf("(?%d)", i+1)) + } + having = fmt.Sprintf("sequenceMatch('%s')(\n%s,\n%s)", + strings.Join(pattern, ""), fmt.Sprintf("toUnixTimestamp(%s)", ColEventTime), strings.Join(seq, ",\n")) + case EventOrderAnd: + // build AND + having = strings.Join(seq, " AND ") + case EventOrderOr: + default: + // default => OR + var orParts []string + for _, p := range seq { + orParts = append(orParts, "("+p+")") + } + having = strings.Join(orParts, " OR ") + } + return eventConditions, having +} + +func buildSessionWhere(filters []Filter) []string { + var conds []string + for _, f := range filters { + switch f.Type { + case FilterUserCountry: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCountry, concatValues(f.Value))) + case FilterUserCity: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCity, concatValues(f.Value))) + case FilterUserState: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserState, concatValues(f.Value))) + case FilterUserId: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserID, concatValues(f.Value))) + case FilterUserAnonymousId: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserAnonymousID, concatValues(f.Value))) + case FilterUserOs: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserOS, concatValues(f.Value))) + case FilterUserBrowser: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserBrowser, concatValues(f.Value))) + case FilterUserDevice: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDevice, concatValues(f.Value))) + case FilterPlatform: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDeviceType, concatValues(f.Value))) + case FilterRevId: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColRevID, concatValues(f.Value))) + case FilterReferrer: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColBaseReferrer, concatValues(f.Value))) + case FilterDuration: + if len(f.Value) == 2 { + conds = append(conds, fmt.Sprintf("%s >= '%s'", ColDuration, f.Value[0])) + conds = append(conds, fmt.Sprintf("%s <= '%s'", ColDuration, f.Value[1])) + } + case FilterUtmSource: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmSource, concatValues(f.Value))) + case FilterUtmMedium: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmMedium, concatValues(f.Value))) + case FilterUtmCampaign: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmCampaign, concatValues(f.Value))) + case FilterMetadata: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColMetadata1, concatValues(f.Value))) + } + } + // adding /n to each condition for better readability, can be removed. + for i := range conds { + conds[i] += "\n" + } + return conds +} + +func concatValues(v []string) string { + return strings.Join(v, "") +} + +func seqCond(eventName, key string, f Filter) string { + op := parseOperator(f.Operator) + return fmt.Sprintf("(%s = '%s' AND JSONExtractString(toString(%s), '%s') %s '%s')", + ColEventName, strings.ToUpper(eventName), ColEventProperties, key, op, concatValues(f.Value)) +} + +func seqFetchCond(eventName string, f Filter) string { + w := []string{fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(eventName))} + var extras []string + for _, c := range f.Filters { + switch c.Type { + case FilterFetch: + if len(c.Value) > 0 { + extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventURLPath, concatValues(c.Value))) + } + case FilterFetchStatusCode: + if len(c.Value) > 0 { + extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventStatus, concatValues(c.Value))) + } + default: + // placeholder if needed + } + } + if len(extras) > 0 { + w = append(w, strings.Join(extras, " AND ")) + } + return "(" + strings.Join(w, " AND ") + ")" +} + +func parseOperator(op string) string { + // TODO implement this properly + switch strings.ToLower(op) { + case OperatorContains: + return "LIKE" + case OperatorStringIs, OperatorStringOn, "=", OperatorStringOnAny: + return "=" + case OperatorStringStartsWith: + return "LIKE" + case OperatorStringEndsWith: + // might interpret differently in real impl + return "=" + default: + return "=" + } +} + +func buildEventConditions(filters []Filter) (conds, names []string) { + for _, f := range filters { + if f.IsEvent { + switch f.Type { + case FilterClick: + c := buildCond("JSONExtractString(toString(main.`$properties`), 'label')", f.Value, "is") + if c != "" { + conds = append(conds, c) + } + names = append(names, "CLICK") + case FilterInput: + c := buildCond("JSONExtractString(toString(main.`$properties`), 'label')", f.Value, f.Operator) + if c != "" { + conds = append(conds, c) + } + names = append(names, "INPUT") + case FilterLocation: + c := buildCond("JSONExtractString(toString(main.`$properties`), 'url_path')", f.Value, f.Operator) + if c != "" { + conds = append(conds, c) + } + names = append(names, "LOCATION") + case FilterCustom: + c := buildCond("JSONExtractString(toString(main.`$properties`), 'name')", f.Value, f.Operator) + if c != "" { + conds = append(conds, c) + } + names = append(names, "CUSTOM") + case FilterFetch: + var fetchConds []string + for _, nf := range f.Filters { + switch nf.Type { + case "fetchUrl": + c := buildCond("JSONExtractString(toString(main.`$properties`), 'url_path')", nf.Value, f.Operator) + if c != "" { + fetchConds = append(fetchConds, c) + } + case "fetchStatusCode": + c := buildCond("JSONExtractFloat(toString(main.`$properties`), 'status')", nf.Value, f.Operator) + if c != "" { + fetchConds = append(fetchConds, c) + } + } + } + if len(fetchConds) > 0 { + conds = append(conds, strings.Join(fetchConds, " AND ")) + } + names = append(names, "REQUEST") + case FilterTag: + c := buildCond("JSONExtractString(toString(main.`$properties`), 'tag')", f.Value, f.Operator) + if c != "" { + conds = append(conds, c) + } + names = append(names, "TAG") + } + } + } + return +} + +func buildSessionConditions(filters []Filter) []string { + var conds []string + for _, f := range filters { + if !f.IsEvent { + switch f.Type { + case FilterUserCountry: + conds = append(conds, buildCond("s.user_country", f.Value, f.Operator)) + case FilterUserCity: + conds = append(conds, buildCond("s.user_city", f.Value, f.Operator)) + case FilterUserState: + conds = append(conds, buildCond("s.user_state", f.Value, f.Operator)) + case FilterUserId: + conds = append(conds, buildCond("s.user_id", f.Value, f.Operator)) + case FilterUserAnonymousId: + conds = append(conds, buildCond("s.user_anonymous_id", f.Value, f.Operator)) + case FilterUserOs: + conds = append(conds, buildCond("s.user_os", f.Value, f.Operator)) + case FilterUserBrowser: + conds = append(conds, buildCond("s.user_browser", f.Value, f.Operator)) + case FilterUserDevice: + conds = append(conds, buildCond("s.user_device", f.Value, f.Operator)) + case FilterPlatform: + conds = append(conds, buildCond("s.user_device_type", f.Value, f.Operator)) + case FilterRevId: + conds = append(conds, buildCond("s.rev_id", f.Value, f.Operator)) + case FilterReferrer: + conds = append(conds, buildCond("s.base_referrer", f.Value, f.Operator)) + case FilterDuration: + if len(f.Value) == 2 { + conds = append(conds, fmt.Sprintf("s.duration >= '%s'", f.Value[0])) + conds = append(conds, fmt.Sprintf("s.duration <= '%s'", f.Value[1])) + } + case FilterUtmSource: + conds = append(conds, buildCond("s.utm_source", f.Value, f.Operator)) + case FilterUtmMedium: + conds = append(conds, buildCond("s.utm_medium", f.Value, f.Operator)) + case FilterUtmCampaign: + conds = append(conds, buildCond("s.utm_campaign", f.Value, f.Operator)) + case FilterMetadata: + if f.Source != "" { + conds = append(conds, buildCond(fmt.Sprintf("s.%s", f.Source), f.Value, f.Operator)) + } + } + } + } + return conds +} + +func buildCond(expr string, values []string, operator string) string { + if len(values) == 0 { + return "" + } + switch operator { + case "contains": + var conds []string + for _, v := range values { + conds = append(conds, fmt.Sprintf("%s ILIKE '%%%s%%'", expr, v)) + } + if len(conds) > 1 { + return "(" + strings.Join(conds, " OR ") + ")" + } + return conds[0] + case "notContains": + var conds []string + for _, v := range values { + conds = append(conds, fmt.Sprintf("NOT (%s ILIKE '%%%s%%')", expr, v)) + } + if len(conds) > 1 { + return "(" + strings.Join(conds, " OR ") + ")" + } + return conds[0] + case "startsWith": + var conds []string + for _, v := range values { + conds = append(conds, fmt.Sprintf("%s ILIKE '%s%%'", expr, v)) + } + if len(conds) > 1 { + return "(" + strings.Join(conds, " OR ") + ")" + } + return conds[0] + case "endsWith": + var conds []string + for _, v := range values { + conds = append(conds, fmt.Sprintf("%s ILIKE '%%%s'", expr, v)) + } + if len(conds) > 1 { + return "(" + strings.Join(conds, " OR ") + ")" + } + return conds[0] + default: + if len(values) > 1 { + var quoted []string + for _, v := range values { + quoted = append(quoted, fmt.Sprintf("'%s'", v)) + } + return fmt.Sprintf("%s IN (%s)", expr, strings.Join(quoted, ",")) + } + return fmt.Sprintf("%s = '%s'", expr, values[0]) + } +} + +func buildInClause(values []string) string { + var quoted []string + for _, v := range values { + quoted = append(quoted, fmt.Sprintf("'%s'", v)) + } + return strings.Join(quoted, ",") +} + +func buildStaticEventWhere(p Payload) string { + return strings.Join([]string{ + fmt.Sprintf("main.project_id = %d", p.ProjectId), + fmt.Sprintf("main.created_at >= toDateTime(%d / 1000)", p.StartTimestamp), + fmt.Sprintf("main.created_at <= toDateTime(%d / 1000)", p.EndTimestamp), + }, " AND ") +} + +func buildStaticSessionWhere(p Payload, sessionConds []string) (string, string) { + static := []string{fmt.Sprintf("s.project_id = %d", p.ProjectId)} + sessWhere := strings.Join(static, " AND ") + if len(sessionConds) > 0 { + sessWhere += " AND " + strings.Join(sessionConds, " AND ") + } + sessJoin := strings.Join(append(static, append(sessionConds, + fmt.Sprintf("s.datetime >= toDateTime(%d / 1000)", p.StartTimestamp), + fmt.Sprintf("s.datetime <= toDateTime(%d / 1000)", p.EndTimestamp))...), " AND ") + return sessWhere, sessJoin +} + +func buildHavingClause(conds []string) string { + seqConds := append([]string{}, conds...) + if len(seqConds) == 1 { + seqConds = append(seqConds, "1") + } + if len(seqConds) == 0 { + return "" + } + var parts []string + for i := range seqConds { + parts = append(parts, fmt.Sprintf("(?%d)", i+1)) + } + pattern := strings.Join(parts, "") + args := []string{"toDateTime(main.created_at)"} + args = append(args, seqConds...) + return fmt.Sprintf("HAVING sequenceMatch('%s')(%s)) AS f", pattern, strings.Join(args, ",\n ")) +} + +func contains(slice []string, s string) bool { + for _, v := range slice { + if v == s { + return true + } + } + return false +} + +func FillMissingDataPoints( + startTime, endTime int64, + density int, + neutral DataPoint, + rows []DataPoint, + timeCoefficient int64, +) []DataPoint { + if density <= 1 { + return rows + } + + stepSize := uint64(getStepSize(startTime, endTime, density, false, 1000)) + bucketSize := stepSize * uint64(timeCoefficient) + + lookup := make(map[uint64]DataPoint) + for _, dp := range rows { + if dp.Timestamp < uint64(startTime) { + continue + } + bucket := uint64(startTime) + (((dp.Timestamp - uint64(startTime)) / bucketSize) * bucketSize) + lookup[bucket] = dp + } + + results := make([]DataPoint, 0, density) + for i := 0; i < density; i++ { + ts := uint64(startTime) + uint64(i)*bucketSize + if dp, ok := lookup[ts]; ok { + results = append(results, dp) + } else { + nd := neutral + nd.Timestamp = ts + results = append(results, nd) + } + } + return results }