diff --git a/api/chalicelib/core/product_analytics.py b/api/chalicelib/core/product_analytics.py index 1b92c7c1f..99950bbd1 100644 --- a/api/chalicelib/core/product_analytics.py +++ b/api/chalicelib/core/product_analytics.py @@ -40,7 +40,19 @@ def __transform_journey2(rows, reverse_path=False): def __transform_journey3(rows, reverse_path=False): - # nodes should contain duplicates for different steps otherwise the UI crashes + total_100p = 0 + number_of_step1 = 0 + for r in rows: + if r["event_number_in_session"] > 1: + break + number_of_step1 += 1 + total_100p += r["sessions_count"] + for i in range(number_of_step1): + rows[i]["value"] = round(number=100 / number_of_step1, ndigits=2) + + for i in range(number_of_step1, len(rows)): + rows[i]["value"] = round(number=rows[i]["sessions_count"] * 100 / total_100p, ndigits=2) + nodes = [] nodes_values = [] links = [] @@ -54,8 +66,8 @@ def __transform_journey3(rows, reverse_path=False): if target not in nodes: nodes.append(target) nodes_values.append({"name": r['next_value'], "eventType": r['next_type']}) - link = {"eventType": r['event_type'], "value": r["sessions_count"], - "avgTimeFromPervious": r["avg_time_from_previous"]} + link = {"eventType": r['event_type'], "sessionsCount": r["sessions_count"], + "value": r["value"], "avgTimeFromPrevious": r["avg_time_from_previous"]} if not reverse_path: link["source"] = nodes.index(source) link["target"] = nodes.index(target) @@ -76,328 +88,6 @@ JOURNEY_TYPES = { } -# query: Q2, the result is correct -def path_analysis_deprecated(project_id: int, data: schemas.CardPathAnalysis): - sub_events = [] - start_points_join = "" - start_points_conditions = [] - sessions_conditions = ["start_ts>=%(startTimestamp)s", "start_ts<%(endTimestamp)s", - "project_id=%(project_id)s", "events_count > 1", "duration>0"] - if len(data.metric_value) == 0: - data.metric_value.append(schemas.ProductAnalyticsSelectedEventType.location) - sub_events.append({"table": JOURNEY_TYPES[schemas.ProductAnalyticsSelectedEventType.location]["table"], - "column": JOURNEY_TYPES[schemas.ProductAnalyticsSelectedEventType.location]["column"], - "eventType": schemas.ProductAnalyticsSelectedEventType.location.value}) - else: - for v in data.metric_value: - if JOURNEY_TYPES.get(v): - sub_events.append({"table": JOURNEY_TYPES[v]["table"], - "column": JOURNEY_TYPES[v]["column"], - "eventType": v}) - - extra_values = {} - reverse = data.start_type == "end" - for i, sf in enumerate(data.start_point): - f_k = f"start_point_{i}" - op = sh.get_sql_operator(sf.operator) - is_not = sh.is_negation_operator(sf.operator) - extra_values = {**extra_values, **sh.multi_values(sf.value, value_key=f_k)} - start_points_conditions.append(f"(event_type='{sf.type}' AND " + - sh.multi_conditions(f'e_value {op} %({f_k})s', sf.value, is_not=is_not, - value_key=f_k) - + ")") - - exclusions = {} - for i, ef in enumerate(data.excludes): - if ef.type in data.metric_value: - f_k = f"exclude_{i}" - extra_values = {**extra_values, **sh.multi_values(ef.value, value_key=f_k)} - exclusions[ef.type] = [ - sh.multi_conditions(f'{JOURNEY_TYPES[ef.type]["column"]} != %({f_k})s', ef.value, is_not=True, - value_key=f_k)] - - meta_keys = None - for i, f in enumerate(data.series[0].filter.filters): - op = sh.get_sql_operator(f.operator) - is_any = sh.isAny_opreator(f.operator) - is_not = sh.is_negation_operator(f.operator) - is_undefined = sh.isUndefined_operator(f.operator) - f_k = f"f_value_{i}" - extra_values = {**extra_values, **sh.multi_values(f.value, value_key=f_k)} - - # ---- 
meta-filters - if f.type == schemas.FilterType.user_browser: - if is_any: - sessions_conditions.append('user_browser IS NOT NULL') - else: - sessions_conditions.append( - sh.multi_conditions(f'user_browser {op} %({f_k})s', f.value, is_not=is_not, value_key=f_k)) - - elif f.type in [schemas.FilterType.user_os]: - if is_any: - sessions_conditions.append('user_os IS NOT NULL') - else: - sessions_conditions.append( - sh.multi_conditions(f'user_os {op} %({f_k})s', f.value, is_not=is_not, value_key=f_k)) - - elif f.type in [schemas.FilterType.user_device]: - if is_any: - sessions_conditions.append('user_device IS NOT NULL') - else: - sessions_conditions.append( - sh.multi_conditions(f'user_device {op} %({f_k})s', f.value, is_not=is_not, value_key=f_k)) - - elif f.type in [schemas.FilterType.user_country]: - if is_any: - sessions_conditions.append('user_country IS NOT NULL') - else: - sessions_conditions.append( - sh.multi_conditions(f'user_country {op} %({f_k})s', f.value, is_not=is_not, value_key=f_k)) - - elif f.type == schemas.FilterType.user_city: - if is_any: - sessions_conditions.append('user_city IS NOT NULL') - else: - sessions_conditions.append( - sh.multi_conditions(f'user_city {op} %({f_k})s', f.value, is_not=is_not, value_key=f_k)) - - elif f.type == schemas.FilterType.user_state: - if is_any: - sessions_conditions.append('user_state IS NOT NULL') - else: - sessions_conditions.append( - sh.multi_conditions(f'user_state {op} %({f_k})s', f.value, is_not=is_not, value_key=f_k)) - - elif f.type in [schemas.FilterType.utm_source]: - if is_any: - sessions_conditions.append('utm_source IS NOT NULL') - elif is_undefined: - sessions_conditions.append('utm_source IS NULL') - else: - sessions_conditions.append( - sh.multi_conditions(f'utm_source {op} %({f_k})s::text', f.value, is_not=is_not, - value_key=f_k)) - - elif f.type in [schemas.FilterType.utm_medium]: - if is_any: - sessions_conditions.append('utm_medium IS NOT NULL') - elif is_undefined: - sessions_conditions.append('utm_medium IS NULL') - else: - sessions_conditions.append( - sh.multi_conditions(f'utm_medium {op} %({f_k})s::text', f.value, is_not=is_not, - value_key=f_k)) - - elif f.type in [schemas.FilterType.utm_campaign]: - if is_any: - sessions_conditions.append('utm_campaign IS NOT NULL') - elif is_undefined: - sessions_conditions.append('utm_campaign IS NULL') - else: - sessions_conditions.append( - sh.multi_conditions(f'utm_campaign {op} %({f_k})s::text', f.value, is_not=is_not, - value_key=f_k)) - - elif f.type == schemas.FilterType.duration: - if len(f.value) > 0 and f.value[0] is not None: - sessions_conditions.append("duration >= %(minDuration)s") - extra_values["minDuration"] = f.value[0] - if len(f.value) > 1 and f.value[1] is not None and int(f.value[1]) > 0: - sessions_conditions.append("duration <= %(maxDuration)s") - extra_values["maxDuration"] = f.value[1] - elif f.type == schemas.FilterType.referrer: - # extra_from += f"INNER JOIN {events.event_type.LOCATION.table} AS p USING(session_id)" - if is_any: - sessions_conditions.append('base_referrer IS NOT NULL') - else: - sessions_conditions.append( - sh.multi_conditions(f"base_referrer {op} %({f_k})s", f.value, is_not=is_not, - value_key=f_k)) - elif f.type == schemas.FilterType.metadata: - # get metadata list only if you need it - if meta_keys is None: - meta_keys = metadata.get(project_id=project_id) - meta_keys = {m["key"]: m["index"] for m in meta_keys} - if f.source in meta_keys.keys(): - if is_any: - 
sessions_conditions.append(f"{metadata.index_to_colname(meta_keys[f.source])} IS NOT NULL") - elif is_undefined: - sessions_conditions.append(f"{metadata.index_to_colname(meta_keys[f.source])} IS NULL") - else: - sessions_conditions.append( - sh.multi_conditions( - f"{metadata.index_to_colname(meta_keys[f.source])} {op} %({f_k})s::text", - f.value, is_not=is_not, value_key=f_k)) - - elif f.type in [schemas.FilterType.user_id, schemas.FilterType.user_id_ios]: - if is_any: - sessions_conditions.append('user_id IS NOT NULL') - elif is_undefined: - sessions_conditions.append('user_id IS NULL') - else: - sessions_conditions.append( - sh.multi_conditions(f"s.user_id {op} %({f_k})s::text", f.value, is_not=is_not, - value_key=f_k)) - - elif f.type in [schemas.FilterType.user_anonymous_id, - schemas.FilterType.user_anonymous_id_ios]: - if is_any: - sessions_conditions.append('user_anonymous_id IS NOT NULL') - elif is_undefined: - sessions_conditions.append('user_anonymous_id IS NULL') - else: - sessions_conditions.append( - sh.multi_conditions(f"user_anonymous_id {op} %({f_k})s::text", f.value, is_not=is_not, - value_key=f_k)) - - elif f.type in [schemas.FilterType.rev_id, schemas.FilterType.rev_id_ios]: - if is_any: - sessions_conditions.append('rev_id IS NOT NULL') - elif is_undefined: - sessions_conditions.append('rev_id IS NULL') - else: - sessions_conditions.append( - sh.multi_conditions(f"rev_id {op} %({f_k})s::text", f.value, is_not=is_not, value_key=f_k)) - - elif f.type == schemas.FilterType.platform: - # op = __ sh.get_sql_operator(f.operator) - sessions_conditions.append( - sh.multi_conditions(f"user_device_type {op} %({f_k})s", f.value, is_not=is_not, - value_key=f_k)) - - elif f.type == schemas.FilterType.issue: - if is_any: - sessions_conditions.append("array_length(issue_types, 1) > 0") - else: - sessions_conditions.append( - sh.multi_conditions(f"%({f_k})s {op} ANY (issue_types)", f.value, is_not=is_not, - value_key=f_k)) - - elif f.type == schemas.FilterType.events_count: - sessions_conditions.append( - sh.multi_conditions(f"events_count {op} %({f_k})s", f.value, is_not=is_not, - value_key=f_k)) - events_subquery = [] - for t in sub_events: - sub_events_conditions = ["e.timestamp >= %(startTimestamp)s", - "e.timestamp < %(endTimestamp)s"] + exclusions.get(t["eventType"], []) - events_subquery.append(f"""\ - SELECT session_id, {t["column"]} AS e_value, timestamp, '{t["eventType"]}' AS event_type - FROM {t["table"]} AS e - INNER JOIN sub_sessions USING (session_id) - WHERE {" AND ".join(sub_events_conditions)}""") - events_subquery = "\n UNION ALL \n".join(events_subquery) - - if reverse: - path_direction = "DESC" - else: - path_direction = "" - - if len(start_points_conditions) == 0: - start_points_join = """INNER JOIN - (SELECT event_type, e_value - FROM ranked_events - WHERE event_number_in_session = 1 - GROUP BY event_type, e_value - ORDER BY count(1) DESC - LIMIT 2 - ) AS top_start_events USING (event_type, e_value)""" - else: - start_points_conditions = ["(" + " OR ".join(start_points_conditions) + ")"] - start_points_conditions.append("event_number_in_session = 1") - - steps_query = ["""n1 AS (SELECT event_number_in_session, - event_type, - e_value, - next_type, - next_value, - time_to_next, - count(1) AS sessions_count - FROM ranked_events - INNER JOIN start_points USING (session_id) - WHERE event_number_in_session = 1 - GROUP BY event_number_in_session, event_type, e_value, next_type, next_value, time_to_next)"""] - projection_query = ["""(SELECT event_number_in_session, - 
event_type, - e_value, - next_type, - next_value, - sessions_count, - avg(time_to_next) AS avg_time_to_target - FROM n1 - GROUP BY event_number_in_session, event_type, e_value, next_type, next_value, sessions_count - ORDER BY event_number_in_session, event_type, e_value, next_type, next_value)"""] - for i in range(2, data.density): - steps_query.append(f"""n{i} AS (SELECT * - FROM (SELECT re.event_number_in_session, - re.event_type, - re.e_value, - re.next_type, - re.next_value, - re.time_to_next, - count(1) AS sessions_count - FROM ranked_events AS re - INNER JOIN start_points USING (session_id) - INNER JOIN n{i - 1} ON (n{i - 1}.next_value = re.e_value) - WHERE re.event_number_in_session = {i} - GROUP BY re.event_number_in_session, re.event_type, re.e_value, re.next_type, re.next_value, - re.time_to_next) AS sub_level - ORDER BY sessions_count DESC - LIMIT %(eventThresholdNumberInGroup)s)""") - projection_query.append(f"""(SELECT event_number_in_session, - event_type, - e_value, - next_type, - next_value, - sessions_count, - avg(time_to_next) AS avg_time_to_target - FROM n{i} - GROUP BY event_number_in_session, event_type, e_value, next_type, next_value, sessions_count - ORDER BY event_number_in_session, event_type, e_value, next_type, next_value)""") - - with pg_client.PostgresClient() as cur: - pg_query = f"""\ -WITH sub_sessions AS ( SELECT session_id - FROM public.sessions - WHERE {" AND ".join(sessions_conditions)}), - sub_events AS ({events_subquery}), - ranked_events AS (SELECT * - FROM (SELECT session_id, - event_type, - e_value, - row_number() OVER (PARTITION BY session_id ORDER BY timestamp {path_direction}) AS event_number_in_session, - LEAD(e_value, 1) OVER (PARTITION BY session_id ORDER BY timestamp {path_direction}) AS next_value, - LEAD(event_type, 1) OVER (PARTITION BY session_id ORDER BY timestamp {path_direction}) AS next_type, - abs(LEAD(timestamp, 1) OVER (PARTITION BY session_id ORDER BY timestamp {path_direction}) - - timestamp) AS time_to_next - FROM sub_events) AS full_ranked_events - WHERE event_number_in_session < %(density)s - ), - start_points AS (SELECT session_id - FROM ranked_events {start_points_join} - WHERE {" AND ".join(start_points_conditions)}), - {",".join(steps_query)} -{"UNION ALL".join(projection_query)};""" - params = {"project_id": project_id, "startTimestamp": data.startTimestamp, - "endTimestamp": data.endTimestamp, "density": data.density, - "eventThresholdNumberInGroup": 8 if data.hide_excess else 6, - # TODO: add if data=args is required - # **__get_constraint_values(args), - **extra_values} - query = cur.mogrify(pg_query, params) - _now = time() - - cur.execute(query) - if time() - _now > 2: - print(f">>>>>>>>>PathAnalysis long query ({int(time() - _now)}s)<<<<<<<<<") - print("----------------------") - print(query) - print("----------------------") - rows = cur.fetchall() - - return __transform_journey2(rows=rows, reverse_path=reverse) - - # query: Q3, the result is correct, # startPoints are computed before ranked_events to reduce the number of window functions over rows # replaced time_to_target by time_from_previous @@ -642,7 +332,7 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis): count(1) AS sessions_count FROM ranked_events INNER JOIN start_points USING (session_id) - WHERE event_number_in_session = 1 + WHERE event_number_in_session = 1 AND next_value IS NOT NULL GROUP BY event_number_in_session, event_type, e_value, next_type, next_value, time_from_previous)"""] projection_query = ["""(SELECT 
event_number_in_session, event_type, diff --git a/api/schemas/schemas.py b/api/schemas/schemas.py index 1c0e1c2dc..48867e78e 100644 --- a/api/schemas/schemas.py +++ b/api/schemas/schemas.py @@ -1262,6 +1262,7 @@ class CardInsights(__CardSchema): # class CardPathAnalysisSchema(BaseModel): class CardPathAnalysisSchema(CardSessionsSchema): + name: Optional[str] = Field(default=None) filter: PathAnalysisSchema = Field(...) density: int = Field(default=4, ge=2, le=10) diff --git a/ee/api/chalicelib/core/product_analytics.py b/ee/api/chalicelib/core/product_analytics.py index a45ebaa18..0e2668105 100644 --- a/ee/api/chalicelib/core/product_analytics.py +++ b/ee/api/chalicelib/core/product_analytics.py @@ -25,7 +25,6 @@ def __transform_journey2(rows, reverse_path=False): target = f"{r['event_number_in_session'] + 1}_{r['next_type']}_{r['next_value']}" if target not in nodes: nodes.append(target) - # TODO: remove this after UI supports long values nodes_values.append({"name": r['next_value'], "eventType": r['next_type']}) link = {"eventType": r['event_type'], "value": r["sessions_count"], "avgTimeToTarget": r["avg_time_to_target"]} @@ -41,6 +40,47 @@ def __transform_journey2(rows, reverse_path=False): "links": sorted(links, key=lambda x: (x["source"], x["target"]), reverse=False)} +def __transform_journey3(rows, reverse_path=False): + total_100p = 0 + number_of_step1 = 0 + for r in rows: + if r["event_number_in_session"] > 1: + break + number_of_step1 += 1 + total_100p += r["sessions_count"] + for i in range(number_of_step1): + rows[i]["value"] = round(number=100 / number_of_step1, ndigits=2) + + for i in range(number_of_step1, len(rows)): + rows[i]["value"] = round(number=rows[i]["sessions_count"] * 100 / total_100p, ndigits=2) + + nodes = [] + nodes_values = [] + links = [] + for r in rows: + source = f"{r['event_number_in_session']}_{r['event_type']}_{r['e_value']}" + if source not in nodes: + nodes.append(source) + nodes_values.append({"name": r['e_value'], "eventType": r['event_type']}) + if r['next_value']: + target = f"{r['event_number_in_session'] + 1}_{r['next_type']}_{r['next_value']}" + if target not in nodes: + nodes.append(target) + nodes_values.append({"name": r['next_value'], "eventType": r['next_type']}) + link = {"eventType": r['event_type'], "sessionsCount": r["sessions_count"], + "value": r["value"], "avgTimeFromPrevious": r["avg_time_from_previous"]} + if not reverse_path: + link["source"] = nodes.index(source) + link["target"] = nodes.index(target) + else: + link["source"] = nodes.index(target) + link["target"] = nodes.index(source) + links.append(link) + + return {"nodes": nodes_values, + "links": sorted(links, key=lambda x: (x["source"], x["target"]), reverse=False)} + + JOURNEY_TYPES = { schemas.ProductAnalyticsSelectedEventType.location: {"eventType": "LOCATION", "column": "url_path"}, schemas.ProductAnalyticsSelectedEventType.click: {"eventType": "CLICK", "column": "label"}, @@ -49,6 +89,9 @@ JOURNEY_TYPES = { } +# query: Q3, the result is correct, +# startPoints are computed before ranked_events to reduce the number of window functions over rows +# replaced time_to_target by time_from_previous def path_analysis(project_id: int, data: schemas.CardPathAnalysis): sub_events = [] start_points_conditions = [] @@ -80,7 +123,7 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis): + ")") exclusions = {} - for i, sf in enumerate(data.exclude): + for i, sf in enumerate(data.excludes): if sf.type in data.metric_value: f_k = f"exclude_{i}" extra_values = 
{**extra_values, **sh.multi_values(sf.value, value_key=f_k)} @@ -98,6 +141,9 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis): f_k = f"f_value_{i}" extra_values = {**extra_values, **sh.multi_values(f.value, value_key=f_k)} + if not is_any and len(f.value) == 0: + continue + # ---- meta-filters if f.type == schemas.FilterType.user_browser: if is_any: @@ -274,23 +320,21 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis): INNER JOIN events USING (session_id)""" if len(start_points_conditions) == 0: start_points_subquery = """SELECT DISTINCT session_id - FROM (SELECT event_type, e_value - FROM full_ranked_events - WHERE event_number_in_session = 1 - GROUP BY event_type, e_value - ORDER BY count(1) DESC - LIMIT 2) AS top_start_events - INNER JOIN full_ranked_events - ON (top_start_events.event_type = full_ranked_events.event_type AND - top_start_events.e_value = full_ranked_events.e_value AND - full_ranked_events.event_number_in_session = 1 AND - isNotNull(next_value))""" + FROM (SELECT event_type, e_value + FROM pre_ranked_events + WHERE event_number_in_session = 1 + GROUP BY event_type, e_value + ORDER BY count(1) DESC + LIMIT 1) AS top_start_events + INNER JOIN pre_ranked_events + ON (top_start_events.event_type = pre_ranked_events.event_type AND + top_start_events.e_value = pre_ranked_events.e_value) + WHERE pre_ranked_events.event_number_in_session = 1""" else: start_points_conditions = ["(" + " OR ".join(start_points_conditions) + ")", - "event_number_in_session = 1", - "isNotNull(next_value)"] + "event_number_in_session = 1"] start_points_subquery = f"""SELECT DISTINCT session_id - FROM full_ranked_events + FROM pre_ranked_events WHERE {" AND ".join(start_points_conditions)}""" del start_points_conditions if reverse: @@ -298,7 +342,56 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis): else: path_direction = "" + steps_query = ["""n1 AS (SELECT event_number_in_session, + event_type, + e_value, + next_type, + next_value, + time_from_previous, + count(1) AS sessions_count + FROM ranked_events + WHERE event_number_in_session = 1 + AND isNotNull(next_value) + GROUP BY event_number_in_session, event_type, e_value, next_type, next_value, time_from_previous + LIMIT 5)"""] + projection_query = ["""SELECT event_number_in_session, + event_type, + e_value, + next_type, + next_value, + sessions_count, + avg(time_from_previous) AS avg_time_from_previous + FROM n1 + GROUP BY event_number_in_session, event_type, e_value, next_type, next_value, sessions_count"""] + for i in range(2, data.density): + steps_query.append(f"""n{i} AS (SELECT * + FROM (SELECT re.event_number_in_session AS event_number_in_session, + re.event_type AS event_type, + re.e_value AS e_value, + re.next_type AS next_type, + re.next_value AS next_value, + re.time_from_previous AS time_from_previous, + count(1) AS sessions_count + FROM n{i - 1} INNER JOIN ranked_events AS re + ON (n{i - 1}.next_value = re.e_value AND n{i - 1}.next_type = re.event_type) + WHERE re.event_number_in_session = {i} + GROUP BY re.event_number_in_session, re.event_type, re.e_value, re.next_type, re.next_value, + re.time_from_previous) AS sub_level + ORDER BY sessions_count DESC + LIMIT %(eventThresholdNumberInGroup)s)""") + projection_query.append(f"""SELECT event_number_in_session, + event_type, + e_value, + next_type, + next_value, + sessions_count, + avg(time_from_previous) AS avg_time_from_previous + FROM n{i} + GROUP BY event_number_in_session, event_type, e_value, next_type, next_value, 
sessions_count""") + with ch_client.ClickHouseClient(database="experimental") as ch: + time_key = TimeUTC.now() + _now = time() ch_query = f"""\ WITH full_ranked_events AS (SELECT session_id, event_type, @@ -342,19 +435,70 @@ FROM (SELECT * GROUP BY event_number_in_session, event_type, e_value, next_type, next_value, sessions_count ORDER BY event_number_in_session, e_value, next_value;""" params = {"project_id": project_id, "startTimestamp": data.startTimestamp, - "endTimestamp": data.endTimestamp, - # **__get_constraint_values(args), + "endTimestamp": data.endTimestamp, "density": data.density, + "eventThresholdNumberInGroup": 6 if data.hide_excess else 8, **extra_values} - _now = time() - rows = ch.execute(query=ch_query, params=params) - if time() - _now > 0: + ch_query1 = f"""\ +CREATE TEMPORARY TABLE pre_ranked_events_{time_key} AS +WITH pre_ranked_events AS (SELECT * + FROM (SELECT session_id, + event_type, + datetime, + {main_column} AS e_value, + row_number() OVER (PARTITION BY session_id + ORDER BY datetime {path_direction}, + message_id {path_direction} ) AS event_number_in_session + FROM {main_table} + WHERE {" AND ".join(ch_sub_query)} + ) AS full_ranked_events + WHERE event_number_in_session < 4) +SELECT * +FROM pre_ranked_events;""" + ch.execute(query=ch_query1, params=params) + + ch_query2 = f"""\ +CREATE TEMPORARY TABLE ranked_events_{time_key} AS +WITH pre_ranked_events AS (SELECT * + FROM pre_ranked_events_{time_key}), + start_points AS ({start_points_subquery}), + ranked_events AS (SELECT pre_ranked_events.*, + leadInFrame(e_value) + OVER (PARTITION BY session_id ORDER BY datetime {path_direction} + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS next_value, + leadInFrame(toNullable(event_type)) + OVER (PARTITION BY session_id ORDER BY datetime {path_direction} + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS next_type, + abs(lagInFrame(toNullable(datetime)) + OVER (PARTITION BY session_id ORDER BY datetime {path_direction} + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) + - pre_ranked_events.datetime) AS time_from_previous + FROM start_points INNER JOIN pre_ranked_events USING (session_id)) +SELECT * +FROM ranked_events;""" + ch.execute(query=ch_query2, params=params) + + ch_query3 = f"""\ +WITH ranked_events AS (SELECT * + FROM ranked_events_{time_key}), + {",".join(steps_query)} +SELECT * +FROM ({" UNION ALL ".join(projection_query)}) AS chart_steps +ORDER BY event_number_in_session;""" + rows = ch.execute(query=ch_query3, params=params) + + if time() - _now > 2: print(f">>>>>>>>>PathAnalysis long query EE ({int(time() - _now)}s)<<<<<<<<<") print("----------------------") - print(print(ch.format(ch_query, params))) + print("---------Q1-----------") + print(ch.format(ch_query1, params)) + print("---------Q2-----------") + print(ch.format(ch_query2, params)) + print("---------Q3-----------") + print(ch.format(ch_query3, params)) print("----------------------") - return __transform_journey2(rows=rows, reverse_path=reverse) + return __transform_journey3(rows=rows, reverse_path=reverse) # # def __compute_weekly_percentage(rows):