diff --git a/ee/recommendation/ml_service/core/feedback.py b/ee/recommendation/core/feedback.py
similarity index 100%
rename from ee/recommendation/ml_service/core/feedback.py
rename to ee/recommendation/core/feedback.py
diff --git a/ee/recommendation/core/recommendation_model.py b/ee/recommendation/core/recommendation_model.py
index 823fd5093..2e7805222 100644
--- a/ee/recommendation/core/recommendation_model.py
+++ b/ee/recommendation/core/recommendation_model.py
@@ -57,6 +57,17 @@ def preprocess(X):
     return x, transform
 
 
+class RecommendationSystem(mlflow.pyfunc.PythonModel):
+    def __init__(self):
+        ...
+
+    def fit(self, X, y):
+        ...
+
+    def predict(self, X):
+        return None
+
+
 class SVM_recommendation(mlflow.pyfunc.PythonModel):
 
     def __init__(self, test=False, **params):
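# A minimal sketch of how the new RecommendationSystem stub could be logged once
# fit/predict are filled in. The experiment name and toy data are assumptions for
# illustration, not part of this patch. Note that mlflow's pyfunc wrapper usually
# invokes predict(self, context, model_input), so the stub's predict(self, X)
# signature may need a context argument before the model can be served.
import mlflow
import numpy as np
from core.recommendation_model import RecommendationSystem

model = RecommendationSystem()
model.fit(np.zeros((10, 4)), np.zeros(10))  # currently a no-op stub

mlflow.set_experiment("recommendations")  # hypothetical experiment name
with mlflow.start_run():
    mlflow.pyfunc.log_model(artifact_path="recommendation_system", python_model=model)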
f"""SELECT session_id, issue_type, count(1) as event_count FROM experimental.events WHERE session_id in ({sessionIds}) AND event_type = 'ISSUE' GROUP BY session_id, issue_type;""" + res = conn.execute(query) + + df3 = pd.DataFrame(res) + df3 = df3.pivot(index='session_id', columns=['issue_type'], values=['event_count']).event_count + + issues_type_found = df3.columns + df[issues_type_found] = [[0] * len(issues_type_found)] * len(df) + for sess in df3.index: + tmp = copy(df[df['session_id'] == sess]) + tmp[issues_type_found] = [df3.loc[sess]] * len(tmp) + df.loc[df['session_id'] == sess] = tmp + + asyncio.run(pg_client.init()) # Connection to OR postgres database with pg_client.PostgresClient() as conn: - conn.execute( - """SELECT T.project_id, - T.session_id, - T2.viewer_id, - T.pages_count, - T.events_count, - T.errors_count, - T.duration, - T.country, - T.issue_score, - T.device_type, - T2.replays, - T2.network_access, - T2.storage_access, - T2.console_access, - T2.stack_access - FROM (SELECT project_id, - user_id as viewer_id, - session_id, - count(CASE WHEN source = 'replay' THEN 1 END) as replays, - count(CASE WHEN source = 'network' THEN 1 END) as network_access, - count(CASE WHEN source = 'storage' THEN 1 END) as storage_access, - count(CASE WHEN source = 'console' THEN 1 END) as console_access, - count(CASE WHEN source = 'stack_events' THEN 1 END) as stack_access - FROM frontend_signals - WHERE session_id IN ({sessionIds}) - GROUP BY project_id, viewer_id, session_id) as T2 - INNER JOIN (SELECT project_id, - session_id, - user_id, - pages_count, - events_count, - errors_count, - duration, - user_country as country, - issue_score, - user_device_type as device_type - FROM sessions - WHERE session_id IN ({sessionIds}) - AND duration IS NOT NULL) as T - USING (session_id);""".format(sessionIds=sessionIds) - ) - response = conn.fetchall() - sessionIds = [int(sessId) for sessId in sessionIds.split(',')] - df = pd.DataFrame(response) - df2 = pd.DataFrame(zip(userIds, sessionIds), columns=['viewer_id', 'session_id']) + conn.execute("""SELECT T.project_id, + T.session_id, + T2.viewer_id, + T.pages_count, + T.events_count, + T.errors_count, + T.duration, + T.country, + T.issue_score, + T.device_type, + T2.replays, + T2.network_access, + T2.storage_access, + T2.console_access, + T2.stack_access + FROM (SELECT project_id, + user_id as viewer_id, + session_id, + count(CASE WHEN source = 'replay' THEN 1 END) as replays, + count(CASE WHEN source = 'network' THEN 1 END) as network_access, + count(CASE WHEN source = 'storage' THEN 1 END) as storage_access, + count(CASE WHEN source = 'console' THEN 1 END) as console_access, + count(CASE WHEN source = 'stack_events' THEN 1 END) as stack_access + FROM frontend_signals + WHERE session_id IN ({sessionIds}) + GROUP BY project_id, viewer_id, session_id) as T2 + INNER JOIN (SELECT project_id, + session_id, + user_id, + pages_count, + events_count, + errors_count, + duration, + user_country as country, + issue_score, + user_device_type as device_type + FROM sessions + WHERE session_id IN ({sessionIds}) + AND duration IS NOT NULL) as T + USING (session_id);""".format(sessionIds=sessionIds) + ) + res = conn.fetchall() + df2 = pd.DataFrame(res, + columns=["project_id", "session_id", "viewer_id", "pages_count", "events_count", "errors_count", + "duration", "country", "issue_score", "device_type", "replays", "network_access", + "storage_access", "console_access", "stack_access"]) - base_query = f"""INSERT INTO {features_table_name} (project_id, session_id, 
diff --git a/ee/recommendation/ml_trainer/entrypoint.sh b/ee/recommendation/ml_trainer/entrypoint.sh
index af93b034b..cfc96e85e 100755
--- a/ee/recommendation/ml_trainer/entrypoint.sh
+++ b/ee/recommendation/ml_trainer/entrypoint.sh
@@ -7,6 +7,7 @@ find airflow/ -type f -name "*.cfg" -exec sed -i "s/{{pg_dbname_airflow}}/${pg_d
 find airflow/ -type f -name "*.cfg" -exec sed -i "s#{{airflow_secret_key}}#${airflow_secret_key}#g" {} \;
 export MLFLOW_TRACKING_URI=postgresql+psycopg2://${pg_user_ml}:${pg_password_ml}@${pg_host_ml}:${pg_port_ml}/${pg_dbname_ml}
 git init airflow/dags
+airflow db upgrade
 # Airflow setup
 # airflow db init
 # airflow users create \
diff --git a/ee/recommendation/ml_trainer/requirements.txt b/ee/recommendation/ml_trainer/requirements.txt
index 7efa93d01..fd6bbaaea 100644
--- a/ee/recommendation/ml_trainer/requirements.txt
+++ b/ee/recommendation/ml_trainer/requirements.txt
@@ -1,3 +1,3 @@
-argcomplete==3.0.8
-apache-airflow==2.6.2
-airflow-code-editor==7.2.1
+argcomplete==3.2.2
+apache-airflow==2.8.2
+airflow-code-editor==7.5.0
diff --git a/ee/recommendation/requirements_base.txt b/ee/recommendation/requirements_base.txt
index e378cc7f0..987c069c2 100644
--- a/ee/recommendation/requirements_base.txt
+++ b/ee/recommendation/requirements_base.txt
@@ -1,19 +1,19 @@
 requests==2.31.0
-urllib3==1.26.16
+urllib3==2.0.7
 pyjwt==2.8.0
 
-SQLAlchemy==2.0.20
-alembic==1.11.1
-psycopg2-binary==2.9.7
+SQLAlchemy==2.0.28
+alembic==1.13.1
+psycopg2-binary==2.9.9
 joblib==1.3.2
-scipy==1.11.2
-scikit-learn==1.3.0
-mlflow==2.5
+scipy==1.12.0
+scikit-learn==1.4.1.post1
+mlflow==2.11.1
 
-clickhouse-driver==0.2.6
-python3-saml==1.15.0
+clickhouse-driver==0.2.7
+python3-saml==1.16.0
 python-multipart==0.0.6
 python-decouple==3.8
-pydantic==1.10.12
+pydantic==2.6.3
 
-boto3==1.28.29
+boto3==1.34.57
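# Assumption worth checking against the apache-airflow==2.8.2 bump above: since
# Airflow 2.7, the `airflow db upgrade` command added to entrypoint.sh is
# deprecated in favour of `airflow db migrate`, so the entrypoint may log a
# deprecation warning until it is switched over.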
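# The pydantic 1.10 -> 2.6 bump above is a breaking major upgrade; any request
# or payload models in the service need the v2 API. A minimal sketch of the two
# most common renames (the FeedbackPayload model here is hypothetical, not from
# this patch):
from pydantic import BaseModel, field_validator

class FeedbackPayload(BaseModel):
    reason: str
    comment: str
    interesting: bool

    @field_validator('reason')  # pydantic v1 spelled this @validator('reason')
    @classmethod
    def reason_non_empty(cls, v: str) -> str:
        if not v:
            raise ValueError('reason must be non-empty')
        return v

payload = FeedbackPayload(reason='not relevant', comment='', interesting=False)
data = payload.model_dump()  # pydantic v1 spelled this payload.dict()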
clickhouse_driver.Client(host=config("ch_host"), database="default", port=config("ch_port", cast=int), + user=config("ch_user", cast=str), + password=config("ch_password", cast=str), settings=settings) \ if self.__client is None else self.__client