From b9b187d684b0ff8c0f6ea2d3a194629dfad653a1 Mon Sep 17 00:00:00 2001
From: MauricioGarciaS <47052044+MauricioGarciaS@users.noreply.github.com>
Date: Mon, 10 Jul 2023 14:29:40 +0200
Subject: [PATCH] Redshift pool (#1397)

* fix(connectors): added try method for redshift replace in fill

* fix(connectors): adding userId only if not empty string

* fix(connectors): Added fix as a worker
---
 ee/connectors/entrypoint.sh   |  2 +-
 ee/connectors/fill_from_db.py |  2 +-
 ee/connectors/handler.py      |  6 ++-
 ee/connectors/utils/worker.py | 75 +++++++++++++++++++++++++++++++++--
 4 files changed, 78 insertions(+), 7 deletions(-)

diff --git a/ee/connectors/entrypoint.sh b/ee/connectors/entrypoint.sh
index 2eb4b7ae7..523704b97 100755
--- a/ee/connectors/entrypoint.sh
+++ b/ee/connectors/entrypoint.sh
@@ -1,2 +1,2 @@
 echo "[INFO] Service start"
-python -u consumer_pool.py & python -u fill_from_db.py && fg
+python -u consumer_pool.py
diff --git a/ee/connectors/fill_from_db.py b/ee/connectors/fill_from_db.py
index 54ad7f162..c5532cb1d 100644
--- a/ee/connectors/fill_from_db.py
+++ b/ee/connectors/fill_from_db.py
@@ -74,7 +74,7 @@ def try_method(f, params, on_exeption=None, _try=0):
 
             return on_exeption.restart()
         else:
-            logging.warning('[FILL Exception]', repr(e), 'retrying..')
+            logging.info('[FILL Exception]', repr(e), 'retrying..')
             sleep(1)
             return try_method(f=f, params=params, on_exeption=on_exeption, _try=_try+1)
     return None
diff --git a/ee/connectors/handler.py b/ee/connectors/handler.py
index 9edfbd34e..40f601eaf 100644
--- a/ee/connectors/handler.py
+++ b/ee/connectors/handler.py
@@ -187,7 +187,8 @@ def handle_session(n: Session, message: Message) -> Optional[Session]:
 
     if isinstance(message, UserID):
         try:
-            n.user_id = message.id
+            if message.id != '':
+                n.user_id = message.id
         except AttributeError as e:
             print(f'Session current type: {type(n)}')
             print(f'Message id: {message.id}')
@@ -346,7 +347,8 @@ def handle_message(message: Message) -> Optional[DetailedEvent]:
 
 
     if isinstance(message, UserID):
-        n.userid_id = message.id
+        if message.id != '':
+            n.userid_id = message.id
         return n
 
     if isinstance(message, UserAnonymousID):
diff --git a/ee/connectors/utils/worker.py b/ee/connectors/utils/worker.py
index 86b3bf3c5..5692ce234 100644
--- a/ee/connectors/utils/worker.py
+++ b/ee/connectors/utils/worker.py
@@ -1,5 +1,6 @@
 from multiprocessing import Pool, Process, Pipe, TimeoutError
 from multiprocessing.connection import Connection
+from db.api import DBConnection
 from msgcodec import MessageCodec
 from messages import SessionEnd
 from utils.uploader import insertBatch
@@ -12,6 +13,9 @@
 from utils import pg_client
 from utils.signal_handler import signal_handler
 from copy import deepcopy
 from confluent_kafka import Consumer
+import pandas as pd
+from time import time
+import logging
 import json
 import asyncio
@@ -292,12 +296,77 @@ def decode_message(params: dict):
     return events_worker_batch, memory, sessionid_ended
 
 
+def fix_missing_redshift():
+    DATABASE = config('CLOUD_SERVICE')
+    table = sessions_table_name
+    database_api = DBConnection(DATABASE)
+
+    limit = config('FILL_QUERY_LIMIT', default=100, cast=int)
+    t = time()
+    query = "SELECT sessionid FROM {table} WHERE user_id = 'NULL' ORDER BY session_start_timestamp ASC LIMIT {limit}"
+    try:
+        res = database_api.pdredshift.redshift_to_pandas(query.format(table=table, limit=limit))
+    except Exception as e:
+        logging.error(f'[ERROR] Error while executing query {repr(e)}')
+        database_api.close()
+        return
+    if res is None:
+        logging.info('[FILL INFO] response is None')
+        database_api.close()
+        return
+    elif len(res) == 0:
+        logging.info('[FILL INFO] zero length response')
+        database_api.close()
+        return
+    # logging.info(f'[FILL INFO] {len(res)} length response')
+    sessionids = list(map(lambda k: str(k), res['sessionid']))
+    asyncio.run(pg_client.init())
+    with pg_client.PostgresClient() as conn:
+        conn.execute('SELECT session_id, user_id FROM sessions WHERE session_id IN ({session_id_list})'.format(
+            session_id_list=','.join(sessionids))
+        )
+        pg_res = conn.fetchall()
+    logging.info(f'response from pg, length {len(pg_res)}')
+    df = pd.DataFrame(pg_res)
+    df.fillna('NN', inplace=True)
+    df = df.groupby('user_id').agg({'session_id': lambda x: list(x)})
+    base_query = "UPDATE {table} SET user_id = CASE".format(table=table)
+    template = "\nWHEN sessionid IN ({session_ids}) THEN '{user_id}'"
+    all_ids = list()
+    # logging.info(f'[FILL INFO] {pg_res[:5]}')
+    for i in range(len(df)):
+        user = df.iloc[i].name
+        aux = [str(sess) for sess in df.iloc[i].session_id if sess != 'NN']
+        all_ids += aux
+        if len(aux) == 0:
+            continue
+        base_query += template.format(user_id=user, session_ids=','.join(aux))
+    base_query += f"\nEND WHERE sessionid IN ({','.join(all_ids)})"
+    if len(all_ids) == 0:
+        logging.info('[FILL INFO] No ids obtained')
+        database_api.close()
+        asyncio.run(pg_client.terminate())
+        return
+    # logging.info(f'[FILL INFO] {base_query}')
+    try:
+        database_api.pdredshift.exec_commit(base_query)
+    except Exception as e:
+        logging.error(f'[ERROR] Error while executing query {repr(e)}')
+        database_api.close()
+        asyncio.run(pg_client.terminate())
+        return
+    logging.info(f'[FILL-INFO] {time() - t} - for {len(sessionids)} elements')
+    database_api.close()
+    asyncio.run(pg_client.terminate())
+    return
+
+
 def work_assigner(params):
     flag = params.pop('flag')
     if flag == 'decoder':
         return {'flag': 'decoder', 'value': decode_message(params)}
-    # elif flag == 'reader':
-    #     return {'flag': 'reader', 'value': read_from_kafka(params)}
+    elif flag == 'fix':
+        return {'flag': 'fix', 'value': fix_missing_redshift()}
 
 
 class WorkerPool:
@@ -396,13 +465,13 @@ class WorkerPool:
                     decoding_params[worker_id]['memory'][session_id] = session_to_dict(self.sessions[session_id])
                 except KeyError:
                     ...
 
-        # Hand tasks to workers
         async_results = list()
         for params in decoding_params:
             if params['message']:
                 async_results.append(self.pool.apply_async(work_assigner, args=[params]))
         results = [{'flag': 'reader', 'value': main_conn.recv()}]
+        async_results.append(self.pool.apply_async(work_assigner, args=[{'flag': 'fix'}]))
        for async_result in async_results:
             try:
                 results.append(async_result.get(timeout=32 * UPLOAD_RATE))
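
Editor's note (illustrative, not part of the patch): the core of fix_missing_redshift() is a single batched Redshift UPDATE built as one CASE expression, with one WHEN branch per user_id and a final WHERE sessionid IN (...) guard, so the whole backfill runs as one statement via exec_commit. The sketch below shows that query-building pattern in isolation; build_backfill_query, the table name 'connector_sessions', and the sample ids are hypothetical and only mirror the patch's logic.

# Minimal sketch, assuming a plain dict of user_id -> session ids; not the
# connector's own API. It reproduces the CASE-based UPDATE string that the
# patch assembles before handing it to database_api.pdredshift.exec_commit().
from typing import Dict, List


def build_backfill_query(table: str, user_sessions: Dict[str, List[int]]) -> str:
    # Start the batched UPDATE; each user contributes one CASE branch.
    query = "UPDATE {table} SET user_id = CASE".format(table=table)
    all_ids: List[str] = []
    for user_id, session_ids in user_sessions.items():
        ids = [str(s) for s in session_ids]
        if not ids:
            continue
        all_ids += ids
        query += "\nWHEN sessionid IN ({ids}) THEN '{user}'".format(ids=','.join(ids), user=user_id)
    if not all_ids:
        return ''  # nothing to update; the patch logs '[FILL INFO] No ids obtained' and returns here
    # Restrict the UPDATE to exactly the sessions named in the CASE branches.
    query += "\nEND WHERE sessionid IN ({ids})".format(ids=','.join(all_ids))
    return query


if __name__ == '__main__':
    # 'connector_sessions' and the ids below are made-up example values.
    print(build_backfill_query('connector_sessions', {'user-a': [101, 102], 'user-b': [103]}))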