feat(product_analytics): table of cards

This commit is contained in:
Shekar Siri 2025-04-22 12:54:43 +02:00
parent c077841b4e
commit 3c5844e4ad
4 changed files with 207 additions and 175 deletions

View file

@ -2,7 +2,6 @@ package charts
import (
"fmt"
"log"
"openreplay/backend/pkg/analytics/db"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/logger"
@ -39,12 +38,12 @@ func (s *chartsImpl) GetData(projectId int, userID uint64, req *MetricPayload) (
}
qb, err := NewQueryBuilder(payload)
if err != nil {
log.Fatalf("Error creating query builder: %v", err)
return nil, fmt.Errorf("error creating query builder: %v", err)
}
resp, err := qb.Execute(payload, s.chConn)
if err != nil {
log.Fatalf("Error building query: %v", err)
return nil, fmt.Errorf("error executing query: %v", err)
}
return resp, nil

View file

@ -2,10 +2,21 @@ package charts
import (
"fmt"
"log"
"openreplay/backend/pkg/analytics/db"
"strings"
)
var validMetricOfValues = map[MetricOfTable]struct{}{
MetricOfTableBrowser: {},
MetricOfTableDevice: {},
MetricOfTableCountry: {},
MetricOfTableUserId: {},
MetricOfTableLocation: {},
MetricOfTableReferrer: {},
MetricOfTableFetch: {},
}
type TableQueryBuilder struct{}
type TableValue struct {
@ -19,162 +30,181 @@ type TableResponse struct {
Values []TableValue `json:"values"`
}
const (
MetricFormatSessionCount = "sessionCount"
MetricFormatUserCount = "userCount"
nilUUIDString = "00000000-0000-0000-0000-000000000000"
)
var propertySelectorMap = map[string]string{
string(MetricOfTableBrowser): "main.$browser AS metric_value",
string(MetricOfTableDevice): "main.$device AS metric_value",
string(MetricOfTableCountry): "main.$country AS metric_value",
string(MetricOfTableReferrer): "main.$referrer AS metric_value",
}
func (t TableQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) {
// validate metricOf with MetricOfTable return error if empty or not supported
if p.MetricOf == "" {
return nil, fmt.Errorf("MetricOf is empty")
}
// Validate that p.MetricOf is one of the supported MetricOfTable types
isValidMetricOf := false
switch MetricOfTable(p.MetricOf) {
case MetricOfTableBrowser, MetricOfTableDevice, MetricOfTableCountry,
MetricOfTableUserId, MetricOfTableIssues, MetricOfTableLocation,
MetricOfTableSessions, MetricOfTableErrors, MetricOfTableReferrer,
MetricOfTableFetch:
isValidMetricOf = true
if _, ok := validMetricOfValues[MetricOfTable(p.MetricOf)]; !ok {
return nil, fmt.Errorf("invalid MetricOf value: %s", p.MetricOf)
}
if !isValidMetricOf {
return nil, fmt.Errorf("unsupported MetricOf type: %s", p.MetricOf)
metricFormat := p.MetricFormat
if metricFormat != MetricFormatSessionCount && metricFormat != MetricFormatUserCount {
metricFormat = MetricFormatSessionCount
}
query, err := t.buildQuery(p)
query, err := t.buildQuery(p, metricFormat)
if err != nil {
return nil, err
return nil, fmt.Errorf("error building query: %w", err)
}
rows, err := conn.Query(query)
if err != nil {
return nil, err
log.Printf("Error executing query: %s\nQuery: %s", err, query)
return nil, fmt.Errorf("error executing query: %w", err)
}
defer rows.Close()
var (
totalCount uint64
rowsCount uint64
values []TableValue
overallTotalMetricValues uint64
overallCount uint64
values []TableValue
firstRow = true
)
for rows.Next() {
var (
total uint64
name string
name string
valueSpecificCount uint64
tempOverallTotalMetricValues uint64
tempOverallCount uint64
)
if err := rows.Scan(&totalCount, &name, &total, &rowsCount); err != nil {
return nil, err
if err := rows.Scan(&tempOverallTotalMetricValues, &name, &valueSpecificCount, &tempOverallCount); err != nil {
return nil, fmt.Errorf("error scanning row: %w", err)
}
values = append(values, TableValue{Name: name, Total: total})
if firstRow {
overallTotalMetricValues = tempOverallTotalMetricValues
overallCount = tempOverallCount
firstRow = false
}
values = append(values, TableValue{Name: name, Total: valueSpecificCount})
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating rows: %w", err)
}
return &TableResponse{
Total: totalCount,
Count: rowsCount,
Total: overallTotalMetricValues,
Count: overallCount,
Values: values,
}, nil
}
func (t TableQueryBuilder) buildQuery(r Payload) (string, error) {
func (t TableQueryBuilder) buildQuery(r Payload, metricFormat string) (string, error) {
if len(r.Series) == 0 {
return "", fmt.Errorf("payload Series cannot be empty")
}
s := r.Series[0]
groupByColumn := r.MetricOf
if groupByColumn == "" {
var propertyName string
if r.MetricOf == "" {
return "", fmt.Errorf("MetricOf is empty")
}
originalMetricOf := r.MetricOf
propertyName = originalMetricOf
sessionFilters, eventFilters := partitionFilters(s.Filter.Filters)
eventFilters := s.Filter.Filters
eventConds, eventNames := buildEventConditions(eventFilters)
eventWhere := buildStaticEventWhere(r)
baseWhereConditions := []string{
fmt.Sprintf("main.created_at >= toDateTime(%d/1000)", r.StartTimestamp),
fmt.Sprintf("main.created_at <= toDateTime(%d/1000)", r.EndTimestamp),
"sessions.duration > 0",
}
if r.ProjectId > 0 {
baseWhereConditions = append(baseWhereConditions, fmt.Sprintf("main.project_id = %d", r.ProjectId))
}
var aggregationExpression string
var aggregationAlias = "aggregation_id"
var specificWhereConditions []string
if metricFormat == MetricFormatUserCount {
aggregationExpression = fmt.Sprintf("if(empty(sessions.user_id), toString(sessions.user_uuid), sessions.user_id)")
userExclusionCondition := fmt.Sprintf("NOT (empty(sessions.user_id) AND (sessions.user_uuid IS NULL OR sessions.user_uuid = '%s'))", nilUUIDString)
specificWhereConditions = append(specificWhereConditions, userExclusionCondition)
} else {
aggregationExpression = "main.session_id"
}
var propertySelector string
var ok bool
propertySelector, ok = propertySelectorMap[originalMetricOf]
if !ok {
propertySelector = fmt.Sprintf("JSONExtractString(toString(main.$properties), '%s') AS metric_value", propertyName)
}
allWhereConditions := baseWhereConditions
if len(eventConds) > 0 {
eventWhere += " AND " + strings.Join(eventConds, " AND ")
allWhereConditions = append(allWhereConditions, eventConds...)
}
if len(eventNames) > 0 {
eventWhere += " AND main.`$event_name` IN (" + buildInClause(eventNames) + ")"
allWhereConditions = append(allWhereConditions, "main.`$event_name` IN ("+buildInClause(eventNames)+")")
}
allWhereConditions = append(allWhereConditions, specificWhereConditions...)
whereClause := strings.Join(allWhereConditions, " AND ")
sessionConds := buildSessionConditions(sessionFilters)
sessWhere, _ := buildStaticSessionWhere(r, sessionConds)
// Build event subquery
var eventSubQuery string
if len(eventConds) > 0 {
// With HAVING clause
var pattern strings.Builder
for i := 0; i < len(eventConds); i++ {
fmt.Fprintf(&pattern, "(?%d)", i+1)
}
var args strings.Builder
args.WriteString("toDateTime(main.created_at)")
for _, cond := range eventConds {
args.WriteString(",\n ")
args.WriteString(cond)
}
eventSubQuery = fmt.Sprintf(
"SELECT main.session_id, MIN(main.created_at) AS first_event_ts, MAX(main.created_at) AS last_event_ts "+
"FROM %s AS main "+
"WHERE %s "+
"AND main.session_id IN (SELECT s.session_id FROM %s AS s WHERE %s) "+
"GROUP BY main.session_id "+
"HAVING sequenceMatch('%s')(%s)",
TableEvents,
eventWhere,
TableSessions,
sessWhere,
pattern.String(),
args.String(),
)
} else {
// No HAVING clause needed
eventSubQuery = fmt.Sprintf(
"SELECT main.session_id, MIN(main.created_at) AS first_event_ts, MAX(main.created_at) AS last_event_ts "+
"FROM %s AS main "+
"WHERE %s "+
"AND main.session_id IN (SELECT s.session_id FROM %s AS s WHERE %s) "+
"GROUP BY main.session_id",
TableEvents,
eventWhere,
TableSessions,
sessWhere,
)
limit := r.Limit
if limit <= 0 {
limit = 10
}
page := r.Page
if page <= 0 {
page = 1
}
offset := (page - 1) * limit
limitClause := fmt.Sprintf("LIMIT %d OFFSET %d", limit, offset)
sessionsQuery := fmt.Sprintf(
"SELECT * FROM %s AS s WHERE s.project_id = %d AND isNotNull(s.duration)%s AND s.datetime >= toDateTime(%d/1000) AND s.datetime <= toDateTime(%d/1000)",
TableSessions,
r.ProjectId,
func() string {
if sessWhere != "" {
return " AND " + sessWhere
}
return ""
}(),
r.StartTimestamp,
r.EndTimestamp,
)
query := fmt.Sprintf(`
WITH filtered_data AS (
SELECT DISTINCT
%s,
%s AS %s
FROM product_analytics.events AS main
INNER JOIN experimental.sessions AS sessions ON main.session_id = sessions.session_id
WHERE %s
),
grouped_values AS (
SELECT
metric_value AS name,
countDistinct(%s) AS value_count
FROM filtered_data
WHERE name IS NOT NULL AND name != ''
GROUP BY name
)
SELECT
(SELECT count() FROM grouped_values) AS overall_total_metric_values,
name,
value_count,
(SELECT countDistinct(%s) FROM filtered_data) AS overall_total_count
FROM grouped_values
ORDER BY value_count DESC
%s
`,
propertySelector,
aggregationExpression,
aggregationAlias,
whereClause,
aggregationAlias,
aggregationAlias,
limitClause)
mainQuery := fmt.Sprintf(
"SELECT s.session_id AS session_id, s.%s AS %s FROM (%s) AS f INNER JOIN (%s) AS s ON s.session_id = f.session_id",
groupByColumn, groupByColumn,
eventSubQuery,
sessionsQuery,
)
finalQuery := fmt.Sprintf(
"SELECT COUNT(DISTINCT filtered_sessions.%s) OVER () AS main_count, "+
"filtered_sessions.%s AS name, "+
"COUNT(DISTINCT filtered_sessions.session_id) AS total, "+
"(SELECT COUNT(DISTINCT session_id) FROM (%s) AS all_sessions) AS total_count "+
"FROM (%s) AS filtered_sessions "+
"GROUP BY filtered_sessions.%s "+
"ORDER BY total DESC "+
"LIMIT 0, 200;",
groupByColumn,
groupByColumn,
mainQuery,
mainQuery,
groupByColumn,
)
return finalQuery, nil
return query, nil
}

View file

@ -66,21 +66,24 @@ type MetricPayload struct {
ViewType string `json:"viewType"`
Name string `json:"name"`
Series []Series `json:"series"`
Limit int `json:"limit"`
Page int `json:"page"`
}
type MetricOfTable string
const (
MetricOfTableBrowser MetricOfTable = "browser"
MetricOfTableDevice MetricOfTable = "device"
MetricOfTableCountry MetricOfTable = "country"
MetricOfTableUserId MetricOfTable = "userId"
MetricOfTableIssues MetricOfTable = "issues"
MetricOfTableLocation MetricOfTable = "location"
MetricOfTableSessions MetricOfTable = "sessions"
MetricOfTableErrors MetricOfTable = "errors"
MetricOfTableLocation MetricOfTable = "url_path" // TOP Pages
MetricOfTableBrowser MetricOfTable = "user_browser"
MetricOfTableReferrer MetricOfTable = "referrer"
MetricOfTableUserId MetricOfTable = "user_id"
MetricOfTableCountry MetricOfTable = "user_country"
MetricOfTableDevice MetricOfTable = "user_device"
MetricOfTableFetch MetricOfTable = "fetch"
//MetricOfTableIssues MetricOfTable = "issues"
//MetricOfTableSessions MetricOfTable = "sessions"
//MetricOfTableErrors MetricOfTable = "errors"
)
type FilterGroup struct {

View file

@ -43,60 +43,60 @@ func partitionFilters(filters []Filter) (sessionFilters []Filter, eventFilters [
func buildEventConditions(filters []Filter) (conds, names []string) {
for _, f := range filters {
if f.IsEvent {
switch f.Type {
case FilterClick:
c := buildCond("JSONExtractString(toString(main.`$properties`), 'label')", f.Value, "is")
if c != "" {
conds = append(conds, c)
}
names = append(names, "CLICK")
case FilterInput:
c := buildCond("JSONExtractString(toString(main.`$properties`), 'label')", f.Value, f.Operator)
if c != "" {
conds = append(conds, c)
}
names = append(names, "INPUT")
case FilterLocation:
c := buildCond("JSONExtractString(toString(main.`$properties`), 'url_path')", f.Value, f.Operator)
if c != "" {
conds = append(conds, c)
}
names = append(names, "LOCATION")
case FilterCustom:
c := buildCond("JSONExtractString(toString(main.`$properties`), 'name')", f.Value, f.Operator)
if c != "" {
conds = append(conds, c)
}
names = append(names, "CUSTOM")
case FilterFetch:
var fetchConds []string
for _, nf := range f.Filters {
switch nf.Type {
case "fetchUrl":
c := buildCond("JSONExtractString(toString(main.`$properties`), 'url_path')", nf.Value, f.Operator)
if c != "" {
fetchConds = append(fetchConds, c)
}
case "fetchStatusCode":
c := buildCond("JSONExtractFloat(toString(main.`$properties`), 'status')", nf.Value, f.Operator)
if c != "" {
fetchConds = append(fetchConds, c)
}
//if f.IsEvent {
switch f.Type {
case FilterClick:
c := buildCond("JSONExtractString(toString(main.`$properties`), 'label')", f.Value, "is")
if c != "" {
conds = append(conds, c)
}
names = append(names, "CLICK")
case FilterInput:
c := buildCond("JSONExtractString(toString(main.`$properties`), 'label')", f.Value, f.Operator)
if c != "" {
conds = append(conds, c)
}
names = append(names, "INPUT")
case FilterLocation:
c := buildCond("JSONExtractString(toString(main.`$properties`), 'url_path')", f.Value, f.Operator)
if c != "" {
conds = append(conds, c)
}
names = append(names, "LOCATION")
case FilterCustom:
c := buildCond("JSONExtractString(toString(main.`$properties`), 'name')", f.Value, f.Operator)
if c != "" {
conds = append(conds, c)
}
names = append(names, "CUSTOM")
case FilterFetch:
var fetchConds []string
for _, nf := range f.Filters {
switch nf.Type {
case "fetchUrl":
c := buildCond("JSONExtractString(toString(main.`$properties`), 'url_path')", nf.Value, f.Operator)
if c != "" {
fetchConds = append(fetchConds, c)
}
case "fetchStatusCode":
c := buildCond("JSONExtractFloat(toString(main.`$properties`), 'status')", nf.Value, f.Operator)
if c != "" {
fetchConds = append(fetchConds, c)
}
}
if len(fetchConds) > 0 {
conds = append(conds, strings.Join(fetchConds, " AND "))
}
names = append(names, "REQUEST")
case FilterTag:
c := buildCond("JSONExtractString(toString(main.`$properties`), 'tag')", f.Value, f.Operator)
if c != "" {
conds = append(conds, c)
}
names = append(names, "TAG")
}
if len(fetchConds) > 0 {
conds = append(conds, strings.Join(fetchConds, " AND "))
}
names = append(names, "REQUEST")
case FilterTag:
c := buildCond("JSONExtractString(toString(main.`$properties`), 'tag')", f.Value, f.Operator)
if c != "" {
conds = append(conds, c)
}
names = append(names, "TAG")
}
//}
}
return
}