* fix(chalice): fixed path-finder first step-type issue

* refactor(chalice): removed time between steps in path-finder
Kraiem Taha Yassine 2025-01-08 16:09:53 +01:00 committed by GitHub
parent 80462e4534
commit 2291980a89
4 changed files with 28 additions and 36 deletions

View file

@@ -18,10 +18,7 @@ def __transform_journey(rows, reverse_path=False):
             break
         number_of_step1 += 1
         total_100p += r["sessions_count"]
-    # for i in range(number_of_step1):
-    #     rows[i]["value"] = 100 / number_of_step1
-    # for i in range(number_of_step1, len(rows)):
     for i in range(len(rows)):
         rows[i]["value"] = rows[i]["sessions_count"] * 100 / total_100p
@@ -32,22 +29,17 @@ def __transform_journey(rows, reverse_path=False):
         source = f"{r['event_number_in_session']}_{r['event_type']}_{r['e_value']}"
         if source not in nodes:
             nodes.append(source)
-            nodes_values.append({"name": r['e_value'], "eventType": r['event_type'],
-                                 "avgTimeFromPrevious": 0, "sessionsCount": 0})
+            nodes_values.append({"name": r['e_value'], "eventType": r['event_type']})
         if r['next_value']:
             target = f"{r['event_number_in_session'] + 1}_{r['next_type']}_{r['next_value']}"
             if target not in nodes:
                 nodes.append(target)
-                nodes_values.append({"name": r['next_value'], "eventType": r['next_type'],
-                                     "avgTimeFromPrevious": 0, "sessionsCount": 0})
+                nodes_values.append({"name": r['next_value'], "eventType": r['next_type']})
             sr_idx = nodes.index(source)
             tg_idx = nodes.index(target)
-            if r["avg_time_from_previous"] is not None:
-                nodes_values[tg_idx]["avgTimeFromPrevious"] += r["avg_time_from_previous"] * r["sessions_count"]
-                nodes_values[tg_idx]["sessionsCount"] += r["sessions_count"]
-            link = {"eventType": r['event_type'], "sessionsCount": r["sessions_count"],
-                    "value": r["value"], "avgTimeFromPrevious": r["avg_time_from_previous"]}
+            link = {"eventType": r['event_type'], "sessionsCount": r["sessions_count"], "value": r["value"]}
             if not reverse_path:
                 link["source"] = sr_idx
                 link["target"] = tg_idx
@@ -55,12 +47,6 @@ def __transform_journey(rows, reverse_path=False):
             link["source"] = tg_idx
             link["target"] = sr_idx
         links.append(link)
-    for n in nodes_values:
-        if n["sessionsCount"] > 0:
-            n["avgTimeFromPrevious"] = n["avgTimeFromPrevious"] / n["sessionsCount"]
-        else:
-            n["avgTimeFromPrevious"] = None
-        n.pop("sessionsCount")
     return {"nodes": nodes_values,
             "links": sorted(links, key=lambda x: (x["source"], x["target"]), reverse=False)}

View file

@@ -27,7 +27,7 @@ JOURNEY_TYPES = {
 # query: Q5, the result is correct,
 # startPoints are computed before ranked_events to reduce the number of window functions over rows
 # replaced time_to_target by time_from_previous
-# compute avg_time_from_previous at the same level as sessions_count
+# compute avg_time_from_previous at the same level as sessions_count (this was removed in v1.22)
 # sort by top 5 according to sessions_count at the CTE level
 # final part project data without grouping
 # if start-point is selected, the selected event is ranked n°1
@@ -35,15 +35,29 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
     sub_events = []
     start_points_conditions = []
+    step_0_conditions = []
+    step_1_post_conditions = ["event_number_in_session <= %(density)s"]
     if len(data.metric_value) == 0:
         data.metric_value.append(schemas.ProductAnalyticsSelectedEventType.LOCATION)
         sub_events.append({"column": JOURNEY_TYPES[schemas.ProductAnalyticsSelectedEventType.LOCATION]["column"],
                            "eventType": schemas.ProductAnalyticsSelectedEventType.LOCATION.value})
     else:
+        if len(data.start_point) > 0:
+            extra_metric_values = []
+            for s in data.start_point:
+                if s.type not in data.metric_value:
+                    sub_events.append({"column": JOURNEY_TYPES[s.type]["column"],
+                                       "eventType": JOURNEY_TYPES[s.type]["eventType"]})
+                    step_1_post_conditions.append(
+                        f"(event_type!='{JOURNEY_TYPES[s.type]['eventType']}' OR event_number_in_session = 1)")
+                    extra_metric_values.append(s.type)
+            data.metric_value += extra_metric_values
         for v in data.metric_value:
             if JOURNEY_TYPES.get(v):
                 sub_events.append({"column": JOURNEY_TYPES[v]["column"],
                                    "eventType": JOURNEY_TYPES[v]["eventType"]})
     if len(sub_events) == 1:
         main_column = sub_events[0]['column']
     else:
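
This block carries the first-step fix: a start-point type that is absent from metric_value is added to the tracked events, but a post-condition pins events of that type to position 1 so they cannot show up as later steps. A rough illustration with hypothetical types (the real values come from the JOURNEY_TYPES lookups above):

```python
# Hypothetical inputs: the user tracks clicks but starts from a location.
metric_value = ["CLICK"]
start_point_types = ["LOCATION"]  # stand-in for s.type / JOURNEY_TYPES values

step_1_post_conditions = ["event_number_in_session <= %(density)s"]
extra_metric_values = []
for t in start_point_types:
    if t not in metric_value:
        # Allow this event type only as the first step of a path.
        step_1_post_conditions.append(
            f"(event_type!='{t}' OR event_number_in_session = 1)")
        extra_metric_values.append(t)
metric_value += extra_metric_values

print(metric_value)               # ['CLICK', 'LOCATION']
print(step_1_post_conditions[1])  # (event_type!='LOCATION' OR event_number_in_session = 1)
```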
@@ -317,7 +331,6 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
                                        e_value,
                                        next_type,
                                        next_value,
-                                       AVG(time_from_previous) AS avg_time_from_previous,
                                        COUNT(1) AS sessions_count
                                 FROM ranked_events
                                 WHERE event_number_in_session = 1
@@ -330,8 +343,7 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
                            e_value,
                            next_type,
                            next_value,
-                           sessions_count,
-                           avg_time_from_previous
+                           sessions_count
                     FROM n1"""]
     for i in range(2, data.density + 1):
         steps_query.append(f"""n{i} AS (SELECT *
@@ -340,7 +352,6 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
                                           re.e_value AS e_value,
                                           re.next_type AS next_type,
                                           re.next_value AS next_value,
-                                          AVG(re.time_from_previous) AS avg_time_from_previous,
                                           COUNT(1) AS sessions_count
                                    FROM n{i - 1} INNER JOIN ranked_events AS re
                                         ON (n{i - 1}.next_value = re.e_value AND n{i - 1}.next_type = re.event_type)
@@ -353,8 +364,7 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
                            e_value,
                            next_type,
                            next_value,
-                           sessions_count,
-                           avg_time_from_previous
+                           sessions_count
                     FROM n{i}""")

     with ch_client.ClickHouseClient(database="experimental") as ch:
@@ -382,7 +392,7 @@ WITH {initial_sessions_cte}
           FROM {main_events_table} {"INNER JOIN sub_sessions ON (sub_sessions.session_id = events.session_id)" if len(sessions_conditions) > 0 else ""}
           WHERE {" AND ".join(ch_sub_query)}
          ) AS full_ranked_events
-    WHERE event_number_in_session <= %(density)s)
+    WHERE {" AND ".join(step_1_post_conditions)})
SELECT *
FROM pre_ranked_events;"""
         logger.debug("---------Q1-----------")
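
The density filter that used to be hardcoded in Q1 is now just the first entry of step_1_post_conditions, so the joined WHERE clause also restricts any start-point-only event type to the first step. Assuming one hypothetical LOCATION start point as above, the rendered filter would look like:

```python
step_1_post_conditions = [
    "event_number_in_session <= %(density)s",
    "(event_type!='LOCATION' OR event_number_in_session = 1)",  # hypothetical start point
]
print("WHERE " + " AND ".join(step_1_post_conditions))
# WHERE event_number_in_session <= %(density)s AND (event_type!='LOCATION' OR event_number_in_session = 1)
```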
@@ -404,11 +414,7 @@ WITH pre_ranked_events AS (SELECT *
                              ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS next_value,
                       leadInFrame(toNullable(event_type))
                          OVER (PARTITION BY session_id ORDER BY datetime {path_direction}
-                              ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS next_type,
-                      abs(lagInFrame(toNullable(datetime))
-                             OVER (PARTITION BY session_id ORDER BY datetime {path_direction}
-                                  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
-                          - pre_ranked_events.datetime) AS time_from_previous
+                              ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS next_type
      FROM start_points INNER JOIN pre_ranked_events USING (session_id))
SELECT *
FROM ranked_events;"""

View file

@@ -6,7 +6,6 @@ from queue import Queue, Empty

 import clickhouse_connect
 from clickhouse_connect.driver.query import QueryContext
-from clickhouse_connect.driver.exceptions import DatabaseError
 from decouple import config

 logger = logging.getLogger(__name__)
@@ -32,9 +31,10 @@ if config("CH_COMPRESSION", cast=bool, default=True):
     extra_args["compression"] = "lz4"


-def transform_result(original_function):
+def transform_result(self, original_function):
     @wraps(original_function)
     def wrapper(*args, **kwargs):
+        logger.debug(self.format(query=kwargs.get("query"), parameters=kwargs.get("parameters")))
         result = original_function(*args, **kwargs)
         if isinstance(result, clickhouse_connect.driver.query.QueryResult):
             column_names = result.column_names
@@ -140,7 +140,7 @@ class ClickHouseClient:
         else:
             self.__client = CH_pool.get_connection()
-        self.__client.execute = transform_result(self.__client.query)
+        self.__client.execute = transform_result(self, self.__client.query)
         self.__client.format = self.format

     def __enter__(self):
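
Passing self into transform_result lets the wrapper log every query through the client's own format helper before delegating to the real method. A self-contained sketch of the pattern; FakeClient and ClientWrapper are hypothetical stand-ins for the pooled connection and ClickHouseClient:

```python
import logging
from functools import wraps

logger = logging.getLogger(__name__)

def transform_result(self, original_function):
    # Same shape as the patched decorator: log the formatted query,
    # then delegate to the wrapped client method.
    @wraps(original_function)
    def wrapper(*args, **kwargs):
        logger.debug(self.format(query=kwargs.get("query"),
                                 parameters=kwargs.get("parameters")))
        return original_function(*args, **kwargs)
    return wrapper

class FakeClient:  # hypothetical stand-in for the pooled connection
    def query(self, query=None, parameters=None):
        return [(1,)]

class ClientWrapper:  # hypothetical stand-in for ClickHouseClient
    def __init__(self):
        self._client = FakeClient()
        # Bind the decorator to this wrapper so it can reach self.format.
        self._client.execute = transform_result(self, self._client.query)

    def format(self, query=None, parameters=None):
        return f"{query} | params={parameters}"

ClientWrapper()._client.execute(query="SELECT 1", parameters={})
```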

View file

@@ -1209,10 +1209,10 @@ class CardPathAnalysis(__CardSchema):
             if len(s.value) == 0:
                 continue
             start_point.append(s)
-            self.metric_value.append(s.type)
+            # self.metric_value.append(s.type)

         self.start_point = start_point
-        self.metric_value = remove_duplicate_values(self.metric_value)
+        # self.metric_value = remove_duplicate_values(self.metric_value)
         return self
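
After this change the validator only filters out start points that carry no values; merging start-point types into metric_value now happens in path_analysis (first file above), where the step-1 restriction can be attached at the same time. A condensed Pydantic-style sketch of the remaining validator logic, with hypothetical model names standing in for the real schemas:

```python
from pydantic import BaseModel, model_validator

class StartPoint(BaseModel):  # hypothetical stand-in for the real schema
    type: str
    value: list[str] = []

class PathAnalysisCard(BaseModel):  # hypothetical stand-in for CardPathAnalysis
    start_point: list[StartPoint] = []
    metric_value: list[str] = []

    @model_validator(mode="after")
    def _filter_start_points(self):
        # Keep only start points that carry values; metric_value is
        # intentionally left untouched here now.
        self.start_point = [s for s in self.start_point if len(s.value) > 0]
        return self
```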