Api v1.15.0 (#1515)

* feat(chalice): upgraded dependencies

* feat(chalice): changed path analysis schema

* feat(DB): click coordinate support

* feat(chalice): changed path analysis issues schema
feat(chalice): upgraded dependencies

* fix(chalice): fixed pydantic issue

* refactor(chalice): refresh token validator

* feat(chalice): role restrictions

* feat(chalice): EE path analysis changes

* refactor(DB): changed creation queries
refactor(DB): changed delete queries
feat(DB): support new path analysis payload

* feat(chalice): save path analysis card

* feat(chalice): restrict access

* feat(chalice): restrict access

* feat(chalice): EE save new path analysis card

* refactor(chalice): path analysis

* feat(chalice): path analysis new query

* fix(chalice): configurable CH config

* fix(chalice): assist autocomplete

* refactor(chalice): refactored permissions

* refactor(chalice): changed log level

* refactor(chalice): upgraded dependencies

* refactor(chalice): changed path analysis query

* refactor(chalice): changed path analysis query

* refactor(chalice): upgraded dependencies
refactor(alerts): upgraded dependencies
refactor(crons): upgraded dependencies

* feat(chalice): path analysis ignore start point

* feat(chalice): path analysis in progress

* refactor(chalice): path analysis changed link sort

* refactor(chalice): path analysis changed link sort

* refactor(chalice): path analysis changed link sort

* refactor(chalice): path analysis new query
refactor(chalice): authorizers

* refactor(chalice): refactored authorizer

* fix(chalice): fixed create card of PathAnalysis

* refactor(chalice): compute link-percentage for Path Analysis

* refactor(chalice): remove null starting point from Path Analysis

* feat(chalice): path analysis CH query
Authored by Kraiem Taha Yassine on 2023-10-12 17:43:11 +02:00; committed by GitHub
parent e23717741f
commit 56f2a0af75
3 changed files with 184 additions and 349 deletions


@@ -40,7 +40,19 @@ def __transform_journey2(rows, reverse_path=False):
def __transform_journey3(rows, reverse_path=False):
# nodes should contain duplicates for different steps otherwise the UI crashes
total_100p = 0
number_of_step1 = 0
for r in rows:
if r["event_number_in_session"] > 1:
break
number_of_step1 += 1
total_100p += r["sessions_count"]
for i in range(number_of_step1):
rows[i]["value"] = round(number=100 / number_of_step1, ndigits=2)
for i in range(number_of_step1, len(rows)):
rows[i]["value"] = round(number=rows[i]["sessions_count"] * 100 / total_100p, ndigits=2)
nodes = []
nodes_values = []
links = []
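A minimal standalone sketch of the link-percentage logic added above, run on hypothetical rows (the real rows arrive from the SQL query already ordered by event_number_in_session): step-1 links split 100% evenly among themselves, while deeper links are scaled against the step-1 session total.

rows = [
    {"event_number_in_session": 1, "sessions_count": 30},
    {"event_number_in_session": 1, "sessions_count": 10},
    {"event_number_in_session": 2, "sessions_count": 25},
    {"event_number_in_session": 2, "sessions_count": 5},
]
total_100p = 0        # sessions counted at step 1
number_of_step1 = 0   # number of step-1 rows
for r in rows:
    if r["event_number_in_session"] > 1:
        break
    number_of_step1 += 1
    total_100p += r["sessions_count"]
for i in range(number_of_step1):
    rows[i]["value"] = round(100 / number_of_step1, ndigits=2)
for i in range(number_of_step1, len(rows)):
    rows[i]["value"] = round(rows[i]["sessions_count"] * 100 / total_100p, ndigits=2)
print([r["value"] for r in rows])  # -> [50.0, 50.0, 62.5, 12.5]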
@@ -54,8 +66,8 @@ def __transform_journey3(rows, reverse_path=False):
if target not in nodes:
nodes.append(target)
nodes_values.append({"name": r['next_value'], "eventType": r['next_type']})
link = {"eventType": r['event_type'], "value": r["sessions_count"],
"avgTimeFromPervious": r["avg_time_from_previous"]}
link = {"eventType": r['event_type'], "sessionsCount": r["sessions_count"],
"value": r["value"], "avgTimeFromPrevious": r["avg_time_from_previous"]}
if not reverse_path:
link["source"] = nodes.index(source)
link["target"] = nodes.index(target)
@@ -76,328 +88,6 @@ JOURNEY_TYPES = {
}
# query: Q2, the result is correct
def path_analysis_deprecated(project_id: int, data: schemas.CardPathAnalysis):
sub_events = []
start_points_join = ""
start_points_conditions = []
sessions_conditions = ["start_ts>=%(startTimestamp)s", "start_ts<%(endTimestamp)s",
"project_id=%(project_id)s", "events_count > 1", "duration>0"]
if len(data.metric_value) == 0:
data.metric_value.append(schemas.ProductAnalyticsSelectedEventType.location)
sub_events.append({"table": JOURNEY_TYPES[schemas.ProductAnalyticsSelectedEventType.location]["table"],
"column": JOURNEY_TYPES[schemas.ProductAnalyticsSelectedEventType.location]["column"],
"eventType": schemas.ProductAnalyticsSelectedEventType.location.value})
else:
for v in data.metric_value:
if JOURNEY_TYPES.get(v):
sub_events.append({"table": JOURNEY_TYPES[v]["table"],
"column": JOURNEY_TYPES[v]["column"],
"eventType": v})
extra_values = {}
reverse = data.start_type == "end"
for i, sf in enumerate(data.start_point):
f_k = f"start_point_{i}"
op = sh.get_sql_operator(sf.operator)
is_not = sh.is_negation_operator(sf.operator)
extra_values = {**extra_values, **sh.multi_values(sf.value, value_key=f_k)}
start_points_conditions.append(f"(event_type='{sf.type}' AND " +
sh.multi_conditions(f'e_value {op} %({f_k})s', sf.value, is_not=is_not,
value_key=f_k)
+ ")")
exclusions = {}
for i, ef in enumerate(data.excludes):
if ef.type in data.metric_value:
f_k = f"exclude_{i}"
extra_values = {**extra_values, **sh.multi_values(ef.value, value_key=f_k)}
exclusions[ef.type] = [
sh.multi_conditions(f'{JOURNEY_TYPES[ef.type]["column"]} != %({f_k})s', ef.value, is_not=True,
value_key=f_k)]
meta_keys = None
for i, f in enumerate(data.series[0].filter.filters):
op = sh.get_sql_operator(f.operator)
is_any = sh.isAny_opreator(f.operator)
is_not = sh.is_negation_operator(f.operator)
is_undefined = sh.isUndefined_operator(f.operator)
f_k = f"f_value_{i}"
extra_values = {**extra_values, **sh.multi_values(f.value, value_key=f_k)}
# ---- meta-filters
if f.type == schemas.FilterType.user_browser:
if is_any:
sessions_conditions.append('user_browser IS NOT NULL')
else:
sessions_conditions.append(
sh.multi_conditions(f'user_browser {op} %({f_k})s', f.value, is_not=is_not, value_key=f_k))
elif f.type in [schemas.FilterType.user_os]:
if is_any:
sessions_conditions.append('user_os IS NOT NULL')
else:
sessions_conditions.append(
sh.multi_conditions(f'user_os {op} %({f_k})s', f.value, is_not=is_not, value_key=f_k))
elif f.type in [schemas.FilterType.user_device]:
if is_any:
sessions_conditions.append('user_device IS NOT NULL')
else:
sessions_conditions.append(
sh.multi_conditions(f'user_device {op} %({f_k})s', f.value, is_not=is_not, value_key=f_k))
elif f.type in [schemas.FilterType.user_country]:
if is_any:
sessions_conditions.append('user_country IS NOT NULL')
else:
sessions_conditions.append(
sh.multi_conditions(f'user_country {op} %({f_k})s', f.value, is_not=is_not, value_key=f_k))
elif f.type == schemas.FilterType.user_city:
if is_any:
sessions_conditions.append('user_city IS NOT NULL')
else:
sessions_conditions.append(
sh.multi_conditions(f'user_city {op} %({f_k})s', f.value, is_not=is_not, value_key=f_k))
elif f.type == schemas.FilterType.user_state:
if is_any:
sessions_conditions.append('user_state IS NOT NULL')
else:
sessions_conditions.append(
sh.multi_conditions(f'user_state {op} %({f_k})s', f.value, is_not=is_not, value_key=f_k))
elif f.type in [schemas.FilterType.utm_source]:
if is_any:
sessions_conditions.append('utm_source IS NOT NULL')
elif is_undefined:
sessions_conditions.append('utm_source IS NULL')
else:
sessions_conditions.append(
sh.multi_conditions(f'utm_source {op} %({f_k})s::text', f.value, is_not=is_not,
value_key=f_k))
elif f.type in [schemas.FilterType.utm_medium]:
if is_any:
sessions_conditions.append('utm_medium IS NOT NULL')
elif is_undefined:
sessions_conditions.append('utm_medium IS NULL')
else:
sessions_conditions.append(
sh.multi_conditions(f'utm_medium {op} %({f_k})s::text', f.value, is_not=is_not,
value_key=f_k))
elif f.type in [schemas.FilterType.utm_campaign]:
if is_any:
sessions_conditions.append('utm_campaign IS NOT NULL')
elif is_undefined:
sessions_conditions.append('utm_campaign IS NULL')
else:
sessions_conditions.append(
sh.multi_conditions(f'utm_campaign {op} %({f_k})s::text', f.value, is_not=is_not,
value_key=f_k))
elif f.type == schemas.FilterType.duration:
if len(f.value) > 0 and f.value[0] is not None:
sessions_conditions.append("duration >= %(minDuration)s")
extra_values["minDuration"] = f.value[0]
if len(f.value) > 1 and f.value[1] is not None and int(f.value[1]) > 0:
sessions_conditions.append("duration <= %(maxDuration)s")
extra_values["maxDuration"] = f.value[1]
elif f.type == schemas.FilterType.referrer:
# extra_from += f"INNER JOIN {events.event_type.LOCATION.table} AS p USING(session_id)"
if is_any:
sessions_conditions.append('base_referrer IS NOT NULL')
else:
sessions_conditions.append(
sh.multi_conditions(f"base_referrer {op} %({f_k})s", f.value, is_not=is_not,
value_key=f_k))
elif f.type == schemas.FilterType.metadata:
# get metadata list only if you need it
if meta_keys is None:
meta_keys = metadata.get(project_id=project_id)
meta_keys = {m["key"]: m["index"] for m in meta_keys}
if f.source in meta_keys.keys():
if is_any:
sessions_conditions.append(f"{metadata.index_to_colname(meta_keys[f.source])} IS NOT NULL")
elif is_undefined:
sessions_conditions.append(f"{metadata.index_to_colname(meta_keys[f.source])} IS NULL")
else:
sessions_conditions.append(
sh.multi_conditions(
f"{metadata.index_to_colname(meta_keys[f.source])} {op} %({f_k})s::text",
f.value, is_not=is_not, value_key=f_k))
elif f.type in [schemas.FilterType.user_id, schemas.FilterType.user_id_ios]:
if is_any:
sessions_conditions.append('user_id IS NOT NULL')
elif is_undefined:
sessions_conditions.append('user_id IS NULL')
else:
sessions_conditions.append(
sh.multi_conditions(f"s.user_id {op} %({f_k})s::text", f.value, is_not=is_not,
value_key=f_k))
elif f.type in [schemas.FilterType.user_anonymous_id,
schemas.FilterType.user_anonymous_id_ios]:
if is_any:
sessions_conditions.append('user_anonymous_id IS NOT NULL')
elif is_undefined:
sessions_conditions.append('user_anonymous_id IS NULL')
else:
sessions_conditions.append(
sh.multi_conditions(f"user_anonymous_id {op} %({f_k})s::text", f.value, is_not=is_not,
value_key=f_k))
elif f.type in [schemas.FilterType.rev_id, schemas.FilterType.rev_id_ios]:
if is_any:
sessions_conditions.append('rev_id IS NOT NULL')
elif is_undefined:
sessions_conditions.append('rev_id IS NULL')
else:
sessions_conditions.append(
sh.multi_conditions(f"rev_id {op} %({f_k})s::text", f.value, is_not=is_not, value_key=f_k))
elif f.type == schemas.FilterType.platform:
# op = __ sh.get_sql_operator(f.operator)
sessions_conditions.append(
sh.multi_conditions(f"user_device_type {op} %({f_k})s", f.value, is_not=is_not,
value_key=f_k))
elif f.type == schemas.FilterType.issue:
if is_any:
sessions_conditions.append("array_length(issue_types, 1) > 0")
else:
sessions_conditions.append(
sh.multi_conditions(f"%({f_k})s {op} ANY (issue_types)", f.value, is_not=is_not,
value_key=f_k))
elif f.type == schemas.FilterType.events_count:
sessions_conditions.append(
sh.multi_conditions(f"events_count {op} %({f_k})s", f.value, is_not=is_not,
value_key=f_k))
events_subquery = []
for t in sub_events:
sub_events_conditions = ["e.timestamp >= %(startTimestamp)s",
"e.timestamp < %(endTimestamp)s"] + exclusions.get(t["eventType"], [])
events_subquery.append(f"""\
SELECT session_id, {t["column"]} AS e_value, timestamp, '{t["eventType"]}' AS event_type
FROM {t["table"]} AS e
INNER JOIN sub_sessions USING (session_id)
WHERE {" AND ".join(sub_events_conditions)}""")
events_subquery = "\n UNION ALL \n".join(events_subquery)
if reverse:
path_direction = "DESC"
else:
path_direction = ""
if len(start_points_conditions) == 0:
start_points_join = """INNER JOIN
(SELECT event_type, e_value
FROM ranked_events
WHERE event_number_in_session = 1
GROUP BY event_type, e_value
ORDER BY count(1) DESC
LIMIT 2
) AS top_start_events USING (event_type, e_value)"""
else:
start_points_conditions = ["(" + " OR ".join(start_points_conditions) + ")"]
start_points_conditions.append("event_number_in_session = 1")
steps_query = ["""n1 AS (SELECT event_number_in_session,
event_type,
e_value,
next_type,
next_value,
time_to_next,
count(1) AS sessions_count
FROM ranked_events
INNER JOIN start_points USING (session_id)
WHERE event_number_in_session = 1
GROUP BY event_number_in_session, event_type, e_value, next_type, next_value, time_to_next)"""]
projection_query = ["""(SELECT event_number_in_session,
event_type,
e_value,
next_type,
next_value,
sessions_count,
avg(time_to_next) AS avg_time_to_target
FROM n1
GROUP BY event_number_in_session, event_type, e_value, next_type, next_value, sessions_count
ORDER BY event_number_in_session, event_type, e_value, next_type, next_value)"""]
for i in range(2, data.density):
steps_query.append(f"""n{i} AS (SELECT *
FROM (SELECT re.event_number_in_session,
re.event_type,
re.e_value,
re.next_type,
re.next_value,
re.time_to_next,
count(1) AS sessions_count
FROM ranked_events AS re
INNER JOIN start_points USING (session_id)
INNER JOIN n{i - 1} ON (n{i - 1}.next_value = re.e_value)
WHERE re.event_number_in_session = {i}
GROUP BY re.event_number_in_session, re.event_type, re.e_value, re.next_type, re.next_value,
re.time_to_next) AS sub_level
ORDER BY sessions_count DESC
LIMIT %(eventThresholdNumberInGroup)s)""")
projection_query.append(f"""(SELECT event_number_in_session,
event_type,
e_value,
next_type,
next_value,
sessions_count,
avg(time_to_next) AS avg_time_to_target
FROM n{i}
GROUP BY event_number_in_session, event_type, e_value, next_type, next_value, sessions_count
ORDER BY event_number_in_session, event_type, e_value, next_type, next_value)""")
with pg_client.PostgresClient() as cur:
pg_query = f"""\
WITH sub_sessions AS ( SELECT session_id
FROM public.sessions
WHERE {" AND ".join(sessions_conditions)}),
sub_events AS ({events_subquery}),
ranked_events AS (SELECT *
FROM (SELECT session_id,
event_type,
e_value,
row_number() OVER (PARTITION BY session_id ORDER BY timestamp {path_direction}) AS event_number_in_session,
LEAD(e_value, 1) OVER (PARTITION BY session_id ORDER BY timestamp {path_direction}) AS next_value,
LEAD(event_type, 1) OVER (PARTITION BY session_id ORDER BY timestamp {path_direction}) AS next_type,
abs(LEAD(timestamp, 1) OVER (PARTITION BY session_id ORDER BY timestamp {path_direction}) -
timestamp) AS time_to_next
FROM sub_events) AS full_ranked_events
WHERE event_number_in_session < %(density)s
),
start_points AS (SELECT session_id
FROM ranked_events {start_points_join}
WHERE {" AND ".join(start_points_conditions)}),
{",".join(steps_query)}
{"UNION ALL".join(projection_query)};"""
params = {"project_id": project_id, "startTimestamp": data.startTimestamp,
"endTimestamp": data.endTimestamp, "density": data.density,
"eventThresholdNumberInGroup": 8 if data.hide_excess else 6,
# TODO: add if data=args is required
# **__get_constraint_values(args),
**extra_values}
query = cur.mogrify(pg_query, params)
_now = time()
cur.execute(query)
if time() - _now > 2:
print(f">>>>>>>>>PathAnalysis long query ({int(time() - _now)}s)<<<<<<<<<")
print("----------------------")
print(query)
print("----------------------")
rows = cur.fetchall()
return __transform_journey2(rows=rows, reverse_path=reverse)
# query: Q3, the result is correct,
# startPoints are computed before ranked_events to reduce the number of window functions over rows
# replaced time_to_target by time_from_previous
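A condensed sketch of that Q3 shape, over a hypothetical events table and start condition (the real queries below add exclusions, per-step CTEs, and density limits): the cheap row_number() pass runs first, sessions are filtered on their first event, and only the surviving sessions pay for the LEAD window functions.

WITH pre_ranked_events AS (
         SELECT session_id, event_type, e_value, timestamp,
                row_number() OVER (PARTITION BY session_id ORDER BY timestamp) AS event_number_in_session
         FROM events),
     start_points AS (
         SELECT DISTINCT session_id
         FROM pre_ranked_events
         WHERE event_number_in_session = 1
           AND e_value = '/home'),   -- hypothetical start point
     ranked_events AS (
         SELECT pre_ranked_events.*,
                LEAD(e_value, 1) OVER (PARTITION BY session_id ORDER BY timestamp) AS next_value
         FROM start_points INNER JOIN pre_ranked_events USING (session_id))
SELECT *
FROM ranked_events;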
@@ -642,7 +332,7 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
count(1) AS sessions_count
FROM ranked_events
INNER JOIN start_points USING (session_id)
WHERE event_number_in_session = 1
WHERE event_number_in_session = 1 AND next_value IS NOT NULL
GROUP BY event_number_in_session, event_type, e_value, next_type, next_value, time_from_previous)"""]
projection_query = ["""(SELECT event_number_in_session,
event_type,


@@ -1262,6 +1262,7 @@ class CardInsights(__CardSchema):
# class CardPathAnalysisSchema(BaseModel):
class CardPathAnalysisSchema(CardSessionsSchema):
name: Optional[str] = Field(default=None)
filter: PathAnalysisSchema = Field(...)
density: int = Field(default=4, ge=2, le=10)
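A minimal sketch of what the density bounds enforce, using a hypothetical standalone model (the real schema also inherits the CardSessionsSchema fields):

from pydantic import BaseModel, Field, ValidationError

class DensityOnly(BaseModel):  # stand-in for CardPathAnalysisSchema
    density: int = Field(default=4, ge=2, le=10)

print(DensityOnly().density)             # 4: the default
print(DensityOnly(density=10).density)   # 10: upper bound is inclusive
try:
    DensityOnly(density=1)               # below ge=2
except ValidationError:
    print("density=1 rejected")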


@@ -25,7 +25,6 @@ def __transform_journey2(rows, reverse_path=False):
target = f"{r['event_number_in_session'] + 1}_{r['next_type']}_{r['next_value']}"
if target not in nodes:
nodes.append(target)
# TODO: remove this after UI supports long values
nodes_values.append({"name": r['next_value'], "eventType": r['next_type']})
link = {"eventType": r['event_type'], "value": r["sessions_count"],
"avgTimeToTarget": r["avg_time_to_target"]}
@@ -41,6 +40,47 @@ def __transform_journey2(rows, reverse_path=False):
"links": sorted(links, key=lambda x: (x["source"], x["target"]), reverse=False)}
def __transform_journey3(rows, reverse_path=False):
total_100p = 0
number_of_step1 = 0
for r in rows:
if r["event_number_in_session"] > 1:
break
number_of_step1 += 1
total_100p += r["sessions_count"]
for i in range(number_of_step1):
rows[i]["value"] = round(number=100 / number_of_step1, ndigits=2)
for i in range(number_of_step1, len(rows)):
rows[i]["value"] = round(number=rows[i]["sessions_count"] * 100 / total_100p, ndigits=2)
nodes = []
nodes_values = []
links = []
for r in rows:
source = f"{r['event_number_in_session']}_{r['event_type']}_{r['e_value']}"
if source not in nodes:
nodes.append(source)
nodes_values.append({"name": r['e_value'], "eventType": r['event_type']})
if r['next_value']:
target = f"{r['event_number_in_session'] + 1}_{r['next_type']}_{r['next_value']}"
if target not in nodes:
nodes.append(target)
nodes_values.append({"name": r['next_value'], "eventType": r['next_type']})
link = {"eventType": r['event_type'], "sessionsCount": r["sessions_count"],
"value": r["value"], "avgTimeFromPrevious": r["avg_time_from_previous"]}
if not reverse_path:
link["source"] = nodes.index(source)
link["target"] = nodes.index(target)
else:
link["source"] = nodes.index(target)
link["target"] = nodes.index(source)
links.append(link)
return {"nodes": nodes_values,
"links": sorted(links, key=lambda x: (x["source"], x["target"]), reverse=False)}
JOURNEY_TYPES = {
schemas.ProductAnalyticsSelectedEventType.location: {"eventType": "LOCATION", "column": "url_path"},
schemas.ProductAnalyticsSelectedEventType.click: {"eventType": "CLICK", "column": "label"},
@@ -49,6 +89,9 @@ JOURNEY_TYPES = {
}
# query: Q3, the result is correct,
# startPoints are computed before ranked_events to reduce the number of window functions over rows
# replaced time_to_target by time_from_previous
def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
sub_events = []
start_points_conditions = []
@@ -80,7 +123,7 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
+ ")")
exclusions = {}
for i, sf in enumerate(data.exclude):
for i, sf in enumerate(data.excludes):
if sf.type in data.metric_value:
f_k = f"exclude_{i}"
extra_values = {**extra_values, **sh.multi_values(sf.value, value_key=f_k)}
@@ -98,6 +141,9 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
f_k = f"f_value_{i}"
extra_values = {**extra_values, **sh.multi_values(f.value, value_key=f_k)}
if not is_any and len(f.value) == 0:
continue
# ---- meta-filters
if f.type == schemas.FilterType.user_browser:
if is_any:
@@ -274,23 +320,21 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
INNER JOIN events USING (session_id)"""
if len(start_points_conditions) == 0:
start_points_subquery = """SELECT DISTINCT session_id
FROM (SELECT event_type, e_value
FROM full_ranked_events
WHERE event_number_in_session = 1
GROUP BY event_type, e_value
ORDER BY count(1) DESC
LIMIT 2) AS top_start_events
INNER JOIN full_ranked_events
ON (top_start_events.event_type = full_ranked_events.event_type AND
top_start_events.e_value = full_ranked_events.e_value AND
full_ranked_events.event_number_in_session = 1 AND
isNotNull(next_value))"""
FROM (SELECT event_type, e_value
FROM pre_ranked_events
WHERE event_number_in_session = 1
GROUP BY event_type, e_value
ORDER BY count(1) DESC
LIMIT 1) AS top_start_events
INNER JOIN pre_ranked_events
ON (top_start_events.event_type = pre_ranked_events.event_type AND
top_start_events.e_value = pre_ranked_events.e_value)
WHERE pre_ranked_events.event_number_in_session = 1"""
else:
start_points_conditions = ["(" + " OR ".join(start_points_conditions) + ")",
"event_number_in_session = 1",
"isNotNull(next_value)"]
"event_number_in_session = 1"]
start_points_subquery = f"""SELECT DISTINCT session_id
FROM full_ranked_events
FROM pre_ranked_events
WHERE {" AND ".join(start_points_conditions)}"""
del start_points_conditions
if reverse:
@@ -298,7 +342,56 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
else:
path_direction = ""
steps_query = ["""n1 AS (SELECT event_number_in_session,
event_type,
e_value,
next_type,
next_value,
time_from_previous,
count(1) AS sessions_count
FROM ranked_events
WHERE event_number_in_session = 1
AND isNotNull(next_value)
GROUP BY event_number_in_session, event_type, e_value, next_type, next_value, time_from_previous
LIMIT 5)"""]
projection_query = ["""SELECT event_number_in_session,
event_type,
e_value,
next_type,
next_value,
sessions_count,
avg(time_from_previous) AS avg_time_from_previous
FROM n1
GROUP BY event_number_in_session, event_type, e_value, next_type, next_value, sessions_count"""]
for i in range(2, data.density):
steps_query.append(f"""n{i} AS (SELECT *
FROM (SELECT re.event_number_in_session AS event_number_in_session,
re.event_type AS event_type,
re.e_value AS e_value,
re.next_type AS next_type,
re.next_value AS next_value,
re.time_from_previous AS time_from_previous,
count(1) AS sessions_count
FROM n{i - 1} INNER JOIN ranked_events AS re
ON (n{i - 1}.next_value = re.e_value AND n{i - 1}.next_type = re.event_type)
WHERE re.event_number_in_session = {i}
GROUP BY re.event_number_in_session, re.event_type, re.e_value, re.next_type, re.next_value,
re.time_from_previous) AS sub_level
ORDER BY sessions_count DESC
LIMIT %(eventThresholdNumberInGroup)s)""")
projection_query.append(f"""SELECT event_number_in_session,
event_type,
e_value,
next_type,
next_value,
sessions_count,
avg(time_from_previous) AS avg_time_from_previous
FROM n{i}
GROUP BY event_number_in_session, event_type, e_value, next_type, next_value, sessions_count""")
with ch_client.ClickHouseClient(database="experimental") as ch:
time_key = TimeUTC.now()
_now = time()
ch_query = f"""\
WITH full_ranked_events AS (SELECT session_id,
event_type,
@@ -342,19 +435,70 @@ FROM (SELECT *
GROUP BY event_number_in_session, event_type, e_value, next_type, next_value, sessions_count
ORDER BY event_number_in_session, e_value, next_value;"""
params = {"project_id": project_id, "startTimestamp": data.startTimestamp,
"endTimestamp": data.endTimestamp,
# **__get_constraint_values(args),
"endTimestamp": data.endTimestamp, "density": data.density,
"eventThresholdNumberInGroup": 6 if data.hide_excess else 8,
**extra_values}
_now = time()
rows = ch.execute(query=ch_query, params=params)
if time() - _now > 0:
ch_query1 = f"""\
CREATE TEMPORARY TABLE pre_ranked_events_{time_key} AS
WITH pre_ranked_events AS (SELECT *
FROM (SELECT session_id,
event_type,
datetime,
{main_column} AS e_value,
row_number() OVER (PARTITION BY session_id
ORDER BY datetime {path_direction},
message_id {path_direction} ) AS event_number_in_session
FROM {main_table}
WHERE {" AND ".join(ch_sub_query)}
) AS full_ranked_events
WHERE event_number_in_session < 4)
SELECT *
FROM pre_ranked_events;"""
ch.execute(query=ch_query1, params=params)
ch_query2 = f"""\
CREATE TEMPORARY TABLE ranked_events_{time_key} AS
WITH pre_ranked_events AS (SELECT *
FROM pre_ranked_events_{time_key}),
start_points AS ({start_points_subquery}),
ranked_events AS (SELECT pre_ranked_events.*,
leadInFrame(e_value)
OVER (PARTITION BY session_id ORDER BY datetime {path_direction}
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS next_value,
leadInFrame(toNullable(event_type))
OVER (PARTITION BY session_id ORDER BY datetime {path_direction}
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS next_type,
abs(lagInFrame(toNullable(datetime))
OVER (PARTITION BY session_id ORDER BY datetime {path_direction}
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
- pre_ranked_events.datetime) AS time_from_previous
FROM start_points INNER JOIN pre_ranked_events USING (session_id))
SELECT *
FROM ranked_events;"""
ch.execute(query=ch_query2, params=params)
ch_query3 = f"""\
WITH ranked_events AS (SELECT *
FROM ranked_events_{time_key}),
{",".join(steps_query)}
SELECT *
FROM ({" UNION ALL ".join(projection_query)}) AS chart_steps
ORDER BY event_number_in_session;"""
rows = ch.execute(query=ch_query3, params=params)
if time() - _now > 2:
print(f">>>>>>>>>PathAnalysis long query EE ({int(time() - _now)}s)<<<<<<<<<")
print("----------------------")
print(print(ch.format(ch_query, params)))
print("---------Q1-----------")
print(ch.format(ch_query1, params))
print("---------Q2-----------")
print(ch.format(ch_query2, params))
print("---------Q3-----------")
print(ch.format(ch_query3, params))
print("----------------------")
return __transform_journey2(rows=rows, reverse_path=reverse)
return __transform_journey3(rows=rows, reverse_path=reverse)
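The split into ch_query1/ch_query2/ch_query3 persists each intermediate stage (pre_ranked_events, ranked_events) as a connection-scoped temporary table, so the per-step CTEs in ch_query3 reuse those rows instead of recomputing the window functions. The explicit ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING frame is required because ClickHouse's leadInFrame/lagInFrame, unlike standard LEAD/LAG, only look inside the window frame, and the default frame stops at the current row. A runnable sketch of that difference, on hypothetical data from numbers():

SELECT number,
       leadInFrame(number) OVER (ORDER BY number
           ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS next_ok,  -- 1, 2, 0
       leadInFrame(number) OVER (ORDER BY number) AS next_default_frame           -- always 0
FROM numbers(3);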
#
# def __compute_weekly_percentage(rows):