* feat(DB): rearranged queries feat(DB): ready for v1.15.0 * refactor(chalice): upgraded dependencies refactor(crons): upgraded dependencies refactor(alerts): upgraded dependencies * feat(chalice): upgraded dependencies feat(alerts): upgraded dependencies feat(crons): upgraded dependencies * fix(chalice): remove urllib3 dependency * feat(chalice): remove FOSS to pydantic v2 * fix(chalice): freeze urllib3 to not have conflicts between boto3 and requests * feat(chalice): refactoring schema in progress * feat(chalice): refactoring schema in progress * feat(chalice): refactoring schema in progress * feat(chalice): refactoring schema in progress feat(chalice): upgraded dependencies * feat(chalice): refactored schema * fix(chalice): pull rebase dev * feat(DB): transfer size support * feat(chalice): support service account * feat(chalice): support service account * fix(chalice): fixed refactored PayloadSchema-name * feat(chalice): path analysis * feat(DB): timezone support * feat(chalice): upgraded dependencies feat(alerts): upgraded dependencies feat(crons): upgraded dependencies feat(assist): upgraded dependencies feat(sourcemaps): upgraded dependencies * feat(chalice): path analysis schema changes * feat(chalice): path analysis query change * feat(chalice): path analysis query change * feat(chalice): path analysis changes * feat(chalice): upgraded dependencies * feat(chalice): simple hide minor paths * feat(chalice): path analysis density * feat(chalice): support project's platform * feat(chalice): upgraded dependencies feat(chalice): path analysis specific event type for startPoint feat(chalice): path analysis specific event type for endPoint feat(chalice): path analysis specific event type for exclude * refactoring(chalice): migrated the rest of schema to pydantic v2 * refactoring(chalice): upgraded dependencies refactoring(alerts): upgraded dependencies refactoring(crons): upgraded dependencies refactoring(peers): upgraded dependencies refactoring(assist): upgraded 
dependencies refactoring(sourcemaps-reader): upgraded dependencies * refactoring(chalice): upgraded dependencies refactoring(alerts): upgraded dependencies refactoring(crons): upgraded dependencies refactoring(peers): upgraded dependencies refactoring(assist): upgraded dependencies refactoring(sourcemaps-reader): upgraded dependencies
75 lines
3.2 KiB
Python
75 lines
3.2 KiB
Python
import json
|
|
from typing import Callable
|
|
|
|
from fastapi import HTTPException, Depends
|
|
from fastapi import Security
|
|
from fastapi.routing import APIRoute
|
|
from fastapi.security import SecurityScopes
|
|
from starlette import status
|
|
from starlette.exceptions import HTTPException
|
|
from starlette.requests import Request
|
|
from starlette.responses import Response, JSONResponse
|
|
|
|
import schemas
|
|
from chalicelib.utils import helper
|
|
from chalicelib.core import traces
|
|
|
|
|
|
async def OR_context(request: Request) -> schemas.CurrentContext:
    """Return the ``currentContext`` object stored on ``request.state``.

    Raises:
        Exception: if ``request.state`` has no ``currentContext`` attribute.
    """
    if not hasattr(request.state, "currentContext"):
        raise Exception("currentContext not found")
    return request.state.currentContext
|
|
|
|
|
|
class ORRoute(APIRoute):
    """APIRoute whose handler reshapes errors, post-processes JSON bodies and traces calls.

    The wrapped handler:
      * converts 4xx ``HTTPException``s into ``{"errors": [detail]}`` JSON responses;
      * passes every JSON body through ``helper.cast_session_id_to_string``;
      * turns a 200 response whose body carries an ``"errors"`` entry into a 404
        (first error contains ``"not found"``) or a 400 (anything else);
      * reports the request/response pair to ``traces.trace``.
    """

    def get_route_handler(self) -> Callable:
        """Return the parent route handler wrapped with the behavior described on the class."""
        original_route_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            try:
                response: Response = await original_route_handler(request)
            except HTTPException as e:
                # Only client errors (4xx) are reshaped; every other status propagates untouched.
                if e.status_code // 100 != 4:
                    raise
                response = JSONResponse(content={"errors": [e.detail]}, status_code=e.status_code)

            if isinstance(response, JSONResponse):
                body = json.loads(response.body.decode('utf8'))
                body = helper.cast_session_id_to_string(body)
                # The body may have changed, so content-length is dropped and recomputed
                # by the new JSONResponse.
                # NOTE(review): a dict keyed by header name keeps a single value per name,
                # so repeated headers (e.g. several set-cookie) collapse - confirm acceptable.
                response = JSONResponse(content=body, status_code=response.status_code,
                                        headers={k: v for k, v in response.headers.items()
                                                 if k != "content-length"},
                                        media_type=response.media_type,
                                        background=response.background)
                if response.status_code == 200 \
                        and isinstance(body, dict) \
                        and body.get("errors") is not None:
                    try:
                        is_not_found = "not found" in body["errors"][0]
                    except (IndexError, KeyError, TypeError):
                        # Empty or unexpectedly-shaped "errors": still an error payload,
                        # so report it as a generic bad request instead of crashing.
                        is_not_found = False
                    if is_not_found:
                        response.status_code = status.HTTP_404_NOT_FOUND
                    else:
                        response.status_code = status.HTTP_400_BAD_REQUEST

            traces.trace(action=self.name, path_format=self.path_format, request=request, response=response)
            return response

        return custom_route_handler
|
|
|
|
|
|
def __check(security_scopes: SecurityScopes, context: schemas.CurrentContext = Depends(OR_context)):
    """Raise 401 unless ``context.permissions`` holds every required scope that applies.

    Which scopes apply depends on the caller:
      * service account: only ``ServicePermissions`` scopes are checked, and at least
        one such scope must be declared on the endpoint;
      * any other caller: only ``Permissions`` scopes are checked.

    Raises:
        HTTPException: 401 when an applicable scope is missing from
            ``context.permissions``, or when a service account reaches an endpoint
            that declares no ``ServicePermissions`` scope.
    """
    # The permission enums are read from `schemas`, the only schema module this file
    # imports; the former `schemas_ee` reference was never imported and raised NameError.
    # NOTE(review): confirm `schemas` exposes ServicePermissions and Permissions.
    service_scope_count = 0
    for scope in security_scopes.scopes:
        is_service_scope = isinstance(scope, schemas.ServicePermissions)
        if is_service_scope:
            service_scope_count += 1

        if context.service_account:
            applies = is_service_scope
        else:
            applies = isinstance(scope, schemas.Permissions)
        if not applies:
            continue

        if scope not in context.permissions:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                                detail="Not enough permissions")

    if context.service_account and service_scope_count == 0:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                            detail="Not enough permissions (service account)")
|
|
|
|
|
|
def OR_scope(*scopes):
    """Build a ``Security`` dependency that runs ``__check`` with the given scopes."""
    required_scopes = list(scopes)
    return Security(__check, scopes=required_scopes)
|