chore(api): asyncify GET /projects, and propagate (#1701)

* [Backend] Canvas support (#1705)

* feat(http): added new parameters to start response and new endpoint for canvas screenshots

* fix(http): added new topic to dockerfile

* feat(http): try different multipart parser

* feat(image-storage): reused the same workflow for canvas topic handler

* feat(video-storage): new canvas parser and ffmpeg script

* feat(video-storage): use correct replay name for canvas

* feat(backend): added new message (CanvasNode)

* feat(backend): add canvas record events to db

* Async chalice.core.tenants:tenants_exists, and propagate

* rework after review

* chore(api): asyncify `/projects` and propagate.

Impact on the following routes:

  /projects
  /v1/projects
  /password/reset
  /metadata/session_search

* fix(api): there is no cnx.mogrify method.

In psycopg v3, the mogrify method is only available on cursor objects.

In other words, just use cnx.execute(query, kwargs), except when
opening an "explicit cursor session".

ref: https://www.psycopg.org/psycopg3/docs/api/cursors.html#psycopg.ClientCursor.mogrify

---------

Co-authored-by: Alexander <zavorotynskiy@pm.me>
This commit is contained in:
ⵄⵎⵉⵔⵓⵛ 2023-12-06 16:09:55 +01:00 committed by GitHub
parent fdecd7411e
commit 23015f5567
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 133 additions and 53 deletions

View file

@ -10,7 +10,9 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from psycopg import AsyncConnection
from starlette.responses import StreamingResponse
from psycopg.rows import dict_row
import orpy
from chalicelib.utils import helper
from chalicelib.utils import pg_client
from crons import core_crons, core_dynamic_crons
@ -20,8 +22,6 @@ from routers.subs import insights, metrics, v1_api, health, usability_tests
loglevel = config("LOGLEVEL", default=logging.WARNING)
print(f">Loglevel set to: {loglevel}")
logging.basicConfig(level=loglevel)
import orpy
from psycopg.rows import dict_row
class ORPYAsyncConnection(AsyncConnection):

View file

@ -284,8 +284,8 @@ def get_keys_by_projects(project_ids):
# return {"data": get(project_id)}
def get_remaining_metadata_with_count(tenant_id):
all_projects = projects.get_projects(tenant_id=tenant_id)
async def get_remaining_metadata_with_count(tenant_id):
all_projects = await projects.get_projects(tenant_id=tenant_id)
results = []
used_metas = get_batch([p["projectId"] for p in all_projects])
for p in all_projects:

View file

@ -7,7 +7,7 @@ import schemas
from chalicelib.core import users
from chalicelib.utils import pg_client, helper
from chalicelib.utils.TimeUTC import TimeUTC
import orpy
def __exists_by_name(name: str, exclude_id: Optional[int]) -> bool:
with pg_client.PostgresClient() as cur:
@ -52,8 +52,9 @@ def __create(tenant_id, data):
return get_project(tenant_id=tenant_id, project_id=project_id, include_gdpr=True)
def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False):
with pg_client.PostgresClient() as cur:
async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False):
async def _get_projects(cnx):
extra_projection = ""
if gdpr:
extra_projection += ',s.gdpr'
@ -67,16 +68,15 @@ def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False):
AND sessions.start_ts <= %(now)s
)) AS first_recorded"""
query = cur.mogrify(f"""{"SELECT *, first_recorded IS NOT NULL AS recorded FROM (" if recorded else ""}
query = f"""{"SELECT *, first_recorded IS NOT NULL AS recorded FROM (" if recorded else ""}
SELECT s.project_id, s.name, s.project_key, s.save_request_payloads, s.first_recorded_session_at,
s.created_at, s.sessions_last_check_at, s.sample_rate, s.platform
{extra_projection}
FROM public.projects AS s
WHERE s.deleted_at IS NULL
ORDER BY s.name {") AS raw" if recorded else ""};""",
{"now": TimeUTC.now(), "check_delta": TimeUTC.MS_HOUR * 4})
cur.execute(query)
rows = cur.fetchall()
ORDER BY s.name {") AS raw" if recorded else ""};"""
rows = await cnx.execute(query, {"now": TimeUTC.now(), "check_delta": TimeUTC.MS_HOUR * 4})
rows = await rows.fetchall()
# if recorded is requested, check if it was saved or computed
if recorded:
u_values = []
@ -94,11 +94,10 @@ def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False):
r.pop("first_recorded")
r.pop("sessions_last_check_at")
if len(u_values) > 0:
query = cur.mogrify(f"""UPDATE public.projects
await cnx.execute(f"""UPDATE public.projects
SET sessions_last_check_at=(now() at time zone 'utc'), first_recorded_session_at=u.first_recorded
FROM (VALUES {",".join(u_values)}) AS u(project_id,first_recorded)
WHERE projects.project_id=u.project_id;""", params)
cur.execute(query)
else:
for r in rows:
r["created_at"] = TimeUTC.datetime_to_timestamp(r["created_at"])
@ -106,13 +105,18 @@ def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False):
return helper.list_to_camel_case(rows)
async with orpy.get().database.connection() as cnx:
with cnx.transaction():
out = await _get_projects(cnx)
return out
def get_project(tenant_id, project_id, include_last_session=False, include_gdpr=None):
with pg_client.PostgresClient() as cur:
extra_select = ""
if include_last_session:
extra_select += """,(SELECT max(ss.start_ts)
FROM public.sessions AS ss
extra_select += """,(SELECT max(ss.start_ts)
FROM public.sessions AS ss
WHERE ss.project_id = %(project_id)s) AS last_recorded_session_at"""
if include_gdpr:
extra_select += ",s.gdpr"

View file

@ -1093,12 +1093,12 @@ def search_query_parts(data: schemas.SessionsSearchPayloadSchema, error_status,
return full_args, query_part
def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None):
async def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None):
if project_id is None:
all_projects = projects.get_projects(tenant_id=tenant_id)
all_projects = await projects.get_projects(tenant_id=tenant_id)
else:
all_projects = [
projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False,
await projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False,
include_gdpr=False)]
all_projects = {int(p["projectId"]): p["name"] for p in all_projects}

View file

@ -444,7 +444,7 @@ def change_password(tenant_id, user_id, email, old_password, new_password):
}
def set_password_invitation(user_id, new_password):
async def set_password_invitation(user_id, new_password):
changes = {"password": new_password,
"invitationToken": None, "invitedAt": None,
"changePwdExpireAt": None, "changePwdToken": None}
@ -455,11 +455,11 @@ def set_password_invitation(user_id, new_password):
r["limits"] = {
"teamMember": -1,
"projects": -1,
"metadata": metadata.get_remaining_metadata_with_count(tenant_id)}
"metadata": await metadata.get_remaining_metadata_with_count(tenant_id)}
c = tenants.get_by_tenant_id(tenant_id)
c.pop("createdAt")
c["projects"] = projects.get_projects(tenant_id=tenant_id, recorded=True)
c["projects"] = await projects.get_projects(tenant_id=tenant_id, recorded=True)
c["smtp"] = smtp.has_smtp()
c["iceServers"] = assist.get_ice_servers()
return {

View file

@ -175,7 +175,7 @@ def process_invitation_link(token: str):
@public_app.post('/password/reset', tags=["users"])
def change_password_by_invitation(data: schemas.EditPasswordByInvitationSchema = Body(...)):
async def change_password_by_invitation(data: schemas.EditPasswordByInvitationSchema = Body(...)):
if data is None or len(data.invitation) < 64 or len(data.passphrase) < 8:
return {"errors": ["please provide a valid invitation & pass"]}
user = users.get_by_invitation_token(token=data.invitation, pass_token=data.passphrase)
@ -184,7 +184,7 @@ def change_password_by_invitation(data: schemas.EditPasswordByInvitationSchema =
if user["expiredChange"]:
return {"errors": ["expired change, please re-use the invitation link"]}
return users.set_password_invitation(new_password=data.password.get_secret_value(), user_id=user["userId"])
return await users.set_password_invitation(new_password=data.password.get_secret_value(), user_id=user["userId"])
@app.put('/client/members/{memberId}', tags=["client"], dependencies=[OR_role("owner", "admin")])
@ -195,7 +195,7 @@ def edit_member(memberId: int, data: schemas.EditMemberSchema,
@app.get('/metadata/session_search', tags=["metadata"])
def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = None,
async def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = None,
context: schemas.CurrentContext = Depends(OR_context)):
if key is None or value is None or len(value) == 0 and len(key) == 0:
return {"errors": ["please provide a key&value for search"]}
@ -204,13 +204,13 @@ def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] =
if len(key) == 0:
return {"errors": ["please provide a key for search"]}
return {
"data": sessions.search_by_metadata(tenant_id=context.tenant_id, user_id=context.user_id, m_value=value,
"data": await sessions.search_by_metadata(tenant_id=context.tenant_id, user_id=context.user_id, m_value=value,
m_key=key, project_id=projectId)}
@app.get('/projects', tags=['projects'])
def get_projects(context: schemas.CurrentContext = Depends(OR_context)):
return {"data": projects.get_projects(tenant_id=context.tenant_id, gdpr=True, recorded=True)}
async def get_projects(context: schemas.CurrentContext = Depends(OR_context)):
return {"data": await projects.get_projects(tenant_id=context.tenant_id, gdpr=True, recorded=True)}
# for backward compatibility

View file

@ -19,5 +19,4 @@ if not tenants.tenants_exists_sync(use_pool=False):
async def get_public_health_status():
    """Return the health report, or answer 404 when a tenant already exists.

    Raises:
        HTTPException: with status 404 when ``tenants.tenants_exists()``
            resolves to a truthy value.
    """
    if await tenants.tenants_exists():
        # Plain literal: the previous f-string had no placeholders.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Not Found")
    return {"data": health.get_health()}

View file

@ -73,8 +73,8 @@ def cancel_job(projectKey: str, jobId: int, _=Body(None), context: schemas.Curre
@app_apikey.get('/v1/projects', tags=["api"])
def get_projects(context: schemas.CurrentContext = Depends(OR_context)):
records = projects.get_projects(tenant_id=context.tenant_id)
async def get_projects(context: schemas.CurrentContext = Depends(OR_context)):
records = await projects.get_projects(tenant_id=context.tenant_id)
for record in records:
del record['projectId']

View file

@ -372,6 +372,78 @@ func (e *Router) featureFlagsHandlerWeb(w http.ResponseWriter, r *http.Request)
ResponseWithJSON(w, resp, startTime, r.URL.Path, bodySize)
}
// ScreenshotMessage is the payload that imagesUploaderHandlerWeb marshals to
// JSON and sends to the queue, one message per uploaded file.
type ScreenshotMessage struct {
// Name is the uploaded file's name after being passed through util.SafeString.
Name string
// Data holds the raw bytes read from the uploaded file.
Data []byte
}
// imagesUploaderHandlerWeb accepts a multipart upload of canvas images and
// forwards every uploaded file to the canvas-images topic.
//
// Flow:
//  1. The session is resolved from the request via the tokenizer; a failure
//     is answered with 401.
//  2. The body is capped at e.cfg.FileSizeLimit bytes and parsed as a
//     multipart form (415 if the request is not multipart or has no boundary,
//     500 for any other parse error).
//  3. Each uploaded file is read fully, wrapped in a ScreenshotMessage,
//     JSON-encoded and produced to e.cfg.TopicCanvasImages keyed by the
//     session ID.
//
// A file that cannot be marshalled or produced is logged and skipped; the
// request is still answered with OK in that case.
func (e *Router) imagesUploaderHandlerWeb(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()

	sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r)
	if err != nil { // Should accept expired token?
		ResponseWithError(w, http.StatusUnauthorized, err, startTime, r.URL.Path, 0)
		return
	}

	if r.Body == nil {
		ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"), startTime, r.URL.Path, 0)
		return
	}
	// Cap the total number of bytes that may be read from the request body.
	r.Body = http.MaxBytesReader(w, r.Body, e.cfg.FileSizeLimit)
	defer r.Body.Close()

	// Parse the multipart form. The argument is the in-memory budget (10 MB),
	// not an upload limit: file parts beyond it are spilled to temporary
	// files. The overall request size is bounded by MaxBytesReader above.
	err = r.ParseMultipartForm(10 << 20)
	if err == http.ErrNotMultipart || err == http.ErrMissingBoundary {
		ResponseWithError(w, http.StatusUnsupportedMediaType, err, startTime, r.URL.Path, 0)
		return
	} else if err != nil {
		ResponseWithError(w, http.StatusInternalServerError, err, startTime, r.URL.Path, 0) // TODO: send error here only on staging
		return
	}

	// Iterate over uploaded files
	for _, fileHeaderList := range r.MultipartForm.File {
		for _, fileHeader := range fileHeaderList {
			file, err := fileHeader.Open()
			if err != nil {
				// Report through the same helper as every other error path
				// of this handler instead of a bare http.Error.
				ResponseWithError(w, http.StatusInternalServerError, err, startTime, r.URL.Path, 0)
				return
			}

			// Read the file content; the handle is released right after the
			// read, whether or not it succeeded.
			fileBytes, err := ioutil.ReadAll(file)
			file.Close()
			if err != nil {
				ResponseWithError(w, http.StatusInternalServerError, err, startTime, r.URL.Path, 0)
				return
			}

			fileName := util.SafeString(fileHeader.Filename)
			log.Printf("fileName: %s, fileSize: %d", fileName, len(fileBytes))

			// Create a message to send to Kafka
			msg := ScreenshotMessage{
				Name: fileName,
				Data: fileBytes,
			}
			data, err := json.Marshal(&msg)
			if err != nil {
				log.Printf("can't marshal screenshot message, err: %s", err)
				continue
			}

			// Send the message to queue
			if err := e.services.Producer.Produce(e.cfg.TopicCanvasImages, sessionData.ID, data); err != nil {
				log.Printf("failed to produce canvas image message: %v", err)
			}
		}
	}
	ResponseOK(w, startTime, r.URL.Path, 0)
}
func (e *Router) getUXTestInfo(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
bodySize := 0
@ -501,6 +573,7 @@ func (e *Router) getUXUploadUrl(w http.ResponseWriter, r *http.Request) {
URL string `json:"url"`
}
ResponseWithJSON(w, &UrlResponse{URL: url}, startTime, r.URL.Path, bodySize)
}
type ScreenshotMessage struct {

View file

@ -7,7 +7,7 @@ import schemas
from chalicelib.core import users
from chalicelib.utils import pg_client, helper
from chalicelib.utils.TimeUTC import TimeUTC
import orpy
def __exists_by_name(tenant_id: int, name: str, exclude_id: Optional[int]) -> bool:
with pg_client.PostgresClient() as cur:
@ -53,8 +53,9 @@ def __create(tenant_id, data):
return get_project(tenant_id=tenant_id, project_id=project_id, include_gdpr=True)
def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False, user_id: int = None):
with pg_client.PostgresClient() as cur:
async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False, user_id: int = None):
async def _get_projects(cnx):
role_query = """INNER JOIN LATERAL (SELECT 1
FROM users
INNER JOIN roles USING (role_id)
@ -77,7 +78,7 @@ def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False, use
AND sessions.start_ts <= %(now)s
)) AS first_recorded"""
query = cur.mogrify(f"""{"SELECT *, first_recorded IS NOT NULL AS recorded FROM (" if recorded else ""}
rows = await cnx.execute(f"""{"SELECT *, first_recorded IS NOT NULL AS recorded FROM (" if recorded else ""}
SELECT s.project_id, s.name, s.project_key, s.save_request_payloads, s.first_recorded_session_at,
s.created_at, s.sessions_last_check_at, s.sample_rate, s.platform
{extra_projection}
@ -88,8 +89,7 @@ def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False, use
ORDER BY s.name {") AS raw" if recorded else ""};""",
{"now": TimeUTC.now(), "check_delta": TimeUTC.MS_HOUR * 4,
"tenant_id": tenant_id, "user_id": user_id})
cur.execute(query)
rows = cur.fetchall()
rows = await rows.fetchall()
# if recorded is requested, check if it was saved or computed
if recorded:
u_values = []
@ -107,11 +107,10 @@ def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False, use
r.pop("first_recorded")
r.pop("sessions_last_check_at")
if len(u_values) > 0:
query = cur.mogrify(f"""UPDATE public.projects
cnx.execute(f"""UPDATE public.projects
SET sessions_last_check_at=(now() at time zone 'utc'), first_recorded_session_at=u.first_recorded
FROM (VALUES {",".join(u_values)}) AS u(project_id,first_recorded)
WHERE projects.project_id=u.project_id;""", params)
cur.execute(query)
else:
for r in rows:
r["created_at"] = TimeUTC.datetime_to_timestamp(r["created_at"])
@ -119,6 +118,11 @@ def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False, use
return helper.list_to_camel_case(rows)
async with orpy.get().database.connection() as cnx:
async with cnx.transaction():
out = await _get_projects(cnx)
return out
def get_project(tenant_id, project_id, include_last_session=False, include_gdpr=None):
with pg_client.PostgresClient() as cur:

View file

@ -1497,12 +1497,12 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
return full_args, query_part
def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None):
async def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None):
if project_id is None:
all_projects = projects.get_projects(tenant_id=tenant_id)
all_projects = await projects.get_projects(tenant_id=tenant_id)
else:
all_projects = [
projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False,
await projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False,
include_gdpr=False)]
all_projects = {int(p["projectId"]): p["name"] for p in all_projects}

View file

@ -523,7 +523,7 @@ def change_password(tenant_id, user_id, email, old_password, new_password):
}
def set_password_invitation(tenant_id, user_id, new_password):
async def set_password_invitation(tenant_id, user_id, new_password):
changes = {"password": new_password,
"invitationToken": None, "invitedAt": None,
"changePwdExpireAt": None, "changePwdToken": None}
@ -534,11 +534,11 @@ def set_password_invitation(tenant_id, user_id, new_password):
r["limits"] = {
"teamMember": -1,
"projects": -1,
"metadata": metadata.get_remaining_metadata_with_count(tenant_id)}
"metadata": await metadata.get_remaining_metadata_with_count(tenant_id)}
c = tenants.get_by_tenant_id(tenant_id)
c.pop("createdAt")
c["projects"] = projects.get_projects(tenant_id=tenant_id, recorded=True, user_id=user_id)
c["projects"] = await projects.get_projects(tenant_id=tenant_id, recorded=True, user_id=user_id)
c["smtp"] = smtp.has_smtp()
c["iceServers"] = assist.get_ice_servers()
return {

View file

@ -38,7 +38,7 @@ async def get_all_signup():
if config("MULTI_TENANTS", cast=bool, default=False) or not tenants.tenants_exists_sync(use_pool=False):
@public_app.post('/signup', tags=['signup'])
@public_app.put('/signup', tags=['signup'])
async def signup_handler(data: schemas.UserSignupSchema = Body(...)):
async def signup_handler(data: schemas.UserSignupSchema):
content = await signup.create_tenant(data)
if "errors" in content:
return content
@ -51,7 +51,7 @@ if config("MULTI_TENANTS", cast=bool, default=False) or not tenants.tenants_exis
@public_app.post('/login', tags=["authentication"])
def login_user(response: JSONResponse, data: schemas.UserLoginSchema = Body(...)):
def login_user(response: JSONResponse, data: schemas.UserLoginSchema):
if helper.allow_captcha() and not captcha.is_valid(data.g_recaptcha_response):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@ -182,7 +182,7 @@ async def process_invitation_link(token: str, request: Request):
@public_app.post('/password/reset', tags=["users"])
def change_password_by_invitation(data: schemas.EditPasswordByInvitationSchema = Body(...)):
async def change_password_by_invitation(data: schemas.EditPasswordByInvitationSchema = Body(...)):
if data is None or len(data.invitation) < 64 or len(data.passphrase) < 8:
return {"errors": ["please provide a valid invitation & pass"]}
user = users.get_by_invitation_token(token=data.invitation, pass_token=data.passphrase)
@ -191,7 +191,7 @@ def change_password_by_invitation(data: schemas.EditPasswordByInvitationSchema =
if user["expiredChange"]:
return {"errors": ["expired change, please re-use the invitation link"]}
return users.set_password_invitation(new_password=data.password.get_secret_value(), user_id=user["userId"],
return await users.set_password_invitation(new_password=data.password.get_secret_value(), user_id=user["userId"],
tenant_id=user["tenantId"])
@ -203,7 +203,7 @@ def edit_member(memberId: int, data: schemas.EditMemberSchema,
@app.get('/metadata/session_search', tags=["metadata"])
def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = None,
async def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = None,
context: schemas.CurrentContext = Depends(OR_context)):
if key is None or value is None or len(value) == 0 and len(key) == 0:
return {"errors": ["please provide a key&value for search"]}
@ -216,13 +216,13 @@ def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] =
if len(key) == 0:
return {"errors": ["please provide a key for search"]}
return {
"data": sessions.search_by_metadata(tenant_id=context.tenant_id, user_id=context.user_id, m_value=value,
"data": await sessions.search_by_metadata(tenant_id=context.tenant_id, user_id=context.user_id, m_value=value,
m_key=key, project_id=projectId)}
@app.get('/projects', tags=['projects'])
def get_projects(context: schemas.CurrentContext = Depends(OR_context)):
return {"data": projects.get_projects(tenant_id=context.tenant_id, gdpr=True,
async def get_projects(context: schemas.CurrentContext = Depends(OR_context)):
return {"data": await projects.get_projects(tenant_id=context.tenant_id, gdpr=True,
recorded=True, user_id=context.user_id)}