From f52552b4f2c9a4192c6f87cdd2382b3c95eb2440 Mon Sep 17 00:00:00 2001 From: Shekar Siri Date: Mon, 10 Mar 2025 09:42:43 +0100 Subject: [PATCH] feat(api): health check improvements and other startup changes --- api/app.py | 203 +++++++++-------- api/boot/config.py | 78 +++++++ api/boot/health/router.py | 82 +++++++ api/boot/middleware/__init__.py | 8 + api/boot/middleware/cors.py | 15 ++ api/boot/middleware/timing.py | 42 ++++ api/boot/scheduler/manager.py | 71 ++++++ api/chalicelib/utils/ch_client.py | 299 ++++++++++++++++++++++---- api/chalicelib/utils/ch_client_exp.py | 103 +++++++++ api/chalicelib/utils/pg_client.py | 115 ++++++++++ 10 files changed, 871 insertions(+), 145 deletions(-) create mode 100644 api/boot/config.py create mode 100644 api/boot/health/router.py create mode 100644 api/boot/middleware/__init__.py create mode 100644 api/boot/middleware/cors.py create mode 100644 api/boot/middleware/timing.py create mode 100644 api/boot/scheduler/manager.py diff --git a/api/app.py b/api/app.py index d7e5215a5..2c90b37bb 100644 --- a/api/app.py +++ b/api/app.py @@ -1,134 +1,133 @@ import logging -import time from contextlib import asynccontextmanager -import psycopg_pool -from apscheduler.schedulers.asyncio import AsyncIOScheduler -from decouple import config -from fastapi import FastAPI, Request -from fastapi.middleware.cors import CORSMiddleware +from fastapi import FastAPI from fastapi.middleware.gzip import GZipMiddleware -from psycopg import AsyncConnection -from psycopg.rows import dict_row -from starlette.responses import StreamingResponse -from chalicelib.utils import helper +from boot.config import settings +from boot.middleware import timing_middleware, setup_cors +from boot.scheduler.manager import SchedulerManager +from boot.health.router import health_router from chalicelib.utils import pg_client, ch_client + +# Import routers and cron jobs from crons import core_crons, core_dynamic_crons from routers import core, core_dynamic from routers.subs import insights, metrics, v1_api, health, usability_tests, spot, product_anaytics -loglevel = config("LOGLEVEL", default=logging.WARNING) -print(f">Loglevel set to: {loglevel}") -logging.basicConfig(level=loglevel) - - -class ORPYAsyncConnection(AsyncConnection): - - def __init__(self, *args, **kwargs): - super().__init__(*args, row_factory=dict_row, **kwargs) +# Configure logging +settings.configure_logging() +logger = logging.getLogger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): - # Startup - logging.info(">>>>> starting up <<<<<") - ap_logger = logging.getLogger('apscheduler') - ap_logger.setLevel(loglevel) + """ + Application lifespan context manager. 
- app.schedule = AsyncIOScheduler() + Handles startup and shutdown tasks: + - Database connections initialization + - Scheduler setup + - Cleanup on shutdown + """ + # Startup + logger.info(">>>>> starting up <<<<<") + + # Initialize database connections await pg_client.init() await ch_client.init() - app.schedule.start() - for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs: - app.schedule.add_job(id=job["func"].__name__, **job) + # Initialize scheduler with jobs + all_jobs = core_crons.cron_jobs + core_dynamic_crons.cron_jobs + SchedulerManager.initialize(all_jobs) - ap_logger.info(">Scheduled jobs:") - for job in app.schedule.get_jobs(): - ap_logger.info({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)}) + # Store PostgreSQL pool in app state for backwards compatibility + # app.state.postgresql = pg_client. - database = { - "host": config("pg_host", default="localhost"), - "dbname": config("pg_dbname", default="orpy"), - "user": config("pg_user", default="orpy"), - "password": config("pg_password", default="orpy"), - "port": config("pg_port", cast=int, default=5432), - "application_name": "AIO" + config("APP_NAME", default="PY"), - } - - database = psycopg_pool.AsyncConnectionPool(kwargs=database, connection_class=ORPYAsyncConnection, - min_size=config("PG_AIO_MINCONN", cast=int, default=1), - max_size=config("PG_AIO_MAXCONN", cast=int, default=5), ) - app.state.postgresql = database - - # App listening + # App is ready to handle requests yield # Shutdown - await database.close() - logging.info(">>>>> shutting down <<<<<") - app.schedule.shutdown(wait=False) + logger.info(">>>>> shutting down <<<<<") + + # Shutdown scheduler + SchedulerManager.shutdown() + + # Close database connections await pg_client.terminate() + await ch_client.terminate() -app = FastAPI(root_path=config("root_path", default="/api"), docs_url=config("docs_url", default=""), - redoc_url=config("redoc_url", default=""), lifespan=lifespan) -app.add_middleware(GZipMiddleware, minimum_size=1000) +def create_app() -> FastAPI: + """Create and configure the FastAPI application.""" + # Create FastAPI app + app = FastAPI( + root_path=settings.ROOT_PATH, + docs_url=settings.DOCS_URL, + redoc_url=settings.REDOC_URL, + lifespan=lifespan, + title=f"{settings.APP_NAME} API", + description=f"API for {settings.APP_NAME}", + version="1.0.0" + ) + + # Add middlewares + app.add_middleware(GZipMiddleware, minimum_size=1000) + app.middleware('http')(timing_middleware) + setup_cors(app) + + # Register health check routes first for monitoring systems + app.include_router(health_router) + + # Register existing application routers + register_routers(app) + + return app -@app.middleware('http') -async def or_middleware(request: Request, call_next): - if helper.TRACK_TIME: - now = time.time() - try: - response: StreamingResponse = await call_next(request) - except: - logging.error(f"{request.method}: {request.url.path} FAILED!") - raise - if response.status_code // 100 != 2: - logging.warning(f"{request.method}:{request.url.path} {response.status_code}!") - if helper.TRACK_TIME: - now = time.time() - now - if now > 2: - now = round(now, 2) - logging.warning(f"Execution time: {now} s for {request.method}: {request.url.path}") - response.headers["x-robots-tag"] = 'noindex, nofollow' - return response +def register_routers(app: FastAPI) -> None: + """Register all application routers.""" + # Core routers + app.include_router(core.public_app) + app.include_router(core.app) + 
+    app.include_router(core.app_apikey)
+
+    # Core dynamic routers
+    app.include_router(core_dynamic.public_app)
+    app.include_router(core_dynamic.app)
+    app.include_router(core_dynamic.app_apikey)
+
+    # Feature routers
+    app.include_router(metrics.app)
+    app.include_router(insights.app)
+    app.include_router(v1_api.app_apikey)
+
+    # Health routers (existing ones from the codebase)
+    app.include_router(health.public_app)
+    app.include_router(health.app)
+    app.include_router(health.app_apikey)
+
+    # Usability tests routers
+    app.include_router(usability_tests.public_app)
+    app.include_router(usability_tests.app)
+    app.include_router(usability_tests.app_apikey)
+
+    # Spot routers
+    app.include_router(spot.public_app)
+    app.include_router(spot.app)
+    app.include_router(spot.app_apikey)
+
+    # Product analytics routers
+    app.include_router(product_anaytics.public_app)
+    app.include_router(product_anaytics.app)
+    app.include_router(product_anaytics.app_apikey)
 
 
-origins = [
-    "*",
-]
+# Create application instance
+app = create_app()
 
-app.add_middleware(
-    CORSMiddleware,
-    allow_origins=origins,
-    allow_credentials=True,
-    allow_methods=["*"],
-    allow_headers=["*"],
-)
 
-app.include_router(core.public_app)
-app.include_router(core.app)
-app.include_router(core.app_apikey)
-app.include_router(core_dynamic.public_app)
-app.include_router(core_dynamic.app)
-app.include_router(core_dynamic.app_apikey)
-app.include_router(metrics.app)
-app.include_router(insights.app)
-app.include_router(v1_api.app_apikey)
-app.include_router(health.public_app)
-app.include_router(health.app)
-app.include_router(health.app_apikey)
+# Local development entry point; in production, run via an ASGI server such as uvicorn
+if __name__ == "__main__":
+    import uvicorn
 
-app.include_router(usability_tests.public_app)
-app.include_router(usability_tests.app)
-app.include_router(usability_tests.app_apikey)
-
-app.include_router(spot.public_app)
-app.include_router(spot.app)
-app.include_router(spot.app_apikey)
-
-app.include_router(product_anaytics.public_app)
-app.include_router(product_anaytics.app)
-app.include_router(product_anaytics.app_apikey)
+    uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)
diff --git a/api/boot/config.py b/api/boot/config.py
new file mode 100644
index 000000000..6d49aa0b8
--- /dev/null
+++ b/api/boot/config.py
@@ -0,0 +1,78 @@
+import logging
+from typing import Dict, Any, List
+from decouple import config as decouple_config
+
+
+class Settings:
+    """Centralized application settings."""
+
+    # Application settings
+    APP_NAME: str = decouple_config("APP_NAME", default="OpenReplay")
+    ROOT_PATH: str = decouple_config("root_path", default="/api")
+    DOCS_URL: str = decouple_config("docs_url", default="")
+    REDOC_URL: str = decouple_config("redoc_url", default="")
+    TRACK_TIME: bool = decouple_config("TRACK_TIME", default=False, cast=bool)
+
+    # Logging - get raw string value without casting
+    _LOG_LEVEL_STR: str = decouple_config("LOGLEVEL", default="WARNING")
+    LOG_FORMAT: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+
+    # PostgreSQL settings
+    PG_HOST: str = decouple_config("pg_host", default="localhost")
+    PG_DBNAME: str = decouple_config("pg_dbname", default="orpy")
+    PG_USER: str = decouple_config("pg_user", default="orpy")
+    PG_PASSWORD: str = decouple_config("pg_password", default="orpy")
+    PG_PORT: int = decouple_config("pg_port", cast=int, default=5432)
+    PG_AIO_MINCONN: int = decouple_config("PG_AIO_MINCONN", cast=int, default=1)
+    PG_AIO_MAXCONN: int = decouple_config("PG_AIO_MAXCONN", cast=int, default=5)
+
+    # CORS settings
+    CORS_ORIGINS: List[str] = ["*"]
+    CORS_ALLOW_CREDENTIALS: bool = True
+    CORS_ALLOW_METHODS: List[str] = ["*"]
+    CORS_ALLOW_HEADERS: List[str] = ["*"]
+
+    @property
+    def LOG_LEVEL(self) -> int:
+        """Convert string log level to an integer constant."""
+        level_map = {
+            "CRITICAL": logging.CRITICAL,
+            "FATAL": logging.FATAL,
+            "ERROR": logging.ERROR,
+            "WARNING": logging.WARNING,
+            "WARN": logging.WARN,
+            "INFO": logging.INFO,
+            "DEBUG": logging.DEBUG,
+            "NOTSET": logging.NOTSET,
+        }
+
+        # If it's a digit string, convert to int
+        if self._LOG_LEVEL_STR.isdigit():
+            return int(self._LOG_LEVEL_STR)
+
+        # If it's a known level name, return the corresponding value
+        return level_map.get(self._LOG_LEVEL_STR.upper(), logging.WARNING)
+
+    @property
+    def pg_dsn(self) -> Dict[str, Any]:
+        """Return PostgreSQL connection parameters as a dictionary."""
+        return {
+            "host": self.PG_HOST,
+            "dbname": self.PG_DBNAME,
+            "user": self.PG_USER,
+            "password": self.PG_PASSWORD,
+            "port": self.PG_PORT,
+            "application_name": f"AIO{self.APP_NAME}",
+        }
+
+    def configure_logging(self) -> None:
+        """Configure application logging."""
+        logging.basicConfig(
+            level=self.LOG_LEVEL,
+            format=self.LOG_FORMAT
+        )
+        logging.info(f">Loglevel set to: {self._LOG_LEVEL_STR} ({self.LOG_LEVEL})")
+
+
+# Create a singleton instance
+settings = Settings()
diff --git a/api/boot/health/router.py b/api/boot/health/router.py
new file mode 100644
index 000000000..1925e53fa
--- /dev/null
+++ b/api/boot/health/router.py
@@ -0,0 +1,82 @@
+import logging
+import time
+import psutil
+from typing import Dict, Any
+
+from fastapi import APIRouter, Response, status
+from pydantic import BaseModel
+
+from boot.scheduler.manager import SchedulerManager
+from chalicelib.utils import pg_client, ch_client
+
+logger = logging.getLogger(__name__)
+
+# Create router for health endpoints
+health_router = APIRouter(prefix="/health", tags=["Health"])
+
+
+class HealthResponse(BaseModel):
+    """Health check response model."""
+    status: str
+    details: Dict[str, Any]
+
+
+@health_router.get("/liveness", response_model=HealthResponse, status_code=status.HTTP_200_OK)
+async def liveness():
+    """
+    Liveness probe endpoint.
+
+    This endpoint indicates if the application is running.
+    It's designed to be quick and does not check external dependencies.
+    """
+    return HealthResponse(
+        status="alive",
+        details={
+            # psutil.boot_time() gives host boot time, so this is system
+            # uptime, not process uptime
+            "uptime_seconds": time.time() - psutil.boot_time(),
+            "process": {
+                "cpu_percent": psutil.cpu_percent(),
+                "memory_percent": psutil.virtual_memory().percent
+            }
+        }
+    )
+
+
+@health_router.get("/readiness", response_model=HealthResponse)
+async def readiness(response: Response):
+    """
+    Readiness probe endpoint.
+
+    This endpoint indicates if the application is ready to handle requests.
+    It checks connections to dependent services.
+    """
+    # Check PostgreSQL connection (health_check returns a status dict,
+    # which is always truthy, so compare its "status" field)
+    pg_healthy = (await pg_client.PostgresClient.health_check())["status"] == "ok"
+
+    # Check ClickHouse connection
+    ch_healthy = (await ch_client.ClickHouseClient.health_check())["status"] == "ok"
+
+    # Check scheduler
+    scheduler_healthy = SchedulerManager.health_check()
+
+    # Overall status: report 503 so orchestrators stop routing traffic
+    all_healthy = pg_healthy and ch_healthy and scheduler_healthy
+    response.status_code = status.HTTP_200_OK if all_healthy else status.HTTP_503_SERVICE_UNAVAILABLE
+
+    return HealthResponse(
+        status="ready" if all_healthy else "not ready",
+        details={
+            "services": {
+                "postgres": "healthy" if pg_healthy else "unhealthy",
+                "clickhouse": "healthy" if ch_healthy else "unhealthy",
+                "scheduler": "healthy" if scheduler_healthy else "unhealthy"
+            },
+            "system": {
+                "disk_usage_percent": psutil.disk_usage('/').percent,
+                "memory_available_mb": psutil.virtual_memory().available / (1024 * 1024),
+                "cpu_count": psutil.cpu_count(),
+                "cpu_percent": psutil.cpu_percent()
+            }
+        }
+    )
diff --git a/api/boot/middleware/__init__.py b/api/boot/middleware/__init__.py
new file mode 100644
index 000000000..ebab0f0fd
--- /dev/null
+++ b/api/boot/middleware/__init__.py
@@ -0,0 +1,8 @@
+"""
+Middleware components for the FastAPI application.
+"""
+
+from .timing import timing_middleware
+from .cors import setup_cors
+
+__all__ = ["timing_middleware", "setup_cors"]
diff --git a/api/boot/middleware/cors.py b/api/boot/middleware/cors.py
new file mode 100644
index 000000000..367f8db97
--- /dev/null
+++ b/api/boot/middleware/cors.py
@@ -0,0 +1,15 @@
+from fastapi import FastAPI
+from fastapi.middleware.cors import CORSMiddleware
+
+from boot.config import settings
+
+
+def setup_cors(app: FastAPI) -> None:
+    """Configure CORS middleware for the application."""
+    app.add_middleware(
+        CORSMiddleware,
+        allow_origins=settings.CORS_ORIGINS,
+        allow_credentials=settings.CORS_ALLOW_CREDENTIALS,
+        allow_methods=settings.CORS_ALLOW_METHODS,
+        allow_headers=settings.CORS_ALLOW_HEADERS,
+    )
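The probe endpoints above are what an orchestrator such as Kubernetes would poll. A minimal
sketch of how they are expected to behave, assuming the application object from api/app.py is
importable as `app` and the backing services are reachable:

    # Hypothetical smoke test for the new probes; TestClient runs the lifespan
    # (and therefore pg_client.init()/ch_client.init()) on __enter__
    from fastapi.testclient import TestClient

    from app import app

    with TestClient(app) as client:
        r = client.get("/health/liveness")
        assert r.status_code == 200 and r.json()["status"] == "alive"

        # Readiness reports 503 when PostgreSQL, ClickHouse or the scheduler is down
        r = client.get("/health/readiness")
        print(r.status_code, r.json()["details"]["services"])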
Error: {str(e)}") + raise + + # Log non-successful responses + if response.status_code // 100 != 2: + logger.warning(f"{request.method}:{request.url.path} {response.status_code}!") + + # Track execution time + if helper.TRACK_TIME: + elapsed = time.time() - start_time + if elapsed > 2: + elapsed = round(elapsed, 2) + logger.warning(f"Execution time: {elapsed} s for {request.method}: {request.url.path}") + + # Set security headers + response.headers["x-robots-tag"] = 'noindex, nofollow' + + return response diff --git a/api/boot/scheduler/manager.py b/api/boot/scheduler/manager.py new file mode 100644 index 000000000..d707e0476 --- /dev/null +++ b/api/boot/scheduler/manager.py @@ -0,0 +1,71 @@ +import logging +from typing import List, Dict, Any + +from apscheduler.schedulers.asyncio import AsyncIOScheduler + +logger = logging.getLogger(__name__) + + +class SchedulerManager: + """Scheduler manager for cron jobs.""" + + _scheduler: AsyncIOScheduler = None + + @classmethod + def initialize(cls, jobs: List[Dict[str, Any]]) -> None: + """Initialize the scheduler with jobs.""" + if cls._scheduler is not None: + logger.warning("Scheduler already initialized") + return + + # Configure APScheduler logger + ap_logger = logging.getLogger('apscheduler') + ap_logger.setLevel(logging.getLogger().level) + + # Create scheduler + cls._scheduler = AsyncIOScheduler() + + # Add all jobs to the scheduler + for job in jobs: + cls._scheduler.add_job(id=job["func"].__name__, **job) + + # Log scheduled jobs + ap_logger.info(">Scheduled jobs:") + for job in cls._scheduler.get_jobs(): + # Get the job information safely without depending on next_run_time + job_info = { + "Name": str(job.id), + "Run Frequency": str(job.trigger), + } + + # Try to get next run time if available, otherwise skip it + try: + if hasattr(job, "next_run_time"): + job_info["Next Run"] = str(job.next_run_time) + elif hasattr(job, "_get_run_times"): + # Try to get the next run time using _get_run_times + run_times = job._get_run_times(1) + if run_times: + job_info["Next Run"] = str(run_times[0]) + except Exception as e: + job_info["Next Run"] = "Unknown" + logger.debug(f"Could not determine next run time for job {job.id}: {e}") + + ap_logger.info(job_info) + + # Start the scheduler + cls._scheduler.start() + logger.info("Scheduler started") + + @classmethod + def shutdown(cls) -> None: + """Shutdown the scheduler.""" + if cls._scheduler is not None: + cls._scheduler.shutdown(wait=False) + cls._scheduler = None + logger.info("Scheduler shutdown") + + @classmethod + def health_check(cls) -> bool: + """Check scheduler health.""" + return cls._scheduler is not None and cls._scheduler.running diff --git a/api/chalicelib/utils/ch_client.py b/api/chalicelib/utils/ch_client.py index 5fbaa5752..79b43c641 100644 --- a/api/chalicelib/utils/ch_client.py +++ b/api/chalicelib/utils/ch_client.py @@ -1,73 +1,286 @@ import logging +import threading +import time +import asyncio +from functools import wraps +from queue import Queue, Empty +from typing import Dict, Any, Optional -import clickhouse_driver +import clickhouse_connect +from clickhouse_connect.driver.query import QueryContext from decouple import config logger = logging.getLogger(__name__) +_CH_CONFIG = {"host": config("ch_host"), + "user": config("ch_user", default="default"), + "password": config("ch_password", default=""), + "port": config("ch_port_http", cast=int), + "client_name": config("APP_NAME", default="PY")} +CH_CONFIG = dict(_CH_CONFIG) + settings = {} if config('ch_timeout', cast=int, 
diff --git a/api/chalicelib/utils/ch_client.py b/api/chalicelib/utils/ch_client.py
index 5fbaa5752..79b43c641 100644
--- a/api/chalicelib/utils/ch_client.py
+++ b/api/chalicelib/utils/ch_client.py
@@ -1,73 +1,286 @@
 import logging
+import threading
+import time
+import asyncio
+from functools import wraps
+from queue import Queue, Empty
+from typing import Dict, Any, Optional
 
-import clickhouse_driver
+import clickhouse_connect
+from clickhouse_connect.driver.query import QueryResult
 from decouple import config
 
 logger = logging.getLogger(__name__)
 
+_CH_CONFIG = {"host": config("ch_host"),
+              "user": config("ch_user", default="default"),
+              "password": config("ch_password", default=""),
+              "port": config("ch_port_http", cast=int),
+              "client_name": config("APP_NAME", default="PY")}
+CH_CONFIG = dict(_CH_CONFIG)
+
 settings = {}
 if config('ch_timeout', cast=int, default=-1) > 0:
     logger.info(f"CH-max_execution_time set to {config('ch_timeout')}s")
     settings = {**settings, "max_execution_time": config('ch_timeout', cast=int)}
 if config('ch_receive_timeout', cast=int, default=-1) > 0:
     logger.info(f"CH-receive_timeout set to {config('ch_receive_timeout')}s")
     settings = {**settings, "receive_timeout": config('ch_receive_timeout', cast=int)}
 
+extra_args = {}
+if config("CH_COMPRESSION", cast=bool, default=True):
+    extra_args["compression"] = "lz4"
+
+
+# Wraps the underlying client's query() so callers get a list of dicts back
+def transform_result(self, original_function):
+    @wraps(original_function)
+    def wrapper(*args, **kwargs):
+        logger.debug(str.encode(self.format(query=kwargs.get("query", ""), parameters=kwargs.get("parameters"))))
+        result = original_function(*args, **kwargs)
+        if isinstance(result, QueryResult):
+            column_names = result.column_names
+            result = result.result_rows
+            result = [dict(zip(column_names, row)) for row in result]
+
+        return result
+
+    return wrapper
+
+
+class ClickHouseConnectionPool:
+    def __init__(self, min_size, max_size):
+        self.min_size = min_size
+        self.max_size = max_size
+        self.pool = Queue()
+        self.lock = threading.Lock()
+        self.total_connections = 0
+
+        # Initialize the pool with min_size connections
+        for _ in range(self.min_size):
+            client = clickhouse_connect.get_client(**CH_CONFIG,
+                                                   database=config("ch_database", default="default"),
+                                                   settings=settings,
+                                                   **extra_args)
+            self.pool.put(client)
+            self.total_connections += 1
+
+    def get_connection(self):
+        try:
+            # Try to get a connection without blocking
+            client = self.pool.get_nowait()
+            return client
+        except Empty:
+            with self.lock:
+                if self.total_connections < self.max_size:
+                    client = clickhouse_connect.get_client(**CH_CONFIG,
+                                                           database=config("ch_database", default="default"),
+                                                           settings=settings,
+                                                           **extra_args)
+                    self.total_connections += 1
+                    return client
+            # If max_size reached, wait until a connection is available
+            client = self.pool.get()
+            return client
+
+    def release_connection(self, client):
+        self.pool.put(client)
+
+    def close_all(self):
+        with self.lock:
+            while not self.pool.empty():
+                client = self.pool.get()
+                client.close()
+            self.total_connections = 0
+
+
+CH_pool: Optional[ClickHouseConnectionPool] = None
+
+RETRY_MAX = config("CH_RETRY_MAX", cast=int, default=50)
+RETRY_INTERVAL = config("CH_RETRY_INTERVAL", cast=int, default=2)
+RETRY = 0
+
+
+def make_pool():
+    if not config('CH_POOL', cast=bool, default=True):
+        return
+    global CH_pool
+    global RETRY
+    if CH_pool is not None:
+        try:
+            CH_pool.close_all()
+        except Exception as error:
+            logger.error("Error while closing all connections to CH", exc_info=error)
+    try:
+        CH_pool = ClickHouseConnectionPool(min_size=config("CH_MINCONN", cast=int, default=4),
+                                           max_size=config("CH_MAXCONN", cast=int, default=8))
+        if CH_pool is not None:
+            logger.info("Connection pool created successfully for CH")
+    except ConnectionError as error:
+        logger.error("Error while connecting to CH", exc_info=error)
+        if RETRY < RETRY_MAX:
+            RETRY += 1
+            logger.info(f"waiting for {RETRY_INTERVAL}s before retry #{RETRY}")
+            time.sleep(RETRY_INTERVAL)
+            make_pool()
+        else:
+            raise error
+
 
 class ClickHouseClient:
-    __client = None
-
     def __init__(self, database=None):
-        extra_args = {}
-        if config("CH_COMPRESSION", cast=bool, default=True):
-            extra_args["compression"] = "lz4"
-        self.__client = clickhouse_driver.Client(host=config("ch_host"),
-                                                 database=database if database else config("ch_database",
-                                                                                           default="default"),
-                                                 user=config("ch_user", default="default"),
-                                                 password=config("ch_password", default=""),
-                                                 port=config("ch_port", cast=int),
-                                                 settings=settings,
-                                                 **extra_args) \
-            if self.__client is None else self.__client
+        self.__client = None
+        if database is not None or not config('CH_POOL', cast=bool, default=True):
+            self.__client = clickhouse_connect.get_client(**CH_CONFIG,
+                                                          database=database if database else config("ch_database",
+                                                                                                    default="default"),
+                                                          settings=settings,
+                                                          **extra_args)
+
+        else:
+            self.__client = CH_pool.get_connection()
+
+        # Expose query() through execute() and return rows as dicts
+        self.__client.execute = transform_result(self, self.__client.query)
+        self.__client.format = self.format
 
     def __enter__(self):
-        return self
-
-    def execute(self, query, parameters=None, **args):
-        try:
-            results = self.__client.execute(query=query, params=parameters, with_column_types=True, **args)
-            keys = tuple(x for x, y in results[1])
-            return [dict(zip(keys, i)) for i in results[0]]
-        except Exception as err:
-            logger.error("--------- CH EXCEPTION -----------", exc_info=err)
-            logger.error("--------- CH QUERY EXCEPTION -----------")
-            logger.error(self.format(query=query, parameters=parameters)
-                         .replace('\n', '\\n')
-                         .replace('    ', ' ')
-                         .replace('  ', ' '))
-            logger.error("--------------------")
-            raise err
-
-    def insert(self, query, params=None, **args):
-        return self.__client.execute(query=query, params=params, **args)
-
-    def client(self):
         return self.__client
 
-    def format(self, query, parameters):
+    def format(self, query, *, parameters=None):
         if parameters is None:
             return query
-        return self.__client.substitute_params(query, parameters, self.__client.connection.context)
+        # Naive parameter substitution used only for logging queries
+        return query % {
+            key: f"'{value}'" if isinstance(value, str) else value
+            for key, value in parameters.items()
+        }
+
+    async def health_check_instance(self) -> Dict[str, Any]:
+        """
+        Check if this client's connection to ClickHouse is working.
+
+        Note: named health_check_instance so it is not shadowed by the
+        health_check classmethod below (inside a class body the later
+        definition of a name wins).
+
+        Returns:
+            Dict with status information:
+            {
+                "status": "ok" | "error",
+                "message": str,
+                "ping_time_ms": float (if available)
+            }
+        """
+        try:
+            start_time = asyncio.get_event_loop().time()
+
+            # Run the query in a thread pool to avoid blocking the event loop
+            loop = asyncio.get_event_loop()
+            await loop.run_in_executor(None, lambda: self.__client.query("SELECT 1"))
+
+            end_time = asyncio.get_event_loop().time()
+            ping_time_ms = (end_time - start_time) * 1000
+
+            return {
+                "status": "ok",
+                "message": "ClickHouse connection is healthy",
+                "ping_time_ms": round(ping_time_ms, 2)
+            }
+        except Exception as e:
+            logger.error(f"ClickHouse health check failed: {str(e)}")
+            return {
+                "status": "error",
+                "message": f"Failed to connect to ClickHouse: {str(e)}",
+            }
+
+    @classmethod
+    async def health_check(cls) -> Dict[str, Any]:
+        """
+        Class method to check if the ClickHouse connection is working.
+        Can be called directly on the class: await ClickHouseClient.health_check()
+
+        Returns:
+            Dict with status information
+        """
+        try:
+            # Create a temporary client just for the health check
+            client = cls()
+
+            start_time = asyncio.get_event_loop().time()
+
+            # Run the query in a thread pool
+            loop = asyncio.get_event_loop()
+            await loop.run_in_executor(None, lambda: client.__client.query("SELECT 1"))
+
+            end_time = asyncio.get_event_loop().time()
+            ping_time_ms = (end_time - start_time) * 1000
+
+            result = {
+                "status": "ok",
+                "message": "ClickHouse connection is healthy",
+                "ping_time_ms": round(ping_time_ms, 2)
+            }
+
+            # Return the connection to the pool, or close it when pooling is off
+            if config('CH_POOL', cast=bool, default=True):
+                CH_pool.release_connection(client.__client)
+            else:
+                client.__client.close()
+
+            return result
+
+        except Exception as e:
+            logger.error(f"ClickHouse health check failed: {str(e)}")
+            return {
+                "status": "error",
+                "message": f"Failed to connect to ClickHouse: {str(e)}",
+            }
 
     def __exit__(self, *args):
-        pass
+        if config('CH_POOL', cast=bool, default=True):
+            CH_pool.release_connection(self.__client)
+        else:
+            self.__client.close()
+
+
+# Module-level helper to get a client instance
+def get_client(database=None) -> ClickHouseClient:
+    """
+    Get a ClickHouse client instance.
+
+    Args:
+        database: Optional database name to override the default
+
+    Returns:
+        ClickHouseClient instance
+    """
+    return ClickHouseClient(database=database)
 
 
 async def init():
-    logger.info(f">CH_POOL:not defined")
+    logger.info(f">use CH_POOL:{config('CH_POOL', default=True)}")
+    if config('CH_POOL', cast=bool, default=True):
+        make_pool()
+
+    # Do a health check at initialization to verify the connection
+    try:
+        health_status = await ClickHouseClient.health_check()
+        if health_status["status"] == "ok":
+            logger.info(f"ClickHouse connection verified. Ping: {health_status.get('ping_time_ms', 'N/A')}ms")
+        else:
+            logger.warning(f"ClickHouse connection check failed: {health_status['message']}")
+    except Exception as e:
+        logger.error(f"Error during initialization health check: {str(e)}")
 
 
 async def terminate():
-    pass
+    global CH_pool
+    if CH_pool is not None:
+        try:
+            CH_pool.close_all()
+            logger.info("Closed all connections to CH")
+        except Exception as error:
+            logger.error("Error while closing all connections to CH", exc_info=error)
+
+
+async def health_check() -> Dict[str, Any]:
+    """
+    Public health check function that can be used by the application.
+
+    Returns:
+        Health status dict
+    """
+    return await ClickHouseClient.health_check()
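With the clickhouse_driver to clickhouse_connect migration, __enter__ now hands back the raw
client whose execute() is wrapped to return dict rows. A sketch of the intended usage (table
and column names here are illustrative only):

    from chalicelib.utils import ch_client

    with ch_client.ClickHouseClient() as ch:
        # execute() is the wrapped query(): rows come back as a list of dicts
        rows = ch.execute(query="SELECT project_id, count() AS n FROM sessions GROUP BY project_id")
        for row in rows:
            print(row["project_id"], row["n"])
    # On __exit__ the connection goes back to CH_pool (or is closed when CH_POOL=false)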
diff --git a/api/chalicelib/utils/ch_client_exp.py b/api/chalicelib/utils/ch_client_exp.py
index 36f6b3e7b..47d66a216 100644
--- a/api/chalicelib/utils/ch_client_exp.py
+++ b/api/chalicelib/utils/ch_client_exp.py
@@ -1,8 +1,10 @@
 import logging
 import threading
 import time
+import asyncio
 from functools import wraps
 from queue import Queue, Empty
+from typing import Dict, Any, Optional
 
 import clickhouse_connect
 from clickhouse_connect.driver.query import QueryContext
@@ -154,6 +156,73 @@ class ClickHouseClient:
             for key, value in parameters.items()
         }
 
+    async def health_check_instance(self) -> Dict[str, Any]:
+        """
+        Check if this client's connection to ClickHouse is working.
+
+        Note: named health_check_instance so it is not shadowed by the
+        health_check classmethod below (inside a class body the later
+        definition of a name wins).
+
+        Returns:
+            Dict with status information:
+            {
+                "status": "ok" | "error",
+                "message": str,
+                "ping_time_ms": float (if available)
+            }
+        """
+        try:
+            start_time = asyncio.get_event_loop().time()
+
+            # Run the query in a thread pool to avoid blocking the event loop
+            loop = asyncio.get_event_loop()
+            await loop.run_in_executor(None, lambda: self.__client.query("SELECT 1"))
+
+            end_time = asyncio.get_event_loop().time()
+            ping_time_ms = (end_time - start_time) * 1000
+
+            return {
+                "status": "ok",
+                "message": "ClickHouse connection is healthy",
+                "ping_time_ms": round(ping_time_ms, 2)
+            }
+        except Exception as e:
+            logger.error(f"ClickHouse health check failed: {str(e)}")
+            return {
+                "status": "error",
+                "message": f"Failed to connect to ClickHouse: {str(e)}",
+            }
+
+    @classmethod
+    async def health_check(cls) -> Dict[str, Any]:
+        """
+        Class method to check if the ClickHouse connection is working.
+        Can be called directly on the class: await ClickHouseClient.health_check()
+
+        Returns:
+            Dict with status information
+        """
+        client = get_client()
+        client_instance = client.__client
+        try:
+            start_time = asyncio.get_event_loop().time()
+
+            # Run the query in a thread pool to avoid blocking the event loop
+            loop = asyncio.get_event_loop()
+            await loop.run_in_executor(None, lambda: client_instance.query("SELECT 1"))
+
+            end_time = asyncio.get_event_loop().time()
+            ping_time_ms = (end_time - start_time) * 1000
+
+            return {
+                "status": "ok",
+                "message": "ClickHouse connection is healthy",
+                "ping_time_ms": round(ping_time_ms, 2)
+            }
+        except Exception as e:
+            logger.error(f"ClickHouse health check failed: {str(e)}")
+            return {
+                "status": "error",
+                "message": f"Failed to connect to ClickHouse: {str(e)}",
+            }
+        finally:
+            # Return the connection to the pool, or close it when pooling is off
+            if config('CH_POOL', cast=bool, default=True):
+                CH_pool.release_connection(client_instance)
+            else:
+                client_instance.close()
+
     def __exit__(self, *args):
         if config('CH_POOL', cast=bool, default=True):
             CH_pool.release_connection(self.__client)
@@ -161,11 +230,35 @@ class ClickHouseClient:
             self.__client.close()
 
 
+# Module-level helper to get a client instance
+def get_client(database=None) -> ClickHouseClient:
+    """
+    Get a ClickHouse client instance.
+
+    Args:
+        database: Optional database name to override the default
+
+    Returns:
+        ClickHouseClient instance
+    """
+    return ClickHouseClient(database=database)
+
+
 async def init():
     logger.info(f">use CH_POOL:{config('CH_POOL', default=True)}")
     if config('CH_POOL', cast=bool, default=True):
         make_pool()
 
+    # Do a health check at initialization to verify the connection
+    try:
+        health_status = await ClickHouseClient.health_check()
+        if health_status["status"] == "ok":
+            logger.info(f"ClickHouse connection verified. Ping: {health_status.get('ping_time_ms', 'N/A')}ms")
+        else:
+            logger.warning(f"ClickHouse connection check failed: {health_status['message']}")
+    except Exception as e:
+        logger.error(f"Error during initialization health check: {str(e)}")
+
 
 async def terminate():
     global CH_pool
@@ -175,3 +268,13 @@ async def terminate():
         logger.info("Closed all connexions to CH")
     except Exception as error:
         logger.error("Error while closing all connexions to CH", exc_info=error)
+
+
+async def health_check() -> Dict[str, Any]:
+    """
+    Public health check function that can be used by the application.
+
+    Returns:
+        Health status dict
+    """
+    return await ClickHouseClient.health_check()
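A subtlety worth spelling out for the paired health checks in ch_client.py, ch_client_exp.py
and pg_client.py: in a Python class body the later binding of a name wins, which is why the
instance-level check and the classmethod cannot both be called health_check. A minimal
illustration:

    # An instance method and a classmethod cannot share a name:
    class Demo:
        def probe(self):          # silently replaced by the classmethod below
            return "instance"

        @classmethod
        def probe(cls):
            return "classmethod"

    print(Demo().probe())  # -> "classmethod"; the instance version is unreachable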
diff --git a/api/chalicelib/utils/pg_client.py b/api/chalicelib/utils/pg_client.py
index 0f7d498b1..25cd8d905 100644
--- a/api/chalicelib/utils/pg_client.py
+++ b/api/chalicelib/utils/pg_client.py
@@ -1,6 +1,8 @@
 import logging
 import time
+import asyncio
 from threading import Semaphore
+from typing import Dict, Any, Optional
 
 import psycopg2
 import psycopg2.extras
@@ -173,12 +175,115 @@ class PostgresClient:
         self.cursor = None
         return self.__enter__()
 
+    async def health_check_instance(self) -> Dict[str, Any]:
+        """
+        Instance method to check DB connection health.
+
+        Note: named health_check_instance so it is not shadowed by the
+        health_check classmethod below (inside a class body the later
+        definition of a name wins).
+        """
+        try:
+            start_time = asyncio.get_event_loop().time()
+
+            # Run the query in a thread pool to avoid blocking the event loop
+            loop = asyncio.get_event_loop()
+
+            def check_db():
+                cursor = self.connection.cursor()
+                cursor.execute("SELECT 1")
+                cursor.close()
+                return True
+
+            await loop.run_in_executor(None, check_db)
+
+            end_time = asyncio.get_event_loop().time()
+            ping_time_ms = (end_time - start_time) * 1000
+
+            return {
+                "status": "ok",
+                "message": "PostgreSQL connection is healthy",
+                "ping_time_ms": round(ping_time_ms, 2)
+            }
+        except Exception as e:
+            logger.error(f"PostgreSQL health check failed: {e}")
+            return {
+                "status": "error",
+                "message": f"Failed to connect to PostgreSQL: {str(e)}"
+            }
+
+    @classmethod
+    async def health_check(cls) -> Dict[str, Any]:
+        """
+        Class method to check if the PostgreSQL connection works.
+        Can be called directly on the class: await PostgresClient.health_check()
+        """
+        try:
+            # Create a temporary client for the health check
+            client = cls()
+
+            start_time = asyncio.get_event_loop().time()
+
+            # Run the query in a thread pool
+            loop = asyncio.get_event_loop()
+
+            def check_db():
+                cursor = client.connection.cursor()
+                cursor.execute("SELECT 1")
+                cursor.close()
+                return True
+
+            await loop.run_in_executor(None, check_db)
+
+            end_time = asyncio.get_event_loop().time()
+            ping_time_ms = (end_time - start_time) * 1000
+
+            # Properly clean up the connection
+            if not client.use_pool or client.long_query or client.unlimited_query:
+                client.connection.close()
+            else:
+                postgreSQL_pool.putconn(client.connection)
+
+            return {
+                "status": "ok",
+                "message": "PostgreSQL connection is healthy",
+                "ping_time_ms": round(ping_time_ms, 2)
+            }
+        except Exception as e:
+            logger.error(f"PostgreSQL health check failed: {e}")
+            return {
+                "status": "error",
+                "message": f"Failed to connect to PostgreSQL: {str(e)}"
+            }
+
+
+# Module-level helper to get a client instance
+def get_client(long_query=False, unlimited_query=False, use_pool=True) -> PostgresClient:
+    """
+    Get a PostgreSQL client instance.
+
+    Args:
+        long_query: Set True for queries with extended timeout
+        unlimited_query: Set True for queries with no timeout
+        use_pool: Whether to use the connection pool
+
+    Returns:
+        PostgresClient instance
+    """
+    return PostgresClient(long_query=long_query, unlimited_query=unlimited_query, use_pool=use_pool)
+
 
 async def init():
     logger.info(f">use PG_POOL:{config('PG_POOL', default=True)}")
     if config('PG_POOL', cast=bool, default=True):
         make_pool()
 
+    # Do a health check at initialization
+    try:
+        health_status = await PostgresClient.health_check()
+        if health_status["status"] == "ok":
+            logger.info(f"PostgreSQL connection verified. Ping: {health_status.get('ping_time_ms', 'N/A')}ms")
+        else:
+            logger.warning(f"PostgreSQL connection check failed: {health_status['message']}")
+    except Exception as e:
+        logger.error(f"Error during initialization health check: {str(e)}")
+
 
 async def terminate():
     global postgreSQL_pool
@@ -188,3 +293,13 @@ async def terminate():
         logger.info("Closed all connexions to PostgreSQL")
     except (Exception, psycopg2.DatabaseError) as error:
         logger.error("Error while closing all connexions to PostgreSQL", exc_info=error)
+
+
+async def health_check() -> Dict[str, Any]:
+    """
+    Public health check function that can be used by the application.
+
+    Returns:
+        Health status dict
+    """
+    return await PostgresClient.health_check()
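Both utility modules now expose the same module-level health_check() coroutine, which makes a
combined check straightforward. A hedged sketch of a standalone smoke test (assumes the usual
decouple configuration for both databases is present in the environment):

    import asyncio

    from chalicelib.utils import pg_client, ch_client


    async def main():
        # init() builds the pools and logs an initial connectivity check
        await pg_client.init()
        await ch_client.init()
        pg_status, ch_status = await asyncio.gather(pg_client.health_check(),
                                                    ch_client.health_check())
        print("postgres:", pg_status["status"], "| clickhouse:", ch_status["status"])
        await pg_client.terminate()
        await ch_client.terminate()


    asyncio.run(main())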