diff --git a/api/chalicelib/utils/ch_client_exp.py b/api/chalicelib/utils/ch_client_exp.py index 3cb461b7d..8bdb4c20b 100644 --- a/api/chalicelib/utils/ch_client_exp.py +++ b/api/chalicelib/utils/ch_client_exp.py @@ -6,9 +6,18 @@ from queue import Queue, Empty import clickhouse_connect from clickhouse_connect.driver.query import QueryContext +from clickhouse_connect.driver.exceptions import DatabaseError from decouple import config logger = logging.getLogger(__name__) + +_CH_CONFIG = {"host": config("ch_host"), + "user": config("ch_user", default="default"), + "password": config("ch_password", default=""), + "port": config("ch_port_http", cast=int), + "client_name": config("APP_NAME", default="PY")} +CH_CONFIG = dict(_CH_CONFIG) + settings = {} if config('ch_timeout', cast=int, default=-1) > 0: logging.info(f"CH-max_execution_time set to {config('ch_timeout')}s") @@ -18,14 +27,6 @@ if config('ch_receive_timeout', cast=int, default=-1) > 0: logging.info(f"CH-receive_timeout set to {config('ch_receive_timeout')}s") settings = {**settings, "receive_timeout": config('ch_receive_timeout', cast=int)} -logger.info("-- CH config --") -logger.info(f'host={config("ch_host")}') -logger.info(f'database={config("ch_database", default="default")}') -logger.info(f'user={config("ch_user", default="default")}') -logger.info(f'password={config("ch_password", default="")}') -logger.info(f'port={config("ch_port_http", cast=int)}') -logger.info(f'settings={settings}') - extra_args = {} if config("CH_COMPRESSION", cast=bool, default=True): extra_args["compression"] = "lz4" @@ -47,21 +48,17 @@ def transform_result(original_function): class ClickHouseConnectionPool: - def __init__(self, min_size, max_size, settings): + def __init__(self, min_size, max_size): self.min_size = min_size self.max_size = max_size self.pool = Queue() self.lock = threading.Lock() self.total_connections = 0 - self.settings = settings # Initialize the pool with min_size connections for _ in range(self.min_size): - client = clickhouse_connect.get_client(host=config("ch_host"), + client = clickhouse_connect.get_client(**CH_CONFIG, database=config("ch_database", default="default"), - user=config("ch_user", default="default"), - password=config("ch_password", default=""), - port=config("ch_port_http", cast=int), settings=settings, **extra_args) self.pool.put(client) @@ -75,15 +72,10 @@ class ClickHouseConnectionPool: except Empty: with self.lock: if self.total_connections < self.max_size: - client = clickhouse_connect.get_client( - host=config("ch_host"), - database=config("ch_database", default="default"), - user=config("ch_user", default="default"), - password=config("ch_password", default=""), - port=config("ch_port_http", cast=int), - settings=settings, - **extra_args - ) + client = clickhouse_connect.get_client(**CH_CONFIG, + database=config("ch_database", default="default"), + settings=settings, + **extra_args) self.total_connections += 1 return client # If max_size reached, wait until a connection is available @@ -119,12 +111,11 @@ def make_pool(): except Exception as error: logger.error("Error while closing all connexions to CH", error) try: - CH_pool = ClickHouseConnectionPool(min_size=config("PG_MINCONN", cast=int, default=4), - max_size=config("PG_MAXCONN", cast=int, default=8), - settings=settings) + CH_pool = ClickHouseConnectionPool(min_size=config("CH_MINCONN", cast=int, default=4), + max_size=config("CH_MAXCONN", cast=int, default=8)) if CH_pool is not None: logger.info("Connection pool created successfully for CH") - except Exception as error: + except ConnectionError as error: logger.error("Error while connecting to CH", error) if RETRY < RETRY_MAX: RETRY += 1 @@ -140,15 +131,12 @@ class ClickHouseClient: def __init__(self, database=None): if self.__client is None: - if config('CH_POOL', cast=bool, default=True): + if database is None and config('CH_POOL', cast=bool, default=True): self.__client = CH_pool.get_connection() else: - self.__client = clickhouse_connect.get_client(host=config("ch_host"), + self.__client = clickhouse_connect.get_client(**CH_CONFIG, database=database if database else config("ch_database", default="default"), - user=config("ch_user", default="default"), - password=config("ch_password", default=""), - port=config("ch_port_http", cast=int), settings=settings, **extra_args) self.__client.execute = transform_result(self.__client.query) @@ -173,7 +161,7 @@ class ClickHouseClient: async def init(): - logger.info(f">CH_POOL:{config('CH_POOL', default=None)}") + logger.info(f">use CH_POOL:{config('CH_POOL', default=True)}") if config('CH_POOL', cast=bool, default=True): make_pool() diff --git a/api/chalicelib/utils/pg_client.py b/api/chalicelib/utils/pg_client.py index b97ab005e..29ea84873 100644 --- a/api/chalicelib/utils/pg_client.py +++ b/api/chalicelib/utils/pg_client.py @@ -166,7 +166,7 @@ class PostgresClient: async def init(): - logger.info(f">PG_POOL:{config('PG_POOL', default=None)}") + logger.info(f">use PG_POOL:{config('PG_POOL', default=True)}") if config('PG_POOL', cast=bool, default=True): make_pool() diff --git a/scripts/helmcharts/vars.yaml b/scripts/helmcharts/vars.yaml index fa74a65fa..d2e89e1ec 100644 --- a/scripts/helmcharts/vars.yaml +++ b/scripts/helmcharts/vars.yaml @@ -29,6 +29,7 @@ clickhouse: &clickhouse password: "" service: webPort: 9000 + dataPort: 8123 # For enterpriseEdition quickwit: &quickwit