feat(analytics): table charts wip

This commit is contained in:
Shekar Siri 2025-02-26 12:43:54 +01:00
parent 6ad249bf6e
commit 8711648ac7
4 changed files with 186 additions and 222 deletions

View file

@ -8,81 +8,173 @@ import (
// TableQueryBuilder builds and runs the SQL query behind table charts.
// It has no fields; all behavior lives in its methods.
type TableQueryBuilder struct{}
// TableValue is one row of a table chart: a group label and the count
// reported for that group.
type TableValue struct {
	// Name is the group label, serialized as "name".
	Name string `json:"name"`
	// Total is the count for this group, serialized as "total".
	Total uint64 `json:"total"`
}
// TableResponse is the result returned for a table chart: two aggregate
// counters plus one TableValue per returned row.
type TableResponse struct {
	// Total is serialized as "total".
	Total uint64 `json:"total"`
	// Count is serialized as "count".
	Count uint64 `json:"count"`
	// Values holds the rows of the table, serialized as "values".
	Values []TableValue `json:"values"`
}
// Execute validates the requested metric, builds the table query, runs it on
// conn and collects the result rows into a *TableResponse.
//
// An error is returned when p.MetricOf is empty or is not one of the
// MetricOfTable values, when the query cannot be built or executed, when a
// row cannot be scanned, or when row iteration ends with an error.
func (t TableQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) {
	if p.MetricOf == "" {
		return nil, fmt.Errorf("MetricOf is empty")
	}

	// Only metrics on the MetricOfTable allow-list are accepted: buildQuery
	// splices the value into the SQL text as a column name.
	switch MetricOfTable(p.MetricOf) {
	case MetricOfTableBrowser, MetricOfTableDevice, MetricOfTableCountry,
		MetricOfTableUserId, MetricOfTableIssues, MetricOfTableLocation,
		MetricOfTableSessions, MetricOfTableErrors, MetricOfTableReferrer,
		MetricOfTableFetch:
	default:
		return nil, fmt.Errorf("unsupported MetricOf type: %s", p.MetricOf)
	}

	query, err := t.buildQuery(p)
	if err != nil {
		return nil, fmt.Errorf("building table query: %w", err)
	}

	rows, err := conn.Query(query)
	if err != nil {
		return nil, fmt.Errorf("executing table query: %w", err)
	}
	defer rows.Close()

	var (
		totalCount uint64
		rowsCount  uint64
		values     []TableValue
	)
	for rows.Next() {
		var (
			total uint64
			name  string
		)
		// Columns are scanned positionally in the order the query selects them.
		// NOTE(review): buildQuery selects main_count, name, total, total_count,
		// so totalCount receives main_count and rowsCount receives total_count —
		// confirm this mapping onto Total/Count is the intended one.
		if err := rows.Scan(&totalCount, &name, &total, &rowsCount); err != nil {
			return nil, fmt.Errorf("scanning table row: %w", err)
		}
		values = append(values, TableValue{Name: name, Total: total})
	}
	// Next returning false can mean "no more rows" or "iteration failed";
	// Err distinguishes the two so a truncated result is not reported as success.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("reading table rows: %w", err)
	}

	return &TableResponse{
		Total:  totalCount,
		Count:  rowsCount,
		Values: values,
	}, nil
}
func (t TableQueryBuilder) buildQuery(r Payload) (string, error) {
s := r.Series[0]
sessionFilters, eventFilters := partitionFilters(s.Filter.Filters)
sessionWhere := buildSessionWhere(sessionFilters)
eventWhere, seqHaving := buildEventsWhere(eventFilters, s.Filter.EventsOrder)
subQuery := fmt.Sprintf(
"SELECT %s,\n"+
" MIN(%s) AS first_event_ts,\n"+
" MAX(%s) AS last_event_ts\n"+
"FROM %s AS main\n"+
"WHERE main.project_id = %%(project_id)s\n"+
" AND %s >= toDateTime(%%(start_time)s/1000)\n"+
" AND %s <= toDateTime(%%(end_time)s/1000)\n"+
" AND (%s)\n"+
"GROUP BY %s\n"+
"HAVING %s",
ColEventSessionID,
ColEventTime,
ColEventTime,
TableEvents,
ColEventTime,
ColEventTime,
strings.Join(eventWhere, " OR "),
ColEventSessionID,
seqHaving,
)
joinQuery := fmt.Sprintf(
"SELECT *\n"+
"FROM %s AS s\n"+
"INNER JOIN (\n"+
" SELECT DISTINCT ev.session_id, ev.`$current_url` AS url_path\n"+
" FROM %s AS ev\n"+
" WHERE ev.created_at >= toDateTime(%%(start_time)s/1000)\n"+
" AND ev.created_at <= toDateTime(%%(end_time)s/1000)\n"+
" AND ev.project_id = %%(project_id)s\n"+
" AND ev.`$event_name` = 'LOCATION'\n"+
") AS extra_event USING (session_id)\n"+
"WHERE s.project_id = %%(project_id)s\n"+
" AND isNotNull(s.duration)\n"+
" AND s.datetime >= toDateTime(%%(start_time)s/1000)\n"+
" AND s.datetime <= toDateTime(%%(end_time)s/1000)\n",
TableSessions,
TableEvents,
)
if len(sessionWhere) > 0 {
joinQuery += " AND " + strings.Join(sessionWhere, " AND ") + "\n"
groupByColumn := r.MetricOf
if groupByColumn == "" {
return "", fmt.Errorf("MetricOf is empty")
}
main := fmt.Sprintf(
"SELECT s.session_id AS session_id, s.url_path\n"+
"FROM (\n%s\n) AS f\n"+
"INNER JOIN (\n%s) AS s\n"+
" ON (s.session_id = f.session_id)\n",
subQuery,
joinQuery,
sessionFilters, eventFilters := partitionFilters(s.Filter.Filters)
eventConds, eventNames := buildEventConditions(eventFilters)
eventWhere := buildStaticEventWhere(r)
if len(eventConds) > 0 {
eventWhere += " AND " + strings.Join(eventConds, " AND ")
}
if len(eventNames) > 0 {
eventWhere += " AND main.`$event_name` IN (" + buildInClause(eventNames) + ")"
}
sessionConds := buildSessionConditions(sessionFilters)
sessWhere, _ := buildStaticSessionWhere(r, sessionConds)
// Build event subquery
var eventSubQuery string
if len(eventConds) > 0 {
// With HAVING clause
var pattern strings.Builder
for i := 0; i < len(eventConds); i++ {
fmt.Fprintf(&pattern, "(?%d)", i+1)
}
var args strings.Builder
args.WriteString("toDateTime(main.created_at)")
for _, cond := range eventConds {
args.WriteString(",\n ")
args.WriteString(cond)
}
eventSubQuery = fmt.Sprintf(
"SELECT main.session_id, MIN(main.created_at) AS first_event_ts, MAX(main.created_at) AS last_event_ts "+
"FROM %s AS main "+
"WHERE %s "+
"AND main.session_id IN (SELECT s.session_id FROM %s AS s WHERE %s) "+
"GROUP BY main.session_id "+
"HAVING sequenceMatch('%s')(%s)",
TableEvents,
eventWhere,
TableSessions,
sessWhere,
pattern.String(),
args.String(),
)
} else {
// No HAVING clause needed
eventSubQuery = fmt.Sprintf(
"SELECT main.session_id, MIN(main.created_at) AS first_event_ts, MAX(main.created_at) AS last_event_ts "+
"FROM %s AS main "+
"WHERE %s "+
"AND main.session_id IN (SELECT s.session_id FROM %s AS s WHERE %s) "+
"GROUP BY main.session_id",
TableEvents,
eventWhere,
TableSessions,
sessWhere,
)
}
sessionsQuery := fmt.Sprintf(
"SELECT * FROM %s AS s WHERE s.project_id = %d AND isNotNull(s.duration)%s AND s.datetime >= toDateTime(%d/1000) AND s.datetime <= toDateTime(%d/1000)",
TableSessions,
r.ProjectId,
func() string {
if sessWhere != "" {
return " AND " + sessWhere
}
return ""
}(),
r.StartTimestamp,
r.EndTimestamp,
)
final := fmt.Sprintf(
"SELECT COUNT(DISTINCT url_path) OVER () AS main_count,\n"+
" url_path AS name,\n"+
" COUNT(DISTINCT session_id) AS total,\n"+
" COALESCE(SUM(COUNT(DISTINCT session_id)) OVER (), 0) AS total_count\n"+
"FROM (\n%s) AS filtered_sessions\n"+
"GROUP BY url_path\n"+
"ORDER BY total DESC\n"+
"LIMIT 200 OFFSET 0;",
main,
mainQuery := fmt.Sprintf(
"SELECT s.session_id AS session_id, s.%s AS %s FROM (%s) AS f INNER JOIN (%s) AS s ON s.session_id = f.session_id",
groupByColumn, groupByColumn,
eventSubQuery,
sessionsQuery,
)
return final, nil
finalQuery := fmt.Sprintf(
"SELECT COUNT(DISTINCT filtered_sessions.%s) OVER () AS main_count, "+
"filtered_sessions.%s AS name, "+
"COUNT(DISTINCT filtered_sessions.session_id) AS total, "+
"(SELECT COUNT(DISTINCT session_id) FROM (%s) AS all_sessions) AS total_count "+
"FROM (%s) AS filtered_sessions "+
"GROUP BY filtered_sessions.%s "+
"ORDER BY total DESC "+
"LIMIT 0, 200;",
groupByColumn,
groupByColumn,
mainQuery,
mainQuery,
groupByColumn,
)
return finalQuery, nil
}

View file

@ -68,6 +68,21 @@ type MetricPayload struct {
Series []Series `json:"series"`
}
// MetricOfTable names the dimension a table chart is grouped by.
type MetricOfTable string

// Supported MetricOfTable values.
const (
	MetricOfTableBrowser  MetricOfTable = "browser"
	MetricOfTableDevice   MetricOfTable = "device"
	MetricOfTableCountry  MetricOfTable = "country"
	MetricOfTableUserId   MetricOfTable = "userId"
	MetricOfTableIssues   MetricOfTable = "issues"
	MetricOfTableLocation MetricOfTable = "location"
	MetricOfTableSessions MetricOfTable = "sessions"
	MetricOfTableErrors   MetricOfTable = "errors"
	MetricOfTableReferrer MetricOfTable = "referrer"
	MetricOfTableFetch    MetricOfTable = "fetch"
)
type FilterGroup struct {
Filters []Filter `json:"filters"`
EventsOrder EventOrder `json:"eventsOrder"`

View file

@ -8,8 +8,9 @@ import (
// Payload is the input handed to a query builder: the embedded metric request
// plus the project and user IDs carried alongside it.
type Payload struct {
	*MetricPayload
	GroupByColumn string // TODO remove this field
	// ProjectId is interpolated into the sessions sub-query by buildQuery.
	ProjectId int
	UserId    uint64
}
type QueryBuilder interface {
@ -40,161 +41,6 @@ func partitionFilters(filters []Filter) (sessionFilters []Filter, eventFilters [
return
}
// buildEventsWhere turns event filters into two SQL fragments:
//
//   - eventConditions: a single parenthesised OR over the five basic event
//     names (CLICK, INPUT, LOCATION, CUSTOM, REQUEST);
//   - having: the per-filter conditions combined according to order.
//
// Combination rules for having:
//
//   - EventOrderThen: sequenceMatch over the conditions, in filter order
//   - EventOrderAnd: conditions joined with AND
//   - EventOrderOr and any other value: conditions joined with OR
func buildEventsWhere(filters []Filter, order EventOrder) (eventConditions []string, having string) {
	// esc makes a value safe inside a single-quoted SQL string literal.
	esc := func(v string) string {
		return strings.ReplaceAll(strings.ReplaceAll(v, `\`, `\\`), `'`, `\'`)
	}

	basicNames := []string{"CLICK", "INPUT", "LOCATION", "CUSTOM", "REQUEST"}
	typeConds := make([]string, 0, len(basicNames))
	for _, n := range basicNames {
		typeConds = append(typeConds, fmt.Sprintf("%s = '%s'", ColEventName, n))
	}
	eventConditions = []string{"(" + strings.Join(typeConds, " OR ") + ")"}

	// One condition per filter, kept in filter order (the order matters for
	// sequenceMatch).
	seq := make([]string, 0, len(filters))
	for _, f := range filters {
		switch f.Type {
		case FilterClick:
			seq = append(seq, seqCond("CLICK", "selector", f))
		case FilterInput:
			seq = append(seq, seqCond("INPUT", "label", f))
		case FilterLocation:
			seq = append(seq, seqCond("LOCATION", "url_path", f))
		case FilterCustom:
			seq = append(seq, seqCond("CUSTOM", "name", f))
		case FilterFetch:
			seq = append(seq, seqFetchCond("REQUEST", f))
		case FilterFetchStatusCode:
			seq = append(seq, seqCond("REQUEST", "status", f))
		default:
			// Unknown filter types match on the upper-cased type as event name.
			seq = append(seq, fmt.Sprintf("(%s = '%s')", ColEventName, esc(strings.ToUpper(string(f.Type)))))
		}
	}

	switch order {
	case EventOrderThen:
		var pattern strings.Builder
		for i := range seq {
			fmt.Fprintf(&pattern, "(?%d)", i+1)
		}
		having = fmt.Sprintf("sequenceMatch('%s')(\ntoUnixTimestamp(%s),\n%s)",
			pattern.String(), ColEventTime, strings.Join(seq, ",\n"))
	case EventOrderAnd:
		having = strings.Join(seq, " AND ")
	case EventOrderOr:
		// Go switch cases do not fall through implicitly; without this the OR
		// order would produce an empty HAVING expression.
		fallthrough
	default:
		orParts := make([]string, 0, len(seq))
		for _, p := range seq {
			orParts = append(orParts, "("+p+")")
		}
		having = strings.Join(orParts, " OR ")
	}
	return eventConditions, having
}
// buildSessionWhere converts session-level filters into SQL conditions, one
// per supported filter (two for a duration range), in filter order. Filters
// of any other type are ignored. Each returned condition ends with "\n".
//
// Filter values are escaped before being placed inside single-quoted SQL
// string literals.
//
// NOTE(review): a filter with several values has them concatenated into one
// string by concatValues — confirm multi-value filters are not expected here.
func buildSessionWhere(filters []Filter) []string {
	// esc makes a value safe inside a single-quoted SQL string literal.
	esc := func(v string) string {
		return strings.ReplaceAll(strings.ReplaceAll(v, `\`, `\\`), `'`, `\'`)
	}

	var conds []string
	// eq appends "<column> = toString('<values>')".
	eq := func(column string, values []string) {
		conds = append(conds, fmt.Sprintf("%s = toString('%s')", column, esc(concatValues(values))))
	}

	for _, f := range filters {
		switch f.Type {
		case FilterUserCountry:
			eq(ColUserCountry, f.Value)
		case FilterUserCity:
			eq(ColUserCity, f.Value)
		case FilterUserState:
			eq(ColUserState, f.Value)
		case FilterUserId:
			eq(ColUserID, f.Value)
		case FilterUserAnonymousId:
			eq(ColUserAnonymousID, f.Value)
		case FilterUserOs:
			eq(ColUserOS, f.Value)
		case FilterUserBrowser:
			eq(ColUserBrowser, f.Value)
		case FilterUserDevice:
			eq(ColUserDevice, f.Value)
		case FilterPlatform:
			eq(ColUserDeviceType, f.Value)
		case FilterRevId:
			eq(ColRevID, f.Value)
		case FilterReferrer:
			eq(ColBaseReferrer, f.Value)
		case FilterDuration:
			// A duration filter only applies when both bounds are given.
			if len(f.Value) == 2 {
				conds = append(conds,
					fmt.Sprintf("%s >= '%s'", ColDuration, esc(f.Value[0])),
					fmt.Sprintf("%s <= '%s'", ColDuration, esc(f.Value[1])),
				)
			}
		case FilterUtmSource:
			eq(ColUtmSource, f.Value)
		case FilterUtmMedium:
			eq(ColUtmMedium, f.Value)
		case FilterUtmCampaign:
			eq(ColUtmCampaign, f.Value)
		case FilterMetadata:
			eq(ColMetadata1, f.Value)
		}
	}

	// The trailing newline only makes the generated SQL easier to read.
	for i := range conds {
		conds[i] += "\n"
	}
	return conds
}
// concatValues glues the elements of v together, in order, with no separator.
// A nil or empty slice yields the empty string.
func concatValues(v []string) string {
	var joined strings.Builder
	for _, part := range v {
		joined.WriteString(part)
	}
	return joined.String()
}
// seqCond builds one parenthesised SQL condition matching events named
// eventName (upper-cased) whose JSON property key compares to the filter's
// value using the operator derived from f.Operator.
//
// The filter value is escaped before being placed inside the single-quoted
// SQL string literal.
func seqCond(eventName, key string, f Filter) string {
	op := parseOperator(f.Operator)
	// Escape backslashes first, then quotes, so added backslashes are not doubled.
	value := strings.ReplaceAll(strings.ReplaceAll(concatValues(f.Value), `\`, `\\`), `'`, `\'`)
	return fmt.Sprintf("(%s = '%s' AND JSONExtractString(toString(%s), '%s') %s '%s')",
		ColEventName, strings.ToUpper(eventName), ColEventProperties, key, op, value)
}
// seqFetchCond builds one parenthesised SQL condition for a fetch filter: the
// event name must equal eventName (upper-cased) and, for each nested filter
// that carries a value, the URL path (FilterFetch) or status
// (FilterFetchStatusCode) must equal that value. Nested filters of other
// types, or without values, contribute nothing.
//
// Nested filter values are escaped before being placed inside single-quoted
// SQL string literals.
func seqFetchCond(eventName string, f Filter) string {
	// esc makes a value safe inside a single-quoted SQL string literal.
	esc := func(v string) string {
		return strings.ReplaceAll(strings.ReplaceAll(v, `\`, `\\`), `'`, `\'`)
	}

	parts := []string{fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(eventName))}

	var extras []string
	for _, c := range f.Filters {
		if len(c.Value) == 0 {
			continue
		}
		switch c.Type {
		case FilterFetch:
			extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventURLPath, esc(concatValues(c.Value))))
		case FilterFetchStatusCode:
			extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventStatus, esc(concatValues(c.Value))))
		}
	}
	if len(extras) > 0 {
		parts = append(parts, strings.Join(extras, " AND "))
	}
	return "(" + strings.Join(parts, " AND ") + ")"
}
// parseOperator maps a filter operator name, compared case-insensitively by
// lower-casing op, to the SQL comparison operator used in generated conditions.
//
// OperatorContains and OperatorStringStartsWith map to "LIKE"; everything
// else, including OperatorStringEndsWith and unknown operators, maps to "=".
//
// TODO implement this properly
func parseOperator(op string) string {
	lowered := strings.ToLower(op)
	if lowered == OperatorContains || lowered == OperatorStringStartsWith {
		return "LIKE"
	}
	// OperatorStringIs, OperatorStringOn, "=", OperatorStringOnAny,
	// OperatorStringEndsWith (might be interpreted differently in a real
	// implementation) and any unrecognised operator all compare with "=".
	return "="
}
func buildEventConditions(filters []Filter) (conds, names []string) {
for _, f := range filters {
if f.IsEvent {

View file

@ -22,6 +22,7 @@ type TableResponse struct {
// Connector abstracts the database connection that chart queries run on.
type Connector interface {
	// Stop shuts the connector down.
	// NOTE(review): presumably releases the underlying connection — confirm
	// against the implementation.
	Stop() error

	// Query runs query and returns the resulting rows.
	Query(query string) (driver.Rows, error)

	// QueryArgs runs query with args forwarded to the underlying connection
	// and returns the resulting rows.
	QueryArgs(query string, args map[string]interface{}) (driver.Rows, error)
}
type connectorImpl struct {
@ -62,3 +63,13 @@ func (c *connectorImpl) Query(query string) (driver.Rows, error) {
return rows, nil
}
// QueryArgs runs query on the underlying connection with a background
// context, forwarding args as a single extra argument, and returns the rows.
//
// The rows are returned without being closed; closing them is left to the
// caller.
//
// NOTE(review): args is handed to conn.Query as one value rather than being
// expanded into individual parameters — confirm the driver binds a map this way.
func (c *connectorImpl) QueryArgs(query string, args map[string]interface{}) (driver.Rows, error) {
	result, queryErr := c.conn.Query(context.Background(), query, args)
	if queryErr != nil {
		return nil, queryErr
	}
	return result, nil
}