feat(chalice): graceful shutdown

This commit is contained in:
Taha Yassine Kraiem 2022-10-24 16:42:36 +02:00
parent 680dd7b31e
commit 218b0d9299
3 changed files with 78 additions and 41 deletions

View file

@ -20,22 +20,14 @@ app.add_middleware(GZipMiddleware, minimum_size=1000)
@app.middleware('http')
async def or_middleware(request: Request, call_next):
global OR_SESSION_TOKEN
OR_SESSION_TOKEN = request.headers.get('vnd.openreplay.com.sid', request.headers.get('vnd.asayer.io.sid'))
try:
if helper.TRACK_TIME:
import time
now = int(time.time() * 1000)
response: StreamingResponse = await call_next(request)
if helper.TRACK_TIME:
now = int(time.time() * 1000) - now
if now > 500:
print(f"Execution time: {now} ms")
except Exception as e:
pg_client.close()
raise e
pg_client.close()
if helper.TRACK_TIME:
import time
now = int(time.time() * 1000)
response: StreamingResponse = await call_next(request)
if helper.TRACK_TIME:
now = int(time.time() * 1000) - now
if now > 500:
logging.info(f"Execution time: {now} ms")
return response
@ -61,14 +53,35 @@ app.include_router(metrics.app)
app.include_router(insights.app)
app.include_router(v1_api.app_apikey)
Schedule = AsyncIOScheduler()
Schedule.start()
loglevel = config("LOGLEVEL", default=logging.INFO)
print(f">Loglevel set to: {loglevel}")
logging.basicConfig(level=loglevel)
ap_logger = logging.getLogger('apscheduler')
ap_logger.setLevel(loglevel)
app.schedule = AsyncIOScheduler()
for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs:
Schedule.add_job(id=job["func"].__name__, **job)
for job in Schedule.get_jobs():
print({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)})
@app.on_event("startup")
async def startup():
await pg_client.init()
app.schedule.start()
logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO))
logging.getLogger('apscheduler').setLevel(config("LOGLEVEL", default=logging.INFO))
for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs:
app.schedule.add_job(id=job["func"].__name__, **job)
ap_logger.info(">Scheduled jobs:")
for job in app.schedule.get_jobs():
ap_logger.info({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)})
@app.on_event("shutdown")
async def shutdown():
print(">>>>> shutting down")
app.schedule.shutdown(wait=False)
await pg_client.terminate()
@app.get('/private/suicide', tags=["private"])
async def stop_server():
import os, signal
os.kill(1, signal.SIGTERM)

View file

@ -3,11 +3,12 @@ import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from decouple import config
from fastapi import FastAPI
from chalicelib.utils import pg_client
from chalicelib.core import alerts_processor
app = FastAPI(root_path="/alerts", docs_url=config("docs_url", default=""), redoc_url=config("redoc_url", default=""))
print("============= ALERTS =============")
logging.info("============= ALERTS =============")
@app.get("/")
@ -16,12 +17,29 @@ async def root():
app.schedule = AsyncIOScheduler()
app.schedule.start()
app.schedule.add_job(id="alerts_processor", **{"func": alerts_processor.process, "trigger": "interval",
"minutes": config("ALERTS_INTERVAL", cast=int, default=5),
"misfire_grace_time": 20})
for job in app.schedule.get_jobs():
print({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)})
logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO))
logging.getLogger('apscheduler').setLevel(config("LOGLEVEL", default=logging.INFO))
loglevel = config("LOGLEVEL", default=logging.INFO)
print(f">Loglevel set to: {loglevel}")
logging.basicConfig(level=loglevel)
ap_logger = logging.getLogger('apscheduler')
ap_logger.setLevel(loglevel)
app.schedule = AsyncIOScheduler()
@app.on_event("startup")
async def startup():
await pg_client.init()
app.schedule.start()
app.schedule.add_job(id="alerts_processor", **{"func": alerts_processor.process, "trigger": "interval",
"minutes": config("ALERTS_INTERVAL", cast=int, default=5),
"misfire_grace_time": 20})
ap_logger.info(">Scheduled jobs:")
for job in app.schedule.get_jobs():
ap_logger.info({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)})
@app.on_event("shutdown")
async def shutdown():
app.schedule.shutdown(wait=False)
await pg_client.terminate()

View file

@ -20,8 +20,6 @@ PG_CONFIG = dict(_PG_CONFIG)
if config("PG_TIMEOUT", cast=int, default=0) > 0:
PG_CONFIG["options"] = f"-c statement_timeout={config('PG_TIMEOUT', cast=int) * 1000}"
logging.info(f">PG_POOL:{config('PG_POOL', default=None)}")
class ORThreadedConnectionPool(psycopg2.pool.ThreadedConnectionPool):
def __init__(self, minconn, maxconn, *args, **kwargs):
@ -83,10 +81,6 @@ def make_pool():
raise error
if config('PG_POOL', cast=bool, default=True):
make_pool()
class PostgresClient:
connection = None
cursor = None
@ -151,5 +145,17 @@ class PostgresClient:
self.__enter__()
def close():
pass
async def init():
logging.info(f">PG_POOL:{config('PG_POOL', default=None)}")
if config('PG_POOL', cast=bool, default=True):
make_pool()
async def terminate():
global postgreSQL_pool
if postgreSQL_pool is not None:
try:
postgreSQL_pool.closeall()
logging.info("Closed all connexions to PostgreSQL")
except (Exception, psycopg2.DatabaseError) as error:
logging.error("Error while closing all connexions to PostgreSQL", error)