Chore/async api one (#1708)

* Async chalice.core.tenants:tenants_exists, and propagate

* rework after review

* typofix

* fix(chalice): small fixes

* fix(api): use a global variable to store singletong...

... that will not work if several POSIX threads are serving
from the same POSIX processus.

* fix(api): pass database argument as dictionary.

ref: https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING

* chore(api): ease debugging with simple return statement.

* NOTES++

---------

Co-authored-by: Taha Yassine Kraiem <tahayk2@gmail.com>
This commit is contained in:
ⵄⵎⵉⵔⵓⵛ 2023-11-27 15:58:02 +01:00 committed by GitHub
parent 6da4d0d398
commit 71c74cd658
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 117 additions and 23 deletions

25
api/NOTES.md Normal file
View file

@ -0,0 +1,25 @@
#### psycopg3 API
I mis-remember the psycopg v2 vs. v3 API.
For the record, the expected psycopg3's async api looks like the
following pseudo code:
```python
async with orpy.get().database.connection() as cnx:
async with cnx.transaction():
row = await cnx.execute("SELECT EXISTS(SELECT 1 FROM public.tenants)")
row = await row.fetchone()
return row["exists"]
```
Minding the following:
- Where `orpy.get().database` is the postgresql connection pooler.
- Wrap explicit transaction with `async with cnx.transaction():
foobar()`
- Most of the time the transaction object is not used;
- Do execute await operation against `cnx`;
- `await cnx.execute` returns a cursor object;
- Do the `await cursor.fetchqux...` calls against the object return by
a call to execute.

View file

@ -2,11 +2,13 @@ import logging
import time
from contextlib import asynccontextmanager
import psycopg_pool
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from decouple import config
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from psycopg import AsyncConnection
from starlette.responses import StreamingResponse
from chalicelib.utils import helper
@ -18,6 +20,15 @@ from routers.subs import insights, metrics, v1_api, health
loglevel = config("LOGLEVEL", default=logging.WARNING)
print(f">Loglevel set to: {loglevel}")
logging.basicConfig(level=loglevel)
import orpy
from psycopg.rows import dict_row
class ORPYAsyncConnection(AsyncConnection):
def __init__(self, *args, **kwargs):
super().__init__(*args, row_factory=dict_row, **kwargs)
@asynccontextmanager
@ -38,10 +49,26 @@ async def lifespan(app: FastAPI):
for job in app.schedule.get_jobs():
ap_logger.info({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)})
database = {
"host": config("pg_host", default="localhost"),
"dbname": config("pg_dbname", default="orpy"),
"user": config("pg_user", default="orpy"),
"password": config("pg_password", default="orpy"),
"port": config("pg_port", cast=int, default=5432),
"application_name": config("APP_NAME", default="PY"),
}
database = psycopg_pool.AsyncConnectionPool(kwargs=database, connection_class=ORPYAsyncConnection)
orpy.set(orpy.Application(
database,
))
# App listening
yield
# Shutdown
await database.close()
logging.info(">>>>> shutting down <<<<<")
app.schedule.shutdown(wait=False)
await pg_client.terminate()

View file

@ -11,10 +11,10 @@ from chalicelib.utils.TimeUTC import TimeUTC
logger = logging.getLogger(__name__)
def create_tenant(data: schemas.UserSignupSchema):
async def create_tenant(data: schemas.UserSignupSchema):
logger.info(f"==== Signup started at {TimeUTC.to_human_readable(TimeUTC.now())} UTC")
errors = []
if tenants.tenants_exists():
if await tenants.tenants_exists():
return {"errors": ["tenants already registered"]}
email = data.email

View file

@ -1,3 +1,4 @@
import orpy
from chalicelib.core import license
from chalicelib.utils import helper
from chalicelib.utils import pg_client
@ -52,7 +53,16 @@ def edit_tenant(tenant_id, changes):
return helper.dict_to_camel_case(cur.fetchone())
def tenants_exists(use_pool=True):
def tenants_exists_sync(use_pool=True):
with pg_client.PostgresClient(use_pool=use_pool) as cur:
cur.execute(f"SELECT EXISTS(SELECT 1 FROM public.tenants)")
return cur.fetchone()["exists"]
cur.execute("SELECT EXISTS(SELECT 1 FROM public.tenants)")
out = cur.fetchone()["exists"]
return out
async def tenants_exists(use_pool=True):
async with orpy.get().database.connection() as cnx:
async with cnx.transaction() as txn:
row = await cnx.execute("SELECT EXISTS(SELECT 1 FROM public.tenants)")
row = await row.fetchone()
return row["exists"]

20
api/orpy.py Normal file
View file

@ -0,0 +1,20 @@
from collections import namedtuple
Application = namedtuple(
"Application",
(
"database",
),
)
APPLICATION = None
def set(application):
global APPLICATION
APPLICATION = application
def get():
return APPLICATION

View file

@ -16,3 +16,5 @@ pydantic[email]==2.3.0
apscheduler==3.10.4
redis==5.0.1
psycopg[pool,binary]==3.1.12

View file

@ -22,19 +22,20 @@ public_app, app, app_apikey = get_routers()
@public_app.get('/signup', tags=['signup'])
def get_all_signup():
return {"data": {"tenants": tenants.tenants_exists(),
async def get_all_signup():
return {"data": {"tenants": await tenants.tenants_exists(),
"sso": None,
"ssoProvider": None,
"enforceSSO": None,
"edition": license.EDITION}}
if not tenants.tenants_exists(use_pool=False):
if not tenants.tenants_exists_sync(use_pool=False):
@public_app.post('/signup', tags=['signup'])
@public_app.put('/signup', tags=['signup'])
def signup_handler(data: schemas.UserSignupSchema = Body(...)):
content = signup.create_tenant(data)
async def signup_handler(data: schemas.UserSignupSchema = Body(...)):
content = await signup.create_tenant(data)
if "errors" in content:
return content
refresh_token = content.pop("refreshToken")

View file

@ -14,10 +14,10 @@ def get_global_health_status():
return {"data": health.get_health()}
if not tenants.tenants_exists(use_pool=False):
if not tenants.tenants_exists_sync(use_pool=False):
@public_app.get('/health', tags=["health-check"])
def get_public_health_status():
if tenants.tenants_exists():
async def get_public_health_status():
if await tenants.tenants_exists():
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Not Found")
return {"data": health.get_health()}

View file

@ -13,10 +13,10 @@ from chalicelib.utils.TimeUTC import TimeUTC
logger = logging.getLogger(__name__)
def create_tenant(data: schemas.UserSignupSchema):
async def create_tenant(data: schemas.UserSignupSchema):
logger.info(f"==== Signup started at {TimeUTC.to_human_readable(TimeUTC.now())} UTC")
errors = []
if not config("MULTI_TENANTS", cast=bool, default=False) and tenants.tenants_exists():
if not config("MULTI_TENANTS", cast=bool, default=False) and await tenants.tenants_exists():
return {"errors": ["tenants already registered"]}
email = data.email

View file

@ -78,7 +78,14 @@ def edit_tenant(tenant_id, changes):
return helper.dict_to_camel_case(cur.fetchone())
def tenants_exists(use_pool=True):
def tenants_exists_sync(use_pool=True):
with pg_client.PostgresClient(use_pool=use_pool) as cur:
cur.execute(f"SELECT EXISTS(SELECT 1 FROM public.tenants)")
cur.execute("SELECT EXISTS(SELECT 1 FROM public.tenants)")
return cur.fetchone()["exists"]
async def tenants_exists(use_pool=True):
async with application.get().database as cnx:
async with cnx.transaction() as txn:
row = await txn.execute(f"SELECT EXISTS(SELECT 1 FROM public.tenants)")
return await row.fetchone()["exists"]

View file

@ -26,3 +26,5 @@ python-multipart==0.0.6
redis==5.0.1
#confluent-kafka==2.1.0
azure-storage-blob==12.19.0
psycopg[pool,binary]==3.1.12

View file

@ -27,8 +27,8 @@ public_app, app, app_apikey = get_routers()
@public_app.get('/signup', tags=['signup'])
def get_all_signup():
return {"data": {"tenants": tenants.tenants_exists(),
async def get_all_signup():
return {"data": {"tenants": await tenants.tenants_exists(),
"sso": SAML2_helper.is_saml2_available(),
"ssoProvider": SAML2_helper.get_saml2_provider(),
"enforceSSO": config("enforce_SSO", cast=bool, default=False) and helper.is_saml2_available(),
@ -38,8 +38,8 @@ def get_all_signup():
if config("MULTI_TENANTS", cast=bool, default=False) or not tenants.tenants_exists(use_pool=False):
@public_app.post('/signup', tags=['signup'])
@public_app.put('/signup', tags=['signup'])
def signup_handler(data: schemas.UserSignupSchema = Body(...)):
content = signup.create_tenant(data)
async def signup_handler(data: schemas.UserSignupSchema = Body(...)):
content = await signup.create_tenant(data)
if "errors" in content:
return content
refresh_token = content.pop("refreshToken")

View file

@ -16,8 +16,8 @@ def get_global_health_status(context: schemas.CurrentContext = Depends(OR_contex
if not tenants.tenants_exists(use_pool=False):
@public_app.get('/health', tags=["health-check"])
def get_public_health_status():
if tenants.tenants_exists():
async def get_public_health_status():
if await tenants.tenants_exists():
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Not Found")
return {"data": health.get_health()}