feat(api): usability testing (#1686)
* feat(api): usability testing - wip
* feat(db): usability testing
* feat(api): usability testing - api
* feat(api): usability testing - api
* feat(api): usability testing - db change
* feat(api): usability testing - db change
* feat(api): usability testing - unit tests update
* feat(api): usability testing - test and tasks stats
parent 5040bf9480
commit 595431187f
9 changed files with 1036 additions and 0 deletions
199 api/chalicelib/core/db_request_handler.py Normal file
@@ -0,0 +1,199 @@
import logging

from chalicelib.utils import helper, pg_client


class DatabaseRequestHandler:
    def __init__(self, table_name):
        self.table_name = table_name
        self.constraints = []
        self.params = {}
        self.order_clause = ""
        self.sort_clause = ""
        self.select_columns = []
        self.sub_queries = []
        self.joins = []
        self.group_by_clause = ""
        self.client = pg_client
        self.logger = logging.getLogger(__name__)
        self.pagination = {}

    def add_constraint(self, constraint, param=None):
        self.constraints.append(constraint)
        if param:
            self.params.update(param)

    def add_subquery(self, subquery, alias, param=None):
        self.sub_queries.append((subquery, alias))
        if param:
            self.params.update(param)

    def add_join(self, join_clause):
        self.joins.append(join_clause)

    def set_order_by(self, order_by):
        self.order_clause = order_by

    def set_sort_by(self, sort_by):
        self.sort_clause = sort_by

    def set_select_columns(self, columns):
        self.select_columns = columns

    def set_group_by(self, group_by_clause):
        self.group_by_clause = group_by_clause

    def set_pagination(self, page, page_size):
        """
        Set pagination parameters for the query.

        :param page: The page number (1-indexed)
        :param page_size: Number of items per page
        """
        self.pagination = {
            'offset': (page - 1) * page_size,
            'limit': page_size
        }

    def build_query(self, action="select", additional_clauses=None, data=None):

        if action == "select":
            query = f"SELECT {', '.join(self.select_columns)} FROM {self.table_name}"
        elif action == "insert":
            columns = ', '.join(data.keys())
            placeholders = ', '.join(f'%({k})s' for k in data.keys())
            query = f"INSERT INTO {self.table_name} ({columns}) VALUES ({placeholders})"
        elif action == "update":
            set_clause = ', '.join(f"{k} = %({k})s" for k in data.keys())
            query = f"UPDATE {self.table_name} SET {set_clause}"
        elif action == "delete":
            query = f"DELETE FROM {self.table_name}"

        for join in self.joins:
            query += f" {join}"
        for subquery, alias in self.sub_queries:
            query += f", ({subquery}) AS {alias}"
        if self.constraints:
            query += " WHERE " + " AND ".join(self.constraints)
        if action == "select":
            if self.group_by_clause:
                query += " GROUP BY " + self.group_by_clause
            if self.sort_clause:
                query += " ORDER BY " + self.sort_clause
            if self.order_clause:
                query += " " + self.order_clause
            if hasattr(self, 'pagination') and self.pagination:
                query += " LIMIT %(limit)s OFFSET %(offset)s"
                self.params.update(self.pagination)

        if additional_clauses:
            query += " " + additional_clauses

        logging.info(f"Query: {query}")
        return query

    def execute_query(self, query, data=None):
        try:
            with self.client.PostgresClient() as cur:
                mogrified_query = cur.mogrify(query, {**data, **self.params} if data else self.params)
                cur.execute(mogrified_query)
                return cur.fetchall() if cur.description else None
        except Exception as e:
            self.logger.error(f"Database operation failed: {e}")
            raise

    def fetchall(self):
        query = self.build_query()
        return self.execute_query(query)

    def fetchone(self):
        query = self.build_query()
        result = self.execute_query(query)
        return result[0] if result else None

    def insert(self, data):
        query = self.build_query(action="insert", data=data)
        query += " RETURNING *;"

        result = self.execute_query(query, data)
        return result[0] if result else None

    def update(self, data):
        query = self.build_query(action="update", data=data)
        query += " RETURNING *;"

        result = self.execute_query(query, data)
        return result[0] if result else None

    def delete(self):
        query = self.build_query(action="delete")
        return self.execute_query(query)

    def batch_insert(self, items):
        if not items:
            return None

        columns = ', '.join(items[0].keys())

        # Building a values string with unique parameter names for each item
        all_values_query = ', '.join(
            '(' + ', '.join([f"%({key}_{i})s" for key in item]) + ')'
            for i, item in enumerate(items)
        )

        query = f"INSERT INTO {self.table_name} ({columns}) VALUES {all_values_query} RETURNING *;"

        try:
            with self.client.PostgresClient() as cur:
                # Flatten items into a single dictionary with unique keys
                combined_params = {f"{k}_{i}": v for i, item in enumerate(items) for k, v in item.items()}
                mogrified_query = cur.mogrify(query, combined_params)
                cur.execute(mogrified_query)
                return cur.fetchall()
        except Exception as e:
            self.logger.error(f"Database batch insert operation failed: {e}")
            raise

    def raw_query(self, query, params=None):
        try:
            with self.client.PostgresClient() as cur:
                mogrified_query = cur.mogrify(query, params)
                cur.execute(mogrified_query)
                return cur.fetchall() if cur.description else None
        except Exception as e:
            self.logger.error(f"Database operation failed: {e}")
            raise

    def batch_update(self, items):
        if not items:
            return None

        id_column = list(items[0])[0]

        # Building the set clause for the update statement
        update_columns = list(items[0].keys())
        update_columns.remove(id_column)
        set_clause = ', '.join([f"{col} = v.{col}" for col in update_columns])

        # Building the 'VALUES' rows with unique parameter names per row,
        # mirroring batch_insert, so each row keeps its own values
        values_rows = []
        for i, item in enumerate(items):
            values = ', '.join([f"%({key}_{i})s" for key in item.keys()])
            values_rows.append(f"({values})")
        values_query = ', '.join(values_rows)

        # Constructing the full update query
        query = f"""
            UPDATE {self.table_name} AS t
            SET {set_clause}
            FROM (VALUES {values_query}) AS v ({', '.join(items[0].keys())})
            WHERE t.{id_column} = v.{id_column};
        """

        try:
            with self.client.PostgresClient() as cur:
                # Flatten items into a single dictionary with unique per-row keys for mogrify
                combined_params = {f"{k}_{i}": v for i, item in enumerate(items) for k, v in item.items()}
                mogrified_query = cur.mogrify(query, combined_params)
                cur.execute(mogrified_query)
        except Exception as e:
            self.logger.error(f"Database batch update operation failed: {e}")
            raise
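A minimal usage sketch of the query builder above (not part of the diff; the table, columns, and values are illustrative and mirror how service.py uses the class):

# Illustrative only -- not part of the commit.
from chalicelib.core.db_request_handler import DatabaseRequestHandler

handler = DatabaseRequestHandler("ut_tests AS ut")
handler.set_select_columns(["ut.test_id", "ut.title"])
handler.add_constraint("ut.project_id = %(project_id)s", {'project_id': 42})
handler.set_sort_by("ut.created_at desc")
handler.set_pagination(page=1, page_size=10)
# Builds: SELECT ut.test_id, ut.title FROM ut_tests AS ut WHERE ut.project_id = %(project_id)s
#         ORDER BY ut.created_at desc LIMIT %(limit)s OFFSET %(offset)s
rows = handler.fetchall()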
0 api/chalicelib/core/usability_testing/__init__.py Normal file
124 api/chalicelib/core/usability_testing/routes.py Normal file
@@ -0,0 +1,124 @@
from fastapi import Body, Depends

from chalicelib.core.usability_testing.schema import UTTestCreate, UTTestRead, UTTestUpdate, UTTestDelete, SearchResult, \
    UTTestSearch, UTTestSessionsSearch, UTTestResponsesSearch, StatusEnum, UTTestStatusUpdate
from chalicelib.core.usability_testing import service
from or_dependencies import OR_context
from routers.base import get_routers
from schemas import schemas

public_app, app, app_apikey = get_routers()
tags = ["usability-tests"]


@app.post("/{project_id}/usability-tests/search", tags=tags)
async def search_ui_tests(
        project_id: int,
        search: UTTestSearch = Body(...,
                                    description="The search parameters including the query, page, limit, sort_by, "
                                                "and sort_order.")
):
    """
    Search for UT tests within a given project with pagination and optional sorting.

    - **project_id**: The unique identifier of the project to search within.
    - **search**: The search parameters including the query, page, limit, sort_by, and sort_order.
    """

    return service.search_ui_tests(project_id, search)


@app.post("/{project_id}/usability-tests", tags=tags)
async def create_ut_test(project_id: int, test_data: UTTestCreate,
                         context: schemas.CurrentContext = Depends(OR_context)):
    """
    Create a new UT test in the specified project.

    - **project_id**: The unique identifier of the project.
    - **test_data**: The data for the new UT test.
    """
    test_data.project_id = project_id
    test_data.created_by = context.user_id
    return service.create_ut_test(test_data)


@app.get("/{project_id}/usability-tests/{test_id}", tags=tags)
async def get_ut_test(project_id: int, test_id: int):
    """
    Retrieve a specific UT test by its ID.

    - **project_id**: The unique identifier of the project.
    - **test_id**: The unique identifier of the UT test.
    """
    return service.get_ut_test(project_id, test_id)


@app.delete("/{project_id}/usability-tests/{test_id}", tags=tags)
async def delete_ut_test(project_id: int, test_id: int):
    """
    Delete a specific UT test by its ID.

    - **project_id**: The unique identifier of the project.
    - **test_id**: The unique identifier of the UT test to be deleted.
    """
    return service.delete_ut_test(project_id, test_id)


@app.put("/{project_id}/usability-tests/{test_id}", tags=tags)
async def update_ut_test(project_id: int, test_id: int, test_update: UTTestUpdate):
    """
    Update a specific UT test by its ID.

    - **project_id**: The unique identifier of the project.
    - **test_id**: The unique identifier of the UT test to be updated.
    - **test_update**: The updated data for the UT test.
    """

    return service.update_ut_test(project_id, test_id, test_update)


@app.get("/{project_id}/usability-tests/{test_id}/sessions", tags=tags)
async def get_sessions(project_id: int, test_id: int, page: int = 1, limit: int = 10):
    """
    Get sessions related to a specific UT test.

    - **project_id**: The unique identifier of the project.
    - **test_id**: The unique identifier of the UT test.
    """

    return service.ut_tests_sessions(project_id, test_id, page, limit)


@app.get("/{project_id}/usability-tests/{test_id}/responses/{task_id}", tags=tags)
async def get_responses(project_id: int, test_id: int, task_id: int, page: int = 1, limit: int = 10):
    """
    Get responses related to a specific UT test task.

    - **project_id**: The unique identifier of the project.
    - **test_id**: The unique identifier of the UT test.
    - **task_id**: The unique identifier of the task.
    """
    return service.get_responses(project_id, test_id, task_id, page, limit)


@app.get("/{project_id}/usability-tests/{test_id}/statistics", tags=tags)
async def get_statistics(project_id: int, test_id: int):
    """
    Get statistics related to a specific UT test.

    - **project_id**: The unique identifier of the project.
    - **test_id**: The unique identifier of the UT test.
    """
    return service.get_statistics(test_id=test_id)


@app.get("/{project_id}/usability-tests/{test_id}/task-statistics", tags=tags)
async def get_task_statistics(project_id: int, test_id: int):
    """
    Get per-task statistics related to a specific UT test.

    - **project_id**: The unique identifier of the project.
    - **test_id**: The unique identifier of the UT test.
    """
    return service.get_task_statistics(test_id=test_id)
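A hedged example of exercising the search endpoint with FastAPI's TestClient (not part of the diff; the `app` import path is hypothetical and depends on how the API entry point wires get_routers, and auth dependencies may apply in a real deployment):

# Illustrative only -- not part of the commit; "app" is a hypothetical entry point.
from fastapi.testclient import TestClient
from app import app  # hypothetical: the FastAPI application that includes these routers

client = TestClient(app)
resp = client.post("/1/usability-tests/search",
                   json={"page": 1, "limit": 10, "sort_by": "created_at", "sort_order": "asc"})
print(resp.json())  # expected shape: {"data": {"list": [...], "total": ..., "page": 1, "limit": 10}}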
133 api/chalicelib/core/usability_testing/schema.py Normal file
@@ -0,0 +1,133 @@
from typing import Optional, List
from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum

from pydantic.v1 import validator


class StatusEnum(str, Enum):
    preview = 'preview'
    in_progress = 'in-progress'
    paused = 'paused'
    closed = 'closed'


class UTTestTask(BaseModel):
    task_id: Optional[int] = Field(None, description="The unique identifier of the task")
    test_id: Optional[int] = Field(None, description="The unique identifier of the usability test")
    title: str = Field(..., description="The title of the task")
    description: Optional[str] = Field(None, description="A detailed description of the task")
    allow_typing: Optional[bool] = Field(False, description="Indicates if the user is allowed to type")


class UTTestBase(BaseModel):
    title: str = Field(..., description="The title of the usability test")
    project_id: Optional[int] = Field(None, description="The ID of the associated project")
    created_by: Optional[int] = Field(None, description="The ID of the user who created the test")
    starting_path: Optional[str] = Field(None, description="The starting path for the usability test")
    status: Optional[StatusEnum] = Field(StatusEnum.preview, description="The current status of the usability test")
    require_mic: bool = Field(False, description="Indicates if a microphone is required")
    require_camera: bool = Field(False, description="Indicates if a camera is required")
    description: Optional[str] = Field(None, description="A detailed description of the usability test")
    guidelines: Optional[str] = Field(None, description="Guidelines for the usability test")
    conclusion_message: Optional[str] = Field(None, description="Conclusion message for the test participants")
    visibility: bool = Field(False, description="Flag to indicate if the test is visible to the public")
    tasks: Optional[List[UTTestTask]] = Field(None, description="List of tasks for the usability test")


class UTTestCreate(UTTestBase):
    pass


class UTTestStatusUpdate(BaseModel):
    status: StatusEnum = Field(..., description="The updated status of the usability test")


class UTTestRead(UTTestBase):
    test_id: int = Field(..., description="The unique identifier of the usability test")
    created_by: Optional[int] = Field(None, description="The ID of the user who created the test")
    updated_by: Optional[int] = Field(None, description="The ID of the user who last updated the test")
    created_at: datetime = Field(..., description="The timestamp when the test was created")
    updated_at: datetime = Field(..., description="The timestamp when the test was last updated")
    deleted_at: Optional[datetime] = Field(None, description="The timestamp when the test was deleted, if applicable")


class UTTestUpdate(BaseModel):
    # Optional fields for updating the usability test
    title: Optional[str] = Field(None, description="The updated title of the usability test")
    status: Optional[StatusEnum] = Field(None, description="The updated status of the usability test")
    description: Optional[str] = Field(None, description="The updated description of the usability test")
    starting_path: Optional[str] = Field(None, description="The updated starting path for the usability test")
    require_mic: Optional[bool] = Field(None, description="Indicates if a microphone is required")
    require_camera: Optional[bool] = Field(None, description="Indicates if a camera is required")
    guidelines: Optional[str] = Field(None, description="Updated guidelines for the usability test")
    conclusion_message: Optional[str] = Field(None, description="Updated conclusion message for the test participants")
    visibility: Optional[bool] = Field(None, description="Flag to indicate if the test is visible to the public")
    tasks: Optional[List[UTTestTask]] = Field([], description="List of tasks for the usability test")


class UTTestDelete(BaseModel):
    # Deletion only needs the soft-delete timestamp to be confirmed
    deleted_at: datetime = Field(..., description="The timestamp when the test is marked as deleted")


class UTTestSearch(BaseModel):
    query: Optional[str] = Field(None, description="Search query for the UT tests")
    page: Optional[int] = Field(1, ge=1, description="Page number of the results")
    limit: Optional[int] = Field(10, ge=1, le=100, description="Number of results per page")
    sort_by: Optional[str] = Field(default="created_at", description="Field to sort by")
    sort_order: Optional[str] = Field("asc", description="Sort order: 'asc' or 'desc'")
    is_active: Optional[bool] = Field(True, description="Flag to indicate if the test is active")
    user_id: Optional[int] = Field(None, description="The ID of the user who created the test")

    @validator('sort_order')
    def sort_order_must_be_valid(cls, v):
        if v not in ['asc', 'desc']:
            raise ValueError('Sort order must be either "asc" or "desc"')
        return v


class UTTestResponsesSearch(BaseModel):
    query: Optional[str] = Field(None, description="Search query for the UT responses")
    page: Optional[int] = Field(1, ge=1, description="Page number of the results")
    limit: Optional[int] = Field(10, ge=1, le=100, description="Number of results per page")


class UTTestSignal(BaseModel):
    signal_id: int = Field(..., description="The unique identifier of the signal")
    test_id: int = Field(..., description="The unique identifier of the usability test")
    session_id: int = Field(..., description="The unique identifier of the session")
    type: str = Field(..., description="The type of the signal")
    type_id: int = Field(..., description="The unique identifier of the type")
    status: str = Field(..., description="The status of the signal")
    comment: Optional[str] = Field(None, description="The comment for the signal")
    timestamp: datetime = Field(..., description="The timestamp when the signal was created")


class UTTestResponse(BaseModel):
    test_id: int = Field(..., description="The unique identifier of the usability test")
    response_id: str = Field(..., description="The unique identifier of the response")
    status: str = Field(..., description="The status of the response")
    comment: Optional[str] = Field(None, description="The comment for the response")
    timestamp: datetime = Field(..., description="The timestamp when the response was created")


class UTTestSession(BaseModel):
    test_id: int = Field(..., description="The unique identifier of the usability test")
    session_id: int = Field(..., description="The unique identifier of the session")
    status: str = Field(..., description="The status of the session")
    timestamp: datetime = Field(..., description="The timestamp of the session")


class UTTestSessionsSearch(BaseModel):
    page: Optional[int] = Field(1, ge=1, description="Page number of the results")
    limit: Optional[int] = Field(10, ge=1, le=100, description="Number of results per page")
    status: Optional[str] = Field(None, description="The status of the session")


class SearchResult(BaseModel):
    results: List[UTTestRead]
    total: int
    page: int
    limit: int
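A small sketch of building a create payload from these models (not part of the diff; the title, path, and task are made up):

# Illustrative only -- not part of the commit.
from chalicelib.core.usability_testing.schema import UTTestCreate, UTTestTask

payload = UTTestCreate(
    title="Checkout flow walkthrough",
    starting_path="/checkout",
    tasks=[UTTestTask(title="Find the promo code field", allow_typing=True)],
)
print(payload.status)  # StatusEnum.preview (the default)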
340 api/chalicelib/core/usability_testing/service.py Normal file
@@ -0,0 +1,340 @@
import logging
from datetime import datetime

from fastapi import HTTPException, status

from chalicelib.core.db_request_handler import DatabaseRequestHandler
from chalicelib.core.usability_testing.schema import UTTestCreate, UTTestSearch, UTTestUpdate, UTTestStatusUpdate
from chalicelib.utils.TimeUTC import TimeUTC
from chalicelib.utils.helper import dict_to_camel_case, list_to_camel_case

table_name = "ut_tests"


def search_ui_tests(project_id: int, search: UTTestSearch):
    select_columns = [
        "ut.test_id",
        "ut.title",
        "ut.description",
        "ut.created_at",
        "ut.updated_at",
        "json_build_object('user_id', u.user_id, 'name', u.name) AS created_by"
    ]

    db_handler = DatabaseRequestHandler("ut_tests AS ut")
    db_handler.set_select_columns(["COUNT(*) OVER() AS count"] + select_columns)
    db_handler.add_join("LEFT JOIN users u ON ut.created_by = u.user_id")
    db_handler.add_constraint("ut.project_id = %(project_id)s", {'project_id': project_id})
    db_handler.set_sort_by(f"ut.{search.sort_by} {search.sort_order}")
    db_handler.set_pagination(page=search.page, page_size=search.limit)

    if (search.user_id is not None) and (search.user_id != 0):
        db_handler.add_constraint("ut.created_by = %(user_id)s", {'user_id': search.user_id})

    if search.query:
        db_handler.add_constraint("ut.title ILIKE %(query)s", {'query': f"%{search.query}%"})

    rows = db_handler.fetchall()

    if not rows or len(rows) == 0:
        return {"data": {"total": 0, "list": []}}

    total = rows[0]["count"]
    return {
        "data": {
            "list": list_to_camel_case(rows),
            "total": total,
            "page": search.page,
            "limit": search.limit
        }
    }


def create_ut_test(test_data: UTTestCreate):
    db_handler = DatabaseRequestHandler("ut_tests")
    data = {
        'project_id': test_data.project_id,
        'title': test_data.title,
        'description': test_data.description,
        'created_by': test_data.created_by,
        'status': test_data.status,
    }

    # Execute the insert query
    new_test = db_handler.insert(data)
    test_id = new_test['test_id']

    # Insert tasks
    if test_data.tasks:
        new_test['tasks'] = insert_tasks(test_id, test_data.tasks)
    else:
        new_test['tasks'] = []

    return {
        "data": dict_to_camel_case(new_test)
    }


def insert_tasks(test_id, tasks):
    db_handler = DatabaseRequestHandler("ut_tests_tasks")
    data = []
    for task in tasks:
        data.append({
            'test_id': test_id,
            'title': task.title,
            'description': task.description,
            'allow_typing': task.allow_typing,
        })

    return db_handler.batch_insert(data)


def get_ut_test(project_id: int, test_id: int):
    db_handler = DatabaseRequestHandler("ut_tests AS ut")

    tasks_sql = """
        SELECT COALESCE(jsonb_agg(utt ORDER BY task_id), '[]'::jsonb) AS tasks
        FROM public.ut_tests_tasks AS utt
        WHERE utt.test_id = %(test_id)s
    """

    select_columns = [
        "ut.test_id",
        "ut.title",
        "ut.description",
        "ut.status",
        "ut.created_at",
        "ut.updated_at",
        "json_build_object('id', u.user_id, 'name', u.name) AS created_by"
    ]
    db_handler.set_select_columns(select_columns + [f"({tasks_sql}) AS tasks"])
    db_handler.add_join("LEFT JOIN users u ON ut.created_by = u.user_id")
    db_handler.add_constraint("ut.project_id = %(project_id)s", {'project_id': project_id})
    db_handler.add_constraint("ut.test_id = %(test_id)s", {'test_id': test_id})
    db_handler.add_constraint("ut.deleted_at IS NULL")

    row = db_handler.fetchone()

    if not row:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Test not found")

    row['created_at'] = TimeUTC.datetime_to_timestamp(row['created_at'])
    row['updated_at'] = TimeUTC.datetime_to_timestamp(row['updated_at'])
    row['tasks'] = [dict_to_camel_case(task) for task in row['tasks']]

    return {
        "data": dict_to_camel_case(row)
    }


def delete_ut_test(project_id: int, test_id: int):
    db_handler = DatabaseRequestHandler("ut_tests")
    update_data = {'deleted_at': 'NOW()'}  # Using a SQL function directly
    db_handler.add_constraint("project_id = %(project_id)s", {'project_id': project_id})
    db_handler.add_constraint("test_id = %(test_id)s", {'test_id': test_id})
    db_handler.add_constraint("deleted_at IS NULL")

    try:
        db_handler.update(update_data)
        return {"status": "success"}
    except Exception as e:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))


def check_test_exists(db_handler, project_id, test_id):
    db_handler.set_select_columns(['1'])  # '1' as a dummy column for existence check
    db_handler.add_constraint("project_id = %(project_id)s", {'project_id': project_id})
    db_handler.add_constraint("test_id = %(test_id)s", {'test_id': test_id})
    db_handler.add_constraint("deleted_at IS NULL")

    return bool(db_handler.fetchone())


def update_ut_test(project_id: int, test_id: int, test_update: UTTestUpdate):
    db_handler = DatabaseRequestHandler("ut_tests")

    # Check if the test exists
    if not check_test_exists(db_handler, project_id, test_id):
        return {"status": "error", "message": "Test not found"}

    tasks = test_update.tasks
    del test_update.tasks

    update_data = test_update.model_dump(exclude_unset=True)
    if not update_data:
        return {"status": "no_update"}

    db_handler.constraints.clear()
    db_handler.add_constraint("project_id = %(project_id)s", {'project_id': project_id})
    db_handler.add_constraint("test_id = %(test_id)s", {'test_id': test_id})
    db_handler.add_constraint("deleted_at IS NULL")

    result = db_handler.update(update_data)

    if result is None:
        return {"status": "error", "message": "No update was made"}

    result['tasks'] = check_tasks_update(db_handler, test_id, tasks)

    return {
        "data": dict_to_camel_case(result)
    }


def check_tasks_update(db_handler, test_id, tasks):
    if tasks is None:
        return []

    db_handler = DatabaseRequestHandler("ut_tests_tasks")
    existing_tasks = get_test_tasks(db_handler, test_id)
    existing_task_ids = {task['task_id'] for task in existing_tasks}

    to_be_updated = [task for task in tasks if task.task_id in existing_task_ids]
    to_be_created = [task for task in tasks if task.task_id not in existing_task_ids]
    to_be_deleted = existing_task_ids - {task.task_id for task in tasks}

    # Perform batch operations
    if to_be_updated:
        batch_update_tasks(db_handler, to_be_updated)

    if to_be_created:
        insert_tasks(test_id, to_be_created)

    if to_be_deleted:
        delete_tasks(db_handler, to_be_deleted)

    return get_test_tasks(db_handler, test_id)


def delete_tasks(db_handler, task_ids):
    db_handler.constraints.clear()
    db_handler.add_constraint("task_id IN %(task_ids)s", {'task_ids': tuple(task_ids)})
    db_handler.delete()


def batch_update_tasks(db_handler, tasks):
    db_handler = DatabaseRequestHandler("ut_tests_tasks")
    data = []
    for task in tasks:
        data.append({
            'task_id': task.task_id,
            'title': task.title,
            'description': task.description,
            'allow_typing': task.allow_typing,
        })

    db_handler.batch_update(data)


def get_test_tasks(db_handler, test_id):
    db_handler.constraints.clear()
    db_handler.set_select_columns(['task_id', 'title', 'description', 'allow_typing'])
    db_handler.add_constraint("test_id = %(test_id)s", {'test_id': test_id})

    return db_handler.fetchall()


def ut_tests_sessions(project_id: int, test_id: int, page: int, limit: int):
    db_handler = DatabaseRequestHandler("ut_tests_signals AS uts")
    db_handler.set_select_columns(["s.*"])
    db_handler.add_join("JOIN sessions s ON uts.session_id = s.session_id AND s.project_id = %(project_id)s")
    db_handler.add_constraint("uts.type = %(type)s", {'type': 'test'})
    db_handler.add_constraint("uts.status IN %(status_list)s", {'status_list': ('finished', 'aborted')})
    db_handler.add_constraint("project_id = %(project_id)s", {'project_id': project_id})
    db_handler.add_constraint("uts.type_id = %(test_id)s", {'test_id': test_id})
    db_handler.set_pagination(page, limit)

    sessions = db_handler.fetchall()

    return {
        "data": {
            "list": list_to_camel_case(sessions),
            "page": page,
            "limit": limit
        }
    }


def get_responses(project_id: int, test_id: int, task_id: int, page: int = 1, limit: int = 10, query: str = None):
    db_handler = DatabaseRequestHandler("ut_tests_signals AS uts")
    db_handler.set_select_columns(["uts.*"])
    db_handler.add_constraint("uts.comment IS NOT NULL")
    db_handler.add_constraint("uts.type = %(type)s", {'type': 'task'})
    db_handler.add_constraint("uts.status IN %(status_list)s", {'status_list': ('done', 'skipped')})
    # db_handler.add_constraint("project_id = %(project_id)s", {'project_id': project_id})
    db_handler.add_constraint("uts.type_id = %(test_id)s", {'test_id': task_id})
    db_handler.set_pagination(page, limit)

    responses = db_handler.fetchall()

    return {
        "data": {
            "list": responses,
            "page": page,
            "limit": limit
        }
    }


def get_statistics(test_id: int):
    try:
        handler = DatabaseRequestHandler("ut_tests_signals sig")
        results = handler.raw_query("""
            WITH TaskCounts AS (SELECT test_id, COUNT(*) as total_tasks
                                FROM ut_tests_tasks
                                GROUP BY test_id),
                 CompletedSessions AS (SELECT s.session_id, s.test_id
                                       FROM ut_tests_signals s
                                       WHERE s.test_id = %(test_id)s
                                         AND s.status = 'done'
                                         AND s.task_id IS NOT NULL
                                       GROUP BY s.session_id, s.test_id
                                       HAVING COUNT(DISTINCT s.task_id) = (SELECT total_tasks FROM TaskCounts
                                                                           WHERE test_id = s.test_id))

            SELECT sig.test_id,
                   sum(case when sig.task_id is null then 1 else 0 end)                                as tests_attempts,
                   sum(case when sig.task_id is null and sig.status = 'skipped' then 1 else 0 end)     as tests_skipped,
                   sum(case when sig.task_id is not null and sig.status = 'done' then 1 else 0 end)    as tasks_completed,
                   sum(case when sig.task_id is not null and sig.status = 'skipped' then 1 else 0 end) as tasks_skipped,
                   (SELECT COUNT(*) FROM CompletedSessions WHERE test_id = sig.test_id)                as completed_all_tasks
            FROM ut_tests_signals sig
                     LEFT JOIN TaskCounts tc ON sig.test_id = tc.test_id
            WHERE sig.status IN ('done', 'skipped')
              AND sig.test_id = %(test_id)s
            GROUP BY sig.test_id;
        """, params={
            'test_id': test_id
        })

        if results is None or len(results) == 0:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Test not found")

        return {
            "data": results[0]
        }
    except HTTPException as http_exc:
        raise http_exc
    except Exception as e:
        logging.error(f"Unexpected error occurred: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error")


def get_task_statistics(test_id: int):
    db_handler = DatabaseRequestHandler("ut_tests_tasks utt")
    db_handler.set_select_columns([
        "utt.task_id",
        "utt.title",
        "sum(case when uts.status = 'done' then 1 else 0 end) as completed",
        "avg(case when uts.status = 'done' then uts.duration else 0 end) as avg_completion_time",
        "sum(case when uts.status = 'skipped' then 1 else 0 end) as skipped"
    ])
    db_handler.add_join("JOIN ut_tests_signals uts ON utt.task_id = uts.task_id")
    db_handler.add_constraint("utt.test_id = %(test_id)s", {'test_id': test_id})
    db_handler.set_group_by("utt.task_id, utt.title")

    rows = db_handler.fetchall()

    return {
        "data": list_to_camel_case(rows)
    }
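A sketch of calling the service layer directly (not part of the diff; the project id and query string are illustrative):

# Illustrative only -- not part of the commit; assumes a configured pg_client connection.
from chalicelib.core.usability_testing import service
from chalicelib.core.usability_testing.schema import UTTestSearch

result = service.search_ui_tests(project_id=1, search=UTTestSearch(query="checkout"))
# expected shape: {"data": {"list": [...], "total": <count>, "page": 1, "limit": 10}}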
147 api/chalicelib/core/usability_testing/test_usability_testing.py Normal file
@@ -0,0 +1,147 @@
import unittest
import datetime
from unittest.mock import MagicMock, patch

from fastapi import HTTPException

from chalicelib.core.usability_testing.service import search_ui_tests, create_ut_test, get_ut_test, delete_ut_test, \
    update_ut_test

from chalicelib.core.usability_testing.schema import UTTestSearch, UTTestCreate, UTTestUpdate


class TestUsabilityTesting(unittest.TestCase):
    def setUp(self):
        self.mocked_pool = patch('chalicelib.utils.pg_client.postgreSQL_pool').start()
        self.mocked_pool.getconn.return_value = MagicMock()

        # Mocking the PostgresClient
        self.mock_pg_client = patch('chalicelib.utils.pg_client.PostgresClient').start()
        self.mocked_cursor = MagicMock()
        self.mock_pg_client.return_value.__enter__.return_value = self.mocked_cursor

        # Mocking init and terminate functions
        self.mocked_init = patch('chalicelib.utils.pg_client.init').start()
        self.mocked_terminate = patch('chalicelib.utils.pg_client.terminate').start()

    def tearDown(self):
        patch.stopall()

    def test_search_ui_tests_returns_correct_data(self):
        self.mocked_cursor.fetchall.return_value = [
            {
                "count": 1,
                "test_id": 123,
                "title": "Test",
                "description": "Description",
                "is_active": True,
                "created_by": 1,
                "created_at": datetime.datetime.now().isoformat(),
                "updated_at": datetime.datetime.now().isoformat(),
            },
        ]

        result = search_ui_tests(1, UTTestSearch(page=1, limit=10, sort_by='test_id', sort_order='asc'))

        result = result['data']

        self.assertEqual(1, len(result['list']))
        self.assertEqual(1, result['total'])
        self.assertEqual(1, result['page'])
        self.assertEqual(10, result['limit'])

    def test_create_ut_test_creates_record(self):
        data = UTTestCreate(title="Test", description="Description", is_active=True, project_id=1, status="preview")
        self.mocked_cursor.fetchall.return_value = [
            {
                "project_id": 1,
                "status": "preview",
                "test_id": 123,
                "title": "Test",
                "description": "Description",
                "is_active": True,
                "created_by": 1,
                "created_at": datetime.datetime.now().isoformat(),
                "updated_at": datetime.datetime.now().isoformat(),
            }
        ]

        result = create_ut_test(data)

        self.assertEqual(result['data']['testId'], 123)
        self.assertEqual(result['data']['title'], "Test")
        self.assertEqual(result['data']['description'], "Description")
        self.assertEqual(result['data']['isActive'], True)
        self.assertEqual(result['data']['createdBy'], 1)
        self.assertEqual(result['data']['status'], "preview")

    def test_get_ut_test_returns_correct_data(self):
        self.mocked_cursor.fetchall.return_value = [
            {
                "test_id": 123,
                "title": "Test",
                "created_by": 1,
                "created_at": datetime.datetime.now().isoformat(),
                "updated_at": datetime.datetime.now().isoformat(),
                "tasks": [
                    {
                        "task_id": 1,
                        "test_id": 123,
                        "title": "Task",
                        "description": "Description",
                        "allow_typing": True,
                    }
                ]
            }
        ]

        result = get_ut_test(1, 123)

        self.assertIsNotNone(result['data'])
        self.assertEqual(result['data']['testId'], 123)
        self.assertEqual(result['data']['title'], "Test")

        self.mocked_cursor.fetchall.return_value = None
        with self.assertRaises(HTTPException):
            get_ut_test(1, 999)

    def test_delete_ut_test_deletes_record(self):
        self.mocked_cursor.return_value = 1

        result = delete_ut_test(1, 123)

        self.assertEqual(result['status'], 'success')

    # def test_update_ut_test_updates_record(self):
    #     self.mocked_cursor.fetchall.return_value = [
    #         {
    #             "test_id": 123,
    #             "title": "Test",
    #             "created_by": 1,
    #             "created_at": datetime.datetime.now().isoformat(),
    #             "updated_at": datetime.datetime.now().isoformat(),
    #             "tasks": [
    #                 {
    #                     "task_id": 1,
    #                     "test_id": 123,
    #                     "title": "Task",
    #                     "description": "Description",
    #                     "allow_typing": True,
    #                 }
    #             ]
    #         }
    #     ]
    #
    #     result = update_ut_test(1, 123, UTTestUpdate(title="Updated Test"))
    #     self.assertEqual(result['status'], 'success')

    # def test_update_status_updates_status(self):
    #     self.mock_pg_client.PostgresClient.return_value.__enter__.return_value.rowcount = 1
    #
    #     result = update_status(1, 123, 'active')
    #
    #     self.assertEqual('active', result['status'])


if __name__ == '__main__':
    unittest.main()
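The suite can be driven with the stdlib runner; a sketch, assuming it is executed from the api/ directory so the chalicelib package is importable:

# Illustrative only -- not part of the commit.
import unittest

suite = unittest.defaultTestLoader.loadTestsFromName(
    "chalicelib.core.usability_testing.test_usability_testing")
unittest.TextTestRunner(verbosity=2).run(suite)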
@@ -14,6 +14,7 @@ from chalicelib.core import log_tool_rollbar, sourcemaps, events, sessions_assig
from chalicelib.core.collaboration_msteams import MSTeams
from chalicelib.core.collaboration_slack import Slack
from or_dependencies import OR_context, OR_role
from chalicelib.core.usability_testing.routes import app as usability_testing_routes
from routers.base import get_routers

public_app, app, app_apikey = get_routers()
@@ -860,3 +861,6 @@ async def check_recording_status(project_id: int):
@public_app.get('/', tags=["health"])
def health_check():
    return {}


app.include_router(usability_testing_routes)
@@ -19,6 +19,50 @@ $fn_def$, :'next_version')

--

CREATE TYPE ui_tests_status AS ENUM ('preview', 'in-progress', 'paused', 'closed');

CREATE TABLE IF NOT EXISTS ut_tests
(
    test_id            integer generated BY DEFAULT AS IDENTITY PRIMARY KEY,
    project_id         integer      NOT NULL REFERENCES public.projects (project_id) ON DELETE CASCADE,
    title              VARCHAR(255) NOT NULL,
    starting_path      VARCHAR(255) NULL,
    status             VARCHAR(20)  NOT NULL CHECK (status IN ('preview', 'in-progress', 'paused', 'closed')),
    require_mic        BOOLEAN DEFAULT FALSE,
    require_camera     BOOLEAN DEFAULT FALSE,
    description        TEXT NULL,
    guidelines         TEXT NULL,
    conclusion_message TEXT NULL,
    created_by         integer REFERENCES public.users (user_id) ON DELETE SET NULL,
    updated_by         integer REFERENCES public.users (user_id) ON DELETE SET NULL,
    visibility         BOOLEAN DEFAULT FALSE,
    created_at         timestamp without time zone NOT NULL DEFAULT timezone('utc'::text, now()),
    updated_at         timestamp without time zone NOT NULL DEFAULT timezone('utc'::text, now()),
    deleted_at         timestamp without time zone NULL DEFAULT NULL
);

CREATE TABLE IF NOT EXISTS ut_tests_tasks
(
    task_id      integer generated BY DEFAULT AS IDENTITY PRIMARY KEY,
    test_id      integer      NOT NULL REFERENCES ut_tests (test_id) ON DELETE CASCADE,
    title        VARCHAR(255) NOT NULL,
    description  TEXT         NOT NULL,
    allow_typing BOOLEAN DEFAULT FALSE,
    FOREIGN KEY (test_id) REFERENCES ut_tests (test_id)
);

CREATE TABLE IF NOT EXISTS ut_tests_signals
(
    session_id BIGINT      NOT NULL,
    test_id    BIGINT      NOT NULL,
    task_id    BIGINT      NULL,
    status     VARCHAR(20) NOT NULL CHECK (status IN ('begin', 'done', 'skipped')),
    comment    TEXT        NULL,
    timestamp  BIGINT      NOT NULL,
    duration   BIGINT      NULL,
    PRIMARY KEY (session_id, test_id, status, timestamp)
);

CREATE TABLE IF NOT EXISTS events.canvas_recordings
(
    session_id bigint NOT NULL REFERENCES public.sessions (session_id) ON DELETE CASCADE,
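A sketch of querying the new signals table through the handler's raw_query helper (not part of the diff; the aggregation and test id are illustrative):

# Illustrative only -- not part of the commit.
from chalicelib.core.db_request_handler import DatabaseRequestHandler

handler = DatabaseRequestHandler("ut_tests_signals")
rows = handler.raw_query(
    "SELECT status, COUNT(*) AS total FROM ut_tests_signals "
    "WHERE test_id = %(test_id)s GROUP BY status;",
    params={"test_id": 1})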
@@ -1116,6 +1116,51 @@ $$
CREATE INDEX swipes_timestamp_idx ON events_ios.swipes (timestamp);
CREATE INDEX swipes_label_session_id_timestamp_idx ON events_ios.swipes (label, session_id, timestamp);


CREATE TYPE ui_tests_status AS ENUM ('preview', 'in-progress', 'paused', 'closed');

CREATE TABLE IF NOT EXISTS ut_tests
(
    test_id            integer generated BY DEFAULT AS IDENTITY PRIMARY KEY,
    project_id         integer      NOT NULL REFERENCES public.projects (project_id) ON DELETE CASCADE,
    title              VARCHAR(255) NOT NULL,
    starting_path      VARCHAR(255) NULL,
    status             VARCHAR(20)  NOT NULL CHECK (status IN ('preview', 'in-progress', 'paused', 'closed')),
    require_mic        BOOLEAN DEFAULT FALSE,
    require_camera     BOOLEAN DEFAULT FALSE,
    description        TEXT NULL,
    guidelines         TEXT NULL,
    conclusion_message TEXT NULL,
    created_by         integer REFERENCES public.users (user_id) ON DELETE SET NULL,
    updated_by         integer REFERENCES public.users (user_id) ON DELETE SET NULL,
    visibility         BOOLEAN DEFAULT FALSE,
    created_at         timestamp without time zone NOT NULL DEFAULT timezone('utc'::text, now()),
    updated_at         timestamp without time zone NOT NULL DEFAULT timezone('utc'::text, now()),
    deleted_at         timestamp without time zone NULL DEFAULT NULL
);

CREATE TABLE IF NOT EXISTS ut_tests_tasks
(
    task_id      integer generated BY DEFAULT AS IDENTITY PRIMARY KEY,
    test_id      integer      NOT NULL REFERENCES ut_tests (test_id) ON DELETE CASCADE,
    title        VARCHAR(255) NOT NULL,
    description  TEXT         NOT NULL,
    allow_typing BOOLEAN DEFAULT FALSE,
    FOREIGN KEY (test_id) REFERENCES ut_tests (test_id)
);

CREATE TABLE IF NOT EXISTS ut_tests_signals
(
    session_id BIGINT      NOT NULL,
    test_id    BIGINT      NOT NULL,
    task_id    BIGINT      NULL,
    status     VARCHAR(20) NOT NULL CHECK (status IN ('begin', 'done', 'skipped')),
    comment    TEXT        NULL,
    timestamp  BIGINT      NOT NULL,
    duration   BIGINT      NULL,
    PRIMARY KEY (session_id, test_id, status, timestamp)
);

CREATE TABLE events.canvas_recordings
(
    session_id bigint NOT NULL REFERENCES public.sessions (session_id) ON DELETE CASCADE,