* Created two services: recommendation training and recommendation serving * Deleted Docker temporary * Added features based on signals information * Added method to get sessions features using PG * Added same utils and core elements into ml_trainer * Added checks before training models, added handler for model serving * Updated serving API and recommendation functions to use frontend signals features * Reorganized modules to have a base image for both serving and training * Added Dockerfiles and base Dockerfile * Solved issue while ordering sessions by relevance * Added method to save user feedback of recommendations * Added security authorization * Updated Dockerfile * Fixed issues with secret insertion to API * Updated feedback structure * Added git for dags * Solved issue of insertion on recommendation feedback * Changed update method from def to async def and it is called during startup * Solved issues of airflow running mlflow in dag * Changed sanity checks and added middleware params * base path renaming * Changed update method to an interval method which loads one model every 10s if there are models to download * Added sql files for recommendation service and trainer * Cleaned files and added documentation for methods and classes * Added README file * Renamed endpoints, changed None into empty array and updated readme * refactor(recommendation): optimized query * style(recommendation): moved imports to top of file, renamed endpoints parameters, function optimization * refactor(recommendation): .gitignore * refactor(recommendation): .gitignore * refactor(recommendation): Optimized Dockerfiles * refactor(recommendation): changed imports * refactor(recommendation): optimized requests * refactor(recommendation): optimized requests * Fixed boot for fastapi, updated some queries * Fixed issues while downloading models and while returning json response from API * Limited number of recommendations and set a minimum score to present recommendations * 
fix(recommendation): fixed some queries and updated prediction method * Added env value to control number of predictions to make * docs(recommendation): Added third party libraries used in recommendation service * frozen requirements * Update base_crons.py added `misfire_grace_time` to recommendation crons --------- Co-authored-by: Taha Yassine Kraiem <tahayk2@gmail.com>
54 lines
1.9 KiB
Python
54 lines
1.9 KiB
Python
import logging
|
|
|
|
import clickhouse_driver
|
|
from decouple import config
|
|
|
|
# Root logger level is taken from LOGLEVEL and falls back to INFO.
logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO))

# Query settings passed to the ClickHouse driver client. An entry is added
# only when the matching configuration value is a positive integer.
settings = {}

if config('ch_timeout', cast=int, default=-1) > 0:
    logging.info(f"CH-max_execution_time set to {config('ch_timeout')}s")
    settings["max_execution_time"] = config('ch_timeout', cast=int)

if config('ch_receive_timeout', cast=int, default=-1) > 0:
    logging.info(f"CH-receive_timeout set to {config('ch_receive_timeout')}s")
    settings["receive_timeout"] = config('ch_receive_timeout', cast=int)
|
|
|
|
|
|
class ClickHouseClient:
    """Context-manager wrapper around a ``clickhouse_driver.Client``.

    Typical use::

        with ClickHouseClient() as ch:
            rows = ch.execute("SELECT 1 AS one")

    The driver connection is released when the ``with`` block exits.
    """

    # Class-level default so the attribute always exists; __init__ binds the
    # real driver client on the instance.
    __client = None

    def __init__(self):
        """Create the driver client from the ``ch_host`` / ``ch_port`` configuration."""
        if self.__client is None:
            self.__client = clickhouse_driver.Client(host=config("ch_host"),
                                                     database="default",
                                                     port=config("ch_port", cast=int),
                                                     settings=settings)

    def __enter__(self):
        """Return this wrapper so it can be bound with ``as``."""
        return self

    def execute(self, query, params=None, **args):
        """Run ``query`` and return the rows as a list of dicts keyed by column name.

        Args:
            query: SQL text, optionally containing driver-style placeholders.
            params: Values substituted into ``query`` by the driver, or None.
            **args: Extra keyword arguments forwarded to the driver's ``execute``.

        Returns:
            One dict per result row, mapping column name to value.

        Raises:
            Exception: whatever the driver raised; the query is logged first.
        """
        try:
            results = self.__client.execute(query=query, params=params, with_column_types=True, **args)
            # With with_column_types=True the driver returns (rows, [(name, type), ...]).
            rows, column_types = results[0], results[1]
            keys = tuple(name for name, _ in column_types)
            return [dict(zip(keys, row)) for row in rows]
        except Exception:
            logging.error("--------- CH QUERY EXCEPTION -----------")
            try:
                logging.error(self.format(query=query, params=params))
            except Exception:
                # Rendering the query must never replace the original error;
                # fall back to logging the raw, unsubstituted query text.
                logging.error(query)
            logging.error("--------------------")
            raise

    def insert(self, query, params=None, **args):
        """Run ``query`` through the driver and return the driver's raw result."""
        return self.__client.execute(query=query, params=params, **args)

    def client(self):
        """Return the underlying driver client."""
        return self.__client

    def format(self, query, params):
        """Return ``query`` with ``params`` substituted, or ``query`` unchanged when ``params`` is None."""
        if params is None:
            return query
        return self.__client.substitute_params(query, params, self.__client.connection.context)

    def __exit__(self, *args):
        """Release the driver connection; exceptions from the ``with`` body propagate."""
        if self.__client is not None:
            self.__client.disconnect()
|