# openreplay/ee/api/chalicelib/utils/SAML2_helper.py
# 2022-10-04 08:43:55 +02:00
#
# 131 lines
# No EOL
# 4.5 KiB
# Python

from http import cookies
from os import environ
from urllib.parse import urlparse
from decouple import config
from fastapi import Request
from onelogin.saml2.auth import OneLogin_Saml2_Auth
from starlette.datastructures import FormData
# Settings dictionary passed to OneLogin_Saml2_Auth (as ``old_settings``).
# Every service-provider endpoint is derived from SITE_URL. The "idp" entry
# starts out as None and is only filled in once an IdP configuration exists.
_site_url = config("SITE_URL")
_sp_settings = {
    "entityId": _site_url + "/api/sso/saml2/metadata/",
    # Assertion consumer service: HTTP-POST binding.
    "assertionConsumerService": {
        "url": _site_url + "/api/sso/saml2/acs/",
        "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST",
    },
    # Single logout service: HTTP-Redirect binding.
    "singleLogoutService": {
        "url": _site_url + "/api/sso/saml2/sls/",
        "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect",
    },
    "NameIDFormat": "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress",
    # No SP certificate / private key is configured here.
    "x509cert": "",
    "privateKey": "",
}
SAML2 = {
    "strict": True,
    "debug": True,
    "sp": _sp_settings,
    "idp": None,
}
# When idp_tenantKey is configured, the tenant key is part of the ACS URL:
# append it as one more path segment (keeping the trailing slash).
sp_acs = config("idp_tenantKey", default="")
if sp_acs:
    _acs_settings = SAML2["sp"]["assertionConsumerService"]
    _acs_settings["url"] = _acs_settings["url"] + sp_acs + "/"
# Resolved identity-provider settings; stays None when nothing usable is configured.
idp = None

# SAML2 config handler, step 1: download and parse the IdP metadata document
# when SAML2_MD_URL is set to a non-empty value.
_metadata_url = config("SAML2_MD_URL", default=None)
if _metadata_url:
    print("SAML2_MD_URL provided, getting IdP metadata config")
    from onelogin.saml2.idp_metadata_parser import OneLogin_Saml2_IdPMetadataParser

    idp_data = OneLogin_Saml2_IdPMetadataParser.parse_remote(_metadata_url)
    idp = idp_data.get("idp")

# Step 2: explicit idp_* settings (entity id, SSO URL and certificate are all required).
# NOTE(review): SAML2["idp"] is still None at this point (it is only assigned in
# step 3), so this guard always passes and a complete set of idp_* values
# replaces whatever the metadata produced. Confirm whether `idp is None` was
# intended, i.e. whether metadata should take precedence instead.
if SAML2["idp"] is None:
    _entity_id = config("idp_entityId", default="")
    _sso_url = config("idp_sso_url", default="")
    _x509cert = config("idp_x509cert", default="")
    if _entity_id and _sso_url and _x509cert:
        idp = {
            "entityId": _entity_id,
            "singleSignOnService": {
                "url": _sso_url,
                "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect",
            },
            "x509cert": _x509cert,
        }
        # The single-logout endpoint is optional.
        _sls_url = config("idp_sls_url", default="")
        if _sls_url:
            idp["singleLogoutService"] = {
                "url": _sls_url,
                "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect",
            }

# Step 3: publish the result into the shared settings dictionary.
if idp is not None:
    SAML2["idp"] = idp
else:
    print("No SAML2 IdP configuration found")
def init_saml_auth(req):
    """Create a ``OneLogin_Saml2_Auth`` for *req* using the module-level SAML2 settings.

    Args:
        req: Request description forwarded unchanged to ``OneLogin_Saml2_Auth``
            (presumably the dict built by ``prepare_request`` -- confirm
            against callers).

    Returns:
        The ``OneLogin_Saml2_Auth`` instance built with ``old_settings=SAML2``.

    Raises:
        Exception: If the module-level ``idp`` is None, i.e. no IdP
            configuration is available.
    """
    if idp is not None:
        return OneLogin_Saml2_Auth(req, old_settings=SAML2)
    raise Exception("No SAML2 config provided")
def _session_cookie_value(cookie_header):
    """Return the value of the ``session`` cookie in *cookie_header*, or ``{}`` if absent."""
    # Cheap pre-check: headers that cannot contain the cookie are never parsed.
    if "session" not in cookie_header:
        return {}
    jar = cookies.SimpleCookie()
    try:
        jar.load(cookie_header)
    except cookies.CookieError:
        # Cookie data comes from the browser and may be invalid; treat an
        # unparsable header the same as a missing session cookie.
        return {}
    # The substring pre-check also matches names such as "sessionid" or values
    # that merely contain "session", so look the cookie up by its exact name
    # instead of indexing (which raised KeyError in those cases).
    morsel = jar.get("session")
    # SimpleCookie stores Morsel objects; only the plain string value is passed on.
    return morsel.value if morsel is not None else {}


async def prepare_request(request: Request):
    """Build the request-description dict used for SAML processing from a FastAPI request.

    The returned dict uses the keys ``https``, ``http_host``, ``server_port``,
    ``script_name``, ``get_data``, ``post_data``, ``cookie`` and ``request``
    (presumably the input expected by ``init_saml_auth`` -- confirm against
    callers).

    Side effects:
        ``request.args`` is set to a plain dict of the query parameters and
        ``request.form`` is replaced by a plain dict of the parsed form data,
        so ``request.form`` is no longer callable afterwards.

    Args:
        request: The incoming FastAPI request.

    Returns:
        dict: The request description; ``cookie["session"]`` holds the value of
        the ``session`` cookie, or ``{}`` when that cookie is missing or the
        cookie header cannot be parsed.

    Raises:
        KeyError: If the request has no ``host`` header.
    """
    request.args = dict(request.query_params) if request.query_params else {}
    form: FormData = await request.form()
    request.form = dict(form)

    session = _session_cookie_value(request.headers.get("cookie", ""))

    # If server is behind proxies or balancers use the HTTP_X_FORWARDED fields
    headers = request.headers
    forwarded_proto = headers.get('x-forwarded-proto')
    if forwarded_proto is None:
        # No proxy header: the scheme defaults to plain http.
        proto = 'http'
    else:
        proto = forwarded_proto
        print(f"x-forwarded-proto: {proto}")
    # Parsed only to extract the port (None when the host header carries none).
    url_data = urlparse('%s://%s' % (proto, headers['host']))

    path = request.url.path
    # add / to /acs
    if not path.endswith("/"):
        path = path + '/'
    # Make sure the path carries the /api prefix.
    if not path.startswith("/api"):
        path = "/api" + path

    return {
        'https': 'on' if proto == 'https' else 'off',
        'http_host': request.headers['host'],
        'server_port': url_data.port,
        'script_name': path,
        'get_data': request.args.copy(),
        # Uncomment if using ADFS as IdP, https://github.com/onelogin/python-saml/pull/144
        # 'lowercase_urlencoding': True,
        'post_data': request.form.copy(),
        'cookie': {"session": session},
        'request': request
    }
def is_saml2_available():
    """Return True when the module-level ``idp`` configuration is set, False otherwise."""
    idp_missing = idp is None
    return not idp_missing
def get_saml2_provider():
    """Return the configured IdP display name, or None when SAML2 is unusable.

    The name comes from the ``idp_name`` setting (default ``"saml2"``).
    None is returned when SAML2 is not available or the name is empty.
    """
    if not is_saml2_available():
        return None
    provider_name = config("idp_name", default="saml2")
    if len(provider_name) > 0:
        return provider_name
    return None
def get_landing_URL(jwt):
    """Build the URL the user lands on after SSO.

    The result is ``SITE_URL`` followed by the ``sso_landing`` template
    (default ``"/login?jwt=%s"``) with *jwt* substituted via ``%`` formatting.

    Args:
        jwt: Token inserted into the landing-path template.

    Returns:
        The complete landing URL as a string.
    """
    site_url = config("SITE_URL")
    landing_template = config("sso_landing", default="/login?jwt=%s")
    # Only the landing path is a format template; SITE_URL is prepended verbatim.
    landing_path = landing_template % jwt
    return site_url + landing_path
# Publish SAML2 availability ("True"/"False") as the hastSAML2 environment
# variable of this process, set once at import time.
# NOTE(review): the key looks like a typo of "hasSAML2"; it is left unchanged
# because other code may read it under this exact name -- confirm before renaming.
environ["hastSAML2"] = str(is_saml2_available())