From 490ac3c2ede15ae822b691c72ae2a6f821e8cb7f Mon Sep 17 00:00:00 2001 From: MauricioGarciaS <47052044+MauricioGarciaS@users.noreply.github.com> Date: Tue, 29 Aug 2023 09:58:13 +0200 Subject: [PATCH] Updated dag for updating database with feedbacks, changed feedback file from ml_service/core into common core --- .../{ml_service => }/core/feedback.py | 0 .../ml_trainer/airflow/dags/db_update.py | 147 +++++++++--------- 2 files changed, 76 insertions(+), 71 deletions(-) rename ee/recommendation/{ml_service => }/core/feedback.py (100%) diff --git a/ee/recommendation/ml_service/core/feedback.py b/ee/recommendation/core/feedback.py similarity index 100% rename from ee/recommendation/ml_service/core/feedback.py rename to ee/recommendation/core/feedback.py diff --git a/ee/recommendation/ml_trainer/airflow/dags/db_update.py b/ee/recommendation/ml_trainer/airflow/dags/db_update.py index 095fcf2f1..3d598a0e7 100644 --- a/ee/recommendation/ml_trainer/airflow/dags/db_update.py +++ b/ee/recommendation/ml_trainer/airflow/dags/db_update.py @@ -14,7 +14,8 @@ from decouple import config _work_dir = os.getcwd() sys.path.insert(1, _work_dir) from utils import pg_client -from utils.feedback import ConnectionHandler +from utils import ch_client +from core.feedback import ConnectionHandler from sqlalchemy import text @@ -34,84 +35,84 @@ def get_today_feedback(): with connection_handler.get_live_session() as conn: cur = conn.connection().connection.cursor() query = cur.mogrify( - f"""SELECT * FROM recommendation_feedback WHERE insertion_time > %(time_lower_bound)s;""", + f"""SELECT project_id, session_id, user_id as viewer_id, payload FROM recommendation_feedback WHERE insertion_time > %(time_lower_bound)s;""", {'time_lower_bound': int(datetime.now().timestamp()) - execute_interval}) - conn.execute(text(query.decode("utf-8"))) + cur = conn.execute(text(query.decode("utf-8"))) + res = cur.fetchall() conn.commit() + for i in range(len(res)): + payload_i = res[i][3] + res[i] = res[i][:3] + 
(payload_i['reason'], payload_i['comment'], payload_i['interesting']) -def get_features_pg(ti): - os.environ['PG_POOL'] = 'true' - asyncio.run(pg_client.init()) - sessionIds = ti.xcom_pull(key='sessionIds') - userIds = ti.xcom_pull(key='userIds').split(',') + df = pd.DataFrame(res, columns=["project_id", "session_id", "viewer_id", "reason", "comment", "interesting"]) + sessionIds = ','.join([str(k) for k in df['session_id'].unique()]) + + asyncio.run(pg_client.init()) # Connection to OR postgres database with pg_client.PostgresClient() as conn: - conn.execute( - """SELECT T.project_id, - T.session_id, - T2.viewer_id, - T.pages_count, - T.events_count, - T.errors_count, - T.duration, - T.country, - T.issue_score, - T.device_type, - T2.replays, - T2.network_access, - T2.storage_access, - T2.console_access, - T2.stack_access - FROM (SELECT project_id, - user_id as viewer_id, - session_id, - count(CASE WHEN source = 'replay' THEN 1 END) as replays, - count(CASE WHEN source = 'network' THEN 1 END) as network_access, - count(CASE WHEN source = 'storage' THEN 1 END) as storage_access, - count(CASE WHEN source = 'console' THEN 1 END) as console_access, - count(CASE WHEN source = 'stack_events' THEN 1 END) as stack_access - FROM frontend_signals - WHERE session_id IN ({sessionIds}) - GROUP BY project_id, viewer_id, session_id) as T2 - INNER JOIN (SELECT project_id, - session_id, - user_id, - pages_count, - events_count, - errors_count, - duration, - user_country as country, - issue_score, - user_device_type as device_type - FROM sessions - WHERE session_id IN ({sessionIds}) - AND duration IS NOT NULL) as T - USING (session_id);""".format(sessionIds=sessionIds) - ) - response = conn.fetchall() - sessionIds = [int(sessId) for sessId in sessionIds.split(',')] - df = pd.DataFrame(response) - df2 = pd.DataFrame(zip(userIds, sessionIds), columns=['viewer_id', 'session_id']) + conn.execute("""SELECT T.project_id, + T.session_id, + T2.viewer_id, + T.pages_count, + T.events_count, + 
T.errors_count, + T.duration, + T.country, + T.issue_score, + T.device_type, + T2.replays, + T2.network_access, + T2.storage_access, + T2.console_access, + T2.stack_access + FROM (SELECT project_id, + user_id as viewer_id, + session_id, + count(CASE WHEN source = 'replay' THEN 1 END) as replays, + count(CASE WHEN source = 'network' THEN 1 END) as network_access, + count(CASE WHEN source = 'storage' THEN 1 END) as storage_access, + count(CASE WHEN source = 'console' THEN 1 END) as console_access, + count(CASE WHEN source = 'stack_events' THEN 1 END) as stack_access + FROM frontend_signals + WHERE session_id IN ({sessionIds}) + GROUP BY project_id, viewer_id, session_id) as T2 + INNER JOIN (SELECT project_id, + session_id, + user_id, + pages_count, + events_count, + errors_count, + duration, + user_country as country, + issue_score, + user_device_type as device_type + FROM sessions + WHERE session_id IN ({sessionIds}) + AND duration IS NOT NULL) as T + USING (session_id);""".format(sessionIds=sessionIds) + ) + res = conn.fetchall() + df2 = pd.DataFrame(res, + columns=["project_id", "session_id", "viewer_id", "pages_count", "events_count", "errors_count", + "duration", "country", "issue_score", "device_type", "replays", "network_access", + "storage_access", "console_access", "stack_acces"]) - base_query = f"""INSERT INTO {features_table_name} (project_id, session_id, viewer_id, pages_count, events_count, - issues_count, duration, country, issue_score, device_type, - replays, network_access, storage_access, console_access, - stack_access) VALUES """ - count = 0 + df2 = df.merge(df2, on=['session_id', 'project_id', 'viewer_id'], how='inner') + + ## Upload df2 to DB table + + base_query = f"""INSERT INTO {features_table_name} (project_id, session_id, viewer_id, reason, comment, interesting, pages_count, events_count, + issues_count, duration, country, issue_score, device_type, + replays, network_access, storage_access, console_access, + stack_access) VALUES """ params = 
{}
-    for i in range(len(df)):
-        viewer = df['viewer_id'].iloc[i]
-        session = df['session_id'].iloc[i]
-        d = df2[df2['viewer_id'] == viewer]
-        x = d[d['session_id'] == session]
-        if len(x) > 0:
-            template = '('
-            for k, v in x.items():
-                params[f'{k}_{count}'] = v.values[0]
-                template += f's({k}_{count})%'
-            base_query += template + '), '
-            count += 1
+    for i in range(len(df2)):
+        template = '('
+        for k, v in df2.iloc[i].items():
+            params[f'{k}_{i}'] = v
+            template += f'%({k}_{i})s'
+        base_query += template + '), '
     base_query = base_query[:-2]
     connection_handler = ConnectionHandler(tracking_uri)
     with connection_handler.get_live_session() as conn:
@@ -121,6 +122,10 @@ def get_features_pg(ti):
     conn.commit()
 
 
+def get_features_pg():
+    ...
+
+
 dag = DAG(
     "Feedback_DB_FILL",
     default_args={