From 11850cd27f89f9dbef32d4c3ec27feaeae1d93cd Mon Sep 17 00:00:00 2001 From: Kraiem Taha Yassine Date: Thu, 2 May 2024 16:42:17 +0200 Subject: [PATCH] Revert "fix(chalice): changed SSO (#2157)" (#2159) This reverts commit 28df42132f4f9137a082f9a4e208e347a268be6b. --- ee/api/routers/saml.py | 50 +++++++++++++++++------------------------- 1 file changed, 20 insertions(+), 30 deletions(-) diff --git a/ee/api/routers/saml.py b/ee/api/routers/saml.py index 42d957b47..7a0f9caf4 100644 --- a/ee/api/routers/saml.py +++ b/ee/api/routers/saml.py @@ -64,9 +64,11 @@ async def process_sso_assertion(request: Request): auth.process_response(request_id=request_id) errors = auth.get_errors() + user_data = {} if len(errors) == 0: if 'AuthNRequestID' in session: del session['AuthNRequestID'] + user_data = auth.get_attributes() else: error_reason = auth.get_last_error_reason() logger.error("SAML2 error:") @@ -77,12 +79,9 @@ async def process_sso_assertion(request: Request): logger.debug(f"received nameId: {email}") existing = users.get_by_email_only(auth.get_nameid()) - internal_id = auth.get_attribute("internalId") - logger.debug(f"internalId: {internal_id}") - - tenant_key = auth.get_attribute("tenantKey") - logger.debug(f"tenantKey: {tenant_key}") - if tenant_key is None or len(tenant_key) == 0: + internal_id = next(iter(user_data.get("internalId", [])), None) + tenant_key = user_data.get("tenantKey", []) + if len(tenant_key) == 0: logger.error("tenantKey not present in assertion, please check your SP-assertion-configuration") return {"errors": ["tenantKey not present in assertion, please check your SP-assertion-configuration"]} else: @@ -90,10 +89,9 @@ async def process_sso_assertion(request: Request): if t is None: logger.error("invalid tenantKey, please copy the correct value from Preferences > Account") return {"errors": ["invalid tenantKey, please copy the correct value from Preferences > Account"]} - - role_name = auth.get_attribute("role") - logger.debug(f"roleName: {role_name}") - if role_name is None or len(role_name) == 0: + logger.debug(user_data) + role_name = user_data.get("role", []) + if len(role_name) == 0: logger.info("No role specified, setting role to member") role_name = ["member"] role_name = role_name[0] @@ -101,28 +99,24 @@ async def process_sso_assertion(request: Request): if role is None: return {"errors": [f"role {role_name} not found, please create it in openreplay first"]} - admin_privileges = auth.get_attribute("adminPrivileges") - logger.debug(f"adminPrivileges: {admin_privileges}") + admin_privileges = user_data.get("adminPrivileges", []) admin_privileges = not (len(admin_privileges) == 0 or admin_privileges[0] is None or admin_privileges[0].lower() == "false") - first_name = auth.get_attribute("firstName") - last_name = auth.get_attribute("lastName") - logger.debug(f"firstName: {first_name}, lastName: {last_name}") if existing is None: deleted = users.get_deleted_user_by_email(auth.get_nameid()) if deleted is not None: logger.info("== restore deleted user ==") users.restore_sso_user(user_id=deleted["userId"], tenant_id=t['tenantId'], email=email, admin=admin_privileges, origin=SAML2_helper.get_saml2_provider(), - name=" ".join(first_name + last_name), + name=" ".join(user_data.get("firstName", []) + user_data.get("lastName", [])), internal_id=internal_id, role_id=role["roleId"]) else: logger.info("== new user ==") users.create_sso_user(tenant_id=t['tenantId'], email=email, admin=admin_privileges, origin=SAML2_helper.get_saml2_provider(), - name=" ".join(first_name + last_name), + name=" ".join(user_data.get("firstName", []) + user_data.get("lastName", [])), internal_id=internal_id, role_id=role["roleId"]) else: if t['tenantId'] != existing["tenantId"]: @@ -184,9 +178,11 @@ async def process_sso_assertion_tk(tenantKey: str, request: Request): auth.process_response(request_id=request_id) errors = auth.get_errors() + user_data = {} if len(errors) == 0: if 'AuthNRequestID' in session: del session['AuthNRequestID'] + user_data = auth.get_attributes() else: error_reason = auth.get_last_error_reason() logger.error("SAML2 error:") @@ -197,17 +193,15 @@ async def process_sso_assertion_tk(tenantKey: str, request: Request): logger.debug(f"received nameId: {email}") existing = users.get_by_email_only(auth.get_nameid()) - internal_id = auth.get_attribute("internalId") - logger.debug(f"internalId: {internal_id}") + internal_id = next(iter(user_data.get("internalId", [])), None) t = tenants.get_by_tenant_key(tenantKey) if t is None: logger.error("invalid tenantKey, please copy the correct value from Preferences > Account") return {"errors": ["invalid tenantKey, please copy the correct value from Preferences > Account"]} - - role_name = auth.get_attribute("role") - logger.debug(f"roleName: {role_name}") - if role_name is None or len(role_name) == 0: + logger.debug(user_data) + role_name = user_data.get("role", []) + if len(role_name) == 0: logger.info("No role specified, setting role to member") role_name = ["member"] role_name = role_name[0] @@ -215,28 +209,24 @@ async def process_sso_assertion_tk(tenantKey: str, request: Request): if role is None: return {"errors": [f"role {role_name} not found, please create it in openreplay first"]} - admin_privileges = auth.get_attribute("adminPrivileges") - logger.debug(f"adminPrivileges: {admin_privileges}") + admin_privileges = user_data.get("adminPrivileges", []) admin_privileges = not (len(admin_privileges) == 0 or admin_privileges[0] is None or admin_privileges[0].lower() == "false") - first_name = auth.get_attribute("firstName") - last_name = auth.get_attribute("lastName") - logger.debug(f"firstName: {first_name}, lastName: {last_name}") if existing is None: deleted = users.get_deleted_user_by_email(auth.get_nameid()) if deleted is not None: logger.info("== restore deleted user ==") users.restore_sso_user(user_id=deleted["userId"], tenant_id=t['tenantId'], email=email, admin=admin_privileges, origin=SAML2_helper.get_saml2_provider(), - name=" ".join(first_name + last_name), + name=" ".join(user_data.get("firstName", []) + user_data.get("lastName", [])), internal_id=internal_id, role_id=role["roleId"]) else: logger.info("== new user ==") users.create_sso_user(tenant_id=t['tenantId'], email=email, admin=admin_privileges, origin=SAML2_helper.get_saml2_provider(), - name=" ".join(first_name + last_name), + name=" ".join(user_data.get("firstName", []) + user_data.get("lastName", [])), internal_id=internal_id, role_id=role["roleId"]) else: if t['tenantId'] != existing["tenantId"]: