diff --git a/api/chalicelib/core/metrics/custom_metrics.py b/api/chalicelib/core/metrics/custom_metrics.py
index 60abcfee2..bbba7b1c7 100644
--- a/api/chalicelib/core/metrics/custom_metrics.py
+++ b/api/chalicelib/core/metrics/custom_metrics.py
@@ -81,8 +81,8 @@ def __get_timeseries_chart(project: schemas.ProjectContext, data: schemas.CardTi
     series_charts = []
     for i, s in enumerate(data.series):
         series_charts.append(sessions.search2_series(data=s.filter, project_id=project.project_id, density=data.density,
-                                                     view_type=data.view_type, metric_type=data.metric_type,
-                                                     metric_of=data.metric_of, metric_value=data.metric_value))
+                                                     metric_type=data.metric_type, metric_of=data.metric_of,
+                                                     metric_value=data.metric_value))
     results = [{}] * len(series_charts[0])
     for i in range(len(results)):
diff --git a/api/chalicelib/core/sessions/__init__.py b/api/chalicelib/core/sessions/__init__.py
index e69de29bb..d6e05f166 100644
--- a/api/chalicelib/core/sessions/__init__.py
+++ b/api/chalicelib/core/sessions/__init__.py
@@ -0,0 +1,11 @@
+import logging
+
+from decouple import config
+
+logger = logging.getLogger(__name__)
+from . import sessions as sessions_legacy
+
+if config("EXP_METRICS", cast=bool, default=False):
+    from . import sessions_ch as sessions
+else:
+    from . import sessions
diff --git a/api/chalicelib/core/sessions/sessions.py b/api/chalicelib/core/sessions/sessions.py
index e2da4da62..08e481b9e 100644
--- a/api/chalicelib/core/sessions/sessions.py
+++ b/api/chalicelib/core/sessions/sessions.py
@@ -172,8 +172,8 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_
 
 # TODO: remove "table of" search from this function
 def search2_series(data: schemas.SessionsSearchPayloadSchema, project_id: int, density: int,
-                   view_type: schemas.MetricTimeseriesViewType, metric_type: schemas.MetricType,
-                   metric_of: schemas.MetricOfTable, metric_value: List):
+                   metric_type: schemas.MetricType, metric_of: schemas.MetricOfTimeseries | schemas.MetricOfTable,
+                   metric_value: List):
     step_size = int(metrics_helper.__get_step_size(endTimestamp=data.endTimestamp, startTimestamp=data.startTimestamp,
                                                    density=density, factor=1, decimal=True))
     extra_event = None
@@ -189,39 +189,35 @@ def search2_series(data: schemas.SessionsSearchPayloadSchema, project_id: int, d
     sessions = []
     with pg_client.PostgresClient() as cur:
         if metric_type == schemas.MetricType.TIMESERIES:
-            if view_type == schemas.MetricTimeseriesViewType.LINE_CHART:
-                if metric_of == schemas.MetricOfTimeseries.SESSION_COUNT:
-                    # main_query = cur.mogrify(f"""WITH full_sessions AS (SELECT DISTINCT ON(s.session_id) s.session_id, s.start_ts
-                    main_query = cur.mogrify(f"""WITH full_sessions AS (SELECT s.session_id, s.start_ts
-                                                 {query_part})
-                                                 SELECT generated_timestamp AS timestamp,
-                                                        COUNT(s) AS count
-                                                 FROM generate_series(%(startDate)s, %(endDate)s, %(step_size)s) AS generated_timestamp
-                                                 LEFT JOIN LATERAL ( SELECT 1 AS s
-                                                                     FROM full_sessions
-                                                                     WHERE start_ts >= generated_timestamp
-                                                                       AND start_ts <= generated_timestamp + %(step_size)s) AS sessions ON (TRUE)
-                                                 GROUP BY generated_timestamp
-                                                 ORDER BY generated_timestamp;""", full_args)
-                elif metric_of == schemas.MetricOfTimeseries.USER_COUNT:
-                    main_query = cur.mogrify(f"""WITH full_sessions AS (SELECT s.user_id, s.start_ts
-                                                 {query_part}
-                                                 AND s.user_id IS NOT NULL
-                                                 AND s.user_id != '')
-                                                 SELECT generated_timestamp AS timestamp,
-                                                        COUNT(s) AS count
-                                                 FROM generate_series(%(startDate)s, %(endDate)s, %(step_size)s) AS generated_timestamp
-                                                 LEFT JOIN LATERAL ( SELECT DISTINCT user_id AS s
-                                                                     FROM full_sessions
-                                                                     WHERE start_ts >= generated_timestamp
-                                                                       AND start_ts <= generated_timestamp + %(step_size)s) AS sessions ON (TRUE)
-                                                 GROUP BY generated_timestamp
-                                                 ORDER BY generated_timestamp;""", full_args)
-                else:
-                    raise Exception(f"Unsupported metricOf:{metric_of}")
+            if metric_of == schemas.MetricOfTimeseries.SESSION_COUNT:
+                # main_query = cur.mogrify(f"""WITH full_sessions AS (SELECT DISTINCT ON(s.session_id) s.session_id, s.start_ts
+                main_query = cur.mogrify(f"""WITH full_sessions AS (SELECT s.session_id, s.start_ts
+                                             {query_part})
+                                             SELECT generated_timestamp AS timestamp,
+                                                    COUNT(s) AS count
+                                             FROM generate_series(%(startDate)s, %(endDate)s, %(step_size)s) AS generated_timestamp
+                                             LEFT JOIN LATERAL ( SELECT 1 AS s
+                                                                 FROM full_sessions
+                                                                 WHERE start_ts >= generated_timestamp
+                                                                   AND start_ts <= generated_timestamp + %(step_size)s) AS sessions ON (TRUE)
+                                             GROUP BY generated_timestamp
+                                             ORDER BY generated_timestamp;""", full_args)
+            elif metric_of == schemas.MetricOfTimeseries.USER_COUNT:
+                main_query = cur.mogrify(f"""WITH full_sessions AS (SELECT s.user_id, s.start_ts
+                                             {query_part}
+                                             AND s.user_id IS NOT NULL
+                                             AND s.user_id != '')
+                                             SELECT generated_timestamp AS timestamp,
+                                                    COUNT(s) AS count
+                                             FROM generate_series(%(startDate)s, %(endDate)s, %(step_size)s) AS generated_timestamp
+                                             LEFT JOIN LATERAL ( SELECT DISTINCT user_id AS s
+                                                                 FROM full_sessions
+                                                                 WHERE start_ts >= generated_timestamp
+                                                                   AND start_ts <= generated_timestamp + %(step_size)s) AS sessions ON (TRUE)
+                                             GROUP BY generated_timestamp
+                                             ORDER BY generated_timestamp;""", full_args)
             else:
-                main_query = cur.mogrify(f"""SELECT count(DISTINCT s.session_id) AS count
-                                             {query_part};""", full_args)
+                raise Exception(f"Unsupported metricOf:{metric_of}")
 
         logger.debug("--------------------")
         logger.debug(main_query)
@@ -235,10 +231,8 @@ def search2_series(data: schemas.SessionsSearchPayloadSchema, project_id: int, d
             logger.warning(data.model_dump_json())
             logger.warning("--------------------")
             raise err
-        if view_type == schemas.MetricTimeseriesViewType.LINE_CHART:
-            sessions = cur.fetchall()
-        else:
-            sessions = cur.fetchone()["count"]
+        sessions = cur.fetchall()
+
     elif metric_type == schemas.MetricType.TABLE:
         if isinstance(metric_of, schemas.MetricOfTable):
             main_col = "user_id"
diff --git a/api/chalicelib/core/sessions/sessions_ch.py b/api/chalicelib/core/sessions/sessions_ch.py
index 887fb8f6b..2bc85f469 100644
--- a/api/chalicelib/core/sessions/sessions_ch.py
+++ b/api/chalicelib/core/sessions/sessions_ch.py
@@ -3,8 +3,9 @@ import logging
 from typing import List, Union
 
 import schemas
-from chalicelib.core import events, metadata, projects, metrics, sessions
-from chalicelib.core.sessions import sessions_favorite, performance_event
+from chalicelib.core import events, metadata, projects
+from chalicelib.core.metrics import metrics
+from chalicelib.core.sessions import sessions_favorite, performance_event, sessions_legacy
 from chalicelib.utils import pg_client, helper, metrics_helper, ch_client, exp_ch_helper
 from chalicelib.utils import sql_helper as sh
 
@@ -82,17 +83,18 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_
     meta_keys = []
     with ch_client.ClickHouseClient() as cur:
         if errors_only:
-            main_query = cur.format(f"""SELECT DISTINCT er.error_id,
-                                               COALESCE((SELECT TRUE
-                                                         FROM {exp_ch_helper.get_user_viewed_errors_table()} AS ve
-                                                         WHERE er.error_id = ve.error_id
-                                                           AND ve.user_id = %(userId)s LIMIT 1), FALSE) AS viewed
-                                        {query_part};""", full_args)
+            main_query = cur.format(query=f"""SELECT DISTINCT er.error_id,
+                                                     COALESCE((SELECT TRUE
+                                                               FROM {exp_ch_helper.get_user_viewed_errors_table()} AS ve
+                                                               WHERE er.error_id = ve.error_id
+                                                                 AND ve.user_id = %(userId)s LIMIT 1), FALSE) AS viewed
+                                              {query_part};""", parameters=full_args)
 
         elif count_only:
-            main_query = cur.mogrify(f"""SELECT COUNT(DISTINCT s.session_id) AS count_sessions,
-                                                COUNT(DISTINCT s.user_uuid) AS count_users
-                                         {query_part};""", full_args)
+            main_query = cur.format(query=f"""SELECT COUNT(DISTINCT s.session_id) AS count_sessions,
+                                                     COUNT(DISTINCT s.user_uuid) AS count_users
+                                              {query_part};""",
+                                    parameters=full_args)
         elif data.group_by_user:
             g_sort = "count(full_sessions)"
             if data.order is None:
@@ -125,11 +127,11 @@
                                      ) AS users_sessions;""", full_args)
         elif ids_only:
-            main_query = cur.format(f"""SELECT DISTINCT ON(s.session_id) s.session_id
-                                        {query_part}
-                                        ORDER BY s.session_id desc
-                                        LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s;""",
-                                    full_args)
+            main_query = cur.format(query=f"""SELECT DISTINCT ON(s.session_id) s.session_id
+                                              {query_part}
+                                              ORDER BY s.session_id desc
+                                              LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s;""",
+                                    parameters=full_args)
         else:
             if data.order is None:
                 data.order = schemas.SortOrderType.DESC.value
@@ -143,21 +145,22 @@
                 meta_keys = metadata.get(project_id=project_id)
                 meta_map = ",'metadata',toString(map(%s))" \
                            % ','.join([f"'{m['key']}',coalesce(metadata_{m['index']},'None')" for m in meta_keys])
-            main_query = cur.format(f"""SELECT any(total) AS count, groupArray(%(sessions_limit)s)(details) AS sessions
-                                        FROM (SELECT total, details
-                                              FROM (SELECT COUNT() OVER () AS total,
-                                                           s.{sort} AS sort_key,
-                                                           map({SESSION_PROJECTION_COLS_CH_MAP}{meta_map}) AS details
-                                                    {query_part}
+            main_query = cur.format(query=f"""SELECT any(total) AS count,
+                                                     groupArray(%(sessions_limit)s)(details) AS sessions
+                                              FROM (SELECT total, details
+                                                    FROM (SELECT COUNT() OVER () AS total,
+                                                                 s.{sort} AS sort_key,
+                                                                 map({SESSION_PROJECTION_COLS_CH_MAP}{meta_map}) AS details
+                                                          {query_part}
                                                           LEFT JOIN (SELECT DISTINCT session_id
-                                                                     FROM experimental.user_viewed_sessions
-                                                                     WHERE user_id = %(userId)s AND project_id=%(project_id)s
-                                                                       AND _timestamp >= toDateTime(%(startDate)s / 1000)) AS viewed_sessions
-                                                                    ON (viewed_sessions.session_id = s.session_id)
-                                                   ) AS raw
-                                              ORDER BY sort_key {data.order}
-                                              LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s) AS sorted_sessions;""",
-                                    full_args)
+                                                                     FROM experimental.user_viewed_sessions
+                                                                     WHERE user_id = %(userId)s AND project_id=%(project_id)s
+                                                                       AND _timestamp >= toDateTime(%(startDate)s / 1000)) AS viewed_sessions
+                                                                    ON (viewed_sessions.session_id = s.session_id)
+                                                          ) AS raw
+                                                    ORDER BY sort_key {data.order}
+                                                    LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s) AS sorted_sessions;""",
+                                    parameters=full_args)
         logging.debug("--------------------")
         logging.debug(main_query)
         logging.debug("--------------------")
@@ -196,18 +199,18 @@
 
 def search2_series(data: schemas.SessionsSearchPayloadSchema, project_id: int, density: int,
-                   view_type: schemas.MetricTimeseriesViewType, metric_type: schemas.MetricType,
-                   metric_of: schemas.MetricOfTable, metric_value: List):
+                   metric_type: schemas.MetricType, metric_of: schemas.MetricOfTimeseries | schemas.MetricOfTable,
+                   metric_value: List):
     step_size = int(metrics_helper.__get_step_size(endTimestamp=data.endTimestamp, startTimestamp=data.startTimestamp,
                                                    density=density))
     extra_event = None
     if metric_of == schemas.MetricOfTable.VISITED_URL:
         extra_event = f"""SELECT DISTINCT ev.session_id, ev.url_path
-                        FROM {exp_ch_helper.get_main_events_table(data.startTimestamp)} AS ev
-                        WHERE ev.datetime >= toDateTime(%(startDate)s / 1000)
-                          AND ev.datetime <= toDateTime(%(endDate)s / 1000)
-                          AND ev.project_id = %(project_id)s
-                          AND ev.event_type = 'LOCATION'"""
+                          FROM {exp_ch_helper.get_main_events_table(data.startTimestamp)} AS ev
+                          WHERE ev.datetime >= toDateTime(%(startDate)s / 1000)
+                            AND ev.datetime <= toDateTime(%(endDate)s / 1000)
+                            AND ev.project_id = %(project_id)s
+                            AND ev.event_type = 'LOCATION'"""
     elif metric_of == schemas.MetricOfTable.ISSUES and len(metric_value) > 0:
         data.filters.append(schemas.SessionSearchFilterSchema(value=metric_value, type=schemas.FilterType.ISSUE,
                                                               operator=schemas.SearchEventOperator.IS))
@@ -218,45 +221,39 @@
     sessions = []
     with ch_client.ClickHouseClient() as cur:
         if metric_type == schemas.MetricType.TIMESERIES:
-            if view_type == schemas.MetricTimeseriesViewType.LINE_CHART:
-                if metric_of == schemas.MetricOfTimeseries.SESSION_COUNT:
-                    query = f"""SELECT toUnixTimestamp(
-                                               toStartOfInterval(processed_sessions.datetime, INTERVAL %(step_size)s second)
-                                       ) * 1000 AS timestamp,
-                                       COUNT(processed_sessions.session_id) AS count
-                                FROM (SELECT s.session_id AS session_id,
-                                             s.datetime AS datetime
-                                      {query_part}) AS processed_sessions
-                                GROUP BY timestamp
-                                ORDER BY timestamp;"""
-                elif metric_of == schemas.MetricOfTimeseries.USER_COUNT:
-                    query = f"""SELECT toUnixTimestamp(
-                                               toStartOfInterval(processed_sessions.datetime, INTERVAL %(step_size)s second)
-                                       ) * 1000 AS timestamp,
-                                       COUNT(DISTINCT processed_sessions.user_id) AS count
-                                FROM (SELECT s.user_id AS user_id,
-                                             s.datetime AS datetime
-                                      {query_part}
-                                      WHERE isNotNull(s.user_id)
-                                        AND s.user_id != '') AS processed_sessions
-                                GROUP BY timestamp
-                                ORDER BY timestamp;"""
-                else:
-                    raise Exception(f"Unsupported metricOf:{metric_of}")
-                main_query = cur.format(query, full_args)
+            if metric_of == schemas.MetricOfTimeseries.SESSION_COUNT:
+                query = f"""SELECT toUnixTimestamp(
+                                           toStartOfInterval(processed_sessions.datetime, INTERVAL %(step_size)s second)
+                                   ) * 1000 AS timestamp,
+                                   COUNT(processed_sessions.session_id) AS count
+                            FROM (SELECT s.session_id AS session_id,
+                                         s.datetime AS datetime
+                                  {query_part}) AS processed_sessions
+                            GROUP BY timestamp
+                            ORDER BY timestamp;"""
+            elif metric_of == schemas.MetricOfTimeseries.USER_COUNT:
+                query = f"""SELECT toUnixTimestamp(
+                                           toStartOfInterval(processed_sessions.datetime, INTERVAL %(step_size)s second)
+                                   ) * 1000 AS timestamp,
+                                   COUNT(DISTINCT processed_sessions.user_id) AS count
+                            FROM (SELECT s.user_id AS user_id,
+                                         s.datetime AS datetime
+                                  {query_part}
+                                  WHERE isNotNull(s.user_id)
+                                    AND s.user_id != '') AS processed_sessions
+                            GROUP BY timestamp
+                            ORDER BY timestamp;"""
             else:
-                main_query = cur.format(f"""SELECT count(DISTINCT s.session_id) AS count
-                                            {query_part};""", full_args)
+                raise Exception(f"Unsupported metricOf:{metric_of}")
+            main_query = cur.format(query=query, parameters=full_args)
             logging.debug("--------------------")
             logging.debug(main_query)
             logging.debug("--------------------")
             sessions = cur.execute(main_query)
-            if view_type == schemas.MetricTimeseriesViewType.LINE_CHART:
-                sessions = metrics.__complete_missing_steps(start_time=data.startTimestamp, end_time=data.endTimestamp,
-                                                            density=density, neutral={"count": 0}, rows=sessions)
-            else:
-                sessions = sessions[0]["count"] if len(sessions) > 0 else 0
+            sessions = metrics.__complete_missing_steps(start_time=data.startTimestamp, end_time=data.endTimestamp,
+                                                        density=density, neutral={"count": 0}, rows=sessions)
+
         elif metric_type == schemas.MetricType.TABLE:
             full_args["limit_s"] = 0
             full_args["limit_e"] = 200
@@ -287,7 +284,7 @@
             elif metric_of == schemas.MetricOfTable.VISITED_URL:
                 main_col = "url_path"
                 extra_col = "s.url_path"
-            main_query = cur.format(f"""{pre_query}
+            main_query = cur.format(query=f"""{pre_query}
                                         SELECT COUNT(DISTINCT {main_col}) OVER () AS main_count,
                                                {main_col} AS name,
                                                count(DISTINCT session_id) AS session_count
@@ -299,7 +296,7 @@
                                         GROUP BY {main_col}
                                         ORDER BY session_count DESC
                                         LIMIT %(limit_e)s OFFSET %(limit_s)s;""",
-                                    full_args)
+                                    parameters=full_args)
             logging.debug("--------------------")
             logging.debug(main_query)
             logging.debug("--------------------")
@@ -444,7 +441,7 @@ def search2_table(data: schemas.SessionsSearchPayloadSchema, project_id: int, de
                          ORDER BY total DESC
                          LIMIT %(limit_e)s OFFSET %(limit_s)s;"""
 
-        main_query = cur.format(main_query, full_args)
+        main_query = cur.format(query=main_query, parameters=full_args)
         logging.debug("--------------------")
         logging.debug(main_query)
         logging.debug("--------------------")
@@ -1625,12 +1622,12 @@ def get_session_user(project_id, user_id):
 
 def session_exists(project_id, session_id):
     with ch_client.ClickHouseClient() as cur:
-        query = cur.format(f"""SELECT 1
-                               FROM {exp_ch_helper.get_main_sessions_table()}
-                               WHERE session_id=%(session_id)s
-                                 AND project_id=%(project_id)s
-                               LIMIT 1""",
-                           {"project_id": project_id, "session_id": session_id})
+        query = cur.format(query=f"""SELECT 1
+                                     FROM {exp_ch_helper.get_main_sessions_table()}
+                                     WHERE session_id=%(session_id)s
+                                       AND project_id=%(project_id)s
+                                     LIMIT 1""",
+                           parameters={"project_id": project_id, "session_id": session_id})
         row = cur.execute(query)
         return row is not None
 
@@ -1667,4 +1664,4 @@ def check_recording_status(project_id: int) -> dict:
 # TODO: rewrite this function to use ClickHouse
 def search_sessions_by_ids(project_id: int, session_ids: list, sort_by: str = 'session_id',
                            ascending: bool = False) -> dict:
-    return sessions.search_sessions_by_ids(project_id, session_ids, sort_by, ascending)
+    return sessions_legacy.search_sessions_by_ids(project_id, session_ids, sort_by, ascending)
diff --git a/api/chalicelib/utils/ch_client_exp.py b/api/chalicelib/utils/ch_client_exp.py
index dc3c06041..24cd11df8 100644
--- a/api/chalicelib/utils/ch_client_exp.py
+++ b/api/chalicelib/utils/ch_client_exp.py
@@ -130,14 +130,16 @@ class ClickHouseClient:
 
     def __init__(self, database=None):
         if self.__client is None:
-            if database is None and config('CH_POOL', cast=bool, default=True):
-                self.__client = CH_pool.get_connection()
-            else:
+            if database is not None or not config('CH_POOL', cast=bool, default=True):
                 self.__client = clickhouse_connect.get_client(**CH_CONFIG,
                                                               database=database if database else config("ch_database",
                                                                                                         default="default"),
                                                               settings=settings,
                                                               **extra_args)
+
+            else:
+                self.__client = CH_pool.get_connection()
+
             self.__client.execute = transform_result(self.__client.query)
             self.__client.format = self.format
 
diff --git a/api/run-alerts-dev.sh b/api/run-alerts-dev.sh
index 8853f512c..a9f2cd367 100755
--- a/api/run-alerts-dev.sh
+++ b/api/run-alerts-dev.sh
@@ -1,4 +1,4 @@
 #!/bin/zsh
 export TZ=UTC
-
+export CH_POOL=false
 uvicorn app_alerts:app --reload --port 8888 --log-level ${S_LOGLEVEL:-warning}
\ No newline at end of file