feat(chalice): graceful shutdown

feat(DB): changed integrations trigger
This commit is contained in:
Taha Yassine Kraiem 2022-10-24 17:01:52 +02:00
parent bb1d0cf1b6
commit eb7c2b15db
5 changed files with 71 additions and 27 deletions

View file

@ -63,6 +63,7 @@ app.schedule = AsyncIOScheduler()
@app.on_event("startup")
async def startup():
logging.info(">>>>> starting up <<<<<")
await pg_client.init()
app.schedule.start()
@ -76,13 +77,14 @@ async def startup():
@app.on_event("shutdown")
async def shutdown():
print(">>>>> shutting down")
logging.info(">>>>> shutting down <<<<<")
app.schedule.shutdown(wait=False)
await pg_client.terminate()
@app.get('/private/shutdown', tags=["private"])
async def stop_server():
logging.info("Requested shutdown")
await shutdown()
import os, signal
os.kill(1, signal.SIGTERM)

View file

@ -28,6 +28,7 @@ app.schedule = AsyncIOScheduler()
@app.on_event("startup")
async def startup():
logging.info(">>>>> starting up <<<<<")
await pg_client.init()
app.schedule.start()
app.schedule.add_job(id="alerts_processor", **{"func": alerts_processor.process, "trigger": "interval",
@ -41,5 +42,14 @@ async def startup():
@app.on_event("shutdown")
async def shutdown():
logging.info(">>>>> shutting down <<<<<")
app.schedule.shutdown(wait=False)
await pg_client.terminate()
@app.get('/private/shutdown', tags=["private"])
async def stop_server():
logging.info("Requested shutdown")
await shutdown()
import os, signal
os.kill(1, signal.SIGTERM)

View file

@ -9,6 +9,7 @@ from fastapi.middleware.gzip import GZipMiddleware
from starlette import status
from starlette.responses import StreamingResponse, JSONResponse
from chalicelib.core import traces
from chalicelib.utils import helper
from chalicelib.utils import pg_client
from routers import core, core_dynamic, ee, saml
@ -27,21 +28,14 @@ async def or_middleware(request: Request, call_next):
if not unlock.is_valid():
return JSONResponse(content={"errors": ["expired license"]}, status_code=status.HTTP_403_FORBIDDEN)
global OR_SESSION_TOKEN
OR_SESSION_TOKEN = request.headers.get('vnd.openreplay.com.sid', request.headers.get('vnd.asayer.io.sid'))
try:
if helper.TRACK_TIME:
import time
now = int(time.time() * 1000)
response: StreamingResponse = await call_next(request)
if helper.TRACK_TIME:
now = int(time.time() * 1000) - now
if now > 500:
print(f"Execution time: {now} ms")
except Exception as e:
pg_client.close()
raise e
pg_client.close()
if helper.TRACK_TIME:
import time
now = int(time.time() * 1000)
response: StreamingResponse = await call_next(request)
if helper.TRACK_TIME:
now = int(time.time() * 1000) - now
if now > 500:
logging.info(f"Execution time: {now} ms")
return response
@ -74,18 +68,41 @@ app.include_router(insights.app)
app.include_router(v1_api.app_apikey)
app.include_router(v1_api_ee.app_apikey)
app.queue_system = queue.Queue()
loglevel = config("LOGLEVEL", default=logging.INFO)
print(f">Loglevel set to: {loglevel}")
logging.basicConfig(level=loglevel)
ap_logger = logging.getLogger('apscheduler')
ap_logger.setLevel(loglevel)
app.schedule = AsyncIOScheduler()
app.schedule.start()
app.queue_system = queue.Queue()
for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs:
app.schedule.add_job(id=job["func"].__name__, **job)
from chalicelib.core import traces
app.schedule.add_job(id="trace_worker", **traces.cron_jobs[0])
@app.on_event("startup")
async def startup():
logging.info(">>>>> starting up <<<<<")
await pg_client.init()
app.schedule.start()
for job in app.schedule.get_jobs():
print({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)})
for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs + traces.cron_jobs:
app.schedule.add_job(id=job["func"].__name__, **job)
ap_logger.info(">Scheduled jobs:")
for job in app.schedule.get_jobs():
ap_logger.info({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)})
@app.on_event("shutdown")
async def shutdown():
logging.info(">>>>> shutting down <<<<<")
app.schedule.shutdown(wait=True)
await traces.process_traces_queue()
await pg_client.terminate()
@app.get('/private/shutdown', tags=["private"])
async def stop_server():
logging.info("Requested shutdown")
await shutdown()
import os, signal
os.kill(1, signal.SIGTERM)
logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO))
logging.getLogger('apscheduler').setLevel(config("LOGLEVEL", default=logging.INFO))

View file

@ -46,4 +46,18 @@ UPDATE dashboard_widgets
SET config=config || '{"col":4}'
WHERE metric_id IN (SELECT metric_id FROM metrics WHERE metric_type = 'funnel');
CREATE OR REPLACE FUNCTION notify_integration() RETURNS trigger AS
$$
BEGIN
IF NEW IS NULL THEN
PERFORM pg_notify('integration',
jsonb_build_object('project_id', OLD.project_id, 'provider', OLD.provider, 'options',
null)::text);
ELSIF (OLD IS NULL) OR (OLD.options <> NEW.options) THEN
PERFORM pg_notify('integration', row_to_json(NEW)::text);
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
COMMIT;

View file

@ -61,7 +61,8 @@ $$
BEGIN
IF NEW IS NULL THEN
PERFORM pg_notify('integration',
(row_to_json(OLD)::text || '{"options": null, "request_data": null}'::text));
jsonb_build_object('project_id', OLD.project_id, 'provider', OLD.provider, 'options',
null)::text);
ELSIF (OLD IS NULL) OR (OLD.options <> NEW.options) THEN
PERFORM pg_notify('integration', row_to_json(NEW)::text);
END IF;