From 218b0d92995239ce15b6c6a3cc347efdd7a7045f Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Mon, 24 Oct 2022 16:42:36 +0200 Subject: [PATCH] feat(chalice): graceful shutdown --- api/app.py | 61 +++++++++++++++++++------------ api/app_alerts.py | 36 +++++++++++++----- api/chalicelib/utils/pg_client.py | 22 +++++++---- 3 files changed, 78 insertions(+), 41 deletions(-) diff --git a/api/app.py b/api/app.py index 974d7d8d9..4d5080ced 100644 --- a/api/app.py +++ b/api/app.py @@ -20,22 +20,14 @@ app.add_middleware(GZipMiddleware, minimum_size=1000) @app.middleware('http') async def or_middleware(request: Request, call_next): - global OR_SESSION_TOKEN - OR_SESSION_TOKEN = request.headers.get('vnd.openreplay.com.sid', request.headers.get('vnd.asayer.io.sid')) - - try: - if helper.TRACK_TIME: - import time - now = int(time.time() * 1000) - response: StreamingResponse = await call_next(request) - if helper.TRACK_TIME: - now = int(time.time() * 1000) - now - if now > 500: - print(f"Execution time: {now} ms") - except Exception as e: - pg_client.close() - raise e - pg_client.close() + if helper.TRACK_TIME: + import time + now = int(time.time() * 1000) + response: StreamingResponse = await call_next(request) + if helper.TRACK_TIME: + now = int(time.time() * 1000) - now + if now > 500: + logging.info(f"Execution time: {now} ms") return response @@ -61,14 +53,35 @@ app.include_router(metrics.app) app.include_router(insights.app) app.include_router(v1_api.app_apikey) -Schedule = AsyncIOScheduler() -Schedule.start() +loglevel = config("LOGLEVEL", default=logging.INFO) +print(f">Loglevel set to: {loglevel}") +logging.basicConfig(level=loglevel) +ap_logger = logging.getLogger('apscheduler') +ap_logger.setLevel(loglevel) +app.schedule = AsyncIOScheduler() -for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs: - Schedule.add_job(id=job["func"].__name__, **job) -for job in Schedule.get_jobs(): - print({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)}) +@app.on_event("startup") +async def startup(): + await pg_client.init() + app.schedule.start() -logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO)) -logging.getLogger('apscheduler').setLevel(config("LOGLEVEL", default=logging.INFO)) + for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs: + app.schedule.add_job(id=job["func"].__name__, **job) + + ap_logger.info(">Scheduled jobs:") + for job in app.schedule.get_jobs(): + ap_logger.info({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)}) + + +@app.on_event("shutdown") +async def shutdown(): + print(">>>>> shutting down") + app.schedule.shutdown(wait=False) + await pg_client.terminate() + + +@app.get('/private/suicide', tags=["private"]) +async def stop_server(): + import os, signal + os.kill(1, signal.SIGTERM) diff --git a/api/app_alerts.py b/api/app_alerts.py index 4e05ab1a8..0d8ad0f04 100644 --- a/api/app_alerts.py +++ b/api/app_alerts.py @@ -3,11 +3,12 @@ import logging from apscheduler.schedulers.asyncio import AsyncIOScheduler from decouple import config from fastapi import FastAPI +from chalicelib.utils import pg_client from chalicelib.core import alerts_processor app = FastAPI(root_path="/alerts", docs_url=config("docs_url", default=""), redoc_url=config("redoc_url", default="")) -print("============= ALERTS =============") +logging.info("============= ALERTS =============") @app.get("/") @@ -16,12 +17,29 @@ async def root(): app.schedule = AsyncIOScheduler() -app.schedule.start() -app.schedule.add_job(id="alerts_processor", **{"func": alerts_processor.process, "trigger": "interval", - "minutes": config("ALERTS_INTERVAL", cast=int, default=5), - "misfire_grace_time": 20}) -for job in app.schedule.get_jobs(): - print({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)}) -logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO)) -logging.getLogger('apscheduler').setLevel(config("LOGLEVEL", default=logging.INFO)) +loglevel = config("LOGLEVEL", default=logging.INFO) +print(f">Loglevel set to: {loglevel}") +logging.basicConfig(level=loglevel) +ap_logger = logging.getLogger('apscheduler') +ap_logger.setLevel(loglevel) +app.schedule = AsyncIOScheduler() + + +@app.on_event("startup") +async def startup(): + await pg_client.init() + app.schedule.start() + app.schedule.add_job(id="alerts_processor", **{"func": alerts_processor.process, "trigger": "interval", + "minutes": config("ALERTS_INTERVAL", cast=int, default=5), + "misfire_grace_time": 20}) + + ap_logger.info(">Scheduled jobs:") + for job in app.schedule.get_jobs(): + ap_logger.info({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)}) + + +@app.on_event("shutdown") +async def shutdown(): + app.schedule.shutdown(wait=False) + await pg_client.terminate() diff --git a/api/chalicelib/utils/pg_client.py b/api/chalicelib/utils/pg_client.py index 4ff1efe4b..1bfb7aa09 100644 --- a/api/chalicelib/utils/pg_client.py +++ b/api/chalicelib/utils/pg_client.py @@ -20,8 +20,6 @@ PG_CONFIG = dict(_PG_CONFIG) if config("PG_TIMEOUT", cast=int, default=0) > 0: PG_CONFIG["options"] = f"-c statement_timeout={config('PG_TIMEOUT', cast=int) * 1000}" -logging.info(f">PG_POOL:{config('PG_POOL', default=None)}") - class ORThreadedConnectionPool(psycopg2.pool.ThreadedConnectionPool): def __init__(self, minconn, maxconn, *args, **kwargs): @@ -83,10 +81,6 @@ def make_pool(): raise error -if config('PG_POOL', cast=bool, default=True): - make_pool() - - class PostgresClient: connection = None cursor = None @@ -151,5 +145,17 @@ class PostgresClient: self.__enter__() -def close(): - pass +async def init(): + logging.info(f">PG_POOL:{config('PG_POOL', default=None)}") + if config('PG_POOL', cast=bool, default=True): + make_pool() + + +async def terminate(): + global postgreSQL_pool + if postgreSQL_pool is not None: + try: + postgreSQL_pool.closeall() + logging.info("Closed all connexions to PostgreSQL") + except (Exception, psycopg2.DatabaseError) as error: + logging.error("Error while closing all connexions to PostgreSQL", error)