Compare commits

1 commit

Author: Shekar Siri
Commit: f52552b4f2
Message: feat(api): health check improvements and other startup changes
Date: 2025-03-10 09:42:43 +01:00
10 changed files with 871 additions and 145 deletions

api/app.py
@@ -1,134 +1,133 @@
import logging
import time
from contextlib import asynccontextmanager
import psycopg_pool
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from decouple import config
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware
from psycopg import AsyncConnection
from psycopg.rows import dict_row
from starlette.responses import StreamingResponse
from chalicelib.utils import helper
from boot.config import settings
from boot.middleware import timing_middleware, setup_cors
from boot.scheduler.manager import SchedulerManager
from boot.health.router import health_router
from chalicelib.utils import pg_client, ch_client
# Import routers and cron jobs
from crons import core_crons, core_dynamic_crons
from routers import core, core_dynamic
from routers.subs import insights, metrics, v1_api, health, usability_tests, spot, product_anaytics
loglevel = config("LOGLEVEL", default=logging.WARNING)
print(f">Loglevel set to: {loglevel}")
logging.basicConfig(level=loglevel)
class ORPYAsyncConnection(AsyncConnection):
def __init__(self, *args, **kwargs):
super().__init__(*args, row_factory=dict_row, **kwargs)
# Configure logging
settings.configure_logging()
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
logging.info(">>>>> starting up <<<<<")
ap_logger = logging.getLogger('apscheduler')
ap_logger.setLevel(loglevel)
"""
Application lifespan context manager.
app.schedule = AsyncIOScheduler()
Handles startup and shutdown tasks:
- Database connections initialization
- Scheduler setup
- Cleanup on shutdown
"""
# Startup
logger.info(">>>>> starting up <<<<<")
# Initialize database connections
await pg_client.init()
await ch_client.init()
app.schedule.start()
for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs:
app.schedule.add_job(id=job["func"].__name__, **job)
# Initialize scheduler with jobs
all_jobs = core_crons.cron_jobs + core_dynamic_crons.cron_jobs
SchedulerManager.initialize(all_jobs)
ap_logger.info(">Scheduled jobs:")
for job in app.schedule.get_jobs():
ap_logger.info({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)})
# Store PostgreSQL pool in app state for backwards compatibility
# app.state.postgresql = pg_client.
database = {
"host": config("pg_host", default="localhost"),
"dbname": config("pg_dbname", default="orpy"),
"user": config("pg_user", default="orpy"),
"password": config("pg_password", default="orpy"),
"port": config("pg_port", cast=int, default=5432),
"application_name": "AIO" + config("APP_NAME", default="PY"),
}
database = psycopg_pool.AsyncConnectionPool(kwargs=database, connection_class=ORPYAsyncConnection,
min_size=config("PG_AIO_MINCONN", cast=int, default=1),
max_size=config("PG_AIO_MAXCONN", cast=int, default=5), )
app.state.postgresql = database
# App listening
# App is ready to handle requests
yield
# Shutdown
await database.close()
logging.info(">>>>> shutting down <<<<<")
app.schedule.shutdown(wait=False)
logger.info(">>>>> shutting down <<<<<")
# Shutdown scheduler
SchedulerManager.shutdown()
# Close database connections
await pg_client.terminate()
await ch_client.terminate()
app = FastAPI(root_path=config("root_path", default="/api"), docs_url=config("docs_url", default=""),
redoc_url=config("redoc_url", default=""), lifespan=lifespan)
app.add_middleware(GZipMiddleware, minimum_size=1000)
def create_app() -> FastAPI:
"""Create and configure the FastAPI application."""
# Create FastAPI app
app = FastAPI(
root_path=settings.ROOT_PATH,
docs_url=settings.DOCS_URL,
redoc_url=settings.REDOC_URL,
lifespan=lifespan,
title=f"{settings.APP_NAME} API",
description=f"API for {settings.APP_NAME}",
version="1.0.0"
)
# Add middlewares
app.add_middleware(GZipMiddleware, minimum_size=1000)
app.middleware('http')(timing_middleware)
setup_cors(app)
# Register health check routes first for monitoring systems
app.include_router(health_router)
# Register existing application routers
register_routers(app)
return app
@app.middleware('http')
async def or_middleware(request: Request, call_next):
if helper.TRACK_TIME:
now = time.time()
try:
response: StreamingResponse = await call_next(request)
except:
logging.error(f"{request.method}: {request.url.path} FAILED!")
raise
if response.status_code // 100 != 2:
logging.warning(f"{request.method}:{request.url.path} {response.status_code}!")
if helper.TRACK_TIME:
now = time.time() - now
if now > 2:
now = round(now, 2)
logging.warning(f"Execution time: {now} s for {request.method}: {request.url.path}")
response.headers["x-robots-tag"] = 'noindex, nofollow'
return response
def register_routers(app: FastAPI) -> None:
"""Register all application routers."""
# Core routers
app.include_router(core.public_app)
app.include_router(core.app)
app.include_router(core.app_apikey)
# Core dynamic routers
app.include_router(core_dynamic.public_app)
app.include_router(core_dynamic.app)
app.include_router(core_dynamic.app_apikey)
# Feature routers
app.include_router(metrics.app)
app.include_router(insights.app)
app.include_router(v1_api.app_apikey)
# Health routers (existing ones from the codebase)
app.include_router(health.public_app)
app.include_router(health.app)
app.include_router(health.app_apikey)
# Usability tests routers
app.include_router(usability_tests.public_app)
app.include_router(usability_tests.app)
app.include_router(usability_tests.app_apikey)
# Spot routers
app.include_router(spot.public_app)
app.include_router(spot.app)
app.include_router(spot.app_apikey)
# Product analytics routers
app.include_router(product_anaytics.public_app)
app.include_router(product_anaytics.app)
app.include_router(product_anaytics.app_apikey)
origins = [
"*",
]
# Create application instance
app = create_app()
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(core.public_app)
app.include_router(core.app)
app.include_router(core.app_apikey)
app.include_router(core_dynamic.public_app)
app.include_router(core_dynamic.app)
app.include_router(core_dynamic.app_apikey)
app.include_router(metrics.app)
app.include_router(insights.app)
app.include_router(v1_api.app_apikey)
app.include_router(health.public_app)
app.include_router(health.app)
app.include_router(health.app_apikey)
# For running with a proper ASGI server like uvicorn
if __name__ == "__main__":
import uvicorn
app.include_router(usability_tests.public_app)
app.include_router(usability_tests.app)
app.include_router(usability_tests.app_apikey)
app.include_router(spot.public_app)
app.include_router(spot.app)
app.include_router(spot.app_apikey)
app.include_router(product_anaytics.public_app)
app.include_router(product_anaytics.app)
app.include_router(product_anaytics.app_apikey)
uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)
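
The module still builds a ready-made app instance at import time, but the new create_app() factory also makes it easy to construct one on demand, for example in tests. A minimal sketch, assuming the backing services (PostgreSQL, ClickHouse) are reachable, since the lifespan hook connects to them on startup:

from fastapi.testclient import TestClient
from app import create_app

# Entering the TestClient context runs the lifespan startup and shutdown.
with TestClient(create_app()) as client:
    response = client.get("/health/liveness")
    print(response.status_code, response.json()["status"])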

api/boot/config.py (new file)
@@ -0,0 +1,78 @@
import logging
from typing import Dict, Any, List, Optional
from decouple import config as decouple_config
class Settings:
"""Centralized application settings."""
# Application settings
APP_NAME: str = decouple_config("APP_NAME", default="OpenReplay")
ROOT_PATH: str = decouple_config("root_path", default="/api")
DOCS_URL: str = decouple_config("docs_url", default="")
REDOC_URL: str = decouple_config("redoc_url", default="")
TRACK_TIME: bool = decouple_config("TRACK_TIME", default=False, cast=bool)
# Logging - get raw string value without casting
_LOG_LEVEL_STR: str = decouple_config("LOGLEVEL", default="WARNING")
LOG_FORMAT: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
# PostgreSQL settings
PG_HOST: str = decouple_config("pg_host", default="localhost")
PG_DBNAME: str = decouple_config("pg_dbname", default="orpy")
PG_USER: str = decouple_config("pg_user", default="orpy")
PG_PASSWORD: str = decouple_config("pg_password", default="orpy")
PG_PORT: int = decouple_config("pg_port", cast=int, default=5432)
PG_AIO_MINCONN: int = decouple_config("PG_AIO_MINCONN", cast=int, default=1)
PG_AIO_MAXCONN: int = decouple_config("PG_AIO_MAXCONN", cast=int, default=5)
# CORS settings
CORS_ORIGINS: List[str] = ["*"]
CORS_ALLOW_CREDENTIALS: bool = True
CORS_ALLOW_METHODS: List[str] = ["*"]
CORS_ALLOW_HEADERS: List[str] = ["*"]
@property
def LOG_LEVEL(self) -> int:
"""Convert string log level to an integer constant."""
level_map = {
"CRITICAL": logging.CRITICAL,
"FATAL": logging.FATAL,
"ERROR": logging.ERROR,
"WARNING": logging.WARNING,
"WARN": logging.WARN,
"INFO": logging.INFO,
"DEBUG": logging.DEBUG,
"NOTSET": logging.NOTSET,
}
# If it's a digit string, convert to int
if self._LOG_LEVEL_STR.isdigit():
return int(self._LOG_LEVEL_STR)
# If it's a known level name, return the corresponding value
return level_map.get(self._LOG_LEVEL_STR.upper(), logging.WARNING)
@property
def pg_dsn(self) -> Dict[str, Any]:
"""Return PostgreSQL connection parameters as a dictionary."""
return {
"host": self.PG_HOST,
"dbname": self.PG_DBNAME,
"user": self.PG_USER,
"password": self.PG_PASSWORD,
"port": self.PG_PORT,
"application_name": f"AIO{self.APP_NAME}",
}
def configure_logging(self) -> None:
"""Configure application logging."""
logging.basicConfig(
level=self.LOG_LEVEL,
format=self.LOG_FORMAT
)
logging.info(f">Loglevel set to: {self._LOG_LEVEL_STR} ({self.LOG_LEVEL})")
# Create a singleton instance
settings = Settings()
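
A short usage sketch for the new settings singleton; the values in the comments are the defaults from this file and are only illustrative:

from boot.config import settings

settings.configure_logging()                  # basicConfig with LOG_FORMAT and the resolved level
print(settings.LOG_LEVEL)                     # e.g. LOGLEVEL=debug in the environment resolves to logging.DEBUG (10)
print(settings.pg_dsn["application_name"])    # f"AIO{APP_NAME}", e.g. "AIOOpenReplay" with the default APP_NAME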

api/boot/health/router.py (new file)
@@ -0,0 +1,82 @@
import logging
import time
import psutil
from typing import Dict, Any
from fastapi import APIRouter, status
from pydantic import BaseModel
from boot.scheduler.manager import SchedulerManager
from chalicelib.utils import pg_client, ch_client
logger = logging.getLogger(__name__)
# Create router for health endpoints
health_router = APIRouter(prefix="/health", tags=["Health"])
class HealthResponse(BaseModel):
"""Health check response model."""
status: str
details: Dict[str, Any]
@health_router.get("/liveness", response_model=HealthResponse, status_code=status.HTTP_200_OK)
async def liveness():
"""
Liveness probe endpoint.
This endpoint indicates if the application is running.
It's designed to be quick and does not check external dependencies.
"""
return HealthResponse(
status="alive",
details={
"uptime_seconds": time.time() - psutil.boot_time(),
"process": {
"cpu_percent": psutil.cpu_percent(),
"memory_percent": psutil.virtual_memory().percent
}
}
)
@health_router.get("/readiness", response_model=HealthResponse)
async def readiness():
"""
Readiness probe endpoint.
This endpoint indicates if the application is ready to handle requests.
It checks connections to dependent services.
"""
# Check PostgreSQL connection
pg_healthy = await pg_client.PostgresClient.health_check()
# Check ClickHouse connection
ch_healthy = await ch_client.ClickHouseClient.health_check()
# Check scheduler
scheduler_healthy = SchedulerManager.health_check()
# Overall status
all_healthy = pg_healthy and ch_healthy and scheduler_healthy
status_code = status.HTTP_200_OK if all_healthy else status.HTTP_503_SERVICE_UNAVAILABLE
response = HealthResponse(
status="ready" if all_healthy else "not ready",
details={
"services": {
"postgres": "healthy" if pg_healthy else "unhealthy",
"clickhouse": "healthy" if ch_healthy else "unhealthy",
"scheduler": "healthy" if scheduler_healthy else "unhealthy"
},
"system": {
"disk_usage_percent": psutil.disk_usage('/').percent,
"memory_available_mb": psutil.virtual_memory().available / (1024 * 1024),
"cpu_count": psutil.cpu_count(),
"cpu_percent": psutil.cpu_percent()
}
}
)
return response
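
A quick way to exercise the new probes once the service is up; the host, port, and use of httpx are assumptions, and any proxy prefix implied by root_path has to be added by hand:

import httpx

base = "http://localhost:8000"   # assumed local run; prepend the root_path (e.g. /api) if a proxy rewrites paths

live = httpx.get(f"{base}/health/liveness")
print(live.status_code, live.json()["status"])    # expected 200 and "alive"

ready = httpx.get(f"{base}/health/readiness")
print(ready.json()["details"]["services"])        # healthy/unhealthy flag per dependency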

api/boot/middleware/__init__.py (new file)
@@ -0,0 +1,8 @@
"""
Middleware components for the FastAPI application.
"""
from .timing import timing_middleware
from .cors import setup_cors
__all__ = ["timing_middleware", "setup_cors"]

api/boot/middleware/cors.py (new file)
@@ -0,0 +1,15 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from boot.config import settings
def setup_cors(app: FastAPI) -> None:
"""Configure CORS middleware for the application."""
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=settings.CORS_ALLOW_CREDENTIALS,
allow_methods=settings.CORS_ALLOW_METHODS,
allow_headers=settings.CORS_ALLOW_HEADERS,
)

api/boot/middleware/timing.py (new file)
@@ -0,0 +1,42 @@
import logging
import time
from fastapi import Request
from starlette.responses import StreamingResponse
from boot.config import settings
from chalicelib.utils import helper
logger = logging.getLogger(__name__)
async def timing_middleware(request: Request, call_next):
"""
Middleware to track request timing and log slow requests.
Also adds security headers to responses.
"""
if helper.TRACK_TIME:
start_time = time.time()
try:
response: StreamingResponse = await call_next(request)
except Exception as e:
path = request.url.path
method = request.method
logger.error(f"{method}: {path} FAILED! Error: {str(e)}")
raise
# Log non-successful responses
if response.status_code // 100 != 2:
logger.warning(f"{request.method}:{request.url.path} {response.status_code}!")
# Track execution time
if helper.TRACK_TIME:
elapsed = time.time() - start_time
if elapsed > 2:
elapsed = round(elapsed, 2)
logger.warning(f"Execution time: {elapsed} s for {request.method}: {request.url.path}")
# Set security headers
response.headers["x-robots-tag"] = 'noindex, nofollow'
return response
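
For reference, this is roughly how create_app() in app.py wires these pieces together; a sketch that mirrors the registration shown above rather than new behaviour:

from fastapi import FastAPI
from boot.middleware import timing_middleware, setup_cors

app = FastAPI()
app.middleware("http")(timing_middleware)   # function-style registration, as in create_app()
setup_cors(app)                             # applies the CORS_* values from boot.config.settings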

api/boot/scheduler/manager.py (new file)
@@ -0,0 +1,71 @@
import logging
from typing import List, Dict, Any
from apscheduler.schedulers.asyncio import AsyncIOScheduler
logger = logging.getLogger(__name__)
class SchedulerManager:
"""Scheduler manager for cron jobs."""
_scheduler: AsyncIOScheduler = None
@classmethod
def initialize(cls, jobs: List[Dict[str, Any]]) -> None:
"""Initialize the scheduler with jobs."""
if cls._scheduler is not None:
logger.warning("Scheduler already initialized")
return
# Configure APScheduler logger
ap_logger = logging.getLogger('apscheduler')
ap_logger.setLevel(logging.getLogger().level)
# Create scheduler
cls._scheduler = AsyncIOScheduler()
# Add all jobs to the scheduler
for job in jobs:
cls._scheduler.add_job(id=job["func"].__name__, **job)
# Log scheduled jobs
ap_logger.info(">Scheduled jobs:")
for job in cls._scheduler.get_jobs():
# Get the job information safely without depending on next_run_time
job_info = {
"Name": str(job.id),
"Run Frequency": str(job.trigger),
}
# Try to get next run time if available, otherwise skip it
try:
if hasattr(job, "next_run_time"):
job_info["Next Run"] = str(job.next_run_time)
elif hasattr(job, "_get_run_times"):
# Try to get the next run time using _get_run_times
run_times = job._get_run_times(1)
if run_times:
job_info["Next Run"] = str(run_times[0])
except Exception as e:
job_info["Next Run"] = "Unknown"
logger.debug(f"Could not determine next run time for job {job.id}: {e}")
ap_logger.info(job_info)
# Start the scheduler
cls._scheduler.start()
logger.info("Scheduler started")
@classmethod
def shutdown(cls) -> None:
"""Shutdown the scheduler."""
if cls._scheduler is not None:
cls._scheduler.shutdown(wait=False)
cls._scheduler = None
logger.info("Scheduler shutdown")
@classmethod
def health_check(cls) -> bool:
"""Check scheduler health."""
return cls._scheduler is not None and cls._scheduler.running
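
SchedulerManager.initialize() splats each job dict into scheduler.add_job(), so a spec has to carry "func" plus APScheduler trigger arguments. A hypothetical example (the job function and its interval are made up for illustration):

import asyncio
from boot.scheduler.manager import SchedulerManager

def cleanup_sessions():
    ...  # hypothetical job body

jobs = [{"func": cleanup_sessions, "trigger": "interval", "minutes": 30}]

async def main():
    # AsyncIOScheduler needs a running event loop; in the application this is the FastAPI lifespan.
    SchedulerManager.initialize(jobs)
    print(SchedulerManager.health_check())   # True while the scheduler is running
    SchedulerManager.shutdown()

asyncio.run(main())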

@@ -1,73 +1,286 @@
import logging
import threading
import time
import asyncio
from functools import wraps
from queue import Queue, Empty
from typing import Dict, Any, Optional
import clickhouse_driver
import clickhouse_connect
from clickhouse_connect.driver.query import QueryContext
from decouple import config
logger = logging.getLogger(__name__)
_CH_CONFIG = {"host": config("ch_host"),
"user": config("ch_user", default="default"),
"password": config("ch_password", default=""),
"port": config("ch_port_http", cast=int),
"client_name": config("APP_NAME", default="PY")}
CH_CONFIG = dict(_CH_CONFIG)
settings = {}
if config('ch_timeout', cast=int, default=-1) > 0:
logger.info(f"CH-max_execution_time set to {config('ch_timeout')}s")
logging.info(f"CH-max_execution_time set to {config('ch_timeout')}s")
settings = {**settings, "max_execution_time": config('ch_timeout', cast=int)}
if config('ch_receive_timeout', cast=int, default=-1) > 0:
logger.info(f"CH-receive_timeout set to {config('ch_receive_timeout')}s")
logging.info(f"CH-receive_timeout set to {config('ch_receive_timeout')}s")
settings = {**settings, "receive_timeout": config('ch_receive_timeout', cast=int)}
extra_args = {}
if config("CH_COMPRESSION", cast=bool, default=True):
extra_args["compression"] = "lz4"
def transform_result(self, original_function):
@wraps(original_function)
def wrapper(*args, **kwargs):
logger.debug(str.encode(self.format(query=kwargs.get("query", ""), parameters=kwargs.get("parameters"))))
result = original_function(*args, **kwargs)
if isinstance(result, clickhouse_connect.driver.query.QueryResult):
column_names = result.column_names
result = result.result_rows
result = [dict(zip(column_names, row)) for row in result]
return result
return wrapper
class ClickHouseConnectionPool:
def __init__(self, min_size, max_size):
self.min_size = min_size
self.max_size = max_size
self.pool = Queue()
self.lock = threading.Lock()
self.total_connections = 0
# Initialize the pool with min_size connections
for _ in range(self.min_size):
client = clickhouse_connect.get_client(**CH_CONFIG,
database=config("ch_database", default="default"),
settings=settings,
**extra_args)
self.pool.put(client)
self.total_connections += 1
def get_connection(self):
try:
# Try to get a connection without blocking
client = self.pool.get_nowait()
return client
except Empty:
with self.lock:
if self.total_connections < self.max_size:
client = clickhouse_connect.get_client(**CH_CONFIG,
database=config("ch_database", default="default"),
settings=settings,
**extra_args)
self.total_connections += 1
return client
# If max_size reached, wait until a connection is available
client = self.pool.get()
return client
def release_connection(self, client):
self.pool.put(client)
def close_all(self):
with self.lock:
while not self.pool.empty():
client = self.pool.get()
client.close()
self.total_connections = 0
CH_pool: ClickHouseConnectionPool = None
RETRY_MAX = config("CH_RETRY_MAX", cast=int, default=50)
RETRY_INTERVAL = config("CH_RETRY_INTERVAL", cast=int, default=2)
RETRY = 0
def make_pool():
if not config('CH_POOL', cast=bool, default=True):
return
global CH_pool
global RETRY
if CH_pool is not None:
try:
CH_pool.close_all()
except Exception as error:
logger.error("Error while closing all connexions to CH", exc_info=error)
try:
CH_pool = ClickHouseConnectionPool(min_size=config("CH_MINCONN", cast=int, default=4),
max_size=config("CH_MAXCONN", cast=int, default=8))
if CH_pool is not None:
logger.info("Connection pool created successfully for CH")
except ConnectionError as error:
logger.error("Error while connecting to CH", exc_info=error)
if RETRY < RETRY_MAX:
RETRY += 1
logger.info(f"waiting for {RETRY_INTERVAL}s before retry n°{RETRY}")
time.sleep(RETRY_INTERVAL)
make_pool()
else:
raise error
class ClickHouseClient:
__client = None
def __init__(self, database=None):
extra_args = {}
if config("CH_COMPRESSION", cast=bool, default=True):
extra_args["compression"] = "lz4"
self.__client = clickhouse_driver.Client(host=config("ch_host"),
database=database if database else config("ch_database",
default="default"),
user=config("ch_user", default="default"),
password=config("ch_password", default=""),
port=config("ch_port", cast=int),
settings=settings,
**extra_args) \
if self.__client is None else self.__client
self.__client = None
if database is not None or not config('CH_POOL', cast=bool, default=True):
self.__client = clickhouse_connect.get_client(**CH_CONFIG,
database=database if database else config("ch_database",
default="default"),
settings=settings,
**extra_args)
else:
self.__client = CH_pool.get_connection()
self.__client.execute = transform_result(self, self.__client.query)
self.__client.format = self.format
def __enter__(self):
return self
def execute(self, query, parameters=None, **args):
try:
results = self.__client.execute(query=query, params=parameters, with_column_types=True, **args)
keys = tuple(x for x, y in results[1])
return [dict(zip(keys, i)) for i in results[0]]
except Exception as err:
logger.error("--------- CH EXCEPTION -----------", exc_info=err)
logger.error("--------- CH QUERY EXCEPTION -----------")
logger.error(self.format(query=query, parameters=parameters)
.replace('\n', '\\n')
.replace(' ', ' ')
.replace(' ', ' '))
logger.error("--------------------")
raise err
def insert(self, query, params=None, **args):
return self.__client.execute(query=query, params=params, **args)
def client(self):
return self.__client
def format(self, query, parameters):
def format(self, query, *, parameters=None):
if parameters is None:
return query
return self.__client.substitute_params(query, parameters, self.__client.connection.context)
return query % {
key: f"'{value}'" if isinstance(value, str) else value
for key, value in parameters.items()
}
async def health_check(self) -> Dict[str, Any]:
"""
Check if the connection to ClickHouse is working.
Returns:
Dict with status information:
{
"status": "ok" | "error",
"message": str,
"ping_time_ms": float (if available)
}
"""
try:
start_time = asyncio.get_event_loop().time()
# Run the query in a thread pool to avoid blocking the event loop
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, lambda: self.__client.query("SELECT 1"))
end_time = asyncio.get_event_loop().time()
ping_time_ms = (end_time - start_time) * 1000
return {
"status": "ok",
"message": "ClickHouse connection is healthy",
"ping_time_ms": round(ping_time_ms, 2)
}
except Exception as e:
logger.error(f"ClickHouse health check failed: {str(e)}")
return {
"status": "error",
"message": f"Failed to connect to ClickHouse: {str(e)}",
}
@classmethod
async def health_check(cls) -> Dict[str, Any]:
"""
Class method to check if the ClickHouse connection is working.
Can be called directly on the class: await ClickHouseClient.health_check()
Returns:
Dict with status information
"""
try:
# Create a temporary client just for the health check
client = cls()
start_time = asyncio.get_event_loop().time()
# Run the query in a thread pool
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, lambda: client.__client.query("SELECT 1"))
end_time = asyncio.get_event_loop().time()
ping_time_ms = (end_time - start_time) * 1000
result = {
"status": "ok",
"message": "ClickHouse connection is healthy",
"ping_time_ms": round(ping_time_ms, 2)
}
# Clean up the connection properly
if config('CH_POOL', cast=bool, default=True):
CH_pool.release_connection(client.__client)
return result
except Exception as e:
logger.error(f"ClickHouse health check failed: {str(e)}")
return {
"status": "error",
"message": f"Failed to connect to ClickHouse: {str(e)}",
}
def __exit__(self, *args):
pass
if config('CH_POOL', cast=bool, default=True):
CH_pool.release_connection(self.__client)
else:
self.__client.close()
# Add the get_client function at module level
def get_client(database=None) -> ClickHouseClient:
"""
Get a ClickHouse client instance.
Args:
database: Optional database name to override the default
Returns:
ClickHouseClient instance
"""
return ClickHouseClient(database=database)
async def init():
logger.info(f">CH_POOL:not defined")
logger.info(f">use CH_POOL:{config('CH_POOL', default=True)}")
if config('CH_POOL', cast=bool, default=True):
make_pool()
# Do a health check at initialization to verify connection
try:
health_status = await ClickHouseClient.health_check()
if health_status["status"] == "ok":
logger.info(f"ClickHouse connection verified. Ping: {health_status.get('ping_time_ms', 'N/A')}ms")
else:
logger.warning(f"ClickHouse connection check failed: {health_status['message']}")
except Exception as e:
logger.error(f"Error during initialization health check: {str(e)}")
async def terminate():
pass
global CH_pool
if CH_pool is not None:
try:
CH_pool.close_all()
logger.info("Closed all connexions to CH")
except Exception as error:
logger.error("Error while closing all connexions to CH", exc_info=error)
async def health_check() -> Dict[str, Any]:
"""
Public health check function that can be used by the application.
Returns:
Health status dict
"""
return await ClickHouseClient.health_check()
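
The module-level lifecycle added here (init, health_check, terminate) plus get_client() can be exercised like this; a sketch that assumes a reachable ClickHouse instance and leans on the execute() wrapper mapping rows to dicts:

import asyncio
from chalicelib.utils import ch_client

async def main():
    await ch_client.init()                        # builds the pool when CH_POOL is enabled and pings the server
    status = await ch_client.health_check()
    print(status["status"], status.get("ping_time_ms"))
    with ch_client.get_client() as ch:            # the context manager releases or closes the connection on exit
        rows = ch.execute(query="SELECT 1 AS ok")   # results come back as a list of dicts
    await ch_client.terminate()

asyncio.run(main())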

@@ -1,8 +1,10 @@
import logging
import threading
import time
import asyncio
from functools import wraps
from queue import Queue, Empty
from typing import Dict, Any, Optional
import clickhouse_connect
from clickhouse_connect.driver.query import QueryContext
@@ -154,6 +156,73 @@ class ClickHouseClient:
for key, value in parameters.items()
}
async def health_check(self) -> Dict[str, Any]:
"""
Check if the connection to ClickHouse is working.
Returns:
Dict with status information:
{
"status": "ok" | "error",
"message": str,
"ping_time_ms": float (if available)
}
"""
try:
start_time = asyncio.get_event_loop().time()
# Run the query in a thread pool to avoid blocking the event loop
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, lambda: self.__client.query("SELECT 1"))
end_time = asyncio.get_event_loop().time()
ping_time_ms = (end_time - start_time) * 1000
return {
"status": "ok",
"message": "ClickHouse connection is healthy",
"ping_time_ms": round(ping_time_ms, 2)
}
except Exception as e:
logger.error(f"ClickHouse health check failed: {str(e)}")
return {
"status": "error",
"message": f"Failed to connect to ClickHouse: {str(e)}",
}
@classmethod
async def health_check(cls) -> Dict[str, Any]:
"""
Class method to check if the ClickHouse connection is working.
Can be called directly on the class: await ClickHouseClient.health_check()
Returns:
Dict with status information
"""
client = get_client()
try:
start_time = asyncio.get_event_loop().time()
# Run the query in a thread pool to avoid blocking the event loop
loop = asyncio.get_event_loop()
client_instance = client.__client
await loop.run_in_executor(None, lambda: client_instance.query("SELECT 1"))
end_time = asyncio.get_event_loop().time()
ping_time_ms = (end_time - start_time) * 1000
return {
"status": "ok",
"message": "ClickHouse connection is healthy",
"ping_time_ms": round(ping_time_ms, 2)
}
except Exception as e:
logger.error(f"ClickHouse health check failed: {str(e)}")
return {
"status": "error",
"message": f"Failed to connect to ClickHouse: {str(e)}",
}
def __exit__(self, *args):
if config('CH_POOL', cast=bool, default=True):
CH_pool.release_connection(self.__client)
@@ -161,11 +230,35 @@ class ClickHouseClient:
self.__client.close()
# Add the get_client function at module level
def get_client(database=None) -> ClickHouseClient:
"""
Get a ClickHouse client instance.
Args:
database: Optional database name to override the default
Returns:
ClickHouseClient instance
"""
return ClickHouseClient(database=database)
async def init():
logger.info(f">use CH_POOL:{config('CH_POOL', default=True)}")
if config('CH_POOL', cast=bool, default=True):
make_pool()
# Do a health check at initialization to verify connection
try:
health_status = await ClickHouseClient.health_check()
if health_status["status"] == "ok":
logger.info(f"ClickHouse connection verified. Ping: {health_status.get('ping_time_ms', 'N/A')}ms")
else:
logger.warning(f"ClickHouse connection check failed: {health_status['message']}")
except Exception as e:
logger.error(f"Error during initialization health check: {str(e)}")
async def terminate():
global CH_pool
@@ -175,3 +268,13 @@ async def terminate():
logger.info("Closed all connexions to CH")
except Exception as error:
logger.error("Error while closing all connexions to CH", exc_info=error)
async def health_check() -> Dict[str, Any]:
"""
Public health check function that can be used by the application.
Returns:
Health status dict
"""
return await ClickHouseClient.health_check()

@@ -1,6 +1,8 @@
import logging
import time
import asyncio
from threading import Semaphore
from typing import Dict, Any, Optional
import psycopg2
import psycopg2.extras
@@ -173,12 +175,115 @@ class PostgresClient:
self.cursor = None
return self.__enter__()
async def health_check(self) -> Dict[str, Any]:
"""
Instance method to check DB connection health
"""
try:
start_time = asyncio.get_event_loop().time()
# Run the query in a thread pool to avoid blocking the event loop
loop = asyncio.get_event_loop()
def check_db():
cursor = self.connection.cursor()
cursor.execute("SELECT 1")
cursor.close()
return True
await loop.run_in_executor(None, check_db)
end_time = asyncio.get_event_loop().time()
ping_time_ms = (end_time - start_time) * 1000
return {
"status": "ok",
"message": "PostgreSQL connection is healthy",
"ping_time_ms": round(ping_time_ms, 2)
}
except Exception as e:
logger.error(f"PostgreSQL health check failed: {e}")
return {
"status": "error",
"message": f"Failed to connect to PostgreSQL: {str(e)}"
}
@classmethod
async def health_check(cls) -> Dict[str, Any]:
"""
Class method to check if PostgreSQL connection works.
Can be called directly on the class: await PostgresClient.health_check()
"""
try:
# Create a temporary client for the health check
client = cls()
start_time = asyncio.get_event_loop().time()
# Run the query in a thread pool
loop = asyncio.get_event_loop()
def check_db():
cursor = client.connection.cursor()
cursor.execute("SELECT 1")
cursor.close()
return True
await loop.run_in_executor(None, check_db)
end_time = asyncio.get_event_loop().time()
ping_time_ms = (end_time - start_time) * 1000
# Properly clean up the connection
if not client.use_pool or client.long_query or client.unlimited_query:
client.connection.close()
else:
postgreSQL_pool.putconn(client.connection)
return {
"status": "ok",
"message": "PostgreSQL connection is healthy",
"ping_time_ms": round(ping_time_ms, 2)
}
except Exception as e:
logger.error(f"PostgreSQL health check failed: {e}")
return {
"status": "error",
"message": f"Failed to connect to PostgreSQL: {str(e)}"
}
# Add get_client function at module level
def get_client(long_query=False, unlimited_query=False, use_pool=True) -> PostgresClient:
"""
Get a PostgreSQL client instance.
Args:
long_query: Set True for queries with extended timeout
unlimited_query: Set True for queries with no timeout
use_pool: Whether to use the connection pool
Returns:
PostgresClient instance
"""
return PostgresClient(long_query=long_query, unlimited_query=unlimited_query, use_pool=use_pool)
async def init():
logger.info(f">use PG_POOL:{config('PG_POOL', default=True)}")
if config('PG_POOL', cast=bool, default=True):
make_pool()
# Do a health check at initialization
try:
health_status = await PostgresClient.health_check()
if health_status["status"] == "ok":
logger.info(f"PostgreSQL connection verified. Ping: {health_status.get('ping_time_ms', 'N/A')}ms")
else:
logger.warning(f"PostgreSQL connection check failed: {health_status['message']}")
except Exception as e:
logger.error(f"Error during initialization health check: {str(e)}")
async def terminate():
global postgreSQL_pool
@@ -188,3 +293,13 @@ async def terminate():
logger.info("Closed all connexions to PostgreSQL")
except (Exception, psycopg2.DatabaseError) as error:
logger.error("Error while closing all connexions to PostgreSQL", exc_info=error)
async def health_check() -> Dict[str, Any]:
"""
Public health check function that can be used by the application.
Returns:
Health status dict
"""
return await PostgresClient.health_check()
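
The same init/health_check/terminate surface now exists for PostgreSQL; a minimal sketch assuming the database is reachable with the configured credentials:

import asyncio
from chalicelib.utils import pg_client

async def main():
    await pg_client.init()                      # creates the connection pool when PG_POOL is enabled
    status = await pg_client.health_check()
    print(status["status"], status.get("ping_time_ms"))
    await pg_client.terminate()

asyncio.run(main())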