Merge pull request #1103 from openreplay/v1.11.0-patch

V1.11.0 patch
This commit is contained in:
Kraiem Taha Yassine 2023-04-04 14:04:31 +01:00 committed by GitHub
commit 43aee87b06
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 84 additions and 48 deletions

View file

@ -115,7 +115,7 @@ def reset_member(tenant_id, editor_id, user_id_to_update):
return {"data": {"invitationLink": generate_new_invitation(user_id_to_update)}}
def update(tenant_id, user_id, changes):
def update(tenant_id, user_id, changes, output=True):
AUTH_KEYS = ["password", "invitationToken", "invitedAt", "changePwdExpireAt", "changePwdToken"]
if len(changes.keys()) == 0:
return None
@ -167,7 +167,8 @@ def update(tenant_id, user_id, changes):
(CASE WHEN users.role = 'member' THEN TRUE ELSE FALSE END) AS member;""",
{"user_id": user_id, **changes})
)
if not output:
return None
return get(user_id=user_id, tenant_id=tenant_id)
@ -291,35 +292,29 @@ def edit(user_id_to_update, tenant_id, changes: schemas.EditUserSchema, editor_i
return {"data": user}
def edit_member(user_id_to_update, tenant_id, changes: schemas.EditUserSchema, editor_id):
def edit_member(user_id_to_update, tenant_id, changes: schemas.EditMemberSchema, editor_id):
user = get_member(user_id=user_id_to_update, tenant_id=tenant_id)
if editor_id != user_id_to_update or changes.admin is not None and changes.admin != user["admin"]:
admin = get(tenant_id=tenant_id, user_id=editor_id)
_changes = {}
if editor_id != user_id_to_update:
admin = get_user_role(tenant_id=tenant_id, user_id=editor_id)
if not admin["superAdmin"] and not admin["admin"]:
return {"errors": ["unauthorized"]}
_changes = {}
if editor_id == user_id_to_update:
if changes.admin is not None:
if user["superAdmin"]:
changes.admin = None
elif changes.admin != user["admin"]:
return {"errors": ["cannot change your own role"]}
if admin["admin"] and user["superAdmin"]:
return {"errors": ["only the owner can edit his own details"]}
else:
if user["superAdmin"]:
changes.admin = None
elif changes.admin != user["admin"]:
return {"errors": ["cannot change your own admin privileges"]}
if changes.email is not None and changes.email != user["email"]:
if email_exists(changes.email):
return {"errors": ["email already exists."]}
if get_deleted_user_by_email(changes.email) is not None:
return {"errors": ["email previously deleted."]}
_changes["email"] = changes.email
if changes.name is not None and len(changes.name) > 0:
if changes.name and len(changes.name) > 0:
_changes["name"] = changes.name
if changes.admin is not None:
_changes["role"] = "admin" if changes.admin else "member"
if len(_changes.keys()) > 0:
update(tenant_id=tenant_id, user_id=user_id_to_update, changes=_changes)
update(tenant_id=tenant_id, user_id=user_id_to_update, changes=_changes, output=False)
return {"data": get_member(user_id=user_id_to_update, tenant_id=tenant_id)}
return {"data": user}
@ -630,3 +625,25 @@ def authenticate(email, password, for_change_password=False):
**r
}
return None
def get_user_role(tenant_id, user_id):
with pg_client.PostgresClient() as cur:
cur.execute(
cur.mogrify(
f"""SELECT
users.user_id,
users.email,
users.role,
users.name,
users.created_at,
(CASE WHEN users.role = 'owner' THEN TRUE ELSE FALSE END) AS super_admin,
(CASE WHEN users.role = 'admin' THEN TRUE ELSE FALSE END) AS admin,
(CASE WHEN users.role = 'member' THEN TRUE ELSE FALSE END) AS member
FROM public.users
WHERE users.deleted_at IS NULL
AND users.user_id=%(user_id)s
LIMIT 1""",
{"user_id": user_id})
)
return helper.dict_to_camel_case(cur.fetchone())

View file

@ -42,6 +42,7 @@ class EditUserSchema(BaseModel):
email: Optional[EmailStr] = Field(None)
admin: Optional[bool] = Field(None)
_transform_name = validator('name', pre=True, allow_reuse=True)(remove_whitespace)
_transform_email = validator('email', pre=True, allow_reuse=True)(transform_email)
@ -152,8 +153,6 @@ class EditMemberSchema(EditUserSchema):
name: str = Field(...)
email: EmailStr = Field(...)
admin: bool = Field(False)
_transform_name = validator('name', pre=True, allow_reuse=True)(remove_whitespace)
_transform_email = validator('email', pre=True, allow_reuse=True)(transform_email)
class EditPasswordByInvitationSchema(BaseModel):

View file

@ -128,7 +128,7 @@ def reset_member(tenant_id, editor_id, user_id_to_update):
return {"data": {"invitationLink": generate_new_invitation(user_id_to_update)}}
def update(tenant_id, user_id, changes):
def update(tenant_id, user_id, changes, output=True):
AUTH_KEYS = ["password", "invitationToken", "invitedAt", "changePwdExpireAt", "changePwdToken"]
if len(changes.keys()) == 0:
return None
@ -197,7 +197,8 @@ def update(tenant_id, user_id, changes):
AND roles.role_id=users.role_id) AS role_name;""",
{"tenant_id": tenant_id, "user_id": user_id, **changes})
)
if not output:
return None
return get(user_id=user_id, tenant_id=tenant_id)
@ -344,33 +345,29 @@ def edit(user_id_to_update, tenant_id, changes: schemas_ee.EditUserSchema, edito
return {"data": user}
def edit_member(user_id_to_update, tenant_id, changes: schemas_ee.EditUserSchema, editor_id):
def edit_member(user_id_to_update, tenant_id, changes: schemas_ee.EditMemberSchema, editor_id):
user = get_member(user_id=user_id_to_update, tenant_id=tenant_id)
if editor_id != user_id_to_update or changes.admin is not None and changes.admin != user["admin"]:
admin = get(tenant_id=tenant_id, user_id=editor_id)
_changes = {}
if editor_id != user_id_to_update:
admin = get_user_role(tenant_id=tenant_id, user_id=editor_id)
if not admin["superAdmin"] and not admin["admin"]:
return {"errors": ["unauthorized"]}
_changes = {}
if editor_id == user_id_to_update:
if changes.admin is not None:
if user["superAdmin"]:
changes.admin = None
elif changes.admin != user["admin"]:
return {"errors": ["cannot change your own role"]}
if changes.roleId is not None:
if user["superAdmin"]:
if admin["admin"] and user["superAdmin"]:
return {"errors": ["only the owner can edit his own details"]}
else:
if user["superAdmin"]:
changes.admin = None
elif changes.admin != user["admin"]:
return {"errors": ["cannot change your own admin privileges"]}
if changes.roleId:
if user["superAdmin"] and changes.roleId != user["roleId"]:
changes.roleId = None
elif changes.roleId != user["roleId"]:
return {"errors": ["owner's role cannot be changed"]}
if changes.roleId != user["roleId"]:
return {"errors": ["cannot change your own role"]}
if changes.email is not None and changes.email != user["email"]:
if email_exists(changes.email):
return {"errors": ["email already exists."]}
if get_deleted_user_by_email(changes.email) is not None:
return {"errors": ["email previously deleted."]}
_changes["email"] = changes.email
if changes.name is not None and len(changes.name) > 0:
if changes.name and len(changes.name) > 0:
_changes["name"] = changes.name
if changes.admin is not None:
@ -380,8 +377,8 @@ def edit_member(user_id_to_update, tenant_id, changes: schemas_ee.EditUserSchema
_changes["roleId"] = changes.roleId
if len(_changes.keys()) > 0:
update(tenant_id=tenant_id, user_id=user_id_to_update, changes=_changes)
return {"data": get_member(tenant_id=tenant_id, user_id=user_id_to_update)}
update(tenant_id=tenant_id, user_id=user_id_to_update, changes=_changes, output=False)
return {"data": get_member(user_id=user_id_to_update, tenant_id=tenant_id)}
return {"data": user}
@ -853,3 +850,26 @@ def __hard_delete_user(user_id):
WHERE users.user_id = %(user_id)s AND users.deleted_at IS NOT NULL ;""",
{"user_id": user_id})
cur.execute(query)
def get_user_role(tenant_id, user_id):
with pg_client.PostgresClient() as cur:
cur.execute(
cur.mogrify(
f"""SELECT
users.user_id,
users.email,
users.role,
users.name,
users.created_at,
(CASE WHEN users.role = 'owner' THEN TRUE ELSE FALSE END) AS super_admin,
(CASE WHEN users.role = 'admin' THEN TRUE ELSE FALSE END) AS admin,
(CASE WHEN users.role = 'member' THEN TRUE ELSE FALSE END) AS member
FROM public.users
WHERE users.deleted_at IS NULL
AND users.user_id=%(user_id)s
AND users.tenant_id=%(tenant_id)s
LIMIT 1""",
{"tenant_id": tenant_id, "user_id": user_id})
)
return helper.dict_to_camel_case(cur.fetchone())