openreplay/api/auth/auth_jwt.py
Kraiem Taha Yassine 19935dc105
Patch/v1.14.0 api (#1432)
* change(ui): iframe handle routes

* chore(helm): Updating frontend image release

* feat(chalice): support service account

* feat(chalice): support service account

---------

Co-authored-by: Shekar Siri <sshekarsiri@gmail.com>
Co-authored-by: rjshrjndrn <rjshrjndrn@gmail.com>
2023-08-01 17:01:12 +02:00

53 lines
2.8 KiB
Python

from typing import Optional
from fastapi import Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from starlette import status
from starlette.exceptions import HTTPException
from chalicelib.core import authorizers, users
import schemas
class JWTAuth(HTTPBearer):
def __init__(self, auto_error: bool = True):
super(JWTAuth, self).__init__(auto_error=auto_error)
async def __call__(self, request: Request) -> Optional[schemas.CurrentContext]:
credentials: HTTPAuthorizationCredentials = await super(JWTAuth, self).__call__(request)
if credentials:
if not credentials.scheme == "Bearer":
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid authentication scheme.")
jwt_payload = authorizers.jwt_authorizer(scheme=credentials.scheme, token=credentials.credentials)
auth_exists = jwt_payload is not None \
and users.auth_exists(user_id=jwt_payload.get("userId", -1),
tenant_id=jwt_payload.get("tenantId", -1),
jwt_iat=jwt_payload.get("iat", 100),
jwt_aud=jwt_payload.get("aud", ""))
if jwt_payload is None \
or jwt_payload.get("iat") is None or jwt_payload.get("aud") is None \
or not auth_exists:
if jwt_payload is not None:
print(jwt_payload)
if jwt_payload.get("iat") is None:
print("JWTAuth: iat is None")
if jwt_payload.get("aud") is None:
print("JWTAuth: aud is None")
if not auth_exists:
print("JWTAuth: not users.auth_exists")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid token or expired token.")
user = users.get(user_id=jwt_payload.get("userId", -1), tenant_id=jwt_payload.get("tenantId", -1))
if user is None:
print("JWTAuth: User not found.")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User not found.")
jwt_payload["authorizer_identity"] = "jwt"
request.state.authorizer_identity = "jwt"
request.state.currentContext = schemas.CurrentContext(tenant_id=jwt_payload.get("tenantId", -1),
user_id=jwt_payload.get("userId", -1),
email=user["email"])
return request.state.currentContext
else:
print("JWTAuth: Invalid authorization code.")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid authorization code.")