Revert "chore(api): asyncify GET /projects, and propagate (#1701)" (#1747)

This reverts commit 23015f5567.

Co-authored-by: ⵄⵎⵉⵔⵓⵛ <amirouche.boubekki@gmail.com>
This commit is contained in:
Kraiem Taha Yassine 2023-12-06 16:15:50 +01:00 committed by GitHub
parent 23015f5567
commit 7368072db2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 53 additions and 133 deletions

View file

@ -10,9 +10,7 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from psycopg import AsyncConnection
from starlette.responses import StreamingResponse
from psycopg.rows import dict_row
import orpy
from chalicelib.utils import helper
from chalicelib.utils import pg_client
from crons import core_crons, core_dynamic_crons
@ -22,6 +20,8 @@ from routers.subs import insights, metrics, v1_api, health, usability_tests
loglevel = config("LOGLEVEL", default=logging.WARNING)
print(f">Loglevel set to: {loglevel}")
logging.basicConfig(level=loglevel)
import orpy
from psycopg.rows import dict_row
class ORPYAsyncConnection(AsyncConnection):

View file

@ -284,8 +284,8 @@ def get_keys_by_projects(project_ids):
# return {"data": get(project_id)}
async def get_remaining_metadata_with_count(tenant_id):
all_projects = await projects.get_projects(tenant_id=tenant_id)
def get_remaining_metadata_with_count(tenant_id):
all_projects = projects.get_projects(tenant_id=tenant_id)
results = []
used_metas = get_batch([p["projectId"] for p in all_projects])
for p in all_projects:

View file

@ -7,7 +7,7 @@ import schemas
from chalicelib.core import users
from chalicelib.utils import pg_client, helper
from chalicelib.utils.TimeUTC import TimeUTC
import orpy
def __exists_by_name(name: str, exclude_id: Optional[int]) -> bool:
with pg_client.PostgresClient() as cur:
@ -52,9 +52,8 @@ def __create(tenant_id, data):
return get_project(tenant_id=tenant_id, project_id=project_id, include_gdpr=True)
async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False):
async def _get_projects(cnx):
def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False):
with pg_client.PostgresClient() as cur:
extra_projection = ""
if gdpr:
extra_projection += ',s.gdpr'
@ -68,15 +67,16 @@ async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = Fals
AND sessions.start_ts <= %(now)s
)) AS first_recorded"""
query = f"""{"SELECT *, first_recorded IS NOT NULL AS recorded FROM (" if recorded else ""}
query = cur.mogrify(f"""{"SELECT *, first_recorded IS NOT NULL AS recorded FROM (" if recorded else ""}
SELECT s.project_id, s.name, s.project_key, s.save_request_payloads, s.first_recorded_session_at,
s.created_at, s.sessions_last_check_at, s.sample_rate, s.platform
{extra_projection}
FROM public.projects AS s
WHERE s.deleted_at IS NULL
ORDER BY s.name {") AS raw" if recorded else ""};"""
rows = await cnx.execute(query, {"now": TimeUTC.now(), "check_delta": TimeUTC.MS_HOUR * 4})
rows = await rows.fetchall()
ORDER BY s.name {") AS raw" if recorded else ""};""",
{"now": TimeUTC.now(), "check_delta": TimeUTC.MS_HOUR * 4})
cur.execute(query)
rows = cur.fetchall()
# if recorded is requested, check if it was saved or computed
if recorded:
u_values = []
@ -94,10 +94,11 @@ async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = Fals
r.pop("first_recorded")
r.pop("sessions_last_check_at")
if len(u_values) > 0:
await cnx.execute(f"""UPDATE public.projects
query = cur.mogrify(f"""UPDATE public.projects
SET sessions_last_check_at=(now() at time zone 'utc'), first_recorded_session_at=u.first_recorded
FROM (VALUES {",".join(u_values)}) AS u(project_id,first_recorded)
WHERE projects.project_id=u.project_id;""", params)
cur.execute(query)
else:
for r in rows:
r["created_at"] = TimeUTC.datetime_to_timestamp(r["created_at"])
@ -105,18 +106,13 @@ async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = Fals
return helper.list_to_camel_case(rows)
async with orpy.get().database.connection() as cnx:
with cnx.transaction():
out = await _get_projects(cnx)
return out
def get_project(tenant_id, project_id, include_last_session=False, include_gdpr=None):
with pg_client.PostgresClient() as cur:
extra_select = ""
if include_last_session:
extra_select += """,(SELECT max(ss.start_ts)
FROM public.sessions AS ss
extra_select += """,(SELECT max(ss.start_ts)
FROM public.sessions AS ss
WHERE ss.project_id = %(project_id)s) AS last_recorded_session_at"""
if include_gdpr:
extra_select += ",s.gdpr"

View file

@ -1093,12 +1093,12 @@ def search_query_parts(data: schemas.SessionsSearchPayloadSchema, error_status,
return full_args, query_part
async def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None):
def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None):
if project_id is None:
all_projects = await projects.get_projects(tenant_id=tenant_id)
all_projects = projects.get_projects(tenant_id=tenant_id)
else:
all_projects = [
await projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False,
projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False,
include_gdpr=False)]
all_projects = {int(p["projectId"]): p["name"] for p in all_projects}

View file

@ -444,7 +444,7 @@ def change_password(tenant_id, user_id, email, old_password, new_password):
}
async def set_password_invitation(user_id, new_password):
def set_password_invitation(user_id, new_password):
changes = {"password": new_password,
"invitationToken": None, "invitedAt": None,
"changePwdExpireAt": None, "changePwdToken": None}
@ -455,11 +455,11 @@ async def set_password_invitation(user_id, new_password):
r["limits"] = {
"teamMember": -1,
"projects": -1,
"metadata": await metadata.get_remaining_metadata_with_count(tenant_id)}
"metadata": metadata.get_remaining_metadata_with_count(tenant_id)}
c = tenants.get_by_tenant_id(tenant_id)
c.pop("createdAt")
c["projects"] = await projects.get_projects(tenant_id=tenant_id, recorded=True)
c["projects"] = projects.get_projects(tenant_id=tenant_id, recorded=True)
c["smtp"] = smtp.has_smtp()
c["iceServers"] = assist.get_ice_servers()
return {

View file

@ -175,7 +175,7 @@ def process_invitation_link(token: str):
@public_app.post('/password/reset', tags=["users"])
async def change_password_by_invitation(data: schemas.EditPasswordByInvitationSchema = Body(...)):
def change_password_by_invitation(data: schemas.EditPasswordByInvitationSchema = Body(...)):
if data is None or len(data.invitation) < 64 or len(data.passphrase) < 8:
return {"errors": ["please provide a valid invitation & pass"]}
user = users.get_by_invitation_token(token=data.invitation, pass_token=data.passphrase)
@ -184,7 +184,7 @@ async def change_password_by_invitation(data: schemas.EditPasswordByInvitationSc
if user["expiredChange"]:
return {"errors": ["expired change, please re-use the invitation link"]}
return await users.set_password_invitation(new_password=data.password.get_secret_value(), user_id=user["userId"])
return users.set_password_invitation(new_password=data.password.get_secret_value(), user_id=user["userId"])
@app.put('/client/members/{memberId}', tags=["client"], dependencies=[OR_role("owner", "admin")])
@ -195,7 +195,7 @@ def edit_member(memberId: int, data: schemas.EditMemberSchema,
@app.get('/metadata/session_search', tags=["metadata"])
async def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = None,
def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = None,
context: schemas.CurrentContext = Depends(OR_context)):
if key is None or value is None or len(value) == 0 and len(key) == 0:
return {"errors": ["please provide a key&value for search"]}
@ -204,13 +204,13 @@ async def search_sessions_by_metadata(key: str, value: str, projectId: Optional[
if len(key) == 0:
return {"errors": ["please provide a key for search"]}
return {
"data": await sessions.search_by_metadata(tenant_id=context.tenant_id, user_id=context.user_id, m_value=value,
"data": sessions.search_by_metadata(tenant_id=context.tenant_id, user_id=context.user_id, m_value=value,
m_key=key, project_id=projectId)}
@app.get('/projects', tags=['projects'])
async def get_projects(context: schemas.CurrentContext = Depends(OR_context)):
return {"data": await projects.get_projects(tenant_id=context.tenant_id, gdpr=True, recorded=True)}
def get_projects(context: schemas.CurrentContext = Depends(OR_context)):
return {"data": projects.get_projects(tenant_id=context.tenant_id, gdpr=True, recorded=True)}
# for backward compatibility

View file

@ -19,4 +19,5 @@ if not tenants.tenants_exists_sync(use_pool=False):
async def get_public_health_status():
if await tenants.tenants_exists():
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Not Found")
return {"data": health.get_health()}

View file

@ -73,8 +73,8 @@ def cancel_job(projectKey: str, jobId: int, _=Body(None), context: schemas.Curre
@app_apikey.get('/v1/projects', tags=["api"])
async def get_projects(context: schemas.CurrentContext = Depends(OR_context)):
records = await projects.get_projects(tenant_id=context.tenant_id)
def get_projects(context: schemas.CurrentContext = Depends(OR_context)):
records = projects.get_projects(tenant_id=context.tenant_id)
for record in records:
del record['projectId']

View file

@ -372,78 +372,6 @@ func (e *Router) featureFlagsHandlerWeb(w http.ResponseWriter, r *http.Request)
ResponseWithJSON(w, resp, startTime, r.URL.Path, bodySize)
}
type ScreenshotMessage struct {
Name string
Data []byte
}
func (e *Router) imagesUploaderHandlerWeb(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r)
if err != nil { // Should accept expired token?
ResponseWithError(w, http.StatusUnauthorized, err, startTime, r.URL.Path, 0)
return
}
if r.Body == nil {
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"), startTime, r.URL.Path, 0)
return
}
r.Body = http.MaxBytesReader(w, r.Body, e.cfg.FileSizeLimit)
defer r.Body.Close()
// Parse the multipart form
err = r.ParseMultipartForm(10 << 20) // Max upload size 10 MB
if err == http.ErrNotMultipart || err == http.ErrMissingBoundary {
ResponseWithError(w, http.StatusUnsupportedMediaType, err, startTime, r.URL.Path, 0)
return
} else if err != nil {
ResponseWithError(w, http.StatusInternalServerError, err, startTime, r.URL.Path, 0) // TODO: send error here only on staging
return
}
// Iterate over uploaded files
for _, fileHeaderList := range r.MultipartForm.File {
for _, fileHeader := range fileHeaderList {
file, err := fileHeader.Open()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Read the file content
fileBytes, err := ioutil.ReadAll(file)
if err != nil {
file.Close()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
file.Close()
fileName := util.SafeString(fileHeader.Filename)
log.Printf("fileName: %s, fileSize: %d", fileName, len(fileBytes))
// Create a message to send to Kafka
msg := ScreenshotMessage{
Name: fileName,
Data: fileBytes,
}
data, err := json.Marshal(&msg)
if err != nil {
log.Printf("can't marshal screenshot message, err: %s", err)
continue
}
// Send the message to queue
if err := e.services.Producer.Produce(e.cfg.TopicCanvasImages, sessionData.ID, data); err != nil {
log.Printf("failed to produce canvas image message: %v", err)
}
}
}
ResponseOK(w, startTime, r.URL.Path, 0)
}
func (e *Router) getUXTestInfo(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
bodySize := 0
@ -573,7 +501,6 @@ func (e *Router) getUXUploadUrl(w http.ResponseWriter, r *http.Request) {
URL string `json:"url"`
}
ResponseWithJSON(w, &UrlResponse{URL: url}, startTime, r.URL.Path, bodySize)
}
type ScreenshotMessage struct {

View file

@ -7,7 +7,7 @@ import schemas
from chalicelib.core import users
from chalicelib.utils import pg_client, helper
from chalicelib.utils.TimeUTC import TimeUTC
import orpy
def __exists_by_name(tenant_id: int, name: str, exclude_id: Optional[int]) -> bool:
with pg_client.PostgresClient() as cur:
@ -53,9 +53,8 @@ def __create(tenant_id, data):
return get_project(tenant_id=tenant_id, project_id=project_id, include_gdpr=True)
async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False, user_id: int = None):
async def _get_projects(cnx):
def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False, user_id: int = None):
with pg_client.PostgresClient() as cur:
role_query = """INNER JOIN LATERAL (SELECT 1
FROM users
INNER JOIN roles USING (role_id)
@ -78,7 +77,7 @@ async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = Fals
AND sessions.start_ts <= %(now)s
)) AS first_recorded"""
rows = await cnx.execute(f"""{"SELECT *, first_recorded IS NOT NULL AS recorded FROM (" if recorded else ""}
query = cur.mogrify(f"""{"SELECT *, first_recorded IS NOT NULL AS recorded FROM (" if recorded else ""}
SELECT s.project_id, s.name, s.project_key, s.save_request_payloads, s.first_recorded_session_at,
s.created_at, s.sessions_last_check_at, s.sample_rate, s.platform
{extra_projection}
@ -89,7 +88,8 @@ async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = Fals
ORDER BY s.name {") AS raw" if recorded else ""};""",
{"now": TimeUTC.now(), "check_delta": TimeUTC.MS_HOUR * 4,
"tenant_id": tenant_id, "user_id": user_id})
rows = await rows.fetchall()
cur.execute(query)
rows = cur.fetchall()
# if recorded is requested, check if it was saved or computed
if recorded:
u_values = []
@ -107,10 +107,11 @@ async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = Fals
r.pop("first_recorded")
r.pop("sessions_last_check_at")
if len(u_values) > 0:
cnx.execute(f"""UPDATE public.projects
query = cur.mogrify(f"""UPDATE public.projects
SET sessions_last_check_at=(now() at time zone 'utc'), first_recorded_session_at=u.first_recorded
FROM (VALUES {",".join(u_values)}) AS u(project_id,first_recorded)
WHERE projects.project_id=u.project_id;""", params)
cur.execute(query)
else:
for r in rows:
r["created_at"] = TimeUTC.datetime_to_timestamp(r["created_at"])
@ -118,11 +119,6 @@ async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = Fals
return helper.list_to_camel_case(rows)
async with orpy.get().database.connection() as cnx:
async with cnx.transaction():
out = await _get_projects(cnx)
return out
def get_project(tenant_id, project_id, include_last_session=False, include_gdpr=None):
with pg_client.PostgresClient() as cur:

View file

@ -1497,12 +1497,12 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
return full_args, query_part
async def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None):
def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None):
if project_id is None:
all_projects = await projects.get_projects(tenant_id=tenant_id)
all_projects = projects.get_projects(tenant_id=tenant_id)
else:
all_projects = [
await projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False,
projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False,
include_gdpr=False)]
all_projects = {int(p["projectId"]): p["name"] for p in all_projects}

View file

@ -523,7 +523,7 @@ def change_password(tenant_id, user_id, email, old_password, new_password):
}
async def set_password_invitation(tenant_id, user_id, new_password):
def set_password_invitation(tenant_id, user_id, new_password):
changes = {"password": new_password,
"invitationToken": None, "invitedAt": None,
"changePwdExpireAt": None, "changePwdToken": None}
@ -534,11 +534,11 @@ async def set_password_invitation(tenant_id, user_id, new_password):
r["limits"] = {
"teamMember": -1,
"projects": -1,
"metadata": await metadata.get_remaining_metadata_with_count(tenant_id)}
"metadata": metadata.get_remaining_metadata_with_count(tenant_id)}
c = tenants.get_by_tenant_id(tenant_id)
c.pop("createdAt")
c["projects"] = await projects.get_projects(tenant_id=tenant_id, recorded=True, user_id=user_id)
c["projects"] = projects.get_projects(tenant_id=tenant_id, recorded=True, user_id=user_id)
c["smtp"] = smtp.has_smtp()
c["iceServers"] = assist.get_ice_servers()
return {

View file

@ -38,7 +38,7 @@ async def get_all_signup():
if config("MULTI_TENANTS", cast=bool, default=False) or not tenants.tenants_exists_sync(use_pool=False):
@public_app.post('/signup', tags=['signup'])
@public_app.put('/signup', tags=['signup'])
async def signup_handler(data: schemas.UserSignupSchema):
async def signup_handler(data: schemas.UserSignupSchema = Body(...)):
content = await signup.create_tenant(data)
if "errors" in content:
return content
@ -51,7 +51,7 @@ if config("MULTI_TENANTS", cast=bool, default=False) or not tenants.tenants_exis
@public_app.post('/login', tags=["authentication"])
def login_user(response: JSONResponse, data: schemas.UserLoginSchema):
def login_user(response: JSONResponse, data: schemas.UserLoginSchema = Body(...)):
if helper.allow_captcha() and not captcha.is_valid(data.g_recaptcha_response):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@ -182,7 +182,7 @@ async def process_invitation_link(token: str, request: Request):
@public_app.post('/password/reset', tags=["users"])
async def change_password_by_invitation(data: schemas.EditPasswordByInvitationSchema = Body(...)):
def change_password_by_invitation(data: schemas.EditPasswordByInvitationSchema = Body(...)):
if data is None or len(data.invitation) < 64 or len(data.passphrase) < 8:
return {"errors": ["please provide a valid invitation & pass"]}
user = users.get_by_invitation_token(token=data.invitation, pass_token=data.passphrase)
@ -191,7 +191,7 @@ async def change_password_by_invitation(data: schemas.EditPasswordByInvitationSc
if user["expiredChange"]:
return {"errors": ["expired change, please re-use the invitation link"]}
return await users.set_password_invitation(new_password=data.password.get_secret_value(), user_id=user["userId"],
return users.set_password_invitation(new_password=data.password.get_secret_value(), user_id=user["userId"],
tenant_id=user["tenantId"])
@ -203,7 +203,7 @@ def edit_member(memberId: int, data: schemas.EditMemberSchema,
@app.get('/metadata/session_search', tags=["metadata"])
async def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = None,
def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = None,
context: schemas.CurrentContext = Depends(OR_context)):
if key is None or value is None or len(value) == 0 and len(key) == 0:
return {"errors": ["please provide a key&value for search"]}
@ -216,13 +216,13 @@ async def search_sessions_by_metadata(key: str, value: str, projectId: Optional[
if len(key) == 0:
return {"errors": ["please provide a key for search"]}
return {
"data": await sessions.search_by_metadata(tenant_id=context.tenant_id, user_id=context.user_id, m_value=value,
"data": sessions.search_by_metadata(tenant_id=context.tenant_id, user_id=context.user_id, m_value=value,
m_key=key, project_id=projectId)}
@app.get('/projects', tags=['projects'])
async def get_projects(context: schemas.CurrentContext = Depends(OR_context)):
return {"data": await projects.get_projects(tenant_id=context.tenant_id, gdpr=True,
def get_projects(context: schemas.CurrentContext = Depends(OR_context)):
return {"data": projects.get_projects(tenant_id=context.tenant_id, gdpr=True,
recorded=True, user_id=context.user_id)}