change(api): follow the new structure for cards (#2952)

* change(api): follow the new structure for cards

* change(api): update query to handle location and performance events

* change(api): ch query updates - monitors - sessions with 4xx ..

* change(api): ch query updates - monitors - table of errors

* change(api): ch query updates - use created_at

* change(api): ch query updates - fix the column name for errorId

* change(api): ch query updates - heatmaps

* change(api): ch query updates - funnels

* change(api): ch query updates - user journey / path finder

* change(api): ch query updates - user journey / path finder

* change(api): ch query updates - heatmaps fix

* refactor(chalice): changes

* refactor(chalice): changes

* refactor(chalice): changes

---------

Co-authored-by: Taha Yassine Kraiem <tahayk2@gmail.com>
Shekar Siri 2025-01-23 12:21:23 +01:00 committed by GitHub
parent f535870811
commit 954e811be0
10 changed files with 843 additions and 501 deletions
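
The changes below all follow one migration pattern: queries against the events store stop reading dedicated materialized columns (event_type, datetime, url_path, ...) and instead read the new structure's generic columns: `$event_name` for the event type, created_at for the event timestamp, and the `$properties` JSON bag for everything else. A minimal sketch of that mapping, assuming a ClickHouse property-bag layout; the helper name and column sets are illustrative, not part of the diff:

    # Illustrative only: how a legacy events column translates under the new
    # structure used throughout this PR.
    SURVIVING_COLUMNS = {"session_id", "project_id", "created_at"}
    RENAMED_COLUMNS = {"event_type": "`$event_name`", "datetime": "created_at"}

    def to_new_predicate(column: str, op: str, param: str) -> str:
        """Render a WHERE predicate against the new events layout (sketch)."""
        if column in RENAMED_COLUMNS:
            return f"{RENAMED_COLUMNS[column]} {op} %({param})s"
        if column in SURVIVING_COLUMNS:
            return f"{column} {op} %({param})s"
        # Everything else moved into the `$properties` JSON bag.
        return (f"JSON_VALUE(CAST(`$properties` AS String), '$.{column}') "
                f"{op} %({param})s")

    # to_new_predicate("url_path", "=", "url")
    # -> "JSON_VALUE(CAST(`$properties` AS String), '$.url_path') = %(url)s"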

View file

@@ -257,9 +257,9 @@ def __search_metadata(project_id, value, key=None, source=None):
WHERE project_id = %(project_id)s
AND {colname} ILIKE %(svalue)s LIMIT 5)""")
with ch_client.ClickHouseClient() as cur:
query = cur.format(f"""SELECT key, value, 'METADATA' AS TYPE
query = cur.format(query=f"""SELECT key, value, 'METADATA' AS TYPE
FROM({" UNION ALL ".join(sub_from)}) AS all_metas
LIMIT 5;""", {"project_id": project_id, "value": helper.string_to_sql_like(value),
LIMIT 5;""", parameters={"project_id": project_id, "value": helper.string_to_sql_like(value),
"svalue": helper.string_to_sql_like("^" + value)})
results = cur.execute(query)
return helper.list_to_camel_case(results)
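
This first hunk only changes the call convention: cur.format now takes explicit query= and parameters= keyword arguments, and the formatted string is then passed to cur.execute. A hedged sketch of the pattern as it recurs across the PR (the table name and ClickHouseClient internals are assumptions, not shown in the diff):

    from chalicelib.utils import ch_client  # import path as used in the diff

    def count_project_sessions(project_id: int):
        with ch_client.ClickHouseClient() as cur:
            # Bind named parameters first, then execute the formatted query.
            query = cur.format(
                query="SELECT COUNT(1) AS count FROM sessions WHERE project_id = %(project_id)s",
                parameters={"project_id": project_id},
            )
            return cur.execute(query)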

View file

@@ -1,6 +1,6 @@
import schemas
from chalicelib.core import metadata
from chalicelib.core import sessions
from chalicelib.core.errors.modules import sessions
from chalicelib.core.metrics import metrics
from chalicelib.utils import ch_client, exp_ch_helper
from chalicelib.utils import helper, metrics_helper
@@ -70,7 +70,7 @@ def __get_basic_constraints(platform=None, time_constraint=True, startTime_arg_n
else:
table_name = ""
if type_condition:
ch_sub_query.append(f"{table_name}event_type='ERROR'")
ch_sub_query.append(f"{table_name}`$event_name`='ERROR'")
if time_constraint:
ch_sub_query += [f"{table_name}datetime >= toDateTime(%({startTime_arg_name})s/1000)",
f"{table_name}datetime < toDateTime(%({endTime_arg_name})s/1000)"]
@@ -81,6 +81,26 @@ def __get_basic_constraints(platform=None, time_constraint=True, startTime_arg_n
return ch_sub_query
def __get_basic_constraints_events(platform=None, time_constraint=True, startTime_arg_name="startDate",
endTime_arg_name="endDate", type_condition=True, project_key="project_id",
table_name=None):
ch_sub_query = [f"{project_key} =toUInt16(%(project_id)s)"]
if table_name is not None:
table_name = table_name + "."
else:
table_name = ""
if type_condition:
ch_sub_query.append(f"{table_name}`$event_name`='ERROR'")
if time_constraint:
ch_sub_query += [f"{table_name}created_at >= toDateTime(%({startTime_arg_name})s/1000)",
f"{table_name}created_at < toDateTime(%({endTime_arg_name})s/1000)"]
# if platform == schemas.PlatformType.MOBILE:
# ch_sub_query.append("user_device_type = 'mobile'")
# elif platform == schemas.PlatformType.DESKTOP:
# ch_sub_query.append("user_device_type = 'desktop'")
return ch_sub_query
def __get_sort_key(key):
return {
schemas.ErrorSort.OCCURRENCE: "max_datetime",
@@ -99,10 +119,11 @@ def search(data: schemas.SearchErrorsSchema, project_id, user_id):
platform = f.value[0]
ch_sessions_sub_query = __get_basic_constraints(platform, type_condition=False)
# ignore platform for errors table
ch_sub_query = __get_basic_constraints(None, type_condition=True)
ch_sub_query.append("source ='js_exception'")
ch_sub_query = __get_basic_constraints_events(None, type_condition=True)
ch_sub_query.append("JSONExtractString(toString(`$properties`), 'source') = 'js_exception'")
# To ignore Script error
ch_sub_query.append("message!='Script error.'")
ch_sub_query.append("JSONExtractString(toString(`$properties`), 'message') != 'Script error.'")
error_ids = None
if data.startTimestamp is None:
@@ -183,7 +204,6 @@ def search(data: schemas.SearchErrorsSchema, project_id, user_id):
_multiple_conditions(f's.user_country {op} %({f_k})s', f.value, is_not=is_not,
value_key=f_k))
elif filter_type in [schemas.FilterType.UTM_SOURCE]:
if is_any:
ch_sessions_sub_query.append('isNotNull(s.utm_source)')
@@ -333,17 +353,18 @@ def search(data: schemas.SearchErrorsSchema, project_id, user_id):
ch_sub_query.append("error_id IN %(error_ids)s")
main_ch_query = f"""\
SELECT details.error_id AS error_id,
SELECT details.error_id as error_id,
name, message, users, total,
sessions, last_occurrence, first_occurrence, chart
FROM (SELECT error_id,
name,
message,
FROM (SELECT JSONExtractString(toString(`$properties`), 'error_id') AS error_id,
JSONExtractString(toString(`$properties`), 'name') AS name,
JSONExtractString(toString(`$properties`), 'message') AS message,
COUNT(DISTINCT user_id) AS users,
COUNT(DISTINCT events.session_id) AS sessions,
MAX(datetime) AS max_datetime,
MIN(datetime) AS min_datetime,
COUNT(DISTINCT events.error_id) OVER() AS total
MAX(created_at) AS max_datetime,
MIN(created_at) AS min_datetime,
COUNT(DISTINCT JSONExtractString(toString(`$properties`), 'error_id'))
OVER() AS total
FROM {MAIN_EVENTS_TABLE} AS events
INNER JOIN (SELECT session_id, coalesce(user_id,toString(user_uuid)) AS user_id
FROM {MAIN_SESSIONS_TABLE} AS s
@@ -354,16 +375,17 @@ def search(data: schemas.SearchErrorsSchema, project_id, user_id):
GROUP BY error_id, name, message
ORDER BY {sort} {order}
LIMIT %(errors_limit)s OFFSET %(errors_offset)s) AS details
INNER JOIN (SELECT error_id AS error_id,
toUnixTimestamp(MAX(datetime))*1000 AS last_occurrence,
toUnixTimestamp(MIN(datetime))*1000 AS first_occurrence
INNER JOIN (SELECT JSONExtractString(toString(`$properties`), 'error_id') AS error_id,
toUnixTimestamp(MAX(created_at))*1000 AS last_occurrence,
toUnixTimestamp(MIN(created_at))*1000 AS first_occurrence
FROM {MAIN_EVENTS_TABLE}
WHERE project_id=%(project_id)s
AND event_type='ERROR'
AND `$event_name`='ERROR'
GROUP BY error_id) AS time_details
ON details.error_id=time_details.error_id
INNER JOIN (SELECT error_id, groupArray([timestamp, count]) AS chart
FROM (SELECT error_id, toUnixTimestamp(toStartOfInterval(datetime, INTERVAL %(step_size)s second)) * 1000 AS timestamp,
FROM (SELECT JSONExtractString(toString(`$properties`), 'error_id') AS error_id,
toUnixTimestamp(toStartOfInterval(created_at, INTERVAL %(step_size)s second)) * 1000 AS timestamp,
COUNT(DISTINCT session_id) AS count
FROM {MAIN_EVENTS_TABLE}
WHERE {" AND ".join(ch_sub_query)}
@@ -374,8 +396,9 @@ def search(data: schemas.SearchErrorsSchema, project_id, user_id):
# print("------------")
# print(ch.format(main_ch_query, params))
# print("------------")
query = ch.format(query=main_ch_query, parameters=params)
rows = ch.execute(query=main_ch_query, parameters=params)
rows = ch.execute(query)
total = rows[0]["total"] if len(rows) > 0 else 0
for r in rows:
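
The errors search now extracts error_id, name, and message from the `$properties` bag and groups, joins, and aggregates on the extracted expression rather than on materialized columns. A small hypothetical helper (not in the diff) capturing the repeated projection pattern:

    # Hypothetical: mirrors the JSONExtractString projections used above.
    def prop_str(key: str, alias: str | None = None) -> str:
        expr = f"JSONExtractString(toString(`$properties`), '{key}')"
        return f"{expr} AS {alias or key}"

    select_list = ", ".join([prop_str("error_id"), prop_str("name"),
                             prop_str("message")])
    # -> "JSONExtractString(toString(`$properties`), 'error_id') AS error_id, ..."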

View file

@@ -17,15 +17,18 @@ def get_by_url(project_id, data: schemas.GetHeatMapPayloadSchema):
return []
args = {"startDate": data.startTimestamp, "endDate": data.endTimestamp,
"project_id": project_id, "url": data.url}
constraints = ["main_events.project_id = toUInt16(%(project_id)s)",
"main_events.datetime >= toDateTime(%(startDate)s/1000)",
"main_events.datetime <= toDateTime(%(endDate)s/1000)",
"main_events.event_type='CLICK'",
"isNotNull(main_events.normalized_x)"]
constraints = [
"main_events.project_id = toUInt16(%(project_id)s)",
"main_events.created_at >= toDateTime(%(startDate)s / 1000)",
"main_events.created_at <= toDateTime(%(endDate)s / 1000)",
"main_events.`$event_name` = 'CLICK'",
"isNotNull(JSON_VALUE(CAST(main_events.`$properties` AS String), '$.normalized_x'))"
]
if data.operator == schemas.SearchEventOperator.IS:
constraints.append("url_path= %(url)s")
constraints.append("JSON_VALUE(CAST(main_events.`$properties` AS String), '$.url_path') = %(url)s")
else:
constraints.append("url_path ILIKE %(url)s")
constraints.append("JSON_VALUE(CAST(main_events.`$properties` AS String), '$.url_path') ILIKE %(url)s")
args["url"] = helper.values_for_operator(data.url, data.operator)
query_from = f"{exp_ch_helper.get_main_events_table(data.startTimestamp)} AS main_events"
@@ -70,8 +73,9 @@ def get_by_url(project_id, data: schemas.GetHeatMapPayloadSchema):
# query_from += """ LEFT JOIN experimental.events AS issues_t ON (main_events.session_id=issues_t.session_id)
# LEFT JOIN experimental.issues AS mis ON (issues_t.issue_id=mis.issue_id)"""
with ch_client.ClickHouseClient() as cur:
query = cur.format(query=f"""SELECT main_events.normalized_x AS normalized_x,
main_events.normalized_y AS normalized_y
query = cur.format(query=f"""SELECT
JSON_VALUE(CAST(`$properties` AS String), '$.normalized_x') AS normalized_x,
JSON_VALUE(CAST(`$properties` AS String), '$.normalized_y') AS normalized_y
FROM {query_from}
WHERE {" AND ".join(constraints)}
LIMIT 500;""",
@@ -94,14 +98,16 @@
def get_x_y_by_url_and_session_id(project_id, session_id, data: schemas.GetHeatMapPayloadSchema):
args = {"project_id": project_id, "session_id": session_id, "url": data.url}
constraints = ["main_events.project_id = toUInt16(%(project_id)s)",
constraints = [
"main_events.project_id = toUInt16(%(project_id)s)",
"main_events.session_id = %(session_id)s",
"main_events.event_type='CLICK'",
"isNotNull(main_events.normalized_x)"]
"main_events.`$event_name`='CLICK'",
"isNotNull(JSON_VALUE(CAST(main_events.`$properties` AS String), '$.normalized_x'))"
]
if data.operator == schemas.SearchEventOperator.IS:
constraints.append("main_events.url_path = %(url)s")
constraints.append("JSON_VALUE(CAST(main_events.`$properties` AS String), '$.url_path') = %(url)s")
else:
constraints.append("main_events.url_path ILIKE %(url)s")
constraints.append("JSON_VALUE(CAST(main_events.`$properties` AS String), '$.url_path') ILIKE %(url)s")
args["url"] = helper.values_for_operator(data.url, data.operator)
query_from = f"{exp_ch_helper.get_main_events_table(0)} AS main_events"
@@ -132,11 +138,12 @@ def get_selectors_by_url_and_session_id(project_id, session_id, data: schemas.Ge
args = {"project_id": project_id, "session_id": session_id, "url": data.url}
constraints = ["main_events.project_id = toUInt16(%(project_id)s)",
"main_events.session_id = %(session_id)s",
"main_events.event_type='CLICK'"]
"main_events.`$event_name`='CLICK'"]
if data.operator == schemas.SearchEventOperator.IS:
constraints.append("main_events.url_path = %(url)s")
constraints.append("JSON_VALUE(CAST(main_events.`$properties` AS String), '$.url_path') = %(url)s")
else:
constraints.append("main_events.url_path ILIKE %(url)s")
constraints.append("JSON_VALUE(CAST(main_events.`$properties` AS String), '$.url_path') ILIKE %(url)s")
args["url"] = helper.values_for_operator(data.url, data.operator)
query_from = f"{exp_ch_helper.get_main_events_table(0)} AS main_events"
@@ -181,7 +188,7 @@ def __get_1_url(location_condition: schemas.SessionSearchEventSchema2 | None, se
"start_time": start_time,
"end_time": end_time,
}
sub_condition = ["session_id = %(sessionId)s", "event_type = 'CLICK'", "project_id = %(projectId)s"]
sub_condition = ["session_id = %(sessionId)s", "`$event_name` = 'CLICK'", "project_id = %(projectId)s"]
if location_condition and len(location_condition.value) > 0:
f_k = "LOC"
op = sh.get_sql_operator(location_condition.operator)
@@ -190,17 +197,23 @@ def __get_1_url(location_condition: schemas.SessionSearchEventSchema2 | None, se
sh.multi_conditions(f'path {op} %({f_k})s', location_condition.value, is_not=False,
value_key=f_k))
with ch_client.ClickHouseClient() as cur:
main_query = cur.format(query=f"""WITH paths AS (SELECT DISTINCT url_path
FROM experimental.events
WHERE {" AND ".join(sub_condition)})
SELECT url_path, COUNT(1) AS count
FROM experimental.events
INNER JOIN paths USING (url_path)
WHERE event_type = 'CLICK'
main_query = cur.format(query=f"""WITH paths AS (
SELECT DISTINCT
JSON_VALUE(CAST(`$properties` AS String), '$.url_path') AS url_path
FROM product_analytics.events
WHERE {" AND ".join(sub_condition)}
)
SELECT
paths.url_path,
COUNT(*) AS count
FROM product_analytics.events
INNER JOIN paths
ON JSON_VALUE(CAST(product_analytics.events.$properties AS String), '$.url_path') = paths.url_path
WHERE `$event_name` = 'CLICK'
AND project_id = %(projectId)s
AND datetime >= toDateTime(%(start_time)s / 1000)
AND datetime <= toDateTime(%(end_time)s / 1000)
GROUP BY url_path
AND created_at >= toDateTime(%(start_time)s / 1000)
AND created_at <= toDateTime(%(end_time)s / 1000)
GROUP BY paths.url_path
ORDER BY count DESC
LIMIT 1;""",
parameters=full_args)
@@ -352,19 +365,21 @@ def get_selected_session(project_id, session_id):
def get_page_events(session_id, project_id):
with ch_client.ClickHouseClient() as cur:
rows = cur.execute("""\
SELECT
message_id,
toUnixTimestamp(datetime)*1000 AS timestamp,
url_host AS host,
url_path AS path,
url_path AS value,
url_path AS url,
query = cur.format(query=f"""SELECT
event_id as message_id,
toUnixTimestamp(created_at)*1000 AS timestamp,
JSON_VALUE(CAST(`$properties` AS String), '$.url_host') AS host,
JSON_VALUE(CAST(`$properties` AS String), '$.url_path') AS path,
JSON_VALUE(CAST(`$properties` AS String), '$.url_path') AS value,
JSON_VALUE(CAST(`$properties` AS String), '$.url_path') AS url,
'LOCATION' AS type
FROM experimental.events
FROM product_analytics.events
WHERE session_id = %(session_id)s
AND event_type='LOCATION'
AND `$event_name`='LOCATION'
AND project_id= %(project_id)s
ORDER BY datetime,message_id;""", {"session_id": session_id, "project_id": project_id})
ORDER BY created_at,message_id;""",
parameters={"session_id": session_id, "project_id": project_id})
rows = cur.execute(query=query)
rows = helper.list_to_camel_case(rows)
return rows
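
The heatmap queries switch to JSON_VALUE with JSONPath expressions over the casted `$properties` column, both for the projected click coordinates and for URL filtering. An illustrative reconstruction of the base constraint list built above (function and variable names are the sketch's own):

    def click_constraints(alias: str = "main_events") -> list[str]:
        """Base constraints for click events under the new layout (sketch)."""
        props = f"CAST({alias}.`$properties` AS String)"
        return [
            f"{alias}.project_id = toUInt16(%(project_id)s)",
            f"{alias}.`$event_name` = 'CLICK'",
            f"isNotNull(JSON_VALUE({props}, '$.normalized_x'))",
        ]

    # Operator-dependent URL filter, as in get_by_url above:
    #   JSON_VALUE(..., '$.url_path') = %(url)s      for SearchEventOperator.IS
    #   JSON_VALUE(..., '$.url_path') ILIKE %(url)s  otherwise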

View file

@@ -27,6 +27,22 @@ def __get_basic_constraints(table_name=None, time_constraint=True, round_start=F
return ch_sub_query + __get_generic_constraint(data=data, table_name=table_name)
def __get_basic_constraints_events(table_name=None, time_constraint=True, round_start=False, data={}, identifier="project_id"):
if table_name:
table_name += "."
else:
table_name = ""
ch_sub_query = [f"{table_name}{identifier} =toUInt16(%({identifier})s)"]
if time_constraint:
if round_start:
ch_sub_query.append(
f"toStartOfInterval({table_name}created_at, INTERVAL %(step_size)s second) >= toDateTime(%(startTimestamp)s/1000)")
else:
ch_sub_query.append(f"{table_name}created_at >= toDateTime(%(startTimestamp)s/1000)")
ch_sub_query.append(f"{table_name}created_at < toDateTime(%(endTimestamp)s/1000)")
return ch_sub_query + __get_generic_constraint(data=data, table_name=table_name)
def __frange(start, stop, step):
result = []
i = start
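
__get_basic_constraints_events mirrors __get_basic_constraints but filters on created_at rather than datetime, with round_start aligning the lower bound to the chart's step interval. A hedged usage sketch from within this module (the surrounding query, and the assumption that an empty data dict adds no generic constraints, are illustrative):

    where = __get_basic_constraints_events(table_name="events", round_start=True)
    # Roughly, assuming data={} contributes no generic constraints:
    #   ["events.project_id =toUInt16(%(project_id)s)",
    #    "toStartOfInterval(events.created_at, INTERVAL %(step_size)s second)
    #         >= toDateTime(%(startTimestamp)s/1000)",
    #    "events.created_at < toDateTime(%(endTimestamp)s/1000)"]
    query = f"SELECT COUNT(1) FROM events WHERE {' AND '.join(where)}"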

View file

@@ -19,9 +19,9 @@ def get_simple_funnel(filter_d: schemas.CardSeriesFilterSchema, project: schemas
filters: List[schemas.SessionSearchFilterSchema] = filter_d.filters
platform = project.platform
constraints = ["e.project_id = %(project_id)s",
"e.datetime >= toDateTime(%(startTimestamp)s/1000)",
"e.datetime <= toDateTime(%(endTimestamp)s/1000)",
"e.event_type IN %(eventTypes)s"]
"e.created_at >= toDateTime(%(startTimestamp)s/1000)",
"e.created_at <= toDateTime(%(endTimestamp)s/1000)",
"e.`$event_name` IN %(eventTypes)s"]
full_args = {"project_id": project.project_id, "startTimestamp": filter_d.startTimestamp,
"endTimestamp": filter_d.endTimestamp}
@@ -157,16 +157,23 @@ def get_simple_funnel(filter_d: schemas.CardSeriesFilterSchema, project: schemas
if next_event_type not in event_types:
event_types.append(next_event_type)
full_args[f"event_type_{i}"] = next_event_type
n_stages_query.append(f"event_type=%(event_type_{i})s")
n_stages_query.append(f"`$event_name`=%(event_type_{i})s")
if is_not:
n_stages_query_not.append(n_stages_query[-1] + " AND " +
(sh.multi_conditions(f' {next_col_name} {op} %({e_k})s', s.value,
is_not=is_not, value_key=e_k)
if not specific_condition else specific_condition))
(sh.multi_conditions(
f"JSON_VALUE(CAST(`$properties` AS String), '$.{next_col_name}') {op} %({e_k})s",
s.value,
is_not=is_not,
value_key=e_k
) if not specific_condition else specific_condition))
elif not is_any:
n_stages_query[-1] += " AND " + (sh.multi_conditions(f' {next_col_name} {op} %({e_k})s', s.value,
is_not=is_not, value_key=e_k)
if not specific_condition else specific_condition)
n_stages_query[-1] += " AND " + (
sh.multi_conditions(
f"JSON_VALUE(CAST(`$properties` AS String), '$.{next_col_name}') {op} %({e_k})s",
s.value,
is_not=is_not,
value_key=e_k
) if not specific_condition else specific_condition)
full_args = {"eventTypes": tuple(event_types), **full_args, **values}
n_stages = len(n_stages_query)
@@ -188,8 +195,8 @@ def get_simple_funnel(filter_d: schemas.CardSeriesFilterSchema, project: schemas
if len(n_stages_query_not) > 0:
value_conditions_not_base = ["project_id = %(project_id)s",
"datetime >= toDateTime(%(startTimestamp)s/1000)",
"datetime <= toDateTime(%(endTimestamp)s/1000)"]
"created_at >= toDateTime(%(startTimestamp)s/1000)",
"created_at <= toDateTime(%(endTimestamp)s/1000)"]
_value_conditions_not = []
value_conditions_not = []
for c in n_stages_query_not:
@@ -221,7 +228,7 @@ def get_simple_funnel(filter_d: schemas.CardSeriesFilterSchema, project: schemas
pattern += f"(?{j + 1})"
conditions.append(n_stages_query[j])
j += 1
sequences.append(f"sequenceMatch('{pattern}')(e.datetime, {','.join(conditions)}) AS T{i + 1}")
sequences.append(f"sequenceMatch('{pattern}')(toDateTime(e.created_at), {','.join(conditions)}) AS T{i + 1}")
n_stages_query = f"""
SELECT {",".join(projections)}
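
The funnel still relies on ClickHouse's sequenceMatch, now evaluated over created_at; each stage contributes one (?N) marker to the pattern and one boolean condition. An illustrative reconstruction of how the stage projections above are assembled:

    def funnel_sequence(stage_conditions: list[str], stage_index: int) -> str:
        """Build the sequenceMatch projection for stages 1..stage_index (sketch)."""
        pattern = "".join(f"(?{j + 1})" for j in range(stage_index))
        conds = ",".join(stage_conditions[:stage_index])
        return (f"sequenceMatch('{pattern}')(toDateTime(e.created_at), {conds}) "
                f"AS T{stage_index}")

    # funnel_sequence(["`$event_name`=%(event_type_0)s",
    #                  "`$event_name`=%(event_type_1)s"], 2)
    # -> "sequenceMatch('(?1)(?2)')(toDateTime(e.created_at),
    #     `$event_name`=%(event_type_0)s,`$event_name`=%(event_type_1)s) AS T2"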

View file

@@ -23,246 +23,6 @@ JOURNEY_TYPES = {
}
def __get_test_data():
with ch_client.ClickHouseClient(database="experimental") as ch:
ch_query1 = """
CREATE TEMPORARY TABLE pre_ranked_events_1736344377403 AS
(WITH initial_event AS (SELECT events.session_id, MIN(datetime) AS start_event_timestamp
FROM experimental.events AS events
WHERE ((event_type = 'LOCATION' AND (url_path = '/en/deployment/')))
AND events.project_id = toUInt16(65)
AND events.datetime >= toDateTime(1735599600000 / 1000)
AND events.datetime < toDateTime(1736290799999 / 1000)
GROUP BY 1),
pre_ranked_events AS (SELECT *
FROM (SELECT session_id,
event_type,
datetime,
url_path AS e_value,
row_number() OVER (PARTITION BY session_id
ORDER BY datetime ,
message_id ) AS event_number_in_session
FROM experimental.events AS events
INNER JOIN initial_event ON (events.session_id = initial_event.session_id)
WHERE events.project_id = toUInt16(65)
AND events.datetime >= toDateTime(1735599600000 / 1000)
AND events.datetime < toDateTime(1736290799999 / 1000)
AND (events.event_type = 'LOCATION')
AND events.datetime >= initial_event.start_event_timestamp
) AS full_ranked_events
WHERE event_number_in_session <= 5)
SELECT *
FROM pre_ranked_events);
"""
ch.execute(query=ch_query1, parameters={})
ch_query1 = """
CREATE TEMPORARY TABLE ranked_events_1736344377403 AS
(WITH pre_ranked_events AS (SELECT *
FROM pre_ranked_events_1736344377403),
start_points AS (SELECT DISTINCT session_id
FROM pre_ranked_events
WHERE ((event_type = 'LOCATION' AND (e_value = '/en/deployment/')))
AND pre_ranked_events.event_number_in_session = 1),
ranked_events AS (SELECT pre_ranked_events.*,
leadInFrame(e_value)
OVER (PARTITION BY session_id ORDER BY datetime
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS next_value,
leadInFrame(toNullable(event_type))
OVER (PARTITION BY session_id ORDER BY datetime
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS next_type
FROM start_points
INNER JOIN pre_ranked_events USING (session_id))
SELECT *
FROM ranked_events);
"""
ch.execute(query=ch_query1, parameters={})
ch_query1 = """
WITH ranked_events AS (SELECT *
FROM ranked_events_1736344377403),
n1 AS (SELECT event_number_in_session,
event_type,
e_value,
next_type,
next_value,
COUNT(1) AS sessions_count
FROM ranked_events
WHERE event_number_in_session = 1
GROUP BY event_number_in_session, event_type, e_value, next_type, next_value
ORDER BY sessions_count DESC),
n2 AS (SELECT event_number_in_session,
event_type,
e_value,
next_type,
next_value,
COUNT(1) AS sessions_count
FROM ranked_events
WHERE event_number_in_session = 2
GROUP BY event_number_in_session, event_type, e_value, next_type, next_value
ORDER BY sessions_count DESC),
n3 AS (SELECT event_number_in_session,
event_type,
e_value,
next_type,
next_value,
COUNT(1) AS sessions_count
FROM ranked_events
WHERE event_number_in_session = 3
GROUP BY event_number_in_session, event_type, e_value, next_type, next_value
ORDER BY sessions_count DESC),
drop_n AS (-- STEP 1
SELECT event_number_in_session,
event_type,
e_value,
'DROP' AS next_type,
NULL AS next_value,
sessions_count
FROM n1
WHERE isNull(n1.next_type)
UNION ALL
-- STEP 2
SELECT event_number_in_session,
event_type,
e_value,
'DROP' AS next_type,
NULL AS next_value,
sessions_count
FROM n2
WHERE isNull(n2.next_type)),
top_n AS (SELECT event_number_in_session,
event_type,
e_value,
SUM(sessions_count) AS sessions_count
FROM n1
GROUP BY event_number_in_session, event_type, e_value
LIMIT 1
UNION ALL
-- STEP 2
SELECT event_number_in_session,
event_type,
e_value,
SUM(sessions_count) AS sessions_count
FROM n2
GROUP BY event_number_in_session, event_type, e_value
ORDER BY sessions_count DESC
LIMIT 3
UNION ALL
-- STEP 3
SELECT event_number_in_session,
event_type,
e_value,
SUM(sessions_count) AS sessions_count
FROM n3
GROUP BY event_number_in_session, event_type, e_value
ORDER BY sessions_count DESC
LIMIT 3),
top_n_with_next AS (SELECT n1.*
FROM n1
UNION ALL
SELECT n2.*
FROM n2
INNER JOIN top_n ON (n2.event_number_in_session = top_n.event_number_in_session
AND n2.event_type = top_n.event_type
AND n2.e_value = top_n.e_value)),
others_n AS (
-- STEP 2
SELECT n2.*
FROM n2
WHERE (n2.event_number_in_session, n2.event_type, n2.e_value) NOT IN
(SELECT event_number_in_session, event_type, e_value
FROM top_n
WHERE top_n.event_number_in_session = 2)
UNION ALL
-- STEP 3
SELECT n3.*
FROM n3
WHERE (n3.event_number_in_session, n3.event_type, n3.e_value) NOT IN
(SELECT event_number_in_session, event_type, e_value
FROM top_n
WHERE top_n.event_number_in_session = 3))
SELECT *
FROM (
-- Top to Top: valid
SELECT top_n_with_next.*
FROM top_n_with_next
INNER JOIN top_n
ON (top_n_with_next.event_number_in_session + 1 = top_n.event_number_in_session
AND top_n_with_next.next_type = top_n.event_type
AND top_n_with_next.next_value = top_n.e_value)
UNION ALL
-- Top to Others: valid
SELECT top_n_with_next.event_number_in_session,
top_n_with_next.event_type,
top_n_with_next.e_value,
'OTHER' AS next_type,
NULL AS next_value,
SUM(top_n_with_next.sessions_count) AS sessions_count
FROM top_n_with_next
WHERE (top_n_with_next.event_number_in_session + 1, top_n_with_next.next_type, top_n_with_next.next_value) IN
(SELECT others_n.event_number_in_session, others_n.event_type, others_n.e_value FROM others_n)
GROUP BY top_n_with_next.event_number_in_session, top_n_with_next.event_type, top_n_with_next.e_value
UNION ALL
-- Top go to Drop: valid
SELECT drop_n.event_number_in_session,
drop_n.event_type,
drop_n.e_value,
drop_n.next_type,
drop_n.next_value,
drop_n.sessions_count
FROM drop_n
INNER JOIN top_n ON (drop_n.event_number_in_session = top_n.event_number_in_session
AND drop_n.event_type = top_n.event_type
AND drop_n.e_value = top_n.e_value)
ORDER BY drop_n.event_number_in_session
UNION ALL
-- Others got to Drop: valid
SELECT others_n.event_number_in_session,
'OTHER' AS event_type,
NULL AS e_value,
'DROP' AS next_type,
NULL AS next_value,
SUM(others_n.sessions_count) AS sessions_count
FROM others_n
WHERE isNull(others_n.next_type)
AND others_n.event_number_in_session < 3
GROUP BY others_n.event_number_in_session, next_type, next_value
UNION ALL
-- Others got to Top:valid
SELECT others_n.event_number_in_session,
'OTHER' AS event_type,
NULL AS e_value,
others_n.next_type,
others_n.next_value,
SUM(others_n.sessions_count) AS sessions_count
FROM others_n
WHERE isNotNull(others_n.next_type)
AND (others_n.event_number_in_session + 1, others_n.next_type, others_n.next_value) IN
(SELECT top_n.event_number_in_session, top_n.event_type, top_n.e_value FROM top_n)
GROUP BY others_n.event_number_in_session, others_n.next_type, others_n.next_value
UNION ALL
-- Others got to Others
SELECT others_n.event_number_in_session,
'OTHER' AS event_type,
NULL AS e_value,
'OTHER' AS next_type,
NULL AS next_value,
SUM(sessions_count) AS sessions_count
FROM others_n
WHERE isNotNull(others_n.next_type)
AND others_n.event_number_in_session < 3
AND (others_n.event_number_in_session + 1, others_n.next_type, others_n.next_value) NOT IN
(SELECT event_number_in_session, event_type, e_value FROM top_n)
GROUP BY others_n.event_number_in_session)
ORDER BY event_number_in_session, sessions_count DESC;"""
rows = ch.execute(query=ch_query1, parameters={})
drop = 0
for r in rows:
if r["next_type"] == "DROP":
drop += r["sessions_count"]
r["sessions_count"] = drop
return __transform_journey(rows=rows, reverse_path=False)
# startPoints are computed before ranked_events to reduce the number of window functions over rows
# compute avg_time_from_previous at the same level as sessions_count (this was removed in v1.22)
@@ -289,7 +49,8 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
sub_events.append({"column": JOURNEY_TYPES[s.type]["column"],
"eventType": JOURNEY_TYPES[s.type]["eventType"]})
step_1_post_conditions.append(
f"(event_type!='{JOURNEY_TYPES[s.type]["eventType"]}' OR event_number_in_session = 1)")
f"(event_type='{JOURNEY_TYPES[s.type]["eventType"]}' AND event_number_in_session = 1 \
OR event_type!='{JOURNEY_TYPES[s.type]["eventType"]}' AND event_number_in_session > 1)")
extra_metric_values.append(s.type)
data.metric_value += extra_metric_values
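
One way to read the rewritten post-condition above: for a selected start-point type X, a rank-1 event must be of type X, and an event of type X may only appear at rank 1; the old condition merely allowed any event type at rank 1. A quick illustrative check of that reading:

    def kept(event_type: str, rank: int, start_type: str = "LOCATION") -> bool:
        """Sketch of the rewritten step-1 post-condition for one start type."""
        return (event_type == start_type and rank == 1) or \
               (event_type != start_type and rank > 1)

    assert kept("LOCATION", 1) and not kept("LOCATION", 2)
    assert kept("CLICK", 2) and not kept("CLICK", 1)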

View file

@@ -0,0 +1,389 @@
from typing import List
import schemas
from chalicelib.core.metrics.metrics_ch import __get_basic_constraints, __get_basic_constraints_events
from chalicelib.core.metrics.metrics_ch import __get_constraint_values, __complete_missing_steps
from chalicelib.utils import ch_client, exp_ch_helper
from chalicelib.utils import helper, dev
from chalicelib.utils.TimeUTC import TimeUTC
from chalicelib.utils import sql_helper as sh
from chalicelib.core import metadata
from time import time
import logging
from chalicelib.core.metrics.product_analytics import __transform_journey
logger = logging.getLogger(__name__)
JOURNEY_TYPES = {
schemas.ProductAnalyticsSelectedEventType.LOCATION: {
"eventType": "LOCATION",
"column": "JSON_VALUE(CAST(`$properties` AS String), '$.url_path')",
},
schemas.ProductAnalyticsSelectedEventType.CLICK: {
"eventType": "LOCATION",
"column": "JSON_VALUE(CAST(`$properties` AS String), '$.label')",
},
schemas.ProductAnalyticsSelectedEventType.INPUT: {
"eventType": "LOCATION",
"column": "JSON_VALUE(CAST(`$properties` AS String), '$.label')",
},
schemas.ProductAnalyticsSelectedEventType.CUSTOM_EVENT: {
"eventType": "LOCATION",
"column": "JSON_VALUE(CAST(`$properties` AS String), '$.name')",
},
}
# Q6: use events as a sub_query to support filter of materialized columns when doing a join
# query: Q5, the result is correct,
# startPoints are computed before ranked_events to reduce the number of window functions over rows
# replaced time_to_target by time_from_previous
# compute avg_time_from_previous at the same level as sessions_count (this was removed in v1.22)
# sort by top 5 according to sessions_count at the CTE level
# final part project data without grouping
# if start-point is selected, the selected event is ranked n°1
def path_analysis(project_id: int, data: schemas.CardPathAnalysis):
sub_events = []
start_points_conditions = []
step_0_conditions = []
step_1_post_conditions = ["event_number_in_session <= %(density)s"]
if len(data.metric_value) == 0:
data.metric_value.append(schemas.ProductAnalyticsSelectedEventType.LOCATION)
sub_events.append({"column": JOURNEY_TYPES[schemas.ProductAnalyticsSelectedEventType.LOCATION]["column"],
"eventType": schemas.ProductAnalyticsSelectedEventType.LOCATION.value})
else:
if len(data.start_point) > 0:
extra_metric_values = []
for s in data.start_point:
if s.type not in data.metric_value:
sub_events.append({"column": JOURNEY_TYPES[s.type]["column"],
"eventType": JOURNEY_TYPES[s.type]["eventType"]})
step_1_post_conditions.append(
f"(`$event_name`!='{JOURNEY_TYPES[s.type]["eventType"]}' OR event_number_in_session = 1)")
extra_metric_values.append(s.type)
data.metric_value += extra_metric_values
for v in data.metric_value:
if JOURNEY_TYPES.get(v):
sub_events.append({"column": JOURNEY_TYPES[v]["column"],
"eventType": JOURNEY_TYPES[v]["eventType"]})
if len(sub_events) == 1:
main_column = sub_events[0]['column']
else:
main_column = f"multiIf(%s,%s)" % (
','.join([f"`$event_name`='{s['eventType']}',{s['column']}" for s in sub_events[:-1]]),
sub_events[-1]["column"])
extra_values = {}
reverse = data.start_type == "end"
for i, sf in enumerate(data.start_point):
f_k = f"start_point_{i}"
op = sh.get_sql_operator(sf.operator)
sf.value = helper.values_for_operator(value=sf.value, op=sf.operator)
is_not = sh.is_negation_operator(sf.operator)
event_column = JOURNEY_TYPES[sf.type]['column']
event_type = JOURNEY_TYPES[sf.type]['eventType']
extra_values = {**extra_values, **sh.multi_values(sf.value, value_key=f_k),
f"start_event_type_{i}": event_type}
start_points_conditions.append(f"(`$event_name`=%(start_event_type_{i})s AND " +
sh.multi_conditions(f'{event_column} {op} %({f_k})s', sf.value, is_not=is_not,
value_key=f_k)
+ ")")
step_0_conditions.append(f"(`$event_name`=%(start_event_type_{i})s AND " +
sh.multi_conditions(f'e_value {op} %({f_k})s', sf.value, is_not=is_not,
value_key=f_k)
+ ")")
if len(start_points_conditions) > 0:
start_points_conditions = ["(" + " OR ".join(start_points_conditions) + ")",
"events.project_id = toUInt16(%(project_id)s)",
"events.created_at >= toDateTime(%(startTimestamp)s / 1000)",
"events.created_at < toDateTime(%(endTimestamp)s / 1000)"]
step_0_conditions = ["(" + " OR ".join(step_0_conditions) + ")",
"pre_ranked_events.event_number_in_session = 1"]
exclusions = {}
for i, ef in enumerate(data.excludes):
if len(ef.value) == 0:
continue
if ef.type in data.metric_value:
f_k = f"exclude_{i}"
ef.value = helper.values_for_operator(value=ef.value, op=ef.operator)
op = sh.get_sql_operator(ef.operator)
op = sh.reverse_sql_operator(op)
extra_values = {**extra_values, **sh.multi_values(ef.value, value_key=f_k)}
exclusions[ef.type] = [
sh.multi_conditions(f'{JOURNEY_TYPES[ef.type]["column"]} {op} %({f_k})s', ef.value, is_not=True,
value_key=f_k)]
sessions_conditions = []
meta_keys = None
for i, f in enumerate(data.series[0].filter.filters):
op = sh.get_sql_operator(f.operator)
is_any = sh.isAny_opreator(f.operator)
is_not = sh.is_negation_operator(f.operator)
is_undefined = sh.isUndefined_operator(f.operator)
f_k = f"f_value_{i}"
extra_values = {**extra_values, **sh.multi_values(f.value, value_key=f_k)}
if not is_any and len(f.value) == 0:
continue
process_filter(f, is_any, is_not, is_undefined, op, f_k, sessions_conditions, extra_values, meta_keys,
project_id)
if reverse:
path_direction = "DESC"
else:
path_direction = ""
ch_sub_query = __get_basic_constraints_events(table_name="events")
selected_event_type_sub_query = []
for s in data.metric_value:
selected_event_type_sub_query.append(f"events.`$event_name` = '{JOURNEY_TYPES[s]['eventType']}'")
if s in exclusions:
selected_event_type_sub_query[-1] += " AND (" + " AND ".join(exclusions[s]) + ")"
selected_event_type_sub_query = " OR ".join(selected_event_type_sub_query)
ch_sub_query.append(f"({selected_event_type_sub_query})")
main_events_table = exp_ch_helper.get_main_events_table(data.startTimestamp) + " AS events"
main_sessions_table = exp_ch_helper.get_main_sessions_table(data.startTimestamp) + " AS sessions"
if len(sessions_conditions) > 0:
sessions_conditions.append(f"sessions.project_id = toUInt16(%(project_id)s)")
sessions_conditions.append(f"sessions.datetime >= toDateTime(%(startTimestamp)s / 1000)")
sessions_conditions.append(f"sessions.datetime < toDateTime(%(endTimestamp)s / 1000)")
sessions_conditions.append("sessions.events_count>1")
sessions_conditions.append("sessions.duration>0")
initial_sessions_cte = f"""sub_sessions AS (SELECT DISTINCT session_id
FROM {main_sessions_table}
WHERE {" AND ".join(sessions_conditions)}),"""
else:
initial_sessions_cte = ""
if len(start_points_conditions) == 0:
step_0_subquery = """SELECT DISTINCT session_id
FROM (SELECT `$event_name`, e_value
FROM pre_ranked_events
WHERE event_number_in_session = 1
GROUP BY `$event_name`, e_value
ORDER BY count(1) DESC
LIMIT 1) AS top_start_events
INNER JOIN pre_ranked_events
ON (top_start_events.`$event_name` = pre_ranked_events.`$event_name` AND
top_start_events.e_value = pre_ranked_events.e_value)
WHERE pre_ranked_events.event_number_in_session = 1"""
initial_event_cte = ""
else:
step_0_subquery = f"""SELECT DISTINCT session_id
FROM pre_ranked_events
WHERE {" AND ".join(step_0_conditions)}"""
initial_event_cte = f"""\
initial_event AS (SELECT events.session_id, MIN(created_at) AS start_event_timestamp
FROM {main_events_table} {"INNER JOIN sub_sessions USING (session_id)" if len(sessions_conditions) > 0 else ""}
WHERE {" AND ".join(start_points_conditions)}
GROUP BY 1),"""
ch_sub_query.append("events.created_at>=initial_event.start_event_timestamp")
main_events_table += " INNER JOIN initial_event ON (events.session_id = initial_event.session_id)"
sessions_conditions = []
steps_query = ["""n1 AS (SELECT event_number_in_session,
`$event_name` as event_type,
e_value,
next_type,
next_value,
COUNT(1) AS sessions_count
FROM ranked_events
WHERE event_number_in_session = 1
AND isNotNull(next_value)
GROUP BY event_number_in_session, event_type, e_value, next_type, next_value
ORDER BY sessions_count DESC
LIMIT %(eventThresholdNumberInGroup)s)"""]
projection_query = ["""SELECT event_number_in_session,
event_type,
e_value,
next_type,
next_value,
sessions_count
FROM n1"""]
for i in range(2, data.density + 1):
steps_query.append(f"""n{i} AS (SELECT *
FROM (SELECT re.event_number_in_session AS event_number_in_session,
re.`$event_name` as $event_name,
re.e_value AS e_value,
re.next_type AS next_type,
re.next_value AS next_value,
COUNT(1) AS sessions_count
FROM n{i - 1} INNER JOIN ranked_events AS re
ON (n{i - 1}.next_value = re.e_value AND n{i - 1}.next_type = re.`$event_name`)
WHERE re.event_number_in_session = {i}
GROUP BY re.event_number_in_session, re.`$event_name`, re.e_value, re.next_type, re.next_value) AS sub_level
ORDER BY sessions_count DESC
LIMIT %(eventThresholdNumberInGroup)s)""")
projection_query.append(f"""SELECT event_number_in_session,
`$event_name`,
e_value,
next_type,
next_value,
sessions_count
FROM n{i}""")
with ch_client.ClickHouseClient(database="product_analytics") as ch:
time_key = TimeUTC.now()
_now = time()
params = {"project_id": project_id, "startTimestamp": data.startTimestamp,
"endTimestamp": data.endTimestamp, "density": data.density,
# This is ignored because UI will take care of it
# "eventThresholdNumberInGroup": 4 if data.hide_excess else 8,
"eventThresholdNumberInGroup": 8,
**extra_values}
ch_query1 = f"""\
CREATE TEMPORARY TABLE pre_ranked_events_{time_key} AS
WITH {initial_sessions_cte}
{initial_event_cte}
pre_ranked_events AS (SELECT *
FROM (SELECT session_id,
`$event_name`,
created_at,
{main_column} AS e_value,
row_number() OVER (PARTITION BY session_id
ORDER BY created_at {path_direction},
event_id {path_direction} ) AS event_number_in_session
FROM {main_events_table} {"INNER JOIN sub_sessions ON (sub_sessions.session_id = events.session_id)" if len(sessions_conditions) > 0 else ""}
WHERE {" AND ".join(ch_sub_query)}
) AS full_ranked_events
WHERE {" AND ".join(step_1_post_conditions)})
SELECT *
FROM pre_ranked_events;"""
logger.debug("---------Q1-----------")
query = ch.format(query=ch_query1, parameters=params)
ch.execute(query=query)
if time() - _now > 2:
logger.warning(f">>>>>>>>>PathAnalysis long query EE ({int(time() - _now)}s)<<<<<<<<<")
logger.warning(query)
logger.warning("----------------------")
_now = time()
ch_query2 = f"""\
CREATE TEMPORARY TABLE ranked_events_{time_key} AS
WITH pre_ranked_events AS (SELECT *
FROM pre_ranked_events_{time_key}),
start_points AS ({step_0_subquery}),
ranked_events AS (SELECT pre_ranked_events.*,
leadInFrame(e_value)
OVER (PARTITION BY session_id ORDER BY created_at {path_direction}
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS next_value,
leadInFrame(toNullable(`$event_name`))
OVER (PARTITION BY session_id ORDER BY created_at {path_direction}
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS next_type
FROM start_points INNER JOIN pre_ranked_events USING (session_id))
SELECT *
FROM ranked_events;"""
logger.debug("---------Q2-----------")
query = ch.format(query=ch_query2, parameters=params)
ch.execute(query=query)
if time() - _now > 2:
logger.warning(f">>>>>>>>>PathAnalysis long query EE ({int(time() - _now)}s)<<<<<<<<<")
logger.warning(query)
logger.warning("----------------------")
_now = time()
ch_query3 = f"""\
WITH ranked_events AS (SELECT *
FROM ranked_events_{time_key}),
{",".join(steps_query)}
SELECT *
FROM ({" UNION ALL ".join(projection_query)}) AS chart_steps
ORDER BY event_number_in_session;"""
logger.debug("---------Q3-----------")
query = ch.format(query=ch_query3, parameters=params)
rows = ch.execute(query=query)
if time() - _now > 2:
logger.warning(f">>>>>>>>>PathAnalysis long query EE ({int(time() - _now)}s)<<<<<<<<<")
logger.warning(query)
logger.warning("----------------------")
return __transform_journey(rows=rows, reverse_path=reverse)
def process_filter(f, is_any, is_not, is_undefined, op, f_k, sessions_conditions, extra_values, meta_keys, project_id):
# Mapping for common types to their column names
type_column_mapping = {
schemas.FilterType.USER_BROWSER: 'user_browser',
schemas.FilterType.USER_OS: 'user_os',
schemas.FilterType.USER_DEVICE: 'user_device',
schemas.FilterType.USER_COUNTRY: 'user_country',
schemas.FilterType.USER_CITY: 'user_city',
schemas.FilterType.USER_STATE: 'user_state',
schemas.FilterType.UTM_SOURCE: 'utm_source',
schemas.FilterType.UTM_MEDIUM: 'utm_medium',
schemas.FilterType.UTM_CAMPAIGN: 'utm_campaign',
schemas.FilterType.USER_ID: 'user_id',
schemas.FilterType.USER_ID_MOBILE: 'user_id',
schemas.FilterType.USER_ANONYMOUS_ID: 'user_anonymous_id',
schemas.FilterType.USER_ANONYMOUS_ID_MOBILE: 'user_anonymous_id',
schemas.FilterType.REV_ID: 'rev_id',
schemas.FilterType.REV_ID_MOBILE: 'rev_id',
}
if f.type in type_column_mapping:
column = type_column_mapping[f.type]
if is_any:
sessions_conditions.append(f'isNotNull({column})')
elif is_undefined:
sessions_conditions.append(f'isNull({column})')
else:
sessions_conditions.append(
sh.multi_conditions(f"{column} {op} toString(%({f_k})s)", f.value, is_not=is_not, value_key=f_k)
)
elif f.type == schemas.FilterType.DURATION:
if len(f.value) > 0 and f.value[0] is not None:
sessions_conditions.append("duration >= %(minDuration)s")
extra_values["minDuration"] = f.value[0]
if len(f.value) > 1 and f.value[1] is not None and int(f.value[1]) > 0:
sessions_conditions.append("duration <= %(maxDuration)s")
extra_values["maxDuration"] = f.value[1]
elif f.type == schemas.FilterType.REFERRER:
if is_any:
sessions_conditions.append('isNotNull(base_referrer)')
else:
sessions_conditions.append(
sh.multi_conditions(f"base_referrer {op} %({f_k})s", f.value, is_not=is_not, value_key=f_k)
)
elif f.type == schemas.FilterType.METADATA:
if meta_keys is None:
meta_keys = metadata.get(project_id=project_id)
meta_keys = {m["key"]: m["index"] for m in meta_keys}
if f.source in meta_keys.keys():
column = metadata.index_to_colname(meta_keys[f.source])
if is_any:
sessions_conditions.append(f"isNotNull({column})")
elif is_undefined:
sessions_conditions.append(f"isNull({column})")
else:
sessions_conditions.append(
sh.multi_conditions(f"{column} {op} toString(%({f_k})s)", f.value, is_not=is_not, value_key=f_k)
)
elif f.type == schemas.FilterType.PLATFORM:
sessions_conditions.append(
sh.multi_conditions(f"user_device_type {op} %({f_k})s", f.value, is_not=is_not, value_key=f_k)
)
elif f.type == schemas.FilterType.ISSUE:
if is_any:
sessions_conditions.append("array_length(issue_types, 1) > 0")
else:
sessions_conditions.append(
sh.multi_conditions(f"has(issue_types,%({f_k})s)", f.value, is_not=is_not, value_key=f_k)
)
elif f.type == schemas.FilterType.EVENTS_COUNT:
sessions_conditions.append(
sh.multi_conditions(f"events_count {op} %({f_k})s", f.value, is_not=is_not, value_key=f_k)
)
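
When several event types are selected, the e_value projection above becomes a multiIf over `$event_name`, with the last sub-event's column as the fallback branch. An illustrative expansion for two selected types (the chosen types are the sketch's own):

    sub_events = [
        {"eventType": "LOCATION",
         "column": "JSON_VALUE(CAST(`$properties` AS String), '$.url_path')"},
        {"eventType": "CLICK",
         "column": "JSON_VALUE(CAST(`$properties` AS String), '$.label')"},
    ]
    main_column = "multiIf(%s,%s)" % (
        ",".join(f"`$event_name`='{s['eventType']}',{s['column']}"
                 for s in sub_events[:-1]),
        sub_events[-1]["column"],
    )
    # -> multiIf(`$event_name`='LOCATION',
    #            JSON_VALUE(CAST(`$properties` AS String), '$.url_path'),
    #            JSON_VALUE(CAST(`$properties` AS String), '$.label'))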

View file

@@ -21,10 +21,10 @@ def search2_series(data: schemas.SessionsSearchPayloadSchema, project_id: int, d
if metric_of == schemas.MetricOfTable.VISITED_URL:
extra_event = f"""SELECT DISTINCT ev.session_id, ev.url_path
FROM {exp_ch_helper.get_main_events_table(data.startTimestamp)} AS ev
WHERE ev.datetime >= toDateTime(%(startDate)s / 1000)
AND ev.datetime <= toDateTime(%(endDate)s / 1000)
WHERE ev.created_at >= toDateTime(%(startDate)s / 1000)
AND ev.created_at <= toDateTime(%(endDate)s / 1000)
AND ev.project_id = %(project_id)s
AND ev.event_type = 'LOCATION'"""
AND ev.`$event_name` = 'LOCATION'"""
elif metric_of == schemas.MetricOfTable.ISSUES and len(metric_value) > 0:
data.filters.append(schemas.SessionSearchFilterSchema(value=metric_value, type=schemas.FilterType.ISSUE,
operator=schemas.SearchEventOperator.IS))
@@ -135,12 +135,13 @@ def search2_table(data: schemas.SessionsSearchPayloadSchema, project_id: int, de
extra_deduplication = []
extra_conditions = None
if metric_of == schemas.MetricOfTable.VISITED_URL:
extra_event = f"""SELECT DISTINCT ev.session_id, ev.url_path
extra_event = f"""SELECT DISTINCT ev.session_id,
JSONExtractString(toString(ev.`$properties`), 'url_path') AS url_path
FROM {exp_ch_helper.get_main_events_table(data.startTimestamp)} AS ev
WHERE ev.datetime >= toDateTime(%(startDate)s / 1000)
AND ev.datetime <= toDateTime(%(endDate)s / 1000)
WHERE ev.created_at >= toDateTime(%(startDate)s / 1000)
AND ev.created_at <= toDateTime(%(endDate)s / 1000)
AND ev.project_id = %(project_id)s
AND ev.event_type = 'LOCATION'"""
AND ev.`$event_name` = 'LOCATION'"""
extra_deduplication.append("url_path")
extra_conditions = {}
for e in data.events:
@@ -158,12 +159,14 @@ def search2_table(data: schemas.SessionsSearchPayloadSchema, project_id: int, de
extra_conditions[e.operator].value.append(v)
extra_conditions = list(extra_conditions.values())
elif metric_of == schemas.MetricOfTable.FETCH:
extra_event = f"""SELECT DISTINCT ev.session_id, ev.url_path
extra_event = f"""SELECT DISTINCT ev.session_id,
JSONExtractString(toString(ev.`$properties`), 'url_path') AS url_path
FROM {exp_ch_helper.get_main_events_table(data.startTimestamp)} AS ev
WHERE ev.datetime >= toDateTime(%(startDate)s / 1000)
AND ev.datetime <= toDateTime(%(endDate)s / 1000)
WHERE ev.created_at >= toDateTime(%(startDate)s / 1000)
AND ev.created_at <= toDateTime(%(endDate)s / 1000)
AND ev.project_id = %(project_id)s
AND ev.event_type = 'REQUEST'"""
AND ev.`$event_name` = 'REQUEST'"""
extra_deduplication.append("url_path")
extra_conditions = {}
for e in data.events:
@@ -286,6 +289,53 @@ def __is_valid_event(is_any: bool, event: schemas.SessionSearchEventSchema2):
event.filters is None or len(event.filters) == 0))
def json_condition(table_alias, json_column, json_key, op, values, value_key, check_existence=False,
numeric_check=False, numeric_type="float"):
"""
Constructs a condition to filter a JSON column dynamically in SQL queries.
Parameters:
table_alias (str): Alias of the table (e.g., 'main', 'sub').
json_column (str): Name of the JSON column (e.g., '$properties').
json_key (str): Key in the JSON object to extract.
op (str): SQL operator to apply (e.g., '=', 'ILIKE', etc.).
values (str | list | tuple): Single value, list of values, or tuple to compare.
value_key (str): The parameterized key for SQL (e.g., 'custom').
check_existence (bool): Whether to include a JSONHas condition to check if the key exists.
numeric_check (bool): Whether to include a numeric check on the extracted value.
numeric_type (str): Type for numeric extraction, "int" or "float".
Returns:
str: The constructed condition.
"""
if isinstance(values, tuple):
values = list(values)
elif not isinstance(values, list):
values = [values]
conditions = []
# Add JSONHas condition if required
if check_existence:
conditions.append(f"JSONHas(toString({table_alias}.`{json_column}`), '{json_key}')")
# Determine the extraction function for numeric checks
if numeric_check:
extract_func = "JSONExtractFloat" if numeric_type == "float" else "JSONExtractInt"
conditions.append(f"{extract_func}(toString({table_alias}.`{json_column}`), '{json_key}') > 0")
# Add the main condition for value comparison
if numeric_check:
extract_func = "JSONExtractFloat" if numeric_type == "float" else "JSONExtractInt"
condition = f"{extract_func}(toString({table_alias}.`{json_column}`), '{json_key}') {op} %({value_key})s"
else:
condition = f"JSONExtractString(toString({table_alias}.`{json_column}`), '{json_key}') {op} %({value_key})s"
conditions.append(sh.multi_conditions(condition, values, value_key=value_key))
return " AND ".join(conditions)
# this function generates the query and return the generated-query with the dict of query arguments
def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_status, errors_only, favorite_only, issue,
project_id, user_id, platform="web", extra_event=None, extra_deduplication=[],
@@ -317,11 +367,11 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
events_query_part = ""
issues = []
__events_where_basic = ["project_id = %(projectId)s",
"datetime >= toDateTime(%(startDate)s/1000)",
"datetime <= toDateTime(%(endDate)s/1000)"]
"created_at >= toDateTime(%(startDate)s/1000)",
"created_at <= toDateTime(%(endDate)s/1000)"]
events_conditions_where = ["main.project_id = %(projectId)s",
"main.datetime >= toDateTime(%(startDate)s/1000)",
"main.datetime <= toDateTime(%(endDate)s/1000)"]
"main.created_at >= toDateTime(%(startDate)s/1000)",
"main.created_at <= toDateTime(%(endDate)s/1000)"]
if len(data.filters) > 0:
meta_keys = None
# to reduce include a sub-query of sessions inside events query, in order to reduce the selected data
@@ -600,8 +650,8 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
# event_from = f"%s INNER JOIN {MAIN_SESSIONS_TABLE} AS ms USING (session_id)"
event_from = "%s"
event_where = ["main.project_id = %(projectId)s",
"main.datetime >= toDateTime(%(startDate)s/1000)",
"main.datetime <= toDateTime(%(endDate)s/1000)"]
"main.created_at >= toDateTime(%(startDate)s/1000)",
"main.created_at <= toDateTime(%(endDate)s/1000)"]
e_k = f"e_value{i}"
s_k = e_k + "_source"
@@ -616,41 +666,48 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
if platform == "web":
_column = events.EventType.CLICK.column
event_where.append(
f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
if not is_any:
if schemas.ClickEventExtraOperator.has_value(event.operator):
event_where.append(
sh.multi_conditions(f"main.selector {op} %({e_k})s", event.value, value_key=e_k))
event_where.append(json_condition(
"main",
"$properties",
"selector", op, event.value, e_k)
)
events_conditions[-1]["condition"] = event_where[-1]
else:
if is_not:
event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(json_condition(
"sub", "$properties", _column, op, event.value, e_k
))
events_conditions_not.append(
{
"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
"type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
events_conditions_not[-1]["condition"] = event_where[-1]
else:
event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(
json_condition("main", "$properties", _column, op, event.value, e_k)
)
events_conditions[-1]["condition"] = event_where[-1]
else:
_column = events.EventType.CLICK_MOBILE.column
event_where.append(
f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
if not is_any:
if is_not:
event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(
json_condition("sub", "$properties", _column, op, event.value, e_k)
)
events_conditions_not.append(
{
"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
"type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
events_conditions_not[-1]["condition"] = event_where[-1]
else:
event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(
json_condition("main", "$properties", _column, op, event.value, e_k)
)
events_conditions[-1]["condition"] = event_where[-1]
elif event_type == events.EventType.INPUT.ui_type:
@@ -658,40 +715,47 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
if platform == "web":
_column = events.EventType.INPUT.column
event_where.append(
f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
if not is_any:
if is_not:
event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(
json_condition("sub", "$properties", _column, op, event.value, e_k)
)
events_conditions_not.append(
{
"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
"type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
events_conditions_not[-1]["condition"] = event_where[-1]
else:
event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(
json_condition("main", "$properties", _column, op, event.value, e_k)
)
events_conditions[-1]["condition"] = event_where[-1]
if event.source is not None and len(event.source) > 0:
event_where.append(sh.multi_conditions(f"main.value ILIKE %(custom{i})s", event.source,
value_key=f"custom{i}"))
event_where.append(
json_condition("main", "$properties", "value", "ILIKE", event.source, f"custom{i}")
)
full_args = {**full_args, **sh.multi_values(event.source, value_key=f"custom{i}")}
else:
_column = events.EventType.INPUT_MOBILE.column
event_where.append(
f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
if not is_any:
if is_not:
event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(
json_condition("sub", "$properties", _column, op, event.value, e_k)
)
events_conditions_not.append(
{
"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
"type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
events_conditions_not[-1]["condition"] = event_where[-1]
else:
event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(
json_condition("main", "$properties", _column, op, event.value, e_k)
)
events_conditions[-1]["condition"] = event_where[-1]
elif event_type == events.EventType.LOCATION.ui_type:
@@ -699,32 +763,35 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
if platform == "web":
_column = 'url_path'
event_where.append(
f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
if not is_any:
if is_not:
event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(
json_condition("sub", "$properties", _column, op, event.value, e_k)
)
events_conditions_not.append(
{
"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
"type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
events_conditions_not[-1]["condition"] = event_where[-1]
else:
event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s",
event.value, value_key=e_k))
event_where.append(
json_condition("main", "$properties", _column, op, event.value, e_k)
)
events_conditions[-1]["condition"] = event_where[-1]
else:
_column = events.EventType.VIEW_MOBILE.column
event_where.append(
f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
if not is_any:
if is_not:
event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(
json_condition("sub", "$properties", _column, op, event.value, e_k)
)
events_conditions_not.append(
{
"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
"type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
events_conditions_not[-1]["condition"] = event_where[-1]
else:
event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s",
@@ -733,57 +800,70 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
elif event_type == events.EventType.CUSTOM.ui_type:
event_from = event_from % f"{MAIN_EVENTS_TABLE} AS main "
_column = events.EventType.CUSTOM.column
event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
event_where.append(
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
if not is_any:
if is_not:
event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(
json_condition("sub", "$properties", _column, op, event.value, e_k)
)
events_conditions_not.append(
{"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
{
"type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
events_conditions_not[-1]["condition"] = event_where[-1]
else:
event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(json_condition(
"main", "$properties", _column, op, event.value, e_k
))
events_conditions[-1]["condition"] = event_where[-1]
elif event_type == events.EventType.REQUEST.ui_type:
event_from = event_from % f"{MAIN_EVENTS_TABLE} AS main "
_column = 'url_path'
event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
event_where.append(
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
if not is_any:
if is_not:
event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(json_condition(
"sub", "$properties", _column, op, event.value, e_k
))
events_conditions_not.append(
{"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
{
"type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
events_conditions_not[-1]["condition"] = event_where[-1]
else:
event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(json_condition(
"main", "$properties", _column, op, event.value, e_k
))
events_conditions[-1]["condition"] = event_where[-1]
elif event_type == events.EventType.STATEACTION.ui_type:
event_from = event_from % f"{MAIN_EVENTS_TABLE} AS main "
_column = events.EventType.STATEACTION.column
event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
event_where.append(
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
if not is_any:
if is_not:
event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(json_condition(
"sub", "$properties", _column, op, event.value, e_k
))
events_conditions_not.append(
{"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
{
"type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
events_conditions_not[-1]["condition"] = event_where[-1]
else:
event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s",
event.value, value_key=e_k))
event_where.append(json_condition(
"main", "$properties", _column, op, event.value, e_k
))
events_conditions[-1]["condition"] = event_where[-1]
# TODO: isNot for ERROR
elif event_type == events.EventType.ERROR.ui_type:
event_from = event_from % f"{MAIN_EVENTS_TABLE} AS main"
events_extra_join = f"SELECT * FROM {MAIN_EVENTS_TABLE} AS main1 WHERE main1.project_id=%(project_id)s"
event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
event_where.append(
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
event.source = tuple(event.source)
events_conditions[-1]["condition"] = []
@@ -803,127 +883,161 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
# ----- Mobile
elif event_type == events.EventType.CLICK_MOBILE.ui_type:
_column = events.EventType.CLICK_MOBILE.column
event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
event_where.append(
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
if not is_any:
if is_not:
event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(json_condition(
"sub", "$properties", _column, op, event.value, e_k
))
events_conditions_not.append(
{"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
{
"type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
events_conditions_not[-1]["condition"] = event_where[-1]
else:
event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(json_condition(
"main", "$properties", _column, op, event.value, e_k
))
events_conditions[-1]["condition"] = event_where[-1]
elif event_type == events.EventType.INPUT_MOBILE.ui_type:
_column = events.EventType.INPUT_MOBILE.column
event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
event_where.append(
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
if not is_any:
if is_not:
event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(json_condition(
"sub", "$properties", _column, op, event.value, e_k
))
events_conditions_not.append(
{"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
{
"type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
events_conditions_not[-1]["condition"] = event_where[-1]
else:
event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(json_condition(
"main", "$properties", _column, op, event.value, e_k
))
events_conditions[-1]["condition"] = event_where[-1]
elif event_type == events.EventType.VIEW_MOBILE.ui_type:
_column = events.EventType.VIEW_MOBILE.column
event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
event_where.append(
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
if not is_any:
if is_not:
event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(json_condition(
"sub", "$properties", _column, op, event.value, e_k
))
events_conditions_not.append(
{"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
{
"type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
events_conditions_not[-1]["condition"] = event_where[-1]
else:
event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s",
event.value, value_key=e_k))
event_where.append(json_condition(
"main", "$properties", _column, op, event.value, e_k
))
events_conditions[-1]["condition"] = event_where[-1]
elif event_type == events.EventType.CUSTOM_MOBILE.ui_type:
_column = events.EventType.CUSTOM_MOBILE.column
event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
event_where.append(
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
if not is_any:
if is_not:
event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(json_condition(
"sub", "$properties", _column, op, event.value, e_k
))
events_conditions_not.append(
{"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
{
"type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
events_conditions_not[-1]["condition"] = event_where[-1]
else:
event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s",
event.value, value_key=e_k))
event_where.append(json_condition(
"main", "$properties", _column, op, event.value, e_k
))
events_conditions[-1]["condition"] = event_where[-1]
elif event_type == events.EventType.REQUEST_MOBILE.ui_type:
event_from = event_from % f"{MAIN_EVENTS_TABLE} AS main "
_column = 'url_path'
event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
event_where.append(
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
if not is_any:
if is_not:
event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(json_condition(
"sub", "$properties", _column, op, event.value, e_k
))
events_conditions_not.append(
{"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
{
"type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
events_conditions_not[-1]["condition"] = event_where[-1]
else:
event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(json_condition(
"main", "$properties", _column, op, event.value, e_k
))
events_conditions[-1]["condition"] = event_where[-1]
elif event_type == events.EventType.CRASH_MOBILE.ui_type:
_column = events.EventType.CRASH_MOBILE.column
event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
event_where.append(
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
if not is_any:
if is_not:
event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(json_condition(
"sub", "$properties", _column, op, event.value, e_k
))
events_conditions_not.append(
{"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
{
"type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
events_conditions_not[-1]["condition"] = event_where[-1]
else:
event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s",
event.value, value_key=e_k))
event_where.append(json_condition(
"main", "$properties", _column, op, event.value, e_k
))
events_conditions[-1]["condition"] = event_where[-1]
elif event_type == events.EventType.SWIPE_MOBILE.ui_type and platform != "web":
_column = events.EventType.SWIPE_MOBILE.column
event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
event_where.append(
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
if not is_any:
if is_not:
event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(json_condition(
"sub", "$properties", _column, op, event.value, e_k
))
events_conditions_not.append(
{"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
{
"type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
events_conditions_not[-1]["condition"] = event_where[-1]
else:
event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s",
event.value, value_key=e_k))
event_where.append(json_condition(
"main", "$properties", _column, op, event.value, e_k
))
events_conditions[-1]["condition"] = event_where[-1]
elif event_type == schemas.PerformanceEventType.FETCH_FAILED:
event_from = event_from % f"{MAIN_EVENTS_TABLE} AS main "
_column = 'url_path'
event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
event_where.append(
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
events_conditions[-1]["condition"] = []
if not is_any:
if is_not:
event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value,
value_key=e_k))
event_where.append(json_condition(
"sub", "$properties", _column, op, event.value, e_k
))
events_conditions_not.append(
{"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
{
"type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
events_conditions_not[-1]["condition"] = event_where[-1]
else:
event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s",
event.value, value_key=e_k))
event_where.append(json_condition(
"main", "$properties", _column, op, event.value, e_k
))
events_conditions[-1]["condition"].append(event_where[-1])
col = performance_event.get_col(event_type)
colname = col["column"]
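
Every event branch above now filters on `$event_name` instead of event_type, with the literal coming from exp_ch_helper.get_event_type, which is not shown in this diff. A hypothetical shape for that mapping; only 'ERROR' and 'GRAPHQL' are confirmed by queries in this changeset:

def get_event_type(event_type, platform="web"):
    # Hypothetical lookup from UI event types to `$event_name` literals;
    # the fallback is an assumption, not the real helper's behavior.
    mapping = {
        events.EventType.ERROR.ui_type: "ERROR",
        schemas.EventType.GRAPHQL: "GRAPHQL",
    }
    return mapping.get(event_type, str(event_type).upper())
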
@@ -935,52 +1049,57 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
schemas.PerformanceEventType.LOCATION_LARGEST_CONTENTFUL_PAINT_TIME,
schemas.PerformanceEventType.LOCATION_TTFB]:
event_from = event_from % f"{MAIN_EVENTS_TABLE} AS main "
event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
event_where.append(
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
events_conditions[-1]["condition"] = []
col = performance_event.get_col(event_type)
colname = col["column"]
tname = "main"
if not is_any:
event_where.append(
sh.multi_conditions(f"main.url_path {op} %({e_k})s",
event.value, value_key=e_k))
event_where.append(json_condition(
"main", "$properties", 'url_path', op, event.value, e_k
))
events_conditions[-1]["condition"].append(event_where[-1])
e_k += "_custom"
full_args = {**full_args, **sh.multi_values(event.source, value_key=e_k)}
event_where.append(f"isNotNull({tname}.{colname}) AND {tname}.{colname}>0 AND " +
sh.multi_conditions(f"{tname}.{colname} {event.sourceOperator} %({e_k})s",
event.source, value_key=e_k))
event_where.append(json_condition(
tname, "$properties", colname, event.sourceOperator, event.source, e_k, True, True)
)
events_conditions[-1]["condition"].append(event_where[-1])
events_conditions[-1]["condition"] = " AND ".join(events_conditions[-1]["condition"])
# TODO: isNot for PerformanceEvent
elif event_type in [schemas.PerformanceEventType.LOCATION_AVG_CPU_LOAD,
schemas.PerformanceEventType.LOCATION_AVG_MEMORY_USAGE]:
event_from = event_from % f"{MAIN_EVENTS_TABLE} AS main "
event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
event_where.append(
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
events_conditions[-1]["condition"] = []
col = performance_event.get_col(event_type)
colname = col["column"]
tname = "main"
if not is_any:
event_where.append(
sh.multi_conditions(f"main.url_path {op} %({e_k})s",
event.value, value_key=e_k))
event_where.append(json_condition(
"main", "$properties", 'url_path', op, event.value, e_k
))
events_conditions[-1]["condition"].append(event_where[-1])
e_k += "_custom"
full_args = {**full_args, **sh.multi_values(event.source, value_key=e_k)}
event_where.append(f"isNotNull({tname}.{colname}) AND {tname}.{colname}>0 AND " +
sh.multi_conditions(f"{tname}.{colname} {event.sourceOperator} %({e_k})s",
event.source, value_key=e_k))
event_where.append(json_condition(
tname, "$properties", colname, event.sourceOperator, event.source, e_k, True, True)
)
events_conditions[-1]["condition"].append(event_where[-1])
events_conditions[-1]["condition"] = " AND ".join(events_conditions[-1]["condition"])
elif event_type == schemas.EventType.REQUEST_DETAILS:
event_from = event_from % f"{MAIN_EVENTS_TABLE} AS main "
event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
event_where.append(
f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'")
events_conditions.append({"type": event_where[-1]})
apply = False
events_conditions[-1]["condition"] = []
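
The performance branches above pass two extra positional flags to json_condition (json_condition(tname, "$properties", colname, event.sourceOperator, event.source, e_k, True, True)); their meaning is not visible in this diff. Mirroring the isNotNull(...) AND ... > 0 predicate they replace, a plausible expansion for a hypothetical 'speed_index' metric would be:

#   isNotNull(JSONExtractFloat(main.`$properties`, 'speed_index'))
#   AND JSONExtractFloat(main.`$properties`, 'speed_index') > 0
#   AND JSONExtractFloat(main.`$properties`, 'speed_index') >= %(e_value_custom_0)s
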
@@ -993,36 +1112,39 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
e_k_f = e_k + f"_fetch{j}"
full_args = {**full_args, **sh.multi_values(f.value, value_key=e_k_f)}
if f.type == schemas.FetchFilterType.FETCH_URL:
event_where.append(
sh.multi_conditions(f"main.url_path {op} %({e_k_f})s", f.value,
value_key=e_k_f))
event_where.append(json_condition(
"main", "$properties", 'url_path', op, f.value, e_k_f
))
events_conditions[-1]["condition"].append(event_where[-1])
apply = True
elif f.type == schemas.FetchFilterType.FETCH_STATUS_CODE:
event_where.append(
sh.multi_conditions(f"main.status {f.operator} %({e_k_f})s", f.value,
value_key=e_k_f))
event_where.append(json_condition(
"main", "$properties", 'status', op, f.value, e_k_f
))
events_conditions[-1]["condition"].append(event_where[-1])
apply = True
elif f.type == schemas.FetchFilterType.FETCH_METHOD:
event_where.append(
sh.multi_conditions(f"main.method {op} %({e_k_f})s", f.value, value_key=e_k_f))
event_where.append(json_condition(
"main", "$properties", 'method', op, f.value, e_k_f
))
events_conditions[-1]["condition"].append(event_where[-1])
apply = True
elif f.type == schemas.FetchFilterType.FETCH_DURATION:
event_where.append(
sh.multi_conditions(f"main.duration {f.operator} %({e_k_f})s", f.value,
sh.multi_conditions(f"main.`$duration_s` {f.operator} %({e_k_f})s/1000", f.value,
value_key=e_k_f))
events_conditions[-1]["condition"].append(event_where[-1])
apply = True
elif f.type == schemas.FetchFilterType.FETCH_REQUEST_BODY:
event_where.append(
sh.multi_conditions(f"main.request_body {op} %({e_k_f})s", f.value, value_key=e_k_f))
event_where.append(json_condition(
"main", "$properties", 'request_body', op, f.value, e_k_f
))
events_conditions[-1]["condition"].append(event_where[-1])
apply = True
elif f.type == schemas.FetchFilterType.FETCH_RESPONSE_BODY:
event_where.append(
sh.multi_conditions(f"main.response_body {op} %({e_k_f})s", f.value, value_key=e_k_f))
event_where.append(json_condition(
"main", "$properties", 'response_body', op, f.value, e_k_f
))
events_conditions[-1]["condition"].append(event_where[-1])
apply = True
else:
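
Note the asymmetry above: FETCH_DURATION keeps a direct column predicate because the filter payload is still in milliseconds while the new `$duration_s` column stores seconds, so the bound value is divided inside SQL. With f.operator '>=' and f.value [1500], the rendered condition is roughly (parameter name illustrative):

#   main.`$duration_s` >= %(e_value_fetch0_0)s/1000    -- 1500 ms -> 1.5 s
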
@@ -1034,7 +1156,7 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
# TODO: no isNot for GraphQL
elif event_type == schemas.EventType.GRAPHQL:
event_from = event_from % f"{MAIN_EVENTS_TABLE} AS main "
event_where.append(f"main.event_type='GRAPHQL'")
event_where.append(f"main.`$event_name`='GRAPHQL'")
events_conditions.append({"type": event_where[-1]})
events_conditions[-1]["condition"] = []
for j, f in enumerate(event.filters):
@@ -1046,21 +1168,24 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
e_k_f = e_k + f"_graphql{j}"
full_args = {**full_args, **sh.multi_values(f.value, value_key=e_k_f)}
if f.type == schemas.GraphqlFilterType.GRAPHQL_NAME:
event_where.append(
sh.multi_conditions(f"main.{events.EventType.GRAPHQL.column} {op} %({e_k_f})s", f.value,
value_key=e_k_f))
event_where.append(json_condition(
"main", "$properties", events.EventType.GRAPHQL.column, op, f.value, e_k_f
))
events_conditions[-1]["condition"].append(event_where[-1])
elif f.type == schemas.GraphqlFilterType.GRAPHQL_METHOD:
event_where.append(
sh.multi_conditions(f"main.method {op} %({e_k_f})s", f.value, value_key=e_k_f))
event_where.append(json_condition(
"main", "$properties", 'method', op, f.value, e_k_f
))
events_conditions[-1]["condition"].append(event_where[-1])
elif f.type == schemas.GraphqlFilterType.GRAPHQL_REQUEST_BODY:
event_where.append(
sh.multi_conditions(f"main.request_body {op} %({e_k_f})s", f.value, value_key=e_k_f))
event_where.append(json_condition(
"main", "$properties", 'request_body', op, f.value, e_k_f
))
events_conditions[-1]["condition"].append(event_where[-1])
elif f.type == schemas.GraphqlFilterType.GRAPHQL_RESPONSE_BODY:
event_where.append(
sh.multi_conditions(f"main.response_body {op} %({e_k_f})s", f.value, value_key=e_k_f))
event_where.append(json_condition(
"main", "$properties", 'response_body', op, f.value, e_k_f
))
events_conditions[-1]["condition"].append(event_where[-1])
else:
logging.warning(f"undefined GRAPHQL filter: {f.type}")
@@ -1099,7 +1224,7 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
pass
else:
events_query_from.append(f"""\
(SELECT main.session_id, {"MIN" if event_index < (valid_events_count - 1) else "MAX"}(main.datetime) AS datetime
(SELECT main.session_id, {"MIN" if event_index < (valid_events_count - 1) else "MAX"}(main.created_at) AS datetime
FROM {event_from}
WHERE {" AND ".join(event_where)}
GROUP BY session_id
@@ -1167,13 +1292,13 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
del value_conditions_not
if data.events_order == schemas.SearchEventOrder.THEN:
having = f"""HAVING sequenceMatch('{''.join(sequence_pattern)}')(main.datetime,{','.join(sequence_conditions)})"""
having = f"""HAVING sequenceMatch('{''.join(sequence_pattern)}')(toDateTime(main.created_at),{','.join(sequence_conditions)})"""
else:
having = f"""HAVING {" AND ".join([f"countIf({c})>0" for c in list(set(sequence_conditions))])}"""
events_query_part = f"""SELECT main.session_id,
MIN(main.datetime) AS first_event_ts,
MAX(main.datetime) AS last_event_ts
MIN(main.created_at) AS first_event_ts,
MAX(main.created_at) AS last_event_ts
FROM {MAIN_EVENTS_TABLE} AS main {events_extra_join}
{sub_join}
WHERE {" AND ".join(events_conditions_where)}
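
For reference, sequenceMatch is ClickHouse's ordered-pattern aggregate: the pattern positions (?1), (?2) refer to the condition arguments, and the first argument must be a DateTime or numeric timestamp, which is why the raw created_at is now wrapped in toDateTime. A minimal standalone illustration of the THEN ordering, assuming two chained events (event names hypothetical):

# HAVING sequenceMatch('(?1)(?2)')(toDateTime(main.created_at),
#                                  `$event_name` = 'CLICK',
#                                  `$event_name` = 'ERROR')
# -> keeps sessions where a CLICK row is later followed by an ERROR row
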
@@ -1215,8 +1340,8 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
events_conditions_where.append(f"({' OR '.join(events_conditions)})")
events_query_part = f"""SELECT main.session_id,
MIN(main.datetime) AS first_event_ts,
MAX(main.datetime) AS last_event_ts
MIN(main.created_at) AS first_event_ts,
MAX(main.created_at) AS last_event_ts
FROM {MAIN_EVENTS_TABLE} AS main {events_extra_join}
WHERE {" AND ".join(events_conditions_where)}
GROUP BY session_id"""
@@ -1239,8 +1364,8 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
AND issues.project_id = %(projectId)s
AND events.project_id = %(projectId)s
AND events.issue_type = %(issue_type)s
AND events.datetime >= toDateTime(%(startDate)s/1000)
AND events.datetime <= toDateTime(%(endDate)s/1000)
AND events.created_at >= toDateTime(%(startDate)s/1000)
AND events.created_at <= toDateTime(%(endDate)s/1000)
) AS issues ON (f.session_id = issues.session_id)
"""
full_args["issue_contextString"] = issue["contextString"]
@@ -1259,8 +1384,8 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
INNER JOIN experimental.events USING (issue_id)
WHERE issues.project_id = %(projectId)s
AND events.project_id = %(projectId)s
AND events.datetime >= toDateTime(%(startDate)s/1000)
AND events.datetime <= toDateTime(%(endDate)s/1000)
AND events.created_at >= toDateTime(%(startDate)s/1000)
AND events.created_at <= toDateTime(%(endDate)s/1000)
AND {" OR ".join(issues_conditions)}
) AS issues USING (session_id)"""


@@ -8,7 +8,7 @@ logger = logging.getLogger(__name__)
def get_main_events_table(timestamp=0, platform="web"):
    if platform == "web":
        return "experimental.events"
        return "product_analytics.events"
    else:
        return "experimental.ios_events"
@@ -16,6 +16,7 @@ def get_main_events_table(timestamp=0, platform="web"):
def get_main_sessions_table(timestamp=0):
    return "experimental.sessions"
def get_user_favorite_sessions_table(timestamp=0):
    return "experimental.user_favorite_sessions"


@@ -3,7 +3,7 @@ from enum import Enum
import schemas
def get_sql_operator(op: Union[schemas.SearchEventOperator, schemas.ClickEventExtraOperator]):
def get_sql_operator(op: Union[schemas.SearchEventOperator, schemas.ClickEventExtraOperator, schemas.MathOperator]):
    return {
        schemas.SearchEventOperator.IS: "=",
        schemas.SearchEventOperator.ON: "=",
@@ -21,6 +21,11 @@ def get_sql_operator(op: Union[schemas.SearchEventOperator, schemas.ClickEventEx
        schemas.ClickEventExtraOperator.NOT_CONTAINS: "NOT ILIKE",
        schemas.ClickEventExtraOperator.STARTS_WITH: "ILIKE",
        schemas.ClickEventExtraOperator.ENDS_WITH: "ILIKE",
        schemas.MathOperator.GREATER: ">",
        schemas.MathOperator.GREATER_EQ: ">=",
        schemas.MathOperator.LESS: "<",
        schemas.MathOperator.LESS_EQ: "<=",
    }.get(op, "=")
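
The four MathOperator additions let numeric source filters (durations, performance thresholds) share this helper with the text operators. Illustrative usage, with a hypothetical parameter name:

op = get_sql_operator(schemas.MathOperator.GREATER_EQ)        # ">="
condition = f"main.`$duration_s` {op} %(min_duration_s)s"     # hypothetical binding
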