From 9bc6251db22c2cb81a0228ff31efa8d4080a9989 Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Thu, 7 Oct 2021 17:08:24 +0200 Subject: [PATCH] feat(api): insights feature acquisition cohort --- ee/api/chalicelib/core/insights.py | 138 +++++++++++++---------------- 1 file changed, 64 insertions(+), 74 deletions(-) diff --git a/ee/api/chalicelib/core/insights.py b/ee/api/chalicelib/core/insights.py index 00c8e71fb..6ff8b9481 100644 --- a/ee/api/chalicelib/core/insights.py +++ b/ee/api/chalicelib/core/insights.py @@ -241,8 +241,7 @@ def users_retention(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endT @dev.timed def users_acquisition(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], - **args): + filters=[], **args): startTimestamp = TimeUTC.trunc_week(startTimestamp) endTimestamp = startTimestamp + 10 * TimeUTC.MS_WEEK ch_sub_query = __get_basic_constraints(table_name='sessions', data=args) @@ -299,8 +298,7 @@ def users_acquisition(project_id, startTimestamp=TimeUTC.now(delta_days=-70), en @dev.timed def feature_retention(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], - **args): + filters=[], **args): startTimestamp = TimeUTC.trunc_week(startTimestamp) endTimestamp = startTimestamp + 10 * TimeUTC.MS_WEEK ch_sub_query = __get_basic_constraints(table_name='feature', data=args) @@ -384,15 +382,16 @@ def feature_retention(project_id, startTimestamp=TimeUTC.now(delta_days=-70), en @dev.timed def feature_acquisition(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], - **args): + filters=[], **args): startTimestamp = TimeUTC.trunc_week(startTimestamp) endTimestamp = startTimestamp + 10 * TimeUTC.MS_WEEK - pg_sub_query = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions", - time_constraint=True) - pg_sub_query.append("user_id IS NOT NULL") - pg_sub_query.append("feature.timestamp >= %(startTimestamp)s") - pg_sub_query.append("feature.timestamp < %(endTimestamp)s") + ch_sub_query = __get_basic_constraints(table_name='feature', data=args) + meta_condition = __get_meta_constraint(args) + ch_sub_query += meta_condition + ch_sub_query.append("user_id IS NOT NULL") + ch_sub_query.append("not empty(user_id)") + ch_sub_query.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s / 1000)") + event_type = "PAGES" event_value = "/" extra_values = {} @@ -404,69 +403,65 @@ def feature_acquisition(project_id, startTimestamp=TimeUTC.now(delta_days=-70), event_value = f["value"] default = False elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: - pg_sub_query.append(f"sessions.user_id = %(user_id)s") + ch_sub_query.append(f"sessions_metadata.user_id = %(user_id)s") extra_values["user_id"] = f["value"] event_table = JOURNEY_TYPES[event_type]["table"] event_column = JOURNEY_TYPES[event_type]["column"] - - pg_sub_query.append(f"feature.{event_column} = %(value)s") - - with pg_client.PostgresClient() as cur: + with ch_client.ClickHouseClient() as ch: if default: # get most used value - pg_query = f"""SELECT {event_column} AS value, COUNT(*) AS count - FROM {event_table} AS feature INNER JOIN public.sessions USING (session_id) - WHERE {" AND ".join(pg_sub_query[:-1])} + ch_query = f"""SELECT {event_column} AS value, COUNT(*) AS count + FROM {event_table} AS feature INNER JOIN sessions_metadata USING (session_id) + WHERE {" AND ".join(ch_sub_query)} AND length({event_column}) > 2 - GROUP BY value - ORDER BY count DESC - LIMIT 1;""" + GROUP BY value + ORDER BY count DESC + LIMIT 1;""" params = {"project_id": project_id, "startTimestamp": startTimestamp, "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - cur.execute(cur.mogrify(pg_query, params)) - row = cur.fetchone() - if row is not None: - event_value = row["value"] + # print(ch_query% params) + row = ch.execute(ch_query, params) + if len(row) > 0: + event_value = row[0]["value"] extra_values["value"] = event_value - if len(event_value) > 2: - pg_sub_query.append(f"length({event_column})>2") - pg_query = f"""SELECT EXTRACT(EPOCH FROM first_connexion_week::date)::bigint*1000 AS first_connexion_week, - FLOOR(DATE_PART('day', connexion_week - first_connexion_week) / 7)::integer AS week, - COUNT(DISTINCT connexions_list.user_id) AS users_count, - ARRAY_AGG(DISTINCT connexions_list.user_id) AS connected_users - FROM (SELECT user_id, DATE_TRUNC('week', to_timestamp(first_connexion_week / 1000)) AS first_connexion_week - FROM(SELECT DISTINCT user_id, MIN(start_ts) AS first_connexion_week - FROM sessions INNER JOIN {event_table} AS feature USING (session_id) - WHERE {" AND ".join(pg_sub_query)} - AND NOT EXISTS((SELECT 1 - FROM sessions AS bsess INNER JOIN {event_table} AS bfeature USING (session_id) - WHERE bsess.start_ts<%(startTimestamp)s - AND project_id = %(project_id)s - AND bsess.user_id = sessions.user_id - AND bfeature.timestamp<%(startTimestamp)s - AND bfeature.{event_column}=%(value)s - LIMIT 1)) - GROUP BY user_id) AS raw_users_list) AS users_list - LEFT JOIN LATERAL (SELECT DATE_TRUNC('week', to_timestamp(start_ts / 1000)::timestamp) AS connexion_week, - user_id - FROM sessions INNER JOIN {event_table} AS feature USING(session_id) - WHERE users_list.user_id = sessions.user_id - AND first_connexion_week <= - DATE_TRUNC('week', to_timestamp(sessions.start_ts / 1000)::timestamp) - AND sessions.project_id = %(project_id)s - AND sessions.start_ts < (%(endTimestamp)s - 1) - AND feature.timestamp >= %(startTimestamp)s - AND feature.timestamp < %(endTimestamp)s - AND feature.{event_column} = %(value)s - GROUP BY connexion_week, user_id) AS connexions_list ON (TRUE) - GROUP BY first_connexion_week, week - ORDER BY first_connexion_week, week;""" + ch_sub_query.append(f"feature.{event_column} = %(value)s") + ch_query = f"""SELECT toUnixTimestamp(toDateTime(first_connexion_week))*1000 AS first_connexion_week, + week, + users_count, + connected_users + FROM ( + SELECT first_connexion_week, + toInt8((connexion_week - first_connexion_week) / 7) AS week, + COUNT(DISTINCT all_connexions.user_id) AS users_count, + groupArray(100)(all_connexions.user_id) AS connected_users + FROM (SELECT user_id, MIN(toStartOfWeek(feature.datetime, 1)) AS first_connexion_week + FROM sessions_metadata INNER JOIN {event_table} AS feature USING (session_id) + WHERE {" AND ".join(ch_sub_query)} + AND sessions_metadata.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 ) + AND feature.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 ) + AND isNull((SELECT 1 + FROM sessions_metadata AS bmsess INNER JOIN sessions AS bsess USING (session_id) + WHERE bsess.datetime < toDateTime(%(startTimestamp)s / 1000) + AND bmsess.datetime < toDateTime(%(startTimestamp)s / 1000) + AND bsess.project_id = %(project_id)s + AND bmsess.user_id = sessions_metadata.user_id + LIMIT 1)) + GROUP BY user_id) AS users_list + INNER JOIN (SELECT DISTINCT user_id, toStartOfWeek(datetime, 1) AS connexion_week + FROM sessions_metadata INNER JOIN {event_table} AS feature USING (session_id) + WHERE {" AND ".join(ch_sub_query)} + AND sessions_metadata.datetime < toDateTime(%(endTimestamp)s / 1000) + ORDER BY connexion_week, user_id + ) AS all_connexions USING (user_id) + WHERE first_connexion_week <= connexion_week + GROUP BY first_connexion_week, week + ORDER BY first_connexion_week, week + ) AS full_data;""" params = {"project_id": project_id, "startTimestamp": startTimestamp, "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - print(cur.mogrify(pg_query, params)) - cur.execute(cur.mogrify(pg_query, params)) - rows = cur.fetchall() + print(ch_query % params) + rows = ch.execute(ch_query, params) rows = __compute_weekly_percentage(helper.list_to_camel_case(rows)) return { "startTimestamp": startTimestamp, @@ -477,8 +472,7 @@ def feature_acquisition(project_id, startTimestamp=TimeUTC.now(delta_days=-70), @dev.timed def feature_popularity_frequency(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], - **args): + filters=[], **args): startTimestamp = TimeUTC.trunc_week(startTimestamp) endTimestamp = startTimestamp + 10 * TimeUTC.MS_WEEK pg_sub_query = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions", @@ -542,8 +536,7 @@ def feature_popularity_frequency(project_id, startTimestamp=TimeUTC.now(delta_da @dev.timed def feature_adoption(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], - **args): + filters=[], **args): pg_sub_query = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions", time_constraint=True) event_type = "CLICK" @@ -736,8 +729,7 @@ def feature_adoption_daily_usage(project_id, startTimestamp=TimeUTC.now(delta_da @dev.timed -def feature_intensity(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], +def feature_intensity(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), filters=[], **args): pg_sub_query = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions", time_constraint=True) @@ -773,8 +765,7 @@ def feature_intensity(project_id, startTimestamp=TimeUTC.now(delta_days=-70), en @dev.timed -def users_active(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], +def users_active(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), filters=[], **args): pg_sub_query_chart = __get_constraints(project_id=project_id, time_constraint=True, chart=True, data=args) @@ -815,8 +806,7 @@ def users_active(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTime @dev.timed -def users_power(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], **args): +def users_power(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), filters=[], **args): pg_sub_query = __get_constraints(project_id=project_id, time_constraint=True, chart=False, data=args) pg_sub_query.append("user_id IS NOT NULL") @@ -840,8 +830,8 @@ def users_power(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimes @dev.timed -def users_slipping(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], **args): +def users_slipping(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), filters=[], + **args): pg_sub_query = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions", time_constraint=True) pg_sub_query.append("user_id IS NOT NULL")