Dev (#2898)
* feat(chalice): autocomplete return top 10 with stats
* fix(chalice): fixed autocomplete top 10 meta-filters
* refactor(chalice): restricted sessions search
parent 7c4ff2ed3f
commit ddfaaeb6c5
9 changed files with 544 additions and 510 deletions
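This commit moves session search out of the monolithic sessions module into a dedicated chalicelib.core.sessions.sessions_search module (plus a ClickHouse-backed sessions_search_exp variant under ee/). A minimal caller-side sketch — only the import paths and function names visible in this diff are confirmed; `payload` is a hypothetical schemas.SessionsSearchPayloadSchema instance:

    # Before: callers went through the monolithic module.
    # from chalicelib.core.sessions import sessions
    # data = sessions.search_sessions(...)

    # After: callers import the dedicated search module.
    from chalicelib.core.sessions import sessions_search

    data = sessions_search.search_sessions(data=payload, project_id=1,
                                           user_id=7, platform="web")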
@@ -9,166 +9,6 @@ from chalicelib.utils import sql_helper as sh

logger = logging.getLogger(__name__)

SESSION_PROJECTION_BASE_COLS = """s.project_id,
                                  s.session_id::text AS session_id,
                                  s.user_uuid,
                                  s.user_id,
                                  s.user_os,
                                  s.user_browser,
                                  s.user_device,
                                  s.user_device_type,
                                  s.user_country,
                                  s.user_city,
                                  s.user_state,
                                  s.start_ts,
                                  s.duration,
                                  s.events_count,
                                  s.pages_count,
                                  s.errors_count,
                                  s.user_anonymous_id,
                                  s.platform,
                                  s.issue_score,
                                  s.timezone,
                                  to_jsonb(s.issue_types) AS issue_types """

SESSION_PROJECTION_COLS = SESSION_PROJECTION_BASE_COLS + """,
                          favorite_sessions.session_id NOTNULL AS favorite,
                          COALESCE((SELECT TRUE
                                    FROM public.user_viewed_sessions AS fs
                                    WHERE s.session_id = fs.session_id
                                      AND fs.user_id = %(userId)s LIMIT 1), FALSE) AS viewed """


# This function executes the query and returns the result
def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_id, errors_only=False,
                    error_status=schemas.ErrorStatus.ALL, count_only=False, issue=None, ids_only=False,
                    platform="web"):
    if data.bookmarked:
        data.startTimestamp, data.endTimestamp = sessions_favorite.get_start_end_timestamp(project_id, user_id)

    full_args, query_part = search_query_parts(data=data, error_status=error_status, errors_only=errors_only,
                                               favorite_only=data.bookmarked, issue=issue, project_id=project_id,
                                               user_id=user_id, platform=platform)
    if data.limit is not None and data.page is not None:
        full_args["sessions_limit"] = data.limit
        full_args["sessions_limit_s"] = (data.page - 1) * data.limit
        full_args["sessions_limit_e"] = data.page * data.limit
    else:
        full_args["sessions_limit"] = 200
        full_args["sessions_limit_s"] = 0
        full_args["sessions_limit_e"] = 200

    meta_keys = []
    with pg_client.PostgresClient() as cur:
        if errors_only:
            main_query = cur.mogrify(f"""SELECT DISTINCT er.error_id,
                                                COALESCE((SELECT TRUE
                                                          FROM public.user_viewed_errors AS ve
                                                          WHERE er.error_id = ve.error_id
                                                            AND ve.user_id = %(userId)s LIMIT 1), FALSE) AS viewed
                                         {query_part};""", full_args)

        elif count_only:
            main_query = cur.mogrify(f"""SELECT COUNT(DISTINCT s.session_id) AS count_sessions,
                                                COUNT(DISTINCT s.user_uuid) AS count_users
                                         {query_part};""", full_args)
        elif data.group_by_user:
            g_sort = "count(full_sessions)"
            if data.order is None:
                data.order = schemas.SortOrderType.DESC.value
            else:
                data.order = data.order
            if data.sort is not None and data.sort != 'sessionsCount':
                sort = helper.key_to_snake_case(data.sort)
                g_sort = f"{'MIN' if data.order == schemas.SortOrderType.DESC else 'MAX'}({sort})"
            else:
                sort = 'start_ts'

            meta_keys = metadata.get(project_id=project_id)
            main_query = cur.mogrify(f"""SELECT COUNT(*) AS count,
                                                COALESCE(JSONB_AGG(users_sessions)
                                                         FILTER (WHERE rn>%(sessions_limit_s)s AND rn<=%(sessions_limit_e)s), '[]'::JSONB) AS sessions
                                         FROM (SELECT user_id,
                                                      count(full_sessions) AS user_sessions_count,
                                                      jsonb_agg(full_sessions) FILTER (WHERE rn <= 1) AS last_session,
                                                      MIN(full_sessions.start_ts) AS first_session_ts,
                                                      ROW_NUMBER() OVER (ORDER BY {g_sort} {data.order}) AS rn
                                               FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY {sort} {data.order}) AS rn
                                                     FROM (SELECT DISTINCT ON(s.session_id) {SESSION_PROJECTION_COLS}
                                                                  {"," if len(meta_keys) > 0 else ""}{",".join([f'metadata_{m["index"]}' for m in meta_keys])}
                                                           {query_part}
                                                           ) AS filtred_sessions
                                                     ) AS full_sessions
                                               GROUP BY user_id
                                               ) AS users_sessions;""",
                                     full_args)
        elif ids_only:
            main_query = cur.mogrify(f"""SELECT DISTINCT ON(s.session_id) s.session_id
                                         {query_part}
                                         ORDER BY s.session_id desc
                                         LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s;""",
                                     full_args)
        else:
            if data.order is None:
                data.order = schemas.SortOrderType.DESC.value
            else:
                data.order = data.order
            sort = 'session_id'
            if data.sort is not None and data.sort != "session_id":
                # sort += " " + data.order + "," + helper.key_to_snake_case(data.sort)
                sort = helper.key_to_snake_case(data.sort)

            meta_keys = metadata.get(project_id=project_id)
            main_query = cur.mogrify(f"""SELECT COUNT(full_sessions) AS count,
                                                COALESCE(JSONB_AGG(full_sessions)
                                                         FILTER (WHERE rn>%(sessions_limit_s)s AND rn<=%(sessions_limit_e)s), '[]'::JSONB) AS sessions
                                         FROM (SELECT *, ROW_NUMBER() OVER (ORDER BY {sort} {data.order}, issue_score DESC) AS rn
                                               FROM (SELECT DISTINCT ON(s.session_id) {SESSION_PROJECTION_COLS}
                                                            {"," if len(meta_keys) > 0 else ""}{",".join([f'metadata_{m["index"]}' for m in meta_keys])}
                                                     {query_part}
                                                     ORDER BY s.session_id desc) AS filtred_sessions
                                               ORDER BY {sort} {data.order}, issue_score DESC) AS full_sessions;""",
                                     full_args)
        logger.debug("--------------------")
        logger.debug(main_query)
        logger.debug("--------------------")
        try:
            cur.execute(main_query)
            sessions = cur.fetchone()
        except Exception as err:
            logger.warning("--------- SESSIONS SEARCH QUERY EXCEPTION -----------")
            logger.warning(main_query.decode('UTF-8'))
            logger.warning("--------- PAYLOAD -----------")
            logger.warning(data.model_dump_json())
            logger.warning("--------------------")
            raise err
        if errors_only or ids_only:
            return helper.list_to_camel_case(cur.fetchall())

        if count_only:
            return helper.dict_to_camel_case(sessions)

        total = sessions["count"]
        sessions = sessions["sessions"]

        if data.group_by_user:
            for i, s in enumerate(sessions):
                sessions[i] = {**s.pop("last_session")[0], **s}
                sessions[i].pop("rn")
                sessions[i]["metadata"] = {k["key"]: sessions[i][f'metadata_{k["index"]}'] for k in meta_keys
                                           if sessions[i][f'metadata_{k["index"]}'] is not None}
        else:
            for i, s in enumerate(sessions):
                sessions[i]["metadata"] = {k["key"]: sessions[i][f'metadata_{k["index"]}'] for k in meta_keys
                                           if sessions[i][f'metadata_{k["index"]}'] is not None}
        # if not data.group_by_user and data.sort is not None and data.sort != "session_id":
        #     sessions = sorted(sessions, key=lambda s: s[helper.key_to_snake_case(data.sort)],
        #                       reverse=data.order.upper() == "DESC")
        return {
            'total': total,
            'sessions': helper.list_to_camel_case(sessions)
        }


# TODO: remove "table of" search from this function
def search2_series(data: schemas.SessionsSearchPayloadSchema, project_id: int, density: int,
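The paged and default branches above reduce to the same window arithmetic: rows are numbered with ROW_NUMBER() and a page keeps the slice (page - 1) * limit < rn <= page * limit. A tiny self-contained check (hypothetical helper, not part of this diff):

    def page_bounds(page: int, limit: int) -> tuple:
        # Mirrors sessions_limit_s / sessions_limit_e computed in search_sessions.
        return (page - 1) * limit, page * limit

    assert page_bounds(1, 50) == (0, 50)     # page 1 keeps rn 1..50
    assert page_bounds(3, 50) == (100, 150)  # page 3 keeps rn 101..150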
@@ -1162,68 +1002,6 @@ def search_query_parts(data: schemas.SessionsSearchPayloadSchema, error_status,
    return full_args, query_part


def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None):
    if project_id is None:
        all_projects = projects.get_projects(tenant_id=tenant_id)
    else:
        all_projects = [
            projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False,
                                 include_gdpr=False)]

    all_projects = {int(p["projectId"]): p["name"] for p in all_projects}
    project_ids = list(all_projects.keys())

    available_keys = metadata.get_keys_by_projects(project_ids)
    for i in available_keys:
        available_keys[i]["user_id"] = schemas.FilterType.USER_ID
        available_keys[i]["user_anonymous_id"] = schemas.FilterType.USER_ANONYMOUS_ID
    results = {}
    for i in project_ids:
        if m_key not in available_keys[i].values():
            available_keys.pop(i)
            results[i] = {"total": 0, "sessions": [], "missingMetadata": True}
    project_ids = list(available_keys.keys())
    if len(project_ids) > 0:
        with pg_client.PostgresClient() as cur:
            sub_queries = []
            for i in project_ids:
                col_name = list(available_keys[i].keys())[list(available_keys[i].values()).index(m_key)]
                sub_queries.append(cur.mogrify(
                    f"(SELECT COALESCE(COUNT(s.*)) AS count FROM public.sessions AS s WHERE s.project_id = %(id)s AND s.{col_name} = %(value)s) AS \"{i}\"",
                    {"id": i, "value": m_value}).decode('UTF-8'))
            query = f"""SELECT {", ".join(sub_queries)};"""
            cur.execute(query=query)

            rows = cur.fetchone()

            sub_queries = []
            for i in rows.keys():
                results[i] = {"total": rows[i], "sessions": [], "missingMetadata": False, "name": all_projects[int(i)]}
                if rows[i] > 0:
                    col_name = list(available_keys[int(i)].keys())[list(available_keys[int(i)].values()).index(m_key)]
                    sub_queries.append(
                        cur.mogrify(
                            f"""(
                            SELECT *
                            FROM (
                                SELECT DISTINCT ON(favorite_sessions.session_id, s.session_id) {SESSION_PROJECTION_COLS}
                                FROM public.sessions AS s LEFT JOIN (SELECT session_id
                                                                     FROM public.user_favorite_sessions
                                                                     WHERE user_favorite_sessions.user_id = %(userId)s
                                                                     ) AS favorite_sessions USING (session_id)
                                WHERE s.project_id = %(id)s AND s.duration IS NOT NULL AND s.{col_name} = %(value)s
                            ) AS full_sessions
                            ORDER BY favorite DESC, issue_score DESC
                            LIMIT 10
                            )""",
                            {"id": i, "value": m_value, "userId": user_id}).decode('UTF-8'))
            if len(sub_queries) > 0:
                cur.execute("\nUNION\n".join(sub_queries))
                rows = cur.fetchall()
                for i in rows:
                    results[str(i["project_id"])]["sessions"].append(helper.dict_to_camel_case(i))
    return results


def get_user_sessions(project_id, user_id, start_date, end_date):
    with pg_client.PostgresClient() as cur:
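search_by_metadata makes two passes: a single SELECT that counts matches per project, then a UNION of per-project subqueries, each capped at LIMIT 10 with favorites and high issue scores first (the "top 10 with stats" behavior from the commit message). The returned mapping looks roughly like this (hypothetical values, keys mirroring the code above):

    results = {
        12: {"total": 0, "sessions": [], "missingMetadata": True},       # metadata key not configured
        "7": {"total": 42, "sessions": [...], "missingMetadata": False,  # at most 10 camelCased rows
              "name": "Web App"},
    }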
@@ -1335,27 +1113,3 @@ def check_recording_status(project_id: int) -> dict:
        "sessionsCount": row["sessions_count"]
    }


def search_sessions_by_ids(project_id: int, session_ids: list, sort_by: str = 'session_id',
                           ascending: bool = False) -> dict:
    if session_ids is None or len(session_ids) == 0:
        return {"total": 0, "sessions": []}
    with pg_client.PostgresClient() as cur:
        meta_keys = metadata.get(project_id=project_id)
        params = {"project_id": project_id, "session_ids": tuple(session_ids)}
        order_direction = 'ASC' if ascending else 'DESC'
        main_query = cur.mogrify(f"""SELECT {SESSION_PROJECTION_BASE_COLS}
                                            {"," if len(meta_keys) > 0 else ""}{",".join([f'metadata_{m["index"]}' for m in meta_keys])}
                                     FROM public.sessions AS s
                                     WHERE project_id=%(project_id)s
                                       AND session_id IN %(session_ids)s
                                     ORDER BY {sort_by} {order_direction};""", params)

        cur.execute(main_query)
        rows = cur.fetchall()
        if len(meta_keys) > 0:
            for s in rows:
                s["metadata"] = {}
                for m in meta_keys:
                    s["metadata"][m["key"]] = s.pop(f'metadata_{m["index"]}')
        return {"total": len(rows), "sessions": helper.list_to_camel_case(rows)}
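A hedged usage sketch of the helper above; the signature and return shape come from the diff, the IDs are invented:

    result = search_sessions_by_ids(project_id=1, session_ids=[101, 102],
                                    sort_by='start_ts', ascending=True)
    # result == {"total": 2, "sessions": [...]}  # metadata_<i> columns folded into "metadata"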
@@ -11,193 +11,6 @@ from chalicelib.utils import sql_helper as sh

logger = logging.getLogger(__name__)

SESSION_PROJECTION_COLS_CH = """\
s.project_id,
s.session_id AS session_id,
s.user_uuid AS user_uuid,
s.user_id AS user_id,
s.user_os AS user_os,
s.user_browser AS user_browser,
s.user_device AS user_device,
s.user_device_type AS user_device_type,
s.user_country AS user_country,
s.user_city AS user_city,
s.user_state AS user_state,
toUnixTimestamp(s.datetime)*1000 AS start_ts,
s.duration AS duration,
s.events_count AS events_count,
s.pages_count AS pages_count,
s.errors_count AS errors_count,
s.user_anonymous_id AS user_anonymous_id,
s.platform AS platform,
s.timezone AS timezone,
coalesce(issue_score,0) AS issue_score,
s.issue_types AS issue_types
"""

SESSION_PROJECTION_COLS_CH_MAP = """\
'project_id', toString(%(project_id)s),
'session_id', toString(s.session_id),
'user_uuid', toString(s.user_uuid),
'user_id', toString(s.user_id),
'user_os', toString(s.user_os),
'user_browser', toString(s.user_browser),
'user_device', toString(s.user_device),
'user_device_type', toString(s.user_device_type),
'user_country', toString(s.user_country),
'user_city', toString(s.user_city),
'user_state', toString(s.user_state),
'start_ts', toString(toUnixTimestamp(s.datetime)*1000),
'duration', toString(s.duration),
'events_count', toString(s.events_count),
'pages_count', toString(s.pages_count),
'errors_count', toString(s.errors_count),
'user_anonymous_id', toString(s.user_anonymous_id),
'platform', toString(s.platform),
'timezone', toString(s.timezone),
'issue_score', toString(coalesce(issue_score,0)),
'viewed', toString(viewed_sessions.session_id > 0)
"""


# This function executes the query and returns the result
def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_id, errors_only=False,
                    error_status=schemas.ErrorStatus.ALL, count_only=False, issue=None, ids_only=False,
                    platform="web"):
    if data.bookmarked:
        data.startTimestamp, data.endTimestamp = sessions_favorite.get_start_end_timestamp(project_id, user_id)
    full_args, query_part = search_query_parts_ch(data=data, error_status=error_status, errors_only=errors_only,
                                                  favorite_only=data.bookmarked, issue=issue, project_id=project_id,
                                                  user_id=user_id, platform=platform)
    if data.sort == "startTs":
        data.sort = "datetime"
    if data.limit is not None and data.page is not None:
        full_args["sessions_limit"] = data.limit
        full_args["sessions_limit_s"] = (data.page - 1) * data.limit
        full_args["sessions_limit_e"] = data.page * data.limit
    else:
        full_args["sessions_limit"] = 200
        full_args["sessions_limit_s"] = 0
        full_args["sessions_limit_e"] = 200

    meta_keys = []
    with ch_client.ClickHouseClient() as cur:
        if errors_only:
            main_query = cur.format(query=f"""SELECT DISTINCT er.error_id,
                                                     COALESCE((SELECT TRUE
                                                               FROM {exp_ch_helper.get_user_viewed_errors_table()} AS ve
                                                               WHERE er.error_id = ve.error_id
                                                                 AND ve.user_id = %(userId)s LIMIT 1), FALSE) AS viewed
                                              {query_part};""", parameters=full_args)

        elif count_only:
            main_query = cur.format(query=f"""SELECT COUNT(DISTINCT s.session_id) AS count_sessions,
                                                     COUNT(DISTINCT s.user_uuid) AS count_users
                                              {query_part};""",
                                    parameters=full_args)
        elif data.group_by_user:
            g_sort = "count(full_sessions)"
            if data.order is None:
                data.order = schemas.SortOrderType.DESC.value
            else:
                data.order = data.order
            if data.sort is not None and data.sort != 'sessionsCount':
                sort = helper.key_to_snake_case(data.sort)
                g_sort = f"{'MIN' if data.order == schemas.SortOrderType.DESC else 'MAX'}({sort})"
            else:
                sort = 'start_ts'

            meta_keys = metadata.get(project_id=project_id)
            meta_map = ",map(%s) AS 'metadata'" \
                       % ','.join([f"'{m['key']}',coalesce(metadata_{m['index']},'None')" for m in meta_keys])
            main_query = cur.mogrify(f"""SELECT COUNT(*) AS count,
                                                COALESCE(JSONB_AGG(users_sessions)
                                                         FILTER (WHERE rn>%(sessions_limit_s)s AND rn<=%(sessions_limit_e)s), '[]'::JSONB) AS sessions
                                         FROM (SELECT user_id,
                                                      count(full_sessions) AS user_sessions_count,
                                                      jsonb_agg(full_sessions) FILTER (WHERE rn <= 1) AS last_session,
                                                      MIN(full_sessions.start_ts) AS first_session_ts,
                                                      ROW_NUMBER() OVER (ORDER BY {g_sort} {data.order}) AS rn
                                               FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY {sort} {data.order}) AS rn
                                                     FROM (SELECT DISTINCT ON(s.session_id) {SESSION_PROJECTION_COLS_CH} {meta_map}
                                                           {query_part}
                                                           ) AS filtred_sessions
                                                     ) AS full_sessions
                                               GROUP BY user_id
                                               ) AS users_sessions;""",
                                     full_args)
        elif ids_only:
            main_query = cur.format(query=f"""SELECT DISTINCT ON(s.session_id) s.session_id
                                              {query_part}
                                              ORDER BY s.session_id desc
                                              LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s;""",
                                    parameters=full_args)
        else:
            if data.order is None:
                data.order = schemas.SortOrderType.DESC.value
            else:
                data.order = data.order
            sort = 'session_id'
            if data.sort is not None and data.sort != "session_id":
                # sort += " " + data.order + "," + helper.key_to_snake_case(data.sort)
                sort = helper.key_to_snake_case(data.sort)

            meta_keys = metadata.get(project_id=project_id)
            meta_map = ",'metadata',toString(map(%s))" \
                       % ','.join([f"'{m['key']}',coalesce(metadata_{m['index']},'None')" for m in meta_keys])
            main_query = cur.format(query=f"""SELECT any(total) AS count,
                                                     groupArray(%(sessions_limit)s)(details) AS sessions
                                              FROM (SELECT total, details
                                                    FROM (SELECT COUNT() OVER () AS total,
                                                                 s.{sort} AS sort_key,
                                                                 map({SESSION_PROJECTION_COLS_CH_MAP}{meta_map}) AS details
                                                          {query_part}
                                                          LEFT JOIN (SELECT DISTINCT session_id
                                                                     FROM experimental.user_viewed_sessions
                                                                     WHERE user_id = %(userId)s AND project_id=%(project_id)s
                                                                       AND _timestamp >= toDateTime(%(startDate)s / 1000)) AS viewed_sessions
                                                                 ON (viewed_sessions.session_id = s.session_id)
                                                          ) AS raw
                                                    ORDER BY sort_key {data.order}
                                                    LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s) AS sorted_sessions;""",
                                    parameters=full_args)
        logging.debug("--------------------")
        logging.debug(main_query)
        logging.debug("--------------------")
        try:
            sessions = cur.execute(main_query)
        except Exception as err:
            logging.warning("--------- SESSIONS-CH SEARCH QUERY EXCEPTION -----------")
            logging.warning(main_query)
            logging.warning("--------- PAYLOAD -----------")
            logging.warning(data.model_dump_json())
            logging.warning("--------------------")
            raise err
        if errors_only or ids_only:
            return helper.list_to_camel_case(sessions)

    if len(sessions) > 0:
        sessions = sessions[0]

    total = sessions["count"]
    sessions = sessions["sessions"]

    if data.group_by_user:
        for i, s in enumerate(sessions):
            sessions[i] = {**s.pop("last_session")[0], **s}
            sessions[i].pop("rn")
            sessions[i]["metadata"] = ast.literal_eval(sessions[i]["metadata"])
    else:
        for i in range(len(sessions)):
            sessions[i]["metadata"] = ast.literal_eval(sessions[i]["metadata"])
            sessions[i] = schemas.SessionModel.parse_obj(helper.dict_to_camel_case(sessions[i]))

    return {
        'total': total,
        'sessions': sessions
    }


def search2_series(data: schemas.SessionsSearchPayloadSchema, project_id: int, density: int,
                   metric_type: schemas.MetricType, metric_of: schemas.MetricOfTimeseries | schemas.MetricOfTable,
                   metric_value: List):
@@ -1494,68 +1307,6 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_status,
    return full_args, query_part


def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None):
    if project_id is None:
        all_projects = projects.get_projects(tenant_id=tenant_id)
    else:
        all_projects = [
            projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False,
                                 include_gdpr=False)]

    all_projects = {int(p["projectId"]): p["name"] for p in all_projects}
    project_ids = list(all_projects.keys())

    available_keys = metadata.get_keys_by_projects(project_ids)
    for i in available_keys:
        available_keys[i]["user_id"] = schemas.FilterType.USER_ID
        available_keys[i]["user_anonymous_id"] = schemas.FilterType.USER_ANONYMOUS_ID
    results = {}
    for i in project_ids:
        if m_key not in available_keys[i].values():
            available_keys.pop(i)
            results[i] = {"total": 0, "sessions": [], "missingMetadata": True}
    project_ids = list(available_keys.keys())
    if len(project_ids) > 0:
        with pg_client.PostgresClient() as cur:
            sub_queries = []
            for i in project_ids:
                col_name = list(available_keys[i].keys())[list(available_keys[i].values()).index(m_key)]
                sub_queries.append(cur.mogrify(
                    f"(SELECT COALESCE(COUNT(s.*)) AS count FROM public.sessions AS s WHERE s.project_id = %(id)s AND s.{col_name} = %(value)s) AS \"{i}\"",
                    {"id": i, "value": m_value}).decode('UTF-8'))
            query = f"""SELECT {", ".join(sub_queries)};"""
            cur.execute(query=query)

            rows = cur.fetchone()

            sub_queries = []
            for i in rows.keys():
                results[i] = {"total": rows[i], "sessions": [], "missingMetadata": False, "name": all_projects[int(i)]}
                if rows[i] > 0:
                    col_name = list(available_keys[int(i)].keys())[list(available_keys[int(i)].values()).index(m_key)]
                    sub_queries.append(
                        cur.mogrify(
                            f"""(
                            SELECT *
                            FROM (
                                SELECT DISTINCT ON(favorite_sessions.session_id, s.session_id) {SESSION_PROJECTION_COLS_CH}
                                FROM public.sessions AS s LEFT JOIN (SELECT session_id
                                                                     FROM public.user_favorite_sessions
                                                                     WHERE user_favorite_sessions.user_id = %(userId)s
                                                                     ) AS favorite_sessions USING (session_id)
                                WHERE s.project_id = %(id)s AND s.duration IS NOT NULL AND s.{col_name} = %(value)s
                            ) AS full_sessions
                            ORDER BY favorite DESC, issue_score DESC
                            LIMIT 10
                            )""",
                            {"id": i, "value": m_value, "userId": user_id}).decode('UTF-8'))
            if len(sub_queries) > 0:
                cur.execute("\nUNION\n".join(sub_queries))
                rows = cur.fetchall()
                for i in rows:
                    results[str(i["project_id"])]["sessions"].append(helper.dict_to_camel_case(i))
    return results


def get_user_sessions(project_id, user_id, start_date, end_date):
    with pg_client.PostgresClient() as cur:
@@ -1660,8 +1411,3 @@ def check_recording_status(project_id: int) -> dict:
        "sessionsCount": row["sessions_count"]
    }


# TODO: rewrite this function to use ClickHouse
def search_sessions_by_ids(project_id: int, session_ids: list, sort_by: str = 'session_id',
                           ascending: bool = False) -> dict:
    return sessions_legacy.search_sessions_by_ids(project_id, session_ids, sort_by, ascending)
260 api/chalicelib/core/sessions/sessions_search.py (new file)
@@ -0,0 +1,260 @@
import logging
from typing import List, Union

import schemas
from chalicelib.core import events, metadata, projects
from chalicelib.core.sessions import sessions_favorite, performance_event, sessions_legacy
from chalicelib.utils import pg_client, helper, metrics_helper
from chalicelib.utils import sql_helper as sh

logger = logging.getLogger(__name__)

SESSION_PROJECTION_BASE_COLS = """s.project_id,
                                  s.session_id::text AS session_id,
                                  s.user_uuid,
                                  s.user_id,
                                  s.user_os,
                                  s.user_browser,
                                  s.user_device,
                                  s.user_device_type,
                                  s.user_country,
                                  s.user_city,
                                  s.user_state,
                                  s.start_ts,
                                  s.duration,
                                  s.events_count,
                                  s.pages_count,
                                  s.errors_count,
                                  s.user_anonymous_id,
                                  s.platform,
                                  s.issue_score,
                                  s.timezone,
                                  to_jsonb(s.issue_types) AS issue_types """

SESSION_PROJECTION_COLS = SESSION_PROJECTION_BASE_COLS + """,
                          favorite_sessions.session_id NOTNULL AS favorite,
                          COALESCE((SELECT TRUE
                                    FROM public.user_viewed_sessions AS fs
                                    WHERE s.session_id = fs.session_id
                                      AND fs.user_id = %(userId)s LIMIT 1), FALSE) AS viewed """


# This function executes the query and returns the result
def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_id, errors_only=False,
                    error_status=schemas.ErrorStatus.ALL, count_only=False, issue=None, ids_only=False,
                    platform="web"):
    if data.bookmarked:
        data.startTimestamp, data.endTimestamp = sessions_favorite.get_start_end_timestamp(project_id, user_id)

    full_args, query_part = sessions_legacy.search_query_parts(data=data, error_status=error_status,
                                                               errors_only=errors_only,
                                                               favorite_only=data.bookmarked, issue=issue,
                                                               project_id=project_id,
                                                               user_id=user_id, platform=platform)
    if data.limit is not None and data.page is not None:
        full_args["sessions_limit"] = data.limit
        full_args["sessions_limit_s"] = (data.page - 1) * data.limit
        full_args["sessions_limit_e"] = data.page * data.limit
    else:
        full_args["sessions_limit"] = 200
        full_args["sessions_limit_s"] = 0
        full_args["sessions_limit_e"] = 200

    meta_keys = []
    with pg_client.PostgresClient() as cur:
        if errors_only:
            main_query = cur.mogrify(f"""SELECT DISTINCT er.error_id,
                                                COALESCE((SELECT TRUE
                                                          FROM public.user_viewed_errors AS ve
                                                          WHERE er.error_id = ve.error_id
                                                            AND ve.user_id = %(userId)s LIMIT 1), FALSE) AS viewed
                                         {query_part};""", full_args)

        elif count_only:
            main_query = cur.mogrify(f"""SELECT COUNT(DISTINCT s.session_id) AS count_sessions,
                                                COUNT(DISTINCT s.user_uuid) AS count_users
                                         {query_part};""", full_args)
        elif data.group_by_user:
            g_sort = "count(full_sessions)"
            if data.order is None:
                data.order = schemas.SortOrderType.DESC.value
            else:
                data.order = data.order
            if data.sort is not None and data.sort != 'sessionsCount':
                sort = helper.key_to_snake_case(data.sort)
                g_sort = f"{'MIN' if data.order == schemas.SortOrderType.DESC else 'MAX'}({sort})"
            else:
                sort = 'start_ts'

            meta_keys = metadata.get(project_id=project_id)
            main_query = cur.mogrify(f"""SELECT COUNT(*) AS count,
                                                COALESCE(JSONB_AGG(users_sessions)
                                                         FILTER (WHERE rn>%(sessions_limit_s)s AND rn<=%(sessions_limit_e)s), '[]'::JSONB) AS sessions
                                         FROM (SELECT user_id,
                                                      count(full_sessions) AS user_sessions_count,
                                                      jsonb_agg(full_sessions) FILTER (WHERE rn <= 1) AS last_session,
                                                      MIN(full_sessions.start_ts) AS first_session_ts,
                                                      ROW_NUMBER() OVER (ORDER BY {g_sort} {data.order}) AS rn
                                               FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY {sort} {data.order}) AS rn
                                                     FROM (SELECT DISTINCT ON(s.session_id) {SESSION_PROJECTION_COLS}
                                                                  {"," if len(meta_keys) > 0 else ""}{",".join([f'metadata_{m["index"]}' for m in meta_keys])}
                                                           {query_part}
                                                           ) AS filtred_sessions
                                                     ) AS full_sessions
                                               GROUP BY user_id
                                               ) AS users_sessions;""",
                                     full_args)
        elif ids_only:
            main_query = cur.mogrify(f"""SELECT DISTINCT ON(s.session_id) s.session_id
                                         {query_part}
                                         ORDER BY s.session_id desc
                                         LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s;""",
                                     full_args)
        else:
            if data.order is None:
                data.order = schemas.SortOrderType.DESC.value
            else:
                data.order = data.order
            sort = 'session_id'
            if data.sort is not None and data.sort != "session_id":
                # sort += " " + data.order + "," + helper.key_to_snake_case(data.sort)
                sort = helper.key_to_snake_case(data.sort)

            meta_keys = metadata.get(project_id=project_id)
            main_query = cur.mogrify(f"""SELECT COUNT(full_sessions) AS count,
                                                COALESCE(JSONB_AGG(full_sessions)
                                                         FILTER (WHERE rn>%(sessions_limit_s)s AND rn<=%(sessions_limit_e)s), '[]'::JSONB) AS sessions
                                         FROM (SELECT *, ROW_NUMBER() OVER (ORDER BY {sort} {data.order}, issue_score DESC) AS rn
                                               FROM (SELECT DISTINCT ON(s.session_id) {SESSION_PROJECTION_COLS}
                                                            {"," if len(meta_keys) > 0 else ""}{",".join([f'metadata_{m["index"]}' for m in meta_keys])}
                                                     {query_part}
                                                     ORDER BY s.session_id desc) AS filtred_sessions
                                               ORDER BY {sort} {data.order}, issue_score DESC) AS full_sessions;""",
                                     full_args)
        logger.debug("--------------------")
        logger.debug(main_query)
        logger.debug("--------------------")
        try:
            cur.execute(main_query)
            sessions = cur.fetchone()
        except Exception as err:
            logger.warning("--------- SESSIONS SEARCH QUERY EXCEPTION -----------")
            logger.warning(main_query.decode('UTF-8'))
            logger.warning("--------- PAYLOAD -----------")
            logger.warning(data.model_dump_json())
            logger.warning("--------------------")
            raise err
        if errors_only or ids_only:
            return helper.list_to_camel_case(cur.fetchall())

        if count_only:
            return helper.dict_to_camel_case(sessions)

        total = sessions["count"]
        sessions = sessions["sessions"]

        if data.group_by_user:
            for i, s in enumerate(sessions):
                sessions[i] = {**s.pop("last_session")[0], **s}
                sessions[i].pop("rn")
                sessions[i]["metadata"] = {k["key"]: sessions[i][f'metadata_{k["index"]}'] for k in meta_keys
                                           if sessions[i][f'metadata_{k["index"]}'] is not None}
        else:
            for i, s in enumerate(sessions):
                sessions[i]["metadata"] = {k["key"]: sessions[i][f'metadata_{k["index"]}'] for k in meta_keys
                                           if sessions[i][f'metadata_{k["index"]}'] is not None}
        # if not data.group_by_user and data.sort is not None and data.sort != "session_id":
        #     sessions = sorted(sessions, key=lambda s: s[helper.key_to_snake_case(data.sort)],
        #                       reverse=data.order.upper() == "DESC")
        return {
            'total': total,
            'sessions': helper.list_to_camel_case(sessions)
        }


def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None):
    if project_id is None:
        all_projects = projects.get_projects(tenant_id=tenant_id)
    else:
        all_projects = [
            projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False,
                                 include_gdpr=False)]

    all_projects = {int(p["projectId"]): p["name"] for p in all_projects}
    project_ids = list(all_projects.keys())

    available_keys = metadata.get_keys_by_projects(project_ids)
    for i in available_keys:
        available_keys[i]["user_id"] = schemas.FilterType.USER_ID
        available_keys[i]["user_anonymous_id"] = schemas.FilterType.USER_ANONYMOUS_ID
    results = {}
    for i in project_ids:
        if m_key not in available_keys[i].values():
            available_keys.pop(i)
            results[i] = {"total": 0, "sessions": [], "missingMetadata": True}
    project_ids = list(available_keys.keys())
    if len(project_ids) > 0:
        with pg_client.PostgresClient() as cur:
            sub_queries = []
            for i in project_ids:
                col_name = list(available_keys[i].keys())[list(available_keys[i].values()).index(m_key)]
                sub_queries.append(cur.mogrify(
                    f"(SELECT COALESCE(COUNT(s.*)) AS count FROM public.sessions AS s WHERE s.project_id = %(id)s AND s.{col_name} = %(value)s) AS \"{i}\"",
                    {"id": i, "value": m_value}).decode('UTF-8'))
            query = f"""SELECT {", ".join(sub_queries)};"""
            cur.execute(query=query)

            rows = cur.fetchone()

            sub_queries = []
            for i in rows.keys():
                results[i] = {"total": rows[i], "sessions": [], "missingMetadata": False, "name": all_projects[int(i)]}
                if rows[i] > 0:
                    col_name = list(available_keys[int(i)].keys())[list(available_keys[int(i)].values()).index(m_key)]
                    sub_queries.append(
                        cur.mogrify(
                            f"""(
                            SELECT *
                            FROM (
                                SELECT DISTINCT ON(favorite_sessions.session_id, s.session_id) {SESSION_PROJECTION_COLS}
                                FROM public.sessions AS s LEFT JOIN (SELECT session_id
                                                                     FROM public.user_favorite_sessions
                                                                     WHERE user_favorite_sessions.user_id = %(userId)s
                                                                     ) AS favorite_sessions USING (session_id)
                                WHERE s.project_id = %(id)s AND s.duration IS NOT NULL AND s.{col_name} = %(value)s
                            ) AS full_sessions
                            ORDER BY favorite DESC, issue_score DESC
                            LIMIT 10
                            )""",
                            {"id": i, "value": m_value, "userId": user_id}).decode('UTF-8'))
            if len(sub_queries) > 0:
                cur.execute("\nUNION\n".join(sub_queries))
                rows = cur.fetchall()
                for i in rows:
                    results[str(i["project_id"])]["sessions"].append(helper.dict_to_camel_case(i))
    return results


def search_sessions_by_ids(project_id: int, session_ids: list, sort_by: str = 'session_id',
                           ascending: bool = False) -> dict:
    if session_ids is None or len(session_ids) == 0:
        return {"total": 0, "sessions": []}
    with pg_client.PostgresClient() as cur:
        meta_keys = metadata.get(project_id=project_id)
        params = {"project_id": project_id, "session_ids": tuple(session_ids)}
        order_direction = 'ASC' if ascending else 'DESC'
        main_query = cur.mogrify(f"""SELECT {SESSION_PROJECTION_BASE_COLS}
                                            {"," if len(meta_keys) > 0 else ""}{",".join([f'metadata_{m["index"]}' for m in meta_keys])}
                                     FROM public.sessions AS s
                                     WHERE project_id=%(project_id)s
                                       AND session_id IN %(session_ids)s
                                     ORDER BY {sort_by} {order_direction};""", params)

        cur.execute(main_query)
        rows = cur.fetchall()
        if len(meta_keys) > 0:
            for s in rows:
                s["metadata"] = {}
                for m in meta_keys:
                    s["metadata"][m["key"]] = s.pop(f'metadata_{m["index"]}')
        return {"total": len(rows), "sessions": helper.list_to_camel_case(rows)}
@@ -1,6 +1,7 @@
import logging

-from chalicelib.core import sessions, assist
+from chalicelib.core import assist
+from chalicelib.core.sessions import sessions_search
from chalicelib.core.db_request_handler import DatabaseRequestHandler
from chalicelib.core.usability_testing.schema import UTTestCreate, UTTestSearch, UTTestUpdate
from chalicelib.utils.TimeUTC import TimeUTC
@@ -312,7 +313,7 @@ def ut_tests_sessions(project_id: int, test_id: int, page: int, limit: int, user

    session_ids = handler.fetchall()
    session_ids = [session['session_id'] for session in session_ids]
-    sessions_list = sessions.search_sessions_by_ids(project_id=project_id, session_ids=session_ids)
+    sessions_list = sessions_search.search_sessions_by_ids(project_id=project_id, session_ids=session_ids)
    sessions_list['page'] = page

    return sessions_list
@@ -12,7 +12,7 @@ from chalicelib.core import errors, assist, signup, feature_flags
from chalicelib.core.metrics import heatmaps
from chalicelib.core.errors import errors_favorite, errors_viewed
from chalicelib.core.sessions import sessions, sessions_notes, sessions_replay, sessions_favorite, sessions_viewed, \
-    sessions_assignments, unprocessed_sessions
+    sessions_assignments, unprocessed_sessions, sessions_search
from chalicelib.core import tenants, users, projects, license
from chalicelib.core import webhook
from chalicelib.core.collaborations.collaboration_slack import Slack
@@ -242,8 +242,8 @@ def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] =
    if len(key) == 0:
        return {"errors": ["please provide a key for search"]}
    return {
-        "data": sessions.search_by_metadata(tenant_id=context.tenant_id, user_id=context.user_id, m_value=value,
-                                            m_key=key, project_id=projectId)}
+        "data": sessions_search.search_by_metadata(tenant_id=context.tenant_id, user_id=context.user_id, m_value=value,
+                                                   m_key=key, project_id=projectId)}


@app.get('/projects', tags=['projects'])
@@ -252,18 +252,18 @@ def get_projects(context: schemas.CurrentContext = Depends(OR_context)):


@app.post('/{projectId}/sessions/search', tags=["sessions"])
-def sessions_search(projectId: int, data: schemas.SessionsSearchPayloadSchema = Body(...),
+def search_sessions(projectId: int, data: schemas.SessionsSearchPayloadSchema = Body(...),
                    context: schemas.CurrentContext = Depends(OR_context)):
-    data = sessions.search_sessions(data=data, project_id=projectId, user_id=context.user_id,
-                                    platform=context.project.platform)
+    data = sessions_search.search_sessions(data=data, project_id=projectId, user_id=context.user_id,
+                                           platform=context.project.platform)
    return {'data': data}


@app.post('/{projectId}/sessions/search/ids', tags=["sessions"])
def session_ids_search(projectId: int, data: schemas.SessionsSearchPayloadSchema = Body(...),
                       context: schemas.CurrentContext = Depends(OR_context)):
-    data = sessions.search_sessions(data=data, project_id=projectId, user_id=context.user_id, ids_only=True,
-                                    platform=context.project.platform)
+    data = sessions_search.search_sessions(data=data, project_id=projectId, user_id=context.user_id, ids_only=True,
+                                           platform=context.project.platform)
    return {'data': data}
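Renaming the handler (sessions_search -> search_sessions) frees the name for the imported sessions_search module; the route path itself is unchanged. A sketch of calling it, assuming a local server and bearer-token auth; only startTimestamp/endTimestamp are confirmed payload fields elsewhere in this diff, the rest of the schema is not shown:

    import requests  # assumption: any HTTP client works, requests shown for brevity

    resp = requests.post("http://localhost:8000/1/sessions/search",
                         json={"startTimestamp": 1700000000000,
                               "endTimestamp": 1700086400000},
                         headers={"Authorization": "Bearer <token>"})
    print(resp.json()["data"]["total"])  # the handler wraps results in {'data': ...}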
1 ee/api/.gitignore (vendored)
@@ -221,6 +221,7 @@ Pipfile.lock
/chalicelib/core/sessions/sessions_metas.py
/chalicelib/core/sessions/sessions_mobs.py
/chalicelib/core/sessions/sessions_replay.py
+/chalicelib/core/sessions/sessions_search.py
/chalicelib/core/sessions/performance_event.py
/chalicelib/core/sessions/sessions_viewed.py
/chalicelib/core/sessions/unprocessed_sessions.py
@@ -8,8 +8,10 @@ from . import sessions as sessions_legacy
if config("EXP_SESSIONS_SEARCH", cast=bool, default=False):
    logger.info(">>> Using experimental sessions search")
    from . import sessions_ch as sessions
+    from . import sessions_search_exp as sessions_search
else:
    from . import sessions
+    from . import sessions_search_exp

from chalicelib.core.sessions import sessions_devtool_ee as sessions_devtool
from chalicelib.core.sessions import sessions_viewed_ee as sessions_viewed
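The EXP_SESSIONS_SEARCH toggle decides at import time which implementations the package exposes. Assuming the decouple-style config() reads the environment, enabling it for a local run could look like this (hypothetical; deployments normally set the flag in their own configuration):

    import os

    # Must be set before chalicelib.core.sessions is first imported.
    os.environ["EXP_SESSIONS_SEARCH"] = "true"

    from chalicelib.core import sessions  # package __init__ now binds the ClickHouse-backed modules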
269 ee/api/chalicelib/core/sessions/sessions_search_exp.py (new file)
@@ -0,0 +1,269 @@
import ast
import logging
from typing import List, Union

import schemas
from chalicelib.core import events, metadata, projects
from chalicelib.core.metrics import metrics
from chalicelib.core.sessions import sessions_favorite, performance_event, sessions_legacy, sessions
from chalicelib.utils import pg_client, helper, metrics_helper, ch_client, exp_ch_helper
from chalicelib.utils import sql_helper as sh

logger = logging.getLogger(__name__)

SESSION_PROJECTION_COLS_CH = """\
s.project_id,
s.session_id AS session_id,
s.user_uuid AS user_uuid,
s.user_id AS user_id,
s.user_os AS user_os,
s.user_browser AS user_browser,
s.user_device AS user_device,
s.user_device_type AS user_device_type,
s.user_country AS user_country,
s.user_city AS user_city,
s.user_state AS user_state,
toUnixTimestamp(s.datetime)*1000 AS start_ts,
s.duration AS duration,
s.events_count AS events_count,
s.pages_count AS pages_count,
s.errors_count AS errors_count,
s.user_anonymous_id AS user_anonymous_id,
s.platform AS platform,
s.timezone AS timezone,
coalesce(issue_score,0) AS issue_score,
s.issue_types AS issue_types
"""

SESSION_PROJECTION_COLS_CH_MAP = """\
'project_id', toString(%(project_id)s),
'session_id', toString(s.session_id),
'user_uuid', toString(s.user_uuid),
'user_id', toString(s.user_id),
'user_os', toString(s.user_os),
'user_browser', toString(s.user_browser),
'user_device', toString(s.user_device),
'user_device_type', toString(s.user_device_type),
'user_country', toString(s.user_country),
'user_city', toString(s.user_city),
'user_state', toString(s.user_state),
'start_ts', toString(toUnixTimestamp(s.datetime)*1000),
'duration', toString(s.duration),
'events_count', toString(s.events_count),
'pages_count', toString(s.pages_count),
'errors_count', toString(s.errors_count),
'user_anonymous_id', toString(s.user_anonymous_id),
'platform', toString(s.platform),
'timezone', toString(s.timezone),
'issue_score', toString(coalesce(issue_score,0)),
'viewed', toString(viewed_sessions.session_id > 0)
"""


# This function executes the query and returns the result
def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_id, errors_only=False,
                    error_status=schemas.ErrorStatus.ALL, count_only=False, issue=None, ids_only=False,
                    platform="web"):
    if data.bookmarked:
        data.startTimestamp, data.endTimestamp = sessions_favorite.get_start_end_timestamp(project_id, user_id)
    full_args, query_part = sessions.search_query_parts_ch(data=data, error_status=error_status,
                                                           errors_only=errors_only,
                                                           favorite_only=data.bookmarked, issue=issue,
                                                           project_id=project_id,
                                                           user_id=user_id, platform=platform)
    if data.sort == "startTs":
        data.sort = "datetime"
    if data.limit is not None and data.page is not None:
        full_args["sessions_limit"] = data.limit
        full_args["sessions_limit_s"] = (data.page - 1) * data.limit
        full_args["sessions_limit_e"] = data.page * data.limit
    else:
        full_args["sessions_limit"] = 200
        full_args["sessions_limit_s"] = 0
        full_args["sessions_limit_e"] = 200

    meta_keys = []
    with ch_client.ClickHouseClient() as cur:
        if errors_only:
            main_query = cur.format(query=f"""SELECT DISTINCT er.error_id,
                                                     COALESCE((SELECT TRUE
                                                               FROM {exp_ch_helper.get_user_viewed_errors_table()} AS ve
                                                               WHERE er.error_id = ve.error_id
                                                                 AND ve.user_id = %(userId)s LIMIT 1), FALSE) AS viewed
                                              {query_part};""", parameters=full_args)

        elif count_only:
            main_query = cur.format(query=f"""SELECT COUNT(DISTINCT s.session_id) AS count_sessions,
                                                     COUNT(DISTINCT s.user_uuid) AS count_users
                                              {query_part};""",
                                    parameters=full_args)
        elif data.group_by_user:
            g_sort = "count(full_sessions)"
            if data.order is None:
                data.order = schemas.SortOrderType.DESC.value
            else:
                data.order = data.order
            if data.sort is not None and data.sort != 'sessionsCount':
                sort = helper.key_to_snake_case(data.sort)
                g_sort = f"{'MIN' if data.order == schemas.SortOrderType.DESC else 'MAX'}({sort})"
            else:
                sort = 'start_ts'

            meta_keys = metadata.get(project_id=project_id)
            meta_map = ",map(%s) AS 'metadata'" \
                       % ','.join([f"'{m['key']}',coalesce(metadata_{m['index']},'None')" for m in meta_keys])
            main_query = cur.mogrify(f"""SELECT COUNT(*) AS count,
                                                COALESCE(JSONB_AGG(users_sessions)
                                                         FILTER (WHERE rn>%(sessions_limit_s)s AND rn<=%(sessions_limit_e)s), '[]'::JSONB) AS sessions
                                         FROM (SELECT user_id,
                                                      count(full_sessions) AS user_sessions_count,
                                                      jsonb_agg(full_sessions) FILTER (WHERE rn <= 1) AS last_session,
                                                      MIN(full_sessions.start_ts) AS first_session_ts,
                                                      ROW_NUMBER() OVER (ORDER BY {g_sort} {data.order}) AS rn
                                               FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY {sort} {data.order}) AS rn
                                                     FROM (SELECT DISTINCT ON(s.session_id) {SESSION_PROJECTION_COLS_CH} {meta_map}
                                                           {query_part}
                                                           ) AS filtred_sessions
                                                     ) AS full_sessions
                                               GROUP BY user_id
                                               ) AS users_sessions;""",
                                     full_args)
        elif ids_only:
            main_query = cur.format(query=f"""SELECT DISTINCT ON(s.session_id) s.session_id
                                              {query_part}
                                              ORDER BY s.session_id desc
                                              LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s;""",
                                    parameters=full_args)
        else:
            if data.order is None:
                data.order = schemas.SortOrderType.DESC.value
            else:
                data.order = data.order
            sort = 'session_id'
            if data.sort is not None and data.sort != "session_id":
                # sort += " " + data.order + "," + helper.key_to_snake_case(data.sort)
                sort = helper.key_to_snake_case(data.sort)

            meta_keys = metadata.get(project_id=project_id)
            meta_map = ",'metadata',toString(map(%s))" \
                       % ','.join([f"'{m['key']}',coalesce(metadata_{m['index']},'None')" for m in meta_keys])
            main_query = cur.format(query=f"""SELECT any(total) AS count,
                                                     groupArray(%(sessions_limit)s)(details) AS sessions
                                              FROM (SELECT total, details
                                                    FROM (SELECT COUNT() OVER () AS total,
                                                                 s.{sort} AS sort_key,
                                                                 map({SESSION_PROJECTION_COLS_CH_MAP}{meta_map}) AS details
                                                          {query_part}
                                                          LEFT JOIN (SELECT DISTINCT session_id
                                                                     FROM experimental.user_viewed_sessions
                                                                     WHERE user_id = %(userId)s AND project_id=%(project_id)s
                                                                       AND _timestamp >= toDateTime(%(startDate)s / 1000)) AS viewed_sessions
                                                                 ON (viewed_sessions.session_id = s.session_id)
                                                          ) AS raw
                                                    ORDER BY sort_key {data.order}
                                                    LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s) AS sorted_sessions;""",
                                    parameters=full_args)
        logging.debug("--------------------")
        logging.debug(main_query)
        logging.debug("--------------------")
        try:
            sessions = cur.execute(main_query)
        except Exception as err:
            logging.warning("--------- SESSIONS-CH SEARCH QUERY EXCEPTION -----------")
            logging.warning(main_query)
            logging.warning("--------- PAYLOAD -----------")
            logging.warning(data.model_dump_json())
            logging.warning("--------------------")
            raise err
        if errors_only or ids_only:
            return helper.list_to_camel_case(sessions)

    if len(sessions) > 0:
        sessions = sessions[0]

    total = sessions["count"]
    sessions = sessions["sessions"]

    if data.group_by_user:
        for i, s in enumerate(sessions):
            sessions[i] = {**s.pop("last_session")[0], **s}
            sessions[i].pop("rn")
            sessions[i]["metadata"] = ast.literal_eval(sessions[i]["metadata"])
    else:
        for i in range(len(sessions)):
            sessions[i]["metadata"] = ast.literal_eval(sessions[i]["metadata"])
            sessions[i] = schemas.SessionModel.parse_obj(helper.dict_to_camel_case(sessions[i]))

    return {
        'total': total,
        'sessions': sessions
    }


def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None):
    if project_id is None:
        all_projects = projects.get_projects(tenant_id=tenant_id)
    else:
        all_projects = [
            projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False,
                                 include_gdpr=False)]

    all_projects = {int(p["projectId"]): p["name"] for p in all_projects}
    project_ids = list(all_projects.keys())

    available_keys = metadata.get_keys_by_projects(project_ids)
    for i in available_keys:
        available_keys[i]["user_id"] = schemas.FilterType.USER_ID
        available_keys[i]["user_anonymous_id"] = schemas.FilterType.USER_ANONYMOUS_ID
    results = {}
    for i in project_ids:
        if m_key not in available_keys[i].values():
            available_keys.pop(i)
            results[i] = {"total": 0, "sessions": [], "missingMetadata": True}
    project_ids = list(available_keys.keys())
    if len(project_ids) > 0:
        with pg_client.PostgresClient() as cur:
            sub_queries = []
            for i in project_ids:
                col_name = list(available_keys[i].keys())[list(available_keys[i].values()).index(m_key)]
                sub_queries.append(cur.mogrify(
                    f"(SELECT COALESCE(COUNT(s.*)) AS count FROM public.sessions AS s WHERE s.project_id = %(id)s AND s.{col_name} = %(value)s) AS \"{i}\"",
                    {"id": i, "value": m_value}).decode('UTF-8'))
            query = f"""SELECT {", ".join(sub_queries)};"""
            cur.execute(query=query)

            rows = cur.fetchone()

            sub_queries = []
            for i in rows.keys():
                results[i] = {"total": rows[i], "sessions": [], "missingMetadata": False, "name": all_projects[int(i)]}
                if rows[i] > 0:
                    col_name = list(available_keys[int(i)].keys())[list(available_keys[int(i)].values()).index(m_key)]
                    sub_queries.append(
                        cur.mogrify(
                            f"""(
                            SELECT *
                            FROM (
                                SELECT DISTINCT ON(favorite_sessions.session_id, s.session_id) {SESSION_PROJECTION_COLS_CH}
                                FROM public.sessions AS s LEFT JOIN (SELECT session_id
                                                                     FROM public.user_favorite_sessions
                                                                     WHERE user_favorite_sessions.user_id = %(userId)s
                                                                     ) AS favorite_sessions USING (session_id)
                                WHERE s.project_id = %(id)s AND s.duration IS NOT NULL AND s.{col_name} = %(value)s
                            ) AS full_sessions
                            ORDER BY favorite DESC, issue_score DESC
                            LIMIT 10
                            )""",
                            {"id": i, "value": m_value, "userId": user_id}).decode('UTF-8'))
            if len(sub_queries) > 0:
                cur.execute("\nUNION\n".join(sub_queries))
                rows = cur.fetchall()
                for i in rows:
                    results[str(i["project_id"])]["sessions"].append(helper.dict_to_camel_case(i))
    return results


# TODO: rewrite this function to use ClickHouse
def search_sessions_by_ids(project_id: int, session_ids: list, sort_by: str = 'session_id',
                           ascending: bool = False) -> dict:
    return sessions_legacy.search_sessions_by_ids(project_id, session_ids, sort_by, ascending)
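Unlike the Postgres path, the ClickHouse query serializes each session's metadata map to a string (toString(map(...)) above), so the Python side parses it back with ast.literal_eval. A minimal illustration with a made-up map string:

    import ast

    raw = "{'plan':'free','tenant':'acme'}"  # hypothetical toString(map(...)) output
    assert ast.literal_eval(raw) == {'plan': 'free', 'tenant': 'acme'}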
@@ -42,6 +42,7 @@ rm -rf ./chalicelib/core/sessions/sessions_assignments.py
rm -rf ./chalicelib/core/sessions/sessions_metas.py
rm -rf ./chalicelib/core/sessions/sessions_mobs.py
rm -rf ./chalicelib/core/sessions/sessions_replay.py
+rm -rf ./chalicelib/core/sessions/sessions_search.py
rm -rf ./chalicelib/core/sessions/performance_event.py
rm -rf ./chalicelib/core/sessions/sessions_viewed.py
rm -rf ./chalicelib/core/sessions/unprocessed_sessions.py