From 8711648ac7f80d0fbe5af99de7c6de56b0ce2b79 Mon Sep 17 00:00:00 2001 From: Shekar Siri Date: Wed, 26 Feb 2025 12:43:54 +0100 Subject: [PATCH] feat(analytics): table charts wip --- backend/pkg/analytics/charts/metric_table.go | 222 +++++++++++++------ backend/pkg/analytics/charts/model.go | 15 ++ backend/pkg/analytics/charts/query.go | 160 +------------ backend/pkg/analytics/db/connector.go | 11 + 4 files changed, 186 insertions(+), 222 deletions(-) diff --git a/backend/pkg/analytics/charts/metric_table.go b/backend/pkg/analytics/charts/metric_table.go index bf0fe279c..5db49cccd 100644 --- a/backend/pkg/analytics/charts/metric_table.go +++ b/backend/pkg/analytics/charts/metric_table.go @@ -8,81 +8,173 @@ import ( type TableQueryBuilder struct{} +type TableValue struct { + Name string `json:"name"` + Total uint64 `json:"total"` +} + +type TableResponse struct { + Total uint64 `json:"total"` + Count uint64 `json:"count"` + Values []TableValue `json:"values"` +} + func (t TableQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) { - return t.buildQuery(p) + // validate metricOf with MetricOfTable return error if empty or not supported + if p.MetricOf == "" { + return nil, fmt.Errorf("MetricOf is empty") + } + + // Validate that p.MetricOf is one of the supported MetricOfTable types + isValidMetricOf := false + switch MetricOfTable(p.MetricOf) { + case MetricOfTableBrowser, MetricOfTableDevice, MetricOfTableCountry, + MetricOfTableUserId, MetricOfTableIssues, MetricOfTableLocation, + MetricOfTableSessions, MetricOfTableErrors, MetricOfTableReferrer, + MetricOfTableFetch: + isValidMetricOf = true + } + + if !isValidMetricOf { + return nil, fmt.Errorf("unsupported MetricOf type: %s", p.MetricOf) + } + + query, err := t.buildQuery(p) + if err != nil { + return nil, err + } + rows, err := conn.Query(query) + if err != nil { + return nil, err + } + defer rows.Close() + + var ( + totalCount uint64 + rowsCount uint64 + values []TableValue + ) + + for rows.Next() { + var ( + total uint64 + name string + ) + if err := rows.Scan(&totalCount, &name, &total, &rowsCount); err != nil { + return nil, err + } + values = append(values, TableValue{Name: name, Total: total}) + } + + return &TableResponse{ + Total: totalCount, + Count: rowsCount, + Values: values, + }, nil } func (t TableQueryBuilder) buildQuery(r Payload) (string, error) { s := r.Series[0] - sessionFilters, eventFilters := partitionFilters(s.Filter.Filters) - sessionWhere := buildSessionWhere(sessionFilters) - eventWhere, seqHaving := buildEventsWhere(eventFilters, s.Filter.EventsOrder) - subQuery := fmt.Sprintf( - "SELECT %s,\n"+ - " MIN(%s) AS first_event_ts,\n"+ - " MAX(%s) AS last_event_ts\n"+ - "FROM %s AS main\n"+ - "WHERE main.project_id = %%(project_id)s\n"+ - " AND %s >= toDateTime(%%(start_time)s/1000)\n"+ - " AND %s <= toDateTime(%%(end_time)s/1000)\n"+ - " AND (%s)\n"+ - "GROUP BY %s\n"+ - "HAVING %s", - ColEventSessionID, - ColEventTime, - ColEventTime, - TableEvents, - ColEventTime, - ColEventTime, - strings.Join(eventWhere, " OR "), - ColEventSessionID, - seqHaving, - ) - - joinQuery := fmt.Sprintf( - "SELECT *\n"+ - "FROM %s AS s\n"+ - "INNER JOIN (\n"+ - " SELECT DISTINCT ev.session_id, ev.`$current_url` AS url_path\n"+ - " FROM %s AS ev\n"+ - " WHERE ev.created_at >= toDateTime(%%(start_time)s/1000)\n"+ - " AND ev.created_at <= toDateTime(%%(end_time)s/1000)\n"+ - " AND ev.project_id = %%(project_id)s\n"+ - " AND ev.`$event_name` = 'LOCATION'\n"+ - ") AS extra_event USING (session_id)\n"+ - "WHERE s.project_id = %%(project_id)s\n"+ - " AND isNotNull(s.duration)\n"+ - " AND s.datetime >= toDateTime(%%(start_time)s/1000)\n"+ - " AND s.datetime <= toDateTime(%%(end_time)s/1000)\n", - TableSessions, - TableEvents, - ) - - if len(sessionWhere) > 0 { - joinQuery += " AND " + strings.Join(sessionWhere, " AND ") + "\n" + groupByColumn := r.MetricOf + if groupByColumn == "" { + return "", fmt.Errorf("MetricOf is empty") } - main := fmt.Sprintf( - "SELECT s.session_id AS session_id, s.url_path\n"+ - "FROM (\n%s\n) AS f\n"+ - "INNER JOIN (\n%s) AS s\n"+ - " ON (s.session_id = f.session_id)\n", - subQuery, - joinQuery, + sessionFilters, eventFilters := partitionFilters(s.Filter.Filters) + eventConds, eventNames := buildEventConditions(eventFilters) + eventWhere := buildStaticEventWhere(r) + if len(eventConds) > 0 { + eventWhere += " AND " + strings.Join(eventConds, " AND ") + } + if len(eventNames) > 0 { + eventWhere += " AND main.`$event_name` IN (" + buildInClause(eventNames) + ")" + } + + sessionConds := buildSessionConditions(sessionFilters) + sessWhere, _ := buildStaticSessionWhere(r, sessionConds) + + // Build event subquery + var eventSubQuery string + if len(eventConds) > 0 { + // With HAVING clause + var pattern strings.Builder + for i := 0; i < len(eventConds); i++ { + fmt.Fprintf(&pattern, "(?%d)", i+1) + } + + var args strings.Builder + args.WriteString("toDateTime(main.created_at)") + for _, cond := range eventConds { + args.WriteString(",\n ") + args.WriteString(cond) + } + + eventSubQuery = fmt.Sprintf( + "SELECT main.session_id, MIN(main.created_at) AS first_event_ts, MAX(main.created_at) AS last_event_ts "+ + "FROM %s AS main "+ + "WHERE %s "+ + "AND main.session_id IN (SELECT s.session_id FROM %s AS s WHERE %s) "+ + "GROUP BY main.session_id "+ + "HAVING sequenceMatch('%s')(%s)", + TableEvents, + eventWhere, + TableSessions, + sessWhere, + pattern.String(), + args.String(), + ) + } else { + // No HAVING clause needed + eventSubQuery = fmt.Sprintf( + "SELECT main.session_id, MIN(main.created_at) AS first_event_ts, MAX(main.created_at) AS last_event_ts "+ + "FROM %s AS main "+ + "WHERE %s "+ + "AND main.session_id IN (SELECT s.session_id FROM %s AS s WHERE %s) "+ + "GROUP BY main.session_id", + TableEvents, + eventWhere, + TableSessions, + sessWhere, + ) + } + + sessionsQuery := fmt.Sprintf( + "SELECT * FROM %s AS s WHERE s.project_id = %d AND isNotNull(s.duration)%s AND s.datetime >= toDateTime(%d/1000) AND s.datetime <= toDateTime(%d/1000)", + TableSessions, + r.ProjectId, + func() string { + if sessWhere != "" { + return " AND " + sessWhere + } + return "" + }(), + r.StartTimestamp, + r.EndTimestamp, ) - final := fmt.Sprintf( - "SELECT COUNT(DISTINCT url_path) OVER () AS main_count,\n"+ - " url_path AS name,\n"+ - " COUNT(DISTINCT session_id) AS total,\n"+ - " COALESCE(SUM(COUNT(DISTINCT session_id)) OVER (), 0) AS total_count\n"+ - "FROM (\n%s) AS filtered_sessions\n"+ - "GROUP BY url_path\n"+ - "ORDER BY total DESC\n"+ - "LIMIT 200 OFFSET 0;", - main, + mainQuery := fmt.Sprintf( + "SELECT s.session_id AS session_id, s.%s AS %s FROM (%s) AS f INNER JOIN (%s) AS s ON s.session_id = f.session_id", + groupByColumn, groupByColumn, + eventSubQuery, + sessionsQuery, ) - return final, nil + finalQuery := fmt.Sprintf( + "SELECT COUNT(DISTINCT filtered_sessions.%s) OVER () AS main_count, "+ + "filtered_sessions.%s AS name, "+ + "COUNT(DISTINCT filtered_sessions.session_id) AS total, "+ + "(SELECT COUNT(DISTINCT session_id) FROM (%s) AS all_sessions) AS total_count "+ + "FROM (%s) AS filtered_sessions "+ + "GROUP BY filtered_sessions.%s "+ + "ORDER BY total DESC "+ + "LIMIT 0, 200;", + groupByColumn, + groupByColumn, + mainQuery, + mainQuery, + groupByColumn, + ) + + return finalQuery, nil } diff --git a/backend/pkg/analytics/charts/model.go b/backend/pkg/analytics/charts/model.go index b83790578..0e6aa3da4 100644 --- a/backend/pkg/analytics/charts/model.go +++ b/backend/pkg/analytics/charts/model.go @@ -68,6 +68,21 @@ type MetricPayload struct { Series []Series `json:"series"` } +type MetricOfTable string + +const ( + MetricOfTableBrowser MetricOfTable = "browser" + MetricOfTableDevice MetricOfTable = "device" + MetricOfTableCountry MetricOfTable = "country" + MetricOfTableUserId MetricOfTable = "userId" + MetricOfTableIssues MetricOfTable = "issues" + MetricOfTableLocation MetricOfTable = "location" + MetricOfTableSessions MetricOfTable = "sessions" + MetricOfTableErrors MetricOfTable = "errors" + MetricOfTableReferrer MetricOfTable = "referrer" + MetricOfTableFetch MetricOfTable = "fetch" +) + type FilterGroup struct { Filters []Filter `json:"filters"` EventsOrder EventOrder `json:"eventsOrder"` diff --git a/backend/pkg/analytics/charts/query.go b/backend/pkg/analytics/charts/query.go index 6ea789c8c..be8e2fe11 100644 --- a/backend/pkg/analytics/charts/query.go +++ b/backend/pkg/analytics/charts/query.go @@ -8,8 +8,9 @@ import ( type Payload struct { *MetricPayload - ProjectId int - UserId uint64 + GroupByColumn string // TODO remove this field + ProjectId int + UserId uint64 } type QueryBuilder interface { @@ -40,161 +41,6 @@ func partitionFilters(filters []Filter) (sessionFilters []Filter, eventFilters [ return } -func buildEventsWhere(filters []Filter, order EventOrder) (eventConditions []string, having string) { - basicEventTypes := "(" + - strings.Join([]string{ - fmt.Sprintf("%s = 'CLICK'", ColEventName), - fmt.Sprintf("%s = 'INPUT'", ColEventName), - fmt.Sprintf("%s = 'LOCATION'", ColEventName), - fmt.Sprintf("%s = 'CUSTOM'", ColEventName), - fmt.Sprintf("%s = 'REQUEST'", ColEventName), - }, " OR ") + ")" - - var seq []string - for _, f := range filters { - switch f.Type { - case FilterClick: - seq = append(seq, seqCond("CLICK", "selector", f)) - case FilterInput: - seq = append(seq, seqCond("INPUT", "label", f)) - case FilterLocation: - seq = append(seq, seqCond("LOCATION", "url_path", f)) - case FilterCustom: - seq = append(seq, seqCond("CUSTOM", "name", f)) - case FilterFetch: - seq = append(seq, seqFetchCond("REQUEST", f)) - case FilterFetchStatusCode: - seq = append(seq, seqCond("REQUEST", "status", f)) - default: - seq = append(seq, fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(string(f.Type)))) - } - } - eventConditions = []string{basicEventTypes} - - // then => sequenceMatch - // or => OR - // and => AND - switch order { - case EventOrderThen: - var pattern []string - for i := range seq { - pattern = append(pattern, fmt.Sprintf("(?%d)", i+1)) - } - having = fmt.Sprintf("sequenceMatch('%s')(\n%s,\n%s)", - strings.Join(pattern, ""), fmt.Sprintf("toUnixTimestamp(%s)", ColEventTime), strings.Join(seq, ",\n")) - case EventOrderAnd: - // build AND - having = strings.Join(seq, " AND ") - case EventOrderOr: - default: - // default => OR - var orParts []string - for _, p := range seq { - orParts = append(orParts, "("+p+")") - } - having = strings.Join(orParts, " OR ") - } - return eventConditions, having -} - -func buildSessionWhere(filters []Filter) []string { - var conds []string - for _, f := range filters { - switch f.Type { - case FilterUserCountry: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCountry, concatValues(f.Value))) - case FilterUserCity: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCity, concatValues(f.Value))) - case FilterUserState: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserState, concatValues(f.Value))) - case FilterUserId: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserID, concatValues(f.Value))) - case FilterUserAnonymousId: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserAnonymousID, concatValues(f.Value))) - case FilterUserOs: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserOS, concatValues(f.Value))) - case FilterUserBrowser: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserBrowser, concatValues(f.Value))) - case FilterUserDevice: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDevice, concatValues(f.Value))) - case FilterPlatform: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDeviceType, concatValues(f.Value))) - case FilterRevId: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColRevID, concatValues(f.Value))) - case FilterReferrer: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColBaseReferrer, concatValues(f.Value))) - case FilterDuration: - if len(f.Value) == 2 { - conds = append(conds, fmt.Sprintf("%s >= '%s'", ColDuration, f.Value[0])) - conds = append(conds, fmt.Sprintf("%s <= '%s'", ColDuration, f.Value[1])) - } - case FilterUtmSource: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmSource, concatValues(f.Value))) - case FilterUtmMedium: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmMedium, concatValues(f.Value))) - case FilterUtmCampaign: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmCampaign, concatValues(f.Value))) - case FilterMetadata: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColMetadata1, concatValues(f.Value))) - } - } - // adding /n to each condition for better readability, can be removed. - for i := range conds { - conds[i] += "\n" - } - return conds -} - -func concatValues(v []string) string { - return strings.Join(v, "") -} - -func seqCond(eventName, key string, f Filter) string { - op := parseOperator(f.Operator) - return fmt.Sprintf("(%s = '%s' AND JSONExtractString(toString(%s), '%s') %s '%s')", - ColEventName, strings.ToUpper(eventName), ColEventProperties, key, op, concatValues(f.Value)) -} - -func seqFetchCond(eventName string, f Filter) string { - w := []string{fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(eventName))} - var extras []string - for _, c := range f.Filters { - switch c.Type { - case FilterFetch: - if len(c.Value) > 0 { - extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventURLPath, concatValues(c.Value))) - } - case FilterFetchStatusCode: - if len(c.Value) > 0 { - extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventStatus, concatValues(c.Value))) - } - default: - // placeholder if needed - } - } - if len(extras) > 0 { - w = append(w, strings.Join(extras, " AND ")) - } - return "(" + strings.Join(w, " AND ") + ")" -} - -func parseOperator(op string) string { - // TODO implement this properly - switch strings.ToLower(op) { - case OperatorContains: - return "LIKE" - case OperatorStringIs, OperatorStringOn, "=", OperatorStringOnAny: - return "=" - case OperatorStringStartsWith: - return "LIKE" - case OperatorStringEndsWith: - // might interpret differently in real impl - return "=" - default: - return "=" - } -} - func buildEventConditions(filters []Filter) (conds, names []string) { for _, f := range filters { if f.IsEvent { diff --git a/backend/pkg/analytics/db/connector.go b/backend/pkg/analytics/db/connector.go index c06dfa998..62b31068c 100644 --- a/backend/pkg/analytics/db/connector.go +++ b/backend/pkg/analytics/db/connector.go @@ -22,6 +22,7 @@ type TableResponse struct { type Connector interface { Stop() error Query(query string) (driver.Rows, error) + QueryArgs(query string, args map[string]interface{}) (driver.Rows, error) } type connectorImpl struct { @@ -62,3 +63,13 @@ func (c *connectorImpl) Query(query string) (driver.Rows, error) { return rows, nil } + +func (c *connectorImpl) QueryArgs(query string, args map[string]interface{}) (driver.Rows, error) { + rows, err := c.conn.Query(context.Background(), query, args) + if err != nil { + return nil, err + } + //defer rows.Close() + + return rows, nil +}