Api v1.15.0 (#1685)

* fix(chalice): (EE) fixed Path Analysis breaking when specifying a session's filter in addition to a start-point

(cherry picked from commit 37b46696c5)
This commit is contained in:
Kraiem Taha Yassine 2023-11-20 17:57:07 +01:00 committed by Taha Yassine Kraiem
parent 7178fabbca
commit 05a07ab525

View file

@ -79,6 +79,7 @@ JOURNEY_TYPES = {
}
# Q6: use events as a sub_query to support filter of materialized columns when doing a join
# query: Q5, the result is correct,
# startPoints are computed before ranked_events to reduce the number of window functions over rows
# replaced time_to_target by time_from_previous
@ -106,7 +107,6 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
','.join([f"event_type='{s['eventType']}',{s['column']}" for s in sub_events[:-1]]),
sub_events[-1]["column"])
extra_values = {}
start_join = []
initial_event_cte = ""
reverse = data.start_type == "end"
for i, sf in enumerate(data.start_point):
@ -309,7 +309,6 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
else:
path_direction = ""
# ch_sub_query = __get_basic_constraints(table_name="experimental.events", data=data.model_dump())
ch_sub_query = __get_basic_constraints(table_name="events")
selected_event_type_sub_query = []
for s in data.metric_value:
@ -319,6 +318,14 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
selected_event_type_sub_query = " OR ".join(selected_event_type_sub_query)
ch_sub_query.append(f"({selected_event_type_sub_query})")
if len(start_points_conditions) > 0:
start_points_conditions = ["(" + " OR ".join(start_points_conditions) + ")",
"event_number_in_session = 1"]
start_point_conditions = ["(" + " OR ".join(start_point_conditions) + ")",
"events.project_id = toUInt16(%(project_id)s)",
"events.datetime >= toDateTime(%(startTimestamp)s / 1000)",
"events.datetime < toDateTime(%(endTimestamp)s / 1000)"]
main_table = exp_ch_helper.get_main_events_table(data.startTimestamp)
if len(sessions_conditions) > 0:
sessions_conditions.append(f"sessions.project_id = toUInt16(%(project_id)s)")
@ -326,10 +333,17 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
sessions_conditions.append(f"sessions.datetime < toDateTime(%(endTimestamp)s / 1000)")
sessions_conditions.append("sessions.events_count>1")
sessions_conditions.append("sessions.duration>0")
main_table = f"""(SELECT DISTINCT session_id
FROM sessions
initial_event_main_table = f"""(SELECT DISTINCT session_id
FROM {exp_ch_helper.get_main_sessions_table(data.startTimestamp)}
WHERE {" AND ".join(sessions_conditions)}) AS sub_sessions
INNER JOIN events USING (session_id)"""
INNER JOIN (SELECT session_id, event_type, datetime, message_id, {main_column}
FROM {main_table}
WHERE {" AND ".join(start_point_conditions)}
) AS sub_events ON (sub_sessions.session_id = sub_events.session_id)"""
else:
initial_event_main_table = f"""{main_table}
WHERE {" AND ".join(start_point_conditions)}"""
if len(start_points_conditions) == 0:
start_points_subquery = """SELECT DISTINCT session_id
FROM (SELECT event_type, e_value
@ -343,22 +357,15 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
top_start_events.e_value = pre_ranked_events.e_value)
WHERE pre_ranked_events.event_number_in_session = 1"""
else:
start_points_conditions = ["(" + " OR ".join(start_points_conditions) + ")",
"event_number_in_session = 1"]
start_point_conditions = ["(" + " OR ".join(start_point_conditions) + ")",
"events.project_id = toUInt16(%(project_id)s)",
"events.datetime >= toDateTime(%(startTimestamp)s / 1000)",
"events.datetime < toDateTime(%(endTimestamp)s / 1000)"]
start_points_subquery = f"""SELECT DISTINCT session_id
FROM pre_ranked_events
WHERE {" AND ".join(start_points_conditions)}"""
initial_event_cte = f"""\
initial_event AS (SELECT session_id, MIN(datetime) AS start_event_timestamp
FROM {main_table}
WHERE {" AND ".join(start_point_conditions)}
FROM {initial_event_main_table}
GROUP BY session_id),"""
ch_sub_query.append("events.datetime>=initial_event.start_event_timestamp")
main_table += " INNER JOIN initial_event USING (session_id)"
main_table += " INNER JOIN initial_event ON (events.session_id = initial_event.session_id)"
del start_points_conditions
steps_query = ["""n1 AS (SELECT event_number_in_session,
@ -431,10 +438,10 @@ WITH {initial_event_cte}
WHERE event_number_in_session <= %(density)s)
SELECT *
FROM pre_ranked_events;"""
logger.debug("---------Q1-----------")
ch.execute(query=ch_query1, params=params)
if time() - _now > 2:
logger.warning(f">>>>>>>>>PathAnalysis long query EE ({int(time() - _now)}s)<<<<<<<<<")
logger.warning("---------Q1-----------")
logger.warning(ch.format(ch_query1, params))
logger.warning("----------------------")
_now = time()
@ -458,10 +465,10 @@ WITH pre_ranked_events AS (SELECT *
FROM start_points INNER JOIN pre_ranked_events USING (session_id))
SELECT *
FROM ranked_events;"""
logger.debug("---------Q2-----------")
ch.execute(query=ch_query2, params=params)
if time() - _now > 2:
logger.warning(f">>>>>>>>>PathAnalysis long query EE ({int(time() - _now)}s)<<<<<<<<<")
logger.warning("---------Q2-----------")
logger.warning(ch.format(ch_query2, params))
logger.warning("----------------------")
_now = time()
@ -473,11 +480,10 @@ WITH ranked_events AS (SELECT *
SELECT *
FROM ({" UNION ALL ".join(projection_query)}) AS chart_steps
ORDER BY event_number_in_session;"""
logger.debug("---------Q3-----------")
rows = ch.execute(query=ch_query3, params=params)
if time() - _now > 2:
logger.warning(f">>>>>>>>>PathAnalysis long query EE ({int(time() - _now)}s)<<<<<<<<<")
logger.warning("---------Q3-----------")
logger.warning(ch.format(ch_query3, params))
logger.warning("----------------------")