feat(api): insights feature acquisition cohort

This commit is contained in:
Taha Yassine Kraiem 2021-10-07 17:08:24 +02:00
parent e7a942ee64
commit 9bc6251db2

View file

@ -241,8 +241,7 @@ def users_retention(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endT
@dev.timed
def users_acquisition(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(),
filters=[],
**args):
filters=[], **args):
startTimestamp = TimeUTC.trunc_week(startTimestamp)
endTimestamp = startTimestamp + 10 * TimeUTC.MS_WEEK
ch_sub_query = __get_basic_constraints(table_name='sessions', data=args)
@ -299,8 +298,7 @@ def users_acquisition(project_id, startTimestamp=TimeUTC.now(delta_days=-70), en
@dev.timed
def feature_retention(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(),
filters=[],
**args):
filters=[], **args):
startTimestamp = TimeUTC.trunc_week(startTimestamp)
endTimestamp = startTimestamp + 10 * TimeUTC.MS_WEEK
ch_sub_query = __get_basic_constraints(table_name='feature', data=args)
@ -384,15 +382,16 @@ def feature_retention(project_id, startTimestamp=TimeUTC.now(delta_days=-70), en
@dev.timed
def feature_acquisition(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(),
filters=[],
**args):
filters=[], **args):
startTimestamp = TimeUTC.trunc_week(startTimestamp)
endTimestamp = startTimestamp + 10 * TimeUTC.MS_WEEK
pg_sub_query = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions",
time_constraint=True)
pg_sub_query.append("user_id IS NOT NULL")
pg_sub_query.append("feature.timestamp >= %(startTimestamp)s")
pg_sub_query.append("feature.timestamp < %(endTimestamp)s")
ch_sub_query = __get_basic_constraints(table_name='feature', data=args)
meta_condition = __get_meta_constraint(args)
ch_sub_query += meta_condition
ch_sub_query.append("user_id IS NOT NULL")
ch_sub_query.append("not empty(user_id)")
ch_sub_query.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s / 1000)")
event_type = "PAGES"
event_value = "/"
extra_values = {}
@ -404,69 +403,65 @@ def feature_acquisition(project_id, startTimestamp=TimeUTC.now(delta_days=-70),
event_value = f["value"]
default = False
elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]:
pg_sub_query.append(f"sessions.user_id = %(user_id)s")
ch_sub_query.append(f"sessions_metadata.user_id = %(user_id)s")
extra_values["user_id"] = f["value"]
event_table = JOURNEY_TYPES[event_type]["table"]
event_column = JOURNEY_TYPES[event_type]["column"]
pg_sub_query.append(f"feature.{event_column} = %(value)s")
with pg_client.PostgresClient() as cur:
with ch_client.ClickHouseClient() as ch:
if default:
# get most used value
pg_query = f"""SELECT {event_column} AS value, COUNT(*) AS count
FROM {event_table} AS feature INNER JOIN public.sessions USING (session_id)
WHERE {" AND ".join(pg_sub_query[:-1])}
ch_query = f"""SELECT {event_column} AS value, COUNT(*) AS count
FROM {event_table} AS feature INNER JOIN sessions_metadata USING (session_id)
WHERE {" AND ".join(ch_sub_query)}
AND length({event_column}) > 2
GROUP BY value
ORDER BY count DESC
LIMIT 1;"""
GROUP BY value
ORDER BY count DESC
LIMIT 1;"""
params = {"project_id": project_id, "startTimestamp": startTimestamp,
"endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values}
cur.execute(cur.mogrify(pg_query, params))
row = cur.fetchone()
if row is not None:
event_value = row["value"]
# print(ch_query% params)
row = ch.execute(ch_query, params)
if len(row) > 0:
event_value = row[0]["value"]
extra_values["value"] = event_value
if len(event_value) > 2:
pg_sub_query.append(f"length({event_column})>2")
pg_query = f"""SELECT EXTRACT(EPOCH FROM first_connexion_week::date)::bigint*1000 AS first_connexion_week,
FLOOR(DATE_PART('day', connexion_week - first_connexion_week) / 7)::integer AS week,
COUNT(DISTINCT connexions_list.user_id) AS users_count,
ARRAY_AGG(DISTINCT connexions_list.user_id) AS connected_users
FROM (SELECT user_id, DATE_TRUNC('week', to_timestamp(first_connexion_week / 1000)) AS first_connexion_week
FROM(SELECT DISTINCT user_id, MIN(start_ts) AS first_connexion_week
FROM sessions INNER JOIN {event_table} AS feature USING (session_id)
WHERE {" AND ".join(pg_sub_query)}
AND NOT EXISTS((SELECT 1
FROM sessions AS bsess INNER JOIN {event_table} AS bfeature USING (session_id)
WHERE bsess.start_ts<%(startTimestamp)s
AND project_id = %(project_id)s
AND bsess.user_id = sessions.user_id
AND bfeature.timestamp<%(startTimestamp)s
AND bfeature.{event_column}=%(value)s
LIMIT 1))
GROUP BY user_id) AS raw_users_list) AS users_list
LEFT JOIN LATERAL (SELECT DATE_TRUNC('week', to_timestamp(start_ts / 1000)::timestamp) AS connexion_week,
user_id
FROM sessions INNER JOIN {event_table} AS feature USING(session_id)
WHERE users_list.user_id = sessions.user_id
AND first_connexion_week <=
DATE_TRUNC('week', to_timestamp(sessions.start_ts / 1000)::timestamp)
AND sessions.project_id = %(project_id)s
AND sessions.start_ts < (%(endTimestamp)s - 1)
AND feature.timestamp >= %(startTimestamp)s
AND feature.timestamp < %(endTimestamp)s
AND feature.{event_column} = %(value)s
GROUP BY connexion_week, user_id) AS connexions_list ON (TRUE)
GROUP BY first_connexion_week, week
ORDER BY first_connexion_week, week;"""
ch_sub_query.append(f"feature.{event_column} = %(value)s")
ch_query = f"""SELECT toUnixTimestamp(toDateTime(first_connexion_week))*1000 AS first_connexion_week,
week,
users_count,
connected_users
FROM (
SELECT first_connexion_week,
toInt8((connexion_week - first_connexion_week) / 7) AS week,
COUNT(DISTINCT all_connexions.user_id) AS users_count,
groupArray(100)(all_connexions.user_id) AS connected_users
FROM (SELECT user_id, MIN(toStartOfWeek(feature.datetime, 1)) AS first_connexion_week
FROM sessions_metadata INNER JOIN {event_table} AS feature USING (session_id)
WHERE {" AND ".join(ch_sub_query)}
AND sessions_metadata.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
AND feature.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
AND isNull((SELECT 1
FROM sessions_metadata AS bmsess INNER JOIN sessions AS bsess USING (session_id)
WHERE bsess.datetime < toDateTime(%(startTimestamp)s / 1000)
AND bmsess.datetime < toDateTime(%(startTimestamp)s / 1000)
AND bsess.project_id = %(project_id)s
AND bmsess.user_id = sessions_metadata.user_id
LIMIT 1))
GROUP BY user_id) AS users_list
INNER JOIN (SELECT DISTINCT user_id, toStartOfWeek(datetime, 1) AS connexion_week
FROM sessions_metadata INNER JOIN {event_table} AS feature USING (session_id)
WHERE {" AND ".join(ch_sub_query)}
AND sessions_metadata.datetime < toDateTime(%(endTimestamp)s / 1000)
ORDER BY connexion_week, user_id
) AS all_connexions USING (user_id)
WHERE first_connexion_week <= connexion_week
GROUP BY first_connexion_week, week
ORDER BY first_connexion_week, week
) AS full_data;"""
params = {"project_id": project_id, "startTimestamp": startTimestamp,
"endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values}
print(cur.mogrify(pg_query, params))
cur.execute(cur.mogrify(pg_query, params))
rows = cur.fetchall()
print(ch_query % params)
rows = ch.execute(ch_query, params)
rows = __compute_weekly_percentage(helper.list_to_camel_case(rows))
return {
"startTimestamp": startTimestamp,
@ -477,8 +472,7 @@ def feature_acquisition(project_id, startTimestamp=TimeUTC.now(delta_days=-70),
@dev.timed
def feature_popularity_frequency(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(),
filters=[],
**args):
filters=[], **args):
startTimestamp = TimeUTC.trunc_week(startTimestamp)
endTimestamp = startTimestamp + 10 * TimeUTC.MS_WEEK
pg_sub_query = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions",
@ -542,8 +536,7 @@ def feature_popularity_frequency(project_id, startTimestamp=TimeUTC.now(delta_da
@dev.timed
def feature_adoption(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(),
filters=[],
**args):
filters=[], **args):
pg_sub_query = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions",
time_constraint=True)
event_type = "CLICK"
@ -736,8 +729,7 @@ def feature_adoption_daily_usage(project_id, startTimestamp=TimeUTC.now(delta_da
@dev.timed
def feature_intensity(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(),
filters=[],
def feature_intensity(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), filters=[],
**args):
pg_sub_query = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions",
time_constraint=True)
@ -773,8 +765,7 @@ def feature_intensity(project_id, startTimestamp=TimeUTC.now(delta_days=-70), en
@dev.timed
def users_active(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(),
filters=[],
def users_active(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), filters=[],
**args):
pg_sub_query_chart = __get_constraints(project_id=project_id, time_constraint=True,
chart=True, data=args)
@ -815,8 +806,7 @@ def users_active(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTime
@dev.timed
def users_power(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(),
filters=[], **args):
def users_power(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), filters=[], **args):
pg_sub_query = __get_constraints(project_id=project_id, time_constraint=True, chart=False, data=args)
pg_sub_query.append("user_id IS NOT NULL")
@ -840,8 +830,8 @@ def users_power(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimes
@dev.timed
def users_slipping(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(),
filters=[], **args):
def users_slipping(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), filters=[],
**args):
pg_sub_query = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions",
time_constraint=True)
pg_sub_query.append("user_id IS NOT NULL")