Redshift pool (#1397)

* fix(connectors): added try method for redshift replace in fill

* fix(connectors): adding userId only if not empty string

* fix(connectors): Added fix as a worker
MauricioGarciaS 2023-07-10 14:29:40 +02:00 committed by GitHub
parent 58624687c6
commit b9b187d684
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 78 additions and 7 deletions

View file

@@ -1,2 +1,2 @@
 echo "[INFO] Service start"
-python -u consumer_pool.py & python -u fill_from_db.py && fg
+python -u consumer_pool.py

View file

@@ -74,7 +74,7 @@ def try_method(f, params, on_exeption=None, _try=0):
                 return
             on_exeption.restart()
         else:
-            logging.warning('[FILL Exception]', repr(e), 'retrying..')
+            logging.info('[FILL Exception]', repr(e), 'retrying..')
             sleep(1)
             return try_method(f=f, params=params, on_exeption=on_exeption, _try=_try+1)
     return None
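For reference, a minimal sketch of the retry pattern that try_method implements, wrapping a Redshift write for the fill job. The retry cap, the replace_user_ids function, and the handler object are stand-ins for illustration; only the retry/restart shape is taken from the diff above.

import logging
from time import sleep

MAX_RETRIES = 3  # assumed retry cap; the real limit lives in the fill module


def try_method(f, params, on_exeption=None, _try=0):
    # Simplified re-implementation for illustration: call f with params,
    # retry after a 1s pause, and on the last attempt hand control to
    # on_exeption.restart() if a handler was given.
    try:
        return f(**params)
    except Exception as e:
        if _try >= MAX_RETRIES:
            if on_exeption is None:
                return
            on_exeption.restart()
        else:
            logging.info('[FILL Exception] %s retrying..', repr(e))
            sleep(1)
            return try_method(f=f, params=params, on_exeption=on_exeption, _try=_try + 1)
    return None


def replace_user_ids(batch):
    # hypothetical Redshift write that may raise on transient connection errors
    return len(batch)


try_method(f=replace_user_ids, params={'batch': []})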

View file

@@ -187,7 +187,8 @@ def handle_session(n: Session, message: Message) -> Optional[Session]:
     if isinstance(message, UserID):
         try:
-            n.user_id = message.id
+            if message.id != '':
+                n.user_id = message.id
         except AttributeError as e:
             print(f'Session current type: {type(n)}')
             print(f'Message id: {message.id}')
@@ -346,7 +347,8 @@ def handle_message(message: Message) -> Optional[DetailedEvent]:
     if isinstance(message, UserID):
-        n.userid_id = message.id
+        if message.id != '':
+            n.userid_id = message.id
         return n
     if isinstance(message, UserAnonymousID):
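A small sketch of the behaviour the new guard gives: an empty UserID payload no longer overwrites a user_id that was captured earlier in the session. The Session and UserID classes below are bare stand-ins for the project's message types, used only to show the effect.

from dataclasses import dataclass


@dataclass
class UserID:            # stand-in for the decoded UserID message
    id: str


@dataclass
class Session:           # stand-in for the session being built
    user_id: str = ''


def handle_user_id(n: Session, message: UserID) -> Session:
    # Mirrors the guarded assignment from the diff: keep the existing
    # value when the incoming id is an empty string.
    if message.id != '':
        n.user_id = message.id
    return n


s = Session()
handle_user_id(s, UserID(id='u-42'))   # sets user_id to 'u-42'
handle_user_id(s, UserID(id=''))       # leaves user_id as 'u-42'
assert s.user_id == 'u-42'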

View file

@@ -1,5 +1,6 @@
 from multiprocessing import Pool, Process, Pipe, TimeoutError
 from multiprocessing.connection import Connection
+from db.api import DBConnection
 from msgcodec import MessageCodec
 from messages import SessionEnd
 from utils.uploader import insertBatch
@@ -12,6 +13,9 @@ from utils import pg_client
 from utils.signal_handler import signal_handler
 from copy import deepcopy
 from confluent_kafka import Consumer
+import pandas as pd
+from time import time
+import logging
 import json
 import asyncio
@@ -292,12 +296,77 @@ def decode_message(params: dict):
     return events_worker_batch, memory, sessionid_ended
 
 
+def fix_missing_redshift():
+    DATABASE = config('CLOUD_SERVICE')
+    table = sessions_table_name
+    database_api = DBConnection(DATABASE)
+    limit = config('FILL_QUERY_LIMIT', default=100, cast=int)
+    t = time()
+    query = "SELECT sessionid FROM {table} WHERE user_id = 'NULL' ORDER BY session_start_timestamp ASC LIMIT {limit}"
+    try:
+        res = database_api.pdredshift.redshift_to_pandas(query.format(table=table, limit=limit))
+    except Exception as e:
+        logging.error(f'[ERROR] Error while executing query {repr(e)}')
+        database_api.close()
+        return
+    if res is None:
+        logging.info('[FILL INFO] response is None')
+        database_api.close()
+        return
+    elif len(res) == 0:
+        logging.info('[FILL INFO] zero length response')
+        database_api.close()
+        return
+    # logging.info(f'[FILL INFO] {len(res)} length response')
+    sessionids = list(map(lambda k: str(k), res['sessionid']))
+    asyncio.run(pg_client.init())
+    with pg_client.PostgresClient() as conn:
+        conn.execute('SELECT session_id, user_id FROM sessions WHERE session_id IN ({session_id_list})'.format(
+            session_id_list=','.join(sessionids))
+        )
+        pg_res = conn.fetchall()
+    logging.info(f'response from pg, length {len(pg_res)}')
+    df = pd.DataFrame(pg_res)
+    df.fillna('NN', inplace=True)
+    df = df.groupby('user_id').agg({'session_id': lambda x: list(x)})
+    base_query = "UPDATE {table} SET user_id = CASE".format(table=table)
+    template = "\nWHEN sessionid IN ({session_ids}) THEN '{user_id}'"
+    all_ids = list()
+    # logging.info(f'[FILL INFO] {pg_res[:5]}')
+    for i in range(len(df)):
+        user = df.iloc[i].name
+        aux = [str(sess) for sess in df.iloc[i].session_id if sess != 'NN']
+        all_ids += aux
+        if len(aux) == 0:
+            continue
+        base_query += template.format(user_id=user, session_ids=','.join(aux))
+    base_query += f"\nEND WHERE sessionid IN ({','.join(all_ids)})"
+    if len(all_ids) == 0:
+        logging.info('[FILL INFO] No ids obtained')
+        database_api.close()
+        asyncio.run(pg_client.terminate())
+        return
+    # logging.info(f'[FILL INFO] {base_query}')
+    try:
+        database_api.pdredshift.exec_commit(base_query)
+    except Exception as e:
+        logging.error(f'[ERROR] Error while executing query {repr(e)}')
+        database_api.close()
+        asyncio.run(pg_client.terminate())
+        return
+    logging.info(f'[FILL-INFO] {time() - t} - for {len(sessionids)} elements')
+    database_api.close()
+    asyncio.run(pg_client.terminate())
+    return
+
+
 def work_assigner(params):
     flag = params.pop('flag')
     if flag == 'decoder':
         return {'flag': 'decoder', 'value': decode_message(params)}
-    # elif flag == 'reader':
-    #     return {'flag': 'reader', 'value': read_from_kafka(params)}
+    elif flag == 'fix':
+        return {'flag': 'fix', 'value': fix_missing_redshift()}
 
 
 class WorkerPool:
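To make the string building in fix_missing_redshift easier to follow, here is a standalone sketch that assembles the same UPDATE ... SET user_id = CASE statement from made-up data; the table name, user ids, and session ids below are hypothetical, only the templates come from the function above.

# Hypothetical result of the Postgres lookup: user_id -> sessions that belong to it
grouped = {
    'alice@example.com': ['101', '103'],
    'bob@example.com': ['102'],
}

table = 'connector_user_sessions'  # assumed sessions table name
base_query = "UPDATE {table} SET user_id = CASE".format(table=table)
template = "\nWHEN sessionid IN ({session_ids}) THEN '{user_id}'"

all_ids = []
for user_id, session_ids in grouped.items():
    all_ids += session_ids
    base_query += template.format(user_id=user_id, session_ids=','.join(session_ids))
base_query += f"\nEND WHERE sessionid IN ({','.join(all_ids)})"

print(base_query)
# UPDATE connector_user_sessions SET user_id = CASE
# WHEN sessionid IN (101,103) THEN 'alice@example.com'
# WHEN sessionid IN (102) THEN 'bob@example.com'
# END WHERE sessionid IN (101,103,102)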
@@ -396,13 +465,13 @@ class WorkerPool:
                     decoding_params[worker_id]['memory'][session_id] = session_to_dict(self.sessions[session_id])
                 except KeyError:
                     ...
         # Hand tasks to workers
         async_results = list()
         for params in decoding_params:
             if params['message']:
                 async_results.append(self.pool.apply_async(work_assigner, args=[params]))
         results = [{'flag': 'reader', 'value': main_conn.recv()}]
+        async_results.append(self.pool.apply_async(work_assigner, args=[{'flag': 'fix'}]))
         for async_result in async_results:
             try:
                 results.append(async_result.get(timeout=32 * UPLOAD_RATE))
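A reduced sketch of the dispatch pattern used above: the 'fix' job is submitted to the same multiprocessing pool as the decoder jobs and every async result is collected with a timeout. The worker payloads, pool size, and UPLOAD_RATE value are placeholders, not the service's real configuration.

from multiprocessing import Pool, TimeoutError

UPLOAD_RATE = 30  # placeholder; the real value comes from the service config


def work_assigner(params):
    # Reduced dispatcher: route by flag and return a tagged result.
    flag = params.pop('flag')
    if flag == 'decoder':
        return {'flag': 'decoder', 'value': params}  # decode_message(params) in the real code
    elif flag == 'fix':
        return {'flag': 'fix', 'value': None}        # fix_missing_redshift() in the real code


if __name__ == '__main__':
    with Pool(processes=2) as pool:
        async_results = []
        for params in [{'flag': 'decoder', 'message': b'...'}]:
            async_results.append(pool.apply_async(work_assigner, args=[params]))
        # the fill/fix job rides along with the regular decoding batch
        async_results.append(pool.apply_async(work_assigner, args=[{'flag': 'fix'}]))

        results = []
        for async_result in async_results:
            try:
                results.append(async_result.get(timeout=32 * UPLOAD_RATE))
            except TimeoutError:
                pass  # a stuck job is skipped here; the real worker handles it explicitly
        print(results)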