# openreplay/ee/connectors/utils/cache.py

import json
from time import time

from decouple import config

from utils.pg_client import PostgresClient


def _project_from_session(sessionId):
    """Look up the project_id of the given sessionId in the PG sessions table."""
    with PostgresClient() as conn:
        conn.execute(
            conn.mogrify("SELECT project_id FROM sessions WHERE session_id=%(sessionId)s LIMIT 1",
                         {'sessionId': sessionId})
        )
        res = conn.fetchone()
    if res is None:
        print(f'[WARN] sessionId {sessionId} not found in sessions table')
        return None
    return res['project_id']

class CachedSessions:
    def __init__(self):
        """Cache of open and recently closed sessions together with their current status.
        env:
            MAX_SESSION_LIFE: cache lifespan of a session in seconds (default 7800)"""
        self.session_project = dict()
        self.max_alive_time = config('MAX_SESSION_LIFE', default=7800, cast=int)  # default 7800 s (~2 hours)

    def create(self, sessionid: int):
        """Save a new session with status OPEN and record its insertion time."""
        _sessionid = str(sessionid)
        self.session_project[_sessionid] = (time(), 'OPEN')

    def add(self, sessionid: int):
        """Create a cached session, or update its status if it is already in the cache."""
        _sessionid = str(sessionid)
        if _sessionid in self.session_project:
            if self.session_project[_sessionid][1] == 'CLOSE':
                tmp = self.session_project[_sessionid]
                self.session_project[_sessionid] = (tmp[0], 'UPDATE')
        else:
            self.create(sessionid)

    def close(self, sessionid: int):
        """Set the session status to CLOSE (a sessionend message was received) and return the previous status."""
        _sessionid = str(sessionid)
        tmp = self.session_project[_sessionid]
        old_status = tmp[1]
        self.session_project[_sessionid] = (tmp[0], 'CLOSE')
        return old_status

    def clear_sessions(self):
        """Delete all sessions that reached max_alive_time and return their ids."""
        to_clean_list = list()
        current_time = time()
        for sessionid, values in self.session_project.items():
            if current_time - values[0] > self.max_alive_time:
                to_clean_list.append(int(sessionid))
        for sessionid in to_clean_list:
            del self.session_project[str(sessionid)]
        return to_clean_list

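# Usage sketch (illustrative only, not called by the connector itself): a session
# moves OPEN -> CLOSE -> UPDATE as messages arrive, and is evicted once it outlives
# MAX_SESSION_LIFE. The session id used here is hypothetical.
#
#   cached = CachedSessions()
#   cached.add(42)                      # first message    -> status OPEN
#   cached.close(42)                    # sessionend seen  -> status CLOSE
#   cached.add(42)                      # late message     -> status UPDATE
#   expired = cached.clear_sessions()   # [42] once it is older than max_alive_time
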
class ProjectFilter:
    def __init__(self, filter=None):
        """Filter out sessions that do not come from the selected projects. Reads from PG
        to find the projectId and caches the result to avoid duplicated requests.
        env:
            max_cache_size: max allowed cache length - cleanup starts when exceeded
            cache_lifespan: max lifetime of a cached entry - older entries are deleted during cleanup"""
        self.filter = filter if filter is not None else list()
        self.cache = dict()
        self.cached_sessions = CachedSessions()
        self.to_clean = list()
        self.count_bad = 0
        self.max_cache_size = config('max_cache_size', default=50, cast=int)
        self.cache_lifespan = config('cache_lifespan', default=900, cast=int)

    def is_valid(self, sessionId):
        """Verify whether sessionId belongs to one of the selected projects."""
        if len(self.filter) == 0:
            return True
        elif sessionId in self.cache:
            return self.cache[sessionId][1]
        else:
            found_project_id = _project_from_session(sessionId)
            if found_project_id is None:
                self.count_bad += 1
                return False
            else:
                project_is_valid = found_project_id in self.filter
                self.cache[sessionId] = [time(), project_is_valid]
                return project_is_valid

    def cleanup(self):
        """Delete cached entries older than cache_lifespan."""
        current_time = time()
        self.cache = {sessionid: values for sessionid, values in self.cache.items()
                      if current_time - values[0] < self.cache_lifespan}

    def handle_clean(self):
        """Run cleanup if the cache grew past max_cache_size (no-op when no filter is set)."""
        if len(self.filter) == 0:
            return
        elif len(self.cache) > self.max_cache_size:
            self.cleanup()

    def load_checkpoint(self, db):
        """Restore filter state (cache, pending cleanups, cached sessions) from a checkpoint stored in db."""
        file = db.load_binary(name='checkpoint')
        checkpoint = json.loads(file.getvalue().decode('utf-8'))
        file.close()
        self.cache = checkpoint['cache']
        self.to_clean = checkpoint['to_clean']
        self.cached_sessions.session_project = checkpoint['cached_sessions']

    def save_checkpoint(self, db):
        """Persist filter state as a JSON-encoded binary checkpoint in db."""
        checkpoint = {
            'cache': self.cache,
            'to_clean': self.to_clean,
            'cached_sessions': self.cached_sessions.session_project,
        }
        db.save_binary(binary_data=json.dumps(checkpoint).encode('utf-8'), name='checkpoint')

    def terminate(self, db):
        """Close the db handle on shutdown (checkpoint saving is currently disabled)."""
        # self.save_checkpoint(db)
        db.close()
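

# Minimal smoke test of the in-memory cache logic, illustrative and not part of the
# connector runtime. It assumes the module's imports resolve; with an empty filter,
# is_valid returns True without querying PG, so no database connection is needed.
if __name__ == '__main__':
    project_filter = ProjectFilter()
    session_id = 42  # hypothetical session id
    if project_filter.is_valid(session_id):
        project_filter.cached_sessions.add(session_id)        # -> OPEN
    print(project_filter.cached_sessions.close(session_id))   # prints 'OPEN'
    project_filter.cached_sessions.add(session_id)            # CLOSE -> UPDATE
    print(project_filter.cached_sessions.clear_sessions())    # [] until MAX_SESSION_LIFE passes
    project_filter.handle_clean()                             # no-op with an empty filter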