fix(recommendations): fixed database update and added more features to the DB
This commit is contained in:
parent
490ac3c2ed
commit
e4449c74bd
1 changed file with 33 additions and 15 deletions
|
|
@ -11,11 +11,13 @@ from airflow.operators.bash import BashOperator
|
|||
from airflow.operators.python import PythonOperator, ShortCircuitOperator
|
||||
from datetime import datetime, timedelta
|
||||
from decouple import config
|
||||
import numpy as np
|
||||
_work_dir = os.getcwd()
|
||||
sys.path.insert(1, _work_dir)
|
||||
from utils import pg_client
|
||||
from utils import ch_client
|
||||
from core.feedback import ConnectionHandler
|
||||
from copy import copy
|
||||
from sqlalchemy import text
|
||||
|
||||
|
||||
|
|
@ -30,14 +32,12 @@ tracking_uri = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}"
|
|||
|
||||
|
||||
def get_today_feedback():
|
||||
connection_handler = ConnectionHandler(tracking_uri)
|
||||
query = "SELECT project_id, session_id, user_id as viewer_id, payload FROM recommendation_feedback"
|
||||
|
||||
connection_handler = ConnectionHandler(tracking_uri) # Connection to mlflow's database
|
||||
|
||||
with connection_handler.get_live_session() as conn:
|
||||
cur = conn.connection().connection.cursor()
|
||||
query = cur.mogrify(
|
||||
f"""SELECT project_id, session_id, user_id as viewer_id, payload FROM recommendation_feedback WHERE insertion_time > %(time_lower_bound)s;""",
|
||||
{'time_lower_bound': int(datetime.now().timestamp()) - execute_interval})
|
||||
cur = conn.execute(text(query.decode("utf-8")))
|
||||
cur = conn.execute(text(query))
|
||||
res = cur.fetchall()
|
||||
conn.commit()
|
||||
|
||||
|
|
@ -47,7 +47,21 @@ def get_today_feedback():
|
|||
|
||||
df = pd.DataFrame(res, columns=["project_id", "session_id", "viewer_id", "reason", "comment", "interesting"])
|
||||
|
||||
sessionIds = ','.join([str(k) for k in df['session_id'].unique()])
|
||||
sessionsIds_list = df['session_id'].unique()
|
||||
sessionIds = ','.join([str(k) for k in sessionsIds_list])
|
||||
with ch_client.ClickHouseClient() as conn:
|
||||
query = f"""SELECT session_id, issue_type, count(1) as event_count FROM experimental.events WHERE session_id in ({sessionIds}) AND event_type = 'ISSUE' GROUP BY session_id, issue_type;"""
|
||||
res = conn.execute(query)
|
||||
|
||||
df3 = pd.DataFrame(res)
|
||||
df3 = df3.pivot(index='session_id', columns=['issue_type'], values=['event_count']).event_count
|
||||
|
||||
issues_type_found = df3.columns
|
||||
df[issues_type_found] = [[0] * len(issues_type_found)] * len(df)
|
||||
for sess in df3.index:
|
||||
tmp = copy(df[df['session_id'] == sess])
|
||||
tmp[issues_type_found] = [df3.loc[sess]] * len(tmp)
|
||||
df.loc[df['session_id'] == sess] = tmp
|
||||
|
||||
asyncio.run(pg_client.init()) # Connection to OR postgres database
|
||||
with pg_client.PostgresClient() as conn:
|
||||
|
|
@ -96,23 +110,27 @@ def get_today_feedback():
|
|||
df2 = pd.DataFrame(res,
|
||||
columns=["project_id", "session_id", "viewer_id", "pages_count", "events_count", "errors_count",
|
||||
"duration", "country", "issue_score", "device_type", "replays", "network_access",
|
||||
"storage_access", "console_access", "stack_acces"])
|
||||
"storage_access", "console_access", "stack_access"])
|
||||
|
||||
df2 = df.merge(df2, on=['session_id', 'project_id', 'viewer_id'], how='inner')
|
||||
for i in range(len(df2.columns)):
|
||||
if df2.dtypes[i] == np.float64:
|
||||
df2[df2.columns[i]] = df2[df2.columns[i]].astype('int')
|
||||
df2.fillna(0, inplace=True)
|
||||
|
||||
## Upload df2 to DB table
|
||||
|
||||
base_query = f"""INSERT INTO {features_table_name} (project_id, session_id, viewer_id, reason, comment, interesting, pages_count, events_count,
|
||||
issues_count, duration, country, issue_score, device_type,
|
||||
replays, network_access, storage_access, console_access,
|
||||
stack_access) VALUES """
|
||||
base_query = f"""INSERT INTO {features_table_name} ({', '.join(df2.columns)}) VALUES """
|
||||
params = {}
|
||||
for i in range(len(df2)):
|
||||
template = '('
|
||||
for k, v in df2.iloc[i].items():
|
||||
params[f'{k}_{i}'] = v.values[0]
|
||||
template += f's({k}_{i})%'
|
||||
base_query += template + '), '
|
||||
try:
|
||||
params[f'{k}_{i}'] = v.item()
|
||||
except Exception:
|
||||
params[f'{k}_{i}'] = v
|
||||
template += f'%({k}_{i})s, '
|
||||
base_query += template[:-2] + '), '
|
||||
base_query = base_query[:-2]
|
||||
connection_handler = ConnectionHandler(tracking_uri)
|
||||
with connection_handler.get_live_session() as conn:
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue