From 312db29d2373129202a63cf0642bffdb963c2024 Mon Sep 17 00:00:00 2001 From: Kraiem Taha Yassine Date: Tue, 28 Jan 2025 13:39:11 +0100 Subject: [PATCH] Dev (#2979) * refactor(chalice): code cleaning * refactor(chalice): user journey use new DB structure --- .../core/metrics/product_analytics_ch.py | 99 ++++++++++--------- 1 file changed, 50 insertions(+), 49 deletions(-) diff --git a/api/chalicelib/core/metrics/product_analytics_ch.py b/api/chalicelib/core/metrics/product_analytics_ch.py index d6059acb9..5164f6f9e 100644 --- a/api/chalicelib/core/metrics/product_analytics_ch.py +++ b/api/chalicelib/core/metrics/product_analytics_ch.py @@ -1,7 +1,8 @@ from typing import List import schemas -from chalicelib.core.metrics.metrics_ch import __get_basic_constraints, __get_meta_constraint +from chalicelib.core.metrics.metrics_ch import __get_basic_constraints, __get_meta_constraint, \ + __get_basic_constraints_events from chalicelib.core.metrics.metrics_ch import __get_constraint_values, __complete_missing_steps from chalicelib.utils import ch_client, exp_ch_helper from chalicelib.utils import helper, dev @@ -16,14 +17,13 @@ from chalicelib.core.metrics.product_analytics import __transform_journey logger = logging.getLogger(__name__) JOURNEY_TYPES = { - schemas.ProductAnalyticsSelectedEventType.LOCATION: {"eventType": "LOCATION", "column": "url_path"}, - schemas.ProductAnalyticsSelectedEventType.CLICK: {"eventType": "CLICK", "column": "label"}, - schemas.ProductAnalyticsSelectedEventType.INPUT: {"eventType": "INPUT", "column": "label"}, - schemas.ProductAnalyticsSelectedEventType.CUSTOM_EVENT: {"eventType": "CUSTOM", "column": "name"} + schemas.ProductAnalyticsSelectedEventType.LOCATION: {"eventType": "LOCATION", "column": "`$properties`.url_path"}, + schemas.ProductAnalyticsSelectedEventType.CLICK: {"eventType": "CLICK", "column": "`$properties`.label"}, + schemas.ProductAnalyticsSelectedEventType.INPUT: {"eventType": "INPUT", "column": "`$properties`.label"}, + schemas.ProductAnalyticsSelectedEventType.CUSTOM_EVENT: {"eventType": "CUSTOM", "column": "`$properties`.name"} } - # startPoints are computed before ranked_events to reduce the number of window functions over rows # compute avg_time_from_previous at the same level as sessions_count (this was removed in v1.22) # if start-point is selected, the selected event is ranked n°1 @@ -49,8 +49,8 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis): sub_events.append({"column": JOURNEY_TYPES[s.type]["column"], "eventType": JOURNEY_TYPES[s.type]["eventType"]}) step_1_post_conditions.append( - f"(event_type='{JOURNEY_TYPES[s.type]["eventType"]}' AND event_number_in_session = 1 \ - OR event_type!='{JOURNEY_TYPES[s.type]["eventType"]}' AND event_number_in_session > 1)") + f"(`$event_name`='{JOURNEY_TYPES[s.type]["eventType"]}' AND event_number_in_session = 1 \ + OR `$event_name`!='{JOURNEY_TYPES[s.type]["eventType"]}' AND event_number_in_session > 1)") extra_metric_values.append(s.type) data.metric_value += extra_metric_values @@ -63,7 +63,7 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis): main_column = sub_events[0]['column'] else: main_column = f"multiIf(%s,%s)" % ( - ','.join([f"event_type='{s['eventType']}',{s['column']}" for s in sub_events[:-1]]), + ','.join([f"`$event_name`='{s['eventType']}',{s['column']}" for s in sub_events[:-1]]), sub_events[-1]["column"]) extra_values = {} reverse = data.start_type == "end" @@ -76,19 +76,19 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis): event_type = JOURNEY_TYPES[sf.type]['eventType'] extra_values = {**extra_values, **sh.multi_values(sf.value, value_key=f_k), f"start_event_type_{i}": event_type} - start_points_conditions.append(f"(event_type=%(start_event_type_{i})s AND " + + start_points_conditions.append(f"(`$event_name`=%(start_event_type_{i})s AND " + sh.multi_conditions(f'{event_column} {op} %({f_k})s', sf.value, is_not=is_not, value_key=f_k) + ")") - step_0_conditions.append(f"(event_type=%(start_event_type_{i})s AND " + + step_0_conditions.append(f"(`$event_name`=%(start_event_type_{i})s AND " + sh.multi_conditions(f'e_value {op} %({f_k})s', sf.value, is_not=is_not, value_key=f_k) + ")") if len(start_points_conditions) > 0: start_points_conditions = ["(" + " OR ".join(start_points_conditions) + ")", "events.project_id = toUInt16(%(project_id)s)", - "events.datetime >= toDateTime(%(startTimestamp)s / 1000)", - "events.datetime < toDateTime(%(endTimestamp)s / 1000)"] + "events.created_at >= toDateTime(%(startTimestamp)s / 1000)", + "events.created_at < toDateTime(%(endTimestamp)s / 1000)"] step_0_conditions = ["(" + " OR ".join(step_0_conditions) + ")", "pre_ranked_events.event_number_in_session = 1"] @@ -277,10 +277,11 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis): else: path_direction = "" - ch_sub_query = __get_basic_constraints(table_name="events") + # ch_sub_query = __get_basic_constraints(table_name="events") + ch_sub_query = __get_basic_constraints_events(table_name="events") selected_event_type_sub_query = [] for s in data.metric_value: - selected_event_type_sub_query.append(f"events.event_type = '{JOURNEY_TYPES[s]['eventType']}'") + selected_event_type_sub_query.append(f"events.`$event_name` = '{JOURNEY_TYPES[s]['eventType']}'") if s in exclusions: selected_event_type_sub_query[-1] += " AND (" + " AND ".join(exclusions[s]) + ")" selected_event_type_sub_query = " OR ".join(selected_event_type_sub_query) @@ -303,14 +304,14 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis): if len(start_points_conditions) == 0: step_0_subquery = """SELECT DISTINCT session_id - FROM (SELECT event_type, e_value + FROM (SELECT `$event_name`, e_value FROM pre_ranked_events WHERE event_number_in_session = 1 - GROUP BY event_type, e_value + GROUP BY `$event_name`, e_value ORDER BY count(1) DESC LIMIT 1) AS top_start_events INNER JOIN pre_ranked_events - ON (top_start_events.event_type = pre_ranked_events.event_type AND + ON (top_start_events.`$event_name` = pre_ranked_events.`$event_name` AND top_start_events.e_value = pre_ranked_events.e_value) WHERE pre_ranked_events.event_number_in_session = 1""" initial_event_cte = "" @@ -319,11 +320,11 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis): FROM pre_ranked_events WHERE {" AND ".join(step_0_conditions)}""" initial_event_cte = f"""\ - initial_event AS (SELECT events.session_id, MIN(datetime) AS start_event_timestamp + initial_event AS (SELECT events.session_id, MIN(created_at) AS start_event_timestamp FROM {main_events_table} {"INNER JOIN sub_sessions USING (session_id)" if len(sessions_conditions) > 0 else ""} WHERE {" AND ".join(start_points_conditions)} GROUP BY 1),""" - ch_sub_query.append("events.datetime>=initial_event.start_event_timestamp") + ch_sub_query.append("events.created_at>=initial_event.start_event_timestamp") main_events_table += " INNER JOIN initial_event ON (events.session_id = initial_event.session_id)" sessions_conditions = [] @@ -336,19 +337,19 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis): other_query = [] for i in range(1, data.density + (0 if data.hide_excess else 1)): steps_query.append(f"""n{i} AS (SELECT event_number_in_session, - event_type, + `$event_name`, e_value, next_type, next_value, COUNT(1) AS sessions_count FROM ranked_events WHERE event_number_in_session = {i} - GROUP BY event_number_in_session, event_type, e_value, next_type, next_value + GROUP BY event_number_in_session, `$event_name`, e_value, next_type, next_value ORDER BY sessions_count DESC)""") if data.hide_excess: projection_query.append(f"""\ SELECT event_number_in_session, - event_type, + `$event_name`, e_value, next_type, next_value, @@ -358,17 +359,17 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis): else: top_query.append(f"""\ SELECT event_number_in_session, - event_type, + `$event_name`, e_value, SUM(sessions_count) AS sessions_count FROM n{i} - GROUP BY event_number_in_session, event_type, e_value + GROUP BY event_number_in_session, `$event_name`, e_value ORDER BY sessions_count DESC LIMIT %(visibleRows)s""") if i < data.density: drop_query.append(f"""SELECT event_number_in_session, - event_type, + `$event_name`, e_value, 'DROP' AS next_type, NULL AS next_value, @@ -381,14 +382,14 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis): FROM n{i} INNER JOIN top_n ON (n{i}.event_number_in_session = top_n.event_number_in_session - AND n{i}.event_type = top_n.event_type + AND n{i}.`$event_name` = top_n.`$event_name` AND n{i}.e_value = top_n.e_value)""") if i > 1 and not data.hide_excess: other_query.append(f"""SELECT n{i}.* FROM n{i} - WHERE (event_number_in_session, event_type, e_value) NOT IN - (SELECT event_number_in_session, event_type, e_value + WHERE (event_number_in_session, `$event_name`, e_value) NOT IN + (SELECT event_number_in_session, `$event_name`, e_value FROM top_n WHERE top_n.event_number_in_session = {i})""") @@ -406,12 +407,12 @@ WITH {initial_sessions_cte} {initial_event_cte} pre_ranked_events AS (SELECT * FROM (SELECT session_id, - event_type, - datetime, - {main_column} AS e_value, + `$event_name`, + created_at, + toString({main_column}) AS e_value, row_number() OVER (PARTITION BY session_id - ORDER BY datetime {path_direction}, - message_id {path_direction} ) AS event_number_in_session + ORDER BY created_at {path_direction}, + event_id {path_direction} ) AS event_number_in_session FROM {main_events_table} {"INNER JOIN sub_sessions ON (sub_sessions.session_id = events.session_id)" if len(sessions_conditions) > 0 else ""} WHERE {" AND ".join(ch_sub_query)} ) AS full_ranked_events @@ -433,10 +434,10 @@ WITH pre_ranked_events AS (SELECT * start_points AS ({step_0_subquery}), ranked_events AS (SELECT pre_ranked_events.*, leadInFrame(e_value) - OVER (PARTITION BY session_id ORDER BY datetime {path_direction} + OVER (PARTITION BY session_id ORDER BY created_at {path_direction} ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS next_value, - leadInFrame(toNullable(event_type)) - OVER (PARTITION BY session_id ORDER BY datetime {path_direction} + leadInFrame(toNullable(`$event_name`)) + OVER (PARTITION BY session_id ORDER BY created_at {path_direction} ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS next_type FROM start_points INNER JOIN pre_ranked_events USING (session_id)) SELECT * @@ -461,37 +462,37 @@ FROM ranked_events;""" FROM top_n_with_next INNER JOIN top_n ON (top_n_with_next.event_number_in_session + 1 = top_n.event_number_in_session - AND top_n_with_next.next_type = top_n.event_type + AND top_n_with_next.next_type = top_n.`$event_name` AND top_n_with_next.next_value = top_n.e_value) UNION ALL -- Top to Others: valid SELECT top_n_with_next.event_number_in_session, - top_n_with_next.event_type, + top_n_with_next.`$event_name`, top_n_with_next.e_value, 'OTHER' AS next_type, NULL AS next_value, SUM(top_n_with_next.sessions_count) AS sessions_count FROM top_n_with_next WHERE (top_n_with_next.event_number_in_session + 1, top_n_with_next.next_type, top_n_with_next.next_value) IN - (SELECT others_n.event_number_in_session, others_n.event_type, others_n.e_value FROM others_n) - GROUP BY top_n_with_next.event_number_in_session, top_n_with_next.event_type, top_n_with_next.e_value + (SELECT others_n.event_number_in_session, others_n.`$event_name`, others_n.e_value FROM others_n) + GROUP BY top_n_with_next.event_number_in_session, top_n_with_next.`$event_name`, top_n_with_next.e_value UNION ALL -- Top go to Drop: valid SELECT drop_n.event_number_in_session, - drop_n.event_type, + drop_n.`$event_name`, drop_n.e_value, drop_n.next_type, drop_n.next_value, drop_n.sessions_count FROM drop_n INNER JOIN top_n ON (drop_n.event_number_in_session = top_n.event_number_in_session - AND drop_n.event_type = top_n.event_type + AND drop_n.`$event_name` = top_n.`$event_name` AND drop_n.e_value = top_n.e_value) ORDER BY drop_n.event_number_in_session UNION ALL -- Others got to Drop: valid SELECT others_n.event_number_in_session, - 'OTHER' AS event_type, + 'OTHER' AS `$event_name`, NULL AS e_value, 'DROP' AS next_type, NULL AS next_value, @@ -503,7 +504,7 @@ FROM ranked_events;""" UNION ALL -- Others got to Top:valid SELECT others_n.event_number_in_session, - 'OTHER' AS event_type, + 'OTHER' AS `$event_name`, NULL AS e_value, others_n.next_type, others_n.next_value, @@ -511,12 +512,12 @@ FROM ranked_events;""" FROM others_n WHERE isNotNull(others_n.next_type) AND (others_n.event_number_in_session + 1, others_n.next_type, others_n.next_value) IN - (SELECT top_n.event_number_in_session, top_n.event_type, top_n.e_value FROM top_n) + (SELECT top_n.event_number_in_session, top_n.`$event_name`, top_n.e_value FROM top_n) GROUP BY others_n.event_number_in_session, others_n.next_type, others_n.next_value UNION ALL -- Others got to Others SELECT others_n.event_number_in_session, - 'OTHER' AS event_type, + 'OTHER' AS `$event_name`, NULL AS e_value, 'OTHER' AS next_type, NULL AS next_value, @@ -525,12 +526,12 @@ FROM ranked_events;""" WHERE isNotNull(others_n.next_type) AND others_n.event_number_in_session < %(density)s AND (others_n.event_number_in_session + 1, others_n.next_type, others_n.next_value) NOT IN - (SELECT event_number_in_session, event_type, e_value FROM top_n) + (SELECT event_number_in_session, `$event_name`, e_value FROM top_n) GROUP BY others_n.event_number_in_session""" else: projection_query.append("""\ SELECT event_number_in_session, - event_type, + `$event_name`, e_value, next_type, next_value,