From ddfaaeb6c5b9a93584441ccaae007f32dca53aed Mon Sep 17 00:00:00 2001 From: Kraiem Taha Yassine Date: Fri, 20 Dec 2024 18:23:48 +0100 Subject: [PATCH] Dev (#2898) * feat(chalice): autocomplete return top 10 with stats * fix(chalice): fixed autocomplete top 10 meta-filters * refactor(chalice): restricted sessions search --- api/chalicelib/core/sessions/sessions.py | 246 ---------------- api/chalicelib/core/sessions/sessions_ch.py | 254 ----------------- .../core/sessions/sessions_search.py | 260 +++++++++++++++++ .../core/usability_testing/service.py | 5 +- api/routers/core_dynamic.py | 16 +- ee/api/.gitignore | 1 + ee/api/chalicelib/core/sessions/__init__.py | 2 + .../core/sessions/sessions_search_exp.py | 269 ++++++++++++++++++ ee/api/clean-dev.sh | 1 + 9 files changed, 544 insertions(+), 510 deletions(-) create mode 100644 api/chalicelib/core/sessions/sessions_search.py create mode 100644 ee/api/chalicelib/core/sessions/sessions_search_exp.py diff --git a/api/chalicelib/core/sessions/sessions.py b/api/chalicelib/core/sessions/sessions.py index 08e481b9e..80163b5d8 100644 --- a/api/chalicelib/core/sessions/sessions.py +++ b/api/chalicelib/core/sessions/sessions.py @@ -9,166 +9,6 @@ from chalicelib.utils import sql_helper as sh logger = logging.getLogger(__name__) -SESSION_PROJECTION_BASE_COLS = """s.project_id, -s.session_id::text AS session_id, -s.user_uuid, -s.user_id, -s.user_os, -s.user_browser, -s.user_device, -s.user_device_type, -s.user_country, -s.user_city, -s.user_state, -s.start_ts, -s.duration, -s.events_count, -s.pages_count, -s.errors_count, -s.user_anonymous_id, -s.platform, -s.issue_score, -s.timezone, -to_jsonb(s.issue_types) AS issue_types """ - -SESSION_PROJECTION_COLS = SESSION_PROJECTION_BASE_COLS + """, -favorite_sessions.session_id NOTNULL AS favorite, -COALESCE((SELECT TRUE - FROM public.user_viewed_sessions AS fs - WHERE s.session_id = fs.session_id - AND fs.user_id = %(userId)s LIMIT 1), FALSE) AS viewed """ - - -# This function executes the query and return result -def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_id, errors_only=False, - error_status=schemas.ErrorStatus.ALL, count_only=False, issue=None, ids_only=False, - platform="web"): - if data.bookmarked: - data.startTimestamp, data.endTimestamp = sessions_favorite.get_start_end_timestamp(project_id, user_id) - - full_args, query_part = search_query_parts(data=data, error_status=error_status, errors_only=errors_only, - favorite_only=data.bookmarked, issue=issue, project_id=project_id, - user_id=user_id, platform=platform) - if data.limit is not None and data.page is not None: - full_args["sessions_limit"] = data.limit - full_args["sessions_limit_s"] = (data.page - 1) * data.limit - full_args["sessions_limit_e"] = data.page * data.limit - else: - full_args["sessions_limit"] = 200 - full_args["sessions_limit_s"] = 0 - full_args["sessions_limit_e"] = 200 - - meta_keys = [] - with pg_client.PostgresClient() as cur: - if errors_only: - main_query = cur.mogrify(f"""SELECT DISTINCT er.error_id, - COALESCE((SELECT TRUE - FROM public.user_viewed_errors AS ve - WHERE er.error_id = ve.error_id - AND ve.user_id = %(userId)s LIMIT 1), FALSE) AS viewed - {query_part};""", full_args) - - elif count_only: - main_query = cur.mogrify(f"""SELECT COUNT(DISTINCT s.session_id) AS count_sessions, - COUNT(DISTINCT s.user_uuid) AS count_users - {query_part};""", full_args) - elif data.group_by_user: - g_sort = "count(full_sessions)" - if data.order is None: - data.order = 
schemas.SortOrderType.DESC.value - else: - data.order = data.order - if data.sort is not None and data.sort != 'sessionsCount': - sort = helper.key_to_snake_case(data.sort) - g_sort = f"{'MIN' if data.order == schemas.SortOrderType.DESC else 'MAX'}({sort})" - else: - sort = 'start_ts' - - meta_keys = metadata.get(project_id=project_id) - main_query = cur.mogrify(f"""SELECT COUNT(*) AS count, - COALESCE(JSONB_AGG(users_sessions) - FILTER (WHERE rn>%(sessions_limit_s)s AND rn<=%(sessions_limit_e)s), '[]'::JSONB) AS sessions - FROM (SELECT user_id, - count(full_sessions) AS user_sessions_count, - jsonb_agg(full_sessions) FILTER (WHERE rn <= 1) AS last_session, - MIN(full_sessions.start_ts) AS first_session_ts, - ROW_NUMBER() OVER (ORDER BY {g_sort} {data.order}) AS rn - FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY {sort} {data.order}) AS rn - FROM (SELECT DISTINCT ON(s.session_id) {SESSION_PROJECTION_COLS} - {"," if len(meta_keys) > 0 else ""}{",".join([f'metadata_{m["index"]}' for m in meta_keys])} - {query_part} - ) AS filtred_sessions - ) AS full_sessions - GROUP BY user_id - ) AS users_sessions;""", - full_args) - elif ids_only: - main_query = cur.mogrify(f"""SELECT DISTINCT ON(s.session_id) s.session_id - {query_part} - ORDER BY s.session_id desc - LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s;""", - full_args) - else: - if data.order is None: - data.order = schemas.SortOrderType.DESC.value - else: - data.order = data.order - sort = 'session_id' - if data.sort is not None and data.sort != "session_id": - # sort += " " + data.order + "," + helper.key_to_snake_case(data.sort) - sort = helper.key_to_snake_case(data.sort) - - meta_keys = metadata.get(project_id=project_id) - main_query = cur.mogrify(f"""SELECT COUNT(full_sessions) AS count, - COALESCE(JSONB_AGG(full_sessions) - FILTER (WHERE rn>%(sessions_limit_s)s AND rn<=%(sessions_limit_e)s), '[]'::JSONB) AS sessions - FROM (SELECT *, ROW_NUMBER() OVER (ORDER BY {sort} {data.order}, issue_score DESC) AS rn - FROM (SELECT DISTINCT ON(s.session_id) {SESSION_PROJECTION_COLS} - {"," if len(meta_keys) > 0 else ""}{",".join([f'metadata_{m["index"]}' for m in meta_keys])} - {query_part} - ORDER BY s.session_id desc) AS filtred_sessions - ORDER BY {sort} {data.order}, issue_score DESC) AS full_sessions;""", - full_args) - logger.debug("--------------------") - logger.debug(main_query) - logger.debug("--------------------") - try: - cur.execute(main_query) - sessions = cur.fetchone() - except Exception as err: - logger.warning("--------- SESSIONS SEARCH QUERY EXCEPTION -----------") - logger.warning(main_query.decode('UTF-8')) - logger.warning("--------- PAYLOAD -----------") - logger.warning(data.model_dump_json()) - logger.warning("--------------------") - raise err - if errors_only or ids_only: - return helper.list_to_camel_case(cur.fetchall()) - - if count_only: - return helper.dict_to_camel_case(sessions) - - total = sessions["count"] - sessions = sessions["sessions"] - - if data.group_by_user: - for i, s in enumerate(sessions): - sessions[i] = {**s.pop("last_session")[0], **s} - sessions[i].pop("rn") - sessions[i]["metadata"] = {k["key"]: sessions[i][f'metadata_{k["index"]}'] for k in meta_keys \ - if sessions[i][f'metadata_{k["index"]}'] is not None} - else: - for i, s in enumerate(sessions): - sessions[i]["metadata"] = {k["key"]: sessions[i][f'metadata_{k["index"]}'] for k in meta_keys \ - if sessions[i][f'metadata_{k["index"]}'] is not None} - # if not data.group_by_user and data.sort is not None and data.sort 
!= "session_id": - # sessions = sorted(sessions, key=lambda s: s[helper.key_to_snake_case(data.sort)], - # reverse=data.order.upper() == "DESC") - return { - 'total': total, - 'sessions': helper.list_to_camel_case(sessions) - } - # TODO: remove "table of" search from this function def search2_series(data: schemas.SessionsSearchPayloadSchema, project_id: int, density: int, @@ -1162,68 +1002,6 @@ def search_query_parts(data: schemas.SessionsSearchPayloadSchema, error_status, return full_args, query_part -def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None): - if project_id is None: - all_projects = projects.get_projects(tenant_id=tenant_id) - else: - all_projects = [ - projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False, - include_gdpr=False)] - - all_projects = {int(p["projectId"]): p["name"] for p in all_projects} - project_ids = list(all_projects.keys()) - - available_keys = metadata.get_keys_by_projects(project_ids) - for i in available_keys: - available_keys[i]["user_id"] = schemas.FilterType.USER_ID - available_keys[i]["user_anonymous_id"] = schemas.FilterType.USER_ANONYMOUS_ID - results = {} - for i in project_ids: - if m_key not in available_keys[i].values(): - available_keys.pop(i) - results[i] = {"total": 0, "sessions": [], "missingMetadata": True} - project_ids = list(available_keys.keys()) - if len(project_ids) > 0: - with pg_client.PostgresClient() as cur: - sub_queries = [] - for i in project_ids: - col_name = list(available_keys[i].keys())[list(available_keys[i].values()).index(m_key)] - sub_queries.append(cur.mogrify( - f"(SELECT COALESCE(COUNT(s.*)) AS count FROM public.sessions AS s WHERE s.project_id = %(id)s AND s.{col_name} = %(value)s) AS \"{i}\"", - {"id": i, "value": m_value}).decode('UTF-8')) - query = f"""SELECT {", ".join(sub_queries)};""" - cur.execute(query=query) - - rows = cur.fetchone() - - sub_queries = [] - for i in rows.keys(): - results[i] = {"total": rows[i], "sessions": [], "missingMetadata": False, "name": all_projects[int(i)]} - if rows[i] > 0: - col_name = list(available_keys[int(i)].keys())[list(available_keys[int(i)].values()).index(m_key)] - sub_queries.append( - cur.mogrify( - f"""( - SELECT * - FROM ( - SELECT DISTINCT ON(favorite_sessions.session_id, s.session_id) {SESSION_PROJECTION_COLS} - FROM public.sessions AS s LEFT JOIN (SELECT session_id - FROM public.user_favorite_sessions - WHERE user_favorite_sessions.user_id = %(userId)s - ) AS favorite_sessions USING (session_id) - WHERE s.project_id = %(id)s AND s.duration IS NOT NULL AND s.{col_name} = %(value)s - ) AS full_sessions - ORDER BY favorite DESC, issue_score DESC - LIMIT 10 - )""", - {"id": i, "value": m_value, "userId": user_id}).decode('UTF-8')) - if len(sub_queries) > 0: - cur.execute("\nUNION\n".join(sub_queries)) - rows = cur.fetchall() - for i in rows: - results[str(i["project_id"])]["sessions"].append(helper.dict_to_camel_case(i)) - return results - def get_user_sessions(project_id, user_id, start_date, end_date): with pg_client.PostgresClient() as cur: @@ -1335,27 +1113,3 @@ def check_recording_status(project_id: int) -> dict: "sessionsCount": row["sessions_count"] } - -def search_sessions_by_ids(project_id: int, session_ids: list, sort_by: str = 'session_id', - ascending: bool = False) -> dict: - if session_ids is None or len(session_ids) == 0: - return {"total": 0, "sessions": []} - with pg_client.PostgresClient() as cur: - meta_keys = metadata.get(project_id=project_id) - params = {"project_id": project_id, 
"session_ids": tuple(session_ids)} - order_direction = 'ASC' if ascending else 'DESC' - main_query = cur.mogrify(f"""SELECT {SESSION_PROJECTION_BASE_COLS} - {"," if len(meta_keys) > 0 else ""}{",".join([f'metadata_{m["index"]}' for m in meta_keys])} - FROM public.sessions AS s - WHERE project_id=%(project_id)s - AND session_id IN %(session_ids)s - ORDER BY {sort_by} {order_direction};""", params) - - cur.execute(main_query) - rows = cur.fetchall() - if len(meta_keys) > 0: - for s in rows: - s["metadata"] = {} - for m in meta_keys: - s["metadata"][m["key"]] = s.pop(f'metadata_{m["index"]}') - return {"total": len(rows), "sessions": helper.list_to_camel_case(rows)} diff --git a/api/chalicelib/core/sessions/sessions_ch.py b/api/chalicelib/core/sessions/sessions_ch.py index 2bc85f469..baea95a5f 100644 --- a/api/chalicelib/core/sessions/sessions_ch.py +++ b/api/chalicelib/core/sessions/sessions_ch.py @@ -11,193 +11,6 @@ from chalicelib.utils import sql_helper as sh logger = logging.getLogger(__name__) -SESSION_PROJECTION_COLS_CH = """\ -s.project_id, -s.session_id AS session_id, -s.user_uuid AS user_uuid, -s.user_id AS user_id, -s.user_os AS user_os, -s.user_browser AS user_browser, -s.user_device AS user_device, -s.user_device_type AS user_device_type, -s.user_country AS user_country, -s.user_city AS user_city, -s.user_state AS user_state, -toUnixTimestamp(s.datetime)*1000 AS start_ts, -s.duration AS duration, -s.events_count AS events_count, -s.pages_count AS pages_count, -s.errors_count AS errors_count, -s.user_anonymous_id AS user_anonymous_id, -s.platform AS platform, -s.timezone AS timezone, -coalesce(issue_score,0) AS issue_score, -s.issue_types AS issue_types -""" - -SESSION_PROJECTION_COLS_CH_MAP = """\ -'project_id', toString(%(project_id)s), -'session_id', toString(s.session_id), -'user_uuid', toString(s.user_uuid), -'user_id', toString(s.user_id), -'user_os', toString(s.user_os), -'user_browser', toString(s.user_browser), -'user_device', toString(s.user_device), -'user_device_type', toString(s.user_device_type), -'user_country', toString(s.user_country), -'user_city', toString(s.user_city), -'user_state', toString(s.user_state), -'start_ts', toString(toUnixTimestamp(s.datetime)*1000), -'duration', toString(s.duration), -'events_count', toString(s.events_count), -'pages_count', toString(s.pages_count), -'errors_count', toString(s.errors_count), -'user_anonymous_id', toString(s.user_anonymous_id), -'platform', toString(s.platform), -'timezone', toString(s.timezone), -'issue_score', toString(coalesce(issue_score,0)), -'viewed', toString(viewed_sessions.session_id > 0) -""" - - -# This function executes the query and return result -def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_id, errors_only=False, - error_status=schemas.ErrorStatus.ALL, count_only=False, issue=None, ids_only=False, - platform="web"): - if data.bookmarked: - data.startTimestamp, data.endTimestamp = sessions_favorite.get_start_end_timestamp(project_id, user_id) - full_args, query_part = search_query_parts_ch(data=data, error_status=error_status, errors_only=errors_only, - favorite_only=data.bookmarked, issue=issue, project_id=project_id, - user_id=user_id, platform=platform) - if data.sort == "startTs": - data.sort = "datetime" - if data.limit is not None and data.page is not None: - full_args["sessions_limit"] = data.limit - full_args["sessions_limit_s"] = (data.page - 1) * data.limit - full_args["sessions_limit_e"] = data.page * data.limit - else: - full_args["sessions_limit"] = 200 - 
full_args["sessions_limit_s"] = 0 - full_args["sessions_limit_e"] = 200 - - meta_keys = [] - with ch_client.ClickHouseClient() as cur: - if errors_only: - main_query = cur.format(query=f"""SELECT DISTINCT er.error_id, - COALESCE((SELECT TRUE - FROM {exp_ch_helper.get_user_viewed_errors_table()} AS ve - WHERE er.error_id = ve.error_id - AND ve.user_id = %(userId)s LIMIT 1), FALSE) AS viewed - {query_part};""", parameters=full_args) - - elif count_only: - main_query = cur.format(query=f"""SELECT COUNT(DISTINCT s.session_id) AS count_sessions, - COUNT(DISTINCT s.user_uuid) AS count_users - {query_part};""", - parameters=full_args) - elif data.group_by_user: - g_sort = "count(full_sessions)" - if data.order is None: - data.order = schemas.SortOrderType.DESC.value - else: - data.order = data.order - if data.sort is not None and data.sort != 'sessionsCount': - sort = helper.key_to_snake_case(data.sort) - g_sort = f"{'MIN' if data.order == schemas.SortOrderType.DESC else 'MAX'}({sort})" - else: - sort = 'start_ts' - - meta_keys = metadata.get(project_id=project_id) - meta_map = ",map(%s) AS 'metadata'" \ - % ','.join([f"'{m['key']}',coalesce(metadata_{m['index']},'None')" for m in meta_keys]) - main_query = cur.mogrify(f"""SELECT COUNT(*) AS count, - COALESCE(JSONB_AGG(users_sessions) - FILTER (WHERE rn>%(sessions_limit_s)s AND rn<=%(sessions_limit_e)s), '[]'::JSONB) AS sessions - FROM (SELECT user_id, - count(full_sessions) AS user_sessions_count, - jsonb_agg(full_sessions) FILTER (WHERE rn <= 1) AS last_session, - MIN(full_sessions.start_ts) AS first_session_ts, - ROW_NUMBER() OVER (ORDER BY {g_sort} {data.order}) AS rn - FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY {sort} {data.order}) AS rn - FROM (SELECT DISTINCT ON(s.session_id) {SESSION_PROJECTION_COLS_CH} {meta_map} - {query_part} - ) AS filtred_sessions - ) AS full_sessions - GROUP BY user_id - ) AS users_sessions;""", - full_args) - elif ids_only: - main_query = cur.format(query=f"""SELECT DISTINCT ON(s.session_id) s.session_id - {query_part} - ORDER BY s.session_id desc - LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s;""", - parameters=full_args) - else: - if data.order is None: - data.order = schemas.SortOrderType.DESC.value - else: - data.order = data.order - sort = 'session_id' - if data.sort is not None and data.sort != "session_id": - # sort += " " + data.order + "," + helper.key_to_snake_case(data.sort) - sort = helper.key_to_snake_case(data.sort) - - meta_keys = metadata.get(project_id=project_id) - meta_map = ",'metadata',toString(map(%s))" \ - % ','.join([f"'{m['key']}',coalesce(metadata_{m['index']},'None')" for m in meta_keys]) - main_query = cur.format(query=f"""SELECT any(total) AS count, - groupArray(%(sessions_limit)s)(details) AS sessions - FROM (SELECT total, details - FROM (SELECT COUNT() OVER () AS total, - s.{sort} AS sort_key, - map({SESSION_PROJECTION_COLS_CH_MAP}{meta_map}) AS details - {query_part} - LEFT JOIN (SELECT DISTINCT session_id - FROM experimental.user_viewed_sessions - WHERE user_id = %(userId)s AND project_id=%(project_id)s - AND _timestamp >= toDateTime(%(startDate)s / 1000)) AS viewed_sessions - ON (viewed_sessions.session_id = s.session_id) - ) AS raw - ORDER BY sort_key {data.order} - LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s) AS sorted_sessions;""", - parameters=full_args) - logging.debug("--------------------") - logging.debug(main_query) - logging.debug("--------------------") - try: - sessions = cur.execute(main_query) - except Exception as err: - 
logging.warning("--------- SESSIONS-CH SEARCH QUERY EXCEPTION -----------") - logging.warning(main_query) - logging.warning("--------- PAYLOAD -----------") - logging.warning(data.model_dump_json()) - logging.warning("--------------------") - raise err - if errors_only or ids_only: - return helper.list_to_camel_case(sessions) - - if len(sessions) > 0: - sessions = sessions[0] - - total = sessions["count"] - sessions = sessions["sessions"] - - if data.group_by_user: - for i, s in enumerate(sessions): - sessions[i] = {**s.pop("last_session")[0], **s} - sessions[i].pop("rn") - sessions[i]["metadata"] = ast.literal_eval(sessions[i]["metadata"]) - else: - for i in range(len(sessions)): - sessions[i]["metadata"] = ast.literal_eval(sessions[i]["metadata"]) - sessions[i] = schemas.SessionModel.parse_obj(helper.dict_to_camel_case(sessions[i])) - - return { - 'total': total, - 'sessions': sessions - } - - def search2_series(data: schemas.SessionsSearchPayloadSchema, project_id: int, density: int, metric_type: schemas.MetricType, metric_of: schemas.MetricOfTimeseries | schemas.MetricOfTable, metric_value: List): @@ -1494,68 +1307,6 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu return full_args, query_part -def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None): - if project_id is None: - all_projects = projects.get_projects(tenant_id=tenant_id) - else: - all_projects = [ - projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False, - include_gdpr=False)] - - all_projects = {int(p["projectId"]): p["name"] for p in all_projects} - project_ids = list(all_projects.keys()) - - available_keys = metadata.get_keys_by_projects(project_ids) - for i in available_keys: - available_keys[i]["user_id"] = schemas.FilterType.USER_ID - available_keys[i]["user_anonymous_id"] = schemas.FilterType.USER_ANONYMOUS_ID - results = {} - for i in project_ids: - if m_key not in available_keys[i].values(): - available_keys.pop(i) - results[i] = {"total": 0, "sessions": [], "missingMetadata": True} - project_ids = list(available_keys.keys()) - if len(project_ids) > 0: - with pg_client.PostgresClient() as cur: - sub_queries = [] - for i in project_ids: - col_name = list(available_keys[i].keys())[list(available_keys[i].values()).index(m_key)] - sub_queries.append(cur.mogrify( - f"(SELECT COALESCE(COUNT(s.*)) AS count FROM public.sessions AS s WHERE s.project_id = %(id)s AND s.{col_name} = %(value)s) AS \"{i}\"", - {"id": i, "value": m_value}).decode('UTF-8')) - query = f"""SELECT {", ".join(sub_queries)};""" - cur.execute(query=query) - - rows = cur.fetchone() - - sub_queries = [] - for i in rows.keys(): - results[i] = {"total": rows[i], "sessions": [], "missingMetadata": False, "name": all_projects[int(i)]} - if rows[i] > 0: - col_name = list(available_keys[int(i)].keys())[list(available_keys[int(i)].values()).index(m_key)] - sub_queries.append( - cur.mogrify( - f"""( - SELECT * - FROM ( - SELECT DISTINCT ON(favorite_sessions.session_id, s.session_id) {SESSION_PROJECTION_COLS_CH} - FROM public.sessions AS s LEFT JOIN (SELECT session_id - FROM public.user_favorite_sessions - WHERE user_favorite_sessions.user_id = %(userId)s - ) AS favorite_sessions USING (session_id) - WHERE s.project_id = %(id)s AND s.duration IS NOT NULL AND s.{col_name} = %(value)s - ) AS full_sessions - ORDER BY favorite DESC, issue_score DESC - LIMIT 10 - )""", - {"id": i, "value": m_value, "userId": user_id}).decode('UTF-8')) - if len(sub_queries) > 0: - 
cur.execute("\nUNION\n".join(sub_queries)) - rows = cur.fetchall() - for i in rows: - results[str(i["project_id"])]["sessions"].append(helper.dict_to_camel_case(i)) - return results - def get_user_sessions(project_id, user_id, start_date, end_date): with pg_client.PostgresClient() as cur: @@ -1660,8 +1411,3 @@ def check_recording_status(project_id: int) -> dict: "sessionsCount": row["sessions_count"] } - -# TODO: rewrite this function to use ClickHouse -def search_sessions_by_ids(project_id: int, session_ids: list, sort_by: str = 'session_id', - ascending: bool = False) -> dict: - return sessions_legacy.search_sessions_by_ids(project_id, session_ids, sort_by, ascending) diff --git a/api/chalicelib/core/sessions/sessions_search.py b/api/chalicelib/core/sessions/sessions_search.py new file mode 100644 index 000000000..eebf132b2 --- /dev/null +++ b/api/chalicelib/core/sessions/sessions_search.py @@ -0,0 +1,260 @@ +import logging +from typing import List, Union + +import schemas +from chalicelib.core import events, metadata, projects +from chalicelib.core.sessions import sessions_favorite, performance_event, sessions_legacy +from chalicelib.utils import pg_client, helper, metrics_helper +from chalicelib.utils import sql_helper as sh + +logger = logging.getLogger(__name__) + +SESSION_PROJECTION_BASE_COLS = """s.project_id, +s.session_id::text AS session_id, +s.user_uuid, +s.user_id, +s.user_os, +s.user_browser, +s.user_device, +s.user_device_type, +s.user_country, +s.user_city, +s.user_state, +s.start_ts, +s.duration, +s.events_count, +s.pages_count, +s.errors_count, +s.user_anonymous_id, +s.platform, +s.issue_score, +s.timezone, +to_jsonb(s.issue_types) AS issue_types """ + +SESSION_PROJECTION_COLS = SESSION_PROJECTION_BASE_COLS + """, +favorite_sessions.session_id NOTNULL AS favorite, +COALESCE((SELECT TRUE + FROM public.user_viewed_sessions AS fs + WHERE s.session_id = fs.session_id + AND fs.user_id = %(userId)s LIMIT 1), FALSE) AS viewed """ + + +# This function executes the query and return result +def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_id, errors_only=False, + error_status=schemas.ErrorStatus.ALL, count_only=False, issue=None, ids_only=False, + platform="web"): + if data.bookmarked: + data.startTimestamp, data.endTimestamp = sessions_favorite.get_start_end_timestamp(project_id, user_id) + + full_args, query_part = sessions_legacy.search_query_parts(data=data, error_status=error_status, + errors_only=errors_only, + favorite_only=data.bookmarked, issue=issue, + project_id=project_id, + user_id=user_id, platform=platform) + if data.limit is not None and data.page is not None: + full_args["sessions_limit"] = data.limit + full_args["sessions_limit_s"] = (data.page - 1) * data.limit + full_args["sessions_limit_e"] = data.page * data.limit + else: + full_args["sessions_limit"] = 200 + full_args["sessions_limit_s"] = 0 + full_args["sessions_limit_e"] = 200 + + meta_keys = [] + with pg_client.PostgresClient() as cur: + if errors_only: + main_query = cur.mogrify(f"""SELECT DISTINCT er.error_id, + COALESCE((SELECT TRUE + FROM public.user_viewed_errors AS ve + WHERE er.error_id = ve.error_id + AND ve.user_id = %(userId)s LIMIT 1), FALSE) AS viewed + {query_part};""", full_args) + + elif count_only: + main_query = cur.mogrify(f"""SELECT COUNT(DISTINCT s.session_id) AS count_sessions, + COUNT(DISTINCT s.user_uuid) AS count_users + {query_part};""", full_args) + elif data.group_by_user: + g_sort = "count(full_sessions)" + if data.order is None: + data.order = 
schemas.SortOrderType.DESC.value + else: + data.order = data.order + if data.sort is not None and data.sort != 'sessionsCount': + sort = helper.key_to_snake_case(data.sort) + g_sort = f"{'MIN' if data.order == schemas.SortOrderType.DESC else 'MAX'}({sort})" + else: + sort = 'start_ts' + + meta_keys = metadata.get(project_id=project_id) + main_query = cur.mogrify(f"""SELECT COUNT(*) AS count, + COALESCE(JSONB_AGG(users_sessions) + FILTER (WHERE rn>%(sessions_limit_s)s AND rn<=%(sessions_limit_e)s), '[]'::JSONB) AS sessions + FROM (SELECT user_id, + count(full_sessions) AS user_sessions_count, + jsonb_agg(full_sessions) FILTER (WHERE rn <= 1) AS last_session, + MIN(full_sessions.start_ts) AS first_session_ts, + ROW_NUMBER() OVER (ORDER BY {g_sort} {data.order}) AS rn + FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY {sort} {data.order}) AS rn + FROM (SELECT DISTINCT ON(s.session_id) {SESSION_PROJECTION_COLS} + {"," if len(meta_keys) > 0 else ""}{",".join([f'metadata_{m["index"]}' for m in meta_keys])} + {query_part} + ) AS filtred_sessions + ) AS full_sessions + GROUP BY user_id + ) AS users_sessions;""", + full_args) + elif ids_only: + main_query = cur.mogrify(f"""SELECT DISTINCT ON(s.session_id) s.session_id + {query_part} + ORDER BY s.session_id desc + LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s;""", + full_args) + else: + if data.order is None: + data.order = schemas.SortOrderType.DESC.value + else: + data.order = data.order + sort = 'session_id' + if data.sort is not None and data.sort != "session_id": + # sort += " " + data.order + "," + helper.key_to_snake_case(data.sort) + sort = helper.key_to_snake_case(data.sort) + + meta_keys = metadata.get(project_id=project_id) + main_query = cur.mogrify(f"""SELECT COUNT(full_sessions) AS count, + COALESCE(JSONB_AGG(full_sessions) + FILTER (WHERE rn>%(sessions_limit_s)s AND rn<=%(sessions_limit_e)s), '[]'::JSONB) AS sessions + FROM (SELECT *, ROW_NUMBER() OVER (ORDER BY {sort} {data.order}, issue_score DESC) AS rn + FROM (SELECT DISTINCT ON(s.session_id) {SESSION_PROJECTION_COLS} + {"," if len(meta_keys) > 0 else ""}{",".join([f'metadata_{m["index"]}' for m in meta_keys])} + {query_part} + ORDER BY s.session_id desc) AS filtred_sessions + ORDER BY {sort} {data.order}, issue_score DESC) AS full_sessions;""", + full_args) + logger.debug("--------------------") + logger.debug(main_query) + logger.debug("--------------------") + try: + cur.execute(main_query) + sessions = cur.fetchone() + except Exception as err: + logger.warning("--------- SESSIONS SEARCH QUERY EXCEPTION -----------") + logger.warning(main_query.decode('UTF-8')) + logger.warning("--------- PAYLOAD -----------") + logger.warning(data.model_dump_json()) + logger.warning("--------------------") + raise err + if errors_only or ids_only: + return helper.list_to_camel_case(cur.fetchall()) + + if count_only: + return helper.dict_to_camel_case(sessions) + + total = sessions["count"] + sessions = sessions["sessions"] + + if data.group_by_user: + for i, s in enumerate(sessions): + sessions[i] = {**s.pop("last_session")[0], **s} + sessions[i].pop("rn") + sessions[i]["metadata"] = {k["key"]: sessions[i][f'metadata_{k["index"]}'] for k in meta_keys \ + if sessions[i][f'metadata_{k["index"]}'] is not None} + else: + for i, s in enumerate(sessions): + sessions[i]["metadata"] = {k["key"]: sessions[i][f'metadata_{k["index"]}'] for k in meta_keys \ + if sessions[i][f'metadata_{k["index"]}'] is not None} + # if not data.group_by_user and data.sort is not None and data.sort 
!= "session_id": + # sessions = sorted(sessions, key=lambda s: s[helper.key_to_snake_case(data.sort)], + # reverse=data.order.upper() == "DESC") + return { + 'total': total, + 'sessions': helper.list_to_camel_case(sessions) + } + + +def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None): + if project_id is None: + all_projects = projects.get_projects(tenant_id=tenant_id) + else: + all_projects = [ + projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False, + include_gdpr=False)] + + all_projects = {int(p["projectId"]): p["name"] for p in all_projects} + project_ids = list(all_projects.keys()) + + available_keys = metadata.get_keys_by_projects(project_ids) + for i in available_keys: + available_keys[i]["user_id"] = schemas.FilterType.USER_ID + available_keys[i]["user_anonymous_id"] = schemas.FilterType.USER_ANONYMOUS_ID + results = {} + for i in project_ids: + if m_key not in available_keys[i].values(): + available_keys.pop(i) + results[i] = {"total": 0, "sessions": [], "missingMetadata": True} + project_ids = list(available_keys.keys()) + if len(project_ids) > 0: + with pg_client.PostgresClient() as cur: + sub_queries = [] + for i in project_ids: + col_name = list(available_keys[i].keys())[list(available_keys[i].values()).index(m_key)] + sub_queries.append(cur.mogrify( + f"(SELECT COALESCE(COUNT(s.*)) AS count FROM public.sessions AS s WHERE s.project_id = %(id)s AND s.{col_name} = %(value)s) AS \"{i}\"", + {"id": i, "value": m_value}).decode('UTF-8')) + query = f"""SELECT {", ".join(sub_queries)};""" + cur.execute(query=query) + + rows = cur.fetchone() + + sub_queries = [] + for i in rows.keys(): + results[i] = {"total": rows[i], "sessions": [], "missingMetadata": False, "name": all_projects[int(i)]} + if rows[i] > 0: + col_name = list(available_keys[int(i)].keys())[list(available_keys[int(i)].values()).index(m_key)] + sub_queries.append( + cur.mogrify( + f"""( + SELECT * + FROM ( + SELECT DISTINCT ON(favorite_sessions.session_id, s.session_id) {SESSION_PROJECTION_COLS} + FROM public.sessions AS s LEFT JOIN (SELECT session_id + FROM public.user_favorite_sessions + WHERE user_favorite_sessions.user_id = %(userId)s + ) AS favorite_sessions USING (session_id) + WHERE s.project_id = %(id)s AND s.duration IS NOT NULL AND s.{col_name} = %(value)s + ) AS full_sessions + ORDER BY favorite DESC, issue_score DESC + LIMIT 10 + )""", + {"id": i, "value": m_value, "userId": user_id}).decode('UTF-8')) + if len(sub_queries) > 0: + cur.execute("\nUNION\n".join(sub_queries)) + rows = cur.fetchall() + for i in rows: + results[str(i["project_id"])]["sessions"].append(helper.dict_to_camel_case(i)) + return results + + +def search_sessions_by_ids(project_id: int, session_ids: list, sort_by: str = 'session_id', + ascending: bool = False) -> dict: + if session_ids is None or len(session_ids) == 0: + return {"total": 0, "sessions": []} + with pg_client.PostgresClient() as cur: + meta_keys = metadata.get(project_id=project_id) + params = {"project_id": project_id, "session_ids": tuple(session_ids)} + order_direction = 'ASC' if ascending else 'DESC' + main_query = cur.mogrify(f"""SELECT {SESSION_PROJECTION_BASE_COLS} + {"," if len(meta_keys) > 0 else ""}{",".join([f'metadata_{m["index"]}' for m in meta_keys])} + FROM public.sessions AS s + WHERE project_id=%(project_id)s + AND session_id IN %(session_ids)s + ORDER BY {sort_by} {order_direction};""", params) + + cur.execute(main_query) + rows = cur.fetchall() + if len(meta_keys) > 0: + for s in rows: + 
s["metadata"] = {} + for m in meta_keys: + s["metadata"][m["key"]] = s.pop(f'metadata_{m["index"]}') + return {"total": len(rows), "sessions": helper.list_to_camel_case(rows)} diff --git a/api/chalicelib/core/usability_testing/service.py b/api/chalicelib/core/usability_testing/service.py index 62c60e3de..7c28ddc37 100644 --- a/api/chalicelib/core/usability_testing/service.py +++ b/api/chalicelib/core/usability_testing/service.py @@ -1,6 +1,7 @@ import logging -from chalicelib.core import sessions, assist +from chalicelib.core import assist +from chalicelib.core.sessions import sessions_search from chalicelib.core.db_request_handler import DatabaseRequestHandler from chalicelib.core.usability_testing.schema import UTTestCreate, UTTestSearch, UTTestUpdate from chalicelib.utils.TimeUTC import TimeUTC @@ -312,7 +313,7 @@ def ut_tests_sessions(project_id: int, test_id: int, page: int, limit: int, user session_ids = handler.fetchall() session_ids = [session['session_id'] for session in session_ids] - sessions_list = sessions.search_sessions_by_ids(project_id=project_id, session_ids=session_ids) + sessions_list = sessions_search.search_sessions_by_ids(project_id=project_id, session_ids=session_ids) sessions_list['page'] = page return sessions_list diff --git a/api/routers/core_dynamic.py b/api/routers/core_dynamic.py index d61dbdcd4..ce31c2efd 100644 --- a/api/routers/core_dynamic.py +++ b/api/routers/core_dynamic.py @@ -12,7 +12,7 @@ from chalicelib.core import errors, assist, signup, feature_flags from chalicelib.core.metrics import heatmaps from chalicelib.core.errors import errors_favorite, errors_viewed from chalicelib.core.sessions import sessions, sessions_notes, sessions_replay, sessions_favorite, sessions_viewed, \ - sessions_assignments, unprocessed_sessions + sessions_assignments, unprocessed_sessions, sessions_search from chalicelib.core import tenants, users, projects, license from chalicelib.core import webhook from chalicelib.core.collaborations.collaboration_slack import Slack @@ -242,8 +242,8 @@ def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = if len(key) == 0: return {"errors": ["please provide a key for search"]} return { - "data": sessions.search_by_metadata(tenant_id=context.tenant_id, user_id=context.user_id, m_value=value, - m_key=key, project_id=projectId)} + "data": sessions_search.search_by_metadata(tenant_id=context.tenant_id, user_id=context.user_id, m_value=value, + m_key=key, project_id=projectId)} @app.get('/projects', tags=['projects']) @@ -252,18 +252,18 @@ def get_projects(context: schemas.CurrentContext = Depends(OR_context)): @app.post('/{projectId}/sessions/search', tags=["sessions"]) -def sessions_search(projectId: int, data: schemas.SessionsSearchPayloadSchema = Body(...), +def search_sessions(projectId: int, data: schemas.SessionsSearchPayloadSchema = Body(...), context: schemas.CurrentContext = Depends(OR_context)): - data = sessions.search_sessions(data=data, project_id=projectId, user_id=context.user_id, - platform=context.project.platform) + data = sessions_search.search_sessions(data=data, project_id=projectId, user_id=context.user_id, + platform=context.project.platform) return {'data': data} @app.post('/{projectId}/sessions/search/ids', tags=["sessions"]) def session_ids_search(projectId: int, data: schemas.SessionsSearchPayloadSchema = Body(...), context: schemas.CurrentContext = Depends(OR_context)): - data = sessions.search_sessions(data=data, project_id=projectId, user_id=context.user_id, ids_only=True, - 
platform=context.project.platform) + data = sessions_search.search_sessions(data=data, project_id=projectId, user_id=context.user_id, ids_only=True, + platform=context.project.platform) return {'data': data} diff --git a/ee/api/.gitignore b/ee/api/.gitignore index 7177344c0..f79a41945 100644 --- a/ee/api/.gitignore +++ b/ee/api/.gitignore @@ -221,6 +221,7 @@ Pipfile.lock /chalicelib/core/sessions/sessions_metas.py /chalicelib/core/sessions/sessions_mobs.py /chalicelib/core/sessions/sessions_replay.py +/chalicelib/core/sessions/sessions_search.py /chalicelib/core/sessions/performance_event.py /chalicelib/core/sessions/sessions_viewed.py /chalicelib/core/sessions/unprocessed_sessions.py diff --git a/ee/api/chalicelib/core/sessions/__init__.py b/ee/api/chalicelib/core/sessions/__init__.py index 63a0db2b5..9ae268be6 100644 --- a/ee/api/chalicelib/core/sessions/__init__.py +++ b/ee/api/chalicelib/core/sessions/__init__.py @@ -8,8 +8,10 @@ from . import sessions as sessions_legacy if config("EXP_SESSIONS_SEARCH", cast=bool, default=False): logger.info(">>> Using experimental sessions search") from . import sessions_ch as sessions + from . import sessions_search_exp as sessions_search else: from . import sessions + from . import sessions_search_exp from chalicelib.core.sessions import sessions_devtool_ee as sessions_devtool from chalicelib.core.sessions import sessions_viewed_ee as sessions_viewed diff --git a/ee/api/chalicelib/core/sessions/sessions_search_exp.py b/ee/api/chalicelib/core/sessions/sessions_search_exp.py new file mode 100644 index 000000000..a5d6f1398 --- /dev/null +++ b/ee/api/chalicelib/core/sessions/sessions_search_exp.py @@ -0,0 +1,269 @@ +import ast +import logging +from typing import List, Union + +import schemas +from chalicelib.core import events, metadata, projects +from chalicelib.core.metrics import metrics +from chalicelib.core.sessions import sessions_favorite, performance_event, sessions_legacy, sessions +from chalicelib.utils import pg_client, helper, metrics_helper, ch_client, exp_ch_helper +from chalicelib.utils import sql_helper as sh + +logger = logging.getLogger(__name__) + +SESSION_PROJECTION_COLS_CH = """\ +s.project_id, +s.session_id AS session_id, +s.user_uuid AS user_uuid, +s.user_id AS user_id, +s.user_os AS user_os, +s.user_browser AS user_browser, +s.user_device AS user_device, +s.user_device_type AS user_device_type, +s.user_country AS user_country, +s.user_city AS user_city, +s.user_state AS user_state, +toUnixTimestamp(s.datetime)*1000 AS start_ts, +s.duration AS duration, +s.events_count AS events_count, +s.pages_count AS pages_count, +s.errors_count AS errors_count, +s.user_anonymous_id AS user_anonymous_id, +s.platform AS platform, +s.timezone AS timezone, +coalesce(issue_score,0) AS issue_score, +s.issue_types AS issue_types +""" + +SESSION_PROJECTION_COLS_CH_MAP = """\ +'project_id', toString(%(project_id)s), +'session_id', toString(s.session_id), +'user_uuid', toString(s.user_uuid), +'user_id', toString(s.user_id), +'user_os', toString(s.user_os), +'user_browser', toString(s.user_browser), +'user_device', toString(s.user_device), +'user_device_type', toString(s.user_device_type), +'user_country', toString(s.user_country), +'user_city', toString(s.user_city), +'user_state', toString(s.user_state), +'start_ts', toString(toUnixTimestamp(s.datetime)*1000), +'duration', toString(s.duration), +'events_count', toString(s.events_count), +'pages_count', toString(s.pages_count), +'errors_count', toString(s.errors_count), +'user_anonymous_id', 
toString(s.user_anonymous_id), +'platform', toString(s.platform), +'timezone', toString(s.timezone), +'issue_score', toString(coalesce(issue_score,0)), +'viewed', toString(viewed_sessions.session_id > 0) +""" + + +# This function executes the query and return result +def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_id, errors_only=False, + error_status=schemas.ErrorStatus.ALL, count_only=False, issue=None, ids_only=False, + platform="web"): + if data.bookmarked: + data.startTimestamp, data.endTimestamp = sessions_favorite.get_start_end_timestamp(project_id, user_id) + full_args, query_part = sessions.search_query_parts_ch(data=data, error_status=error_status, + errors_only=errors_only, + favorite_only=data.bookmarked, issue=issue, + project_id=project_id, + user_id=user_id, platform=platform) + if data.sort == "startTs": + data.sort = "datetime" + if data.limit is not None and data.page is not None: + full_args["sessions_limit"] = data.limit + full_args["sessions_limit_s"] = (data.page - 1) * data.limit + full_args["sessions_limit_e"] = data.page * data.limit + else: + full_args["sessions_limit"] = 200 + full_args["sessions_limit_s"] = 0 + full_args["sessions_limit_e"] = 200 + + meta_keys = [] + with ch_client.ClickHouseClient() as cur: + if errors_only: + main_query = cur.format(query=f"""SELECT DISTINCT er.error_id, + COALESCE((SELECT TRUE + FROM {exp_ch_helper.get_user_viewed_errors_table()} AS ve + WHERE er.error_id = ve.error_id + AND ve.user_id = %(userId)s LIMIT 1), FALSE) AS viewed + {query_part};""", parameters=full_args) + + elif count_only: + main_query = cur.format(query=f"""SELECT COUNT(DISTINCT s.session_id) AS count_sessions, + COUNT(DISTINCT s.user_uuid) AS count_users + {query_part};""", + parameters=full_args) + elif data.group_by_user: + g_sort = "count(full_sessions)" + if data.order is None: + data.order = schemas.SortOrderType.DESC.value + else: + data.order = data.order + if data.sort is not None and data.sort != 'sessionsCount': + sort = helper.key_to_snake_case(data.sort) + g_sort = f"{'MIN' if data.order == schemas.SortOrderType.DESC else 'MAX'}({sort})" + else: + sort = 'start_ts' + + meta_keys = metadata.get(project_id=project_id) + meta_map = ",map(%s) AS 'metadata'" \ + % ','.join([f"'{m['key']}',coalesce(metadata_{m['index']},'None')" for m in meta_keys]) + main_query = cur.mogrify(f"""SELECT COUNT(*) AS count, + COALESCE(JSONB_AGG(users_sessions) + FILTER (WHERE rn>%(sessions_limit_s)s AND rn<=%(sessions_limit_e)s), '[]'::JSONB) AS sessions + FROM (SELECT user_id, + count(full_sessions) AS user_sessions_count, + jsonb_agg(full_sessions) FILTER (WHERE rn <= 1) AS last_session, + MIN(full_sessions.start_ts) AS first_session_ts, + ROW_NUMBER() OVER (ORDER BY {g_sort} {data.order}) AS rn + FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY {sort} {data.order}) AS rn + FROM (SELECT DISTINCT ON(s.session_id) {SESSION_PROJECTION_COLS_CH} {meta_map} + {query_part} + ) AS filtred_sessions + ) AS full_sessions + GROUP BY user_id + ) AS users_sessions;""", + full_args) + elif ids_only: + main_query = cur.format(query=f"""SELECT DISTINCT ON(s.session_id) s.session_id + {query_part} + ORDER BY s.session_id desc + LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s;""", + parameters=full_args) + else: + if data.order is None: + data.order = schemas.SortOrderType.DESC.value + else: + data.order = data.order + sort = 'session_id' + if data.sort is not None and data.sort != "session_id": + # sort += " " + data.order + "," + 
helper.key_to_snake_case(data.sort) + sort = helper.key_to_snake_case(data.sort) + + meta_keys = metadata.get(project_id=project_id) + meta_map = ",'metadata',toString(map(%s))" \ + % ','.join([f"'{m['key']}',coalesce(metadata_{m['index']},'None')" for m in meta_keys]) + main_query = cur.format(query=f"""SELECT any(total) AS count, + groupArray(%(sessions_limit)s)(details) AS sessions + FROM (SELECT total, details + FROM (SELECT COUNT() OVER () AS total, + s.{sort} AS sort_key, + map({SESSION_PROJECTION_COLS_CH_MAP}{meta_map}) AS details + {query_part} + LEFT JOIN (SELECT DISTINCT session_id + FROM experimental.user_viewed_sessions + WHERE user_id = %(userId)s AND project_id=%(project_id)s + AND _timestamp >= toDateTime(%(startDate)s / 1000)) AS viewed_sessions + ON (viewed_sessions.session_id = s.session_id) + ) AS raw + ORDER BY sort_key {data.order} + LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s) AS sorted_sessions;""", + parameters=full_args) + logging.debug("--------------------") + logging.debug(main_query) + logging.debug("--------------------") + try: + sessions = cur.execute(main_query) + except Exception as err: + logging.warning("--------- SESSIONS-CH SEARCH QUERY EXCEPTION -----------") + logging.warning(main_query) + logging.warning("--------- PAYLOAD -----------") + logging.warning(data.model_dump_json()) + logging.warning("--------------------") + raise err + if errors_only or ids_only: + return helper.list_to_camel_case(sessions) + + if len(sessions) > 0: + sessions = sessions[0] + + total = sessions["count"] + sessions = sessions["sessions"] + + if data.group_by_user: + for i, s in enumerate(sessions): + sessions[i] = {**s.pop("last_session")[0], **s} + sessions[i].pop("rn") + sessions[i]["metadata"] = ast.literal_eval(sessions[i]["metadata"]) + else: + for i in range(len(sessions)): + sessions[i]["metadata"] = ast.literal_eval(sessions[i]["metadata"]) + sessions[i] = schemas.SessionModel.parse_obj(helper.dict_to_camel_case(sessions[i])) + + return { + 'total': total, + 'sessions': sessions + } + + +def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None): + if project_id is None: + all_projects = projects.get_projects(tenant_id=tenant_id) + else: + all_projects = [ + projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False, + include_gdpr=False)] + + all_projects = {int(p["projectId"]): p["name"] for p in all_projects} + project_ids = list(all_projects.keys()) + + available_keys = metadata.get_keys_by_projects(project_ids) + for i in available_keys: + available_keys[i]["user_id"] = schemas.FilterType.USER_ID + available_keys[i]["user_anonymous_id"] = schemas.FilterType.USER_ANONYMOUS_ID + results = {} + for i in project_ids: + if m_key not in available_keys[i].values(): + available_keys.pop(i) + results[i] = {"total": 0, "sessions": [], "missingMetadata": True} + project_ids = list(available_keys.keys()) + if len(project_ids) > 0: + with pg_client.PostgresClient() as cur: + sub_queries = [] + for i in project_ids: + col_name = list(available_keys[i].keys())[list(available_keys[i].values()).index(m_key)] + sub_queries.append(cur.mogrify( + f"(SELECT COALESCE(COUNT(s.*)) AS count FROM public.sessions AS s WHERE s.project_id = %(id)s AND s.{col_name} = %(value)s) AS \"{i}\"", + {"id": i, "value": m_value}).decode('UTF-8')) + query = f"""SELECT {", ".join(sub_queries)};""" + cur.execute(query=query) + + rows = cur.fetchone() + + sub_queries = [] + for i in rows.keys(): + results[i] = {"total": rows[i], 
"sessions": [], "missingMetadata": False, "name": all_projects[int(i)]} + if rows[i] > 0: + col_name = list(available_keys[int(i)].keys())[list(available_keys[int(i)].values()).index(m_key)] + sub_queries.append( + cur.mogrify( + f"""( + SELECT * + FROM ( + SELECT DISTINCT ON(favorite_sessions.session_id, s.session_id) {SESSION_PROJECTION_COLS_CH} + FROM public.sessions AS s LEFT JOIN (SELECT session_id + FROM public.user_favorite_sessions + WHERE user_favorite_sessions.user_id = %(userId)s + ) AS favorite_sessions USING (session_id) + WHERE s.project_id = %(id)s AND s.duration IS NOT NULL AND s.{col_name} = %(value)s + ) AS full_sessions + ORDER BY favorite DESC, issue_score DESC + LIMIT 10 + )""", + {"id": i, "value": m_value, "userId": user_id}).decode('UTF-8')) + if len(sub_queries) > 0: + cur.execute("\nUNION\n".join(sub_queries)) + rows = cur.fetchall() + for i in rows: + results[str(i["project_id"])]["sessions"].append(helper.dict_to_camel_case(i)) + return results + + +# TODO: rewrite this function to use ClickHouse +def search_sessions_by_ids(project_id: int, session_ids: list, sort_by: str = 'session_id', + ascending: bool = False) -> dict: + return sessions_legacy.search_sessions_by_ids(project_id, session_ids, sort_by, ascending) diff --git a/ee/api/clean-dev.sh b/ee/api/clean-dev.sh index 9c31242b0..5fb3701b2 100755 --- a/ee/api/clean-dev.sh +++ b/ee/api/clean-dev.sh @@ -42,6 +42,7 @@ rm -rf ./chalicelib/core/sessions/sessions_assignments.py rm -rf ./chalicelib/core/sessions/sessions_metas.py rm -rf ./chalicelib/core/sessions/sessions_mobs.py rm -rf ./chalicelib/core/sessions/sessions_replay.py +rm -rf ./chalicelib/core/sessions/sessions_search.py rm -rf ./chalicelib/core/sessions/performance_event.py rm -rf ./chalicelib/core/sessions/sessions_viewed.py rm -rf ./chalicelib/core/sessions/unprocessed_sessions.py