Improvements (#1387)

* Fix memory cleansing and add setup files for the msgcodec build

* Kafka reader now makes a single call to PG to verify sessionIds; updated the script that fills in missing user_ids

* fix(connector): small error while reading from PG

---------

Co-authored-by: MauricioGarciaS <47052044+MauricioGarciaS@users.noreply.github.com>
Rajesh Rajendran 2023-06-30 12:04:23 +02:00 committed by GitHub
parent 265f5d5500
commit 53081a9822
8 changed files with 218 additions and 68 deletions


@@ -19,5 +19,5 @@ COPY handler.py .
COPY consumer_pool.py .
COPY fill_from_db.py .
COPY entrypoint.sh .
ENV replace_interval=300
ENTRYPOINT ./entrypoint.sh


@@ -2,6 +2,7 @@ chardet==5.1.0
idna==3.4
confluent-kafka==2.1.1
psycopg2-binary==2.9.6
apscheduler==3.10.1
python-decouple==3.8
pytz==2022.6
requests==2.28.1


@@ -1,2 +1,2 @@
echo "[INFO] Service start"
python -u consumer_pool.py
python -u consumer_pool.py & python -u fill_from_db.py && fg


@@ -1,7 +1,12 @@
import pandas_redshift as pdredshift
import pandas as pd
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from utils import pg_client
from decouple import config, Choices
import asyncio
from time import time
DATABASE = config('CLOUD_SERVICE')
sessions_table_name = config('SESSIONS_TABLE', default='connector_user_sessions')
@@ -31,30 +36,77 @@ pdredshift.connect_to_redshift(dbname=cluster_info['DBNAME'],
password=cluster_info['PASSWORD'],
sslmode=sslmode)
async def main(limit = 100):
async def main():
limit = config('FILL_QUERY_LIMIT', default=100, cast=int)
t = time()
query = "SELECT sessionid FROM {table} WHERE user_id = 'NULL' LIMIT {limit}"
try:
res = pdredshift.redshift_to_pandas(query.format(table=table, limit=limit))
except Exception as e:
print(repr(e))
if len(res) == 0:
res = list()
if res is None:
return
elif len(res) == 0:
return
sessionids = list(map(lambda k: str(k), res['sessionid']))
await pg_client.init()
with pg_client.PostgresClient() as conn:
conn.execute('SELECT session_id, user_id FROM sessions WHERE session_id IN ({session_id_list})'.format(
session_id_list = ','.join(sessionids))
)
pg_res = conn.fetchall()
df = pd.DataFrame(pg_res)
df.dropna(inplace=True)
df = df.groupby('user_id').agg({'session_id': lambda x: list(x)})
base_query = "UPDATE {table} SET user_id = CASE".format(table=table)
template = "\nWHEN sessionid IN ({session_ids}) THEN '{user_id}'"
all_ids = list()
for i in range(len(df)):
user = df.iloc[i].name
if user == '' or user == 'None' or user == 'NULL':
continue
aux = [str(sess) for sess in df.iloc[i].session_id]
all_ids += aux
if len(aux) == 0:
continue
base_query += template.format(user_id=user, session_ids=','.join(aux))
base_query += f"\nEND WHERE sessionid IN ({','.join(all_ids)})"
if len(all_ids) == 0:
return
print(base_query[:200])
try:
pdredshift.exec_commit(base_query)
except Exception as e:
print(repr(e))
print(f'[FILL-INFO] {time()-t} - for {len(sessionids)} elements')
cron_jobs = [
{"func": main, "trigger": IntervalTrigger(seconds=15), "misfire_grace_time": 60, "max_instances": 1},
]
def get_or_create_eventloop():
try:
return asyncio.get_event_loop()
except RuntimeError as ex:
if "There is no current event loop in thread" in str(ex):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return asyncio.get_event_loop()
base_query = "UPDATE {table} SET user_id = '{user_id}' WHERE sessionid = {session_id}"
for e in pg_res:
query = base_query.format(table=table, user_id=e['user_id'], session_id=e['session_id'])
try:
pdredshift.exec_commit(query)
except Exception as e:
print(repr(e))
if __name__ == '__main__':
asyncio.run(main())
scheduler = AsyncIOScheduler()
asyncio.run(pg_client.init())
for job in cron_jobs:
scheduler.add_job(id=job['func'].__name__, **job)
loop = get_or_create_eventloop()
scheduler.start()
try:
loop.run_forever()
except (KeyboardInterrupt, SystemExit):
pass
asyncio.run(pg_client.terminate())
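
fill_from_db.py now runs on a schedule rather than as a one-shot script. A minimal, self-contained sketch of the same pattern (APScheduler's AsyncIOScheduler driving an async job on an IntervalTrigger over a long-running event loop), with a placeholder job standing in for the real main():

import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger

async def fill_job():
    # placeholder for main(), which backfills user_id values in the warehouse table
    print('running fill job')

if __name__ == '__main__':
    scheduler = AsyncIOScheduler()
    # every 15 seconds; skip runs missed by more than 60 s, never overlap instances
    scheduler.add_job(fill_job, id='fill_job', trigger=IntervalTrigger(seconds=15),
                      misfire_grace_time=60, max_instances=1)
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    scheduler.start()
    try:
        loop.run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass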


@@ -0,0 +1,9 @@
#from setuptools import setup
from distutils.core import setup
from Cython.Build import cythonize
setup(
ext_modules = cythonize("messages.pyx"),
include_package_data=True,
package_data={"": ["*.pxd"]},
)


@@ -0,0 +1,9 @@
#from setuptools import setup
from distutils.core import setup
from Cython.Build import cythonize
setup(
ext_modules = cythonize("msgcodec.pyx"),
include_package_data=True,
package_data={"": ["*.pxd"]},
)
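
Both new setup files follow the standard Cython build recipe: cythonize() compiles the .pyx source into a C extension, and the package_data entry keeps the matching .pxd declaration files available so other modules can cimport them. Assuming each file is saved as setup.py next to its .pyx source, the usual invocation would be python setup.py build_ext --inplace, which builds the extension in place so the consumer can import it directly.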


@@ -27,24 +27,27 @@ class CachedSessions:
self.session_project = dict()
self.max_alive_time = config('MAX_SESSION_LIFE', default=7800, cast=int) # Default 2 hours
def create(self, sessionid):
def create(self, sessionid: int):
"""Saves a new session with status OPEN and set its insertion time"""
self.session_project[sessionid] = (time(), 'OPEN')
_sessionid = str(sessionid)
self.session_project[_sessionid] = (time(), 'OPEN')
def add(self, sessionid):
def add(self, sessionid: int):
"""Handle the creation of a cached session or update its status if already in cache"""
if sessionid in self.session_project.keys():
if self.session_project[sessionid][1] == 'CLOSE':
tmp = self.session_project[sessionid]
self.session_project[sessionid] = (tmp[0], 'UPDATE')
_sessionid = str(sessionid)
if _sessionid in self.session_project.keys():
if self.session_project[_sessionid][1] == 'CLOSE':
tmp = self.session_project[_sessionid]
self.session_project[_sessionid] = (tmp[0], 'UPDATE')
else:
self.create(sessionid)
def close(self, sessionid):
def close(self, sessionid: int):
"""Sets status of session to closed session (received sessionend message)"""
tmp = self.session_project[sessionid]
_sessionid = str(sessionid)
tmp = self.session_project[_sessionid]
old_status = tmp[1]
self.session_project[sessionid] = (tmp[0], 'CLOSE')
self.session_project[_sessionid] = (tmp[0], 'CLOSE')
return old_status
def clear_sessions(self):
@@ -53,9 +56,9 @@ class CachedSessions:
current_time = time()
for sessionid, values in self.session_project.items():
if current_time - values[0] > self.max_alive_time:
to_clean_list.append(sessionid)
to_clean_list.append(int(sessionid))
for sessionid in to_clean_list:
del self.session_project[sessionid]
del self.session_project[str(sessionid)]
return to_clean_list
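
The cache now accepts integer session ids but keys its internal dict with strings, which keeps it compatible with checkpoints whose keys were serialized as strings. A rough usage sketch, assuming CachedSessions is importable from the connector module shown above (the module path is not part of this diff):

# assumes: from <connector cache module> import CachedSessions  (path not shown in the diff)
cache = CachedSessions()

cache.add(1234)                    # first add() calls create(); stored under the key '1234'
old = cache.close(1234)            # marks '1234' as CLOSE and returns the previous status ('OPEN')
cache.add(1234)                    # re-adding a CLOSE session flips it to UPDATE

expired = cache.clear_sessions()   # sessions older than MAX_SESSION_LIFE come back as ints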


@@ -41,6 +41,7 @@ elif EVENT_TYPE == 'detailed':
events_messages = [1, 4, 21, 22, 25, 27, 31, 32, 39, 48, 59, 64, 69, 78, 125, 126]
allowed_messages = list(set(session_messages + events_messages))
codec = MessageCodec(allowed_messages)
max_kafka_read = config('MAX_KAFKA_READ', default=60000, cast=int)
def init_consumer():
@@ -95,7 +96,7 @@ class ProjectFilter:
self.sessions_lifespan = CachedSessions()
self.non_valid_sessions_cache = dict()
def is_valid(self, sessionId):
def is_valid(self, sessionId: int):
if len(self.project_filter) == 0:
return True
elif sessionId in self.sessions_lifespan.session_project.keys():
@@ -103,13 +104,36 @@ class ProjectFilter:
elif sessionId in self.non_valid_sessions_cache.keys():
return False
else:
projetId = project_from_session(sessionId)
if projetId not in self.project_filter:
projectId = project_from_session(sessionId)
if projectId not in self.project_filter:
self.non_valid_sessions_cache[sessionId] = int(datetime.now().timestamp())
return False
else:
return True
def already_checked(self, sessionId):
if len(self.project_filter) == 0:
return True, True
elif sessionId in self.sessions_lifespan.session_project.keys():
return True, True
elif sessionId in self.non_valid_sessions_cache.keys():
return True, False
else:
return False, None
def are_valid(self, sessionIds: list[int]):
valid_sessions = list()
if len(self.project_filter) == 0:
return sessionIds
projects_session = project_from_sessions(list(set(sessionIds)))
current_datetime = int(datetime.now().timestamp())
for projectId, sessionId in projects_session:
if projectId not in self.project_filter:
self.non_valid_sessions_cache[sessionId] = current_datetime
else:
valid_sessions.append(sessionId)
return valid_sessions
def handle_clean(self):
if len(self.project_filter) == 0:
return
@@ -121,45 +145,68 @@ class ProjectFilter:
def read_from_kafka(pipe: Connection, params: dict):
global UPLOAD_RATE
try:
asyncio.run(pg_client.init())
kafka_consumer = init_consumer()
project_filter = params['project_filter']
while True:
to_decode = list()
sessionIds = list()
start_time = datetime.now().timestamp()
broken_batchs = 0
n_messages = 0
while datetime.now().timestamp() - start_time < UPLOAD_RATE:
global UPLOAD_RATE, max_kafka_read
# try:
asyncio.run(pg_client.init())
kafka_consumer = init_consumer()
project_filter = params['project_filter']
capture_messages = list()
capture_sessions = list()
while True:
to_decode = list()
sessionIds = list()
start_time = datetime.now().timestamp()
broken_batchs = 0
n_messages = 0
while datetime.now().timestamp() - start_time < UPLOAD_RATE and max_kafka_read > n_messages:
try:
msg = kafka_consumer.poll(5.0)
if msg is None:
continue
n_messages += 1
try:
sessionId = codec.decode_key(msg.key())
except Exception:
broken_batchs += 1
continue
if project_filter.is_valid(sessionId):
to_decode.append(msg.value())
sessionIds.append(sessionId)
if n_messages != 0:
print(
f'[INFO] Found {broken_batchs} broken batch over {n_messages} read messages ({100 * broken_batchs / n_messages:.2f}%)')
else:
print('[WARN] No messages read')
non_valid_updated = project_filter.non_valid_sessions_cache
pipe.send((non_valid_updated, sessionIds, to_decode))
continue_signal = pipe.recv()
if continue_signal == 'CLOSE':
break
kafka_consumer.commit()
close_consumer(kafka_consumer)
asyncio.run(pg_client.terminate())
except Exception as e:
print('[WARN]', repr(e))
except Exception as e:
print(e)
if msg is None:
continue
n_messages += 1
try:
sessionId = codec.decode_key(msg.key())
except Exception:
broken_batchs += 1
continue
checked, is_valid = project_filter.already_checked(sessionId)
if not checked:
capture_sessions.append(sessionId)
capture_messages.append(msg.value())
elif is_valid:
to_decode.append(msg.value())
sessionIds.append(sessionId)
# if project_filter.is_valid(sessionId):
# to_decode.append(msg.value())
# sessionIds.append(sessionId)
valid_sessions = project_filter.are_valid(list(set(capture_sessions)))
while capture_sessions:
sessId = capture_sessions.pop()
msg = capture_messages.pop()
if sessId in valid_sessions:
sessionIds.append(sessId)
to_decode.append(msg)
if n_messages != 0:
print(
f'[INFO-bg] Found {broken_batchs} broken batch over {n_messages} read messages ({100 * broken_batchs / n_messages:.2f}%)')
else:
print('[WARN-bg] No messages read')
non_valid_updated = project_filter.non_valid_sessions_cache
pipe.send((non_valid_updated, sessionIds, to_decode))
continue_signal = pipe.recv()
if continue_signal == 'CLOSE':
print('[SHUTDOWN-reader] Reader shutting down')
break
kafka_consumer.commit()
print('[INFO] Closing consumer')
close_consumer(kafka_consumer)
print('[INFO] Closing pg connection')
asyncio.run(pg_client.terminate())
print('[INFO] Successfully closed reader task')
# except Exception as e:
# print('[WARN]', repr(e))
def into_batch(batch: list[Event | DetailedEvent], session_id: int, n: Session):
@@ -184,6 +231,32 @@ def project_from_session(sessionId: int):
return res['project_id']
def project_from_sessions(sessionIds: list[int]):
"""Search projectId of requested sessionId in PG table sessions"""
response = list()
while sessionIds:
sessIds = sessionIds[-1000:]
try:
with pg_client.PostgresClient() as conn:
conn.execute(
"SELECT session_id, project_id FROM sessions WHERE session_id IN ({sessionIds})".format(
sessionIds=','.join([str(sessId) for sessId in sessIds])
)
)
res = conn.fetchall()
except Exception as e:
print('[project_from_sessions]', repr(e))
raise e
if res is None:
print(f'[WARN] sessionids {",".join([str(sessId) for sessId in sessIds])} not found in sessions table')
else:
response += res
sessionIds = sessionIds[:-1000]
if not response:
return []
return [(e['project_id'], e['session_id']) for e in response]
def decode_message(params: dict):
global codec, session_messages, events_messages, EVENT_TYPE
if len(params['message']) == 0:
@@ -301,6 +374,8 @@ class WorkerPool:
while signal_handler.KEEP_PROCESSING:
# Setup of parameters for workers
if not kafka_reader_process.is_alive():
print('[INFO] Restarting reader task')
del kafka_reader_process
kafka_reader_process = Process(target=read_from_kafka, args=(reader_conn, kafka_task_params))
kafka_reader_process.start()
decoding_params = [{'flag': 'decoder',
@@ -333,14 +408,15 @@ class WorkerPool:
raise e
events_batch, sessions_insert_batch, sessions_update_batch, session_ids, messages = self._pool_response_handler(
pool_results=results)
insertBatch(events_batch, sessions_insert_batch, sessions_update_batch, database_api, sessions_table_name,
table_name, EVENT_TYPE)
self.save_snapshot(database_api)
main_conn.send('CONTINUE')
print('[INFO] Sending close signal')
main_conn.send('CLOSE')
self.terminate()
kafka_reader_process.terminate()
print('[SHUTDOWN] Process terminated')
def load_checkpoint(self, database_api):
file = database_api.load_binary(name='checkpoint')
@@ -349,7 +425,7 @@ class WorkerPool:
if 'version' not in checkpoint.keys():
sessions_cache_list = checkpoint['cache']
reload_default_time = datetime.now().timestamp()
self.project_filter_class.non_valid_sessions_cache = {sessId: reload_default_time for sessId, value in
self.project_filter_class.non_valid_sessions_cache = {int(sessId): reload_default_time for sessId, value in
sessions_cache_list.items() if not value[1]}
self.project_filter_class.sessions_lifespan.session_project = checkpoint['cached_sessions']
elif checkpoint['version'] == 'v1.0':
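
The heart of the "call PG only once" change is visible in read_from_kafka: messages whose session id has not been seen before are buffered during the polling window and validated in one batched query (ProjectFilter.are_valid backed by project_from_sessions) instead of one project_from_session lookup per message. A simplified, self-contained sketch of that deferred-validation pattern; poll, already_checked and validate_batch are stand-ins for kafka_consumer.poll, ProjectFilter.already_checked and ProjectFilter.are_valid:

def drain_batch(poll, already_checked, validate_batch, budget):
    """Collect one batch of messages, deferring PG validation to a single call."""
    to_decode, session_ids = [], []
    pending_ids, pending_msgs = [], []
    for _ in range(budget):
        msg = poll()
        if msg is None:
            continue
        sess_id, value = msg
        checked, valid = already_checked(sess_id)
        if not checked:                      # unknown session: decide later
            pending_ids.append(sess_id)
            pending_msgs.append(value)
        elif valid:                          # already known to belong to an allowed project
            session_ids.append(sess_id)
            to_decode.append(value)
    # one round trip to PG for every session id first seen in this batch
    valid_set = set(validate_batch(list(set(pending_ids))))
    for sess_id, value in zip(pending_ids, pending_msgs):
        if sess_id in valid_set:
            session_ids.append(sess_id)
            to_decode.append(value)
    return session_ids, to_decode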