feat(api): changed SSO assertion handler

feat(api): removed SLO endpoint
This commit is contained in:
Taha Yassine Kraiem 2021-12-02 19:35:09 +01:00
parent 779d4fba44
commit 1216ed74ad
2 changed files with 29 additions and 78 deletions

View file

@ -45,25 +45,6 @@ def process_sso_assertion():
if 'AuthNRequestID' in session:
del session['AuthNRequestID']
user_data = auth.get_attributes()
# session['samlUserdata'] = user_data
# session['samlNameId'] = auth.get_nameid()
# session['samlNameIdFormat'] = auth.get_nameid_format()
# session['samlNameIdNameQualifier'] = auth.get_nameid_nq()
# session['samlNameIdSPNameQualifier'] = auth.get_nameid_spnq()
# session['samlSessionIndex'] = auth.get_session_index()
# session['samlSessionExpiration'] = auth.get_session_expiration()
# print('>>>>')
# print(session)
# ---- ignore relay-state
# self_url = OneLogin_Saml2_Utils.get_self_url(req)
# if 'RelayState' in request.form and self_url != request.form['RelayState']:
# print("====>redirect to")
# print("====>redirect to")
# return Response(
# status_code=307,
# body='',
# headers={'Location': auth.redirect_to(request.form['RelayState']), 'Content-Type': 'text/plain'})
elif auth.get_settings().is_debug_active():
error_reason = auth.get_last_error_reason()
return {"errors": [error_reason]}
@ -74,64 +55,34 @@ def process_sso_assertion():
existing = users.get_by_email_only(auth.get_nameid())
internal_id = next(iter(user_data.get("internalId", [])), None)
if len(existing) == 0 or existing[0].get("origin") is None:
tenant_key = user_data.get("tenantKey", [])
if len(tenant_key) == 0:
print("tenantKey not present in assertion")
return Response(
status_code=307,
body={"errors": ["tenantKey not present in assertion"]},
headers={'Location': auth.redirect_to(request.form['RelayState']), 'Content-Type': 'text/plain'})
else:
t = tenants.get_by_tenant_key(tenant_key[0])
if t is None:
return Response(
status_code=307,
body={"errors": ["Unknown tenantKey"]},
headers={'Location': auth.redirect_to(request.form['RelayState']), 'Content-Type': 'text/plain'})
if len(existing) == 0:
print("== new user ==")
users.create_sso_user(tenant_id=t['tenantId'], email=email, admin=True,
origin=SAML2_helper.get_saml2_provider(),
name=" ".join(user_data.get("firstName", []) + user_data.get("lastName", [])),
internal_id=internal_id)
else:
existing = existing[0]
if existing.get("origin") is None:
print(f"== migrating user to {SAML2_helper.get_saml2_provider()} ==")
users.update(tenant_id=t['tenantId'], user_id=existing["id"],
changes={"origin": SAML2_helper.get_saml2_provider(), "internal_id": internal_id})
tenant_key = user_data.get("tenantKey", [])
if len(tenant_key) == 0:
print("tenantKey not present in assertion, please check your SP-assertion-configuration")
return {"errors": ["tenantKey not present in assertion, please check your SP-assertion-configuration"]}
else:
t = tenants.get_by_tenant_key(tenant_key[0])
if t is None:
print("invalid tenantKey, please copy the correct value from Preferences > Account")
return {"errors": ["invalid tenantKey, please copy the correct value from Preferences > Account"]}
if existing is None:
print("== new user ==")
users.create_sso_user(tenant_id=t['tenantId'], email=email, admin=True,
origin=SAML2_helper.get_saml2_provider(),
name=" ".join(user_data.get("firstName", []) + user_data.get("lastName", [])),
internal_id=internal_id)
else:
if existing.get("origin") is None:
print(f"== migrating user to {SAML2_helper.get_saml2_provider()} ==")
users.update(tenant_id=t['tenantId'], user_id=existing[0]["id"],
changes={"origin": SAML2_helper.get_saml2_provider(), "internal_id": internal_id})
elif t['tenantId'] != existing["tenantId"]:
print("user exists for a different tenant")
return {"errors": ["user exists for a different tenant"]}
return users.authenticate_sso(email=email, internal_id=internal_id, exp=auth.get_session_expiration())
@app.route('/sso/saml2/slo', methods=['GET'])
def process_slo_request(context):
req = prepare_request(request=app.current_request)
session = req["cookie"]["session"]
request = req['request']
auth = init_saml_auth(req)
name_id = session_index = name_id_format = name_id_nq = name_id_spnq = None
if 'samlNameId' in session:
name_id = session['samlNameId']
if 'samlSessionIndex' in session:
session_index = session['samlSessionIndex']
if 'samlNameIdFormat' in session:
name_id_format = session['samlNameIdFormat']
if 'samlNameIdNameQualifier' in session:
name_id_nq = session['samlNameIdNameQualifier']
if 'samlNameIdSPNameQualifier' in session:
name_id_spnq = session['samlNameIdSPNameQualifier']
users.change_jwt_iat(context["userId"])
return Response(
status_code=307,
body='',
headers={'Location': auth.logout(name_id=name_id, session_index=session_index, nq=name_id_nq,
name_id_format=name_id_format,
spnq=name_id_spnq), 'Content-Type': 'text/plain'})
@app.route('/sso/saml2/sls', methods=['GET'], authorizer=None)
def process_sls_assertion():
req = prepare_request(request=app.current_request)

View file

@ -339,13 +339,13 @@ def get_by_email_only(email):
(CASE WHEN users.role = 'member' THEN TRUE ELSE FALSE END) AS member,
origin
FROM public.users LEFT JOIN public.basic_authentication ON users.user_id=basic_authentication.user_id
WHERE
users.email = %(email)s
AND users.deleted_at IS NULL;""",
WHERE users.email = %(email)s
AND users.deleted_at IS NULL
LIMIT 1;""",
{"email": email})
)
r = cur.fetchall()
return helper.list_to_camel_case(r)
r = cur.fetchone()
return helper.dict_to_camel_case(r)
def get_by_email_reset(email, reset_token):