diff --git a/backend/pkg/analytics/cards/handlers.go b/backend/pkg/analytics/cards/handlers.go index f0cf16d02..a47c1153a 100644 --- a/backend/pkg/analytics/cards/handlers.go +++ b/backend/pkg/analytics/cards/handlers.go @@ -46,6 +46,7 @@ func (e *handlersImpl) GetAll() []*api.Description { {"/v1/analytics/{projectId}/cards/{id}", e.getCard, "GET"}, {"/v1/analytics/{projectId}/cards/{id}", e.updateCard, "PUT"}, {"/v1/analytics/{projectId}/cards/{id}", e.deleteCard, "DELETE"}, + {"/v1/analytics/{projectId}/cards/{id}/sessions", e.getCardSessions, "POST"}, } } @@ -296,3 +297,8 @@ func (e *handlersImpl) deleteCard(w http.ResponseWriter, r *http.Request) { e.responser.ResponseWithJSON(e.log, r.Context(), w, nil, startTime, r.URL.Path, bodySize) } + +func (e *handlersImpl) getCardSessions(w http.ResponseWriter, r *http.Request) { + // TODO: implement this + e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusNotImplemented, fmt.Errorf("not implemented"), time.Now(), r.URL.Path, 0) +} diff --git a/backend/pkg/analytics/cards/model.go b/backend/pkg/analytics/cards/model.go index 5ab4144f0..0e88dfbf5 100644 --- a/backend/pkg/analytics/cards/model.go +++ b/backend/pkg/analytics/cards/model.go @@ -192,3 +192,34 @@ func (s *CardListSort) GetSQLField() string { func (s *CardListSort) GetSQLOrder() string { return strings.ToUpper(s.Order) } + +// --- + +/* +class IssueType(str, Enum): + + CLICK_RAGE = 'click_rage' + DEAD_CLICK = 'dead_click' + EXCESSIVE_SCROLLING = 'excessive_scrolling' + BAD_REQUEST = 'bad_request' + MISSING_RESOURCE = 'missing_resource' + MEMORY = 'memory' + CPU = 'cpu' + SLOW_RESOURCE = 'slow_resource' + SLOW_PAGE_LOAD = 'slow_page_load' + CRASH = 'crash' + CUSTOM = 'custom' + JS_EXCEPTION = 'js_exception' + MOUSE_THRASHING = 'mouse_thrashing' + # IOS + TAP_RAGE = 'tap_rage' +*/ +type IssueType string +type ChartData struct { + StartTs uint64 `json:"startTs"` + EndTs uint64 `json:"endTs"` + Density uint64 `json:"density"` + Filters []FilterItem `json:"filter"` + MetricOf string `json:"metricOf"` + MetricValue []IssueType `json:"metricValue"` +} diff --git a/backend/pkg/analytics/charts/charts.go b/backend/pkg/analytics/charts/charts.go index 1916695fa..bf242f6b4 100644 --- a/backend/pkg/analytics/charts/charts.go +++ b/backend/pkg/analytics/charts/charts.go @@ -3,6 +3,8 @@ package charts import ( "encoding/json" "fmt" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "openreplay/backend/pkg/analytics/cards" "openreplay/backend/pkg/db/postgres/pool" "openreplay/backend/pkg/logger" @@ -15,6 +17,7 @@ type Charts interface { type chartsImpl struct { log logger.Logger pgconn pool.Pool + chConn driver.Conn } func New(log logger.Logger, conn pool.Pool) (Charts, error) { @@ -24,7 +27,39 @@ func New(log logger.Logger, conn pool.Pool) (Charts, error) { }, nil } +// def get_chart() func (s *chartsImpl) GetData(projectId int, userID uint64, req *GetCardChartDataRequest) ([]DataPoint, error) { + if req == nil { + return nil, fmt.Errorf("request is empty") + } + switch { + case req.MetricType == "funnel": + return nil, fmt.Errorf("funnel metric type is not supported yet") + case req.MetricType == "heatMap": + return nil, fmt.Errorf("heatMap metric type is not supported yet") + case req.MetricType == "pathAnalysis": + return nil, fmt.Errorf("pathAnalysis metric type is not supported yet") + + case req.MetricType == "timeseries": + return s.getTimeseriesCharts(projectId, userID, req) + case req.MetricType == "table": + return nil, fmt.Errorf("table metric type is not supported yet") + + case req.MetricType == "errors": + fallthrough + case req.MetricType == "performance": + fallthrough + case req.MetricType == "resources": + fallthrough + case req.MetricType == "webVitals": + return s.getMetric(projectId, userID, req) + + case req.MetricType == "retention": + return nil, fmt.Errorf("retention metric type is not supported yet") + case req.MetricType == "stickiness": + return nil, fmt.Errorf("stickiness metric type is not supported yet") + + } jsonInput := ` { "data": [ @@ -48,3 +83,40 @@ func (s *chartsImpl) GetData(projectId int, userID uint64, req *GetCardChartData return resp.Data, nil } + +func (s *chartsImpl) getMetric(projectID int, userID uint64, req *GetCardChartDataRequest) ([]DataPoint, error) { + switch req.MetricOf { + case "countSessions": // metrics.get_processed_sessions + return nil, fmt.Errorf("countSessions metric type is not supported yet") + case "avgVisitedPages": // metrics.get_user_activity_avg_visited_pages + return nil, fmt.Errorf("avgVisitedPages metric type is not supported yet") + case "countRequests": // metrics.get_top_metrics_count_requests + return nil, fmt.Errorf("countRequests metric type is not supported yet") + case "impactedSessionsByJsErrors": // metrics.get_impacted_sessions_by_js_errors + return nil, fmt.Errorf("impactedSessionsByJsErrors metric type is not supported yet") + case "domainsErrors4xx": // metrics.get_domains_errors_4xx + return nil, fmt.Errorf("domainsErrors4xx metric type is not supported yet") + case "domainsErrors5xx": // metrics.get_domains_errors_5xx + return nil, fmt.Errorf("domainsErrors5xx metric type is not supported yet") + case "errorsPerDomains": // metrics.get_errors_per_domains + return nil, fmt.Errorf("errorsPerDomains metric type is not supported yet") + case "errorsPerType": // metrics.get_errors_per_type + return nil, fmt.Errorf("errorsPerType metric type is not supported yet") + + } + return nil, fmt.Errorf("metric type is not supported yet") + +} + +func (s *chartsImpl) getTimeseriesCharts(projectID int, userID uint64, req *GetCardChartDataRequest) ([]DataPoint, error) { + charts := []interface{}{} + for _, series := range req.Series { + res, err := s.searchSeries(projectID, series) + if err != nil { + return nil, err + } + charts = append(charts, res) + } + results := []interface{}{} + return results, nil +} diff --git a/backend/pkg/analytics/charts/counters.go b/backend/pkg/analytics/charts/counters.go new file mode 100644 index 000000000..92a664a0a --- /dev/null +++ b/backend/pkg/analytics/charts/counters.go @@ -0,0 +1,364 @@ +package charts + +import ( + "context" + "fmt" + "log" + "strconv" + "strings" +) + +type Fields map[string]string + +func getSessionMetaFields() Fields { + return Fields{ + "revId": "rev_id", + "country": "user_country", + "os": "user_os", + "platform": "user_device_type", + "device": "user_device", + "browser": "user_browser", + } +} + +func getMetadataFields() Fields { + return Fields{ + "userId": "user_id", + "userAnonymousId": "user_anonymous_id", + "metadata1": "metadata_1", + "metadata2": "metadata_2", + "metadata3": "metadata_3", + "metadata4": "metadata_4", + "metadata5": "metadata_5", + "metadata6": "metadata_6", + "metadata7": "metadata_7", + "metadata8": "metadata_8", + "metadata9": "metadata_9", + "metadata10": "metadata_10", + } +} + +func getStepSize(startTimestamp, endTimestamp, density uint64, decimal bool, factor uint64) float64 { + stepSize := (endTimestamp / factor) - (startTimestamp / factor) // TODO: should I use float64 here? + if !decimal { + density-- + } + return float64(stepSize) / float64(density) +} + +func getBasicConstraints(tableName string, timeConstraint, roundStart bool, data map[string]interface{}, identifier string) []string { // Если tableName не пустая, добавляем точку + if tableName != "" { + tableName += "." + } + chSubQuery := []string{fmt.Sprintf("%s%s = toUInt16(:%s)", tableName, identifier, identifier)} + + if timeConstraint { + if roundStart { + chSubQuery = append(chSubQuery, fmt.Sprintf("toStartOfInterval(%sdatetime, INTERVAL :step_size second) >= toDateTime(:startTimestamp/1000)", tableName)) + } else { + chSubQuery = append(chSubQuery, fmt.Sprintf("%sdatetime >= toDateTime(:startTimestamp/1000)", tableName)) + } + chSubQuery = append(chSubQuery, fmt.Sprintf("%sdatetime < toDateTime(:endTimestamp/1000)", tableName)) + } + return append(chSubQuery, getGenericConstraint(data, tableName)...) +} + +func getGenericConstraint(data map[string]interface{}, tableName string) []string { + return getConstraint(data, getSessionMetaFields(), tableName) +} + +func getConstraint(data map[string]interface{}, fields Fields, tableName string) []string { + var constraints []string + filters, err := data["filters"].([]map[string]interface{}) + if !err { + log.Println("error getting filters from data") + filters = make([]map[string]interface{}, 0) // to skip the next block + } + + // process filters + for i, f := range filters { + key, _ := f["key"].(string) + value, _ := f["value"].(string) + + if field, ok := fields[key]; ok { + if value == "*" || value == "" { + constraints = append(constraints, fmt.Sprintf("isNotNull(%s%s)", tableName, field)) + } else { + // constraints.append(f"{table_name}{fields[f['key']]} = %({f['key']}_{i})s") + constraints = append(constraints, fmt.Sprintf("%s%s = %%(%s_%d)s", tableName, field, key, i)) // TODO: where we'll keep the value? + } + } + } + + // TODO from Python: remove this in next release + offset := len(filters) + for i, f := range data { + key, _ := f.(string) + value, _ := data[key].(string) + + if field, ok := fields[key]; ok { + if value == "*" || value == "" { + constraints = append(constraints, fmt.Sprintf("isNotNull(%s%s)", tableName, field)) + } else { + intI, err := strconv.Atoi(i) + if err != nil { + log.Printf("error converting data[k] to int: %v", err) + continue + } else { + constraints = append(constraints, fmt.Sprintf("%s%s = %%(%s_%d)s", tableName, field, f, intI+offset)) + } + } + } + } + return constraints +} + +func getMetaConstraint(data map[string]interface{}) []string { + return getConstraint(data, getMetadataFields(), "sessions_metadata.") +} + +func getConstraintValues(data map[string]interface{}) map[string]interface{} { + params := make(map[string]interface{}) + + if filters, ok := data["filters"].([]map[string]interface{}); ok { + for i, f := range filters { + key, _ := f["key"].(string) + value := f["value"] + params[fmt.Sprintf("%s_%d", key, i)] = value + } + + // TODO from Python: remove this in next release + offset := len(data["filters"].([]map[string]interface{})) + i := 0 + for k, v := range data { + params[fmt.Sprintf("%s_%d", k, i+offset)] = v + i++ + } + } + + return params +} + +/* +def get_main_sessions_table(timestamp=0): + + return "experimental.sessions_l7d_mv" \ + if config("EXP_7D_MV", cast=bool, default=True) \ + and timestamp and timestamp >= TimeUTC.now(delta_days=-7) else "experimental.sessions" +*/ +func getMainSessionsTable(timestamp uint64) string { + return "experimental.sessions" +} + +// Function to convert named parameters to positional parameters +func replaceNamedParams(query string, params map[string]interface{}) (string, []interface{}) { + var args []interface{} + i := 1 + for key, val := range params { + placeholder := ":" + key + //query = strings.Replace(query, placeholder, "?", 1) + strVal := fmt.Sprintf("%v", val) + query = strings.Replace(query, placeholder, strVal, -1) + args = append(args, val) + i++ + } + return query, args +} + +// Helper function to generate a range of floats +func frange(start, end, step float64) []float64 { + var rangeValues []float64 + for i := start; i < end; i += step { + rangeValues = append(rangeValues, i) + } + return rangeValues +} + +// Helper function to add missing keys from the "complete" map to the "original" map +func addMissingKeys(original, complete map[string]interface{}) map[string]interface{} { + for k, v := range complete { + if _, exists := original[k]; !exists { + original[k] = v + } + } + return original +} + +// CompleteMissingSteps fills in missing steps in the data +func CompleteMissingSteps( + startTime, endTime uint64, + density int, + neutral map[string]interface{}, + rows []map[string]interface{}, + timeKey string, + timeCoefficient int64, +) []map[string]interface{} { + if len(rows) == density { + return rows + } + + // Calculate the step size + step := getStepSize(startTime, endTime, uint64(density), true, 1000) + optimal := make([][2]uint64, 0) + for _, i := range frange(float64(startTime)/float64(timeCoefficient), float64(endTime)/float64(timeCoefficient), step) { + startInterval := uint64(i * float64(timeCoefficient)) + endInterval := uint64((i + step) * float64(timeCoefficient)) + optimal = append(optimal, [2]uint64{startInterval, endInterval}) + } + + var result []map[string]interface{} + r, o := 0, 0 + + // Iterate over density + for i := 0; i < density; i++ { + // Clone the neutral map + neutralClone := make(map[string]interface{}) + for k, v := range neutral { + if fn, ok := v.(func() interface{}); ok { + neutralClone[k] = fn() + } else { + neutralClone[k] = v + } + } + + // If we can just add the rest of the rows to result + if r < len(rows) && len(result)+len(rows)-r == density { + result = append(result, rows[r:]...) + break + } + + // Determine where the current row fits within the optimal intervals + if r < len(rows) && o < len(optimal) && rows[r][timeKey].(uint64) < optimal[o][0] { + rows[r] = addMissingKeys(rows[r], neutralClone) + result = append(result, rows[r]) + r++ + } else if r < len(rows) && o < len(optimal) && optimal[o][0] <= rows[r][timeKey].(uint64) && rows[r][timeKey].(uint64) < optimal[o][1] { + rows[r] = addMissingKeys(rows[r], neutralClone) + result = append(result, rows[r]) + r++ + o++ + } else { + neutralClone[timeKey] = optimal[o][0] + result = append(result, neutralClone) + o++ + } + } + return result +} + +func progress(oldVal, newVal uint64) float64 { + if newVal > 0 { + return (float64(oldVal-newVal) / float64(newVal)) * 100 + } + if oldVal == 0 { + return 0 + } + return 100 +} + +// Trying to find a common part +func parse(projectID uint64, startTs, endTs uint64, density uint64, args map[string]interface{}) ([]string, []string, map[string]interface{}) { + stepSize := getStepSize(startTs, endTs, density, false, 1000) + chSubQuery := getBasicConstraints("sessions", true, false, args, "project_id") + chSubQueryChart := getBasicConstraints("sessions", true, true, args, "project_id") + metaCondition := getMetaConstraint(args) + chSubQuery = append(chSubQuery, metaCondition...) + chSubQueryChart = append(chSubQueryChart, metaCondition...) + + params := map[string]interface{}{ + "step_size": stepSize, + "project_id": projectID, + "startTimestamp": startTs, + "endTimestamp": endTs, + } + for k, v := range getConstraintValues(args) { + params[k] = v + } + return chSubQuery, chSubQueryChart, params +} + +// Sessions trend +func (c *chartsImpl) getProcessedSessions(projectID uint64, startTs, endTs uint64, density uint64, args map[string]interface{}) { + chQuery := ` + SELECT toUnixTimestamp(toStartOfInterval(sessions.datetime, INTERVAL :step_size second)) * 1000 AS timestamp, + COUNT(DISTINCT sessions.session_id) AS value + FROM :main_sessions_table AS sessions + WHERE :sub_query_chart + GROUP BY timestamp + ORDER BY timestamp; + ` + chSubQuery, chSubQueryChart, params := parse(projectID, startTs, endTs, density, args) + + chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1) + chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQueryChart, " AND "), -1) + + preparedQuery, preparedArgs := replaceNamedParams(chQuery, params) + rows, err := c.chConn.Query(context.Background(), preparedQuery, preparedArgs) + if err != nil { + log.Fatalf("Error executing query: %v", err) + } + preparedRows := make([]map[string]interface{}, 0) + var sum uint64 + for rows.Next() { + var timestamp, value uint64 + if err := rows.Scan(×tamp, &value); err != nil { + log.Fatalf("Error scanning row: %v", err) + } + fmt.Printf("Timestamp: %d, Value: %d\n", timestamp, value) + sum += value + preparedRows = append(preparedRows, map[string]interface{}{"timestamp": timestamp, "value": value}) + } + + results := map[string]interface{}{ + "value": sum, + "chart": CompleteMissingSteps(startTs, endTs, int(density), map[string]interface{}{"value": 0}, preparedRows, "timestamp", 1000), + } + + diff := endTs - startTs + endTs = startTs + startTs = endTs - diff + + log.Println(results) + + chQuery = fmt.Sprintf(` + SELECT COUNT(1) AS count + FROM :main_sessions_table AS sessions + WHERE :sub_query_chart; + `) + chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1) + chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQuery, " AND "), -1) + + var count uint64 + + preparedQuery, preparedArgs = replaceNamedParams(chQuery, params) + if err := c.chConn.QueryRow(context.Background(), preparedQuery, preparedArgs).Scan(&count); err != nil { + log.Fatalf("Error executing query: %v", err) + } + + results["progress"] = progress(count, results["value"].(uint64)) + + // TODO: this should be returned in any case + results["unit"] = "COUNT" + fmt.Println(results) +} + +// Users trend +//func getUniqueUsers(projectID uint64, startTs, endTs uint64, density uint64, args map[string]interface{}) { +// chQuery := ` +// SELECT toUnixTimestamp(toStartOfInterval(sessions.datetime, INTERVAL :step_size second)) * 1000 AS timestamp, +// COUNT(DISTINCT sessions.user_id) AS value +// FROM :main_sessions_table AS sessions +// WHERE :sub_query_chart +// GROUP BY timestamp +// ORDER BY timestamp; +// ` +// chSubQuery, chSubQueryChart, params := parse(projectID, startTs, endTs, density, args) +// chSubQueryChart = append(chSubQueryChart, []string{"isNotNull(sessions.user_id)", "sessions.user_id!=''"}...) +// +// chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1) +// chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQueryChart, " AND "), -1) +// +// preparedQuery, preparedArgs := replaceNamedParams(chQuery, params) +// +// return +//} diff --git a/backend/pkg/analytics/charts/handlers.go b/backend/pkg/analytics/charts/handlers.go index 771732b43..2b35c5b3c 100644 --- a/backend/pkg/analytics/charts/handlers.go +++ b/backend/pkg/analytics/charts/handlers.go @@ -41,8 +41,9 @@ type handlersImpl struct { func (e *handlersImpl) GetAll() []*api.Description { return []*api.Description{ - {"/v1/analytics/{projectId}/cards/{id}/chart", e.getCardChartData, "POST"}, + {"/v1/analytics/{projectId}/cards/{id}/chart", e.getCardChartData, "POST"}, // for dashboards {"/v1/analytics/{projectId}/cards/{id}/try", e.getCardChartData, "POST"}, + {"/v1/analytics/{projectId}/cards/try", e.getCardChartData, "POST"}, // for cards itself } } diff --git a/backend/pkg/analytics/charts/model.go b/backend/pkg/analytics/charts/model.go index 81016c7e6..399fd531c 100644 --- a/backend/pkg/analytics/charts/model.go +++ b/backend/pkg/analytics/charts/model.go @@ -8,9 +8,9 @@ type DataPoint struct { } type GetCardChartDataRequest struct { - MetricType string `json:"metricType" validate:"required,oneof=timeseries table funnel"` - MetricOf string `json:"metricOf" validate:"required,oneof=session_count user_count"` - ViewType string `json:"viewType" validate:"required,oneof=line_chart table_view"` + MetricType string `json:"metricType" validate:"required,oneof=timeseries table funnel errors performance resources webVitals pathAnalysis retention stickiness heatMap"` + MetricOf string `json:"metricOf" validate:"required,oneof=sessionCount userCount"` + ViewType string `json:"viewType" validate:"required,oneof=lineChart areaChart barChart pieChart progressChart table metric"` MetricFormat string `json:"metricFormat" validate:"required,oneof=default percentage"` SessionID int64 `json:"sessionId"` Series []cards.CardSeries `json:"series" validate:"required,dive"`