pulled dev

This commit is contained in:
Андрей Бабушкин 2025-03-17 11:26:07 +01:00
commit f3af4cb5a5
108 changed files with 1225 additions and 225 deletions

View file

@ -1,8 +1,16 @@
FROM python:3.12-alpine
FROM python:3.12-alpine AS builder
LABEL maintainer="Rajesh Rajendran<rjshrjndrn@gmail.com>"
LABEL maintainer="KRAIEM Taha Yassine<tahayk2@gmail.com>"
RUN apk add --no-cache build-base=~1.2 tini=~0.19
RUN apk add --no-cache build-base
WORKDIR /work
COPY requirements.txt ./requirements.txt
RUN pip install --no-cache-dir --upgrade uv && \
export UV_SYSTEM_PYTHON=true && \
uv pip install --no-cache-dir --upgrade pip setuptools wheel && \
uv pip install --no-cache-dir --upgrade -r requirements.txt
FROM python:3.12-alpine
ARG GIT_SHA
ARG envarg
# Add Tini
@ -13,18 +21,11 @@ ENV SOURCE_MAP_VERSION=0.7.4 \
PRIVATE_ENDPOINTS=false \
ENTERPRISE_BUILD=${envarg} \
GIT_SHA=$GIT_SHA
COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
WORKDIR /work
COPY requirements.txt ./requirements.txt
RUN pip install --no-cache-dir --upgrade uv && \
uv pip install --no-cache-dir --upgrade pip setuptools wheel --system && \
uv pip install --no-cache-dir --upgrade -r requirements.txt --system
COPY . .
RUN mv env.default .env && \
adduser -u 1001 openreplay -D
USER 1001
RUN apk add --no-cache tini && mv env.default .env
ENTRYPOINT ["/sbin/tini", "--"]
CMD ["./entrypoint.sh"]

View file

@ -1,5 +1,5 @@
from chalicelib.core.errors.modules import errors_helper
from chalicelib.utils import errors_helper
from chalicelib.utils import pg_client, helper
from chalicelib.utils.TimeUTC import TimeUTC
from chalicelib.utils.metrics_helper import get_step_size
@ -98,8 +98,7 @@ def get_details(project_id, error_id, user_id, **data):
device_partition,
country_partition,
chart24,
chart30,
custom_tags
chart30
FROM (SELECT error_id,
name,
message,
@ -114,15 +113,8 @@ def get_details(project_id, error_id, user_id, **data):
MIN(timestamp) AS first_occurrence
FROM events.errors
WHERE error_id = %(error_id)s) AS time_details ON (TRUE)
INNER JOIN (SELECT session_id AS last_session_id,
coalesce(custom_tags, '[]')::jsonb AS custom_tags
INNER JOIN (SELECT session_id AS last_session_id
FROM events.errors
LEFT JOIN LATERAL (
SELECT jsonb_agg(jsonb_build_object(errors_tags.key, errors_tags.value)) AS custom_tags
FROM errors_tags
WHERE errors_tags.error_id = %(error_id)s
AND errors_tags.session_id = errors.session_id
AND errors_tags.message_id = errors.message_id) AS errors_tags ON (TRUE)
WHERE error_id = %(error_id)s
ORDER BY errors.timestamp DESC
LIMIT 1) AS last_session_details ON (TRUE)

View file

@ -1,6 +1,7 @@
from typing import Optional
import schemas
from chalicelib.core.sourcemaps import sourcemaps
def __get_basic_constraints(platform: Optional[schemas.PlatformType] = None, time_constraint: bool = True,
@ -42,3 +43,16 @@ def __get_basic_constraints_ch(platform=None, time_constraint=True, startTime_ar
elif platform == schemas.PlatformType.DESKTOP:
ch_sub_query.append("user_device_type = 'desktop'")
return ch_sub_query
def format_first_stack_frame(error):
error["stack"] = sourcemaps.format_payload(error.pop("payload"), truncate_to_first=True)
for s in error["stack"]:
for c in s.get("context", []):
for sci, sc in enumerate(c):
if isinstance(sc, str) and len(sc) > 1000:
c[sci] = sc[:1000]
# convert bytes to string:
if isinstance(s["filename"], bytes):
s["filename"] = s["filename"].decode("utf-8")
return error

View file

@ -870,12 +870,12 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
events_conditions[-1]["condition"] = []
if not is_any and event.value not in [None, "*", ""]:
event_where.append(
sh.multi_conditions(f"(main1.message {op} %({e_k})s OR main1.name {op} %({e_k})s)",
sh.multi_conditions(f"(toString(main1.`$properties`.message) {op} %({e_k})s OR toString(main1.`$properties`.name) {op} %({e_k})s)",
event.value, value_key=e_k))
events_conditions[-1]["condition"].append(event_where[-1])
events_extra_join += f" AND {event_where[-1]}"
if len(event.source) > 0 and event.source[0] not in [None, "*", ""]:
event_where.append(sh.multi_conditions(f"main1.source = %({s_k})s", event.source, value_key=s_k))
event_where.append(sh.multi_conditions(f"toString(main1.`$properties`.source) = %({s_k})s", event.source, value_key=s_k))
events_conditions[-1]["condition"].append(event_where[-1])
events_extra_join += f" AND {event_where[-1]}"

View file

@ -2,7 +2,7 @@ import schemas
from chalicelib.core import events, metadata, events_mobile, \
issues, assist, canvas, user_testing
from . import sessions_mobs, sessions_devtool
from chalicelib.utils import errors_helper
from chalicelib.core.errors.modules import errors_helper
from chalicelib.utils import pg_client, helper
from chalicelib.core.modules import MOB_KEY, get_file_key

View file

@ -1,14 +0,0 @@
from chalicelib.core.sourcemaps import sourcemaps
def format_first_stack_frame(error):
error["stack"] = sourcemaps.format_payload(error.pop("payload"), truncate_to_first=True)
for s in error["stack"]:
for c in s.get("context", []):
for sci, sc in enumerate(c):
if isinstance(sc, str) and len(sc) > 1000:
c[sci] = sc[:1000]
# convert bytes to string:
if isinstance(s["filename"], bytes):
s["filename"] = s["filename"].decode("utf-8")
return error

View file

@ -19,6 +19,16 @@ PG_CONFIG = dict(_PG_CONFIG)
if config("PG_TIMEOUT", cast=int, default=0) > 0:
PG_CONFIG["options"] = f"-c statement_timeout={config('PG_TIMEOUT', cast=int) * 1000}"
if config('PG_POOL', cast=bool, default=True):
PG_CONFIG = {
**PG_CONFIG,
# Keepalive settings
"keepalives": 1, # Enable keepalives
"keepalives_idle": 300, # Seconds before sending keepalive
"keepalives_interval": 10, # Seconds between keepalives
"keepalives_count": 3 # Number of keepalives before giving up
}
class ORThreadedConnectionPool(psycopg2.pool.ThreadedConnectionPool):
def __init__(self, minconn, maxconn, *args, **kwargs):
@ -55,6 +65,7 @@ RETRY = 0
def make_pool():
if not config('PG_POOL', cast=bool, default=True):
logger.info("PG_POOL is disabled, not creating a new one")
return
global postgreSQL_pool
global RETRY
@ -176,8 +187,7 @@ class PostgresClient:
async def init():
logger.info(f">use PG_POOL:{config('PG_POOL', default=True)}")
if config('PG_POOL', cast=bool, default=True):
make_pool()
make_pool()
async def terminate():

View file

@ -7,7 +7,6 @@ import (
"fmt"
"hash/fnv"
"log"
"openreplay/backend/pkg/metrics/database"
"strings"
"time"
@ -19,6 +18,7 @@ import (
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/hashid"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/metrics/database"
"openreplay/backend/pkg/sessions"
"openreplay/backend/pkg/url"
)
@ -106,25 +106,25 @@ func (c *connectorImpl) newBatch(name, query string) error {
}
var batches = map[string]string{
"sessions": "INSERT INTO experimental.sessions (session_id, project_id, user_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, user_state, user_city, datetime, duration, pages_count, events_count, errors_count, issue_score, referrer, issue_types, tracker_version, user_browser, user_browser_version, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, timezone, utm_source, utm_medium, utm_campaign) VALUES (?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), ?, ?, ?, ?)",
"sessions": "INSERT INTO experimental.sessions (session_id, project_id, user_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, user_state, user_city, datetime, duration, pages_count, events_count, errors_count, issue_score, referrer, issue_types, tracker_version, user_browser, user_browser_version, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, platform, timezone, utm_source, utm_medium, utm_campaign) VALUES (?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?)",
"autocompletes": "INSERT INTO experimental.autocomplete (project_id, type, value) VALUES (?, ?, SUBSTR(?, 1, 8000))",
"pages": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$current_url", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"clicks": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$current_url", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"inputs": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$duration_s", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"errors": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"performance": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"requests": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$duration_s", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"custom": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"graphql": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"issuesEvents": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", issue_type, issue_id, "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"pages": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$device", "$os_version", "$current_url", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"clicks": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$device", "$os_version", "$current_url", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"inputs": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$device", "$os_version", "$duration_s", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"errors": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$device", "$os_version", error_id, "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"performance": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$device", "$os_version", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"requests": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$device", "$os_version", "$duration_s", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"custom": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$device", "$os_version", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"graphql": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$device", "$os_version", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"issuesEvents": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$device", "$os_version", issue_type, issue_id, "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"issues": "INSERT INTO experimental.issues (project_id, issue_id, type, context_string) VALUES (?, ?, ?, ?)",
"mobile_sessions": "INSERT INTO experimental.sessions (session_id, project_id, user_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, user_state, user_city, datetime, duration, pages_count, events_count, errors_count, issue_score, referrer, issue_types, tracker_version, user_browser, user_browser_version, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, platform, timezone) VALUES (?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), ?, ?)",
"mobile_custom": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"mobile_clicks": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"mobile_swipes": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"mobile_inputs": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"mobile_requests": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"mobile_crashes": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"mobile_custom": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$device", "$os_version", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"mobile_clicks": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$device", "$os_version", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"mobile_swipes": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$device", "$os_version", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"mobile_inputs": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$device", "$os_version", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"mobile_requests": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$device", "$os_version", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"mobile_crashes": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$device", "$os_version", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
}
func (c *connectorImpl) Prepare() error {
@ -215,6 +215,7 @@ func (c *connectorImpl) InsertWebSession(session *sessions.Session) error {
session.Metadata8,
session.Metadata9,
session.Metadata10,
"web",
session.Timezone,
session.UtmSource,
session.UtmMedium,
@ -246,8 +247,10 @@ func (c *connectorImpl) InsertWebInputDuration(session *sessions.Session, msg *m
return nil
}
jsonString, err := json.Marshal(map[string]interface{}{
"label": msg.Label,
"hesitation_time": nullableUint32(uint32(msg.HesitationTime)),
"label": msg.Label,
"hesitation_time": nullableUint32(uint32(msg.HesitationTime)),
"user_device": session.UserDevice,
"user_device_type": session.UserDeviceType,
})
if err != nil {
return fmt.Errorf("can't marshal input event: %s", err)
@ -262,6 +265,8 @@ func (c *connectorImpl) InsertWebInputDuration(session *sessions.Session, msg *m
eventTime.Unix(),
session.UserUUID,
true,
session.Platform,
session.UserOSVersion,
nullableUint16(uint16(msg.InputDuration)),
jsonString,
); err != nil {
@ -278,12 +283,14 @@ func (c *connectorImpl) InsertMouseThrashing(session *sessions.Session, msg *mes
return fmt.Errorf("can't extract url parts: %s", err)
}
jsonString, err := json.Marshal(map[string]interface{}{
"issue_id": issueID,
"issue_type": "mouse_thrashing",
"url": cropString(msg.Url),
"url_host": host,
"url_path": path,
"url_hostpath": hostpath,
"issue_id": issueID,
"issue_type": "mouse_thrashing",
"url": cropString(msg.Url),
"url_host": host,
"url_path": path,
"url_hostpath": hostpath,
"user_device": session.UserDevice,
"user_device_type": session.UserDeviceType,
})
if err != nil {
return fmt.Errorf("can't marshal issue event: %s", err)
@ -298,6 +305,8 @@ func (c *connectorImpl) InsertMouseThrashing(session *sessions.Session, msg *mes
eventTime.Unix(),
session.UserUUID,
true,
session.Platform,
session.UserOSVersion,
"mouse_thrashing",
issueID,
jsonString,
@ -330,12 +339,14 @@ func (c *connectorImpl) InsertIssue(session *sessions.Session, msg *messages.Iss
return fmt.Errorf("can't extract url parts: %s", err)
}
jsonString, err := json.Marshal(map[string]interface{}{
"issue_id": issueID,
"issue_type": msg.Type,
"url": cropString(msg.Url),
"url_host": host,
"url_path": path,
"url_hostpath": hostpath,
"issue_id": issueID,
"issue_type": msg.Type,
"url": cropString(msg.Url),
"url_host": host,
"url_path": path,
"url_hostpath": hostpath,
"user_device": session.UserDevice,
"user_device_type": session.UserDeviceType,
})
if err != nil {
return fmt.Errorf("can't marshal issue event: %s", err)
@ -350,6 +361,8 @@ func (c *connectorImpl) InsertIssue(session *sessions.Session, msg *messages.Iss
eventTime.Unix(),
session.UserUUID,
true,
session.Platform,
session.UserOSVersion,
msg.Type,
issueID,
jsonString,
@ -421,6 +434,8 @@ func (c *connectorImpl) InsertWebPageEvent(session *sessions.Session, msg *messa
"dom_building_time": domBuildingTime,
"dom_content_loaded_event_time": domContentLoadedEventTime,
"load_event_time": loadEventTime,
"user_device": session.UserDevice,
"user_device_type": session.UserDeviceType,
})
if err != nil {
return fmt.Errorf("can't marshal page event: %s", err)
@ -435,6 +450,8 @@ func (c *connectorImpl) InsertWebPageEvent(session *sessions.Session, msg *messa
eventTime.Unix(),
session.UserUUID,
true,
session.Platform,
session.UserOSVersion,
cropString(msg.URL),
jsonString,
); err != nil {
@ -468,15 +485,17 @@ func (c *connectorImpl) InsertWebClickEvent(session *sessions.Session, msg *mess
return fmt.Errorf("can't extract url parts: %s", err)
}
jsonString, err := json.Marshal(map[string]interface{}{
"label": msg.Label,
"hesitation_time": nullableUint32(uint32(msg.HesitationTime)),
"selector": msg.Selector,
"normalized_x": nX,
"normalized_y": nY,
"url": cropString(msg.Url),
"url_host": host,
"url_path": path,
"url_hostpath": hostpath,
"label": msg.Label,
"hesitation_time": nullableUint32(uint32(msg.HesitationTime)),
"selector": msg.Selector,
"normalized_x": nX,
"normalized_y": nY,
"url": cropString(msg.Url),
"url_host": host,
"url_path": path,
"url_hostpath": hostpath,
"user_device": session.UserDevice,
"user_device_type": session.UserDeviceType,
})
if err != nil {
return fmt.Errorf("can't marshal click event: %s", err)
@ -491,6 +510,8 @@ func (c *connectorImpl) InsertWebClickEvent(session *sessions.Session, msg *mess
eventTime.Unix(),
session.UserUUID,
true,
session.Platform,
session.UserOSVersion,
cropString(msg.Url),
jsonString,
); err != nil {
@ -501,11 +522,6 @@ func (c *connectorImpl) InsertWebClickEvent(session *sessions.Session, msg *mess
}
func (c *connectorImpl) InsertWebErrorEvent(session *sessions.Session, msg *types.ErrorEvent) error {
keys, values := make([]string, 0, len(msg.Tags)), make([]*string, 0, len(msg.Tags))
for k, v := range msg.Tags {
keys = append(keys, k)
values = append(values, v)
}
// Check error source before insert to avoid panic from clickhouse lib
switch msg.Source {
case "js_exception", "bugsnag", "cloudwatch", "datadog", "elasticsearch", "newrelic", "rollbar", "sentry", "stackdriver", "sumologic":
@ -514,12 +530,11 @@ func (c *connectorImpl) InsertWebErrorEvent(session *sessions.Session, msg *type
}
msgID, _ := msg.ID(session.ProjectID)
jsonString, err := json.Marshal(map[string]interface{}{
"source": msg.Source,
"name": nullableString(msg.Name),
"message": msg.Message,
"error_id": msgID,
"error_tags_keys": keys,
"error_tags_values": values,
"source": msg.Source,
"name": nullableString(msg.Name),
"message": msg.Message,
"user_device": session.UserDevice,
"user_device_type": session.UserDeviceType,
})
if err != nil {
return fmt.Errorf("can't marshal error event: %s", err)
@ -534,6 +549,9 @@ func (c *connectorImpl) InsertWebErrorEvent(session *sessions.Session, msg *type
eventTime.Unix(),
session.UserUUID,
true,
session.Platform,
session.UserOSVersion,
msgID,
jsonString,
); err != nil {
c.checkError("errors", err)
@ -565,6 +583,8 @@ func (c *connectorImpl) InsertWebPerformanceTrackAggr(session *sessions.Session,
"min_used_js_heap_size": msg.MinUsedJSHeapSize,
"avg_used_js_heap_size": msg.AvgUsedJSHeapSize,
"max_used_js_heap_size": msg.MaxUsedJSHeapSize,
"user_device": session.UserDevice,
"user_device_type": session.UserDeviceType,
})
if err != nil {
return fmt.Errorf("can't marshal performance event: %s", err)
@ -579,6 +599,8 @@ func (c *connectorImpl) InsertWebPerformanceTrackAggr(session *sessions.Session,
eventTime.Unix(),
session.UserUUID,
true,
session.Platform,
session.UserOSVersion,
jsonString,
); err != nil {
c.checkError("performance", err)
@ -602,16 +624,18 @@ func (c *connectorImpl) InsertRequest(session *sessions.Session, msg *messages.N
return fmt.Errorf("can't extract url parts: %s", err)
}
jsonString, err := json.Marshal(map[string]interface{}{
"request_body": request,
"response_body": response,
"status": uint16(msg.Status),
"method": url.EnsureMethod(msg.Method),
"success": msg.Status < 400,
"transfer_size": uint32(msg.TransferredBodySize),
"url": cropString(msg.URL),
"url_host": host,
"url_path": path,
"url_hostpath": hostpath,
"request_body": request,
"response_body": response,
"status": uint16(msg.Status),
"method": url.EnsureMethod(msg.Method),
"success": msg.Status < 400,
"transfer_size": uint32(msg.TransferredBodySize),
"url": cropString(msg.URL),
"url_host": host,
"url_path": path,
"url_hostpath": hostpath,
"user_device": session.UserDevice,
"user_device_type": session.UserDeviceType,
})
if err != nil {
return fmt.Errorf("can't marshal request event: %s", err)
@ -626,6 +650,8 @@ func (c *connectorImpl) InsertRequest(session *sessions.Session, msg *messages.N
eventTime.Unix(),
session.UserUUID,
true,
session.Platform,
session.UserOSVersion,
nullableUint16(uint16(msg.Duration)),
jsonString,
); err != nil {
@ -637,8 +663,10 @@ func (c *connectorImpl) InsertRequest(session *sessions.Session, msg *messages.N
func (c *connectorImpl) InsertCustom(session *sessions.Session, msg *messages.CustomEvent) error {
jsonString, err := json.Marshal(map[string]interface{}{
"name": msg.Name,
"payload": msg.Payload,
"name": msg.Name,
"payload": msg.Payload,
"user_device": session.UserDevice,
"user_device_type": session.UserDeviceType,
})
if err != nil {
return fmt.Errorf("can't marshal custom event: %s", err)
@ -653,6 +681,8 @@ func (c *connectorImpl) InsertCustom(session *sessions.Session, msg *messages.Cu
eventTime.Unix(),
session.UserUUID,
true,
session.Platform,
session.UserOSVersion,
jsonString,
); err != nil {
c.checkError("custom", err)
@ -663,9 +693,11 @@ func (c *connectorImpl) InsertCustom(session *sessions.Session, msg *messages.Cu
func (c *connectorImpl) InsertGraphQL(session *sessions.Session, msg *messages.GraphQL) error {
jsonString, err := json.Marshal(map[string]interface{}{
"name": msg.OperationName,
"request_body": nullableString(msg.Variables),
"response_body": nullableString(msg.Response),
"name": msg.OperationName,
"request_body": nullableString(msg.Variables),
"response_body": nullableString(msg.Response),
"user_device": session.UserDevice,
"user_device_type": session.UserDeviceType,
})
if err != nil {
return fmt.Errorf("can't marshal graphql event: %s", err)
@ -680,6 +712,8 @@ func (c *connectorImpl) InsertGraphQL(session *sessions.Session, msg *messages.G
eventTime.Unix(),
session.UserUUID,
true,
session.Platform,
session.UserOSVersion,
jsonString,
); err != nil {
c.checkError("graphql", err)
@ -727,7 +761,7 @@ func (c *connectorImpl) InsertMobileSession(session *sessions.Session) error {
session.Metadata8,
session.Metadata9,
session.Metadata10,
"ios",
"mobile",
session.Timezone,
); err != nil {
c.checkError("mobile_sessions", err)
@ -738,8 +772,10 @@ func (c *connectorImpl) InsertMobileSession(session *sessions.Session) error {
func (c *connectorImpl) InsertMobileCustom(session *sessions.Session, msg *messages.MobileEvent) error {
jsonString, err := json.Marshal(map[string]interface{}{
"name": msg.Name,
"payload": msg.Payload,
"name": msg.Name,
"payload": msg.Payload,
"user_device": session.UserDevice,
"user_device_type": session.UserDeviceType,
})
if err != nil {
return fmt.Errorf("can't marshal mobile custom event: %s", err)
@ -754,6 +790,8 @@ func (c *connectorImpl) InsertMobileCustom(session *sessions.Session, msg *messa
eventTime.Unix(),
session.UserUUID,
true,
session.Platform,
session.UserOSVersion,
jsonString,
); err != nil {
c.checkError("mobile_custom", err)
@ -767,7 +805,9 @@ func (c *connectorImpl) InsertMobileClick(session *sessions.Session, msg *messag
return nil
}
jsonString, err := json.Marshal(map[string]interface{}{
"label": msg.Label,
"label": msg.Label,
"user_device": session.UserDevice,
"user_device_type": session.UserDeviceType,
})
if err != nil {
return fmt.Errorf("can't marshal mobile clicks event: %s", err)
@ -782,6 +822,8 @@ func (c *connectorImpl) InsertMobileClick(session *sessions.Session, msg *messag
eventTime.Unix(),
session.UserUUID,
true,
session.Platform,
session.UserOSVersion,
jsonString,
); err != nil {
c.checkError("mobile_clicks", err)
@ -795,8 +837,10 @@ func (c *connectorImpl) InsertMobileSwipe(session *sessions.Session, msg *messag
return nil
}
jsonString, err := json.Marshal(map[string]interface{}{
"label": msg.Label,
"direction": nullableString(msg.Direction),
"label": msg.Label,
"direction": nullableString(msg.Direction),
"user_device": session.UserDevice,
"user_device_type": session.UserDeviceType,
})
if err != nil {
return fmt.Errorf("can't marshal mobile swipe event: %s", err)
@ -811,6 +855,8 @@ func (c *connectorImpl) InsertMobileSwipe(session *sessions.Session, msg *messag
eventTime.Unix(),
session.UserUUID,
true,
session.Platform,
session.UserOSVersion,
jsonString,
); err != nil {
c.checkError("mobile_swipes", err)
@ -824,7 +870,9 @@ func (c *connectorImpl) InsertMobileInput(session *sessions.Session, msg *messag
return nil
}
jsonString, err := json.Marshal(map[string]interface{}{
"label": msg.Label,
"label": msg.Label,
"user_device": session.UserDevice,
"user_device_type": session.UserDeviceType,
})
if err != nil {
return fmt.Errorf("can't marshal mobile input event: %s", err)
@ -839,6 +887,8 @@ func (c *connectorImpl) InsertMobileInput(session *sessions.Session, msg *messag
eventTime.Unix(),
session.UserUUID,
true,
session.Platform,
session.UserOSVersion,
jsonString,
); err != nil {
c.checkError("mobile_inputs", err)
@ -858,13 +908,15 @@ func (c *connectorImpl) InsertMobileRequest(session *sessions.Session, msg *mess
response = &msg.Response
}
jsonString, err := json.Marshal(map[string]interface{}{
"url": cropString(msg.URL),
"request_body": request,
"response_body": response,
"status": uint16(msg.Status),
"method": url.EnsureMethod(msg.Method),
"duration": uint16(msg.Duration),
"success": msg.Status < 400,
"url": cropString(msg.URL),
"request_body": request,
"response_body": response,
"status": uint16(msg.Status),
"method": url.EnsureMethod(msg.Method),
"duration": uint16(msg.Duration),
"success": msg.Status < 400,
"user_device": session.UserDevice,
"user_device_type": session.UserDeviceType,
})
if err != nil {
return fmt.Errorf("can't marshal mobile request event: %s", err)
@ -879,6 +931,8 @@ func (c *connectorImpl) InsertMobileRequest(session *sessions.Session, msg *mess
eventTime.Unix(),
session.UserUUID,
true,
session.Platform,
session.UserOSVersion,
jsonString,
); err != nil {
c.checkError("mobile_requests", err)
@ -889,9 +943,11 @@ func (c *connectorImpl) InsertMobileRequest(session *sessions.Session, msg *mess
func (c *connectorImpl) InsertMobileCrash(session *sessions.Session, msg *messages.MobileCrash) error {
jsonString, err := json.Marshal(map[string]interface{}{
"name": msg.Name,
"reason": msg.Reason,
"stacktrace": msg.Stacktrace,
"name": msg.Name,
"reason": msg.Reason,
"stacktrace": msg.Stacktrace,
"user_device": session.UserDevice,
"user_device_type": session.UserDeviceType,
})
if err != nil {
return fmt.Errorf("can't marshal mobile crash event: %s", err)
@ -906,6 +962,8 @@ func (c *connectorImpl) InsertMobileCrash(session *sessions.Session, msg *messag
eventTime.Unix(),
session.UserUUID,
true,
session.Platform,
session.UserOSVersion,
jsonString,
); err != nil {
c.checkError("mobile_crashes", err)

View file

@ -181,11 +181,6 @@ func (conn *Conn) InsertWebErrorEvent(sess *sessions.Session, e *types.ErrorEven
if err := conn.bulks.Get("webErrorEvents").Append(sess.SessionID, truncSqIdx(e.MessageID), e.Timestamp, errorID); err != nil {
conn.log.Error(sessCtx, "insert web error event err: %s", err)
}
for key, value := range e.Tags {
if err := conn.bulks.Get("webErrorTags").Append(sess.SessionID, truncSqIdx(e.MessageID), errorID, key, value); err != nil {
conn.log.Error(sessCtx, "insert web error token err: %s", err)
}
}
return nil
}

View file

@ -61,7 +61,6 @@ func parseTags(tagsJSON string) (tags map[string]*string, err error) {
}
func WrapJSException(m *JSException) (*ErrorEvent, error) {
meta, err := parseTags(m.Metadata)
return &ErrorEvent{
MessageID: m.Meta().Index,
Timestamp: m.Meta().Timestamp,
@ -69,9 +68,8 @@ func WrapJSException(m *JSException) (*ErrorEvent, error) {
Name: m.Name,
Message: m.Message,
Payload: m.Payload,
Tags: meta,
OriginType: m.TypeID(),
}, err
}, nil
}
func WrapIntegrationEvent(m *IntegrationEvent) *ErrorEvent {

View file

@ -3,13 +3,13 @@ package projects
import (
"context"
"errors"
"openreplay/backend/pkg/metrics/database"
"time"
"openreplay/backend/pkg/cache"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/db/redis"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/metrics/database"
)
type Projects interface {

1
ee/api/.gitignore vendored
View file

@ -235,7 +235,6 @@ Pipfile.lock
/chalicelib/utils/dev.py
/chalicelib/utils/email_handler.py
/chalicelib/utils/email_helper.py
/chalicelib/utils/errors_helper.py
/chalicelib/utils/event_filter_definition.py
/chalicelib/utils/github_client_v3.py
/chalicelib/utils/helper.py

View file

@ -95,28 +95,26 @@ def get_details(project_id, error_id, user_id, **data):
"error_id": error_id}
main_ch_query = f"""\
WITH pre_processed AS (SELECT toString(`$properties`.error_id) AS error_id,
toString(`$properties`.name) AS name,
toString(`$properties`.message) AS message,
WITH pre_processed AS (SELECT toString(`$properties`.error_id) AS error_id,
toString(`$properties`.name) AS name,
toString(`$properties`.message) AS message,
session_id,
created_at AS datetime,
`$user_id` AS user_id,
`$browser` AS user_browser,
`$browser_version` AS user_browser_version,
`$os` AS user_os,
'UNDEFINED' AS user_os_version,
NULL AS user_device_type,
`$device` AS user_device,
`$country` AS user_country,
[] AS error_tags_keys,
[] AS error_tags_values
created_at AS datetime,
`$user_id` AS user_id,
`$browser` AS user_browser,
`$browser_version` AS user_browser_version,
`$os` AS user_os,
'$os_version' AS user_os_version,
toString(`$properties`.user_device_type) AS user_device_type,
toString(`$properties`.user_device) AS user_device,
`$country` AS user_country
FROM {MAIN_ERR_SESS_TABLE} AS errors
WHERE {" AND ".join(ch_basic_query)}
)
SELECT %(error_id)s AS error_id, name, message,users,
first_occurrence,last_occurrence,last_session_id,
sessions,browsers_partition,os_partition,device_partition,
country_partition,chart24,chart30,custom_tags
country_partition,chart24,chart30
FROM (SELECT error_id,
name,
message
@ -131,8 +129,7 @@ def get_details(project_id, error_id, user_id, **data):
INNER JOIN (SELECT toUnixTimestamp(max(datetime)) * 1000 AS last_occurrence,
toUnixTimestamp(min(datetime)) * 1000 AS first_occurrence
FROM pre_processed) AS time_details ON TRUE
INNER JOIN (SELECT session_id AS last_session_id,
arrayMap((key, value)->(map(key, value)), error_tags_keys, error_tags_values) AS custom_tags
INNER JOIN (SELECT session_id AS last_session_id
FROM pre_processed
ORDER BY datetime DESC
LIMIT 1) AS last_session_details ON TRUE

View file

@ -59,7 +59,6 @@ rm -rf ./chalicelib/utils/captcha.py
rm -rf ./chalicelib/utils/dev.py
rm -rf ./chalicelib/utils/email_handler.py
rm -rf ./chalicelib/utils/email_helper.py
rm -rf ./chalicelib/utils/errors_helper.py
rm -rf ./chalicelib/utils/event_filter_definition.py
rm -rf ./chalicelib/utils/github_client_v3.py
rm -rf ./chalicelib/utils/helper.py

View file

@ -0,0 +1,306 @@
package main
import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"time"
"openreplay/backend/internal/config/ender"
"openreplay/backend/internal/sessionender"
"openreplay/backend/internal/storage"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/db/redis"
"openreplay/backend/pkg/intervals"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/memory"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/metrics"
"openreplay/backend/pkg/metrics/database"
enderMetrics "openreplay/backend/pkg/metrics/ender"
"openreplay/backend/pkg/projects"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/sessions"
)
// main wires up the session-ender service: it consumes raw tracker messages
// from the web and mobile topics, tracks per-session activity via the
// sessionender generator, and once a session has been idle past the timeout
// it persists the final duration and emits SessionEnd / MobileSessionEnd
// signals for downstream services.
func main() {
	ctx := context.Background()
	log := logger.New()
	cfg := ender.New(log)
	// Observability
	dbMetric := database.New("ender")
	enderMetric := enderMetrics.New("ender")
	metrics.New(log, append(enderMetric.List(), dbMetric.List()...))
	pgConn, err := pool.New(dbMetric, cfg.Postgres.String())
	if err != nil {
		log.Fatal(ctx, "can't init postgres connection: %s", err)
	}
	defer pgConn.Close()
	// Redis is optional: failure only disables the shared cache layer.
	redisClient, err := redis.New(&cfg.Redis)
	if err != nil {
		log.Warn(ctx, "can't init redis connection: %s", err)
	}
	defer redisClient.Close()
	projManager := projects.New(log, pgConn, redisClient, dbMetric)
	sessManager := sessions.New(log, pgConn, projManager, redisClient, dbMetric)
	sessionEndGenerator, err := sessionender.New(enderMetric, intervals.EVENTS_SESSION_END_TIMEOUT, cfg.PartitionsNumber)
	if err != nil {
		log.Fatal(ctx, "can't init ender service: %s", err)
	}
	// Numeric type IDs of mobile tracker messages forwarded (in addition to
	// MsgTimestamp) to the session-end generator.
	mobileMessages := []int{90, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 107, 110, 111}
	producer := queue.NewProducer(cfg.MessageSizeLimit, true)
	consumer := queue.NewConsumer(
		cfg.GroupEnder,
		[]string{
			cfg.TopicRawWeb,
			cfg.TopicRawMobile,
		},
		messages.NewEnderMessageIterator(
			log,
			func(msg messages.Message) { sessionEndGenerator.UpdateSession(msg) },
			append([]int{messages.MsgTimestamp}, mobileMessages...),
			false),
		false,
		cfg.MessageSizeLimit,
	)
	memoryManager, err := memory.NewManager(log, cfg.MemoryLimitMB, cfg.MaxMemoryUsage)
	if err != nil {
		log.Fatal(ctx, "can't init memory manager: %s", err)
	}
	log.Info(ctx, "Ender service started")
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
	tick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond)
	for {
		select {
		case sig := <-sigchan:
			// Graceful shutdown: flush producer, rewind consumer offsets so
			// in-flight sessions are re-read on restart, then exit.
			log.Info(ctx, "Caught signal %v: terminating", sig)
			producer.Close(cfg.ProducerTimeout)
			if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil {
				log.Error(ctx, "can't commit messages with offset: %s", err)
			}
			consumer.Close()
			os.Exit(0)
		case <-tick:
			details := newDetails()
			// Find ended sessions and send notification to other services
			sessionEndGenerator.HandleEndedSessions(func(sessions map[uint64]uint64) map[uint64]bool {
				// Load all sessions from DB
				sessionsList := make([]uint64, 0, len(sessions))
				for sessionID := range sessions {
					sessionsList = append(sessionsList, sessionID)
				}
				completedSessions := make(map[uint64]bool)
				sessionsData, err := sessManager.GetManySessions(sessionsList)
				if err != nil {
					log.Error(ctx, "can't get sessions from database: %s", err)
					return completedSessions
				}
				// Check if each session was ended
				for sessionID, sess := range sessionsData {
					sessCtx := context.WithValue(context.Background(), "sessionID", fmt.Sprintf("%d", sessionID))
					timestamp := sessions[sessionID]
					// NOTE(review): assumes sess.Duration is never nil here
					// (GetMany COALESCEs NULL duration to 0) — confirm.
					currDuration := *sess.Duration
					newDur := timestamp - sess.Timestamp
					// Skip if session was ended before with same duration
					if currDuration == newDur {
						details.Duplicated[sessionID] = currDuration
						completedSessions[sessionID] = true
						continue
					}
					if currDuration > newDur {
						details.Shorter[sessionID] = int64(currDuration) - int64(newDur)
						completedSessions[sessionID] = true
						continue
					}
					newDuration, err := sessManager.UpdateDuration(sessionID, timestamp)
					if err != nil {
						// Known unrecoverable DB errors: record and mark the
						// session complete so it isn't retried forever.
						if strings.Contains(err.Error(), "integer out of range") {
							// Skip session with broken duration
							details.Failed[sessionID] = timestamp
							completedSessions[sessionID] = true
							continue
						}
						if strings.Contains(err.Error(), "is less than zero for uint64") {
							details.Negative[sessionID] = timestamp
							completedSessions[sessionID] = true
							continue
						}
						if strings.Contains(err.Error(), "no rows in result set") {
							details.NotFound[sessionID] = timestamp
							completedSessions[sessionID] = true
							continue
						}
						log.Error(sessCtx, "can't update session duration, err: %s", err)
						continue
					}
					// Check one more time just in case
					if currDuration == newDuration {
						details.Duplicated[sessionID] = currDuration
						completedSessions[sessionID] = true
						continue
					}
					msg := &messages.SessionEnd{Timestamp: timestamp}
					if cfg.UseEncryption {
						if key := storage.GenerateEncryptionKey(); key != nil {
							if err := sessManager.UpdateEncryptionKey(sessionID, key); err != nil {
								log.Warn(sessCtx, "can't save session encryption key: %s, session will not be encrypted", err)
							} else {
								msg.EncryptionKey = string(key)
							}
						}
					}
					if sess != nil && (sess.Platform == "ios" || sess.Platform == "android") {
						// NOTE(review): this inner msg shadows the SessionEnd
						// above, so the encryption key set there is NOT attached
						// to mobile sessions — confirm this is intended.
						msg := &messages.MobileSessionEnd{Timestamp: timestamp}
						if err := producer.Produce(cfg.TopicRawMobile, sessionID, msg.Encode()); err != nil {
							log.Error(sessCtx, "can't send MobileSessionEnd to mobile topic: %s", err)
							continue
						}
						if err := producer.Produce(cfg.TopicRawImages, sessionID, msg.Encode()); err != nil {
							log.Error(sessCtx, "can't send MobileSessionEnd signal to canvas topic: %s", err)
						}
					} else {
						if err := producer.Produce(cfg.TopicRawWeb, sessionID, msg.Encode()); err != nil {
							log.Error(sessCtx, "can't send sessionEnd to raw topic: %s", err)
							continue
						}
						if err := producer.Produce(cfg.TopicCanvasImages, sessionID, msg.Encode()); err != nil {
							log.Error(sessCtx, "can't send sessionEnd signal to canvas topic: %s", err)
						}
					}
					// currDuration != 0 means the session row already had a
					// duration, i.e. this is a re-ended (updated) session.
					if currDuration != 0 {
						details.Diff[sessionID] = int64(newDuration) - int64(currDuration)
						details.Updated++
					} else {
						details.New++
					}
					completedSessions[sessionID] = true
				}
				return completedSessions
			})
			details.Log(log, ctx)
			producer.Flush(cfg.ProducerTimeout)
			if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil {
				log.Error(ctx, "can't commit messages with offset: %s", err)
			}
		case msg := <-consumer.Rebalanced():
			// On partition revoke stop emitting session ends; on assign keep
			// only sessions that map to the newly-owned partitions.
			if msg.Type == types.RebalanceTypeRevoke {
				sessionEndGenerator.Disable()
			} else {
				sessionEndGenerator.ActivePartitions(msg.Partitions)
				sessionEndGenerator.Enable()
			}
		default:
			// Backpressure: stop consuming when the memory budget is exhausted.
			if !memoryManager.HasFreeMemory() {
				continue
			}
			if err := consumer.ConsumeNext(); err != nil {
				log.Fatal(ctx, "error on consuming: %s", err)
			}
		}
	}
}
// logDetails accumulates per-tick outcomes of session-end processing so they
// can be logged in one aggregated line instead of per session.
type logDetails struct {
	Failed     map[uint64]uint64 // sessionID -> timestamp; duration overflowed integer range
	Duplicated map[uint64]uint64 // sessionID -> duration; session already ended with the same duration
	Negative   map[uint64]uint64 // sessionID -> timestamp; computed duration was negative
	Shorter    map[uint64]int64  // sessionID -> delta; new duration shorter than the stored one
	NotFound   map[uint64]uint64 // sessionID -> timestamp; no session row in DB
	Diff       map[uint64]int64  // sessionID -> newDuration - oldDuration for updated sessions
	Updated    int               // sessions whose existing duration was updated
	New        int               // sessions ended for the first time
}
// newDetails returns an empty accumulator for one processing tick.
func newDetails() *logDetails {
	d := &logDetails{}
	d.Failed = make(map[uint64]uint64)
	d.Duplicated = make(map[uint64]uint64)
	d.Negative = make(map[uint64]uint64)
	d.Shorter = make(map[uint64]int64)
	d.NotFound = make(map[uint64]uint64)
	d.Diff = make(map[uint64]int64)
	// Updated and New start at their zero values.
	return d
}
// Log emits the per-tick summary: detailed per-session maps at debug level,
// plus a single aggregated info line built only from non-empty categories.
func (l *logDetails) Log(log logger.Logger, ctx context.Context) {
	if n := len(l.Failed); n > 0 {
		log.Debug(ctx, "sessions with wrong duration: %d, %v", n, l.Failed)
	}
	if n := len(l.Negative); n > 0 {
		log.Debug(ctx, "sessions with negative duration: %d, %v", n, l.Negative)
	}
	if n := len(l.NotFound); n > 0 {
		log.Debug(ctx, "sessions without info in DB: %d, %v", n, l.NotFound)
	}
	// Build the info line incrementally; each present category contributes a
	// "label: %d, " fragment and a matching value.
	var logBuilder strings.Builder
	logValues := []interface{}{}
	if len(l.Failed) > 0 {
		logBuilder.WriteString("failed: %d, ")
		logValues = append(logValues, len(l.Failed))
	}
	if len(l.Negative) > 0 {
		logBuilder.WriteString("negative: %d, ")
		logValues = append(logValues, len(l.Negative))
	}
	if len(l.Shorter) > 0 {
		logBuilder.WriteString("shorter: %d, ")
		logValues = append(logValues, len(l.Shorter))
	}
	if len(l.Duplicated) > 0 {
		logBuilder.WriteString("same: %d, ")
		logValues = append(logValues, len(l.Duplicated))
	}
	if l.Updated > 0 {
		logBuilder.WriteString("updated: %d, ")
		logValues = append(logValues, l.Updated)
	}
	if l.New > 0 {
		logBuilder.WriteString("new: %d, ")
		logValues = append(logValues, l.New)
	}
	if len(l.NotFound) > 0 {
		logBuilder.WriteString("not found: %d, ")
		logValues = append(logValues, len(l.NotFound))
	}
	if logBuilder.Len() > 0 {
		logMessage := logBuilder.String()
		// Drop the trailing ", " added by the last fragment.
		logMessage = logMessage[:len(logMessage)-2]
		log.Info(ctx, logMessage, logValues...)
	}
}
// SessionEndType enumerates possible outcomes of ending a session.
// NOTE(review): not referenced anywhere in this file — possibly reserved for
// future use; confirm before removing.
type SessionEndType int

const (
	FailedSessionEnd SessionEndType = iota + 1
	DuplicatedSessionEnd
	NegativeDuration
	ShorterDuration
	NewSessionEnd
	NoSessionInDB
)

View file

@ -0,0 +1,153 @@
package sessionender
import (
"time"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/metrics/ender"
)
// EndedSessionHandler receives a map of candidate ended sessions
// (sessionID -> last user timestamp) and returns, per sessionID, whether the
// session was fully processed and may be forgotten.
type EndedSessionHandler func(map[uint64]uint64) map[uint64]bool
// session holds information about user's session live status
type session struct {
	lastTimestamp int64  // timestamp from message broker (batch timestamp)
	lastUpdate    int64  // local wall-clock timestamp of the last update
	lastUserTime  uint64 // highest timestamp seen from the user's machine
	isEnded       bool   // true once the session was handed to the end handler
	isMobile      bool   // true when the first message was a mobile-type message
}
// SessionEnder updates timestamp of last message for each session
type SessionEnder struct {
	metrics  ender.Ender         // lifecycle counters (active/total/closed sessions)
	timeout  int64               // idle time (ms) after which a session is considered ended
	sessions map[uint64]*session // map[sessionID]session
	timeCtrl *timeController     // per-partition batch/local time tracking
	parts    uint64              // total number of partitions (sessionID % parts = partition)
	enabled  bool                // when false, HandleEndedSessions is a no-op (rebalance in progress)
}
// New builds a SessionEnder that tracks per-session activity across the given
// number of partitions and reports lifecycle counters through metrics.
// The error return is always nil; it is kept for interface stability.
func New(metrics ender.Ender, timeout int64, parts int) (*SessionEnder, error) {
	se := &SessionEnder{
		metrics:  metrics,
		timeout:  timeout,
		sessions: map[uint64]*session{},
		timeCtrl: NewTimeController(parts),
		parts:    uint64(parts), // ender uses all partitions by default
		enabled:  true,
	}
	return se, nil
}
// Enable resumes session-end generation (after a rebalance completes).
func (se *SessionEnder) Enable() {
	se.enabled = true
}
// Disable pauses session-end generation (while partitions are being revoked).
func (se *SessionEnder) Disable() {
	se.enabled = false
}
// ActivePartitions drops tracked state for every session that no longer maps
// to one of the partitions assigned to this instance (after a consumer
// rebalance), so two ender instances never end the same session.
// Sessions are assigned to partitions by sessionID % se.parts.
func (se *SessionEnder) ActivePartitions(parts []uint64) {
	activeParts := make(map[uint64]bool, len(parts))
	for _, p := range parts {
		activeParts[p] = true
	}
	// Removed the dead removedSessions/activeSessions counters: they were
	// computed but never read or logged.
	for sessID := range se.sessions {
		if !activeParts[sessID%se.parts] {
			delete(se.sessions, sessID)
			se.metrics.DecreaseActiveSessions()
		}
	}
}
// UpdateSession save timestamp for new sessions and update for existing sessions
func (se *SessionEnder) UpdateSession(msg messages.Message) {
	var (
		sessionID      = msg.Meta().SessionID()
		batchTimestamp = msg.Meta().Batch().Timestamp()
		msgTimestamp   = msg.Meta().Timestamp
		localTimestamp = time.Now().UnixMilli()
	)
	// Mobile messages carry their own timestamp extraction path.
	if messages.IsMobileType(msg.TypeID()) {
		msgTimestamp = messages.GetTimestamp(msg)
	}
	// A zero batch timestamp cannot be used for timeout accounting.
	if batchTimestamp == 0 {
		return
	}
	se.timeCtrl.UpdateTime(sessionID, batchTimestamp, localTimestamp)
	sess, ok := se.sessions[sessionID]
	if !ok {
		// Register new session
		se.sessions[sessionID] = &session{
			lastTimestamp: batchTimestamp,
			lastUpdate:    localTimestamp,
			lastUserTime:  msgTimestamp, // last timestamp from user's machine
			isEnded:       false,
			isMobile:      messages.IsMobileType(msg.TypeID()),
		}
		se.metrics.IncreaseActiveSessions()
		se.metrics.IncreaseTotalSessions()
		return
	}
	// Keep the highest user's timestamp for correct session duration value
	if msgTimestamp > sess.lastUserTime {
		sess.lastUserTime = msgTimestamp
	}
	// Keep information about the latest message for generating sessionEnd trigger
	// (a newer message also revives a session previously marked as ended).
	if batchTimestamp > sess.lastTimestamp {
		sess.lastTimestamp = batchTimestamp
		sess.lastUpdate = localTimestamp
		sess.isEnded = false
	}
}
// HandleEndedSessions runs handler for each ended session and delete information about session in successful case
func (se *SessionEnder) HandleEndedSessions(handler EndedSessionHandler) {
	if !se.enabled {
		return
	}
	currTime := time.Now().UnixMilli()
	// isSessionEnded classifies a session; the second return value is a
	// reason code (1..3) currently used only for readability.
	isSessionEnded := func(sessID uint64, sess *session) (bool, int) {
		// Has been finished already
		if sess.isEnded {
			return true, 1
		}
		batchTimeDiff := se.timeCtrl.LastBatchTimestamp(sessID) - sess.lastTimestamp
		// Has been finished according to batch timestamp and hasn't been updated for a long time
		if (batchTimeDiff >= se.timeout) && (currTime-sess.lastUpdate >= se.timeout) {
			return true, 2
		}
		// Hasn't been finished according to batch timestamp but hasn't been read from partition for a long time
		if (batchTimeDiff < se.timeout) && (currTime-se.timeCtrl.LastUpdateTimestamp(sessID) >= se.timeout) {
			return true, 3
		}
		return false, 0
	}
	// Find ended sessions
	endedCandidates := make(map[uint64]uint64, len(se.sessions)/2) // [sessionID]lastUserTime
	for sessID, sess := range se.sessions {
		if ended, _ := isSessionEnded(sessID, sess); ended {
			sess.isEnded = true
			endedCandidates[sessID] = sess.lastUserTime
		}
	}
	// Process ended sessions; only sessions the handler confirmed are
	// forgotten, the rest stay marked isEnded and are retried next tick.
	for sessID, completed := range handler(endedCandidates) {
		if completed {
			delete(se.sessions, sessID)
			se.metrics.DecreaseActiveSessions()
			se.metrics.IncreaseClosedSessions()
		}
	}
}

View file

@ -0,0 +1,287 @@
package sessions
import (
"context"
"fmt"
"openreplay/backend/pkg/metrics/database"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/db/redis"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/projects"
"openreplay/backend/pkg/url"
)
// Sessions is the service-facing API for creating, reading and updating
// session records, backed by postgres with an in-memory/redis cache layer.
type Sessions interface {
	Add(session *Session) error
	AddCached(sessionID uint64, data map[string]string) error
	Get(sessionID uint64) (*Session, error)
	GetUpdated(sessionID uint64, keepInCache bool) (*Session, error)
	GetCached(sessionID uint64) (map[string]string, error)
	GetDuration(sessionID uint64) (uint64, error)
	GetManySessions(sessionIDs []uint64) (map[uint64]*Session, error)
	UpdateDuration(sessionID uint64, timestamp uint64) (uint64, error)
	UpdateEncryptionKey(sessionID uint64, key []byte) error
	UpdateUserID(sessionID uint64, userID string) error
	UpdateAnonymousID(sessionID uint64, userAnonymousID string) error
	UpdateReferrer(sessionID uint64, referrer string) error
	UpdateUTM(sessionID uint64, url string) error
	UpdateMetadata(sessionID uint64, key, value string) error
	UpdateEventsStats(sessionID uint64, events, pages int) error
	UpdateIssuesStats(sessionID uint64, errors, issueScore int) error
	Commit()
}
// sessionsImpl implements Sessions on top of a cache (in-memory + redis),
// a postgres storage layer, and a batched updates writer.
type sessionsImpl struct {
	log      logger.Logger
	cache    Cache             // read-through cache for session records
	storage  Storage           // direct postgres access
	updates  Updates           // batched, commit-on-demand field updates
	projects projects.Projects // project lookups (metadata numbering, payload flags)
}
// New builds the Sessions service. redis may be a degraded/nil-backed client;
// the in-memory cache wraps it either way.
func New(log logger.Logger, db pool.Pool, proj projects.Projects, redis *redis.Client, metrics database.Database) Sessions {
	return &sessionsImpl{
		log:      log,
		cache:    NewInMemoryCache(log, NewCache(redis, metrics)),
		storage:  NewStorage(db),
		updates:  NewSessionUpdates(log, db, metrics),
		projects: proj,
	}
}
// Add usage: /start endpoint in http service
// It inserts the session into postgres, enriches it with the project's
// SaveRequestPayloads flag, and primes the cache.
func (s *sessionsImpl) Add(session *Session) error {
	ctx := context.WithValue(context.Background(), "sessionID", session.SessionID)
	// Duplicate session IDs indicate a tracker/restart anomaly; log only.
	if cachedSession, err := s.cache.Get(session.SessionID); err == nil {
		s.log.Info(ctx, "[!] Session already exists in cache, new: %+v, cached: %+v", session, cachedSession)
	}
	err := s.storage.Add(session)
	if err != nil {
		return err
	}
	proj, err := s.projects.GetProject(session.ProjectID)
	if err != nil {
		return err
	}
	session.SaveRequestPayload = proj.SaveRequestPayloads
	if err := s.cache.Set(session); err != nil {
		s.log.Warn(ctx, "failed to cache session: %s", err)
	}
	return nil
}
// getFromDB loads a session from postgres and enriches it with the project's
// SaveRequestPayloads flag. It does NOT touch the cache.
func (s *sessionsImpl) getFromDB(sessionID uint64) (*Session, error) {
	session, err := s.storage.Get(sessionID)
	if err != nil {
		return nil, fmt.Errorf("failed to get session from postgres: %s", err)
	}
	proj, err := s.projects.GetProject(session.ProjectID)
	if err != nil {
		return nil, fmt.Errorf("failed to get active project: %d, err: %s", session.ProjectID, err)
	}
	session.SaveRequestPayload = proj.SaveRequestPayloads
	return session, nil
}
// Get usage: db message processor + connectors in feature
// Returns the session from cache when available, otherwise reads it from
// postgres and refreshes the cache.
func (s *sessionsImpl) Get(sessionID uint64) (*Session, error) {
	if sess, err := s.cache.Get(sessionID); err == nil {
		return sess, nil
	}
	// Get from postgres and update in-memory and redis caches
	session, err := s.getFromDB(sessionID)
	if err != nil {
		return nil, err
	}
	// Log cache failures instead of silently dropping the error, matching
	// the error-handling style of GetUpdated/GetDuration/UpdateDuration.
	if err := s.cache.Set(session); err != nil {
		ctx := context.WithValue(context.Background(), "sessionID", sessionID)
		s.log.Warn(ctx, "failed to cache session: %s", err)
	}
	return session, nil
}
// Special method for clickhouse connector
// GetUpdated always bypasses the cache and reads the freshest row from
// postgres; keepInCache controls whether the result is written back.
func (s *sessionsImpl) GetUpdated(sessionID uint64, keepInCache bool) (*Session, error) {
	session, err := s.getFromDB(sessionID)
	if err != nil {
		return nil, err
	}
	if !keepInCache {
		return session, nil
	}
	if err := s.cache.Set(session); err != nil {
		ctx := context.WithValue(context.Background(), "sessionID", sessionID)
		s.log.Warn(ctx, "failed to cache session: %s", err)
	}
	return session, nil
}
// AddCached stores arbitrary per-session key/value data in the cache layer.
func (s *sessionsImpl) AddCached(sessionID uint64, data map[string]string) error {
	return s.cache.SetCache(sessionID, data)
}
// GetCached retrieves the per-session key/value data stored via AddCached.
func (s *sessionsImpl) GetCached(sessionID uint64) (map[string]string, error) {
	return s.cache.GetCache(sessionID)
}
// GetDuration usage: in ender to check current and new duration to avoid duplicates
// A nil Duration (session not ended yet) is reported as 0.
func (s *sessionsImpl) GetDuration(sessionID uint64) (uint64, error) {
	if sess, err := s.cache.Get(sessionID); err == nil {
		if sess.Duration != nil {
			return *sess.Duration, nil
		}
		return 0, nil
	}
	session, err := s.getFromDB(sessionID)
	if err != nil {
		return 0, err
	}
	if err := s.cache.Set(session); err != nil {
		ctx := context.WithValue(context.Background(), "sessionID", sessionID)
		s.log.Warn(ctx, "failed to cache session: %s", err)
	}
	if session.Duration != nil {
		return *session.Duration, nil
	}
	return 0, nil
}
// GetManySessions is useful for the ender service only (grab session's startTs and duration)
// Cache hits are served directly; only the misses hit postgres in one query.
// NOTE(review): DB results are not written back to the cache — presumably
// intentional for the ender's bulk path; confirm.
func (s *sessionsImpl) GetManySessions(sessionIDs []uint64) (map[uint64]*Session, error) {
	res := make(map[uint64]*Session, len(sessionIDs))
	toRequest := make([]uint64, 0, len(sessionIDs))
	// Grab sessions from the cache
	for _, sessionID := range sessionIDs {
		if sess, err := s.cache.Get(sessionID); err == nil {
			res[sessionID] = sess
		} else {
			toRequest = append(toRequest, sessionID)
		}
	}
	if len(toRequest) == 0 {
		return res, nil
	}
	// Grab the rest from the database
	sessionFromDB, err := s.storage.GetMany(toRequest)
	if err != nil {
		return nil, err
	}
	for _, sess := range sessionFromDB {
		res[sess.SessionID] = sess
	}
	return res, nil
}
// UpdateDuration usage: in ender to update session duration
// Returns the duration computed by the database (timestamp - start_ts).
func (s *sessionsImpl) UpdateDuration(sessionID uint64, timestamp uint64) (uint64, error) {
	newDuration, err := s.storage.UpdateDuration(sessionID, timestamp)
	if err != nil {
		return 0, err
	}
	// Update session info in cache for future usage (for example in connectors)
	session, err := s.getFromDB(sessionID)
	if err != nil {
		return 0, err
	}
	session.Duration = &newDuration
	if err := s.cache.Set(session); err != nil {
		ctx := context.WithValue(context.Background(), "sessionID", sessionID)
		s.log.Warn(ctx, "failed to cache session: %s", err)
	}
	return newDuration, nil
}
// UpdateEncryptionKey usage: in ender to update session encryption key if encryption is enabled
// Persists the key to postgres first, then refreshes the cached session so
// later readers (e.g. connectors) see the key. Cache problems are logged,
// never returned: the key is already safely stored in the DB.
func (s *sessionsImpl) UpdateEncryptionKey(sessionID uint64, key []byte) error {
	ctx := context.WithValue(context.Background(), "sessionID", sessionID)
	if err := s.storage.InsertEncryptionKey(sessionID, key); err != nil {
		return err
	}
	// BUG FIX: the cached session may only be used when the lookup SUCCEEDED.
	// The original tested `err != nil` and then mutated the invalid session
	// value returned by the failed Get.
	if session, err := s.cache.Get(sessionID); err == nil {
		session.EncryptionKey = string(key)
		if err := s.cache.Set(session); err != nil {
			s.log.Warn(ctx, "failed to cache session: %s", err)
		}
		return nil
	}
	// Cache miss: reload from postgres and re-cache the fresh record.
	session, err := s.getFromDB(sessionID)
	if err != nil {
		s.log.Error(ctx, "failed to get session from postgres: %s", err)
		return nil
	}
	if err := s.cache.Set(session); err != nil {
		s.log.Warn(ctx, "failed to cache session: %s", err)
	}
	return nil
}
// UpdateUserID usage: in db handler
// The write is buffered in updates and flushed on Commit().
func (s *sessionsImpl) UpdateUserID(sessionID uint64, userID string) error {
	s.updates.AddUserID(sessionID, userID)
	return nil
}
// UpdateAnonymousID usage: in db handler
// NOTE(review): this forwards the anonymous ID through AddUserID, the same
// setter used for the regular user ID — verify the Updates API has no
// dedicated anonymous-ID setter and this is intended.
func (s *sessionsImpl) UpdateAnonymousID(sessionID uint64, userAnonymousID string) error {
	s.updates.AddUserID(sessionID, userAnonymousID)
	return nil
}
// UpdateReferrer usage: in db handler on each page event
// Stores both the full referrer and its query-stripped base form; empty
// referrers are ignored.
func (s *sessionsImpl) UpdateReferrer(sessionID uint64, referrer string) error {
	if referrer == "" {
		return nil
	}
	baseReferrer := url.DiscardURLQuery(referrer)
	s.updates.SetReferrer(sessionID, referrer, baseReferrer)
	return nil
}
// UpdateUTM extracts utm_source/utm_medium/utm_campaign from the page URL's
// query string and buffers them; a URL with none of the three is a no-op.
func (s *sessionsImpl) UpdateUTM(sessionID uint64, pageUrl string) error {
	params, err := url.GetURLQueryParams(pageUrl)
	if err != nil {
		return err
	}
	utmSource := params["utm_source"]
	utmMedium := params["utm_medium"]
	utmCampaign := params["utm_campaign"]
	if utmSource == "" && utmMedium == "" && utmCampaign == "" {
		return nil
	}
	s.updates.SetUTM(sessionID, utmSource, utmMedium, utmCampaign)
	return nil
}
// UpdateMetadata usage: in db handler on each metadata event
// Maps the metadata key to the project's configured column number; keys not
// configured for the project (keyNo == 0) are silently ignored.
func (s *sessionsImpl) UpdateMetadata(sessionID uint64, key, value string) error {
	session, err := s.Get(sessionID)
	if err != nil {
		return err
	}
	project, err := s.projects.GetProject(session.ProjectID)
	if err != nil {
		return err
	}
	keyNo := project.GetMetadataNo(key)
	if keyNo == 0 {
		return nil
	}
	s.updates.SetMetadata(sessionID, keyNo, value)
	return nil
}
// UpdateEventsStats buffers event/page counter increments for the session.
func (s *sessionsImpl) UpdateEventsStats(sessionID uint64, events, pages int) error {
	s.updates.AddEvents(sessionID, events, pages)
	return nil
}
// UpdateIssuesStats buffers error-count and issue-score increments.
func (s *sessionsImpl) UpdateIssuesStats(sessionID uint64, errors, issueScore int) error {
	s.updates.AddIssues(sessionID, errors, issueScore)
	return nil
}
// Commit flushes all buffered session updates to postgres.
func (s *sessionsImpl) Commit() {
	s.updates.Commit()
}

View file

@ -0,0 +1,200 @@
package sessions
import (
"fmt"
"github.com/jackc/pgtype"
"github.com/lib/pq"
"openreplay/backend/pkg/db/postgres/pool"
)
// Storage is the raw postgres access layer for session rows; all higher-level
// caching and batching lives in the Sessions service above it.
type Storage interface {
	Add(sess *Session) error
	Get(sessionID uint64) (*Session, error)
	GetMany(sessionIDs []uint64) ([]*Session, error)
	GetDuration(sessionID uint64) (uint64, error)
	UpdateDuration(sessionID uint64, timestamp uint64) (uint64, error)
	InsertEncryptionKey(sessionID uint64, key []byte) error
	InsertUserID(sessionID uint64, userID string) error
	InsertUserAnonymousID(sessionID uint64, userAnonymousID string) error
	InsertReferrer(sessionID uint64, referrer, baseReferrer string) error
	InsertMetadata(sessionID uint64, keyNo uint, value string) error
}
// storageImpl implements Storage over a pooled postgres connection.
type storageImpl struct {
	db pool.Pool
}
// NewStorage wraps a postgres pool in the Storage interface.
func NewStorage(db pool.Pool) Storage {
	return &storageImpl{
		db: db,
	}
}
// Add inserts a new session row. Empty strings / zero numerics are stored as
// NULL via NULLIF; user_id is truncated to 8000 chars; issue_score is seeded
// with the session start time in seconds (Timestamp/1000).
func (s *storageImpl) Add(sess *Session) error {
	return s.db.Exec(`
	INSERT INTO sessions (
		session_id, project_id, start_ts,
		user_uuid, user_device, user_device_type, user_country,
		user_os, user_os_version,
		rev_id, 
		tracker_version, issue_score,
		platform,
		user_browser, user_browser_version, user_device_memory_size, user_device_heap_size,
		user_id, user_state, user_city, timezone, screen_width, screen_height
	) VALUES (
		$1, $2, $3,
		$4, $5, $6, $7, 
		$8, NULLIF($9, ''),
		NULLIF($10, ''), 
		$11, $12,
		$13,
		NULLIF($14, ''), NULLIF($15, ''), NULLIF($16, 0), NULLIF($17, 0::bigint),
		NULLIF(LEFT($18, 8000), ''), NULLIF($19, ''),  NULLIF($20, ''), $21, $22, $23
	)`,
		sess.SessionID, sess.ProjectID, sess.Timestamp,
		sess.UserUUID, sess.UserDevice, sess.UserDeviceType, sess.UserCountry,
		sess.UserOS, sess.UserOSVersion,
		sess.RevID,
		sess.TrackerVersion, sess.Timestamp/1000,
		sess.Platform,
		sess.UserBrowser, sess.UserBrowserVersion, sess.UserDeviceMemorySize, sess.UserDeviceHeapSize,
		sess.UserID, sess.UserState, sess.UserCity, sess.Timezone, sess.ScreenWidth, sess.ScreenHeight,
	)
}
// Get reads a full session row. Nullable text columns are scanned into
// *string intermediates and copied onto the struct only when non-NULL, so
// the Session keeps plain-string fields.
func (s *storageImpl) Get(sessionID uint64) (*Session, error) {
	sess := &Session{SessionID: sessionID}
	var revID, userOSVersion, userBrowser, userBrowserVersion, userState, userCity *string
	var issueTypes pgtype.EnumArray
	if err := s.db.QueryRow(`
	SELECT platform,
		duration, project_id, start_ts, timezone,
		user_uuid, user_os, user_os_version,
		user_device, user_device_type, user_country, user_state, user_city,
		rev_id, tracker_version,
		user_id, user_anonymous_id, referrer,
		pages_count, events_count, errors_count, issue_types,
		user_browser, user_browser_version, issue_score,
		metadata_1, metadata_2, metadata_3, metadata_4, metadata_5,
		metadata_6, metadata_7, metadata_8, metadata_9, metadata_10,
		utm_source, utm_medium, utm_campaign
	FROM sessions
	WHERE session_id=$1
	`,
		sessionID,
	).Scan(&sess.Platform,
		&sess.Duration, &sess.ProjectID, &sess.Timestamp, &sess.Timezone,
		&sess.UserUUID, &sess.UserOS, &userOSVersion,
		&sess.UserDevice, &sess.UserDeviceType, &sess.UserCountry, &userState, &userCity,
		&revID, &sess.TrackerVersion,
		&sess.UserID, &sess.UserAnonymousID, &sess.Referrer,
		&sess.PagesCount, &sess.EventsCount, &sess.ErrorsCount, &issueTypes,
		&userBrowser, &userBrowserVersion, &sess.IssueScore,
		&sess.Metadata1, &sess.Metadata2, &sess.Metadata3, &sess.Metadata4, &sess.Metadata5,
		&sess.Metadata6, &sess.Metadata7, &sess.Metadata8, &sess.Metadata9, &sess.Metadata10,
		&sess.UtmSource, &sess.UtmMedium, &sess.UtmCampaign); err != nil {
		return nil, err
	}
	if userOSVersion != nil {
		sess.UserOSVersion = *userOSVersion
	}
	if userBrowser != nil {
		sess.UserBrowser = *userBrowser
	}
	if userBrowserVersion != nil {
		sess.UserBrowserVersion = *userBrowserVersion
	}
	if revID != nil {
		sess.RevID = *revID
	}
	// NOTE(review): AssignTo's error is discarded — on failure IssueTypes
	// stays empty silently; confirm this is acceptable.
	issueTypes.AssignTo(&sess.IssueTypes)
	if userState != nil {
		sess.UserState = *userState
	}
	if userCity != nil {
		sess.UserCity = *userCity
	}
	return sess, nil
}
// For the ender service only
// GetMany fetches only the fields the ender needs (session_id, duration,
// start_ts) for a batch of sessions in one query; NULL duration is
// coalesced to 0 so Duration is always usable.
func (s *storageImpl) GetMany(sessionIDs []uint64) ([]*Session, error) {
	rows, err := s.db.Query("SELECT session_id, COALESCE( duration, 0 ), start_ts FROM sessions WHERE session_id = ANY($1)", pq.Array(sessionIDs))
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	sessions := make([]*Session, 0, len(sessionIDs))
	for rows.Next() {
		sess := &Session{}
		if err := rows.Scan(&sess.SessionID, &sess.Duration, &sess.Timestamp); err != nil {
			return nil, err
		}
		sessions = append(sessions, sess)
	}
	// Surface iteration errors (e.g. a network failure mid-stream) that
	// rows.Next() reports only through rows.Err(); without this check a
	// truncated result set would be returned as success.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return sessions, nil
}
// GetDuration returns the session duration, with NULL coalesced to 0.
func (s *storageImpl) GetDuration(sessionID uint64) (uint64, error) {
	var dur uint64
	if err := s.db.QueryRow("SELECT COALESCE( duration, 0 ) FROM sessions WHERE session_id=$1", sessionID).Scan(&dur); err != nil {
		return 0, err
	}
	return dur, nil
}
// UpdateDuration sets duration = timestamp - start_ts in the database and
// returns the value it computed. A missing row surfaces as a Scan error
// ("no rows in result set"), which callers match on by message.
func (s *storageImpl) UpdateDuration(sessionID uint64, timestamp uint64) (uint64, error) {
	var dur uint64
	if err := s.db.QueryRow(`
		UPDATE sessions SET duration=$2 - start_ts
		WHERE session_id=$1
		RETURNING duration
	`,
		sessionID, timestamp,
	).Scan(&dur); err != nil {
		return 0, err
	}
	return dur, nil
}
// InsertEncryptionKey stores the per-session file encryption key (file_key).
func (s *storageImpl) InsertEncryptionKey(sessionID uint64, key []byte) error {
	const query = `
	UPDATE sessions
	SET file_key = $2
	WHERE session_id = $1`
	return s.db.Exec(query, sessionID, string(key))
}
// InsertUserID writes the (8000-char-truncated) user ID onto the session row.
func (s *storageImpl) InsertUserID(sessionID uint64, userID string) error {
	const query = `
	UPDATE sessions
	SET user_id = LEFT($1, 8000)
	WHERE session_id = $2`
	return s.db.Exec(query, userID, sessionID)
}
// InsertUserAnonymousID writes the (truncated) anonymous user ID onto the row.
func (s *storageImpl) InsertUserAnonymousID(sessionID uint64, userAnonymousID string) error {
	const query = `
	UPDATE sessions
	SET user_anonymous_id = LEFT($1, 8000)
	WHERE session_id = $2`
	return s.db.Exec(query, userAnonymousID, sessionID)
}
// InsertReferrer stores the first referrer seen for a session: the
// `referrer IS NULL` guard makes later calls no-ops.
func (s *storageImpl) InsertReferrer(sessionID uint64, referrer, baseReferrer string) error {
	sqlRequest := `
	UPDATE sessions
	SET referrer = LEFT($1, 8000), base_referrer = LEFT($2, 8000)
	WHERE session_id = $3 AND referrer IS NULL`
	return s.db.Exec(sqlRequest, referrer, baseReferrer, sessionID)
}
// InsertMetadata writes a value into the metadata_<keyNo> column. The column
// name is built with Sprintf; keyNo comes from project configuration (a small
// internal uint), not from user input, so this is not an injection vector.
func (s *storageImpl) InsertMetadata(sessionID uint64, keyNo uint, value string) error {
	sqlRequest := `
	UPDATE sessions
	SET metadata_%v = LEFT($1, 8000)
	WHERE session_id = $2`
	return s.db.Exec(fmt.Sprintf(sqlRequest, keyNo), value, sessionID)
}

View file

@ -56,6 +56,7 @@ CREATE TABLE IF NOT EXISTS product_analytics.devices
"$screen_height" UInt16 DEFAULT 0,
"$screen_width" UInt16 DEFAULT 0,
"$os" LowCardinality(String) DEFAULT '',
"$os_version" LowCardinality(String) DEFAULT '',
"$browser" LowCardinality(String) DEFAULT '',
"$browser_version" String DEFAULT '',
@ -119,9 +120,10 @@ CREATE TABLE IF NOT EXISTS product_analytics.events
"$sdk_version" LowCardinality(String),
"$device_id" String,
"$os" LowCardinality(String) DEFAULT '',
"$os_version" LowCardinality(String) DEFAULT '',
"$browser" LowCardinality(String) DEFAULT '',
"$browser_version" String DEFAULT '',
"$device" String DEFAULT '' COMMENT 'web/mobile',
"$device" LowCardinality(String) DEFAULT '' COMMENT 'in session, it is platform; web/mobile',
"$screen_height" UInt16 DEFAULT 0,
"$screen_width" UInt16 DEFAULT 0,
"$current_url" String DEFAULT '',
@ -141,6 +143,7 @@ CREATE TABLE IF NOT EXISTS product_analytics.events
"$timezone" Int8 DEFAULT 0 COMMENT 'timezone will be x10 in order to take into consideration countries with tz=N,5H',
issue_type Enum8(''=0,'click_rage'=1,'dead_click'=2,'excessive_scrolling'=3,'bad_request'=4,'missing_resource'=5,'memory'=6,'cpu'=7,'slow_resource'=8,'slow_page_load'=9,'crash'=10,'ml_cpu'=11,'ml_memory'=12,'ml_dead_click'=13,'ml_click_rage'=14,'ml_mouse_thrashing'=15,'ml_excessive_scrolling'=16,'ml_slow_resources'=17,'custom'=18,'js_exception'=19,'mouse_thrashing'=20,'app_crash'=21) DEFAULT '',
issue_id String DEFAULT '',
error_id String DEFAULT '',
-- Created by the backend
"$tags" Array(String) DEFAULT [] COMMENT 'tags are used to filter events',
"$import" BOOL DEFAULT FALSE,

View file

@ -389,6 +389,7 @@ CREATE TABLE IF NOT EXISTS product_analytics.devices
"$screen_height" UInt16 DEFAULT 0,
"$screen_width" UInt16 DEFAULT 0,
"$os" LowCardinality(String) DEFAULT '',
"$os_version" LowCardinality(String) DEFAULT '',
"$browser" LowCardinality(String) DEFAULT '',
"$browser_version" String DEFAULT '',
@ -452,9 +453,10 @@ CREATE TABLE IF NOT EXISTS product_analytics.events
"$sdk_version" LowCardinality(String),
"$device_id" String,
"$os" LowCardinality(String) DEFAULT '',
"$os_version" LowCardinality(String) DEFAULT '',
"$browser" LowCardinality(String) DEFAULT '',
"$browser_version" String DEFAULT '',
"$device" String DEFAULT '' COMMENT 'web/mobile',
"$device" LowCardinality(String) DEFAULT '' COMMENT 'in session, it is platform; web/mobile',
"$screen_height" UInt16 DEFAULT 0,
"$screen_width" UInt16 DEFAULT 0,
"$current_url" String DEFAULT '',
@ -474,6 +476,7 @@ CREATE TABLE IF NOT EXISTS product_analytics.events
"$timezone" Int8 DEFAULT 0 COMMENT 'timezone will be x10 in order to take into consideration countries with tz=N,5H',
issue_type Enum8(''=0,'click_rage'=1,'dead_click'=2,'excessive_scrolling'=3,'bad_request'=4,'missing_resource'=5,'memory'=6,'cpu'=7,'slow_resource'=8,'slow_page_load'=9,'crash'=10,'ml_cpu'=11,'ml_memory'=12,'ml_dead_click'=13,'ml_click_rage'=14,'ml_mouse_thrashing'=15,'ml_excessive_scrolling'=16,'ml_slow_resources'=17,'custom'=18,'js_exception'=19,'mouse_thrashing'=20,'app_crash'=21) DEFAULT '',
issue_id String DEFAULT '',
error_id String DEFAULT '',
-- Created by the backend
"$tags" Array(String) DEFAULT [] COMMENT 'tags are used to filter events',
"$import" BOOL DEFAULT FALSE,

View file

@ -23,4 +23,4 @@ MINIO_SECRET_KEY = ''
# APP and TRACKER VERSIONS
VERSION = 1.22.0
TRACKER_VERSION = '15.0.5'
TRACKER_VERSION = '16.0.1'

View file

@ -12,7 +12,7 @@ COPY nginx.conf /etc/nginx/conf.d/default.conf
# Default step in docker build
FROM nginx:alpine
FROM cgr.dev/chainguard/nginx
LABEL maintainer=Rajesh<rajesh@openreplay.com>
ARG GIT_SHA
LABEL GIT_SHA=$GIT_SHA
@ -22,10 +22,3 @@ COPY nginx.conf /etc/nginx/conf.d/default.conf
ENV GIT_SHA=$GIT_SHA
EXPOSE 8080
RUN chown -R nginx:nginx /var/cache/nginx && \
chown -R nginx:nginx /var/log/nginx && \
chown -R nginx:nginx /etc/nginx/conf.d && \
touch /var/run/nginx.pid && \
chown -R nginx:nginx /var/run/nginx.pid
USER nginx

Binary file not shown.

Before

Width:  |  Height:  |  Size: 58 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 233 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 73 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 204 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 58 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 233 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 46 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 251 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 41 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 93 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 54 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 241 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 36 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 97 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 119 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 434 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 173 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 668 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 43 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 93 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 78 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 174 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 76 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 172 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 44 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 119 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 257 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 702 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 34 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 84 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 42 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 146 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 47 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 131 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 44 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 118 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 29 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 79 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 106 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 378 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 28 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 65 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 101 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 326 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 58 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 136 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 41 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 98 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 21 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 193 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 67 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 192 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 73 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 179 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 8.1 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 11 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 11 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 14 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.4 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 16 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 7.2 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 10 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 8.5 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 6.2 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 8.1 KiB

View file

@ -35,7 +35,7 @@ function WidgetDateRange({
};
const onChangeComparison = (period: any) => {
if (compPeriod) {
if (compPeriod && period) {
if (compPeriod.start === period.start && compPeriod.end === period.end) {
return;
}

View file

@ -88,7 +88,7 @@ function WebPlayer(props: any) {
);
if (usePrefetched) {
if (mobData?.data) {
WebPlayerInst.preloadFirstFile(mobData?.data);
WebPlayerInst.preloadFirstFile(mobData?.data, mobData?.fileKey);
}
}
setContextValue({ player: WebPlayerInst, store: PlayerStore });

View file

@ -71,11 +71,11 @@ function PageInsightsPanel({ setActiveTab }: Props) {
}
}, [insightsFilters]);
const onPageSelect = ({ value }: any) => {
const event = events.find((item) => item.url === value.value);
const onPageSelect = (value: any) => {
const event = events.find((item) => item.url === value);
Player.jump(event.timestamp - startTs + JUMP_OFFSET);
Player.pause();
setInsightsFilters({ ...insightsFilters, url: value.value });
setInsightsFilters({ ...insightsFilters, url: value });
};
return (
@ -88,10 +88,9 @@ function PageInsightsPanel({ setActiveTab }: Props) {
placeholder="change"
options={urlOptions}
defaultValue={defaultValue}
onChange={onPageSelect}
onChange={(value) => onPageSelect(value)}
id="change-dropdown"
className="w-full rounded-lg max-w-[270px]"
dropdownStyle={{}}
/>
</Form.Item>
<Tooltip title={t('Close Panel')} placement="bottomRight">

View file

@ -182,6 +182,7 @@ function SessionItem(props: RouteComponentProps & Props) {
await sessionStore.getFirstMob(sessionId);
setPrefetched(PREFETCH_STATE.fetched);
} catch (e) {
setPrefetched(PREFETCH_STATE.none)
console.error('Error while prefetching first mob', e);
}
}, [prefetchState, live, isAssist, isMobile, sessionStore, sessionId]);

View file

@ -28,18 +28,18 @@ export const checkValues = (key: any, value: any) => {
};
export const filterMap = ({
category,
value,
key,
operator,
sourceOperator,
source,
custom,
isEvent,
filters,
sort,
order
}: any) => ({
category,
value,
key,
operator,
sourceOperator,
source,
custom,
isEvent,
filters,
sort,
order
}: any) => ({
value: checkValues(key, value),
custom,
type: category === FilterCategory.METADATA ? FilterKey.METADATA : key,
@ -254,7 +254,7 @@ class SearchStore {
this.savedSearch = new SavedSearch({});
sessionStore.clearList();
void this.fetchSessions(true);
// void this.fetchSessions(true);
}
async checkForLatestSessionCount(): Promise<void> {

View file

@ -200,7 +200,7 @@ export default class SessionStore {
userTimezone = '';
prefetchedMobUrls: Record<string, { data: Uint8Array; entryNum: number }> =
prefetchedMobUrls: Record<string, { data: Uint8Array; entryNum: number, fileKey?: string }> =
{};
prefetched: boolean = false;
@ -230,13 +230,13 @@ export default class SessionStore {
};
getFirstMob = async (sessionId: string) => {
const { domURL } = await sessionService.getFirstMobUrl(sessionId);
const { domURL, fileKey } = await sessionService.getFirstMobUrl(sessionId);
await loadFile(domURL[0], (data) =>
this.setPrefetchedMobUrl(sessionId, data),
this.setPrefetchedMobUrl(sessionId, data, fileKey),
);
};
setPrefetchedMobUrl = (sessionId: string, fileData: Uint8Array) => {
setPrefetchedMobUrl = (sessionId: string, fileData: Uint8Array, fileKey?: string) => {
const keys = Object.keys(this.prefetchedMobUrls);
const toLimit = 10 - keys.length;
if (toLimit < 0) {
@ -255,6 +255,7 @@ export default class SessionStore {
: 0;
this.prefetchedMobUrls[sessionId] = {
data: fileData,
fileKey,
entryNum: nextEntryNum,
};
};

View file

@ -45,7 +45,7 @@ export interface SessionFilesInfo {
devtoolsURL: string[];
/** deprecated */
mobsUrl: string[];
fileKey: string | null;
fileKey?: string | null;
events: Record<string, any>[];
stackEvents: Record<string, any>[];
frustrations: Record<string, any>[];

View file

@ -224,7 +224,8 @@ export default class MessageLoader {
preloaded = false;
async preloadFirstFile(data: Uint8Array) {
async preloadFirstFile(data: Uint8Array, fileKey?: string) {
this.session.fileKey = fileKey;
this.mobParser = this.createNewParser(true, this.processMessages, 'p:dom');
try {
@ -343,10 +344,6 @@ export default class MessageLoader {
const efsDomFilePromise = requestEFSDom(this.session.sessionId);
const efsDevtoolsFilePromise = requestEFSDevtools(this.session.sessionId);
const [domData, devtoolsData] = await Promise.allSettled([
efsDomFilePromise,
efsDevtoolsFilePromise,
]);
const domParser = this.createNewParser(
false,
this.processMessages,
@ -357,6 +354,11 @@ export default class MessageLoader {
this.processMessages,
'devtoolsEFS',
);
const [domData, devtoolsData] = await Promise.allSettled([
efsDomFilePromise,
efsDevtoolsFilePromise,
]);
const parseDomPromise: Promise<any> =
domData.status === 'fulfilled'
? domParser(domData.value)
@ -366,7 +368,8 @@ export default class MessageLoader {
? devtoolsParser(devtoolsData.value)
: Promise.reject('No devtools file in EFS');
await Promise.all([parseDomPromise, parseDevtoolsPromise]);
await Promise.allSettled([parseDomPromise, parseDevtoolsPromise]);
this.store.update({ domLoading: false, devtoolsLoading: false });
this.messageManager.onFileReadFinally();
this.messageManager.onFileReadSuccess();
};

View file

@ -211,6 +211,7 @@ export default class MessageManager {
public onFileReadFinally = () => {
this.waitingForFiles = false;
this.setMessagesLoading(false);
this.state.update({ messagesProcessed: true });
};

View file

@ -102,8 +102,8 @@ export default class WebPlayer extends Player {
window.playerJumpToTime = this.jump.bind(this);
}
preloadFirstFile(data: Uint8Array) {
void this.messageLoader.preloadFirstFile(data);
preloadFirstFile(data: Uint8Array, fileKey?: string) {
void this.messageLoader.preloadFirstFile(data, fileKey);
}
reinit(session: SessionFilesInfo) {

View file

@ -45,7 +45,7 @@ export default class SettingsService {
.catch((e) => Promise.reject(e));
}
getFirstMobUrl(sessionId: string): Promise<{ domURL: string[] }> {
getFirstMobUrl(sessionId: string): Promise<{ domURL: string[], fileKey?: string }> {
return this.client
.get(`/sessions/${sessionId}/first-mob`)
.then((r) => r.json())

View file

@ -6,7 +6,7 @@ COMMON_JWT_REFRESH_SECRET="change_me_jwt_refresh"
COMMON_S3_KEY="change_me_s3_key"
COMMON_S3_SECRET="change_me_s3_secret"
COMMON_PG_PASSWORD="change_me_pg_password"
COMMON_VERSION="v1.16.0"
COMMON_VERSION="v1.21.0"
## DB versions
######################################
POSTGRES_VERSION="14.5.0"

View file

@ -15,7 +15,7 @@ services:
image: bitnami/redis:${REDIS_VERSION}
container_name: redis
volumes:
- redisdata:/var/lib/postgresql/data
- redisdata:/bitnami/redis/data
networks:
- openreplay-net
environment:
@ -208,15 +208,6 @@ services:
- sourcemapreader.env
restart: unless-stopped
videostorage-openreplay:
image: public.ecr.aws/p1t3u8a3/videostorage:${COMMON_VERSION}
container_name: videostorage
networks:
- openreplay-net
env_file:
- videostorage.env
restart: unless-stopped
http-openreplay:
image: public.ecr.aws/p1t3u8a3/http:${COMMON_VERSION}
container_name: http

Some files were not shown because too many files have changed in this diff Show more