import logging import urllib from http import cookies from os import environ from urllib.parse import urlparse from decouple import config from fastapi import Request, HTTPException from starlette.datastructures import FormData if config("ENABLE_SSO", cast=bool, default=True): from onelogin.saml2.auth import OneLogin_Saml2_Auth if config("LOCAL_DEV", default=False, cast=bool): API_PREFIX = "" else: API_PREFIX = "/api" SAML2 = { "strict": config("saml_strict", cast=bool, default=True), "debug": config("saml_debug", cast=bool, default=True), "sp": { "entityId": config("SITE_URL") + API_PREFIX + "/sso/saml2/metadata/", "assertionConsumerService": { "url": config("SITE_URL") + API_PREFIX + "/sso/saml2/acs/", "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" }, "singleLogoutService": { "url": config("SITE_URL") + API_PREFIX + "/sso/saml2/sls/", "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" }, "NameIDFormat": "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress", "x509cert": config("sp_crt", default=""), "privateKey": config("sp_key", default=""), }, "security": { "requestedAuthnContext": False }, "idp": None } # in case tenantKey is included in the URL sp_acs = config("idp_tenantKey", default="") if sp_acs is not None and len(sp_acs) > 0: SAML2["sp"]["assertionConsumerService"]["url"] += sp_acs + "/" idp = None # SAML2 config handler if config("SAML2_MD_URL", default=None) is not None and len(config("SAML2_MD_URL")) > 0: print("SAML2_MD_URL provided, getting IdP metadata config") from onelogin.saml2.idp_metadata_parser import OneLogin_Saml2_IdPMetadataParser idp_data = OneLogin_Saml2_IdPMetadataParser.parse_remote(config("SAML2_MD_URL", default=None)) idp = idp_data.get("idp") if SAML2["idp"] is None: if len(config("idp_entityId", default="")) > 0 \ and len(config("idp_sso_url", default="")) > 0 \ and len(config("idp_x509cert", default="")) > 0: idp = { "entityId": config("idp_entityId"), "singleSignOnService": { "url": config("idp_sso_url"), "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" }, "x509cert": config("idp_x509cert") } if len(config("idp_sls_url", default="")) > 0: idp["singleLogoutService"] = { "url": config("idp_sls_url"), "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" } if idp is None: logging.info("No SAML2 IdP configuration found") else: SAML2["idp"] = idp def init_saml_auth(req): if idp is None: raise Exception("No SAML2 config provided") return OneLogin_Saml2_Auth(req, old_settings=SAML2) async def prepare_request(request: Request): if not is_saml2_available(): raise HTTPException(status_code=401, detail="SSO configuration not available.") request.args = dict(request.query_params).copy() if request.query_params else {} form: FormData = await request.form() request.form = dict(form) cookie_str = request.headers.get("cookie", "") if "session" in cookie_str: cookie = cookies.SimpleCookie() cookie.load(cookie_str) # Even though SimpleCookie is dictionary-like, it internally uses a Morsel object # which is incompatible with requests. Manually construct a dictionary instead. extracted_cookies = {} for key, morsel in cookie.items(): extracted_cookies[key] = morsel.value if "session" not in extracted_cookies: logging.info("!!! session not found in extracted_cookies") logging.info(extracted_cookies) session = extracted_cookies.get("session", {}) else: session = {} # If server is behind proxys or balancers use the HTTP_X_FORWARDED fields headers = request.headers proto = headers.get('x-forwarded-proto', 'http') url_data = urlparse('%s://%s' % (proto, headers['host'])) path = request.url.path site_url = urlparse(config("SITE_URL")) # to support custom port without changing IDP config host_suffix = "" if site_url.port is not None and request.url.port is None: host_suffix = f":{site_url.port}" # add / to /acs if not path.endswith("/"): path = path + '/' if len(API_PREFIX) > 0 and not path.startswith(API_PREFIX): path = API_PREFIX + path return { 'https': 'on' if proto == 'https' else 'off', 'http_host': request.headers['host'] + host_suffix, 'server_port': url_data.port, 'script_name': path, 'get_data': request.args.copy(), # Uncomment if using ADFS as IdP, https://github.com/onelogin/python-saml/pull/144 # 'lowercase_urlencoding': True, 'post_data': request.form.copy(), 'cookie': {"session": session}, 'request': request } def is_saml2_available(): return idp is not None def get_saml2_provider(): return config("idp_name", default="saml2") if is_saml2_available() and len( config("idp_name", default="saml2")) > 0 else None def get_landing_URL(query_params: dict = None, redirect_to_link2=False): if query_params is not None and len(query_params.keys()) > 0: query_params = "?" + urllib.parse.urlencode(query_params) else: query_params = "" if redirect_to_link2: if len(config("sso_landing_override", default="")) == 0: logging.warning("SSO trying to redirect to custom URL, but sso_landing_override env var is empty") else: return config("sso_landing_override") + query_params return config("SITE_URL") + config("sso_landing", default="/login") + query_params environ["hastSAML2"] = str(is_saml2_available())