Updated DAG for updating the database with feedback; moved the feedback module from ml_service/core into the common core

MauricioGarciaS 2023-08-29 09:58:13 +02:00
parent 4b1d0f5c59
commit 490ac3c2ed
2 changed files with 76 additions and 71 deletions


@@ -14,7 +14,8 @@ from decouple import config
 _work_dir = os.getcwd()
 sys.path.insert(1, _work_dir)
 from utils import pg_client
-from utils.feedback import ConnectionHandler
+from utils import ch_client
+from core.feedback import ConnectionHandler
 from sqlalchemy import text
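The import block now pulls ConnectionHandler from the shared core package instead of the service-local utils.feedback, and adds utils.ch_client (presumably the ClickHouse client used elsewhere in this file). For orientation, here is a minimal sketch of what core/feedback.py plausibly exposes, inferred purely from the call sites in this diff (ConnectionHandler(tracking_uri), get_live_session(), raw-cursor access); everything beyond those names is an assumption:

from contextlib import contextmanager

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

class ConnectionHandler:
    """Hypothetical reconstruction; the real core/feedback.py may differ."""

    def __init__(self, uri):
        # `uri` corresponds to `tracking_uri` at the call sites below.
        self.engine = create_engine(uri)

    @contextmanager
    def get_live_session(self):
        # Yields an ORM session; the DAG reaches the raw DBAPI cursor via
        # session.connection().connection.cursor() to use psycopg2's mogrify().
        with Session(self.engine) as session:
            yield session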
@@ -34,84 +35,84 @@ def get_today_feedback():
     with connection_handler.get_live_session() as conn:
         cur = conn.connection().connection.cursor()
         query = cur.mogrify(
-            f"""SELECT * FROM recommendation_feedback WHERE insertion_time > %(time_lower_bound)s;""",
+            f"""SELECT project_id, session_id, user_id as viewer_id, payload FROM recommendation_feedback WHERE insertion_time > %(time_lower_bound)s;""",
             {'time_lower_bound': int(datetime.now().timestamp()) - execute_interval})
-        conn.execute(text(query.decode("utf-8")))
+        cur = conn.execute(text(query.decode("utf-8")))
         res = cur.fetchall()
         conn.commit()
+    for i in range(len(res)):
+        payload_i = res[i][3]
+        res[i] = res[i][:3] + (payload_i['reason'], payload_i['comment'], payload_i['interesting'])
-def get_features_pg(ti):
-    os.environ['PG_POOL'] = 'true'
-    asyncio.run(pg_client.init())
-    sessionIds = ti.xcom_pull(key='sessionIds')
-    userIds = ti.xcom_pull(key='userIds').split(',')
+    df = pd.DataFrame(res, columns=["project_id", "session_id", "viewer_id", "reason", "comment", "interesting"])
+    sessionIds = ','.join([str(k) for k in df['session_id'].unique()])
+    asyncio.run(pg_client.init())  # Connection to OR postgres database
     with pg_client.PostgresClient() as conn:
-        conn.execute(
-            """SELECT T.project_id,
-                      T.session_id,
-                      T2.viewer_id,
-                      T.pages_count,
-                      T.events_count,
-                      T.errors_count,
-                      T.duration,
-                      T.country,
-                      T.issue_score,
-                      T.device_type,
-                      T2.replays,
-                      T2.network_access,
-                      T2.storage_access,
-                      T2.console_access,
-                      T2.stack_access
-               FROM (SELECT project_id,
-                            user_id as viewer_id,
-                            session_id,
-                            count(CASE WHEN source = 'replay' THEN 1 END) as replays,
-                            count(CASE WHEN source = 'network' THEN 1 END) as network_access,
-                            count(CASE WHEN source = 'storage' THEN 1 END) as storage_access,
-                            count(CASE WHEN source = 'console' THEN 1 END) as console_access,
-                            count(CASE WHEN source = 'stack_events' THEN 1 END) as stack_access
-                     FROM frontend_signals
-                     WHERE session_id IN ({sessionIds})
-                     GROUP BY project_id, viewer_id, session_id) as T2
-                    INNER JOIN (SELECT project_id,
-                                       session_id,
-                                       user_id,
-                                       pages_count,
-                                       events_count,
-                                       errors_count,
-                                       duration,
-                                       user_country as country,
-                                       issue_score,
-                                       user_device_type as device_type
-                                FROM sessions
-                                WHERE session_id IN ({sessionIds})
-                                  AND duration IS NOT NULL) as T
-                    USING (session_id);""".format(sessionIds=sessionIds)
-        )
-        response = conn.fetchall()
-        sessionIds = [int(sessId) for sessId in sessionIds.split(',')]
-        df = pd.DataFrame(response)
-        df2 = pd.DataFrame(zip(userIds, sessionIds), columns=['viewer_id', 'session_id'])
conn.execute("""SELECT T.project_id,
T.session_id,
T2.viewer_id,
T.pages_count,
T.events_count,
T.errors_count,
T.duration,
T.country,
T.issue_score,
T.device_type,
T2.replays,
T2.network_access,
T2.storage_access,
T2.console_access,
T2.stack_access
FROM (SELECT project_id,
user_id as viewer_id,
session_id,
count(CASE WHEN source = 'replay' THEN 1 END) as replays,
count(CASE WHEN source = 'network' THEN 1 END) as network_access,
count(CASE WHEN source = 'storage' THEN 1 END) as storage_access,
count(CASE WHEN source = 'console' THEN 1 END) as console_access,
count(CASE WHEN source = 'stack_events' THEN 1 END) as stack_access
FROM frontend_signals
WHERE session_id IN ({sessionIds})
GROUP BY project_id, viewer_id, session_id) as T2
INNER JOIN (SELECT project_id,
session_id,
user_id,
pages_count,
events_count,
errors_count,
duration,
user_country as country,
issue_score,
user_device_type as device_type
FROM sessions
WHERE session_id IN ({sessionIds})
AND duration IS NOT NULL) as T
USING (session_id);""".format(sessionIds=sessionIds)
)
res = conn.fetchall()
df2 = pd.DataFrame(res,
columns=["project_id", "session_id", "viewer_id", "pages_count", "events_count", "errors_count",
"duration", "country", "issue_score", "device_type", "replays", "network_access",
"storage_access", "console_access", "stack_acces"])
base_query = f"""INSERT INTO {features_table_name} (project_id, session_id, viewer_id, pages_count, events_count,
issues_count, duration, country, issue_score, device_type,
replays, network_access, storage_access, console_access,
stack_access) VALUES """
count = 0
df2 = df.merge(df2, on=['session_id', 'project_id', 'viewer_id'], how='inner')
## Upload df2 to DB table
base_query = f"""INSERT INTO {features_table_name} (project_id, session_id, viewer_id, reason, comment, interesting, pages_count, events_count,
issues_count, duration, country, issue_score, device_type,
replays, network_access, storage_access, console_access,
stack_access) VALUES """
params = {}
-    for i in range(len(df)):
-        viewer = df['viewer_id'].iloc[i]
-        session = df['session_id'].iloc[i]
-        d = df2[df2['viewer_id'] == viewer]
-        x = d[d['session_id'] == session]
-        if len(x) > 0:
-            template = '('
-            for k, v in x.items():
-                params[f'{k}_{count}'] = v.values[0]
-                template += f'%({k}_{count})s, '
-            base_query += template[:-2] + '), '
-            count += 1
+    for i in range(len(df2)):
+        template = '('
+        for k, v in df2.iloc[i].items():
+            params[f'{k}_{i}'] = v  # iterating a row Series yields scalars, not Series
+            template += f'%({k}_{i})s, '
+        base_query += template[:-2] + '), '
     base_query = base_query[:-2]
     connection_handler = ConnectionHandler(tracking_uri)
     with connection_handler.get_live_session() as conn:
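To make the query-building loop above concrete, here is an illustrative, trimmed-down run with three columns and two merged rows; the table name and values are made up, and the real statement carries all eighteen columns:

import pandas as pd

# Hypothetical stand-in for df2 after the inner merge.
df2 = pd.DataFrame([
    {'project_id': 1, 'session_id': 101, 'reason': 'relevant'},
    {'project_id': 1, 'session_id': 102, 'reason': 'other'},
])

base_query = "INSERT INTO feedback_features (project_id, session_id, reason) VALUES "
params = {}
for i in range(len(df2)):
    template = '('
    for k, v in df2.iloc[i].items():
        params[f'{k}_{i}'] = v
        template += f'%({k}_{i})s, '
    base_query += template[:-2] + '), '
base_query = base_query[:-2]

# base_query == "INSERT INTO feedback_features (project_id, session_id, reason) VALUES "
#               "(%(project_id_0)s, %(session_id_0)s, %(reason_0)s), "
#               "(%(project_id_1)s, %(session_id_1)s, %(reason_1)s)"
# params == {'project_id_0': 1, 'session_id_0': 101, 'reason_0': 'relevant',
#            'project_id_1': 1, 'session_id_1': 102, 'reason_1': 'other'}

The named %(key)s placeholders line up with the params dict, so the statement can presumably be rendered with cur.mogrify(base_query, params) on the raw psycopg2 cursor and handed to conn.execute(text(...)), mirroring the feedback query at the top of this hunk.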
@@ -121,6 +122,10 @@ def get_features_pg(ti):
         conn.commit()
+
+def get_features_pg():
+    ...
+
 dag = DAG(
     "Feedback_DB_FILL",
     default_args={
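The DAG definition is cut off here. Below is a minimal sketch of how such a definition typically continues; the schedule, dates, and operator wiring are placeholder assumptions, not taken from the commit:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical continuation; every value below is a placeholder.
dag = DAG(
    "Feedback_DB_FILL",
    default_args={
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    start_date=datetime(2023, 8, 1),
    schedule_interval=timedelta(hours=1),  # assumed cadence
    catchup=False,
)

fill_task = PythonOperator(
    task_id="get_today_feedback",
    python_callable=get_today_feedback,  # the function reworked in this commit
    dag=dag,
)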