diff --git a/ee/api/chalicelib/core/product_analytics.py b/ee/api/chalicelib/core/product_analytics.py index 31fccccd9..331adfa46 100644 --- a/ee/api/chalicelib/core/product_analytics.py +++ b/ee/api/chalicelib/core/product_analytics.py @@ -79,6 +79,7 @@ JOURNEY_TYPES = { } +# Q6: use events as a sub_query to support filter of materialized columns when doing a join # query: Q5, the result is correct, # startPoints are computed before ranked_events to reduce the number of window functions over rows # replaced time_to_target by time_from_previous @@ -106,7 +107,6 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis): ','.join([f"event_type='{s['eventType']}',{s['column']}" for s in sub_events[:-1]]), sub_events[-1]["column"]) extra_values = {} - start_join = [] initial_event_cte = "" reverse = data.start_type == "end" for i, sf in enumerate(data.start_point): @@ -309,7 +309,6 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis): else: path_direction = "" - # ch_sub_query = __get_basic_constraints(table_name="experimental.events", data=data.model_dump()) ch_sub_query = __get_basic_constraints(table_name="events") selected_event_type_sub_query = [] for s in data.metric_value: @@ -319,6 +318,14 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis): selected_event_type_sub_query = " OR ".join(selected_event_type_sub_query) ch_sub_query.append(f"({selected_event_type_sub_query})") + if len(start_points_conditions) > 0: + start_points_conditions = ["(" + " OR ".join(start_points_conditions) + ")", + "event_number_in_session = 1"] + start_point_conditions = ["(" + " OR ".join(start_point_conditions) + ")", + "events.project_id = toUInt16(%(project_id)s)", + "events.datetime >= toDateTime(%(startTimestamp)s / 1000)", + "events.datetime < toDateTime(%(endTimestamp)s / 1000)"] + main_table = exp_ch_helper.get_main_events_table(data.startTimestamp) if len(sessions_conditions) > 0: sessions_conditions.append(f"sessions.project_id = toUInt16(%(project_id)s)") @@ -326,10 +333,17 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis): sessions_conditions.append(f"sessions.datetime < toDateTime(%(endTimestamp)s / 1000)") sessions_conditions.append("sessions.events_count>1") sessions_conditions.append("sessions.duration>0") - main_table = f"""(SELECT DISTINCT session_id - FROM sessions + initial_event_main_table = f"""(SELECT DISTINCT session_id + FROM {exp_ch_helper.get_main_sessions_table(data.startTimestamp)} WHERE {" AND ".join(sessions_conditions)}) AS sub_sessions - INNER JOIN events USING (session_id)""" + INNER JOIN (SELECT session_id, event_type, datetime, message_id, {main_column} + FROM {main_table} + WHERE {" AND ".join(start_point_conditions)} + ) AS sub_events ON (sub_sessions.session_id = sub_events.session_id)""" + else: + initial_event_main_table = f"""{main_table} + WHERE {" AND ".join(start_point_conditions)}""" + if len(start_points_conditions) == 0: start_points_subquery = """SELECT DISTINCT session_id FROM (SELECT event_type, e_value @@ -343,22 +357,15 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis): top_start_events.e_value = pre_ranked_events.e_value) WHERE pre_ranked_events.event_number_in_session = 1""" else: - start_points_conditions = ["(" + " OR ".join(start_points_conditions) + ")", - "event_number_in_session = 1"] - start_point_conditions = ["(" + " OR ".join(start_point_conditions) + ")", - "events.project_id = toUInt16(%(project_id)s)", - "events.datetime >= toDateTime(%(startTimestamp)s / 1000)", - "events.datetime < toDateTime(%(endTimestamp)s / 1000)"] start_points_subquery = f"""SELECT DISTINCT session_id FROM pre_ranked_events WHERE {" AND ".join(start_points_conditions)}""" initial_event_cte = f"""\ initial_event AS (SELECT session_id, MIN(datetime) AS start_event_timestamp - FROM {main_table} - WHERE {" AND ".join(start_point_conditions)} + FROM {initial_event_main_table} GROUP BY session_id),""" ch_sub_query.append("events.datetime>=initial_event.start_event_timestamp") - main_table += " INNER JOIN initial_event USING (session_id)" + main_table += " INNER JOIN initial_event ON (events.session_id = initial_event.session_id)" del start_points_conditions steps_query = ["""n1 AS (SELECT event_number_in_session, @@ -431,10 +438,10 @@ WITH {initial_event_cte} WHERE event_number_in_session <= %(density)s) SELECT * FROM pre_ranked_events;""" + logger.debug("---------Q1-----------") ch.execute(query=ch_query1, params=params) if time() - _now > 2: logger.warning(f">>>>>>>>>PathAnalysis long query EE ({int(time() - _now)}s)<<<<<<<<<") - logger.warning("---------Q1-----------") logger.warning(ch.format(ch_query1, params)) logger.warning("----------------------") _now = time() @@ -458,10 +465,10 @@ WITH pre_ranked_events AS (SELECT * FROM start_points INNER JOIN pre_ranked_events USING (session_id)) SELECT * FROM ranked_events;""" + logger.debug("---------Q2-----------") ch.execute(query=ch_query2, params=params) if time() - _now > 2: logger.warning(f">>>>>>>>>PathAnalysis long query EE ({int(time() - _now)}s)<<<<<<<<<") - logger.warning("---------Q2-----------") logger.warning(ch.format(ch_query2, params)) logger.warning("----------------------") _now = time() @@ -473,11 +480,10 @@ WITH ranked_events AS (SELECT * SELECT * FROM ({" UNION ALL ".join(projection_query)}) AS chart_steps ORDER BY event_number_in_session;""" + logger.debug("---------Q3-----------") rows = ch.execute(query=ch_query3, params=params) - if time() - _now > 2: logger.warning(f">>>>>>>>>PathAnalysis long query EE ({int(time() - _now)}s)<<<<<<<<<") - logger.warning("---------Q3-----------") logger.warning(ch.format(ch_query3, params)) logger.warning("----------------------")