feat(api): insights journey search
This commit is contained in:
parent
13e2ad7c08
commit
9ae7d97e79
1 changed files with 48 additions and 43 deletions
|
|
@ -1,4 +1,4 @@
|
|||
from chalicelib.core import metadata
|
||||
from chalicelib.core import sessions_metas
|
||||
from chalicelib.utils import args_transformer
|
||||
from chalicelib.utils import helper, dev
|
||||
from chalicelib.utils import pg_client
|
||||
|
|
@ -22,33 +22,33 @@ def __transform_journey(rows):
|
|||
return {"nodes": nodes, "links": sorted(links, key=lambda x: x["value"], reverse=True)}
|
||||
|
||||
|
||||
JOURNEY_DEPTH = 5
|
||||
JOURNEY_TYPES = {
|
||||
"PAGES": {"table": "events.pages", "column": "base_path", "table_id": "message_id"},
|
||||
"CLICK": {"table": "events.clicks", "column": "label", "table_id": "message_id"},
|
||||
"VIEW": {"table": "events_ios.views", "column": "name", "table_id": "seq_index"},
|
||||
"EVENT": {"table": "events_common.customs", "column": "name", "table_id": "seq_index"}
|
||||
}
|
||||
|
||||
|
||||
@dev.timed
|
||||
def get_journey(project_id, startTimestamp=TimeUTC.now(delta_days=-1), endTimestamp=TimeUTC.now(), **args):
|
||||
def get_journey(project_id, startTimestamp=TimeUTC.now(delta_days=-1), endTimestamp=TimeUTC.now(), filters=[], **args):
|
||||
pg_sub_query_subset = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions",
|
||||
time_constraint=True)
|
||||
# pg_sub_query_chart = __get_constraints(project_id=project_id, time_constraint=False,
|
||||
# chart=True, data=args, main_table="events.pages", time_column="timestamp",
|
||||
# project=False, duration=False)
|
||||
# pg_sub_query_subset.append("m_errors.source = 'js_exception'")
|
||||
# pg_sub_query_subset.append("pages.timestamp>=%(startTimestamp)s")
|
||||
# pg_sub_query_subset.append("pages.timestamp<%(endTimestamp)s")
|
||||
# with pg_client.PostgresClient() as cur:
|
||||
# pg_query = f"""WITH errors AS (SELECT DISTINCT session_id, timestamp
|
||||
# FROM events.errors
|
||||
# INNER JOIN public.errors AS m_errors USING (error_id)
|
||||
# WHERE {" AND ".join(pg_sub_query_subset)}
|
||||
# )
|
||||
# SELECT generated_timestamp AS timestamp,
|
||||
# COALESCE(COUNT(sessions), 0) AS count
|
||||
# FROM generate_series(%(startTimestamp)s, %(endTimestamp)s, %(step_size)s) AS generated_timestamp
|
||||
# LEFT JOIN LATERAL ( SELECT session_id
|
||||
# FROM errors
|
||||
# WHERE {" AND ".join(pg_sub_query_chart)}
|
||||
# ) AS sessions ON (TRUE)
|
||||
# GROUP BY generated_timestamp
|
||||
# ORDER BY generated_timestamp;"""
|
||||
# params = {"step_size": step_size, "project_id": project_id, "startTimestamp": startTimestamp,
|
||||
# "endTimestamp": endTimestamp, **__get_constraint_values(args)}
|
||||
event_start = None
|
||||
event_table = JOURNEY_TYPES["PAGES"]["table"]
|
||||
event_column = JOURNEY_TYPES["PAGES"]["column"]
|
||||
event_table_id = JOURNEY_TYPES["PAGES"]["table_id"]
|
||||
extra_values = {}
|
||||
for f in filters:
|
||||
if f["type"] == "START_POINT":
|
||||
event_start = f["value"]
|
||||
elif f["type"] == "EVENT_TYPE" and JOURNEY_TYPES.get(f["value"]):
|
||||
event_table = JOURNEY_TYPES[f["value"]]["table"]
|
||||
event_column = JOURNEY_TYPES[f["value"]]["column"]
|
||||
elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]:
|
||||
pg_sub_query_subset.append(f"sessions.user_id = %(user_id)s")
|
||||
extra_values["user_id"] = f["value"]
|
||||
|
||||
with pg_client.PostgresClient() as cur:
|
||||
pg_query = f"""SELECT source_event,
|
||||
|
|
@ -57,42 +57,47 @@ def get_journey(project_id, startTimestamp=TimeUTC.now(delta_days=-1), endTimest
|
|||
MAX(source_id) max_source_id,
|
||||
count(*) AS value
|
||||
|
||||
FROM (SELECT event_number || '_' || path as target_event,
|
||||
message_id as target_id,
|
||||
LAG(event_number || '_' || path, 1) OVER ( PARTITION BY session_rank ) AS source_event,
|
||||
LAG(message_id, 1) OVER ( PARTITION BY session_rank ) AS source_id
|
||||
FROM (SELECT path,
|
||||
FROM (SELECT event_number || '_' || value as target_event,
|
||||
message_id AS target_id,
|
||||
LAG(event_number || '_' || value, 1) OVER ( PARTITION BY session_rank ) AS source_event,
|
||||
LAG(message_id, 1) OVER ( PARTITION BY session_rank ) AS source_id
|
||||
FROM (SELECT value,
|
||||
session_rank,
|
||||
message_id,
|
||||
ROW_NUMBER() OVER ( PARTITION BY session_rank ORDER BY timestamp ) AS event_number
|
||||
FROM (SELECT message_id,
|
||||
|
||||
{f"FROM (SELECT * FROM (SELECT *, MIN(mark) OVER ( PARTITION BY session_id , session_rank ORDER BY timestamp ) AS max FROM (SELECT *, CASE WHEN value = %(event_start)s THEN timestamp ELSE NULL END as mark"
|
||||
if event_start else ""}
|
||||
|
||||
FROM (SELECT session_id,
|
||||
message_id,
|
||||
timestamp,
|
||||
path,
|
||||
value,
|
||||
SUM(new_session) OVER (ORDER BY session_id, timestamp) AS session_rank
|
||||
FROM (SELECT *,
|
||||
CASE
|
||||
WHEN source_timestamp IS NULL THEN 1
|
||||
ELSE 0 END AS new_session
|
||||
FROM (SELECT session_id,
|
||||
message_id,
|
||||
{event_table_id} AS message_id,
|
||||
timestamp,
|
||||
path,
|
||||
LAG(pages.timestamp)
|
||||
OVER (
|
||||
PARTITION BY session_id
|
||||
ORDER BY timestamp) AS source_timestamp
|
||||
FROM events.pages
|
||||
INNER JOIN public.sessions USING (session_id)
|
||||
{event_column} AS value,
|
||||
LAG(timestamp)
|
||||
OVER (PARTITION BY session_id ORDER BY timestamp) AS source_timestamp
|
||||
FROM {event_table} INNER JOIN public.sessions USING (session_id)
|
||||
WHERE {" AND ".join(pg_sub_query_subset)}
|
||||
) AS related_events) AS ranked_events) AS processed) AS sorted_events
|
||||
WHERE event_number <= 4) AS final
|
||||
) AS related_events) AS ranked_events) AS processed
|
||||
{") AS marked) AS maxed WHERE timestamp >= max) AS filtered" if event_start else ""}
|
||||
) AS sorted_events
|
||||
WHERE event_number <= %(JOURNEY_DEPTH)s) AS final
|
||||
WHERE source_event IS NOT NULL
|
||||
and target_event IS NOT NULL
|
||||
GROUP BY source_event, target_event
|
||||
ORDER BY value DESC
|
||||
LIMIT 20;"""
|
||||
params = {"project_id": project_id, "startTimestamp": startTimestamp,
|
||||
"endTimestamp": endTimestamp, **__get_constraint_values(args)}
|
||||
"endTimestamp": endTimestamp, "event_start": event_start, "JOURNEY_DEPTH": JOURNEY_DEPTH,
|
||||
**__get_constraint_values(args), **extra_values}
|
||||
print(cur.mogrify(pg_query, params))
|
||||
cur.execute(cur.mogrify(pg_query, params))
|
||||
rows = cur.fetchall()
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue