From 3e3d63936235ae81c8b6313641d5c22c9dfa8181 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=B5=84=E2=B5=8E=E2=B5=89=E2=B5=94=E2=B5=93=E2=B5=9B?= Date: Fri, 24 Nov 2023 14:27:28 +0100 Subject: [PATCH] Chore(api): Async chalice.core.tenants:tenants_exists, and propagate. (#1695) --- api/app.py | 26 ++++++++++++++++++++++++++ api/chalicelib/core/signup.py | 4 ++-- api/chalicelib/core/tenants.py | 11 +++++++++-- api/orpy.py | 11 +++++++++++ api/requirements.txt | 2 ++ api/routers/core_dynamic.py | 11 ++++++----- api/routers/subs/health.py | 6 +++--- ee/api/chalicelib/core/signup.py | 4 ++-- ee/api/chalicelib/core/tenants.py | 11 +++++++++-- ee/api/requirements.txt | 2 ++ ee/api/routers/core_dynamic.py | 8 ++++---- ee/api/routers/subs/health.py | 4 ++-- 12 files changed, 78 insertions(+), 22 deletions(-) create mode 100644 api/orpy.py diff --git a/api/app.py b/api/app.py index 66f49d064..17e9ce099 100644 --- a/api/app.py +++ b/api/app.py @@ -18,6 +18,15 @@ from routers.subs import insights, metrics, v1_api, health loglevel = config("LOGLEVEL", default=logging.WARNING) print(f">Loglevel set to: {loglevel}") logging.basicConfig(level=loglevel) +from orpy import application +from psycopg.rows import dict_row + + +class ORPYAsyncConnection(AsyncConnection): + + def __init__(self, *args, **kwargs): + super().__init__(*args, row_factory=dict_row, **kwargs) + @asynccontextmanager @@ -38,10 +47,27 @@ async def lifespan(app: FastAPI): for job in app.schedule.get_jobs(): ap_logger.info({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)}) + + database = { + "host": config("pg_host", default="localhost"), + "dbname": config("pg_dbname", default="orpy"), + "user": config("pg_user", default="orpy"), + "password": config("pg_password", default="orpy"), + "port": config("pg_port", cast=int, default=5432), + "application_name": config("APP_NAME", default="PY"), + } + + database = " ".join("{}={}".format(k, v) for k, v in database.items()) + database = psycopg_pool.AsyncConnectionPool(database, connection_class=ORPYAsyncConnection) + application.set(Application( + database, + ) + # App listening yield # Shutdown + database.close() logging.info(">>>>> shutting down <<<<<") app.schedule.shutdown(wait=False) await pg_client.terminate() diff --git a/api/chalicelib/core/signup.py b/api/chalicelib/core/signup.py index 233a96578..e230bc1bd 100644 --- a/api/chalicelib/core/signup.py +++ b/api/chalicelib/core/signup.py @@ -11,10 +11,10 @@ from chalicelib.utils.TimeUTC import TimeUTC logger = logging.getLogger(__name__) -def create_tenant(data: schemas.UserSignupSchema): +async def create_tenant(data: schemas.UserSignupSchema): logger.info(f"==== Signup started at {TimeUTC.to_human_readable(TimeUTC.now())} UTC") errors = [] - if tenants.tenants_exists(): + if await tenants.tenants_exists(): return {"errors": ["tenants already registered"]} email = data.email diff --git a/api/chalicelib/core/tenants.py b/api/chalicelib/core/tenants.py index 7a4e1a451..e6cf46f4d 100644 --- a/api/chalicelib/core/tenants.py +++ b/api/chalicelib/core/tenants.py @@ -52,7 +52,14 @@ def edit_tenant(tenant_id, changes): return helper.dict_to_camel_case(cur.fetchone()) -def tenants_exists(use_pool=True): +def tenants_exists_sync(use_pool=True): with pg_client.PostgresClient(use_pool=use_pool) as cur: - cur.execute(f"SELECT EXISTS(SELECT 1 FROM public.tenants)") + cur.execute("SELECT EXISTS(SELECT 1 FROM public.tenants)") return cur.fetchone()["exists"] + + +async def tenants_exists(use_pool=True): + async with application.get().database.connection() as cnx: + async with cnx.transaction() as txn: + row = await txn.execute("SELECT EXISTS(SELECT 1 FROM public.tenants)") + return await row.fetchone()["exists"] diff --git a/api/orpy.py b/api/orpy.py new file mode 100644 index 000000000..7f7fce76a --- /dev/null +++ b/api/orpy.py @@ -0,0 +1,11 @@ +from collections import namedtuple +from contextvars import ContextVar + + +Application = namedtuple( + "Application", + ( + "database", + ), +) +application: Application = ContextVar("application", default=None) diff --git a/api/requirements.txt b/api/requirements.txt index 8f8c2a358..d73f25c21 100644 --- a/api/requirements.txt +++ b/api/requirements.txt @@ -16,3 +16,5 @@ pydantic[email]==2.3.0 apscheduler==3.10.4 redis==5.0.1 + +psycopg[pool,binary]==3.1.12 diff --git a/api/routers/core_dynamic.py b/api/routers/core_dynamic.py index 96acc4097..dfadd05c7 100644 --- a/api/routers/core_dynamic.py +++ b/api/routers/core_dynamic.py @@ -22,19 +22,20 @@ public_app, app, app_apikey = get_routers() @public_app.get('/signup', tags=['signup']) -def get_all_signup(): - return {"data": {"tenants": tenants.tenants_exists(), +async def get_all_signup(): + return {"data": {"tenants": await tenants.tenants_exists(), "sso": None, "ssoProvider": None, "enforceSSO": None, "edition": license.EDITION}} -if not tenants.tenants_exists(use_pool=False): + +if not tenants.tenants_exists_sync(use_pool=False): @public_app.post('/signup', tags=['signup']) @public_app.put('/signup', tags=['signup']) - def signup_handler(data: schemas.UserSignupSchema = Body(...)): - content = signup.create_tenant(data) + async def signup_handler(data: schemas.UserSignupSchema = Body(...)): + content = await signup.create_tenant(data) if "errors" in content: return content refresh_token = content.pop("refreshToken") diff --git a/api/routers/subs/health.py b/api/routers/subs/health.py index 58d5625b7..245f039c7 100644 --- a/api/routers/subs/health.py +++ b/api/routers/subs/health.py @@ -14,10 +14,10 @@ def get_global_health_status(): return {"data": health.get_health()} -if not tenants.tenants_exists(use_pool=False): +if not tenants.tenants_exists_sync(use_pool=False): @public_app.get('/health', tags=["health-check"]) - def get_public_health_status(): - if tenants.tenants_exists(): + async def get_public_health_status(): + if await tenants.tenants_exists(): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Not Found") return {"data": health.get_health()} diff --git a/ee/api/chalicelib/core/signup.py b/ee/api/chalicelib/core/signup.py index 5e06ae224..79b7d6d9f 100644 --- a/ee/api/chalicelib/core/signup.py +++ b/ee/api/chalicelib/core/signup.py @@ -13,10 +13,10 @@ from chalicelib.utils.TimeUTC import TimeUTC logger = logging.getLogger(__name__) -def create_tenant(data: schemas.UserSignupSchema): +async def create_tenant(data: schemas.UserSignupSchema): logger.info(f"==== Signup started at {TimeUTC.to_human_readable(TimeUTC.now())} UTC") errors = [] - if not config("MULTI_TENANTS", cast=bool, default=False) and tenants.tenants_exists(): + if not config("MULTI_TENANTS", cast=bool, default=False) and await tenants.tenants_exists(): return {"errors": ["tenants already registered"]} email = data.email diff --git a/ee/api/chalicelib/core/tenants.py b/ee/api/chalicelib/core/tenants.py index 5f245188a..91ef6edb6 100644 --- a/ee/api/chalicelib/core/tenants.py +++ b/ee/api/chalicelib/core/tenants.py @@ -78,7 +78,14 @@ def edit_tenant(tenant_id, changes): return helper.dict_to_camel_case(cur.fetchone()) -def tenants_exists(use_pool=True): +def tenants_exists_sync(use_pool=True): with pg_client.PostgresClient(use_pool=use_pool) as cur: - cur.execute(f"SELECT EXISTS(SELECT 1 FROM public.tenants)") + cur.execute("SELECT EXISTS(SELECT 1 FROM public.tenants)") return cur.fetchone()["exists"] + + +async def tenants_exists(use_pool=True): + async with application.get().database as cnx: + async with cnx.transaction() as txn: + row = await txn.execute(f"SELECT EXISTS(SELECT 1 FROM public.tenants)") + return await row.fetchone()["exists"] diff --git a/ee/api/requirements.txt b/ee/api/requirements.txt index 27a18912c..f06132394 100644 --- a/ee/api/requirements.txt +++ b/ee/api/requirements.txt @@ -26,3 +26,5 @@ python-multipart==0.0.6 redis==5.0.1 #confluent-kafka==2.1.0 azure-storage-blob==12.19.0 + +psycopg[pool,binary]==3.1.12 diff --git a/ee/api/routers/core_dynamic.py b/ee/api/routers/core_dynamic.py index 20cc71d5d..274a4d381 100644 --- a/ee/api/routers/core_dynamic.py +++ b/ee/api/routers/core_dynamic.py @@ -27,8 +27,8 @@ public_app, app, app_apikey = get_routers() @public_app.get('/signup', tags=['signup']) -def get_all_signup(): - return {"data": {"tenants": tenants.tenants_exists(), +async def get_all_signup(): + return {"data": {"tenants": await tenants.tenants_exists(), "sso": SAML2_helper.is_saml2_available(), "ssoProvider": SAML2_helper.get_saml2_provider(), "enforceSSO": config("enforce_SSO", cast=bool, default=False) and helper.is_saml2_available(), @@ -38,8 +38,8 @@ def get_all_signup(): if config("MULTI_TENANTS", cast=bool, default=False) or not tenants.tenants_exists(use_pool=False): @public_app.post('/signup', tags=['signup']) @public_app.put('/signup', tags=['signup']) - def signup_handler(data: schemas.UserSignupSchema = Body(...)): - content = signup.create_tenant(data) + async def signup_handler(data: schemas.UserSignupSchema = Body(...)): + content = await signup.create_tenant(data) if "errors" in content: return content refresh_token = content.pop("refreshToken") diff --git a/ee/api/routers/subs/health.py b/ee/api/routers/subs/health.py index aacdf3383..7a34a77f6 100644 --- a/ee/api/routers/subs/health.py +++ b/ee/api/routers/subs/health.py @@ -16,8 +16,8 @@ def get_global_health_status(context: schemas.CurrentContext = Depends(OR_contex if not tenants.tenants_exists(use_pool=False): @public_app.get('/health', tags=["health-check"]) - def get_public_health_status(): - if tenants.tenants_exists(): + async def get_public_health_status(): + if await tenants.tenants_exists(): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Not Found") return {"data": health.get_health()}