feat(analytics): base structure

Shekar Siri 2025-02-18 11:42:39 +01:00
parent 4709918254
commit b0bf357be1
19 changed files with 2282 additions and 1015 deletions

View file

@@ -2,71 +2,49 @@ package main
import (
	"context"
-	"os"
-	"os/signal"
-	"syscall"
	analyticsConfig "openreplay/backend/internal/config/analytics"
+	"openreplay/backend/pkg/analytics"
+	"openreplay/backend/pkg/analytics/db"
+	"openreplay/backend/pkg/db/postgres/pool"
	"openreplay/backend/pkg/logger"
	"openreplay/backend/pkg/metrics"
	analyticsMetrics "openreplay/backend/pkg/metrics/analytics"
	databaseMetrics "openreplay/backend/pkg/metrics/database"
	"openreplay/backend/pkg/metrics/web"
+	"openreplay/backend/pkg/server"
+	"openreplay/backend/pkg/server/api"
)
func main() {
	ctx := context.Background()
	log := logger.New()
-	log.Info(ctx, "Cacher service started")
	cfg := analyticsConfig.New(log)
	webMetrics := web.New("analytics")
	metrics.New(log, append(webMetrics.List(), append(analyticsMetrics.List(), databaseMetrics.List()...)...))
-	sigchan := make(chan os.Signal, 1)
-	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
-	for {
-		select {
-		case sig := <-sigchan:
-			log.Error(ctx, "Caught signal %v: terminating", sig)
-			os.Exit(0)
-		}
-	}
+	pgConn, err := pool.New(cfg.Postgres.String())
+	if err != nil {
+		log.Fatal(ctx, "can't init postgres connection: %s", err)
+	}
+	defer pgConn.Close()
-//
-//import (
-//	"context"
-//
-//	analyticsConfig "openreplay/backend/internal/config/analytics"
-//	"openreplay/backend/pkg/analytics"
-//	"openreplay/backend/pkg/db/postgres/pool"
-//	"openreplay/backend/pkg/logger"
-//	"openreplay/backend/pkg/metrics"
-//	"openreplay/backend/pkg/metrics/database"
-//	"openreplay/backend/pkg/metrics/web"
-//	"openreplay/backend/pkg/server"
-//	"openreplay/backend/pkg/server/api"
-//)
-//
-//func main() {
-//	ctx := context.Background()
-//	log := logger.New()
-//	cfg := analyticsConfig.New(log)
-//	// Observability
-//	webMetrics := web.New("analytics")
-//	dbMetrics := database.New("analytics")
-//	metrics.New(log, append(webMetrics.List(), dbMetrics.List()...))
-//
-//	pgConn, err := pool.New(dbMetrics, cfg.Postgres.String())
-//	if err != nil {
-//		log.Fatal(ctx, "can't init postgres connection: %s", err)
-//	}
-//	defer pgConn.Close()
-//
-//	builder, err := analytics.NewServiceBuilder(log, cfg, webMetrics, dbMetrics, pgConn)
-//	if err != nil {
-//		log.Fatal(ctx, "can't init services: %s", err)
-//	}
-//
-//	router, err := api.NewRouter(&cfg.HTTP, log)
-//	if err != nil {
-//		log.Fatal(ctx, "failed while creating router: %s", err)
-//	}
-//	router.AddHandlers(api.NoPrefix, builder.CardsAPI, builder.DashboardsAPI, builder.ChartsAPI)
-//	router.AddMiddlewares(builder.Auth.Middleware, builder.RateLimiter.Middleware, builder.AuditTrail.Middleware)
-//
-//	server.Run(ctx, log, &cfg.HTTP, router)
-//}
+	chConn, err := db.NewConnector(cfg.Clickhouse)
+	if err != nil {
+		log.Fatal(ctx, "can't init clickhouse connection: %s", err)
+	}
+	defer chConn.Stop()
+	builder, err := analytics.NewServiceBuilder(log, cfg, webMetrics, pgConn, chConn)
+	if err != nil {
+		log.Fatal(ctx, "can't init services: %s", err)
+	}
+	router, err := api.NewRouter(&cfg.HTTP, log)
+	if err != nil {
+		log.Fatal(ctx, "failed while creating router: %s", err)
+	}
+	router.AddHandlers(api.NoPrefix, builder.CardsAPI, builder.DashboardsAPI, builder.ChartsAPI)
+	router.AddMiddlewares(builder.Auth.Middleware, builder.RateLimiter.Middleware, builder.AuditTrail.Middleware)
+	server.Run(ctx, log, &cfg.HTTP, router)
}
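The nested append chain in metrics.New above works but is easy to misread. A generic helper of this shape (a sketch, not part of the commit; the element type T stands in for whatever the *.List() methods return) would flatten any number of metric lists:

func concatLists[T any](lists ...[]T) []T {
	out := make([]T, 0)
	for _, l := range lists {
		out = append(out, l...)
	}
	return out
}

// usage sketch: metrics.New(log, concatLists(webMetrics.List(), analyticsMetrics.List(), databaseMetrics.List()))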

View file

@@ -14,6 +14,7 @@ import (
type Config struct {
common.Config
common.Postgres
+	common.Clickhouse
redis.Redis
objectstorage.ObjectsConfig
common.HTTP
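Each embedded section above promotes its fields onto Config. A minimal self-contained sketch of the pattern (the field names are illustrative, not the real ones in common.Clickhouse):

type clickhouseSection struct {
	URL      string
	Database string
}

type serviceConfig struct {
	clickhouseSection
}

func exampleConfig() string {
	cfg := serviceConfig{clickhouseSection{URL: "localhost:9000", Database: "default"}}
	return cfg.URL // promoted from the embedded section
}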

View file

@@ -3,6 +3,7 @@ package analytics
import (
"github.com/go-playground/validator/v10"
"openreplay/backend/pkg/analytics/charts"
"openreplay/backend/pkg/analytics/db"
"openreplay/backend/pkg/metrics/database"
"time"
@@ -27,13 +28,14 @@ type ServicesBuilder struct {
ChartsAPI api.Handlers
}
-func NewServiceBuilder(log logger.Logger, cfg *analytics.Config, webMetrics web.Web, dbMetrics database.Database, pgconn pool.Pool) (*ServicesBuilder, error) {
+func NewServiceBuilder(log logger.Logger, cfg *analytics.Config, webMetrics web.Web, dbMetrics database.Database, pgconn pool.Pool, chConn db.Connector) (*ServicesBuilder, error) {
responser := api.NewResponser(webMetrics)
audiTrail, err := tracer.NewTracer(log, pgconn, dbMetrics)
if err != nil {
return nil, err
}
reqValidator := validator.New()
cardsService, err := cards.New(log, pgconn)
if err != nil {
return nil, err
@@ -42,6 +44,7 @@ func NewServiceBuilder(log logger.Logger, cfg *analytics.Config, webMetrics web.
if err != nil {
return nil, err
}
dashboardsService, err := dashboards.New(log, pgconn)
if err != nil {
return nil, err
@@ -50,7 +53,8 @@ func NewServiceBuilder(log logger.Logger, cfg *analytics.Config, webMetrics web.
if err != nil {
return nil, err
}
-	chartsService, err := charts.New(log, pgconn)
+	chartsService, err := charts.New(log, pgconn, chConn)
if err != nil {
return nil, err
}
@@ -58,6 +62,7 @@ func NewServiceBuilder(log logger.Logger, cfg *analytics.Config, webMetrics web.
if err != nil {
return nil, err
}
return &ServicesBuilder{
Auth: auth.NewAuth(log, cfg.JWTSecret, cfg.JWTSpotSecret, pgconn, nil, api.NoPrefix),
RateLimiter: limiter.NewUserRateLimiter(10, 30, 1*time.Minute, 5*time.Minute),

View file

@@ -67,8 +67,8 @@ type CardSeries struct {
}
type SeriesFilter struct {
-	EventOrder string       `json:"eventOrder" validate:"required,oneof=then or and"`
-	Filters    []FilterItem `json:"filters"`
+	EventsOrder string       `json:"eventsOrder" validate:"required,oneof=then or and"`
+	Filters     []FilterItem `json:"filters"`
}
type FilterItem struct {

View file

@@ -1,155 +1,51 @@
package charts
import (
"encoding/json"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"openreplay/backend/pkg/analytics/cards"
"log"
"openreplay/backend/pkg/analytics/db"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/logger"
)
type Charts interface {
-	GetData(projectId int, userId uint64, req *GetCardChartDataRequest) ([]DataPoint, error)
+	GetData(projectId int, userId uint64, req *MetricPayload) (interface{}, error)
}
type chartsImpl struct {
	log    logger.Logger
	pgconn pool.Pool
-	chConn driver.Conn
+	chConn db.Connector
}
-func New(log logger.Logger, conn pool.Pool) (Charts, error) {
+func New(log logger.Logger, conn pool.Pool, chConn db.Connector) (Charts, error) {
	return &chartsImpl{
		log:    log,
		pgconn: conn,
		chConn: chConn,
	}, nil
}
// GetData mirrors the Python def get_chart()
-func (s *chartsImpl) GetData(projectId int, userID uint64, req *GetCardChartDataRequest) ([]DataPoint, error) {
+func (s *chartsImpl) GetData(projectId int, userID uint64, req *MetricPayload) (interface{}, error) {
	if req == nil {
		return nil, fmt.Errorf("request is empty")
	}
-	switch {
-	case req.MetricType == "funnel":
-		return nil, fmt.Errorf("funnel metric type is not supported yet")
-	case req.MetricType == "heatMap":
-		return nil, fmt.Errorf("heatMap metric type is not supported yet")
-	case req.MetricType == "pathAnalysis":
-		return nil, fmt.Errorf("pathAnalysis metric type is not supported yet")
-	case req.MetricType == "timeseries":
-		return s.getTimeseriesCharts(projectId, userID, req)
-	case req.MetricType == "table":
-		return nil, fmt.Errorf("table metric type is not supported yet")
-	case req.MetricType == "errors":
-		fallthrough
-	case req.MetricType == "performance":
-		fallthrough
-	case req.MetricType == "resources":
-		fallthrough
-	case req.MetricType == "webVitals":
-		return s.getMetric(projectId, userID, req)
-	case req.MetricType == "retention":
-		return nil, fmt.Errorf("retention metric type is not supported yet")
-	case req.MetricType == "stickiness":
-		return nil, fmt.Errorf("stickiness metric type is not supported yet")
-	}
+	payload := &Payload{
+		ProjectId:     projectId,
+		UserId:        userID,
+		MetricPayload: req,
+	}
-	jsonInput := `
-	{
-	  "data": [
-		{
-		  "timestamp": 1733934939000,
-		  "Series A": 100,
-		  "Series B": 200
-		},
-		{
-		  "timestamp": 1733935939000,
-		  "Series A": 150,
-		  "Series B": 250
-		}
-	  ]
-	}`
-	var resp GetCardChartDataResponse
-	if err := json.Unmarshal([]byte(jsonInput), &resp); err != nil {
-		return nil, fmt.Errorf("failed to unmarshal response: %w", err)
-	}
+	qb, err := NewQueryBuilder(payload)
+	if err != nil {
+		// was log.Fatalf, which would take down the whole service on a bad request
+		return nil, fmt.Errorf("can't create query builder: %w", err)
+	}
-	return resp.Data, nil
-}
-func (s *chartsImpl) getMetric(projectID int, userID uint64, req *GetCardChartDataRequest) ([]DataPoint, error) {
-	switch req.MetricOf {
-	case "countSessions": // metrics.get_processed_sessions
-		return nil, fmt.Errorf("countSessions metric type is not supported yet")
-	case "avgVisitedPages": // metrics.get_user_activity_avg_visited_pages
-		return nil, fmt.Errorf("avgVisitedPages metric type is not supported yet")
-	case "countRequests": // metrics.get_top_metrics_count_requests
-		return nil, fmt.Errorf("countRequests metric type is not supported yet")
-	case "impactedSessionsByJsErrors": // metrics.get_impacted_sessions_by_js_errors
-		return nil, fmt.Errorf("impactedSessionsByJsErrors metric type is not supported yet")
-	case "domainsErrors4xx": // metrics.get_domains_errors_4xx
-		return nil, fmt.Errorf("domainsErrors4xx metric type is not supported yet")
-	case "domainsErrors5xx": // metrics.get_domains_errors_5xx
-		return nil, fmt.Errorf("domainsErrors5xx metric type is not supported yet")
-	case "errorsPerDomains": // metrics.get_errors_per_domains
-		return nil, fmt.Errorf("errorsPerDomains metric type is not supported yet")
-	case "errorsPerType": // metrics.get_errors_per_type
-		return nil, fmt.Errorf("errorsPerType metric type is not supported yet")
-	}
-	return nil, fmt.Errorf("metric type is not supported yet")
-}
-func (s *chartsImpl) getTimeseriesCharts(projectID int, userID uint64, req *GetCardChartDataRequest) ([]DataPoint, error) {
-	var dataPoints []DataPoint
-	var stepSize = getStepSize(req.StartTimestamp, req.EndTimestamp, req.Density, true, 1000)
-	var query string
-	switch req.MetricOf {
-	case "sessionCount":
-		query = fmt.Sprintf(`
-			SELECT
-				toUnixTimestamp(toStartOfInterval(processed_sessions.datetime, INTERVAL %d second)) * 1000 AS timestamp,
-				COUNT(processed_sessions.session_id) AS count
-			FROM (
-				SELECT
-					s.session_id AS session_id,
-					s.datetime AS datetime
-				%s
-			) AS processed_sessions
-			GROUP BY timestamp
-			ORDER BY timestamp;
-		`, stepSize, "query_part") // Replace "query_part" with the actual query part
-	default:
-		return nil, fmt.Errorf("unsupported metric: %s", req.MetricOf)
-	}
-	fmt.Printf("stepSize: %v\n", stepSize)
-	for _, series := range req.Series {
-		res, err := s.searchSeries(projectID, series)
-		if err != nil {
-			return nil, fmt.Errorf("failed to search series: %w", err)
-		}
-		if seriesData, ok := res.([]DataPoint); ok {
-			dataPoints = append(dataPoints, seriesData...)
-		} else {
-			return nil, fmt.Errorf("unexpected data format from searchSeries")
-		}
-	}
-	return dataPoints, nil
-}
-func (s *chartsImpl) searchSeries(projectID int, series cards.CardSeries) (interface{}, error) {
-	// Placeholder implementation
-	return []DataPoint{}, nil
-}
+	resp, err := qb.Execute(payload, s.chConn)
+	if err != nil {
+		// was log.Fatalf, which would crash the service on a failing query
+		return nil, fmt.Errorf("can't execute query: %w", err)
+	}
+	return resp, nil
+}
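GetData now only wraps the request into a Payload and defers to the query-builder dispatch. A hedged usage sketch against the types in this package (project and user ids are illustrative):

func exampleGetData(svc Charts) (interface{}, error) {
	req := &MetricPayload{
		StartTimestamp: 1738796399999,
		EndTimestamp:   1739401199999,
		Density:        7,
		MetricType:     MetricTypeTimeseries,
		MetricOf:       "sessionCount",
	}
	return svc.GetData(5, 1, req)
}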

View file

@@ -1,7 +1,6 @@
package charts
import (
"context"
"fmt"
"log"
"strconv"
@@ -162,7 +161,7 @@ def get_main_sessions_table(timestamp=0):
and timestamp and timestamp >= TimeUTC.now(delta_days=-7) else "experimental.sessions"
*/
func getMainSessionsTable(timestamp int64) string {
return "product_analytics.sessions"
return "experimental.sessions"
}
// Function to convert named parameters to positional parameters
@@ -272,7 +271,7 @@ func progress(oldVal, newVal uint64) float64 {
}
// Trying to find a common part
-func parse(projectID uint64, startTs, endTs int64, density int, args map[string]interface{}) ([]string, []string, map[string]interface{}) {
+func parse(projectID int, startTs, endTs int64, density int, args map[string]interface{}) ([]string, []string, map[string]interface{}) {
stepSize := getStepSize(startTs, endTs, density, false, 1000)
chSubQuery := getBasicConstraints("sessions", true, false, args, "project_id")
chSubQueryChart := getBasicConstraints("sessions", true, true, args, "project_id")
@@ -293,134 +292,136 @@ func parse(projectID uint64, startTs, endTs int64, density int, args map[string]
}
// Sessions trend
func (s *chartsImpl) getProcessedSessions(projectID uint64, startTs, endTs int64, density int, args map[string]interface{}) {
chQuery := `
SELECT toUnixTimestamp(toStartOfInterval(sessions.datetime, INTERVAL :step_size second)) * 1000 AS timestamp,
COUNT(DISTINCT sessions.session_id) AS value
FROM :main_sessions_table AS sessions
WHERE :sub_query_chart
GROUP BY timestamp
ORDER BY timestamp;
`
chSubQuery, chSubQueryChart, params := parse(projectID, startTs, endTs, density, args)
chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1)
chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQueryChart, " AND "), -1)
preparedQuery, preparedArgs := replaceNamedParams(chQuery, params)
rows, err := s.chConn.Query(context.Background(), preparedQuery, preparedArgs)
if err != nil {
log.Fatalf("Error executing query: %v", err)
}
preparedRows := make([]map[string]interface{}, 0)
var sum uint64
for rows.Next() {
var timestamp, value uint64
if err := rows.Scan(&timestamp, &value); err != nil {
log.Fatalf("Error scanning row: %v", err)
}
fmt.Printf("Timestamp: %d, Value: %d\n", timestamp, value)
sum += value
preparedRows = append(preparedRows, map[string]interface{}{"timestamp": timestamp, "value": value})
}
results := map[string]interface{}{
"value": sum,
"chart": CompleteMissingSteps(startTs, endTs, int(density), map[string]interface{}{"value": 0}, preparedRows, "timestamp", 1000),
}
diff := endTs - startTs
endTs = startTs
startTs = endTs - diff
log.Println(results)
chQuery = fmt.Sprintf(`
SELECT COUNT(1) AS count
FROM :main_sessions_table AS sessions
WHERE :sub_query_chart;
`)
chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1)
chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQuery, " AND "), -1)
var count uint64
preparedQuery, preparedArgs = replaceNamedParams(chQuery, params)
if err := s.chConn.QueryRow(context.Background(), preparedQuery, preparedArgs).Scan(&count); err != nil {
log.Fatalf("Error executing query: %v", err)
}
results["progress"] = progress(count, results["value"].(uint64))
// TODO: this should be returned in any case
results["unit"] = "COUNT"
fmt.Println(results)
}
// Users trend
func (c *chartsImpl) getUniqueUsers(projectID uint64, startTs, endTs uint64, density uint64, args map[string]interface{}) {
chQuery := `
SELECT toUnixTimestamp(toStartOfInterval(sessions.datetime, INTERVAL :step_size second)) * 1000 AS timestamp,
COUNT(DISTINCT sessions.user_id) AS value
FROM :main_sessions_table AS sessions
WHERE :sub_query_chart
GROUP BY timestamp
ORDER BY timestamp;
`
chSubQuery, chSubQueryChart, params := parse(projectID, startTs, endTs, density, args)
chSubQueryChart = append(chSubQueryChart, []string{"isNotNull(sessions.user_id)", "sessions.user_id!=''"}...)
chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1)
chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQueryChart, " AND "), -1)
preparedQuery, preparedArgs := replaceNamedParams(chQuery, params)
rows, err := c.chConn.Query(context.Background(), preparedQuery, preparedArgs)
if err != nil {
log.Fatalf("Error executing query: %v", err)
}
preparedRows := make([]map[string]interface{}, 0)
var sum uint64
for rows.Next() {
var timestamp, value uint64
if err := rows.Scan(&timestamp, &value); err != nil {
log.Fatalf("Error scanning row: %v", err)
}
fmt.Printf("Timestamp: %d, Value: %d\n", timestamp, value)
sum += value
preparedRows = append(preparedRows, map[string]interface{}{"timestamp": timestamp, "value": value})
}
results := map[string]interface{}{
"value": sum,
"chart": CompleteMissingSteps(startTs, endTs, int(density), map[string]interface{}{"value": 0}, preparedRows, "timestamp", 1000),
}
diff := endTs - startTs
endTs = startTs
startTs = endTs - diff
log.Println(results)
chQuery = fmt.Sprintf(`
SELECT COUNT(DISTINCT user_id) AS count
FROM :main_sessions_table AS sessions
WHERE :sub_query_chart;
`)
chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1)
chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQuery, " AND "), -1)
var count uint64
preparedQuery, preparedArgs = replaceNamedParams(chQuery, params)
if err := c.chConn.QueryRow(context.Background(), preparedQuery, preparedArgs).Scan(&count); err != nil {
log.Fatalf("Error executing query: %v", err)
}
results["progress"] = progress(count, results["value"].(uint64))
// TODO: this should be returned in any case
results["unit"] = "COUNT"
fmt.Println(results)
return
}
//func (s *chartsImpl) getProcessedSessions(projectID int, startTs, endTs int64, density int, args map[string]interface{}) (interface{}, error) {
// chQuery := `
// SELECT toUnixTimestamp(toStartOfInterval(sessions.datetime, INTERVAL :step_size second)) * 1000 AS timestamp,
// COUNT(DISTINCT sessions.session_id) AS value
// FROM :main_sessions_table AS sessions
// WHERE :sub_query_chart
// GROUP BY timestamp
// ORDER BY timestamp;
// `
// chSubQuery, chSubQueryChart, params := parse(projectID, startTs, endTs, density, args)
//
// chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1)
// chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQueryChart, " AND "), -1)
//
// preparedQuery, preparedArgs := replaceNamedParams(chQuery, params)
// rows, err := s.chConn.Query(context.Background(), preparedQuery, preparedArgs)
// if err != nil {
// log.Fatalf("Error executing query: %v", err)
// }
// preparedRows := make([]map[string]interface{}, 0)
// var sum uint64
// for rows.Next() {
// var timestamp, value uint64
// if err := rows.Scan(&timestamp, &value); err != nil {
// log.Fatalf("Error scanning row: %v", err)
// }
// fmt.Printf("Timestamp: %d, Value: %d\n", timestamp, value)
// sum += value
// preparedRows = append(preparedRows, map[string]interface{}{"timestamp": timestamp, "value": value})
// }
//
// results := map[string]interface{}{
// "value": sum,
// "chart": CompleteMissingSteps(startTs, endTs, int(density), map[string]interface{}{"value": 0}, preparedRows, "timestamp", 1000),
// }
//
// diff := endTs - startTs
// endTs = startTs
// startTs = endTs - diff
//
// log.Println(results)
//
// chQuery = fmt.Sprintf(`
// SELECT COUNT(1) AS count
// FROM :main_sessions_table AS sessions
// WHERE :sub_query_chart;
// `)
// chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1)
// chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQuery, " AND "), -1)
//
// var count uint64
//
// preparedQuery, preparedArgs = replaceNamedParams(chQuery, params)
// if err := s.chConn.QueryRow(context.Background(), preparedQuery, preparedArgs).Scan(&count); err != nil {
// log.Fatalf("Error executing query: %v", err)
// }
//
// results["progress"] = progress(count, results["value"].(uint64))
//
// // TODO: this should be returned in any case
// results["unit"] = "COUNT"
// fmt.Println(results)
//
// return results, nil
//}
//
//// Users trend
//func (s *chartsImpl) getUniqueUsers(projectID int, startTs, endTs int64, density int, args map[string]interface{}) (interface{}, error) {
// chQuery := `
// SELECT toUnixTimestamp(toStartOfInterval(sessions.datetime, INTERVAL :step_size second)) * 1000 AS timestamp,
// COUNT(DISTINCT sessions.user_id) AS value
// FROM :main_sessions_table AS sessions
// WHERE :sub_query_chart
// GROUP BY timestamp
// ORDER BY timestamp;
// `
// chSubQuery, chSubQueryChart, params := parse(projectID, startTs, endTs, density, args)
// chSubQueryChart = append(chSubQueryChart, []string{"isNotNull(sessions.user_id)", "sessions.user_id!=''"}...)
//
// chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1)
// chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQueryChart, " AND "), -1)
//
// preparedQuery, preparedArgs := replaceNamedParams(chQuery, params)
// rows, err := s.chConn.Query(context.Background(), preparedQuery, preparedArgs)
// if err != nil {
// log.Fatalf("Error executing query: %v", err)
// }
// preparedRows := make([]map[string]interface{}, 0)
// var sum uint64
// for rows.Next() {
// var timestamp, value uint64
// if err := rows.Scan(&timestamp, &value); err != nil {
// log.Fatalf("Error scanning row: %v", err)
// }
// fmt.Printf("Timestamp: %d, Value: %d\n", timestamp, value)
// sum += value
// preparedRows = append(preparedRows, map[string]interface{}{"timestamp": timestamp, "value": value})
// }
//
// results := map[string]interface{}{
// "value": sum,
// "chart": CompleteMissingSteps(startTs, endTs, int(density), map[string]interface{}{"value": 0}, preparedRows, "timestamp", 1000),
// }
//
// diff := endTs - startTs
// endTs = startTs
// startTs = endTs - diff
//
// log.Println(results)
//
// chQuery = fmt.Sprintf(`
// SELECT COUNT(DISTINCT user_id) AS count
// FROM :main_sessions_table AS sessions
// WHERE :sub_query_chart;
// `)
// chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1)
// chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQuery, " AND "), -1)
//
// var count uint64
//
// preparedQuery, preparedArgs = replaceNamedParams(chQuery, params)
// if err := s.chConn.QueryRow(context.Background(), preparedQuery, preparedArgs).Scan(&count); err != nil {
// log.Fatalf("Error executing query: %v", err)
// }
//
// results["progress"] = progress(count, results["value"].(uint64))
//
// // TODO: this should be returned in any case
// results["unit"] = "COUNT"
// fmt.Println(results)
//
// return results, nil
//}
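The queries above rely on replaceNamedParams to turn :named placeholders into positional arguments; its body is elided from this diff. A minimal sketch of the idea, assuming "regexp" is imported and every :token has an entry in params:

var namedParam = regexp.MustCompile(`:(\w+)`)

func replaceNamedParamsSketch(query string, params map[string]interface{}) (string, []interface{}) {
	var args []interface{}
	prepared := namedParam.ReplaceAllStringFunc(query, func(m string) string {
		args = append(args, params[m[1:]]) // keep args in order of appearance
		return "?"
	})
	return prepared, args
}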

View file

@@ -74,7 +74,7 @@ func (e *handlersImpl) getCardChartData(w http.ResponseWriter, r *http.Request)
}
bodySize = len(bodyBytes)
-	req := &GetCardChartDataRequest{}
+	req := &MetricPayload{}
if err := json.Unmarshal(bodyBytes, req); err != nil {
e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return

View file

@@ -0,0 +1,9 @@
package charts
import "openreplay/backend/pkg/analytics/db"
type FunnelQueryBuilder struct{}
func (f FunnelQueryBuilder) Execute(p *Payload, conn db.Connector) (interface{}, error) {
return "-- Funnel query placeholder", nil
}

View file

@@ -0,0 +1,253 @@
package charts
import (
"fmt"
"openreplay/backend/pkg/analytics/db"
"strings"
)
type TableQueryBuilder struct{}
func (t TableQueryBuilder) Execute(p *Payload, conn db.Connector) (interface{}, error) {
return t.buildQuery(p)
}
func (t TableQueryBuilder) buildQuery(r *Payload) (string, error) {
s := r.Series[0]
sessionFilters, eventFilters := partitionFilters(s.Filter.Filters)
sessionWhere := buildSessionWhere(sessionFilters)
eventWhere, seqHaving := buildEventsWhere(eventFilters, s.Filter.EventsOrder)
subQuery := fmt.Sprintf(
"SELECT %s,\n"+
" MIN(%s) AS first_event_ts,\n"+
" MAX(%s) AS last_event_ts\n"+
"FROM %s AS main\n"+
"WHERE main.project_id = %%(project_id)s\n"+
" AND %s >= toDateTime(%%(start_time)s/1000)\n"+
" AND %s <= toDateTime(%%(end_time)s/1000)\n"+
" AND (%s)\n"+
"GROUP BY %s\n"+
"HAVING %s",
ColEventSessionID,
ColEventTime,
ColEventTime,
TableEvents,
ColEventTime,
ColEventTime,
strings.Join(eventWhere, " OR "),
ColEventSessionID,
seqHaving,
)
joinQuery := fmt.Sprintf(
"SELECT *\n"+
"FROM %s AS s\n"+
"INNER JOIN (\n"+
" SELECT DISTINCT ev.session_id, ev.`$current_url` AS url_path\n"+
" FROM %s AS ev\n"+
" WHERE ev.created_at >= toDateTime(%%(start_time)s/1000)\n"+
" AND ev.created_at <= toDateTime(%%(end_time)s/1000)\n"+
" AND ev.project_id = %%(project_id)s\n"+
" AND ev.`$event_name` = 'LOCATION'\n"+
") AS extra_event USING (session_id)\n"+
"WHERE s.project_id = %%(project_id)s\n"+
" AND isNotNull(s.duration)\n"+
" AND s.datetime >= toDateTime(%%(start_time)s/1000)\n"+
" AND s.datetime <= toDateTime(%%(end_time)s/1000)\n",
TableSessions,
TableEvents,
)
if len(sessionWhere) > 0 {
joinQuery += " AND " + strings.Join(sessionWhere, " AND ") + "\n"
}
main := fmt.Sprintf(
"SELECT s.session_id AS session_id, s.url_path\n"+
"FROM (\n%s\n) AS f\n"+
"INNER JOIN (\n%s) AS s\n"+
" ON (s.session_id = f.session_id)\n",
subQuery,
joinQuery,
)
final := fmt.Sprintf(
"SELECT COUNT(DISTINCT url_path) OVER () AS main_count,\n"+
" url_path AS name,\n"+
" COUNT(DISTINCT session_id) AS total,\n"+
" COALESCE(SUM(COUNT(DISTINCT session_id)) OVER (), 0) AS total_count\n"+
"FROM (\n%s) AS filtered_sessions\n"+
"GROUP BY url_path\n"+
"ORDER BY total DESC\n"+
"LIMIT 200 OFFSET 0;",
main,
)
return final, nil
}
func partitionFilters(filters []Filter) (sessionFilters []Filter, eventFilters []Filter) {
for _, f := range filters {
if f.IsEvent {
eventFilters = append(eventFilters, f)
} else {
sessionFilters = append(sessionFilters, f)
}
}
return
}
func buildSessionWhere(filters []Filter) []string {
var conds []string
for _, f := range filters {
switch f.Type {
case FilterUserCountry:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCountry, concatValues(f.Value)))
case FilterUserCity:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCity, concatValues(f.Value)))
case FilterUserState:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserState, concatValues(f.Value)))
case FilterUserId:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserID, concatValues(f.Value)))
case FilterUserAnonymousId:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserAnonymousID, concatValues(f.Value)))
case FilterUserOs:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserOS, concatValues(f.Value)))
case FilterUserBrowser:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserBrowser, concatValues(f.Value)))
case FilterUserDevice:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDevice, concatValues(f.Value)))
case FilterPlatform:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDeviceType, concatValues(f.Value)))
case FilterRevId:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColRevID, concatValues(f.Value)))
case FilterReferrer:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColBaseReferrer, concatValues(f.Value)))
case FilterDuration:
if len(f.Value) == 2 {
conds = append(conds, fmt.Sprintf("%s >= '%s'", ColDuration, f.Value[0]))
conds = append(conds, fmt.Sprintf("%s <= '%s'", ColDuration, f.Value[1]))
}
case FilterUtmSource:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmSource, concatValues(f.Value)))
case FilterUtmMedium:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmMedium, concatValues(f.Value)))
case FilterUtmCampaign:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmCampaign, concatValues(f.Value)))
case FilterMetadata:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColMetadata1, concatValues(f.Value)))
}
}
	// append "\n" to each condition for better readability; can be removed.
for i := range conds {
conds[i] += "\n"
}
return conds
}
func concatValues(v []string) string {
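	// NOTE: values are joined with no separator, so this only behaves sensibly for single-value filters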
return strings.Join(v, "")
}
func buildEventsWhere(filters []Filter, order EventOrder) (eventConditions []string, having string) {
basicEventTypes := "(" +
strings.Join([]string{
fmt.Sprintf("%s = 'CLICK'", ColEventName),
fmt.Sprintf("%s = 'INPUT'", ColEventName),
fmt.Sprintf("%s = 'LOCATION'", ColEventName),
fmt.Sprintf("%s = 'CUSTOM'", ColEventName),
fmt.Sprintf("%s = 'REQUEST'", ColEventName),
}, " OR ") + ")"
var seq []string
for _, f := range filters {
switch f.Type {
case FilterClick:
seq = append(seq, seqCond("CLICK", "selector", f))
case FilterInput:
seq = append(seq, seqCond("INPUT", "label", f))
case FilterLocation:
seq = append(seq, seqCond("LOCATION", "url_path", f))
case FilterCustom:
seq = append(seq, seqCond("CUSTOM", "name", f))
case FilterFetch:
seq = append(seq, seqFetchCond("REQUEST", f))
case FilterFetchStatusCode:
seq = append(seq, seqCond("REQUEST", "status", f))
default:
seq = append(seq, fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(string(f.Type))))
}
}
eventConditions = []string{basicEventTypes}
// then => sequenceMatch
// or => OR
// and => AND
switch order {
case EventOrderThen:
var pattern []string
for i := range seq {
pattern = append(pattern, fmt.Sprintf("(?%d)", i+1))
}
having = fmt.Sprintf("sequenceMatch('%s')(\n%s,\n%s)",
strings.Join(pattern, ""), fmt.Sprintf("toUnixTimestamp(%s)", ColEventTime), strings.Join(seq, ",\n"))
case EventOrderAnd:
// build AND
having = strings.Join(seq, " AND ")
default:
// default => OR
var orParts []string
for _, p := range seq {
orParts = append(orParts, "("+p+")")
}
having = strings.Join(orParts, " OR ")
}
return
}
func seqCond(eventName, key string, f Filter) string {
op := parseOperator(f.Operator)
return fmt.Sprintf("(%s = '%s' AND JSONExtractString(toString(%s), '%s') %s '%s')",
ColEventName, strings.ToUpper(eventName), ColEventProperties, key, op, concatValues(f.Value))
}
func seqFetchCond(eventName string, f Filter) string {
w := []string{fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(eventName))}
var extras []string
for _, c := range f.Filters {
switch c.Type {
case FilterFetch:
if len(c.Value) > 0 {
extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventURLPath, concatValues(c.Value)))
}
case FilterFetchStatusCode:
if len(c.Value) > 0 {
extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventStatus, concatValues(c.Value)))
}
default:
// placeholder if needed
}
}
if len(extras) > 0 {
w = append(w, strings.Join(extras, " AND "))
}
return "(" + strings.Join(w, " AND ") + ")"
}
func parseOperator(op string) string {
// TODO implement this properly
switch strings.ToLower(op) {
case OperatorStringContains:
return "LIKE"
case OperatorStringIs, OperatorStringOn, "=", OperatorStringOnAny:
return "="
case OperatorStringStartsWith:
return "LIKE"
case OperatorStringEndsWith:
// might interpret differently in real impl
return "="
default:
return "="
}
}
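buildSessionWhere splices filter values straight into the SQL text, which breaks as soon as a value contains a quote. A sketch of the safer shape, reusing the %(name)s placeholder style this builder already emits for project_id and the timestamps (the helper name and the params map are illustrative):

func sessionCondSketch(col Column, key, value string, params map[string]interface{}) string {
	params[key] = value // bound by the query executor instead of being inlined
	return fmt.Sprintf("%s = %%(%s)s", col, key)
}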

View file

@@ -0,0 +1,116 @@
package charts
import (
	"fmt"
	"openreplay/backend/pkg/analytics/db"
)
type TimeSeriesQueryBuilder struct{}
func (t TimeSeriesQueryBuilder) Execute(p *Payload, conn db.Connector) (interface{}, error) {
	query, err := t.buildQuery(p)
	if err != nil {
		// was log.Fatalf followed by an unreachable return; a Fatal here would kill the whole service
		return nil, fmt.Errorf("can't build query: %w", err)
	}
	rows, err := conn.Query(query)
	if err != nil {
		return nil, fmt.Errorf("can't execute query: %w", err)
	}
defer rows.Close()
var results []DataPoint
for rows.Next() {
var res DataPoint
if err := rows.Scan(&res.Timestamp, &res.Count); err != nil {
return nil, err
}
//sum += res.Count
results = append(results, res)
}
filled := FillMissingDataPoints(p.StartTimestamp, p.EndTimestamp, p.Density, DataPoint{}, results, 1000)
return filled, nil
}
func (t TimeSeriesQueryBuilder) buildQuery(p *Payload) (string, error) {
	switch p.MetricOf {
	case "sessionCount":
		return t.buildSessionCountQuery(p), nil
	case "userCount":
		return t.buildUserCountQuery(p), nil
	default:
		// previously fell through with an empty query string; fail loudly instead
		return "", fmt.Errorf("unsupported metricOf: %s", p.MetricOf)
	}
}
func (TimeSeriesQueryBuilder) buildSessionCountQuery(p *Payload) string {
stepSize := int(getStepSize(p.StartTimestamp, p.EndTimestamp, p.Density, false, 1000))
subquery := buildEventSubquery(p)
return fmt.Sprintf(`SELECT toUnixTimestamp(
toStartOfInterval(processed_sessions.datetime, INTERVAL %d second)
) * 1000 AS timestamp,
COUNT(processed_sessions.session_id) AS count
FROM (
%s
) AS processed_sessions
GROUP BY timestamp
ORDER BY timestamp;`, stepSize, subquery)
}
func (TimeSeriesQueryBuilder) buildUserCountQuery(p *Payload) string {
stepSize := int(getStepSize(p.StartTimestamp, p.EndTimestamp, p.Density, false, 1000))
subquery := buildEventSubquery(p)
return fmt.Sprintf(`SELECT toUnixTimestamp(
toStartOfInterval(processed_sessions.datetime, INTERVAL %d second)
) * 1000 AS timestamp,
COUNT(DISTINCT processed_sessions.user_id) AS count
FROM (
%s
) AS processed_sessions
GROUP BY timestamp
ORDER BY timestamp;`, stepSize, subquery)
}
func FillMissingDataPoints(
startTime, endTime int64,
density int,
neutral DataPoint,
rows []DataPoint,
timeCoefficient int64,
) []DataPoint {
if density <= 1 {
return rows
}
stepSize := uint64(getStepSize(startTime, endTime, density, false, 1000))
bucketSize := stepSize * uint64(timeCoefficient)
lookup := make(map[uint64]DataPoint)
for _, dp := range rows {
if dp.Timestamp < uint64(startTime) {
continue
}
bucket := uint64(startTime) + (((dp.Timestamp - uint64(startTime)) / bucketSize) * bucketSize)
lookup[bucket] = dp
}
results := make([]DataPoint, 0, density)
for i := 0; i < density; i++ {
ts := uint64(startTime) + uint64(i)*bucketSize
if dp, ok := lookup[ts]; ok {
results = append(results, dp)
} else {
nd := neutral
nd.Timestamp = ts
results = append(results, nd)
}
}
return results
}
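A small usage sketch for FillMissingDataPoints: with two real buckets present, the gaps come back carrying the neutral value. The exact bucket boundaries depend on getStepSize, whose body is elided from this diff:

func exampleFill() []DataPoint {
	rows := []DataPoint{
		{Timestamp: 0, Count: 3},
		{Timestamp: 30_000, Count: 5},
	}
	// density 4 over one minute; missing buckets are emitted with Count == 0
	return FillMissingDataPoints(0, 60_000, 4, DataPoint{}, rows, 1000)
}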

View file

@@ -1,57 +1,155 @@
package charts
import "openreplay/backend/pkg/analytics/cards"
type DataPoint struct {
Timestamp int64 `json:"timestamp"`
Series map[string]int64 `json:"series"`
}
type GetCardChartDataRequest struct {
StartTimestamp int64 `json:"startTimestamp" validate:"required"`
EndTimestamp int64 `json:"endTimestamp" validate:"required"`
Density int `json:"density" validate:"required"`
MetricType string `json:"metricType" validate:"required,oneof=timeseries table funnel errors performance resources webVitals pathAnalysis retention stickiness heatMap"`
MetricOf string `json:"metricOf" validate:"required,oneof=sessionCount userCount"`
ViewType string `json:"viewType" validate:"required,oneof=lineChart areaChart barChart pieChart progressChart table metric"`
MetricFormat string `json:"metricFormat" validate:"required,oneof=default percentage"`
SessionID int64 `json:"sessionId"`
Series []cards.CardSeries `json:"series" validate:"required,dive"`
}
type GetCardChartDataResponse struct {
Data []DataPoint `json:"data"`
}
type Table string
type Column string
type MetricType string
type MetricOfTimeseries string
type MetricOfTable string
type FilterType string
type EventType string
type EventOrder string
const (
MetricTypeTimeseries MetricType = "TIMESERIES"
MetricTypeTable MetricType = "TABLE"
MetricOfTimeseriesSessionCount MetricOfTimeseries = "SESSION_COUNT"
MetricOfTimeseriesUserCount MetricOfTimeseries = "USER_COUNT"
MetricOfTableVisitedURL MetricOfTable = "VISITED_URL"
MetricOfTableIssues MetricOfTable = "ISSUES"
MetricOfTableUserCountry MetricOfTable = "USER_COUNTRY"
MetricOfTableUserDevice MetricOfTable = "USER_DEVICE"
MetricOfTableUserBrowser MetricOfTable = "USER_BROWSER"
TableEvents Table = "product_analytics.events"
TableSessions Table = "experimental.sessions"
)
-type SessionsSearchPayload struct {
-	StartTimestamp int64
-	EndTimestamp   int64
-	Filters        []SessionSearchFilter
-}
const (
ColEventTime Column = "main.created_at"
ColEventName Column = "main.`$event_name`"
ColEventProjectID Column = "main.project_id"
ColEventProperties Column = "main.`$properties`"
ColEventSessionID Column = "main.session_id"
ColEventURLPath Column = "main.url_path"
ColEventStatus Column = "main.status"
)
const (
ColSessionID Column = "s.session_id"
ColDuration Column = "s.duration"
ColUserCountry Column = "s.user_country"
ColUserCity Column = "s.user_city"
ColUserState Column = "s.user_state"
ColUserID Column = "s.user_id"
ColUserAnonymousID Column = "s.user_anonymous_id"
ColUserOS Column = "s.user_os"
ColUserBrowser Column = "s.user_browser"
ColUserDevice Column = "s.user_device"
ColUserDeviceType Column = "s.user_device_type"
ColRevID Column = "s.rev_id"
ColBaseReferrer Column = "s.base_referrer"
ColUtmSource Column = "s.utm_source"
ColUtmMedium Column = "s.utm_medium"
ColUtmCampaign Column = "s.utm_campaign"
ColMetadata1 Column = "s.metadata_1"
ColSessionProjectID Column = "s.project_id"
ColSessionIsNotNull Column = "isNotNull(s.duration)"
)
const (
MetricTypeTimeseries MetricType = "timeseries"
MetricTypeTable MetricType = "table"
MetricTypeFunnel MetricType = "funnel"
)
const (
EventOrderThen EventOrder = "then"
EventOrderOr EventOrder = "or"
EventOrderAnd EventOrder = "and"
)
type MetricPayload struct {
StartTimestamp int64 `json:"startTimestamp"`
EndTimestamp int64 `json:"endTimestamp"`
Density int `json:"density"`
MetricOf string `json:"metricOf"`
MetricType MetricType `json:"metricType"`
MetricFormat string `json:"metricFormat"`
ViewType string `json:"viewType"`
Name string `json:"name"`
Series []Series `json:"series"`
}
-type SessionSearchFilter struct {
-	Type     FilterType
-	Value    interface{}
-	Operator SearchEventOperator
-}
type Series struct {
Name string `json:"name"`
Filter struct {
Filters []Filter `json:"filters"`
EventsOrder EventOrder `json:"eventsOrder"`
} `json:"filter"`
}
-type SearchEventOperator string // Define constants as needed
-type FilterType string          // Define constants as needed
type Filter struct {
Type FilterType `json:"type"`
IsEvent bool `json:"isEvent"`
Value []string `json:"value"`
Operator string `json:"operator"`
Filters []Filter `json:"filters"`
}
const (
FilterUserId FilterType = "userId"
FilterUserAnonymousId FilterType = "userAnonymousId"
FilterReferrer FilterType = "referrer"
FilterDuration FilterType = "duration"
FilterUtmSource FilterType = "utmSource"
FilterUtmMedium FilterType = "utmMedium"
FilterUtmCampaign FilterType = "utmCampaign"
FilterUserCountry FilterType = "userCountry"
FilterUserCity FilterType = "userCity"
FilterUserState FilterType = "userState"
FilterUserOs FilterType = "userOs"
FilterUserBrowser FilterType = "userBrowser"
FilterUserDevice FilterType = "userDevice"
FilterPlatform FilterType = "platform"
FilterRevId FilterType = "revId"
FilterIssue FilterType = "issue"
FilterMetadata FilterType = "metadata"
)
// Event filters
const (
FilterClick FilterType = "click"
FilterInput FilterType = "input"
FilterLocation FilterType = "location"
FilterCustom FilterType = "customEvent"
FilterFetch FilterType = "fetch"
FilterFetchStatusCode FilterType = "status"
FilterTag FilterType = "tag"
FilterNetworkRequest FilterType = "fetch"
FilterGraphQLRequest FilterType = "graphql"
FilterStateAction FilterType = "stateAction"
FilterError FilterType = "error"
FilterAvgCpuLoad FilterType = "avgCpuLoad"
FilterAvgMemoryUsage FilterType = "avgMemoryUsage"
)
// MOBILE FILTERS
const (
FilterUserOsIos FilterType = "userOsIos"
FilterUserDeviceIos FilterType = "userDeviceIos"
FilterUserCountryIos FilterType = "userCountryIos"
FilterUserIdIos FilterType = "userIdIos"
FilterUserAnonymousIdIos FilterType = "userAnonymousIdIos"
FilterRevIdIos FilterType = "revIdIos"
)
const (
OperatorStringIs = "is"
OperatorStringIsAny = "isAny"
OperatorStringOn = "on"
OperatorStringOnAny = "onAny"
OperatorStringIsNot = "isNot"
OperatorStringIsUndefined = "isUndefined"
OperatorStringNotOn = "notOn"
OperatorStringContains = "contains"
OperatorStringNotContains = "notContains"
OperatorStringStartsWith = "startsWith"
OperatorStringEndsWith = "endsWith"
)
type DataPoint struct {
Timestamp uint64 `json:"timestamp"`
Count uint64 `json:"count"`
}
//type TimeseriesResponse struct {
// Data []DataPoint `json:"data"`
//}
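A hedged round-trip sketch for the payload model above, using the same JSON field names the handler decodes (assumes "encoding/json" is imported):

func decodePayloadExample() (*MetricPayload, error) {
	raw := []byte(`{
		"startTimestamp": 1738796399999,
		"endTimestamp": 1739401199999,
		"density": 7,
		"metricOf": "sessionCount",
		"metricType": "timeseries",
		"series": [{"name": "Series 1", "filter": {"filters": [], "eventsOrder": "then"}}]
	}`)
	p := &MetricPayload{}
	if err := json.Unmarshal(raw, p); err != nil {
		return nil, err
	}
	return p, nil
}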

View file

@@ -0,0 +1,150 @@
package charts
import (
"fmt"
"openreplay/backend/pkg/analytics/db"
"strings"
)
type Payload struct {
*MetricPayload
ProjectId int
UserId uint64
}
type QueryBuilder interface {
Execute(p *Payload, conn db.Connector) (interface{}, error)
}
func NewQueryBuilder(p *Payload) (QueryBuilder, error) {
switch p.MetricType {
case MetricTypeTimeseries:
return TimeSeriesQueryBuilder{}, nil
case MetricTypeFunnel:
return FunnelQueryBuilder{}, nil
case MetricTypeTable:
return TableQueryBuilder{}, nil
default:
return nil, fmt.Errorf("unknown metric type: %s", p.MetricType)
}
}
func buildEventSubquery(p *Payload) string {
baseEventsWhere := buildBaseEventsWhere(p)
sequenceCond := buildSequenceCondition(p.Series)
sessionsWhere := buildSessionsWhere(p)
if sequenceCond.seqPattern == "" {
return fmt.Sprintf(`
SELECT s.%[1]s AS %[1]s,
s.datetime AS datetime
FROM (
SELECT main.session_id,
MIN(main.created_at) AS first_event_ts,
MAX(main.created_at) AS last_event_ts
FROM product_analytics.events AS main
WHERE %[2]s
GROUP BY session_id
) AS f
INNER JOIN (
SELECT *
FROM experimental.sessions AS s
WHERE %[3]s
) AS s ON (s.session_id = f.session_id)
`, pickIDField(p), baseEventsWhere, sessionsWhere)
}
return fmt.Sprintf(`
SELECT s.%[1]s AS %[1]s,
s.datetime AS datetime
FROM (
SELECT main.session_id,
MIN(main.created_at) AS first_event_ts,
MAX(main.created_at) AS last_event_ts
FROM product_analytics.events AS main
WHERE %[2]s
GROUP BY session_id
HAVING sequenceMatch('%[3]s')(toDateTime(main.created_at), %[4]s)
) AS f
INNER JOIN (
SELECT *
FROM experimental.sessions AS s
WHERE %[5]s
) AS s ON (s.session_id = f.session_id)
`, pickIDField(p), baseEventsWhere, sequenceCond.seqPattern, sequenceCond.seqEvents, sessionsWhere)
}
func pickIDField(p *Payload) string {
if p.MetricOf == "userCount" {
return "user_id"
}
return "session_id"
}
func buildBaseEventsWhere(p *Payload) string {
ts := fmt.Sprintf(
`(main.created_at >= toDateTime(%d / 1000) AND main.created_at <= toDateTime(%d / 1000))`,
p.StartTimestamp,
p.EndTimestamp,
)
return fmt.Sprintf(`main.project_id = %d AND %s`, p.ProjectId, ts)
}
func buildSessionsWhere(p *Payload) string {
ts := fmt.Sprintf(
`(s.datetime >= toDateTime(%d / 1000) AND s.datetime <= toDateTime(%d / 1000))`,
p.StartTimestamp,
p.EndTimestamp,
)
return fmt.Sprintf(`s.project_id = %d AND isNotNull(s.duration) AND %s`, p.ProjectId, ts)
}
type sequenceParts struct {
seqPattern string
seqEvents string
}
func buildSequenceCondition(series []Series) sequenceParts {
var events []string
for _, s := range series {
if len(s.Filter.Filters) > 0 {
events = append(events, buildOneSeriesSequence(s.Filter.Filters))
}
}
if len(events) < 2 {
return sequenceParts{"", ""}
}
pattern := ""
for i := 1; i <= len(events); i++ {
pattern += fmt.Sprintf("(?%d)", i)
}
return sequenceParts{
seqPattern: pattern,
seqEvents: strings.Join(events, ", "),
}
}
func buildOneSeriesSequence(filters []Filter) string {
return strings.Join(buildFilterConditions(filters), " AND ")
}
func buildFilterConditions(filters []Filter) []string {
var out []string
for _, f := range filters {
switch f.Type {
case FilterClick:
out = append(out,
fmt.Sprintf(`(main."$event_name" = 'CLICK' AND JSONExtractString(toString(main."$properties"), 'label') IN ('%s'))`,
strings.Join(f.Value, "','")))
case FilterInput:
out = append(out,
fmt.Sprintf(`(main."$event_name" = 'INPUT' AND JSONExtractString(toString(main."$properties"), 'label') IN ('%s'))`,
strings.Join(f.Value, "','")))
default:
out = append(out,
fmt.Sprintf(`(main."$event_name" = '%s')`, strings.ToUpper(string(f.Type))))
}
}
return out
}
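A usage sketch for the sequence builder above: two single-filter series yield a "(?1)(?2)" pattern over two comma-joined event conditions, which is the shape ClickHouse's sequenceMatch expects; with fewer than two qualifying series it returns empty parts and the HAVING clause is skipped:

func exampleSequence() sequenceParts {
	series := make([]Series, 2)
	series[0].Name = "Series 1"
	series[0].Filter.Filters = []Filter{{Type: FilterClick, IsEvent: true, Value: []string{"Manuscripts"}}}
	series[1].Name = "Series 2"
	series[1].Filter.Filters = []Filter{{Type: FilterInput, IsEvent: true, Value: []string{"test"}}}
	return buildSequenceCondition(series) // seqPattern == "(?1)(?2)"
}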

View file

@@ -0,0 +1,64 @@
package db
import (
"context"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"openreplay/backend/internal/config/common"
"time"
)
type TableValue struct {
Name string `json:"name"`
Total uint64 `json:"total"`
}
type TableResponse struct {
Total uint64 `json:"total"`
Count uint64 `json:"count"`
Values []TableValue `json:"values"`
}
type Connector interface {
Stop() error
Query(query string) (driver.Rows, error)
}
type connectorImpl struct {
conn driver.Conn
}
func NewConnector(cfg common.Clickhouse) (Connector, error) {
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{cfg.GetTrimmedURL()},
Auth: clickhouse.Auth{
Database: cfg.Database,
Username: cfg.LegacyUserName,
Password: cfg.LegacyPassword,
},
MaxOpenConns: 20,
MaxIdleConns: 15,
ConnMaxLifetime: 3 * time.Minute,
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
},
})
if err != nil {
return nil, err
}
return &connectorImpl{conn: conn}, nil
}
func (c *connectorImpl) Stop() error {
return c.conn.Close()
}
func (c *connectorImpl) Query(query string) (driver.Rows, error) {
rows, err := c.conn.Query(context.Background(), query)
if err != nil {
return nil, err
}
	// rows are returned open on purpose; the caller owns them and must call Close
return rows, nil
}
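An end-to-end sketch for the connector above; the query only illustrates the Query/rows contract, and closing is left to the caller by design:

func exampleQuery(cfg common.Clickhouse) error {
	conn, err := NewConnector(cfg)
	if err != nil {
		return err
	}
	defer conn.Stop()
	rows, err := conn.Query("SELECT 1")
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		var one uint8
		if err := rows.Scan(&one); err != nil {
			return err
		}
	}
	return rows.Err()
}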

File diff suppressed because it is too large.

View file

@@ -0,0 +1,7 @@
package query
type FunnelQueryBuilder struct{}
func (f FunnelQueryBuilder) Build(p MetricPayload) string {
return "-- Funnel query placeholder"
}

View file

@@ -0,0 +1,137 @@
package query
type Table string
type Column string
type MetricType string
type FilterType string
type EventOrder string
const (
TableEvents Table = "product_analytics.events"
TableSessions Table = "experimental.sessions"
)
const (
ColEventTime Column = "main.created_at"
ColEventName Column = "main.`$event_name`"
ColEventProjectID Column = "main.project_id"
ColEventProperties Column = "main.`$properties`"
ColEventSessionID Column = "main.session_id"
ColEventURLPath Column = "main.url_path"
ColEventStatus Column = "main.status"
)
const (
ColSessionID Column = "s.session_id"
ColDuration Column = "s.duration"
ColUserCountry Column = "s.user_country"
ColUserCity Column = "s.user_city"
ColUserState Column = "s.user_state"
ColUserID Column = "s.user_id"
ColUserAnonymousID Column = "s.user_anonymous_id"
ColUserOS Column = "s.user_os"
ColUserBrowser Column = "s.user_browser"
ColUserDevice Column = "s.user_device"
ColUserDeviceType Column = "s.user_device_type"
ColRevID Column = "s.rev_id"
ColBaseReferrer Column = "s.base_referrer"
ColUtmSource Column = "s.utm_source"
ColUtmMedium Column = "s.utm_medium"
ColUtmCampaign Column = "s.utm_campaign"
ColMetadata1 Column = "s.metadata_1"
ColSessionProjectID Column = "s.project_id"
ColSessionIsNotNull Column = "isNotNull(s.duration)"
)
const (
MetricTypeTimeseries MetricType = "timeseries"
MetricTypeTable MetricType = "table"
MetricTypeFunnel MetricType = "funnel"
)
const (
EventOrderThen EventOrder = "then"
EventOrderOr EventOrder = "or"
EventOrderAnd EventOrder = "and"
)
type MetricPayload struct {
StartTimestamp int64 `json:"startTimestamp"`
EndTimestamp int64 `json:"endTimestamp"`
Density int `json:"density"`
MetricOf string `json:"metricOf"`
MetricType MetricType `json:"metricType"`
MetricFormat string `json:"metricFormat"`
ViewType string `json:"viewType"`
Name string `json:"name"`
Series []Series `json:"series"`
CompareTo *string `json:"compareTo"`
}
type Series struct {
Name string `json:"name"`
Filter struct {
Filters []Filter `json:"filters"`
EventsOrder EventOrder `json:"eventsOrder"`
} `json:"filter"`
}
type Filter struct {
Type FilterType `json:"type"`
IsEvent bool `json:"isEvent"`
Value []string `json:"value"`
Operator string `json:"operator"`
Filters []Filter `json:"filters"`
}
const (
FilterUserOs FilterType = "userOs"
FilterUserBrowser FilterType = "userBrowser"
FilterUserDevice FilterType = "userDevice"
FilterUserCountry FilterType = "userCountry"
FilterUserCity FilterType = "userCity"
FilterUserState FilterType = "userState"
FilterUserId FilterType = "userId"
FilterUserAnonymousId FilterType = "userAnonymousId"
FilterReferrer FilterType = "referrer"
FilterRevId FilterType = "revId"
FilterUserOsIos FilterType = "userOsIos"
FilterUserDeviceIos FilterType = "userDeviceIos"
FilterUserCountryIos FilterType = "userCountryIos"
FilterUserIdIos FilterType = "userIdIos"
FilterUserAnonymousIdIos FilterType = "userAnonymousIdIos"
FilterRevIdIos FilterType = "revIdIos"
FilterDuration FilterType = "duration"
FilterPlatform FilterType = "platform"
FilterMetadata FilterType = "metadata"
FilterIssue FilterType = "issue"
FilterEventsCount FilterType = "eventsCount"
FilterUtmSource FilterType = "utmSource"
FilterUtmMedium FilterType = "utmMedium"
FilterUtmCampaign FilterType = "utmCampaign"
FilterThermalState FilterType = "thermalState"
FilterMainThreadCPU FilterType = "mainThreadCPU"
FilterViewComponent FilterType = "viewComponent"
FilterLogEvent FilterType = "logEvent"
FilterMemoryUsage FilterType = "memoryUsage"
FilterClick FilterType = "click"
FilterInput FilterType = "input"
FilterLocation FilterType = "location"
FilterCustom FilterType = "customEvent"
FilterFetch FilterType = "fetch"
FilterFetchStatusCode FilterType = "status"
)
const (
OperatorStringIs = "is"
OperatorStringIsAny = "isAny"
OperatorStringOn = "on"
OperatorStringOnAny = "onAny"
OperatorStringIsNot = "isNot"
OperatorStringIsUndefined = "isUndefined"
OperatorStringNotOn = "notOn"
OperatorStringContains = "contains"
OperatorStringNotContains = "notContains"
OperatorStringStartsWith = "startsWith"
OperatorStringEndsWith = "endsWith"
)

View file

@@ -0,0 +1,253 @@
package query
import (
"encoding/json"
"fmt"
"strings"
)
type NewQueryBuilder interface {
Build(MetricPayload) string
}
func buildEventSubquery(p MetricPayload) string {
baseEventsWhere := buildBaseEventsWhere(p)
sequenceCond := buildSequenceCondition(p.Series)
sessionsWhere := buildSessionsWhere(p)
// If there's no sequence pattern, skip HAVING entirely.
if sequenceCond.seqPattern == "" {
return fmt.Sprintf(`
SELECT s.%[1]s AS %[1]s,
s.datetime AS datetime
FROM (
SELECT main.session_id,
MIN(main.created_at) AS first_event_ts,
MAX(main.created_at) AS last_event_ts
FROM product_analytics.events AS main
WHERE %[2]s
GROUP BY session_id
) AS f
INNER JOIN (
SELECT *
FROM experimental.sessions AS s
WHERE %[3]s
) AS s ON (s.session_id = f.session_id)
`, pickIDField(p), baseEventsWhere, sessionsWhere)
}
return fmt.Sprintf(`
SELECT s.%[1]s AS %[1]s,
s.datetime AS datetime
FROM (
SELECT main.session_id,
MIN(main.created_at) AS first_event_ts,
MAX(main.created_at) AS last_event_ts
FROM product_analytics.events AS main
WHERE %[2]s
GROUP BY session_id
HAVING sequenceMatch('%[3]s')(toDateTime(main.created_at), %[4]s)
) AS f
INNER JOIN (
SELECT *
FROM experimental.sessions AS s
WHERE %[5]s
) AS s ON (s.session_id = f.session_id)
`, pickIDField(p), baseEventsWhere, sequenceCond.seqPattern, sequenceCond.seqEvents, sessionsWhere)
}
func pickIDField(p MetricPayload) string {
if p.MetricOf == "userCount" {
return "user_id"
}
return "session_id"
}
func buildBaseEventsWhere(p MetricPayload) string {
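	// NOTE: hardcoded project id; this package reads like a local harness for query generation (same below in buildSessionsWhere)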
projectID := 5
ts := fmt.Sprintf(
`(main.created_at >= toDateTime(%d / 1000) AND main.created_at <= toDateTime(%d / 1000))`,
p.StartTimestamp,
p.EndTimestamp,
)
return fmt.Sprintf(`main.project_id = %d AND %s`, projectID, ts)
}
func buildSessionsWhere(p MetricPayload) string {
projectID := 5
ts := fmt.Sprintf(
`(s.datetime >= toDateTime(%d / 1000) AND s.datetime <= toDateTime(%d / 1000))`,
p.StartTimestamp,
p.EndTimestamp,
)
return fmt.Sprintf(`s.project_id = %d AND isNotNull(s.duration) AND %s`, projectID, ts)
}
type sequenceParts struct {
seqPattern string
seqEvents string
}
func buildSequenceCondition(series []Series) sequenceParts {
var events []string
for _, s := range series {
if len(s.Filter.Filters) > 0 {
events = append(events, buildOneSeriesSequence(s.Filter.Filters))
}
}
if len(events) == 0 {
return sequenceParts{"", ""}
}
	// For n events, we need a pattern like `(?1)(?2)(?3)...(?n)`.
pattern := ""
for i := 1; i <= len(events); i++ {
pattern += fmt.Sprintf("(?%d)", i)
}
return sequenceParts{
seqPattern: pattern,
seqEvents: strings.Join(events, ", "),
}
}
func buildOneSeriesSequence(filters []Filter) string {
return strings.Join(buildFilterConditions(filters), " AND ")
}
func buildFilterConditions(filters []Filter) []string {
var out []string
for _, f := range filters {
switch f.Type {
case FilterClick:
out = append(out,
fmt.Sprintf(`(main."$event_name" = 'CLICK' AND JSONExtractString(toString(main."$properties"), 'label') IN ('%s'))`,
strings.Join(f.Value, "','")))
case FilterInput:
out = append(out,
fmt.Sprintf(`(main."$event_name" = 'INPUT' AND JSONExtractString(toString(main."$properties"), 'label') IN ('%s'))`,
strings.Join(f.Value, "','")))
// TODO add more cases to cover all the events
default:
out = append(out,
fmt.Sprintf(`(main."$event_name" = '%s')`, strings.ToUpper(string(f.Type))))
}
}
return out
}
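// NOTE: a main() outside package main is never a program entry point; this looks like a scratch harness for exercising the builders by hand.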
func main() {
//input := GetPayload(MetricTypeTimeseries)
input := GetPayload(MetricTypeTable)
var payload MetricPayload
err := json.Unmarshal([]byte(input), &payload)
if err != nil {
return
}
var qb NewQueryBuilder
switch payload.MetricType {
case MetricTypeTimeseries:
qb = TimeSeriesQueryBuilder{}
case MetricTypeFunnel:
qb = FunnelQueryBuilder{}
case MetricTypeTable:
qb = TableQueryBuilder{}
default:
qb = TimeSeriesQueryBuilder{}
}
query := qb.Build(payload)
fmt.Println(query)
}
func GetPayload(metricType MetricType) string {
switch metricType {
case MetricTypeTimeseries:
return `{
"startTimestamp": 1738796399999,
"endTimestamp": 1739401199999,
"density": 7,
"metricOf": "sessionCount",
"metricValue": [],
"metricType": "timeseries",
"metricFormat": "sessionCount",
"viewType": "lineChart",
"name": "Untitled Trend",
"series": [
{
"name": "Series 1",
"filter": {
"filters": [
{
"type": "userId",
"isEvent": false,
"value": [
"test@test.com"
],
"operator": "is",
"filters": []
}
],
"eventsOrder": "then"
}
}
]
}`
case MetricTypeFunnel:
return `{}`
case MetricTypeTable:
return `{
"startTimestamp": 1737586800000,
"endTimestamp": 1738277999999,
"density": 7,
"metricOf": "userDevice",
"metricType": "table",
"metricFormat": "sessionCount",
"viewType": "table",
"name": "Untitled Trend",
"series": [
{
"name": "Series 1",
"filter": {
"filters": [
{
"type": "click",
"isEvent": true,
"value": ["Manuscripts"],
"operator": "on",
"filters": []
}
],
"eventsOrder": "then"
}
},
{
"name": "Series 2",
"filter": {
"filters": [
{
"type": "input",
"isEvent": true,
"value": ["test"],
"operator": "is",
"filters": []
}
],
"eventsOrder": "then"
}
}
],
"page": 1,
"limit": 20,
"compareTo": null,
"config": {
"col": 2
}
}`
default:
return `{}`
}
}

View file

@@ -0,0 +1,252 @@
package query
import (
"fmt"
"strings"
)
type TableQueryBuilder struct{}
func (t TableQueryBuilder) Build(p MetricPayload) string {
return t.buildQuery(p)
}
func (t TableQueryBuilder) buildQuery(r MetricPayload) string {
s := r.Series[0]
sessionFilters, eventFilters := partitionFilters(s.Filter.Filters)
sessionWhere := buildSessionWhere(sessionFilters)
eventWhere, seqHaving := buildEventsWhere(eventFilters, s.Filter.EventsOrder)
subQuery := fmt.Sprintf(
"SELECT %s,\n"+
" MIN(%s) AS first_event_ts,\n"+
" MAX(%s) AS last_event_ts\n"+
"FROM %s AS main\n"+
"WHERE main.project_id = %%(project_id)s\n"+
" AND %s >= toDateTime(%%(start_time)s/1000)\n"+
" AND %s <= toDateTime(%%(end_time)s/1000)\n"+
" AND (%s)\n"+
"GROUP BY %s\n"+
"HAVING %s",
ColEventSessionID,
ColEventTime,
ColEventTime,
TableEvents,
ColEventTime,
ColEventTime,
strings.Join(eventWhere, " OR "),
ColEventSessionID,
seqHaving,
)
joinQuery := fmt.Sprintf(
"SELECT *\n"+
"FROM %s AS s\n"+
"INNER JOIN (\n"+
" SELECT DISTINCT ev.session_id, ev.`$current_url` AS url_path\n"+
" FROM %s AS ev\n"+
" WHERE ev.created_at >= toDateTime(%%(start_time)s/1000)\n"+
" AND ev.created_at <= toDateTime(%%(end_time)s/1000)\n"+
" AND ev.project_id = %%(project_id)s\n"+
" AND ev.`$event_name` = 'LOCATION'\n"+
") AS extra_event USING (session_id)\n"+
"WHERE s.project_id = %%(project_id)s\n"+
" AND isNotNull(s.duration)\n"+
" AND s.datetime >= toDateTime(%%(start_time)s/1000)\n"+
" AND s.datetime <= toDateTime(%%(end_time)s/1000)\n",
TableSessions,
TableEvents,
)
if len(sessionWhere) > 0 {
joinQuery += " AND " + strings.Join(sessionWhere, " AND ") + "\n"
}
main := fmt.Sprintf(
"SELECT s.session_id AS session_id, s.url_path\n"+
"FROM (\n%s\n) AS f\n"+
"INNER JOIN (\n%s) AS s\n"+
" ON (s.session_id = f.session_id)\n",
subQuery,
joinQuery,
)
final := fmt.Sprintf(
"SELECT COUNT(DISTINCT url_path) OVER () AS main_count,\n"+
" url_path AS name,\n"+
" COUNT(DISTINCT session_id) AS total,\n"+
" COALESCE(SUM(COUNT(DISTINCT session_id)) OVER (), 0) AS total_count\n"+
"FROM (\n%s) AS filtered_sessions\n"+
"GROUP BY url_path\n"+
"ORDER BY total DESC\n"+
"LIMIT 200 OFFSET 0;",
main,
)
return final
}
func partitionFilters(filters []Filter) (sessionFilters []Filter, eventFilters []Filter) {
for _, f := range filters {
if f.IsEvent {
eventFilters = append(eventFilters, f)
} else {
sessionFilters = append(sessionFilters, f)
}
}
return
}
func buildSessionWhere(filters []Filter) []string {
var conds []string
for _, f := range filters {
switch f.Type {
case FilterUserCountry:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCountry, concatValues(f.Value)))
case FilterUserCity:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCity, concatValues(f.Value)))
case FilterUserState:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserState, concatValues(f.Value)))
case FilterUserId:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserID, concatValues(f.Value)))
case FilterUserAnonymousId:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserAnonymousID, concatValues(f.Value)))
case FilterUserOs:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserOS, concatValues(f.Value)))
case FilterUserBrowser:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserBrowser, concatValues(f.Value)))
case FilterUserDevice:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDevice, concatValues(f.Value)))
case FilterPlatform:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDeviceType, concatValues(f.Value)))
case FilterRevId:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColRevID, concatValues(f.Value)))
case FilterReferrer:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColBaseReferrer, concatValues(f.Value)))
case FilterDuration:
if len(f.Value) == 2 {
conds = append(conds, fmt.Sprintf("%s >= '%s'", ColDuration, f.Value[0]))
conds = append(conds, fmt.Sprintf("%s <= '%s'", ColDuration, f.Value[1]))
}
case FilterUtmSource:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmSource, concatValues(f.Value)))
case FilterUtmMedium:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmMedium, concatValues(f.Value)))
case FilterUtmCampaign:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmCampaign, concatValues(f.Value)))
case FilterMetadata:
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColMetadata1, concatValues(f.Value)))
}
}
	// append "\n" to each condition for better readability; can be removed.
for i := range conds {
conds[i] += "\n"
}
return conds
}
func concatValues(v []string) string {
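	// NOTE: values are joined with no separator, so this only behaves sensibly for single-value filters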
return strings.Join(v, "")
}
func buildEventsWhere(filters []Filter, order EventOrder) (eventConditions []string, having string) {
basicEventTypes := "(" +
strings.Join([]string{
fmt.Sprintf("%s = 'CLICK'", ColEventName),
fmt.Sprintf("%s = 'INPUT'", ColEventName),
fmt.Sprintf("%s = 'LOCATION'", ColEventName),
fmt.Sprintf("%s = 'CUSTOM'", ColEventName),
fmt.Sprintf("%s = 'REQUEST'", ColEventName),
}, " OR ") + ")"
var seq []string
for _, f := range filters {
switch f.Type {
case FilterClick:
seq = append(seq, seqCond("CLICK", "selector", f))
case FilterInput:
seq = append(seq, seqCond("INPUT", "label", f))
case FilterLocation:
seq = append(seq, seqCond("LOCATION", "url_path", f))
case FilterCustom:
seq = append(seq, seqCond("CUSTOM", "name", f))
case FilterFetch:
seq = append(seq, seqFetchCond("REQUEST", f))
case FilterFetchStatusCode:
seq = append(seq, seqCond("REQUEST", "status", f))
default:
seq = append(seq, fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(string(f.Type))))
}
}
eventConditions = []string{basicEventTypes}
// then => sequenceMatch
// or => OR
// and => AND
switch order {
case EventOrderThen:
var pattern []string
for i := range seq {
pattern = append(pattern, fmt.Sprintf("(?%d)", i+1))
}
having = fmt.Sprintf("sequenceMatch('%s')(\n%s,\n%s)",
strings.Join(pattern, ""), fmt.Sprintf("toUnixTimestamp(%s)", ColEventTime), strings.Join(seq, ",\n"))
case EventOrderAnd:
// build AND
having = strings.Join(seq, " AND ")
default:
// default => OR
var orParts []string
for _, p := range seq {
orParts = append(orParts, "("+p+")")
}
having = strings.Join(orParts, " OR ")
}
return
}
func seqCond(eventName, key string, f Filter) string {
op := parseOperator(f.Operator)
return fmt.Sprintf("(%s = '%s' AND JSONExtractString(toString(%s), '%s') %s '%s')",
ColEventName, strings.ToUpper(eventName), ColEventProperties, key, op, concatValues(f.Value))
}
func seqFetchCond(eventName string, f Filter) string {
w := []string{fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(eventName))}
var extras []string
for _, c := range f.Filters {
switch c.Type {
case FilterFetch:
if len(c.Value) > 0 {
extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventURLPath, concatValues(c.Value)))
}
case FilterFetchStatusCode:
if len(c.Value) > 0 {
extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventStatus, concatValues(c.Value)))
}
default:
// placeholder if needed
}
}
if len(extras) > 0 {
w = append(w, strings.Join(extras, " AND "))
}
return "(" + strings.Join(w, " AND ") + ")"
}
func parseOperator(op string) string {
// TODO implement this properly
switch strings.ToLower(op) {
case OperatorStringContains:
return "LIKE"
case OperatorStringIs, OperatorStringOn, "=", OperatorStringOnAny:
return "="
case OperatorStringStartsWith:
return "LIKE"
case OperatorStringEndsWith:
// might interpret differently in real impl
return "="
default:
return "="
}
}

View file

@@ -0,0 +1,42 @@
package query
import "fmt"
type TimeSeriesQueryBuilder struct{}
func (t TimeSeriesQueryBuilder) Build(p MetricPayload) string {
switch p.MetricOf {
case "sessionCount":
return t.buildSessionCountQuery(p)
case "userCount":
return t.buildUserCountQuery(p)
default:
return ""
}
}
func (TimeSeriesQueryBuilder) buildSessionCountQuery(p MetricPayload) string {
subquery := buildEventSubquery(p)
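	// NOTE: the INTERVAL 115199 second below is a hardcoded step size; the charts package derives it from getStepSize instead (same in buildUserCountQuery)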
return fmt.Sprintf(`SELECT toUnixTimestamp(
toStartOfInterval(processed_sessions.datetime, INTERVAL 115199 second)
) * 1000 AS timestamp,
COUNT(processed_sessions.session_id) AS count
FROM (
%s
) AS processed_sessions
GROUP BY timestamp
ORDER BY timestamp;`, subquery)
}
func (TimeSeriesQueryBuilder) buildUserCountQuery(p MetricPayload) string {
subquery := buildEventSubquery(p)
return fmt.Sprintf(`SELECT toUnixTimestamp(
toStartOfInterval(processed_sessions.datetime, INTERVAL 115199 second)
) * 1000 AS timestamp,
COUNT(DISTINCT processed_sessions.user_id) AS count
FROM (
%s
) AS processed_sessions
GROUP BY timestamp
ORDER BY timestamp;`, subquery)
}