diff --git a/backend/cmd/analytics/main.go b/backend/cmd/analytics/main.go index 8a7b95c29..cb53cb338 100644 --- a/backend/cmd/analytics/main.go +++ b/backend/cmd/analytics/main.go @@ -2,71 +2,51 @@ package main import ( "context" - "os" - "os/signal" - "syscall" - + analyticsConfig "openreplay/backend/internal/config/analytics" + "openreplay/backend/pkg/analytics" + "openreplay/backend/pkg/analytics/db" + "openreplay/backend/pkg/db/postgres/pool" "openreplay/backend/pkg/logger" + "openreplay/backend/pkg/metrics" + //analyticsMetrics "openreplay/backend/pkg/metrics/analytics" + //databaseMetrics "openreplay/backend/pkg/metrics/database" + "openreplay/backend/pkg/metrics/database" + "openreplay/backend/pkg/metrics/web" + "openreplay/backend/pkg/server" + "openreplay/backend/pkg/server/api" ) func main() { ctx := context.Background() log := logger.New() - log.Info(ctx, "Cacher service started") + cfg := analyticsConfig.New(log) + webMetrics := web.New("analytics") + dbMetrics := database.New("analytics") + metrics.New(log, append(webMetrics.List(), dbMetrics.List()...)) - sigchan := make(chan os.Signal, 1) - signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - - for { - select { - case sig := <-sigchan: - log.Error(ctx, "Caught signal %v: terminating", sig) - os.Exit(0) - } + pgConn, err := pool.New(dbMetrics, cfg.Postgres.String()) + if err != nil { + log.Fatal(ctx, "can't init postgres connection: %s", err) } -} + defer pgConn.Close() -// -//import ( -// "context" -// -// analyticsConfig "openreplay/backend/internal/config/analytics" -// "openreplay/backend/pkg/analytics" -// "openreplay/backend/pkg/db/postgres/pool" -// "openreplay/backend/pkg/logger" -// "openreplay/backend/pkg/metrics" -// "openreplay/backend/pkg/metrics/database" -// "openreplay/backend/pkg/metrics/web" -// "openreplay/backend/pkg/server" -// "openreplay/backend/pkg/server/api" -//) -// -//func main() { -// ctx := context.Background() -// log := logger.New() -// cfg := analyticsConfig.New(log) -// // Observability -// webMetrics := web.New("analytics") -// dbMetrics := database.New("analytics") -// metrics.New(log, append(webMetrics.List(), dbMetrics.List()...)) -// -// pgConn, err := pool.New(dbMetrics, cfg.Postgres.String()) -// if err != nil { -// log.Fatal(ctx, "can't init postgres connection: %s", err) -// } -// defer pgConn.Close() -// -// builder, err := analytics.NewServiceBuilder(log, cfg, webMetrics, dbMetrics, pgConn) -// if err != nil { -// log.Fatal(ctx, "can't init services: %s", err) -// } -// -// router, err := api.NewRouter(&cfg.HTTP, log) -// if err != nil { -// log.Fatal(ctx, "failed while creating router: %s", err) -// } -// router.AddHandlers(api.NoPrefix, builder.CardsAPI, builder.DashboardsAPI, builder.ChartsAPI) -// router.AddMiddlewares(builder.Auth.Middleware, builder.RateLimiter.Middleware, builder.AuditTrail.Middleware) -// -// server.Run(ctx, log, &cfg.HTTP, router) -//} + chConn, err := db.NewConnector(cfg.Clickhouse) + if err != nil { + log.Fatal(ctx, "can't init clickhouse connection: %s", err) + } + defer chConn.Stop() + + builder, err := analytics.NewServiceBuilder(log, cfg, webMetrics, dbMetrics, pgConn, chConn) + if err != nil { + log.Fatal(ctx, "can't init services: %s", err) + } + + router, err := api.NewRouter(&cfg.HTTP, log) + if err != nil { + log.Fatal(ctx, "failed while creating router: %s", err) + } + router.AddHandlers(api.NoPrefix, builder.CardsAPI, builder.DashboardsAPI, builder.ChartsAPI) + router.AddMiddlewares(builder.Auth.Middleware, 
builder.RateLimiter.Middleware, builder.AuditTrail.Middleware) + + server.Run(ctx, log, &cfg.HTTP, router) +} diff --git a/backend/internal/config/analytics/config.go b/backend/internal/config/analytics/config.go index b6ca5ce4c..90398240a 100644 --- a/backend/internal/config/analytics/config.go +++ b/backend/internal/config/analytics/config.go @@ -14,6 +14,7 @@ import ( type Config struct { common.Config common.Postgres + common.Clickhouse redis.Redis objectstorage.ObjectsConfig common.HTTP diff --git a/backend/pkg/analytics/builder.go b/backend/pkg/analytics/builder.go index 68098dc01..743373b50 100644 --- a/backend/pkg/analytics/builder.go +++ b/backend/pkg/analytics/builder.go @@ -3,6 +3,7 @@ package analytics import ( "github.com/go-playground/validator/v10" "openreplay/backend/pkg/analytics/charts" + "openreplay/backend/pkg/analytics/db" "openreplay/backend/pkg/metrics/database" "time" @@ -27,13 +28,14 @@ type ServicesBuilder struct { ChartsAPI api.Handlers } -func NewServiceBuilder(log logger.Logger, cfg *analytics.Config, webMetrics web.Web, dbMetrics database.Database, pgconn pool.Pool) (*ServicesBuilder, error) { +func NewServiceBuilder(log logger.Logger, cfg *analytics.Config, webMetrics web.Web, dbMetrics database.Database, pgconn pool.Pool, chConn db.Connector) (*ServicesBuilder, error) { responser := api.NewResponser(webMetrics) audiTrail, err := tracer.NewTracer(log, pgconn, dbMetrics) if err != nil { return nil, err } reqValidator := validator.New() + cardsService, err := cards.New(log, pgconn) if err != nil { return nil, err @@ -42,6 +44,7 @@ func NewServiceBuilder(log logger.Logger, cfg *analytics.Config, webMetrics web. if err != nil { return nil, err } + dashboardsService, err := dashboards.New(log, pgconn) if err != nil { return nil, err @@ -50,7 +53,8 @@ func NewServiceBuilder(log logger.Logger, cfg *analytics.Config, webMetrics web. if err != nil { return nil, err } - chartsService, err := charts.New(log, pgconn) + + chartsService, err := charts.New(log, pgconn, chConn) if err != nil { return nil, err } @@ -58,6 +62,7 @@ func NewServiceBuilder(log logger.Logger, cfg *analytics.Config, webMetrics web. 
if err != nil { return nil, err } + return &ServicesBuilder{ Auth: auth.NewAuth(log, cfg.JWTSecret, cfg.JWTSpotSecret, pgconn, nil, api.NoPrefix), RateLimiter: limiter.NewUserRateLimiter(10, 30, 1*time.Minute, 5*time.Minute), diff --git a/backend/pkg/analytics/cards/cards.go b/backend/pkg/analytics/cards/cards.go index 1425a61f0..794db1c98 100644 --- a/backend/pkg/analytics/cards/cards.go +++ b/backend/pkg/analytics/cards/cards.go @@ -6,7 +6,6 @@ import ( "fmt" "strings" - "github.com/jackc/pgx/v4" "github.com/lib/pq" "openreplay/backend/pkg/db/postgres/pool" @@ -48,12 +47,12 @@ func (s *cardsImpl) Create(projectId int, userID uint64, req *CardCreateRequest) ctx := context.Background() defer func() { if err != nil { - tx.Rollback(ctx) + err := tx.TxRollback() if err != nil { return } } else { - err := tx.Commit(ctx) + err := tx.TxCommit() if err != nil { return } @@ -67,8 +66,8 @@ func (s *cardsImpl) Create(projectId int, userID uint64, req *CardCreateRequest) RETURNING metric_id, project_id, user_id, name, metric_type, view_type, metric_of, metric_value, metric_format, is_public, created_at, edited_at` card := &CardGetResponse{} - err = tx.QueryRow( - ctx, sql, + err = tx.TxQueryRow( + sql, projectId, userID, req.Name, req.MetricType, req.ViewType, req.MetricOf, req.MetricValue, req.MetricFormat, req.IsPublic, ).Scan( &card.CardID, @@ -98,7 +97,7 @@ func (s *cardsImpl) Create(projectId int, userID uint64, req *CardCreateRequest) return card, nil } -func (s *cardsImpl) CreateSeries(ctx context.Context, tx pgx.Tx, metricId int64, series []CardSeriesBase) []CardSeries { +func (s *cardsImpl) CreateSeries(ctx context.Context, tx *pool.Tx, metricId int64, series []CardSeriesBase) []CardSeries { if len(series) == 0 { return nil // No series to create } @@ -126,7 +125,7 @@ func (s *cardsImpl) CreateSeries(ctx context.Context, tx pgx.Tx, metricId int64, query := fmt.Sprintf(sql, strings.Join(values, ",")) s.log.Info(ctx, "Executing query: %s with args: %v", query, args) - rows, err := tx.Query(ctx, query, args...) + rows, err := tx.TxQuery(query, args...) 
if err != nil { s.log.Error(ctx, "failed to execute batch insert for series: %v", err) return nil @@ -359,12 +358,12 @@ func (s *cardsImpl) Update(projectId int, cardID int64, userID uint64, req *Card ctx := context.Background() defer func() { if err != nil { - tx.Rollback(ctx) + err := tx.TxRollback() if err != nil { return } } else { - err := tx.Commit(ctx) + err := tx.TxCommit() if err != nil { return } @@ -379,7 +378,7 @@ func (s *cardsImpl) Update(projectId int, cardID int64, userID uint64, req *Card RETURNING metric_id, project_id, user_id, name, metric_type, view_type, metric_of, metric_value, metric_format, is_public, created_at, edited_at` card := &CardGetResponse{} - err = tx.QueryRow(ctx, sql, + err = tx.TxQueryRow(sql, req.Name, req.MetricType, req.ViewType, req.MetricOf, req.MetricValue, req.MetricFormat, req.IsPublic, cardID, projectId, ).Scan( &card.CardID, &card.ProjectID, &card.UserID, &card.Name, &card.MetricType, &card.ViewType, &card.MetricOf, diff --git a/backend/pkg/analytics/cards/handlers.go b/backend/pkg/analytics/cards/handlers.go index f0cf16d02..a47c1153a 100644 --- a/backend/pkg/analytics/cards/handlers.go +++ b/backend/pkg/analytics/cards/handlers.go @@ -46,6 +46,7 @@ func (e *handlersImpl) GetAll() []*api.Description { {"/v1/analytics/{projectId}/cards/{id}", e.getCard, "GET"}, {"/v1/analytics/{projectId}/cards/{id}", e.updateCard, "PUT"}, {"/v1/analytics/{projectId}/cards/{id}", e.deleteCard, "DELETE"}, + {"/v1/analytics/{projectId}/cards/{id}/sessions", e.getCardSessions, "POST"}, } } @@ -296,3 +297,8 @@ func (e *handlersImpl) deleteCard(w http.ResponseWriter, r *http.Request) { e.responser.ResponseWithJSON(e.log, r.Context(), w, nil, startTime, r.URL.Path, bodySize) } + +func (e *handlersImpl) getCardSessions(w http.ResponseWriter, r *http.Request) { + // TODO: implement this + e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusNotImplemented, fmt.Errorf("not implemented"), time.Now(), r.URL.Path, 0) +} diff --git a/backend/pkg/analytics/cards/model.go b/backend/pkg/analytics/cards/model.go index 5ab4144f0..0b42df18a 100644 --- a/backend/pkg/analytics/cards/model.go +++ b/backend/pkg/analytics/cards/model.go @@ -6,6 +6,24 @@ import ( "time" ) +type MetricType string +type MetricOfTimeseries string +type MetricOfTable string + +const ( + MetricTypeTimeseries MetricType = "TIMESERIES" + MetricTypeTable MetricType = "TABLE" + + MetricOfTimeseriesSessionCount MetricOfTimeseries = "SESSION_COUNT" + MetricOfTimeseriesUserCount MetricOfTimeseries = "USER_COUNT" + + MetricOfTableVisitedURL MetricOfTable = "VISITED_URL" + MetricOfTableIssues MetricOfTable = "ISSUES" + MetricOfTableUserCountry MetricOfTable = "USER_COUNTRY" + MetricOfTableUserDevice MetricOfTable = "USER_DEVICE" + MetricOfTableUserBrowser MetricOfTable = "USER_BROWSER" +) + // CardBase Common fields for the Card entity type CardBase struct { Name string `json:"name" validate:"required"` @@ -49,8 +67,8 @@ type CardSeries struct { } type SeriesFilter struct { - EventOrder string `json:"eventOrder" validate:"required,oneof=then or and"` - Filters []FilterItem `json:"filters"` + EventsOrder string `json:"eventsOrder" validate:"required,oneof=then or and"` + Filters []FilterItem `json:"filters"` } type FilterItem struct { @@ -192,3 +210,34 @@ func (s *CardListSort) GetSQLField() string { func (s *CardListSort) GetSQLOrder() string { return strings.ToUpper(s.Order) } + +// --- + +/* +class IssueType(str, Enum): + + CLICK_RAGE = 'click_rage' + DEAD_CLICK = 'dead_click' + EXCESSIVE_SCROLLING = 
'excessive_scrolling' + BAD_REQUEST = 'bad_request' + MISSING_RESOURCE = 'missing_resource' + MEMORY = 'memory' + CPU = 'cpu' + SLOW_RESOURCE = 'slow_resource' + SLOW_PAGE_LOAD = 'slow_page_load' + CRASH = 'crash' + CUSTOM = 'custom' + JS_EXCEPTION = 'js_exception' + MOUSE_THRASHING = 'mouse_thrashing' + # IOS + TAP_RAGE = 'tap_rage' +*/ +type IssueType string +type ChartData struct { + StartTs uint64 `json:"startTs"` + EndTs uint64 `json:"endTs"` + Density uint64 `json:"density"` + Filters []FilterItem `json:"filter"` + MetricOf string `json:"metricOf"` + MetricValue []IssueType `json:"metricValue"` +} diff --git a/backend/pkg/analytics/charts/charts.go b/backend/pkg/analytics/charts/charts.go index 1916695fa..e9f24bc31 100644 --- a/backend/pkg/analytics/charts/charts.go +++ b/backend/pkg/analytics/charts/charts.go @@ -1,50 +1,51 @@ package charts import ( - "encoding/json" "fmt" - + "openreplay/backend/pkg/analytics/db" "openreplay/backend/pkg/db/postgres/pool" "openreplay/backend/pkg/logger" ) type Charts interface { - GetData(projectId int, userId uint64, req *GetCardChartDataRequest) ([]DataPoint, error) + GetData(projectId int, userId uint64, req *MetricPayload) (interface{}, error) } type chartsImpl struct { log logger.Logger pgconn pool.Pool + chConn db.Connector } -func New(log logger.Logger, conn pool.Pool) (Charts, error) { +func New(log logger.Logger, conn pool.Pool, chConn db.Connector) (Charts, error) { return &chartsImpl{ log: log, pgconn: conn, + chConn: chConn, }, nil } -func (s *chartsImpl) GetData(projectId int, userID uint64, req *GetCardChartDataRequest) ([]DataPoint, error) { - jsonInput := ` - { - "data": [ - { - "timestamp": 1733934939000, - "Series A": 100, - "Series B": 200 - }, - { - "timestamp": 1733935939000, - "Series A": 150, - "Series B": 250 - } - ] - }` - - var resp GetCardChartDataResponse - if err := json.Unmarshal([]byte(jsonInput), &resp); err != nil { - return nil, fmt.Errorf("failed to unmarshal response: %w", err) +// GetData def get_chart() +func (s *chartsImpl) GetData(projectId int, userID uint64, req *MetricPayload) (interface{}, error) { + if req == nil { + return nil, fmt.Errorf("request is empty") } - return resp.Data, nil + payload := Payload{ + ProjectId: projectId, + UserId: userID, + MetricPayload: req, + } + qb, err := NewQueryBuilder(payload) + if err != nil { + return nil, fmt.Errorf("error creating query builder: %v", err) + } + + resp, err := qb.Execute(payload, s.chConn) + if err != nil { + return nil, fmt.Errorf("error executing query: %v", err) + } + + //return resp, nil + return map[string]interface{}{"data": resp}, nil } diff --git a/backend/pkg/analytics/charts/counters.go b/backend/pkg/analytics/charts/counters.go new file mode 100644 index 000000000..520ca7163 --- /dev/null +++ b/backend/pkg/analytics/charts/counters.go @@ -0,0 +1,427 @@ +package charts + +import ( + "fmt" + "log" + "strconv" + "strings" +) + +type Fields map[string]string + +func getSessionMetaFields() Fields { + return Fields{ + "revId": "rev_id", + "country": "user_country", + "os": "user_os", + "platform": "user_device_type", + "device": "user_device", + "browser": "user_browser", + } +} + +func getMetadataFields() Fields { + return Fields{ + "userId": "user_id", + "userAnonymousId": "user_anonymous_id", + "metadata1": "metadata_1", + "metadata2": "metadata_2", + "metadata3": "metadata_3", + "metadata4": "metadata_4", + "metadata5": "metadata_5", + "metadata6": "metadata_6", + "metadata7": "metadata_7", + "metadata8": "metadata_8", + "metadata9": 
"metadata_9", + "metadata10": "metadata_10", + } +} + +func getStepSize(startTimestamp, endTimestamp int64, density int, decimal bool, factor int) float64 { + factorInt64 := int64(factor) + stepSize := (endTimestamp / factorInt64) - (startTimestamp / factorInt64) + + if density <= 1 { + return float64(stepSize) + } + + if decimal { + return float64(stepSize) / float64(density) + } + + return float64(stepSize / int64(density-1)) +} + +//func getStepSize(startTimestamp, endTimestamp, density uint64, decimal bool, factor uint64) float64 { +// stepSize := (endTimestamp / factor) - (startTimestamp / factor) // TODO: should I use float64 here? +// if !decimal { +// density-- +// } +// return float64(stepSize) / float64(density) +//} + +func getBasicConstraints(tableName string, timeConstraint, roundStart bool, data map[string]interface{}, identifier string) []string { // Если tableName не пустая, добавляем точку + if tableName != "" { + tableName += "." + } + chSubQuery := []string{fmt.Sprintf("%s%s = toUInt16(:%s)", tableName, identifier, identifier)} + + if timeConstraint { + if roundStart { + chSubQuery = append(chSubQuery, fmt.Sprintf("toStartOfInterval(%sdatetime, INTERVAL :step_size second) >= toDateTime(:startTimestamp/1000)", tableName)) + } else { + chSubQuery = append(chSubQuery, fmt.Sprintf("%sdatetime >= toDateTime(:startTimestamp/1000)", tableName)) + } + chSubQuery = append(chSubQuery, fmt.Sprintf("%sdatetime < toDateTime(:endTimestamp/1000)", tableName)) + } + return append(chSubQuery, getGenericConstraint(data, tableName)...) +} + +func getGenericConstraint(data map[string]interface{}, tableName string) []string { + return getConstraint(data, getSessionMetaFields(), tableName) +} + +func getConstraint(data map[string]interface{}, fields Fields, tableName string) []string { + var constraints []string + filters, err := data["filters"].([]map[string]interface{}) + if !err { + log.Println("error getting filters from data") + filters = make([]map[string]interface{}, 0) // to skip the next block + } + + // process filters + for i, f := range filters { + key, _ := f["key"].(string) + value, _ := f["value"].(string) + + if field, ok := fields[key]; ok { + if value == "*" || value == "" { + constraints = append(constraints, fmt.Sprintf("isNotNull(%s%s)", tableName, field)) + } else { + // constraints.append(f"{table_name}{fields[f['key']]} = %({f['key']}_{i})s") + constraints = append(constraints, fmt.Sprintf("%s%s = %%(%s_%d)s", tableName, field, key, i)) // TODO: where we'll keep the value? 
+			}
+		}
+	}
+
+	// TODO from Python: remove this in next release
+	offset := len(filters)
+	for i, f := range data {
+		key, _ := f.(string)
+		value, _ := data[key].(string)
+
+		if field, ok := fields[key]; ok {
+			if value == "*" || value == "" {
+				constraints = append(constraints, fmt.Sprintf("isNotNull(%s%s)", tableName, field))
+			} else {
+				intI, err := strconv.Atoi(i)
+				if err != nil {
+					log.Printf("error converting data[k] to int: %v", err)
+					continue
+				} else {
+					constraints = append(constraints, fmt.Sprintf("%s%s = %%(%s_%d)s", tableName, field, f, intI+offset))
+				}
+			}
+		}
+	}
+	return constraints
+}
+
+func getMetaConstraint(data map[string]interface{}) []string {
+	return getConstraint(data, getMetadataFields(), "sessions_metadata.")
+}
+
+func getConstraintValues(data map[string]interface{}) map[string]interface{} {
+	params := make(map[string]interface{})
+
+	if filters, ok := data["filters"].([]map[string]interface{}); ok {
+		for i, f := range filters {
+			key, _ := f["key"].(string)
+			value := f["value"]
+			params[fmt.Sprintf("%s_%d", key, i)] = value
+		}
+
+		// TODO from Python: remove this in next release
+		offset := len(data["filters"].([]map[string]interface{}))
+		i := 0
+		for k, v := range data {
+			params[fmt.Sprintf("%s_%d", k, i+offset)] = v
+			i++
+		}
+	}
+
+	return params
+}
+
+/*
+def get_main_sessions_table(timestamp=0):
+
+    return "experimental.sessions_l7d_mv" \
+        if config("EXP_7D_MV", cast=bool, default=True) \
+        and timestamp and timestamp >= TimeUTC.now(delta_days=-7) else "experimental.sessions"
+*/
+// getMainSessionsTable currently ignores the timestamp and always returns the base sessions
+// table; the 7-day materialized-view shortcut from the Python version above is not applied here.
+func getMainSessionsTable(timestamp int64) string {
+	return "experimental.sessions"
+}
+
+// replaceNamedParams substitutes each named parameter (":key") in the query with the literal
+// string form of its value and also returns the values as a slice.
+func replaceNamedParams(query string, params map[string]interface{}) (string, []interface{}) {
+	var args []interface{}
+	i := 1
+	for key, val := range params {
+		placeholder := ":" + key
+		//query = strings.Replace(query, placeholder, "?", 1)
+		strVal := fmt.Sprintf("%v", val)
+		query = strings.Replace(query, placeholder, strVal, -1)
+		args = append(args, val)
+		i++
+	}
+	return query, args
+}
+
+// Helper function to generate a range of floats
+func frange(start, end, step float64) []float64 {
+	var rangeValues []float64
+	for i := start; i < end; i += step {
+		rangeValues = append(rangeValues, i)
+	}
+	return rangeValues
+}
+
+// Helper function to add missing keys from the "complete" map to the "original" map
+func addMissingKeys(original, complete map[string]interface{}) map[string]interface{} {
+	for k, v := range complete {
+		if _, exists := original[k]; !exists {
+			original[k] = v
+		}
+	}
+	return original
+}
+
+// CompleteMissingSteps fills in missing steps in the data
+func CompleteMissingSteps(
+	startTime, endTime int64,
+	density int,
+	neutral map[string]interface{},
+	rows []map[string]interface{},
+	timeKey string,
+	timeCoefficient int64,
+) []map[string]interface{} {
+	if len(rows) == density {
+		return rows
+	}
+
+	// Calculate the step size
+	step := getStepSize(startTime, endTime, density, true, 1000)
+	optimal := make([][2]uint64, 0)
+	for _, i := range frange(float64(startTime)/float64(timeCoefficient), float64(endTime)/float64(timeCoefficient), step) {
+		startInterval := uint64(i * float64(timeCoefficient))
+		endInterval := uint64((i + step) * float64(timeCoefficient))
+		optimal = append(optimal, [2]uint64{startInterval, endInterval})
+	}
+
+	var result []map[string]interface{}
+	r, o := 0, 0
+
+	// Iterate over density
+	for i := 0; i < density; i++ {
+		// Clone the neutral map
+		neutralClone := make(map[string]interface{})
+		for k, v := range neutral {
+			if fn, ok := v.(func() interface{}); ok {
+				neutralClone[k] = fn()
+			} else {
+				neutralClone[k] = v
+			}
+		}
+
+		// If we can just add the rest of the rows to result
+		if r < len(rows) && len(result)+len(rows)-r == density {
+			result = append(result, rows[r:]...)
+			break
+		}
+
+		// Determine where the current row fits within the optimal intervals
+		if r < len(rows) && o < len(optimal) && rows[r][timeKey].(uint64) < optimal[o][0] {
+			rows[r] = addMissingKeys(rows[r], neutralClone)
+			result = append(result, rows[r])
+			r++
+		} else if r < len(rows) && o < len(optimal) && optimal[o][0] <= rows[r][timeKey].(uint64) && rows[r][timeKey].(uint64) < optimal[o][1] {
+			rows[r] = addMissingKeys(rows[r], neutralClone)
+			result = append(result, rows[r])
+			r++
+			o++
+		} else {
+			neutralClone[timeKey] = optimal[o][0]
+			result = append(result, neutralClone)
+			o++
+		}
+	}
+	return result
+}
+
+func progress(oldVal, newVal uint64) float64 {
+	if newVal > 0 {
+		// convert before subtracting so the unsigned values cannot underflow when newVal > oldVal
+		return (float64(oldVal) - float64(newVal)) / float64(newVal) * 100
+	}
+	if oldVal == 0 {
+		return 0
+	}
+	return 100
+}
+
+// parse builds the constraint lists and query parameters shared by the chart queries below
+func parse(projectID int, startTs, endTs int64, density int, args map[string]interface{}) ([]string, []string, map[string]interface{}) {
+	stepSize := getStepSize(startTs, endTs, density, false, 1000)
+	chSubQuery := getBasicConstraints("sessions", true, false, args, "project_id")
+	chSubQueryChart := getBasicConstraints("sessions", true, true, args, "project_id")
+	metaCondition := getMetaConstraint(args)
+	chSubQuery = append(chSubQuery, metaCondition...)
+	chSubQueryChart = append(chSubQueryChart, metaCondition...)
+
+	params := map[string]interface{}{
+		"step_size":      stepSize,
+		"project_id":     projectID,
+		"startTimestamp": startTs,
+		"endTimestamp":   endTs,
+	}
+	for k, v := range getConstraintValues(args) {
+		params[k] = v
+	}
+	return chSubQuery, chSubQueryChart, params
+}
+
+// Sessions trend
+//func (s *chartsImpl) getProcessedSessions(projectID int, startTs, endTs int64, density int, args map[string]interface{}) (interface{}, error) {
+//	chQuery := `
+//        SELECT toUnixTimestamp(toStartOfInterval(sessions.datetime, INTERVAL :step_size second)) * 1000 AS timestamp,
+//               COUNT(DISTINCT sessions.session_id) AS value
+//        FROM :main_sessions_table AS sessions
+//        WHERE :sub_query_chart
+//        GROUP BY timestamp
+//        ORDER BY timestamp;
+//    `
+//	chSubQuery, chSubQueryChart, params := parse(projectID, startTs, endTs, density, args)
+//
+//	chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1)
+//	chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQueryChart, " AND "), -1)
+//
+//	preparedQuery, preparedArgs := replaceNamedParams(chQuery, params)
+//	rows, err := s.chConn.Query(context.Background(), preparedQuery, preparedArgs)
+//	if err != nil {
+//		log.Fatalf("Error executing query: %v", err)
+//	}
+//	preparedRows := make([]map[string]interface{}, 0)
+//	var sum uint64
+//	for rows.Next() {
+//		var timestamp, value uint64
+//		if err := rows.Scan(&timestamp, &value); err != nil {
+//			log.Fatalf("Error scanning row: %v", err)
+//		}
+//		fmt.Printf("Timestamp: %d, Value: %d\n", timestamp, value)
+//		sum += value
+//		preparedRows = append(preparedRows, map[string]interface{}{"timestamp": timestamp, "value": value})
+//	}
+//
+//	results := map[string]interface{}{
+//		"value": sum,
+//		"chart": CompleteMissingSteps(startTs, endTs, int(density), map[string]interface{}{"value": 0}, preparedRows, "timestamp", 1000),
+//	}
+//
+//	diff := endTs - startTs
+//	endTs = startTs
+//	startTs = endTs - diff
+//
+//	log.Println(results)
+//
+//	chQuery = fmt.Sprintf(`
+//        SELECT COUNT(1) AS count
+//        FROM :main_sessions_table AS sessions
+//        WHERE :sub_query_chart;
+//    `)
+//	chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1)
+//	chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQuery, " AND "), -1)
+//
+//	var count uint64
+//
+//	preparedQuery, preparedArgs = replaceNamedParams(chQuery, params)
+//	if err := s.chConn.QueryRow(context.Background(), preparedQuery, preparedArgs).Scan(&count); err != nil {
+//		log.Fatalf("Error executing query: %v", err)
+//	}
+//
+//	results["progress"] = progress(count, results["value"].(uint64))
+//
+//	// TODO: this should be returned in any case
+//	results["unit"] = "COUNT"
+//	fmt.Println(results)
+//
+//	return results, nil
+//}
+//
+//// Users trend
+//func (s *chartsImpl) getUniqueUsers(projectID int, startTs, endTs int64, density int, args map[string]interface{}) (interface{}, error) {
+//	chQuery := `
+//        SELECT toUnixTimestamp(toStartOfInterval(sessions.datetime, INTERVAL :step_size second)) * 1000 AS timestamp,
+//               COUNT(DISTINCT sessions.user_id) AS value
+//        FROM :main_sessions_table AS sessions
+//        WHERE :sub_query_chart
+//        GROUP BY timestamp
+//        ORDER BY timestamp;
+//    `
+//	chSubQuery, chSubQueryChart, params := parse(projectID, startTs, endTs, density, args)
+//	chSubQueryChart = append(chSubQueryChart, []string{"isNotNull(sessions.user_id)", "sessions.user_id!=''"}...)
+//
+//	chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1)
+//	chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQueryChart, " AND "), -1)
+//
+//	preparedQuery, preparedArgs := replaceNamedParams(chQuery, params)
+//	rows, err := s.chConn.Query(context.Background(), preparedQuery, preparedArgs)
+//	if err != nil {
+//		log.Fatalf("Error executing query: %v", err)
+//	}
+//	preparedRows := make([]map[string]interface{}, 0)
+//	var sum uint64
+//	for rows.Next() {
+//		var timestamp, value uint64
+//		if err := rows.Scan(&timestamp, &value); err != nil {
+//			log.Fatalf("Error scanning row: %v", err)
+//		}
+//		fmt.Printf("Timestamp: %d, Value: %d\n", timestamp, value)
+//		sum += value
+//		preparedRows = append(preparedRows, map[string]interface{}{"timestamp": timestamp, "value": value})
+//	}
+//
+//	results := map[string]interface{}{
+//		"value": sum,
+//		"chart": CompleteMissingSteps(startTs, endTs, int(density), map[string]interface{}{"value": 0}, preparedRows, "timestamp", 1000),
+//	}
+//
+//	diff := endTs - startTs
+//	endTs = startTs
+//	startTs = endTs - diff
+//
+//	log.Println(results)
+//
+//	chQuery = fmt.Sprintf(`
+//        SELECT COUNT(DISTINCT user_id) AS count
+//        FROM :main_sessions_table AS sessions
+//        WHERE :sub_query_chart;
+//    `)
+//	chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1)
+//	chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQuery, " AND "), -1)
+//
+//	var count uint64
+//
+//	preparedQuery, preparedArgs = replaceNamedParams(chQuery, params)
+//	if err := s.chConn.QueryRow(context.Background(), preparedQuery, preparedArgs).Scan(&count); err != nil {
+//		log.Fatalf("Error executing query: %v", err)
+//	}
+//
+//	results["progress"] = progress(count, results["value"].(uint64))
+//
+//	// TODO: this should be returned in any case
+//	results["unit"]
= "COUNT" +// fmt.Println(results) +// +// return results, nil +//} diff --git a/backend/pkg/analytics/charts/handlers.go b/backend/pkg/analytics/charts/handlers.go index 771732b43..8e759ea91 100644 --- a/backend/pkg/analytics/charts/handlers.go +++ b/backend/pkg/analytics/charts/handlers.go @@ -41,8 +41,9 @@ type handlersImpl struct { func (e *handlersImpl) GetAll() []*api.Description { return []*api.Description{ - {"/v1/analytics/{projectId}/cards/{id}/chart", e.getCardChartData, "POST"}, + {"/v1/analytics/{projectId}/cards/{id}/chart", e.getCardChartData, "POST"}, // for dashboards {"/v1/analytics/{projectId}/cards/{id}/try", e.getCardChartData, "POST"}, + {"/v1/analytics/{projectId}/cards/try", e.getCardChartData, "POST"}, // for cards itself } } @@ -73,7 +74,7 @@ func (e *handlersImpl) getCardChartData(w http.ResponseWriter, r *http.Request) } bodySize = len(bodyBytes) - req := &GetCardChartDataRequest{} + req := &MetricPayload{} if err := json.Unmarshal(bodyBytes, req); err != nil { e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize) return diff --git a/backend/pkg/analytics/charts/metric_funnel.go b/backend/pkg/analytics/charts/metric_funnel.go new file mode 100644 index 000000000..59df29070 --- /dev/null +++ b/backend/pkg/analytics/charts/metric_funnel.go @@ -0,0 +1,236 @@ +package charts + +import ( + "fmt" + "openreplay/backend/pkg/analytics/db" + "strings" +) + +type FunnelStepResult struct { + LevelNumber uint64 `json:"step"` + StepName string `json:"type"` + CountAtLevel uint64 `json:"count"` + Operator string `json:"operator"` + Value []string `json:"value"` + DropPct float64 `json:"dropPct"` +} + +type FunnelResponse struct { + Steps []FunnelStepResult `json:"stages"` +} + +type FunnelQueryBuilder struct{} + +func (f FunnelQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) { + q, err := f.buildQuery(p) + if err != nil { + return nil, err + } + rows, err := conn.Query(q) + if err != nil { + return nil, err + } + defer rows.Close() + + // extract step filters + s := p.MetricPayload.Series[0] + var stepFilters []Filter + for _, flt := range s.Filter.Filters { + if flt.IsEvent { + stepFilters = append(stepFilters, flt) + } + } + + var steps []FunnelStepResult + for rows.Next() { + var r FunnelStepResult + if err := rows.Scan(&r.LevelNumber, &r.StepName, &r.CountAtLevel); err != nil { + return nil, err + } + idx := int(r.LevelNumber) - 1 + if idx >= 0 && idx < len(stepFilters) { + r.Operator = stepFilters[idx].Operator + r.Value = stepFilters[idx].Value + } + steps = append(steps, r) + } + + // compute drop percentages + if len(steps) > 0 { + prev := steps[0].CountAtLevel + steps[0].DropPct = 0 + for i := 1; i < len(steps); i++ { + curr := steps[i].CountAtLevel + if prev > 0 { + steps[i].DropPct = (float64(prev-curr) / float64(prev)) * 100 + } else { + steps[i].DropPct = 0 + } + prev = curr + } + } + + return FunnelResponse{Steps: steps}, nil +} + +func (f FunnelQueryBuilder) buildQuery(p Payload) (string, error) { + if len(p.MetricPayload.Series) == 0 { + return "", fmt.Errorf("series empty") + } + s := p.MetricPayload.Series[0] + metricFormat := p.MetricPayload.MetricFormat + + var ( + globalFilters []Filter + stepFilters []Filter + sessionDurationFilter *Filter + ) + for _, flt := range s.Filter.Filters { + if flt.IsEvent { + stepFilters = append(stepFilters, flt) + } else if flt.Type == "duration" { + sessionDurationFilter = &flt + } else { + globalFilters = append(globalFilters, flt) + } + } 
+ + requiredColumns := make(map[string]struct{}) + var collectColumns func([]Filter) + collectColumns = func(filters []Filter) { + for _, flt := range filters { + if col, ok := mainColumns[string(flt.Type)]; ok { + requiredColumns[col] = struct{}{} + } + collectColumns(flt.Filters) + } + } + collectColumns(globalFilters) + collectColumns(stepFilters) + + selectCols := []string{ + `e.created_at`, + `e."$event_name" AS event_name`, + `e."$properties" AS properties`, + } + for col := range requiredColumns { + logical := reverseLookup(mainColumns, col) + selectCols = append(selectCols, fmt.Sprintf(`e."%s" AS %s`, col, logical)) + } + selectCols = append(selectCols, + `e.session_id`, + `e.distinct_id`, + `s.user_id AS session_user_id`, + fmt.Sprintf("if('%s' = 'sessionCount', toString(e.session_id), coalesce(nullif(s.user_id,''),e.distinct_id)) AS entity_id", metricFormat), + ) + + globalConds, _ := buildEventConditions(globalFilters, BuildConditionsOptions{ + DefinedColumns: mainColumns, + MainTableAlias: "e", + PropertiesColumnName: "$properties", + }) + + base := []string{ + fmt.Sprintf("e.created_at >= toDateTime(%d/1000)", p.MetricPayload.StartTimestamp), + fmt.Sprintf("e.created_at < toDateTime(%d/1000)", p.MetricPayload.EndTimestamp+86400000), + fmt.Sprintf("e.project_id = %d", p.ProjectId), + } + base = append(base, globalConds...) + if sessionDurationFilter != nil { + vals := sessionDurationFilter.Value + if len(vals) > 0 && vals[0] != "" { + base = append(base, fmt.Sprintf("s.duration >= %s", vals[0])) + } + if len(vals) > 1 && vals[1] != "" { + base = append(base, fmt.Sprintf("s.duration <= %s", vals[1])) + } + } + where := strings.Join(base, " AND ") + + var ( + stepNames []string + stepExprs []string + clickCount int + ) + for i, flt := range stepFilters { + stepNames = append(stepNames, fmt.Sprintf("'%s'", flt.Type)) + conds, _ := buildEventConditions([]Filter{flt}, BuildConditionsOptions{ + DefinedColumns: cteColumnAliases(), + PropertiesColumnName: "properties", + MainTableAlias: "", + }) + var exprParts []string + exprParts = append(exprParts, fmt.Sprintf("event_name = funnel_steps[%d]", i+1)) + if flt.Type == "CLICK" { + clickCount++ + exprParts = append(exprParts, fmt.Sprintf("click_idx = %d", clickCount)) + } + exprParts = append(exprParts, conds...) 
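+		// join the per-step predicates into a single boolean expression consumed by windowFunnel below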
+ stepExprs = append(stepExprs, fmt.Sprintf("(%s)", strings.Join(exprParts, " AND "))) + } + + stepsArr := fmt.Sprintf("[%s]", strings.Join(stepNames, ",")) + windowArgs := strings.Join(stepExprs, ",\n ") + + q := fmt.Sprintf(` +WITH + %s AS funnel_steps, + 86400 AS funnel_window_seconds, + events_for_funnel AS ( + SELECT + %s + FROM product_analytics.events AS e + JOIN experimental.sessions AS s USING(session_id) + WHERE %s + ORDER BY e.session_id, e.created_at + ), + numbered_clicks AS ( + SELECT + entity_id, + created_at, + row_number() OVER (PARTITION BY entity_id ORDER BY created_at) AS click_idx + FROM events_for_funnel + WHERE event_name = 'CLICK' + ), + funnel_levels_reached AS ( + SELECT + ef.entity_id, + windowFunnel(funnel_window_seconds)( + toDateTime(ef.created_at), + %s + ) AS max_level + FROM events_for_funnel ef + LEFT JOIN numbered_clicks nc + ON ef.entity_id = nc.entity_id + AND ef.created_at = nc.created_at + GROUP BY ef.entity_id + ), + counts_by_level AS ( + SELECT + seq.number + 1 AS level_number, + countDistinctIf(entity_id, max_level >= seq.number + 1) AS cnt + FROM funnel_levels_reached + CROSS JOIN numbers(length(funnel_steps)) AS seq + GROUP BY seq.number + ), + step_list AS ( + SELECT + seq.number + 1 AS level_number, + funnel_steps[seq.number + 1] AS step_name + FROM numbers(length(funnel_steps)) AS seq + ) +SELECT + s.level_number, + s.step_name, + ifNull(c.cnt, 0) AS count_at_level +FROM step_list AS s +LEFT JOIN counts_by_level AS c ON s.level_number = c.level_number +ORDER BY s.level_number;`, + stepsArr, + strings.Join(selectCols, ",\n "), + where, + windowArgs, + ) + + return q, nil +} diff --git a/backend/pkg/analytics/charts/metric_heatmaps.go b/backend/pkg/analytics/charts/metric_heatmaps.go new file mode 100644 index 000000000..77bc17655 --- /dev/null +++ b/backend/pkg/analytics/charts/metric_heatmaps.go @@ -0,0 +1,100 @@ +package charts + +import ( + "fmt" + "openreplay/backend/pkg/analytics/db" + "strings" +) + +type HeatmapPoint struct { + NormalizedX float64 `json:"normalizedX"` + NormalizedY float64 `json:"normalizedY"` +} + +type HeatmapResponse struct { + Points []HeatmapPoint `json:"data"` +} + +type HeatmapQueryBuilder struct{} + +func (h HeatmapQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) { + q, err := h.buildQuery(p) + if err != nil { + return nil, err + } + rows, err := conn.Query(q) + if err != nil { + return nil, err + } + defer rows.Close() + + var pts []HeatmapPoint + for rows.Next() { + var x, y float64 + if err := rows.Scan(&x, &y); err != nil { + return nil, err + } + pts = append(pts, HeatmapPoint{x, y}) + } + + return HeatmapResponse{ + Points: pts, + }, nil +} + +func (h HeatmapQueryBuilder) buildQuery(p Payload) (string, error) { + if len(p.MetricPayload.Series) == 0 { + return "", fmt.Errorf("series empty") + } + s := p.MetricPayload.Series[0] + + var globalFilters, eventFilters []Filter + for _, flt := range s.Filter.Filters { + if flt.IsEvent { + eventFilters = append(eventFilters, flt) + } else { + globalFilters = append(globalFilters, flt) + } + } + + globalConds, _ := buildEventConditions(globalFilters, BuildConditionsOptions{ + DefinedColumns: mainColumns, + MainTableAlias: "e", + }) + + eventConds, _ := buildEventConditions(eventFilters, BuildConditionsOptions{ + DefinedColumns: mainColumns, + MainTableAlias: "e", + }) + + base := []string{ + fmt.Sprintf("e.created_at >= toDateTime(%d/1000)", p.MetricPayload.StartTimestamp), + fmt.Sprintf("e.created_at < toDateTime(%d/1000)", 
p.MetricPayload.EndTimestamp), + fmt.Sprintf("e.project_id = %d", p.ProjectId), + "e.session_id IS NOT NULL", + "e.`$event_name` = 'CLICK'", + } + base = append(base, globalConds...) + + //if len(globalNames) > 0 { + // base = append(base, "e.`$event_name` IN ("+buildInClause(globalNames)+")") + //} + + //if len(eventNames) > 0 { + // base = append(base, "e.`$event_name` IN ("+buildInClause(eventNames)+")") + //} + + base = append(base, eventConds...) + + where := strings.Join(base, " AND ") + + q := fmt.Sprintf(` +SELECT + JSONExtractFloat(toString(e."$properties"), 'normalized_x') AS normalized_x, + JSONExtractFloat(toString(e."$properties"), 'normalized_y') AS normalized_y +FROM product_analytics.events AS e +-- JOIN experimental.sessions AS s USING(session_id) +WHERE %s LIMIT 500;`, where) + + return q, nil +} diff --git a/backend/pkg/analytics/charts/metric_heatmaps_session.go b/backend/pkg/analytics/charts/metric_heatmaps_session.go new file mode 100644 index 000000000..b8ab2c5d5 --- /dev/null +++ b/backend/pkg/analytics/charts/metric_heatmaps_session.go @@ -0,0 +1,96 @@ +package charts + +import ( + "fmt" + "openreplay/backend/pkg/analytics/db" + "strings" +) + +type HeatmapSessionResponse struct { + SessionID uint64 `json:"session_id"` + StartTs uint64 `json:"start_ts"` + Duration uint32 `json:"duration"` + EventTimestamp uint64 `json:"event_timestamp"` +} + +type HeatmapSessionQueryBuilder struct{} + +func (h HeatmapSessionQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) { + shortestQ, err := h.buildQuery(p) + if err != nil { + return nil, err + } + var sid uint64 + var startTs uint64 + var duration uint32 + var eventTs uint64 + row, err := conn.QueryRow(shortestQ) + if err != nil { + return nil, err + } + + if err := row.Scan(&sid, &startTs, &duration, &eventTs); err != nil { + return nil, err + } + + // TODO get mob urls + + return HeatmapSessionResponse{ + SessionID: sid, + StartTs: startTs, + Duration: duration, + EventTimestamp: eventTs, + }, nil +} + +func (h HeatmapSessionQueryBuilder) buildQuery(p Payload) (string, error) { + if len(p.MetricPayload.Series) == 0 { + return "", fmt.Errorf("series empty") + } + s := p.MetricPayload.Series[0] + + var globalFilters, eventFilters []Filter + for _, flt := range s.Filter.Filters { + if flt.IsEvent { + eventFilters = append(eventFilters, flt) + } else { + globalFilters = append(globalFilters, flt) + } + } + + globalConds, _ := buildEventConditions(globalFilters, BuildConditionsOptions{ + DefinedColumns: mainColumns, + MainTableAlias: "e", + }) + + eventConds, _ := buildEventConditions(eventFilters, BuildConditionsOptions{ + DefinedColumns: mainColumns, + MainTableAlias: "e", + }) + + base := []string{ + fmt.Sprintf("e.created_at >= toDateTime(%d/1000)", p.MetricPayload.StartTimestamp), + fmt.Sprintf("e.created_at < toDateTime(%d/1000)", p.MetricPayload.EndTimestamp+86400000), + fmt.Sprintf("e.project_id = %d", p.ProjectId), + "s.duration > 500", + "e.`$event_name` = 'LOCATION'", + } + base = append(base, eventConds...) + base = append(base, globalConds...) 
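+	// the query below returns a single representative session: the earliest qualifying LOCATION
+	// event in the range, ordered by event time and then by session duration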
+ + where := strings.Join(base, " AND ") + + q := fmt.Sprintf(` + SELECT + s.session_id, + toUnixTimestamp(s.datetime) * 1000 as startTs, + s.duration, + toUnixTimestamp(e.created_at) * 1000 as eventTs + FROM product_analytics.events AS e + JOIN experimental.sessions AS s USING(session_id) + WHERE %s + ORDER BY e.created_at ASC, s.duration ASC + LIMIT 1;`, where) + + return q, nil +} diff --git a/backend/pkg/analytics/charts/metric_table.go b/backend/pkg/analytics/charts/metric_table.go new file mode 100644 index 000000000..d5251b567 --- /dev/null +++ b/backend/pkg/analytics/charts/metric_table.go @@ -0,0 +1,241 @@ +package charts + +import ( + "fmt" + "log" + "openreplay/backend/pkg/analytics/db" + "strings" +) + +var validMetricOfValues = map[MetricOfTable]struct{}{ + MetricOfTableBrowser: {}, + MetricOfTableDevice: {}, + MetricOfTableCountry: {}, + MetricOfTableUserId: {}, + MetricOfTableLocation: {}, + MetricOfTableReferrer: {}, + MetricOfTableFetch: {}, +} + +type TableQueryBuilder struct{} + +type TableValue struct { + Name string `json:"name"` + Total uint64 `json:"total"` +} + +type TableResponse struct { + Total uint64 `json:"total"` + Count uint64 `json:"count"` + Values []TableValue `json:"values"` +} + +const ( + MetricFormatSessionCount = "sessionCount" + MetricFormatUserCount = "userCount" + nilUUIDString = "00000000-0000-0000-0000-000000000000" +) + +var propertySelectorMap = map[string]string{ + string(MetricOfTableLocation): "JSONExtractString(toString(main.$properties), 'url_path') AS metric_value", + //string(MetricOfTableUserId): "if(empty(sessions.user_id), 'Anonymous', sessions.user_id) AS metric_value", + string(MetricOfTableUserId): "if(empty(sessions.user_id) OR sessions.user_id IS NULL, 'Anonymous', sessions.user_id) AS metric_value", + string(MetricOfTableBrowser): "main.$browser AS metric_value", + //string(MetricOfTableDevice): "sessions.user_device AS metric_value", + string(MetricOfTableDevice): "if(empty(sessions.user_device) OR sessions.user_device IS NULL, 'Undefined', sessions.user_device) AS metric_value", + string(MetricOfTableCountry): "toString(sessions.user_country) AS metric_value", + string(MetricOfTableReferrer): "main.$referrer AS metric_value", + string(MetricOfTableFetch): "JSONExtractString(toString(main.$properties), 'url_path') AS metric_value", +} + +var mainColumns = map[string]string{ + "userBrowser": "$browser", + "userDevice": "sessions.user_device", + "referrer": "$referrer", + "fetchDuration": "$duration_s", + "ISSUE": "issue_type", +} + +func (t TableQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) { + if p.MetricOf == "" { + return nil, fmt.Errorf("MetricOf is empty") + } + + if _, ok := validMetricOfValues[MetricOfTable(p.MetricOf)]; !ok { + return nil, fmt.Errorf("invalid MetricOf value: %s", p.MetricOf) + } + + metricFormat := p.MetricFormat + if metricFormat != MetricFormatSessionCount && metricFormat != MetricFormatUserCount { + metricFormat = MetricFormatSessionCount + } + + query, err := t.buildQuery(p, metricFormat) + if err != nil { + return nil, fmt.Errorf("error building query: %w", err) + } + + rows, err := conn.Query(query) + if err != nil { + log.Printf("Error executing query: %s\nQuery: %s", err, query) + return nil, fmt.Errorf("error executing query: %w", err) + } + defer rows.Close() + + var overallTotalMetricValues uint64 + var overallCount uint64 + values := make([]TableValue, 0) + firstRow := true + + for rows.Next() { + var ( + name string + valueSpecificCount uint64 + 
tempOverallTotalMetricValues uint64 + tempOverallCount uint64 + ) + + if err := rows.Scan(&tempOverallTotalMetricValues, &name, &valueSpecificCount, &tempOverallCount); err != nil { + return nil, fmt.Errorf("error scanning row: %w", err) + } + + if firstRow { + overallTotalMetricValues = tempOverallTotalMetricValues + overallCount = tempOverallCount + firstRow = false + } + values = append(values, TableValue{Name: name, Total: valueSpecificCount}) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating rows: %w", err) + } + + return &TableResponse{ + Total: overallTotalMetricValues, + Count: overallCount, + Values: values, + }, nil +} + +func (t TableQueryBuilder) buildQuery(r Payload, metricFormat string) (string, error) { + if len(r.Series) == 0 { + return "", fmt.Errorf("payload Series cannot be empty") + } + s := r.Series[0] + + // sessions_data WHERE conditions + durConds, _ := buildDurationWhere(s.Filter.Filters) + sessFilters, _ := filterOutTypes(s.Filter.Filters, []FilterType{FilterDuration, FilterUserAnonymousId}) + sessConds, evtNames := buildEventConditions(sessFilters, BuildConditionsOptions{DefinedColumns: mainColumns, MainTableAlias: "main"}) + sessionDataConds := append(durConds, sessConds...) + // date range for sessions_data + sessionDataConds = append(sessionDataConds, + fmt.Sprintf("main.created_at BETWEEN toDateTime(%d/1000) AND toDateTime(%d/1000)", r.StartTimestamp, r.EndTimestamp), + ) + // clean empty + var sdClean []string + for _, c := range sessionDataConds { + if strings.TrimSpace(c) != "" { + sdClean = append(sdClean, c) + } + } + sessionDataWhere := "" + if len(sdClean) > 0 { + sessionDataWhere = "WHERE " + strings.Join(sdClean, " AND ") + } + if len(evtNames) > 0 { + sessionDataWhere += fmt.Sprintf(" AND main.$event_name IN ('%s')", strings.Join(evtNames, "','")) + } + + // filtered_data WHERE conditions + propSel, ok := propertySelectorMap[r.MetricOf] + if !ok { + propSel = fmt.Sprintf("JSONExtractString(toString(main.$properties), '%s') AS metric_value", r.MetricOf) + } + parts := strings.SplitN(propSel, " AS ", 2) + propertyExpr := parts[0] + + tAgg := "main.session_id" + specConds := []string{} + if metricFormat == MetricFormatUserCount { + tAgg = "if(empty(sessions.user_id), toString(sessions.user_uuid), sessions.user_id)" + specConds = append(specConds, + fmt.Sprintf("NOT (empty(sessions.user_id) AND (sessions.user_uuid IS NULL OR sessions.user_uuid = '%s'))", nilUUIDString), + ) + } + + // metric-specific filter + _, mFilt := filterOutTypes(s.Filter.Filters, []FilterType{FilterType(r.MetricOf)}) + metricCond := eventNameCondition("", r.MetricOf) + if len(mFilt) > 0 { + //conds, _ := buildEventConditions(mFilt, BuildConditionsOptions{DefinedColumns: map[string]string{"userId": "user_id"}, MainTableAlias: "main"}) + //metricCond = strings.Join(conds, " AND ") + } + + filteredConds := []string{ + fmt.Sprintf("main.project_id = %d", r.ProjectId), + metricCond, + fmt.Sprintf("main.created_at BETWEEN toDateTime(%d/1000) AND toDateTime(%d/1000)", r.StartTimestamp, r.EndTimestamp), + } + filteredConds = append(filteredConds, specConds...) 
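+	// filteredConds scope the events used for the breakdown: the project, the metric's event type,
+	// the requested time range and, for userCount, sessions that have a user id or a real user UUID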
+ // clean empty + var fClean []string + for _, c := range filteredConds { + if strings.TrimSpace(c) != "" { + fClean = append(fClean, c) + } + } + filteredWhere := "" + if len(fClean) > 0 { + filteredWhere = "WHERE " + strings.Join(fClean, " AND ") + } + + limit := r.Limit + if limit <= 0 { + limit = 10 + } + offset := (r.Page - 1) * limit + + query := fmt.Sprintf(` +WITH sessions_data AS ( + SELECT session_id + FROM product_analytics.events AS main + JOIN experimental.sessions AS sessions USING (session_id) + %s + GROUP BY session_id +), +filtered_data AS ( + SELECT %s AS name, %s AS session_id + FROM product_analytics.events AS main + JOIN sessions_data USING (session_id) + JOIN experimental.sessions AS sessions USING (session_id) + %s +), +totals AS ( + SELECT count() AS overall_total_metric_values, + countDistinct(session_id) AS overall_total_count + FROM filtered_data +), +grouped_values AS ( + SELECT name, + countDistinct(session_id) AS value_count + FROM filtered_data + GROUP BY name +) +SELECT t.overall_total_metric_values, + g.name, + g.value_count, + t.overall_total_count +FROM grouped_values AS g +CROSS JOIN totals AS t +ORDER BY g.value_count DESC +LIMIT %d OFFSET %d;`, + sessionDataWhere, + propertyExpr, + tAgg, + filteredWhere, + limit, + offset, + ) + return query, nil +} diff --git a/backend/pkg/analytics/charts/metric_table_errors.go b/backend/pkg/analytics/charts/metric_table_errors.go new file mode 100644 index 000000000..96d70360c --- /dev/null +++ b/backend/pkg/analytics/charts/metric_table_errors.go @@ -0,0 +1,188 @@ +package charts + +import ( + "fmt" + "log" + "strings" + + "openreplay/backend/pkg/analytics/db" +) + +type TableErrorsQueryBuilder struct{} + +type ErrorChartPoint struct { + Timestamp int64 `json:"timestamp"` + Count uint64 `json:"count"` +} + +type ErrorItem struct { + ErrorID string `json:"errorId"` + Name string `json:"name"` + Message string `json:"message"` + Users uint64 `json:"users"` + Total uint64 `json:"total"` + Sessions uint64 `json:"sessions"` + FirstOccurrence int64 `json:"firstOccurrence"` + LastOccurrence int64 `json:"lastOccurrence"` + Chart []ErrorChartPoint `json:"chart"` +} + +type TableErrorsResponse struct { + Total uint64 `json:"total"` + Errors []ErrorItem `json:"errors"` +} + +func (t TableErrorsQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) { + query, err := t.buildQuery(p) + if err != nil { + return nil, err + } + rows, err := conn.Query(query) + if err != nil { + log.Printf("Error executing query: %s\nQuery: %s", err, query) + return nil, err + } + defer rows.Close() + + var resp TableErrorsResponse + for rows.Next() { + var e ErrorItem + var ts []int64 + var cs []uint64 + if err := rows.Scan( + &e.ErrorID, &e.Name, &e.Message, + &e.Users, &e.Total, &e.Sessions, + &e.FirstOccurrence, &e.LastOccurrence, + &ts, &cs, + ); err != nil { + return nil, err + } + for i := range ts { + e.Chart = append(e.Chart, ErrorChartPoint{Timestamp: ts[i], Count: cs[i]}) + } + resp.Errors = append(resp.Errors, e) + } + resp.Total = uint64(len(resp.Errors)) + return resp, nil +} + +func (t TableErrorsQueryBuilder) buildQuery(p Payload) (string, error) { + if len(p.Series) == 0 { + return "", fmt.Errorf("payload Series cannot be empty") + } + + density := p.Density + if density < 2 { + density = 7 + } + durMs := p.EndTimestamp - p.StartTimestamp + stepMs := durMs / int64(density-1) + startMs := (p.StartTimestamp / 1000) * 1000 + endMs := (p.EndTimestamp / 1000) * 1000 + + limit := p.Limit + if limit <= 0 { + limit = 10 + 
} + page := p.Page + if page <= 0 { + page = 1 + } + offset := (page - 1) * limit + + ef, en := buildEventConditions( + p.Series[0].Filter.Filters, + BuildConditionsOptions{DefinedColumns: mainColumns}, + ) + conds := []string{ + "`$event_name` = 'ERROR'", + fmt.Sprintf("project_id = %d", p.ProjectId), + fmt.Sprintf("created_at >= toDateTime(%d/1000)", startMs), + fmt.Sprintf("created_at <= toDateTime(%d/1000)", endMs), + } + if len(ef) > 0 { + conds = append(conds, ef...) + } + if len(en) > 0 { + conds = append(conds, "`$event_name` IN ("+buildInClause(en)+")") + } + whereClause := strings.Join(conds, " AND ") + + sql := fmt.Sprintf(`WITH + events AS ( + SELECT + error_id, + JSONExtractString(toString("$properties"), 'name') AS name, + JSONExtractString(toString("$properties"), 'message') AS message, + distinct_id, + session_id, + created_at + FROM product_analytics.events + WHERE %s + ), + sessions_per_interval AS ( + SELECT + error_id, + toUInt64(%d + (toUInt64((toUnixTimestamp64Milli(created_at) - %d) / %d) * %d)) AS bucket_ts, + countDistinct(session_id) AS session_count + FROM events + GROUP BY error_id, bucket_ts + ), + buckets AS ( + SELECT + toUInt64(generate_series) AS bucket_ts + FROM generate_series( + %d, + %d, + %d + ) + ), + error_meta AS ( + SELECT + error_id, + name, + message, + countDistinct(distinct_id) AS users, + count() AS total, + countDistinct(session_id) AS sessions, + min(created_at) AS first_occurrence, + max(created_at) AS last_occurrence + FROM events + GROUP BY error_id, name, message + ), + error_chart AS ( + SELECT + e.error_id AS error_id, + groupArray(b.bucket_ts) AS timestamps, + groupArray(coalesce(s.session_count, 0)) AS counts + FROM (SELECT DISTINCT error_id FROM events) AS e + CROSS JOIN buckets AS b + LEFT JOIN sessions_per_interval AS s + ON s.error_id = e.error_id + AND s.bucket_ts = b.bucket_ts + GROUP BY e.error_id + ) +SELECT + m.error_id, + m.name, + m.message, + m.users, + m.total, + m.sessions, + toUnixTimestamp64Milli(toDateTime64(m.first_occurrence, 3)) AS first_occurrence, + toUnixTimestamp64Milli(toDateTime64(m.last_occurrence, 3)) AS last_occurrence, + ec.timestamps, + ec.counts +FROM error_meta AS m +LEFT JOIN error_chart AS ec + ON m.error_id = ec.error_id +ORDER BY m.last_occurrence DESC +LIMIT %d OFFSET %d;`, + whereClause, + startMs, startMs, stepMs, stepMs, // New formula parameters + startMs, endMs, stepMs, + limit, offset, + ) + + return sql, nil +} diff --git a/backend/pkg/analytics/charts/metric_timeseries.go b/backend/pkg/analytics/charts/metric_timeseries.go new file mode 100644 index 000000000..00f3e9325 --- /dev/null +++ b/backend/pkg/analytics/charts/metric_timeseries.go @@ -0,0 +1,147 @@ +package charts + +import ( + "fmt" + "log" + "openreplay/backend/pkg/analytics/db" + "sort" + "strings" +) + +type TimeSeriesQueryBuilder struct{} + +func (t TimeSeriesQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) { + data := make(map[uint64]map[string]uint64) + for _, series := range p.Series { + query, err := t.buildQuery(p, series) + if err != nil { + log.Printf("buildQuery %s: %v", series.Name, err) + return nil, fmt.Errorf("series %s: %v", series.Name, err) + } + rows, err := conn.Query(query) + if err != nil { + log.Printf("exec %s: %v", series.Name, err) + return nil, fmt.Errorf("series %s: %v", series.Name, err) + } + var pts []DataPoint + for rows.Next() { + var dp DataPoint + if err := rows.Scan(&dp.Timestamp, &dp.Count); err != nil { + rows.Close() + return nil, err + } + pts = append(pts, dp) + } 
+ rows.Close() + + filled := FillMissingDataPoints(p.StartTimestamp, p.EndTimestamp, p.Density, DataPoint{}, pts, 1000) + for _, dp := range filled { + if data[dp.Timestamp] == nil { + data[dp.Timestamp] = map[string]uint64{} + } + data[dp.Timestamp][series.Name] = dp.Count + } + } + + var timestamps []uint64 + for ts := range data { + timestamps = append(timestamps, ts) + } + sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] < timestamps[j] }) + + var result []map[string]interface{} + for _, ts := range timestamps { + row := map[string]interface{}{"timestamp": ts} + for _, series := range p.Series { + row[series.Name] = data[ts][series.Name] + } + result = append(result, row) + } + return result, nil +} + +func (t TimeSeriesQueryBuilder) buildQuery(p Payload, s Series) (string, error) { + switch p.MetricOf { + case "sessionCount": + return t.buildTimeSeriesQuery(p, s, "sessionCount", "session_id"), nil + case "userCount": + return t.buildTimeSeriesQuery(p, s, "userCount", "user_id"), nil + default: + return "", fmt.Errorf("unsupported metric %q", p.MetricOf) + } +} + +func (t TimeSeriesQueryBuilder) buildTimeSeriesQuery(p Payload, s Series, metric, idField string) string { + sub := t.buildSubQuery(p, s, metric) + step := int(getStepSize(p.StartTimestamp, p.EndTimestamp, p.Density, false, 1000)) * 1000 + + return fmt.Sprintf( + "SELECT gs.generate_series AS timestamp, COALESCE(COUNT(DISTINCT ps.%s),0) AS count "+ + "FROM generate_series(%d,%d,%d) AS gs "+ + "LEFT JOIN (%s) AS ps ON TRUE "+ + "WHERE ps.datetime >= toDateTime(timestamp/1000) AND ps.datetime < toDateTime((timestamp+%d)/1000) "+ + "GROUP BY timestamp ORDER BY timestamp;", + idField, p.StartTimestamp, p.EndTimestamp, step, sub, step, + ) +} + +func (t TimeSeriesQueryBuilder) buildSubQuery(p Payload, s Series, metric string) string { + evConds, evNames := buildEventConditions(s.Filter.Filters, BuildConditionsOptions{ + DefinedColumns: mainColumns, + MainTableAlias: "main", + PropertiesColumnName: "$properties", + }) + sessConds := buildSessionConditions(s.Filter.Filters) + staticEvt := buildStaticEventWhere(p) + sessWhere, sessJoin := buildStaticSessionWhere(p, sessConds) + + if len(evConds) == 0 && len(evNames) == 0 { + if metric == "sessionCount" { + return fmt.Sprintf( + "SELECT s.session_id AS session_id, s.datetime AS datetime "+ + "FROM experimental.sessions AS s WHERE %s", + sessJoin, + ) + } + return fmt.Sprintf( + "SELECT multiIf(s.user_id!='',s.user_id,s.user_anonymous_id!='',s.user_anonymous_id,toString(s.user_uuid)) AS user_id, s.datetime AS datetime "+ + "FROM experimental.sessions AS s WHERE %s", + sessJoin, + ) + } + + uniq := make([]string, 0, len(evNames)) + for _, name := range evNames { + if !contains(uniq, name) { + uniq = append(uniq, name) + } + } + nameClause := "" + if len(uniq) > 0 { + nameClause = fmt.Sprintf("AND main.`$event_name` IN (%s) ", buildInClause(uniq)) + } + + having := "" + if len(evConds) > 0 { + having = buildHavingClause(evConds) + } + + whereEvt := staticEvt + if len(evConds) > 0 { + whereEvt += " AND " + strings.Join(evConds, " AND ") + } + + proj := map[string]string{ + "sessionCount": "s.session_id AS session_id", + "userCount": "multiIf(s.user_id!='',s.user_id,s.user_anonymous_id!='',s.user_anonymous_id,toString(s.user_uuid)) AS user_id", + }[metric] + ", s.datetime AS datetime" + + return fmt.Sprintf( + "SELECT %s FROM (SELECT main.session_id, MIN(main.created_at) AS first_event_ts, MAX(main.created_at) AS last_event_ts "+ + "FROM product_analytics.events AS main "+ 
+ "WHERE %s AND main.session_id IN (SELECT s.session_id FROM experimental.sessions AS s WHERE %s) %s "+ + "GROUP BY main.session_id %s "+ + "INNER JOIN (SELECT * FROM experimental.sessions AS s WHERE %s) AS s ON s.session_id=f.session_id", + proj, whereEvt, sessWhere, nameClause, having, sessJoin, + ) +} diff --git a/backend/pkg/analytics/charts/metric_user_journey.go b/backend/pkg/analytics/charts/metric_user_journey.go new file mode 100644 index 000000000..413f2289e --- /dev/null +++ b/backend/pkg/analytics/charts/metric_user_journey.go @@ -0,0 +1,764 @@ +package charts + +import ( + "fmt" + "math" + "openreplay/backend/pkg/analytics/db" + "sort" + "strconv" + "strings" + "time" +) + +// Node represents a point in the journey diagram. +type Node struct { + Depth int `json:"depth"` + Name string `json:"name"` + EventType string `json:"eventType"` + ID int `json:"id"` + StartingNode bool `json:"startingNode"` +} + +// Link represents a transition between nodes. +type Link struct { + EventType string `json:"eventType"` + SessionsCount int `json:"sessionsCount"` + Value float64 `json:"value"` + Source int `json:"source"` + Target int `json:"target"` +} + +// JourneyData holds all nodes and links for the response. +type JourneyData struct { + Nodes []Node `json:"nodes"` + Links []Link `json:"links"` +} + +// JourneyResponse is the API response structure. +type JourneyResponse struct { + Data JourneyData `json:"data"` +} + +// UserJourneyQueryBuilder builds and executes the journey query. +type UserJourneyQueryBuilder struct{} + +func (h UserJourneyQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) { + q, err := h.buildQuery(p) + if err != nil { + return nil, err + } + rows, err := conn.Query(q) + if err != nil { + return nil, err + } + defer rows.Close() + + type row struct { + Stage int64 + CurrentEventName string + CurrentEventProperty string + PrevEventName string + PrevEventProperty string + SessionsCount uint64 + } + + // Parse all rows into a slice + var rawData []row + for rows.Next() { + var r row + if err := rows.Scan( + &r.Stage, + &r.CurrentEventName, + &r.CurrentEventProperty, + &r.PrevEventName, + &r.PrevEventProperty, + &r.SessionsCount, + ); err != nil { + return nil, err + } + + if r.SessionsCount == 0 { + continue + } + + rawData = append(rawData, r) + } + + // Group data by stage + dataByStage := make(map[int64][]row) + var minStage int64 = 0 + var maxStage int64 = 0 + + for _, r := range rawData { + dataByStage[r.Stage] = append(dataByStage[r.Stage], r) + if r.Stage > maxStage { + maxStage = r.Stage + } + if r.Stage < minStage { + minStage = r.Stage + } + } + + // Calculate total sessions per stage + stageTotals := make(map[int64]uint64) + for stage, stageRows := range dataByStage { + for _, r := range stageRows { + stageTotals[stage] += r.SessionsCount + } + } + + // Determine base count for percentage calculations + // We'll use the starting point (usually stage 1) as our base + var baseSessionsCount uint64 + if count, exists := stageTotals[1]; exists { + baseSessionsCount = count + } else { + // If stage 1 doesn't exist, use the first available positive stage + for stage := int64(0); stage <= maxStage; stage++ { + if count, exists := stageTotals[stage]; exists { + baseSessionsCount = count + break + } + } + } + + if baseSessionsCount == 0 { + baseSessionsCount = 1 // Prevent division by zero + } + + // Number of top nodes to display per stage + topLimit := int(p.Rows) + if topLimit <= 0 { + topLimit = 5 // Default if not specified + } + + // Step 1: 
Determine the top paths at each stage based on destination + type pathKey struct { + eventName string + eventProp string + } + + // Map to store top paths for each stage + topPathsByStage := make(map[int64]map[pathKey]bool) + pathCountsByStage := make(map[int64]map[pathKey]uint64) + + for stage := minStage; stage <= maxStage; stage++ { + // Skip if this stage has no data + if _, exists := dataByStage[stage]; !exists { + continue + } + + // Sort rows within each stage by session count (descending) + sort.Slice(dataByStage[stage], func(i, j int) bool { + return dataByStage[stage][i].SessionsCount > dataByStage[stage][j].SessionsCount + }) + + // Initialize maps for this stage + topPathsByStage[stage] = make(map[pathKey]bool) + pathCountsByStage[stage] = make(map[pathKey]uint64) + + // First, aggregate by path to get total sessions per path + for _, r := range dataByStage[stage] { + key := pathKey{eventName: r.CurrentEventName, eventProp: r.CurrentEventProperty} + pathCountsByStage[stage][key] += r.SessionsCount + } + + // Then sort paths by session count + type pathCount struct { + path pathKey + count uint64 + } + + var paths []pathCount + for path, count := range pathCountsByStage[stage] { + paths = append(paths, pathCount{path: path, count: count}) + } + + // Sort descending by count + sort.Slice(paths, func(i, j int) bool { + return paths[i].count > paths[j].count + }) + + // Mark top paths - take exactly topLimit or all if fewer available + for i, pc := range paths { + if i < topLimit { + topPathsByStage[stage][pc.path] = true + } + } + } + + // Step 2: Create a normalized sequential depth mapping + // First, gather all stages that have data + var stagesWithData []int64 + for stage := range dataByStage { + stagesWithData = append(stagesWithData, stage) + } + + // Sort stages + sort.Slice(stagesWithData, func(i, j int) bool { + return stagesWithData[i] < stagesWithData[j] + }) + + var startingStage int64 + for _, s := range stagesWithData { + if s > 0 { + startingStage = s + break + } + } + + // Create a mapping from logical stage to display depth (ensuring no gaps) + stageToDepth := make(map[int64]int) + for i, stage := range stagesWithData { + stageToDepth[stage] = i + } + + // Determine depth of central node (stage 1 or equivalent) + var centralDepth int + if depth, exists := stageToDepth[1]; exists { + centralDepth = depth + } else { + // If stage 1 doesn't exist, use the first positive stage + for _, stage := range stagesWithData { + if stage > 0 { + centralDepth = stageToDepth[stage] + break + } + } + } + + // Step 3: Create nodes with normalized depths + var nodes []Node + var links []Link + nodeID := 0 + + // Maps to track nodes and sessions + nodeMap := make(map[string]int) // Stage|EventName|EventProp → nodeID + othersNodes := make(map[int64]int) // stage → "Others" nodeID + dropNodes := make(map[int64]int) // stage → "Drop" nodeID + + incomingSessions := make(map[int]uint64) // nodeID → incoming sessions + outgoingSessions := make(map[int]uint64) // nodeID → outgoing sessions + + // Create all nodes using normalized depths + for _, stage := range stagesWithData { + displayDepth := stageToDepth[stage] + + // Create regular nodes for top paths + for path := range topPathsByStage[stage] { + nodeKey := fmt.Sprintf("%d|%s|%s", stage, path.eventName, path.eventProp) + nodeMap[nodeKey] = nodeID + + nodes = append(nodes, Node{ + ID: nodeID, + Depth: displayDepth, + Name: path.eventProp, + EventType: path.eventName, + StartingNode: stage == startingStage, + }) + + // For the central 
stage (usually stage 1) or first stage, set incoming sessions + if (stage == 1) || (stage == minStage && minStage != 1) { + incomingSessions[nodeID] = pathCountsByStage[stage][path] + } + + nodeID++ + } + + // Calculate if we need an "Others" node (when total paths > topLimit) + totalPaths := len(pathCountsByStage[stage]) + if totalPaths > topLimit { + // Calculate sessions that will go to Others + othersCount := uint64(0) + for path, count := range pathCountsByStage[stage] { + if !topPathsByStage[stage][path] { + othersCount += count + } + } + + // Only create Others if it has sessions + if othersCount > 0 { + othersNodes[stage] = nodeID + + nodes = append(nodes, Node{ + ID: nodeID, + Depth: displayDepth, + Name: "other", + EventType: "OTHER", + StartingNode: stage == startingStage, + }) + + // For the central stage or first stage, set incoming sessions for Others + if (stage == 1) || (stage == minStage && minStage != 1) { + incomingSessions[nodeID] = othersCount + } + + nodeID++ + } + } + } + + // Step 4: Create links between adjacent nodes only + // Use a map to deduplicate links + type linkKey struct { + src int + tgt int + } + linkSessions := make(map[linkKey]uint64) + linkTypes := make(map[linkKey]string) + + // For each stage (except the first), create links from the previous stage + for i := 1; i < len(stagesWithData); i++ { + currentStage := stagesWithData[i] + prevStage := stagesWithData[i-1] + + for _, r := range dataByStage[currentStage] { + // Skip if previous stage doesn't match expected + if r.Stage != currentStage { + continue + } + + // Determine source node + prevPathKey := fmt.Sprintf("%d|%s|%s", prevStage, r.PrevEventName, r.PrevEventProperty) + srcID, hasSrc := nodeMap[prevPathKey] + + if !hasSrc { + // If source isn't a top node, use Others from previous stage + if othersID, hasOthers := othersNodes[prevStage]; hasOthers { + srcID = othersID + hasSrc = true + } else { + // Skip if we can't find a source + continue + } + } + + // Determine target node + curPath := pathKey{eventName: r.CurrentEventName, eventProp: r.CurrentEventProperty} + var tgtID int + var hasTgt bool + + // Check if this path is in the top paths for this stage + if topPathsByStage[currentStage][curPath] { + // It's a top node + curPathKey := fmt.Sprintf("%d|%s|%s", currentStage, r.CurrentEventName, r.CurrentEventProperty) + tgtID = nodeMap[curPathKey] + hasTgt = true + } else { + // It's part of Others + if othersID, hasOthers := othersNodes[currentStage]; hasOthers { + tgtID = othersID + hasTgt = true + } + } + + if !hasSrc || !hasTgt { + continue + } + + // Update session tracking + incomingSessions[tgtID] += r.SessionsCount + outgoingSessions[srcID] += r.SessionsCount + + // Record link (deduplicating) + lk := linkKey{src: srcID, tgt: tgtID} + linkSessions[lk] += r.SessionsCount + + // Prefer non-OTHER event type + if linkTypes[lk] == "" || linkTypes[lk] == "OTHER" { + linkTypes[lk] = r.CurrentEventName + } + } + } + + // Create deduplicated links with proper percentages + for lk, count := range linkSessions { + // Calculate percentage based on baseSessionsCount + percent := math.Round(float64(count)*10000/float64(baseSessionsCount)) / 100 + + links = append(links, Link{ + Source: lk.src, + Target: lk.tgt, + SessionsCount: int(count), + Value: percent, + EventType: linkTypes[lk], + }) + } + + // Step 5: Calculate drops and create drop nodes (only for stages ≥ 0) + // Process forward drops (positive stages only) + for i := 0; i < len(stagesWithData)-1; i++ { + stage := stagesWithData[i] + + // 
Skip negative stages for drops + if stage < 0 { + continue + } + + // Calculate new drops at this stage + stageDrops := uint64(0) + dropsFromNode := make(map[int]uint64) // nodeID -> drop count + + for _, node := range nodes { + nodeDepth := node.Depth + + // Skip if this node isn't in the current stage + if nodeDepth != stageToDepth[stage] { + continue + } + + incoming := incomingSessions[node.ID] + outgoing := outgoingSessions[node.ID] + + if incoming > outgoing { + dropCount := incoming - outgoing + dropsFromNode[node.ID] = dropCount + stageDrops += dropCount + } + } + + // Skip if no drops + if stageDrops == 0 { + continue + } + + // Determine next stage depth for drop node positioning + var dropDepth int + if i+1 < len(stagesWithData) { + dropDepth = stageToDepth[stagesWithData[i+1]] + } else { + dropDepth = stageToDepth[stage] + 1 + } + + // Create drop node + dropNodes[stage] = nodeID + + nodes = append(nodes, Node{ + ID: nodeID, + Depth: dropDepth, + Name: "drop", + EventType: "DROP", + }) + + // Create links from nodes with drops to the drop node + for nid, dropCount := range dropsFromNode { + if dropCount == 0 { + continue + } + + // Calculate percentage based on baseSessionsCount + percent := math.Round(float64(dropCount)*10000/float64(baseSessionsCount)) / 100 + + links = append(links, Link{ + Source: nid, + Target: nodeID, + SessionsCount: int(dropCount), + Value: percent, + EventType: "DROP", + }) + } + + // Link previous drop node to current drop node to show accumulation + if i > 0 { + for j := i - 1; j >= 0; j-- { + prevStage := stagesWithData[j] + if prevDropID, hasPrevDrop := dropNodes[prevStage]; hasPrevDrop { + // Link previous drop to current drop to show accumulation + prevDropCount := uint64(0) + for _, link := range links { + if link.Target == prevDropID && link.EventType == "DROP" { + prevDropCount += uint64(link.SessionsCount) + } + } + + percent := math.Round(float64(prevDropCount)*10000/float64(baseSessionsCount)) / 100 + + links = append(links, Link{ + Source: prevDropID, + Target: nodeID, + SessionsCount: int(prevDropCount), + Value: percent, + EventType: "DROP", + }) + break + } + } + } + + nodeID++ + } + + // Filter out nodes with no connections + nodeHasConnection := make(map[int]bool) + for _, link := range links { + nodeHasConnection[link.Source] = true + nodeHasConnection[link.Target] = true + } + + // Make sure central nodes are included even if they don't have links + for _, node := range nodes { + if node.Depth == centralDepth { + nodeHasConnection[node.ID] = true + } + } + + var filteredNodes []Node + for _, node := range nodes { + if nodeHasConnection[node.ID] { + filteredNodes = append(filteredNodes, node) + } + } + + // Reassign IDs to be sequential + nodeIDMap := make(map[int]int) + var finalNodes []Node = make([]Node, 0, len(filteredNodes)) + + for newID, node := range filteredNodes { + nodeIDMap[node.ID] = newID + node.ID = newID + finalNodes = append(finalNodes, node) + } + + // Update link references + var finalLinks []Link = make([]Link, 0, len(links)) + for _, link := range links { + srcID, srcExists := nodeIDMap[link.Source] + tgtID, tgtExists := nodeIDMap[link.Target] + + if srcExists && tgtExists { + link.Source = srcID + link.Target = tgtID + finalLinks = append(finalLinks, link) + } + } + + return JourneyData{ + Nodes: finalNodes, + Links: finalLinks, + }, nil +} + +func (h UserJourneyQueryBuilder) buildQuery(p Payload) (string, error) { + // prepare event list filter + events := p.MetricValue + if len(events) == 0 { + events = 
[]string{"LOCATION"} + } + vals := make([]string, len(events)) + for i, v := range events { + vals[i] = fmt.Sprintf("'%s'", v) + } + laterCond := fmt.Sprintf("e.\"$event_name\" IN (%s)", strings.Join(vals, ",")) + + // build start and exclude conditions + startConds, _ := buildEventConditions(p.StartPoint, BuildConditionsOptions{DefinedColumns: mainColumns, MainTableAlias: "e"}) + excludeConds, _ := buildEventConditions(p.Exclude, BuildConditionsOptions{DefinedColumns: mainColumns, MainTableAlias: "e"}) + + // quote properties column correctly + fixProps := func(conds []string) []string { + for i, c := range conds { + conds[i] = strings.ReplaceAll(c, "e.$properties", "e.\"$properties\"") + } + return conds + } + startConds = fixProps(startConds) + excludeConds = fixProps(excludeConds) + + // extract global filters and duration from first series + s := p.MetricPayload.Series[0] + var durationMin, durationMax int64 + var okMin, okMax bool + var err error + var globalFilters []Filter + for _, flt := range s.Filter.Filters { + if flt.Type == "duration" { + if len(flt.Value) > 0 && flt.Value[0] != "" { + durationMin, err = strconv.ParseInt(flt.Value[0], 10, 64) + if err != nil { + return "", err + } + okMin = true + } + if len(flt.Value) > 1 && flt.Value[1] != "" { + durationMax, err = strconv.ParseInt(flt.Value[1], 10, 64) + if err != nil { + return "", err + } + okMax = true + } + continue + } + if flt.IsEvent { + continue + } + globalFilters = append(globalFilters, flt) + } + globalConds, _ := buildEventConditions(globalFilters, BuildConditionsOptions{DefinedColumns: mainColumns, MainTableAlias: "e"}) + globalConds = fixProps(globalConds) + + // assemble duration condition + var durCond string + if okMin && okMax { + durCond = fmt.Sprintf("ss.duration BETWEEN %d AND %d", durationMin, durationMax) + } else if okMin { + durCond = fmt.Sprintf("ss.duration >= %d", durationMin) + } else if okMax { + durCond = fmt.Sprintf("ss.duration <= %d", durationMax) + } + + // determine starting event + var startEvent string + if len(p.StartPoint) > 0 { + startEvent = string(p.StartPoint[0].Type) + } else { + startEvent = events[0] + } + + // assemble first_hits WHERE clause with optional duration + firstBase := []string{fmt.Sprintf("e.\"$event_name\" = '%s'", startEvent)} + if len(startConds) > 0 { + firstBase = append(firstBase, startConds...) + } + if len(globalConds) > 0 { + firstBase = append(firstBase, globalConds...) + } + firstBase = append(firstBase, + fmt.Sprintf("e.project_id = %d", p.ProjectId), + "e.session_id IS NOT NULL", + fmt.Sprintf("e.created_at BETWEEN toDateTime('%s') AND toDateTime('%s')", + time.Unix(p.StartTimestamp/1000, 0).UTC().Format("2006-01-02 15:04:05"), + time.Unix(p.EndTimestamp/1000, 0).UTC().Format("2006-01-02 15:04:05"), + ), + ) + if durCond != "" { + firstBase = append(firstBase, durCond) + } + + // assemble journey WHERE clause + journeyBase := []string{laterCond} + if len(excludeConds) > 0 { + journeyBase = append(journeyBase, "NOT ("+strings.Join(excludeConds, " AND ")+")") + } + if len(globalConds) > 0 { + journeyBase = append(journeyBase, globalConds...) 
+ } + journeyBase = append(journeyBase, + fmt.Sprintf("e.project_id = %d", p.ProjectId), + ) + + // format time bounds + startTime := time.Unix(p.StartTimestamp/1000, 0).UTC().Format("2006-01-02 15:04:05") + endTime := time.Unix(p.EndTimestamp/1000, 0).UTC().Format("2006-01-02 15:04:05") + + // set column limits + previousColumns := p.PreviousColumns + if previousColumns <= 0 { + previousColumns = 0 + } + maxCols := p.Columns + if maxCols > 0 { + maxCols++ + } + + // build final query + q := fmt.Sprintf(`WITH + first_hits AS ( + SELECT e.session_id, MIN(e.created_at) AS start_time + FROM product_analytics.events AS e + JOIN experimental.sessions AS ss USING(session_id) + WHERE %s + GROUP BY e.session_id + ), + journey_events_after AS ( + SELECT + e.session_id, + e.distinct_id, + e."$event_name" AS event_name, + e.created_at, + CASE + WHEN e."$event_name" = 'LOCATION' THEN JSONExtractString(toString(e."$properties"), 'url_path') + WHEN e."$event_name" = 'CLICK' THEN JSONExtractString(toString(e."$properties"), 'label') + WHEN e."$event_name" = 'INPUT' THEN JSONExtractString(toString(e."$properties"), 'label') + ELSE NULL + END AS event_property + FROM product_analytics.events AS e + JOIN first_hits AS f USING(session_id) + WHERE + e.created_at >= f.start_time + AND e.created_at <= toDateTime('%s') + AND %s + ), + journey_events_before AS ( + SELECT + e.session_id, + e.distinct_id, + e."$event_name" AS event_name, + e.created_at, + CASE + WHEN e."$event_name" = 'LOCATION' THEN JSONExtractString(toString(e."$properties"), 'url_path') + WHEN e."$event_name" = 'CLICK' THEN JSONExtractString(toString(e."$properties"), 'label') + WHEN e."$event_name" = 'INPUT' THEN JSONExtractString(toString(e."$properties"), 'label') + ELSE NULL + END AS event_property + FROM product_analytics.events AS e + JOIN first_hits AS f USING(session_id) + WHERE + e.created_at < f.start_time + AND e.created_at >= toDateTime('%s') + AND %s + AND %d > 0 + ), + journey_events_combined AS ( + SELECT *, 1 AS direction FROM journey_events_after + UNION ALL + SELECT *, -1 AS direction FROM journey_events_before + ), + event_with_prev AS ( + SELECT + session_id, + distinct_id, + event_name, + event_property, + created_at, + direction, + any(event_name) OVER (PARTITION BY session_id ORDER BY created_at ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) AS previous_event_name, + any(event_property) OVER (PARTITION BY session_id ORDER BY created_at ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) AS previous_event_property + FROM journey_events_combined + ), + staged AS ( + SELECT + *, + CASE + WHEN direction = 1 THEN toInt64(sumIf(1, true) OVER (PARTITION BY session_id, direction ORDER BY created_at ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) + WHEN direction = -1 THEN -1 * toInt64(sumIf(1, true) OVER (PARTITION BY session_id, direction ORDER BY created_at DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) + ELSE 0 + END AS stage + FROM event_with_prev + ) +SELECT + stage AS stage, + event_name AS current_event_name, + event_property AS current_event_property, + COALESCE(previous_event_name, '') AS previous_event_name, + COALESCE(previous_event_property, '') AS previous_event_property, + COUNT(DISTINCT session_id) AS sessions_count +FROM staged +WHERE stage <= %d AND stage >= -%d +GROUP BY + stage, + event_name, + event_property, + previous_event_name, + previous_event_property +ORDER BY stage, COUNT(DISTINCT session_id) DESC;`, + strings.Join(firstBase, " AND "), + endTime, + strings.Join(journeyBase, " AND "), + startTime, + 
strings.Join(journeyBase, " AND "), + previousColumns, + maxCols, + previousColumns, + ) + return q, nil +} diff --git a/backend/pkg/analytics/charts/model.go b/backend/pkg/analytics/charts/model.go index 81016c7e6..2318a5041 100644 --- a/backend/pkg/analytics/charts/model.go +++ b/backend/pkg/analytics/charts/model.go @@ -1,21 +1,184 @@ package charts -import "openreplay/backend/pkg/analytics/cards" +type Table string +type Column string +type MetricType string +type FilterType string +type EventType string +type EventOrder string + +const ( + TableEvents Table = "product_analytics.events" + TableSessions Table = "experimental.sessions" +) + +const ( + ColEventTime Column = "main.created_at" + ColEventName Column = "main.`$event_name`" + ColEventProjectID Column = "main.project_id" + ColEventProperties Column = "main.`$properties`" + ColEventSessionID Column = "main.session_id" + ColEventURLPath Column = "main.url_path" + ColEventStatus Column = "main.status" +) + +const ( + ColSessionID Column = "s.session_id" + ColDuration Column = "s.duration" + ColUserCountry Column = "s.user_country" + ColUserCity Column = "s.user_city" + ColUserState Column = "s.user_state" + ColUserID Column = "s.user_id" + ColUserAnonymousID Column = "s.user_anonymous_id" + ColUserOS Column = "s.user_os" + ColUserBrowser Column = "s.user_browser" + ColUserDevice Column = "s.user_device" + ColUserDeviceType Column = "s.user_device_type" + ColRevID Column = "s.rev_id" + ColBaseReferrer Column = "s.base_referrer" + ColUtmSource Column = "s.utm_source" + ColUtmMedium Column = "s.utm_medium" + ColUtmCampaign Column = "s.utm_campaign" + ColMetadata1 Column = "s.metadata_1" + ColSessionProjectID Column = "s.project_id" + ColSessionIsNotNull Column = "isNotNull(s.duration)" +) + +const ( + MetricTypeTimeseries MetricType = "timeseries" + MetricTypeTable MetricType = "table" + MetricTypeFunnel MetricType = "funnel" + MetricTypeHeatmap MetricType = "heatMap" + MetricTypeSession MetricType = "heatmaps_session" + MetricUserJourney MetricType = "pathAnalysis" +) + +const ( + EventOrderThen EventOrder = "then" + EventOrderOr EventOrder = "or" + EventOrderAnd EventOrder = "and" +) + +type MetricPayload struct { + StartTimestamp int64 `json:"startTimestamp"` + EndTimestamp int64 `json:"endTimestamp"` + Density int `json:"density"` + MetricOf string `json:"metricOf"` + MetricType MetricType `json:"metricType"` + MetricValue []string `json:"metricValue"` + MetricFormat string `json:"metricFormat"` + ViewType string `json:"viewType"` + Name string `json:"name"` + Series []Series `json:"series"` + Limit int `json:"limit"` + Page int `json:"page"` + StartPoint []Filter `json:"startPoint"` + Exclude []Filter `json:"excludes"` + Rows uint64 `json:"rows"` + Columns uint64 `json:"columns"` + PreviousColumns uint64 `json:"previousColumns"` +} + +type MetricOfTable string + +const ( + MetricOfTableLocation MetricOfTable = "LOCATION" // TOP Pages + MetricOfTableBrowser MetricOfTable = "userBrowser" + MetricOfTableReferrer MetricOfTable = "referrer" + MetricOfTableUserId MetricOfTable = "userId" + MetricOfTableCountry MetricOfTable = "userCountry" + MetricOfTableDevice MetricOfTable = "userDevice" + MetricOfTableFetch MetricOfTable = "FETCH" + + //MetricOfTableIssues MetricOfTable = "issues" + //MetricOfTableSessions MetricOfTable = "sessions" + //MetricOfTableErrors MetricOfTable = "errors" +) + +type FilterGroup struct { + Filters []Filter `json:"filters"` + EventsOrder EventOrder `json:"eventsOrder"` +} + +type Series struct { + Name 
string `json:"name"` + Filter FilterGroup `json:"filter"` +} + +type Filter struct { + Type FilterType `json:"type"` + IsEvent bool `json:"isEvent"` + Value []string `json:"value"` + Operator string `json:"operator"` + Source string `json:"source,omitempty"` + Filters []Filter `json:"filters"` +} + +const ( + FilterUserId FilterType = "userId" + FilterUserAnonymousId FilterType = "userAnonymousId" + FilterReferrer FilterType = "referrer" + FilterDuration FilterType = "duration" + FilterUtmSource FilterType = "utmSource" + FilterUtmMedium FilterType = "utmMedium" + FilterUtmCampaign FilterType = "utmCampaign" + FilterUserCountry FilterType = "userCountry" + FilterUserCity FilterType = "userCity" + FilterUserState FilterType = "userState" + FilterUserOs FilterType = "userOs" + FilterUserBrowser FilterType = "userBrowser" + FilterUserDevice FilterType = "userDevice" + FilterPlatform FilterType = "platform" + FilterRevId FilterType = "revId" + FilterIssue FilterType = "issue" + FilterMetadata FilterType = "metadata" +) + +// Event filters +const ( + FilterClick FilterType = "CLICK" + FilterInput FilterType = "INPUT" + FilterLocation FilterType = "LOCATION" + FilterTag FilterType = "tag" + FilterCustom FilterType = "customEvent" + FilterFetch FilterType = "fetch" + FilterFetchStatusCode FilterType = "fetchStatusCode" // Subfilter + FilterGraphQLRequest FilterType = "graphql" + FilterStateAction FilterType = "stateAction" + FilterError FilterType = "error" + FilterAvgCpuLoad FilterType = "avgCpuLoad" + FilterAvgMemoryUsage FilterType = "avgMemoryUsage" +) + +// MOBILE FILTERS +const ( + FilterUserOsIos FilterType = "userOsIos" + FilterUserDeviceIos FilterType = "userDeviceIos" + FilterUserCountryIos FilterType = "userCountryIos" + FilterUserIdIos FilterType = "userIdIos" + FilterUserAnonymousIdIos FilterType = "userAnonymousIdIos" + FilterRevIdIos FilterType = "revIdIos" +) + +const ( + OperatorStringIs = "is" + OperatorStringIsAny = "isAny" + OperatorStringOn = "on" + OperatorStringOnAny = "onAny" + OperatorStringIsNot = "isNot" + OperatorStringIsUndefined = "isUndefined" + OperatorStringNotOn = "notOn" + OperatorContains = "contains" + OperatorStringNotContains = "notContains" + OperatorStringStartsWith = "startsWith" + OperatorStringEndsWith = "endsWith" +) type DataPoint struct { - Timestamp int64 `json:"timestamp"` - Series map[string]int64 `json:"series"` + Timestamp uint64 `json:"timestamp"` + Count uint64 `json:"count"` } -type GetCardChartDataRequest struct { - MetricType string `json:"metricType" validate:"required,oneof=timeseries table funnel"` - MetricOf string `json:"metricOf" validate:"required,oneof=session_count user_count"` - ViewType string `json:"viewType" validate:"required,oneof=line_chart table_view"` - MetricFormat string `json:"metricFormat" validate:"required,oneof=default percentage"` - SessionID int64 `json:"sessionId"` - Series []cards.CardSeries `json:"series" validate:"required,dive"` -} - -type GetCardChartDataResponse struct { - Data []DataPoint `json:"data"` -} +//type TimeseriesResponse struct { +// Data []DataPoint `json:"data"` +//} diff --git a/backend/pkg/analytics/charts/query.go b/backend/pkg/analytics/charts/query.go new file mode 100644 index 000000000..f87abb07d --- /dev/null +++ b/backend/pkg/analytics/charts/query.go @@ -0,0 +1,497 @@ +package charts + +import ( + "fmt" + "log" + "openreplay/backend/pkg/analytics/db" + "strconv" + "strings" +) + +type Payload struct { + *MetricPayload + GroupByColumn string // TODO remove this field + ProjectId int + 
UserId uint64 +} + +type QueryBuilder interface { + Execute(p Payload, conn db.Connector) (interface{}, error) +} + +func NewQueryBuilder(p Payload) (QueryBuilder, error) { + switch p.MetricType { + case MetricTypeTimeseries: + return TimeSeriesQueryBuilder{}, nil + case MetricTypeFunnel: + return FunnelQueryBuilder{}, nil + case MetricTypeTable: + if p.MetricOf == "jsException" { + return TableErrorsQueryBuilder{}, nil + } + return TableQueryBuilder{}, nil + case MetricTypeHeatmap: + return HeatmapQueryBuilder{}, nil + case MetricTypeSession: + return HeatmapSessionQueryBuilder{}, nil + case MetricUserJourney: + return UserJourneyQueryBuilder{}, nil + default: + return nil, fmt.Errorf("unknown metric type: %s", p.MetricType) + } +} + +type BuildConditionsOptions struct { + MainTableAlias string + PropertiesColumnName string + DefinedColumns map[string]string +} + +var propertyKeyMap = map[string]filterConfig{ + "LOCATION": {LogicalProperty: "url_path"}, + "FETCH": {LogicalProperty: "url_path"}, + "REQUEST": {LogicalProperty: "url_path"}, + "CLICK": {LogicalProperty: "label"}, + "INPUT": {LogicalProperty: "label"}, + "fetchUrl": {LogicalProperty: "url_path"}, + "fetchStatusCode": {LogicalProperty: "status", IsNumeric: true}, + //"fetchDuration": {LogicalProperty: "duration", IsNumeric: true}, + //"ISSUE": {LogicalProperty: "issue_type"}, + // TODO add more mappings as needed +} + +// filterConfig holds configuration for a filter type +type filterConfig struct { + LogicalProperty string + IsNumeric bool +} + +func getColumnAccessor(logical string, isNumeric bool, opts BuildConditionsOptions) string { + // helper: wrap names starting with $ in quotes + quote := func(name string) string { + prefix := opts.MainTableAlias + "." + if strings.HasPrefix(name, prefix) { + suffix := strings.TrimPrefix(name, prefix) + if strings.HasPrefix(suffix, "$") { + return fmt.Sprintf("%s.\"%s\"", opts.MainTableAlias, suffix) + } + } + if strings.HasPrefix(name, "$") { + return fmt.Sprintf("\"%s\"", name) + } + return name + } + + // explicit column mapping + if col, ok := opts.DefinedColumns[logical]; ok { + col = quote(col) + if opts.MainTableAlias != "" { + if strings.Contains(col, ".") { + return fmt.Sprintf("%s", col) + } + return fmt.Sprintf("%s.%s", opts.MainTableAlias, col) + } + return col + } + + // determine property key + propKey := logical + if cfg, ok := propertyKeyMap[logical]; ok { + propKey = cfg.LogicalProperty + } + + // build properties column reference + colName := opts.PropertiesColumnName + if opts.MainTableAlias != "" { + colName = fmt.Sprintf("%s.%s", opts.MainTableAlias, colName) + } + colName = quote(colName) + + // JSON extraction + if isNumeric { + return fmt.Sprintf("JSONExtractFloat(toString(%s), '%s')", colName, propKey) + } + return fmt.Sprintf("JSONExtractString(toString(%s), '%s')", colName, propKey) +} + +// buildEventConditions builds SQL conditions and names from filters +func buildEventConditions(filters []Filter, options ...BuildConditionsOptions) (conds, names []string) { + opts := BuildConditionsOptions{ + MainTableAlias: "", + PropertiesColumnName: "$properties", + DefinedColumns: make(map[string]string), + } + if len(options) > 0 { + opt := options[0] + if opt.MainTableAlias != "" { + opts.MainTableAlias = opt.MainTableAlias + } + if opt.PropertiesColumnName != "" { + opts.PropertiesColumnName = opt.PropertiesColumnName + } + if opt.DefinedColumns != nil { + opts.DefinedColumns = opt.DefinedColumns + } + } + for _, f := range filters { + if f.Type == FilterDuration { 
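+ // duration is a session-level constraint, not an event property; it is skipped here and
+ // handled separately (e.g. via buildDurationWhere or the per-builder duration handling)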
+ continue + } + + fConds, fNames := addFilter(f, opts) + if len(fConds) > 0 { + conds = append(conds, fConds...) + names = append(names, fNames...) + } + } + return +} + +// addFilter processes a single Filter and returns its SQL conditions and associated event names +func addFilter(f Filter, opts BuildConditionsOptions) (conds []string, names []string) { + var ftype = string(f.Type) + // resolve filter configuration, default if missing + cfg, ok := propertyKeyMap[ftype] + if !ok { + cfg = filterConfig{LogicalProperty: ftype, IsNumeric: false} + log.Printf("using default config for type: %v", f.Type) + } + acc := getColumnAccessor(cfg.LogicalProperty, cfg.IsNumeric, opts) + + // operator-based conditions + switch f.Operator { + case "isAny", "onAny": + if f.IsEvent { + names = append(names, ftype) + } + default: + if c := buildCond(acc, f.Value, f.Operator, cfg.IsNumeric); c != "" { + conds = append(conds, c) + if f.IsEvent { + names = append(names, ftype) + } + } + } + + // nested sub-filters + if len(f.Filters) > 0 { + subConds, subNames := buildEventConditions(f.Filters, opts) + if len(subConds) > 0 { + conds = append(conds, strings.Join(subConds, " AND ")) + names = append(names, subNames...) + } + } + + return +} + +var compOps = map[string]string{ + "equals": "=", "is": "=", "on": "=", + "notEquals": "<>", "not": "<>", "off": "<>", + "greaterThan": ">", "gt": ">", + "greaterThanOrEqual": ">=", "gte": ">=", + "lessThan": "<", "lt": "<", + "lessThanOrEqual": "<=", "lte": "<=", +} + +// buildCond constructs a condition string based on operator and values +func buildCond(expr string, values []string, operator string, isNumeric bool) string { + if len(values) == 0 { + return "" + } + switch operator { + case "contains": + // wrap values with % on both sides + wrapped := make([]string, len(values)) + for i, v := range values { + wrapped[i] = fmt.Sprintf("%%%s%%", v) + } + return multiValCond(expr, wrapped, "%s ILIKE %s", false) + case "notContains": + wrapped := make([]string, len(values)) + for i, v := range values { + wrapped[i] = fmt.Sprintf("%%%s%%", v) + } + cond := multiValCond(expr, wrapped, "%s ILIKE %s", false) + return "NOT (" + cond + ")" + case "startsWith": + wrapped := make([]string, len(values)) + for i, v := range values { + wrapped[i] = v + "%" + } + return multiValCond(expr, wrapped, "%s ILIKE %s", false) + case "endsWith": + wrapped := make([]string, len(values)) + for i, v := range values { + wrapped[i] = "%" + v + } + return multiValCond(expr, wrapped, "%s ILIKE %s", false) + case "regex": + // build match expressions + var parts []string + for _, v := range values { + parts = append(parts, fmt.Sprintf("match(%s, '%s')", expr, v)) + } + if len(parts) > 1 { + return "(" + strings.Join(parts, " OR ") + ")" + } + return parts[0] + case "in", "notIn": + neg := operator == "notIn" + return inClause(expr, values, neg, isNumeric) + case ">=", ">", "<=", "<": + return multiValCond(expr, values, "%s "+operator+" %s", isNumeric) + default: + if op, ok := compOps[operator]; ok { + tmpl := "%s " + op + " %s" + return multiValCond(expr, values, tmpl, isNumeric) + } + // fallback equals + tmpl := "%s = %s" + return multiValCond(expr, values, tmpl, isNumeric) + } +} + +// formatCondition applies a template to a single value, handling quoting +func formatCondition(expr, tmpl, value string, isNumeric bool) string { + val := value + if !isNumeric { + val = fmt.Sprintf("'%s'", value) + } + return fmt.Sprintf(tmpl, expr, val) +} + +// multiValCond applies a template to one or multiple 
values, using formatCondition +func multiValCond(expr string, values []string, tmpl string, isNumeric bool) string { + if len(values) == 1 { + return formatCondition(expr, tmpl, values[0], isNumeric) + } + parts := make([]string, len(values)) + for i, v := range values { + parts[i] = formatCondition(expr, tmpl, v, isNumeric) + } + return "(" + strings.Join(parts, " OR ") + ")" +} + +// inClause constructs IN/NOT IN clauses with proper quoting +func inClause(expr string, values []string, negate, isNumeric bool) string { + op := "IN" + if negate { + op = "NOT IN" + } + + if len(values) == 1 { + return fmt.Sprintf("%s %s (%s)", expr, op, func() string { + if isNumeric { + return values[0] + } + return fmt.Sprintf("'%s'", values[0]) + }()) + } + quoted := make([]string, len(values)) + for i, v := range values { + if isNumeric { + quoted[i] = v + } else { + quoted[i] = fmt.Sprintf("'%s'", v) + } + } + return fmt.Sprintf("%s %s (%s)", expr, op, strings.Join(quoted, ", ")) +} + +func buildSessionConditions(filters []Filter) []string { + var conds []string + + return conds +} + +func buildInClause(values []string) string { + var quoted []string + for _, v := range values { + quoted = append(quoted, fmt.Sprintf("'%s'", v)) + } + return strings.Join(quoted, ",") +} + +func buildStaticEventWhere(p Payload) string { + return strings.Join([]string{ + fmt.Sprintf("main.project_id = %d", p.ProjectId), + fmt.Sprintf("main.created_at >= toDateTime(%d / 1000)", p.StartTimestamp), + fmt.Sprintf("main.created_at <= toDateTime(%d / 1000)", p.EndTimestamp), + }, " AND ") +} + +func buildStaticSessionWhere(p Payload, sessionConds []string) (string, string) { + static := []string{fmt.Sprintf("s.project_id = %d", p.ProjectId)} + sessWhere := strings.Join(static, " AND ") + if len(sessionConds) > 0 { + sessWhere += " AND " + strings.Join(sessionConds, " AND ") + } + sessJoin := strings.Join(append(static, append(sessionConds, + fmt.Sprintf("s.datetime >= toDateTime(%d / 1000)", p.StartTimestamp), + fmt.Sprintf("s.datetime <= toDateTime(%d / 1000)", p.EndTimestamp))...), " AND ") + return sessWhere, sessJoin +} + +func buildHavingClause(conds []string) string { + seqConds := append([]string{}, conds...) + if len(seqConds) == 1 { + seqConds = append(seqConds, "1") + } + if len(seqConds) == 0 { + return "" + } + var parts []string + for i := range seqConds { + parts = append(parts, fmt.Sprintf("(?%d)", i+1)) + } + pattern := strings.Join(parts, "") + args := []string{"toDateTime(main.created_at)"} + args = append(args, seqConds...) 
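+ // A single condition is padded with the literal "1" above so sequenceMatch always receives
+ // at least two steps. With conditions c1, c2 this returns (roughly):
+ //   HAVING sequenceMatch('(?1)(?2)')(toDateTime(main.created_at), c1, c2)) AS f
+ // where the trailing ") AS f" closes and aliases the derived table opened in buildSubQuery.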
+ return fmt.Sprintf("HAVING sequenceMatch('%s')(%s)) AS f", pattern, strings.Join(args, ",\n ")) +} + +func contains(slice []string, s string) bool { + for _, v := range slice { + if v == s { + return true + } + } + return false +} + +func FillMissingDataPoints( + startTime, endTime int64, + density int, + neutral DataPoint, + rows []DataPoint, + timeCoefficient int64, +) []DataPoint { + if density <= 1 { + return rows + } + + stepSize := uint64(getStepSize(startTime, endTime, density, false, 1000)) + bucketSize := stepSize * uint64(timeCoefficient) + + lookup := make(map[uint64]DataPoint) + for _, dp := range rows { + if dp.Timestamp < uint64(startTime) { + continue + } + bucket := uint64(startTime) + (((dp.Timestamp - uint64(startTime)) / bucketSize) * bucketSize) + lookup[bucket] = dp + } + + results := make([]DataPoint, 0, density) + for i := 0; i < density; i++ { + ts := uint64(startTime) + uint64(i)*bucketSize + if dp, ok := lookup[ts]; ok { + results = append(results, dp) + } else { + nd := neutral + nd.Timestamp = ts + results = append(results, nd) + } + } + return results +} + +func partitionFilters(filters []Filter) (sessionFilters []Filter, eventFilters []Filter) { + for _, f := range filters { + if f.IsEvent { + eventFilters = append(eventFilters, f) + } else { + sessionFilters = append(sessionFilters, f) + } + } + return +} + +// Returns a map: logical property -> CTE alias (e.g., "userBrowser" -> "userBrowser") +func cteColumnAliases() map[string]string { + aliases := make(map[string]string) + for logical := range mainColumns { + aliases[logical] = logical + } + return aliases +} + +// Returns a map: logical property -> source column (e.g., "userBrowser" -> "$browser") +func cteSourceColumns() map[string]string { + cols := make(map[string]string) + for logical, col := range mainColumns { + cols[logical] = col + } + return cols +} + +// Helper for reverse lookup (used for dynamic SELECT) +func reverseLookup(m map[string]string, value string) string { + for k, v := range m { + if v == value { + return k + } + } + return "" +} + +func eventNameCondition(table, metricOf string) string { + if table == "" { + table = "main" + } + switch metricOf { + case string(MetricOfTableFetch): + return fmt.Sprintf("%s.`$event_name` = 'REQUEST'", table) + case string(MetricOfTableLocation): + return fmt.Sprintf("%s.`$event_name` = 'LOCATION'", table) + default: + return "" + } +} + +func buildDurationWhere(filters []Filter) ([]string, []Filter) { + var conds []string + var rest []Filter + for _, f := range filters { + if string(f.Type) == "duration" { + v := f.Value + if len(v) == 1 { + if v[0] != "" { + if d, err := strconv.ParseInt(v[0], 10, 64); err == nil { + conds = append(conds, fmt.Sprintf("sessions.duration >= %d", d)) + } + } + } else if len(v) >= 2 { + if v[0] != "" { + if d, err := strconv.ParseInt(v[0], 10, 64); err == nil { + conds = append(conds, fmt.Sprintf("sessions.duration >= %d", d)) + } + } + if v[1] != "" { + if d, err := strconv.ParseInt(v[1], 10, 64); err == nil { + conds = append(conds, fmt.Sprintf("sessions.duration <= %d", d)) + } + } + } + } else { + rest = append(rest, f) + } + } + return conds, rest +} + +func filterOutTypes(filters []Filter, typesToRemove []FilterType) (kept []Filter, removed []Filter) { + removeMap := make(map[FilterType]struct{}, len(typesToRemove)) + for _, t := range typesToRemove { + removeMap[t] = struct{}{} + } + for _, f := range filters { + if _, ok := removeMap[f.Type]; ok { + removed = append(removed, f) + } else { + kept = append(kept, 
f) + } + } + return +} diff --git a/backend/pkg/analytics/dashboards/dashboards.go b/backend/pkg/analytics/dashboards/dashboards.go index cb260f572..057c9c830 100644 --- a/backend/pkg/analytics/dashboards/dashboards.go +++ b/backend/pkg/analytics/dashboards/dashboards.go @@ -1,7 +1,6 @@ package dashboards import ( - "context" "encoding/json" "errors" "fmt" @@ -336,15 +335,14 @@ func (s *dashboardsImpl) AddCards(projectId int, dashboardId int, userId uint64, return fmt.Errorf("failed to start transaction: %w", err) } - ctx := context.Background() defer func() { if err != nil { - tx.Rollback(ctx) + err := tx.TxRollback() if err != nil { return } } else { - err := tx.Commit(ctx) + err := tx.TxCommit() if err != nil { return } @@ -356,7 +354,7 @@ func (s *dashboardsImpl) AddCards(projectId int, dashboardId int, userId uint64, for _, metricID := range req.MetricIDs { // Check if the widget already exists var exists bool - err := tx.QueryRow(ctx, ` + err := tx.TxQueryRow(` SELECT EXISTS ( SELECT 1 FROM public.dashboard_widgets WHERE dashboard_id = $1 AND metric_id = $2 @@ -371,10 +369,9 @@ func (s *dashboardsImpl) AddCards(projectId int, dashboardId int, userId uint64, } // Insert new widget - _, err = tx.Exec(ctx, ` - INSERT INTO public.dashboard_widgets (dashboard_id, metric_id, user_id, config) - VALUES ($1, $2, $3, $4) - `, dashboardId, metricID, userId, req.Config) + query := `INSERT INTO public.dashboard_widgets (dashboard_id, metric_id, user_id, config) + VALUES ($1, $2, $3, $4)` + err = tx.TxExec(query, dashboardId, metricID, userId, req.Config) if err != nil { return fmt.Errorf("failed to insert widget: %w", err) } @@ -382,7 +379,7 @@ func (s *dashboardsImpl) AddCards(projectId int, dashboardId int, userId uint64, } // Commit transaction - if err := tx.Commit(ctx); err != nil { + if err := tx.TxCommit(); err != nil { return fmt.Errorf("failed to commit transaction: %w", err) } diff --git a/backend/pkg/analytics/db/connector.go b/backend/pkg/analytics/db/connector.go new file mode 100644 index 000000000..45983ee16 --- /dev/null +++ b/backend/pkg/analytics/db/connector.go @@ -0,0 +1,86 @@ +package db + +import ( + "context" + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "openreplay/backend/internal/config/common" + "time" +) + +type TableValue struct { + Name string `json:"name"` + Total uint64 `json:"total"` +} + +type TableResponse struct { + Total uint64 `json:"total"` + Count uint64 `json:"count"` + Values []TableValue `json:"values"` +} + +type Connector interface { + Stop() error + Query(query string) (driver.Rows, error) + QueryRow(query string) (driver.Row, error) + QueryArgs(query string, args map[string]interface{}) (driver.Rows, error) +} + +type connectorImpl struct { + conn driver.Conn +} + +func NewConnector(cfg common.Clickhouse) (Connector, error) { + conn, err := clickhouse.Open(&clickhouse.Options{ + Addr: []string{cfg.GetTrimmedURL()}, + Auth: clickhouse.Auth{ + Database: cfg.Database, + Username: cfg.LegacyUserName, + Password: cfg.LegacyPassword, + }, + MaxOpenConns: 20, + MaxIdleConns: 15, + ConnMaxLifetime: 3 * time.Minute, + Compression: &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }, + }) + if err != nil { + return nil, err + } + return &connectorImpl{conn: conn}, nil +} + +func (c *connectorImpl) Stop() error { + return c.conn.Close() +} + +func (c *connectorImpl) Query(query string) (driver.Rows, error) { + rows, err := c.conn.Query(context.Background(), query) + if err != nil { + return nil, 
err + } + //defer rows.Close() + + return rows, nil +} + +func (c *connectorImpl) QueryRow(query string) (driver.Row, error) { + row := c.conn.QueryRow(context.Background(), query) + if err := row.Err(); err != nil { + return nil, err + } + //defer row.Close() + + return row, nil +} + +func (c *connectorImpl) QueryArgs(query string, args map[string]interface{}) (driver.Rows, error) { + rows, err := c.conn.Query(context.Background(), query, args) + if err != nil { + return nil, err + } + //defer rows.Close() + + return rows, nil +} diff --git a/backend/pkg/analytics/query/chartQuery.go b/backend/pkg/analytics/query/chartQuery.go new file mode 100644 index 000000000..a9c69e0c0 --- /dev/null +++ b/backend/pkg/analytics/query/chartQuery.go @@ -0,0 +1,654 @@ +package query + +//package main +// +//import ( +// "fmt" +// "strings" +//) +// +////func main() { +//// var r Root +//// //err := json.Unmarshal([]byte(jsonInput), &r) +//// //if err != nil { +//// // panic(err) +//// //} +//// // +//// ////fmt.Println("ARGS:", r) +//// //fmt.Println(buildQuery(r)) +//// ////fmt.Println("QUERY PART:", qp) +//// +//// builder := NewQueryBuilder() +//// query, err := builder.BuildQuery(r) +//// if err != nil { +//// fmt.Println("ERROR:", err) +//// } +//// +//// fmt.Println(query) +////} +// +//type Table string +//type Column string +//type FilterType string +//type EventOrder string +//type FetchFilterType string +// +//const ( +// UserOs FilterType = "userOs" +// UserBrowser FilterType = "userBrowser" +// UserDevice FilterType = "userDevice" +// UserCountry FilterType = "userCountry" +// UserCity FilterType = "userCity" +// UserState FilterType = "userState" +// UserId FilterType = "userId" +// UserAnonymousId FilterType = "userAnonymousId" +// Referrer FilterType = "referrer" +// RevId FilterType = "revId" +// UserOsIos FilterType = "userOsIos" +// UserDeviceIos FilterType = "userDeviceIos" +// UserCountryIos FilterType = "userCountryIos" +// UserIdIos FilterType = "userIdIos" +// UserAnonymousIdIos FilterType = "userAnonymousIdIos" +// RevIdIos FilterType = "revIdIos" +// Duration FilterType = "duration" +// Platform FilterType = "platform" +// Metadata FilterType = "metadata" +// Issue FilterType = "issue" +// EventsCount FilterType = "eventsCount" +// UtmSource FilterType = "utmSource" +// UtmMedium FilterType = "utmMedium" +// UtmCampaign FilterType = "utmCampaign" +// ThermalState FilterType = "thermalState" +// MainThreadCPU FilterType = "mainThreadCPU" +// ViewComponent FilterType = "viewComponent" +// LogEvent FilterType = "logEvent" +// ClickEvent FilterType = "clickEvent" +// MemoryUsage FilterType = "memoryUsage" +//) +// +//const ( +// Click FilterType = "click" +// Input FilterType = "input" +// Location FilterType = "location" +// Custom FilterType = "custom" +// Request FilterType = "request" +// Fetch FilterType = "fetch" +// GraphQL FilterType = "graphql" +// StateAction FilterType = "stateAction" +// Error FilterType = "error" +// Tag FilterType = "tag" +// ClickMobile FilterType = "clickMobile" +// InputMobile FilterType = "inputMobile" +// ViewMobile FilterType = "viewMobile" +// CustomMobile FilterType = "customMobile" +// RequestMobile FilterType = "requestMobile" +// ErrorMobile FilterType = "errorMobile" +// SwipeMobile FilterType = "swipeMobile" +//) +// +//const ( +// EventOrderThen EventOrder = "then" +// EventOrderOr EventOrder = "or" +// EventOrderAnd EventOrder = "and" +//) +// +//const ( +// FetchFilterTypeFetchUrl FilterType = "fetchUrl" +// 
FetchFilterTypeFetchStatusCode FilterType = "fetchStatusCode" +// FetchFilterTypeFetchMethod FilterType = "fetchMethod" +// FetchFilterTypeFetchDuration FilterType = "fetchDuration" +// FetchFilterTypeFetchRequestBody FilterType = "fetchRequestBody" +// FetchFilterTypeFetchResponseBody FilterType = "fetchResponseBody" +//) +// +//const ( +// OperatorStringIs = "is" +// OperatorStringIsAny = "isAny" +// OperatorStringOn = "on" +// OperatorStringOnAny = "onAny" +// OperatorStringIsNot = "isNot" +// OperatorStringIsUndefined = "isUndefined" +// OperatorStringNotOn = "notOn" +// OperatorStringContains = "contains" +// OperatorStringNotContains = "notContains" +// OperatorStringStartsWith = "startsWith" +// OperatorStringEndsWith = "endsWith" +//) +// +//const ( +// OperatorMathEq = "=" +// OperatorMathLt = "<" +// OperatorMathGt = ">" +// OperatorMathLe = "<=" +// OperatorMathGe = ">=" +//) +// +////-------------------------------------------------- +//// Constants for columns, tables, etc. +////-------------------------------------------------- +// +//const ( +// TableEvents Table = "product_analytics.events" +// TableSessions Table = "experimental.sessions" +// +// ColEventTime Column = "main.created_at" +// ColEventName Column = "main.`$event_name`" +// ColEventProjectID Column = "main.project_id" +// ColEventProperties Column = "main.`$properties`" +// ColEventSessionID Column = "main.session_id" +// ColEventURLPath Column = "main.url_path" +// ColEventStatus Column = "main.status" +// +// ColSessionID Column = "s.session_id" +// ColDuration Column = "s.duration" +// ColUserCountry Column = "s.user_country" +// ColUserCity Column = "s.user_city" +// ColUserState Column = "s.user_state" +// ColUserID Column = "s.user_id" +// ColUserAnonymousID Column = "s.user_anonymous_id" +// ColUserOS Column = "s.user_os" +// ColUserBrowser Column = "s.user_browser" +// ColUserDevice Column = "s.user_device" +// ColUserDeviceType Column = "s.user_device_type" +// ColRevID Column = "s.rev_id" +// ColBaseReferrer Column = "s.base_referrer" +// ColUtmSource Column = "s.utm_source" +// ColUtmMedium Column = "s.utm_medium" +// ColUtmCampaign Column = "s.utm_campaign" +// ColMetadata1 Column = "s.metadata_1" +// ColSessionProjectID Column = "s.project_id" +// ColSessionIsNotNull Column = "isNotNull(s.duration)" +//) +// +//type Root struct { +// StartTimestamp int64 `json:"startTimestamp"` +// EndTimestamp int64 `json:"endTimestamp"` +// Series []Series `json:"series"` +// ProjectID int64 `json:"projectId"` +//} +// +//type Series struct { +// SeriesID int64 `json:"seriesId"` +// Name string `json:"name"` +// Filter SeriesFilter `json:"filter"` +//} +// +//type SeriesFilter struct { +// Filters []FilterObj `json:"filters"` +// EventsOrder EventOrder `json:"eventsOrder"` +//} +// +//type FilterObj struct { +// Key string `json:"key"` +// Type FilterType `json:"type"` +// IsEvent bool `json:"isEvent"` +// Value []string `json:"value"` +// Operator string `json:"operator"` +// Source string `json:"source"` +// Filters []FilterObj `json:"filters"` +//} +// +//// -------------------------------------------------- +//func buildQuery(r Root) string { +// s := r.Series[0] +// +// // iterate over series and partition filters +// //for _, s := range r.Series { +// // sessionFilters, eventFilters := partitionFilters(s.Filter.Filters) +// // sessionWhere := buildSessionWhere(sessionFilters) +// // eventWhere, seqHaving := buildEventsWhere(eventFilters, s.Filter.EventsOrder) +// // fmt.Println("SESSION FILTERS:", 
sessionFilters) +// // fmt.Println("EVENT FILTERS:", eventFilters) +// // fmt.Println("SESSION WHERE:", sessionWhere) +// // fmt.Println("EVENT WHERE:", eventWhere) +// // fmt.Println("SEQ HAVING:", seqHaving) +// //} +// +// sessionFilters, eventFilters := partitionFilters(s.Filter.Filters) +// sessionWhere := buildSessionWhere(sessionFilters) +// eventWhere, seqHaving := buildEventsWhere(eventFilters, s.Filter.EventsOrder) +// +// subQuery := fmt.Sprintf( +// "SELECT %s,\n"+ +// " MIN(%s) AS first_event_ts,\n"+ +// " MAX(%s) AS last_event_ts\n"+ +// "FROM %s AS main\n"+ +// "WHERE main.project_id = %%(project_id)s\n"+ +// " AND %s >= toDateTime(%%(start_time)s/1000)\n"+ +// " AND %s <= toDateTime(%%(end_time)s/1000)\n"+ +// " AND (%s)\n"+ +// "GROUP BY %s\n"+ +// "HAVING %s", +// ColEventSessionID, +// ColEventTime, +// ColEventTime, +// TableEvents, +// ColEventTime, +// ColEventTime, +// strings.Join(eventWhere, " OR "), +// ColEventSessionID, +// seqHaving, +// ) +// +// joinQuery := fmt.Sprintf( +// "SELECT *\n"+ +// "FROM %s AS s\n"+ +// "INNER JOIN (\n"+ +// " SELECT DISTINCT ev.session_id, ev.`$current_url` AS url_path\n"+ +// " FROM %s AS ev\n"+ +// " WHERE ev.created_at >= toDateTime(%%(start_time)s/1000)\n"+ +// " AND ev.created_at <= toDateTime(%%(end_time)s/1000)\n"+ +// " AND ev.project_id = %%(project_id)s\n"+ +// " AND ev.`$event_name` = 'LOCATION'\n"+ +// ") AS extra_event USING (session_id)\n"+ +// "WHERE s.project_id = %%(project_id)s\n"+ +// " AND %s\n"+ +// " AND s.datetime >= toDateTime(%%(start_time)s/1000)\n"+ +// " AND s.datetime <= toDateTime(%%(end_time)s/1000)\n", +// TableSessions, +// TableEvents, +// ColSessionIsNotNull, +// ) +// +// if len(sessionWhere) > 0 { +// joinQuery += " AND " + strings.Join(sessionWhere, " AND ") + "\n" +// } +// +// main := fmt.Sprintf( +// "SELECT s.session_id AS session_id, s.url_path\n"+ +// "FROM (\n%s\n) AS f\n"+ +// "INNER JOIN (\n%s) AS s\n"+ +// " ON (s.session_id = f.session_id)\n", +// subQuery, +// joinQuery, +// ) +// +// final := fmt.Sprintf( +// "SELECT COUNT(DISTINCT url_path) OVER () AS main_count,\n"+ +// " url_path AS name,\n"+ +// " COUNT(DISTINCT session_id) AS total,\n"+ +// " COALESCE(SUM(COUNT(DISTINCT session_id)) OVER (), 0) AS total_count\n"+ +// "FROM (\n%s) AS filtered_sessions\n"+ +// "GROUP BY url_path\n"+ +// "ORDER BY total DESC\n"+ +// "LIMIT 200 OFFSET 0;", +// main, +// ) +// +// return final +//} +// +//func partitionFilters(filters []FilterObj) (sessionFilters, eventFilters []FilterObj) { +// for _, f := range filters { +// if f.IsEvent { +// eventFilters = append(eventFilters, f) +// } else { +// sessionFilters = append(sessionFilters, f) +// } +// } +// return +//} +// +//func buildSessionWhere(filters []FilterObj) []string { +// var conds []string +// for _, f := range filters { +// switch f.Type { +// case UserCountry: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCountry, concatValues(f.Value))) +// case UserCity: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCity, concatValues(f.Value))) +// case UserState: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserState, concatValues(f.Value))) +// case UserId: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserID, concatValues(f.Value))) +// case UserAnonymousId: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserAnonymousID, concatValues(f.Value))) +// case UserOs: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserOS, 
concatValues(f.Value))) +// case UserBrowser: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserBrowser, concatValues(f.Value))) +// case UserDevice: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDevice, concatValues(f.Value))) +// case Platform: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDeviceType, concatValues(f.Value))) +// case RevId: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColRevID, concatValues(f.Value))) +// case Referrer: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColBaseReferrer, concatValues(f.Value))) +// case Duration: +// if len(f.Value) == 2 { +// conds = append(conds, fmt.Sprintf("%s >= '%s'", ColDuration, f.Value[0])) +// conds = append(conds, fmt.Sprintf("%s <= '%s'", ColDuration, f.Value[1])) +// } +// case UtmSource: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmSource, concatValues(f.Value))) +// case UtmMedium: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmMedium, concatValues(f.Value))) +// case UtmCampaign: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmCampaign, concatValues(f.Value))) +// case Metadata: +// conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColMetadata1, concatValues(f.Value))) +// } +// } +// // add /n to each condition +// for i := range conds { +// conds[i] += "\n" +// } +// return conds +//} +// +//func parseOperator(op string) string { +// switch strings.ToLower(op) { +// case OperatorStringContains: +// return OperatorMathEq // interpret as "LIKE" if needed +// case OperatorStringIs, OperatorStringOn, "=", OperatorStringOnAny: +// return OperatorMathEq +// case OperatorStringStartsWith: +// // might interpret differently in real impl +// return OperatorMathEq +// case OperatorStringEndsWith: +// // might interpret differently in real impl +// return OperatorMathEq +// default: +// return OperatorMathEq +// } +//} +// +//func buildEventsWhere(filters []FilterObj, order EventOrder) (eventConditions []string, having string) { +// basicEventTypes := "(" + +// strings.Join([]string{ +// fmt.Sprintf("%s = 'CLICK'", ColEventName), +// fmt.Sprintf("%s = 'INPUT'", ColEventName), +// fmt.Sprintf("%s = 'LOCATION'", ColEventName), +// fmt.Sprintf("%s = 'CUSTOM'", ColEventName), +// fmt.Sprintf("%s = 'REQUEST'", ColEventName), +// }, " OR ") + ")" +// +// var seq []string +// for _, f := range filters { +// switch f.Type { +// case Click: +// seq = append(seq, seqCond("CLICK", "selector", f)) +// case Input: +// seq = append(seq, seqCond("INPUT", "label", f)) +// case Location: +// seq = append(seq, seqCond("LOCATION", "url_path", f)) +// case Custom: +// seq = append(seq, seqCond("CUSTOM", "name", f)) +// case Fetch: +// seq = append(seq, seqFetchCond("REQUEST", f)) +// case FetchFilterTypeFetchStatusCode: +// seq = append(seq, seqCond("REQUEST", "status", f)) +// default: +// seq = append(seq, fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(string(f.Type)))) +// } +// } +// eventConditions = []string{basicEventTypes} +// +// // then => sequenceMatch +// // or => OR +// // and => AND +// switch order { +// case EventOrderThen: +// var pattern []string +// for i := range seq { +// pattern = append(pattern, fmt.Sprintf("(?%d)", i+1)) +// } +// having = fmt.Sprintf("sequenceMatch('%s')(\n%s,\n%s)", +// strings.Join(pattern, ""), fmt.Sprintf("toUnixTimestamp(%s)", ColEventTime), strings.Join(seq, ",\n")) +// case EventOrderAnd: +// // build AND +// having = strings.Join(seq, " 
AND ") +// default: +// // default => OR +// var orParts []string +// for _, p := range seq { +// orParts = append(orParts, "("+p+")") +// } +// having = strings.Join(orParts, " OR ") +// } +// return +//} +// +//func seqCond(eventName, key string, f FilterObj) string { +// op := parseOperator(f.Operator) +// return fmt.Sprintf("(%s = '%s' AND JSONExtractString(toString(%s), '%s') %s '%s')", +// ColEventName, strings.ToUpper(eventName), ColEventProperties, key, op, concatValues(f.Value)) +//} +// +//func seqFetchCond(eventName string, f FilterObj) string { +// w := []string{fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(eventName))} +// var extras []string +// for _, c := range f.Filters { +// switch c.Type { +// case Fetch: +// if len(c.Value) > 0 { +// extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventURLPath, concatValues(c.Value))) +// } +// case FetchFilterTypeFetchStatusCode: +// if len(c.Value) > 0 { +// extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventStatus, concatValues(c.Value))) +// } +// default: +// // placeholder if needed +// } +// } +// if len(extras) > 0 { +// w = append(w, strings.Join(extras, " AND ")) +// } +// return "(" + strings.Join(w, " AND ") + ")" +//} +// +//func concatValues(v []string) string { +// return strings.Join(v, "") +//} + +//const jsonInput = ` +//{ +// "startTimestamp": 1737043724664, +// "endTimestamp": 1737130124664, +// "projectId": 1, +// "series": [ +// { +// "seriesId": 610, +// "name": "Series 1", +// "filter": { +// "filters": [ +// { +// "type": "click", +// "isEvent": true, +// "value": ["DEPLOYMENT"], +// "operator": "on", +// "filters": [] +// }, +// { +// "type": "input", +// "isEvent": true, +// "value": ["a"], +// "operator": "contains", +// "filters": [] +// }, +// { +// "type": "location", +// "isEvent": true, +// "value": ["/en/using-or/"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "userCountry", +// "isEvent": false, +// "value": ["AD"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "userCity", +// "isEvent": false, +// "value": ["Mumbai"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "userState", +// "isEvent": false, +// "value": ["Maharashtra"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "userId", +// "isEvent": false, +// "value": ["test@test.com"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "userAnonymousId", +// "isEvent": false, +// "value": ["asd"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "userOs", +// "isEvent": false, +// "value": ["Mac OS X"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "userBrowser", +// "isEvent": false, +// "value": ["Chrome"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "userDevice", +// "isEvent": false, +// "value": ["iPhone"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "platform", +// "isEvent": false, +// "value": ["desktop"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "revId", +// "isEvent": false, +// "value": ["v1"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "referrer", +// "isEvent": false, +// "value": ["https://www.google.com/"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "duration", +// "isEvent": false, +// "value": ["60000", "6000000"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "utmSource", +// "isEvent": false, +// "value": ["aaa"], +// "operator": "is", +// "filters": [] +// 
}, +// { +// "type": "utmMedium", +// "isEvent": false, +// "value": ["aa"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "utmCampaign", +// "isEvent": false, +// "value": ["aaa"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "metadata", +// "isEvent": false, +// "value": ["bbbb"], +// "operator": "is", +// "source": "userId", +// "filters": [] +// }, +// { +// "type": "custom", +// "isEvent": true, +// "value": ["test"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "fetch", +// "isEvent": true, +// "value": [], +// "operator": "is", +// "filters": [ +// { +// "type": "fetchUrl", +// "isEvent": false, +// "value": ["/ai/docs/chat"], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "fetchStatusCode", +// "isEvent": false, +// "value": ["400"], +// "operator": "=", +// "filters": [] +// }, +// { +// "type": "fetchMethod", +// "isEvent": false, +// "value": [], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "fetchDuration", +// "isEvent": false, +// "value": [], +// "operator": "=", +// "filters": [] +// }, +// { +// "type": "fetchRequestBody", +// "isEvent": false, +// "value": [], +// "operator": "is", +// "filters": [] +// }, +// { +// "type": "fetchResponseBody", +// "isEvent": false, +// "value": [], +// "operator": "is", +// "filters": [] +// } +// ] +// } +// ], +// "eventsOrder": "then" +// } +// } +// ] +//} +//` diff --git a/backend/pkg/analytics/query/funnel.go b/backend/pkg/analytics/query/funnel.go new file mode 100644 index 000000000..c8632b401 --- /dev/null +++ b/backend/pkg/analytics/query/funnel.go @@ -0,0 +1,7 @@ +package query + +type FunnelQueryBuilder struct{} + +func (f FunnelQueryBuilder) Build(p MetricPayload) string { + return "-- Funnel query placeholder" +} diff --git a/backend/pkg/analytics/query/model.go b/backend/pkg/analytics/query/model.go new file mode 100644 index 000000000..9a315e0db --- /dev/null +++ b/backend/pkg/analytics/query/model.go @@ -0,0 +1,137 @@ +package query + +type Table string +type Column string +type MetricType string +type FilterType string +type EventOrder string + +const ( + TableEvents Table = "product_analytics.events" + TableSessions Table = "experimental.sessions" +) + +const ( + ColEventTime Column = "main.created_at" + ColEventName Column = "main.`$event_name`" + ColEventProjectID Column = "main.project_id" + ColEventProperties Column = "main.`$properties`" + ColEventSessionID Column = "main.session_id" + ColEventURLPath Column = "main.url_path" + ColEventStatus Column = "main.status" +) + +const ( + ColSessionID Column = "s.session_id" + ColDuration Column = "s.duration" + ColUserCountry Column = "s.user_country" + ColUserCity Column = "s.user_city" + ColUserState Column = "s.user_state" + ColUserID Column = "s.user_id" + ColUserAnonymousID Column = "s.user_anonymous_id" + ColUserOS Column = "s.user_os" + ColUserBrowser Column = "s.user_browser" + ColUserDevice Column = "s.user_device" + ColUserDeviceType Column = "s.user_device_type" + ColRevID Column = "s.rev_id" + ColBaseReferrer Column = "s.base_referrer" + ColUtmSource Column = "s.utm_source" + ColUtmMedium Column = "s.utm_medium" + ColUtmCampaign Column = "s.utm_campaign" + ColMetadata1 Column = "s.metadata_1" + ColSessionProjectID Column = "s.project_id" + ColSessionIsNotNull Column = "isNotNull(s.duration)" +) + +const ( + MetricTypeTimeseries MetricType = "timeseries" + MetricTypeTable MetricType = "table" + MetricTypeFunnel MetricType = "funnel" +) + +const ( + EventOrderThen 
EventOrder = "then" + EventOrderOr EventOrder = "or" + EventOrderAnd EventOrder = "and" +) + +type MetricPayload struct { + StartTimestamp int64 `json:"startTimestamp"` + EndTimestamp int64 `json:"endTimestamp"` + Density int `json:"density"` + MetricOf string `json:"metricOf"` + MetricType MetricType `json:"metricType"` + MetricFormat string `json:"metricFormat"` + ViewType string `json:"viewType"` + Name string `json:"name"` + Series []Series `json:"series"` + CompareTo *string `json:"compareTo"` +} + +type Series struct { + Name string `json:"name"` + Filter struct { + Filters []Filter `json:"filters"` + EventsOrder EventOrder `json:"eventsOrder"` + } `json:"filter"` +} + +type Filter struct { + Type FilterType `json:"type"` + IsEvent bool `json:"isEvent"` + Value []string `json:"value"` + Operator string `json:"operator"` + Filters []Filter `json:"filters"` +} + +const ( + FilterUserOs FilterType = "userOs" + FilterUserBrowser FilterType = "userBrowser" + FilterUserDevice FilterType = "userDevice" + FilterUserCountry FilterType = "userCountry" + FilterUserCity FilterType = "userCity" + FilterUserState FilterType = "userState" + FilterUserId FilterType = "userId" + FilterUserAnonymousId FilterType = "userAnonymousId" + FilterReferrer FilterType = "referrer" + FilterRevId FilterType = "revId" + FilterUserOsIos FilterType = "userOsIos" + FilterUserDeviceIos FilterType = "userDeviceIos" + FilterUserCountryIos FilterType = "userCountryIos" + FilterUserIdIos FilterType = "userIdIos" + FilterUserAnonymousIdIos FilterType = "userAnonymousIdIos" + FilterRevIdIos FilterType = "revIdIos" + FilterDuration FilterType = "duration" + FilterPlatform FilterType = "platform" + FilterMetadata FilterType = "metadata" + FilterIssue FilterType = "issue" + FilterEventsCount FilterType = "eventsCount" + FilterUtmSource FilterType = "utmSource" + FilterUtmMedium FilterType = "utmMedium" + FilterUtmCampaign FilterType = "utmCampaign" + FilterThermalState FilterType = "thermalState" + FilterMainThreadCPU FilterType = "mainThreadCPU" + FilterViewComponent FilterType = "viewComponent" + FilterLogEvent FilterType = "logEvent" + FilterMemoryUsage FilterType = "memoryUsage" + FilterClick FilterType = "click" + FilterInput FilterType = "input" + FilterLocation FilterType = "location" + FilterCustom FilterType = "customEvent" + FilterFetch FilterType = "fetch" + FilterFetchStatusCode FilterType = "status" +) + +const ( + OperatorStringIs = "is" + OperatorStringIsAny = "isAny" + OperatorStringOn = "on" + OperatorStringOnAny = "onAny" + OperatorStringIsNot = "isNot" + OperatorStringIsUndefined = "isUndefined" + OperatorStringNotOn = "notOn" + OperatorStringContains = "contains" + OperatorStringNotContains = "notContains" + OperatorStringStartsWith = "startsWith" + OperatorStringEndsWith = "endsWith" +) diff --git a/backend/pkg/analytics/query/queryBuilder.go b/backend/pkg/analytics/query/queryBuilder.go new file mode 100644 index 000000000..b45359a04 --- /dev/null +++ b/backend/pkg/analytics/query/queryBuilder.go @@ -0,0 +1,253 @@ +package query + +import ( + "encoding/json" + "fmt" + "strings" +) + +type NewQueryBuilder interface { + Build(MetricPayload) string +} + +func buildEventSubquery(p MetricPayload) string { + baseEventsWhere := buildBaseEventsWhere(p) + sequenceCond := buildSequenceCondition(p.Series) + sessionsWhere := buildSessionsWhere(p) + + // If there's no sequence pattern, skip HAVING entirely. 
+ if sequenceCond.seqPattern == "" { + return fmt.Sprintf(` +SELECT s.%[1]s AS %[1]s, + s.datetime AS datetime +FROM ( + SELECT main.session_id, + MIN(main.created_at) AS first_event_ts, + MAX(main.created_at) AS last_event_ts + FROM product_analytics.events AS main + WHERE %[2]s + GROUP BY session_id +) AS f +INNER JOIN ( + SELECT * + FROM experimental.sessions AS s + WHERE %[3]s +) AS s ON (s.session_id = f.session_id) +`, pickIDField(p), baseEventsWhere, sessionsWhere) + } + + return fmt.Sprintf(` +SELECT s.%[1]s AS %[1]s, + s.datetime AS datetime +FROM ( + SELECT main.session_id, + MIN(main.created_at) AS first_event_ts, + MAX(main.created_at) AS last_event_ts + FROM product_analytics.events AS main + WHERE %[2]s + GROUP BY session_id + HAVING sequenceMatch('%[3]s')(toDateTime(main.created_at), %[4]s) +) AS f +INNER JOIN ( + SELECT * + FROM experimental.sessions AS s + WHERE %[5]s +) AS s ON (s.session_id = f.session_id) +`, pickIDField(p), baseEventsWhere, sequenceCond.seqPattern, sequenceCond.seqEvents, sessionsWhere) +} + +func pickIDField(p MetricPayload) string { + if p.MetricOf == "userCount" { + return "user_id" + } + return "session_id" +} + +func buildBaseEventsWhere(p MetricPayload) string { + projectID := 5 + ts := fmt.Sprintf( + `(main.created_at >= toDateTime(%d / 1000) AND main.created_at <= toDateTime(%d / 1000))`, + p.StartTimestamp, + p.EndTimestamp, + ) + return fmt.Sprintf(`main.project_id = %d AND %s`, projectID, ts) +} + +func buildSessionsWhere(p MetricPayload) string { + projectID := 5 + ts := fmt.Sprintf( + `(s.datetime >= toDateTime(%d / 1000) AND s.datetime <= toDateTime(%d / 1000))`, + p.StartTimestamp, + p.EndTimestamp, + ) + return fmt.Sprintf(`s.project_id = %d AND isNotNull(s.duration) AND %s`, projectID, ts) +} + +type sequenceParts struct { + seqPattern string + seqEvents string +} + +func buildSequenceCondition(series []Series) sequenceParts { + var events []string + for _, s := range series { + if len(s.Filter.Filters) > 0 { + events = append(events, buildOneSeriesSequence(s.Filter.Filters)) + } + } + + if len(events) == 0 { + return sequenceParts{"", ""} + } + + // For n events, we need a pattern like `(?1)(?2)(?3)...( ?n )`. 
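+	// Example: for the two-series "table" payload further below (a CLICK on "Manuscripts"
+	// followed by an INPUT of "test"), the pattern is '(?1)(?2)' and the HAVING assembled
+	// in buildEventSubquery becomes
+	//   sequenceMatch('(?1)(?2)')(toDateTime(main.created_at),
+	//     (main."$event_name" = 'CLICK' AND JSONExtractString(toString(main."$properties"), 'label') IN ('Manuscripts')),
+	//     (main."$event_name" = 'INPUT' AND JSONExtractString(toString(main."$properties"), 'label') IN ('test')))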
+ pattern := "" + for i := 1; i <= len(events); i++ { + pattern += fmt.Sprintf("(?%d)", i) + } + + return sequenceParts{ + seqPattern: pattern, + seqEvents: strings.Join(events, ", "), + } +} + +func buildOneSeriesSequence(filters []Filter) string { + return strings.Join(buildFilterConditions(filters), " AND ") +} + +func buildFilterConditions(filters []Filter) []string { + var out []string + for _, f := range filters { + switch f.Type { + case FilterClick: + out = append(out, + fmt.Sprintf(`(main."$event_name" = 'CLICK' AND JSONExtractString(toString(main."$properties"), 'label') IN ('%s'))`, + strings.Join(f.Value, "','"))) + case FilterInput: + out = append(out, + fmt.Sprintf(`(main."$event_name" = 'INPUT' AND JSONExtractString(toString(main."$properties"), 'label') IN ('%s'))`, + strings.Join(f.Value, "','"))) + // TODO add more cases to cover all the events + default: + out = append(out, + fmt.Sprintf(`(main."$event_name" = '%s')`, strings.ToUpper(string(f.Type)))) + } + } + return out +} + +func main() { + //input := GetPayload(MetricTypeTimeseries) + input := GetPayload(MetricTypeTable) + + var payload MetricPayload + err := json.Unmarshal([]byte(input), &payload) + if err != nil { + return + } + + var qb NewQueryBuilder + switch payload.MetricType { + case MetricTypeTimeseries: + qb = TimeSeriesQueryBuilder{} + case MetricTypeFunnel: + qb = FunnelQueryBuilder{} + case MetricTypeTable: + qb = TableQueryBuilder{} + default: + qb = TimeSeriesQueryBuilder{} + } + + query := qb.Build(payload) + fmt.Println(query) +} + +func GetPayload(metricType MetricType) string { + switch metricType { + case MetricTypeTimeseries: + return `{ + "startTimestamp": 1738796399999, + "endTimestamp": 1739401199999, + "density": 7, + "metricOf": "sessionCount", + "metricValue": [], + "metricType": "timeseries", + "metricFormat": "sessionCount", + "viewType": "lineChart", + "name": "Untitled Trend", + "series": [ + { + "name": "Series 1", + "filter": { + "filters": [ + { + "type": "userId", + "isEvent": false, + "value": [ + "test@test.com" + ], + "operator": "is", + "filters": [] + } + ], + "eventsOrder": "then" + } + } + ] + }` + case MetricTypeFunnel: + return `{}` + case MetricTypeTable: + return `{ + "startTimestamp": 1737586800000, + "endTimestamp": 1738277999999, + "density": 7, + "metricOf": "userDevice", + "metricType": "table", + "metricFormat": "sessionCount", + "viewType": "table", + "name": "Untitled Trend", + "series": [ + { + "name": "Series 1", + "filter": { + "filters": [ + { + "type": "click", + "isEvent": true, + "value": ["Manuscripts"], + "operator": "on", + "filters": [] + } + ], + "eventsOrder": "then" + } + }, + { + "name": "Series 2", + "filter": { + "filters": [ + { + "type": "input", + "isEvent": true, + "value": ["test"], + "operator": "is", + "filters": [] + } + ], + "eventsOrder": "then" + } + } + ], + "page": 1, + "limit": 20, + "compareTo": null, + "config": { + "col": 2 + } + }` + default: + return `{}` + } +} diff --git a/backend/pkg/analytics/query/table.go b/backend/pkg/analytics/query/table.go new file mode 100644 index 000000000..616f25f65 --- /dev/null +++ b/backend/pkg/analytics/query/table.go @@ -0,0 +1,252 @@ +package query + +import ( + "fmt" + "strings" +) + +type TableQueryBuilder struct{} + +func (t TableQueryBuilder) Build(p MetricPayload) string { + return t.buildQuery(p) +} + +func (t TableQueryBuilder) buildQuery(r MetricPayload) string { + s := r.Series[0] + sessionFilters, eventFilters := partitionFilters(s.Filter.Filters) + sessionWhere := 
buildSessionWhere(sessionFilters) + eventWhere, seqHaving := buildEventsWhere(eventFilters, s.Filter.EventsOrder) + + subQuery := fmt.Sprintf( + "SELECT %s,\n"+ + " MIN(%s) AS first_event_ts,\n"+ + " MAX(%s) AS last_event_ts\n"+ + "FROM %s AS main\n"+ + "WHERE main.project_id = %%(project_id)s\n"+ + " AND %s >= toDateTime(%%(start_time)s/1000)\n"+ + " AND %s <= toDateTime(%%(end_time)s/1000)\n"+ + " AND (%s)\n"+ + "GROUP BY %s\n"+ + "HAVING %s", + ColEventSessionID, + ColEventTime, + ColEventTime, + TableEvents, + ColEventTime, + ColEventTime, + strings.Join(eventWhere, " OR "), + ColEventSessionID, + seqHaving, + ) + + joinQuery := fmt.Sprintf( + "SELECT *\n"+ + "FROM %s AS s\n"+ + "INNER JOIN (\n"+ + " SELECT DISTINCT ev.session_id, ev.`$current_url` AS url_path\n"+ + " FROM %s AS ev\n"+ + " WHERE ev.created_at >= toDateTime(%%(start_time)s/1000)\n"+ + " AND ev.created_at <= toDateTime(%%(end_time)s/1000)\n"+ + " AND ev.project_id = %%(project_id)s\n"+ + " AND ev.`$event_name` = 'LOCATION'\n"+ + ") AS extra_event USING (session_id)\n"+ + "WHERE s.project_id = %%(project_id)s\n"+ + " AND isNotNull(s.duration)\n"+ + " AND s.datetime >= toDateTime(%%(start_time)s/1000)\n"+ + " AND s.datetime <= toDateTime(%%(end_time)s/1000)\n", + TableSessions, + TableEvents, + ) + + if len(sessionWhere) > 0 { + joinQuery += " AND " + strings.Join(sessionWhere, " AND ") + "\n" + } + + main := fmt.Sprintf( + "SELECT s.session_id AS session_id, s.url_path\n"+ + "FROM (\n%s\n) AS f\n"+ + "INNER JOIN (\n%s) AS s\n"+ + " ON (s.session_id = f.session_id)\n", + subQuery, + joinQuery, + ) + + final := fmt.Sprintf( + "SELECT COUNT(DISTINCT url_path) OVER () AS main_count,\n"+ + " url_path AS name,\n"+ + " COUNT(DISTINCT session_id) AS total,\n"+ + " COALESCE(SUM(COUNT(DISTINCT session_id)) OVER (), 0) AS total_count\n"+ + "FROM (\n%s) AS filtered_sessions\n"+ + "GROUP BY url_path\n"+ + "ORDER BY total DESC\n"+ + "LIMIT 200 OFFSET 0;", + main, + ) + + return final +} + +func partitionFilters(filters []Filter) (sessionFilters []Filter, eventFilters []Filter) { + for _, f := range filters { + if f.IsEvent { + eventFilters = append(eventFilters, f) + } else { + sessionFilters = append(sessionFilters, f) + } + } + return +} + +func buildSessionWhere(filters []Filter) []string { + var conds []string + for _, f := range filters { + switch f.Type { + case FilterUserCountry: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCountry, concatValues(f.Value))) + case FilterUserCity: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCity, concatValues(f.Value))) + case FilterUserState: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserState, concatValues(f.Value))) + case FilterUserId: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserID, concatValues(f.Value))) + case FilterUserAnonymousId: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserAnonymousID, concatValues(f.Value))) + case FilterUserOs: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserOS, concatValues(f.Value))) + case FilterUserBrowser: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserBrowser, concatValues(f.Value))) + case FilterUserDevice: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDevice, concatValues(f.Value))) + case FilterPlatform: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDeviceType, concatValues(f.Value))) + case FilterRevId: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColRevID, 
concatValues(f.Value))) + case FilterReferrer: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColBaseReferrer, concatValues(f.Value))) + case FilterDuration: + if len(f.Value) == 2 { + conds = append(conds, fmt.Sprintf("%s >= '%s'", ColDuration, f.Value[0])) + conds = append(conds, fmt.Sprintf("%s <= '%s'", ColDuration, f.Value[1])) + } + case FilterUtmSource: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmSource, concatValues(f.Value))) + case FilterUtmMedium: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmMedium, concatValues(f.Value))) + case FilterUtmCampaign: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmCampaign, concatValues(f.Value))) + case FilterMetadata: + conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColMetadata1, concatValues(f.Value))) + } + } + // adding /n to each condition for better readability, can be removed. + for i := range conds { + conds[i] += "\n" + } + return conds +} + +func concatValues(v []string) string { + return strings.Join(v, "") +} + +func buildEventsWhere(filters []Filter, order EventOrder) (eventConditions []string, having string) { + basicEventTypes := "(" + + strings.Join([]string{ + fmt.Sprintf("%s = 'CLICK'", ColEventName), + fmt.Sprintf("%s = 'INPUT'", ColEventName), + fmt.Sprintf("%s = 'LOCATION'", ColEventName), + fmt.Sprintf("%s = 'CUSTOM'", ColEventName), + fmt.Sprintf("%s = 'REQUEST'", ColEventName), + }, " OR ") + ")" + + var seq []string + for _, f := range filters { + switch f.Type { + case FilterClick: + seq = append(seq, seqCond("CLICK", "selector", f)) + case FilterInput: + seq = append(seq, seqCond("INPUT", "label", f)) + case FilterLocation: + seq = append(seq, seqCond("LOCATION", "url_path", f)) + case FilterCustom: + seq = append(seq, seqCond("CUSTOM", "name", f)) + case FilterFetch: + seq = append(seq, seqFetchCond("REQUEST", f)) + case FilterFetchStatusCode: + seq = append(seq, seqCond("REQUEST", "status", f)) + default: + seq = append(seq, fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(string(f.Type)))) + } + } + eventConditions = []string{basicEventTypes} + + // then => sequenceMatch + // or => OR + // and => AND + switch order { + case EventOrderThen: + var pattern []string + for i := range seq { + pattern = append(pattern, fmt.Sprintf("(?%d)", i+1)) + } + having = fmt.Sprintf("sequenceMatch('%s')(\n%s,\n%s)", + strings.Join(pattern, ""), fmt.Sprintf("toUnixTimestamp(%s)", ColEventTime), strings.Join(seq, ",\n")) + case EventOrderAnd: + // build AND + having = strings.Join(seq, " AND ") + default: + // default => OR + var orParts []string + for _, p := range seq { + orParts = append(orParts, "("+p+")") + } + having = strings.Join(orParts, " OR ") + } + return +} + +func seqCond(eventName, key string, f Filter) string { + op := parseOperator(f.Operator) + return fmt.Sprintf("(%s = '%s' AND JSONExtractString(toString(%s), '%s') %s '%s')", + ColEventName, strings.ToUpper(eventName), ColEventProperties, key, op, concatValues(f.Value)) +} + +func seqFetchCond(eventName string, f Filter) string { + w := []string{fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(eventName))} + var extras []string + for _, c := range f.Filters { + switch c.Type { + case FilterFetch: + if len(c.Value) > 0 { + extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventURLPath, concatValues(c.Value))) + } + case FilterFetchStatusCode: + if len(c.Value) > 0 { + extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventStatus, concatValues(c.Value))) + } + 
default: + // placeholder if needed + } + } + if len(extras) > 0 { + w = append(w, strings.Join(extras, " AND ")) + } + return "(" + strings.Join(w, " AND ") + ")" +} + +func parseOperator(op string) string { + // TODO implement this properly + switch strings.ToLower(op) { + case OperatorStringContains: + return "LIKE" + case OperatorStringIs, OperatorStringOn, "=", OperatorStringOnAny: + return "=" + case OperatorStringStartsWith: + return "LIKE" + case OperatorStringEndsWith: + // might interpret differently in real impl + return "=" + default: + return "=" + } +} diff --git a/backend/pkg/analytics/query/timeseries.go b/backend/pkg/analytics/query/timeseries.go new file mode 100644 index 000000000..73d49f6cb --- /dev/null +++ b/backend/pkg/analytics/query/timeseries.go @@ -0,0 +1,42 @@ +package query + +import "fmt" + +type TimeSeriesQueryBuilder struct{} + +func (t TimeSeriesQueryBuilder) Build(p MetricPayload) string { + switch p.MetricOf { + case "sessionCount": + return t.buildSessionCountQuery(p) + case "userCount": + return t.buildUserCountQuery(p) + default: + return "" + } +} + +func (TimeSeriesQueryBuilder) buildSessionCountQuery(p MetricPayload) string { + subquery := buildEventSubquery(p) + return fmt.Sprintf(`SELECT toUnixTimestamp( + toStartOfInterval(processed_sessions.datetime, INTERVAL 115199 second) +) * 1000 AS timestamp, +COUNT(processed_sessions.session_id) AS count +FROM ( + %s +) AS processed_sessions +GROUP BY timestamp +ORDER BY timestamp;`, subquery) +} + +func (TimeSeriesQueryBuilder) buildUserCountQuery(p MetricPayload) string { + subquery := buildEventSubquery(p) + return fmt.Sprintf(`SELECT toUnixTimestamp( + toStartOfInterval(processed_sessions.datetime, INTERVAL 115199 second) +) * 1000 AS timestamp, +COUNT(DISTINCT processed_sessions.user_id) AS count +FROM ( + %s +) AS processed_sessions +GROUP BY timestamp +ORDER BY timestamp;`, subquery) +} diff --git a/backend/pkg/db/postgres/pool/pool.go b/backend/pkg/db/postgres/pool/pool.go index f6d82e6c3..747654695 100644 --- a/backend/pkg/db/postgres/pool/pool.go +++ b/backend/pkg/db/postgres/pool/pool.go @@ -110,6 +110,15 @@ func (tx *Tx) TxExec(sql string, args ...interface{}) error { return err } +func (tx *Tx) TxQuery(sql string, args ...interface{}) (pgx.Rows, error) { + start := time.Now() + res, err := tx.origTx.Query(getTimeoutContext(), sql, args...) + method, table := methodName(sql) + tx.metrics.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), method, table) + tx.metrics.IncreaseTotalRequests(method, table) + return res, err +} + func (tx *Tx) TxQueryRow(sql string, args ...interface{}) pgx.Row { start := time.Now() res := tx.origTx.QueryRow(context.Background(), sql, args...) 
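The new Tx.TxQuery helper mirrors TxExec and TxQueryRow: it runs the statement under the pool's timeout context, records the per-request duration and count metrics, and returns the underlying pgx.Rows. A minimal usage sketch, assuming a *pool.Tx obtained from the pool's existing transaction API; the table and column names are illustrative, not taken from this change:

	package example

	import "openreplay/backend/pkg/db/postgres/pool"

	// listCardNames shows the intended read path: query inside the transaction,
	// iterate the rows, and surface both Scan and iteration errors.
	func listCardNames(tx *pool.Tx, projectID uint64) ([]string, error) {
		rows, err := tx.TxQuery("SELECT name FROM cards WHERE project_id = $1", projectID)
		if err != nil {
			return nil, err
		}
		defer rows.Close()

		var names []string
		for rows.Next() {
			var name string
			if err := rows.Scan(&name); err != nil {
				return nil, err
			}
			names = append(names, name)
		}
		return names, rows.Err()
	}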
diff --git a/backend/pkg/server/api/middleware.go b/backend/pkg/server/api/middleware.go
index 423e7e0d9..518645f14 100644
--- a/backend/pkg/server/api/middleware.go
+++ b/backend/pkg/server/api/middleware.go
@@ -23,11 +23,11 @@ func (e *routerImpl) healthMiddleware(next http.Handler) http.Handler {
 func (e *routerImpl) corsMiddleware(next http.Handler) http.Handler {
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		if e.cfg.UseAccessControlHeaders {
-			// Prepare headers for preflight requests
-			w.Header().Set("Access-Control-Allow-Origin", "*")
-			w.Header().Set("Access-Control-Allow-Methods", "POST,GET,PATCH,DELETE")
-			w.Header().Set("Access-Control-Allow-Headers", "Content-Type,Authorization,Content-Encoding")
+		if origin := r.Header.Get("Origin"); origin == "http://localhost:3333" {
+			w.Header().Set("Access-Control-Allow-Origin", origin)
+			w.Header().Set("Access-Control-Allow-Methods", "POST, GET, PATCH, DELETE, OPTIONS")
+			w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, Content-Encoding")
+			w.Header().Set("Access-Control-Allow-Credentials", "true")
 		}
 		if r.Method == http.MethodOptions {
 			w.Header().Set("Cache-Control", "max-age=86400")
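With the CORS middleware now echoing only the local dashboard origin and allowing credentials, the behaviour can be checked in isolation. A minimal sketch using net/http/httptest, with a stand-in wrapper rather than the unexported routerImpl (the wrapper repeats the allowed-origin logic above only):

	package api_test

	import (
		"net/http"
		"net/http/httptest"
		"testing"
	)

	// corsLike mimics corsMiddleware for the allowed-origin case only.
	func corsLike(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			if origin := r.Header.Get("Origin"); origin == "http://localhost:3333" {
				w.Header().Set("Access-Control-Allow-Origin", origin)
				w.Header().Set("Access-Control-Allow-Credentials", "true")
			}
			next.ServeHTTP(w, r)
		})
	}

	func TestCORSEchoesLocalOrigin(t *testing.T) {
		h := corsLike(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
		req := httptest.NewRequest(http.MethodOptions, "/", nil)
		req.Header.Set("Origin", "http://localhost:3333")
		rec := httptest.NewRecorder()
		h.ServeHTTP(rec, req)

		if got := rec.Header().Get("Access-Control-Allow-Origin"); got != "http://localhost:3333" {
			t.Fatalf("expected the request origin to be echoed back, got %q", got)
		}
		if rec.Header().Get("Access-Control-Allow-Credentials") != "true" {
			t.Fatalf("expected Allow-Credentials to be set for the allowed origin")
		}
	}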