From 6c4b1356a13748e162473270b195f5d73dffc082 Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Thu, 28 Oct 2021 19:27:56 +0200 Subject: [PATCH] feat(api): v1.4.0 patch --- api/app.py | 3 +- api/chalicelib/blueprints/subs/bp_insights.py | 163 --- api/chalicelib/core/insights.py | 932 --------------- ee/api/app.py | 3 +- .../chalicelib/blueprints/subs/bp_insights.py | 163 --- ee/api/chalicelib/core/insights.py | 1047 ----------------- .../{1.3.6/1.3.6.sql => 1.4.0/1.4.0.sql} | 0 .../{1.3.6/1.3.6.sql => 1.4.0/1.4.0.sql} | 0 8 files changed, 2 insertions(+), 2309 deletions(-) delete mode 100644 api/chalicelib/blueprints/subs/bp_insights.py delete mode 100644 api/chalicelib/core/insights.py delete mode 100644 ee/api/chalicelib/blueprints/subs/bp_insights.py delete mode 100644 ee/api/chalicelib/core/insights.py rename ee/scripts/helm/db/init_dbs/postgresql/{1.3.6/1.3.6.sql => 1.4.0/1.4.0.sql} (100%) rename scripts/helm/db/init_dbs/postgresql/{1.3.6/1.3.6.sql => 1.4.0/1.4.0.sql} (100%) diff --git a/api/app.py b/api/app.py index 92224f99e..e67810de5 100644 --- a/api/app.py +++ b/api/app.py @@ -7,7 +7,7 @@ from chalicelib.blueprints import bp_authorizers from chalicelib.blueprints import bp_core, bp_core_crons from chalicelib.blueprints.app import v1_api from chalicelib.blueprints import bp_core_dynamic, bp_core_dynamic_crons -from chalicelib.blueprints.subs import bp_dashboard,bp_insights +from chalicelib.blueprints.subs import bp_dashboard from chalicelib.utils import helper from chalicelib.utils import pg_client from chalicelib.utils.helper import environ @@ -106,5 +106,4 @@ app.register_blueprint(bp_core_crons.app) app.register_blueprint(bp_core_dynamic.app) app.register_blueprint(bp_core_dynamic_crons.app) app.register_blueprint(bp_dashboard.app) -app.register_blueprint(bp_insights.app) app.register_blueprint(v1_api.app) diff --git a/api/chalicelib/blueprints/subs/bp_insights.py b/api/chalicelib/blueprints/subs/bp_insights.py deleted file mode 100644 index 6546bfd12..000000000 --- a/api/chalicelib/blueprints/subs/bp_insights.py +++ /dev/null @@ -1,163 +0,0 @@ -from chalice import Blueprint -from chalicelib.utils import helper -from chalicelib import _overrides - -from chalicelib.core import dashboard, insights -from chalicelib.core import metadata - -app = Blueprint(__name__) -_overrides.chalice_app(app) - - -@app.route('/{projectId}/insights/journey', methods=['GET', 'POST']) -def get_insights_journey(projectId, context): - data = app.current_request.json_body - if data is None: - data = {} - params = app.current_request.query_params - args = dashboard.dashboard_args(params) - - return {"data": insights.journey(project_id=projectId, **{**data, **args})} - - -@app.route('/{projectId}/insights/users_acquisition', methods=['GET', 'POST']) -def get_users_acquisition(projectId, context): - data = app.current_request.json_body - if data is None: - data = {} - params = app.current_request.query_params - args = dashboard.dashboard_args(params) - - return {"data": insights.users_acquisition(project_id=projectId, **{**data, **args})} - - -@app.route('/{projectId}/insights/users_retention', methods=['GET', 'POST']) -def get_users_retention(projectId, context): - data = app.current_request.json_body - if data is None: - data = {} - params = app.current_request.query_params - args = dashboard.dashboard_args(params) - - return {"data": insights.users_retention(project_id=projectId, **{**data, **args})} - - -@app.route('/{projectId}/insights/feature_retention', methods=['GET', 'POST']) -def 
get_feature_retention(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.feature_retention(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/feature_acquisition', methods=['GET', 'POST'])
-def get_feature_acquisition(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.feature_acquisition(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/feature_popularity_frequency', methods=['GET', 'POST'])
-def get_feature_popularity_frequency(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.feature_popularity_frequency(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/feature_intensity', methods=['GET', 'POST'])
-def get_feature_intensity(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.feature_intensity(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/feature_adoption', methods=['GET', 'POST'])
-def get_feature_adoption(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.feature_adoption(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/feature_adoption_top_users', methods=['GET', 'POST'])
-def get_feature_adoption_top_users(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.feature_adoption_top_users(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/feature_adoption_daily_usage', methods=['GET', 'POST'])
-def get_feature_adoption_daily_usage(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.feature_adoption_daily_usage(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/users_active', methods=['GET', 'POST'])
-def get_users_active(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.users_active(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/users_power', methods=['GET', 'POST'])
-def get_users_power(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.users_power(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/users_slipping', methods=['GET', 'POST'])
-def get_users_slipping(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        
data = {} - params = app.current_request.query_params - args = dashboard.dashboard_args(params) - - return {"data": insights.users_slipping(project_id=projectId, **{**data, **args})} - - -@app.route('/{projectId}/insights/search', methods=['GET']) -def get_insights_autocomplete(projectId, context): - params = app.current_request.query_params - if params is None or params.get('q') is None or len(params.get('q')) == 0: - return {"data": []} - # params['q'] = '^' + params['q'] - - return {'data': insights.search(params.get('q', ''), project_id=projectId, - platform=params.get('platform', None), feature_type=params.get("key"))} diff --git a/api/chalicelib/core/insights.py b/api/chalicelib/core/insights.py deleted file mode 100644 index af4a48c09..000000000 --- a/api/chalicelib/core/insights.py +++ /dev/null @@ -1,932 +0,0 @@ -from chalicelib.core import sessions_metas -from chalicelib.utils import helper, dev -from chalicelib.utils import pg_client -from chalicelib.utils.TimeUTC import TimeUTC -from chalicelib.utils.metrics_helper import __get_step_size -import math -from chalicelib.core.dashboard import __get_constraints, __get_constraint_values - - -def __transform_journey(rows): - nodes = [] - links = [] - for r in rows: - source = r["source_event"][r["source_event"].index("_") + 1:] - target = r["target_event"][r["target_event"].index("_") + 1:] - if source not in nodes: - nodes.append(source) - if target not in nodes: - nodes.append(target) - links.append({"source": nodes.index(source), "target": nodes.index(target), "value": r["value"]}) - return {"nodes": nodes, "links": sorted(links, key=lambda x: x["value"], reverse=True)} - - -JOURNEY_DEPTH = 5 -JOURNEY_TYPES = { - "PAGES": {"table": "events.pages", "column": "base_path", "table_id": "message_id"}, - "CLICK": {"table": "events.clicks", "column": "label", "table_id": "message_id"}, - # "VIEW": {"table": "events_ios.views", "column": "name", "table_id": "seq_index"}, TODO: enable this for SAAS only - "EVENT": {"table": "events_common.customs", "column": "name", "table_id": "seq_index"} -} - - -@dev.timed -def journey(project_id, startTimestamp=TimeUTC.now(delta_days=-1), endTimestamp=TimeUTC.now(), filters=[], **args): - pg_sub_query_subset = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions", - time_constraint=True) - event_start = None - event_table = JOURNEY_TYPES["PAGES"]["table"] - event_column = JOURNEY_TYPES["PAGES"]["column"] - event_table_id = JOURNEY_TYPES["PAGES"]["table_id"] - extra_values = {} - for f in filters: - if f["type"] == "START_POINT": - event_start = f["value"] - elif f["type"] == "EVENT_TYPE" and JOURNEY_TYPES.get(f["value"]): - event_table = JOURNEY_TYPES[f["value"]]["table"] - event_column = JOURNEY_TYPES[f["value"]]["column"] - elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: - pg_sub_query_subset.append(f"sessions.user_id = %(user_id)s") - extra_values["user_id"] = f["value"] - - with pg_client.PostgresClient() as cur: - pg_query = f"""SELECT source_event, - target_event, - count(*) AS value - - FROM (SELECT event_number || '_' || value as target_event, - LAG(event_number || '_' || value, 1) OVER ( PARTITION BY session_rank ) AS source_event - FROM (SELECT value, - session_rank, - message_id, - ROW_NUMBER() OVER ( PARTITION BY session_rank ORDER BY timestamp ) AS event_number - - {f"FROM (SELECT * FROM (SELECT *, MIN(mark) OVER ( PARTITION BY session_id , session_rank ORDER BY timestamp ) AS max FROM (SELECT *, CASE WHEN value = 
%(event_start)s THEN timestamp ELSE NULL END as mark" - if event_start else ""} - - FROM (SELECT session_id, - message_id, - timestamp, - value, - SUM(new_session) OVER (ORDER BY session_id, timestamp) AS session_rank - FROM (SELECT *, - CASE - WHEN source_timestamp IS NULL THEN 1 - ELSE 0 END AS new_session - FROM (SELECT session_id, - {event_table_id} AS message_id, - timestamp, - {event_column} AS value, - LAG(timestamp) - OVER (PARTITION BY session_id ORDER BY timestamp) AS source_timestamp - FROM {event_table} INNER JOIN public.sessions USING (session_id) - WHERE {" AND ".join(pg_sub_query_subset)} - ) AS related_events) AS ranked_events) AS processed - {") AS marked) AS maxed WHERE timestamp >= max) AS filtered" if event_start else ""} - ) AS sorted_events - WHERE event_number <= %(JOURNEY_DEPTH)s) AS final - WHERE source_event IS NOT NULL - and target_event IS NOT NULL - GROUP BY source_event, target_event - ORDER BY value DESC - LIMIT 20;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, "event_start": event_start, "JOURNEY_DEPTH": JOURNEY_DEPTH, - **__get_constraint_values(args), **extra_values} - # print(cur.mogrify(pg_query, params)) - cur.execute(cur.mogrify(pg_query, params)) - rows = cur.fetchall() - - return __transform_journey(rows) - - -def __compute_weekly_percentage(rows): - if rows is None or len(rows) == 0: - return rows - t = -1 - for r in rows: - if r["week"] == 0: - t = r["usersCount"] - r["percentage"] = r["usersCount"] / t - return rows - - -def __complete_retention(rows, start_date, end_date=None): - if rows is None: - return [] - max_week = 10 - for i in range(max_week): - if end_date is not None and start_date + i * TimeUTC.MS_WEEK >= end_date: - break - neutral = { - "firstConnexionWeek": start_date, - "week": i, - "usersCount": 0, - "connectedUsers": [], - "percentage": 0 - } - if i < len(rows) \ - and i != rows[i]["week"]: - rows.insert(i, neutral) - elif i >= len(rows): - rows.append(neutral) - return rows - - -def __complete_acquisition(rows, start_date, end_date=None): - if rows is None: - return [] - max_week = 10 - week = 0 - delta_date = 0 - while max_week > 0: - start_date += TimeUTC.MS_WEEK - if end_date is not None and start_date >= end_date: - break - delta = 0 - if delta_date + week >= len(rows) \ - or delta_date + week < len(rows) and rows[delta_date + week]["firstConnexionWeek"] > start_date: - for i in range(max_week): - if end_date is not None and start_date + i * TimeUTC.MS_WEEK >= end_date: - break - - neutral = { - "firstConnexionWeek": start_date, - "week": i, - "usersCount": 0, - "connectedUsers": [], - "percentage": 0 - } - rows.insert(delta_date + week + i, neutral) - delta = i - else: - for i in range(max_week): - if end_date is not None and start_date + i * TimeUTC.MS_WEEK >= end_date: - break - - neutral = { - "firstConnexionWeek": start_date, - "week": i, - "usersCount": 0, - "connectedUsers": [], - "percentage": 0 - } - if delta_date + week + i < len(rows) \ - and i != rows[delta_date + week + i]["week"]: - rows.insert(delta_date + week + i, neutral) - elif delta_date + week + i >= len(rows): - rows.append(neutral) - delta = i - week += delta - max_week -= 1 - delta_date += 1 - return rows - - -@dev.timed -def users_retention(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), filters=[], - **args): - startTimestamp = TimeUTC.trunc_week(startTimestamp) - endTimestamp = startTimestamp + 10 * TimeUTC.MS_WEEK - pg_sub_query = 
__get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions", - time_constraint=True) - pg_sub_query.append("user_id IS NOT NULL") - pg_sub_query.append("DATE_TRUNC('week', to_timestamp(start_ts / 1000)) = to_timestamp(%(startTimestamp)s / 1000)") - with pg_client.PostgresClient() as cur: - pg_query = f"""SELECT FLOOR(DATE_PART('day', connexion_week - DATE_TRUNC('week', to_timestamp(%(startTimestamp)s / 1000)::timestamp)) / 7)::integer AS week, - COUNT(DISTINCT connexions_list.user_id) AS users_count, - ARRAY_AGG(DISTINCT connexions_list.user_id) AS connected_users - FROM (SELECT DISTINCT user_id - FROM sessions - WHERE {" AND ".join(pg_sub_query)} - AND DATE_PART('week', to_timestamp((sessions.start_ts - %(startTimestamp)s)/1000)) = 1 - AND NOT EXISTS((SELECT 1 - FROM sessions AS bsess - WHERE bsess.start_ts < %(startTimestamp)s - AND project_id = %(project_id)s - AND bsess.user_id = sessions.user_id - LIMIT 1)) - ) AS users_list - LEFT JOIN LATERAL (SELECT DATE_TRUNC('week', to_timestamp(start_ts / 1000)::timestamp) AS connexion_week, - user_id - FROM sessions - WHERE users_list.user_id = sessions.user_id - AND %(startTimestamp)s <=sessions.start_ts - AND sessions.project_id = %(project_id)s - AND sessions.start_ts < (%(endTimestamp)s - 1) - GROUP BY connexion_week, user_id - ) AS connexions_list ON (TRUE) - GROUP BY week - ORDER BY week;""" - - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args)} - print(cur.mogrify(pg_query, params)) - cur.execute(cur.mogrify(pg_query, params)) - rows = cur.fetchall() - rows = __compute_weekly_percentage(helper.list_to_camel_case(rows)) - return { - "startTimestamp": startTimestamp, - "chart": __complete_retention(rows=rows, start_date=startTimestamp, end_date=TimeUTC.now()) - } - - -@dev.timed -def users_acquisition(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], - **args): - startTimestamp = TimeUTC.trunc_week(startTimestamp) - endTimestamp = startTimestamp + 10 * TimeUTC.MS_WEEK - pg_sub_query = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions", - time_constraint=True) - pg_sub_query.append("user_id IS NOT NULL") - with pg_client.PostgresClient() as cur: - pg_query = f"""SELECT EXTRACT(EPOCH FROM first_connexion_week::date)::bigint*1000 AS first_connexion_week, - FLOOR(DATE_PART('day', connexion_week - first_connexion_week) / 7)::integer AS week, - COUNT(DISTINCT connexions_list.user_id) AS users_count, - ARRAY_AGG(DISTINCT connexions_list.user_id) AS connected_users - FROM (SELECT user_id, MIN(DATE_TRUNC('week', to_timestamp(start_ts / 1000))) AS first_connexion_week - FROM sessions - WHERE {" AND ".join(pg_sub_query)} - AND NOT EXISTS((SELECT 1 - FROM sessions AS bsess - WHERE bsess.start_ts<%(startTimestamp)s - AND project_id = %(project_id)s - AND bsess.user_id = sessions.user_id - LIMIT 1)) - GROUP BY user_id) AS users_list - LEFT JOIN LATERAL (SELECT DATE_TRUNC('week', to_timestamp(start_ts / 1000)::timestamp) AS connexion_week, - user_id - FROM sessions - WHERE users_list.user_id = sessions.user_id - AND first_connexion_week <= - DATE_TRUNC('week', to_timestamp(sessions.start_ts / 1000)::timestamp) - AND sessions.project_id = %(project_id)s - AND sessions.start_ts < (%(endTimestamp)s - 1) - GROUP BY connexion_week, user_id) AS connexions_list ON (TRUE) - GROUP BY first_connexion_week, week - ORDER BY first_connexion_week, week;""" - - params = 
{"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args)} - print(cur.mogrify(pg_query, params)) - cur.execute(cur.mogrify(pg_query, params)) - rows = cur.fetchall() - rows = __compute_weekly_percentage(helper.list_to_camel_case(rows)) - return { - "startTimestamp": startTimestamp, - "chart": __complete_acquisition(rows=rows, start_date=startTimestamp, end_date=TimeUTC.now()) - } - - -@dev.timed -def feature_retention(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], - **args): - startTimestamp = TimeUTC.trunc_week(startTimestamp) - endTimestamp = startTimestamp + 10 * TimeUTC.MS_WEEK - pg_sub_query = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions", - time_constraint=True) - pg_sub_query.append("user_id IS NOT NULL") - pg_sub_query.append("feature.timestamp >= %(startTimestamp)s") - pg_sub_query.append("feature.timestamp < %(endTimestamp)s") - event_type = "PAGES" - event_value = "/" - extra_values = {} - default = True - for f in filters: - if f["type"] == "EVENT_TYPE" and JOURNEY_TYPES.get(f["value"]): - event_type = f["value"] - elif f["type"] == "EVENT_VALUE": - event_value = f["value"] - default = False - elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: - pg_sub_query.append(f"sessions.user_id = %(user_id)s") - extra_values["user_id"] = f["value"] - event_table = JOURNEY_TYPES[event_type]["table"] - event_column = JOURNEY_TYPES[event_type]["column"] - pg_sub_query.append(f"feature.{event_column} = %(value)s") - - with pg_client.PostgresClient() as cur: - if default: - # get most used value - pg_query = f"""SELECT {event_column} AS value, COUNT(*) AS count - FROM {event_table} AS feature INNER JOIN public.sessions USING (session_id) - WHERE {" AND ".join(pg_sub_query[:-1])} - AND length({event_column}) > 2 - GROUP BY value - ORDER BY count DESC - LIMIT 1;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - cur.execute(cur.mogrify(pg_query, params)) - row = cur.fetchone() - if row is not None: - event_value = row["value"] - extra_values["value"] = event_value - if len(event_value) > 2: - pg_sub_query.append(f"length({event_column})>2") - pg_query = f"""SELECT FLOOR(DATE_PART('day', connexion_week - to_timestamp(%(startTimestamp)s/1000)) / 7)::integer AS week, - COUNT(DISTINCT connexions_list.user_id) AS users_count, - ARRAY_AGG(DISTINCT connexions_list.user_id) AS connected_users - FROM (SELECT DISTINCT user_id - FROM sessions INNER JOIN {event_table} AS feature USING (session_id) - WHERE {" AND ".join(pg_sub_query)} - AND DATE_PART('week', to_timestamp((sessions.start_ts - %(startTimestamp)s)/1000)) = 1 - AND NOT EXISTS((SELECT 1 - FROM sessions AS bsess INNER JOIN {event_table} AS bfeature USING (session_id) - WHERE bsess.start_ts<%(startTimestamp)s - AND project_id = %(project_id)s - AND bsess.user_id = sessions.user_id - AND bfeature.timestamp<%(startTimestamp)s - AND bfeature.{event_column}=%(value)s - LIMIT 1)) - GROUP BY user_id) AS users_list - LEFT JOIN LATERAL (SELECT DATE_TRUNC('week', to_timestamp(start_ts / 1000)::timestamp) AS connexion_week, - user_id - FROM sessions INNER JOIN {event_table} AS feature USING (session_id) - WHERE users_list.user_id = sessions.user_id - AND %(startTimestamp)s <= sessions.start_ts - AND sessions.project_id = %(project_id)s - AND 
sessions.start_ts < (%(endTimestamp)s - 1) - AND feature.timestamp >= %(startTimestamp)s - AND feature.timestamp < %(endTimestamp)s - AND feature.{event_column} = %(value)s - GROUP BY connexion_week, user_id) AS connexions_list ON (TRUE) - GROUP BY week - ORDER BY week;""" - - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - print(cur.mogrify(pg_query, params)) - cur.execute(cur.mogrify(pg_query, params)) - rows = cur.fetchall() - rows = __compute_weekly_percentage(helper.list_to_camel_case(rows)) - return { - "startTimestamp": startTimestamp, - "filters": [{"type": "EVENT_TYPE", "value": event_type}, {"type": "EVENT_VALUE", "value": event_value}], - "chart": __complete_retention(rows=rows, start_date=startTimestamp, end_date=TimeUTC.now()) - } - - -@dev.timed -def feature_acquisition(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], - **args): - startTimestamp = TimeUTC.trunc_week(startTimestamp) - endTimestamp = startTimestamp + 10 * TimeUTC.MS_WEEK - pg_sub_query = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions", - time_constraint=True) - pg_sub_query.append("user_id IS NOT NULL") - pg_sub_query.append("feature.timestamp >= %(startTimestamp)s") - pg_sub_query.append("feature.timestamp < %(endTimestamp)s") - event_type = "PAGES" - event_value = "/" - extra_values = {} - default = True - for f in filters: - if f["type"] == "EVENT_TYPE" and JOURNEY_TYPES.get(f["value"]): - event_type = f["value"] - elif f["type"] == "EVENT_VALUE": - event_value = f["value"] - default = False - elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: - pg_sub_query.append(f"sessions.user_id = %(user_id)s") - extra_values["user_id"] = f["value"] - event_table = JOURNEY_TYPES[event_type]["table"] - event_column = JOURNEY_TYPES[event_type]["column"] - - pg_sub_query.append(f"feature.{event_column} = %(value)s") - - with pg_client.PostgresClient() as cur: - if default: - # get most used value - pg_query = f"""SELECT {event_column} AS value, COUNT(*) AS count - FROM {event_table} AS feature INNER JOIN public.sessions USING (session_id) - WHERE {" AND ".join(pg_sub_query[:-1])} - AND length({event_column}) > 2 - GROUP BY value - ORDER BY count DESC - LIMIT 1;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - cur.execute(cur.mogrify(pg_query, params)) - row = cur.fetchone() - if row is not None: - event_value = row["value"] - extra_values["value"] = event_value - if len(event_value) > 2: - pg_sub_query.append(f"length({event_column})>2") - pg_query = f"""SELECT EXTRACT(EPOCH FROM first_connexion_week::date)::bigint*1000 AS first_connexion_week, - FLOOR(DATE_PART('day', connexion_week - first_connexion_week) / 7)::integer AS week, - COUNT(DISTINCT connexions_list.user_id) AS users_count, - ARRAY_AGG(DISTINCT connexions_list.user_id) AS connected_users - FROM (SELECT user_id, DATE_TRUNC('week', to_timestamp(first_connexion_week / 1000)) AS first_connexion_week - FROM(SELECT DISTINCT user_id, MIN(start_ts) AS first_connexion_week - FROM sessions INNER JOIN {event_table} AS feature USING (session_id) - WHERE {" AND ".join(pg_sub_query)} - AND NOT EXISTS((SELECT 1 - FROM sessions AS bsess INNER JOIN {event_table} AS bfeature USING (session_id) - WHERE bsess.start_ts<%(startTimestamp)s - AND 
project_id = %(project_id)s - AND bsess.user_id = sessions.user_id - AND bfeature.timestamp<%(startTimestamp)s - AND bfeature.{event_column}=%(value)s - LIMIT 1)) - GROUP BY user_id) AS raw_users_list) AS users_list - LEFT JOIN LATERAL (SELECT DATE_TRUNC('week', to_timestamp(start_ts / 1000)::timestamp) AS connexion_week, - user_id - FROM sessions INNER JOIN {event_table} AS feature USING(session_id) - WHERE users_list.user_id = sessions.user_id - AND first_connexion_week <= - DATE_TRUNC('week', to_timestamp(sessions.start_ts / 1000)::timestamp) - AND sessions.project_id = %(project_id)s - AND sessions.start_ts < (%(endTimestamp)s - 1) - AND feature.timestamp >= %(startTimestamp)s - AND feature.timestamp < %(endTimestamp)s - AND feature.{event_column} = %(value)s - GROUP BY connexion_week, user_id) AS connexions_list ON (TRUE) - GROUP BY first_connexion_week, week - ORDER BY first_connexion_week, week;""" - - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - print(cur.mogrify(pg_query, params)) - cur.execute(cur.mogrify(pg_query, params)) - rows = cur.fetchall() - rows = __compute_weekly_percentage(helper.list_to_camel_case(rows)) - return { - "startTimestamp": startTimestamp, - "filters": [{"type": "EVENT_TYPE", "value": event_type}, {"type": "EVENT_VALUE", "value": event_value}], - "chart": __complete_acquisition(rows=rows, start_date=startTimestamp, end_date=TimeUTC.now()) - } - - -@dev.timed -def feature_popularity_frequency(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], - **args): - startTimestamp = TimeUTC.trunc_week(startTimestamp) - endTimestamp = startTimestamp + 10 * TimeUTC.MS_WEEK - pg_sub_query = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions", - time_constraint=True) - event_table = JOURNEY_TYPES["CLICK"]["table"] - event_column = JOURNEY_TYPES["CLICK"]["column"] - extra_values = {} - for f in filters: - if f["type"] == "EVENT_TYPE" and JOURNEY_TYPES.get(f["value"]): - event_table = JOURNEY_TYPES[f["value"]]["table"] - event_column = JOURNEY_TYPES[f["value"]]["column"] - elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: - pg_sub_query.append(f"sessions.user_id = %(user_id)s") - extra_values["user_id"] = f["value"] - - with pg_client.PostgresClient() as cur: - pg_query = f"""SELECT COUNT(DISTINCT user_id) AS count - FROM sessions - WHERE {" AND ".join(pg_sub_query)} - AND user_id IS NOT NULL;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - # print(cur.mogrify(pg_query, params)) - # print("---------------------") - cur.execute(cur.mogrify(pg_query, params)) - all_user_count = cur.fetchone()["count"] - if all_user_count == 0: - return [] - pg_sub_query.append("feature.timestamp >= %(startTimestamp)s") - pg_sub_query.append("feature.timestamp < %(endTimestamp)s") - pg_sub_query.append(f"length({event_column})>2") - pg_query = f"""SELECT {event_column} AS value, COUNT(DISTINCT user_id) AS count - FROM {event_table} AS feature INNER JOIN sessions USING (session_id) - WHERE {" AND ".join(pg_sub_query)} - AND user_id IS NOT NULL - GROUP BY value - ORDER BY count DESC - LIMIT 7;""" - # TODO: solve full scan - print(cur.mogrify(pg_query, params)) - print("---------------------") - cur.execute(cur.mogrify(pg_query, params)) - popularity = 
cur.fetchall() - pg_query = f"""SELECT {event_column} AS value, COUNT(session_id) AS count - FROM {event_table} AS feature INNER JOIN sessions USING (session_id) - WHERE {" AND ".join(pg_sub_query)} - GROUP BY value;""" - # TODO: solve full scan - print(cur.mogrify(pg_query, params)) - print("---------------------") - cur.execute(cur.mogrify(pg_query, params)) - frequencies = cur.fetchall() - total_usage = sum([f["count"] for f in frequencies]) - frequencies = {f["value"]: f["count"] for f in frequencies} - for p in popularity: - p["popularity"] = p.pop("count") / all_user_count - p["frequency"] = frequencies[p["value"]] / total_usage - - return popularity - - -@dev.timed -def feature_adoption(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], - **args): - pg_sub_query = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions", - time_constraint=True) - event_type = "CLICK" - event_value = '/' - extra_values = {} - default = True - for f in filters: - if f["type"] == "EVENT_TYPE" and JOURNEY_TYPES.get(f["value"]): - event_type = f["value"] - elif f["type"] == "EVENT_VALUE": - event_value = f["value"] - default = False - elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: - pg_sub_query.append(f"sessions.user_id = %(user_id)s") - extra_values["user_id"] = f["value"] - event_table = JOURNEY_TYPES[event_type]["table"] - event_column = JOURNEY_TYPES[event_type]["column"] - with pg_client.PostgresClient() as cur: - pg_query = f"""SELECT COUNT(DISTINCT user_id) AS count - FROM sessions - WHERE {" AND ".join(pg_sub_query)} - AND user_id IS NOT NULL;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - # print(cur.mogrify(pg_query, params)) - # print("---------------------") - cur.execute(cur.mogrify(pg_query, params)) - all_user_count = cur.fetchone()["count"] - if all_user_count == 0: - return {"adoption": 0, "target": 0, "filters": [{"type": "EVENT_TYPE", "value": event_type}, - {"type": "EVENT_VALUE", "value": event_value}], } - pg_sub_query.append("feature.timestamp >= %(startTimestamp)s") - pg_sub_query.append("feature.timestamp < %(endTimestamp)s") - if default: - # get most used value - pg_query = f"""SELECT {event_column} AS value, COUNT(*) AS count - FROM {event_table} AS feature INNER JOIN public.sessions USING (session_id) - WHERE {" AND ".join(pg_sub_query[:-1])} - AND length({event_column}) > 2 - GROUP BY value - ORDER BY count DESC - LIMIT 1;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - cur.execute(cur.mogrify(pg_query, params)) - row = cur.fetchone() - if row is not None: - event_value = row["value"] - extra_values["value"] = event_value - if len(event_value) > 2: - pg_sub_query.append(f"length({event_column})>2") - pg_sub_query.append(f"feature.{event_column} = %(value)s") - pg_query = f"""SELECT COUNT(DISTINCT user_id) AS count - FROM {event_table} AS feature INNER JOIN sessions USING (session_id) - WHERE {" AND ".join(pg_sub_query)} - AND user_id IS NOT NULL;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - # print(cur.mogrify(pg_query, params)) - # print("---------------------") - cur.execute(cur.mogrify(pg_query, params)) - adoption = 
cur.fetchone()["count"] / all_user_count - return {"target": all_user_count, "adoption": adoption, - "filters": [{"type": "EVENT_TYPE", "value": event_type}, {"type": "EVENT_VALUE", "value": event_value}]} - - -@dev.timed -def feature_adoption_top_users(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], **args): - pg_sub_query = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions", - time_constraint=True) - pg_sub_query.append("user_id IS NOT NULL") - event_type = "CLICK" - event_value = '/' - extra_values = {} - default = True - for f in filters: - if f["type"] == "EVENT_TYPE" and JOURNEY_TYPES.get(f["value"]): - event_type = f["value"] - elif f["type"] == "EVENT_VALUE": - event_value = f["value"] - default = False - elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: - pg_sub_query.append(f"sessions.user_id = %(user_id)s") - extra_values["user_id"] = f["value"] - event_table = JOURNEY_TYPES[event_type]["table"] - event_column = JOURNEY_TYPES[event_type]["column"] - with pg_client.PostgresClient() as cur: - pg_sub_query.append("feature.timestamp >= %(startTimestamp)s") - pg_sub_query.append("feature.timestamp < %(endTimestamp)s") - if default: - # get most used value - pg_query = f"""SELECT {event_column} AS value, COUNT(*) AS count - FROM {event_table} AS feature INNER JOIN public.sessions USING (session_id) - WHERE {" AND ".join(pg_sub_query[:-1])} - AND length({event_column}) > 2 - GROUP BY value - ORDER BY count DESC - LIMIT 1;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - cur.execute(cur.mogrify(pg_query, params)) - row = cur.fetchone() - if row is not None: - event_value = row["value"] - extra_values["value"] = event_value - if len(event_value) > 2: - pg_sub_query.append(f"length({event_column})>2") - pg_sub_query.append(f"feature.{event_column} = %(value)s") - pg_query = f"""SELECT user_id, COUNT(DISTINCT session_id) AS count - FROM {event_table} AS feature - INNER JOIN sessions USING (session_id) - WHERE {" AND ".join(pg_sub_query)} - GROUP BY 1 - ORDER BY 2 DESC - LIMIT 10;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - # print(cur.mogrify(pg_query, params)) - # print("---------------------") - cur.execute(cur.mogrify(pg_query, params)) - rows = cur.fetchall() - return {"users": helper.list_to_camel_case(rows), - "filters": [{"type": "EVENT_TYPE", "value": event_type}, {"type": "EVENT_VALUE", "value": event_value}]} - - -@dev.timed -def feature_adoption_daily_usage(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], **args): - pg_sub_query = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions", - time_constraint=True) - pg_sub_query_chart = __get_constraints(project_id=project_id, time_constraint=True, - chart=True, data=args) - event_type = "CLICK" - event_value = '/' - extra_values = {} - default = True - for f in filters: - if f["type"] == "EVENT_TYPE" and JOURNEY_TYPES.get(f["value"]): - event_type = f["value"] - elif f["type"] == "EVENT_VALUE": - event_value = f["value"] - default = False - elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: - pg_sub_query_chart.append(f"sessions.user_id = %(user_id)s") - 
extra_values["user_id"] = f["value"] - event_table = JOURNEY_TYPES[event_type]["table"] - event_column = JOURNEY_TYPES[event_type]["column"] - with pg_client.PostgresClient() as cur: - pg_sub_query_chart.append("feature.timestamp >= %(startTimestamp)s") - pg_sub_query_chart.append("feature.timestamp < %(endTimestamp)s") - pg_sub_query.append("feature.timestamp >= %(startTimestamp)s") - pg_sub_query.append("feature.timestamp < %(endTimestamp)s") - if default: - # get most used value - pg_query = f"""SELECT {event_column} AS value, COUNT(*) AS count - FROM {event_table} AS feature INNER JOIN public.sessions USING (session_id) - WHERE {" AND ".join(pg_sub_query)} - AND length({event_column})>2 - GROUP BY value - ORDER BY count DESC - LIMIT 1;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - cur.execute(cur.mogrify(pg_query, params)) - row = cur.fetchone() - if row is not None: - event_value = row["value"] - extra_values["value"] = event_value - if len(event_value) > 2: - pg_sub_query.append(f"length({event_column})>2") - pg_sub_query_chart.append(f"feature.{event_column} = %(value)s") - pg_query = f"""SELECT generated_timestamp AS timestamp, - COALESCE(COUNT(session_id), 0) AS count - FROM generate_series(%(startTimestamp)s, %(endTimestamp)s, %(step_size)s) AS generated_timestamp - LEFT JOIN LATERAL ( SELECT DISTINCT session_id - FROM {event_table} AS feature INNER JOIN public.sessions USING (session_id) - WHERE {" AND ".join(pg_sub_query_chart)} - ) AS users ON (TRUE) - GROUP BY generated_timestamp - ORDER BY generated_timestamp;""" - params = {"step_size": TimeUTC.MS_DAY, "project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - print(cur.mogrify(pg_query, params)) - print("---------------------") - cur.execute(cur.mogrify(pg_query, params)) - rows = cur.fetchall() - return {"chart": helper.list_to_camel_case(rows), - "filters": [{"type": "EVENT_TYPE", "value": event_type}, {"type": "EVENT_VALUE", "value": event_value}]} - - -@dev.timed -def feature_intensity(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], - **args): - pg_sub_query = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions", - time_constraint=True) - pg_sub_query.append("feature.timestamp >= %(startTimestamp)s") - pg_sub_query.append("feature.timestamp < %(endTimestamp)s") - event_table = JOURNEY_TYPES["CLICK"]["table"] - event_column = JOURNEY_TYPES["CLICK"]["column"] - extra_values = {} - for f in filters: - if f["type"] == "EVENT_TYPE" and JOURNEY_TYPES.get(f["value"]): - event_table = JOURNEY_TYPES[f["value"]]["table"] - event_column = JOURNEY_TYPES[f["value"]]["column"] - elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: - pg_sub_query.append(f"sessions.user_id = %(user_id)s") - extra_values["user_id"] = f["value"] - pg_sub_query.append(f"length({event_column})>2") - with pg_client.PostgresClient() as cur: - pg_query = f"""SELECT {event_column} AS value, AVG(DISTINCT session_id) AS avg - FROM {event_table} AS feature INNER JOIN sessions USING (session_id) - WHERE {" AND ".join(pg_sub_query)} - GROUP BY value - ORDER BY avg DESC - LIMIT 7;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - # TODO: solve 
full scan issue - print(cur.mogrify(pg_query, params)) - print("---------------------") - cur.execute(cur.mogrify(pg_query, params)) - rows = cur.fetchall() - - return rows - - -@dev.timed -def users_active(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], - **args): - pg_sub_query_chart = __get_constraints(project_id=project_id, time_constraint=True, - chart=True, data=args) - - pg_sub_query_chart.append("user_id IS NOT NULL") - period = "DAY" - extra_values = {} - for f in filters: - if f["type"] == "PERIOD" and f["value"] in ["DAY", "WEEK"]: - period = f["value"] - elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: - pg_sub_query_chart.append(f"sessions.user_id = %(user_id)s") - extra_values["user_id"] = f["value"] - - with pg_client.PostgresClient() as cur: - pg_query = f"""SELECT AVG(count) AS avg, JSONB_AGG(chart) AS chart - FROM (SELECT generated_timestamp AS timestamp, - COALESCE(COUNT(users), 0) AS count - FROM generate_series(%(startTimestamp)s, %(endTimestamp)s, %(step_size)s) AS generated_timestamp - LEFT JOIN LATERAL ( SELECT DISTINCT user_id - FROM public.sessions - WHERE {" AND ".join(pg_sub_query_chart)} - ) AS users ON (TRUE) - GROUP BY generated_timestamp - ORDER BY generated_timestamp) AS chart;""" - params = {"step_size": TimeUTC.MS_DAY if period == "DAY" else TimeUTC.MS_WEEK, - "project_id": project_id, - "startTimestamp": TimeUTC.trunc_day(startTimestamp) if period == "DAY" else TimeUTC.trunc_week( - startTimestamp), - "endTimestamp": endTimestamp, **__get_constraint_values(args), - **extra_values} - # print(cur.mogrify(pg_query, params)) - # print("---------------------") - cur.execute(cur.mogrify(pg_query, params)) - row_users = cur.fetchone() - - return row_users - - -@dev.timed -def users_power(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], **args): - pg_sub_query = __get_constraints(project_id=project_id, time_constraint=True, chart=False, data=args) - pg_sub_query.append("user_id IS NOT NULL") - - with pg_client.PostgresClient() as cur: - pg_query = f"""SELECT AVG(count) AS avg, JSONB_AGG(day_users_partition) AS partition - FROM (SELECT number_of_days, COUNT(user_id) AS count - FROM (SELECT user_id, COUNT(DISTINCT DATE_TRUNC('day', to_timestamp(start_ts / 1000))) AS number_of_days - FROM sessions - WHERE {" AND ".join(pg_sub_query)} - GROUP BY 1) AS users_connexions - GROUP BY number_of_days - ORDER BY number_of_days) AS day_users_partition;""" - params = {"project_id": project_id, - "startTimestamp": startTimestamp, "endTimestamp": endTimestamp, **__get_constraint_values(args)} - # print(cur.mogrify(pg_query, params)) - # print("---------------------") - cur.execute(cur.mogrify(pg_query, params)) - row_users = cur.fetchone() - - return helper.dict_to_camel_case(row_users) - - -@dev.timed -def users_slipping(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], **args): - pg_sub_query = __get_constraints(project_id=project_id, data=args, duration=True, main_table="sessions", - time_constraint=True) - pg_sub_query.append("user_id IS NOT NULL") - pg_sub_query.append("feature.timestamp >= %(startTimestamp)s") - pg_sub_query.append("feature.timestamp < %(endTimestamp)s") - event_type = "PAGES" - event_value = "/" - extra_values = {} - default = True - for f in filters: - if f["type"] == "EVENT_TYPE" and JOURNEY_TYPES.get(f["value"]): - event_type = f["value"] - elif f["type"] == 
"EVENT_VALUE": - event_value = f["value"] - default = False - elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: - pg_sub_query.append(f"sessions.user_id = %(user_id)s") - extra_values["user_id"] = f["value"] - event_table = JOURNEY_TYPES[event_type]["table"] - event_column = JOURNEY_TYPES[event_type]["column"] - pg_sub_query.append(f"feature.{event_column} = %(value)s") - - with pg_client.PostgresClient() as cur: - if default: - # get most used value - pg_query = f"""SELECT {event_column} AS value, COUNT(*) AS count - FROM {event_table} AS feature INNER JOIN public.sessions USING (session_id) - WHERE {" AND ".join(pg_sub_query[:-1])} - AND length({event_column}) > 2 - GROUP BY value - ORDER BY count DESC - LIMIT 1;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - cur.execute(cur.mogrify(pg_query, params)) - row = cur.fetchone() - if row is not None: - event_value = row["value"] - extra_values["value"] = event_value - if len(event_value) > 2: - pg_sub_query.append(f"length({event_column})>2") - pg_query = f"""SELECT user_id, last_time, interactions_count, MIN(start_ts) AS first_seen, MAX(start_ts) AS last_seen - FROM (SELECT user_id, MAX(timestamp) AS last_time, COUNT(DISTINCT session_id) AS interactions_count - FROM {event_table} AS feature INNER JOIN sessions USING (session_id) - WHERE {" AND ".join(pg_sub_query)} - GROUP BY user_id) AS user_last_usage - INNER JOIN sessions USING (user_id) - WHERE EXTRACT(EPOCH FROM now()) * 1000 - last_time > 7 * 24 * 60 * 60 * 1000 - GROUP BY user_id, last_time,interactions_count;""" - - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - # print(cur.mogrify(pg_query, params)) - cur.execute(cur.mogrify(pg_query, params)) - rows = cur.fetchall() - return { - "startTimestamp": startTimestamp, - "filters": [{"type": "EVENT_TYPE", "value": event_type}, {"type": "EVENT_VALUE", "value": event_value}], - "list": helper.list_to_camel_case(rows) - } - - -@dev.timed -def search(text, feature_type, project_id, platform=None): - if not feature_type: - resource_type = "ALL" - data = search(text=text, feature_type=resource_type, project_id=project_id, platform=platform) - return data - - pg_sub_query = __get_constraints(project_id=project_id, time_constraint=True, duration=True, - data={} if platform is None else {"platform": platform}) - - params = {"startTimestamp": TimeUTC.now() - 2 * TimeUTC.MS_MONTH, - "endTimestamp": TimeUTC.now(), - "project_id": project_id, - "value": helper.string_to_sql_like(text.lower()), - "platform_0": platform} - if feature_type == "ALL": - with pg_client.PostgresClient() as cur: - sub_queries = [] - for e in JOURNEY_TYPES: - sub_queries.append(f"""(SELECT DISTINCT {JOURNEY_TYPES[e]["column"]} AS value, '{e}' AS "type" - FROM {JOURNEY_TYPES[e]["table"]} INNER JOIN public.sessions USING(session_id) - WHERE {" AND ".join(pg_sub_query)} AND {JOURNEY_TYPES[e]["column"]} ILIKE %(value)s - LIMIT 10)""") - pg_query = "UNION ALL".join(sub_queries) - # print(cur.mogrify(pg_query, params)) - cur.execute(cur.mogrify(pg_query, params)) - rows = cur.fetchall() - elif JOURNEY_TYPES.get(feature_type) is not None: - with pg_client.PostgresClient() as cur: - pg_query = f"""SELECT DISTINCT {JOURNEY_TYPES[feature_type]["column"]} AS value, '{feature_type}' AS "type" - FROM 
{JOURNEY_TYPES[feature_type]["table"]} INNER JOIN public.sessions USING(session_id)
-                            WHERE {" AND ".join(pg_sub_query)} AND {JOURNEY_TYPES[feature_type]["column"]} ILIKE %(value)s
-                            LIMIT 10;"""
-            # print(cur.mogrify(pg_query, params))
-            cur.execute(cur.mogrify(pg_query, params))
-            rows = cur.fetchall()
-    else:
-        return []
-    return [helper.dict_to_camel_case(row) for row in rows]
diff --git a/ee/api/app.py b/ee/api/app.py
index 1cde5efe4..cc901c6f8 100644
--- a/ee/api/app.py
+++ b/ee/api/app.py
@@ -7,7 +7,7 @@ from chalicelib.blueprints import bp_authorizers
 from chalicelib.blueprints import bp_core, bp_core_crons
 from chalicelib.blueprints.app import v1_api
 from chalicelib.blueprints import bp_core_dynamic, bp_core_dynamic_crons
-from chalicelib.blueprints.subs import bp_dashboard,bp_insights
+from chalicelib.blueprints.subs import bp_dashboard
 from chalicelib.utils import helper
 from chalicelib.utils import pg_client
 from chalicelib.utils.helper import environ
@@ -122,7 +122,6 @@ app.register_blueprint(bp_core_crons.app)
 app.register_blueprint(bp_core_dynamic.app)
 app.register_blueprint(bp_core_dynamic_crons.app)
 app.register_blueprint(bp_dashboard.app)
-app.register_blueprint(bp_insights.app)
 app.register_blueprint(v1_api.app)
 # Enterprise
 app.register_blueprint(bp_ee.app)
diff --git a/ee/api/chalicelib/blueprints/subs/bp_insights.py b/ee/api/chalicelib/blueprints/subs/bp_insights.py
deleted file mode 100644
index 6546bfd12..000000000
--- a/ee/api/chalicelib/blueprints/subs/bp_insights.py
+++ /dev/null
@@ -1,163 +0,0 @@
-from chalice import Blueprint
-from chalicelib.utils import helper
-from chalicelib import _overrides
-
-from chalicelib.core import dashboard, insights
-from chalicelib.core import metadata
-
-app = Blueprint(__name__)
-_overrides.chalice_app(app)
-
-
-@app.route('/{projectId}/insights/journey', methods=['GET', 'POST'])
-def get_insights_journey(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.journey(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/users_acquisition', methods=['GET', 'POST'])
-def get_users_acquisition(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.users_acquisition(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/users_retention', methods=['GET', 'POST'])
-def get_users_retention(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.users_retention(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/feature_retention', methods=['GET', 'POST'])
-def get_feature_retention(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.feature_retention(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/feature_acquisition', methods=['GET', 'POST'])
-def get_feature_acquisition(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = 
dashboard.dashboard_args(params)
-
-    return {"data": insights.feature_acquisition(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/feature_popularity_frequency', methods=['GET', 'POST'])
-def get_feature_popularity_frequency(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.feature_popularity_frequency(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/feature_intensity', methods=['GET', 'POST'])
-def get_feature_intensity(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.feature_intensity(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/feature_adoption', methods=['GET', 'POST'])
-def get_feature_adoption(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.feature_adoption(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/feature_adoption_top_users', methods=['GET', 'POST'])
-def get_feature_adoption_top_users(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.feature_adoption_top_users(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/feature_adoption_daily_usage', methods=['GET', 'POST'])
-def get_feature_adoption_daily_usage(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.feature_adoption_daily_usage(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/users_active', methods=['GET', 'POST'])
-def get_users_active(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.users_active(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/users_power', methods=['GET', 'POST'])
-def get_users_power(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.users_power(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/users_slipping', methods=['GET', 'POST'])
-def get_users_slipping(projectId, context):
-    data = app.current_request.json_body
-    if data is None:
-        data = {}
-    params = app.current_request.query_params
-    args = dashboard.dashboard_args(params)
-
-    return {"data": insights.users_slipping(project_id=projectId, **{**data, **args})}
-
-
-@app.route('/{projectId}/insights/search', methods=['GET'])
-def get_insights_autocomplete(projectId, context):
-    params = app.current_request.query_params
-    if params is None or params.get('q') is None or len(params.get('q')) == 0:
-        return {"data": []}
-    # params['q'] = '^' + params['q']
-
-    return {'data': insights.search(params.get('q', ''), 
project_id=projectId, - platform=params.get('platform', None), feature_type=params.get("key"))} diff --git a/ee/api/chalicelib/core/insights.py b/ee/api/chalicelib/core/insights.py deleted file mode 100644 index 2a8febe5d..000000000 --- a/ee/api/chalicelib/core/insights.py +++ /dev/null @@ -1,1047 +0,0 @@ -from chalicelib.core import sessions_metas -from chalicelib.utils import helper, dev -from chalicelib.utils import ch_client -from chalicelib.utils.TimeUTC import TimeUTC -from chalicelib.core.dashboard import __get_constraint_values, __complete_missing_steps -from chalicelib.core.dashboard import __get_basic_constraints, __get_meta_constraint - - -def __transform_journey(rows): - nodes = [] - links = [] - for r in rows: - source = r["source_event"][r["source_event"].index("_") + 1:] - target = r["target_event"][r["target_event"].index("_") + 1:] - if source not in nodes: - nodes.append(source) - if target not in nodes: - nodes.append(target) - links.append({"source": nodes.index(source), "target": nodes.index(target), "value": r["value"]}) - return {"nodes": nodes, "links": sorted(links, key=lambda x: x["value"], reverse=True)} - - -JOURNEY_DEPTH = 5 -JOURNEY_TYPES = { - "PAGES": {"table": "pages", "column": "url_path"}, - "CLICK": {"table": "clicks", "column": "label"}, - # "VIEW": {"table": "events_ios.views", "column": "name"}, TODO: enable this for SAAS only - "EVENT": {"table": "customs", "column": "name"} -} - - -@dev.timed -def journey(project_id, startTimestamp=TimeUTC.now(delta_days=-1), endTimestamp=TimeUTC.now(), filters=[], **args): - event_start = None - event_table = JOURNEY_TYPES["CLICK"]["table"] - event_column = JOURNEY_TYPES["CLICK"]["column"] - extra_values = {} - meta_condition = [] - for f in filters: - if f["type"] == "START_POINT": - event_start = f["value"] - elif f["type"] == "EVENT_TYPE" and JOURNEY_TYPES.get(f["value"]): - event_table = JOURNEY_TYPES[f["value"]]["table"] - event_column = JOURNEY_TYPES[f["value"]]["column"] - elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: - meta_condition.append(f"sessions_metadata.user_id = %(user_id)s") - meta_condition.append(f"sessions_metadata.project_id = %(project_id)s") - meta_condition.append(f"sessions_metadata.datetime >= toDateTime(%(startTimestamp)s / 1000)") - meta_condition.append(f"sessions_metadata.datetime < toDateTime(%(endTimestamp)s / 1000)") - extra_values["user_id"] = f["value"] - ch_sub_query = __get_basic_constraints(table_name=event_table, data=args) - meta_condition += __get_meta_constraint(args) - ch_sub_query += meta_condition - with ch_client.ClickHouseClient() as ch: - ch_query = f"""SELECT source_event, - target_event, - count(*) AS value - FROM (SELECT toString(event_number) || '_' || value AS target_event, - lagInFrame(toString(event_number) || '_' || value) OVER (PARTITION BY session_rank ORDER BY datetime ASC ROWS - BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS source_event - FROM (SELECT session_rank, - datetime, - value, - row_number AS event_number - FROM (SELECT session_rank, - groupArray(datetime) AS arr_datetime, - groupArray(value) AS arr_value, - arrayEnumerate(arr_datetime) AS row_number - {f"FROM (SELECT * FROM (SELECT *, MIN(mark) OVER ( PARTITION BY session_id , session_rank ORDER BY datetime ) AS max FROM (SELECT *, CASE WHEN value = %(event_start)s THEN datetime ELSE NULL END as mark" if event_start else ""} - FROM (SELECT session_id, - datetime, - value, - SUM(new_session) OVER (ORDER BY session_id, datetime) AS session_rank 
- FROM (SELECT *, - if(equals(source_timestamp, '1970-01-01'), 1, 0) AS new_session - FROM (SELECT session_id, - datetime, - {event_column} AS value, - lagInFrame(datetime) OVER (PARTITION BY session_id ORDER BY datetime ASC ROWS - BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS source_timestamp - FROM {event_table} {"INNER JOIN sessions_metadata USING(session_id)" if len(meta_condition) > 0 else ""} - WHERE {" AND ".join(ch_sub_query)} - ORDER BY session_id, datetime) AS related_events) AS ranked_events - ORDER BY session_rank, datetime - ) AS processed - {") AS marked) AS maxed WHERE datetime >= max) AS filtered" if event_start else ""} - GROUP BY session_rank - ORDER BY session_rank) - ARRAY JOIN - arr_datetime AS datetime, - arr_value AS value, - row_number - ORDER BY session_rank ASC, - row_number ASC) AS sorted_events - WHERE event_number <= %(JOURNEY_DEPTH)s) AS final - WHERE not empty(source_event) - AND not empty(target_event) - GROUP BY source_event, target_event - ORDER BY value DESC - LIMIT 20;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, "event_start": event_start, "JOURNEY_DEPTH": JOURNEY_DEPTH, - **__get_constraint_values(args), **extra_values} - - rows = ch.execute(query=ch_query, params=params) - # print(ch_query % params) - return __transform_journey(rows) - - -def __compute_weekly_percentage(rows): - if rows is None or len(rows) == 0: - return rows - t = -1 - for r in rows: - if r["week"] == 0: - t = r["usersCount"] - r["percentage"] = r["usersCount"] / t - return rows - - -def __complete_retention(rows, start_date, end_date=None): - if rows is None: - return [] - max_week = 10 - for i in range(max_week): - if end_date is not None and start_date + i * TimeUTC.MS_WEEK >= end_date: - break - neutral = { - "firstConnexionWeek": start_date, - "week": i, - "usersCount": 0, - "connectedUsers": [], - "percentage": 0 - } - if i < len(rows) \ - and i != rows[i]["week"]: - rows.insert(i, neutral) - elif i >= len(rows): - rows.append(neutral) - return rows - - -def __complete_acquisition(rows, start_date, end_date=None): - if rows is None: - return [] - max_week = 10 - week = 0 - delta_date = 0 - while max_week > 0: - start_date += TimeUTC.MS_WEEK - if end_date is not None and start_date >= end_date: - break - delta = 0 - if delta_date + week >= len(rows) \ - or delta_date + week < len(rows) and rows[delta_date + week]["firstConnexionWeek"] > start_date: - for i in range(max_week): - if end_date is not None and start_date + i * TimeUTC.MS_WEEK >= end_date: - break - - neutral = { - "firstConnexionWeek": start_date, - "week": i, - "usersCount": 0, - "connectedUsers": [], - "percentage": 0 - } - rows.insert(delta_date + week + i, neutral) - delta = i - else: - for i in range(max_week): - if end_date is not None and start_date + i * TimeUTC.MS_WEEK >= end_date: - break - - neutral = { - "firstConnexionWeek": start_date, - "week": i, - "usersCount": 0, - "connectedUsers": [], - "percentage": 0 - } - if delta_date + week + i < len(rows) \ - and i != rows[delta_date + week + i]["week"]: - rows.insert(delta_date + week + i, neutral) - elif delta_date + week + i >= len(rows): - rows.append(neutral) - delta = i - week += delta - max_week -= 1 - delta_date += 1 - return rows - - -@dev.timed -def users_retention(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), filters=[], - **args): - startTimestamp = TimeUTC.trunc_week(startTimestamp) - endTimestamp = startTimestamp + 10 * TimeUTC.MS_WEEK 
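-    # Note: the retention window is fixed at 10 calendar weeks; startTimestamp is
-    # truncated to the start of its week, and __complete_retention later pads the
-    # chart to one bucket per week (max_week = 10 in that helper).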
- ch_sub_query = __get_basic_constraints(table_name='sessions_metadata', data=args) - meta_condition = __get_meta_constraint(args) - ch_sub_query += meta_condition - ch_sub_query.append("sessions_metadata.user_id IS NOT NULL") - ch_sub_query.append("not empty(sessions_metadata.user_id)") - with ch_client.ClickHouseClient() as ch: - ch_query = f"""SELECT toInt8((connexion_week - toDate(%(startTimestamp)s / 1000)) / 7) AS week, - COUNT(all_connexions.user_id) AS users_count, - groupArray(100)(all_connexions.user_id) AS connected_users - FROM (SELECT DISTINCT user_id - FROM sessions_metadata - WHERE {" AND ".join(ch_sub_query)} - AND toStartOfWeek(sessions_metadata.datetime,1) = toDate(%(startTimestamp)s / 1000) - AND sessions_metadata.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 ) - AND isNull((SELECT 1 - FROM sessions_metadata AS bmsess - WHERE bmsess.datetime < toDateTime(%(startTimestamp)s / 1000) - AND bmsess.project_id = %(project_id)s - AND bmsess.user_id = sessions_metadata.user_id - LIMIT 1)) - ) AS users_list - INNER JOIN (SELECT DISTINCT user_id, toStartOfWeek(datetime,1) AS connexion_week - FROM sessions_metadata - WHERE {" AND ".join(ch_sub_query)} - ) AS all_connexions USING (user_id) - GROUP BY connexion_week - ORDER BY connexion_week;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args)} - # print(ch_query % params) - rows = ch.execute(ch_query, params) - rows = __compute_weekly_percentage(helper.list_to_camel_case(rows)) - return { - "startTimestamp": startTimestamp, - "chart": __complete_retention(rows=rows, start_date=startTimestamp, end_date=TimeUTC.now()) - } - - -@dev.timed -def users_acquisition(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], **args): - startTimestamp = TimeUTC.trunc_week(startTimestamp) - endTimestamp = startTimestamp + 10 * TimeUTC.MS_WEEK - ch_sub_query = __get_basic_constraints(table_name='sessions_metadata', data=args) - meta_condition = __get_meta_constraint(args) - ch_sub_query += meta_condition - ch_sub_query.append("sessions_metadata.user_id IS NOT NULL") - ch_sub_query.append("not empty(sessions_metadata.user_id)") - ch_sub_query.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s / 1000)") - with ch_client.ClickHouseClient() as ch: - ch_query = f"""SELECT toUnixTimestamp(toDateTime(first_connexion_week))*1000 AS first_connexion_week, - week, - users_count, - connected_users - FROM ( - SELECT first_connexion_week, - toInt8((connexion_week - first_connexion_week) / 7) AS week, - COUNT(DISTINCT all_connexions.user_id) AS users_count, - groupArray(20)(all_connexions.user_id) AS connected_users - FROM (SELECT user_id, MIN(toStartOfWeek(sessions_metadata.datetime, 1)) AS first_connexion_week - FROM sessions_metadata - WHERE {" AND ".join(ch_sub_query)} - AND sessions_metadata.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 ) - AND isNull((SELECT 1 - FROM sessions_metadata AS bmsess - WHERE bmsess.datetime < toDateTime(%(startTimestamp)s / 1000) - AND bmsess.project_id = %(project_id)s - AND bmsess.user_id = sessions_metadata.user_id - LIMIT 1)) - GROUP BY user_id) AS users_list - INNER JOIN (SELECT DISTINCT user_id, toStartOfWeek(datetime, 1) AS connexion_week - FROM sessions_metadata - WHERE {" AND ".join(ch_sub_query)} - ORDER BY connexion_week, user_id - ) AS all_connexions USING (user_id) - WHERE first_connexion_week <= connexion_week - GROUP BY 
first_connexion_week, week - ORDER BY first_connexion_week, week - ) AS full_data;""" - - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args)} - # print(ch_query % params) - rows = ch.execute(ch_query, params) - rows = __compute_weekly_percentage(helper.list_to_camel_case(rows)) - return { - "startTimestamp": startTimestamp, - "chart": __complete_acquisition(rows=rows, start_date=startTimestamp, end_date=TimeUTC.now()) - } - - -@dev.timed -def feature_retention(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], **args): - startTimestamp = TimeUTC.trunc_week(startTimestamp) - endTimestamp = startTimestamp + 10 * TimeUTC.MS_WEEK - ch_sub_query = __get_basic_constraints(table_name='feature', data=args) - meta_condition = __get_meta_constraint(args) - event_type = "PAGES" - event_value = "/" - extra_values = {} - default = True - for f in filters: - if f["type"] == "EVENT_TYPE" and JOURNEY_TYPES.get(f["value"]): - event_type = f["value"] - elif f["type"] == "EVENT_VALUE": - event_value = f["value"] - default = False - elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: - meta_condition.append(f"sessions_metadata.user_id = %(user_id)s") - meta_condition.append("sessions_metadata.user_id IS NOT NULL") - meta_condition.append("not empty(sessions_metadata.user_id)") - meta_condition.append("sessions_metadata.project_id = %(project_id)s") - meta_condition.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s/1000)") - meta_condition.append("sessions_metadata.datetime < toDateTime(%(endTimestamp)s/1000)") - extra_values["user_id"] = f["value"] - event_table = JOURNEY_TYPES[event_type]["table"] - event_column = JOURNEY_TYPES[event_type]["column"] - - with ch_client.ClickHouseClient() as ch: - if default: - # get most used value - ch_query = f"""SELECT {event_column} AS value, COUNT(*) AS count - FROM {event_table} AS feature - {"INNER JOIN sessions_metadata USING (session_id)" if len(meta_condition) > 0 else ""} - WHERE {" AND ".join(ch_sub_query)} - GROUP BY value - ORDER BY count DESC - LIMIT 1;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - # print(ch_query% params) - row = ch.execute(ch_query, params) - if len(row) > 0: - event_value = row[0]["value"] - else: - print(f"no {event_table} most used value") - return { - "startTimestamp": startTimestamp, - "filters": [{"type": "EVENT_TYPE", "value": event_type}, - {"type": "EVENT_VALUE", "value": ""}], - "chart": __complete_retention(rows=[], start_date=startTimestamp, end_date=TimeUTC.now()) - } - extra_values["value"] = event_value - if len(meta_condition) == 0: - meta_condition.append("sessions_metadata.user_id IS NOT NULL") - meta_condition.append("not empty(sessions_metadata.user_id)") - meta_condition.append("sessions_metadata.project_id = %(project_id)s") - meta_condition.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s/1000)") - meta_condition.append("sessions_metadata.datetime < toDateTime(%(endTimestamp)s/1000)") - ch_sub_query += meta_condition - ch_sub_query.append(f"feature.{event_column} = %(value)s") - ch_query = f"""SELECT toInt8((connexion_week - toDate(%(startTimestamp)s / 1000)) / 7) AS week, - COUNT(DISTINCT all_connexions.user_id) AS users_count, - groupArray(100)(all_connexions.user_id) AS connected_users - FROM (SELECT 
DISTINCT user_id
-                              FROM {event_table} AS feature INNER JOIN sessions_metadata USING (session_id)
-                              WHERE {" AND ".join(ch_sub_query)}
-                                AND toStartOfWeek(feature.datetime,1) = toDate(%(startTimestamp)s / 1000)
-                                AND sessions_metadata.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
-                                AND feature.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
-                                AND isNull((SELECT 1
-                                            FROM {event_table} AS bsess INNER JOIN sessions_metadata AS bmsess USING (session_id)
-                                            WHERE bsess.datetime < toDateTime(%(startTimestamp)s / 1000)
-                                              AND bmsess.datetime < toDateTime(%(startTimestamp)s / 1000)
-                                              AND bsess.project_id = %(project_id)s
-                                              AND bmsess.project_id = %(project_id)s
-                                              AND bmsess.user_id = sessions_metadata.user_id
-                                              AND bsess.{event_column} = %(value)s
-                                            LIMIT 1))
-                              ) AS users_list
-                                 INNER JOIN (SELECT DISTINCT user_id, toStartOfWeek(datetime,1) AS connexion_week
-                                             FROM {event_table} AS feature INNER JOIN sessions_metadata USING (session_id)
-                                             WHERE {" AND ".join(ch_sub_query)}
-                                             ORDER BY connexion_week, user_id
-                                 ) AS all_connexions USING (user_id)
-                        GROUP BY connexion_week
-                        ORDER BY connexion_week;"""
-
-        params = {"project_id": project_id, "startTimestamp": startTimestamp,
-                  "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values}
-        # print(ch_query % params)
-        rows = ch.execute(ch_query, params)
-        rows = __compute_weekly_percentage(helper.list_to_camel_case(rows))
-    return {
-        "startTimestamp": startTimestamp,
-        "filters": [{"type": "EVENT_TYPE", "value": event_type}, {"type": "EVENT_VALUE", "value": event_value}],
-        "chart": __complete_retention(rows=rows, start_date=startTimestamp, end_date=TimeUTC.now())
-    }
-
-
-@dev.timed
-def feature_acquisition(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(),
-                        filters=[], **args):
-    startTimestamp = TimeUTC.trunc_week(startTimestamp)
-    endTimestamp = startTimestamp + 10 * TimeUTC.MS_WEEK
-    ch_sub_query = __get_basic_constraints(table_name='feature', data=args)
-    meta_condition = __get_meta_constraint(args)
-
-    event_type = "PAGES"
-    event_value = "/"
-    extra_values = {}
-    default = True
-    for f in filters:
-        if f["type"] == "EVENT_TYPE" and JOURNEY_TYPES.get(f["value"]):
-            event_type = f["value"]
-        elif f["type"] == "EVENT_VALUE":
-            event_value = f["value"]
-            default = False
-        elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]:
-            meta_condition.append(f"sessions_metadata.user_id = %(user_id)s")
-            meta_condition.append("sessions_metadata.user_id IS NOT NULL")
-            meta_condition.append("not empty(sessions_metadata.user_id)")
-            meta_condition.append("sessions_metadata.project_id = %(project_id)s")
-            meta_condition.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s/1000)")
-            meta_condition.append("sessions_metadata.datetime < toDateTime(%(endTimestamp)s/1000)")
-
-            extra_values["user_id"] = f["value"]
-    event_table = JOURNEY_TYPES[event_type]["table"]
-    event_column = JOURNEY_TYPES[event_type]["column"]
-    with ch_client.ClickHouseClient() as ch:
-        if default:
-            # get most used value
-            ch_query = f"""SELECT {event_column} AS value, COUNT(*) AS count
-                        FROM {event_table} AS feature
-                            {"INNER JOIN sessions_metadata USING (session_id)" if len(meta_condition) > 0 else ""}
-                        WHERE {" AND ".join(ch_sub_query)}
-                        GROUP BY value
-                        ORDER BY count DESC
-                        LIMIT 1;"""
-            params = {"project_id": project_id, "startTimestamp": startTimestamp,
-                      "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values}
-            # print(ch_query % params)
-            row = ch.execute(ch_query, params)
-            if len(row) > 0:
-                event_value = row[0]["value"]
-            else:
-                print(f"no {event_table} most used value")
-                return {
-                    "startTimestamp": startTimestamp,
-                    "filters": [{"type": "EVENT_TYPE", "value": event_type},
-                                {"type": "EVENT_VALUE", "value": ""}],
-                    "chart": __complete_acquisition(rows=[], start_date=startTimestamp, end_date=TimeUTC.now())
-                }
-        extra_values["value"] = event_value
-
-        if len(meta_condition) == 0:
-            meta_condition.append("sessions_metadata.project_id = %(project_id)s")
-            meta_condition.append("sessions_metadata.user_id IS NOT NULL")
-            meta_condition.append("not empty(sessions_metadata.user_id)")
-            meta_condition.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s/1000)")
-            meta_condition.append("sessions_metadata.datetime < toDateTime(%(endTimestamp)s/1000)")
-
-        ch_sub_query += meta_condition
-        ch_sub_query.append(f"feature.{event_column} = %(value)s")
-        ch_query = f"""SELECT toUnixTimestamp(toDateTime(first_connexion_week))*1000 AS first_connexion_week,
-                               week,
-                               users_count,
-                               connected_users
-                        FROM (
-                            SELECT first_connexion_week,
-                                   toInt8((connexion_week - first_connexion_week) / 7) AS week,
-                                   COUNT(DISTINCT all_connexions.user_id) AS users_count,
-                                   groupArray(100)(all_connexions.user_id) AS connected_users
-                            FROM (SELECT user_id, MIN(toStartOfWeek(feature.datetime, 1)) AS first_connexion_week
-                                  FROM sessions_metadata INNER JOIN {event_table} AS feature USING (session_id)
-                                  WHERE {" AND ".join(ch_sub_query)}
-                                    AND sessions_metadata.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
-                                    AND feature.datetime < toDateTime(%(startTimestamp)s/1000 + 8 * 24 * 60 * 60 )
-                                    AND isNull((SELECT 1
-                                                FROM sessions_metadata AS bmsess
-                                                         INNER JOIN {event_table} AS bsess USING (session_id)
-                                                WHERE bsess.datetime < toDateTime(%(startTimestamp)s / 1000)
-                                                  AND bmsess.datetime < toDateTime(%(startTimestamp)s / 1000)
-                                                  AND bsess.project_id = %(project_id)s
-                                                  AND bmsess.project_id = %(project_id)s
-                                                  AND bmsess.user_id = sessions_metadata.user_id
-                                                  AND bsess.{event_column} = %(value)s
-                                                LIMIT 1))
-                                  GROUP BY user_id) AS users_list
-                                     INNER JOIN (SELECT DISTINCT user_id, toStartOfWeek(datetime, 1) AS connexion_week
-                                                 FROM sessions_metadata INNER JOIN {event_table} AS feature USING (session_id)
-                                                 WHERE {" AND ".join(ch_sub_query)}
-                                                 ORDER BY connexion_week, user_id
-                                     ) AS all_connexions USING (user_id)
-                            WHERE first_connexion_week <= connexion_week
-                            GROUP BY first_connexion_week, week
-                            ORDER BY first_connexion_week, week
-                        ) AS full_data;"""
-
-        params = {"project_id": project_id, "startTimestamp": startTimestamp,
-                  "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values}
-        # print(ch_query % params)
-        rows = ch.execute(ch_query, params)
-        rows = __compute_weekly_percentage(helper.list_to_camel_case(rows))
-    return {
-        "startTimestamp": startTimestamp,
-        "filters": [{"type": "EVENT_TYPE", "value": event_type}, {"type": "EVENT_VALUE", "value": event_value}],
-        "chart": __complete_acquisition(rows=rows, start_date=startTimestamp, end_date=TimeUTC.now())
-    }
-
-
-@dev.timed
-def feature_popularity_frequency(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(),
-                                 filters=[], **args):
-    startTimestamp = TimeUTC.trunc_week(startTimestamp)
-    endTimestamp = startTimestamp + 10 * TimeUTC.MS_WEEK
-    ch_sub_query = __get_basic_constraints(table_name='feature', data=args)
-    meta_condition = __get_meta_constraint(args)
-
-    event_table = 
JOURNEY_TYPES["CLICK"]["table"] - event_column = JOURNEY_TYPES["CLICK"]["column"] - extra_values = {} - for f in filters: - if f["type"] == "EVENT_TYPE" and JOURNEY_TYPES.get(f["value"]): - event_table = JOURNEY_TYPES[f["value"]]["table"] - event_column = JOURNEY_TYPES[f["value"]]["column"] - elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: - meta_condition.append(f"sessions_metadata.user_id = %(user_id)s") - meta_condition.append("sessions_metadata.user_id IS NOT NULL") - meta_condition.append("not empty(sessions_metadata.user_id)") - meta_condition.append("sessions_metadata.project_id = %(project_id)s") - meta_condition.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s/1000)") - meta_condition.append("sessions_metadata.datetime < toDateTime(%(endTimestamp)s/1000)") - extra_values["user_id"] = f["value"] - - with ch_client.ClickHouseClient() as ch: - if len(meta_condition) == 0: - meta_condition.append("sessions_metadata.user_id IS NOT NULL") - meta_condition.append("not empty(sessions_metadata.user_id)") - meta_condition.append("sessions_metadata.project_id = %(project_id)s") - meta_condition.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s/1000)") - meta_condition.append("sessions_metadata.datetime < toDateTime(%(endTimestamp)s/1000)") - ch_sub_query += meta_condition - ch_query = f"""SELECT COUNT(DISTINCT user_id) AS count - FROM sessions_metadata - WHERE {" AND ".join(meta_condition)};""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - # print(ch_query % params) - # print("---------------------") - all_user_count = ch.execute(ch_query, params) - if len(all_user_count) == 0 or all_user_count[0]["count"] == 0: - return [] - all_user_count = all_user_count[0]["count"] - ch_query = f"""SELECT {event_column} AS value, COUNT(DISTINCT user_id) AS count - FROM {event_table} AS feature INNER JOIN sessions_metadata USING (session_id) - WHERE {" AND ".join(ch_sub_query)} - AND length({event_column})>2 - GROUP BY value - ORDER BY count DESC - LIMIT 7;""" - - # print(ch_query % params) - # print("---------------------") - popularity = ch.execute(ch_query, params) - params["values"] = [p["value"] for p in popularity] - if len(params["values"]) == 0: - return [] - ch_query = f"""SELECT {event_column} AS value, COUNT(session_id) AS count - FROM {event_table} AS feature INNER JOIN sessions_metadata USING (session_id) - WHERE {" AND ".join(ch_sub_query)} - AND {event_column} IN %(values)s - GROUP BY value;""" - - # print(ch_query % params) - # print("---------------------") - frequencies = ch.execute(ch_query, params) - total_usage = sum([f["count"] for f in frequencies]) - frequencies = {f["value"]: f["count"] for f in frequencies} - for p in popularity: - p["popularity"] = p.pop("count") / all_user_count - p["frequency"] = frequencies[p["value"]] / total_usage - - return popularity - - -@dev.timed -def feature_adoption(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], **args): - event_type = "CLICK" - event_value = '/' - extra_values = {} - default = True - meta_condition = [] - for f in filters: - if f["type"] == "EVENT_TYPE" and JOURNEY_TYPES.get(f["value"]): - event_type = f["value"] - elif f["type"] == "EVENT_VALUE": - event_value = f["value"] - default = False - elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: - 
meta_condition.append(f"sessions_metadata.user_id = %(user_id)s") - meta_condition.append("sessions_metadata.user_id IS NOT NULL") - meta_condition.append("not empty(sessions_metadata.user_id)") - meta_condition.append("sessions_metadata.project_id = %(project_id)s") - meta_condition.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s/1000)") - meta_condition.append("sessions_metadata.datetime < toDateTime(%(endTimestamp)s/1000)") - extra_values["user_id"] = f["value"] - event_table = JOURNEY_TYPES[event_type]["table"] - event_column = JOURNEY_TYPES[event_type]["column"] - - ch_sub_query = __get_basic_constraints(table_name='feature', data=args) - meta_condition += __get_meta_constraint(args) - ch_sub_query += meta_condition - with ch_client.ClickHouseClient() as ch: - if default: - # get most used value - ch_query = f"""SELECT {event_column} AS value, COUNT(*) AS count - FROM {event_table} AS feature - {"INNER JOIN sessions_metadata USING (session_id)" if len(meta_condition) > 0 else ""} - WHERE {" AND ".join(ch_sub_query)} - GROUP BY value - ORDER BY count DESC - LIMIT 1;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - # print(ch_query % params) - # print("---------------------") - row = ch.execute(ch_query, params) - if len(row) > 0: - event_value = row[0]["value"] - # else: - # print(f"no {event_table} most used value") - # return {"target": 0, "adoption": 0, - # "filters": [{"type": "EVENT_TYPE", "value": event_type}, {"type": "EVENT_VALUE", "value": ""}]} - - extra_values["value"] = event_value - - if len(meta_condition) == 0: - meta_condition.append("sessions_metadata.project_id = %(project_id)s") - meta_condition.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s/1000)") - meta_condition.append("sessions_metadata.datetime < toDateTime(%(endTimestamp)s/1000)") - meta_condition.append("sessions_metadata.user_id IS NOT NULL") - meta_condition.append("not empty(sessions_metadata.user_id)") - ch_sub_query += meta_condition - ch_query = f"""SELECT COUNT(DISTINCT user_id) AS count - FROM sessions_metadata - WHERE {" AND ".join(meta_condition)};""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - # print(ch_query % params) - # print("---------------------") - all_user_count = ch.execute(ch_query, params) - if len(all_user_count) == 0 or all_user_count[0]["count"] == 0: - return {"adoption": 0, "target": 0, "filters": [{"type": "EVENT_TYPE", "value": event_type}, - {"type": "EVENT_VALUE", "value": event_value}], } - all_user_count = all_user_count[0]["count"] - - ch_sub_query.append(f"feature.{event_column} = %(value)s") - ch_query = f"""SELECT COUNT(DISTINCT user_id) AS count - FROM {event_table} AS feature INNER JOIN sessions_metadata USING (session_id) - WHERE {" AND ".join(ch_sub_query)};""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - # print(ch_query % params) - # print("---------------------") - adoption = ch.execute(ch_query, params) - adoption = adoption[0]["count"] / all_user_count - return {"target": all_user_count, "adoption": adoption, - "filters": [{"type": "EVENT_TYPE", "value": event_type}, {"type": "EVENT_VALUE", "value": event_value}]} - - -@dev.timed -def feature_adoption_top_users(project_id, 
startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], **args): - event_type = "CLICK" - event_value = '/' - extra_values = {} - default = True - meta_condition = [] - for f in filters: - if f["type"] == "EVENT_TYPE" and JOURNEY_TYPES.get(f["value"]): - event_type = f["value"] - elif f["type"] == "EVENT_VALUE": - event_value = f["value"] - default = False - elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: - meta_condition.append(f"sessions_metadata.user_id = %(user_id)s") - meta_condition.append("user_id IS NOT NULL") - meta_condition.append("not empty(sessions_metadata.user_id)") - meta_condition.append("sessions_metadata.project_id = %(project_id)s") - meta_condition.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s/1000)") - meta_condition.append("sessions_metadata.datetime < toDateTime(%(endTimestamp)s/1000)") - extra_values["user_id"] = f["value"] - event_table = JOURNEY_TYPES[event_type]["table"] - event_column = JOURNEY_TYPES[event_type]["column"] - ch_sub_query = __get_basic_constraints(table_name='feature', data=args) - meta_condition += __get_meta_constraint(args) - ch_sub_query += meta_condition - - with ch_client.ClickHouseClient() as ch: - if default: - # get most used value - ch_query = f"""SELECT {event_column} AS value, COUNT(*) AS count - FROM {event_table} AS feature - {"INNER JOIN sessions_metadata USING (session_id)" if len(meta_condition) > 0 else ""} - WHERE {" AND ".join(ch_sub_query)} - GROUP BY value - ORDER BY count DESC - LIMIT 1;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - row = ch.execute(ch_query, params) - if len(row) > 0: - event_value = row[0]["value"] - else: - print(f"no {event_table} most used value") - return {"users": [], - "filters": [{"type": "EVENT_TYPE", "value": event_type}, {"type": "EVENT_VALUE", "value": ""}]} - - extra_values["value"] = event_value - if len(meta_condition) == 0: - ch_sub_query.append("user_id IS NOT NULL") - ch_sub_query.append("not empty(sessions_metadata.user_id)") - ch_sub_query.append("sessions_metadata.project_id = %(project_id)s") - ch_sub_query.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s/1000)") - ch_sub_query.append("sessions_metadata.datetime < toDateTime(%(endTimestamp)s/1000)") - ch_sub_query.append(f"feature.{event_column} = %(value)s") - ch_query = f"""SELECT user_id, COUNT(DISTINCT session_id) AS count - FROM {event_table} AS feature INNER JOIN sessions_metadata USING (session_id) - WHERE {" AND ".join(ch_sub_query)} - GROUP BY user_id - ORDER BY count DESC - LIMIT 10;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - # print(ch_query % params) - rows = ch.execute(ch_query, params) - return {"users": helper.list_to_camel_case(rows), - "filters": [{"type": "EVENT_TYPE", "value": event_type}, {"type": "EVENT_VALUE", "value": event_value}]} - - -@dev.timed -def feature_adoption_daily_usage(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), - filters=[], **args): - event_type = "CLICK" - event_value = '/' - extra_values = {} - default = True - meta_condition = [] - for f in filters: - if f["type"] == "EVENT_TYPE" and JOURNEY_TYPES.get(f["value"]): - event_type = f["value"] - elif f["type"] == "EVENT_VALUE": - event_value = f["value"] - default = False - 
elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: - meta_condition.append(f"sessions_metadata.user_id = %(user_id)s") - meta_condition.append("sessions_metadata.project_id = %(project_id)s") - meta_condition.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s/1000)") - meta_condition.append("sessions_metadata.datetime < toDateTime(%(endTimestamp)s/1000)") - - extra_values["user_id"] = f["value"] - event_table = JOURNEY_TYPES[event_type]["table"] - event_column = JOURNEY_TYPES[event_type]["column"] - ch_sub_query = __get_basic_constraints(table_name="feature", data=args) - meta_condition += __get_meta_constraint(args) - ch_sub_query += meta_condition - with ch_client.ClickHouseClient() as ch: - if default: - # get most used value - ch_query = f"""SELECT {event_column} AS value, COUNT(*) AS count - FROM {event_table} AS feature {"INNER JOIN sessions_metadata USING (session_id)" if len(meta_condition) > 0 else ""} - WHERE {" AND ".join(ch_sub_query)} - AND length({event_column}) > 2 - GROUP BY value - ORDER BY count DESC - LIMIT 1;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - # print(ch_query % params) - row = ch.execute(ch_query, params) - if len(row) > 0: - event_value = row[0]["value"] - else: - print(f"no {event_table} most used value") - return { - "startTimestamp": startTimestamp, - "filters": [{"type": "EVENT_TYPE", "value": event_type}, - {"type": "EVENT_VALUE", "value": ""}], - "chart": __complete_acquisition(rows=[], start_date=startTimestamp, end_date=TimeUTC.now()) - } - extra_values["value"] = event_value - ch_sub_query.append(f"feature.{event_column} = %(value)s") - ch_query = f"""SELECT toUnixTimestamp(day)*1000 AS timestamp, count - FROM (SELECT toStartOfDay(feature.datetime) AS day, COUNT(DISTINCT session_id) AS count - FROM {event_table} AS feature {"INNER JOIN sessions_metadata USING (session_id)" if len(meta_condition) > 0 else ""} - WHERE {" AND ".join(ch_sub_query)} - GROUP BY day - ORDER BY day) AS raw_results;""" - params = {"step_size": TimeUTC.MS_DAY, "project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - # print(ch_query % params) - rows = ch.execute(ch_query, params) - return {"chart": __complete_missing_steps(rows=rows, start_time=startTimestamp, end_time=endTimestamp, - density=(endTimestamp - startTimestamp) // TimeUTC.MS_DAY, - neutral={"count": 0}), - "filters": [{"type": "EVENT_TYPE", "value": event_type}, {"type": "EVENT_VALUE", "value": event_value}]} - - -@dev.timed -def feature_intensity(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), filters=[], - **args): - event_table = JOURNEY_TYPES["CLICK"]["table"] - event_column = JOURNEY_TYPES["CLICK"]["column"] - extra_values = {} - meta_condition = [] - for f in filters: - if f["type"] == "EVENT_TYPE" and JOURNEY_TYPES.get(f["value"]): - event_table = JOURNEY_TYPES[f["value"]]["table"] - event_column = JOURNEY_TYPES[f["value"]]["column"] - elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: - meta_condition.append(f"sessions_metadata.user_id = %(user_id)s") - meta_condition.append("sessions_metadata.project_id = %(project_id)s") - meta_condition.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s/1000)") - meta_condition.append("sessions_metadata.datetime < 
toDateTime(%(endTimestamp)s/1000)") - extra_values["user_id"] = f["value"] - ch_sub_query = __get_basic_constraints(table_name="feature", data=args) - meta_condition += __get_meta_constraint(args) - ch_sub_query += meta_condition - with ch_client.ClickHouseClient() as ch: - ch_query = f"""SELECT {event_column} AS value, AVG(DISTINCT session_id) AS avg - FROM {event_table} AS feature - {"INNER JOIN sessions_metadata USING (session_id)" if len(meta_condition) > 0 else ""} - WHERE {" AND ".join(ch_sub_query)} - GROUP BY value - ORDER BY avg DESC - LIMIT 7;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - # print(ch_query % params) - rows = ch.execute(ch_query, params) - - return rows - - -PERIOD_TO_FUNCTION = { - "DAY": "toStartOfDay", - "WEEK": "toStartOfWeek" -} - - -@dev.timed -def users_active(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), filters=[], - **args): - meta_condition = __get_meta_constraint(args) - period = "DAY" - extra_values = {} - for f in filters: - if f["type"] == "PERIOD" and f["value"] in ["DAY", "WEEK"]: - period = f["value"] - elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: - meta_condition.append(f"sessions_metadata.user_id = %(user_id)s") - extra_values["user_id"] = f["value"] - period_function = PERIOD_TO_FUNCTION[period] - ch_sub_query = __get_basic_constraints(table_name="sessions_metadata", data=args) - ch_sub_query += meta_condition - ch_sub_query.append("sessions_metadata.user_id IS NOT NULL") - ch_sub_query.append("not empty(sessions_metadata.user_id)") - with ch_client.ClickHouseClient() as ch: - ch_query = f"""SELECT SUM(count) / intDiv(%(endTimestamp)s - %(startTimestamp)s, %(step_size)s) AS avg - FROM (SELECT {period_function}(sessions_metadata.datetime) AS period, count(DISTINCT user_id) AS count - FROM sessions_metadata - WHERE {" AND ".join(ch_sub_query)} - GROUP BY period) AS daily_users;""" - params = {"step_size": TimeUTC.MS_DAY if period == "DAY" else TimeUTC.MS_WEEK, - "project_id": project_id, - "startTimestamp": TimeUTC.trunc_day(startTimestamp) if period == "DAY" else TimeUTC.trunc_week( - startTimestamp), "endTimestamp": endTimestamp, **__get_constraint_values(args), - **extra_values} - # print(ch_query % params) - # print("---------------------") - avg = ch.execute(ch_query, params) - if len(avg) == 0 or avg[0]["avg"] == 0: - return {"avg": 0, "chart": []} - avg = avg[0]["avg"] - # TODO: optimize this when DB structure changes, optimization from 3s to 1s - ch_query = f"""SELECT toUnixTimestamp(toDateTime(period))*1000 AS timestamp, count - FROM (SELECT {period_function}(sessions_metadata.datetime) AS period, count(DISTINCT user_id) AS count - FROM sessions_metadata - WHERE {" AND ".join(ch_sub_query)} - GROUP BY period - ORDER BY period) AS raw_results;""" - # print(ch_query % params) - # print("---------------------") - rows = ch.execute(ch_query, params) - return {"avg": avg, "chart": rows} - - -@dev.timed -def users_power(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), filters=[], **args): - ch_sub_query = __get_basic_constraints(table_name="sessions_metadata", data=args) - meta_condition = __get_meta_constraint(args) - ch_sub_query += meta_condition - ch_sub_query.append("sessions_metadata.user_id IS NOT NULL") - ch_sub_query.append("not empty(sessions_metadata.user_id)") - - with ch_client.ClickHouseClient() as ch: - 
ch_query = f"""SELECT ifNotFinite(AVG(count),0) AS avg - FROM(SELECT COUNT(user_id) AS count - FROM (SELECT user_id, COUNT(DISTINCT toStartOfDay(datetime)) AS number_of_days - FROM sessions_metadata - WHERE {" AND ".join(ch_sub_query)} - GROUP BY user_id) AS users_connexions - GROUP BY number_of_days - ORDER BY number_of_days) AS results;""" - params = {"project_id": project_id, - "startTimestamp": startTimestamp, "endTimestamp": endTimestamp, **__get_constraint_values(args)} - # print(ch_query % params) - # print("---------------------") - avg = ch.execute(ch_query, params) - if len(avg) == 0 or avg[0]["avg"] == 0: - return {"avg": 0, "partition": []} - avg = avg[0]["avg"] - ch_query = f"""SELECT number_of_days, COUNT(user_id) AS count - FROM (SELECT user_id, COUNT(DISTINCT toStartOfDay(datetime)) AS number_of_days - FROM sessions_metadata - WHERE {" AND ".join(ch_sub_query)} - GROUP BY user_id) AS users_connexions - GROUP BY number_of_days - ORDER BY number_of_days;""" - - # print(ch_query % params) - # print("---------------------") - rows = ch.execute(ch_query, params) - - return {"avg": avg, "partition": helper.list_to_camel_case(rows)} - - -@dev.timed -def users_slipping(project_id, startTimestamp=TimeUTC.now(delta_days=-70), endTimestamp=TimeUTC.now(), filters=[], - **args): - ch_sub_query = __get_basic_constraints(table_name="feature", data=args) - event_type = "PAGES" - event_value = "/" - extra_values = {} - default = True - meta_condition = [] - for f in filters: - if f["type"] == "EVENT_TYPE" and JOURNEY_TYPES.get(f["value"]): - event_type = f["value"] - elif f["type"] == "EVENT_VALUE": - event_value = f["value"] - default = False - elif f["type"] in [sessions_metas.meta_type.USERID, sessions_metas.meta_type.USERID_IOS]: - meta_condition.append(f"sessions_metadata.user_id = %(user_id)s") - meta_condition.append("sessions_metadata.project_id = %(project_id)s") - meta_condition.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s/1000)") - meta_condition.append("sessions_metadata.datetime < toDateTime(%(endTimestamp)s/1000)") - extra_values["user_id"] = f["value"] - event_table = JOURNEY_TYPES[event_type]["table"] - event_column = JOURNEY_TYPES[event_type]["column"] - - meta_condition += __get_meta_constraint(args) - ch_sub_query += meta_condition - with ch_client.ClickHouseClient() as ch: - if default: - # get most used value - ch_query = f"""SELECT {event_column} AS value, COUNT(*) AS count - FROM {event_table} AS feature - {"INNER JOIN sessions_metadata USING (session_id)" if len(meta_condition) > 0 else ""} - WHERE {" AND ".join(ch_sub_query)} - GROUP BY value - ORDER BY count DESC - LIMIT 1;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - print(ch_query % params) - row = ch.execute(ch_query, params) - if len(row) > 0: - event_value = row[0]["value"] - else: - print(f"no {event_table} most used value") - return { - "startTimestamp": startTimestamp, - "filters": [{"type": "EVENT_TYPE", "value": event_type}, - {"type": "EVENT_VALUE", "value": ""}], - "list": [] - } - extra_values["value"] = event_value - if len(meta_condition) == 0: - ch_sub_query.append("sessions_metadata.user_id IS NOT NULL") - ch_sub_query.append("not empty(sessions_metadata.user_id)") - ch_sub_query.append("sessions_metadata.project_id = %(project_id)s") - ch_sub_query.append("sessions_metadata.datetime >= toDateTime(%(startTimestamp)s/1000)") - 
ch_sub_query.append("sessions_metadata.datetime < toDateTime(%(endTimestamp)s/1000)") - ch_sub_query.append(f"feature.{event_column} = %(value)s") - ch_query = f"""SELECT user_id, - toUnixTimestamp(last_time)*1000 AS last_time, - interactions_count, - toUnixTimestamp(first_seen) * 1000 AS first_seen, - toUnixTimestamp(last_seen) * 1000 AS last_seen - FROM (SELECT user_id, last_time, interactions_count, MIN(datetime) AS first_seen, MAX(datetime) AS last_seen - FROM (SELECT user_id, MAX(datetime) AS last_time, COUNT(DISTINCT session_id) AS interactions_count - FROM {event_table} AS feature INNER JOIN sessions_metadata USING (session_id) - WHERE {" AND ".join(ch_sub_query)} - GROUP BY user_id ) AS user_last_usage INNER JOIN sessions_metadata USING (user_id) - WHERE now() - last_time > 7 - GROUP BY user_id, last_time, interactions_count - ORDER BY interactions_count DESC, last_time DESC - LIMIT 50) AS raw_results;""" - params = {"project_id": project_id, "startTimestamp": startTimestamp, - "endTimestamp": endTimestamp, **__get_constraint_values(args), **extra_values} - print(ch_query % params) - rows = ch.execute(ch_query, params) - return { - "startTimestamp": startTimestamp, - "filters": [{"type": "EVENT_TYPE", "value": event_type}, {"type": "EVENT_VALUE", "value": event_value}], - "list": helper.list_to_camel_case(rows) - } - - -@dev.timed -def search(text, feature_type, project_id, platform=None): - if not feature_type: - resource_type = "ALL" - data = search(text=text, feature_type=resource_type, project_id=project_id, platform=platform) - return data - args = {} if platform is None else {"platform": platform} - ch_sub_query = __get_basic_constraints(table_name="feature", data=args) - meta_condition = __get_meta_constraint(args) - ch_sub_query += meta_condition - params = {"startTimestamp": TimeUTC.now() - 1 * TimeUTC.MS_MONTH, - "endTimestamp": TimeUTC.now(), - "project_id": project_id, - "value": text.lower(), - "platform_0": platform} - if feature_type == "ALL": - with ch_client.ClickHouseClient() as ch: - sub_queries = [] - for e in JOURNEY_TYPES: - sub_queries.append(f"""(SELECT DISTINCT {JOURNEY_TYPES[e]["column"]} AS value, '{e}' AS "type" - FROM {JOURNEY_TYPES[e]["table"]} AS feature - WHERE {" AND ".join(ch_sub_query)} AND positionUTF8({JOURNEY_TYPES[e]["column"]},%(value)s)!=0 - LIMIT 10)""") - ch_query = "UNION ALL".join(sub_queries) - print(ch_query % params) - rows = ch.execute(ch_query, params) - elif JOURNEY_TYPES.get(feature_type) is not None: - with ch_client.ClickHouseClient() as ch: - ch_query = f"""SELECT DISTINCT {JOURNEY_TYPES[feature_type]["column"]} AS value, '{feature_type}' AS "type" - FROM {JOURNEY_TYPES[feature_type]["table"]} AS feature - WHERE {" AND ".join(ch_sub_query)} AND positionUTF8({JOURNEY_TYPES[feature_type]["column"]},%(value)s)!=0 - LIMIT 10;""" - print(ch_query % params) - rows = ch.execute(ch_query, params) - else: - return [] - return [helper.dict_to_camel_case(row) for row in rows] diff --git a/ee/scripts/helm/db/init_dbs/postgresql/1.3.6/1.3.6.sql b/ee/scripts/helm/db/init_dbs/postgresql/1.4.0/1.4.0.sql similarity index 100% rename from ee/scripts/helm/db/init_dbs/postgresql/1.3.6/1.3.6.sql rename to ee/scripts/helm/db/init_dbs/postgresql/1.4.0/1.4.0.sql diff --git a/scripts/helm/db/init_dbs/postgresql/1.3.6/1.3.6.sql b/scripts/helm/db/init_dbs/postgresql/1.4.0/1.4.0.sql similarity index 100% rename from scripts/helm/db/init_dbs/postgresql/1.3.6/1.3.6.sql rename to scripts/helm/db/init_dbs/postgresql/1.4.0/1.4.0.sql