From 954e811be08cceeee0f33d62f843a56a1df0e3a1 Mon Sep 17 00:00:00 2001
From: Shekar Siri
Date: Thu, 23 Jan 2025 12:21:23 +0100
Subject: [PATCH] change(api): follow the new structure for cards (#2952)

* change(api): follow the new structure for cards

* change(api): update query to handle location and performance events

* change(api): ch query updates - monitors - sessions with 4xx ..

* change(api): ch query updates - monitors - table of errors

* change(api): ch query updates - use created_at

* change(api): ch query updates - fix the column name for errorId

* change(api): ch query updates - heatmaps

* change(api): ch query updates - funnels

* change(api): ch query updates - user journey / path finder

* change(api): ch query updates - user journey / path finder

* change(api): ch query updates - heatmaps fix

* refactor(chalice): changes

* refactor(chalice): changes

* refactor(chalice): changes

---------

Co-authored-by: Taha Yassine Kraiem
---
 .../core/autocomplete/autocomplete_ch.py      |   4 +-
 api/chalicelib/core/errors/errors_ch.py       |  63 ++-
 api/chalicelib/core/metrics/heatmaps_ch.py    | 113 +++--
 api/chalicelib/core/metrics/metrics_ch.py     |  16 +
 .../modules/significance/significance_ch.py   |  33 +-
 .../core/metrics/product_analytics_ch.py      | 243 +--------
 ...uct_analytics_ch_new_structure_old_code.py | 389 ++++++++++++++
 api/chalicelib/core/sessions/sessions_ch.py   | 473 +++++++++++-------
 api/chalicelib/utils/exp_ch_helper.py         |   3 +-
 api/chalicelib/utils/sql_helper.py            |   7 +-
 10 files changed, 843 insertions(+), 501 deletions(-)
 create mode 100644 api/chalicelib/core/metrics/product_analytics_ch_new_structure_old_code.py

diff --git a/api/chalicelib/core/autocomplete/autocomplete_ch.py b/api/chalicelib/core/autocomplete/autocomplete_ch.py
index 2ff665cd9..a3ad1d836 100644
--- a/api/chalicelib/core/autocomplete/autocomplete_ch.py
+++ b/api/chalicelib/core/autocomplete/autocomplete_ch.py
@@ -257,9 +257,9 @@ def __search_metadata(project_id, value, key=None, source=None):
                                  WHERE project_id = %(project_id)s
                                    AND {colname} ILIKE %(svalue)s
                                  LIMIT 5)""")
     with ch_client.ClickHouseClient() as cur:
-        query = cur.format(f"""SELECT key, value, 'METADATA' AS TYPE
+        query = cur.format(query=f"""SELECT key, value, 'METADATA' AS TYPE
                                FROM({" UNION ALL ".join(sub_from)}) AS all_metas
-                               LIMIT 5;""", {"project_id": project_id, "value": helper.string_to_sql_like(value),
+                               LIMIT 5;""", parameters={"project_id": project_id, "value": helper.string_to_sql_like(value),
                                              "svalue": helper.string_to_sql_like("^" + value)})
         results = cur.execute(query)
     return helper.list_to_camel_case(results)

diff --git a/api/chalicelib/core/errors/errors_ch.py b/api/chalicelib/core/errors/errors_ch.py
index 4bfd96b36..0c25c8142 100644
--- a/api/chalicelib/core/errors/errors_ch.py
+++ b/api/chalicelib/core/errors/errors_ch.py
@@ -1,6 +1,6 @@
 import schemas
 from chalicelib.core import metadata
-from chalicelib.core import sessions
+from chalicelib.core.errors.modules import sessions
 from chalicelib.core.metrics import metrics
 from chalicelib.utils import ch_client, exp_ch_helper
 from chalicelib.utils import helper, metrics_helper
@@ -70,7 +70,7 @@ def __get_basic_constraints(platform=None, time_constraint=True, startTime_arg_n
     else:
         table_name = ""
     if type_condition:
-        ch_sub_query.append(f"{table_name}event_type='ERROR'")
+        ch_sub_query.append(f"{table_name}`$event_name`='ERROR'")
     if time_constraint:
         ch_sub_query += [f"{table_name}datetime >= toDateTime(%({startTime_arg_name})s/1000)",
                          f"{table_name}datetime < toDateTime(%({endTime_arg_name})s/1000)"]
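The hunks above capture the two conventions this patch applies throughout: cur.format() is now called with explicit query= and parameters= keyword arguments (and the rendered query is then passed to execute), and event filters move from the old event_type column to `$event_name`. A minimal sketch of the new calling pattern, assuming the client wrapper behaves as these hunks show; the table and event name below are illustrative, not taken from the patch:

    from chalicelib.utils import ch_client

    def count_location_events(project_id: int) -> int:
        # Format the query with named parameters first, then execute the
        # rendered string, mirroring the format/execute split used in this patch.
        with ch_client.ClickHouseClient() as cur:
            query = cur.format(
                query="""SELECT COUNT(1) AS total
                         FROM product_analytics.events
                         WHERE project_id = toUInt16(%(project_id)s)
                           AND `$event_name` = 'LOCATION';""",
                parameters={"project_id": project_id},
            )
            rows = cur.execute(query)
        # Rows come back as a list of dicts, as in the errors search query below.
        return rows[0]["total"] if rows else 0
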
@@ -81,6 +81,26 @@ def __get_basic_constraints(platform=None, time_constraint=True, startTime_arg_n return ch_sub_query +def __get_basic_constraints_events(platform=None, time_constraint=True, startTime_arg_name="startDate", + endTime_arg_name="endDate", type_condition=True, project_key="project_id", + table_name=None): + ch_sub_query = [f"{project_key} =toUInt16(%(project_id)s)"] + if table_name is not None: + table_name = table_name + "." + else: + table_name = "" + if type_condition: + ch_sub_query.append(f"{table_name}`$event_name`='ERROR'") + if time_constraint: + ch_sub_query += [f"{table_name}created_at >= toDateTime(%({startTime_arg_name})s/1000)", + f"{table_name}created_at < toDateTime(%({endTime_arg_name})s/1000)"] + # if platform == schemas.PlatformType.MOBILE: + # ch_sub_query.append("user_device_type = 'mobile'") + # elif platform == schemas.PlatformType.DESKTOP: + # ch_sub_query.append("user_device_type = 'desktop'") + return ch_sub_query + + def __get_sort_key(key): return { schemas.ErrorSort.OCCURRENCE: "max_datetime", @@ -99,10 +119,11 @@ def search(data: schemas.SearchErrorsSchema, project_id, user_id): platform = f.value[0] ch_sessions_sub_query = __get_basic_constraints(platform, type_condition=False) # ignore platform for errors table - ch_sub_query = __get_basic_constraints(None, type_condition=True) - ch_sub_query.append("source ='js_exception'") + ch_sub_query = __get_basic_constraints_events(None, type_condition=True) + ch_sub_query.append("JSONExtractString(toString(`$properties`), 'source') = 'js_exception'") + # To ignore Script error - ch_sub_query.append("message!='Script error.'") + ch_sub_query.append("JSONExtractString(toString(`$properties`), 'message') != 'Script error.'") error_ids = None if data.startTimestamp is None: @@ -183,7 +204,6 @@ def search(data: schemas.SearchErrorsSchema, project_id, user_id): _multiple_conditions(f's.user_country {op} %({f_k})s', f.value, is_not=is_not, value_key=f_k)) - elif filter_type in [schemas.FilterType.UTM_SOURCE]: if is_any: ch_sessions_sub_query.append('isNotNull(s.utm_source)') @@ -333,17 +353,18 @@ def search(data: schemas.SearchErrorsSchema, project_id, user_id): ch_sub_query.append("error_id IN %(error_ids)s") main_ch_query = f"""\ - SELECT details.error_id AS error_id, + SELECT details.error_id as error_id, name, message, users, total, sessions, last_occurrence, first_occurrence, chart - FROM (SELECT error_id, - name, - message, + FROM (SELECT JSONExtractString(toString(`$properties`), 'error_id') AS error_id, + JSONExtractString(toString(`$properties`), 'name') AS name, + JSONExtractString(toString(`$properties`), 'message') AS message, COUNT(DISTINCT user_id) AS users, COUNT(DISTINCT events.session_id) AS sessions, - MAX(datetime) AS max_datetime, - MIN(datetime) AS min_datetime, - COUNT(DISTINCT events.error_id) OVER() AS total + MAX(created_at) AS max_datetime, + MIN(created_at) AS min_datetime, + COUNT(DISTINCT JSONExtractString(toString(`$properties`), 'error_id')) + OVER() AS total FROM {MAIN_EVENTS_TABLE} AS events INNER JOIN (SELECT session_id, coalesce(user_id,toString(user_uuid)) AS user_id FROM {MAIN_SESSIONS_TABLE} AS s @@ -354,17 +375,18 @@ def search(data: schemas.SearchErrorsSchema, project_id, user_id): GROUP BY error_id, name, message ORDER BY {sort} {order} LIMIT %(errors_limit)s OFFSET %(errors_offset)s) AS details - INNER JOIN (SELECT error_id AS error_id, - toUnixTimestamp(MAX(datetime))*1000 AS last_occurrence, - toUnixTimestamp(MIN(datetime))*1000 AS first_occurrence + INNER JOIN 
(SELECT JSONExtractString(toString(`$properties`), 'error_id') AS error_id, + toUnixTimestamp(MAX(created_at))*1000 AS last_occurrence, + toUnixTimestamp(MIN(created_at))*1000 AS first_occurrence FROM {MAIN_EVENTS_TABLE} WHERE project_id=%(project_id)s - AND event_type='ERROR' + AND `$event_name`='ERROR' GROUP BY error_id) AS time_details ON details.error_id=time_details.error_id INNER JOIN (SELECT error_id, groupArray([timestamp, count]) AS chart - FROM (SELECT error_id, toUnixTimestamp(toStartOfInterval(datetime, INTERVAL %(step_size)s second)) * 1000 AS timestamp, - COUNT(DISTINCT session_id) AS count + FROM (SELECT JSONExtractString(toString(`$properties`), 'error_id') AS error_id, + toUnixTimestamp(toStartOfInterval(created_at, INTERVAL %(step_size)s second)) * 1000 AS timestamp, + COUNT(DISTINCT session_id) AS count FROM {MAIN_EVENTS_TABLE} WHERE {" AND ".join(ch_sub_query)} GROUP BY error_id, timestamp @@ -374,8 +396,9 @@ def search(data: schemas.SearchErrorsSchema, project_id, user_id): # print("------------") # print(ch.format(main_ch_query, params)) # print("------------") + query = ch.format(query=main_ch_query, parameters=params) - rows = ch.execute(query=main_ch_query, parameters=params) + rows = ch.execute(query) total = rows[0]["total"] if len(rows) > 0 else 0 for r in rows: diff --git a/api/chalicelib/core/metrics/heatmaps_ch.py b/api/chalicelib/core/metrics/heatmaps_ch.py index e7e323410..cfbd1b89c 100644 --- a/api/chalicelib/core/metrics/heatmaps_ch.py +++ b/api/chalicelib/core/metrics/heatmaps_ch.py @@ -17,15 +17,18 @@ def get_by_url(project_id, data: schemas.GetHeatMapPayloadSchema): return [] args = {"startDate": data.startTimestamp, "endDate": data.endTimestamp, "project_id": project_id, "url": data.url} - constraints = ["main_events.project_id = toUInt16(%(project_id)s)", - "main_events.datetime >= toDateTime(%(startDate)s/1000)", - "main_events.datetime <= toDateTime(%(endDate)s/1000)", - "main_events.event_type='CLICK'", - "isNotNull(main_events.normalized_x)"] + constraints = [ + "main_events.project_id = toUInt16(%(project_id)s)", + "main_events.created_at >= toDateTime(%(startDate)s / 1000)", + "main_events.created_at <= toDateTime(%(endDate)s / 1000)", + "main_events.`$event_name` = 'CLICK'", + "isNotNull(JSON_VALUE(CAST(main_events.`$properties` AS String), '$.normalized_x'))" + ] + if data.operator == schemas.SearchEventOperator.IS: - constraints.append("url_path= %(url)s") + constraints.append("JSON_VALUE(CAST(main_events.`$properties` AS String), '$.url_path') = %(url)s") else: - constraints.append("url_path ILIKE %(url)s") + constraints.append("JSON_VALUE(CAST(main_events.`$properties` AS String), '$.url_path') ILIKE %(url)s") args["url"] = helper.values_for_operator(data.url, data.operator) query_from = f"{exp_ch_helper.get_main_events_table(data.startTimestamp)} AS main_events" @@ -70,11 +73,12 @@ def get_by_url(project_id, data: schemas.GetHeatMapPayloadSchema): # query_from += """ LEFT JOIN experimental.events AS issues_t ON (main_events.session_id=issues_t.session_id) # LEFT JOIN experimental.issues AS mis ON (issues_t.issue_id=mis.issue_id)""" with ch_client.ClickHouseClient() as cur: - query = cur.format(query=f"""SELECT main_events.normalized_x AS normalized_x, - main_events.normalized_y AS normalized_y - FROM {query_from} - WHERE {" AND ".join(constraints)} - LIMIT 500;""", + query = cur.format(query=f"""SELECT + JSON_VALUE(CAST(`$properties` AS String), '$.normalized_x') AS normalized_x, + JSON_VALUE(CAST(`$properties` AS String), 
'$.normalized_y') AS normalized_y + FROM {query_from} + WHERE {" AND ".join(constraints)} + LIMIT 500;""", parameters=args) logger.debug("---------") logger.debug(query) @@ -94,14 +98,16 @@ def get_by_url(project_id, data: schemas.GetHeatMapPayloadSchema): def get_x_y_by_url_and_session_id(project_id, session_id, data: schemas.GetHeatMapPayloadSchema): args = {"project_id": project_id, "session_id": session_id, "url": data.url} - constraints = ["main_events.project_id = toUInt16(%(project_id)s)", - "main_events.session_id = %(session_id)s", - "main_events.event_type='CLICK'", - "isNotNull(main_events.normalized_x)"] + constraints = [ + "main_events.project_id = toUInt16(%(project_id)s)", + "main_events.session_id = %(session_id)s", + "main_events.`$event_name`='CLICK'", + "isNotNull(JSON_VALUE(CAST(main_events.`$properties` AS String), '$.normalized_x'))" + ] if data.operator == schemas.SearchEventOperator.IS: - constraints.append("main_events.url_path = %(url)s") + constraints.append("JSON_VALUE(CAST(main_events.`$properties` AS String), '$.url_path') = %(url)s") else: - constraints.append("main_events.url_path ILIKE %(url)s") + constraints.append("JSON_VALUE(CAST(main_events.`$properties` AS String), '$.url_path') ILIKE %(url)s") args["url"] = helper.values_for_operator(data.url, data.operator) query_from = f"{exp_ch_helper.get_main_events_table(0)} AS main_events" @@ -132,11 +138,12 @@ def get_selectors_by_url_and_session_id(project_id, session_id, data: schemas.Ge args = {"project_id": project_id, "session_id": session_id, "url": data.url} constraints = ["main_events.project_id = toUInt16(%(project_id)s)", "main_events.session_id = %(session_id)s", - "main_events.event_type='CLICK'"] + "main_events.`$event_name`='CLICK'"] + if data.operator == schemas.SearchEventOperator.IS: - constraints.append("main_events.url_path = %(url)s") + constraints.append("JSON_VALUE(CAST(main_events.`$properties` AS String), '$.url_path') = %(url)s") else: - constraints.append("main_events.url_path ILIKE %(url)s") + constraints.append("JSON_VALUE(CAST(main_events.`$properties` AS String), '$.url_path') ILIKE %(url)s") args["url"] = helper.values_for_operator(data.url, data.operator) query_from = f"{exp_ch_helper.get_main_events_table(0)} AS main_events" @@ -181,7 +188,7 @@ def __get_1_url(location_condition: schemas.SessionSearchEventSchema2 | None, se "start_time": start_time, "end_time": end_time, } - sub_condition = ["session_id = %(sessionId)s", "event_type = 'CLICK'", "project_id = %(projectId)s"] + sub_condition = ["session_id = %(sessionId)s", "`$event_name` = 'CLICK'", "project_id = %(projectId)s"] if location_condition and len(location_condition.value) > 0: f_k = "LOC" op = sh.get_sql_operator(location_condition.operator) @@ -190,19 +197,25 @@ def __get_1_url(location_condition: schemas.SessionSearchEventSchema2 | None, se sh.multi_conditions(f'path {op} %({f_k})s', location_condition.value, is_not=False, value_key=f_k)) with ch_client.ClickHouseClient() as cur: - main_query = cur.format(query=f"""WITH paths AS (SELECT DISTINCT url_path - FROM experimental.events - WHERE {" AND ".join(sub_condition)}) - SELECT url_path, COUNT(1) AS count - FROM experimental.events - INNER JOIN paths USING (url_path) - WHERE event_type = 'CLICK' - AND project_id = %(projectId)s - AND datetime >= toDateTime(%(start_time)s / 1000) - AND datetime <= toDateTime(%(end_time)s / 1000) - GROUP BY url_path - ORDER BY count DESC - LIMIT 1;""", + main_query = cur.format(query=f"""WITH paths AS ( + SELECT DISTINCT + 
JSON_VALUE(CAST(`$properties` AS String), '$.url_path') AS url_path + FROM product_analytics.events + WHERE {" AND ".join(sub_condition)} + ) + SELECT + paths.url_path, + COUNT(*) AS count + FROM product_analytics.events + INNER JOIN paths + ON JSON_VALUE(CAST(product_analytics.events.$properties AS String), '$.url_path') = paths.url_path + WHERE `$event_name` = 'CLICK' + AND project_id = %(projectId)s + AND created_at >= toDateTime(%(start_time)s / 1000) + AND created_at <= toDateTime(%(end_time)s / 1000) + GROUP BY paths.url_path + ORDER BY count DESC + LIMIT 1;""", parameters=full_args) logger.debug("--------------------") logger.debug(main_query) @@ -352,19 +365,21 @@ def get_selected_session(project_id, session_id): def get_page_events(session_id, project_id): with ch_client.ClickHouseClient() as cur: - rows = cur.execute("""\ - SELECT - message_id, - toUnixTimestamp(datetime)*1000 AS timestamp, - url_host AS host, - url_path AS path, - url_path AS value, - url_path AS url, - 'LOCATION' AS type - FROM experimental.events - WHERE session_id = %(session_id)s - AND event_type='LOCATION' - AND project_id= %(project_id)s - ORDER BY datetime,message_id;""", {"session_id": session_id, "project_id": project_id}) + query = cur.format(query=f"""SELECT + event_id as message_id, + toUnixTimestamp(created_at)*1000 AS timestamp, + JSON_VALUE(CAST(`$properties` AS String), '$.url_host') AS host, + JSON_VALUE(CAST(`$properties` AS String), '$.url_path') AS path, + JSON_VALUE(CAST(`$properties` AS String), '$.url_path') AS value, + JSON_VALUE(CAST(`$properties` AS String), '$.url_path') AS url, + 'LOCATION' AS type + FROM product_analytics.events + WHERE session_id = %(session_id)s + AND `$event_name`='LOCATION' + AND project_id= %(project_id)s + ORDER BY created_at,message_id;""", + parameters={"session_id": session_id, "project_id": project_id}) + + rows = cur.execute(query=query) rows = helper.list_to_camel_case(rows) return rows diff --git a/api/chalicelib/core/metrics/metrics_ch.py b/api/chalicelib/core/metrics/metrics_ch.py index fccc4d48f..86bcb2f29 100644 --- a/api/chalicelib/core/metrics/metrics_ch.py +++ b/api/chalicelib/core/metrics/metrics_ch.py @@ -27,6 +27,22 @@ def __get_basic_constraints(table_name=None, time_constraint=True, round_start=F return ch_sub_query + __get_generic_constraint(data=data, table_name=table_name) +def __get_basic_constraints_events(table_name=None, time_constraint=True, round_start=False, data={}, identifier="project_id"): + if table_name: + table_name += "." 
+ else: + table_name = "" + ch_sub_query = [f"{table_name}{identifier} =toUInt16(%({identifier})s)"] + if time_constraint: + if round_start: + ch_sub_query.append( + f"toStartOfInterval({table_name}created_at, INTERVAL %(step_size)s second) >= toDateTime(%(startTimestamp)s/1000)") + else: + ch_sub_query.append(f"{table_name}created_at >= toDateTime(%(startTimestamp)s/1000)") + ch_sub_query.append(f"{table_name}created_at < toDateTime(%(endTimestamp)s/1000)") + return ch_sub_query + __get_generic_constraint(data=data, table_name=table_name) + + def __frange(start, stop, step): result = [] i = start diff --git a/api/chalicelib/core/metrics/modules/significance/significance_ch.py b/api/chalicelib/core/metrics/modules/significance/significance_ch.py index d01c49da0..6d920ce59 100644 --- a/api/chalicelib/core/metrics/modules/significance/significance_ch.py +++ b/api/chalicelib/core/metrics/modules/significance/significance_ch.py @@ -19,9 +19,9 @@ def get_simple_funnel(filter_d: schemas.CardSeriesFilterSchema, project: schemas filters: List[schemas.SessionSearchFilterSchema] = filter_d.filters platform = project.platform constraints = ["e.project_id = %(project_id)s", - "e.datetime >= toDateTime(%(startTimestamp)s/1000)", - "e.datetime <= toDateTime(%(endTimestamp)s/1000)", - "e.event_type IN %(eventTypes)s"] + "e.created_at >= toDateTime(%(startTimestamp)s/1000)", + "e.created_at <= toDateTime(%(endTimestamp)s/1000)", + "e.`$event_name` IN %(eventTypes)s"] full_args = {"project_id": project.project_id, "startTimestamp": filter_d.startTimestamp, "endTimestamp": filter_d.endTimestamp} @@ -157,16 +157,23 @@ def get_simple_funnel(filter_d: schemas.CardSeriesFilterSchema, project: schemas if next_event_type not in event_types: event_types.append(next_event_type) full_args[f"event_type_{i}"] = next_event_type - n_stages_query.append(f"event_type=%(event_type_{i})s") + n_stages_query.append(f"`$event_name`=%(event_type_{i})s") if is_not: n_stages_query_not.append(n_stages_query[-1] + " AND " + - (sh.multi_conditions(f' {next_col_name} {op} %({e_k})s', s.value, - is_not=is_not, value_key=e_k) - if not specific_condition else specific_condition)) + (sh.multi_conditions( + f"JSON_VALUE(CAST(`$properties` AS String), '$.{next_col_name}') {op} %({e_k})s", + s.value, + is_not=is_not, + value_key=e_k + ) if not specific_condition else specific_condition)) elif not is_any: - n_stages_query[-1] += " AND " + (sh.multi_conditions(f' {next_col_name} {op} %({e_k})s', s.value, - is_not=is_not, value_key=e_k) - if not specific_condition else specific_condition) + n_stages_query[-1] += " AND " + ( + sh.multi_conditions( + f"JSON_VALUE(CAST(`$properties` AS String), '$.{next_col_name}') {op} %({e_k})s", + s.value, + is_not=is_not, + value_key=e_k + ) if not specific_condition else specific_condition) full_args = {"eventTypes": tuple(event_types), **full_args, **values} n_stages = len(n_stages_query) @@ -188,8 +195,8 @@ def get_simple_funnel(filter_d: schemas.CardSeriesFilterSchema, project: schemas if len(n_stages_query_not) > 0: value_conditions_not_base = ["project_id = %(project_id)s", - "datetime >= toDateTime(%(startTimestamp)s/1000)", - "datetime <= toDateTime(%(endTimestamp)s/1000)"] + "created_at >= toDateTime(%(startTimestamp)s/1000)", + "created_at <= toDateTime(%(endTimestamp)s/1000)"] _value_conditions_not = [] value_conditions_not = [] for c in n_stages_query_not: @@ -221,7 +228,7 @@ def get_simple_funnel(filter_d: schemas.CardSeriesFilterSchema, project: schemas pattern += f"(?{j + 1})" 
conditions.append(n_stages_query[j]) j += 1 - sequences.append(f"sequenceMatch('{pattern}')(e.datetime, {','.join(conditions)}) AS T{i + 1}") + sequences.append(f"sequenceMatch('{pattern}')(toDateTime(e.created_at), {','.join(conditions)}) AS T{i + 1}") n_stages_query = f""" SELECT {",".join(projections)} diff --git a/api/chalicelib/core/metrics/product_analytics_ch.py b/api/chalicelib/core/metrics/product_analytics_ch.py index 6246e5013..d6059acb9 100644 --- a/api/chalicelib/core/metrics/product_analytics_ch.py +++ b/api/chalicelib/core/metrics/product_analytics_ch.py @@ -23,246 +23,6 @@ JOURNEY_TYPES = { } -def __get_test_data(): - with ch_client.ClickHouseClient(database="experimental") as ch: - ch_query1 = """ - CREATE TEMPORARY TABLE pre_ranked_events_1736344377403 AS - (WITH initial_event AS (SELECT events.session_id, MIN(datetime) AS start_event_timestamp - FROM experimental.events AS events - WHERE ((event_type = 'LOCATION' AND (url_path = '/en/deployment/'))) - AND events.project_id = toUInt16(65) - AND events.datetime >= toDateTime(1735599600000 / 1000) - AND events.datetime < toDateTime(1736290799999 / 1000) - GROUP BY 1), - pre_ranked_events AS (SELECT * - FROM (SELECT session_id, - event_type, - datetime, - url_path AS e_value, - row_number() OVER (PARTITION BY session_id - ORDER BY datetime , - message_id ) AS event_number_in_session - FROM experimental.events AS events - INNER JOIN initial_event ON (events.session_id = initial_event.session_id) - WHERE events.project_id = toUInt16(65) - AND events.datetime >= toDateTime(1735599600000 / 1000) - AND events.datetime < toDateTime(1736290799999 / 1000) - AND (events.event_type = 'LOCATION') - AND events.datetime >= initial_event.start_event_timestamp - ) AS full_ranked_events - WHERE event_number_in_session <= 5) - SELECT * - FROM pre_ranked_events); - """ - ch.execute(query=ch_query1, parameters={}) - ch_query1 = """ - CREATE TEMPORARY TABLE ranked_events_1736344377403 AS - (WITH pre_ranked_events AS (SELECT * - FROM pre_ranked_events_1736344377403), - start_points AS (SELECT DISTINCT session_id - FROM pre_ranked_events - WHERE ((event_type = 'LOCATION' AND (e_value = '/en/deployment/'))) - AND pre_ranked_events.event_number_in_session = 1), - ranked_events AS (SELECT pre_ranked_events.*, - leadInFrame(e_value) - OVER (PARTITION BY session_id ORDER BY datetime - ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS next_value, - leadInFrame(toNullable(event_type)) - OVER (PARTITION BY session_id ORDER BY datetime - ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS next_type - FROM start_points - INNER JOIN pre_ranked_events USING (session_id)) - SELECT * - FROM ranked_events); - """ - ch.execute(query=ch_query1, parameters={}) - ch_query1 = """ - WITH ranked_events AS (SELECT * - FROM ranked_events_1736344377403), - n1 AS (SELECT event_number_in_session, - event_type, - e_value, - next_type, - next_value, - COUNT(1) AS sessions_count - FROM ranked_events - WHERE event_number_in_session = 1 - GROUP BY event_number_in_session, event_type, e_value, next_type, next_value - ORDER BY sessions_count DESC), - n2 AS (SELECT event_number_in_session, - event_type, - e_value, - next_type, - next_value, - COUNT(1) AS sessions_count - FROM ranked_events - WHERE event_number_in_session = 2 - GROUP BY event_number_in_session, event_type, e_value, next_type, next_value - ORDER BY sessions_count DESC), - n3 AS (SELECT event_number_in_session, - event_type, - e_value, - next_type, - next_value, - COUNT(1) AS sessions_count - FROM 
ranked_events - WHERE event_number_in_session = 3 - GROUP BY event_number_in_session, event_type, e_value, next_type, next_value - ORDER BY sessions_count DESC), - - drop_n AS (-- STEP 1 - SELECT event_number_in_session, - event_type, - e_value, - 'DROP' AS next_type, - NULL AS next_value, - sessions_count - FROM n1 - WHERE isNull(n1.next_type) - UNION ALL - -- STEP 2 - SELECT event_number_in_session, - event_type, - e_value, - 'DROP' AS next_type, - NULL AS next_value, - sessions_count - FROM n2 - WHERE isNull(n2.next_type)), - top_n AS (SELECT event_number_in_session, - event_type, - e_value, - SUM(sessions_count) AS sessions_count - FROM n1 - GROUP BY event_number_in_session, event_type, e_value - LIMIT 1 - UNION ALL - -- STEP 2 - SELECT event_number_in_session, - event_type, - e_value, - SUM(sessions_count) AS sessions_count - FROM n2 - GROUP BY event_number_in_session, event_type, e_value - ORDER BY sessions_count DESC - LIMIT 3 - UNION ALL - -- STEP 3 - SELECT event_number_in_session, - event_type, - e_value, - SUM(sessions_count) AS sessions_count - FROM n3 - GROUP BY event_number_in_session, event_type, e_value - ORDER BY sessions_count DESC - LIMIT 3), - top_n_with_next AS (SELECT n1.* - FROM n1 - UNION ALL - SELECT n2.* - FROM n2 - INNER JOIN top_n ON (n2.event_number_in_session = top_n.event_number_in_session - AND n2.event_type = top_n.event_type - AND n2.e_value = top_n.e_value)), - others_n AS ( - -- STEP 2 - SELECT n2.* - FROM n2 - WHERE (n2.event_number_in_session, n2.event_type, n2.e_value) NOT IN - (SELECT event_number_in_session, event_type, e_value - FROM top_n - WHERE top_n.event_number_in_session = 2) - UNION ALL - -- STEP 3 - SELECT n3.* - FROM n3 - WHERE (n3.event_number_in_session, n3.event_type, n3.e_value) NOT IN - (SELECT event_number_in_session, event_type, e_value - FROM top_n - WHERE top_n.event_number_in_session = 3)) - SELECT * - FROM ( - -- Top to Top: valid - SELECT top_n_with_next.* - FROM top_n_with_next - INNER JOIN top_n - ON (top_n_with_next.event_number_in_session + 1 = top_n.event_number_in_session - AND top_n_with_next.next_type = top_n.event_type - AND top_n_with_next.next_value = top_n.e_value) - UNION ALL - -- Top to Others: valid - SELECT top_n_with_next.event_number_in_session, - top_n_with_next.event_type, - top_n_with_next.e_value, - 'OTHER' AS next_type, - NULL AS next_value, - SUM(top_n_with_next.sessions_count) AS sessions_count - FROM top_n_with_next - WHERE (top_n_with_next.event_number_in_session + 1, top_n_with_next.next_type, top_n_with_next.next_value) IN - (SELECT others_n.event_number_in_session, others_n.event_type, others_n.e_value FROM others_n) - GROUP BY top_n_with_next.event_number_in_session, top_n_with_next.event_type, top_n_with_next.e_value - UNION ALL - -- Top go to Drop: valid - SELECT drop_n.event_number_in_session, - drop_n.event_type, - drop_n.e_value, - drop_n.next_type, - drop_n.next_value, - drop_n.sessions_count - FROM drop_n - INNER JOIN top_n ON (drop_n.event_number_in_session = top_n.event_number_in_session - AND drop_n.event_type = top_n.event_type - AND drop_n.e_value = top_n.e_value) - ORDER BY drop_n.event_number_in_session - UNION ALL - -- Others got to Drop: valid - SELECT others_n.event_number_in_session, - 'OTHER' AS event_type, - NULL AS e_value, - 'DROP' AS next_type, - NULL AS next_value, - SUM(others_n.sessions_count) AS sessions_count - FROM others_n - WHERE isNull(others_n.next_type) - AND others_n.event_number_in_session < 3 - GROUP BY others_n.event_number_in_session, next_type, next_value - 
UNION ALL - -- Others got to Top:valid - SELECT others_n.event_number_in_session, - 'OTHER' AS event_type, - NULL AS e_value, - others_n.next_type, - others_n.next_value, - SUM(others_n.sessions_count) AS sessions_count - FROM others_n - WHERE isNotNull(others_n.next_type) - AND (others_n.event_number_in_session + 1, others_n.next_type, others_n.next_value) IN - (SELECT top_n.event_number_in_session, top_n.event_type, top_n.e_value FROM top_n) - GROUP BY others_n.event_number_in_session, others_n.next_type, others_n.next_value - UNION ALL - -- Others got to Others - SELECT others_n.event_number_in_session, - 'OTHER' AS event_type, - NULL AS e_value, - 'OTHER' AS next_type, - NULL AS next_value, - SUM(sessions_count) AS sessions_count - FROM others_n - WHERE isNotNull(others_n.next_type) - AND others_n.event_number_in_session < 3 - AND (others_n.event_number_in_session + 1, others_n.next_type, others_n.next_value) NOT IN - (SELECT event_number_in_session, event_type, e_value FROM top_n) - GROUP BY others_n.event_number_in_session) - ORDER BY event_number_in_session, sessions_count DESC;""" - rows = ch.execute(query=ch_query1, parameters={}) - drop = 0 - for r in rows: - if r["next_type"] == "DROP": - drop += r["sessions_count"] - r["sessions_count"] = drop - - return __transform_journey(rows=rows, reverse_path=False) - # startPoints are computed before ranked_events to reduce the number of window functions over rows # compute avg_time_from_previous at the same level as sessions_count (this was removed in v1.22) @@ -289,7 +49,8 @@ def path_analysis(project_id: int, data: schemas.CardPathAnalysis): sub_events.append({"column": JOURNEY_TYPES[s.type]["column"], "eventType": JOURNEY_TYPES[s.type]["eventType"]}) step_1_post_conditions.append( - f"(event_type!='{JOURNEY_TYPES[s.type]["eventType"]}' OR event_number_in_session = 1)") + f"(event_type='{JOURNEY_TYPES[s.type]["eventType"]}' AND event_number_in_session = 1 \ + OR event_type!='{JOURNEY_TYPES[s.type]["eventType"]}' AND event_number_in_session > 1)") extra_metric_values.append(s.type) data.metric_value += extra_metric_values diff --git a/api/chalicelib/core/metrics/product_analytics_ch_new_structure_old_code.py b/api/chalicelib/core/metrics/product_analytics_ch_new_structure_old_code.py new file mode 100644 index 000000000..31513b2d6 --- /dev/null +++ b/api/chalicelib/core/metrics/product_analytics_ch_new_structure_old_code.py @@ -0,0 +1,389 @@ +from typing import List + +import schemas +from chalicelib.core.metrics.metrics_ch import __get_basic_constraints, __get_basic_constraints_events +from chalicelib.core.metrics.metrics_ch import __get_constraint_values, __complete_missing_steps +from chalicelib.utils import ch_client, exp_ch_helper +from chalicelib.utils import helper, dev +from chalicelib.utils.TimeUTC import TimeUTC +from chalicelib.utils import sql_helper as sh +from chalicelib.core import metadata +from time import time + +import logging +from chalicelib.core.metrics.product_analytics import __transform_journey + +logger = logging.getLogger(__name__) + +JOURNEY_TYPES = { + schemas.ProductAnalyticsSelectedEventType.LOCATION: { + "eventType": "LOCATION", + "column": "JSON_VALUE(CAST(`$properties` AS String), '$.url_path')", + }, + schemas.ProductAnalyticsSelectedEventType.CLICK: { + "eventType": "LOCATION", + "column": "JSON_VALUE(CAST(`$properties` AS String), '$.label')", + }, + schemas.ProductAnalyticsSelectedEventType.INPUT: { + "eventType": "LOCATION", + "column": "JSON_VALUE(CAST(`$properties` AS String), '$.label')", + }, + 
schemas.ProductAnalyticsSelectedEventType.CUSTOM_EVENT: { + "eventType": "LOCATION", + "column": "JSON_VALUE(CAST(`$properties` AS String), '$.name')", + }, +} + + +# Q6: use events as a sub_query to support filter of materialized columns when doing a join +# query: Q5, the result is correct, +# startPoints are computed before ranked_events to reduce the number of window functions over rows +# replaced time_to_target by time_from_previous +# compute avg_time_from_previous at the same level as sessions_count (this was removed in v1.22) +# sort by top 5 according to sessions_count at the CTE level +# final part project data without grouping +# if start-point is selected, the selected event is ranked n°1 +def path_analysis(project_id: int, data: schemas.CardPathAnalysis): + sub_events = [] + start_points_conditions = [] + step_0_conditions = [] + step_1_post_conditions = ["event_number_in_session <= %(density)s"] + + if len(data.metric_value) == 0: + data.metric_value.append(schemas.ProductAnalyticsSelectedEventType.LOCATION) + sub_events.append({"column": JOURNEY_TYPES[schemas.ProductAnalyticsSelectedEventType.LOCATION]["column"], + "eventType": schemas.ProductAnalyticsSelectedEventType.LOCATION.value}) + else: + if len(data.start_point) > 0: + extra_metric_values = [] + for s in data.start_point: + if s.type not in data.metric_value: + sub_events.append({"column": JOURNEY_TYPES[s.type]["column"], + "eventType": JOURNEY_TYPES[s.type]["eventType"]}) + step_1_post_conditions.append( + f"(`$event_name`!='{JOURNEY_TYPES[s.type]["eventType"]}' OR event_number_in_session = 1)") + extra_metric_values.append(s.type) + data.metric_value += extra_metric_values + + for v in data.metric_value: + if JOURNEY_TYPES.get(v): + sub_events.append({"column": JOURNEY_TYPES[v]["column"], + "eventType": JOURNEY_TYPES[v]["eventType"]}) + + if len(sub_events) == 1: + main_column = sub_events[0]['column'] + else: + main_column = f"multiIf(%s,%s)" % ( + ','.join([f"`$event_name`='{s['eventType']}',{s['column']}" for s in sub_events[:-1]]), + sub_events[-1]["column"]) + extra_values = {} + reverse = data.start_type == "end" + for i, sf in enumerate(data.start_point): + f_k = f"start_point_{i}" + op = sh.get_sql_operator(sf.operator) + sf.value = helper.values_for_operator(value=sf.value, op=sf.operator) + is_not = sh.is_negation_operator(sf.operator) + event_column = JOURNEY_TYPES[sf.type]['column'] + event_type = JOURNEY_TYPES[sf.type]['eventType'] + extra_values = {**extra_values, **sh.multi_values(sf.value, value_key=f_k), + f"start_event_type_{i}": event_type} + start_points_conditions.append(f"(`$event_name`=%(start_event_type_{i})s AND " + + sh.multi_conditions(f'{event_column} {op} %({f_k})s', sf.value, is_not=is_not, + value_key=f_k) + + ")") + step_0_conditions.append(f"(`$event_name`=%(start_event_type_{i})s AND " + + sh.multi_conditions(f'e_value {op} %({f_k})s', sf.value, is_not=is_not, + value_key=f_k) + + ")") + if len(start_points_conditions) > 0: + start_points_conditions = ["(" + " OR ".join(start_points_conditions) + ")", + "events.project_id = toUInt16(%(project_id)s)", + "events.created_at >= toDateTime(%(startTimestamp)s / 1000)", + "events.created_at < toDateTime(%(endTimestamp)s / 1000)"] + step_0_conditions = ["(" + " OR ".join(step_0_conditions) + ")", + "pre_ranked_events.event_number_in_session = 1"] + + exclusions = {} + for i, ef in enumerate(data.excludes): + if len(ef.value) == 0: + continue + if ef.type in data.metric_value: + f_k = f"exclude_{i}" + ef.value = 
helper.values_for_operator(value=ef.value, op=ef.operator) + op = sh.get_sql_operator(ef.operator) + op = sh.reverse_sql_operator(op) + extra_values = {**extra_values, **sh.multi_values(ef.value, value_key=f_k)} + exclusions[ef.type] = [ + sh.multi_conditions(f'{JOURNEY_TYPES[ef.type]["column"]} {op} %({f_k})s', ef.value, is_not=True, + value_key=f_k)] + + sessions_conditions = [] + meta_keys = None + for i, f in enumerate(data.series[0].filter.filters): + op = sh.get_sql_operator(f.operator) + is_any = sh.isAny_opreator(f.operator) + is_not = sh.is_negation_operator(f.operator) + is_undefined = sh.isUndefined_operator(f.operator) + f_k = f"f_value_{i}" + extra_values = {**extra_values, **sh.multi_values(f.value, value_key=f_k)} + + if not is_any and len(f.value) == 0: + continue + + process_filter(f, is_any, is_not, is_undefined, op, f_k, sessions_conditions, extra_values, meta_keys, + project_id) + + if reverse: + path_direction = "DESC" + else: + path_direction = "" + + ch_sub_query = __get_basic_constraints_events(table_name="events") + selected_event_type_sub_query = [] + for s in data.metric_value: + selected_event_type_sub_query.append(f"events.`$event_name` = '{JOURNEY_TYPES[s]['eventType']}'") + if s in exclusions: + selected_event_type_sub_query[-1] += " AND (" + " AND ".join(exclusions[s]) + ")" + selected_event_type_sub_query = " OR ".join(selected_event_type_sub_query) + ch_sub_query.append(f"({selected_event_type_sub_query})") + + main_events_table = exp_ch_helper.get_main_events_table(data.startTimestamp) + " AS events" + main_sessions_table = exp_ch_helper.get_main_sessions_table(data.startTimestamp) + " AS sessions" + if len(sessions_conditions) > 0: + sessions_conditions.append(f"sessions.project_id = toUInt16(%(project_id)s)") + sessions_conditions.append(f"sessions.datetime >= toDateTime(%(startTimestamp)s / 1000)") + sessions_conditions.append(f"sessions.datetime < toDateTime(%(endTimestamp)s / 1000)") + sessions_conditions.append("sessions.events_count>1") + sessions_conditions.append("sessions.duration>0") + + initial_sessions_cte = f"""sub_sessions AS (SELECT DISTINCT session_id + FROM {main_sessions_table} + WHERE {" AND ".join(sessions_conditions)}),""" + else: + initial_sessions_cte = "" + + if len(start_points_conditions) == 0: + step_0_subquery = """SELECT DISTINCT session_id + FROM (SELECT `$event_name`, e_value + FROM pre_ranked_events + WHERE event_number_in_session = 1 + GROUP BY `$event_name`, e_value + ORDER BY count(1) DESC + LIMIT 1) AS top_start_events + INNER JOIN pre_ranked_events + ON (top_start_events.`$event_name` = pre_ranked_events.`$event_name` AND + top_start_events.e_value = pre_ranked_events.e_value) + WHERE pre_ranked_events.event_number_in_session = 1""" + initial_event_cte = "" + else: + step_0_subquery = f"""SELECT DISTINCT session_id + FROM pre_ranked_events + WHERE {" AND ".join(step_0_conditions)}""" + initial_event_cte = f"""\ + initial_event AS (SELECT events.session_id, MIN(created_at) AS start_event_timestamp + FROM {main_events_table} {"INNER JOIN sub_sessions USING (session_id)" if len(sessions_conditions) > 0 else ""} + WHERE {" AND ".join(start_points_conditions)} + GROUP BY 1),""" + ch_sub_query.append("events.created_at>=initial_event.start_event_timestamp") + main_events_table += " INNER JOIN initial_event ON (events.session_id = initial_event.session_id)" + sessions_conditions = [] + + steps_query = ["""n1 AS (SELECT event_number_in_session, + `$event_name` as event_type, + e_value, + next_type, + next_value, + COUNT(1) AS 
sessions_count + FROM ranked_events + WHERE event_number_in_session = 1 + AND isNotNull(next_value) + GROUP BY event_number_in_session, event_type, e_value, next_type, next_value + ORDER BY sessions_count DESC + LIMIT %(eventThresholdNumberInGroup)s)"""] + projection_query = ["""SELECT event_number_in_session, + event_type, + e_value, + next_type, + next_value, + sessions_count + FROM n1"""] + for i in range(2, data.density + 1): + steps_query.append(f"""n{i} AS (SELECT * + FROM (SELECT re.event_number_in_session AS event_number_in_session, + re.`$event_name` as $event_name, + re.e_value AS e_value, + re.next_type AS next_type, + re.next_value AS next_value, + COUNT(1) AS sessions_count + FROM n{i - 1} INNER JOIN ranked_events AS re + ON (n{i - 1}.next_value = re.e_value AND n{i - 1}.next_type = re.`$event_name`) + WHERE re.event_number_in_session = {i} + GROUP BY re.event_number_in_session, re.`$event_name`, re.e_value, re.next_type, re.next_value) AS sub_level + ORDER BY sessions_count DESC + LIMIT %(eventThresholdNumberInGroup)s)""") + projection_query.append(f"""SELECT event_number_in_session, + `$event_name`, + e_value, + next_type, + next_value, + sessions_count + FROM n{i}""") + + with ch_client.ClickHouseClient(database="product_analytics") as ch: + time_key = TimeUTC.now() + _now = time() + params = {"project_id": project_id, "startTimestamp": data.startTimestamp, + "endTimestamp": data.endTimestamp, "density": data.density, + # This is ignored because UI will take care of it + # "eventThresholdNumberInGroup": 4 if data.hide_excess else 8, + "eventThresholdNumberInGroup": 8, + **extra_values} + + ch_query1 = f"""\ +CREATE TEMPORARY TABLE pre_ranked_events_{time_key} AS +WITH {initial_sessions_cte} + {initial_event_cte} + pre_ranked_events AS (SELECT * + FROM (SELECT session_id, + `$event_name`, + created_at, + {main_column} AS e_value, + row_number() OVER (PARTITION BY session_id + ORDER BY created_at {path_direction}, + event_id {path_direction} ) AS event_number_in_session + FROM {main_events_table} {"INNER JOIN sub_sessions ON (sub_sessions.session_id = events.session_id)" if len(sessions_conditions) > 0 else ""} + WHERE {" AND ".join(ch_sub_query)} + ) AS full_ranked_events + WHERE {" AND ".join(step_1_post_conditions)}) +SELECT * +FROM pre_ranked_events;""" + logger.debug("---------Q1-----------") + query = ch.format(query=ch_query1, parameters=params) + ch.execute(query=query) + if time() - _now > 2: + logger.warning(f">>>>>>>>>PathAnalysis long query EE ({int(time() - _now)}s)<<<<<<<<<") + logger.warning(query) + logger.warning("----------------------") + _now = time() + + ch_query2 = f"""\ +CREATE TEMPORARY TABLE ranked_events_{time_key} AS +WITH pre_ranked_events AS (SELECT * + FROM pre_ranked_events_{time_key}), + start_points AS ({step_0_subquery}), + ranked_events AS (SELECT pre_ranked_events.*, + leadInFrame(e_value) + OVER (PARTITION BY session_id ORDER BY created_at {path_direction} + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS next_value, + leadInFrame(toNullable(`$event_name`)) + OVER (PARTITION BY session_id ORDER BY created_at {path_direction} + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS next_type + FROM start_points INNER JOIN pre_ranked_events USING (session_id)) +SELECT * +FROM ranked_events;""" + logger.debug("---------Q2-----------") + query = ch.format(query=ch_query2, parameters=params) + ch.execute(query=query) + if time() - _now > 2: + logger.warning(f">>>>>>>>>PathAnalysis long query EE ({int(time() - 
_now)}s)<<<<<<<<<") + logger.warning(query) + logger.warning("----------------------") + _now = time() + + ch_query3 = f"""\ +WITH ranked_events AS (SELECT * + FROM ranked_events_{time_key}), + {",".join(steps_query)} +SELECT * +FROM ({" UNION ALL ".join(projection_query)}) AS chart_steps +ORDER BY event_number_in_session;""" + logger.debug("---------Q3-----------") + query = ch.format(query=ch_query3, parameters=params) + rows = ch.execute(query=query) + if time() - _now > 2: + logger.warning(f">>>>>>>>>PathAnalysis long query EE ({int(time() - _now)}s)<<<<<<<<<") + logger.warning(query) + logger.warning("----------------------") + + return __transform_journey(rows=rows, reverse_path=reverse) + + +def process_filter(f, is_any, is_not, is_undefined, op, f_k, sessions_conditions, extra_values, meta_keys, project_id): + # Mapping for common types to their column names + type_column_mapping = { + schemas.FilterType.USER_BROWSER: 'user_browser', + schemas.FilterType.USER_OS: 'user_os', + schemas.FilterType.USER_DEVICE: 'user_device', + schemas.FilterType.USER_COUNTRY: 'user_country', + schemas.FilterType.USER_CITY: 'user_city', + schemas.FilterType.USER_STATE: 'user_state', + schemas.FilterType.UTM_SOURCE: 'utm_source', + schemas.FilterType.UTM_MEDIUM: 'utm_medium', + schemas.FilterType.UTM_CAMPAIGN: 'utm_campaign', + schemas.FilterType.USER_ID: 'user_id', + schemas.FilterType.USER_ID_MOBILE: 'user_id', + schemas.FilterType.USER_ANONYMOUS_ID: 'user_anonymous_id', + schemas.FilterType.USER_ANONYMOUS_ID_MOBILE: 'user_anonymous_id', + schemas.FilterType.REV_ID: 'rev_id', + schemas.FilterType.REV_ID_MOBILE: 'rev_id', + } + + if f.type in type_column_mapping: + column = type_column_mapping[f.type] + if is_any: + sessions_conditions.append(f'isNotNull({column})') + elif is_undefined: + sessions_conditions.append(f'isNull({column})') + else: + sessions_conditions.append( + sh.multi_conditions(f"{column} {op} toString(%({f_k})s)", f.value, is_not=is_not, value_key=f_k) + ) + + elif f.type == schemas.FilterType.DURATION: + if len(f.value) > 0 and f.value[0] is not None: + sessions_conditions.append("duration >= %(minDuration)s") + extra_values["minDuration"] = f.value[0] + if len(f.value) > 1 and f.value[1] is not None and int(f.value[1]) > 0: + sessions_conditions.append("duration <= %(maxDuration)s") + extra_values["maxDuration"] = f.value[1] + + elif f.type == schemas.FilterType.REFERRER: + if is_any: + sessions_conditions.append('isNotNull(base_referrer)') + else: + sessions_conditions.append( + sh.multi_conditions(f"base_referrer {op} %({f_k})s", f.value, is_not=is_not, value_key=f_k) + ) + + elif f.type == schemas.FilterType.METADATA: + if meta_keys is None: + meta_keys = metadata.get(project_id=project_id) + meta_keys = {m["key"]: m["index"] for m in meta_keys} + if f.source in meta_keys.keys(): + column = metadata.index_to_colname(meta_keys[f.source]) + if is_any: + sessions_conditions.append(f"isNotNull({column})") + elif is_undefined: + sessions_conditions.append(f"isNull({column})") + else: + sessions_conditions.append( + sh.multi_conditions(f"{column} {op} toString(%({f_k})s)", f.value, is_not=is_not, value_key=f_k) + ) + + elif f.type == schemas.FilterType.PLATFORM: + sessions_conditions.append( + sh.multi_conditions(f"user_device_type {op} %({f_k})s", f.value, is_not=is_not, value_key=f_k) + ) + + elif f.type == schemas.FilterType.ISSUE: + if is_any: + sessions_conditions.append("array_length(issue_types, 1) > 0") + else: + sessions_conditions.append( + 
sh.multi_conditions(f"has(issue_types,%({f_k})s)", f.value, is_not=is_not, value_key=f_k) + ) + + elif f.type == schemas.FilterType.EVENTS_COUNT: + sessions_conditions.append( + sh.multi_conditions(f"events_count {op} %({f_k})s", f.value, is_not=is_not, value_key=f_k) + ) diff --git a/api/chalicelib/core/sessions/sessions_ch.py b/api/chalicelib/core/sessions/sessions_ch.py index 0dd9d4d1e..16641b4dd 100644 --- a/api/chalicelib/core/sessions/sessions_ch.py +++ b/api/chalicelib/core/sessions/sessions_ch.py @@ -21,10 +21,10 @@ def search2_series(data: schemas.SessionsSearchPayloadSchema, project_id: int, d if metric_of == schemas.MetricOfTable.VISITED_URL: extra_event = f"""SELECT DISTINCT ev.session_id, ev.url_path FROM {exp_ch_helper.get_main_events_table(data.startTimestamp)} AS ev - WHERE ev.datetime >= toDateTime(%(startDate)s / 1000) - AND ev.datetime <= toDateTime(%(endDate)s / 1000) + WHERE ev.created_at >= toDateTime(%(startDate)s / 1000) + AND ev.created_at <= toDateTime(%(endDate)s / 1000) AND ev.project_id = %(project_id)s - AND ev.event_type = 'LOCATION'""" + AND ev.`$event_name` = 'LOCATION'""" elif metric_of == schemas.MetricOfTable.ISSUES and len(metric_value) > 0: data.filters.append(schemas.SessionSearchFilterSchema(value=metric_value, type=schemas.FilterType.ISSUE, operator=schemas.SearchEventOperator.IS)) @@ -135,12 +135,13 @@ def search2_table(data: schemas.SessionsSearchPayloadSchema, project_id: int, de extra_deduplication = [] extra_conditions = None if metric_of == schemas.MetricOfTable.VISITED_URL: - extra_event = f"""SELECT DISTINCT ev.session_id, ev.url_path - FROM {exp_ch_helper.get_main_events_table(data.startTimestamp)} AS ev - WHERE ev.datetime >= toDateTime(%(startDate)s / 1000) - AND ev.datetime <= toDateTime(%(endDate)s / 1000) - AND ev.project_id = %(project_id)s - AND ev.event_type = 'LOCATION'""" + extra_event = f"""SELECT DISTINCT ev.session_id, + JSONExtractString(toString(ev.`$properties`), 'url_path') AS url_path + FROM {exp_ch_helper.get_main_events_table(data.startTimestamp)} AS ev + WHERE ev.created_at >= toDateTime(%(startDate)s / 1000) + AND ev.created_at <= toDateTime(%(endDate)s / 1000) + AND ev.project_id = %(project_id)s + AND ev.`$event_name` = 'LOCATION'""" extra_deduplication.append("url_path") extra_conditions = {} for e in data.events: @@ -158,12 +159,14 @@ def search2_table(data: schemas.SessionsSearchPayloadSchema, project_id: int, de extra_conditions[e.operator].value.append(v) extra_conditions = list(extra_conditions.values()) elif metric_of == schemas.MetricOfTable.FETCH: - extra_event = f"""SELECT DISTINCT ev.session_id, ev.url_path - FROM {exp_ch_helper.get_main_events_table(data.startTimestamp)} AS ev - WHERE ev.datetime >= toDateTime(%(startDate)s / 1000) - AND ev.datetime <= toDateTime(%(endDate)s / 1000) - AND ev.project_id = %(project_id)s - AND ev.event_type = 'REQUEST'""" + extra_event = f"""SELECT DISTINCT ev.session_id, + JSONExtractString(toString(ev.`$properties`), 'url_path') AS url_path + FROM {exp_ch_helper.get_main_events_table(data.startTimestamp)} AS ev + WHERE ev.created_at >= toDateTime(%(startDate)s / 1000) + AND ev.created_at <= toDateTime(%(endDate)s / 1000) + AND ev.project_id = %(project_id)s + AND ev.`$event_name` = 'REQUEST'""" + extra_deduplication.append("url_path") extra_conditions = {} for e in data.events: @@ -286,6 +289,53 @@ def __is_valid_event(is_any: bool, event: schemas.SessionSearchEventSchema2): event.filters is None or len(event.filters) == 0)) +def json_condition(table_alias, json_column, 
json_key, op, values, value_key, check_existence=False, + numeric_check=False, numeric_type="float"): + """ + Constructs a condition to filter a JSON column dynamically in SQL queries. + + Parameters: + table_alias (str): Alias of the table (e.g., 'main', 'sub'). + json_column (str): Name of the JSON column (e.g., '$properties'). + json_key (str): Key in the JSON object to extract. + op (str): SQL operator to apply (e.g., '=', 'ILIKE', etc.). + values (str | list | tuple): Single value, list of values, or tuple to compare. + value_key (str): The parameterized key for SQL (e.g., 'custom'). + check_existence (bool): Whether to include a JSONHas condition to check if the key exists. + numeric_check (bool): Whether to include a numeric check on the extracted value. + numeric_type (str): Type for numeric extraction, "int" or "float". + + Returns: + str: The constructed condition. + """ + if isinstance(values, tuple): + values = list(values) + elif not isinstance(values, list): + values = [values] + + conditions = [] + + # Add JSONHas condition if required + if check_existence: + conditions.append(f"JSONHas(toString({table_alias}.`{json_column}`), '{json_key}')") + + # Determine the extraction function for numeric checks + if numeric_check: + extract_func = "JSONExtractFloat" if numeric_type == "float" else "JSONExtractInt" + conditions.append(f"{extract_func}(toString({table_alias}.`{json_column}`), '{json_key}') > 0") + + # Add the main condition for value comparison + if numeric_check: + extract_func = "JSONExtractFloat" if numeric_type == "float" else "JSONExtractInt" + condition = f"{extract_func}(toString({table_alias}.`{json_column}`), '{json_key}') {op} %({value_key})s" + else: + condition = f"JSONExtractString(toString({table_alias}.`{json_column}`), '{json_key}') {op} %({value_key})s" + + conditions.append(sh.multi_conditions(condition, values, value_key=value_key)) + + return " AND ".join(conditions) + + # this function generates the query and return the generated-query with the dict of query arguments def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_status, errors_only, favorite_only, issue, project_id, user_id, platform="web", extra_event=None, extra_deduplication=[], @@ -317,11 +367,11 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu events_query_part = "" issues = [] __events_where_basic = ["project_id = %(projectId)s", - "datetime >= toDateTime(%(startDate)s/1000)", - "datetime <= toDateTime(%(endDate)s/1000)"] + "created_at >= toDateTime(%(startDate)s/1000)", + "created_at <= toDateTime(%(endDate)s/1000)"] events_conditions_where = ["main.project_id = %(projectId)s", - "main.datetime >= toDateTime(%(startDate)s/1000)", - "main.datetime <= toDateTime(%(endDate)s/1000)"] + "main.created_at >= toDateTime(%(startDate)s/1000)", + "main.created_at <= toDateTime(%(endDate)s/1000)"] if len(data.filters) > 0: meta_keys = None # to reduce include a sub-query of sessions inside events query, in order to reduce the selected data @@ -600,8 +650,8 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu # event_from = f"%s INNER JOIN {MAIN_SESSIONS_TABLE} AS ms USING (session_id)" event_from = "%s" event_where = ["main.project_id = %(projectId)s", - "main.datetime >= toDateTime(%(startDate)s/1000)", - "main.datetime <= toDateTime(%(endDate)s/1000)"] + "main.created_at >= toDateTime(%(startDate)s/1000)", + "main.created_at <= toDateTime(%(endDate)s/1000)"] e_k = f"e_value{i}" s_k = e_k + "_source" @@ 
-616,41 +666,48 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu if platform == "web": _column = events.EventType.CLICK.column event_where.append( - f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) if not is_any: if schemas.ClickEventExtraOperator.has_value(event.operator): - event_where.append( - sh.multi_conditions(f"main.selector {op} %({e_k})s", event.value, value_key=e_k)) + event_where.append(json_condition( + "main", + "$properties", + "selector", op, event.value, e_k) + ) events_conditions[-1]["condition"] = event_where[-1] else: if is_not: - event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append(json_condition( + "sub", "$properties", _column, op, event.value, e_k + )) events_conditions_not.append( { - "type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) + "type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) events_conditions_not[-1]["condition"] = event_where[-1] else: - event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append( + json_condition("main", "$properties", _column, op, event.value, e_k) + ) events_conditions[-1]["condition"] = event_where[-1] else: _column = events.EventType.CLICK_MOBILE.column event_where.append( - f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) if not is_any: if is_not: - event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append( + json_condition("sub", "$properties", _column, op, event.value, e_k) + ) events_conditions_not.append( { - "type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) + "type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) events_conditions_not[-1]["condition"] = event_where[-1] else: - event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append( + json_condition("main", "$properties", _column, op, event.value, e_k) + ) events_conditions[-1]["condition"] = event_where[-1] elif event_type == events.EventType.INPUT.ui_type: @@ -658,40 +715,47 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu if platform == "web": _column = events.EventType.INPUT.column event_where.append( - f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) if not is_any: if is_not: - event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append( + json_condition("sub", "$properties", _column, op, event.value, e_k) + ) events_conditions_not.append( { - "type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) + "type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) events_conditions_not[-1]["condition"] = event_where[-1] else: 
- event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append( + json_condition("main", "$properties", _column, op, event.value, e_k) + ) events_conditions[-1]["condition"] = event_where[-1] if event.source is not None and len(event.source) > 0: - event_where.append(sh.multi_conditions(f"main.value ILIKE %(custom{i})s", event.source, - value_key=f"custom{i}")) + event_where.append( + json_condition("main", "$properties", "value", "ILIKE", event.source, f"custom{i}") + ) + full_args = {**full_args, **sh.multi_values(event.source, value_key=f"custom{i}")} else: _column = events.EventType.INPUT_MOBILE.column event_where.append( - f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) if not is_any: if is_not: - event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append( + json_condition("sub", "$properties", _column, op, event.value, e_k) + ) events_conditions_not.append( { - "type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) + "type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) events_conditions_not[-1]["condition"] = event_where[-1] else: - event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append( + json_condition("main", "$properties", _column, op, event.value, e_k) + ) + events_conditions[-1]["condition"] = event_where[-1] elif event_type == events.EventType.LOCATION.ui_type: @@ -699,32 +763,35 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu if platform == "web": _column = 'url_path' event_where.append( - f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) if not is_any: if is_not: - event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append( + json_condition("sub", "$properties", _column, op, event.value, e_k) + ) events_conditions_not.append( { - "type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) + "type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) events_conditions_not[-1]["condition"] = event_where[-1] else: - event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", - event.value, value_key=e_k)) + event_where.append( + json_condition("main", "$properties", _column, op, event.value, e_k) + ) events_conditions[-1]["condition"] = event_where[-1] else: _column = events.EventType.VIEW_MOBILE.column event_where.append( - f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) if not is_any: if is_not: - event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append( + json_condition("sub", "$properties", _column, op, event.value, e_k) + ) events_conditions_not.append( { - "type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, 
platform=platform)}'"}) + "type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) events_conditions_not[-1]["condition"] = event_where[-1] else: event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", @@ -733,57 +800,70 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu elif event_type == events.EventType.CUSTOM.ui_type: event_from = event_from % f"{MAIN_EVENTS_TABLE} AS main " _column = events.EventType.CUSTOM.column - event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") + event_where.append( + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) if not is_any: if is_not: - event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append( + json_condition("sub", "$properties", _column, op, event.value, e_k) + ) events_conditions_not.append( - {"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) + { + "type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) events_conditions_not[-1]["condition"] = event_where[-1] else: - event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append(json_condition( + "main", "$properties", _column, op, event.value, e_k + )) events_conditions[-1]["condition"] = event_where[-1] elif event_type == events.EventType.REQUEST.ui_type: event_from = event_from % f"{MAIN_EVENTS_TABLE} AS main " _column = 'url_path' - event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") + event_where.append( + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) if not is_any: if is_not: - event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append(json_condition( + "sub", "$properties", _column, op, event.value, e_k + )) events_conditions_not.append( - {"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) + { + "type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) events_conditions_not[-1]["condition"] = event_where[-1] else: - event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append(json_condition( + "main", "$properties", _column, op, event.value, e_k + )) events_conditions[-1]["condition"] = event_where[-1] elif event_type == events.EventType.STATEACTION.ui_type: event_from = event_from % f"{MAIN_EVENTS_TABLE} AS main " _column = events.EventType.STATEACTION.column - event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") + event_where.append( + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) if not is_any: if is_not: - event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append(json_condition( + "sub", "$properties", _column, op, event.value, e_k + )) events_conditions_not.append( - {"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) + { + "type": 
f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) events_conditions_not[-1]["condition"] = event_where[-1] else: - event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", - event.value, value_key=e_k)) + event_where.append(json_condition( + "main", "$properties", _column, op, event.value, e_k + )) events_conditions[-1]["condition"] = event_where[-1] # TODO: isNot for ERROR elif event_type == events.EventType.ERROR.ui_type: event_from = event_from % f"{MAIN_EVENTS_TABLE} AS main" events_extra_join = f"SELECT * FROM {MAIN_EVENTS_TABLE} AS main1 WHERE main1.project_id=%(project_id)s" - event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") + event_where.append( + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) event.source = tuple(event.source) events_conditions[-1]["condition"] = [] @@ -803,127 +883,161 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu # ----- Mobile elif event_type == events.EventType.CLICK_MOBILE.ui_type: _column = events.EventType.CLICK_MOBILE.column - event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") + event_where.append( + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) if not is_any: if is_not: - event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append(json_condition( + "sub", "$properties", _column, op, event.value, e_k + )) events_conditions_not.append( - {"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) + { + "type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) events_conditions_not[-1]["condition"] = event_where[-1] else: - event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append(json_condition( + "main", "$properties", _column, op, event.value, e_k + )) events_conditions[-1]["condition"] = event_where[-1] elif event_type == events.EventType.INPUT_MOBILE.ui_type: _column = events.EventType.INPUT_MOBILE.column - event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") + event_where.append( + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) if not is_any: if is_not: - event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append(json_condition( + "sub", "$properties", _column, op, event.value, e_k + )) events_conditions_not.append( - {"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) + { + "type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) events_conditions_not[-1]["condition"] = event_where[-1] else: - event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append(json_condition( + "main", "$properties", _column, op, event.value, e_k + )) events_conditions[-1]["condition"] = event_where[-1] elif event_type == events.EventType.VIEW_MOBILE.ui_type: _column = events.EventType.VIEW_MOBILE.column - 
event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") + event_where.append( + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) if not is_any: if is_not: - event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append(json_condition( + "sub", "$properties", _column, op, event.value, e_k + )) events_conditions_not.append( - {"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) + { + "type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) events_conditions_not[-1]["condition"] = event_where[-1] else: - event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", - event.value, value_key=e_k)) + event_where.append(json_condition( + "main", "$properties", _column, op, event.value, e_k + )) events_conditions[-1]["condition"] = event_where[-1] elif event_type == events.EventType.CUSTOM_MOBILE.ui_type: _column = events.EventType.CUSTOM_MOBILE.column - event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") + event_where.append( + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) if not is_any: if is_not: - event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append(json_condition( + "sub", "$properties", _column, op, event.value, e_k + )) events_conditions_not.append( - {"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) + { + "type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) events_conditions_not[-1]["condition"] = event_where[-1] else: - event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", - event.value, value_key=e_k)) + event_where.append(json_condition( + "main", "$properties", _column, op, event.value, e_k + )) + events_conditions[-1]["condition"] = event_where[-1] elif event_type == events.EventType.REQUEST_MOBILE.ui_type: event_from = event_from % f"{MAIN_EVENTS_TABLE} AS main " _column = 'url_path' - event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") + event_where.append( + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) if not is_any: if is_not: - event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append(json_condition( + "sub", "$properties", _column, op, event.value, e_k + )) events_conditions_not.append( - {"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) + { + "type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) events_conditions_not[-1]["condition"] = event_where[-1] else: - event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append(json_condition( + "main", "$properties", _column, op, event.value, e_k + )) events_conditions[-1]["condition"] = event_where[-1] elif event_type == events.EventType.CRASH_MOBILE.ui_type: _column = events.EventType.CRASH_MOBILE.column - event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, 
platform=platform)}'") + event_where.append( + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) if not is_any: if is_not: - event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append(json_condition( + "sub", "$properties", _column, op, event.value, e_k + )) + events_conditions_not.append( - {"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) + { + "type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) events_conditions_not[-1]["condition"] = event_where[-1] else: - event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", - event.value, value_key=e_k)) + event_where.append(json_condition( + "main", "$properties", _column, op, event.value, e_k + )) events_conditions[-1]["condition"] = event_where[-1] elif event_type == events.EventType.SWIPE_MOBILE.ui_type and platform != "web": _column = events.EventType.SWIPE_MOBILE.column - event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") + event_where.append( + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) if not is_any: if is_not: - event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append(json_condition( + "sub", "$properties", _column, op, event.value, e_k + )) events_conditions_not.append( - {"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) + { + "type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) events_conditions_not[-1]["condition"] = event_where[-1] else: - event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", - event.value, value_key=e_k)) + event_where.append(json_condition( + "main", "$properties", _column, op, event.value, e_k + )) events_conditions[-1]["condition"] = event_where[-1] elif event_type == schemas.PerformanceEventType.FETCH_FAILED: event_from = event_from % f"{MAIN_EVENTS_TABLE} AS main " _column = 'url_path' - event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") + event_where.append( + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) events_conditions[-1]["condition"] = [] if not is_any: if is_not: - event_where.append(sh.multi_conditions(f"sub.{_column} {op} %({e_k})s", event.value, - value_key=e_k)) + event_where.append(json_condition( + "sub", "$properties", _column, op, event.value, e_k + )) events_conditions_not.append( - {"type": f"sub.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) + { + "type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) events_conditions_not[-1]["condition"] = event_where[-1] else: - event_where.append(sh.multi_conditions(f"main.{_column} {op} %({e_k})s", - event.value, value_key=e_k)) + event_where.append(json_condition( + "main", "$properties", _column, op, event.value, e_k + )) events_conditions[-1]["condition"].append(event_where[-1]) col = performance_event.get_col(event_type) colname = col["column"] @@ -935,52 +1049,57 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu 
schemas.PerformanceEventType.LOCATION_LARGEST_CONTENTFUL_PAINT_TIME, schemas.PerformanceEventType.LOCATION_TTFB]: event_from = event_from % f"{MAIN_EVENTS_TABLE} AS main " - event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") + event_where.append( + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) events_conditions[-1]["condition"] = [] col = performance_event.get_col(event_type) colname = col["column"] tname = "main" if not is_any: - event_where.append( - sh.multi_conditions(f"main.url_path {op} %({e_k})s", - event.value, value_key=e_k)) + event_where.append(json_condition( + "main", "$properties", 'url_path', op, event.value, e_k + )) events_conditions[-1]["condition"].append(event_where[-1]) e_k += "_custom" full_args = {**full_args, **sh.multi_values(event.source, value_key=e_k)} - event_where.append(f"isNotNull({tname}.{colname}) AND {tname}.{colname}>0 AND " + - sh.multi_conditions(f"{tname}.{colname} {event.sourceOperator} %({e_k})s", - event.source, value_key=e_k)) + event_where.append(json_condition( + tname, "$properties", colname, event.sourceOperator, event.source, e_k, True, True) + ) + events_conditions[-1]["condition"].append(event_where[-1]) events_conditions[-1]["condition"] = " AND ".join(events_conditions[-1]["condition"]) # TODO: isNot for PerformanceEvent elif event_type in [schemas.PerformanceEventType.LOCATION_AVG_CPU_LOAD, schemas.PerformanceEventType.LOCATION_AVG_MEMORY_USAGE]: event_from = event_from % f"{MAIN_EVENTS_TABLE} AS main " - event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") + event_where.append( + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) events_conditions[-1]["condition"] = [] col = performance_event.get_col(event_type) colname = col["column"] tname = "main" if not is_any: - event_where.append( - sh.multi_conditions(f"main.url_path {op} %({e_k})s", - event.value, value_key=e_k)) + event_where.append(json_condition( + "main", "$properties", 'url_path', op, event.value, e_k + )) events_conditions[-1]["condition"].append(event_where[-1]) e_k += "_custom" full_args = {**full_args, **sh.multi_values(event.source, value_key=e_k)} - event_where.append(f"isNotNull({tname}.{colname}) AND {tname}.{colname}>0 AND " + - sh.multi_conditions(f"{tname}.{colname} {event.sourceOperator} %({e_k})s", - event.source, value_key=e_k)) + event_where.append(json_condition( + tname, "$properties", colname, event.sourceOperator, event.source, e_k, True, True) + ) + events_conditions[-1]["condition"].append(event_where[-1]) events_conditions[-1]["condition"] = " AND ".join(events_conditions[-1]["condition"]) elif event_type == schemas.EventType.REQUEST_DETAILS: event_from = event_from % f"{MAIN_EVENTS_TABLE} AS main " - event_where.append(f"main.event_type='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") + event_where.append( + f"main.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'") events_conditions.append({"type": event_where[-1]}) apply = False events_conditions[-1]["condition"] = [] @@ -993,36 +1112,39 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu e_k_f = e_k + f"_fetch{j}" full_args = {**full_args, **sh.multi_values(f.value, value_key=e_k_f)} if f.type == schemas.FetchFilterType.FETCH_URL: - 
event_where.append( - sh.multi_conditions(f"main.url_path {op} %({e_k_f})s", f.value, - value_key=e_k_f)) + event_where.append(json_condition( + "main", "$properties", 'url_path', op, f.value, e_k_f + )) events_conditions[-1]["condition"].append(event_where[-1]) apply = True elif f.type == schemas.FetchFilterType.FETCH_STATUS_CODE: - event_where.append( - sh.multi_conditions(f"main.status {f.operator} %({e_k_f})s", f.value, - value_key=e_k_f)) + event_where.append(json_condition( + "main", "$properties", 'status', op, f.value, e_k_f + )) events_conditions[-1]["condition"].append(event_where[-1]) apply = True elif f.type == schemas.FetchFilterType.FETCH_METHOD: - event_where.append( - sh.multi_conditions(f"main.method {op} %({e_k_f})s", f.value, value_key=e_k_f)) + event_where.append(json_condition( + "main", "$properties", 'method', op, f.value, e_k_f + )) events_conditions[-1]["condition"].append(event_where[-1]) apply = True elif f.type == schemas.FetchFilterType.FETCH_DURATION: event_where.append( - sh.multi_conditions(f"main.duration {f.operator} %({e_k_f})s", f.value, + sh.multi_conditions(f"main.`$duration_s` {f.operator} %({e_k_f})s/1000", f.value, value_key=e_k_f)) events_conditions[-1]["condition"].append(event_where[-1]) apply = True elif f.type == schemas.FetchFilterType.FETCH_REQUEST_BODY: - event_where.append( - sh.multi_conditions(f"main.request_body {op} %({e_k_f})s", f.value, value_key=e_k_f)) + event_where.append(json_condition( + "main", "$properties", 'request_body', op, f.value, e_k_f + )) events_conditions[-1]["condition"].append(event_where[-1]) apply = True elif f.type == schemas.FetchFilterType.FETCH_RESPONSE_BODY: - event_where.append( - sh.multi_conditions(f"main.response_body {op} %({e_k_f})s", f.value, value_key=e_k_f)) + event_where.append(json_condition( + "main", "$properties", 'response_body', op, f.value, e_k_f + )) events_conditions[-1]["condition"].append(event_where[-1]) apply = True else: @@ -1034,7 +1156,7 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu # TODO: no isNot for GraphQL elif event_type == schemas.EventType.GRAPHQL: event_from = event_from % f"{MAIN_EVENTS_TABLE} AS main " - event_where.append(f"main.event_type='GRAPHQL'") + event_where.append(f"main.`$event_name`='GRAPHQL'") events_conditions.append({"type": event_where[-1]}) events_conditions[-1]["condition"] = [] for j, f in enumerate(event.filters): @@ -1046,21 +1168,24 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu e_k_f = e_k + f"_graphql{j}" full_args = {**full_args, **sh.multi_values(f.value, value_key=e_k_f)} if f.type == schemas.GraphqlFilterType.GRAPHQL_NAME: - event_where.append( - sh.multi_conditions(f"main.{events.EventType.GRAPHQL.column} {op} %({e_k_f})s", f.value, - value_key=e_k_f)) + event_where.append(json_condition( + "main", "$properties", events.EventType.GRAPHQL.column, op, f.value, e_k_f + )) events_conditions[-1]["condition"].append(event_where[-1]) elif f.type == schemas.GraphqlFilterType.GRAPHQL_METHOD: - event_where.append( - sh.multi_conditions(f"main.method {op} %({e_k_f})s", f.value, value_key=e_k_f)) + event_where.append(json_condition( + "main", "$properties", 'method', op, f.value, e_k_f + )) events_conditions[-1]["condition"].append(event_where[-1]) elif f.type == schemas.GraphqlFilterType.GRAPHQL_REQUEST_BODY: - event_where.append( - sh.multi_conditions(f"main.request_body {op} %({e_k_f})s", f.value, value_key=e_k_f)) + event_where.append(json_condition( + "main", "$properties", 
'request_body', op, f.value, e_k_f + )) events_conditions[-1]["condition"].append(event_where[-1]) elif f.type == schemas.GraphqlFilterType.GRAPHQL_RESPONSE_BODY: - event_where.append( - sh.multi_conditions(f"main.response_body {op} %({e_k_f})s", f.value, value_key=e_k_f)) + event_where.append(json_condition( + "main", "$properties", 'response_body', op, f.value, e_k_f + )) events_conditions[-1]["condition"].append(event_where[-1]) else: logging.warning(f"undefined GRAPHQL filter: {f.type}") @@ -1099,7 +1224,7 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu pass else: events_query_from.append(f"""\ - (SELECT main.session_id, {"MIN" if event_index < (valid_events_count - 1) else "MAX"}(main.datetime) AS datetime + (SELECT main.session_id, {"MIN" if event_index < (valid_events_count - 1) else "MAX"}(main.created_at) AS datetime FROM {event_from} WHERE {" AND ".join(event_where)} GROUP BY session_id @@ -1167,13 +1292,13 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu del value_conditions_not if data.events_order == schemas.SearchEventOrder.THEN: - having = f"""HAVING sequenceMatch('{''.join(sequence_pattern)}')(main.datetime,{','.join(sequence_conditions)})""" + having = f"""HAVING sequenceMatch('{''.join(sequence_pattern)}')(toDateTime(main.created_at),{','.join(sequence_conditions)})""" else: having = f"""HAVING {" AND ".join([f"countIf({c})>0" for c in list(set(sequence_conditions))])}""" events_query_part = f"""SELECT main.session_id, - MIN(main.datetime) AS first_event_ts, - MAX(main.datetime) AS last_event_ts + MIN(main.created_at) AS first_event_ts, + MAX(main.created_at) AS last_event_ts FROM {MAIN_EVENTS_TABLE} AS main {events_extra_join} {sub_join} WHERE {" AND ".join(events_conditions_where)} @@ -1215,8 +1340,8 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu events_conditions_where.append(f"({' OR '.join(events_conditions)})") events_query_part = f"""SELECT main.session_id, - MIN(main.datetime) AS first_event_ts, - MAX(main.datetime) AS last_event_ts + MIN(main.created_at) AS first_event_ts, + MAX(main.created_at) AS last_event_ts FROM {MAIN_EVENTS_TABLE} AS main {events_extra_join} WHERE {" AND ".join(events_conditions_where)} GROUP BY session_id""" @@ -1239,8 +1364,8 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu AND issues.project_id = %(projectId)s AND events.project_id = %(projectId)s AND events.issue_type = %(issue_type)s - AND events.datetime >= toDateTime(%(startDate)s/1000) - AND events.datetime <= toDateTime(%(endDate)s/1000) + AND events.created_at >= toDateTime(%(startDate)s/1000) + AND events.created_at <= toDateTime(%(endDate)s/1000) ) AS issues ON (f.session_id = issues.session_id) """ full_args["issue_contextString"] = issue["contextString"] @@ -1259,8 +1384,8 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu INNER JOIN experimental.events USING (issue_id) WHERE issues.project_id = %(projectId)s AND events.project_id = %(projectId)s - AND events.datetime >= toDateTime(%(startDate)s/1000) - AND events.datetime <= toDateTime(%(endDate)s/1000) + AND events.created_at >= toDateTime(%(startDate)s/1000) + AND events.created_at <= toDateTime(%(endDate)s/1000) AND {" OR ".join(issues_conditions)} ) AS issues USING (session_id)""" diff --git a/api/chalicelib/utils/exp_ch_helper.py b/api/chalicelib/utils/exp_ch_helper.py index d36d87fb2..15c4986d8 100644 --- a/api/chalicelib/utils/exp_ch_helper.py 
+++ b/api/chalicelib/utils/exp_ch_helper.py @@ -8,7 +8,7 @@ logger = logging.getLogger(__name__) def get_main_events_table(timestamp=0, platform="web"): if platform == "web": - return "experimental.events" + return "product_analytics.events" else: return "experimental.ios_events" @@ -16,6 +16,7 @@ def get_main_events_table(timestamp=0, platform="web"): def get_main_sessions_table(timestamp=0): return "experimental.sessions" + def get_user_favorite_sessions_table(timestamp=0): return "experimental.user_favorite_sessions" diff --git a/api/chalicelib/utils/sql_helper.py b/api/chalicelib/utils/sql_helper.py index 723755863..23c0078c4 100644 --- a/api/chalicelib/utils/sql_helper.py +++ b/api/chalicelib/utils/sql_helper.py @@ -3,7 +3,7 @@ from enum import Enum import schemas -def get_sql_operator(op: Union[schemas.SearchEventOperator, schemas.ClickEventExtraOperator]): +def get_sql_operator(op: Union[schemas.SearchEventOperator, schemas.ClickEventExtraOperator, schemas.MathOperator]): return { schemas.SearchEventOperator.IS: "=", schemas.SearchEventOperator.ON: "=", @@ -21,6 +21,11 @@ def get_sql_operator(op: Union[schemas.SearchEventOperator, schemas.ClickEventEx schemas.ClickEventExtraOperator.NOT_CONTAINS: "NOT ILIKE", schemas.ClickEventExtraOperator.STARTS_WITH: "ILIKE", schemas.ClickEventExtraOperator.ENDS_WITH: "ILIKE", + + schemas.MathOperator.GREATER: ">", + schemas.MathOperator.GREATER_EQ: ">=", + schemas.MathOperator.LESS: "<", + schemas.MathOperator.LESS_EQ: "<=", }.get(op, "=")
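Editor's note on the hunks above: per-event columns (selector, label, url_path, value, request_body, ...) move into a single `$properties` JSON column on the new events table, and the dispatch column is renamed from `event_type` to `$event_name`. Comparisons previously built with `sh.multi_conditions(f"main.{_column} {op} %({e_k})s", ...)` are now routed through a `json_condition` helper whose definition is not part of this diff. Below is a minimal sketch of what such a helper plausibly looks like, inferred purely from its call sites; the two trailing booleans appear only on the performance-event branches, presumably as not-null and positive-value guards. The real helper lives elsewhere in the codebase and may differ.

    # Hypothetical reconstruction of json_condition, inferred from call sites
    # in this patch; the actual implementation is defined outside this diff.
    from chalicelib.utils import sql_helper as sh


    def json_condition(table: str, json_column: str, key: str, op: str,
                       values, value_key: str,
                       not_null: bool = False, positive: bool = False) -> str:
        # Extract the property from the JSON column, e.g.
        # JSONExtractString(main.`$properties`, 'url_path')
        extracted = f"JSONExtractString({table}.`{json_column}`, '{key}')"
        parts = []
        if not_null:   # assumed meaning of the 7th positional argument
            parts.append(f"isNotNull({extracted})")
        if positive:   # assumed meaning of the 8th positional argument
            parts.append(f"toFloat64OrZero({extracted}) > 0")
        # Reuse the existing helper to expand multi-value filters into OR groups.
        parts.append(sh.multi_conditions(f"{extracted} {op} %({value_key})s",
                                         values, value_key=value_key))
        return " AND ".join(parts)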
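Putting the pieces together, the per-event subqueries assembled by search_query_parts_ch now read against the new schema: `product_analytics.events` for web (per the exp_ch_helper.py hunk), `$event_name` for dispatch, and `created_at` for aggregation. A representative example of the SQL one CLICK filter would generate, under the json_condition sketch above and a hypothetical `label` property key:

    # Illustrative only; the exact property key and parameter names depend on
    # events.EventType.CLICK.column and the generated value_key.
    generated = """
    (SELECT main.session_id, MIN(main.created_at) AS datetime
     FROM product_analytics.events AS main
     WHERE main.`$event_name`='CLICK'
       AND JSONExtractString(main.`$properties`, 'label') ILIKE %(e_value0)s
     GROUP BY session_id)
    """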
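The same hunks migrate every timestamp reference from `datetime` to `created_at`; because sequenceMatch requires a DateTime argument, the THEN-ordered search now casts explicitly with `toDateTime(main.created_at)`. Durations follow a similar convention change: the new `$duration_s` column holds seconds while the API still receives milliseconds, so the FETCH_DURATION branch divides the bound parameter inside SQL rather than converting in Python. A compact illustration of both conventions, with placeholder condition strings standing in for the generated filters:

    # Placeholder conditions standing in for the per-event filters built above.
    sequence_pattern = ["(?1)", ".*", "(?2)"]
    sequence_conditions = ["c1", "c2"]

    # THEN-ordered search: created_at is cast to DateTime for sequenceMatch.
    having = (f"HAVING sequenceMatch('{''.join(sequence_pattern)}')"
              f"(toDateTime(main.created_at),{','.join(sequence_conditions)})")

    # Duration filter: milliseconds from the API, seconds in the column.
    duration_condition = "main.`$duration_s` >= %(e_1_fetch0)s/1000"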
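Finally, the sql_helper.py hunk widens get_sql_operator to accept schemas.MathOperator alongside the existing search and click operators, mapping its members to their SQL comparison operators while keeping the "=" fallback for anything unmapped. A quick usage check against the mapping shown in the diff:

    import schemas
    from chalicelib.utils import sql_helper as sh

    # MathOperator members now resolve to comparison operators...
    assert sh.get_sql_operator(schemas.MathOperator.GREATER) == ">"
    assert sh.get_sql_operator(schemas.MathOperator.LESS_EQ) == "<="
    # ...while existing operators keep their behavior, e.g. IS -> "=".
    assert sh.get_sql_operator(schemas.SearchEventOperator.IS) == "="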