openreplay/ee/recommendation/ml_trainer/airflow/dags/db_update.py
MauricioGarciaS 7ffcf79bf6
chore(recommendations): python modules updated and added airflow dag to save sessions features (#1979)
* fix(trainer): Updated requirements

* fix(recommendations): Downgraded pydantic to 1.10.12 and mlflow to 2.5

* Updated the DAG that fills the database with feedback; moved the feedback module from ml_service/core into the common core

* fix(recommendations): Fixed the database update and added more features to the DB

* Updated modules in the recommendations trainer and server

* chore(recommendations): Updated Python modules for the trainer; added a script to save features from feedback sessions into the ML database

* Updated requirements

* Updated requirements
2024-04-24 15:10:18 +02:00


import asyncio
import os
import sys
from copy import copy
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from decouple import config
from sqlalchemy import text

# Make the DAG's working directory importable so the shared helpers below resolve.
_work_dir = os.getcwd()
sys.path.insert(1, _work_dir)

from utils import pg_client
from utils import ch_client
from core.feedback import ConnectionHandler

# How often the DAG runs, in seconds (defaults to once a day).
execute_interval = config('EXECUTION_INTERVAL', default=24 * 60 * 60, cast=int)
features_table_name = config('FEATURES_TABLE_NAME', default='features_table')

# Connection parameters for the ML (mlflow) postgres database.
host = config('pg_host_ml')
port = config('pg_port_ml')
user = config('pg_user_ml')
dbname = config('pg_dbname_ml')
password = config('pg_password_ml')
tracking_uri = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}"
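# The resulting SQLAlchemy URI has the usual shape, e.g. (illustrative values only):
#   postgresql+psycopg2://ml_user:secret@ml-db:5432/mlflow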


def get_today_feedback():
    """Collect the feedback submitted during the last execution window and enrich it with session features."""
    window_start = int((datetime.now() - timedelta(seconds=execute_interval)).timestamp())
    query = f"SELECT project_id, session_id, user_id AS viewer_id, payload FROM recommendation_feedback WHERE insertion_time >= {window_start}"
    connection_handler = ConnectionHandler(tracking_uri)  # Connection to mlflow's database
    with connection_handler.get_live_session() as conn:
        cur = conn.execute(text(query))
        res = cur.fetchall()
        conn.commit()
    # Flatten the JSON payload column into its three feedback fields.
    for i in range(len(res)):
        payload_i = res[i][3]
        res[i] = res[i][:3] + (payload_i['reason'], payload_i['comment'], payload_i['interesting'])
    df = pd.DataFrame(res, columns=["project_id", "session_id", "viewer_id", "reason", "comment", "interesting"])
    if df.empty:
        return  # No feedback in this window; nothing to do.
    session_ids = df['session_id'].unique()
    sessionIds = ','.join(str(k) for k in session_ids)
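    # sessionIds is a comma-separated id list, e.g. '123,456', interpolated
    # verbatim into the IN (...) clauses below; the ids come from the database
    # as integers, so the interpolation is not user-controlled.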
    # Count issue events per session in ClickHouse, then pivot to one column per issue type.
    with ch_client.ClickHouseClient() as conn:
        query = f"""SELECT session_id, issue_type, count(1) AS event_count
                    FROM experimental.events
                    WHERE session_id IN ({sessionIds}) AND event_type = 'ISSUE'
                    GROUP BY session_id, issue_type;"""
        res = conn.execute(query)
    df3 = pd.DataFrame(res)
    if not df3.empty:
        df3 = df3.pivot(index='session_id', columns=['issue_type'], values=['event_count']).event_count
        issues_type_found = df3.columns
        # Default every feedback row to zero counts, then fill in the sessions that had issues.
        df[issues_type_found] = [[0] * len(issues_type_found)] * len(df)
        for sess in df3.index:
            tmp = copy(df[df['session_id'] == sess])
            tmp[issues_type_found] = [df3.loc[sess]] * len(tmp)
            df.loc[df['session_id'] == sess] = tmp
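    # After the pivot above, df3 is indexed by session_id with one count column
    # per issue type, e.g. (illustrative values):
    #   issue_type   click_rage  dead_click
    #   session_id
    #   123                   2           0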
    # Fetch per-session metadata and frontend-signal counts from the OpenReplay
    # postgres database, restricted to the sessions that received feedback.
    asyncio.run(pg_client.init())
    with pg_client.PostgresClient() as conn:
        conn.execute("""SELECT T.project_id,
                               T.session_id,
                               T2.viewer_id,
                               T.pages_count,
                               T.events_count,
                               T.errors_count,
                               T.duration,
                               T.country,
                               T.issue_score,
                               T.device_type,
                               T2.replays,
                               T2.network_access,
                               T2.storage_access,
                               T2.console_access,
                               T2.stack_access
                        FROM (SELECT project_id,
                                     user_id AS viewer_id,
                                     session_id,
                                     count(CASE WHEN source = 'replay' THEN 1 END)       AS replays,
                                     count(CASE WHEN source = 'network' THEN 1 END)      AS network_access,
                                     count(CASE WHEN source = 'storage' THEN 1 END)      AS storage_access,
                                     count(CASE WHEN source = 'console' THEN 1 END)      AS console_access,
                                     count(CASE WHEN source = 'stack_events' THEN 1 END) AS stack_access
                              FROM frontend_signals
                              WHERE session_id IN ({sessionIds})
                              GROUP BY project_id, viewer_id, session_id) AS T2
                             INNER JOIN (SELECT project_id,
                                                session_id,
                                                user_id,
                                                pages_count,
                                                events_count,
                                                errors_count,
                                                duration,
                                                user_country AS country,
                                                issue_score,
                                                user_device_type AS device_type
                                         FROM sessions
                                         WHERE session_id IN ({sessionIds})
                                           AND duration IS NOT NULL) AS T
                                        USING (session_id);""".format(sessionIds=sessionIds))
        res = conn.fetchall()
    df2 = pd.DataFrame(res,
                       columns=["project_id", "session_id", "viewer_id", "pages_count", "events_count",
                                "errors_count", "duration", "country", "issue_score", "device_type",
                                "replays", "network_access", "storage_access", "console_access",
                                "stack_access"])
    # Join feedback (plus issue counts) with the session features.
    df2 = df.merge(df2, on=['session_id', 'project_id', 'viewer_id'], how='inner')
    # Fill missing values before downcasting: casting a float column that still
    # contains NaN to int would raise.
    df2.fillna(0, inplace=True)
    for col in df2.columns:
        if df2[col].dtype == np.float64:
            df2[col] = df2[col].astype('int')
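    # At this point df2 holds one row per (project_id, session_id, viewer_id)
    # with the feedback fields, the per-issue-type counts and the session
    # features; its columns map one-to-one onto the features table.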
    # Upload df2 to the features table: build a single multi-row INSERT with
    # one named parameter per cell.
    base_query = f"""INSERT INTO {features_table_name} ({', '.join(df2.columns)}) VALUES """
    params = {}
    for i in range(len(df2)):
        template = '('
        for k, v in df2.iloc[i].items():
            try:
                params[f'{k}_{i}'] = v.item()  # Unwrap numpy scalars into native Python types.
            except AttributeError:
                params[f'{k}_{i}'] = v
            template += f'%({k}_{i})s, '
        base_query += template[:-2] + '), '
    base_query = base_query[:-2]
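    # The generated statement has this shape (illustrative: two rows, three columns):
    #   INSERT INTO features_table (project_id, session_id, duration)
    #   VALUES (%(project_id_0)s, %(session_id_0)s, %(duration_0)s),
    #          (%(project_id_1)s, %(session_id_1)s, %(duration_1)s)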
    connection_handler = ConnectionHandler(tracking_uri)
    with connection_handler.get_live_session() as conn:
        # mogrify needs the raw psycopg2 cursor; it renders the parameters into
        # the statement, which is then executed through the SQLAlchemy session.
        cur = conn.connection().connection.cursor()
        query = cur.mogrify(base_query, params)
        conn.execute(text(query.decode("utf-8")))
        conn.commit()


def get_features_pg():
    # Stub: the Update_DB task is a no-op until this is implemented.
    ...


dag = DAG(
    "Feedback_DB_FILL",
    default_args={
        "retries": 1,
        "retry_delay": timedelta(minutes=3),
    },
    start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
    description="Collects recommendation feedback and saves session features into the ML database",
    schedule=timedelta(seconds=execute_interval),
    catchup=False,
)

with dag:
    dag_t_feedback = PythonOperator(
        task_id='Get_Feedbacks',
        python_callable=get_today_feedback,
    )
    dag_features = PythonOperator(
        task_id='Update_DB',
        python_callable=get_features_pg,
    )
    # Collect and enrich the feedback first, then run the DB-update step.
    dag_t_feedback >> dag_features
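
# Airflow's scheduler discovers this module by scanning the dags/ folder and
# picks up the module-level DAG object; no extra registration is needed.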