From 28df42132f4f9137a082f9a4e208e347a268be6b Mon Sep 17 00:00:00 2001 From: Kraiem Taha Yassine Date: Thu, 2 May 2024 15:52:25 +0200 Subject: [PATCH] fix(chalice): changed SSO (#2157) --- ee/api/routers/saml.py | 50 +++++++++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/ee/api/routers/saml.py b/ee/api/routers/saml.py index 7a0f9caf4..42d957b47 100644 --- a/ee/api/routers/saml.py +++ b/ee/api/routers/saml.py @@ -64,11 +64,9 @@ async def process_sso_assertion(request: Request): auth.process_response(request_id=request_id) errors = auth.get_errors() - user_data = {} if len(errors) == 0: if 'AuthNRequestID' in session: del session['AuthNRequestID'] - user_data = auth.get_attributes() else: error_reason = auth.get_last_error_reason() logger.error("SAML2 error:") @@ -79,9 +77,12 @@ async def process_sso_assertion(request: Request): logger.debug(f"received nameId: {email}") existing = users.get_by_email_only(auth.get_nameid()) - internal_id = next(iter(user_data.get("internalId", [])), None) - tenant_key = user_data.get("tenantKey", []) - if len(tenant_key) == 0: + internal_id = auth.get_attribute("internalId") + logger.debug(f"internalId: {internal_id}") + + tenant_key = auth.get_attribute("tenantKey") + logger.debug(f"tenantKey: {tenant_key}") + if tenant_key is None or len(tenant_key) == 0: logger.error("tenantKey not present in assertion, please check your SP-assertion-configuration") return {"errors": ["tenantKey not present in assertion, please check your SP-assertion-configuration"]} else: @@ -89,9 +90,10 @@ async def process_sso_assertion(request: Request): if t is None: logger.error("invalid tenantKey, please copy the correct value from Preferences > Account") return {"errors": ["invalid tenantKey, please copy the correct value from Preferences > Account"]} - logger.debug(user_data) - role_name = user_data.get("role", []) - if len(role_name) == 0: + + role_name = auth.get_attribute("role") + logger.debug(f"roleName: {role_name}") + if role_name is None or len(role_name) == 0: logger.info("No role specified, setting role to member") role_name = ["member"] role_name = role_name[0] @@ -99,24 +101,28 @@ async def process_sso_assertion(request: Request): if role is None: return {"errors": [f"role {role_name} not found, please create it in openreplay first"]} - admin_privileges = user_data.get("adminPrivileges", []) + admin_privileges = auth.get_attribute("adminPrivileges") + logger.debug(f"adminPrivileges: {admin_privileges}") admin_privileges = not (len(admin_privileges) == 0 or admin_privileges[0] is None or admin_privileges[0].lower() == "false") + first_name = auth.get_attribute("firstName") + last_name = auth.get_attribute("lastName") + logger.debug(f"firstName: {first_name}, lastName: {last_name}") if existing is None: deleted = users.get_deleted_user_by_email(auth.get_nameid()) if deleted is not None: logger.info("== restore deleted user ==") users.restore_sso_user(user_id=deleted["userId"], tenant_id=t['tenantId'], email=email, admin=admin_privileges, origin=SAML2_helper.get_saml2_provider(), - name=" ".join(user_data.get("firstName", []) + user_data.get("lastName", [])), + name=" ".join(first_name + last_name), internal_id=internal_id, role_id=role["roleId"]) else: logger.info("== new user ==") users.create_sso_user(tenant_id=t['tenantId'], email=email, admin=admin_privileges, origin=SAML2_helper.get_saml2_provider(), - name=" ".join(user_data.get("firstName", []) + user_data.get("lastName", [])), + name=" ".join(first_name + last_name), internal_id=internal_id, role_id=role["roleId"]) else: if t['tenantId'] != existing["tenantId"]: @@ -178,11 +184,9 @@ async def process_sso_assertion_tk(tenantKey: str, request: Request): auth.process_response(request_id=request_id) errors = auth.get_errors() - user_data = {} if len(errors) == 0: if 'AuthNRequestID' in session: del session['AuthNRequestID'] - user_data = auth.get_attributes() else: error_reason = auth.get_last_error_reason() logger.error("SAML2 error:") @@ -193,15 +197,17 @@ async def process_sso_assertion_tk(tenantKey: str, request: Request): logger.debug(f"received nameId: {email}") existing = users.get_by_email_only(auth.get_nameid()) - internal_id = next(iter(user_data.get("internalId", [])), None) + internal_id = auth.get_attribute("internalId") + logger.debug(f"internalId: {internal_id}") t = tenants.get_by_tenant_key(tenantKey) if t is None: logger.error("invalid tenantKey, please copy the correct value from Preferences > Account") return {"errors": ["invalid tenantKey, please copy the correct value from Preferences > Account"]} - logger.debug(user_data) - role_name = user_data.get("role", []) - if len(role_name) == 0: + + role_name = auth.get_attribute("role") + logger.debug(f"roleName: {role_name}") + if role_name is None or len(role_name) == 0: logger.info("No role specified, setting role to member") role_name = ["member"] role_name = role_name[0] @@ -209,24 +215,28 @@ async def process_sso_assertion_tk(tenantKey: str, request: Request): if role is None: return {"errors": [f"role {role_name} not found, please create it in openreplay first"]} - admin_privileges = user_data.get("adminPrivileges", []) + admin_privileges = auth.get_attribute("adminPrivileges") + logger.debug(f"adminPrivileges: {admin_privileges}") admin_privileges = not (len(admin_privileges) == 0 or admin_privileges[0] is None or admin_privileges[0].lower() == "false") + first_name = auth.get_attribute("firstName") + last_name = auth.get_attribute("lastName") + logger.debug(f"firstName: {first_name}, lastName: {last_name}") if existing is None: deleted = users.get_deleted_user_by_email(auth.get_nameid()) if deleted is not None: logger.info("== restore deleted user ==") users.restore_sso_user(user_id=deleted["userId"], tenant_id=t['tenantId'], email=email, admin=admin_privileges, origin=SAML2_helper.get_saml2_provider(), - name=" ".join(user_data.get("firstName", []) + user_data.get("lastName", [])), + name=" ".join(first_name + last_name), internal_id=internal_id, role_id=role["roleId"]) else: logger.info("== new user ==") users.create_sso_user(tenant_id=t['tenantId'], email=email, admin=admin_privileges, origin=SAML2_helper.get_saml2_provider(), - name=" ".join(user_data.get("firstName", []) + user_data.get("lastName", [])), + name=" ".join(first_name + last_name), internal_id=internal_id, role_id=role["roleId"]) else: if t['tenantId'] != existing["tenantId"]: