286 lines
9.8 KiB
Python
286 lines
9.8 KiB
Python
import logging
|
|
import threading
|
|
import time
|
|
import asyncio
|
|
from functools import wraps
|
|
from queue import Queue, Empty
|
|
from typing import Dict, Any, Optional
|
|
|
|
import clickhouse_connect
|
|
from clickhouse_connect.driver.query import QueryContext
|
|
from decouple import config
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_CH_CONFIG = {"host": config("ch_host"),
|
|
"user": config("ch_user", default="default"),
|
|
"password": config("ch_password", default=""),
|
|
"port": config("ch_port_http", cast=int),
|
|
"client_name": config("APP_NAME", default="PY")}
|
|
CH_CONFIG = dict(_CH_CONFIG)
|
|
|
|
settings = {}
|
|
if config('ch_timeout', cast=int, default=-1) > 0:
|
|
logging.info(f"CH-max_execution_time set to {config('ch_timeout')}s")
|
|
settings = {**settings, "max_execution_time": config('ch_timeout', cast=int)}
|
|
|
|
if config('ch_receive_timeout', cast=int, default=-1) > 0:
|
|
logging.info(f"CH-receive_timeout set to {config('ch_receive_timeout')}s")
|
|
settings = {**settings, "receive_timeout": config('ch_receive_timeout', cast=int)}
|
|
|
|
extra_args = {}
|
|
if config("CH_COMPRESSION", cast=bool, default=True):
|
|
extra_args["compression"] = "lz4"
|
|
|
|
|
|
def transform_result(self, original_function):
|
|
@wraps(original_function)
|
|
def wrapper(*args, **kwargs):
|
|
logger.debug(str.encode(self.format(query=kwargs.get("query", ""), parameters=kwargs.get("parameters"))))
|
|
result = original_function(*args, **kwargs)
|
|
if isinstance(result, clickhouse_connect.driver.query.QueryResult):
|
|
column_names = result.column_names
|
|
result = result.result_rows
|
|
result = [dict(zip(column_names, row)) for row in result]
|
|
|
|
return result
|
|
|
|
return wrapper
|
|
|
|
|
|
class ClickHouseConnectionPool:
|
|
def __init__(self, min_size, max_size):
|
|
self.min_size = min_size
|
|
self.max_size = max_size
|
|
self.pool = Queue()
|
|
self.lock = threading.Lock()
|
|
self.total_connections = 0
|
|
|
|
# Initialize the pool with min_size connections
|
|
for _ in range(self.min_size):
|
|
client = clickhouse_connect.get_client(**CH_CONFIG,
|
|
database=config("ch_database", default="default"),
|
|
settings=settings,
|
|
**extra_args)
|
|
self.pool.put(client)
|
|
self.total_connections += 1
|
|
|
|
def get_connection(self):
|
|
try:
|
|
# Try to get a connection without blocking
|
|
client = self.pool.get_nowait()
|
|
return client
|
|
except Empty:
|
|
with self.lock:
|
|
if self.total_connections < self.max_size:
|
|
client = clickhouse_connect.get_client(**CH_CONFIG,
|
|
database=config("ch_database", default="default"),
|
|
settings=settings,
|
|
**extra_args)
|
|
self.total_connections += 1
|
|
return client
|
|
# If max_size reached, wait until a connection is available
|
|
client = self.pool.get()
|
|
return client
|
|
|
|
def release_connection(self, client):
|
|
self.pool.put(client)
|
|
|
|
def close_all(self):
|
|
with self.lock:
|
|
while not self.pool.empty():
|
|
client = self.pool.get()
|
|
client.close()
|
|
self.total_connections = 0
|
|
|
|
|
|
CH_pool: ClickHouseConnectionPool = None
|
|
|
|
RETRY_MAX = config("CH_RETRY_MAX", cast=int, default=50)
|
|
RETRY_INTERVAL = config("CH_RETRY_INTERVAL", cast=int, default=2)
|
|
RETRY = 0
|
|
|
|
|
|
def make_pool():
|
|
if not config('CH_POOL', cast=bool, default=True):
|
|
return
|
|
global CH_pool
|
|
global RETRY
|
|
if CH_pool is not None:
|
|
try:
|
|
CH_pool.close_all()
|
|
except Exception as error:
|
|
logger.error("Error while closing all connexions to CH", exc_info=error)
|
|
try:
|
|
CH_pool = ClickHouseConnectionPool(min_size=config("CH_MINCONN", cast=int, default=4),
|
|
max_size=config("CH_MAXCONN", cast=int, default=8))
|
|
if CH_pool is not None:
|
|
logger.info("Connection pool created successfully for CH")
|
|
except ConnectionError as error:
|
|
logger.error("Error while connecting to CH", exc_info=error)
|
|
if RETRY < RETRY_MAX:
|
|
RETRY += 1
|
|
logger.info(f"waiting for {RETRY_INTERVAL}s before retry n°{RETRY}")
|
|
time.sleep(RETRY_INTERVAL)
|
|
make_pool()
|
|
else:
|
|
raise error
|
|
|
|
|
|
class ClickHouseClient:
|
|
def __init__(self, database=None):
|
|
self.__client = None
|
|
if database is not None or not config('CH_POOL', cast=bool, default=True):
|
|
self.__client = clickhouse_connect.get_client(**CH_CONFIG,
|
|
database=database if database else config("ch_database",
|
|
default="default"),
|
|
settings=settings,
|
|
**extra_args)
|
|
|
|
else:
|
|
self.__client = CH_pool.get_connection()
|
|
|
|
self.__client.execute = transform_result(self, self.__client.query)
|
|
self.__client.format = self.format
|
|
|
|
def __enter__(self):
|
|
return self.__client
|
|
|
|
def format(self, query, *, parameters=None):
|
|
if parameters is None:
|
|
return query
|
|
return query % {
|
|
key: f"'{value}'" if isinstance(value, str) else value
|
|
for key, value in parameters.items()
|
|
}
|
|
|
|
async def health_check(self) -> Dict[str, Any]:
|
|
"""
|
|
Check if the connection to ClickHouse is working.
|
|
|
|
Returns:
|
|
Dict with status information:
|
|
{
|
|
"status": "ok" | "error",
|
|
"message": str,
|
|
"ping_time_ms": float (if available)
|
|
}
|
|
"""
|
|
try:
|
|
start_time = asyncio.get_event_loop().time()
|
|
|
|
# Run the query in a thread pool to avoid blocking the event loop
|
|
loop = asyncio.get_event_loop()
|
|
await loop.run_in_executor(None, lambda: self.__client.query("SELECT 1"))
|
|
|
|
end_time = asyncio.get_event_loop().time()
|
|
ping_time_ms = (end_time - start_time) * 1000
|
|
|
|
return {
|
|
"status": "ok",
|
|
"message": "ClickHouse connection is healthy",
|
|
"ping_time_ms": round(ping_time_ms, 2)
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"ClickHouse health check failed: {str(e)}")
|
|
return {
|
|
"status": "error",
|
|
"message": f"Failed to connect to ClickHouse: {str(e)}",
|
|
}
|
|
|
|
@classmethod
|
|
async def health_check(cls) -> Dict[str, Any]:
|
|
"""
|
|
Class method to check if the ClickHouse connection is working.
|
|
Can be called directly on the class: await ClickHouseClient.health_check()
|
|
|
|
Returns:
|
|
Dict with status information
|
|
"""
|
|
try:
|
|
# Create a temporary client just for the health check
|
|
client = cls()
|
|
|
|
start_time = asyncio.get_event_loop().time()
|
|
|
|
# Run the query in a thread pool
|
|
loop = asyncio.get_event_loop()
|
|
await loop.run_in_executor(None, lambda: client.__client.query("SELECT 1"))
|
|
|
|
end_time = asyncio.get_event_loop().time()
|
|
ping_time_ms = (end_time - start_time) * 1000
|
|
|
|
result = {
|
|
"status": "ok",
|
|
"message": "ClickHouse connection is healthy",
|
|
"ping_time_ms": round(ping_time_ms, 2)
|
|
}
|
|
|
|
# Clean up the connection properly
|
|
if config('CH_POOL', cast=bool, default=True):
|
|
CH_pool.release_connection(client.__client)
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"ClickHouse health check failed: {str(e)}")
|
|
return {
|
|
"status": "error",
|
|
"message": f"Failed to connect to ClickHouse: {str(e)}",
|
|
}
|
|
|
|
def __exit__(self, *args):
|
|
if config('CH_POOL', cast=bool, default=True):
|
|
CH_pool.release_connection(self.__client)
|
|
else:
|
|
self.__client.close()
|
|
|
|
|
|
# Add the get_client function at module level
|
|
def get_client(database=None) -> ClickHouseClient:
|
|
"""
|
|
Get a ClickHouse client instance.
|
|
|
|
Args:
|
|
database: Optional database name to override the default
|
|
|
|
Returns:
|
|
ClickHouseClient instance
|
|
"""
|
|
return ClickHouseClient(database=database)
|
|
|
|
|
|
async def init():
|
|
logger.info(f">use CH_POOL:{config('CH_POOL', default=True)}")
|
|
if config('CH_POOL', cast=bool, default=True):
|
|
make_pool()
|
|
|
|
# Do a health check at initialization to verify connection
|
|
try:
|
|
health_status = await ClickHouseClient.health_check()
|
|
if health_status["status"] == "ok":
|
|
logger.info(f"ClickHouse connection verified. Ping: {health_status.get('ping_time_ms', 'N/A')}ms")
|
|
else:
|
|
logger.warning(f"ClickHouse connection check failed: {health_status['message']}")
|
|
except Exception as e:
|
|
logger.error(f"Error during initialization health check: {str(e)}")
|
|
|
|
|
|
async def terminate():
|
|
global CH_pool
|
|
if CH_pool is not None:
|
|
try:
|
|
CH_pool.close_all()
|
|
logger.info("Closed all connexions to CH")
|
|
except Exception as error:
|
|
logger.error("Error while closing all connexions to CH", exc_info=error)
|
|
|
|
|
|
async def health_check() -> Dict[str, Any]:
|
|
"""
|
|
Public health check function that can be used by the application.
|
|
|
|
Returns:
|
|
Health status dict
|
|
"""
|
|
return await ClickHouseClient.health_check()
|