from utils.pg_client import PostgresClient
from decouple import config
from time import time
import json


def _project_from_session(sessionId):
    """Searches the projectId of the requested sessionId in the PG sessions table."""
    with PostgresClient() as conn:
        conn.execute(
            conn.mogrify("SELECT project_id FROM sessions WHERE session_id=%(sessionId)s LIMIT 1",
                         {'sessionId': sessionId})
        )
        res = conn.fetchone()
    if res is None:
        print(f'[WARN] sessionId {sessionId} not found in sessions table')
        return None
    return res['project_id']


class CachedSessions:

    def __init__(self):
        """Cache of open and recently closed sessions together with their current status.

        env:
            MAX_SESSION_LIFE: cache lifespan of a session in seconds (default 7800)"""
        self.session_project = dict()
        self.max_alive_time = config('MAX_SESSION_LIFE', default=7800, cast=int)  # default 7800 s (~2 hours)

    def create(self, sessionid: int):
        """Saves a new session with status OPEN and sets its insertion time."""
        _sessionid = str(sessionid)
        self.session_project[_sessionid] = (time(), 'OPEN')

    def add(self, sessionid: int):
        """Creates a cached session, or updates its status if it is already cached."""
        _sessionid = str(sessionid)
        if _sessionid in self.session_project:
            if self.session_project[_sessionid][1] == 'CLOSE':
                # A message arrived after sessionend: mark the session for update
                tmp = self.session_project[_sessionid]
                self.session_project[_sessionid] = (tmp[0], 'UPDATE')
        else:
            self.create(sessionid)

    def close(self, sessionid: int):
        """Sets the session status to CLOSE (a sessionend message was received) and returns the previous status."""
        _sessionid = str(sessionid)
        if _sessionid not in self.session_project:
            # Guard (assumption): the session may already have been evicted by clear_sessions
            return None
        tmp = self.session_project[_sessionid]
        old_status = tmp[1]
        self.session_project[_sessionid] = (tmp[0], 'CLOSE')
        return old_status

    def clear_sessions(self):
        """Deletes all sessions that reached max_alive_time and returns their ids."""
        to_clean_list = list()
        current_time = time()
        for sessionid, values in self.session_project.items():
            if current_time - values[0] > self.max_alive_time:
                to_clean_list.append(int(sessionid))
        for sessionid in to_clean_list:
            del self.session_project[str(sessionid)]
        return to_clean_list
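

# Illustrative lifecycle sketch (hypothetical session id), tracing the status
# transitions implemented by CachedSessions:
#
#   cached = CachedSessions()
#   cached.add(42)             # first message for the session -> 'OPEN'
#   cached.close(42)           # sessionend received -> 'CLOSE', returns 'OPEN'
#   cached.add(42)             # late message after close -> 'UPDATE'
#   cached.clear_sessions()    # evicts ids older than MAX_SESSION_LIFE

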
class ProjectFilter:

    def __init__(self, filter=None):
        """Keeps only sessions that come from the selected projects. This class reads from PG
        to find the projectId and uses a cache to avoid duplicated requests.

        env:
            max_cache_size: max allowed cache length - cleanup starts when it is exceeded
            cache_lifespan: max lifetime of a cache entry - entries that surpass it are deleted in the cleanup phase"""
        # Avoid a mutable default argument; an empty filter accepts every project
        self.filter = filter if filter is not None else list()
        self.cache = dict()
        self.cached_sessions = CachedSessions()
        self.to_clean = list()
        self.count_bad = 0
        self.max_cache_size = config('max_cache_size', default=50, cast=int)
        self.cache_lifespan = config('cache_lifespan', default=900, cast=int)

    def is_valid(self, sessionId):
        """Verifies whether sessionId comes from one of the selected projects."""
        if len(self.filter) == 0:
            return True
        elif sessionId in self.cache:
            return self.cache[sessionId][1]
        else:
            found_project_id = _project_from_session(sessionId)
            if found_project_id is None:
                # Unknown session: not cached, so the lookup is retried on the session's next message
                self.count_bad += 1
                return False
            else:
                project_is_valid = found_project_id in self.filter
                self.cache[sessionId] = [time(), project_is_valid]
                return project_is_valid

    def cleanup(self):
        """Deletes cache entries that reached the cache_lifespan value."""
        current_time = time()
        self.cache = {sessionid: values for sessionid, values in self.cache.items()
                      if current_time - values[0] < self.cache_lifespan}

    def handle_clean(self):
        """Verifies whether a cleanup is needed and executes it."""
        if len(self.filter) == 0:
            return
        elif len(self.cache) > self.max_cache_size:
            self.cleanup()

    def load_checkpoint(self, db):
        """Restores the filter state from a previously saved binary checkpoint."""
        file = db.load_binary(name='checkpoint')
        checkpoint = json.loads(file.getvalue().decode('utf-8'))
        file.close()
        self.cache = checkpoint['cache']
        self.to_clean = checkpoint['to_clean']
        self.cached_sessions.session_project = checkpoint['cached_sessions']

    def save_checkpoint(self, db):
        """Serializes the filter state to JSON and saves it as a binary checkpoint."""
        checkpoint = {
            'cache': self.cache,
            'to_clean': self.to_clean,
            'cached_sessions': self.cached_sessions.session_project,
        }
        db.save_binary(binary_data=json.dumps(checkpoint).encode('utf-8'), name='checkpoint')

    def terminate(self, db):
        # self.save_checkpoint(db)
        db.close()
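

# Minimal smoke-test sketch (assumption: no PG or storage access is needed,
# because an empty filter accepts every session without a lookup).
if __name__ == '__main__':
    pf = ProjectFilter()                            # empty filter -> accept all projects
    assert pf.is_valid(42)                          # returns True without querying PG
    pf.cached_sessions.add(42)                      # first message -> 'OPEN'
    assert pf.cached_sessions.close(42) == 'OPEN'   # sessionend    -> 'CLOSE'
    pf.cached_sessions.add(42)                      # late message  -> 'UPDATE'
    print(pf.cached_sessions.session_project)       # {'42': (<timestamp>, 'UPDATE')}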