diff --git a/ee/recommendation/ml_service/core/model_handler.py b/ee/recommendation/ml_service/core/model_handler.py
index 45c2470f7..12f1f9c7f 100644
--- a/ee/recommendation/ml_service/core/model_handler.py
+++ b/ee/recommendation/ml_service/core/model_handler.py
@@ -79,7 +79,7 @@ class ServedModel:
             query = conn.mogrify(
                 """SELECT project_id, session_id, user_id, %(userId)s as viewer_id, events_count, errors_count, duration, user_country as country, issue_score, user_device_type as device_type
                 FROM sessions
-                WHERE project_id = %(projectId)s AND session_id NOT IN (SELECT session_id FROM user_viewed_sessions WHERE user_id = %(userId)s) AND duration IS NOT NULL AND start_ts > %(oldest_limit)s LIMIT %(limit)s""",
+                WHERE project_id = %(projectId)s AND session_id NOT IN (SELECT session_id FROM user_viewed_sessions WHERE user_id = %(userId)s) AND duration > 10000 AND start_ts > %(oldest_limit)s LIMIT %(limit)s""",
                 {'userId': userId, 'projectId': projectId, 'limit': limit, 'oldest_limit': oldest_limit}
             )
             conn.execute(query)
diff --git a/ee/recommendation/ml_trainer/airflow/dags/db_update.py b/ee/recommendation/ml_trainer/airflow/dags/db_update.py
new file mode 100644
index 000000000..095fcf2f1
--- /dev/null
+++ b/ee/recommendation/ml_trainer/airflow/dags/db_update.py
@@ -0,0 +1,177 @@
+import asyncio
+import hashlib
+import mlflow
+import os
+
+import pandas as pd
+import pendulum
+import sys
+from airflow import DAG
+from airflow.operators.bash import BashOperator
+from airflow.operators.python import PythonOperator, ShortCircuitOperator
+from datetime import datetime, timedelta
+from decouple import config
+_work_dir = os.getcwd()
+sys.path.insert(1, _work_dir)
+from utils import pg_client
+from utils.feedback import ConnectionHandler
+from sqlalchemy import text
+
+
+execute_interval = config('EXECUTION_INTERVAL', default=24*60*60, cast=int)
+features_table_name = config('FEATURES_TABLE_NAME', default='features_table')
+host = config('pg_host_ml')
+port = config('pg_port_ml')
+user = config('pg_user_ml')
+dbname = config('pg_dbname_ml')
+password = config('pg_password_ml')
+tracking_uri = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}"
+
+
+def get_today_feedback(ti=None):
+    """Publish the feedback received during the last execution interval.
+
+    Pushes two index-aligned, comma-separated XCom strings, ``sessionIds`` and
+    ``userIds``, which the downstream ``get_features_pg`` task pulls.
+
+    ti: Airflow task instance; when omitted nothing is pushed.
+    """
+    connection_handler = ConnectionHandler(tracking_uri)
+
+    with connection_handler.get_live_session() as conn:
+        cur = conn.connection().connection.cursor()
+        # NOTE(review): assumes recommendation_feedback has session_id and
+        # user_id columns -- confirm against utils/feedback.py.
+        query = cur.mogrify(
+            """SELECT session_id, user_id FROM recommendation_feedback WHERE insertion_time > %(time_lower_bound)s;""",
+            {'time_lower_bound': int(datetime.now().timestamp()) - execute_interval})
+        rows = conn.execute(text(query.decode("utf-8"))).fetchall()
+
+    if ti is not None:
+        ti.xcom_push(key='sessionIds', value=','.join(str(row[0]) for row in rows))
+        ti.xcom_push(key='userIds', value=','.join(str(row[1]) for row in rows))
+
+
+def get_features_pg(ti):
+    """Store the features of the sessions that received feedback.
+
+    Pulls the ``sessionIds`` and ``userIds`` XCom strings (comma-separated and
+    index-aligned), reads the matching session and frontend-signal aggregates,
+    and inserts one row per (viewer, session) feedback pair into the features
+    table.
+
+    ti: Airflow task instance used to read the XCom values.
+    """
+    sessionIds = ti.xcom_pull(key='sessionIds')
+    userIds = ti.xcom_pull(key='userIds')
+    if not sessionIds or not userIds:
+        # No feedback in this interval: nothing to look up or insert.
+        return
+    # int() rejects anything that is not a plain id before it reaches SQL.
+    session_ids = [int(sessId) for sessId in sessionIds.split(',')]
+    # Compared as strings so int and str id representations still match.
+    feedback_pairs = set(zip(userIds.split(','), map(str, session_ids)))
+
+    os.environ['PG_POOL'] = 'true'
+    asyncio.run(pg_client.init())
+    with pg_client.PostgresClient() as conn:
+        query = conn.mogrify(
+            """SELECT T.project_id,
+                      T.session_id,
+                      T2.viewer_id,
+                      T.pages_count,
+                      T.events_count,
+                      T.errors_count,
+                      T.duration,
+                      T.country,
+                      T.issue_score,
+                      T.device_type,
+                      T2.replays,
+                      T2.network_access,
+                      T2.storage_access,
+                      T2.console_access,
+                      T2.stack_access
+               FROM (SELECT project_id,
+                            user_id as viewer_id,
+                            session_id,
+                            count(CASE WHEN source = 'replay' THEN 1 END) as replays,
+                            count(CASE WHEN source = 'network' THEN 1 END) as network_access,
+                            count(CASE WHEN source = 'storage' THEN 1 END) as storage_access,
+                            count(CASE WHEN source = 'console' THEN 1 END) as console_access,
+                            count(CASE WHEN source = 'stack_events' THEN 1 END) as stack_access
+                     FROM frontend_signals
+                     WHERE session_id IN %(sessionIds)s
+                     GROUP BY project_id, viewer_id, session_id) as T2
+                        INNER JOIN (SELECT project_id,
+                                           session_id,
+                                           user_id,
+                                           pages_count,
+                                           events_count,
+                                           errors_count,
+                                           duration,
+                                           user_country as country,
+                                           issue_score,
+                                           user_device_type as device_type
+                                    FROM sessions
+                                    WHERE session_id IN %(sessionIds)s
+                                      AND duration IS NOT NULL) as T
+                                   USING (session_id);""",
+            {'sessionIds': tuple(session_ids)})
+        conn.execute(query)
+        response = conn.fetchall()
+
+    # Same order as the INSERT column list (errors_count fills issues_count).
+    columns = ('project_id', 'session_id', 'viewer_id', 'pages_count', 'events_count',
+               'errors_count', 'duration', 'country', 'issue_score', 'device_type',
+               'replays', 'network_access', 'storage_access', 'console_access',
+               'stack_access')
+    base_query = f"""INSERT INTO {features_table_name} (project_id, session_id, viewer_id, pages_count, events_count,
+                               issues_count, duration, country, issue_score, device_type,
+                               replays, network_access, storage_access, console_access,
+                               stack_access) VALUES """
+    values = []
+    params = {}
+    for row in response:
+        # Rows are read by column name, as the previous DataFrame lookup required.
+        if (str(row['viewer_id']), str(row['session_id'])) not in feedback_pairs:
+            continue
+        count = len(values)
+        for k in columns:
+            params[f'{k}_{count}'] = row[k]
+        values.append('(' + ', '.join(f'%({k}_{count})s' for k in columns) + ')')
+    if not values:
+        # An INSERT without a VALUES tuple is invalid SQL.
+        return
+    base_query += ', '.join(values)
+    connection_handler = ConnectionHandler(tracking_uri)
+    with connection_handler.get_live_session() as conn:
+        cur = conn.connection().connection.cursor()
+        query = cur.mogrify(base_query, params)
+        conn.execute(text(query.decode("utf-8")))
+        conn.commit()
+
+
+dag = DAG(
+    "Feedback_DB_FILL",
+    default_args={
+        "retries": 1,
+        "retry_delay": timedelta(minutes=3),
+    },
+    start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
+    description="My first test",
+    schedule=timedelta(seconds=execute_interval),
+    catchup=False,
+)
+
+with dag:
+    dag_t_feedback = PythonOperator(
+        task_id='Get_Feedbacks',
+        python_callable=get_today_feedback,
+    )
+
+    dag_features = PythonOperator(
+        task_id='Update_DB',
+        python_callable=get_features_pg,
+    )
+
+    dag_t_feedback >> dag_features
diff --git a/ee/recommendation/ml_trainer/airflow/dags/training_dag.py b/ee/recommendation/ml_trainer/airflow/dags/training_dag.py
index ea81f21cf..e4fd0092d 100644
--- a/ee/recommendation/ml_trainer/airflow/dags/training_dag.py
+++ b/ee/recommendation/ml_trainer/airflow/dags/training_dag.py
@@ -81,7 +81,7 @@ dag = DAG(
     },
     start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
     description="My first test",
-    schedule=config('crons_train', default='@daily'),
+    schedule=config('crons_train', default='@weekly'),
     catchup=False,
 )
 
diff --git a/ee/recommendation/ml_trainer/build_dev.sh b/ee/recommendation/ml_trainer/build_dev.sh
index c11741677..8c54c336d 100755
--- a/ee/recommendation/ml_trainer/build_dev.sh
+++ b/ee/recommendation/ml_trainer/build_dev.sh
@@ -1,2 +1,3 @@
-cp ../../api/chalicelib/utils/ch_client.py utils
-cp ../../../api/chalicelib/utils/pg_client.py utils
+cp ../utils/ch_client.py utils
+cp ../utils/pg_client.py utils
+cp ../ml_service/core/feedback.py utils
diff --git a/ee/recommendation/ml_trainer/entrypoint.sh b/ee/recommendation/ml_trainer/entrypoint.sh
index 68e119897..af93b034b 100755
--- a/ee/recommendation/ml_trainer/entrypoint.sh
+++ b/ee/recommendation/ml_trainer/entrypoint.sh
@@ -8,13 +8,13 @@ find airflow/ -type f -name "*.cfg" -exec sed -i "s#{{airflow_secret_key}}#${air
 export MLFLOW_TRACKING_URI=postgresql+psycopg2://${pg_user_ml}:${pg_password_ml}@${pg_host_ml}:${pg_port_ml}/${pg_dbname_ml}
 git init airflow/dags
 # Airflow setup
-airflow db init
-airflow users create \
-    --username admin \
-    --firstname admin \
-    --lastname admin \
-    --role Admin \
-    --email admin@admin.admin \
-    -p ${airflow_admin_password}
+# airflow db init
+# airflow users create \
+#     --username admin \
+#     --firstname admin \
+#     --lastname admin \
+#     --role Admin \
+#     --email admin@admin.admin \
+#     -p ${airflow_admin_password}
 # Run services
 airflow webserver --port 8080 & airflow scheduler & ./mlflow_server.sh
diff --git a/ee/recommendation/ml_trainer/main.py b/ee/recommendation/ml_trainer/main.py
index 53ae07ea7..02a94a2a3 100644
--- a/ee/recommendation/ml_trainer/main.py
+++ b/ee/recommendation/ml_trainer/main.py
@@ -3,7 +3,7 @@ import hashlib
 import argparse
 import numpy as np
 from decouple import config
-from datetime import datetime
+from datetime import datetime, timedelta
 
 from core.user_features import get_training_database
 from core.recommendation_model import SVM_recommendation, sort_database
@@ -53,7 +53,7 @@ def main(experiment_name, projectId, tenantId):
         tenantId: tenant of the project id (used mainly as salt for hashing).
     """
     hashed = hashlib.sha256(bytes(f'{projectId}-{tenantId}'.encode('utf-8'))).hexdigest()
-    x_, y_, d = get_training_database(projectId, max_timestamp=1680248412284, favorites=True)
+    x_, y_, d = get_training_database(projectId, max_timestamp=int((datetime.now() - timedelta(days=1)).timestamp() * 1000), favorites=True)
 
     x, y = handle_database(x_, y_)
     if x is None:
diff --git a/ee/recommendation/requirements_base.txt b/ee/recommendation/requirements_base.txt
index 48c72ee26..e378cc7f0 100644
--- a/ee/recommendation/requirements_base.txt
+++ b/ee/recommendation/requirements_base.txt
@@ -1,19 +1,19 @@
-requests==2.28.2
-urllib3==1.26.12
-pyjwt==2.6.0
-SQLAlchemy==2.0.10
+requests==2.31.0
+urllib3==1.26.16
+pyjwt==2.8.0
+SQLAlchemy==2.0.20
 alembic==1.11.1
-psycopg2-binary==2.9.5
+psycopg2-binary==2.9.7
 
-joblib==1.2.0
-scipy==1.10.1
-scikit-learn==1.2.2
-mlflow==2.3
+joblib==1.3.2
+scipy==1.11.2
+scikit-learn==1.3.0
+mlflow==2.5
 
-clickhouse-driver==0.2.5
-python3-saml==1.14.0
-python-multipart==0.0.5
+clickhouse-driver==0.2.6
+python3-saml==1.15.0
+python-multipart==0.0.6
 python-decouple==3.8
-pydantic==1.10.8
+pydantic==1.10.12
 
-boto3==1.26.100
+boto3==1.28.29