diff --git a/backend/cmd/analytics/main.go b/backend/cmd/analytics/main.go index 8a7b95c29..1a4b099dd 100644 --- a/backend/cmd/analytics/main.go +++ b/backend/cmd/analytics/main.go @@ -2,71 +2,49 @@ package main import ( "context" - "os" - "os/signal" - "syscall" - + analyticsConfig "openreplay/backend/internal/config/analytics" + "openreplay/backend/pkg/analytics" + "openreplay/backend/pkg/analytics/db" + "openreplay/backend/pkg/db/postgres/pool" "openreplay/backend/pkg/logger" + "openreplay/backend/pkg/metrics" + analyticsMetrics "openreplay/backend/pkg/metrics/analytics" + databaseMetrics "openreplay/backend/pkg/metrics/database" + "openreplay/backend/pkg/metrics/web" + "openreplay/backend/pkg/server" + "openreplay/backend/pkg/server/api" ) func main() { ctx := context.Background() log := logger.New() - log.Info(ctx, "Cacher service started") + cfg := analyticsConfig.New(log) + webMetrics := web.New("analytics") + metrics.New(log, append(webMetrics.List(), append(analyticsMetrics.List(), databaseMetrics.List()...)...)) - sigchan := make(chan os.Signal, 1) - signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - - for { - select { - case sig := <-sigchan: - log.Error(ctx, "Caught signal %v: terminating", sig) - os.Exit(0) - } + pgConn, err := pool.New(cfg.Postgres.String()) + if err != nil { + log.Fatal(ctx, "can't init postgres connection: %s", err) } -} + defer pgConn.Close() -// -//import ( -// "context" -// -// analyticsConfig "openreplay/backend/internal/config/analytics" -// "openreplay/backend/pkg/analytics" -// "openreplay/backend/pkg/db/postgres/pool" -// "openreplay/backend/pkg/logger" -// "openreplay/backend/pkg/metrics" -// "openreplay/backend/pkg/metrics/database" -// "openreplay/backend/pkg/metrics/web" -// "openreplay/backend/pkg/server" -// "openreplay/backend/pkg/server/api" -//) -// -//func main() { -// ctx := context.Background() -// log := logger.New() -// cfg := analyticsConfig.New(log) -// // Observability -// webMetrics := 
web.New("analytics") -// dbMetrics := database.New("analytics") -// metrics.New(log, append(webMetrics.List(), dbMetrics.List()...)) -// -// pgConn, err := pool.New(dbMetrics, cfg.Postgres.String()) -// if err != nil { -// log.Fatal(ctx, "can't init postgres connection: %s", err) -// } -// defer pgConn.Close() -// -// builder, err := analytics.NewServiceBuilder(log, cfg, webMetrics, dbMetrics, pgConn) -// if err != nil { -// log.Fatal(ctx, "can't init services: %s", err) -// } -// -// router, err := api.NewRouter(&cfg.HTTP, log) -// if err != nil { -// log.Fatal(ctx, "failed while creating router: %s", err) -// } -// router.AddHandlers(api.NoPrefix, builder.CardsAPI, builder.DashboardsAPI, builder.ChartsAPI) -// router.AddMiddlewares(builder.Auth.Middleware, builder.RateLimiter.Middleware, builder.AuditTrail.Middleware) -// -// server.Run(ctx, log, &cfg.HTTP, router) -//} + chConn, err := db.NewConnector(cfg.Clickhouse) + if err != nil { + log.Fatal(ctx, "can't init clickhouse connection: %s", err) + } + defer chConn.Stop() + + builder, err := analytics.NewServiceBuilder(log, cfg, webMetrics, pgConn, chConn) + if err != nil { + log.Fatal(ctx, "can't init services: %s", err) + } + + router, err := api.NewRouter(&cfg.HTTP, log) + if err != nil { + log.Fatal(ctx, "failed while creating router: %s", err) + } + router.AddHandlers(api.NoPrefix, builder.CardsAPI, builder.DashboardsAPI, builder.ChartsAPI) + router.AddMiddlewares(builder.Auth.Middleware, builder.RateLimiter.Middleware, builder.AuditTrail.Middleware) + + server.Run(ctx, log, &cfg.HTTP, router) +} diff --git a/backend/internal/config/analytics/config.go b/backend/internal/config/analytics/config.go index b6ca5ce4c..90398240a 100644 --- a/backend/internal/config/analytics/config.go +++ b/backend/internal/config/analytics/config.go @@ -14,6 +14,7 @@ import ( type Config struct { common.Config common.Postgres + common.Clickhouse redis.Redis objectstorage.ObjectsConfig common.HTTP diff --git 
a/backend/pkg/analytics/builder.go b/backend/pkg/analytics/builder.go index 68098dc01..743373b50 100644 --- a/backend/pkg/analytics/builder.go +++ b/backend/pkg/analytics/builder.go @@ -3,6 +3,7 @@ package analytics import ( "github.com/go-playground/validator/v10" "openreplay/backend/pkg/analytics/charts" + "openreplay/backend/pkg/analytics/db" "openreplay/backend/pkg/metrics/database" "time" @@ -27,13 +28,14 @@ type ServicesBuilder struct { ChartsAPI api.Handlers } -func NewServiceBuilder(log logger.Logger, cfg *analytics.Config, webMetrics web.Web, dbMetrics database.Database, pgconn pool.Pool) (*ServicesBuilder, error) { +func NewServiceBuilder(log logger.Logger, cfg *analytics.Config, webMetrics web.Web, dbMetrics database.Database, pgconn pool.Pool, chConn db.Connector) (*ServicesBuilder, error) { responser := api.NewResponser(webMetrics) audiTrail, err := tracer.NewTracer(log, pgconn, dbMetrics) if err != nil { return nil, err } reqValidator := validator.New() + cardsService, err := cards.New(log, pgconn) if err != nil { return nil, err @@ -42,6 +44,7 @@ func NewServiceBuilder(log logger.Logger, cfg *analytics.Config, webMetrics web. if err != nil { return nil, err } + dashboardsService, err := dashboards.New(log, pgconn) if err != nil { return nil, err @@ -50,7 +53,8 @@ func NewServiceBuilder(log logger.Logger, cfg *analytics.Config, webMetrics web. if err != nil { return nil, err } - chartsService, err := charts.New(log, pgconn) + + chartsService, err := charts.New(log, pgconn, chConn) if err != nil { return nil, err } @@ -58,6 +62,7 @@ func NewServiceBuilder(log logger.Logger, cfg *analytics.Config, webMetrics web. 
if err != nil { return nil, err } + return &ServicesBuilder{ Auth: auth.NewAuth(log, cfg.JWTSecret, cfg.JWTSpotSecret, pgconn, nil, api.NoPrefix), RateLimiter: limiter.NewUserRateLimiter(10, 30, 1*time.Minute, 5*time.Minute), diff --git a/backend/pkg/analytics/cards/model.go b/backend/pkg/analytics/cards/model.go index 4b5dd8a10..0b42df18a 100644 --- a/backend/pkg/analytics/cards/model.go +++ b/backend/pkg/analytics/cards/model.go @@ -67,8 +67,8 @@ type CardSeries struct { } type SeriesFilter struct { - EventOrder string `json:"eventOrder" validate:"required,oneof=then or and"` - Filters []FilterItem `json:"filters"` + EventsOrder string `json:"eventsOrder" validate:"required,oneof=then or and"` + Filters []FilterItem `json:"filters"` } type FilterItem struct { diff --git a/backend/pkg/analytics/charts/charts.go b/backend/pkg/analytics/charts/charts.go index 724a814a7..46a40d364 100644 --- a/backend/pkg/analytics/charts/charts.go +++ b/backend/pkg/analytics/charts/charts.go @@ -1,155 +1,51 @@ package charts import ( - "encoding/json" "fmt" - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" - "openreplay/backend/pkg/analytics/cards" - + "log" + "openreplay/backend/pkg/analytics/db" "openreplay/backend/pkg/db/postgres/pool" "openreplay/backend/pkg/logger" ) type Charts interface { - GetData(projectId int, userId uint64, req *GetCardChartDataRequest) ([]DataPoint, error) + GetData(projectId int, userId uint64, req *MetricPayload) (interface{}, error) } type chartsImpl struct { log logger.Logger pgconn pool.Pool - chConn driver.Conn + chConn db.Connector } -func New(log logger.Logger, conn pool.Pool) (Charts, error) { +func New(log logger.Logger, conn pool.Pool, chConn db.Connector) (Charts, error) { return &chartsImpl{ log: log, pgconn: conn, + chConn: chConn, }, nil } // GetData def get_chart() -func (s *chartsImpl) GetData(projectId int, userID uint64, req *GetCardChartDataRequest) ([]DataPoint, error) { +func (s *chartsImpl) GetData(projectId int, userID uint64, 
req *MetricPayload) (interface{}, error) { if req == nil { return nil, fmt.Errorf("request is empty") } - switch { - case req.MetricType == "funnel": - return nil, fmt.Errorf("funnel metric type is not supported yet") - case req.MetricType == "heatMap": - return nil, fmt.Errorf("heatMap metric type is not supported yet") - case req.MetricType == "pathAnalysis": - return nil, fmt.Errorf("pathAnalysis metric type is not supported yet") - - case req.MetricType == "timeseries": - return s.getTimeseriesCharts(projectId, userID, req) - case req.MetricType == "table": - return nil, fmt.Errorf("table metric type is not supported yet") - - case req.MetricType == "errors": - fallthrough - case req.MetricType == "performance": - fallthrough - case req.MetricType == "resources": - fallthrough - case req.MetricType == "webVitals": - return s.getMetric(projectId, userID, req) - - case req.MetricType == "retention": - return nil, fmt.Errorf("retention metric type is not supported yet") - case req.MetricType == "stickiness": - return nil, fmt.Errorf("stickiness metric type is not supported yet") + payload := &Payload{ + ProjectId: projectId, + UserId: userID, + MetricPayload: req, } - jsonInput := ` - { - "data": [ - { - "timestamp": 1733934939000, - "Series A": 100, - "Series B": 200 - }, - { - "timestamp": 1733935939000, - "Series A": 150, - "Series B": 250 - } - ] - }` - - var resp GetCardChartDataResponse - if err := json.Unmarshal([]byte(jsonInput), &resp); err != nil { - return nil, fmt.Errorf("failed to unmarshal response: %w", err) + qb, err := NewQueryBuilder(payload) + if err != nil { + log.Fatalf("Error creating query builder: %v", err) } - return resp.Data, nil -} - -func (s *chartsImpl) getMetric(projectID int, userID uint64, req *GetCardChartDataRequest) ([]DataPoint, error) { - switch req.MetricOf { - case "countSessions": // metrics.get_processed_sessions - return nil, fmt.Errorf("countSessions metric type is not supported yet") - case "avgVisitedPages": // 
metrics.get_user_activity_avg_visited_pages - return nil, fmt.Errorf("avgVisitedPages metric type is not supported yet") - case "countRequests": // metrics.get_top_metrics_count_requests - return nil, fmt.Errorf("countRequests metric type is not supported yet") - case "impactedSessionsByJsErrors": // metrics.get_impacted_sessions_by_js_errors - return nil, fmt.Errorf("impactedSessionsByJsErrors metric type is not supported yet") - case "domainsErrors4xx": // metrics.get_domains_errors_4xx - return nil, fmt.Errorf("domainsErrors4xx metric type is not supported yet") - case "domainsErrors5xx": // metrics.get_domains_errors_5xx - return nil, fmt.Errorf("domainsErrors5xx metric type is not supported yet") - case "errorsPerDomains": // metrics.get_errors_per_domains - return nil, fmt.Errorf("errorsPerDomains metric type is not supported yet") - case "errorsPerType": // metrics.get_errors_per_type - return nil, fmt.Errorf("errorsPerType metric type is not supported yet") - - } - return nil, fmt.Errorf("metric type is not supported yet") - -} - -func (s *chartsImpl) getTimeseriesCharts(projectID int, userID uint64, req *GetCardChartDataRequest) ([]DataPoint, error) { - var dataPoints []DataPoint - var stepSize = getStepSize(req.StartTimestamp, req.EndTimestamp, req.Density, true, 1000) - var query string - - switch req.MetricOf { - case "sessionCount": - query = fmt.Sprintf(` - SELECT - toUnixTimestamp(toStartOfInterval(processed_sessions.datetime, INTERVAL %d second)) * 1000 AS timestamp, - COUNT(processed_sessions.session_id) AS count - FROM ( - SELECT - s.session_id AS session_id, - s.datetime AS datetime - %s - ) AS processed_sessions - GROUP BY timestamp - ORDER BY timestamp; - `, stepSize, "query_part") // Replace "query_part" with the actual query part - default: - return nil, fmt.Errorf("unsupported metric: %s", req.MetricOf) - } - - fmt.Printf("stepSize: %v\n", stepSize) - - for _, series := range req.Series { - res, err := s.searchSeries(projectID, series) - if 
err != nil { - return nil, fmt.Errorf("failed to search series: %w", err) - } - if seriesData, ok := res.([]DataPoint); ok { - dataPoints = append(dataPoints, seriesData...) - } else { - return nil, fmt.Errorf("unexpected data format from searchSeries") - } - } - return dataPoints, nil -} - -func (s *chartsImpl) searchSeries(projectID int, series cards.CardSeries) (interface{}, error) { - - // Placeholder implementation - return []DataPoint{}, nil + resp, err := qb.Execute(payload, s.chConn) + if err != nil { + log.Fatalf("Error building query: %v", err) + } + + return resp, nil } diff --git a/backend/pkg/analytics/charts/counters.go b/backend/pkg/analytics/charts/counters.go index 431104b03..520ca7163 100644 --- a/backend/pkg/analytics/charts/counters.go +++ b/backend/pkg/analytics/charts/counters.go @@ -1,7 +1,6 @@ package charts import ( - "context" "fmt" "log" "strconv" @@ -162,7 +161,7 @@ def get_main_sessions_table(timestamp=0): and timestamp and timestamp >= TimeUTC.now(delta_days=-7) else "experimental.sessions" */ func getMainSessionsTable(timestamp int64) string { - return "product_analytics.sessions" + return "experimental.sessions" } // Function to convert named parameters to positional parameters @@ -272,7 +271,7 @@ func progress(oldVal, newVal uint64) float64 { } // Trying to find a common part -func parse(projectID uint64, startTs, endTs int64, density int, args map[string]interface{}) ([]string, []string, map[string]interface{}) { +func parse(projectID int, startTs, endTs int64, density int, args map[string]interface{}) ([]string, []string, map[string]interface{}) { stepSize := getStepSize(startTs, endTs, density, false, 1000) chSubQuery := getBasicConstraints("sessions", true, false, args, "project_id") chSubQueryChart := getBasicConstraints("sessions", true, true, args, "project_id") @@ -293,134 +292,136 @@ func parse(projectID uint64, startTs, endTs int64, density int, args map[string] } // Sessions trend -func (s *chartsImpl) 
getProcessedSessions(projectID uint64, startTs, endTs int64, density int, args map[string]interface{}) { - chQuery := ` - SELECT toUnixTimestamp(toStartOfInterval(sessions.datetime, INTERVAL :step_size second)) * 1000 AS timestamp, - COUNT(DISTINCT sessions.session_id) AS value - FROM :main_sessions_table AS sessions - WHERE :sub_query_chart - GROUP BY timestamp - ORDER BY timestamp; - ` - chSubQuery, chSubQueryChart, params := parse(projectID, startTs, endTs, density, args) - - chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1) - chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQueryChart, " AND "), -1) - - preparedQuery, preparedArgs := replaceNamedParams(chQuery, params) - rows, err := s.chConn.Query(context.Background(), preparedQuery, preparedArgs) - if err != nil { - log.Fatalf("Error executing query: %v", err) - } - preparedRows := make([]map[string]interface{}, 0) - var sum uint64 - for rows.Next() { - var timestamp, value uint64 - if err := rows.Scan(×tamp, &value); err != nil { - log.Fatalf("Error scanning row: %v", err) - } - fmt.Printf("Timestamp: %d, Value: %d\n", timestamp, value) - sum += value - preparedRows = append(preparedRows, map[string]interface{}{"timestamp": timestamp, "value": value}) - } - - results := map[string]interface{}{ - "value": sum, - "chart": CompleteMissingSteps(startTs, endTs, int(density), map[string]interface{}{"value": 0}, preparedRows, "timestamp", 1000), - } - - diff := endTs - startTs - endTs = startTs - startTs = endTs - diff - - log.Println(results) - - chQuery = fmt.Sprintf(` - SELECT COUNT(1) AS count - FROM :main_sessions_table AS sessions - WHERE :sub_query_chart; - `) - chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1) - chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQuery, " AND "), -1) - - var count uint64 - - preparedQuery, preparedArgs = replaceNamedParams(chQuery, params) - 
if err := s.chConn.QueryRow(context.Background(), preparedQuery, preparedArgs).Scan(&count); err != nil { - log.Fatalf("Error executing query: %v", err) - } - - results["progress"] = progress(count, results["value"].(uint64)) - - // TODO: this should be returned in any case - results["unit"] = "COUNT" - fmt.Println(results) -} - -// Users trend -func (c *chartsImpl) getUniqueUsers(projectID uint64, startTs, endTs uint64, density uint64, args map[string]interface{}) { - chQuery := ` - SELECT toUnixTimestamp(toStartOfInterval(sessions.datetime, INTERVAL :step_size second)) * 1000 AS timestamp, - COUNT(DISTINCT sessions.user_id) AS value - FROM :main_sessions_table AS sessions - WHERE :sub_query_chart - GROUP BY timestamp - ORDER BY timestamp; - ` - chSubQuery, chSubQueryChart, params := parse(projectID, startTs, endTs, density, args) - chSubQueryChart = append(chSubQueryChart, []string{"isNotNull(sessions.user_id)", "sessions.user_id!=''"}...) - - chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1) - chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQueryChart, " AND "), -1) - - preparedQuery, preparedArgs := replaceNamedParams(chQuery, params) - rows, err := c.chConn.Query(context.Background(), preparedQuery, preparedArgs) - if err != nil { - log.Fatalf("Error executing query: %v", err) - } - preparedRows := make([]map[string]interface{}, 0) - var sum uint64 - for rows.Next() { - var timestamp, value uint64 - if err := rows.Scan(×tamp, &value); err != nil { - log.Fatalf("Error scanning row: %v", err) - } - fmt.Printf("Timestamp: %d, Value: %d\n", timestamp, value) - sum += value - preparedRows = append(preparedRows, map[string]interface{}{"timestamp": timestamp, "value": value}) - } - - results := map[string]interface{}{ - "value": sum, - "chart": CompleteMissingSteps(startTs, endTs, int(density), map[string]interface{}{"value": 0}, preparedRows, "timestamp", 1000), - } - - diff := endTs - startTs - 
endTs = startTs - startTs = endTs - diff - - log.Println(results) - - chQuery = fmt.Sprintf(` - SELECT COUNT(DISTINCT user_id) AS count - FROM :main_sessions_table AS sessions - WHERE :sub_query_chart; - `) - chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1) - chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQuery, " AND "), -1) - - var count uint64 - - preparedQuery, preparedArgs = replaceNamedParams(chQuery, params) - if err := c.chConn.QueryRow(context.Background(), preparedQuery, preparedArgs).Scan(&count); err != nil { - log.Fatalf("Error executing query: %v", err) - } - - results["progress"] = progress(count, results["value"].(uint64)) - - // TODO: this should be returned in any case - results["unit"] = "COUNT" - fmt.Println(results) - - return -} +//func (s *chartsImpl) getProcessedSessions(projectID int, startTs, endTs int64, density int, args map[string]interface{}) (interface{}, error) { +// chQuery := ` +// SELECT toUnixTimestamp(toStartOfInterval(sessions.datetime, INTERVAL :step_size second)) * 1000 AS timestamp, +// COUNT(DISTINCT sessions.session_id) AS value +// FROM :main_sessions_table AS sessions +// WHERE :sub_query_chart +// GROUP BY timestamp +// ORDER BY timestamp; +// ` +// chSubQuery, chSubQueryChart, params := parse(projectID, startTs, endTs, density, args) +// +// chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1) +// chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQueryChart, " AND "), -1) +// +// preparedQuery, preparedArgs := replaceNamedParams(chQuery, params) +// rows, err := s.chConn.Query(context.Background(), preparedQuery, preparedArgs) +// if err != nil { +// log.Fatalf("Error executing query: %v", err) +// } +// preparedRows := make([]map[string]interface{}, 0) +// var sum uint64 +// for rows.Next() { +// var timestamp, value uint64 +// if err := rows.Scan(×tamp, &value); err != nil { +// 
log.Fatalf("Error scanning row: %v", err) +// } +// fmt.Printf("Timestamp: %d, Value: %d\n", timestamp, value) +// sum += value +// preparedRows = append(preparedRows, map[string]interface{}{"timestamp": timestamp, "value": value}) +// } +// +// results := map[string]interface{}{ +// "value": sum, +// "chart": CompleteMissingSteps(startTs, endTs, int(density), map[string]interface{}{"value": 0}, preparedRows, "timestamp", 1000), +// } +// +// diff := endTs - startTs +// endTs = startTs +// startTs = endTs - diff +// +// log.Println(results) +// +// chQuery = fmt.Sprintf(` +// SELECT COUNT(1) AS count +// FROM :main_sessions_table AS sessions +// WHERE :sub_query_chart; +// `) +// chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1) +// chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQuery, " AND "), -1) +// +// var count uint64 +// +// preparedQuery, preparedArgs = replaceNamedParams(chQuery, params) +// if err := s.chConn.QueryRow(context.Background(), preparedQuery, preparedArgs).Scan(&count); err != nil { +// log.Fatalf("Error executing query: %v", err) +// } +// +// results["progress"] = progress(count, results["value"].(uint64)) +// +// // TODO: this should be returned in any case +// results["unit"] = "COUNT" +// fmt.Println(results) +// +// return results, nil +//} +// +//// Users trend +//func (s *chartsImpl) getUniqueUsers(projectID int, startTs, endTs int64, density int, args map[string]interface{}) (interface{}, error) { +// chQuery := ` +// SELECT toUnixTimestamp(toStartOfInterval(sessions.datetime, INTERVAL :step_size second)) * 1000 AS timestamp, +// COUNT(DISTINCT sessions.user_id) AS value +// FROM :main_sessions_table AS sessions +// WHERE :sub_query_chart +// GROUP BY timestamp +// ORDER BY timestamp; +// ` +// chSubQuery, chSubQueryChart, params := parse(projectID, startTs, endTs, density, args) +// chSubQueryChart = append(chSubQueryChart, []string{"isNotNull(sessions.user_id)", 
"sessions.user_id!=''"}...) +// +// chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1) +// chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQueryChart, " AND "), -1) +// +// preparedQuery, preparedArgs := replaceNamedParams(chQuery, params) +// rows, err := s.chConn.Query(context.Background(), preparedQuery, preparedArgs) +// if err != nil { +// log.Fatalf("Error executing query: %v", err) +// } +// preparedRows := make([]map[string]interface{}, 0) +// var sum uint64 +// for rows.Next() { +// var timestamp, value uint64 +// if err := rows.Scan(×tamp, &value); err != nil { +// log.Fatalf("Error scanning row: %v", err) +// } +// fmt.Printf("Timestamp: %d, Value: %d\n", timestamp, value) +// sum += value +// preparedRows = append(preparedRows, map[string]interface{}{"timestamp": timestamp, "value": value}) +// } +// +// results := map[string]interface{}{ +// "value": sum, +// "chart": CompleteMissingSteps(startTs, endTs, int(density), map[string]interface{}{"value": 0}, preparedRows, "timestamp", 1000), +// } +// +// diff := endTs - startTs +// endTs = startTs +// startTs = endTs - diff +// +// log.Println(results) +// +// chQuery = fmt.Sprintf(` +// SELECT COUNT(DISTINCT user_id) AS count +// FROM :main_sessions_table AS sessions +// WHERE :sub_query_chart; +// `) +// chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1) +// chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQuery, " AND "), -1) +// +// var count uint64 +// +// preparedQuery, preparedArgs = replaceNamedParams(chQuery, params) +// if err := s.chConn.QueryRow(context.Background(), preparedQuery, preparedArgs).Scan(&count); err != nil { +// log.Fatalf("Error executing query: %v", err) +// } +// +// results["progress"] = progress(count, results["value"].(uint64)) +// +// // TODO: this should be returned in any case +// results["unit"] = "COUNT" +// fmt.Println(results) +// +// 
return results, nil +//} diff --git a/backend/pkg/analytics/charts/handlers.go b/backend/pkg/analytics/charts/handlers.go index 2b35c5b3c..8e759ea91 100644 --- a/backend/pkg/analytics/charts/handlers.go +++ b/backend/pkg/analytics/charts/handlers.go @@ -74,7 +74,7 @@ func (e *handlersImpl) getCardChartData(w http.ResponseWriter, r *http.Request) } bodySize = len(bodyBytes) - req := &GetCardChartDataRequest{} + req := &MetricPayload{} if err := json.Unmarshal(bodyBytes, req); err != nil { e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize) return diff --git a/backend/pkg/analytics/charts/metric_funnel.go b/backend/pkg/analytics/charts/metric_funnel.go new file mode 100644 index 000000000..0f58c886d --- /dev/null +++ b/backend/pkg/analytics/charts/metric_funnel.go @@ -0,0 +1,9 @@ +package charts + +import "openreplay/backend/pkg/analytics/db" + +type FunnelQueryBuilder struct{} + +func (f FunnelQueryBuilder) Execute(p *Payload, conn db.Connector) (interface{}, error) { + return "-- Funnel query placeholder", nil +} diff --git a/backend/pkg/analytics/charts/metric_table.go b/backend/pkg/analytics/charts/metric_table.go new file mode 100644 index 000000000..46135fd55 --- /dev/null +++ b/backend/pkg/analytics/charts/metric_table.go @@ -0,0 +1,253 @@ +package charts + +import ( + "fmt" + "openreplay/backend/pkg/analytics/db" + "strings" +) + +type TableQueryBuilder struct{} + +func (t TableQueryBuilder) Execute(p *Payload, conn db.Connector) (interface{}, error) { + return t.buildQuery(p) +} + +func (t TableQueryBuilder) buildQuery(r *Payload) (string, error) { + s := r.Series[0] + sessionFilters, eventFilters := partitionFilters(s.Filter.Filters) + sessionWhere := buildSessionWhere(sessionFilters) + eventWhere, seqHaving := buildEventsWhere(eventFilters, s.Filter.EventsOrder) + + subQuery := fmt.Sprintf( + "SELECT %s,\n"+ + " MIN(%s) AS first_event_ts,\n"+ + " MAX(%s) AS last_event_ts\n"+ + "FROM %s AS 
main\n"+ + "WHERE main.project_id = %%(project_id)s\n"+ + " AND %s >= toDateTime(%%(start_time)s/1000)\n"+ + " AND %s <= toDateTime(%%(end_time)s/1000)\n"+ + " AND (%s)\n"+ + "GROUP BY %s\n"+ + "HAVING %s", + ColEventSessionID, + ColEventTime, + ColEventTime, + TableEvents, + ColEventTime, + ColEventTime, + strings.Join(eventWhere, " OR "), + ColEventSessionID, + seqHaving, + ) + + joinQuery := fmt.Sprintf( + "SELECT *\n"+ + "FROM %s AS s\n"+ + "INNER JOIN (\n"+ + " SELECT DISTINCT ev.session_id, ev.`$current_url` AS url_path\n"+ + " FROM %s AS ev\n"+ + " WHERE ev.created_at >= toDateTime(%%(start_time)s/1000)\n"+ + " AND ev.created_at <= toDateTime(%%(end_time)s/1000)\n"+ + " AND ev.project_id = %%(project_id)s\n"+ + " AND ev.`$event_name` = 'LOCATION'\n"+ + ") AS extra_event USING (session_id)\n"+ + "WHERE s.project_id = %%(project_id)s\n"+ + " AND isNotNull(s.duration)\n"+ + " AND s.datetime >= toDateTime(%%(start_time)s/1000)\n"+ + " AND s.datetime <= toDateTime(%%(end_time)s/1000)\n", + TableSessions, + TableEvents, + ) + + if len(sessionWhere) > 0 { + joinQuery += " AND " + strings.Join(sessionWhere, " AND ") + "\n" + } + + main := fmt.Sprintf( + "SELECT s.session_id AS session_id, s.url_path\n"+ + "FROM (\n%s\n) AS f\n"+ + "INNER JOIN (\n%s) AS s\n"+ + " ON (s.session_id = f.session_id)\n", + subQuery, + joinQuery, + ) + + final := fmt.Sprintf( + "SELECT COUNT(DISTINCT url_path) OVER () AS main_count,\n"+ + " url_path AS name,\n"+ + " COUNT(DISTINCT session_id) AS total,\n"+ + " COALESCE(SUM(COUNT(DISTINCT session_id)) OVER (), 0) AS total_count\n"+ + "FROM (\n%s) AS filtered_sessions\n"+ + "GROUP BY url_path\n"+ + "ORDER BY total DESC\n"+ + "LIMIT 200 OFFSET 0;", + main, + ) + + return final, nil +} + +func partitionFilters(filters []Filter) (sessionFilters []Filter, eventFilters []Filter) { + for _, f := range filters { + if f.IsEvent { + eventFilters = append(eventFilters, f) + } else { + sessionFilters = append(sessionFilters, f) + } + } + return +} + 
+func buildSessionWhere(filters []Filter) []string { + var conds []string + for _, f := range filters { + switch f.Type { + case FilterUserCountry: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCountry, concatValues(f.Value))) + case FilterUserCity: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCity, concatValues(f.Value))) + case FilterUserState: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserState, concatValues(f.Value))) + case FilterUserId: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserID, concatValues(f.Value))) + case FilterUserAnonymousId: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserAnonymousID, concatValues(f.Value))) + case FilterUserOs: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserOS, concatValues(f.Value))) + case FilterUserBrowser: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserBrowser, concatValues(f.Value))) + case FilterUserDevice: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDevice, concatValues(f.Value))) + case FilterPlatform: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDeviceType, concatValues(f.Value))) + case FilterRevId: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColRevID, concatValues(f.Value))) + case FilterReferrer: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColBaseReferrer, concatValues(f.Value))) + case FilterDuration: + if len(f.Value) == 2 { + conds = append(conds, fmt.Sprintf("%s >= '%s'", ColDuration, f.Value[0])) + conds = append(conds, fmt.Sprintf("%s <= '%s'", ColDuration, f.Value[1])) + } + case FilterUtmSource: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmSource, concatValues(f.Value))) + case FilterUtmMedium: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmMedium, concatValues(f.Value))) + case FilterUtmCampaign: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", 
ColUtmCampaign, concatValues(f.Value))) + case FilterMetadata: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColMetadata1, concatValues(f.Value))) + } + } + // adding /n to each condition for better readability, can be removed. + for i := range conds { + conds[i] += "\n" + } + return conds +} + +func concatValues(v []string) string { + return strings.Join(v, "") +} + +func buildEventsWhere(filters []Filter, order EventOrder) (eventConditions []string, having string) { + basicEventTypes := "(" + + strings.Join([]string{ + fmt.Sprintf("%s = 'CLICK'", ColEventName), + fmt.Sprintf("%s = 'INPUT'", ColEventName), + fmt.Sprintf("%s = 'LOCATION'", ColEventName), + fmt.Sprintf("%s = 'CUSTOM'", ColEventName), + fmt.Sprintf("%s = 'REQUEST'", ColEventName), + }, " OR ") + ")" + + var seq []string + for _, f := range filters { + switch f.Type { + case FilterClick: + seq = append(seq, seqCond("CLICK", "selector", f)) + case FilterInput: + seq = append(seq, seqCond("INPUT", "label", f)) + case FilterLocation: + seq = append(seq, seqCond("LOCATION", "url_path", f)) + case FilterCustom: + seq = append(seq, seqCond("CUSTOM", "name", f)) + case FilterFetch: + seq = append(seq, seqFetchCond("REQUEST", f)) + case FilterFetchStatusCode: + seq = append(seq, seqCond("REQUEST", "status", f)) + default: + seq = append(seq, fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(string(f.Type)))) + } + } + eventConditions = []string{basicEventTypes} + + // then => sequenceMatch + // or => OR + // and => AND + switch order { + case EventOrderThen: + var pattern []string + for i := range seq { + pattern = append(pattern, fmt.Sprintf("(?%d)", i+1)) + } + having = fmt.Sprintf("sequenceMatch('%s')(\n%s,\n%s)", + strings.Join(pattern, ""), fmt.Sprintf("toUnixTimestamp(%s)", ColEventTime), strings.Join(seq, ",\n")) + case EventOrderAnd: + // build AND + having = strings.Join(seq, " AND ") + default: + // default => OR + var orParts []string + for _, p := range seq { + orParts = 
append(orParts, "("+p+")") + } + having = strings.Join(orParts, " OR ") + } + return +} + +func seqCond(eventName, key string, f Filter) string { + op := parseOperator(f.Operator) + return fmt.Sprintf("(%s = '%s' AND JSONExtractString(toString(%s), '%s') %s '%s')", + ColEventName, strings.ToUpper(eventName), ColEventProperties, key, op, concatValues(f.Value)) +} + +func seqFetchCond(eventName string, f Filter) string { + w := []string{fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(eventName))} + var extras []string + for _, c := range f.Filters { + switch c.Type { + case FilterFetch: + if len(c.Value) > 0 { + extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventURLPath, concatValues(c.Value))) + } + case FilterFetchStatusCode: + if len(c.Value) > 0 { + extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventStatus, concatValues(c.Value))) + } + default: + // placeholder if needed + } + } + if len(extras) > 0 { + w = append(w, strings.Join(extras, " AND ")) + } + return "(" + strings.Join(w, " AND ") + ")" +} + +func parseOperator(op string) string { + // TODO implement this properly + switch strings.ToLower(op) { + case OperatorStringContains: + return "LIKE" + case OperatorStringIs, OperatorStringOn, "=", OperatorStringOnAny: + return "=" + case OperatorStringStartsWith: + return "LIKE" + case OperatorStringEndsWith: + // might interpret differently in real impl + return "=" + default: + return "=" + } +} diff --git a/backend/pkg/analytics/charts/metric_timeseries.go b/backend/pkg/analytics/charts/metric_timeseries.go new file mode 100644 index 000000000..cc1df46c3 --- /dev/null +++ b/backend/pkg/analytics/charts/metric_timeseries.go @@ -0,0 +1,116 @@ +package charts + +import ( + "fmt" + "log" + "openreplay/backend/pkg/analytics/db" +) + +type TimeSeriesQueryBuilder struct{} + +func (t TimeSeriesQueryBuilder) Execute(p *Payload, conn db.Connector) (interface{}, error) { + query, err := t.buildQuery(p) + if err != nil { + log.Fatalf("Error 
building query: %v", err) + return nil, err + } + + rows, err := conn.Query(query) + if err != nil { + log.Fatalf("Error executing query: %v", err) + return nil, err + } + defer rows.Close() + + var results []DataPoint + + for rows.Next() { + var res DataPoint + if err := rows.Scan(&res.Timestamp, &res.Count); err != nil { + return nil, err + } + //sum += res.Count + results = append(results, res) + } + + filled := FillMissingDataPoints(p.StartTimestamp, p.EndTimestamp, p.Density, DataPoint{}, results, 1000) + return filled, nil +} + +func (t TimeSeriesQueryBuilder) buildQuery(p *Payload) (string, error) { + query := "" + switch p.MetricOf { + case "sessionCount": + query = t.buildSessionCountQuery(p) + case "userCount": + query = t.buildUserCountQuery(p) + default: + query = "" + } + return query, nil +} + +func (TimeSeriesQueryBuilder) buildSessionCountQuery(p *Payload) string { + stepSize := int(getStepSize(p.StartTimestamp, p.EndTimestamp, p.Density, false, 1000)) + subquery := buildEventSubquery(p) + return fmt.Sprintf(`SELECT toUnixTimestamp( + toStartOfInterval(processed_sessions.datetime, INTERVAL %d second) +) * 1000 AS timestamp, +COUNT(processed_sessions.session_id) AS count +FROM ( + %s +) AS processed_sessions +GROUP BY timestamp +ORDER BY timestamp;`, stepSize, subquery) +} + +func (TimeSeriesQueryBuilder) buildUserCountQuery(p *Payload) string { + stepSize := int(getStepSize(p.StartTimestamp, p.EndTimestamp, p.Density, false, 1000)) + subquery := buildEventSubquery(p) + return fmt.Sprintf(`SELECT toUnixTimestamp( + toStartOfInterval(processed_sessions.datetime, INTERVAL %d second) +) * 1000 AS timestamp, +COUNT(DISTINCT processed_sessions.user_id) AS count +FROM ( + %s +) AS processed_sessions +GROUP BY timestamp +ORDER BY timestamp;`, stepSize, subquery) +} + +func FillMissingDataPoints( + startTime, endTime int64, + density int, + neutral DataPoint, + rows []DataPoint, + timeCoefficient int64, +) []DataPoint { + if density <= 1 { + return rows + } 
+ + stepSize := uint64(getStepSize(startTime, endTime, density, false, 1000)) + bucketSize := stepSize * uint64(timeCoefficient) + + lookup := make(map[uint64]DataPoint) + for _, dp := range rows { + if dp.Timestamp < uint64(startTime) { + continue + } + bucket := uint64(startTime) + (((dp.Timestamp - uint64(startTime)) / bucketSize) * bucketSize) + lookup[bucket] = dp + } + + results := make([]DataPoint, 0, density) + for i := 0; i < density; i++ { + ts := uint64(startTime) + uint64(i)*bucketSize + if dp, ok := lookup[ts]; ok { + results = append(results, dp) + } else { + nd := neutral + nd.Timestamp = ts + results = append(results, nd) + } + } + return results +} diff --git a/backend/pkg/analytics/charts/model.go b/backend/pkg/analytics/charts/model.go index 0ff630f2f..721008ea8 100644 --- a/backend/pkg/analytics/charts/model.go +++ b/backend/pkg/analytics/charts/model.go @@ -1,57 +1,155 @@ package charts -import "openreplay/backend/pkg/analytics/cards" - -type DataPoint struct { - Timestamp int64 `json:"timestamp"` - Series map[string]int64 `json:"series"` -} - -type GetCardChartDataRequest struct { - StartTimestamp int64 `json:"startTimestamp" validate:"required"` - EndTimestamp int64 `json:"endTimestamp" validate:"required"` - Density int `json:"density" validate:"required"` - MetricType string `json:"metricType" validate:"required,oneof=timeseries table funnel errors performance resources webVitals pathAnalysis retention stickiness heatMap"` - MetricOf string `json:"metricOf" validate:"required,oneof=sessionCount userCount"` - ViewType string `json:"viewType" validate:"required,oneof=lineChart areaChart barChart pieChart progressChart table metric"` - MetricFormat string `json:"metricFormat" validate:"required,oneof=default percentage"` - SessionID int64 `json:"sessionId"` - Series []cards.CardSeries `json:"series" validate:"required,dive"` -} - -type GetCardChartDataResponse struct { - Data []DataPoint `json:"data"` -} - +type Table string +type Column 
// Table names a ClickHouse table the charts queries read from.
type Table string

// Column is a fully-qualified (alias-prefixed) column reference used
// when assembling SQL text.
type Column string

// MetricType selects which query builder handles a metric payload.
type MetricType string

// FilterType discriminates session- and event-level filters.
type FilterType string

// EventType is reserved for event categorization.
// NOTE(review): currently unused in this package — confirm before removing.
type EventType string

// EventOrder controls how multiple event filters are combined.
type EventOrder string

// Source tables.
const (
	TableEvents   Table = "product_analytics.events"
	TableSessions Table = "experimental.sessions"
)

// Columns of the events table, qualified with the `main` alias.
const (
	ColEventTime       Column = "main.created_at"
	ColEventName       Column = "main.`$event_name`"
	ColEventProjectID  Column = "main.project_id"
	ColEventProperties Column = "main.`$properties`"
	ColEventSessionID  Column = "main.session_id"
	ColEventURLPath    Column = "main.url_path"
	ColEventStatus     Column = "main.status"
)

// Columns of the sessions table, qualified with the `s` alias.
const (
	ColSessionID        Column = "s.session_id"
	ColDuration         Column = "s.duration"
	ColUserCountry      Column = "s.user_country"
	ColUserCity         Column = "s.user_city"
	ColUserState        Column = "s.user_state"
	ColUserID           Column = "s.user_id"
	ColUserAnonymousID  Column = "s.user_anonymous_id"
	ColUserOS           Column = "s.user_os"
	ColUserBrowser      Column = "s.user_browser"
	ColUserDevice       Column = "s.user_device"
	ColUserDeviceType   Column = "s.user_device_type"
	ColRevID            Column = "s.rev_id"
	ColBaseReferrer     Column = "s.base_referrer"
	ColUtmSource        Column = "s.utm_source"
	ColUtmMedium        Column = "s.utm_medium"
	ColUtmCampaign      Column = "s.utm_campaign"
	ColMetadata1        Column = "s.metadata_1"
	ColSessionProjectID Column = "s.project_id"
	// NOTE(review): this is a predicate expression, not a column; it is
	// kept under the Column type for backward compatibility with callers.
	ColSessionIsNotNull Column = "isNotNull(s.duration)"
)

// Supported metric types.
const (
	MetricTypeTimeseries MetricType = "timeseries"
	MetricTypeTable      MetricType = "table"
	MetricTypeFunnel     MetricType = "funnel"
)

// Event combination orders.
const (
	EventOrderThen EventOrder = "then" // events must occur in sequence
	EventOrderOr   EventOrder = "or"   // any event may occur
	EventOrderAnd  EventOrder = "and"  // all events must occur
)

// MetricPayload is the client-supplied description of a chart metric.
type MetricPayload struct {
	StartTimestamp int64      `json:"startTimestamp"` // window start, ms since epoch
	EndTimestamp   int64      `json:"endTimestamp"`   // window end, ms since epoch
	Density        int        `json:"density"`        // number of buckets in the result
	MetricOf       string     `json:"metricOf"`
	MetricType     MetricType `json:"metricType"`
	MetricFormat   string     `json:"metricFormat"`
	ViewType       string     `json:"viewType"`
	Name           string     `json:"name"`
	Series         []Series   `json:"series"`
}

// Series is one named filter group within a metric.
type Series struct {
	Name   string `json:"name"`
	Filter struct {
		Filters     []Filter   `json:"filters"`
		EventsOrder EventOrder `json:"eventsOrder"`
	} `json:"filter"`
}

// Filter is a single session- or event-level condition. Event filters
// (IsEvent == true) may carry nested sub-filters (e.g. fetch URL/status).
type Filter struct {
	Type     FilterType `json:"type"`
	IsEvent  bool       `json:"isEvent"`
	Value    []string   `json:"value"`
	Operator string     `json:"operator"`
	Filters  []Filter   `json:"filters"`
}

// Session-level filter types.
const (
	FilterUserId          FilterType = "userId"
	FilterUserAnonymousId FilterType = "userAnonymousId"
	FilterReferrer        FilterType = "referrer"
	FilterDuration        FilterType = "duration"
	FilterUtmSource       FilterType = "utmSource"
	FilterUtmMedium       FilterType = "utmMedium"
	FilterUtmCampaign     FilterType = "utmCampaign"
	FilterUserCountry     FilterType = "userCountry"
	FilterUserCity        FilterType = "userCity"
	FilterUserState       FilterType = "userState"
	FilterUserOs          FilterType = "userOs"
	FilterUserBrowser     FilterType = "userBrowser"
	FilterUserDevice      FilterType = "userDevice"
	FilterPlatform        FilterType = "platform"
	FilterRevId           FilterType = "revId"
	FilterIssue           FilterType = "issue"
	FilterMetadata        FilterType = "metadata"
)
filters +const ( + FilterClick FilterType = "click" + FilterInput FilterType = "input" + FilterLocation FilterType = "location" + FilterCustom FilterType = "customEvent" + FilterFetch FilterType = "fetch" + FilterFetchStatusCode FilterType = "status" + FilterTag FilterType = "tag" + FilterNetworkRequest FilterType = "fetch" + FilterGraphQLRequest FilterType = "graphql" + FilterStateAction FilterType = "stateAction" + FilterError FilterType = "error" + FilterAvgCpuLoad FilterType = "avgCpuLoad" + FilterAvgMemoryUsage FilterType = "avgMemoryUsage" +) + +// MOBILE FILTERS +const ( + FilterUserOsIos FilterType = "userOsIos" + FilterUserDeviceIos FilterType = "userDeviceIos" + FilterUserCountryIos FilterType = "userCountryIos" + FilterUserIdIos FilterType = "userIdIos" + FilterUserAnonymousIdIos FilterType = "userAnonymousIdIos" + FilterRevIdIos FilterType = "revIdIos" +) + +const ( + OperatorStringIs = "is" + OperatorStringIsAny = "isAny" + OperatorStringOn = "on" + OperatorStringOnAny = "onAny" + OperatorStringIsNot = "isNot" + OperatorStringIsUndefined = "isUndefined" + OperatorStringNotOn = "notOn" + OperatorStringContains = "contains" + OperatorStringNotContains = "notContains" + OperatorStringStartsWith = "startsWith" + OperatorStringEndsWith = "endsWith" +) + +type DataPoint struct { + Timestamp uint64 `json:"timestamp"` + Count uint64 `json:"count"` +} + +//type TimeseriesResponse struct { +// Data []DataPoint `json:"data"` +//} diff --git a/backend/pkg/analytics/charts/query.go b/backend/pkg/analytics/charts/query.go new file mode 100644 index 000000000..1d82bd5da --- /dev/null +++ b/backend/pkg/analytics/charts/query.go @@ -0,0 +1,150 @@ +package charts + +import ( + "fmt" + "openreplay/backend/pkg/analytics/db" + "strings" +) + +type Payload struct { + *MetricPayload + ProjectId int + UserId uint64 +} + +type QueryBuilder interface { + Execute(p *Payload, conn db.Connector) (interface{}, error) +} + +func NewQueryBuilder(p *Payload) (QueryBuilder, error) { + 
switch p.MetricType { + case MetricTypeTimeseries: + return TimeSeriesQueryBuilder{}, nil + case MetricTypeFunnel: + return FunnelQueryBuilder{}, nil + case MetricTypeTable: + return TableQueryBuilder{}, nil + default: + return nil, fmt.Errorf("unknown metric type: %s", p.MetricType) + } +} + +func buildEventSubquery(p *Payload) string { + baseEventsWhere := buildBaseEventsWhere(p) + sequenceCond := buildSequenceCondition(p.Series) + sessionsWhere := buildSessionsWhere(p) + + if sequenceCond.seqPattern == "" { + return fmt.Sprintf(` +SELECT s.%[1]s AS %[1]s, + s.datetime AS datetime +FROM ( + SELECT main.session_id, + MIN(main.created_at) AS first_event_ts, + MAX(main.created_at) AS last_event_ts + FROM product_analytics.events AS main + WHERE %[2]s + GROUP BY session_id +) AS f +INNER JOIN ( + SELECT * + FROM experimental.sessions AS s + WHERE %[3]s +) AS s ON (s.session_id = f.session_id) +`, pickIDField(p), baseEventsWhere, sessionsWhere) + } + + return fmt.Sprintf(` +SELECT s.%[1]s AS %[1]s, + s.datetime AS datetime +FROM ( + SELECT main.session_id, + MIN(main.created_at) AS first_event_ts, + MAX(main.created_at) AS last_event_ts + FROM product_analytics.events AS main + WHERE %[2]s + GROUP BY session_id + HAVING sequenceMatch('%[3]s')(toDateTime(main.created_at), %[4]s) +) AS f +INNER JOIN ( + SELECT * + FROM experimental.sessions AS s + WHERE %[5]s +) AS s ON (s.session_id = f.session_id) +`, pickIDField(p), baseEventsWhere, sequenceCond.seqPattern, sequenceCond.seqEvents, sessionsWhere) +} + +func pickIDField(p *Payload) string { + if p.MetricOf == "userCount" { + return "user_id" + } + return "session_id" +} + +func buildBaseEventsWhere(p *Payload) string { + ts := fmt.Sprintf( + `(main.created_at >= toDateTime(%d / 1000) AND main.created_at <= toDateTime(%d / 1000))`, + p.StartTimestamp, + p.EndTimestamp, + ) + return fmt.Sprintf(`main.project_id = %d AND %s`, p.ProjectId, ts) +} + +func buildSessionsWhere(p *Payload) string { + ts := fmt.Sprintf( + 
`(s.datetime >= toDateTime(%d / 1000) AND s.datetime <= toDateTime(%d / 1000))`, + p.StartTimestamp, + p.EndTimestamp, + ) + return fmt.Sprintf(`s.project_id = %d AND isNotNull(s.duration) AND %s`, p.ProjectId, ts) +} + +type sequenceParts struct { + seqPattern string + seqEvents string +} + +func buildSequenceCondition(series []Series) sequenceParts { + var events []string + for _, s := range series { + if len(s.Filter.Filters) > 0 { + events = append(events, buildOneSeriesSequence(s.Filter.Filters)) + } + } + if len(events) < 2 { + return sequenceParts{"", ""} + } + pattern := "" + for i := 1; i <= len(events); i++ { + pattern += fmt.Sprintf("(?%d)", i) + } + return sequenceParts{ + seqPattern: pattern, + seqEvents: strings.Join(events, ", "), + } +} + +func buildOneSeriesSequence(filters []Filter) string { + return strings.Join(buildFilterConditions(filters), " AND ") +} + +func buildFilterConditions(filters []Filter) []string { + var out []string + for _, f := range filters { + switch f.Type { + case FilterClick: + out = append(out, + fmt.Sprintf(`(main."$event_name" = 'CLICK' AND JSONExtractString(toString(main."$properties"), 'label') IN ('%s'))`, + strings.Join(f.Value, "','"))) + case FilterInput: + out = append(out, + fmt.Sprintf(`(main."$event_name" = 'INPUT' AND JSONExtractString(toString(main."$properties"), 'label') IN ('%s'))`, + strings.Join(f.Value, "','"))) + + default: + out = append(out, + fmt.Sprintf(`(main."$event_name" = '%s')`, strings.ToUpper(string(f.Type)))) + } + } + return out +} diff --git a/backend/pkg/analytics/db/connector.go b/backend/pkg/analytics/db/connector.go new file mode 100644 index 000000000..c06dfa998 --- /dev/null +++ b/backend/pkg/analytics/db/connector.go @@ -0,0 +1,64 @@ +package db + +import ( + "context" + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "openreplay/backend/internal/config/common" + "time" +) + +type TableValue struct { + Name string `json:"name"` + 
Total uint64 `json:"total"` +} + +type TableResponse struct { + Total uint64 `json:"total"` + Count uint64 `json:"count"` + Values []TableValue `json:"values"` +} + +type Connector interface { + Stop() error + Query(query string) (driver.Rows, error) +} + +type connectorImpl struct { + conn driver.Conn +} + +func NewConnector(cfg common.Clickhouse) (Connector, error) { + conn, err := clickhouse.Open(&clickhouse.Options{ + Addr: []string{cfg.GetTrimmedURL()}, + Auth: clickhouse.Auth{ + Database: cfg.Database, + Username: cfg.LegacyUserName, + Password: cfg.LegacyPassword, + }, + MaxOpenConns: 20, + MaxIdleConns: 15, + ConnMaxLifetime: 3 * time.Minute, + Compression: &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }, + }) + if err != nil { + return nil, err + } + return &connectorImpl{conn: conn}, nil +} + +func (c *connectorImpl) Stop() error { + return c.conn.Close() +} + +func (c *connectorImpl) Query(query string) (driver.Rows, error) { + rows, err := c.conn.Query(context.Background(), query) + if err != nil { + return nil, err + } + //defer rows.Close() + + return rows, nil +} diff --git a/backend/pkg/analytics/query/chartQuery.go b/backend/pkg/analytics/query/chartQuery.go index 47724b53d..a9c69e0c0 100644 --- a/backend/pkg/analytics/query/chartQuery.go +++ b/backend/pkg/analytics/query/chartQuery.go @@ -1,649 +1,654 @@ -package main +package query -import ( - "encoding/json" - "fmt" - "strings" -) +//package main +// +//import ( +// "fmt" +// "strings" +//) +// +////func main() { +//// var r Root +//// //err := json.Unmarshal([]byte(jsonInput), &r) +//// //if err != nil { +//// // panic(err) +//// //} +//// // +//// ////fmt.Println("ARGS:", r) +//// //fmt.Println(buildQuery(r)) +//// ////fmt.Println("QUERY PART:", qp) +//// +//// builder := NewQueryBuilder() +//// query, err := builder.BuildQuery(r) +//// if err != nil { +//// fmt.Println("ERROR:", err) +//// } +//// +//// fmt.Println(query) +////} +// +//type Table string +//type Column string 
+//type FilterType string +//type EventOrder string +//type FetchFilterType string +// +//const ( +// UserOs FilterType = "userOs" +// UserBrowser FilterType = "userBrowser" +// UserDevice FilterType = "userDevice" +// UserCountry FilterType = "userCountry" +// UserCity FilterType = "userCity" +// UserState FilterType = "userState" +// UserId FilterType = "userId" +// UserAnonymousId FilterType = "userAnonymousId" +// Referrer FilterType = "referrer" +// RevId FilterType = "revId" +// UserOsIos FilterType = "userOsIos" +// UserDeviceIos FilterType = "userDeviceIos" +// UserCountryIos FilterType = "userCountryIos" +// UserIdIos FilterType = "userIdIos" +// UserAnonymousIdIos FilterType = "userAnonymousIdIos" +// RevIdIos FilterType = "revIdIos" +// Duration FilterType = "duration" +// Platform FilterType = "platform" +// Metadata FilterType = "metadata" +// Issue FilterType = "issue" +// EventsCount FilterType = "eventsCount" +// UtmSource FilterType = "utmSource" +// UtmMedium FilterType = "utmMedium" +// UtmCampaign FilterType = "utmCampaign" +// ThermalState FilterType = "thermalState" +// MainThreadCPU FilterType = "mainThreadCPU" +// ViewComponent FilterType = "viewComponent" +// LogEvent FilterType = "logEvent" +// ClickEvent FilterType = "clickEvent" +// MemoryUsage FilterType = "memoryUsage" +//) +// +//const ( +// Click FilterType = "click" +// Input FilterType = "input" +// Location FilterType = "location" +// Custom FilterType = "custom" +// Request FilterType = "request" +// Fetch FilterType = "fetch" +// GraphQL FilterType = "graphql" +// StateAction FilterType = "stateAction" +// Error FilterType = "error" +// Tag FilterType = "tag" +// ClickMobile FilterType = "clickMobile" +// InputMobile FilterType = "inputMobile" +// ViewMobile FilterType = "viewMobile" +// CustomMobile FilterType = "customMobile" +// RequestMobile FilterType = "requestMobile" +// ErrorMobile FilterType = "errorMobile" +// SwipeMobile FilterType = "swipeMobile" +//) +// +//const ( 
+// EventOrderThen EventOrder = "then" +// EventOrderOr EventOrder = "or" +// EventOrderAnd EventOrder = "and" +//) +// +//const ( +// FetchFilterTypeFetchUrl FilterType = "fetchUrl" +// FetchFilterTypeFetchStatusCode FilterType = "fetchStatusCode" +// FetchFilterTypeFetchMethod FilterType = "fetchMethod" +// FetchFilterTypeFetchDuration FilterType = "fetchDuration" +// FetchFilterTypeFetchRequestBody FilterType = "fetchRequestBody" +// FetchFilterTypeFetchResponseBody FilterType = "fetchResponseBody" +//) +// +//const ( +// OperatorStringIs = "is" +// OperatorStringIsAny = "isAny" +// OperatorStringOn = "on" +// OperatorStringOnAny = "onAny" +// OperatorStringIsNot = "isNot" +// OperatorStringIsUndefined = "isUndefined" +// OperatorStringNotOn = "notOn" +// OperatorStringContains = "contains" +// OperatorStringNotContains = "notContains" +// OperatorStringStartsWith = "startsWith" +// OperatorStringEndsWith = "endsWith" +//) +// +//const ( +// OperatorMathEq = "=" +// OperatorMathLt = "<" +// OperatorMathGt = ">" +// OperatorMathLe = "<=" +// OperatorMathGe = ">=" +//) +// +////-------------------------------------------------- +//// Constants for columns, tables, etc. 
+////-------------------------------------------------- +// +//const ( +// TableEvents Table = "product_analytics.events" +// TableSessions Table = "experimental.sessions" +// +// ColEventTime Column = "main.created_at" +// ColEventName Column = "main.`$event_name`" +// ColEventProjectID Column = "main.project_id" +// ColEventProperties Column = "main.`$properties`" +// ColEventSessionID Column = "main.session_id" +// ColEventURLPath Column = "main.url_path" +// ColEventStatus Column = "main.status" +// +// ColSessionID Column = "s.session_id" +// ColDuration Column = "s.duration" +// ColUserCountry Column = "s.user_country" +// ColUserCity Column = "s.user_city" +// ColUserState Column = "s.user_state" +// ColUserID Column = "s.user_id" +// ColUserAnonymousID Column = "s.user_anonymous_id" +// ColUserOS Column = "s.user_os" +// ColUserBrowser Column = "s.user_browser" +// ColUserDevice Column = "s.user_device" +// ColUserDeviceType Column = "s.user_device_type" +// ColRevID Column = "s.rev_id" +// ColBaseReferrer Column = "s.base_referrer" +// ColUtmSource Column = "s.utm_source" +// ColUtmMedium Column = "s.utm_medium" +// ColUtmCampaign Column = "s.utm_campaign" +// ColMetadata1 Column = "s.metadata_1" +// ColSessionProjectID Column = "s.project_id" +// ColSessionIsNotNull Column = "isNotNull(s.duration)" +//) +// +//type Root struct { +// StartTimestamp int64 `json:"startTimestamp"` +// EndTimestamp int64 `json:"endTimestamp"` +// Series []Series `json:"series"` +// ProjectID int64 `json:"projectId"` +//} +// +//type Series struct { +// SeriesID int64 `json:"seriesId"` +// Name string `json:"name"` +// Filter SeriesFilter `json:"filter"` +//} +// +//type SeriesFilter struct { +// Filters []FilterObj `json:"filters"` +// EventsOrder EventOrder `json:"eventsOrder"` +//} +// +//type FilterObj struct { +// Key string `json:"key"` +// Type FilterType `json:"type"` +// IsEvent bool `json:"isEvent"` +// Value []string `json:"value"` +// Operator string 
`json:"operator"` +// Source string `json:"source"` +// Filters []FilterObj `json:"filters"` +//} +// +//// -------------------------------------------------- +//func buildQuery(r Root) string { +// s := r.Series[0] +// +// // iterate over series and partition filters +// //for _, s := range r.Series { +// // sessionFilters, eventFilters := partitionFilters(s.Filter.Filters) +// // sessionWhere := buildSessionWhere(sessionFilters) +// // eventWhere, seqHaving := buildEventsWhere(eventFilters, s.Filter.EventsOrder) +// // fmt.Println("SESSION FILTERS:", sessionFilters) +// // fmt.Println("EVENT FILTERS:", eventFilters) +// // fmt.Println("SESSION WHERE:", sessionWhere) +// // fmt.Println("EVENT WHERE:", eventWhere) +// // fmt.Println("SEQ HAVING:", seqHaving) +// //} +// +// sessionFilters, eventFilters := partitionFilters(s.Filter.Filters) +// sessionWhere := buildSessionWhere(sessionFilters) +// eventWhere, seqHaving := buildEventsWhere(eventFilters, s.Filter.EventsOrder) +// +// subQuery := fmt.Sprintf( +// "SELECT %s,\n"+ +// " MIN(%s) AS first_event_ts,\n"+ +// " MAX(%s) AS last_event_ts\n"+ +// "FROM %s AS main\n"+ +// "WHERE main.project_id = %%(project_id)s\n"+ +// " AND %s >= toDateTime(%%(start_time)s/1000)\n"+ +// " AND %s <= toDateTime(%%(end_time)s/1000)\n"+ +// " AND (%s)\n"+ +// "GROUP BY %s\n"+ +// "HAVING %s", +// ColEventSessionID, +// ColEventTime, +// ColEventTime, +// TableEvents, +// ColEventTime, +// ColEventTime, +// strings.Join(eventWhere, " OR "), +// ColEventSessionID, +// seqHaving, +// ) +// +// joinQuery := fmt.Sprintf( +// "SELECT *\n"+ +// "FROM %s AS s\n"+ +// "INNER JOIN (\n"+ +// " SELECT DISTINCT ev.session_id, ev.`$current_url` AS url_path\n"+ +// " FROM %s AS ev\n"+ +// " WHERE ev.created_at >= toDateTime(%%(start_time)s/1000)\n"+ +// " AND ev.created_at <= toDateTime(%%(end_time)s/1000)\n"+ +// " AND ev.project_id = %%(project_id)s\n"+ +// " AND ev.`$event_name` = 'LOCATION'\n"+ +// ") AS extra_event USING (session_id)\n"+ +// 
"WHERE s.project_id = %%(project_id)s\n"+ +// " AND %s\n"+ +// " AND s.datetime >= toDateTime(%%(start_time)s/1000)\n"+ +// " AND s.datetime <= toDateTime(%%(end_time)s/1000)\n", +// TableSessions, +// TableEvents, +// ColSessionIsNotNull, +// ) +// +// if len(sessionWhere) > 0 { +// joinQuery += " AND " + strings.Join(sessionWhere, " AND ") + "\n" +// } +// +// main := fmt.Sprintf( +// "SELECT s.session_id AS session_id, s.url_path\n"+ +// "FROM (\n%s\n) AS f\n"+ +// "INNER JOIN (\n%s) AS s\n"+ +// " ON (s.session_id = f.session_id)\n", +// subQuery, +// joinQuery, +// ) +// +// final := fmt.Sprintf( +// "SELECT COUNT(DISTINCT url_path) OVER () AS main_count,\n"+ +// " url_path AS name,\n"+ +// " COUNT(DISTINCT session_id) AS total,\n"+ +// " COALESCE(SUM(COUNT(DISTINCT session_id)) OVER (), 0) AS total_count\n"+ +// "FROM (\n%s) AS filtered_sessions\n"+ +// "GROUP BY url_path\n"+ +// "ORDER BY total DESC\n"+ +// "LIMIT 200 OFFSET 0;", +// main, +// ) +// +// return final +//} +// +//func partitionFilters(filters []FilterObj) (sessionFilters, eventFilters []FilterObj) { +// for _, f := range filters { +// if f.IsEvent { +// eventFilters = append(eventFilters, f) +// } else { +// sessionFilters = append(sessionFilters, f) +// } +// } +// return +//} +// +//func buildSessionWhere(filters []FilterObj) []string { +// var conds []string +// for _, f := range filters { +// switch f.Type { +// case UserCountry: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCountry, concatValues(f.Value))) +// case UserCity: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCity, concatValues(f.Value))) +// case UserState: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserState, concatValues(f.Value))) +// case UserId: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserID, concatValues(f.Value))) +// case UserAnonymousId: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserAnonymousID, 
concatValues(f.Value))) +// case UserOs: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserOS, concatValues(f.Value))) +// case UserBrowser: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserBrowser, concatValues(f.Value))) +// case UserDevice: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDevice, concatValues(f.Value))) +// case Platform: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDeviceType, concatValues(f.Value))) +// case RevId: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColRevID, concatValues(f.Value))) +// case Referrer: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColBaseReferrer, concatValues(f.Value))) +// case Duration: +// if len(f.Value) == 2 { +// conds = append(conds, fmt.Sprintf("%s >= '%s'", ColDuration, f.Value[0])) +// conds = append(conds, fmt.Sprintf("%s <= '%s'", ColDuration, f.Value[1])) +// } +// case UtmSource: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmSource, concatValues(f.Value))) +// case UtmMedium: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmMedium, concatValues(f.Value))) +// case UtmCampaign: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmCampaign, concatValues(f.Value))) +// case Metadata: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColMetadata1, concatValues(f.Value))) +// } +// } +// // add /n to each condition +// for i := range conds { +// conds[i] += "\n" +// } +// return conds +//} +// +//func parseOperator(op string) string { +// switch strings.ToLower(op) { +// case OperatorStringContains: +// return OperatorMathEq // interpret as "LIKE" if needed +// case OperatorStringIs, OperatorStringOn, "=", OperatorStringOnAny: +// return OperatorMathEq +// case OperatorStringStartsWith: +// // might interpret differently in real impl +// return OperatorMathEq +// case OperatorStringEndsWith: +// // might interpret differently in real 
impl +// return OperatorMathEq +// default: +// return OperatorMathEq +// } +//} +// +//func buildEventsWhere(filters []FilterObj, order EventOrder) (eventConditions []string, having string) { +// basicEventTypes := "(" + +// strings.Join([]string{ +// fmt.Sprintf("%s = 'CLICK'", ColEventName), +// fmt.Sprintf("%s = 'INPUT'", ColEventName), +// fmt.Sprintf("%s = 'LOCATION'", ColEventName), +// fmt.Sprintf("%s = 'CUSTOM'", ColEventName), +// fmt.Sprintf("%s = 'REQUEST'", ColEventName), +// }, " OR ") + ")" +// +// var seq []string +// for _, f := range filters { +// switch f.Type { +// case Click: +// seq = append(seq, seqCond("CLICK", "selector", f)) +// case Input: +// seq = append(seq, seqCond("INPUT", "label", f)) +// case Location: +// seq = append(seq, seqCond("LOCATION", "url_path", f)) +// case Custom: +// seq = append(seq, seqCond("CUSTOM", "name", f)) +// case Fetch: +// seq = append(seq, seqFetchCond("REQUEST", f)) +// case FetchFilterTypeFetchStatusCode: +// seq = append(seq, seqCond("REQUEST", "status", f)) +// default: +// seq = append(seq, fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(string(f.Type)))) +// } +// } +// eventConditions = []string{basicEventTypes} +// +// // then => sequenceMatch +// // or => OR +// // and => AND +// switch order { +// case EventOrderThen: +// var pattern []string +// for i := range seq { +// pattern = append(pattern, fmt.Sprintf("(?%d)", i+1)) +// } +// having = fmt.Sprintf("sequenceMatch('%s')(\n%s,\n%s)", +// strings.Join(pattern, ""), fmt.Sprintf("toUnixTimestamp(%s)", ColEventTime), strings.Join(seq, ",\n")) +// case EventOrderAnd: +// // build AND +// having = strings.Join(seq, " AND ") +// default: +// // default => OR +// var orParts []string +// for _, p := range seq { +// orParts = append(orParts, "("+p+")") +// } +// having = strings.Join(orParts, " OR ") +// } +// return +//} +// +//func seqCond(eventName, key string, f FilterObj) string { +// op := parseOperator(f.Operator) +// return 
fmt.Sprintf("(%s = '%s' AND JSONExtractString(toString(%s), '%s') %s '%s')", +// ColEventName, strings.ToUpper(eventName), ColEventProperties, key, op, concatValues(f.Value)) +//} +// +//func seqFetchCond(eventName string, f FilterObj) string { +// w := []string{fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(eventName))} +// var extras []string +// for _, c := range f.Filters { +// switch c.Type { +// case Fetch: +// if len(c.Value) > 0 { +// extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventURLPath, concatValues(c.Value))) +// } +// case FetchFilterTypeFetchStatusCode: +// if len(c.Value) > 0 { +// extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventStatus, concatValues(c.Value))) +// } +// default: +// // placeholder if needed +// } +// } +// if len(extras) > 0 { +// w = append(w, strings.Join(extras, " AND ")) +// } +// return "(" + strings.Join(w, " AND ") + ")" +//} +// +//func concatValues(v []string) string { +// return strings.Join(v, "") +//} -func main() { - var r Root - err := json.Unmarshal([]byte(jsonInput), &r) - if err != nil { - panic(err) - } - - //fmt.Println("ARGS:", r) - fmt.Println(buildQuery(r)) - //fmt.Println("QUERY PART:", qp) -} - -type Table string -type Column string -type FilterType string -type EventOrder string -type FetchFilterType string - -const ( - UserOs FilterType = "userOs" - UserBrowser FilterType = "userBrowser" - UserDevice FilterType = "userDevice" - UserCountry FilterType = "userCountry" - UserCity FilterType = "userCity" - UserState FilterType = "userState" - UserId FilterType = "userId" - UserAnonymousId FilterType = "userAnonymousId" - Referrer FilterType = "referrer" - RevId FilterType = "revId" - UserOsIos FilterType = "userOsIos" - UserDeviceIos FilterType = "userDeviceIos" - UserCountryIos FilterType = "userCountryIos" - UserIdIos FilterType = "userIdIos" - UserAnonymousIdIos FilterType = "userAnonymousIdIos" - RevIdIos FilterType = "revIdIos" - Duration FilterType = "duration" - Platform 
FilterType = "platform" - Metadata FilterType = "metadata" - Issue FilterType = "issue" - EventsCount FilterType = "eventsCount" - UtmSource FilterType = "utmSource" - UtmMedium FilterType = "utmMedium" - UtmCampaign FilterType = "utmCampaign" - ThermalState FilterType = "thermalState" - MainThreadCPU FilterType = "mainThreadCPU" - ViewComponent FilterType = "viewComponent" - LogEvent FilterType = "logEvent" - ClickEvent FilterType = "clickEvent" - MemoryUsage FilterType = "memoryUsage" -) - -const ( - Click FilterType = "click" - Input FilterType = "input" - Location FilterType = "location" - Custom FilterType = "custom" - Request FilterType = "request" - Fetch FilterType = "fetch" - GraphQL FilterType = "graphql" - StateAction FilterType = "stateAction" - Error FilterType = "error" - Tag FilterType = "tag" - ClickMobile FilterType = "clickMobile" - InputMobile FilterType = "inputMobile" - ViewMobile FilterType = "viewMobile" - CustomMobile FilterType = "customMobile" - RequestMobile FilterType = "requestMobile" - ErrorMobile FilterType = "errorMobile" - SwipeMobile FilterType = "swipeMobile" -) - -const ( - EventOrderThen EventOrder = "then" - EventOrderOr EventOrder = "or" - EventOrderAnd EventOrder = "and" -) - -const ( - FetchFilterTypeFetchUrl FilterType = "fetchUrl" - FetchFilterTypeFetchStatusCode FilterType = "fetchStatusCode" - FetchFilterTypeFetchMethod FilterType = "fetchMethod" - FetchFilterTypeFetchDuration FilterType = "fetchDuration" - FetchFilterTypeFetchRequestBody FilterType = "fetchRequestBody" - FetchFilterTypeFetchResponseBody FilterType = "fetchResponseBody" -) - -const ( - OperatorStringIs = "is" - OperatorStringIsAny = "isAny" - OperatorStringOn = "on" - OperatorStringOnAny = "onAny" - OperatorStringIsNot = "isNot" - OperatorStringIsUndefined = "isUndefined" - OperatorStringNotOn = "notOn" - OperatorStringContains = "contains" - OperatorStringNotContains = "notContains" - OperatorStringStartsWith = "startsWith" - OperatorStringEndsWith = 
"endsWith" -) - -const ( - OperatorMathEq = "=" - OperatorMathLt = "<" - OperatorMathGt = ">" - OperatorMathLe = "<=" - OperatorMathGe = ">=" -) - -//-------------------------------------------------- -// Constants for columns, tables, etc. -//-------------------------------------------------- - -const ( - TableEvents Table = "product_analytics.events" - TableSessions Table = "experimental.sessions" - - ColEventTime Column = "main.created_at" - ColEventName Column = "main.`$event_name`" - ColEventProjectID Column = "main.project_id" - ColEventProperties Column = "main.`$properties`" - ColEventSessionID Column = "main.session_id" - ColEventURLPath Column = "main.url_path" - ColEventStatus Column = "main.status" - - ColSessionID Column = "s.session_id" - ColDuration Column = "s.duration" - ColUserCountry Column = "s.user_country" - ColUserCity Column = "s.user_city" - ColUserState Column = "s.user_state" - ColUserID Column = "s.user_id" - ColUserAnonymousID Column = "s.user_anonymous_id" - ColUserOS Column = "s.user_os" - ColUserBrowser Column = "s.user_browser" - ColUserDevice Column = "s.user_device" - ColUserDeviceType Column = "s.user_device_type" - ColRevID Column = "s.rev_id" - ColBaseReferrer Column = "s.base_referrer" - ColUtmSource Column = "s.utm_source" - ColUtmMedium Column = "s.utm_medium" - ColUtmCampaign Column = "s.utm_campaign" - ColMetadata1 Column = "s.metadata_1" - ColSessionProjectID Column = "s.project_id" - ColSessionIsNotNull Column = "isNotNull(s.duration)" -) - -type Root struct { - StartTimestamp int64 `json:"startTimestamp"` - EndTimestamp int64 `json:"endTimestamp"` - Series []Series `json:"series"` -} - -type Series struct { - SeriesID int64 `json:"seriesId"` - Name string `json:"name"` - Filter SeriesFilter `json:"filter"` -} - -type SeriesFilter struct { - Filters []FilterObj `json:"filters"` - EventsOrder EventOrder `json:"eventsOrder"` -} - -type FilterObj struct { - Type FilterType `json:"type"` - IsEvent bool `json:"isEvent"` - 
Value []string `json:"value"` - Operator string `json:"operator"` - Source string `json:"source"` - Filters []FilterObj `json:"filters"` -} - -// -------------------------------------------------- -func buildQuery(r Root) string { - s := r.Series[0] - - // iterate over series and partition filters - //for _, s := range r.Series { - // sessionFilters, eventFilters := partitionFilters(s.Filter.Filters) - // sessionWhere := buildSessionWhere(sessionFilters) - // eventWhere, seqHaving := buildEventsWhere(eventFilters, s.Filter.EventsOrder) - // fmt.Println("SESSION FILTERS:", sessionFilters) - // fmt.Println("EVENT FILTERS:", eventFilters) - // fmt.Println("SESSION WHERE:", sessionWhere) - // fmt.Println("EVENT WHERE:", eventWhere) - // fmt.Println("SEQ HAVING:", seqHaving) - //} - - sessionFilters, eventFilters := partitionFilters(s.Filter.Filters) - sessionWhere := buildSessionWhere(sessionFilters) - eventWhere, seqHaving := buildEventsWhere(eventFilters, s.Filter.EventsOrder) - - subQuery := fmt.Sprintf( - "SELECT %s,\n"+ - " MIN(%s) AS first_event_ts,\n"+ - " MAX(%s) AS last_event_ts\n"+ - "FROM %s AS main\n"+ - "WHERE main.project_id = %%(project_id)s\n"+ - " AND %s >= toDateTime(%%(start_time)s/1000)\n"+ - " AND %s <= toDateTime(%%(end_time)s/1000)\n"+ - " AND (%s)\n"+ - "GROUP BY %s\n"+ - "HAVING %s", - ColEventSessionID, - ColEventTime, - ColEventTime, - TableEvents, - ColEventTime, - ColEventTime, - strings.Join(eventWhere, " OR "), - ColEventSessionID, - seqHaving, - ) - - joinQuery := fmt.Sprintf( - "SELECT *\n"+ - "FROM %s AS s\n"+ - "INNER JOIN (\n"+ - " SELECT DISTINCT ev.session_id, ev.`$current_url` AS url_path\n"+ - " FROM %s AS ev\n"+ - " WHERE ev.created_at >= toDateTime(%%(start_time)s/1000)\n"+ - " AND ev.created_at <= toDateTime(%%(end_time)s/1000)\n"+ - " AND ev.project_id = %%(project_id)s\n"+ - " AND ev.`$event_name` = 'LOCATION'\n"+ - ") AS extra_event USING (session_id)\n"+ - "WHERE s.project_id = %%(project_id)s\n"+ - " AND %s\n"+ - " AND 
s.datetime >= toDateTime(%%(start_time)s/1000)\n"+ - " AND s.datetime <= toDateTime(%%(end_time)s/1000)\n", - TableSessions, - TableEvents, - ColSessionIsNotNull, - ) - - if len(sessionWhere) > 0 { - joinQuery += " AND " + strings.Join(sessionWhere, " AND ") + "\n" - } - - main := fmt.Sprintf( - "SELECT s.session_id AS session_id, s.url_path\n"+ - "FROM (\n%s\n) AS f\n"+ - "INNER JOIN (\n%s) AS s\n"+ - " ON (s.session_id = f.session_id)\n", - subQuery, - joinQuery, - ) - - final := fmt.Sprintf( - "SELECT COUNT(DISTINCT url_path) OVER () AS main_count,\n"+ - " url_path AS name,\n"+ - " COUNT(DISTINCT session_id) AS total,\n"+ - " COALESCE(SUM(COUNT(DISTINCT session_id)) OVER (), 0) AS total_count\n"+ - "FROM (\n%s) AS filtered_sessions\n"+ - "GROUP BY url_path\n"+ - "ORDER BY total DESC\n"+ - "LIMIT 200 OFFSET 0;", - main, - ) - - return final -} - -func partitionFilters(filters []FilterObj) (sessionFilters, eventFilters []FilterObj) { - for _, f := range filters { - if f.IsEvent { - eventFilters = append(eventFilters, f) - } else { - sessionFilters = append(sessionFilters, f) - } - } - return -} - -func buildSessionWhere(filters []FilterObj) []string { - var conds []string - for _, f := range filters { - switch f.Type { - case UserCountry: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCountry, concatValues(f.Value))) - case UserCity: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCity, concatValues(f.Value))) - case UserState: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserState, concatValues(f.Value))) - case UserId: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserID, concatValues(f.Value))) - case UserAnonymousId: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserAnonymousID, concatValues(f.Value))) - case UserOs: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserOS, concatValues(f.Value))) - case UserBrowser: - conds = append(conds, fmt.Sprintf("%s = 
toString('%s')", ColUserBrowser, concatValues(f.Value))) - case UserDevice: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDevice, concatValues(f.Value))) - case Platform: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDeviceType, concatValues(f.Value))) - case RevId: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColRevID, concatValues(f.Value))) - case Referrer: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColBaseReferrer, concatValues(f.Value))) - case Duration: - if len(f.Value) == 2 { - conds = append(conds, fmt.Sprintf("%s >= '%s'", ColDuration, f.Value[0])) - conds = append(conds, fmt.Sprintf("%s <= '%s'", ColDuration, f.Value[1])) - } - case UtmSource: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmSource, concatValues(f.Value))) - case UtmMedium: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmMedium, concatValues(f.Value))) - case UtmCampaign: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmCampaign, concatValues(f.Value))) - case Metadata: - conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColMetadata1, concatValues(f.Value))) - } - } - // add /n to each condition - for i := range conds { - conds[i] += "\n" - } - return conds -} - -func parseOperator(op string) string { - switch strings.ToLower(op) { - case OperatorStringContains: - return OperatorMathEq // interpret as "LIKE" if needed - case OperatorStringIs, OperatorStringOn, "=", OperatorStringOnAny: - return OperatorMathEq - case OperatorStringStartsWith: - // might interpret differently in real impl - return OperatorMathEq - case OperatorStringEndsWith: - // might interpret differently in real impl - return OperatorMathEq - default: - return OperatorMathEq - } -} - -func buildEventsWhere(filters []FilterObj, order EventOrder) (eventConditions []string, having string) { - basicEventTypes := "(" + - strings.Join([]string{ - fmt.Sprintf("%s = 'CLICK'", ColEventName), - 
fmt.Sprintf("%s = 'INPUT'", ColEventName), - fmt.Sprintf("%s = 'LOCATION'", ColEventName), - fmt.Sprintf("%s = 'CUSTOM'", ColEventName), - fmt.Sprintf("%s = 'REQUEST'", ColEventName), - }, " OR ") + ")" - - var seq []string - for _, f := range filters { - switch f.Type { - case Click: - seq = append(seq, seqCond("CLICK", "selector", f)) - case Input: - seq = append(seq, seqCond("INPUT", "label", f)) - case Location: - seq = append(seq, seqCond("LOCATION", "url_path", f)) - case Custom: - seq = append(seq, seqCond("CUSTOM", "name", f)) - case Fetch: - seq = append(seq, seqFetchCond("REQUEST", f)) - case FetchFilterTypeFetchStatusCode: - seq = append(seq, seqCond("REQUEST", "status", f)) - default: - seq = append(seq, fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(string(f.Type)))) - } - } - eventConditions = []string{basicEventTypes} - - // then => sequenceMatch - // or => OR - // and => AND - switch order { - case EventOrderThen: - var pattern []string - for i := range seq { - pattern = append(pattern, fmt.Sprintf("(?%d)", i+1)) - } - having = fmt.Sprintf("sequenceMatch('%s')(\n%s,\n%s)", - strings.Join(pattern, ""), fmt.Sprintf("toUnixTimestamp(%s)", ColEventTime), strings.Join(seq, ",\n")) - case EventOrderAnd: - // build AND - having = strings.Join(seq, " AND ") - default: - // default => OR - var orParts []string - for _, p := range seq { - orParts = append(orParts, "("+p+")") - } - having = strings.Join(orParts, " OR ") - } - return -} - -func seqCond(eventName, key string, f FilterObj) string { - op := parseOperator(f.Operator) - return fmt.Sprintf("(%s = '%s' AND JSONExtractString(toString(%s), '%s') %s '%s')", - ColEventName, strings.ToUpper(eventName), ColEventProperties, key, op, concatValues(f.Value)) -} - -func seqFetchCond(eventName string, f FilterObj) string { - w := []string{fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(eventName))} - var extras []string - for _, c := range f.Filters { - switch c.Type { - case Fetch: - if 
len(c.Value) > 0 { - extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventURLPath, concatValues(c.Value))) - } - case FetchFilterTypeFetchStatusCode: - if len(c.Value) > 0 { - extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventStatus, concatValues(c.Value))) - } - default: - // placeholder if needed - } - } - if len(extras) > 0 { - w = append(w, strings.Join(extras, " AND ")) - } - return "(" + strings.Join(w, " AND ") + ")" -} - -func concatValues(v []string) string { - return strings.Join(v, "") -} - -const jsonInput = ` -{ - "startTimestamp": 1737043724664, - "endTimestamp": 1737130124664, - "series": [ - { - "seriesId": 610, - "name": "Series 1", - "filter": { - "filters": [ - { - "type": "click", - "isEvent": true, - "value": ["DEPLOYMENT"], - "operator": "on", - "filters": [] - }, - { - "type": "input", - "isEvent": true, - "value": ["a"], - "operator": "contains", - "filters": [] - }, - { - "type": "location", - "isEvent": true, - "value": ["/en/using-or/"], - "operator": "is", - "filters": [] - }, - { - "type": "userCountry", - "isEvent": false, - "value": ["AD"], - "operator": "is", - "filters": [] - }, - { - "type": "userCity", - "isEvent": false, - "value": ["Mumbai"], - "operator": "is", - "filters": [] - }, - { - "type": "userState", - "isEvent": false, - "value": ["Maharashtra"], - "operator": "is", - "filters": [] - }, - { - "type": "userId", - "isEvent": false, - "value": ["test@test.com"], - "operator": "is", - "filters": [] - }, - { - "type": "userAnonymousId", - "isEvent": false, - "value": ["asd"], - "operator": "is", - "filters": [] - }, - { - "type": "userOs", - "isEvent": false, - "value": ["Mac OS X"], - "operator": "is", - "filters": [] - }, - { - "type": "userBrowser", - "isEvent": false, - "value": ["Chrome"], - "operator": "is", - "filters": [] - }, - { - "type": "userDevice", - "isEvent": false, - "value": ["iPhone"], - "operator": "is", - "filters": [] - }, - { - "type": "platform", - "isEvent": false, - "value": 
["desktop"], - "operator": "is", - "filters": [] - }, - { - "type": "revId", - "isEvent": false, - "value": ["v1"], - "operator": "is", - "filters": [] - }, - { - "type": "referrer", - "isEvent": false, - "value": ["https://www.google.com/"], - "operator": "is", - "filters": [] - }, - { - "type": "duration", - "isEvent": false, - "value": ["60000", "6000000"], - "operator": "is", - "filters": [] - }, - { - "type": "tag", - "isEvent": true, - "value": ["8"], - "operator": "is", - "filters": [] - }, - { - "type": "utmSource", - "isEvent": false, - "value": ["aaa"], - "operator": "is", - "filters": [] - }, - { - "type": "utmMedium", - "isEvent": false, - "value": ["aa"], - "operator": "is", - "filters": [] - }, - { - "type": "utmCampaign", - "isEvent": false, - "value": ["aaa"], - "operator": "is", - "filters": [] - }, - { - "type": "metadata", - "isEvent": false, - "value": ["bbbb"], - "operator": "is", - "source": "userId", - "filters": [] - }, - { - "type": "custom", - "isEvent": true, - "value": ["test"], - "operator": "is", - "filters": [] - }, - { - "type": "fetch", - "isEvent": true, - "value": [], - "operator": "is", - "filters": [ - { - "type": "fetchUrl", - "isEvent": false, - "value": ["/ai/docs/chat"], - "operator": "is", - "filters": [] - }, - { - "type": "fetchStatusCode", - "isEvent": false, - "value": ["400"], - "operator": "=", - "filters": [] - }, - { - "type": "fetchMethod", - "isEvent": false, - "value": [], - "operator": "is", - "filters": [] - }, - { - "type": "fetchDuration", - "isEvent": false, - "value": [], - "operator": "=", - "filters": [] - }, - { - "type": "fetchRequestBody", - "isEvent": false, - "value": [], - "operator": "is", - "filters": [] - }, - { - "type": "fetchResponseBody", - "isEvent": false, - "value": [], - "operator": "is", - "filters": [] - } - ] - } - ], - "eventsOrder": "then" - } - } - ] -} -` +//const jsonInput = ` +//{ +// "startTimestamp": 1737043724664, +// "endTimestamp": 1737130124664, +// "projectId": 1, +// 
"series": [ +// { +// "seriesId": 610, +// "name": "Series 1", +// "filter": { +// "filters": [ +// { +// "type": "click", +// "isEvent": true, +// "value": ["DEPLOYMENT"], +// "operator": "on", +// "filters": [] +// }, +// { +// "type": "input", +// "isEvent": true, +// "value": ["a"], +// "operator": "contains", +// "filters": [] +// }, +// { +// "type": "location", +// "isEvent": true, +// "value": ["/en/using-or/"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "userCountry", +// "isEvent": false, +// "value": ["AD"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "userCity", +// "isEvent": false, +// "value": ["Mumbai"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "userState", +// "isEvent": false, +// "value": ["Maharashtra"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "userId", +// "isEvent": false, +// "value": ["test@test.com"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "userAnonymousId", +// "isEvent": false, +// "value": ["asd"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "userOs", +// "isEvent": false, +// "value": ["Mac OS X"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "userBrowser", +// "isEvent": false, +// "value": ["Chrome"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "userDevice", +// "isEvent": false, +// "value": ["iPhone"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "platform", +// "isEvent": false, +// "value": ["desktop"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "revId", +// "isEvent": false, +// "value": ["v1"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "referrer", +// "isEvent": false, +// "value": ["https://www.google.com/"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "duration", +// "isEvent": false, +// "value": ["60000", "6000000"], +// "operator": "is", +// "filters": [] +// }, 
+// { +// "type": "utmSource", +// "isEvent": false, +// "value": ["aaa"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "utmMedium", +// "isEvent": false, +// "value": ["aa"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "utmCampaign", +// "isEvent": false, +// "value": ["aaa"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "metadata", +// "isEvent": false, +// "value": ["bbbb"], +// "operator": "is", +// "source": "userId", +// "filters": [] +// }, +// { +// "type": "custom", +// "isEvent": true, +// "value": ["test"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "fetch", +// "isEvent": true, +// "value": [], +// "operator": "is", +// "filters": [ +// { +// "type": "fetchUrl", +// "isEvent": false, +// "value": ["/ai/docs/chat"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "fetchStatusCode", +// "isEvent": false, +// "value": ["400"], +// "operator": "=", +// "filters": [] +// }, +// { +// "type": "fetchMethod", +// "isEvent": false, +// "value": [], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "fetchDuration", +// "isEvent": false, +// "value": [], +// "operator": "=", +// "filters": [] +// }, +// { +// "type": "fetchRequestBody", +// "isEvent": false, +// "value": [], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "fetchResponseBody", +// "isEvent": false, +// "value": [], +// "operator": "is", +// "filters": [] +// } +// ] +// } +// ], +// "eventsOrder": "then" +// } +// } +// ] +//} +//` diff --git a/backend/pkg/analytics/query/funnel.go b/backend/pkg/analytics/query/funnel.go new file mode 100644 index 000000000..c8632b401 --- /dev/null +++ b/backend/pkg/analytics/query/funnel.go @@ -0,0 +1,7 @@ +package query + +type FunnelQueryBuilder struct{} + +func (f FunnelQueryBuilder) Build(p MetricPayload) string { + return "-- Funnel query placeholder" +} diff --git a/backend/pkg/analytics/query/model.go 
b/backend/pkg/analytics/query/model.go new file mode 100644 index 000000000..9a315e0db --- /dev/null +++ b/backend/pkg/analytics/query/model.go @@ -0,0 +1,137 @@ +package query + +type Table string +type Column string +type MetricType string +type FilterType string +type EventOrder string + +const ( + TableEvents Table = "product_analytics.events" + TableSessions Table = "experimental.sessions" +) + +const ( + ColEventTime Column = "main.created_at" + ColEventName Column = "main.`$event_name`" + ColEventProjectID Column = "main.project_id" + ColEventProperties Column = "main.`$properties`" + ColEventSessionID Column = "main.session_id" + ColEventURLPath Column = "main.url_path" + ColEventStatus Column = "main.status" +) + +const ( + ColSessionID Column = "s.session_id" + ColDuration Column = "s.duration" + ColUserCountry Column = "s.user_country" + ColUserCity Column = "s.user_city" + ColUserState Column = "s.user_state" + ColUserID Column = "s.user_id" + ColUserAnonymousID Column = "s.user_anonymous_id" + ColUserOS Column = "s.user_os" + ColUserBrowser Column = "s.user_browser" + ColUserDevice Column = "s.user_device" + ColUserDeviceType Column = "s.user_device_type" + ColRevID Column = "s.rev_id" + ColBaseReferrer Column = "s.base_referrer" + ColUtmSource Column = "s.utm_source" + ColUtmMedium Column = "s.utm_medium" + ColUtmCampaign Column = "s.utm_campaign" + ColMetadata1 Column = "s.metadata_1" + ColSessionProjectID Column = "s.project_id" + ColSessionIsNotNull Column = "isNotNull(s.duration)" +) + +const ( + MetricTypeTimeseries MetricType = "timeseries" + MetricTypeTable MetricType = "table" + MetricTypeFunnel MetricType = "funnel" +) + +const ( + EventOrderThen EventOrder = "then" + EventOrderOr EventOrder = "or" + EventOrderAnd EventOrder = "and" +) + +type MetricPayload struct { + StartTimestamp int64 `json:"startTimestamp"` + EndTimestamp int64 `json:"endTimestamp"` + Density int `json:"density"` + MetricOf string `json:"metricOf"` + MetricType 
MetricType `json:"metricType"` + MetricFormat string `json:"metricFormat"` + ViewType string `json:"viewType"` + Name string `json:"name"` + Series []Series `json:"series"` + CompareTo *string `json:"compareTo"` +} + +type Series struct { + Name string `json:"name"` + Filter struct { + Filters []Filter `json:"filters"` + EventsOrder EventOrder `json:"eventsOrder"` + } `json:"filter"` +} + +type Filter struct { + Type FilterType `json:"type"` + IsEvent bool `json:"isEvent"` + Value []string `json:"value"` + Operator string `json:"operator"` + Filters []Filter `json:"filters"` +} + +const ( + FilterUserOs FilterType = "userOs" + FilterUserBrowser FilterType = "userBrowser" + FilterUserDevice FilterType = "userDevice" + FilterUserCountry FilterType = "userCountry" + FilterUserCity FilterType = "userCity" + FilterUserState FilterType = "userState" + FilterUserId FilterType = "userId" + FilterUserAnonymousId FilterType = "userAnonymousId" + FilterReferrer FilterType = "referrer" + FilterRevId FilterType = "revId" + FilterUserOsIos FilterType = "userOsIos" + FilterUserDeviceIos FilterType = "userDeviceIos" + FilterUserCountryIos FilterType = "userCountryIos" + FilterUserIdIos FilterType = "userIdIos" + FilterUserAnonymousIdIos FilterType = "userAnonymousIdIos" + FilterRevIdIos FilterType = "revIdIos" + FilterDuration FilterType = "duration" + FilterPlatform FilterType = "platform" + FilterMetadata FilterType = "metadata" + FilterIssue FilterType = "issue" + FilterEventsCount FilterType = "eventsCount" + FilterUtmSource FilterType = "utmSource" + FilterUtmMedium FilterType = "utmMedium" + FilterUtmCampaign FilterType = "utmCampaign" + FilterThermalState FilterType = "thermalState" + FilterMainThreadCPU FilterType = "mainThreadCPU" + FilterViewComponent FilterType = "viewComponent" + FilterLogEvent FilterType = "logEvent" + FilterMemoryUsage FilterType = "memoryUsage" + FilterClick FilterType = "click" + FilterInput FilterType = "input" + FilterLocation FilterType = 
"location" + FilterCustom FilterType = "customEvent" + FilterFetch FilterType = "fetch" + FilterFetchStatusCode FilterType = "status" +) + +const ( + OperatorStringIs = "is" + OperatorStringIsAny = "isAny" + OperatorStringOn = "on" + OperatorStringOnAny = "onAny" + OperatorStringIsNot = "isNot" + OperatorStringIsUndefined = "isUndefined" + OperatorStringNotOn = "notOn" + OperatorStringContains = "contains" + OperatorStringNotContains = "notContains" + OperatorStringStartsWith = "startsWith" + OperatorStringEndsWith = "endsWith" +) diff --git a/backend/pkg/analytics/query/queryBuilder.go b/backend/pkg/analytics/query/queryBuilder.go new file mode 100644 index 000000000..b45359a04 --- /dev/null +++ b/backend/pkg/analytics/query/queryBuilder.go @@ -0,0 +1,253 @@ +package query + +import ( + "encoding/json" + "fmt" + "strings" +) + +type NewQueryBuilder interface { + Build(MetricPayload) string +} + +func buildEventSubquery(p MetricPayload) string { + baseEventsWhere := buildBaseEventsWhere(p) + sequenceCond := buildSequenceCondition(p.Series) + sessionsWhere := buildSessionsWhere(p) + + // If there's no sequence pattern, skip HAVING entirely. 
+ if sequenceCond.seqPattern == "" { + return fmt.Sprintf(` +SELECT s.%[1]s AS %[1]s, + s.datetime AS datetime +FROM ( + SELECT main.session_id, + MIN(main.created_at) AS first_event_ts, + MAX(main.created_at) AS last_event_ts + FROM product_analytics.events AS main + WHERE %[2]s + GROUP BY session_id +) AS f +INNER JOIN ( + SELECT * + FROM experimental.sessions AS s + WHERE %[3]s +) AS s ON (s.session_id = f.session_id) +`, pickIDField(p), baseEventsWhere, sessionsWhere) + } + + return fmt.Sprintf(` +SELECT s.%[1]s AS %[1]s, + s.datetime AS datetime +FROM ( + SELECT main.session_id, + MIN(main.created_at) AS first_event_ts, + MAX(main.created_at) AS last_event_ts + FROM product_analytics.events AS main + WHERE %[2]s + GROUP BY session_id + HAVING sequenceMatch('%[3]s')(toDateTime(main.created_at), %[4]s) +) AS f +INNER JOIN ( + SELECT * + FROM experimental.sessions AS s + WHERE %[5]s +) AS s ON (s.session_id = f.session_id) +`, pickIDField(p), baseEventsWhere, sequenceCond.seqPattern, sequenceCond.seqEvents, sessionsWhere) +} + +func pickIDField(p MetricPayload) string { + if p.MetricOf == "userCount" { + return "user_id" + } + return "session_id" +} + +func buildBaseEventsWhere(p MetricPayload) string { + projectID := 5 + ts := fmt.Sprintf( + `(main.created_at >= toDateTime(%d / 1000) AND main.created_at <= toDateTime(%d / 1000))`, + p.StartTimestamp, + p.EndTimestamp, + ) + return fmt.Sprintf(`main.project_id = %d AND %s`, projectID, ts) +} + +func buildSessionsWhere(p MetricPayload) string { + projectID := 5 + ts := fmt.Sprintf( + `(s.datetime >= toDateTime(%d / 1000) AND s.datetime <= toDateTime(%d / 1000))`, + p.StartTimestamp, + p.EndTimestamp, + ) + return fmt.Sprintf(`s.project_id = %d AND isNotNull(s.duration) AND %s`, projectID, ts) +} + +type sequenceParts struct { + seqPattern string + seqEvents string +} + +func buildSequenceCondition(series []Series) sequenceParts { + var events []string + for _, s := range series { + if len(s.Filter.Filters) > 0 { + 
events = append(events, buildOneSeriesSequence(s.Filter.Filters))
+        }
+    }
+
+    if len(events) == 0 {
+        return sequenceParts{"", ""}
+    }
+
+    // For n events, we need a pattern like `(?1)(?2)(?3)...(?n)`.
+    pattern := ""
+    for i := 1; i <= len(events); i++ {
+        pattern += fmt.Sprintf("(?%d)", i)
+    }
+
+    return sequenceParts{
+        seqPattern: pattern,
+        seqEvents:  strings.Join(events, ", "),
+    }
+}
+
+func buildOneSeriesSequence(filters []Filter) string {
+    return strings.Join(buildFilterConditions(filters), " AND ")
+}
+
+func buildFilterConditions(filters []Filter) []string {
+    var out []string
+    for _, f := range filters {
+        switch f.Type {
+        case FilterClick:
+            out = append(out,
+                fmt.Sprintf(`(main."$event_name" = 'CLICK' AND JSONExtractString(toString(main."$properties"), 'label') IN ('%s'))`,
+                    strings.Join(f.Value, "','")))
+        case FilterInput:
+            out = append(out,
+                fmt.Sprintf(`(main."$event_name" = 'INPUT' AND JSONExtractString(toString(main."$properties"), 'label') IN ('%s'))`,
+                    strings.Join(f.Value, "','")))
+        // TODO add more cases to cover all the events
+        default:
+            out = append(out,
+                fmt.Sprintf(`(main."$event_name" = '%s')`, strings.ToUpper(string(f.Type))))
+        }
+    }
+    return out
+}
+
+func main() {
+    //input := GetPayload(MetricTypeTimeseries)
+    input := GetPayload(MetricTypeTable)
+
+    var payload MetricPayload
+    err := json.Unmarshal([]byte(input), &payload)
+    if err != nil {
+        return
+    }
+
+    var qb NewQueryBuilder
+    switch payload.MetricType {
+    case MetricTypeTimeseries:
+        qb = TimeSeriesQueryBuilder{}
+    case MetricTypeFunnel:
+        qb = FunnelQueryBuilder{}
+    case MetricTypeTable:
+        qb = TableQueryBuilder{}
+    default:
+        qb = TimeSeriesQueryBuilder{}
+    }
+
+    query := qb.Build(payload)
+    fmt.Println(query)
+}
+
+func GetPayload(metricType MetricType) string {
+    switch metricType {
+    case MetricTypeTimeseries:
+        return `{
+          "startTimestamp": 1738796399999,
+          "endTimestamp": 1739401199999,
+          "density": 7,
+          "metricOf": "sessionCount",
+          "metricValue": [],
+          "metricType": "timeseries",
+          "metricFormat": "sessionCount",
+          "viewType": "lineChart",
+          "name": "Untitled Trend",
+          "series": [
+            {
+              "name": "Series 1",
+              "filter": {
+                "filters": [
+                  {
+                    "type": "userId",
+                    "isEvent": false,
+                    "value": [
+                      "test@test.com"
+                    ],
+                    "operator": "is",
+                    "filters": []
+                  }
+                ],
+                "eventsOrder": "then"
+              }
+            }
+          ]
+        }`
+    case MetricTypeFunnel:
+        return `{}`
+    case MetricTypeTable:
+        return `{
+          "startTimestamp": 1737586800000,
+          "endTimestamp": 1738277999999,
+          "density": 7,
+          "metricOf": "userDevice",
+          "metricType": "table",
+          "metricFormat": "sessionCount",
+          "viewType": "table",
+          "name": "Untitled Trend",
+          "series": [
+            {
+              "name": "Series 1",
+              "filter": {
+                "filters": [
+                  {
+                    "type": "click",
+                    "isEvent": true,
+                    "value": ["Manuscripts"],
+                    "operator": "on",
+                    "filters": []
+                  }
+                ],
+                "eventsOrder": "then"
+              }
+            },
+            {
+              "name": "Series 2",
+              "filter": {
+                "filters": [
+                  {
+                    "type": "input",
+                    "isEvent": true,
+                    "value": ["test"],
+                    "operator": "is",
+                    "filters": []
+                  }
+                ],
+                "eventsOrder": "then"
+              }
+            }
+          ],
+          "page": 1,
+          "limit": 20,
+          "compareTo": null,
+          "config": {
+            "col": 2
+          }
+        }`
+    default:
+        return `{}`
+    }
+}
diff --git a/backend/pkg/analytics/query/table.go b/backend/pkg/analytics/query/table.go
new file mode 100644
index 000000000..616f25f65
--- /dev/null
+++ b/backend/pkg/analytics/query/table.go
@@ -0,0 +1,252 @@
+package query
+
+import (
+    "fmt"
+    "strings"
+)
+
+type TableQueryBuilder struct{}
+
+func (t TableQueryBuilder) Build(p MetricPayload) string {
+    return t.buildQuery(p)
+}
+
+func (t TableQueryBuilder) buildQuery(r MetricPayload) string {
+    s := r.Series[0]
+    sessionFilters, eventFilters := partitionFilters(s.Filter.Filters)
+    sessionWhere := buildSessionWhere(sessionFilters)
+    eventWhere, seqHaving := buildEventsWhere(eventFilters, s.Filter.EventsOrder)
+
+    subQuery := fmt.Sprintf(
+        "SELECT %s,\n"+
+            "    MIN(%s) AS first_event_ts,\n"+
+            "    MAX(%s) AS last_event_ts\n"+
+            "FROM %s AS main\n"+
+            "WHERE main.project_id = %%(project_id)s\n"+
+            "  AND %s >= toDateTime(%%(start_time)s/1000)\n"+
+            "  AND %s <= toDateTime(%%(end_time)s/1000)\n"+
+            "  AND (%s)\n"+
+            "GROUP BY %s\n"+
+            "HAVING %s",
+        ColEventSessionID,
+        ColEventTime,
+        ColEventTime,
+        TableEvents,
+        ColEventTime,
+        ColEventTime,
+        strings.Join(eventWhere, " OR "),
+        ColEventSessionID,
+        seqHaving,
+    )
+
+    joinQuery := fmt.Sprintf(
+        "SELECT *\n"+
+            "FROM %s AS s\n"+
+            "INNER JOIN (\n"+
+            "    SELECT DISTINCT ev.session_id, ev.`$current_url` AS url_path\n"+
+            "    FROM %s AS ev\n"+
+            "    WHERE ev.created_at >= toDateTime(%%(start_time)s/1000)\n"+
+            "      AND ev.created_at <= toDateTime(%%(end_time)s/1000)\n"+
+            "      AND ev.project_id = %%(project_id)s\n"+
+            "      AND ev.`$event_name` = 'LOCATION'\n"+
+            ") AS extra_event USING (session_id)\n"+
+            "WHERE s.project_id = %%(project_id)s\n"+
+            "  AND isNotNull(s.duration)\n"+
+            "  AND s.datetime >= toDateTime(%%(start_time)s/1000)\n"+
+            "  AND s.datetime <= toDateTime(%%(end_time)s/1000)\n",
+        TableSessions,
+        TableEvents,
+    )
+
+    if len(sessionWhere) > 0 {
+        joinQuery += "  AND " + strings.Join(sessionWhere, " AND ") + "\n"
+    }
+
+    main := fmt.Sprintf(
+        "SELECT s.session_id AS session_id, s.url_path\n"+
+            "FROM (\n%s\n) AS f\n"+
+            "INNER JOIN (\n%s) AS s\n"+
+            "    ON (s.session_id = f.session_id)\n",
+        subQuery,
+        joinQuery,
+    )
+
+    final := fmt.Sprintf(
+        "SELECT COUNT(DISTINCT url_path) OVER () AS main_count,\n"+
+            "    url_path AS name,\n"+
+            "    COUNT(DISTINCT session_id) AS total,\n"+
+            "    COALESCE(SUM(COUNT(DISTINCT session_id)) OVER (), 0) AS total_count\n"+
+            "FROM (\n%s) AS filtered_sessions\n"+
+            "GROUP BY url_path\n"+
+            "ORDER BY total DESC\n"+
+            "LIMIT 200 OFFSET 0;",
+        main,
+    )
+
+    return final
+}
+
+func partitionFilters(filters []Filter) (sessionFilters []Filter, eventFilters []Filter) {
+    for _, f := range filters {
+        if f.IsEvent {
+            eventFilters = append(eventFilters, f)
+        } else {
+            sessionFilters = append(sessionFilters, f)
+        }
+    }
+    return
+}
+
+func buildSessionWhere(filters []Filter) []string {
+    var conds []string
+    for _, f := range filters {
+        switch f.Type {
+        case FilterUserCountry:
+            conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCountry, concatValues(f.Value)))
+        case FilterUserCity:
+            conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCity, concatValues(f.Value)))
+        case FilterUserState:
+            conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserState, concatValues(f.Value)))
+        case FilterUserId:
+            conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserID, concatValues(f.Value)))
+        case FilterUserAnonymousId:
+            conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserAnonymousID, concatValues(f.Value)))
+        case FilterUserOs:
+            conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserOS, concatValues(f.Value)))
+        case FilterUserBrowser:
+            conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserBrowser, concatValues(f.Value)))
+        case FilterUserDevice:
+            conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDevice, concatValues(f.Value)))
+        case FilterPlatform:
+            conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDeviceType, concatValues(f.Value)))
+        case FilterRevId:
+            conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColRevID, concatValues(f.Value)))
+        case FilterReferrer:
+            conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColBaseReferrer, concatValues(f.Value)))
+        case FilterDuration:
+            if len(f.Value) == 2 {
+                conds = append(conds, fmt.Sprintf("%s >= '%s'", ColDuration, f.Value[0]))
+                conds = append(conds, fmt.Sprintf("%s <= '%s'", ColDuration, f.Value[1]))
+            }
+        case FilterUtmSource:
+            conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmSource, concatValues(f.Value)))
+        case FilterUtmMedium:
+            conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmMedium, concatValues(f.Value)))
+        case FilterUtmCampaign:
+            conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmCampaign, concatValues(f.Value)))
+        case FilterMetadata:
+            conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColMetadata1, concatValues(f.Value)))
+        }
+    }
+    // adding \n to each condition for better readability, can be removed.
+    for i := range conds {
+        conds[i] += "\n"
+    }
+    return conds
+}
+
+func concatValues(v []string) string {
+    return strings.Join(v, "")
+}
+
+func buildEventsWhere(filters []Filter, order EventOrder) (eventConditions []string, having string) {
+    basicEventTypes := "(" +
+        strings.Join([]string{
+            fmt.Sprintf("%s = 'CLICK'", ColEventName),
+            fmt.Sprintf("%s = 'INPUT'", ColEventName),
+            fmt.Sprintf("%s = 'LOCATION'", ColEventName),
+            fmt.Sprintf("%s = 'CUSTOM'", ColEventName),
+            fmt.Sprintf("%s = 'REQUEST'", ColEventName),
+        }, " OR ") + ")"
+
+    var seq []string
+    for _, f := range filters {
+        switch f.Type {
+        case FilterClick:
+            seq = append(seq, seqCond("CLICK", "selector", f))
+        case FilterInput:
+            seq = append(seq, seqCond("INPUT", "label", f))
+        case FilterLocation:
+            seq = append(seq, seqCond("LOCATION", "url_path", f))
+        case FilterCustom:
+            seq = append(seq, seqCond("CUSTOM", "name", f))
+        case FilterFetch:
+            seq = append(seq, seqFetchCond("REQUEST", f))
+        case FilterFetchStatusCode:
+            seq = append(seq, seqCond("REQUEST", "status", f))
+        default:
+            seq = append(seq, fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(string(f.Type))))
+        }
+    }
+    eventConditions = []string{basicEventTypes}
+
+    // then => sequenceMatch
+    // or => OR
+    // and => AND
+    switch order {
+    case EventOrderThen:
+        var pattern []string
+        for i := range seq {
+            pattern = append(pattern, fmt.Sprintf("(?%d)", i+1))
+        }
+        having = fmt.Sprintf("sequenceMatch('%s')(\n%s,\n%s)",
+            strings.Join(pattern, ""), fmt.Sprintf("toUnixTimestamp(%s)", ColEventTime), strings.Join(seq, ",\n"))
+    case EventOrderAnd:
+        // build AND
+        having = strings.Join(seq, " AND ")
+    default:
+        // default => OR
+        var orParts []string
+        for _, p := range seq {
+            orParts = append(orParts, "("+p+")")
+        }
+        having = strings.Join(orParts, " OR ")
+    }
+    return
+}
+
+func seqCond(eventName, key string, f Filter) string {
+    op := parseOperator(f.Operator)
+    return fmt.Sprintf("(%s = '%s' AND JSONExtractString(toString(%s), '%s') %s '%s')",
+        ColEventName, strings.ToUpper(eventName), ColEventProperties, key, op, concatValues(f.Value))
+}
+
+func seqFetchCond(eventName string, f Filter) string {
+    w := []string{fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(eventName))}
+    var extras []string
+    for _, c := range f.Filters {
+        switch c.Type {
+        case FilterFetch:
+            if len(c.Value) > 0 {
+                extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventURLPath, concatValues(c.Value)))
+            }
+        case FilterFetchStatusCode:
+            if len(c.Value) > 0 {
+                extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventStatus, concatValues(c.Value)))
+            }
+        default:
+            // placeholder if needed
+        }
+    }
+    if len(extras) > 0 {
+        w = append(w, strings.Join(extras, " AND "))
+    }
+    return "(" + strings.Join(w, " AND ") + ")"
+}
+
+func parseOperator(op string) string {
+    // TODO implement this properly
+    switch strings.ToLower(op) {
+    case OperatorStringContains:
+        return "LIKE"
+    case OperatorStringIs, OperatorStringOn, "=", OperatorStringOnAny:
+        return "="
+    case OperatorStringStartsWith:
+        return "LIKE"
+    case OperatorStringEndsWith:
+        // might interpret differently in real impl
+        return "="
+    default:
+        return "="
+    }
+}
diff --git a/backend/pkg/analytics/query/timeseries.go b/backend/pkg/analytics/query/timeseries.go
new file mode 100644
index 000000000..73d49f6cb
--- /dev/null
+++ b/backend/pkg/analytics/query/timeseries.go
@@ -0,0 +1,42 @@
+package query
+
+import "fmt"
+
+type TimeSeriesQueryBuilder struct{}
+
+func (t TimeSeriesQueryBuilder) Build(p MetricPayload) string {
+    switch p.MetricOf {
+    case "sessionCount":
+        return t.buildSessionCountQuery(p)
+    case "userCount":
+        return t.buildUserCountQuery(p)
+    default:
+        return ""
+    }
+}
+
+func (TimeSeriesQueryBuilder) buildSessionCountQuery(p MetricPayload) string {
+    subquery := buildEventSubquery(p)
+    return fmt.Sprintf(`SELECT toUnixTimestamp(
+    toStartOfInterval(processed_sessions.datetime, INTERVAL 115199 second)
+) * 1000 AS timestamp,
+COUNT(processed_sessions.session_id) AS count
+FROM (
+    %s
+) AS processed_sessions
+GROUP BY timestamp
+ORDER BY timestamp;`, subquery)
+}
+
+func (TimeSeriesQueryBuilder) buildUserCountQuery(p MetricPayload) string {
+    subquery := buildEventSubquery(p)
+    return fmt.Sprintf(`SELECT toUnixTimestamp(
+    toStartOfInterval(processed_sessions.datetime, INTERVAL 115199 second)
+) * 1000 AS timestamp,
+COUNT(DISTINCT processed_sessions.user_id) AS count
+FROM (
+    %s
+) AS processed_sessions
+GROUP BY timestamp
+ORDER BY timestamp;`, subquery)
+}