feat(chalice): cleaned code

This commit is contained in:
Taha Yassine Kraiem 2023-02-27 11:39:16 +01:00
parent b94bd834d3
commit 8a843a1af9
4 changed files with 62 additions and 93 deletions

View file

@ -212,15 +212,6 @@ def delete(tenant_id, user_id, project_id):
return {"data": {"state": "success"}}
def count_by_tenant(tenant_id):
with pg_client.PostgresClient() as cur:
query = """SELECT count(1) AS count
FROM public.projects AS s
WHERE s.deleted_at IS NULL;"""
cur.execute(query=query)
return cur.fetchone()["count"]
def get_gdpr(project_id):
with pg_client.PostgresClient() as cur:
query = cur.mogrify("""SELECT gdpr

View file

@ -47,22 +47,15 @@ async def get_account(context: schemas.CurrentContext = Depends(OR_context)):
@app.post('/account', tags=["account"])
async def edit_account(data: schemas.EditUserSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
return users.edit(tenant_id=context.tenant_id, user_id_to_update=context.user_id, changes=data,
editor_id=context.user_id)
@app.get('/projects/limit', tags=['projects'])
async def get_projects_limit(context: schemas.CurrentContext = Depends(OR_context)):
return {"data": {
"current": projects.count_by_tenant(tenant_id=context.tenant_id),
"remaining": -1
}}
@app.post('/integrations/slack', tags=['integrations'])
@app.put('/integrations/slack', tags=['integrations'])
async def add_slack_integration(data: schemas.AddCollaborationSchema, context: schemas.CurrentContext = Depends(OR_context)):
async def add_slack_integration(data: schemas.AddCollaborationSchema,
context: schemas.CurrentContext = Depends(OR_context)):
n = Slack.add(tenant_id=context.tenant_id, data=data)
if n is None:
return {
@ -73,7 +66,7 @@ async def add_slack_integration(data: schemas.AddCollaborationSchema, context: s
@app.post('/integrations/slack/{integrationId}', tags=['integrations'])
async def edit_slack_integration(integrationId: int, data: schemas.EditCollaborationSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
if len(data.url) > 0:
old = Slack.get_integration(tenant_id=context.tenant_id, integration_id=integrationId)
if not old:
@ -90,7 +83,7 @@ async def edit_slack_integration(integrationId: int, data: schemas.EditCollabora
@app.post('/client/members', tags=["client"])
async def add_member(background_tasks: BackgroundTasks, data: schemas.CreateMemberSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
return users.create_member(tenant_id=context.tenant_id, user_id=context.user_id, data=data.dict(),
background_tasks=background_tasks)
@ -127,14 +120,14 @@ async def change_password_by_invitation(data: schemas.EditPasswordByInvitationSc
@app.put('/client/members/{memberId}', tags=["client"])
async def edit_member(memberId: int, data: schemas.EditMemberSchema,
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
return users.edit_member(tenant_id=context.tenant_id, editor_id=context.user_id, changes=data,
user_id_to_update=memberId)
@app.get('/metadata/session_search', tags=["metadata"])
async def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = None,
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
if key is None or value is None or len(value) == 0 and len(key) == 0:
return {"errors": ["please provide a key&value for search"]}
if len(value) == 0:
@ -154,7 +147,7 @@ async def get_projects(context: schemas.CurrentContext = Depends(OR_context)):
@app.get('/{projectId}/sessions/{sessionId}', tags=["sessions"])
async def get_session(projectId: int, sessionId: Union[int, str], background_tasks: BackgroundTasks,
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
if isinstance(sessionId, str):
return {"errors": ["session not found"]}
data = sessions.get_by_id2_pg(project_id=projectId, session_id=sessionId, full_data=True,
@ -171,7 +164,7 @@ async def get_session(projectId: int, sessionId: Union[int, str], background_tas
@app.get('/{projectId}/sessions/{sessionId}/errors/{errorId}/sourcemaps', tags=["sessions", "sourcemaps"])
async def get_error_trace(projectId: int, sessionId: int, errorId: str,
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
data = errors.get_trace(project_id=projectId, error_id=errorId)
if "errors" in data:
return data
@ -182,19 +175,19 @@ async def get_error_trace(projectId: int, sessionId: int, errorId: str,
@app.post('/{projectId}/errors/search', tags=['errors'])
async def errors_search(projectId: int, data: schemas.SearchErrorsSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
return {"data": errors.search(data, projectId, user_id=context.user_id)}
@app.get('/{projectId}/errors/stats', tags=['errors'])
async def errors_stats(projectId: int, startTimestamp: int, endTimestamp: int,
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
return errors.stats(projectId, user_id=context.user_id, startTimestamp=startTimestamp, endTimestamp=endTimestamp)
@app.get('/{projectId}/errors/{errorId}', tags=['errors'])
async def errors_get_details(projectId: int, errorId: str, background_tasks: BackgroundTasks, density24: int = 24,
density30: int = 30, context: schemas.CurrentContext = Depends(OR_context)):
density30: int = 30, context: schemas.CurrentContext = Depends(OR_context)):
data = errors.get_details(project_id=projectId, user_id=context.user_id, error_id=errorId,
**{"density24": density24, "density30": density30})
if data.get("data") is not None:
@ -205,8 +198,8 @@ async def errors_get_details(projectId: int, errorId: str, background_tasks: Bac
@app.get('/{projectId}/errors/{errorId}/stats', tags=['errors'])
async def errors_get_details_right_column(projectId: int, errorId: str, startDate: int = TimeUTC.now(-7),
endDate: int = TimeUTC.now(), density: int = 7,
context: schemas.CurrentContext = Depends(OR_context)):
endDate: int = TimeUTC.now(), density: int = 7,
context: schemas.CurrentContext = Depends(OR_context)):
data = errors.get_details_chart(project_id=projectId, user_id=context.user_id, error_id=errorId,
**{"startDate": startDate, "endDate": endDate, "density": density})
return data
@ -214,7 +207,7 @@ async def errors_get_details_right_column(projectId: int, errorId: str, startDat
@app.get('/{projectId}/errors/{errorId}/sourcemaps', tags=['errors'])
async def errors_get_details_sourcemaps(projectId: int, errorId: str,
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
data = errors.get_trace(project_id=projectId, error_id=errorId)
if "errors" in data:
return data
@ -225,7 +218,8 @@ async def errors_get_details_sourcemaps(projectId: int, errorId: str,
@app.get('/{projectId}/errors/{errorId}/{action}', tags=["errors"])
async def add_remove_favorite_error(projectId: int, errorId: str, action: str, startDate: int = TimeUTC.now(-7),
endDate: int = TimeUTC.now(), context: schemas.CurrentContext = Depends(OR_context)):
endDate: int = TimeUTC.now(),
context: schemas.CurrentContext = Depends(OR_context)):
if action == "favorite":
return errors_favorite.favorite_error(project_id=projectId, user_id=context.user_id, error_id=errorId)
elif action == "sessions":
@ -242,7 +236,7 @@ async def add_remove_favorite_error(projectId: int, errorId: str, action: str, s
@app.get('/{projectId}/assist/sessions/{sessionId}', tags=["assist"])
async def get_live_session(projectId: int, sessionId: str, background_tasks: BackgroundTasks,
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
data = assist.get_live_session_by_id(project_id=projectId, session_id=sessionId)
if data is None:
data = sessions.get_by_id2_pg(context=context, project_id=projectId, session_id=sessionId,
@ -257,7 +251,7 @@ async def get_live_session(projectId: int, sessionId: str, background_tasks: Bac
@app.get('/{projectId}/unprocessed/{sessionId}/dom.mob', tags=["assist"])
async def get_live_session_replay_file(projectId: int, sessionId: Union[int, str],
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
not_found = {"errors": ["Replay file not found"]}
if isinstance(sessionId, str):
print(f"{sessionId} not a valid number.")
@ -277,7 +271,7 @@ async def get_live_session_replay_file(projectId: int, sessionId: Union[int, str
@app.get('/{projectId}/unprocessed/{sessionId}/devtools.mob', tags=["assist"])
async def get_live_session_devtools_file(projectId: int, sessionId: Union[int, str],
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
not_found = {"errors": ["Devtools file not found"]}
if isinstance(sessionId, str):
print(f"{sessionId} not a valid number.")
@ -297,13 +291,13 @@ async def get_live_session_devtools_file(projectId: int, sessionId: Union[int, s
@app.post('/{projectId}/heatmaps/url', tags=["heatmaps"])
async def get_heatmaps_by_url(projectId: int, data: schemas.GetHeatmapPayloadSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
return {"data": heatmaps.get_by_url(project_id=projectId, data=data)}
@app.get('/{projectId}/sessions/{sessionId}/favorite', tags=["sessions"])
async def add_remove_favorite_session2(projectId: int, sessionId: int,
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
return {
"data": sessions_favorite.favorite_session(context=context, project_id=projectId, session_id=sessionId)}
@ -322,7 +316,7 @@ async def assign_session(projectId: int, sessionId, context: schemas.CurrentCont
@app.get('/{projectId}/sessions/{sessionId}/assign/{issueId}', tags=["sessions", "issueTracking"])
async def assign_session(projectId: int, sessionId: int, issueId: str,
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
data = sessions_assignments.get(project_id=projectId, session_id=sessionId, assignment_id=issueId,
tenant_id=context.tenant_id, user_id=context.user_id)
if "errors" in data:
@ -333,8 +327,9 @@ async def assign_session(projectId: int, sessionId: int, issueId: str,
@app.post('/{projectId}/sessions/{sessionId}/assign/{issueId}/comment', tags=["sessions", "issueTracking"])
async def comment_assignment(projectId: int, sessionId: int, issueId: str, data: schemas.CommentAssignmentSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
async def comment_assignment(projectId: int, sessionId: int, issueId: str,
data: schemas.CommentAssignmentSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
data = sessions_assignments.comment(tenant_id=context.tenant_id, project_id=projectId,
session_id=sessionId, assignment_id=issueId,
user_id=context.user_id, message=data.message)
@ -347,7 +342,7 @@ async def comment_assignment(projectId: int, sessionId: int, issueId: str, data:
@app.post('/{projectId}/sessions/{sessionId}/notes', tags=["sessions", "notes"])
async def create_note(projectId: int, sessionId: int, data: schemas.SessionNoteSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
if not sessions.session_exists(project_id=projectId, session_id=sessionId):
return {"errors": ["Session not found"]}
data = sessions_notes.create(tenant_id=context.tenant_id, project_id=projectId,
@ -372,7 +367,7 @@ async def get_session_notes(projectId: int, sessionId: int, context: schemas.Cur
@app.post('/{projectId}/notes/{noteId}', tags=["sessions", "notes"])
async def edit_note(projectId: int, noteId: int, data: schemas.SessionUpdateNoteSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
data = sessions_notes.edit(tenant_id=context.tenant_id, project_id=projectId, user_id=context.user_id,
note_id=noteId, data=data)
if "errors" in data.keys():
@ -391,21 +386,21 @@ async def delete_note(projectId: int, noteId: int, context: schemas.CurrentConte
@app.get('/{projectId}/notes/{noteId}/slack/{webhookId}', tags=["sessions", "notes"])
async def share_note_to_slack(projectId: int, noteId: int, webhookId: int,
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
return sessions_notes.share_to_slack(tenant_id=context.tenant_id, project_id=projectId, user_id=context.user_id,
note_id=noteId, webhook_id=webhookId)
@app.get('/{projectId}/notes/{noteId}/msteams/{webhookId}', tags=["sessions", "notes"])
async def share_note_to_msteams(projectId: int, noteId: int, webhookId: int,
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
return sessions_notes.share_to_msteams(tenant_id=context.tenant_id, project_id=projectId, user_id=context.user_id,
note_id=noteId, webhook_id=webhookId)
@app.post('/{projectId}/notes', tags=["sessions", "notes"])
async def get_all_notes(projectId: int, data: schemas.SearchNoteSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
data = sessions_notes.get_all_notes_by_project_id(tenant_id=context.tenant_id, project_id=projectId,
user_id=context.user_id, data=data)
if "errors" in data:
@ -415,5 +410,5 @@ async def get_all_notes(projectId: int, data: schemas.SearchNoteSchema = Body(..
@app.post('/{projectId}/click_maps/search', tags=["click maps"])
async def click_map_search(projectId: int, data: schemas.FlatClickMapSessionsSearch = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
return {"data": click_maps.search_short_session(user_id=context.user_id, data=data, project_id=projectId)}

View file

@ -207,17 +207,6 @@ def delete(tenant_id, user_id, project_id):
return {"data": {"state": "success"}}
def count_by_tenant(tenant_id):
with pg_client.PostgresClient() as cur:
query = cur.mogrify("""SELECT count(1) AS count
FROM public.projects AS s
WHERE s.deleted_at IS NULL
AND tenant_id= %(tenant_id)s;""",
{"tenant_id": tenant_id})
cur.execute(query=query)
return cur.fetchone()["count"]
def get_gdpr(project_id):
with pg_client.PostgresClient() as cur:
query = cur.mogrify("""SELECT gdpr

View file

@ -52,19 +52,11 @@ async def get_account(context: schemas.CurrentContext = Depends(OR_context)):
@app.post('/account', tags=["account"])
async def edit_account(data: schemas_ee.EditUserSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
return users.edit(tenant_id=context.tenant_id, user_id_to_update=context.user_id, changes=data,
editor_id=context.user_id)
@app.get('/projects/limit', tags=['projects'])
async def get_projects_limit(context: schemas.CurrentContext = Depends(OR_context)):
return {"data": {
"current": projects.count_by_tenant(tenant_id=context.tenant_id),
"remaining": -1
}}
@app.post('/integrations/slack', tags=['integrations'])
@app.put('/integrations/slack', tags=['integrations'])
async def add_slack_client(data: schemas.AddCollaborationSchema, context: schemas.CurrentContext = Depends(OR_context)):
@ -78,7 +70,7 @@ async def add_slack_client(data: schemas.AddCollaborationSchema, context: schema
@app.post('/integrations/slack/{integrationId}', tags=['integrations'])
async def edit_slack_integration(integrationId: int, data: schemas.EditCollaborationSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
if len(data.url) > 0:
old = Slack.get_integration(tenant_id=context.tenant_id, integration_id=integrationId)
if not old:
@ -95,7 +87,7 @@ async def edit_slack_integration(integrationId: int, data: schemas.EditCollabora
@app.post('/client/members', tags=["client"])
async def add_member(background_tasks: BackgroundTasks, data: schemas_ee.CreateMemberSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
return users.create_member(tenant_id=context.tenant_id, user_id=context.user_id, data=data.dict(),
background_tasks=background_tasks)
@ -134,14 +126,14 @@ async def change_password_by_invitation(data: schemas.EditPasswordByInvitationSc
@app.put('/client/members/{memberId}', tags=["client"])
async def edit_member(memberId: int, data: schemas_ee.EditMemberSchema,
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
return users.edit_member(tenant_id=context.tenant_id, editor_id=context.user_id, changes=data,
user_id_to_update=memberId)
@app.get('/metadata/session_search', tags=["metadata"])
async def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = None,
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
if key is None or value is None or len(value) == 0 and len(key) == 0:
return {"errors": ["please provide a key&value for search"]}
@ -165,7 +157,7 @@ async def get_projects(context: schemas.CurrentContext = Depends(OR_context)):
@app.get('/{projectId}/sessions/{sessionId}', tags=["sessions"], dependencies=[OR_scope(Permissions.session_replay)])
async def get_session(projectId: int, sessionId: Union[int, str], background_tasks: BackgroundTasks,
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
if isinstance(sessionId, str):
return {"errors": ["session not found"]}
data = sessions.get_by_id2_pg(project_id=projectId, session_id=sessionId, full_data=True,
@ -183,7 +175,7 @@ async def get_session(projectId: int, sessionId: Union[int, str], background_tas
@app.get('/{projectId}/sessions/{sessionId}/errors/{errorId}/sourcemaps', tags=["sessions", "sourcemaps"],
dependencies=[OR_scope(Permissions.dev_tools)])
async def get_error_trace(projectId: int, sessionId: int, errorId: str,
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
data = errors.get_trace(project_id=projectId, error_id=errorId)
if "errors" in data:
return data
@ -194,19 +186,19 @@ async def get_error_trace(projectId: int, sessionId: int, errorId: str,
@app.post('/{projectId}/errors/search', tags=['errors'], dependencies=[OR_scope(Permissions.dev_tools)])
async def errors_search(projectId: int, data: schemas.SearchErrorsSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
return {"data": errors.search(data, projectId, user_id=context.user_id)}
@app.get('/{projectId}/errors/stats', tags=['errors'], dependencies=[OR_scope(Permissions.dev_tools)])
async def errors_stats(projectId: int, startTimestamp: int, endTimestamp: int,
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
return errors.stats(projectId, user_id=context.user_id, startTimestamp=startTimestamp, endTimestamp=endTimestamp)
@app.get('/{projectId}/errors/{errorId}', tags=['errors'], dependencies=[OR_scope(Permissions.dev_tools)])
async def errors_get_details(projectId: int, errorId: str, background_tasks: BackgroundTasks, density24: int = 24,
density30: int = 30, context: schemas.CurrentContext = Depends(OR_context)):
density30: int = 30, context: schemas.CurrentContext = Depends(OR_context)):
data = errors.get_details(project_id=projectId, user_id=context.user_id, error_id=errorId,
**{"density24": density24, "density30": density30})
if data.get("data") is not None:
@ -217,8 +209,8 @@ async def errors_get_details(projectId: int, errorId: str, background_tasks: Bac
@app.get('/{projectId}/errors/{errorId}/stats', tags=['errors'], dependencies=[OR_scope(Permissions.dev_tools)])
async def errors_get_details_right_column(projectId: int, errorId: str, startDate: int = TimeUTC.now(-7),
endDate: int = TimeUTC.now(), density: int = 7,
context: schemas.CurrentContext = Depends(OR_context)):
endDate: int = TimeUTC.now(), density: int = 7,
context: schemas.CurrentContext = Depends(OR_context)):
data = errors.get_details_chart(project_id=projectId, user_id=context.user_id, error_id=errorId,
**{"startDate": startDate, "endDate": endDate, "density": density})
return data
@ -226,7 +218,7 @@ async def errors_get_details_right_column(projectId: int, errorId: str, startDat
@app.get('/{projectId}/errors/{errorId}/sourcemaps', tags=['errors'], dependencies=[OR_scope(Permissions.dev_tools)])
async def errors_get_details_sourcemaps(projectId: int, errorId: str,
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
data = errors.get_trace(project_id=projectId, error_id=errorId)
if "errors" in data:
return data
@ -237,7 +229,8 @@ async def errors_get_details_sourcemaps(projectId: int, errorId: str,
@app.get('/{projectId}/errors/{errorId}/{action}', tags=["errors"], dependencies=[OR_scope(Permissions.dev_tools)])
async def add_remove_favorite_error(projectId: int, errorId: str, action: str, startDate: int = TimeUTC.now(-7),
endDate: int = TimeUTC.now(), context: schemas.CurrentContext = Depends(OR_context)):
endDate: int = TimeUTC.now(),
context: schemas.CurrentContext = Depends(OR_context)):
if action == "favorite":
return errors_favorite.favorite_error(project_id=projectId, user_id=context.user_id, error_id=errorId)
elif action == "sessions":
@ -254,7 +247,7 @@ async def add_remove_favorite_error(projectId: int, errorId: str, action: str, s
@app.get('/{projectId}/assist/sessions/{sessionId}', tags=["assist"], dependencies=[OR_scope(Permissions.assist_live)])
async def get_live_session(projectId: int, sessionId: str, background_tasks: BackgroundTasks,
context: schemas_ee.CurrentContext = Depends(OR_context)):
context: schemas_ee.CurrentContext = Depends(OR_context)):
data = assist.get_live_session_by_id(project_id=projectId, session_id=sessionId)
if data is None:
data = sessions.get_by_id2_pg(context=context, project_id=projectId, session_id=sessionId,
@ -270,7 +263,7 @@ async def get_live_session(projectId: int, sessionId: str, background_tasks: Bac
@app.get('/{projectId}/unprocessed/{sessionId}/dom.mob', tags=["assist"],
dependencies=[OR_scope(Permissions.assist_live, Permissions.session_replay)])
async def get_live_session_replay_file(projectId: int, sessionId: Union[int, str],
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
not_found = {"errors": ["Replay file not found"]}
if isinstance(sessionId, str):
print(f"{sessionId} not a valid number.")
@ -291,7 +284,7 @@ async def get_live_session_replay_file(projectId: int, sessionId: Union[int, str
@app.get('/{projectId}/unprocessed/{sessionId}/devtools.mob', tags=["assist"],
dependencies=[OR_scope(Permissions.assist_live, Permissions.session_replay, Permissions.dev_tools)])
async def get_live_session_devtools_file(projectId: int, sessionId: Union[int, str],
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
not_found = {"errors": ["Devtools file not found"]}
if isinstance(sessionId, str):
print(f"{sessionId} not a valid number.")
@ -311,14 +304,14 @@ async def get_live_session_devtools_file(projectId: int, sessionId: Union[int, s
@app.post('/{projectId}/heatmaps/url', tags=["heatmaps"], dependencies=[OR_scope(Permissions.session_replay)])
async def get_heatmaps_by_url(projectId: int, data: schemas.GetHeatmapPayloadSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
return {"data": heatmaps.get_by_url(project_id=projectId, data=data)}
@app.get('/{projectId}/sessions/{sessionId}/favorite', tags=["sessions"],
dependencies=[OR_scope(Permissions.session_replay)])
async def add_remove_favorite_session2(projectId: int, sessionId: int,
context: schemas_ee.CurrentContext = Depends(OR_context)):
context: schemas_ee.CurrentContext = Depends(OR_context)):
return {
"data": sessions_favorite.favorite_session(context=context, project_id=projectId, session_id=sessionId)}
@ -339,7 +332,7 @@ async def assign_session(projectId: int, sessionId, context: schemas.CurrentCont
@app.get('/{projectId}/sessions/{sessionId}/assign/{issueId}', tags=["sessions", "issueTracking"],
dependencies=[OR_scope(Permissions.session_replay)])
async def assign_session(projectId: int, sessionId: int, issueId: str,
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
data = sessions_assignments.get(project_id=projectId, session_id=sessionId, assignment_id=issueId,
tenant_id=context.tenant_id, user_id=context.user_id)
if "errors" in data:
@ -351,8 +344,9 @@ async def assign_session(projectId: int, sessionId: int, issueId: str,
@app.post('/{projectId}/sessions/{sessionId}/assign/{issueId}/comment', tags=["sessions", "issueTracking"],
dependencies=[OR_scope(Permissions.session_replay)])
async def comment_assignment(projectId: int, sessionId: int, issueId: str, data: schemas.CommentAssignmentSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
async def comment_assignment(projectId: int, sessionId: int, issueId: str,
data: schemas.CommentAssignmentSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
data = sessions_assignments.comment(tenant_id=context.tenant_id, project_id=projectId,
session_id=sessionId, assignment_id=issueId,
user_id=context.user_id, message=data.message)
@ -366,7 +360,7 @@ async def comment_assignment(projectId: int, sessionId: int, issueId: str, data:
@app.post('/{projectId}/sessions/{sessionId}/notes', tags=["sessions", "notes"],
dependencies=[OR_scope(Permissions.session_replay)])
async def create_note(projectId: int, sessionId: int, data: schemas.SessionNoteSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
if not sessions.session_exists(project_id=projectId, session_id=sessionId):
return {"errors": ["Session not found"]}
data = sessions_notes.create(tenant_id=context.tenant_id, project_id=projectId,
@ -393,7 +387,7 @@ async def get_session_notes(projectId: int, sessionId: int, context: schemas.Cur
@app.post('/{projectId}/notes/{noteId}', tags=["sessions", "notes"],
dependencies=[OR_scope(Permissions.session_replay)])
async def edit_note(projectId: int, noteId: int, data: schemas.SessionUpdateNoteSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
data = sessions_notes.edit(tenant_id=context.tenant_id, project_id=projectId, user_id=context.user_id,
note_id=noteId, data=data)
if "errors" in data.keys():
@ -414,21 +408,21 @@ async def delete_note(projectId: int, noteId: int, context: schemas.CurrentConte
@app.get('/{projectId}/notes/{noteId}/slack/{webhookId}', tags=["sessions", "notes"],
dependencies=[OR_scope(Permissions.session_replay)])
async def share_note_to_slack(projectId: int, noteId: int, webhookId: int,
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
return sessions_notes.share_to_slack(tenant_id=context.tenant_id, project_id=projectId, user_id=context.user_id,
note_id=noteId, webhook_id=webhookId)
@app.get('/{projectId}/notes/{noteId}/msteams/{webhookId}', tags=["sessions", "notes"])
async def share_note_to_msteams(projectId: int, noteId: int, webhookId: int,
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
return sessions_notes.share_to_msteams(tenant_id=context.tenant_id, project_id=projectId, user_id=context.user_id,
note_id=noteId, webhook_id=webhookId)
@app.post('/{projectId}/notes', tags=["sessions", "notes"], dependencies=[OR_scope(Permissions.session_replay)])
async def get_all_notes(projectId: int, data: schemas.SearchNoteSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
data = sessions_notes.get_all_notes_by_project_id(tenant_id=context.tenant_id, project_id=projectId,
user_id=context.user_id, data=data)
if "errors" in data:
@ -438,5 +432,5 @@ async def get_all_notes(projectId: int, data: schemas.SearchNoteSchema = Body(..
@app.post('/{projectId}/click_maps/search', tags=["click maps"], dependencies=[OR_scope(Permissions.session_replay)])
async def click_map_search(projectId: int, data: schemas.FlatClickMapSessionsSearch = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
context: schemas.CurrentContext = Depends(OR_context)):
return {"data": click_maps.search_short_session(user_id=context.user_id, data=data, project_id=projectId)}