* fix(alerts): fixed alerts

* fix(chalice): fixed product analytics

Kraiem Taha Yassine 2024-12-20 17:37:01 +01:00 committed by GitHub
parent 21992ceadb
commit 7c4ff2ed3f
6 changed files with 128 additions and 124 deletions

View file

@@ -81,8 +81,8 @@ def __get_timeseries_chart(project: schemas.ProjectContext, data: schemas.CardTi
     series_charts = []
     for i, s in enumerate(data.series):
         series_charts.append(sessions.search2_series(data=s.filter, project_id=project.project_id, density=data.density,
-                                                     view_type=data.view_type, metric_type=data.metric_type,
-                                                     metric_of=data.metric_of, metric_value=data.metric_value))
+                                                     metric_type=data.metric_type, metric_of=data.metric_of,
+                                                     metric_value=data.metric_value))
     results = [{}] * len(series_charts[0])
     for i in range(len(results)):

View file

@@ -0,0 +1,11 @@
+import logging
+
+from decouple import config
+
+logger = logging.getLogger(__name__)
+
+from . import sessions as sessions_legacy
+if config("EXP_METRICS", cast=bool, default=False):
+    from . import sessions_ch as sessions
+else:
+    from . import sessions
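The new module above is what flips the metrics package between the legacy PostgreSQL implementation and the ClickHouse one at import time, driven by the EXP_METRICS flag, while keeping a `sessions_legacy` handle for functions that are not ported yet. A minimal, self-contained sketch of the same toggle pattern (the stand-in classes and the flag parsing below are illustrative, not the actual OpenReplay modules):

```python
import os

# Illustrative stand-ins for the two sibling backends; in the real package these
# are the `sessions` (PostgreSQL) and `sessions_ch` (ClickHouse) modules.
class _PgSessions:
    @staticmethod
    def search2_series(**kwargs):
        return "pg result"

class _ChSessions:
    @staticmethod
    def search2_series(**kwargs):
        return "clickhouse result"

# Same decision the new module makes via config("EXP_METRICS", cast=bool, default=False):
# always keep the legacy implementation reachable, export the experimental one only when the flag is on.
sessions_legacy = _PgSessions
if os.environ.get("EXP_METRICS", "false").lower() in ("1", "true", "yes"):
    sessions = _ChSessions
else:
    sessions = _PgSessions

print(sessions.search2_series())          # backend selected by the flag
print(sessions_legacy.search2_series())   # always the PostgreSQL implementation
```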

View file

@@ -172,8 +172,8 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_
 # TODO: remove "table of" search from this function
 def search2_series(data: schemas.SessionsSearchPayloadSchema, project_id: int, density: int,
-                   view_type: schemas.MetricTimeseriesViewType, metric_type: schemas.MetricType,
-                   metric_of: schemas.MetricOfTable, metric_value: List):
+                   metric_type: schemas.MetricType, metric_of: schemas.MetricOfTimeseries | schemas.MetricOfTable,
+                   metric_value: List):
     step_size = int(metrics_helper.__get_step_size(endTimestamp=data.endTimestamp, startTimestamp=data.startTimestamp,
                                                    density=density, factor=1, decimal=True))
     extra_event = None
@@ -189,39 +189,35 @@ def search2_series(data: schemas.SessionsSearchPayloadSchema, project_id: int, d
     sessions = []
     with pg_client.PostgresClient() as cur:
         if metric_type == schemas.MetricType.TIMESERIES:
-            if view_type == schemas.MetricTimeseriesViewType.LINE_CHART:
-                if metric_of == schemas.MetricOfTimeseries.SESSION_COUNT:
-                    # main_query = cur.mogrify(f"""WITH full_sessions AS (SELECT DISTINCT ON(s.session_id) s.session_id, s.start_ts
-                    main_query = cur.mogrify(f"""WITH full_sessions AS (SELECT s.session_id, s.start_ts
-                                                 {query_part})
-                                                 SELECT generated_timestamp AS timestamp,
-                                                        COUNT(s) AS count
-                                                 FROM generate_series(%(startDate)s, %(endDate)s, %(step_size)s) AS generated_timestamp
-                                                 LEFT JOIN LATERAL ( SELECT 1 AS s
-                                                                     FROM full_sessions
-                                                                     WHERE start_ts >= generated_timestamp
-                                                                       AND start_ts <= generated_timestamp + %(step_size)s) AS sessions ON (TRUE)
-                                                 GROUP BY generated_timestamp
-                                                 ORDER BY generated_timestamp;""", full_args)
-                elif metric_of == schemas.MetricOfTimeseries.USER_COUNT:
-                    main_query = cur.mogrify(f"""WITH full_sessions AS (SELECT s.user_id, s.start_ts
-                                                 {query_part}
-                                                 AND s.user_id IS NOT NULL
-                                                 AND s.user_id != '')
-                                                 SELECT generated_timestamp AS timestamp,
-                                                        COUNT(s) AS count
-                                                 FROM generate_series(%(startDate)s, %(endDate)s, %(step_size)s) AS generated_timestamp
-                                                 LEFT JOIN LATERAL ( SELECT DISTINCT user_id AS s
-                                                                     FROM full_sessions
-                                                                     WHERE start_ts >= generated_timestamp
-                                                                       AND start_ts <= generated_timestamp + %(step_size)s) AS sessions ON (TRUE)
-                                                 GROUP BY generated_timestamp
-                                                 ORDER BY generated_timestamp;""", full_args)
-                else:
-                    raise Exception(f"Unsupported metricOf:{metric_of}")
-            else:
-                main_query = cur.mogrify(f"""SELECT count(DISTINCT s.session_id) AS count
-                                             {query_part};""", full_args)
+            if metric_of == schemas.MetricOfTimeseries.SESSION_COUNT:
+                # main_query = cur.mogrify(f"""WITH full_sessions AS (SELECT DISTINCT ON(s.session_id) s.session_id, s.start_ts
+                main_query = cur.mogrify(f"""WITH full_sessions AS (SELECT s.session_id, s.start_ts
+                                             {query_part})
+                                             SELECT generated_timestamp AS timestamp,
+                                                    COUNT(s) AS count
+                                             FROM generate_series(%(startDate)s, %(endDate)s, %(step_size)s) AS generated_timestamp
+                                             LEFT JOIN LATERAL ( SELECT 1 AS s
+                                                                 FROM full_sessions
+                                                                 WHERE start_ts >= generated_timestamp
+                                                                   AND start_ts <= generated_timestamp + %(step_size)s) AS sessions ON (TRUE)
+                                             GROUP BY generated_timestamp
+                                             ORDER BY generated_timestamp;""", full_args)
+            elif metric_of == schemas.MetricOfTimeseries.USER_COUNT:
+                main_query = cur.mogrify(f"""WITH full_sessions AS (SELECT s.user_id, s.start_ts
+                                             {query_part}
+                                             AND s.user_id IS NOT NULL
+                                             AND s.user_id != '')
+                                             SELECT generated_timestamp AS timestamp,
+                                                    COUNT(s) AS count
+                                             FROM generate_series(%(startDate)s, %(endDate)s, %(step_size)s) AS generated_timestamp
+                                             LEFT JOIN LATERAL ( SELECT DISTINCT user_id AS s
+                                                                 FROM full_sessions
+                                                                 WHERE start_ts >= generated_timestamp
+                                                                   AND start_ts <= generated_timestamp + %(step_size)s) AS sessions ON (TRUE)
+                                             GROUP BY generated_timestamp
+                                             ORDER BY generated_timestamp;""", full_args)
+            else:
+                raise Exception(f"Unsupported metricOf:{metric_of}")
             logger.debug("--------------------")
             logger.debug(main_query)
@@ -235,10 +231,8 @@ def search2_series(data: schemas.SessionsSearchPayloadSchema, project_id: int, d
                 logger.warning(data.model_dump_json())
                 logger.warning("--------------------")
                 raise err
-            if view_type == schemas.MetricTimeseriesViewType.LINE_CHART:
-                sessions = cur.fetchall()
-            else:
-                sessions = cur.fetchone()["count"]
+            sessions = cur.fetchall()
         elif metric_type == schemas.MetricType.TABLE:
             if isinstance(metric_of, schemas.MetricOfTable):
                 main_col = "user_id"
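The PostgreSQL branch keeps its generate_series plus LEFT JOIN LATERAL shape, which guarantees a row for every time bucket even when no session falls into it. A stand-alone sketch of that bucketing idea, assuming a hypothetical public.sessions table in place of the generated {query_part} filter:

```python
# Illustrative only: a self-contained version of the bucketing query.
# %(startDate)s / %(endDate)s are epoch milliseconds, %(step_size)s is the bucket width in ms.
bucketed_counts_sql = """
WITH full_sessions AS (SELECT s.session_id, s.start_ts
                       FROM public.sessions AS s          -- hypothetical table
                       WHERE s.project_id = %(project_id)s
                         AND s.start_ts >= %(startDate)s
                         AND s.start_ts <= %(endDate)s)
SELECT generated_timestamp AS timestamp,
       COUNT(s) AS count
FROM generate_series(%(startDate)s, %(endDate)s, %(step_size)s) AS generated_timestamp
         LEFT JOIN LATERAL (SELECT 1 AS s
                            FROM full_sessions
                            WHERE start_ts >= generated_timestamp
                              AND start_ts <= generated_timestamp + %(step_size)s) AS sessions ON (TRUE)
GROUP BY generated_timestamp
ORDER BY generated_timestamp;
"""

params = {"project_id": 1,
          "startDate": 1_734_000_000_000,   # ms
          "endDate": 1_734_086_400_000,     # ms
          "step_size": 3_600_000}           # one-hour buckets, in ms

# With psycopg2 this would run as: cur.execute(bucketed_counts_sql, params)
print(bucketed_counts_sql)
```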

View file

@@ -3,8 +3,9 @@ import logging
 from typing import List, Union

 import schemas
-from chalicelib.core import events, metadata, projects, metrics, sessions
-from chalicelib.core.sessions import sessions_favorite, performance_event
+from chalicelib.core import events, metadata, projects
+from chalicelib.core.metrics import metrics
+from chalicelib.core.sessions import sessions_favorite, performance_event, sessions_legacy
 from chalicelib.utils import pg_client, helper, metrics_helper, ch_client, exp_ch_helper
 from chalicelib.utils import sql_helper as sh
@@ -82,17 +83,18 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_
     meta_keys = []
     with ch_client.ClickHouseClient() as cur:
         if errors_only:
-            main_query = cur.format(f"""SELECT DISTINCT er.error_id,
+            main_query = cur.format(query=f"""SELECT DISTINCT er.error_id,
                                                COALESCE((SELECT TRUE
                                                          FROM {exp_ch_helper.get_user_viewed_errors_table()} AS ve
                                                          WHERE er.error_id = ve.error_id
                                                            AND ve.user_id = %(userId)s LIMIT 1), FALSE) AS viewed
-                                        {query_part};""", full_args)
+                                        {query_part};""", parameters=full_args)
         elif count_only:
-            main_query = cur.mogrify(f"""SELECT COUNT(DISTINCT s.session_id) AS count_sessions,
+            main_query = cur.format(query=f"""SELECT COUNT(DISTINCT s.session_id) AS count_sessions,
                                               COUNT(DISTINCT s.user_uuid) AS count_users
-                                         {query_part};""", full_args)
+                                         {query_part};""",
+                                    parameters=full_args)
         elif data.group_by_user:
             g_sort = "count(full_sessions)"
             if data.order is None:
@@ -125,11 +127,11 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_
                                     ) AS users_sessions;""",
                                     full_args)
         elif ids_only:
-            main_query = cur.format(f"""SELECT DISTINCT ON(s.session_id) s.session_id
+            main_query = cur.format(query=f"""SELECT DISTINCT ON(s.session_id) s.session_id
                                         {query_part}
                                         ORDER BY s.session_id desc
                                         LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s;""",
-                                    full_args)
+                                    parameters=full_args)
         else:
             if data.order is None:
                 data.order = schemas.SortOrderType.DESC.value
@@ -143,21 +145,22 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_
                 meta_keys = metadata.get(project_id=project_id)
                 meta_map = ",'metadata',toString(map(%s))" \
                            % ','.join([f"'{m['key']}',coalesce(metadata_{m['index']},'None')" for m in meta_keys])
-            main_query = cur.format(f"""SELECT any(total) AS count, groupArray(%(sessions_limit)s)(details) AS sessions
-                                        FROM (SELECT total, details
-                                              FROM (SELECT COUNT() OVER () AS total,
-                                                           s.{sort} AS sort_key,
-                                                           map({SESSION_PROJECTION_COLS_CH_MAP}{meta_map}) AS details
-                                                    {query_part}
+            main_query = cur.format(query=f"""SELECT any(total) AS count,
+                                                     groupArray(%(sessions_limit)s)(details) AS sessions
+                                              FROM (SELECT total, details
+                                                    FROM (SELECT COUNT() OVER () AS total,
+                                                                 s.{sort} AS sort_key,
+                                                                 map({SESSION_PROJECTION_COLS_CH_MAP}{meta_map}) AS details
+                                                          {query_part}
                                                     LEFT JOIN (SELECT DISTINCT session_id
                                                                FROM experimental.user_viewed_sessions
                                                                WHERE user_id = %(userId)s AND project_id=%(project_id)s
                                                                  AND _timestamp >= toDateTime(%(startDate)s / 1000)) AS viewed_sessions
                                                               ON (viewed_sessions.session_id = s.session_id)
                                                     ) AS raw
                                               ORDER BY sort_key {data.order}
                                               LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s) AS sorted_sessions;""",
-                                    full_args)
+                                    parameters=full_args)
             logging.debug("--------------------")
             logging.debug(main_query)
             logging.debug("--------------------")
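A note on the change repeated throughout this file: the query and its arguments are now passed to cur.format as explicit query= and parameters= keywords instead of positionally. A minimal sketch of why keyword calls are safer here, using a hypothetical formatter with the same interface (not the real wrapper):

```python
# Hypothetical formatter with the same keyword interface as cur.format in this file.
def format_query(query: str, parameters: dict | None = None) -> str:
    return query if not parameters else query % {k: repr(v) for k, v in parameters.items()}

# Positional calls are easy to get wrong once a helper grows more arguments;
# keywords keep every call site unambiguous and grep-able.
sql = format_query(query="SELECT 1 FROM sessions WHERE project_id = %(project_id)s LIMIT 1",
                   parameters={"project_id": 42})
print(sql)
```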
@@ -196,18 +199,18 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_
 def search2_series(data: schemas.SessionsSearchPayloadSchema, project_id: int, density: int,
-                   view_type: schemas.MetricTimeseriesViewType, metric_type: schemas.MetricType,
-                   metric_of: schemas.MetricOfTable, metric_value: List):
+                   metric_type: schemas.MetricType, metric_of: schemas.MetricOfTimeseries | schemas.MetricOfTable,
+                   metric_value: List):
     step_size = int(metrics_helper.__get_step_size(endTimestamp=data.endTimestamp, startTimestamp=data.startTimestamp,
                                                    density=density))
     extra_event = None
     if metric_of == schemas.MetricOfTable.VISITED_URL:
         extra_event = f"""SELECT DISTINCT ev.session_id, ev.url_path
                           FROM {exp_ch_helper.get_main_events_table(data.startTimestamp)} AS ev
                           WHERE ev.datetime >= toDateTime(%(startDate)s / 1000)
                             AND ev.datetime <= toDateTime(%(endDate)s / 1000)
                             AND ev.project_id = %(project_id)s
                             AND ev.event_type = 'LOCATION'"""
     elif metric_of == schemas.MetricOfTable.ISSUES and len(metric_value) > 0:
         data.filters.append(schemas.SessionSearchFilterSchema(value=metric_value, type=schemas.FilterType.ISSUE,
                                                               operator=schemas.SearchEventOperator.IS))
@@ -218,45 +221,39 @@ def search2_series(data: schemas.SessionsSearchPayloadSchema, project_id: int, d
     sessions = []
     with ch_client.ClickHouseClient() as cur:
         if metric_type == schemas.MetricType.TIMESERIES:
-            if view_type == schemas.MetricTimeseriesViewType.LINE_CHART:
-                if metric_of == schemas.MetricOfTimeseries.SESSION_COUNT:
-                    query = f"""SELECT toUnixTimestamp(
-                                        toStartOfInterval(processed_sessions.datetime, INTERVAL %(step_size)s second)
-                                       ) * 1000 AS timestamp,
-                                       COUNT(processed_sessions.session_id) AS count
-                                FROM (SELECT s.session_id AS session_id,
-                                             s.datetime AS datetime
-                                      {query_part}) AS processed_sessions
-                                GROUP BY timestamp
-                                ORDER BY timestamp;"""
-                elif metric_of == schemas.MetricOfTimeseries.USER_COUNT:
-                    query = f"""SELECT toUnixTimestamp(
-                                        toStartOfInterval(processed_sessions.datetime, INTERVAL %(step_size)s second)
-                                       ) * 1000 AS timestamp,
-                                       COUNT(DISTINCT processed_sessions.user_id) AS count
-                                FROM (SELECT s.user_id AS user_id,
-                                             s.datetime AS datetime
-                                      {query_part}
-                                      WHERE isNotNull(s.user_id)
-                                        AND s.user_id != '') AS processed_sessions
-                                GROUP BY timestamp
-                                ORDER BY timestamp;"""
-                else:
-                    raise Exception(f"Unsupported metricOf:{metric_of}")
-                main_query = cur.format(query, full_args)
-            else:
-                main_query = cur.format(f"""SELECT count(DISTINCT s.session_id) AS count
-                                            {query_part};""", full_args)
+            if metric_of == schemas.MetricOfTimeseries.SESSION_COUNT:
+                query = f"""SELECT toUnixTimestamp(
+                                    toStartOfInterval(processed_sessions.datetime, INTERVAL %(step_size)s second)
+                                   ) * 1000 AS timestamp,
+                                   COUNT(processed_sessions.session_id) AS count
+                            FROM (SELECT s.session_id AS session_id,
+                                         s.datetime AS datetime
+                                  {query_part}) AS processed_sessions
+                            GROUP BY timestamp
+                            ORDER BY timestamp;"""
+            elif metric_of == schemas.MetricOfTimeseries.USER_COUNT:
+                query = f"""SELECT toUnixTimestamp(
+                                    toStartOfInterval(processed_sessions.datetime, INTERVAL %(step_size)s second)
+                                   ) * 1000 AS timestamp,
+                                   COUNT(DISTINCT processed_sessions.user_id) AS count
+                            FROM (SELECT s.user_id AS user_id,
+                                         s.datetime AS datetime
+                                  {query_part}
+                                  WHERE isNotNull(s.user_id)
+                                    AND s.user_id != '') AS processed_sessions
+                            GROUP BY timestamp
+                            ORDER BY timestamp;"""
+            else:
+                raise Exception(f"Unsupported metricOf:{metric_of}")
+            main_query = cur.format(query=query, parameters=full_args)
             logging.debug("--------------------")
             logging.debug(main_query)
             logging.debug("--------------------")
             sessions = cur.execute(main_query)
-            if view_type == schemas.MetricTimeseriesViewType.LINE_CHART:
-                sessions = metrics.__complete_missing_steps(start_time=data.startTimestamp, end_time=data.endTimestamp,
-                                                            density=density, neutral={"count": 0}, rows=sessions)
-            else:
-                sessions = sessions[0]["count"] if len(sessions) > 0 else 0
+            sessions = metrics.__complete_missing_steps(start_time=data.startTimestamp, end_time=data.endTimestamp,
+                                                        density=density, neutral={"count": 0}, rows=sessions)
         elif metric_type == schemas.MetricType.TABLE:
             full_args["limit_s"] = 0
             full_args["limit_e"] = 200
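With the count-only branch removed, every TIMESERIES result now flows through the gap-filling helper. The real function is metrics.__complete_missing_steps; the sketch below is a hypothetical reimplementation of the idea only, padding absent buckets with the neutral row:

```python
def complete_missing_steps(start_time: int, end_time: int, density: int,
                           neutral: dict, rows: list[dict]) -> list[dict]:
    """Illustrative stand-in for metrics.__complete_missing_steps: ensure every
    bucket between start_time and end_time (ms) has a row, filling gaps with `neutral`."""
    step = (end_time - start_time) // density
    by_ts = {r["timestamp"]: r for r in rows}
    filled = []
    for i in range(density):
        ts = start_time + i * step
        filled.append(by_ts.get(ts, {"timestamp": ts, **neutral}))
    return filled

# Usage: two buckets returned by ClickHouse, the remaining ones padded with count 0.
rows = [{"timestamp": 0, "count": 5}, {"timestamp": 2000, "count": 3}]
print(complete_missing_steps(start_time=0, end_time=4000, density=4,
                             neutral={"count": 0}, rows=rows))
```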
@@ -287,7 +284,7 @@ def search2_series(data: schemas.SessionsSearchPayloadSchema, project_id: int, d
             elif metric_of == schemas.MetricOfTable.VISITED_URL:
                 main_col = "url_path"
                 extra_col = "s.url_path"
-            main_query = cur.format(f"""{pre_query}
+            main_query = cur.format(query=f"""{pre_query}
                                         SELECT COUNT(DISTINCT {main_col}) OVER () AS main_count,
                                                {main_col} AS name,
                                                count(DISTINCT session_id) AS session_count
@@ -299,7 +296,7 @@ def search2_series(data: schemas.SessionsSearchPayloadSchema, project_id: int, d
                                         GROUP BY {main_col}
                                         ORDER BY session_count DESC
                                         LIMIT %(limit_e)s OFFSET %(limit_s)s;""",
-                                    full_args)
+                                    parameters=full_args)
             logging.debug("--------------------")
             logging.debug(main_query)
             logging.debug("--------------------")
@@ -444,7 +441,7 @@ def search2_table(data: schemas.SessionsSearchPayloadSchema, project_id: int, de
                           ORDER BY total DESC
                           LIMIT %(limit_e)s OFFSET %(limit_s)s;"""
-        main_query = cur.format(main_query, full_args)
+        main_query = cur.format(query=main_query, parameters=full_args)
         logging.debug("--------------------")
         logging.debug(main_query)
         logging.debug("--------------------")
@@ -1625,12 +1622,12 @@ def get_session_user(project_id, user_id):
 def session_exists(project_id, session_id):
     with ch_client.ClickHouseClient() as cur:
-        query = cur.format(f"""SELECT 1
+        query = cur.format(query=f"""SELECT 1
                                FROM {exp_ch_helper.get_main_sessions_table()}
                                WHERE session_id=%(session_id)s
                                  AND project_id=%(project_id)s
                                LIMIT 1""",
-                           {"project_id": project_id, "session_id": session_id})
+                           parameters={"project_id": project_id, "session_id": session_id})
         row = cur.execute(query)
         return row is not None
@@ -1667,4 +1664,4 @@ def check_recording_status(project_id: int) -> dict:
 # TODO: rewrite this function to use ClickHouse
 def search_sessions_by_ids(project_id: int, session_ids: list, sort_by: str = 'session_id',
                            ascending: bool = False) -> dict:
-    return sessions.search_sessions_by_ids(project_id, session_ids, sort_by, ascending)
+    return sessions_legacy.search_sessions_by_ids(project_id, session_ids, sort_by, ascending)

View file

@@ -130,14 +130,16 @@ class ClickHouseClient:
     def __init__(self, database=None):
         if self.__client is None:
-            if database is None and config('CH_POOL', cast=bool, default=True):
-                self.__client = CH_pool.get_connection()
-            else:
+            if database is not None or not config('CH_POOL', cast=bool, default=True):
                 self.__client = clickhouse_connect.get_client(**CH_CONFIG,
                                                               database=database if database else config("ch_database",
                                                                                                          default="default"),
                                                               settings=settings,
                                                               **extra_args)
+            else:
+                self.__client = CH_pool.get_connection()
         self.__client.execute = transform_result(self.__client.query)
         self.__client.format = self.format
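The constructor now takes a dedicated clickhouse_connect client whenever a specific database is requested or pooling is disabled, and only otherwise borrows a connection from the shared pool. A minimal sketch of just that decision, reading the flag the same way through python-decouple (the helper name is illustrative):

```python
from decouple import config

def use_direct_client(database: str | None) -> bool:
    """Illustrative: mirrors the new branch in ClickHouseClient.__init__, going straight
    to clickhouse_connect.get_client() when a database is pinned or CH_POOL is off;
    otherwise a connection is borrowed from the pool."""
    return database is not None or not config("CH_POOL", cast=bool, default=True)

# With CH_POOL unset (defaults to true):
#   use_direct_client(None)      -> False  (pooled connection)
#   use_direct_client("default") -> True   (dedicated client for that database)
# With CH_POOL=false (as the alerts dev script exports below):
#   use_direct_client(None)      -> True
```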

View file

@@ -1,4 +1,4 @@
 #!/bin/zsh
 export TZ=UTC
+export CH_POOL=false
 uvicorn app_alerts:app --reload --port 8888 --log-level ${S_LOGLEVEL:-warning}
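For reference, CH_POOL is read by the API through python-decouple, so the alerts process started by this script constructs pool-less ClickHouse clients. A tiny check of how the exported value is parsed (assuming decouple's standard boolean casting):

```python
import os
from decouple import config

os.environ["CH_POOL"] = "false"                     # what the dev script exports before uvicorn starts
print(config("CH_POOL", cast=bool, default=True))   # -> False, so ClickHouseClient skips the pool
```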