* refactor(chalice): code cleaning

* refactor(chalice): user journey use new DB structure
This commit is contained in:
Kraiem Taha Yassine 2025-01-28 13:39:11 +01:00 committed by GitHub
parent 3038fe58d0
commit 312db29d23
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,7 +1,8 @@
from typing import List
import schemas
from chalicelib.core.metrics.metrics_ch import __get_basic_constraints, __get_meta_constraint
from chalicelib.core.metrics.metrics_ch import __get_basic_constraints, __get_meta_constraint, \
__get_basic_constraints_events
from chalicelib.core.metrics.metrics_ch import __get_constraint_values, __complete_missing_steps
from chalicelib.utils import ch_client, exp_ch_helper
from chalicelib.utils import helper, dev
@ -16,14 +17,13 @@ from chalicelib.core.metrics.product_analytics import __transform_journey
logger = logging.getLogger(__name__)
JOURNEY_TYPES = {
schemas.ProductAnalyticsSelectedEventType.LOCATION: {"eventType": "LOCATION", "column": "url_path"},
schemas.ProductAnalyticsSelectedEventType.CLICK: {"eventType": "CLICK", "column": "label"},
schemas.ProductAnalyticsSelectedEventType.INPUT: {"eventType": "INPUT", "column": "label"},
schemas.ProductAnalyticsSelectedEventType.CUSTOM_EVENT: {"eventType": "CUSTOM", "column": "name"}
schemas.ProductAnalyticsSelectedEventType.LOCATION: {"eventType": "LOCATION", "column": "`$properties`.url_path"},
schemas.ProductAnalyticsSelectedEventType.CLICK: {"eventType": "CLICK", "column": "`$properties`.label"},
schemas.ProductAnalyticsSelectedEventType.INPUT: {"eventType": "INPUT", "column": "`$properties`.label"},
schemas.ProductAnalyticsSelectedEventType.CUSTOM_EVENT: {"eventType": "CUSTOM", "column": "`$properties`.name"}
}
# startPoints are computed before ranked_events to reduce the number of window functions over rows
# compute avg_time_from_previous at the same level as sessions_count (this was removed in v1.22)
# if start-point is selected, the selected event is ranked n°1
@ -49,8 +49,8 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
sub_events.append({"column": JOURNEY_TYPES[s.type]["column"],
"eventType": JOURNEY_TYPES[s.type]["eventType"]})
step_1_post_conditions.append(
f"(event_type='{JOURNEY_TYPES[s.type]["eventType"]}' AND event_number_in_session = 1 \
OR event_type!='{JOURNEY_TYPES[s.type]["eventType"]}' AND event_number_in_session > 1)")
f"(`$event_name`='{JOURNEY_TYPES[s.type]["eventType"]}' AND event_number_in_session = 1 \
OR `$event_name`!='{JOURNEY_TYPES[s.type]["eventType"]}' AND event_number_in_session > 1)")
extra_metric_values.append(s.type)
data.metric_value += extra_metric_values
@ -63,7 +63,7 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
main_column = sub_events[0]['column']
else:
main_column = f"multiIf(%s,%s)" % (
','.join([f"event_type='{s['eventType']}',{s['column']}" for s in sub_events[:-1]]),
','.join([f"`$event_name`='{s['eventType']}',{s['column']}" for s in sub_events[:-1]]),
sub_events[-1]["column"])
extra_values = {}
reverse = data.start_type == "end"
@ -76,19 +76,19 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
event_type = JOURNEY_TYPES[sf.type]['eventType']
extra_values = {**extra_values, **sh.multi_values(sf.value, value_key=f_k),
f"start_event_type_{i}": event_type}
start_points_conditions.append(f"(event_type=%(start_event_type_{i})s AND " +
start_points_conditions.append(f"(`$event_name`=%(start_event_type_{i})s AND " +
sh.multi_conditions(f'{event_column} {op} %({f_k})s', sf.value, is_not=is_not,
value_key=f_k)
+ ")")
step_0_conditions.append(f"(event_type=%(start_event_type_{i})s AND " +
step_0_conditions.append(f"(`$event_name`=%(start_event_type_{i})s AND " +
sh.multi_conditions(f'e_value {op} %({f_k})s', sf.value, is_not=is_not,
value_key=f_k)
+ ")")
if len(start_points_conditions) > 0:
start_points_conditions = ["(" + " OR ".join(start_points_conditions) + ")",
"events.project_id = toUInt16(%(project_id)s)",
"events.datetime >= toDateTime(%(startTimestamp)s / 1000)",
"events.datetime < toDateTime(%(endTimestamp)s / 1000)"]
"events.created_at >= toDateTime(%(startTimestamp)s / 1000)",
"events.created_at < toDateTime(%(endTimestamp)s / 1000)"]
step_0_conditions = ["(" + " OR ".join(step_0_conditions) + ")",
"pre_ranked_events.event_number_in_session = 1"]
@ -277,10 +277,11 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
else:
path_direction = ""
ch_sub_query = __get_basic_constraints(table_name="events")
# ch_sub_query = __get_basic_constraints(table_name="events")
ch_sub_query = __get_basic_constraints_events(table_name="events")
selected_event_type_sub_query = []
for s in data.metric_value:
selected_event_type_sub_query.append(f"events.event_type = '{JOURNEY_TYPES[s]['eventType']}'")
selected_event_type_sub_query.append(f"events.`$event_name` = '{JOURNEY_TYPES[s]['eventType']}'")
if s in exclusions:
selected_event_type_sub_query[-1] += " AND (" + " AND ".join(exclusions[s]) + ")"
selected_event_type_sub_query = " OR ".join(selected_event_type_sub_query)
@ -303,14 +304,14 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
if len(start_points_conditions) == 0:
step_0_subquery = """SELECT DISTINCT session_id
FROM (SELECT event_type, e_value
FROM (SELECT `$event_name`, e_value
FROM pre_ranked_events
WHERE event_number_in_session = 1
GROUP BY event_type, e_value
GROUP BY `$event_name`, e_value
ORDER BY count(1) DESC
LIMIT 1) AS top_start_events
INNER JOIN pre_ranked_events
ON (top_start_events.event_type = pre_ranked_events.event_type AND
ON (top_start_events.`$event_name` = pre_ranked_events.`$event_name` AND
top_start_events.e_value = pre_ranked_events.e_value)
WHERE pre_ranked_events.event_number_in_session = 1"""
initial_event_cte = ""
@ -319,11 +320,11 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
FROM pre_ranked_events
WHERE {" AND ".join(step_0_conditions)}"""
initial_event_cte = f"""\
initial_event AS (SELECT events.session_id, MIN(datetime) AS start_event_timestamp
initial_event AS (SELECT events.session_id, MIN(created_at) AS start_event_timestamp
FROM {main_events_table} {"INNER JOIN sub_sessions USING (session_id)" if len(sessions_conditions) > 0 else ""}
WHERE {" AND ".join(start_points_conditions)}
GROUP BY 1),"""
ch_sub_query.append("events.datetime>=initial_event.start_event_timestamp")
ch_sub_query.append("events.created_at>=initial_event.start_event_timestamp")
main_events_table += " INNER JOIN initial_event ON (events.session_id = initial_event.session_id)"
sessions_conditions = []
@ -336,19 +337,19 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
other_query = []
for i in range(1, data.density + (0 if data.hide_excess else 1)):
steps_query.append(f"""n{i} AS (SELECT event_number_in_session,
event_type,
`$event_name`,
e_value,
next_type,
next_value,
COUNT(1) AS sessions_count
FROM ranked_events
WHERE event_number_in_session = {i}
GROUP BY event_number_in_session, event_type, e_value, next_type, next_value
GROUP BY event_number_in_session, `$event_name`, e_value, next_type, next_value
ORDER BY sessions_count DESC)""")
if data.hide_excess:
projection_query.append(f"""\
SELECT event_number_in_session,
event_type,
`$event_name`,
e_value,
next_type,
next_value,
@ -358,17 +359,17 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
else:
top_query.append(f"""\
SELECT event_number_in_session,
event_type,
`$event_name`,
e_value,
SUM(sessions_count) AS sessions_count
FROM n{i}
GROUP BY event_number_in_session, event_type, e_value
GROUP BY event_number_in_session, `$event_name`, e_value
ORDER BY sessions_count DESC
LIMIT %(visibleRows)s""")
if i < data.density:
drop_query.append(f"""SELECT event_number_in_session,
event_type,
`$event_name`,
e_value,
'DROP' AS next_type,
NULL AS next_value,
@ -381,14 +382,14 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
FROM n{i}
INNER JOIN top_n
ON (n{i}.event_number_in_session = top_n.event_number_in_session
AND n{i}.event_type = top_n.event_type
AND n{i}.`$event_name` = top_n.`$event_name`
AND n{i}.e_value = top_n.e_value)""")
if i > 1 and not data.hide_excess:
other_query.append(f"""SELECT n{i}.*
FROM n{i}
WHERE (event_number_in_session, event_type, e_value) NOT IN
(SELECT event_number_in_session, event_type, e_value
WHERE (event_number_in_session, `$event_name`, e_value) NOT IN
(SELECT event_number_in_session, `$event_name`, e_value
FROM top_n
WHERE top_n.event_number_in_session = {i})""")
@ -406,12 +407,12 @@ WITH {initial_sessions_cte}
{initial_event_cte}
pre_ranked_events AS (SELECT *
FROM (SELECT session_id,
event_type,
datetime,
{main_column} AS e_value,
`$event_name`,
created_at,
toString({main_column}) AS e_value,
row_number() OVER (PARTITION BY session_id
ORDER BY datetime {path_direction},
message_id {path_direction} ) AS event_number_in_session
ORDER BY created_at {path_direction},
event_id {path_direction} ) AS event_number_in_session
FROM {main_events_table} {"INNER JOIN sub_sessions ON (sub_sessions.session_id = events.session_id)" if len(sessions_conditions) > 0 else ""}
WHERE {" AND ".join(ch_sub_query)}
) AS full_ranked_events
@ -433,10 +434,10 @@ WITH pre_ranked_events AS (SELECT *
start_points AS ({step_0_subquery}),
ranked_events AS (SELECT pre_ranked_events.*,
leadInFrame(e_value)
OVER (PARTITION BY session_id ORDER BY datetime {path_direction}
OVER (PARTITION BY session_id ORDER BY created_at {path_direction}
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS next_value,
leadInFrame(toNullable(event_type))
OVER (PARTITION BY session_id ORDER BY datetime {path_direction}
leadInFrame(toNullable(`$event_name`))
OVER (PARTITION BY session_id ORDER BY created_at {path_direction}
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS next_type
FROM start_points INNER JOIN pre_ranked_events USING (session_id))
SELECT *
@ -461,37 +462,37 @@ FROM ranked_events;"""
FROM top_n_with_next
INNER JOIN top_n
ON (top_n_with_next.event_number_in_session + 1 = top_n.event_number_in_session
AND top_n_with_next.next_type = top_n.event_type
AND top_n_with_next.next_type = top_n.`$event_name`
AND top_n_with_next.next_value = top_n.e_value)
UNION ALL
-- Top to Others: valid
SELECT top_n_with_next.event_number_in_session,
top_n_with_next.event_type,
top_n_with_next.`$event_name`,
top_n_with_next.e_value,
'OTHER' AS next_type,
NULL AS next_value,
SUM(top_n_with_next.sessions_count) AS sessions_count
FROM top_n_with_next
WHERE (top_n_with_next.event_number_in_session + 1, top_n_with_next.next_type, top_n_with_next.next_value) IN
(SELECT others_n.event_number_in_session, others_n.event_type, others_n.e_value FROM others_n)
GROUP BY top_n_with_next.event_number_in_session, top_n_with_next.event_type, top_n_with_next.e_value
(SELECT others_n.event_number_in_session, others_n.`$event_name`, others_n.e_value FROM others_n)
GROUP BY top_n_with_next.event_number_in_session, top_n_with_next.`$event_name`, top_n_with_next.e_value
UNION ALL
-- Top go to Drop: valid
SELECT drop_n.event_number_in_session,
drop_n.event_type,
drop_n.`$event_name`,
drop_n.e_value,
drop_n.next_type,
drop_n.next_value,
drop_n.sessions_count
FROM drop_n
INNER JOIN top_n ON (drop_n.event_number_in_session = top_n.event_number_in_session
AND drop_n.event_type = top_n.event_type
AND drop_n.`$event_name` = top_n.`$event_name`
AND drop_n.e_value = top_n.e_value)
ORDER BY drop_n.event_number_in_session
UNION ALL
-- Others got to Drop: valid
SELECT others_n.event_number_in_session,
'OTHER' AS event_type,
'OTHER' AS `$event_name`,
NULL AS e_value,
'DROP' AS next_type,
NULL AS next_value,
@ -503,7 +504,7 @@ FROM ranked_events;"""
UNION ALL
-- Others got to Top:valid
SELECT others_n.event_number_in_session,
'OTHER' AS event_type,
'OTHER' AS `$event_name`,
NULL AS e_value,
others_n.next_type,
others_n.next_value,
@ -511,12 +512,12 @@ FROM ranked_events;"""
FROM others_n
WHERE isNotNull(others_n.next_type)
AND (others_n.event_number_in_session + 1, others_n.next_type, others_n.next_value) IN
(SELECT top_n.event_number_in_session, top_n.event_type, top_n.e_value FROM top_n)
(SELECT top_n.event_number_in_session, top_n.`$event_name`, top_n.e_value FROM top_n)
GROUP BY others_n.event_number_in_session, others_n.next_type, others_n.next_value
UNION ALL
-- Others got to Others
SELECT others_n.event_number_in_session,
'OTHER' AS event_type,
'OTHER' AS `$event_name`,
NULL AS e_value,
'OTHER' AS next_type,
NULL AS next_value,
@ -525,12 +526,12 @@ FROM ranked_events;"""
WHERE isNotNull(others_n.next_type)
AND others_n.event_number_in_session < %(density)s
AND (others_n.event_number_in_session + 1, others_n.next_type, others_n.next_value) NOT IN
(SELECT event_number_in_session, event_type, e_value FROM top_n)
(SELECT event_number_in_session, `$event_name`, e_value FROM top_n)
GROUP BY others_n.event_number_in_session"""
else:
projection_query.append("""\
SELECT event_number_in_session,
event_type,
`$event_name`,
e_value,
next_type,
next_value,