openreplay/ee/recommendation/utils/pg_client.py
MauricioGarciaS cea5eda985
feat(recommendations): Added services recommendation (ml_service) and trainer (ml_trainer) (#1275)
* Created two services: recommendation training and recommendation serving

* Deleted Docker temporary

* Added features based in signals information

* Added method to get sessions features using PG

* Added same utils and core elements into ml_trainer

* Added checks before training models, added handler for model serving

* Updated serving API and recommendation functions to use frontend signals features

* reorganized modules to have base image and for both serving and training

* Added Dockerfiles and base Dockerfile

* Solved issue while ordering sessions by relevance

* Added method to save user feedback of recommendations

* Added security authorization

* Updated Dockerfile

* fixed issues with secret insertion to API

* Updated feedback structure

* Added git for dags

* Solved issue of insertion on recommendation feedback

* Changed update method from def to async def and it is called during startup

* Solved issues of airflow running mlflow in dag

* Changes sanity checks and added middleware params

* base path renaming

* Changed update method to a interval method which loads one model each 10s if there are models to download

* Added sql files for recommendation service and trainer

* Cleaned files and added documentation for methods and classes

* Added README file

* Renamed endpoints, changed None into empty array and updated readme

* refactor(recommendation): optimized query

* style(recommendation): changed import to top file, renamed endpoints parameters, function optimization

* refactor(recommendation): .gitignore

* refactor(recommendation): .gitignore

* refactor(recommendation): Optimized Dockerfiles

* refactor(recommendation): changed imports

* refactor(recommendation): optimized requests

* refactor(recommendation): optimized requests

* Fixed boot for fastapi, updated some queries

* Fixed issues while downloading models and while returning json response from API

* limited number of recommendations and set a minimum score to present recommendations

* fix(recommendation): fixed some queries and updated prediction method

* Added env value to control number of predictions to make

* docs(recommendation): Added third party libraries used in recommendation service

* frozen requirements

* Update base_crons.py

added `misfire_grace_time` to recommendation crons

---------

Co-authored-by: Taha Yassine Kraiem <tahayk2@gmail.com>
2023-06-07 15:58:33 +02:00

205 lines
7.6 KiB
Python

import logging
import time
from threading import Semaphore
import psycopg2
import psycopg2.extras
from decouple import config
from psycopg2 import pool
logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO))
logging.getLogger('apscheduler').setLevel(config("LOGLEVEL", default=logging.INFO))
conn_str = config('string_connection', default='')
if conn_str == '':
_PG_CONFIG = {"host": config("pg_host"),
"database": config("pg_dbname"),
"user": config("pg_user"),
"password": config("pg_password"),
"port": config("pg_port", cast=int),
"application_name": config("APP_NAME", default="PY")}
else:
import urllib.parse
conn_str = urllib.parse.unquote(conn_str)
usr_info, host_info = conn_str.split('@')
i = usr_info.find('://')
pg_user, pg_password = usr_info[i+3:].split(':')
host_info, pg_dbname = host_info.split('/')
i = host_info.find(':')
if i == -1:
pg_host = host_info
pg_port = 5432
else:
pg_host, pg_port = host_info.split(':')
pg_port = int(pg_port)
_PG_CONFIG = {"host": pg_host,
"database": pg_dbname,
"user": pg_user,
"password": pg_password,
"port": pg_port,
"application_name": config("APP_NAME", default="PY")}
PG_CONFIG = dict(_PG_CONFIG)
if config("PG_TIMEOUT", cast=int, default=0) > 0:
PG_CONFIG["options"] = f"-c statement_timeout={config('PG_TIMEOUT', cast=int) * 1000}"
class ORThreadedConnectionPool(psycopg2.pool.ThreadedConnectionPool):
def __init__(self, minconn, maxconn, *args, **kwargs):
self._semaphore = Semaphore(maxconn)
super().__init__(minconn, maxconn, *args, **kwargs)
def getconn(self, *args, **kwargs):
self._semaphore.acquire()
try:
return super().getconn(*args, **kwargs)
except psycopg2.pool.PoolError as e:
if str(e) == "connection pool is closed":
make_pool()
raise e
def putconn(self, *args, **kwargs):
try:
super().putconn(*args, **kwargs)
self._semaphore.release()
except psycopg2.pool.PoolError as e:
if str(e) == "trying to put unkeyed connection":
print("!!! trying to put unkeyed connection")
print(f"env-PG_POOL:{config('PG_POOL', default=None)}")
return
raise e
postgreSQL_pool: ORThreadedConnectionPool = None
RETRY_MAX = config("PG_RETRY_MAX", cast=int, default=50)
RETRY_INTERVAL = config("PG_RETRY_INTERVAL", cast=int, default=2)
RETRY = 0
def make_pool():
if not config('PG_POOL', cast=bool, default=True):
return
global postgreSQL_pool
global RETRY
if postgreSQL_pool is not None:
try:
postgreSQL_pool.closeall()
except (Exception, psycopg2.DatabaseError) as error:
logging.error("Error while closing all connexions to PostgreSQL", error)
try:
postgreSQL_pool = ORThreadedConnectionPool(config("PG_MINCONN", cast=int, default=20),
config("PG_MAXCONN", cast=int, default=80),
**PG_CONFIG)
if (postgreSQL_pool):
logging.info("Connection pool created successfully")
except (Exception, psycopg2.DatabaseError) as error:
logging.error("Error while connecting to PostgreSQL", error)
if RETRY < RETRY_MAX:
RETRY += 1
logging.info(f"waiting for {RETRY_INTERVAL}s before retry n°{RETRY}")
time.sleep(RETRY_INTERVAL)
make_pool()
else:
raise error
class PostgresClient:
connection = None
cursor = None
long_query = False
unlimited_query = False
def __init__(self, long_query=False, unlimited_query=False, use_pool=True):
self.long_query = long_query
self.unlimited_query = unlimited_query
self.use_pool = use_pool
if unlimited_query:
long_config = dict(_PG_CONFIG)
long_config["application_name"] += "-UNLIMITED"
self.connection = psycopg2.connect(**long_config)
elif long_query:
long_config = dict(_PG_CONFIG)
long_config["application_name"] += "-LONG"
long_config["options"] = f"-c statement_timeout=" \
f"{config('pg_long_timeout', cast=int, default=5 * 60) * 1000}"
self.connection = psycopg2.connect(**long_config)
elif not use_pool or not config('PG_POOL', cast=bool, default=True):
single_config = dict(_PG_CONFIG)
single_config["application_name"] += "-NOPOOL"
single_config["options"] = f"-c statement_timeout={config('PG_TIMEOUT', cast=int, default=30) * 1000}"
self.connection = psycopg2.connect(**single_config)
else:
self.connection = postgreSQL_pool.getconn()
def __enter__(self):
if self.cursor is None:
self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
self.cursor.cursor_execute = self.cursor.execute
self.cursor.execute = self.__execute
self.cursor.recreate = self.recreate_cursor
return self.cursor
def __exit__(self, *args):
try:
self.connection.commit()
self.cursor.close()
if not self.use_pool or self.long_query or self.unlimited_query:
self.connection.close()
except Exception as error:
logging.error("Error while committing/closing PG-connection", error)
if str(error) == "connection already closed" \
and self.use_pool \
and not self.long_query \
and not self.unlimited_query \
and config('PG_POOL', cast=bool, default=True):
logging.info("Recreating the connexion pool")
make_pool()
else:
raise error
finally:
if config('PG_POOL', cast=bool, default=True) \
and self.use_pool \
and not self.long_query \
and not self.unlimited_query:
postgreSQL_pool.putconn(self.connection)
def __execute(self, query, vars=None):
try:
result = self.cursor.cursor_execute(query=query, vars=vars)
except psycopg2.Error as error:
logging.error(f"!!! Error of type:{type(error)} while executing query:")
logging.error(query)
logging.info("starting rollback to allow future execution")
self.connection.rollback()
raise error
return result
def recreate_cursor(self, rollback=False):
if rollback:
try:
self.connection.rollback()
except Exception as error:
logging.error("Error while rollbacking connection for recreation", error)
try:
self.cursor.close()
except Exception as error:
logging.error("Error while closing cursor for recreation", error)
self.cursor = None
return self.__enter__()
async def init():
logging.info(f">PG_POOL:{config('PG_POOL', default=None)}")
if config('PG_POOL', cast=bool, default=True):
make_pool()
async def terminate():
global postgreSQL_pool
if postgreSQL_pool is not None:
try:
postgreSQL_pool.closeall()
logging.info("Closed all connexions to PostgreSQL")
except (Exception, psycopg2.DatabaseError) as error:
logging.error("Error while closing all connexions to PostgreSQL", error)