feat(api): ee insights small optimizations

feat(api): ee insights fix materialized column-join issue
Taha Yassine Kraiem 2021-10-12 19:19:50 +02:00
parent c2a61a9c88
commit 53326cce3a
2 changed files with 93 additions and 49 deletions


@@ -5,8 +5,9 @@ from sentry_sdk import configure_scope
from chalicelib import _overrides
from chalicelib.blueprints import bp_authorizers
from chalicelib.blueprints import bp_core, bp_core_crons
from chalicelib.blueprints.app import v1_api
from chalicelib.blueprints import bp_core_dynamic, bp_core_dynamic_crons
from chalicelib.blueprints.subs import bp_dashboard
from chalicelib.blueprints.subs import bp_dashboard, bp_insights
from chalicelib.utils import helper
from chalicelib.utils import pg_client
from chalicelib.utils.helper import environ
@@ -121,7 +122,8 @@ app.register_blueprint(bp_core_crons.app)
app.register_blueprint(bp_core_dynamic.app)
app.register_blueprint(bp_core_dynamic_crons.app)
app.register_blueprint(bp_dashboard.app)
app.register_blueprint(bp_insights.app)
app.register_blueprint(v1_api.app)
# Enterprise
app.register_blueprint(bp_ee.app)
app.register_blueprint(bp_ee_crons.app)
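Each blueprint module here exposes a Chalice Blueprint instance through an app attribute, which is what app.register_blueprint(bp_insights.app) picks up. A minimal sketch of what the new bp_insights module plausibly looks like (the route path and handler are illustrative assumptions, not the actual file):

# chalicelib/blueprints/subs/bp_insights.py -- hypothetical sketch
from chalice import Blueprint

app = Blueprint(__name__)

# Route path and handler name are assumptions for illustration only;
# the real handlers delegate to the insights queries in the next file.
@app.route("/{projectId}/insights/users_retention", methods=["GET"])
def get_users_retention(projectId):
    return {"data": {}}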


@@ -25,7 +25,7 @@ JOURNEY_TYPES = {
"PAGES": {"table": "pages", "column": "url_path"},
"CLICK": {"table": "clicks", "column": "label"},
# "VIEW": {"table": "events_ios.views", "column": "name"}, TODO: enable this for SAAS only
# "EVENT": {"table": "events_common.customs", "column": "name"}
"EVENT": {"table": "customs", "column": "name"}
}
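The re-enabled "EVENT" entry points at the customs table rather than the fully-qualified events_common.customs, in line with the materialized column-join fix in the commit title. Each entry resolves into the {event_table}/{event_column} pair that the f-string queries below interpolate. A hedged sketch of that lookup (the helper name is an assumption; only the dict and the interpolation pattern come from this file):

# Hypothetical helper illustrating how JOURNEY_TYPES feeds the queries below.
def _event_source(event_type: str):
    mapping = JOURNEY_TYPES[event_type.upper()]
    return mapping["table"], mapping["column"]

event_table, event_column = _event_source("click")  # -> ("clicks", "label")
ch_query = f"SELECT {event_column} AS value FROM {event_table} AS feature"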
@@ -200,31 +200,54 @@ def users_retention(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endT
ch_sub_query.append("sessions.duration>0")
ch_sub_query.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s / 1000)")
with ch_client.ClickHouseClient() as ch:
ch_query = f"""SELECT toInt8((connexion_week - toDate(%(startTimestamp)s / 1000)) / 7) AS week,
COUNT(DISTINCT all_connexions.user_id) AS users_count,
groupArray(100)(all_connexions.user_id) AS connected_users
FROM (SELECT DISTINCT user_id
FROM sessions_metadata INNER JOIN sessions USING (session_id)
WHERE {" AND ".join(ch_sub_query)}
AND toStartOfWeek(sessions.datetime,1) = toDate(%(startTimestamp)s / 1000)
AND sessions_metadata.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
AND sessions.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
AND isNull((SELECT 1
FROM sessions_metadata AS bmsess INNER JOIN sessions AS bsess USING (session_id)
WHERE bsess.datetime < toDateTime(%(startTimestamp)s / 1000)
AND bmsess.datetime < toDateTime(%(startTimestamp)s / 1000)
AND bsess.project_id = %(project_id)s
AND bmsess.user_id = sessions_metadata.user_id
LIMIT 1))
) AS users_list
INNER JOIN (SELECT DISTINCT user_id, toStartOfWeek(datetime,1) AS connexion_week
FROM sessions_metadata INNER JOIN sessions USING (session_id)
WHERE {" AND ".join(ch_sub_query)}
AND sessions_metadata.datetime < toDateTime(%(endTimestamp)s / 1000)
ORDER BY connexion_week, user_id
) AS all_connexions USING (user_id)
GROUP BY connexion_week
ORDER BY connexion_week;"""
# current optimization: brings this query from 6s down to 4-5s
ch_query = f"""SELECT toInt8((toStartOfWeek(datetime, 1) - toDate(1630886400000 / 1000)) / 7) AS week,
COUNT(DISTINCT user_id) AS users_count
FROM sessions_metadata INNER JOIN sessions USING (session_id)
WHERE {" AND ".join(ch_sub_query)}
AND sessions_metadata.datetime < toDateTime(%(endTimestamp)s / 1000)
AND user_id IN (SELECT DISTINCT user_id
FROM sessions_metadata
INNER JOIN sessions USING (session_id)
WHERE {" AND ".join(ch_sub_query)}
AND toStartOfWeek(sessions.datetime,1) = toDate(%(startTimestamp)s / 1000)
AND sessions_metadata.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
AND sessions.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
AND isNull((SELECT 1
FROM sessions_metadata AS bmsess
INNER JOIN sessions AS bsess USING (session_id)
WHERE bsess.datetime < toDateTime(%(startTimestamp)s / 1000)
AND bmsess.datetime < toDateTime(%(startTimestamp)s / 1000)
AND bsess.project_id = %(project_id)s
AND bmsess.user_id = sessions_metadata.user_id
LIMIT 1))
)
GROUP BY week;"""
# THIS IS THE ORIGINAL QUERY; IT WILL PROBABLY BE REUSED ONCE THE CH STRUCTURE CHANGES
# ch_query = f"""SELECT toInt8((connexion_week - toDate(%(startTimestamp)s / 1000)) / 7) AS week,
# COUNT(all_connexions.user_id) AS users_count,
# groupArray(100)(all_connexions.user_id) AS connected_users
# FROM (SELECT DISTINCT user_id
# FROM sessions_metadata INNER JOIN sessions USING (session_id)
# WHERE {" AND ".join(ch_sub_query)}
# AND toStartOfWeek(sessions.datetime,1) = toDate(%(startTimestamp)s / 1000)
# AND sessions_metadata.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
# AND sessions.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
# AND isNull((SELECT 1
# FROM sessions_metadata AS bmsess INNER JOIN sessions AS bsess USING (session_id)
# WHERE bsess.datetime < toDateTime(%(startTimestamp)s / 1000)
# AND bmsess.datetime < toDateTime(%(startTimestamp)s / 1000)
# AND bsess.project_id = %(project_id)s
# AND bmsess.user_id = sessions_metadata.user_id
# LIMIT 1))
# ) AS users_list
# INNER JOIN (SELECT DISTINCT user_id, toStartOfWeek(datetime,1) AS connexion_week
# FROM sessions_metadata INNER JOIN sessions USING (session_id)
# WHERE {" AND ".join(ch_sub_query)}
# AND sessions_metadata.datetime < toDateTime(%(endTimestamp)s / 1000)
# ) AS all_connexions USING (user_id)
# GROUP BY connexion_week
# ORDER BY connexion_week;"""
params = {"project_id": project_id, "startTimestamp": startTimestamp,
"endTimestamp": endTimestamp, **__get_constraint_values(args)}
# print(ch_query % params)
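The rewritten query drops the users_list/all_connexions self-join and instead filters with a user_id IN (...) subquery, grouping directly on the week index. That index expression, toInt8((toStartOfWeek(datetime, 1) - toDate(%(startTimestamp)s / 1000)) / 7), maps each session to a 0-based week offset from the window start; a rough Python equivalent (assuming startTimestamp is aligned to a Monday, since mode-1 ClickHouse weeks start on Monday):

from datetime import datetime, timedelta, timezone

def week_index(session_ts_ms: int, start_ts_ms: int) -> int:
    # toStartOfWeek(datetime, 1): snap the session day back to its Monday.
    day = datetime.fromtimestamp(session_ts_ms / 1000, tz=timezone.utc).date()
    monday = day - timedelta(days=day.weekday())
    # toDate(start / 1000): the first day of the observation window.
    start = datetime.fromtimestamp(start_ts_ms / 1000, tz=timezone.utc).date()
    # Whole weeks elapsed between the window start and the session's week.
    return (monday - start).days // 7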
@@ -249,6 +272,7 @@ def users_acquisition(project_id, startTimestamp=TimeUTC.now(delta_days=-70), en
ch_sub_query.append("sessions.duration>0")
ch_sub_query.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s / 1000)")
with ch_client.ClickHouseClient() as ch:
# TODO: optimize after DB structure change, optimization from 6s to 4s
ch_query = f"""SELECT toUnixTimestamp(toDateTime(first_connexion_week))*1000 AS first_connexion_week,
week,
users_count,
@@ -284,7 +308,7 @@ def users_acquisition(project_id, startTimestamp=TimeUTC.now(delta_days=-70), en
params = {"project_id": project_id, "startTimestamp": startTimestamp,
"endTimestamp": endTimestamp, **__get_constraint_values(args)}
# print(ch_query%params)
print(ch_query % params)
rows = ch.execute(ch_query, params)
rows = __compute_weekly_percentage(helper.list_to_camel_case(rows))
return {
@@ -342,13 +366,13 @@ def feature_retention(project_id, startTimestamp=TimeUTC.now(delta_days=-70), en
COUNT(DISTINCT all_connexions.user_id) AS users_count,
groupArray(100)(all_connexions.user_id) AS connected_users
FROM (SELECT DISTINCT user_id
FROM sessions_metadata INNER JOIN {event_table} AS feature USING (session_id)
FROM {event_table} AS feature INNER JOIN sessions_metadata USING (session_id)
WHERE {" AND ".join(ch_sub_query)}
AND toStartOfWeek(feature.datetime,1) = toDate(%(startTimestamp)s / 1000)
AND sessions_metadata.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
AND feature.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
AND isNull((SELECT 1
FROM sessions_metadata AS bmsess INNER JOIN {event_table} AS bsess USING (session_id)
FROM {event_table} AS bsess INNER JOIN sessions_metadata AS bmsess USING (session_id)
WHERE bsess.datetime < toDateTime(%(startTimestamp)s / 1000)
AND bmsess.datetime < toDateTime(%(startTimestamp)s / 1000)
AND bsess.project_id = %(project_id)s
@@ -357,7 +381,7 @@ def feature_retention(project_id, startTimestamp=TimeUTC.now(delta_days=-70), en
LIMIT 1))
) AS users_list
INNER JOIN (SELECT DISTINCT user_id, toStartOfWeek(datetime,1) AS connexion_week
FROM sessions_metadata INNER JOIN {event_table} AS feature USING (session_id)
FROM {event_table} AS feature INNER JOIN sessions_metadata USING (session_id)
WHERE {" AND ".join(ch_sub_query)}
AND sessions_metadata.datetime < toDateTime(%(endTimestamp)s / 1000)
ORDER BY connexion_week, user_id
@@ -367,7 +391,7 @@ def feature_retention(project_id, startTimestamp=TimeUTC.now(delta_days=-70), en
params = {"project_id": project_id, "startTimestamp": startTimestamp,
"endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values}
# print(ch_query % params)
print(ch_query % params)
rows = ch.execute(ch_query, params)
rows = __compute_weekly_percentage(helper.list_to_camel_case(rows))
return {
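Every join in this function now places {event_table} on the left-hand side, with sessions_metadata joined to it; this is presumably the materialized column-join fix named in the commit title, letting the materialized event columns be read directly from the events table instead of being resolved through the metadata side of the join. A sketch of the pattern (the builder function is an illustrative assumption):

# Hypothetical builder showing the reordered FROM clause used throughout.
def feature_from_clause(event_table: str) -> str:
    # Events table first: its materialized columns (e.g. url_path, label)
    # are read directly rather than through sessions_metadata.
    return f"FROM {event_table} AS feature INNER JOIN sessions_metadata USING (session_id)"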
@@ -490,6 +514,7 @@ def feature_popularity_frequency(project_id, startTimestamp=TimeUTC.now(delta_da
extra_values["user_id"] = f["value"]
with ch_client.ClickHouseClient() as ch:
# TODO: change this query to not use join, optimization from 5s to 1s
ch_query = f"""SELECT COUNT(DISTINCT user_id) AS count
FROM sessions AS feature INNER JOIN sessions_metadata USING (session_id)
WHERE {" AND ".join(ch_sub_query)}
@@ -497,7 +522,7 @@ def feature_popularity_frequency(project_id, startTimestamp=TimeUTC.now(delta_da
AND not empty(user_id);"""
params = {"project_id": project_id, "startTimestamp": startTimestamp,
"endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values}
# print(ch_query % params)
print(ch_query % params)
print("---------------------")
all_user_count = ch.execute(ch_query, params)
if len(all_user_count) == 0 or all_user_count[0]["count"] == 0:
@@ -513,17 +538,19 @@ def feature_popularity_frequency(project_id, startTimestamp=TimeUTC.now(delta_da
ORDER BY count DESC
LIMIT 7;"""
# print(ch_query % params)
# print("---------------------")
print(ch_query % params)
print("---------------------")
popularity = ch.execute(ch_query, params)
params["values"] = [p["value"] for p in popularity]
ch_query = f"""SELECT {event_column} AS value, COUNT(session_id) AS count
FROM {event_table} AS feature INNER JOIN sessions_metadata USING (session_id)
WHERE {" AND ".join(ch_sub_query)}
AND {event_column} IN %(values)s
GROUP BY value;"""
# print(ch_query % params)
# print("---------------------")
print(ch_query % params)
print("---------------------")
frequencies = ch.execute(ch_query, params)
total_usage = sum([f["count"] for f in frequencies])
frequencies = {f["value"]: f["count"] for f in frequencies}
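With the three result sets in hand, popularity is presumably the share of all users who touched each value, and frequency the share of total usage, along these lines (a sketch assuming all_user_count is still the raw row list at this point; the actual assembly code falls outside this hunk):

# Hypothetical assembly of the response; input shapes match the queries above.
total_users = all_user_count[0]["count"]
popularity_shares = [{"value": p["value"], "popularity": p["count"] / total_users}
                     for p in popularity]
frequency_shares = {value: count / total_usage
                    for value, count in frequencies.items()}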
@@ -562,12 +589,14 @@ def feature_adoption(project_id, startTimestamp=TimeUTC.now(delta_days=-70), end
ch_sub_query.append("user_id IS NOT NULL")
ch_sub_query.append("not empty(user_id)")
with ch_client.ClickHouseClient() as ch:
# TODO: optimize this when DB structure changes, optimization from 3s to 1s
ch_query = f"""SELECT COUNT(DISTINCT user_id) AS count
FROM sessions_metadata INNER JOIN sessions AS feature USING(session_id)
WHERE {" AND ".join(ch_sub_query)};"""
params = {"project_id": project_id, "startTimestamp": startTimestamp,
"endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values}
# print(ch_query%params)
print(ch_query % params)
print("---------------------")
all_user_count = ch.execute(ch_query, params)
if len(all_user_count) == 0 or all_user_count[0]["count"] == 0:
return {"adoption": 0, "target": 0, "filters": [{"type": "EVENT_TYPE", "value": event_type},
@@ -583,6 +612,8 @@ def feature_adoption(project_id, startTimestamp=TimeUTC.now(delta_days=-70), end
LIMIT 1;"""
params = {"project_id": project_id, "startTimestamp": startTimestamp,
"endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values}
print(ch_query % params)
print("---------------------")
row = ch.execute(ch_query, params)
if len(row) > 0:
event_value = row[0]["value"]
@@ -593,8 +624,8 @@ def feature_adoption(project_id, startTimestamp=TimeUTC.now(delta_days=-70), end
WHERE {" AND ".join(ch_sub_query)};"""
params = {"project_id": project_id, "startTimestamp": startTimestamp,
"endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values}
# print(ch_query% params)
# print("---------------------")
print(ch_query % params)
print("---------------------")
adoption = ch.execute(ch_query, params)
adoption = adoption[0]["count"] / all_user_count
return {"target": all_user_count, "adoption": adoption,
@@ -643,6 +674,7 @@ def feature_adoption_top_users(project_id, startTimestamp=TimeUTC.now(delta_days
event_value = row[0]["value"]
extra_values["value"] = event_value
ch_sub_query.append(f"feature.{event_column} = %(value)s")
# TODO: no optimization possible right now
ch_query = f"""SELECT user_id, COUNT(DISTINCT session_id) AS count
FROM {event_table} AS feature INNER JOIN sessions_metadata USING (session_id)
WHERE {" AND ".join(ch_sub_query)}
@@ -651,7 +683,7 @@ def feature_adoption_top_users(project_id, startTimestamp=TimeUTC.now(delta_days
LIMIT 10;"""
params = {"project_id": project_id, "startTimestamp": startTimestamp,
"endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values}
# print(ch_query%params)
print(ch_query % params)
rows = ch.execute(ch_query, params)
return {"users": helper.list_to_camel_case(rows),
"filters": [{"type": "EVENT_TYPE", "value": event_type}, {"type": "EVENT_VALUE", "value": event_value}]}
@@ -700,6 +732,7 @@ def feature_adoption_daily_usage(project_id, startTimestamp=TimeUTC.now(delta_da
event_value = row[0]["value"]
extra_values["value"] = event_value
ch_sub_query.append(f"feature.{event_column} = %(value)s")
# already optimal
ch_query = f"""SELECT toUnixTimestamp(day)*1000 AS timestamp, count
FROM (SELECT toStartOfDay(feature.datetime) AS day, COUNT(DISTINCT session_id) AS count
FROM {event_table} AS feature {"INNER JOIN sessions_metadata USING (session_id)" if len(meta_condition) > 0 else ""}
@@ -777,6 +810,7 @@ def users_active(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTime
ch_sub_query.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s/1000)")
ch_sub_query.append("sessions_metadata.datetime < toDateTime(%(endTimestamp)s/1000)")
with ch_client.ClickHouseClient() as ch:
# TODO: optimize this when DB structure changes, optimization from 3s to 1s
ch_query = f"""SELECT SUM(count) / intDiv(%(endTimestamp)s - %(startTimestamp)s, %(step_size)s) AS avg
FROM (SELECT {period_function}(sessions_metadata.datetime) AS period, count(DISTINCT user_id) AS count
FROM sessions_metadata INNER JOIN sessions USING (session_id)
@@ -787,19 +821,21 @@ def users_active(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTime
"startTimestamp": TimeUTC.trunc_day(startTimestamp) if period == "DAY" else TimeUTC.trunc_week(
startTimestamp), "endTimestamp": endTimestamp, **__get_constraint_values(args),
**extra_values}
print(ch_query % params)
print("---------------------")
avg = ch.execute(ch_query, params)
if len(avg) == 0 or avg[0]["avg"] == 0:
return {"avg": 0, "chart": []}
avg = avg[0]["avg"]
# TODO: optimize this when DB structure changes, optimization from 3s to 1s
ch_query = f"""SELECT toUnixTimestamp(toDateTime(period))*1000 AS timestamp, count
FROM (SELECT {period_function}(sessions_metadata.datetime) AS period, count(DISTINCT user_id) AS count
FROM sessions_metadata INNER JOIN sessions USING (session_id)
WHERE {" AND ".join(ch_sub_query)}
GROUP BY period
ORDER BY period) AS raw_results"""
# print(ch_query%params)
# print("---------------------")
ORDER BY period) AS raw_results;"""
print(ch_query % params)
print("---------------------")
rows = ch.execute(ch_query, params)
return {"avg": avg, "chart": rows}
@@ -813,6 +849,7 @@ def users_power(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimes
ch_sub_query.append("not empty(user_id)")
with ch_client.ClickHouseClient() as ch:
# TODO: optimize this when DB structure changes, optimization from 4s to 1s
ch_query = f"""SELECT AVG(count) AS avg
FROM(SELECT COUNT(user_id) AS count
FROM (SELECT user_id, COUNT(DISTINCT toStartOfDay(datetime)) AS number_of_days
@@ -823,10 +860,13 @@ def users_power(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimes
ORDER BY number_of_days) AS results;"""
params = {"project_id": project_id,
"startTimestamp": startTimestamp, "endTimestamp": endTimestamp, **__get_constraint_values(args)}
print(ch_query % params)
print("---------------------")
avg = ch.execute(ch_query, params)
if len(avg) == 0 or avg[0]["avg"] == 0:
return {"avg": 0, "partition": []}
avg = avg[0]["avg"]
# TODO: optimize this when DB structure changes, optimization from 4s to 1s
ch_query = f"""SELECT number_of_days, COUNT(user_id) AS count
FROM (SELECT user_id, COUNT(DISTINCT toStartOfDay(datetime)) AS number_of_days
FROM sessions_metadata INNER JOIN sessions USING (session_id)
@@ -835,8 +875,8 @@ def users_power(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimes
GROUP BY number_of_days
ORDER BY number_of_days;"""
# print(ch_query%params)
# print("---------------------")
print(ch_query % params)
print("---------------------")
rows = ch.execute(ch_query, params)
return {"avg": avg, "partition": helper.list_to_camel_case(rows)}
@@ -880,11 +920,13 @@ def users_slipping(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTi
LIMIT 1;"""
params = {"project_id": project_id, "startTimestamp": startTimestamp,
"endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values}
print(ch_query % params)
row = ch.execute(ch_query, params)
if len(row) > 0:
event_value = row[0]["value"]
extra_values["value"] = event_value
ch_sub_query.append(f"feature.{event_column} = %(value)s")
# TODO: no optimization possible right now
ch_query = f"""SELECT user_id,
toUnixTimestamp(last_time)*1000 AS last_time,
interactions_count,
@@ -899,7 +941,7 @@ def users_slipping(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTi
GROUP BY user_id, last_time, interactions_count) AS raw_results;"""
params = {"project_id": project_id, "startTimestamp": startTimestamp,
"endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values}
# print(ch_query, params)
print(ch_query % params)
rows = ch.execute(ch_query, params)
return {
"startTimestamp": startTimestamp,