From 7368072db2931ca2226fd4d3f3ccaea13bd67c5b Mon Sep 17 00:00:00 2001 From: Kraiem Taha Yassine Date: Wed, 6 Dec 2023 16:15:50 +0100 Subject: [PATCH] Revert "chore(api): asyncify GET `/projects`, and propagate (#1701)" (#1747) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 23015f5567c2a0b87047f2cec4521a02b31a884d. Co-authored-by: ⵄⵎⵉⵔⵓⵛ --- api/app.py | 4 +- api/chalicelib/core/metadata.py | 4 +- api/chalicelib/core/projects.py | 28 ++++---- api/chalicelib/core/sessions.py | 6 +- api/chalicelib/core/users.py | 6 +- api/routers/core_dynamic.py | 12 ++-- api/routers/subs/health.py | 1 + api/routers/subs/v1_api.py | 4 +- backend/internal/http/router/handlers-web.go | 73 -------------------- ee/api/chalicelib/core/projects.py | 20 +++--- ee/api/chalicelib/core/sessions_exp.py | 6 +- ee/api/chalicelib/core/users.py | 6 +- ee/api/routers/core_dynamic.py | 16 ++--- 13 files changed, 53 insertions(+), 133 deletions(-) diff --git a/api/app.py b/api/app.py index 4f9c92d72..34b78a2ca 100644 --- a/api/app.py +++ b/api/app.py @@ -10,9 +10,7 @@ from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.gzip import GZipMiddleware from psycopg import AsyncConnection from starlette.responses import StreamingResponse -from psycopg.rows import dict_row -import orpy from chalicelib.utils import helper from chalicelib.utils import pg_client from crons import core_crons, core_dynamic_crons @@ -22,6 +20,8 @@ from routers.subs import insights, metrics, v1_api, health, usability_tests loglevel = config("LOGLEVEL", default=logging.WARNING) print(f">Loglevel set to: {loglevel}") logging.basicConfig(level=loglevel) +import orpy +from psycopg.rows import dict_row class ORPYAsyncConnection(AsyncConnection): diff --git a/api/chalicelib/core/metadata.py b/api/chalicelib/core/metadata.py index 909832b56..e1f00a88e 100644 --- a/api/chalicelib/core/metadata.py +++ b/api/chalicelib/core/metadata.py @@ -284,8 +284,8 @@ def get_keys_by_projects(project_ids): # return {"data": get(project_id)} -async def get_remaining_metadata_with_count(tenant_id): - all_projects = await projects.get_projects(tenant_id=tenant_id) +def get_remaining_metadata_with_count(tenant_id): + all_projects = projects.get_projects(tenant_id=tenant_id) results = [] used_metas = get_batch([p["projectId"] for p in all_projects]) for p in all_projects: diff --git a/api/chalicelib/core/projects.py b/api/chalicelib/core/projects.py index 656085314..42877cf8b 100644 --- a/api/chalicelib/core/projects.py +++ b/api/chalicelib/core/projects.py @@ -7,7 +7,7 @@ import schemas from chalicelib.core import users from chalicelib.utils import pg_client, helper from chalicelib.utils.TimeUTC import TimeUTC -import orpy + def __exists_by_name(name: str, exclude_id: Optional[int]) -> bool: with pg_client.PostgresClient() as cur: @@ -52,9 +52,8 @@ def __create(tenant_id, data): return get_project(tenant_id=tenant_id, project_id=project_id, include_gdpr=True) -async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False): - - async def _get_projects(cnx): +def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False): + with pg_client.PostgresClient() as cur: extra_projection = "" if gdpr: extra_projection += ',s.gdpr' @@ -68,15 +67,16 @@ async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = Fals AND sessions.start_ts <= %(now)s )) AS first_recorded""" - query = f"""{"SELECT *, first_recorded IS NOT NULL AS recorded FROM (" if recorded else ""} + query = cur.mogrify(f"""{"SELECT *, first_recorded IS NOT NULL AS recorded FROM (" if recorded else ""} SELECT s.project_id, s.name, s.project_key, s.save_request_payloads, s.first_recorded_session_at, s.created_at, s.sessions_last_check_at, s.sample_rate, s.platform {extra_projection} FROM public.projects AS s WHERE s.deleted_at IS NULL - ORDER BY s.name {") AS raw" if recorded else ""};""" - rows = await cnx.execute(query, {"now": TimeUTC.now(), "check_delta": TimeUTC.MS_HOUR * 4}) - rows = await rows.fetchall() + ORDER BY s.name {") AS raw" if recorded else ""};""", + {"now": TimeUTC.now(), "check_delta": TimeUTC.MS_HOUR * 4}) + cur.execute(query) + rows = cur.fetchall() # if recorded is requested, check if it was saved or computed if recorded: u_values = [] @@ -94,10 +94,11 @@ async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = Fals r.pop("first_recorded") r.pop("sessions_last_check_at") if len(u_values) > 0: - await cnx.execute(f"""UPDATE public.projects + query = cur.mogrify(f"""UPDATE public.projects SET sessions_last_check_at=(now() at time zone 'utc'), first_recorded_session_at=u.first_recorded FROM (VALUES {",".join(u_values)}) AS u(project_id,first_recorded) WHERE projects.project_id=u.project_id;""", params) + cur.execute(query) else: for r in rows: r["created_at"] = TimeUTC.datetime_to_timestamp(r["created_at"]) @@ -105,18 +106,13 @@ async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = Fals return helper.list_to_camel_case(rows) - async with orpy.get().database.connection() as cnx: - with cnx.transaction(): - out = await _get_projects(cnx) - return out - def get_project(tenant_id, project_id, include_last_session=False, include_gdpr=None): with pg_client.PostgresClient() as cur: extra_select = "" if include_last_session: - extra_select += """,(SELECT max(ss.start_ts) - FROM public.sessions AS ss + extra_select += """,(SELECT max(ss.start_ts) + FROM public.sessions AS ss WHERE ss.project_id = %(project_id)s) AS last_recorded_session_at""" if include_gdpr: extra_select += ",s.gdpr" diff --git a/api/chalicelib/core/sessions.py b/api/chalicelib/core/sessions.py index f4fad4701..2a16c3cc3 100644 --- a/api/chalicelib/core/sessions.py +++ b/api/chalicelib/core/sessions.py @@ -1093,12 +1093,12 @@ def search_query_parts(data: schemas.SessionsSearchPayloadSchema, error_status, return full_args, query_part -async def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None): +def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None): if project_id is None: - all_projects = await projects.get_projects(tenant_id=tenant_id) + all_projects = projects.get_projects(tenant_id=tenant_id) else: all_projects = [ - await projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False, + projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False, include_gdpr=False)] all_projects = {int(p["projectId"]): p["name"] for p in all_projects} diff --git a/api/chalicelib/core/users.py b/api/chalicelib/core/users.py index a774f0ed8..9dfbbbf14 100644 --- a/api/chalicelib/core/users.py +++ b/api/chalicelib/core/users.py @@ -444,7 +444,7 @@ def change_password(tenant_id, user_id, email, old_password, new_password): } -async def set_password_invitation(user_id, new_password): +def set_password_invitation(user_id, new_password): changes = {"password": new_password, "invitationToken": None, "invitedAt": None, "changePwdExpireAt": None, "changePwdToken": None} @@ -455,11 +455,11 @@ async def set_password_invitation(user_id, new_password): r["limits"] = { "teamMember": -1, "projects": -1, - "metadata": await metadata.get_remaining_metadata_with_count(tenant_id)} + "metadata": metadata.get_remaining_metadata_with_count(tenant_id)} c = tenants.get_by_tenant_id(tenant_id) c.pop("createdAt") - c["projects"] = await projects.get_projects(tenant_id=tenant_id, recorded=True) + c["projects"] = projects.get_projects(tenant_id=tenant_id, recorded=True) c["smtp"] = smtp.has_smtp() c["iceServers"] = assist.get_ice_servers() return { diff --git a/api/routers/core_dynamic.py b/api/routers/core_dynamic.py index c8e10952a..dfadd05c7 100644 --- a/api/routers/core_dynamic.py +++ b/api/routers/core_dynamic.py @@ -175,7 +175,7 @@ def process_invitation_link(token: str): @public_app.post('/password/reset', tags=["users"]) -async def change_password_by_invitation(data: schemas.EditPasswordByInvitationSchema = Body(...)): +def change_password_by_invitation(data: schemas.EditPasswordByInvitationSchema = Body(...)): if data is None or len(data.invitation) < 64 or len(data.passphrase) < 8: return {"errors": ["please provide a valid invitation & pass"]} user = users.get_by_invitation_token(token=data.invitation, pass_token=data.passphrase) @@ -184,7 +184,7 @@ async def change_password_by_invitation(data: schemas.EditPasswordByInvitationSc if user["expiredChange"]: return {"errors": ["expired change, please re-use the invitation link"]} - return await users.set_password_invitation(new_password=data.password.get_secret_value(), user_id=user["userId"]) + return users.set_password_invitation(new_password=data.password.get_secret_value(), user_id=user["userId"]) @app.put('/client/members/{memberId}', tags=["client"], dependencies=[OR_role("owner", "admin")]) @@ -195,7 +195,7 @@ def edit_member(memberId: int, data: schemas.EditMemberSchema, @app.get('/metadata/session_search', tags=["metadata"]) -async def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = None, +def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = None, context: schemas.CurrentContext = Depends(OR_context)): if key is None or value is None or len(value) == 0 and len(key) == 0: return {"errors": ["please provide a key&value for search"]} @@ -204,13 +204,13 @@ async def search_sessions_by_metadata(key: str, value: str, projectId: Optional[ if len(key) == 0: return {"errors": ["please provide a key for search"]} return { - "data": await sessions.search_by_metadata(tenant_id=context.tenant_id, user_id=context.user_id, m_value=value, + "data": sessions.search_by_metadata(tenant_id=context.tenant_id, user_id=context.user_id, m_value=value, m_key=key, project_id=projectId)} @app.get('/projects', tags=['projects']) -async def get_projects(context: schemas.CurrentContext = Depends(OR_context)): - return {"data": await projects.get_projects(tenant_id=context.tenant_id, gdpr=True, recorded=True)} +def get_projects(context: schemas.CurrentContext = Depends(OR_context)): + return {"data": projects.get_projects(tenant_id=context.tenant_id, gdpr=True, recorded=True)} # for backward compatibility diff --git a/api/routers/subs/health.py b/api/routers/subs/health.py index 9ef4f8545..245f039c7 100644 --- a/api/routers/subs/health.py +++ b/api/routers/subs/health.py @@ -19,4 +19,5 @@ if not tenants.tenants_exists_sync(use_pool=False): async def get_public_health_status(): if await tenants.tenants_exists(): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Not Found") + return {"data": health.get_health()} diff --git a/api/routers/subs/v1_api.py b/api/routers/subs/v1_api.py index 6264f9c39..b4e9c2aaa 100644 --- a/api/routers/subs/v1_api.py +++ b/api/routers/subs/v1_api.py @@ -73,8 +73,8 @@ def cancel_job(projectKey: str, jobId: int, _=Body(None), context: schemas.Curre @app_apikey.get('/v1/projects', tags=["api"]) -async def get_projects(context: schemas.CurrentContext = Depends(OR_context)): - records = await projects.get_projects(tenant_id=context.tenant_id) +def get_projects(context: schemas.CurrentContext = Depends(OR_context)): + records = projects.get_projects(tenant_id=context.tenant_id) for record in records: del record['projectId'] diff --git a/backend/internal/http/router/handlers-web.go b/backend/internal/http/router/handlers-web.go index f64a70f2a..82825815c 100644 --- a/backend/internal/http/router/handlers-web.go +++ b/backend/internal/http/router/handlers-web.go @@ -372,78 +372,6 @@ func (e *Router) featureFlagsHandlerWeb(w http.ResponseWriter, r *http.Request) ResponseWithJSON(w, resp, startTime, r.URL.Path, bodySize) } -type ScreenshotMessage struct { - Name string - Data []byte -} - -func (e *Router) imagesUploaderHandlerWeb(w http.ResponseWriter, r *http.Request) { - startTime := time.Now() - - sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r) - if err != nil { // Should accept expired token? - ResponseWithError(w, http.StatusUnauthorized, err, startTime, r.URL.Path, 0) - return - } - - if r.Body == nil { - ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"), startTime, r.URL.Path, 0) - return - } - r.Body = http.MaxBytesReader(w, r.Body, e.cfg.FileSizeLimit) - defer r.Body.Close() - - // Parse the multipart form - err = r.ParseMultipartForm(10 << 20) // Max upload size 10 MB - if err == http.ErrNotMultipart || err == http.ErrMissingBoundary { - ResponseWithError(w, http.StatusUnsupportedMediaType, err, startTime, r.URL.Path, 0) - return - } else if err != nil { - ResponseWithError(w, http.StatusInternalServerError, err, startTime, r.URL.Path, 0) // TODO: send error here only on staging - return - } - - // Iterate over uploaded files - for _, fileHeaderList := range r.MultipartForm.File { - for _, fileHeader := range fileHeaderList { - file, err := fileHeader.Open() - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - // Read the file content - fileBytes, err := ioutil.ReadAll(file) - if err != nil { - file.Close() - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - file.Close() - - fileName := util.SafeString(fileHeader.Filename) - log.Printf("fileName: %s, fileSize: %d", fileName, len(fileBytes)) - - // Create a message to send to Kafka - msg := ScreenshotMessage{ - Name: fileName, - Data: fileBytes, - } - data, err := json.Marshal(&msg) - if err != nil { - log.Printf("can't marshal screenshot message, err: %s", err) - continue - } - - // Send the message to queue - if err := e.services.Producer.Produce(e.cfg.TopicCanvasImages, sessionData.ID, data); err != nil { - log.Printf("failed to produce canvas image message: %v", err) - } - } - } - ResponseOK(w, startTime, r.URL.Path, 0) -} - func (e *Router) getUXTestInfo(w http.ResponseWriter, r *http.Request) { startTime := time.Now() bodySize := 0 @@ -573,7 +501,6 @@ func (e *Router) getUXUploadUrl(w http.ResponseWriter, r *http.Request) { URL string `json:"url"` } ResponseWithJSON(w, &UrlResponse{URL: url}, startTime, r.URL.Path, bodySize) - } type ScreenshotMessage struct { diff --git a/ee/api/chalicelib/core/projects.py b/ee/api/chalicelib/core/projects.py index d38244c0b..3d33160bd 100644 --- a/ee/api/chalicelib/core/projects.py +++ b/ee/api/chalicelib/core/projects.py @@ -7,7 +7,7 @@ import schemas from chalicelib.core import users from chalicelib.utils import pg_client, helper from chalicelib.utils.TimeUTC import TimeUTC -import orpy + def __exists_by_name(tenant_id: int, name: str, exclude_id: Optional[int]) -> bool: with pg_client.PostgresClient() as cur: @@ -53,9 +53,8 @@ def __create(tenant_id, data): return get_project(tenant_id=tenant_id, project_id=project_id, include_gdpr=True) -async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False, user_id: int = None): - - async def _get_projects(cnx): +def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False, user_id: int = None): + with pg_client.PostgresClient() as cur: role_query = """INNER JOIN LATERAL (SELECT 1 FROM users INNER JOIN roles USING (role_id) @@ -78,7 +77,7 @@ async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = Fals AND sessions.start_ts <= %(now)s )) AS first_recorded""" - rows = await cnx.execute(f"""{"SELECT *, first_recorded IS NOT NULL AS recorded FROM (" if recorded else ""} + query = cur.mogrify(f"""{"SELECT *, first_recorded IS NOT NULL AS recorded FROM (" if recorded else ""} SELECT s.project_id, s.name, s.project_key, s.save_request_payloads, s.first_recorded_session_at, s.created_at, s.sessions_last_check_at, s.sample_rate, s.platform {extra_projection} @@ -89,7 +88,8 @@ async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = Fals ORDER BY s.name {") AS raw" if recorded else ""};""", {"now": TimeUTC.now(), "check_delta": TimeUTC.MS_HOUR * 4, "tenant_id": tenant_id, "user_id": user_id}) - rows = await rows.fetchall() + cur.execute(query) + rows = cur.fetchall() # if recorded is requested, check if it was saved or computed if recorded: u_values = [] @@ -107,10 +107,11 @@ async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = Fals r.pop("first_recorded") r.pop("sessions_last_check_at") if len(u_values) > 0: - cnx.execute(f"""UPDATE public.projects + query = cur.mogrify(f"""UPDATE public.projects SET sessions_last_check_at=(now() at time zone 'utc'), first_recorded_session_at=u.first_recorded FROM (VALUES {",".join(u_values)}) AS u(project_id,first_recorded) WHERE projects.project_id=u.project_id;""", params) + cur.execute(query) else: for r in rows: r["created_at"] = TimeUTC.datetime_to_timestamp(r["created_at"]) @@ -118,11 +119,6 @@ async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = Fals return helper.list_to_camel_case(rows) - async with orpy.get().database.connection() as cnx: - async with cnx.transaction(): - out = await _get_projects(cnx) - return out - def get_project(tenant_id, project_id, include_last_session=False, include_gdpr=None): with pg_client.PostgresClient() as cur: diff --git a/ee/api/chalicelib/core/sessions_exp.py b/ee/api/chalicelib/core/sessions_exp.py index eee2c41fa..9f8876312 100644 --- a/ee/api/chalicelib/core/sessions_exp.py +++ b/ee/api/chalicelib/core/sessions_exp.py @@ -1497,12 +1497,12 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu return full_args, query_part -async def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None): +def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None): if project_id is None: - all_projects = await projects.get_projects(tenant_id=tenant_id) + all_projects = projects.get_projects(tenant_id=tenant_id) else: all_projects = [ - await projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False, + projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False, include_gdpr=False)] all_projects = {int(p["projectId"]): p["name"] for p in all_projects} diff --git a/ee/api/chalicelib/core/users.py b/ee/api/chalicelib/core/users.py index 22bf2b2b9..06dfb485f 100644 --- a/ee/api/chalicelib/core/users.py +++ b/ee/api/chalicelib/core/users.py @@ -523,7 +523,7 @@ def change_password(tenant_id, user_id, email, old_password, new_password): } -async def set_password_invitation(tenant_id, user_id, new_password): +def set_password_invitation(tenant_id, user_id, new_password): changes = {"password": new_password, "invitationToken": None, "invitedAt": None, "changePwdExpireAt": None, "changePwdToken": None} @@ -534,11 +534,11 @@ async def set_password_invitation(tenant_id, user_id, new_password): r["limits"] = { "teamMember": -1, "projects": -1, - "metadata": await metadata.get_remaining_metadata_with_count(tenant_id)} + "metadata": metadata.get_remaining_metadata_with_count(tenant_id)} c = tenants.get_by_tenant_id(tenant_id) c.pop("createdAt") - c["projects"] = await projects.get_projects(tenant_id=tenant_id, recorded=True, user_id=user_id) + c["projects"] = projects.get_projects(tenant_id=tenant_id, recorded=True, user_id=user_id) c["smtp"] = smtp.has_smtp() c["iceServers"] = assist.get_ice_servers() return { diff --git a/ee/api/routers/core_dynamic.py b/ee/api/routers/core_dynamic.py index 57f62150b..bc51a803d 100644 --- a/ee/api/routers/core_dynamic.py +++ b/ee/api/routers/core_dynamic.py @@ -38,7 +38,7 @@ async def get_all_signup(): if config("MULTI_TENANTS", cast=bool, default=False) or not tenants.tenants_exists_sync(use_pool=False): @public_app.post('/signup', tags=['signup']) @public_app.put('/signup', tags=['signup']) - async def signup_handler(data: schemas.UserSignupSchema): + async def signup_handler(data: schemas.UserSignupSchema = Body(...)): content = await signup.create_tenant(data) if "errors" in content: return content @@ -51,7 +51,7 @@ if config("MULTI_TENANTS", cast=bool, default=False) or not tenants.tenants_exis @public_app.post('/login', tags=["authentication"]) -def login_user(response: JSONResponse, data: schemas.UserLoginSchema): +def login_user(response: JSONResponse, data: schemas.UserLoginSchema = Body(...)): if helper.allow_captcha() and not captcha.is_valid(data.g_recaptcha_response): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, @@ -182,7 +182,7 @@ async def process_invitation_link(token: str, request: Request): @public_app.post('/password/reset', tags=["users"]) -async def change_password_by_invitation(data: schemas.EditPasswordByInvitationSchema = Body(...)): +def change_password_by_invitation(data: schemas.EditPasswordByInvitationSchema = Body(...)): if data is None or len(data.invitation) < 64 or len(data.passphrase) < 8: return {"errors": ["please provide a valid invitation & pass"]} user = users.get_by_invitation_token(token=data.invitation, pass_token=data.passphrase) @@ -191,7 +191,7 @@ async def change_password_by_invitation(data: schemas.EditPasswordByInvitationSc if user["expiredChange"]: return {"errors": ["expired change, please re-use the invitation link"]} - return await users.set_password_invitation(new_password=data.password.get_secret_value(), user_id=user["userId"], + return users.set_password_invitation(new_password=data.password.get_secret_value(), user_id=user["userId"], tenant_id=user["tenantId"]) @@ -203,7 +203,7 @@ def edit_member(memberId: int, data: schemas.EditMemberSchema, @app.get('/metadata/session_search', tags=["metadata"]) -async def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = None, +def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = None, context: schemas.CurrentContext = Depends(OR_context)): if key is None or value is None or len(value) == 0 and len(key) == 0: return {"errors": ["please provide a key&value for search"]} @@ -216,13 +216,13 @@ async def search_sessions_by_metadata(key: str, value: str, projectId: Optional[ if len(key) == 0: return {"errors": ["please provide a key for search"]} return { - "data": await sessions.search_by_metadata(tenant_id=context.tenant_id, user_id=context.user_id, m_value=value, + "data": sessions.search_by_metadata(tenant_id=context.tenant_id, user_id=context.user_id, m_value=value, m_key=key, project_id=projectId)} @app.get('/projects', tags=['projects']) -async def get_projects(context: schemas.CurrentContext = Depends(OR_context)): - return {"data": await projects.get_projects(tenant_id=context.tenant_id, gdpr=True, +def get_projects(context: schemas.CurrentContext = Depends(OR_context)): + return {"data": projects.get_projects(tenant_id=context.tenant_id, gdpr=True, recorded=True, user_id=context.user_id)}