diff --git a/api/chalicelib/core/db_request_handler.py b/api/chalicelib/core/db_request_handler.py new file mode 100644 index 000000000..e591868a4 --- /dev/null +++ b/api/chalicelib/core/db_request_handler.py @@ -0,0 +1,199 @@ +import logging +from chalicelib.utils import helper, pg_client + + +class DatabaseRequestHandler: + def __init__(self, table_name): + self.table_name = table_name + self.constraints = [] + self.params = {} + self.order_clause = "" + self.sort_clause = "" + self.select_columns = [] + self.sub_queries = [] + self.joins = [] + self.group_by_clause = "" + self.client = pg_client + self.logger = logging.getLogger(__name__) + self.pagination = {} + + def add_constraint(self, constraint, param=None): + self.constraints.append(constraint) + if param: + self.params.update(param) + + def add_subquery(self, subquery, alias, param=None): + self.sub_queries.append((subquery, alias)) + if param: + self.params.update(param) + + def add_join(self, join_clause): + self.joins.append(join_clause) + + def set_order_by(self, order_by): + self.order_clause = order_by + + def set_sort_by(self, sort_by): + self.sort_clause = sort_by + + def set_select_columns(self, columns): + self.select_columns = columns + + def set_group_by(self, group_by_clause): + self.group_by_clause = group_by_clause + + def set_pagination(self, page, page_size): + """ + Set pagination parameters for the query. + :param page: The page number (1-indexed) + :param page_size: Number of items per page + """ + self.pagination = { + 'offset': (page - 1) * page_size, + 'limit': page_size + } + + def build_query(self, action="select", additional_clauses=None, data=None): + + if action == "select": + query = f"SELECT {', '.join(self.select_columns)} FROM {self.table_name}" + elif action == "insert": + columns = ', '.join(data.keys()) + placeholders = ', '.join(f'%({k})s' for k in data.keys()) + query = f"INSERT INTO {self.table_name} ({columns}) VALUES ({placeholders})" + elif action == "update": + set_clause = ', '.join(f"{k} = %({k})s" for k in data.keys()) + query = f"UPDATE {self.table_name} SET {set_clause}" + elif action == "delete": + query = f"DELETE FROM {self.table_name}" + + for join in self.joins: + query += f" {join}" + for subquery, alias in self.sub_queries: + query += f", ({subquery}) AS {alias}" + if self.constraints: + query += " WHERE " + " AND ".join(self.constraints) + if action == "select": + if self.group_by_clause: + query += " GROUP BY " + self.group_by_clause + if self.sort_clause: + query += " ORDER BY " + self.sort_clause + if self.order_clause: + query += " " + self.order_clause + if hasattr(self, 'pagination') and self.pagination: + query += " LIMIT %(limit)s OFFSET %(offset)s" + self.params.update(self.pagination) + + if additional_clauses: + query += " " + additional_clauses + + logging.info(f"Query: {query}") + return query + + def execute_query(self, query, data=None): + try: + with self.client.PostgresClient() as cur: + mogrified_query = cur.mogrify(query, {**data, **self.params} if data else self.params) + cur.execute(mogrified_query) + return cur.fetchall() if cur.description else None + except Exception as e: + self.logger.error(f"Database operation failed: {e}") + raise + + def fetchall(self): + query = self.build_query() + return self.execute_query(query) + + def fetchone(self): + query = self.build_query() + result = self.execute_query(query) + return result[0] if result else None + + def insert(self, data): + query = self.build_query(action="insert", data=data) + query += " RETURNING *;" + + result = self.execute_query(query, data) + return result[0] if result else None + + def update(self, data): + query = self.build_query(action="update", data=data) + query += " RETURNING *;" + + result = self.execute_query(query, data) + return result[0] if result else None + + def delete(self): + query = self.build_query(action="delete") + return self.execute_query(query) + + def batch_insert(self, items): + if not items: + return None + + columns = ', '.join(items[0].keys()) + + # Building a values string with unique parameter names for each item + all_values_query = ', '.join( + '(' + ', '.join([f"%({key}_{i})s" for key in item]) + ')' + for i, item in enumerate(items) + ) + + query = f"INSERT INTO {self.table_name} ({columns}) VALUES {all_values_query} RETURNING *;" + + try: + with self.client.PostgresClient() as cur: + # Flatten items into a single dictionary with unique keys + combined_params = {f"{k}_{i}": v for i, item in enumerate(items) for k, v in item.items()} + mogrified_query = cur.mogrify(query, combined_params) + cur.execute(mogrified_query) + return cur.fetchall() + except Exception as e: + self.logger.error(f"Database batch insert operation failed: {e}") + raise + + def raw_query(self, query, params=None): + try: + with self.client.PostgresClient() as cur: + mogrified_query = cur.mogrify(query, params) + cur.execute(mogrified_query) + return cur.fetchall() if cur.description else None + except Exception as e: + self.logger.error(f"Database operation failed: {e}") + raise + + def batch_update(self, items): + if not items: + return None + + id_column = list(items[0])[0] + + # Building the set clause for the update statement + update_columns = list(items[0].keys()) + update_columns.remove(id_column) + set_clause = ', '.join([f"{col} = v.{col}" for col in update_columns]) + + # Building the values part for the 'VALUES' section + values_rows = [] + for item in items: + values = ', '.join([f"%({key})s" for key in item.keys()]) + values_rows.append(f"({values})") + values_query = ', '.join(values_rows) + + # Constructing the full update query + query = f""" + UPDATE {self.table_name} AS t + SET {set_clause} + FROM (VALUES {values_query}) AS v ({', '.join(items[0].keys())}) + WHERE t.{id_column} = v.{id_column}; + """ + + try: + with self.client.PostgresClient() as cur: + # Flatten items into a single dictionary for mogrify + combined_params = {k: v for item in items for k, v in item.items()} + mogrified_query = cur.mogrify(query, combined_params) + cur.execute(mogrified_query) + except Exception as e: + self.logger.error(f"Database batch update operation failed: {e}") + raise diff --git a/api/chalicelib/core/usability_testing/__init__.py b/api/chalicelib/core/usability_testing/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/api/chalicelib/core/usability_testing/routes.py b/api/chalicelib/core/usability_testing/routes.py new file mode 100644 index 000000000..f8c472240 --- /dev/null +++ b/api/chalicelib/core/usability_testing/routes.py @@ -0,0 +1,124 @@ +from fastapi import Body, Depends + +from chalicelib.core.usability_testing.schema import UTTestCreate, UTTestRead, UTTestUpdate, UTTestDelete, SearchResult, \ + UTTestSearch, UTTestSessionsSearch, UTTestResponsesSearch, StatusEnum, UTTestStatusUpdate +from chalicelib.core.usability_testing import service +from or_dependencies import OR_context +from routers.base import get_routers +from schemas import schemas + +public_app, app, app_apikey = get_routers() +tags = ["usability-tests"] + + +@app.post("/{project_id}/usability-tests/search", tags=tags) +async def search_ui_tests( + project_id: int, + search: UTTestSearch = Body(..., + description="The search parameters including the query, page, limit, sort_by, " + "and sort_order.") +): + """ + Search for UT tests within a given project with pagination and optional sorting. + + - **project_id**: The unique identifier of the project to search within. + - **search**: The search parameters including the query, page, limit, sort_by, and sort_order. + """ + + return service.search_ui_tests(project_id, search) + + +@app.post("/{project_id}/usability-tests", tags=tags) +async def create_ut_test(project_id: int, test_data: UTTestCreate, + context: schemas.CurrentContext = Depends(OR_context)): + """ + Create a new UT test in the specified project. + + - **project_id**: The unique identifier of the project. + - **test_data**: The data for the new UT test. + """ + test_data.project_id = project_id + test_data.created_by = context.user_id + return service.create_ut_test(test_data) + + +@app.get("/{project_id}/usability-tests/{test_id}", tags=tags) +async def get_ut_test(project_id: int, test_id: int): + """ + Retrieve a specific UT test by its ID. + + - **project_id**: The unique identifier of the project. + - **test_id**: The unique identifier of the UT test. + """ + return service.get_ut_test(project_id, test_id) + + +@app.delete("/{project_id}/usability-tests/{test_id}", tags=tags) +async def delete_ut_test(project_id: int, test_id: int): + """ + Delete a specific UT test by its ID. + + - **project_id**: The unique identifier of the project. + - **test_id**: The unique identifier of the UT test to be deleted. + """ + return service.delete_ut_test(project_id, test_id) + + +@app.put("/{project_id}/usability-tests/{test_id}", tags=tags) +async def update_ut_test(project_id: int, test_id: int, test_update: UTTestUpdate): + """ + Update a specific UT test by its ID. + + - **project_id**: The unique identifier of the project. + - **test_id**: The unique identifier of the UT test to be updated. + - **test_update**: The updated data for the UT test. + """ + + return service.update_ut_test(project_id, test_id, test_update) + + +@app.get("/{project_id}/usability-tests/{test_id}/sessions", tags=tags) +async def get_sessions(project_id: int, test_id: int, page: int = 1, limit: int = 10): + """ + Get sessions related to a specific UT test. + + - **project_id**: The unique identifier of the project. + - **test_id**: The unique identifier of the UT test. + """ + + return service.ut_tests_sessions(project_id, test_id, page, limit) + + +@app.get("/{project_id}/usability-tests/{test_id}/responses/{task_id}", tags=tags) +async def get_responses(project_id: int, test_id: int, task_id: int, page: int = 1, limit: int = 10): + """ + Get responses related to a specific UT test. + + - **project_id**: The unique identifier of the project. + - **test_id**: The unique identifier of the UT test. + """ + return service.get_responses(project_id, test_id, task_id, page, limit) + + +@app.get("/{project_id}/usability-tests/{test_id}/statistics", tags=tags) +async def get_statistics(project_id: int, test_id: int): + """ + Get statistics related to a specific UT test. + + :param project_id: + :param test_id: + :return: + """ + return service.get_statistics(test_id=test_id) + + +@app.get("/{project_id}/usability-tests/{test_id}/task-statistics", tags=tags) +async def get_task_statistics(project_id: int, test_id: int): + """ + Get statistics related to a specific UT test. + + :param project_id: + :param test_id: + :return: + """ + return service.get_task_statistics(test_id=test_id) diff --git a/api/chalicelib/core/usability_testing/schema.py b/api/chalicelib/core/usability_testing/schema.py new file mode 100644 index 000000000..ebabd80e6 --- /dev/null +++ b/api/chalicelib/core/usability_testing/schema.py @@ -0,0 +1,133 @@ +from typing import Optional, List +from pydantic import BaseModel, Field +from datetime import datetime +from enum import Enum + +from pydantic.v1 import validator + + +class StatusEnum(str, Enum): + preview = 'preview' + in_progress = 'in-progress' + paused = 'paused' + closed = 'closed' + + +class UTTestTask(BaseModel): + task_id: Optional[int] = Field(None, description="The unique identifier of the task") + test_id: Optional[int] = Field(None, description="The unique identifier of the usability test") + title: str = Field(..., description="The title of the task") + description: Optional[str] = Field(None, description="A detailed description of the task") + allow_typing: Optional[bool] = Field(False, description="Indicates if the user is allowed to type") + + +class UTTestBase(BaseModel): + title: str = Field(..., description="The title of the usability test") + project_id: Optional[int] = Field(None, description="The ID of the associated project") + created_by: Optional[int] = Field(None, description="The ID of the user who created the test") + starting_path: Optional[str] = Field(None, description="The starting path for the usability test") + status: Optional[StatusEnum] = Field(StatusEnum.preview, description="The current status of the usability test") + require_mic: bool = Field(False, description="Indicates if a microphone is required") + require_camera: bool = Field(False, description="Indicates if a camera is required") + description: Optional[str] = Field(None, description="A detailed description of the usability test") + guidelines: Optional[str] = Field(None, description="Guidelines for the usability test") + conclusion_message: Optional[str] = Field(None, description="Conclusion message for the test participants") + visibility: bool = Field(False, description="Flag to indicate if the test is visible to the public") + tasks: Optional[List[UTTestTask]] = Field(None, description="List of tasks for the usability test") + + +class UTTestCreate(UTTestBase): + pass + + +class UTTestStatusUpdate(BaseModel): + status: StatusEnum = Field(..., description="The updated status of the usability test") + + +class UTTestRead(UTTestBase): + test_id: int = Field(..., description="The unique identifier of the usability test") + created_by: Optional[int] = Field(None, description="The ID of the user who created the test") + updated_by: Optional[int] = Field(None, description="The ID of the user who last updated the test") + created_at: datetime = Field(..., description="The timestamp when the test was created") + updated_at: datetime = Field(..., description="The timestamp when the test was last updated") + deleted_at: Optional[datetime] = Field(None, description="The timestamp when the test was deleted, if applicable") + + +class UTTestUpdate(BaseModel): + # Optional fields for updating the usability test + title: Optional[str] = Field(None, description="The updated title of the usability test") + status: Optional[StatusEnum] = Field(None, description="The updated status of the usability test") + description: Optional[str] = Field(None, description="The updated description of the usability test") + starting_path: Optional[str] = Field(None, description="The updated starting path for the usability test") + require_mic: Optional[bool] = Field(None, description="Indicates if a microphone is required") + require_camera: Optional[bool] = Field(None, description="Indicates if a camera is required") + guidelines: Optional[str] = Field(None, description="Updated guidelines for the usability test") + conclusion_message: Optional[str] = Field(None, description="Updated conclusion message for the test participants") + visibility: Optional[bool] = Field(None, description="Flag to indicate if the test is visible to the public") + tasks: Optional[List[UTTestTask]] = Field([], description="List of tasks for the usability test") + + +class UTTestDelete(BaseModel): + # You would usually not need a model for deletion, but let's assume you need to confirm the deletion timestamp + deleted_at: datetime = Field(..., description="The timestamp when the test is marked as deleted") + + +class UTTestSearch(BaseModel): + query: Optional[str] = Field(None, description="Search query for the UT tests") + page: Optional[int] = Field(1, ge=1, description="Page number of the results") + limit: Optional[int] = Field(10, ge=1, le=100, description="Number of results per page") + sort_by: Optional[str] = Field(description="Field to sort by", default="created_at") + sort_order: Optional[str] = Field("asc", description="Sort order: 'asc' or 'desc'") + is_active: Optional[bool] = Field(True, description="Flag to indicate if the test is active") + user_id: Optional[int] = Field(None, description="The ID of the user who created the test") + + @validator('sort_order') + def sort_order_must_be_valid(cls, v): + if v not in ['asc', 'desc']: + raise ValueError('Sort order must be either "asc" or "desc"') + return v + + +class UTTestResponsesSearch(BaseModel): + query: Optional[str] = Field(None, description="Search query for the UT responses") + page: Optional[int] = Field(1, ge=1, description="Page number of the results") + limit: Optional[int] = Field(10, ge=1, le=100, description="Number of results per page") + + +class UTTestSignal(BaseModel): + signal_id: int = Field(..., description="The unique identifier of the response") + test_id: int = Field(..., description="The unique identifier of the usability test") + session_id: int = Field(..., description="The unique identifier of the session") + type: str = Field(..., description="The type of the signal") + type_id: int = Field(..., description="The unique identifier of the type") + status: str = Field(..., description="The status of the signal") + comment: Optional[str] = Field(None, description="The comment for the signal") + timestamp: datetime = Field(..., description="The timestamp when the signal was created") + + +class UTTestResponse(BaseModel): + test_id: int = Field(..., description="The unique identifier of the usability test") + response_id: str = Field(..., description="The type of the signal") + status: str = Field(..., description="The status of the signal") + comment: Optional[str] = Field(None, description="The comment for the signal") + timestamp: datetime = Field(..., description="The timestamp when the signal was created") + + +class UTTestSession(BaseModel): + test_id: int = Field(..., description="The unique identifier of the usability test") + session_id: int = Field(..., description="The unique identifier of the session") + status: str = Field(..., description="The status of the signal") + timestamp: datetime = Field(..., description="The timestamp when the signal was created") + + +class UTTestSessionsSearch(BaseModel): + page: Optional[int] = Field(1, ge=1, description="Page number of the results") + limit: Optional[int] = Field(10, ge=1, le=100, description="Number of results per page") + status: Optional[str] = Field(None, description="The status of the session") + + +class SearchResult(BaseModel): + results: List[UTTestRead] + total: int + page: int + limit: int diff --git a/api/chalicelib/core/usability_testing/service.py b/api/chalicelib/core/usability_testing/service.py new file mode 100644 index 000000000..806d21bdc --- /dev/null +++ b/api/chalicelib/core/usability_testing/service.py @@ -0,0 +1,340 @@ +import logging +from datetime import datetime + +from fastapi import HTTPException, status + +from chalicelib.core.db_request_handler import DatabaseRequestHandler +from chalicelib.core.usability_testing.schema import UTTestCreate, UTTestSearch, UTTestUpdate, UTTestStatusUpdate +from chalicelib.utils.TimeUTC import TimeUTC +from chalicelib.utils.helper import dict_to_camel_case, list_to_camel_case + +table_name = "ut_tests" + + +def search_ui_tests(project_id: int, search: UTTestSearch): + select_columns = [ + "ut.test_id", + "ut.title", + "ut.description", + "ut.created_at", + "ut.updated_at", + "json_build_object('user_id', u.user_id, 'name', u.name) AS created_by" + ] + + db_handler = DatabaseRequestHandler("ut_tests AS ut") + db_handler.set_select_columns([f"COUNT(*) OVER() AS count"] + select_columns) + db_handler.add_join("LEFT JOIN users u ON ut.created_by = u.user_id") + db_handler.add_constraint("ut.project_id = %(project_id)s", {'project_id': project_id}) + db_handler.set_sort_by(f"ut.{search.sort_by} {search.sort_order}") + db_handler.set_pagination(page=search.page, page_size=search.limit) + + if (search.user_id is not None) and (search.user_id != 0): + db_handler.add_constraint("ut.created_by = %(user_id)s", {'user_id': search.user_id}) + + if search.query: + db_handler.add_constraint("ut.title ILIKE %(query)s", {'query': f"%{search.query}%"}) + + rows = db_handler.fetchall() + + if not rows or len(rows) == 0: + return {"data": {"total": 0, "list": []}} + + total = rows[0]["count"] + return { + "data": { + "list": list_to_camel_case(rows), + "total": total, + "page": search.page, + "limit": search.limit + } + } + + +def create_ut_test(test_data: UTTestCreate): + db_handler = DatabaseRequestHandler("ut_tests") + data = { + 'project_id': test_data.project_id, + 'title': test_data.title, + 'description': test_data.description, + 'created_by': test_data.created_by, + 'status': test_data.status, + } + + # Execute the insert query + new_test = db_handler.insert(data) + test_id = new_test['test_id'] + + # Insert tasks + if test_data.tasks: + new_test['tasks'] = insert_tasks(test_id, test_data.tasks) + else: + new_test['tasks'] = [] + + return { + "data": dict_to_camel_case(new_test) + } + + +def insert_tasks(test_id, tasks): + db_handler = DatabaseRequestHandler("ut_tests_tasks") + data = [] + for task in tasks: + data.append({ + 'test_id': test_id, + 'title': task.title, + 'description': task.description, + 'allow_typing': task.allow_typing, + }) + + return db_handler.batch_insert(data) + + +def get_ut_test(project_id: int, test_id: int): + db_handler = DatabaseRequestHandler("ut_tests AS ut") + + tasks_sql = """ + SELECT COALESCE(jsonb_agg(utt ORDER BY task_id), '[]'::jsonb) AS tasks + FROM public.ut_tests_tasks AS utt + WHERE utt.test_id = %(test_id)s + """ + + select_columns = [ + "ut.test_id", + "ut.title", + "ut.description", + "ut.status", + "ut.created_at", + "ut.updated_at", + "json_build_object('id', u.user_id, 'name', u.name) AS created_by" + ] + db_handler.set_select_columns(select_columns + [f"({tasks_sql}) AS tasks"]) + db_handler.add_join("LEFT JOIN users u ON ut.created_by = u.user_id") + db_handler.add_constraint("ut.project_id = %(project_id)s", {'project_id': project_id}) + db_handler.add_constraint("ut.test_id = %(test_id)s", {'test_id': test_id}) + db_handler.add_constraint("ut.deleted_at IS NULL") + + row = db_handler.fetchone() + + if not row: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Test not found") + + row['created_at'] = TimeUTC.datetime_to_timestamp(row['created_at']) + row['updated_at'] = TimeUTC.datetime_to_timestamp(row['updated_at']) + row['tasks'] = [dict_to_camel_case(task) for task in row['tasks']] + + return { + "data": dict_to_camel_case(row) + } + + +def delete_ut_test(project_id: int, test_id: int): + db_handler = DatabaseRequestHandler("ut_tests") + update_data = {'deleted_at': 'NOW()'} # Using a SQL function directly + db_handler.add_constraint("project_id = %(project_id)s", {'project_id': project_id}) + db_handler.add_constraint("test_id = %(test_id)s", {'test_id': test_id}) + db_handler.add_constraint("deleted_at IS NULL") + + try: + db_handler.update(update_data) + return {"status": "success"} + except Exception as e: + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) + + +def check_test_exists(db_handler, project_id, test_id): + db_handler.set_select_columns(['1']) # '1' as a dummy column for existence check + db_handler.add_constraint("project_id = %(project_id)s", {'project_id': project_id}) + db_handler.add_constraint("test_id = %(test_id)s", {'test_id': test_id}) + db_handler.add_constraint("deleted_at IS NULL") + + return bool(db_handler.fetchone()) + + +def update_ut_test(project_id: int, test_id: int, test_update: UTTestUpdate): + db_handler = DatabaseRequestHandler("ut_tests") + + # Check if the test exists + if not check_test_exists(db_handler, project_id, test_id): + return {"status": "error", "message": "Test not found"} + + tasks = test_update.tasks + del test_update.tasks + + update_data = test_update.model_dump(exclude_unset=True) + if not update_data: + return {"status": "no_update"} + + db_handler.constraints.clear() + db_handler.add_constraint("project_id = %(project_id)s", {'project_id': project_id}) + db_handler.add_constraint("test_id = %(test_id)s", {'test_id': test_id}) + db_handler.add_constraint("deleted_at IS NULL") + + result = db_handler.update(update_data) + + if result is None: + return {"status": "error", "message": "No update was made"} + + result['tasks'] = check_tasks_update(db_handler, test_id, tasks) + + return { + "data": dict_to_camel_case(result) + } + + +def check_tasks_update(db_handler, test_id, tasks): + if tasks is None: + return [] + + db_handler = DatabaseRequestHandler("ut_tests_tasks") + existing_tasks = get_test_tasks(db_handler, test_id) + existing_task_ids = {task['task_id'] for task in existing_tasks} + + to_be_updated = [task for task in tasks if task.task_id in existing_task_ids] + to_be_created = [task for task in tasks if task.task_id not in existing_task_ids] + to_be_deleted = existing_task_ids - {task.task_id for task in tasks} + + # Perform batch operations + if to_be_updated: + batch_update_tasks(db_handler, to_be_updated) + + if to_be_created: + insert_tasks(test_id, to_be_created) + + if to_be_deleted: + delete_tasks(db_handler, to_be_deleted) + + return get_test_tasks(db_handler, test_id) + + +def delete_tasks(db_handler, task_ids): + db_handler.constraints.clear() + db_handler.add_constraint("task_id IN %(task_ids)s", {'task_ids': tuple(task_ids)}) + db_handler.delete() + + +def batch_update_tasks(db_handler, tasks): + db_handler = DatabaseRequestHandler("ut_tests_tasks") + data = [] + for task in tasks: + data.append({ + 'task_id': task.task_id, + 'title': task.title, + 'description': task.description, + 'allow_typing': task.allow_typing, + }) + + db_handler.batch_update(data) + + +def get_test_tasks(db_handler, test_id): + db_handler.constraints.clear() + db_handler.set_select_columns(['task_id', 'title', 'description', 'allow_typing']) + db_handler.add_constraint("test_id = %(test_id)s", {'test_id': test_id}) + + return db_handler.fetchall() + + +def ut_tests_sessions(project_id: int, test_id: int, page: int, limit: int): + db_handler = DatabaseRequestHandler("ut_tests_signals AS uts") + db_handler.set_select_columns(["s.*"]) + db_handler.add_join("JOIN sessions s ON uts.session_id = s.session_id AND s.project_id = %(project_id)s") + db_handler.add_constraint("uts.type = %(type)s", {'type': 'test'}) + db_handler.add_constraint("uts.status IN %(status_list)s", {'status_list': ('finished', 'aborted')}) + db_handler.add_constraint("project_id = %(project_id)s", {'project_id': project_id}) + db_handler.add_constraint("uts.type_id = %(test_id)s", {'test_id': test_id}) + db_handler.set_pagination(page, limit) + + sessions = db_handler.fetchall() + + return { + "data": { + "list": list_to_camel_case(sessions), + "page": page, + "limit": limit + } + } + + +def get_responses(project_id: int, test_id: int, task_id: int, page: int = 1, limit: int = 10, query: str = None): + db_handler = DatabaseRequestHandler("ut_tests_signals AS uts") + db_handler.set_select_columns(["uts.*"]) + db_handler.add_constraint("uts.comment IS NOT NULL") + db_handler.add_constraint("uts.type = %(type)s", {'type': 'task'}) + db_handler.add_constraint("uts.status IN %(status_list)s", {'status_list': ('done', 'skipped')}) + # db_handler.add_constraint("project_id = %(project_id)s", {'project_id': project_id}) + db_handler.add_constraint("uts.type_id = %(test_id)s", {'test_id': task_id}) + db_handler.set_pagination(page, limit) + + responses = db_handler.fetchall() + + return { + "data": { + "list": responses, + "page": page, + "limit": limit + } + } + + +def get_statistics(test_id: int): + try: + handler = DatabaseRequestHandler("ut_tests_signals sig") + results = handler.raw_query(""" + WITH TaskCounts AS (SELECT test_id, COUNT(*) as total_tasks + FROM ut_tests_tasks + GROUP BY test_id), + CompletedSessions AS (SELECT s.session_id, s.test_id + FROM ut_tests_signals s + WHERE s.test_id = %(test_id)s + AND s.status = 'done' + AND s.task_id IS NOT NULL + GROUP BY s.session_id, s.test_id + HAVING COUNT(DISTINCT s.task_id) = (SELECT total_tasks FROM TaskCounts + WHERE test_id = s.test_id)) + + SELECT sig.test_id, + sum(case when sig.task_id is null then 1 else 0 end) as tests_attempts, + sum(case when sig.task_id is null and sig.status = 'skipped' then 1 else 0 end) as tests_skipped, + sum(case when sig.task_id is not null and sig.status = 'done' then 1 else 0 end) as tasks_completed, + sum(case when sig.task_id is not null and sig.status = 'skipped' then 1 else 0 end) as tasks_skipped, + (SELECT COUNT(*) FROM CompletedSessions WHERE test_id = sig.test_id) as completed_all_tasks + FROM ut_tests_signals sig + LEFT JOIN TaskCounts tc ON sig.test_id = tc.test_id + WHERE sig.status IN ('done', 'skipped') + AND sig.test_id = %(test_id)s + GROUP BY sig.test_id; + """, params={ + 'test_id': test_id + }) + + if results is None or len(results) == 0: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Test not found") + + return { + "data": results[0] + } + except HTTPException as http_exc: + raise http_exc + except Exception as e: + logging.error(f"Unexpected error occurred: {e}") + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error") + + +def get_task_statistics(test_id: int): + db_handler = DatabaseRequestHandler("ut_tests_tasks utt") + db_handler.set_select_columns([ + "utt.task_id", + "utt.title", + "sum(case when uts.status = 'done' then 1 else 0 end) as completed", + "avg(case when uts.status = 'done' then uts.duration else 0 end) as avg_completion_time", + "sum(case when uts.status = 'skipped' then 1 else 0 end) as skipped" + ]) + db_handler.add_join("JOIN ut_tests_signals uts ON utt.task_id = uts.task_id") + db_handler.add_constraint("utt.test_id = %(test_id)s", {'test_id': test_id}) + db_handler.set_group_by("utt.task_id, utt.title") + + rows = db_handler.fetchall() + + return { + "data": list_to_camel_case(rows) + } diff --git a/api/chalicelib/core/usability_testing/test_usability_testing.py b/api/chalicelib/core/usability_testing/test_usability_testing.py new file mode 100644 index 000000000..8991b4d2b --- /dev/null +++ b/api/chalicelib/core/usability_testing/test_usability_testing.py @@ -0,0 +1,147 @@ +import unittest +import datetime +from unittest.mock import MagicMock, patch + +from fastapi import HTTPException + +from chalicelib.core.usability_testing.service import search_ui_tests, create_ut_test, get_ut_test, delete_ut_test, \ + update_ut_test + +from chalicelib.core.usability_testing.schema import UTTestSearch, UTTestCreate, UTTestUpdate + + +class TestUsabilityTesting(unittest.TestCase): + def setUp(self): + self.mocked_pool = patch('chalicelib.utils.pg_client.postgreSQL_pool').start() + self.mocked_pool.getconn.return_value = MagicMock() + + # Mocking the PostgresClient + self.mock_pg_client = patch('chalicelib.utils.pg_client.PostgresClient').start() + self.mocked_cursor = MagicMock() + self.mock_pg_client.return_value.__enter__.return_value = self.mocked_cursor + + # Mocking init and terminate functions + self.mocked_init = patch('chalicelib.utils.pg_client.init').start() + self.mocked_terminate = patch('chalicelib.utils.pg_client.terminate').start() + + def tearDown(self): + patch.stopall() + + def test_search_ui_tests_returns_correct_data(self): + self.mocked_cursor.fetchall.return_value = [ + { + "count": 1, + "test_id": 123, + "title": "Test", + "description": "Description", + "is_active": True, + "created_by": 1, + "created_at": datetime.datetime.now().isoformat(), + "updated_at": datetime.datetime.now().isoformat(), + }, + ] + + result = search_ui_tests(1, UTTestSearch(page=1, limit=10, sort_by='test_id', sort_order='asc')) + + result = result['data'] + + self.assertEqual(1, len(result['list'])) + self.assertEqual(1, result['total']) + self.assertEqual(1, result['page']) + self.assertEqual(10, result['limit']) + + def test_create_ut_test_creates_record(self): + data = UTTestCreate(title="Test", description="Description", is_active=True, project_id=1, status="preview") + self.mocked_cursor.fetchall.return_value = [ + { + "project_id": 1, + "status": "preview", + "test_id": 123, + "title": "Test", + "description": "Description", + "is_active": True, + "created_by": 1, + "created_at": datetime.datetime.now().isoformat(), + "updated_at": datetime.datetime.now().isoformat(), + } + ] + + result = create_ut_test(data) + + self.assertEqual(result['data']['testId'], 123) + self.assertEqual(result['data']['title'], "Test") + self.assertEqual(result['data']['description'], "Description") + self.assertEqual(result['data']['isActive'], True) + self.assertEqual(result['data']['createdBy'], 1) + self.assertEqual(result['data']['status'], "preview") + + def test_get_ut_test_returns_correct_data(self): + self.mocked_cursor.fetchall.return_value = [ + { + "test_id": 123, + "title": "Test", + "created_by": 1, + "created_at": datetime.datetime.now().isoformat(), + "updated_at": datetime.datetime.now().isoformat(), + "tasks": [ + { + "task_id": 1, + "test_id": 123, + "title": "Task", + "description": "Description", + "allow_typing": True, + } + ] + } + ] + + result = get_ut_test(1, 123) + + self.assertIsNotNone(result['data']) + self.assertEqual(result['data']['testId'], 123) + self.assertEqual(result['data']['title'], "Test") + + self.mocked_cursor.fetchall.return_value = None + with self.assertRaises(HTTPException): + get_ut_test(1, 999) + + def test_delete_ut_test_deletes_record(self): + self.mocked_cursor.return_value = 1 + + result = delete_ut_test(1, 123) + + self.assertEqual(result['status'], 'success') + + # def test_update_ut_test_updates_record(self): + # self.mocked_cursor.fetchall.return_value = [ + # { + # "test_id": 123, + # "title": "Test", + # "created_by": 1, + # "created_at": datetime.datetime.now().isoformat(), + # "updated_at": datetime.datetime.now().isoformat(), + # "tasks": [ + # { + # "task_id": 1, + # "test_id": 123, + # "title": "Task", + # "description": "Description", + # "allow_typing": True, + # } + # ] + # } + # ] + # + # result = update_ut_test(1, 123, UTTestUpdate(title="Updated Test")) + # self.assertEqual(result['status'], 'success') + + # def test_update_status_updates_status(self): + # self.mock_pg_client.PostgresClient.return_value.__enter__.return_value.rowcount = 1 + # + # result = update_status(1, 123, 'active') + # + # self.assertEqual('active', result['status']) + + +if __name__ == '__main__': + unittest.main() diff --git a/api/routers/core.py b/api/routers/core.py index 2073aa09b..7cf288fe6 100644 --- a/api/routers/core.py +++ b/api/routers/core.py @@ -14,6 +14,7 @@ from chalicelib.core import log_tool_rollbar, sourcemaps, events, sessions_assig from chalicelib.core.collaboration_msteams import MSTeams from chalicelib.core.collaboration_slack import Slack from or_dependencies import OR_context, OR_role +from chalicelib.core.usability_testing.routes import app as usability_testing_routes from routers.base import get_routers public_app, app, app_apikey = get_routers() @@ -860,3 +861,6 @@ async def check_recording_status(project_id: int): @public_app.get('/', tags=["health"]) def health_check(): return {} + + +app.include_router(usability_testing_routes) diff --git a/scripts/schema/db/init_dbs/postgresql/1.16.0/1.16.0.sql b/scripts/schema/db/init_dbs/postgresql/1.16.0/1.16.0.sql index 2416358b8..88d701b81 100644 --- a/scripts/schema/db/init_dbs/postgresql/1.16.0/1.16.0.sql +++ b/scripts/schema/db/init_dbs/postgresql/1.16.0/1.16.0.sql @@ -19,6 +19,50 @@ $fn_def$, :'next_version') -- +CREATE TYPE ui_tests_status AS ENUM ('preview', 'in-progress', 'paused', 'closed'); + +CREATE TABLE IF NOT EXISTS ut_tests +( + test_id integer generated BY DEFAULT AS IDENTITY PRIMARY KEY, + project_id integer NOT NULL REFERENCES public.projects (project_id) ON DELETE CASCADE, + title VARCHAR(255) NOT NULL, + starting_path VARCHAR(255) NULL, + status VARCHAR(20) NOT NULL CHECK (status IN ('preview', 'in-progress', 'paused', 'closed')), + require_mic BOOLEAN DEFAULT FALSE, + require_camera BOOLEAN DEFAULT FALSE, + description TEXT NULL, + guidelines TEXT NULL, + conclusion_message TEXT NULL, + created_by integer REFERENCES public.users (user_id) ON DELETE SET NULL, + updated_by integer REFERENCES public.users (user_id) ON DELETE SET NULL, + visibility BOOLEAN DEFAULT FALSE, + created_at timestamp without time zone NOT NULL DEFAULT timezone('utc'::text, now()), + updated_at timestamp without time zone NOT NULL DEFAULT timezone('utc'::text, now()), + deleted_at timestamp without time zone NULL DEFAULT NULL +); + +CREATE TABLE IF NOT EXISTS ut_tests_tasks +( + task_id integer generated BY DEFAULT AS IDENTITY PRIMARY KEY, + test_id integer NOT NULL REFERENCES ut_tests (test_id) ON DELETE CASCADE, + title VARCHAR(255) NOT NULL, + description TEXT NOT NULL, + allow_typing BOOLEAN DEFAULT FALSE, + FOREIGN KEY (test_id) REFERENCES ut_tests (test_id) +); + +CREATE TABLE IF NOT EXISTS ut_tests_signals +( + session_id BIGINT NOT NULL, + test_id BIGINT NOT NULL, + task_id BIGINT NULL, + status VARCHAR(20) NOT NULL CHECK (status IN ('begin', 'done', 'skipped')), + comment TEXT NULL, + timestamp BIGINT NOT NULL, + duration BIGINT NULL, + PRIMARY KEY (session_id, test_id, status, timestamp) +); + CREATE TABLE IF NOT EXISTS events.canvas_recordings ( session_id bigint NOT NULL REFERENCES public.sessions (session_id) ON DELETE CASCADE, diff --git a/scripts/schema/db/init_dbs/postgresql/init_schema.sql b/scripts/schema/db/init_dbs/postgresql/init_schema.sql index e26c4da30..e81f5165c 100644 --- a/scripts/schema/db/init_dbs/postgresql/init_schema.sql +++ b/scripts/schema/db/init_dbs/postgresql/init_schema.sql @@ -1116,6 +1116,51 @@ $$ CREATE INDEX swipes_timestamp_idx ON events_ios.swipes (timestamp); CREATE INDEX swipes_label_session_id_timestamp_idx ON events_ios.swipes (label, session_id, timestamp); + + CREATE TYPE ui_tests_status AS ENUM ('preview', 'in-progress', 'paused', 'closed'); + + CREATE TABLE IF NOT EXISTS ut_tests + ( + test_id integer generated BY DEFAULT AS IDENTITY PRIMARY KEY, + project_id integer NOT NULL REFERENCES public.projects (project_id) ON DELETE CASCADE, + title VARCHAR(255) NOT NULL, + starting_path VARCHAR(255) NULL, + status VARCHAR(20) NOT NULL CHECK (status IN ('preview', 'in-progress', 'paused', 'closed')), + require_mic BOOLEAN DEFAULT FALSE, + require_camera BOOLEAN DEFAULT FALSE, + description TEXT NULL, + guidelines TEXT NULL, + conclusion_message TEXT NULL, + created_by integer REFERENCES public.users (user_id) ON DELETE SET NULL, + updated_by integer REFERENCES public.users (user_id) ON DELETE SET NULL, + visibility BOOLEAN DEFAULT FALSE, + created_at timestamp without time zone NOT NULL DEFAULT timezone('utc'::text, now()), + updated_at timestamp without time zone NOT NULL DEFAULT timezone('utc'::text, now()), + deleted_at timestamp without time zone NULL DEFAULT NULL + ); + + CREATE TABLE IF NOT EXISTS ut_tests_tasks + ( + task_id integer generated BY DEFAULT AS IDENTITY PRIMARY KEY, + test_id integer NOT NULL REFERENCES ut_tests (test_id) ON DELETE CASCADE, + title VARCHAR(255) NOT NULL, + description TEXT NOT NULL, + allow_typing BOOLEAN DEFAULT FALSE, + FOREIGN KEY (test_id) REFERENCES ut_tests (test_id) + ); + + CREATE TABLE IF NOT EXISTS ut_tests_signals + ( + session_id BIGINT NOT NULL, + test_id BIGINT NOT NULL, + task_id BIGINT NULL, + status VARCHAR(20) NOT NULL CHECK (status IN ('begin', 'done', 'skipped')), + comment TEXT NULL, + timestamp BIGINT NOT NULL, + duration BIGINT NULL, + PRIMARY KEY (session_id, test_id, status, timestamp) + ); + CREATE TABLE events.canvas_recordings ( session_id bigint NOT NULL REFERENCES public.sessions (session_id) ON DELETE CASCADE,