From 9ae7d97e79d89bcba40ed97e8e79eafe0557ac94 Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Wed, 1 Sep 2021 18:50:14 +0100 Subject: [PATCH] feat(api): insights journey search --- api/chalicelib/core/insights.py | 91 +++++++++++++++++---------------- 1 file changed, 48 insertions(+), 43 deletions(-) diff --git a/api/chalicelib/core/insights.py b/api/chalicelib/core/insights.py index 7e12b9dcc..d470e1537 100644 --- a/api/chalicelib/core/insights.py +++ b/api/chalicelib/core/insights.py @@ -1,4 +1,4 @@ -from chalicelib.core import metadata +from chalicelib.core import sessions_metas from chalicelib.utils import args_transformer from chalicelib.utils import helper, dev from chalicelib.utils import pg_client @@ -22,33 +22,33 @@ def __transform_journey(rows): return {"nodes": nodes, "links": sorted(links, key=lambda x: x["value"], reverse=True)} +JOURNEY_DEPTH = 5 +JOURNEY_TYPES = { + "PAGES": {"table": "events.pages", "column": "base_path", "table_id": "message_id"}, + "CLICK": {"table": "events.clicks", "column": "label", "table_id": "message_id"}, + "VIEW": {"table": "events_ios.views", "column": "name", "table_id": "seq_index"}, + "EVENT": {"table": "events_common.customs", "column": "name", "table_id": "seq_index"} +} + + @dev.timed -def get_journey(project_id, startTimestamp=TimeUTC.now(delta_days=-1), endTimestamp=TimeUTC.now(), **args): +def get_journey(project_id, startTimestamp=TimeUTC.now(delta_days=-1), endTimestamp=TimeUTC.now(), filters=[], **args): pg_sub_query_subset = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions", time_constraint=True) - # pg_sub_query_chart = __get_constraints(project_id=project_id, time_constraint=False, - # chart=True, data=args, main_table="events.pages", time_column="timestamp", - # project=False, duration=False) - # pg_sub_query_subset.append("m_errors.source = 'js_exception'") - # pg_sub_query_subset.append("pages.timestamp>=%(startTimestamp)s") - # pg_sub_query_subset.append("pages.timestamp<%(endTimestamp)s") - # with pg_client.PostgresClient() as cur: - # pg_query = f"""WITH errors AS (SELECT DISTINCT session_id, timestamp - # FROM events.errors - # INNER JOIN public.errors AS m_errors USING (error_id) - # WHERE {" AND ".join(pg_sub_query_subset)} - # ) - # SELECT generated_timestamp AS timestamp, - # COALESCE(COUNT(sessions), 0) AS count - # FROM generate_series(%(startTimestamp)s, %(endTimestamp)s, %(step_size)s) AS generated_timestamp - # LEFT JOIN LATERAL ( SELECT session_id - # FROM errors - # WHERE {" AND ".join(pg_sub_query_chart)} - # ) AS sessions ON (TRUE) - # GROUP BY generated_timestamp - # ORDER BY generated_timestamp;""" - # params = {"step_size": step_size, "project_id": project_id, "startTimestamp": startTimestamp, - # "endTimestamp": endTimestamp, **__get_constraint_values(args)} + event_start = None + event_table = JOURNEY_TYPES["PAGES"]["table"] + event_column = JOURNEY_TYPES["PAGES"]["column"] + event_table_id = JOURNEY_TYPES["PAGES"]["table_id"] + extra_values = {} + for f in filters: + if f["type"] == "START_POINT": + event_start = f["value"] + elif f["type"] == "EVENT_TYPE" and JOURNEY_TYPES.get(f["value"]): + event_table = JOURNEY_TYPES[f["value"]]["table"] + event_column = JOURNEY_TYPES[f["value"]]["column"] + elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: + pg_sub_query_subset.append(f"sessions.user_id = %(user_id)s") + extra_values["user_id"] = f["value"] with pg_client.PostgresClient() as cur: pg_query = f"""SELECT source_event, @@ -57,42 +57,47 @@ def get_journey(project_id, startTimestamp=TimeUTC.now(delta_days=-1), endTimest MAX(source_id) max_source_id, count(*) AS value - FROM (SELECT event_number || '_' || path as target_event, - message_id as target_id, - LAG(event_number || '_' || path, 1) OVER ( PARTITION BY session_rank ) AS source_event, - LAG(message_id, 1) OVER ( PARTITION BY session_rank ) AS source_id - FROM (SELECT path, + FROM (SELECT event_number || '_' || value as target_event, + message_id AS target_id, + LAG(event_number || '_' || value, 1) OVER ( PARTITION BY session_rank ) AS source_event, + LAG(message_id, 1) OVER ( PARTITION BY session_rank ) AS source_id + FROM (SELECT value, session_rank, message_id, ROW_NUMBER() OVER ( PARTITION BY session_rank ORDER BY timestamp ) AS event_number - FROM (SELECT message_id, + + {f"FROM (SELECT * FROM (SELECT *, MIN(mark) OVER ( PARTITION BY session_id , session_rank ORDER BY timestamp ) AS max FROM (SELECT *, CASE WHEN value = %(event_start)s THEN timestamp ELSE NULL END as mark" + if event_start else ""} + + FROM (SELECT session_id, + message_id, timestamp, - path, + value, SUM(new_session) OVER (ORDER BY session_id, timestamp) AS session_rank FROM (SELECT *, CASE WHEN source_timestamp IS NULL THEN 1 ELSE 0 END AS new_session FROM (SELECT session_id, - message_id, + {event_table_id} AS message_id, timestamp, - path, - LAG(pages.timestamp) - OVER ( - PARTITION BY session_id - ORDER BY timestamp) AS source_timestamp - FROM events.pages - INNER JOIN public.sessions USING (session_id) + {event_column} AS value, + LAG(timestamp) + OVER (PARTITION BY session_id ORDER BY timestamp) AS source_timestamp + FROM {event_table} INNER JOIN public.sessions USING (session_id) WHERE {" AND ".join(pg_sub_query_subset)} - ) AS related_events) AS ranked_events) AS processed) AS sorted_events - WHERE event_number <= 4) AS final + ) AS related_events) AS ranked_events) AS processed + {") AS marked) AS maxed WHERE timestamp >= max) AS filtered" if event_start else ""} + ) AS sorted_events + WHERE event_number <= %(JOURNEY_DEPTH)s) AS final WHERE source_event IS NOT NULL and target_event IS NOT NULL GROUP BY source_event, target_event ORDER BY value DESC LIMIT 20;""" params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args)} + "endTimestamp": endTimestamp, "event_start": event_start, "JOURNEY_DEPTH": JOURNEY_DEPTH, + **__get_constraint_values(args), **extra_values} print(cur.mogrify(pg_query, params)) cur.execute(cur.mogrify(pg_query, params)) rows = cur.fetchall()