From 53326cce3a123ad8603be8520e3d42e3daf4ad8d Mon Sep 17 00:00:00 2001
From: Taha Yassine Kraiem
Date: Tue, 12 Oct 2021 19:19:50 +0200
Subject: [PATCH] feat(api): ee insights small optimizations

feat(api): ee insights fix materialized column-join issue
---
 ee/api/app.py                      |   6 +-
 ee/api/chalicelib/core/insights.py | 136 +++++++++++++++++++----------
 2 files changed, 93 insertions(+), 49 deletions(-)

diff --git a/ee/api/app.py b/ee/api/app.py
index 222e37a39..1cde5efe4 100644
--- a/ee/api/app.py
+++ b/ee/api/app.py
@@ -5,8 +5,9 @@ from sentry_sdk import configure_scope
 from chalicelib import _overrides
 from chalicelib.blueprints import bp_authorizers
 from chalicelib.blueprints import bp_core, bp_core_crons
+from chalicelib.blueprints.app import v1_api
 from chalicelib.blueprints import bp_core_dynamic, bp_core_dynamic_crons
-from chalicelib.blueprints.subs import bp_dashboard
+from chalicelib.blueprints.subs import bp_dashboard, bp_insights
 from chalicelib.utils import helper
 from chalicelib.utils import pg_client
 from chalicelib.utils.helper import environ
@@ -121,7 +122,8 @@ app.register_blueprint(bp_core_crons.app)
 app.register_blueprint(bp_core_dynamic.app)
 app.register_blueprint(bp_core_dynamic_crons.app)
 app.register_blueprint(bp_dashboard.app)
-
+app.register_blueprint(bp_insights.app)
+app.register_blueprint(v1_api.app)
 # Enterprise
 app.register_blueprint(bp_ee.app)
 app.register_blueprint(bp_ee_crons.app)
diff --git a/ee/api/chalicelib/core/insights.py b/ee/api/chalicelib/core/insights.py
index b7f2b7092..1d8503f3f 100644
--- a/ee/api/chalicelib/core/insights.py
+++ b/ee/api/chalicelib/core/insights.py
@@ -25,7 +25,7 @@ JOURNEY_TYPES = {
     "PAGES": {"table": "pages", "column": "url_path"},
     "CLICK": {"table": "clicks", "column": "label"},
     # "VIEW": {"table": "events_ios.views", "column": "name"}, TODO: enable this for SAAS only
-    # "EVENT": {"table": "events_common.customs", "column": "name"}
+    "EVENT": {"table": "customs", "column": "name"}
 }
@@ -200,31 +200,54 @@ def users_retention(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endT
     ch_sub_query.append("sessions.duration>0")
     ch_sub_query.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s / 1000)")
     with ch_client.ClickHouseClient() as ch:
-        ch_query = f"""SELECT toInt8((connexion_week - toDate(%(startTimestamp)s / 1000)) / 7) AS week,
-                              COUNT(DISTINCT all_connexions.user_id) AS users_count,
-                              groupArray(100)(all_connexions.user_id) AS connected_users
-                       FROM (SELECT DISTINCT user_id
-                             FROM sessions_metadata INNER JOIN sessions USING (session_id)
-                             WHERE {" AND ".join(ch_sub_query)}
-                               AND toStartOfWeek(sessions.datetime,1) = toDate(%(startTimestamp)s / 1000)
-                               AND sessions_metadata.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
-                               AND sessions.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
-                               AND isNull((SELECT 1
-                                           FROM sessions_metadata AS bmsess INNER JOIN sessions AS bsess USING (session_id)
-                                           WHERE bsess.datetime < toDateTime(%(startTimestamp)s / 1000)
-                                             AND bmsess.datetime < toDateTime(%(startTimestamp)s / 1000)
-                                             AND bsess.project_id = %(project_id)s
-                                             AND bmsess.user_id = sessions_metadata.user_id
-                                           LIMIT 1))
-                            ) AS users_list
-                            INNER JOIN (SELECT DISTINCT user_id, toStartOfWeek(datetime,1) AS connexion_week
-                                        FROM sessions_metadata INNER JOIN sessions USING (session_id)
-                                        WHERE {" AND ".join(ch_sub_query)}
-                                          AND sessions_metadata.datetime < toDateTime(%(endTimestamp)s / 1000)
-                                        ORDER BY connexion_week, user_id
-                                       ) AS all_connexions USING (user_id)
-                       GROUP BY connexion_week
-                       ORDER BY connexion_week;"""
+        # current optimization: from 6s to 4-5s
+        ch_query = f"""SELECT toInt8((toStartOfWeek(datetime, 1) - toDate(%(startTimestamp)s / 1000)) / 7) AS week,
+                              COUNT(DISTINCT user_id) AS users_count
+                       FROM sessions_metadata INNER JOIN sessions USING (session_id)
+                       WHERE {" AND ".join(ch_sub_query)}
+                         AND sessions_metadata.datetime < toDateTime(%(endTimestamp)s / 1000)
+                         AND user_id IN (SELECT DISTINCT user_id
+                                         FROM sessions_metadata
+                                                  INNER JOIN sessions USING (session_id)
+                                         WHERE {" AND ".join(ch_sub_query)}
+                                           AND toStartOfWeek(sessions.datetime,1) = toDate(%(startTimestamp)s / 1000)
+                                           AND sessions_metadata.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
+                                           AND sessions.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
+                                           AND isNull((SELECT 1
+                                                       FROM sessions_metadata AS bmsess
+                                                                INNER JOIN sessions AS bsess USING (session_id)
+                                                       WHERE bsess.datetime < toDateTime(%(startTimestamp)s / 1000)
+                                                         AND bmsess.datetime < toDateTime(%(startTimestamp)s / 1000)
+                                                         AND bsess.project_id = %(project_id)s
+                                                         AND bmsess.user_id = sessions_metadata.user_id
+                                                       LIMIT 1))
+                                        )
+                       GROUP BY week;"""
+        # THIS IS THE ORIGINAL QUERY, PROBABLY WILL BE REUSED AGAIN WHEN CH-STRUCTURE CHANGES
+        # ch_query = f"""SELECT toInt8((connexion_week - toDate(%(startTimestamp)s / 1000)) / 7) AS week,
+        #                       COUNT(all_connexions.user_id) AS users_count,
+        #                       groupArray(100)(all_connexions.user_id) AS connected_users
+        #                FROM (SELECT DISTINCT user_id
+        #                      FROM sessions_metadata INNER JOIN sessions USING (session_id)
+        #                      WHERE {" AND ".join(ch_sub_query)}
+        #                        AND toStartOfWeek(sessions.datetime,1) = toDate(%(startTimestamp)s / 1000)
+        #                        AND sessions_metadata.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
+        #                        AND sessions.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
+        #                        AND isNull((SELECT 1
+        #                                    FROM sessions_metadata AS bmsess INNER JOIN sessions AS bsess USING (session_id)
+        #                                    WHERE bsess.datetime < toDateTime(%(startTimestamp)s / 1000)
+        #                                      AND bmsess.datetime < toDateTime(%(startTimestamp)s / 1000)
+        #                                      AND bsess.project_id = %(project_id)s
+        #                                      AND bmsess.user_id = sessions_metadata.user_id
+        #                                    LIMIT 1))
+        #                     ) AS users_list
+        #                     INNER JOIN (SELECT DISTINCT user_id, toStartOfWeek(datetime,1) AS connexion_week
+        #                                 FROM sessions_metadata INNER JOIN sessions USING (session_id)
+        #                                 WHERE {" AND ".join(ch_sub_query)}
+        #                                   AND sessions_metadata.datetime < toDateTime(%(endTimestamp)s / 1000)
+        #                                ) AS all_connexions USING (user_id)
+        #                GROUP BY connexion_week
+        #                ORDER BY connexion_week;"""
         params = {"project_id": project_id, "startTimestamp": startTimestamp, "endTimestamp": endTimestamp,
                   **__get_constraint_values(args)}
         # print(ch_query % params)
@@ -249,6 +272,7 @@ def users_acquisition(project_id, startTimestamp=TimeUTC.now(delta_days=-70), en
     ch_sub_query.append("sessions.duration>0")
     ch_sub_query.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s / 1000)")
     with ch_client.ClickHouseClient() as ch:
+        # TODO: optimize after DB structure change, optimization from 6s to 4s
         ch_query = f"""SELECT toUnixTimestamp(toDateTime(first_connexion_week))*1000 AS first_connexion_week,
                               week,
                               users_count,
@@ -284,7 +308,7 @@ def users_acquisition(project_id, startTimestamp=TimeUTC.now(delta_days=-70), en
         params = {"project_id": project_id, "startTimestamp": startTimestamp, "endTimestamp": endTimestamp,
                   **__get_constraint_values(args)}
-        # print(ch_query%params)
+        print(ch_query % params)
         rows = ch.execute(ch_query, params)
         rows = __compute_weekly_percentage(helper.list_to_camel_case(rows))
     return {
@@ -342,13 +366,13 @@ def feature_retention(project_id, startTimestamp=TimeUTC.now(delta_days=-70), en
                               COUNT(DISTINCT all_connexions.user_id) AS users_count,
                               groupArray(100)(all_connexions.user_id) AS connected_users
                        FROM (SELECT DISTINCT user_id
-                             FROM sessions_metadata INNER JOIN {event_table} AS feature USING (session_id)
+                             FROM {event_table} AS feature INNER JOIN sessions_metadata USING (session_id)
                              WHERE {" AND ".join(ch_sub_query)}
                                AND toStartOfWeek(feature.datetime,1) = toDate(%(startTimestamp)s / 1000)
                                AND sessions_metadata.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
                                AND feature.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
                                AND isNull((SELECT 1
-                                           FROM sessions_metadata AS bmsess INNER JOIN {event_table} AS bsess USING (session_id)
+                                           FROM {event_table} AS bsess INNER JOIN sessions_metadata AS bmsess USING (session_id)
                                            WHERE bsess.datetime < toDateTime(%(startTimestamp)s / 1000)
                                              AND bmsess.datetime < toDateTime(%(startTimestamp)s / 1000)
                                              AND bsess.project_id = %(project_id)s
@@ -357,7 +381,7 @@ def feature_retention(project_id, startTimestamp=TimeUTC.now(delta_days=-70), en
                                            LIMIT 1))
                            ) AS users_list
                            INNER JOIN (SELECT DISTINCT user_id, toStartOfWeek(datetime,1) AS connexion_week
-                                       FROM sessions_metadata INNER JOIN {event_table} AS feature USING (session_id)
+                                       FROM {event_table} AS feature INNER JOIN sessions_metadata USING (session_id)
                                        WHERE {" AND ".join(ch_sub_query)}
                                          AND sessions_metadata.datetime < toDateTime(%(endTimestamp)s / 1000)
                                        ORDER BY connexion_week, user_id
@@ -367,7 +391,7 @@ def feature_retention(project_id, startTimestamp=TimeUTC.now(delta_days=-70), en
         params = {"project_id": project_id, "startTimestamp": startTimestamp, "endTimestamp": endTimestamp,
                   **__get_constraint_values(args), **extra_values}
-        # print(ch_query % params)
+        print(ch_query % params)
         rows = ch.execute(ch_query, params)
         rows = __compute_weekly_percentage(helper.list_to_camel_case(rows))
     return {
@@ -490,6 +514,7 @@ def feature_popularity_frequency(project_id, startTimestamp=TimeUTC.now(delta_da
             extra_values["user_id"] = f["value"]
     with ch_client.ClickHouseClient() as ch:
+        # TODO: change this query to not use join, optimization from 5s to 1s
         ch_query = f"""SELECT COUNT(DISTINCT user_id) AS count
                        FROM sessions AS feature INNER JOIN sessions_metadata USING (session_id)
                        WHERE {" AND ".join(ch_sub_query)}
@@ -497,7 +522,7 @@ def feature_popularity_frequency(project_id, startTimestamp=TimeUTC.now(delta_da
                          AND not empty(user_id);"""
         params = {"project_id": project_id, "startTimestamp": startTimestamp, "endTimestamp": endTimestamp,
                   **__get_constraint_values(args), **extra_values}
-        # print(ch_query % params)
+        print(ch_query % params)
         print("---------------------")
         all_user_count = ch.execute(ch_query, params)
         if len(all_user_count) == 0 or all_user_count[0]["count"] == 0:
@@ -513,17 +538,19 @@ def feature_popularity_frequency(project_id, startTimestamp=TimeUTC.now(delta_da
                        ORDER BY count DESC
                        LIMIT 7;"""
-        # print(ch_query % params)
-        # print("---------------------")
+        print(ch_query % params)
+        print("---------------------")
         popularity = ch.execute(ch_query, params)
+        params["values"] = [p["value"] for p in popularity]
         ch_query = f"""SELECT {event_column} AS value, COUNT(session_id) AS count
                        FROM {event_table} AS feature INNER JOIN sessions_metadata USING (session_id)
                        WHERE {" AND ".join(ch_sub_query)}
+                         AND {event_column} IN %(values)s
                        GROUP BY value;"""
-        # print(ch_query % params)
-        # print("---------------------")
print("---------------------") + print(ch_query % params) + print("---------------------") frequencies = ch.execute(ch_query, params) total_usage = sum([f["count"] for f in frequencies]) frequencies = {f["value"]: f["count"] for f in frequencies} @@ -562,12 +589,14 @@ def feature_adoption(project_id, startTimestamp=TimeUTC.now(delta_days=-70), end ch_sub_query.append("user_id IS NOT NULL") ch_sub_query.append("not empty(user_id)") with ch_client.ClickHouseClient() as ch: + # TODO: optimize this when DB structure changes, optimization from 3s to 1s ch_query = f"""SELECT COUNT(DISTINCT user_id) AS count FROM sessions_metadata INNER JOIN sessions AS feature USING(session_id) WHERE {" AND ".join(ch_sub_query)};""" params = {"project_id": project_id, "startTimestamp": startTimestamp, "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - # print(ch_query%params) + print(ch_query % params) + print("---------------------") all_user_count = ch.execute(ch_query, params) if len(all_user_count) == 0 or all_user_count[0]["count"] == 0: return {"adoption": 0, "target": 0, "filters": [{"type": "EVENT_TYPE", "value": event_type}, @@ -583,6 +612,8 @@ def feature_adoption(project_id, startTimestamp=TimeUTC.now(delta_days=-70), end LIMIT 1;""" params = {"project_id": project_id, "startTimestamp": startTimestamp, "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} + print(ch_query % params) + print("---------------------") row = ch.execute(ch_query, params) if len(row) > 0: event_value = row[0]["value"] @@ -593,8 +624,8 @@ def feature_adoption(project_id, startTimestamp=TimeUTC.now(delta_days=-70), end WHERE {" AND ".join(ch_sub_query)};""" params = {"project_id": project_id, "startTimestamp": startTimestamp, "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - # print(ch_query% params) - # print("---------------------") + print(ch_query % params) + print("---------------------") adoption = ch.execute(ch_query, params) adoption = adoption[0]["count"] / all_user_count return {"target": all_user_count, "adoption": adoption, @@ -643,6 +674,7 @@ def feature_adoption_top_users(project_id, startTimestamp=TimeUTC.now(delta_days event_value = row[0]["value"] extra_values["value"] = event_value ch_sub_query.append(f"feature.{event_column} = %(value)s") + # TODO: no possible optimization right now ch_query = f"""SELECT user_id, COUNT(DISTINCT session_id) AS count FROM {event_table} AS feature INNER JOIN sessions_metadata USING (session_id) WHERE {" AND ".join(ch_sub_query)} @@ -651,7 +683,7 @@ def feature_adoption_top_users(project_id, startTimestamp=TimeUTC.now(delta_days LIMIT 10;""" params = {"project_id": project_id, "startTimestamp": startTimestamp, "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - # print(ch_query%params) + print(ch_query % params) rows = ch.execute(ch_query, params) return {"users": helper.list_to_camel_case(rows), "filters": [{"type": "EVENT_TYPE", "value": event_type}, {"type": "EVENT_VALUE", "value": event_value}]} @@ -700,6 +732,7 @@ def feature_adoption_daily_usage(project_id, startTimestamp=TimeUTC.now(delta_da event_value = row[0]["value"] extra_values["value"] = event_value ch_sub_query.append(f"feature.{event_column} = %(value)s") + # optimal ch_query = f"""SELECT toUnixTimestamp(day)*1000 AS timestamp, count FROM (SELECT toStartOfDay(feature.datetime) AS day, COUNT(DISTINCT session_id) AS count FROM {event_table} AS feature {"INNER JOIN sessions_metadata USING (session_id)" 
@@ -777,6 +810,7 @@ def users_active(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTime
     ch_sub_query.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s/1000)")
     ch_sub_query.append("sessions_metadata.datetime < toDateTime(%(endTimestamp)s/1000)")
     with ch_client.ClickHouseClient() as ch:
+        # TODO: optimize this when DB structure changes, optimization from 3s to 1s
         ch_query = f"""SELECT SUM(count) / intDiv(%(endTimestamp)s - %(startTimestamp)s, %(step_size)s) AS avg
                        FROM (SELECT {period_function}(sessions_metadata.datetime) AS period, count(DISTINCT user_id) AS count
                              FROM sessions_metadata INNER JOIN sessions USING (session_id)
@@ -787,19 +821,21 @@ def users_active(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTime
                   "startTimestamp": TimeUTC.trunc_day(startTimestamp) if period == "DAY" else TimeUTC.trunc_week(
                       startTimestamp),
                   "endTimestamp": endTimestamp,
                   **__get_constraint_values(args), **extra_values}
+        print(ch_query % params)
+        print("---------------------")
         avg = ch.execute(ch_query, params)
         if len(avg) == 0 or avg[0]["avg"] == 0:
             return {"avg": 0, "chart": []}
         avg = avg[0]["avg"]
+        # TODO: optimize this when DB structure changes, optimization from 3s to 1s
         ch_query = f"""SELECT toUnixTimestamp(toDateTime(period))*1000 AS timestamp, count
                        FROM (SELECT {period_function}(sessions_metadata.datetime) AS period, count(DISTINCT user_id) AS count
                              FROM sessions_metadata INNER JOIN sessions USING (session_id)
                              WHERE {" AND ".join(ch_sub_query)}
                              GROUP BY period
-                             ORDER BY period) AS raw_results"""
-
-        # print(ch_query%params)
-        # print("---------------------")
+                             ORDER BY period) AS raw_results;"""
+        print(ch_query % params)
+        print("---------------------")
         rows = ch.execute(ch_query, params)
     return {"avg": avg, "chart": rows}
@@ -813,6 +849,7 @@ def users_power(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimes
     ch_sub_query.append("not empty(user_id)")

     with ch_client.ClickHouseClient() as ch:
+        # TODO: optimize this when DB structure changes, optimization from 4s to 1s
         ch_query = f"""SELECT AVG(count) AS avg
                        FROM(SELECT COUNT(user_id) AS count
                             FROM (SELECT user_id, COUNT(DISTINCT toStartOfDay(datetime)) AS number_of_days
@@ -823,10 +860,13 @@ def users_power(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimes
                             ORDER BY number_of_days) AS results;"""
         params = {"project_id": project_id, "startTimestamp": startTimestamp, "endTimestamp": endTimestamp,
                   **__get_constraint_values(args)}
+        print(ch_query % params)
+        print("---------------------")
         avg = ch.execute(ch_query, params)
         if len(avg) == 0 or avg[0]["avg"] == 0:
             return {"avg": 0, "partition": []}
         avg = avg[0]["avg"]
+        # TODO: optimize this when DB structure changes, optimization from 4s to 1s
         ch_query = f"""SELECT number_of_days, COUNT(user_id) AS count
                        FROM (SELECT user_id, COUNT(DISTINCT toStartOfDay(datetime)) AS number_of_days
                              FROM sessions_metadata INNER JOIN sessions USING (session_id)
                        GROUP BY number_of_days
                        ORDER BY number_of_days;"""

-        # print(ch_query%params)
-        # print("---------------------")
+        print(ch_query % params)
+        print("---------------------")
         rows = ch.execute(ch_query, params)
     return {"avg": avg, "partition": helper.list_to_camel_case(rows)}
@@ -880,11 +920,13 @@ def users_slipping(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTi
                        LIMIT 1;"""
         params = {"project_id": project_id, "startTimestamp": startTimestamp,
"endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} + print(ch_query % params) row = ch.execute(ch_query, params) if len(row) > 0: event_value = row[0]["value"] extra_values["value"] = event_value ch_sub_query.append(f"feature.{event_column} = %(value)s") + # TODO: no possible optimization right now ch_query = f"""SELECT user_id, toUnixTimestamp(last_time)*1000 AS last_time, interactions_count, @@ -899,7 +941,7 @@ def users_slipping(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTi GROUP BY user_id, last_time, interactions_count) AS raw_results;""" params = {"project_id": project_id, "startTimestamp": startTimestamp, "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - # print(ch_query, params) + print(ch_query % params) rows = ch.execute(ch_query, params) return { "startTimestamp": startTimestamp,