Compare commits
1 commit
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f52552b4f2 |
10 changed files with 871 additions and 145 deletions
203
api/app.py
203
api/app.py
|
|
@ -1,134 +1,133 @@
|
||||||
import logging
|
import logging
|
||||||
import time
|
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
import psycopg_pool
|
from fastapi import FastAPI
|
||||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
||||||
from decouple import config
|
|
||||||
from fastapi import FastAPI, Request
|
|
||||||
from fastapi.middleware.cors import CORSMiddleware
|
|
||||||
from fastapi.middleware.gzip import GZipMiddleware
|
from fastapi.middleware.gzip import GZipMiddleware
|
||||||
from psycopg import AsyncConnection
|
|
||||||
from psycopg.rows import dict_row
|
|
||||||
from starlette.responses import StreamingResponse
|
|
||||||
|
|
||||||
from chalicelib.utils import helper
|
from boot.config import settings
|
||||||
|
from boot.middleware import timing_middleware, setup_cors
|
||||||
|
from boot.scheduler.manager import SchedulerManager
|
||||||
|
from boot.health.router import health_router
|
||||||
from chalicelib.utils import pg_client, ch_client
|
from chalicelib.utils import pg_client, ch_client
|
||||||
|
|
||||||
|
# Import routers and cron jobs
|
||||||
from crons import core_crons, core_dynamic_crons
|
from crons import core_crons, core_dynamic_crons
|
||||||
from routers import core, core_dynamic
|
from routers import core, core_dynamic
|
||||||
from routers.subs import insights, metrics, v1_api, health, usability_tests, spot, product_anaytics
|
from routers.subs import insights, metrics, v1_api, health, usability_tests, spot, product_anaytics
|
||||||
|
|
||||||
loglevel = config("LOGLEVEL", default=logging.WARNING)
|
# Configure logging
|
||||||
print(f">Loglevel set to: {loglevel}")
|
settings.configure_logging()
|
||||||
logging.basicConfig(level=loglevel)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ORPYAsyncConnection(AsyncConnection):
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super().__init__(*args, row_factory=dict_row, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
# Startup
|
"""
|
||||||
logging.info(">>>>> starting up <<<<<")
|
Application lifespan context manager.
|
||||||
ap_logger = logging.getLogger('apscheduler')
|
|
||||||
ap_logger.setLevel(loglevel)
|
|
||||||
|
|
||||||
app.schedule = AsyncIOScheduler()
|
Handles startup and shutdown tasks:
|
||||||
|
- Database connections initialization
|
||||||
|
- Scheduler setup
|
||||||
|
- Cleanup on shutdown
|
||||||
|
"""
|
||||||
|
# Startup
|
||||||
|
logger.info(">>>>> starting up <<<<<")
|
||||||
|
|
||||||
|
# Initialize database connections
|
||||||
await pg_client.init()
|
await pg_client.init()
|
||||||
await ch_client.init()
|
await ch_client.init()
|
||||||
app.schedule.start()
|
|
||||||
|
|
||||||
for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs:
|
# Initialize scheduler with jobs
|
||||||
app.schedule.add_job(id=job["func"].__name__, **job)
|
all_jobs = core_crons.cron_jobs + core_dynamic_crons.cron_jobs
|
||||||
|
SchedulerManager.initialize(all_jobs)
|
||||||
|
|
||||||
ap_logger.info(">Scheduled jobs:")
|
# Store PostgreSQL pool in app state for backwards compatibility
|
||||||
for job in app.schedule.get_jobs():
|
# app.state.postgresql = pg_client.
|
||||||
ap_logger.info({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)})
|
|
||||||
|
|
||||||
database = {
|
# App is ready to handle requests
|
||||||
"host": config("pg_host", default="localhost"),
|
|
||||||
"dbname": config("pg_dbname", default="orpy"),
|
|
||||||
"user": config("pg_user", default="orpy"),
|
|
||||||
"password": config("pg_password", default="orpy"),
|
|
||||||
"port": config("pg_port", cast=int, default=5432),
|
|
||||||
"application_name": "AIO" + config("APP_NAME", default="PY"),
|
|
||||||
}
|
|
||||||
|
|
||||||
database = psycopg_pool.AsyncConnectionPool(kwargs=database, connection_class=ORPYAsyncConnection,
|
|
||||||
min_size=config("PG_AIO_MINCONN", cast=int, default=1),
|
|
||||||
max_size=config("PG_AIO_MAXCONN", cast=int, default=5), )
|
|
||||||
app.state.postgresql = database
|
|
||||||
|
|
||||||
# App listening
|
|
||||||
yield
|
yield
|
||||||
|
|
||||||
# Shutdown
|
# Shutdown
|
||||||
await database.close()
|
logger.info(">>>>> shutting down <<<<<")
|
||||||
logging.info(">>>>> shutting down <<<<<")
|
|
||||||
app.schedule.shutdown(wait=False)
|
# Shutdown scheduler
|
||||||
|
SchedulerManager.shutdown()
|
||||||
|
|
||||||
|
# Close database connections
|
||||||
await pg_client.terminate()
|
await pg_client.terminate()
|
||||||
|
await ch_client.terminate()
|
||||||
|
|
||||||
|
|
||||||
app = FastAPI(root_path=config("root_path", default="/api"), docs_url=config("docs_url", default=""),
|
def create_app() -> FastAPI:
|
||||||
redoc_url=config("redoc_url", default=""), lifespan=lifespan)
|
"""Create and configure the FastAPI application."""
|
||||||
app.add_middleware(GZipMiddleware, minimum_size=1000)
|
# Create FastAPI app
|
||||||
|
app = FastAPI(
|
||||||
|
root_path=settings.ROOT_PATH,
|
||||||
|
docs_url=settings.DOCS_URL,
|
||||||
|
redoc_url=settings.REDOC_URL,
|
||||||
|
lifespan=lifespan,
|
||||||
|
title=f"{settings.APP_NAME} API",
|
||||||
|
description=f"API for {settings.APP_NAME}",
|
||||||
|
version="1.0.0"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add middlewares
|
||||||
|
app.add_middleware(GZipMiddleware, minimum_size=1000)
|
||||||
|
app.middleware('http')(timing_middleware)
|
||||||
|
setup_cors(app)
|
||||||
|
|
||||||
|
# Register health check routes first for monitoring systems
|
||||||
|
app.include_router(health_router)
|
||||||
|
|
||||||
|
# Register existing application routers
|
||||||
|
register_routers(app)
|
||||||
|
|
||||||
|
return app
|
||||||
|
|
||||||
|
|
||||||
@app.middleware('http')
|
def register_routers(app: FastAPI) -> None:
|
||||||
async def or_middleware(request: Request, call_next):
|
"""Register all application routers."""
|
||||||
if helper.TRACK_TIME:
|
# Core routers
|
||||||
now = time.time()
|
app.include_router(core.public_app)
|
||||||
try:
|
app.include_router(core.app)
|
||||||
response: StreamingResponse = await call_next(request)
|
app.include_router(core.app_apikey)
|
||||||
except:
|
|
||||||
logging.error(f"{request.method}: {request.url.path} FAILED!")
|
# Core dynamic routers
|
||||||
raise
|
app.include_router(core_dynamic.public_app)
|
||||||
if response.status_code // 100 != 2:
|
app.include_router(core_dynamic.app)
|
||||||
logging.warning(f"{request.method}:{request.url.path} {response.status_code}!")
|
app.include_router(core_dynamic.app_apikey)
|
||||||
if helper.TRACK_TIME:
|
|
||||||
now = time.time() - now
|
# Feature routers
|
||||||
if now > 2:
|
app.include_router(metrics.app)
|
||||||
now = round(now, 2)
|
app.include_router(insights.app)
|
||||||
logging.warning(f"Execution time: {now} s for {request.method}: {request.url.path}")
|
app.include_router(v1_api.app_apikey)
|
||||||
response.headers["x-robots-tag"] = 'noindex, nofollow'
|
|
||||||
return response
|
# Health routers (existing ones from the codebase)
|
||||||
|
app.include_router(health.public_app)
|
||||||
|
app.include_router(health.app)
|
||||||
|
app.include_router(health.app_apikey)
|
||||||
|
|
||||||
|
# Usability tests routers
|
||||||
|
app.include_router(usability_tests.public_app)
|
||||||
|
app.include_router(usability_tests.app)
|
||||||
|
app.include_router(usability_tests.app_apikey)
|
||||||
|
|
||||||
|
# Spot routers
|
||||||
|
app.include_router(spot.public_app)
|
||||||
|
app.include_router(spot.app)
|
||||||
|
app.include_router(spot.app_apikey)
|
||||||
|
|
||||||
|
# Product analytics routers
|
||||||
|
app.include_router(product_anaytics.public_app)
|
||||||
|
app.include_router(product_anaytics.app)
|
||||||
|
app.include_router(product_anaytics.app_apikey)
|
||||||
|
|
||||||
|
|
||||||
origins = [
|
# Create application instance
|
||||||
"*",
|
app = create_app()
|
||||||
]
|
|
||||||
|
|
||||||
app.add_middleware(
|
# For running with a proper ASGI server like uvicorn
|
||||||
CORSMiddleware,
|
if __name__ == "__main__":
|
||||||
allow_origins=origins,
|
import uvicorn
|
||||||
allow_credentials=True,
|
|
||||||
allow_methods=["*"],
|
|
||||||
allow_headers=["*"],
|
|
||||||
)
|
|
||||||
app.include_router(core.public_app)
|
|
||||||
app.include_router(core.app)
|
|
||||||
app.include_router(core.app_apikey)
|
|
||||||
app.include_router(core_dynamic.public_app)
|
|
||||||
app.include_router(core_dynamic.app)
|
|
||||||
app.include_router(core_dynamic.app_apikey)
|
|
||||||
app.include_router(metrics.app)
|
|
||||||
app.include_router(insights.app)
|
|
||||||
app.include_router(v1_api.app_apikey)
|
|
||||||
app.include_router(health.public_app)
|
|
||||||
app.include_router(health.app)
|
|
||||||
app.include_router(health.app_apikey)
|
|
||||||
|
|
||||||
app.include_router(usability_tests.public_app)
|
uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)
|
||||||
app.include_router(usability_tests.app)
|
|
||||||
app.include_router(usability_tests.app_apikey)
|
|
||||||
|
|
||||||
app.include_router(spot.public_app)
|
|
||||||
app.include_router(spot.app)
|
|
||||||
app.include_router(spot.app_apikey)
|
|
||||||
|
|
||||||
app.include_router(product_anaytics.public_app)
|
|
||||||
app.include_router(product_anaytics.app)
|
|
||||||
app.include_router(product_anaytics.app_apikey)
|
|
||||||
|
|
|
||||||
78
api/boot/config.py
Normal file
78
api/boot/config.py
Normal file
|
|
@ -0,0 +1,78 @@
|
||||||
|
import logging
|
||||||
|
from typing import Dict, Any, List, Optional
|
||||||
|
from decouple import config as decouple_config
|
||||||
|
|
||||||
|
|
||||||
|
class Settings:
|
||||||
|
"""Centralized application settings."""
|
||||||
|
|
||||||
|
# Application settings
|
||||||
|
APP_NAME: str = decouple_config("APP_NAME", default="OpenReplay")
|
||||||
|
ROOT_PATH: str = decouple_config("root_path", default="/api")
|
||||||
|
DOCS_URL: str = decouple_config("docs_url", default="")
|
||||||
|
REDOC_URL: str = decouple_config("redoc_url", default="")
|
||||||
|
TRACK_TIME: bool = decouple_config("TRACK_TIME", default=False, cast=bool)
|
||||||
|
|
||||||
|
# Logging - get raw string value without casting
|
||||||
|
_LOG_LEVEL_STR: str = decouple_config("LOGLEVEL", default="WARNING")
|
||||||
|
LOG_FORMAT: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
||||||
|
|
||||||
|
# PostgreSQL settings
|
||||||
|
PG_HOST: str = decouple_config("pg_host", default="localhost")
|
||||||
|
PG_DBNAME: str = decouple_config("pg_dbname", default="orpy")
|
||||||
|
PG_USER: str = decouple_config("pg_user", default="orpy")
|
||||||
|
PG_PASSWORD: str = decouple_config("pg_password", default="orpy")
|
||||||
|
PG_PORT: int = decouple_config("pg_port", cast=int, default=5432)
|
||||||
|
PG_AIO_MINCONN: int = decouple_config("PG_AIO_MINCONN", cast=int, default=1)
|
||||||
|
PG_AIO_MAXCONN: int = decouple_config("PG_AIO_MAXCONN", cast=int, default=5)
|
||||||
|
|
||||||
|
# CORS settings
|
||||||
|
CORS_ORIGINS: List[str] = ["*"]
|
||||||
|
CORS_ALLOW_CREDENTIALS: bool = True
|
||||||
|
CORS_ALLOW_METHODS: List[str] = ["*"]
|
||||||
|
CORS_ALLOW_HEADERS: List[str] = ["*"]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def LOG_LEVEL(self) -> int:
|
||||||
|
"""Convert string log level to an integer constant."""
|
||||||
|
level_map = {
|
||||||
|
"CRITICAL": logging.CRITICAL,
|
||||||
|
"FATAL": logging.FATAL,
|
||||||
|
"ERROR": logging.ERROR,
|
||||||
|
"WARNING": logging.WARNING,
|
||||||
|
"WARN": logging.WARN,
|
||||||
|
"INFO": logging.INFO,
|
||||||
|
"DEBUG": logging.DEBUG,
|
||||||
|
"NOTSET": logging.NOTSET,
|
||||||
|
}
|
||||||
|
|
||||||
|
# If it's a digit string, convert to int
|
||||||
|
if self._LOG_LEVEL_STR.isdigit():
|
||||||
|
return int(self._LOG_LEVEL_STR)
|
||||||
|
|
||||||
|
# If it's a known level name, return the corresponding value
|
||||||
|
return level_map.get(self._LOG_LEVEL_STR.upper(), logging.WARNING)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def pg_dsn(self) -> Dict[str, Any]:
|
||||||
|
"""Return PostgreSQL connection parameters as a dictionary."""
|
||||||
|
return {
|
||||||
|
"host": self.PG_HOST,
|
||||||
|
"dbname": self.PG_DBNAME,
|
||||||
|
"user": self.PG_USER,
|
||||||
|
"password": self.PG_PASSWORD,
|
||||||
|
"port": self.PG_PORT,
|
||||||
|
"application_name": f"AIO{self.APP_NAME}",
|
||||||
|
}
|
||||||
|
|
||||||
|
def configure_logging(self) -> None:
|
||||||
|
"""Configure application logging."""
|
||||||
|
logging.basicConfig(
|
||||||
|
level=self.LOG_LEVEL,
|
||||||
|
format=self.LOG_FORMAT
|
||||||
|
)
|
||||||
|
logging.info(f">Loglevel set to: {self._LOG_LEVEL_STR} ({self.LOG_LEVEL})")
|
||||||
|
|
||||||
|
|
||||||
|
# Create a singleton instance
|
||||||
|
settings = Settings()
|
||||||
82
api/boot/health/router.py
Normal file
82
api/boot/health/router.py
Normal file
|
|
@ -0,0 +1,82 @@
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
import psutil
|
||||||
|
from typing import Dict, Any
|
||||||
|
|
||||||
|
from fastapi import APIRouter, status
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
from boot.scheduler.manager import SchedulerManager
|
||||||
|
from chalicelib.utils import pg_client, ch_client
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Create router for health endpoints
|
||||||
|
health_router = APIRouter(prefix="/health", tags=["Health"])
|
||||||
|
|
||||||
|
|
||||||
|
class HealthResponse(BaseModel):
|
||||||
|
"""Health check response model."""
|
||||||
|
status: str
|
||||||
|
details: Dict[str, Any]
|
||||||
|
|
||||||
|
|
||||||
|
@health_router.get("/liveness", response_model=HealthResponse, status_code=status.HTTP_200_OK)
|
||||||
|
async def liveness():
|
||||||
|
"""
|
||||||
|
Liveness probe endpoint.
|
||||||
|
|
||||||
|
This endpoint indicates if the application is running.
|
||||||
|
It's designed to be quick and does not check external dependencies.
|
||||||
|
"""
|
||||||
|
return HealthResponse(
|
||||||
|
status="alive",
|
||||||
|
details={
|
||||||
|
"uptime_seconds": time.time() - psutil.boot_time(),
|
||||||
|
"process": {
|
||||||
|
"cpu_percent": psutil.cpu_percent(),
|
||||||
|
"memory_percent": psutil.virtual_memory().percent
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@health_router.get("/readiness", response_model=HealthResponse)
|
||||||
|
async def readiness():
|
||||||
|
"""
|
||||||
|
Readiness probe endpoint.
|
||||||
|
|
||||||
|
This endpoint indicates if the application is ready to handle requests.
|
||||||
|
It checks connections to dependent services.
|
||||||
|
"""
|
||||||
|
# Check PostgreSQL connection
|
||||||
|
pg_healthy = await pg_client.PostgresClient.health_check()
|
||||||
|
|
||||||
|
# Check ClickHouse connection
|
||||||
|
ch_healthy = await ch_client.ClickHouseClient.health_check()
|
||||||
|
|
||||||
|
# Check scheduler
|
||||||
|
scheduler_healthy = SchedulerManager.health_check()
|
||||||
|
|
||||||
|
# Overall status
|
||||||
|
all_healthy = pg_healthy and ch_healthy and scheduler_healthy
|
||||||
|
status_code = status.HTTP_200_OK if all_healthy else status.HTTP_503_SERVICE_UNAVAILABLE
|
||||||
|
|
||||||
|
response = HealthResponse(
|
||||||
|
status="ready" if all_healthy else "not ready",
|
||||||
|
details={
|
||||||
|
"services": {
|
||||||
|
"postgres": "healthy" if pg_healthy else "unhealthy",
|
||||||
|
"clickhouse": "healthy" if ch_healthy else "unhealthy",
|
||||||
|
"scheduler": "healthy" if scheduler_healthy else "unhealthy"
|
||||||
|
},
|
||||||
|
"system": {
|
||||||
|
"disk_usage_percent": psutil.disk_usage('/').percent,
|
||||||
|
"memory_available_mb": psutil.virtual_memory().available / (1024 * 1024),
|
||||||
|
"cpu_count": psutil.cpu_count(),
|
||||||
|
"cpu_percent": psutil.cpu_percent()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return response
|
||||||
8
api/boot/middleware/__init__.py
Normal file
8
api/boot/middleware/__init__.py
Normal file
|
|
@ -0,0 +1,8 @@
|
||||||
|
"""
|
||||||
|
Middleware components for the FastAPI application.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from .timing import timing_middleware
|
||||||
|
from .cors import setup_cors
|
||||||
|
|
||||||
|
__all__ = ["timing_middleware", "setup_cors"]
|
||||||
15
api/boot/middleware/cors.py
Normal file
15
api/boot/middleware/cors.py
Normal file
|
|
@ -0,0 +1,15 @@
|
||||||
|
from fastapi import FastAPI
|
||||||
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
|
|
||||||
|
from boot.config import settings
|
||||||
|
|
||||||
|
|
||||||
|
def setup_cors(app: FastAPI) -> None:
|
||||||
|
"""Configure CORS middleware for the application."""
|
||||||
|
app.add_middleware(
|
||||||
|
CORSMiddleware,
|
||||||
|
allow_origins=settings.CORS_ORIGINS,
|
||||||
|
allow_credentials=settings.CORS_ALLOW_CREDENTIALS,
|
||||||
|
allow_methods=settings.CORS_ALLOW_METHODS,
|
||||||
|
allow_headers=settings.CORS_ALLOW_HEADERS,
|
||||||
|
)
|
||||||
42
api/boot/middleware/timing.py
Normal file
42
api/boot/middleware/timing.py
Normal file
|
|
@ -0,0 +1,42 @@
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from fastapi import Request
|
||||||
|
from starlette.responses import StreamingResponse
|
||||||
|
|
||||||
|
from boot.config import settings
|
||||||
|
from chalicelib.utils import helper
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
async def timing_middleware(request: Request, call_next):
|
||||||
|
"""
|
||||||
|
Middleware to track request timing and log slow requests.
|
||||||
|
Also adds security headers to responses.
|
||||||
|
"""
|
||||||
|
if helper.TRACK_TIME:
|
||||||
|
start_time = time.time()
|
||||||
|
|
||||||
|
try:
|
||||||
|
response: StreamingResponse = await call_next(request)
|
||||||
|
except Exception as e:
|
||||||
|
path = request.url.path
|
||||||
|
method = request.method
|
||||||
|
logger.error(f"{method}: {path} FAILED! Error: {str(e)}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
# Log non-successful responses
|
||||||
|
if response.status_code // 100 != 2:
|
||||||
|
logger.warning(f"{request.method}:{request.url.path} {response.status_code}!")
|
||||||
|
|
||||||
|
# Track execution time
|
||||||
|
if helper.TRACK_TIME:
|
||||||
|
elapsed = time.time() - start_time
|
||||||
|
if elapsed > 2:
|
||||||
|
elapsed = round(elapsed, 2)
|
||||||
|
logger.warning(f"Execution time: {elapsed} s for {request.method}: {request.url.path}")
|
||||||
|
|
||||||
|
# Set security headers
|
||||||
|
response.headers["x-robots-tag"] = 'noindex, nofollow'
|
||||||
|
|
||||||
|
return response
|
||||||
71
api/boot/scheduler/manager.py
Normal file
71
api/boot/scheduler/manager.py
Normal file
|
|
@ -0,0 +1,71 @@
|
||||||
|
import logging
|
||||||
|
from typing import List, Dict, Any
|
||||||
|
|
||||||
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class SchedulerManager:
|
||||||
|
"""Scheduler manager for cron jobs."""
|
||||||
|
|
||||||
|
_scheduler: AsyncIOScheduler = None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def initialize(cls, jobs: List[Dict[str, Any]]) -> None:
|
||||||
|
"""Initialize the scheduler with jobs."""
|
||||||
|
if cls._scheduler is not None:
|
||||||
|
logger.warning("Scheduler already initialized")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Configure APScheduler logger
|
||||||
|
ap_logger = logging.getLogger('apscheduler')
|
||||||
|
ap_logger.setLevel(logging.getLogger().level)
|
||||||
|
|
||||||
|
# Create scheduler
|
||||||
|
cls._scheduler = AsyncIOScheduler()
|
||||||
|
|
||||||
|
# Add all jobs to the scheduler
|
||||||
|
for job in jobs:
|
||||||
|
cls._scheduler.add_job(id=job["func"].__name__, **job)
|
||||||
|
|
||||||
|
# Log scheduled jobs
|
||||||
|
ap_logger.info(">Scheduled jobs:")
|
||||||
|
for job in cls._scheduler.get_jobs():
|
||||||
|
# Get the job information safely without depending on next_run_time
|
||||||
|
job_info = {
|
||||||
|
"Name": str(job.id),
|
||||||
|
"Run Frequency": str(job.trigger),
|
||||||
|
}
|
||||||
|
|
||||||
|
# Try to get next run time if available, otherwise skip it
|
||||||
|
try:
|
||||||
|
if hasattr(job, "next_run_time"):
|
||||||
|
job_info["Next Run"] = str(job.next_run_time)
|
||||||
|
elif hasattr(job, "_get_run_times"):
|
||||||
|
# Try to get the next run time using _get_run_times
|
||||||
|
run_times = job._get_run_times(1)
|
||||||
|
if run_times:
|
||||||
|
job_info["Next Run"] = str(run_times[0])
|
||||||
|
except Exception as e:
|
||||||
|
job_info["Next Run"] = "Unknown"
|
||||||
|
logger.debug(f"Could not determine next run time for job {job.id}: {e}")
|
||||||
|
|
||||||
|
ap_logger.info(job_info)
|
||||||
|
|
||||||
|
# Start the scheduler
|
||||||
|
cls._scheduler.start()
|
||||||
|
logger.info("Scheduler started")
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def shutdown(cls) -> None:
|
||||||
|
"""Shutdown the scheduler."""
|
||||||
|
if cls._scheduler is not None:
|
||||||
|
cls._scheduler.shutdown(wait=False)
|
||||||
|
cls._scheduler = None
|
||||||
|
logger.info("Scheduler shutdown")
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def health_check(cls) -> bool:
|
||||||
|
"""Check scheduler health."""
|
||||||
|
return cls._scheduler is not None and cls._scheduler.running
|
||||||
|
|
@ -1,73 +1,286 @@
|
||||||
import logging
|
import logging
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
import asyncio
|
||||||
|
from functools import wraps
|
||||||
|
from queue import Queue, Empty
|
||||||
|
from typing import Dict, Any, Optional
|
||||||
|
|
||||||
import clickhouse_driver
|
import clickhouse_connect
|
||||||
|
from clickhouse_connect.driver.query import QueryContext
|
||||||
from decouple import config
|
from decouple import config
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_CH_CONFIG = {"host": config("ch_host"),
|
||||||
|
"user": config("ch_user", default="default"),
|
||||||
|
"password": config("ch_password", default=""),
|
||||||
|
"port": config("ch_port_http", cast=int),
|
||||||
|
"client_name": config("APP_NAME", default="PY")}
|
||||||
|
CH_CONFIG = dict(_CH_CONFIG)
|
||||||
|
|
||||||
settings = {}
|
settings = {}
|
||||||
if config('ch_timeout', cast=int, default=-1) > 0:
|
if config('ch_timeout', cast=int, default=-1) > 0:
|
||||||
logger.info(f"CH-max_execution_time set to {config('ch_timeout')}s")
|
logging.info(f"CH-max_execution_time set to {config('ch_timeout')}s")
|
||||||
settings = {**settings, "max_execution_time": config('ch_timeout', cast=int)}
|
settings = {**settings, "max_execution_time": config('ch_timeout', cast=int)}
|
||||||
|
|
||||||
if config('ch_receive_timeout', cast=int, default=-1) > 0:
|
if config('ch_receive_timeout', cast=int, default=-1) > 0:
|
||||||
logger.info(f"CH-receive_timeout set to {config('ch_receive_timeout')}s")
|
logging.info(f"CH-receive_timeout set to {config('ch_receive_timeout')}s")
|
||||||
settings = {**settings, "receive_timeout": config('ch_receive_timeout', cast=int)}
|
settings = {**settings, "receive_timeout": config('ch_receive_timeout', cast=int)}
|
||||||
|
|
||||||
|
extra_args = {}
|
||||||
|
if config("CH_COMPRESSION", cast=bool, default=True):
|
||||||
|
extra_args["compression"] = "lz4"
|
||||||
|
|
||||||
|
|
||||||
|
def transform_result(self, original_function):
|
||||||
|
@wraps(original_function)
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
logger.debug(str.encode(self.format(query=kwargs.get("query", ""), parameters=kwargs.get("parameters"))))
|
||||||
|
result = original_function(*args, **kwargs)
|
||||||
|
if isinstance(result, clickhouse_connect.driver.query.QueryResult):
|
||||||
|
column_names = result.column_names
|
||||||
|
result = result.result_rows
|
||||||
|
result = [dict(zip(column_names, row)) for row in result]
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
class ClickHouseConnectionPool:
|
||||||
|
def __init__(self, min_size, max_size):
|
||||||
|
self.min_size = min_size
|
||||||
|
self.max_size = max_size
|
||||||
|
self.pool = Queue()
|
||||||
|
self.lock = threading.Lock()
|
||||||
|
self.total_connections = 0
|
||||||
|
|
||||||
|
# Initialize the pool with min_size connections
|
||||||
|
for _ in range(self.min_size):
|
||||||
|
client = clickhouse_connect.get_client(**CH_CONFIG,
|
||||||
|
database=config("ch_database", default="default"),
|
||||||
|
settings=settings,
|
||||||
|
**extra_args)
|
||||||
|
self.pool.put(client)
|
||||||
|
self.total_connections += 1
|
||||||
|
|
||||||
|
def get_connection(self):
|
||||||
|
try:
|
||||||
|
# Try to get a connection without blocking
|
||||||
|
client = self.pool.get_nowait()
|
||||||
|
return client
|
||||||
|
except Empty:
|
||||||
|
with self.lock:
|
||||||
|
if self.total_connections < self.max_size:
|
||||||
|
client = clickhouse_connect.get_client(**CH_CONFIG,
|
||||||
|
database=config("ch_database", default="default"),
|
||||||
|
settings=settings,
|
||||||
|
**extra_args)
|
||||||
|
self.total_connections += 1
|
||||||
|
return client
|
||||||
|
# If max_size reached, wait until a connection is available
|
||||||
|
client = self.pool.get()
|
||||||
|
return client
|
||||||
|
|
||||||
|
def release_connection(self, client):
|
||||||
|
self.pool.put(client)
|
||||||
|
|
||||||
|
def close_all(self):
|
||||||
|
with self.lock:
|
||||||
|
while not self.pool.empty():
|
||||||
|
client = self.pool.get()
|
||||||
|
client.close()
|
||||||
|
self.total_connections = 0
|
||||||
|
|
||||||
|
|
||||||
|
CH_pool: ClickHouseConnectionPool = None
|
||||||
|
|
||||||
|
RETRY_MAX = config("CH_RETRY_MAX", cast=int, default=50)
|
||||||
|
RETRY_INTERVAL = config("CH_RETRY_INTERVAL", cast=int, default=2)
|
||||||
|
RETRY = 0
|
||||||
|
|
||||||
|
|
||||||
|
def make_pool():
|
||||||
|
if not config('CH_POOL', cast=bool, default=True):
|
||||||
|
return
|
||||||
|
global CH_pool
|
||||||
|
global RETRY
|
||||||
|
if CH_pool is not None:
|
||||||
|
try:
|
||||||
|
CH_pool.close_all()
|
||||||
|
except Exception as error:
|
||||||
|
logger.error("Error while closing all connexions to CH", exc_info=error)
|
||||||
|
try:
|
||||||
|
CH_pool = ClickHouseConnectionPool(min_size=config("CH_MINCONN", cast=int, default=4),
|
||||||
|
max_size=config("CH_MAXCONN", cast=int, default=8))
|
||||||
|
if CH_pool is not None:
|
||||||
|
logger.info("Connection pool created successfully for CH")
|
||||||
|
except ConnectionError as error:
|
||||||
|
logger.error("Error while connecting to CH", exc_info=error)
|
||||||
|
if RETRY < RETRY_MAX:
|
||||||
|
RETRY += 1
|
||||||
|
logger.info(f"waiting for {RETRY_INTERVAL}s before retry n°{RETRY}")
|
||||||
|
time.sleep(RETRY_INTERVAL)
|
||||||
|
make_pool()
|
||||||
|
else:
|
||||||
|
raise error
|
||||||
|
|
||||||
|
|
||||||
class ClickHouseClient:
|
class ClickHouseClient:
|
||||||
__client = None
|
|
||||||
|
|
||||||
def __init__(self, database=None):
|
def __init__(self, database=None):
|
||||||
extra_args = {}
|
self.__client = None
|
||||||
if config("CH_COMPRESSION", cast=bool, default=True):
|
if database is not None or not config('CH_POOL', cast=bool, default=True):
|
||||||
extra_args["compression"] = "lz4"
|
self.__client = clickhouse_connect.get_client(**CH_CONFIG,
|
||||||
self.__client = clickhouse_driver.Client(host=config("ch_host"),
|
database=database if database else config("ch_database",
|
||||||
database=database if database else config("ch_database",
|
default="default"),
|
||||||
default="default"),
|
settings=settings,
|
||||||
user=config("ch_user", default="default"),
|
**extra_args)
|
||||||
password=config("ch_password", default=""),
|
|
||||||
port=config("ch_port", cast=int),
|
else:
|
||||||
settings=settings,
|
self.__client = CH_pool.get_connection()
|
||||||
**extra_args) \
|
|
||||||
if self.__client is None else self.__client
|
self.__client.execute = transform_result(self, self.__client.query)
|
||||||
|
self.__client.format = self.format
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
return self
|
|
||||||
|
|
||||||
def execute(self, query, parameters=None, **args):
|
|
||||||
try:
|
|
||||||
results = self.__client.execute(query=query, params=parameters, with_column_types=True, **args)
|
|
||||||
keys = tuple(x for x, y in results[1])
|
|
||||||
return [dict(zip(keys, i)) for i in results[0]]
|
|
||||||
except Exception as err:
|
|
||||||
logger.error("--------- CH EXCEPTION -----------", exc_info=err)
|
|
||||||
logger.error("--------- CH QUERY EXCEPTION -----------")
|
|
||||||
logger.error(self.format(query=query, parameters=parameters)
|
|
||||||
.replace('\n', '\\n')
|
|
||||||
.replace(' ', ' ')
|
|
||||||
.replace(' ', ' '))
|
|
||||||
logger.error("--------------------")
|
|
||||||
raise err
|
|
||||||
|
|
||||||
def insert(self, query, params=None, **args):
|
|
||||||
return self.__client.execute(query=query, params=params, **args)
|
|
||||||
|
|
||||||
def client(self):
|
|
||||||
return self.__client
|
return self.__client
|
||||||
|
|
||||||
def format(self, query, parameters):
|
def format(self, query, *, parameters=None):
|
||||||
if parameters is None:
|
if parameters is None:
|
||||||
return query
|
return query
|
||||||
return self.__client.substitute_params(query, parameters, self.__client.connection.context)
|
return query % {
|
||||||
|
key: f"'{value}'" if isinstance(value, str) else value
|
||||||
|
for key, value in parameters.items()
|
||||||
|
}
|
||||||
|
|
||||||
|
async def health_check(self) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Check if the connection to ClickHouse is working.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict with status information:
|
||||||
|
{
|
||||||
|
"status": "ok" | "error",
|
||||||
|
"message": str,
|
||||||
|
"ping_time_ms": float (if available)
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
start_time = asyncio.get_event_loop().time()
|
||||||
|
|
||||||
|
# Run the query in a thread pool to avoid blocking the event loop
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
await loop.run_in_executor(None, lambda: self.__client.query("SELECT 1"))
|
||||||
|
|
||||||
|
end_time = asyncio.get_event_loop().time()
|
||||||
|
ping_time_ms = (end_time - start_time) * 1000
|
||||||
|
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"message": "ClickHouse connection is healthy",
|
||||||
|
"ping_time_ms": round(ping_time_ms, 2)
|
||||||
|
}
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"ClickHouse health check failed: {str(e)}")
|
||||||
|
return {
|
||||||
|
"status": "error",
|
||||||
|
"message": f"Failed to connect to ClickHouse: {str(e)}",
|
||||||
|
}
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def health_check(cls) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Class method to check if the ClickHouse connection is working.
|
||||||
|
Can be called directly on the class: await ClickHouseClient.health_check()
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict with status information
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Create a temporary client just for the health check
|
||||||
|
client = cls()
|
||||||
|
|
||||||
|
start_time = asyncio.get_event_loop().time()
|
||||||
|
|
||||||
|
# Run the query in a thread pool
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
await loop.run_in_executor(None, lambda: client.__client.query("SELECT 1"))
|
||||||
|
|
||||||
|
end_time = asyncio.get_event_loop().time()
|
||||||
|
ping_time_ms = (end_time - start_time) * 1000
|
||||||
|
|
||||||
|
result = {
|
||||||
|
"status": "ok",
|
||||||
|
"message": "ClickHouse connection is healthy",
|
||||||
|
"ping_time_ms": round(ping_time_ms, 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
# Clean up the connection properly
|
||||||
|
if config('CH_POOL', cast=bool, default=True):
|
||||||
|
CH_pool.release_connection(client.__client)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"ClickHouse health check failed: {str(e)}")
|
||||||
|
return {
|
||||||
|
"status": "error",
|
||||||
|
"message": f"Failed to connect to ClickHouse: {str(e)}",
|
||||||
|
}
|
||||||
|
|
||||||
def __exit__(self, *args):
|
def __exit__(self, *args):
|
||||||
pass
|
if config('CH_POOL', cast=bool, default=True):
|
||||||
|
CH_pool.release_connection(self.__client)
|
||||||
|
else:
|
||||||
|
self.__client.close()
|
||||||
|
|
||||||
|
|
||||||
|
# Add the get_client function at module level
|
||||||
|
def get_client(database=None) -> ClickHouseClient:
|
||||||
|
"""
|
||||||
|
Get a ClickHouse client instance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
database: Optional database name to override the default
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ClickHouseClient instance
|
||||||
|
"""
|
||||||
|
return ClickHouseClient(database=database)
|
||||||
|
|
||||||
|
|
||||||
async def init():
|
async def init():
|
||||||
logger.info(f">CH_POOL:not defined")
|
logger.info(f">use CH_POOL:{config('CH_POOL', default=True)}")
|
||||||
|
if config('CH_POOL', cast=bool, default=True):
|
||||||
|
make_pool()
|
||||||
|
|
||||||
|
# Do a health check at initialization to verify connection
|
||||||
|
try:
|
||||||
|
health_status = await ClickHouseClient.health_check()
|
||||||
|
if health_status["status"] == "ok":
|
||||||
|
logger.info(f"ClickHouse connection verified. Ping: {health_status.get('ping_time_ms', 'N/A')}ms")
|
||||||
|
else:
|
||||||
|
logger.warning(f"ClickHouse connection check failed: {health_status['message']}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error during initialization health check: {str(e)}")
|
||||||
|
|
||||||
|
|
||||||
async def terminate():
|
async def terminate():
|
||||||
pass
|
global CH_pool
|
||||||
|
if CH_pool is not None:
|
||||||
|
try:
|
||||||
|
CH_pool.close_all()
|
||||||
|
logger.info("Closed all connexions to CH")
|
||||||
|
except Exception as error:
|
||||||
|
logger.error("Error while closing all connexions to CH", exc_info=error)
|
||||||
|
|
||||||
|
|
||||||
|
async def health_check() -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Public health check function that can be used by the application.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Health status dict
|
||||||
|
"""
|
||||||
|
return await ClickHouseClient.health_check()
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,10 @@
|
||||||
import logging
|
import logging
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
import asyncio
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
from queue import Queue, Empty
|
from queue import Queue, Empty
|
||||||
|
from typing import Dict, Any, Optional
|
||||||
|
|
||||||
import clickhouse_connect
|
import clickhouse_connect
|
||||||
from clickhouse_connect.driver.query import QueryContext
|
from clickhouse_connect.driver.query import QueryContext
|
||||||
|
|
@ -154,6 +156,73 @@ class ClickHouseClient:
|
||||||
for key, value in parameters.items()
|
for key, value in parameters.items()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async def health_check(self) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Check if the connection to ClickHouse is working.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict with status information:
|
||||||
|
{
|
||||||
|
"status": "ok" | "error",
|
||||||
|
"message": str,
|
||||||
|
"ping_time_ms": float (if available)
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
start_time = asyncio.get_event_loop().time()
|
||||||
|
|
||||||
|
# Run the query in a thread pool to avoid blocking the event loop
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
await loop.run_in_executor(None, lambda: self.__client.query("SELECT 1"))
|
||||||
|
|
||||||
|
end_time = asyncio.get_event_loop().time()
|
||||||
|
ping_time_ms = (end_time - start_time) * 1000
|
||||||
|
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"message": "ClickHouse connection is healthy",
|
||||||
|
"ping_time_ms": round(ping_time_ms, 2)
|
||||||
|
}
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"ClickHouse health check failed: {str(e)}")
|
||||||
|
return {
|
||||||
|
"status": "error",
|
||||||
|
"message": f"Failed to connect to ClickHouse: {str(e)}",
|
||||||
|
}
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def health_check(cls) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Class method to check if the ClickHouse connection is working.
|
||||||
|
Can be called directly on the class: await ClickHouseClient.health_check()
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict with status information
|
||||||
|
"""
|
||||||
|
client = get_client()
|
||||||
|
try:
|
||||||
|
start_time = asyncio.get_event_loop().time()
|
||||||
|
|
||||||
|
# Run the query in a thread pool to avoid blocking the event loop
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
client_instance = client.__client
|
||||||
|
await loop.run_in_executor(None, lambda: client_instance.query("SELECT 1"))
|
||||||
|
|
||||||
|
end_time = asyncio.get_event_loop().time()
|
||||||
|
ping_time_ms = (end_time - start_time) * 1000
|
||||||
|
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"message": "ClickHouse connection is healthy",
|
||||||
|
"ping_time_ms": round(ping_time_ms, 2)
|
||||||
|
}
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"ClickHouse health check failed: {str(e)}")
|
||||||
|
return {
|
||||||
|
"status": "error",
|
||||||
|
"message": f"Failed to connect to ClickHouse: {str(e)}",
|
||||||
|
}
|
||||||
|
|
||||||
def __exit__(self, *args):
|
def __exit__(self, *args):
|
||||||
if config('CH_POOL', cast=bool, default=True):
|
if config('CH_POOL', cast=bool, default=True):
|
||||||
CH_pool.release_connection(self.__client)
|
CH_pool.release_connection(self.__client)
|
||||||
|
|
@ -161,11 +230,35 @@ class ClickHouseClient:
|
||||||
self.__client.close()
|
self.__client.close()
|
||||||
|
|
||||||
|
|
||||||
|
# Add the get_client function at module level
|
||||||
|
def get_client(database=None) -> ClickHouseClient:
|
||||||
|
"""
|
||||||
|
Get a ClickHouse client instance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
database: Optional database name to override the default
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ClickHouseClient instance
|
||||||
|
"""
|
||||||
|
return ClickHouseClient(database=database)
|
||||||
|
|
||||||
|
|
||||||
async def init():
|
async def init():
|
||||||
logger.info(f">use CH_POOL:{config('CH_POOL', default=True)}")
|
logger.info(f">use CH_POOL:{config('CH_POOL', default=True)}")
|
||||||
if config('CH_POOL', cast=bool, default=True):
|
if config('CH_POOL', cast=bool, default=True):
|
||||||
make_pool()
|
make_pool()
|
||||||
|
|
||||||
|
# Do a health check at initialization to verify connection
|
||||||
|
try:
|
||||||
|
health_status = await ClickHouseClient.health_check()
|
||||||
|
if health_status["status"] == "ok":
|
||||||
|
logger.info(f"ClickHouse connection verified. Ping: {health_status.get('ping_time_ms', 'N/A')}ms")
|
||||||
|
else:
|
||||||
|
logger.warning(f"ClickHouse connection check failed: {health_status['message']}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error during initialization health check: {str(e)}")
|
||||||
|
|
||||||
|
|
||||||
async def terminate():
|
async def terminate():
|
||||||
global CH_pool
|
global CH_pool
|
||||||
|
|
@ -175,3 +268,13 @@ async def terminate():
|
||||||
logger.info("Closed all connexions to CH")
|
logger.info("Closed all connexions to CH")
|
||||||
except Exception as error:
|
except Exception as error:
|
||||||
logger.error("Error while closing all connexions to CH", exc_info=error)
|
logger.error("Error while closing all connexions to CH", exc_info=error)
|
||||||
|
|
||||||
|
|
||||||
|
async def health_check() -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Public health check function that can be used by the application.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Health status dict
|
||||||
|
"""
|
||||||
|
return await ClickHouseClient.health_check()
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,8 @@
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
|
import asyncio
|
||||||
from threading import Semaphore
|
from threading import Semaphore
|
||||||
|
from typing import Dict, Any, Optional
|
||||||
|
|
||||||
import psycopg2
|
import psycopg2
|
||||||
import psycopg2.extras
|
import psycopg2.extras
|
||||||
|
|
@ -173,12 +175,115 @@ class PostgresClient:
|
||||||
self.cursor = None
|
self.cursor = None
|
||||||
return self.__enter__()
|
return self.__enter__()
|
||||||
|
|
||||||
|
async def health_check(self) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Instance method to check DB connection health
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
start_time = asyncio.get_event_loop().time()
|
||||||
|
|
||||||
|
# Run the query in a thread pool to avoid blocking the event loop
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
|
def check_db():
|
||||||
|
cursor = self.connection.cursor()
|
||||||
|
cursor.execute("SELECT 1")
|
||||||
|
cursor.close()
|
||||||
|
return True
|
||||||
|
|
||||||
|
await loop.run_in_executor(None, check_db)
|
||||||
|
|
||||||
|
end_time = asyncio.get_event_loop().time()
|
||||||
|
ping_time_ms = (end_time - start_time) * 1000
|
||||||
|
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"message": "PostgreSQL connection is healthy",
|
||||||
|
"ping_time_ms": round(ping_time_ms, 2)
|
||||||
|
}
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"PostgreSQL health check failed: {e}")
|
||||||
|
return {
|
||||||
|
"status": "error",
|
||||||
|
"message": f"Failed to connect to PostgreSQL: {str(e)}"
|
||||||
|
}
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def health_check(cls) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Class method to check if PostgreSQL connection works.
|
||||||
|
Can be called directly on the class: await PostgresClient.health_check()
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Create a temporary client for the health check
|
||||||
|
client = cls()
|
||||||
|
|
||||||
|
start_time = asyncio.get_event_loop().time()
|
||||||
|
|
||||||
|
# Run the query in a thread pool
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
|
def check_db():
|
||||||
|
cursor = client.connection.cursor()
|
||||||
|
cursor.execute("SELECT 1")
|
||||||
|
cursor.close()
|
||||||
|
return True
|
||||||
|
|
||||||
|
await loop.run_in_executor(None, check_db)
|
||||||
|
|
||||||
|
end_time = asyncio.get_event_loop().time()
|
||||||
|
ping_time_ms = (end_time - start_time) * 1000
|
||||||
|
|
||||||
|
# Properly clean up the connection
|
||||||
|
if not client.use_pool or client.long_query or client.unlimited_query:
|
||||||
|
client.connection.close()
|
||||||
|
else:
|
||||||
|
postgreSQL_pool.putconn(client.connection)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"message": "PostgreSQL connection is healthy",
|
||||||
|
"ping_time_ms": round(ping_time_ms, 2)
|
||||||
|
}
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"PostgreSQL health check failed: {e}")
|
||||||
|
return {
|
||||||
|
"status": "error",
|
||||||
|
"message": f"Failed to connect to PostgreSQL: {str(e)}"
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# Add get_client function at module level
|
||||||
|
def get_client(long_query=False, unlimited_query=False, use_pool=True) -> PostgresClient:
|
||||||
|
"""
|
||||||
|
Get a PostgreSQL client instance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
long_query: Set True for queries with extended timeout
|
||||||
|
unlimited_query: Set True for queries with no timeout
|
||||||
|
use_pool: Whether to use the connection pool
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
PostgresClient instance
|
||||||
|
"""
|
||||||
|
return PostgresClient(long_query=long_query, unlimited_query=unlimited_query, use_pool=use_pool)
|
||||||
|
|
||||||
|
|
||||||
async def init():
|
async def init():
|
||||||
logger.info(f">use PG_POOL:{config('PG_POOL', default=True)}")
|
logger.info(f">use PG_POOL:{config('PG_POOL', default=True)}")
|
||||||
if config('PG_POOL', cast=bool, default=True):
|
if config('PG_POOL', cast=bool, default=True):
|
||||||
make_pool()
|
make_pool()
|
||||||
|
|
||||||
|
# Do a health check at initialization
|
||||||
|
try:
|
||||||
|
health_status = await PostgresClient.health_check()
|
||||||
|
if health_status["status"] == "ok":
|
||||||
|
logger.info(f"PostgreSQL connection verified. Ping: {health_status.get('ping_time_ms', 'N/A')}ms")
|
||||||
|
else:
|
||||||
|
logger.warning(f"PostgreSQL connection check failed: {health_status['message']}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error during initialization health check: {str(e)}")
|
||||||
|
|
||||||
|
|
||||||
async def terminate():
|
async def terminate():
|
||||||
global postgreSQL_pool
|
global postgreSQL_pool
|
||||||
|
|
@ -188,3 +293,13 @@ async def terminate():
|
||||||
logger.info("Closed all connexions to PostgreSQL")
|
logger.info("Closed all connexions to PostgreSQL")
|
||||||
except (Exception, psycopg2.DatabaseError) as error:
|
except (Exception, psycopg2.DatabaseError) as error:
|
||||||
logger.error("Error while closing all connexions to PostgreSQL", exc_info=error)
|
logger.error("Error while closing all connexions to PostgreSQL", exc_info=error)
|
||||||
|
|
||||||
|
|
||||||
|
async def health_check() -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Public health check function that can be used by the application.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Health status dict
|
||||||
|
"""
|
||||||
|
return await PostgresClient.health_check()
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue