diff --git a/backend/pkg/analytics/charts/metric_timeseries.go b/backend/pkg/analytics/charts/metric_timeseries.go index 976b42b1b..44aee857c 100644 --- a/backend/pkg/analytics/charts/metric_timeseries.go +++ b/backend/pkg/analytics/charts/metric_timeseries.go @@ -4,75 +4,109 @@ import ( "fmt" "log" "openreplay/backend/pkg/analytics/db" + "sort" "strings" ) type TimeSeriesQueryBuilder struct{} func (t TimeSeriesQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) { - query, err := t.buildQuery(p) - if err != nil { - log.Fatalf("Error building query: %v", err) - return nil, err - } + consolidated := map[uint64]map[string]uint64{} - rows, err := conn.Query(query) - if err != nil { - log.Fatalf("Error executing query: %v", err) - return nil, err - } - defer rows.Close() - - var results []DataPoint - for rows.Next() { - var res DataPoint - if err := rows.Scan(&res.Timestamp, &res.Count); err != nil { + for _, s := range p.Series { + query, err := t.buildQuery(p, s) + if err != nil { + log.Fatalf("Error building query for series %s: %v", s.Name, err) return nil, err } - //sum += res.Count - results = append(results, res) + + rows, err := conn.Query(query) + if err != nil { + log.Fatalf("Error executing query for series %s: %v", s.Name, err) + return nil, err + } + + var results []DataPoint + for rows.Next() { + var res DataPoint + if err := rows.Scan(&res.Timestamp, &res.Count); err != nil { + rows.Close() + return nil, err + } + results = append(results, res) + } + rows.Close() + + filled := FillMissingDataPoints(p.StartTimestamp, p.EndTimestamp, p.Density, DataPoint{}, results, 1000) + for _, dp := range filled { + if _, ok := consolidated[dp.Timestamp]; !ok { + consolidated[dp.Timestamp] = map[string]uint64{} + } + consolidated[dp.Timestamp][s.Name] = dp.Count + } } - filled := FillMissingDataPoints(p.StartTimestamp, p.EndTimestamp, p.Density, DataPoint{}, results, 1000) - return filled, nil + var timestamps []uint64 + for ts := range consolidated { + timestamps = append(timestamps, ts) + } + sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] < timestamps[j] }) + + var finalResults []map[string]interface{} + for _, ts := range timestamps { + row := map[string]interface{}{"timestamp": ts} + for _, s := range p.Series { + if count, ok := consolidated[ts][s.Name]; ok { + row[s.Name] = count + } else { + row[s.Name] = uint64(0) + } + } + finalResults = append(finalResults, row) + } + + return finalResults, nil } -func (t TimeSeriesQueryBuilder) buildQuery(p Payload) (string, error) { - query := "" +func (t TimeSeriesQueryBuilder) buildQuery(p Payload, s Series) (string, error) { + var query string switch p.MetricOf { case "sessionCount": - query = t.buildSessionCountQuery(p) + query = t.buildSessionCountQuery(p, s) case "userCount": - query = t.buildUserCountQuery(p) + query = t.buildUserCountQuery(p, s) default: query = "" } return query, nil } -func (TimeSeriesQueryBuilder) buildSessionCountQuery(p Payload) string { - eventConds, eventNames := buildEventConditions(p.Series[0].Filter.Filters) - sessionConds := buildSessionConditions(p.Series[0].Filter.Filters) +func (TimeSeriesQueryBuilder) buildSessionCountQuery(p Payload, s Series) string { + eventConds, eventNames := buildEventConditions(s.Filter.Filters) + sessionConds := buildSessionConditions(s.Filter.Filters) staticEvt := buildStaticEventWhere(p) sessWhere, sessJoin := buildStaticSessionWhere(p, sessionConds) - eventsSubQuery := buildEventsSubQuery(eventConds, eventNames, staticEvt, sessWhere, sessJoin) - mainQuery := buildMainQuery(p, eventsSubQuery) + eventsSubQuery := buildEventsSubQuery("sessionCount", eventConds, eventNames, staticEvt, sessWhere, sessJoin) + mainQuery := buildMainQuery(p, eventsSubQuery, "sessionCount") return mainQuery } -func (TimeSeriesQueryBuilder) buildUserCountQuery(p Payload) string { - eventConds, eventNames := buildEventConditions(p.Series[0].Filter.Filters) - sessionConds := buildSessionConditions(p.Series[0].Filter.Filters) +func (TimeSeriesQueryBuilder) buildUserCountQuery(p Payload, s Series) string { + eventConds, eventNames := buildEventConditions(s.Filter.Filters) + sessionConds := buildSessionConditions(s.Filter.Filters) staticEvt := buildStaticEventWhere(p) sessWhere, sessJoin := buildStaticSessionWhere(p, sessionConds) - eventsSubQuery := buildEventsSubQuery(eventConds, eventNames, staticEvt, sessWhere, sessJoin) - mainQuery := buildMainQuery(p, eventsSubQuery) + eventsSubQuery := buildEventsSubQuery("userCount", eventConds, eventNames, staticEvt, sessWhere, sessJoin) + mainQuery := buildMainQuery(p, eventsSubQuery, "userCount") return mainQuery } -func buildEventsSubQuery(eventConds, eventNames []string, staticEvt, sessWhere, sessJoin string) string { +func buildEventsSubQuery(metric string, eventConds, eventNames []string, staticEvt, sessWhere, sessJoin string) string { if len(eventConds) == 0 && len(eventNames) == 0 { - return fmt.Sprintf(noEventSubQueryTpl, sessJoin) + if metric == "sessionCount" { + return fmt.Sprintf(sessionNoFiltersSubQueryTpl, sessJoin) + } + return fmt.Sprintf(noFiltersSubQueryTpl, sessJoin) } var evtNameClause string var unique []string @@ -92,15 +126,21 @@ func buildEventsSubQuery(eventConds, eventNames []string, staticEvt, sessWhere, if len(eventConds) > 0 { evtWhere += " AND " + strings.Join(eventConds, " AND ") } - return fmt.Sprintf(eventsSubQueryTpl, evtWhere, sessWhere, evtNameClause, having, sessJoin) + if metric == "sessionCount" { + return fmt.Sprintf(sessionSubQueryTpl, evtWhere, sessWhere, evtNameClause, having, sessJoin) + } + return fmt.Sprintf(subQueryTpl, evtWhere, sessWhere, evtNameClause, having, sessJoin) } -func buildMainQuery(p Payload, subQuery string) string { +func buildMainQuery(p Payload, subQuery, metric string) string { step := int(getStepSize(p.StartTimestamp, p.EndTimestamp, p.Density, false, 1000)) + if metric == "sessionCount" { + return fmt.Sprintf(sessionMainQueryTpl, p.StartTimestamp, p.EndTimestamp, step, subQuery, step) + } return fmt.Sprintf(mainQueryTpl, p.StartTimestamp, p.EndTimestamp, step, subQuery, step) } -var eventsSubQueryTpl = ` +var subQueryTpl = ` SELECT multiIf( s.user_id IS NOT NULL AND s.user_id != '', s.user_id, s.user_anonymous_id IS NOT NULL AND s.user_anonymous_id != '', s.user_anonymous_id, @@ -127,7 +167,7 @@ FROM ( ) AS s ON (s.session_id = f.session_id) ` -var noEventSubQueryTpl = ` +var noFiltersSubQueryTpl = ` SELECT multiIf( s.user_id IS NOT NULL AND s.user_id != '', s.user_id, s.user_anonymous_id IS NOT NULL AND s.user_anonymous_id != '', s.user_anonymous_id, @@ -137,6 +177,37 @@ FROM experimental.sessions AS s WHERE %s ` +var sessionSubQueryTpl = ` +SELECT s.session_id AS session_id, + s.datetime AS datetime +FROM ( + SELECT main.session_id, + MIN(main.created_at) AS first_event_ts, + MAX(main.created_at) AS last_event_ts + FROM product_analytics.events AS main + WHERE %s + AND main.session_id IN ( + SELECT s.session_id + FROM experimental.sessions AS s + WHERE %s + ) + %s + GROUP BY session_id + %s + INNER JOIN ( + SELECT * + FROM experimental.sessions AS s + WHERE %s + ) AS s ON (s.session_id = f.session_id) +` + +var sessionNoFiltersSubQueryTpl = ` +SELECT s.session_id AS session_id, + s.datetime AS datetime +FROM experimental.sessions AS s +WHERE %s +` + var mainQueryTpl = ` SELECT gs.generate_series AS timestamp, COALESCE(COUNT(DISTINCT processed_sessions.user_id), 0) AS count @@ -149,3 +220,16 @@ WHERE processed_sessions.datetime >= toDateTime(timestamp / 1000) GROUP BY timestamp ORDER BY timestamp; ` + +var sessionMainQueryTpl = ` +SELECT gs.generate_series AS timestamp, + COALESCE(COUNT(DISTINCT processed_sessions.session_id), 0) AS count +FROM generate_series(%d, %d, %d) AS gs +LEFT JOIN ( + %s +) AS processed_sessions ON (TRUE) +WHERE processed_sessions.datetime >= toDateTime(timestamp / 1000) + AND processed_sessions.datetime < toDateTime((timestamp + %d) / 1000) +GROUP BY timestamp +ORDER BY timestamp; +`