60 lines
2.2 KiB
Python
60 lines
2.2 KiB
Python
import json
|
|
from typing import Callable
|
|
|
|
from fastapi import HTTPException, Depends
|
|
from fastapi import Security
|
|
from fastapi.routing import APIRoute
|
|
from fastapi.security import SecurityScopes
|
|
from starlette import status
|
|
from starlette.exceptions import HTTPException
|
|
from starlette.requests import Request
|
|
from starlette.responses import Response, JSONResponse
|
|
|
|
import schemas_ee
|
|
from chalicelib.core import traces
|
|
from chalicelib.core import permissions
|
|
|
|
|
|
async def OR_context(request: Request) -> schemas_ee.CurrentContext:
|
|
if hasattr(request.state, "currentContext"):
|
|
return request.state.currentContext
|
|
else:
|
|
raise Exception("currentContext not found")
|
|
|
|
|
|
class ORRoute(APIRoute):
|
|
def get_route_handler(self) -> Callable:
|
|
original_route_handler = super().get_route_handler()
|
|
|
|
async def custom_route_handler(request: Request) -> Response:
|
|
try:
|
|
response: Response = await original_route_handler(request)
|
|
except HTTPException as e:
|
|
if e.status_code // 100 == 4:
|
|
response = JSONResponse(content={"errors": [e.detail]}, status_code=e.status_code)
|
|
else:
|
|
raise e
|
|
|
|
if isinstance(response, JSONResponse):
|
|
response: JSONResponse = response
|
|
body = json.loads(response.body.decode('utf8'))
|
|
if response.status_code == 200 and body is not None and body.get("errors") is not None:
|
|
if "not found" in body["errors"][0]:
|
|
response.status_code = status.HTTP_404_NOT_FOUND
|
|
else:
|
|
response.status_code = status.HTTP_400_BAD_REQUEST
|
|
traces.trace(action=self.name, path_format=self.path_format, request=request, response=response)
|
|
return response
|
|
|
|
return custom_route_handler
|
|
|
|
|
|
def __check(security_scopes: SecurityScopes, context: schemas_ee.CurrentContext = Depends(OR_context)):
|
|
for scope in security_scopes.scopes:
|
|
if scope not in context.permissions:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Not enough permissions")
|
|
|
|
|
|
def OR_scope(*scopes):
|
|
return Security(__check, scopes=list(scopes))
|