From e4449c74bd14f892c2833288f48555249c698faa Mon Sep 17 00:00:00 2001 From: MauricioGarciaS <47052044+MauricioGarciaS@users.noreply.github.com> Date: Fri, 1 Sep 2023 14:47:49 +0200 Subject: [PATCH] fix(recommendations): fixed database update and added more features into DB --- .../ml_trainer/airflow/dags/db_update.py | 48 +++++++++++++------ 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/ee/recommendation/ml_trainer/airflow/dags/db_update.py b/ee/recommendation/ml_trainer/airflow/dags/db_update.py index 3d598a0e7..53f1b868b 100644 --- a/ee/recommendation/ml_trainer/airflow/dags/db_update.py +++ b/ee/recommendation/ml_trainer/airflow/dags/db_update.py @@ -11,11 +11,13 @@ from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator, ShortCircuitOperator from datetime import datetime, timedelta from decouple import config +import numpy as np _work_dir = os.getcwd() sys.path.insert(1, _work_dir) from utils import pg_client from utils import ch_client from core.feedback import ConnectionHandler +from copy import copy from sqlalchemy import text @@ -30,14 +32,12 @@ tracking_uri = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}" def get_today_feedback(): - connection_handler = ConnectionHandler(tracking_uri) + query = "SELECT project_id, session_id, user_id as viewer_id, payload FROM recommendation_feedback" + + connection_handler = ConnectionHandler(tracking_uri) # Connection to mlflow's database with connection_handler.get_live_session() as conn: - cur = conn.connection().connection.cursor() - query = cur.mogrify( - f"""SELECT project_id, session_id, user_id as viewer_id, payload FROM recommendation_feedback WHERE insertion_time > %(time_lower_bound)s;""", - {'time_lower_bound': int(datetime.now().timestamp()) - execute_interval}) - cur = conn.execute(text(query.decode("utf-8"))) + cur = conn.execute(text(query)) res = cur.fetchall() conn.commit() @@ -47,7 +47,21 @@ def get_today_feedback(): df = pd.DataFrame(res, columns=["project_id", "session_id", "viewer_id", "reason", "comment", "interesting"]) - sessionIds = ','.join([str(k) for k in df['session_id'].unique()]) + sessionsIds_list = df['session_id'].unique() + sessionIds = ','.join([str(k) for k in sessionsIds_list]) + with ch_client.ClickHouseClient() as conn: + query = f"""SELECT session_id, issue_type, count(1) as event_count FROM experimental.events WHERE session_id in ({sessionIds}) AND event_type = 'ISSUE' GROUP BY session_id, issue_type;""" + res = conn.execute(query) + + df3 = pd.DataFrame(res) + df3 = df3.pivot(index='session_id', columns=['issue_type'], values=['event_count']).event_count + + issues_type_found = df3.columns + df[issues_type_found] = [[0] * len(issues_type_found)] * len(df) + for sess in df3.index: + tmp = copy(df[df['session_id'] == sess]) + tmp[issues_type_found] = [df3.loc[sess]] * len(tmp) + df.loc[df['session_id'] == sess] = tmp asyncio.run(pg_client.init()) # Connection to OR postgres database with pg_client.PostgresClient() as conn: @@ -96,23 +110,27 @@ def get_today_feedback(): df2 = pd.DataFrame(res, columns=["project_id", "session_id", "viewer_id", "pages_count", "events_count", "errors_count", "duration", "country", "issue_score", "device_type", "replays", "network_access", - "storage_access", "console_access", "stack_acces"]) + "storage_access", "console_access", "stack_access"]) df2 = df.merge(df2, on=['session_id', 'project_id', 'viewer_id'], how='inner') + for i in range(len(df2.columns)): + if df2.dtypes[i] == np.float64: + df2[df2.columns[i]] = df2[df2.columns[i]].astype('int') + df2.fillna(0, inplace=True) ## Upload df2 to DB table - base_query = f"""INSERT INTO {features_table_name} (project_id, session_id, viewer_id, reason, comment, interesting, pages_count, events_count, - issues_count, duration, country, issue_score, device_type, - replays, network_access, storage_access, console_access, - stack_access) VALUES """ + base_query = f"""INSERT INTO {features_table_name} ({', '.join(df2.columns)}) VALUES """ params = {} for i in range(len(df2)): template = '(' for k, v in df2.iloc[i].items(): - params[f'{k}_{i}'] = v.values[0] - template += f's({k}_{i})%' - base_query += template + '), ' + try: + params[f'{k}_{i}'] = v.item() + except Exception: + params[f'{k}_{i}'] = v + template += f'%({k}_{i})s, ' + base_query += template[:-2] + '), ' base_query = base_query[:-2] connection_handler = ConnectionHandler(tracking_uri) with connection_handler.get_live_session() as conn: