fix(chalice): fixed session-search-pg sortKey issue fix(chalice): fixed CH-query-formatter to handle special chars fix(chalice): fixed /ids response
268 lines
14 KiB
Python
268 lines
14 KiB
Python
import logging
|
|
|
|
import schemas
|
|
from chalicelib.core import metadata, projects
|
|
from . import sessions_favorite, sessions_legacy
|
|
from chalicelib.utils import pg_client, helper
|
|
|
|
logger = logging.getLogger(__name__)

# Base SELECT column list for session rows. Every column is read from a
# relation aliased `s`, so the enclosing query must provide that alias.
# `session_id` is cast to text and `issue_types` is converted with to_jsonb().
# The fragment has no trailing comma, so callers append further columns with
# a leading ",".
SESSION_PROJECTION_BASE_COLS = """s.project_id,
s.session_id::text AS session_id,
s.user_uuid,
s.user_id,
s.user_os,
s.user_browser,
s.user_device,
s.user_device_type,
s.user_country,
s.user_city,
s.user_state,
s.start_ts,
s.duration,
s.events_count,
s.pages_count,
s.errors_count,
s.user_anonymous_id,
s.platform,
s.issue_score,
s.timezone,
to_jsonb(s.issue_types) AS issue_types """
|
|
|
|
SESSION_PROJECTION_COLS = SESSION_PROJECTION_BASE_COLS + """,
|
|
favorite_sessions.session_id NOTNULL AS favorite,
|
|
COALESCE((SELECT TRUE
|
|
FROM public.user_viewed_sessions AS fs
|
|
WHERE s.session_id = fs.session_id
|
|
AND fs.user_id = %(userId)s LIMIT 1), FALSE) AS viewed """
|
|
|
|
|
|
# This function builds the search query, executes it and returns the result
|
|
def search_sessions(data: schemas.SessionsSearchPayloadSchema, project: schemas.ProjectContext,
                    user_id, errors_only=False, error_status=schemas.ErrorStatus.ALL,
                    count_only=False, issue=None, ids_only=False, platform="web"):
    """Search the sessions of a project in PostgreSQL.

    The FROM/WHERE part of the query and its arguments come from
    ``sessions_legacy.search_query_parts``; this function picks the projection
    that matches the requested mode, runs the query and shapes the result.

    Mode selection (first match wins when building the query):
      * ``errors_only``        - distinct error ids with a ``viewed`` flag
      * ``count_only``         - number of distinct sessions and distinct users
      * ``data.group_by_user`` - one entry per user, carrying that user's
        first-ranked session and a sessions count
      * ``ids_only``           - session ids only, paginated
      * otherwise              - paginated sessions with their metadata

    Pagination uses ``data.limit`` / ``data.page`` when both are set and
    falls back to the first 200 rows otherwise.

    Returns:
      * ``errors_only`` or ``ids_only``: list of camelCased row dicts
      * ``count_only``: a single camelCased dict
      * otherwise: ``{'total': int, 'sessions': list, 'src': 1}``

    Raises:
      Any exception raised while executing the query is logged together with
      the query and the payload, then re-raised.
    """
    if data.bookmarked:
        data.startTimestamp, data.endTimestamp = sessions_favorite.get_start_end_timestamp(project.project_id, user_id)
        if data.startTimestamp is None:
            logger.debug(f"No vault sessions found for project:{project.project_id}")
            return {
                'total': 0,
                'sessions': [],
                'src': 1
            }
    full_args, query_part = sessions_legacy.search_query_parts(data=data, error_status=error_status,
                                                               errors_only=errors_only,
                                                               favorite_only=data.bookmarked, issue=issue,
                                                               project_id=project.project_id,
                                                               user_id=user_id, platform=platform)
    if data.limit is not None and data.page is not None:
        full_args["sessions_limit"] = data.limit
        full_args["sessions_limit_s"] = (data.page - 1) * data.limit
        full_args["sessions_limit_e"] = data.page * data.limit
    else:
        full_args["sessions_limit"] = 200
        full_args["sessions_limit_s"] = 0
        full_args["sessions_limit_e"] = 200

    # Rows come back as a list for these two modes and as a single aggregated
    # row for every other mode.
    list_mode = errors_only or ids_only

    meta_keys = []
    with pg_client.PostgresClient() as cur:
        if errors_only:
            main_query = cur.mogrify(f"""SELECT DISTINCT er.error_id,
                                        COALESCE((SELECT TRUE
                                                  FROM public.user_viewed_errors AS ve
                                                  WHERE er.error_id = ve.error_id
                                                  AND ve.user_id = %(userId)s LIMIT 1), FALSE) AS viewed
                                        {query_part};""", full_args)

        elif count_only:
            main_query = cur.mogrify(f"""SELECT COUNT(DISTINCT s.session_id) AS count_sessions,
                                        COUNT(DISTINCT s.user_uuid) AS count_users
                                        {query_part};""", full_args)
        elif data.group_by_user:
            g_sort = "count(full_sessions)"
            if data.order is None:
                data.order = schemas.SortOrderType.DESC.value
            if data.sort is not None and data.sort != 'sessionsCount':
                sort = helper.key_to_snake_case(data.sort)
                g_sort = f"{'MIN' if data.order == schemas.SortOrderType.DESC else 'MAX'}({sort})"
            else:
                sort = 'start_ts'

            meta_keys = metadata.get(project_id=project.project_id)
            # ",metadata_1,metadata_2,..." or an empty string when the project
            # has no metadata keys.
            meta_cols = "".join(f',metadata_{m["index"]}' for m in meta_keys)
            main_query = cur.mogrify(f"""SELECT COUNT(*) AS count,
                                        COALESCE(JSONB_AGG(users_sessions)
                                        FILTER (WHERE rn>%(sessions_limit_s)s AND rn<=%(sessions_limit_e)s), '[]'::JSONB) AS sessions
                                        FROM (SELECT user_id,
                                                     count(full_sessions) AS user_sessions_count,
                                                     jsonb_agg(full_sessions) FILTER (WHERE rn <= 1) AS last_session,
                                                     MIN(full_sessions.start_ts) AS first_session_ts,
                                                     ROW_NUMBER() OVER (ORDER BY {g_sort} {data.order}) AS rn
                                              FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY {sort} {data.order}) AS rn
                                                    FROM (SELECT DISTINCT ON(s.session_id) {SESSION_PROJECTION_COLS}
                                                          {meta_cols}
                                                          {query_part}
                                                         ) AS filtred_sessions
                                                   ) AS full_sessions
                                              GROUP BY user_id
                                             ) AS users_sessions;""",
                                     full_args)
        elif ids_only:
            main_query = cur.mogrify(f"""SELECT DISTINCT ON(s.session_id) s.session_id
                                        {query_part}
                                        ORDER BY s.session_id desc
                                        LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s;""",
                                     full_args)
        else:
            if data.order is None:
                data.order = schemas.SortOrderType.DESC.value
            sort = 'session_id'
            if data.sort is not None and data.sort != "session_id":
                if data.sort == 'datetime':
                    sort = 'start_ts'
                else:
                    sort = helper.key_to_snake_case(data.sort)

            meta_keys = metadata.get(project_id=project.project_id)
            meta_cols = "".join(f',metadata_{m["index"]}' for m in meta_keys)
            main_query = cur.mogrify(f"""SELECT COUNT(full_sessions) AS count,
                                        COALESCE(JSONB_AGG(full_sessions)
                                        FILTER (WHERE rn>%(sessions_limit_s)s AND rn<=%(sessions_limit_e)s), '[]'::JSONB) AS sessions
                                        FROM (SELECT *, ROW_NUMBER() OVER (ORDER BY {sort} {data.order}, issue_score DESC) AS rn
                                              FROM (SELECT DISTINCT ON(s.session_id) {SESSION_PROJECTION_COLS}
                                                    {meta_cols}
                                                    {query_part}
                                                    ORDER BY s.session_id desc) AS filtred_sessions
                                              ORDER BY {sort} {data.order}, issue_score DESC) AS full_sessions;""",
                                     full_args)
        logger.debug("--------------------")
        logger.debug(main_query)
        logger.debug("--------------------")
        try:
            cur.execute(main_query)
            # Fetch exactly once, in the shape the mode needs. Calling
            # fetchone() unconditionally and fetchall() afterwards would
            # consume the first row and drop it from the returned list.
            if list_mode:
                rows = cur.fetchall()
            else:
                sessions = cur.fetchone()
        except Exception:
            logger.warning("--------- SESSIONS SEARCH QUERY EXCEPTION -----------")
            logger.warning(main_query.decode('UTF-8'))
            logger.warning("--------- PAYLOAD -----------")
            logger.warning(data.model_dump_json())
            logger.warning("--------------------")
            raise

    if list_mode:
        return helper.list_to_camel_case(rows)

    if count_only:
        return helper.dict_to_camel_case(sessions)

    total = sessions["count"]
    sessions = sessions["sessions"]

    for i, s in enumerate(sessions):
        if data.group_by_user:
            # Flatten the per-user row: the session fields are merged with the
            # user-level aggregates, and the ranking column is dropped.
            s = {**s.pop("last_session")[0], **s}
            s.pop("rn")
            sessions[i] = s
        # Expose only the metadata values that are set, keyed by their
        # configured metadata key.
        s["metadata"] = {k["key"]: s[f'metadata_{k["index"]}'] for k in meta_keys
                         if s[f'metadata_{k["index"]}'] is not None}

    return {
        'total': total,
        'sessions': helper.list_to_camel_case(sessions),
        'src': 1
    }
|
|
|
|
|
|
def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None):
    """Find sessions whose metadata key ``m_key`` equals ``m_value``.

    Searches the single project ``project_id`` when given, otherwise every
    project returned by ``projects.get_projects`` for the tenant.
    ``user_id`` and ``user_anonymous_id`` are treated as searchable keys on
    every project that has a metadata mapping.

    Returns a dict with one entry per project:
      * the key is not configured on the project:
        ``{"total": 0, "sessions": [], "missingMetadata": True}``
      * otherwise: ``{"total": <matching count>, "sessions": [...],
        "missingMetadata": False, "name": <project name>}`` where
        ``sessions`` holds at most 10 camelCased rows per project, ordered by
        favorite first and then by issue score.
    """

    def _column_for(project_keys):
        # Reverse lookup: the first column whose configured key equals m_key.
        return next(column for column, key in project_keys.items() if key == m_key)

    if project_id is not None:
        project_list = [
            projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False,
                                 include_gdpr=False)]
    else:
        project_list = projects.get_projects(tenant_id=tenant_id)

    project_names = {int(p["projectId"]): p["name"] for p in project_list}

    available_keys = metadata.get_keys_by_projects(list(project_names.keys()))
    for project_keys in available_keys.values():
        project_keys["user_id"] = schemas.FilterType.USER_ID
        project_keys["user_anonymous_id"] = schemas.FilterType.USER_ANONYMOUS_ID

    # Projects that do not define the requested key are reported as such and
    # left out of the SQL search.
    results = {}
    for pid in project_names:
        if m_key not in available_keys[pid].values():
            available_keys.pop(pid)
            results[pid] = {"total": 0, "sessions": [], "missingMetadata": True}

    searchable_ids = list(available_keys.keys())
    if len(searchable_ids) == 0:
        return results

    with pg_client.PostgresClient() as cur:
        # Step 1: a single row holding one count column per project; each
        # column is aliased with the project id.
        count_queries = []
        for pid in searchable_ids:
            column = _column_for(available_keys[pid])
            count_sql = cur.mogrify(
                f"(SELECT COALESCE(COUNT(s.*)) AS count FROM public.sessions AS s WHERE s.project_id = %(id)s AND s.{column} = %(value)s) AS \"{pid}\"",
                {"id": pid, "value": m_value})
            count_queries.append(count_sql.decode('UTF-8'))
        query = f"""SELECT {", ".join(count_queries)};"""
        cur.execute(query=query)

        counts = cur.fetchone()

        # Step 2: fetch the sessions themselves, only for projects with hits.
        session_queries = []
        for alias in counts.keys():
            results[alias] = {"total": counts[alias], "sessions": [], "missingMetadata": False,
                              "name": project_names[int(alias)]}
            if counts[alias] > 0:
                column = _column_for(available_keys[int(alias)])
                sessions_sql = cur.mogrify(
                    f"""(
                        SELECT *
                        FROM (
                                SELECT DISTINCT ON(favorite_sessions.session_id, s.session_id) {SESSION_PROJECTION_COLS}
                                FROM public.sessions AS s LEFT JOIN (SELECT session_id
                                                                    FROM public.user_favorite_sessions
                                                                    WHERE user_favorite_sessions.user_id = %(userId)s
                                                                    ) AS favorite_sessions USING (session_id)
                                WHERE s.project_id = %(id)s AND s.duration IS NOT NULL AND s.{column} = %(value)s
                            ) AS full_sessions
                        ORDER BY favorite DESC, issue_score DESC
                        LIMIT 10
                    )""",
                    {"id": alias, "value": m_value, "userId": user_id})
                session_queries.append(sessions_sql.decode('UTF-8'))

        if len(session_queries) > 0:
            cur.execute("\nUNION\n".join(session_queries))
            for row in cur.fetchall():
                results[str(row["project_id"])]["sessions"].append(helper.dict_to_camel_case(row))
    return results
|
|
|
|
|
|
def search_sessions_by_ids(project_id: int, session_ids: list, sort_by: str = 'session_id',
                           ascending: bool = False) -> dict:
    """Load the given sessions of a project, including their metadata.

    Args:
        project_id: project the sessions must belong to.
        session_ids: ids of the sessions to load; ``None`` or an empty list
            short-circuits to an empty result without touching the database.
        sort_by: column to order by. It is interpolated into the SQL text
            because identifiers cannot be bound as query parameters, so it
            must be a plain (``start_ts``) or qualified (``s.start_ts``)
            column identifier.
        ascending: sort ascending when true, descending otherwise.

    Returns:
        ``{"total": <number of rows>, "sessions": [<camelCased rows>]}``.
        When the project has metadata keys, each session's ``metadata_<n>``
        columns are replaced by a ``metadata`` dict keyed by metadata key.

    Raises:
        ValueError: if ``sort_by`` is not a valid column identifier.
    """
    import re

    if session_ids is None or len(session_ids) == 0:
        return {"total": 0, "sessions": []}

    # Reject anything that is not a bare/qualified identifier before it is
    # formatted into the ORDER BY clause (prevents SQL injection).
    identifier = r"[A-Za-z_][A-Za-z0-9_]*"
    if not isinstance(sort_by, str) or re.fullmatch(rf"{identifier}(\.{identifier})?", sort_by) is None:
        raise ValueError(f"Invalid sort column: {sort_by!r}")

    with pg_client.PostgresClient() as cur:
        meta_keys = metadata.get(project_id=project_id)
        params = {"project_id": project_id, "session_ids": tuple(session_ids)}
        order_direction = 'ASC' if ascending else 'DESC'
        # ",metadata_1,metadata_2,..." or an empty string when the project
        # has no metadata keys.
        meta_cols = "".join(f',metadata_{m["index"]}' for m in meta_keys)
        main_query = cur.mogrify(f"""SELECT {SESSION_PROJECTION_BASE_COLS}
                                    {meta_cols}
                                    FROM public.sessions AS s
                                    WHERE project_id=%(project_id)s
                                    AND session_id IN %(session_ids)s
                                    ORDER BY {sort_by} {order_direction};""", params)

        cur.execute(main_query)
        rows = cur.fetchall()

    if len(meta_keys) > 0:
        for row in rows:
            # Move the flat metadata_<n> columns into a dict keyed by the
            # configured metadata key (unset values are kept as None).
            row["metadata"] = {m["key"]: row.pop(f'metadata_{m["index"]}') for m in meta_keys}
    return {"total": len(rows), "sessions": helper.list_to_camel_case(rows)}
|