feat(analytics): session/user trends

Alexander 2025-01-22 15:52:39 +01:00 committed by Shekar Siri
parent 98c82aa126
commit 25841f26a1
6 changed files with 478 additions and 4 deletions

View file

@@ -46,6 +46,7 @@ func (e *handlersImpl) GetAll() []*api.Description {
{"/v1/analytics/{projectId}/cards/{id}", e.getCard, "GET"},
{"/v1/analytics/{projectId}/cards/{id}", e.updateCard, "PUT"},
{"/v1/analytics/{projectId}/cards/{id}", e.deleteCard, "DELETE"},
{"/v1/analytics/{projectId}/cards/{id}/sessions", e.getCardSessions, "POST"},
}
}
@@ -296,3 +297,8 @@ func (e *handlersImpl) deleteCard(w http.ResponseWriter, r *http.Request) {
e.responser.ResponseWithJSON(e.log, r.Context(), w, nil, startTime, r.URL.Path, bodySize)
}
func (e *handlersImpl) getCardSessions(w http.ResponseWriter, r *http.Request) {
// TODO: implement this
e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusNotImplemented, fmt.Errorf("not implemented"), time.Now(), r.URL.Path, 0)
}

View file

@@ -192,3 +192,34 @@ func (s *CardListSort) GetSQLField() string {
func (s *CardListSort) GetSQLOrder() string {
return strings.ToUpper(s.Order)
}
// ---
/*
class IssueType(str, Enum):
CLICK_RAGE = 'click_rage'
DEAD_CLICK = 'dead_click'
EXCESSIVE_SCROLLING = 'excessive_scrolling'
BAD_REQUEST = 'bad_request'
MISSING_RESOURCE = 'missing_resource'
MEMORY = 'memory'
CPU = 'cpu'
SLOW_RESOURCE = 'slow_resource'
SLOW_PAGE_LOAD = 'slow_page_load'
CRASH = 'crash'
CUSTOM = 'custom'
JS_EXCEPTION = 'js_exception'
MOUSE_THRASHING = 'mouse_thrashing'
# IOS
TAP_RAGE = 'tap_rage'
*/
type IssueType string
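// Possible IssueType values, mirrored from the Python enum quoted above; the Go
// constant names are assumptions, only the string values come from the source.
const (
IssueClickRage IssueType = "click_rage"
IssueDeadClick IssueType = "dead_click"
IssueExcessiveScrolling IssueType = "excessive_scrolling"
IssueBadRequest IssueType = "bad_request"
IssueMissingResource IssueType = "missing_resource"
IssueMemory IssueType = "memory"
IssueCPU IssueType = "cpu"
IssueSlowResource IssueType = "slow_resource"
IssueSlowPageLoad IssueType = "slow_page_load"
IssueCrash IssueType = "crash"
IssueCustom IssueType = "custom"
IssueJsException IssueType = "js_exception"
IssueMouseThrashing IssueType = "mouse_thrashing"
IssueTapRage IssueType = "tap_rage" // iOS
)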
type ChartData struct {
StartTs uint64 `json:"startTs"`
EndTs uint64 `json:"endTs"`
Density uint64 `json:"density"`
Filters []FilterItem `json:"filter"`
MetricOf string `json:"metricOf"`
MetricValue []IssueType `json:"metricValue"`
}

View file

@@ -3,6 +3,8 @@ package charts
import (
"encoding/json"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"openreplay/backend/pkg/analytics/cards"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/logger"
@@ -15,6 +17,7 @@ type Charts interface {
type chartsImpl struct {
log logger.Logger
pgconn pool.Pool
chConn driver.Conn
}
func New(log logger.Logger, conn pool.Pool) (Charts, error) {
@@ -24,7 +27,39 @@ func New(log logger.Logger, conn pool.Pool) (Charts, error) {
}, nil
}
// Port of the Python def get_chart()
func (s *chartsImpl) GetData(projectId int, userID uint64, req *GetCardChartDataRequest) ([]DataPoint, error) {
if req == nil {
return nil, fmt.Errorf("request is empty")
}
switch req.MetricType {
case "timeseries":
return s.getTimeseriesCharts(projectId, userID, req)
case "errors", "performance", "resources", "webVitals":
return s.getMetric(projectId, userID, req)
case "funnel", "heatMap", "pathAnalysis", "table", "retention", "stickiness":
return nil, fmt.Errorf("%s metric type is not supported yet", req.MetricType)
}
jsonInput := `
{
"data": [
@@ -48,3 +83,40 @@ func (s *chartsImpl) GetData(projectId int, userID uint64, req *GetCardChartData
return resp.Data, nil
}
func (s *chartsImpl) getMetric(projectID int, userID uint64, req *GetCardChartDataRequest) ([]DataPoint, error) {
switch req.MetricOf {
case "countSessions": // metrics.get_processed_sessions
return nil, fmt.Errorf("countSessions metric type is not supported yet")
case "avgVisitedPages": // metrics.get_user_activity_avg_visited_pages
return nil, fmt.Errorf("avgVisitedPages metric type is not supported yet")
case "countRequests": // metrics.get_top_metrics_count_requests
return nil, fmt.Errorf("countRequests metric type is not supported yet")
case "impactedSessionsByJsErrors": // metrics.get_impacted_sessions_by_js_errors
return nil, fmt.Errorf("impactedSessionsByJsErrors metric type is not supported yet")
case "domainsErrors4xx": // metrics.get_domains_errors_4xx
return nil, fmt.Errorf("domainsErrors4xx metric type is not supported yet")
case "domainsErrors5xx": // metrics.get_domains_errors_5xx
return nil, fmt.Errorf("domainsErrors5xx metric type is not supported yet")
case "errorsPerDomains": // metrics.get_errors_per_domains
return nil, fmt.Errorf("errorsPerDomains metric type is not supported yet")
case "errorsPerType": // metrics.get_errors_per_type
return nil, fmt.Errorf("errorsPerType metric type is not supported yet")
}
return nil, fmt.Errorf("metric type is not supported yet")
}
func (s *chartsImpl) getTimeseriesCharts(projectID int, userID uint64, req *GetCardChartDataRequest) ([]DataPoint, error) {
charts := make([]interface{}, 0, len(req.Series))
for _, series := range req.Series {
res, err := s.searchSeries(projectID, series)
if err != nil {
return nil, err
}
charts = append(charts, res)
}
// TODO: merge the per-series results collected in charts into data points
_ = charts
return []DataPoint{}, nil
}

View file

@@ -0,0 +1,364 @@
package charts
import (
"context"
"fmt"
"log"
"strings"
)
type Fields map[string]string
func getSessionMetaFields() Fields {
return Fields{
"revId": "rev_id",
"country": "user_country",
"os": "user_os",
"platform": "user_device_type",
"device": "user_device",
"browser": "user_browser",
}
}
func getMetadataFields() Fields {
return Fields{
"userId": "user_id",
"userAnonymousId": "user_anonymous_id",
"metadata1": "metadata_1",
"metadata2": "metadata_2",
"metadata3": "metadata_3",
"metadata4": "metadata_4",
"metadata5": "metadata_5",
"metadata6": "metadata_6",
"metadata7": "metadata_7",
"metadata8": "metadata_8",
"metadata9": "metadata_9",
"metadata10": "metadata_10",
}
}
func getStepSize(startTimestamp, endTimestamp, density uint64, decimal bool, factor uint64) float64 {
stepSize := (endTimestamp / factor) - (startTimestamp / factor) // TODO: should I use float64 here?
if !decimal {
density--
}
return float64(stepSize) / float64(density)
}
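// Worked illustration of getStepSize (values chosen here, not from the source):
// a 24h window in ms with factor=1000 spans 86400s; with density=24,
// decimal=false yields 86400/23 ≈ 3756.5s and decimal=true yields 86400/24 = 3600s.
func exampleStepSize() (float64, float64) {
start, end := uint64(1_700_000_000_000), uint64(1_700_086_400_000) // 24h apart, in ms
return getStepSize(start, end, 24, false, 1000), getStepSize(start, end, 24, true, 1000)
}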
func getBasicConstraints(tableName string, timeConstraint, roundStart bool, data map[string]interface{}, identifier string) []string {
// If tableName is not empty, append a dot so columns are qualified as "table.column"
if tableName != "" {
tableName += "."
}
chSubQuery := []string{fmt.Sprintf("%s%s = toUInt16(:%s)", tableName, identifier, identifier)}
if timeConstraint {
if roundStart {
chSubQuery = append(chSubQuery, fmt.Sprintf("toStartOfInterval(%sdatetime, INTERVAL :step_size second) >= toDateTime(:startTimestamp/1000)", tableName))
} else {
chSubQuery = append(chSubQuery, fmt.Sprintf("%sdatetime >= toDateTime(:startTimestamp/1000)", tableName))
}
chSubQuery = append(chSubQuery, fmt.Sprintf("%sdatetime < toDateTime(:endTimestamp/1000)", tableName))
}
return append(chSubQuery, getGenericConstraint(data, tableName)...)
}
func getGenericConstraint(data map[string]interface{}, tableName string) []string {
return getConstraint(data, getSessionMetaFields(), tableName)
}
func getConstraint(data map[string]interface{}, fields Fields, tableName string) []string {
var constraints []string
filters, ok := data["filters"].([]map[string]interface{})
if !ok {
log.Println("error getting filters from data")
filters = make([]map[string]interface{}, 0) // to skip the next block
}
// process filters
for i, f := range filters {
key, _ := f["key"].(string)
value, _ := f["value"].(string)
if field, ok := fields[key]; ok {
if value == "*" || value == "" {
constraints = append(constraints, fmt.Sprintf("isNotNull(%s%s)", tableName, field))
} else {
// constraints.append(f"{table_name}{fields[f['key']]} = %({f['key']}_{i})s")
constraints = append(constraints, fmt.Sprintf("%s%s = %%(%s_%d)s", tableName, field, key, i)) // TODO: where we'll keep the value?
}
}
}
// TODO from Python: remove this in next release
// NOTE: map iteration order is not deterministic in Go, so the indices below
// only line up with the ones produced by getConstraintValues by accident.
offset := len(filters)
i := 0
for key, v := range data {
value, _ := v.(string)
if field, ok := fields[key]; ok {
if value == "*" || value == "" {
constraints = append(constraints, fmt.Sprintf("isNotNull(%s%s)", tableName, field))
} else {
constraints = append(constraints, fmt.Sprintf("%s%s = %%(%s_%d)s", tableName, field, key, i+offset))
}
}
i++
}
return constraints
}
func getMetaConstraint(data map[string]interface{}) []string {
return getConstraint(data, getMetadataFields(), "sessions_metadata.")
}
func getConstraintValues(data map[string]interface{}) map[string]interface{} {
params := make(map[string]interface{})
if filters, ok := data["filters"].([]map[string]interface{}); ok {
for i, f := range filters {
key, _ := f["key"].(string)
value := f["value"]
params[fmt.Sprintf("%s_%d", key, i)] = value
}
// TODO from Python: remove this in next release
offset := len(data["filters"].([]map[string]interface{}))
i := 0
for k, v := range data {
params[fmt.Sprintf("%s_%d", k, i+offset)] = v
i++
}
}
return params
}
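// exampleConstraintValues illustrates the parameter map produced above
// (inputs invented for illustration): a single country filter yields
// {"country_0": "DE", "filters_1": <the filters slice>}; the second entry
// comes from the legacy loop that the TODO above flags for removal.
func exampleConstraintValues() map[string]interface{} {
args := map[string]interface{}{
"filters": []map[string]interface{}{{"key": "country", "value": "DE"}},
}
return getConstraintValues(args)
}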
/*
def get_main_sessions_table(timestamp=0):
return "experimental.sessions_l7d_mv" \
if config("EXP_7D_MV", cast=bool, default=True) \
and timestamp and timestamp >= TimeUTC.now(delta_days=-7) else "experimental.sessions"
*/
// TODO: mirror the EXP_7D_MV selection performed by the Python helper above
func getMainSessionsTable(timestamp uint64) string {
return "experimental.sessions"
}
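// A sketch of the table selection the quoted Python performs; exp7dMV stands in
// for config("EXP_7D_MV") and nowMs for TimeUTC.now() in ms (both assumptions,
// not part of this commit):
func getMainSessionsTableSketch(timestamp, nowMs uint64, exp7dMV bool) string {
const sevenDaysMs = 7 * 24 * 60 * 60 * 1000
if exp7dMV && timestamp != 0 && timestamp >= nowMs-sevenDaysMs {
return "experimental.sessions_l7d_mv"
}
return "experimental.sessions"
}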
// replaceNamedParams inlines named :key parameters directly into the query string.
// NOTE: values are interpolated as text rather than bound by the driver (see the
// commented-out placeholder variant), so this is only safe for trusted input.
func replaceNamedParams(query string, params map[string]interface{}) string {
for key, val := range params {
//query = strings.Replace(query, ":"+key, "?", 1)
query = strings.ReplaceAll(query, ":"+key, fmt.Sprintf("%v", val))
}
return query
}
// Helper function to generate a range of floats
func frange(start, end, step float64) []float64 {
var rangeValues []float64
for i := start; i < end; i += step {
rangeValues = append(rangeValues, i)
}
return rangeValues
}
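// For example, frange(0, 4, 1) yields [0 1 2 3]. Because of float accumulation
// the number of produced intervals can come out one short of density at the
// boundaries, which CompleteMissingSteps below has to tolerate.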
// Helper function to add missing keys from the "complete" map to the "original" map
func addMissingKeys(original, complete map[string]interface{}) map[string]interface{} {
for k, v := range complete {
if _, exists := original[k]; !exists {
original[k] = v
}
}
return original
}
// CompleteMissingSteps fills in missing steps in the data
func CompleteMissingSteps(
startTime, endTime uint64,
density int,
neutral map[string]interface{},
rows []map[string]interface{},
timeKey string,
timeCoefficient int64,
) []map[string]interface{} {
if len(rows) == density {
return rows
}
// Calculate the step size
step := getStepSize(startTime, endTime, uint64(density), true, 1000)
optimal := make([][2]uint64, 0)
for _, i := range frange(float64(startTime)/float64(timeCoefficient), float64(endTime)/float64(timeCoefficient), step) {
startInterval := uint64(i * float64(timeCoefficient))
endInterval := uint64((i + step) * float64(timeCoefficient))
optimal = append(optimal, [2]uint64{startInterval, endInterval})
}
var result []map[string]interface{}
r, o := 0, 0
// Iterate over density
for i := 0; i < density; i++ {
// Clone the neutral map
neutralClone := make(map[string]interface{})
for k, v := range neutral {
if fn, ok := v.(func() interface{}); ok {
neutralClone[k] = fn()
} else {
neutralClone[k] = v
}
}
// If we can just add the rest of the rows to result
if r < len(rows) && len(result)+len(rows)-r == density {
result = append(result, rows[r:]...)
break
}
// Determine where the current row fits within the optimal intervals
if r < len(rows) && o < len(optimal) && rows[r][timeKey].(uint64) < optimal[o][0] {
rows[r] = addMissingKeys(rows[r], neutralClone)
result = append(result, rows[r])
r++
} else if r < len(rows) && o < len(optimal) && optimal[o][0] <= rows[r][timeKey].(uint64) && rows[r][timeKey].(uint64) < optimal[o][1] {
rows[r] = addMissingKeys(rows[r], neutralClone)
result = append(result, rows[r])
r++
o++
} else if o < len(optimal) {
neutralClone[timeKey] = optimal[o][0]
result = append(result, neutralClone)
o++
} else {
// frange can come up one interval short of density at the boundary;
// append the neutral row instead of indexing past optimal.
result = append(result, neutralClone)
}
}
return result
}
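// exampleCompleteMissingSteps demonstrates the gap filling (values invented for
// illustration): one real row at t=1000 inside a 4-bucket window over [0, 4000)
// comes back padded to 4 rows, with t=0, 2000 and 3000 carrying {"value": 0}.
func exampleCompleteMissingSteps() []map[string]interface{} {
rows := []map[string]interface{}{{"timestamp": uint64(1000), "value": uint64(5)}}
return CompleteMissingSteps(0, 4000, 4, map[string]interface{}{"value": 0}, rows, "timestamp", 1000)
}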
func progress(oldVal, newVal uint64) float64 {
if newVal > 0 {
// convert before subtracting: oldVal-newVal underflows on uint64 when newVal > oldVal
return (float64(oldVal) - float64(newVal)) / float64(newVal) * 100
}
if oldVal == 0 {
return 0
}
return 100
}
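// E.g. progress(150, 100) == 50 (the current period is 50% below the previous
// one) and progress(80, 100) == -20; with newVal == 0 the result is pinned to
// 0 or 100 depending on whether oldVal is also zero.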
// parse builds the constraint lists and query parameters shared by the trend queries below
func parse(projectID uint64, startTs, endTs uint64, density uint64, args map[string]interface{}) ([]string, []string, map[string]interface{}) {
stepSize := getStepSize(startTs, endTs, density, false, 1000)
chSubQuery := getBasicConstraints("sessions", true, false, args, "project_id")
chSubQueryChart := getBasicConstraints("sessions", true, true, args, "project_id")
metaCondition := getMetaConstraint(args)
chSubQuery = append(chSubQuery, metaCondition...)
chSubQueryChart = append(chSubQueryChart, metaCondition...)
params := map[string]interface{}{
"step_size": stepSize,
"project_id": projectID,
"startTimestamp": startTs,
"endTimestamp": endTs,
}
for k, v := range getConstraintValues(args) {
params[k] = v
}
return chSubQuery, chSubQueryChart, params
}
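// With a single country filter, parse produces constraints such as
// "sessions.project_id = toUInt16(:project_id)" alongside
// "sessions.user_country = %(country_0)s". Note the two placeholder styles:
// replaceNamedParams only resolves the :name form, so the %(...)s entries
// inherited from the Python prototype still need their own substitution pass.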
// Sessions trend
func (c *chartsImpl) getProcessedSessions(projectID uint64, startTs, endTs uint64, density uint64, args map[string]interface{}) (map[string]interface{}, error) {
chQuery := `
SELECT toUnixTimestamp(toStartOfInterval(sessions.datetime, INTERVAL :step_size second)) * 1000 AS timestamp,
COUNT(DISTINCT sessions.session_id) AS value
FROM :main_sessions_table AS sessions
WHERE :sub_query_chart
GROUP BY timestamp
ORDER BY timestamp;
`
chSubQuery, chSubQueryChart, params := parse(projectID, startTs, endTs, density, args)
chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1)
chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQueryChart, " AND "), -1)
preparedQuery := replaceNamedParams(chQuery, params)
rows, err := c.chConn.Query(context.Background(), preparedQuery)
if err != nil {
return nil, fmt.Errorf("error executing chart query: %w", err)
}
defer rows.Close()
preparedRows := make([]map[string]interface{}, 0)
var sum uint64
for rows.Next() {
var timestamp, value uint64
if err := rows.Scan(&timestamp, &value); err != nil {
return nil, fmt.Errorf("error scanning row: %w", err)
}
sum += value
preparedRows = append(preparedRows, map[string]interface{}{"timestamp": timestamp, "value": value})
}
results := map[string]interface{}{
"value": sum,
"chart": CompleteMissingSteps(startTs, endTs, int(density), map[string]interface{}{"value": 0}, preparedRows, "timestamp", 1000),
}
// Shift the window back one full period and refresh the bound params so the
// second query counts the previous period rather than the current one
diff := endTs - startTs
endTs = startTs
startTs = endTs - diff
params["startTimestamp"] = startTs
params["endTimestamp"] = endTs
chQuery = `
SELECT COUNT(1) AS count
FROM :main_sessions_table AS sessions
WHERE :sub_query_chart;
`
chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1)
chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQuery, " AND "), -1)
var count uint64
preparedQuery = replaceNamedParams(chQuery, params)
if err := c.chConn.QueryRow(context.Background(), preparedQuery).Scan(&count); err != nil {
return nil, fmt.Errorf("error executing count query: %w", err)
}
results["progress"] = progress(count, results["value"].(uint64))
results["unit"] = "COUNT"
return results, nil
}
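// After substitution, the chart query sent to ClickHouse looks roughly like
// this (illustrative values):
//
// SELECT toUnixTimestamp(toStartOfInterval(sessions.datetime, INTERVAL 3600 second)) * 1000 AS timestamp,
// COUNT(DISTINCT sessions.session_id) AS value
// FROM experimental.sessions AS sessions
// WHERE sessions.project_id = toUInt16(42)
// AND toStartOfInterval(sessions.datetime, INTERVAL 3600 second) >= toDateTime(1737500000000/1000)
// AND sessions.datetime < toDateTime(1737586400000/1000)
// GROUP BY timestamp
// ORDER BY timestamp;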
// Users trend
//func getUniqueUsers(projectID uint64, startTs, endTs uint64, density uint64, args map[string]interface{}) {
// chQuery := `
// SELECT toUnixTimestamp(toStartOfInterval(sessions.datetime, INTERVAL :step_size second)) * 1000 AS timestamp,
// COUNT(DISTINCT sessions.user_id) AS value
// FROM :main_sessions_table AS sessions
// WHERE :sub_query_chart
// GROUP BY timestamp
// ORDER BY timestamp;
// `
// chSubQuery, chSubQueryChart, params := parse(projectID, startTs, endTs, density, args)
// chSubQueryChart = append(chSubQueryChart, []string{"isNotNull(sessions.user_id)", "sessions.user_id!=''"}...)
//
// chQuery = strings.Replace(chQuery, ":main_sessions_table", getMainSessionsTable(startTs), -1)
// chQuery = strings.Replace(chQuery, ":sub_query_chart", strings.Join(chSubQueryChart, " AND "), -1)
//
// preparedQuery := replaceNamedParams(chQuery, params)
//
// return
//}

View file

@@ -41,8 +41,9 @@ type handlersImpl struct {
func (e *handlersImpl) GetAll() []*api.Description {
return []*api.Description{
{"/v1/analytics/{projectId}/cards/{id}/chart", e.getCardChartData, "POST"},
{"/v1/analytics/{projectId}/cards/{id}/chart", e.getCardChartData, "POST"}, // for dashboards
{"/v1/analytics/{projectId}/cards/{id}/try", e.getCardChartData, "POST"},
{"/v1/analytics/{projectId}/cards/try", e.getCardChartData, "POST"}, // for cards itself
}
}

View file

@@ -8,9 +8,9 @@ type DataPoint struct {
}
type GetCardChartDataRequest struct {
-MetricType string `json:"metricType" validate:"required,oneof=timeseries table funnel"`
-MetricOf string `json:"metricOf" validate:"required,oneof=session_count user_count"`
-ViewType string `json:"viewType" validate:"required,oneof=line_chart table_view"`
+MetricType string `json:"metricType" validate:"required,oneof=timeseries table funnel errors performance resources webVitals pathAnalysis retention stickiness heatMap"`
+MetricOf string `json:"metricOf" validate:"required,oneof=sessionCount userCount"`
+ViewType string `json:"viewType" validate:"required,oneof=lineChart areaChart barChart pieChart progressChart table metric"`
MetricFormat string `json:"metricFormat" validate:"required,oneof=default percentage"`
SessionID int64 `json:"sessionId"`
Series []cards.CardSeries `json:"series" validate:"required,dive"`
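// An illustrative request body accepted by these validators (the series
// contents depend on cards.CardSeries and are left elided):
//
// {
//   "metricType": "timeseries",
//   "metricOf": "sessionCount",
//   "viewType": "lineChart",
//   "metricFormat": "default",
//   "series": [ ... ]
// }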