openreplay/ee/api/or_dependencies.py
2022-11-07 13:10:23 +01:00

59 lines
2.2 KiB
Python

import json
from typing import Callable
from fastapi import HTTPException, Depends
from fastapi import Security
from fastapi.routing import APIRoute
from fastapi.security import SecurityScopes
from starlette import status
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import Response, JSONResponse
import schemas_ee
from chalicelib.core import traces
async def OR_context(request: Request) -> schemas_ee.CurrentContext:
if hasattr(request.state, "currentContext"):
return request.state.currentContext
else:
raise Exception("currentContext not found")
class ORRoute(APIRoute):
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()
async def custom_route_handler(request: Request) -> Response:
try:
response: Response = await original_route_handler(request)
except HTTPException as e:
if e.status_code // 100 == 4:
response = JSONResponse(content={"errors": [e.detail]}, status_code=e.status_code)
else:
raise e
if isinstance(response, JSONResponse):
response: JSONResponse = response
body = json.loads(response.body.decode('utf8'))
if response.status_code == 200 and body is not None and body.get("errors") is not None:
if "not found" in body["errors"][0]:
response.status_code = status.HTTP_404_NOT_FOUND
else:
response.status_code = status.HTTP_400_BAD_REQUEST
traces.trace(action=self.name, path_format=self.path_format, request=request, response=response)
return response
return custom_route_handler
def __check(security_scopes: SecurityScopes, context: schemas_ee.CurrentContext = Depends(OR_context)):
for scope in security_scopes.scopes:
if scope not in context.permissions:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not enough permissions")
def OR_scope(*scopes):
return Security(__check, scopes=list(scopes))