fix(api): copy async code change to ee edition

This commit is contained in:
ⵄⵎⵉⵔⵓⵛ ⴰⵎⴰⵣⵉⵖ 2023-11-30 14:03:53 +01:00
parent f5e1de089c
commit 52ad41727d
4 changed files with 36 additions and 6 deletions

View file

@ -3,11 +3,13 @@ import queue
import time
from contextlib import asynccontextmanager
import psycopg_pool
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from decouple import config
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from psycopg import AsyncConnection
from starlette import status
from starlette.responses import StreamingResponse, JSONResponse
@ -27,6 +29,15 @@ from routers.subs import v1_api, health
loglevel = config("LOGLEVEL", default=logging.WARNING)
print(f">Loglevel set to: {loglevel}")
logging.basicConfig(level=loglevel)
import orpy
from psycopg.rows import dict_row
class ORPYAsyncConnection(AsyncConnection):
def __init__(self, *args, **kwargs):
super().__init__(*args, row_factory=dict_row, **kwargs)
@asynccontextmanager
@ -49,10 +60,26 @@ async def lifespan(app: FastAPI):
for job in app.schedule.get_jobs():
ap_logger.info({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)})
database = {
"host": config("pg_host", default="localhost"),
"dbname": config("pg_dbname", default="orpy"),
"user": config("pg_user", default="orpy"),
"password": config("pg_password", default="orpy"),
"port": config("pg_port", cast=int, default=5432),
"application_name": config("APP_NAME", default="PY"),
}
database = psycopg_pool.AsyncConnectionPool(kwargs=database, connection_class=ORPYAsyncConnection)
orpy.set(orpy.Application(
database,
))
# App listening
yield
# Shutdown
await database.close()
logging.info(">>>>> shutting down <<<<<")
app.schedule.shutdown(wait=True)
await traces.process_traces_queue()

View file

@ -1,3 +1,4 @@
import orpy
from chalicelib.core import license
from chalicelib.utils import helper
from chalicelib.utils import pg_client
@ -81,11 +82,13 @@ def edit_tenant(tenant_id, changes):
def tenants_exists_sync(use_pool=True):
with pg_client.PostgresClient(use_pool=use_pool) as cur:
cur.execute("SELECT EXISTS(SELECT 1 FROM public.tenants)")
return cur.fetchone()["exists"]
out = cur.fetchone()["exists"]
return out
async def tenants_exists(use_pool=True):
async with application.get().database as cnx:
async with orpy.get().database.connection() as cnx:
async with cnx.transaction() as txn:
row = await txn.execute(f"SELECT EXISTS(SELECT 1 FROM public.tenants)")
return await row.fetchone()["exists"]
row = await cnx.execute("SELECT EXISTS(SELECT 1 FROM public.tenants)")
row = await row.fetchone()
return row["exists"]

View file

@ -35,7 +35,7 @@ async def get_all_signup():
"edition": license.EDITION}}
if config("MULTI_TENANTS", cast=bool, default=False) or not tenants.tenants_exists(use_pool=False):
if config("MULTI_TENANTS", cast=bool, default=False) or not tenants.tenants_exists_sync(use_pool=False):
@public_app.post('/signup', tags=['signup'])
@public_app.put('/signup', tags=['signup'])
async def signup_handler(data: schemas.UserSignupSchema = Body(...)):

View file

@ -14,7 +14,7 @@ def get_global_health_status(context: schemas.CurrentContext = Depends(OR_contex
return {"data": health.get_health(tenant_id=context.tenant_id)}
if not tenants.tenants_exists(use_pool=False):
if not tenants.tenants_exists_sync(use_pool=False):
@public_app.get('/health', tags=["health-check"])
async def get_public_health_status():
if await tenants.tenants_exists():