feat(chalice): refactored projects code

feat(chalice): sessions-check-flag every hour
feat(chalice): sessions-check-delta set to 4 hours
This commit is contained in:
Taha Yassine Kraiem 2023-04-26 17:47:00 +02:00
parent b8dfe90ce8
commit 171373b73e
5 changed files with 31 additions and 51 deletions

View file

@ -58,21 +58,22 @@ def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False):
if gdpr:
extra_projection += ',s.gdpr'
if recorded:
extra_projection += """,COALESCE(EXTRACT(EPOCH FROM s.first_recorded_session_at) * 1000::BIGINT,
extra_projection += """,\nCOALESCE(EXTRACT(EPOCH FROM s.first_recorded_session_at) * 1000::BIGINT,
(SELECT MIN(sessions.start_ts)
FROM public.sessions
WHERE sessions.project_id = s.project_id
AND sessions.start_ts >= (EXTRACT(EPOCH FROM
COALESCE(s.sessions_last_check_at, s.created_at)) * 1000-24*60*60*1000)
AND sessions.start_ts >= (EXTRACT(EPOCH
FROM COALESCE(s.sessions_last_check_at, s.created_at)) * 1000-%(check_delta)s)
AND sessions.start_ts <= %(now)s
)) AS first_recorded"""
query = cur.mogrify(f"""{"SELECT *, first_recorded IS NOT NULL AS recorded FROM (" if recorded else ""}
SELECT s.project_id, s.name, s.project_key, s.save_request_payloads, s.first_recorded_session_at,
created_at {extra_projection}
created_at, sessions_last_check_at {extra_projection}
FROM public.projects AS s
WHERE s.deleted_at IS NULL
ORDER BY s.name {") AS raw" if recorded else ""};""", {"now": TimeUTC.now()})
ORDER BY s.name {") AS raw" if recorded else ""};""",
{"now": TimeUTC.now(), "check_delta": TimeUTC.MS_HOUR * 4})
cur.execute(query)
rows = cur.fetchall()
# if recorded is requested, check if it was saved or computed
@ -80,13 +81,17 @@ def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False):
u_values = []
params = {}
for i, r in enumerate(rows):
r["sessions_last_check_at"] = TimeUTC.datetime_to_timestamp(r["sessions_last_check_at"])
r["created_at"] = TimeUTC.datetime_to_timestamp(r["created_at"])
if r["first_recorded_session_at"] is None:
if r["first_recorded_session_at"] is None \
and r["sessions_last_check_at"] is not None \
and (TimeUTC.now() - r["sessions_last_check_at"]) > TimeUTC.MS_HOUR:
u_values.append(f"(%(project_id_{i})s,to_timestamp(%(first_recorded_{i})s/1000))")
params[f"project_id_{i}"] = r["project_id"]
params[f"first_recorded_{i}"] = r["first_recorded"] if r["recorded"] else None
r.pop("first_recorded_session_at")
r.pop("first_recorded")
r.pop("sessions_last_check_at")
if len(u_values) > 0:
query = cur.mogrify(f"""UPDATE public.projects
SET sessions_last_check_at=(now() at time zone 'utc'), first_recorded_session_at=u.first_recorded
@ -96,6 +101,7 @@ def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False):
else:
for r in rows:
r["created_at"] = TimeUTC.datetime_to_timestamp(r["created_at"])
r.pop("sessions_last_check_at")
return helper.list_to_camel_case(rows)

View file

@ -53,8 +53,7 @@ def __create(tenant_id, name):
return get_project(tenant_id=tenant_id, project_id=project_id, include_gdpr=True)
def get_projects(tenant_id, recording_state=False, gdpr=None, recorded=False, stack_integrations=False, user_id=None):
stack_integrations = False
def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False, user_id: int = None):
with pg_client.PostgresClient() as cur:
role_query = """INNER JOIN LATERAL (SELECT 1
FROM users
@ -66,37 +65,28 @@ def get_projects(tenant_id, recording_state=False, gdpr=None, recorded=False, st
AND (roles.all_projects OR roles_projects.project_id = s.project_id)
LIMIT 1) AS role_project ON (TRUE)"""
extra_projection = ""
extra_join = ""
if gdpr:
extra_projection += ',s.gdpr'
if recorded:
extra_projection += """,COALESCE(nullif(EXTRACT(EPOCH FROM s.first_recorded_session_at) * 1000, NULL)::BIGINT ,
extra_projection += """,\nCOALESCE(EXTRACT(EPOCH FROM s.first_recorded_session_at) * 1000::BIGINT,
(SELECT MIN(sessions.start_ts)
FROM public.sessions
WHERE sessions.project_id = s.project_id
AND sessions.start_ts >= (EXTRACT(EPOCH FROM
COALESCE(s.sessions_last_check_at, s.created_at)) * 1000-24*60*60*1000)
AND sessions.start_ts >= (EXTRACT(EPOCH
FROM COALESCE(s.sessions_last_check_at, s.created_at)) * 1000-%(check_delta)s)
AND sessions.start_ts <= %(now)s
LIMIT 1), NULL) AS first_recorded"""
if stack_integrations:
extra_projection += ',stack_integrations.count>0 AS stack_integrations'
if stack_integrations:
extra_join = """LEFT JOIN LATERAL (SELECT COUNT(*) AS count
FROM public.integrations
WHERE s.project_id = integrations.project_id
LIMIT 1) AS stack_integrations ON TRUE"""
)) AS first_recorded"""
query = cur.mogrify(f"""{"SELECT *, first_recorded IS NOT NULL AS recorded FROM (" if recorded else ""}
SELECT s.project_id, s.name, s.project_key, s.save_request_payloads, s.first_recorded_session_at,
created_at {extra_projection}
created_at, sessions_last_check_at {extra_projection}
FROM public.projects AS s
{extra_join}
{role_query if user_id is not None else ""}
WHERE s.tenant_id =%(tenant_id)s
AND s.deleted_at IS NULL
ORDER BY s.name {") AS raw" if recorded else ""};""",
{"tenant_id": tenant_id, "user_id": user_id, "now": TimeUTC.now()})
{"tenant_id": tenant_id, "user_id": user_id, "now": TimeUTC.now(),
"check_delta": TimeUTC.MS_HOUR * 4})
cur.execute(query)
rows = cur.fetchall()
# if recorded is requested, check if it was saved or computed
@ -104,13 +94,17 @@ def get_projects(tenant_id, recording_state=False, gdpr=None, recorded=False, st
u_values = []
params = {}
for i, r in enumerate(rows):
r["sessions_last_check_at"] = TimeUTC.datetime_to_timestamp(r["sessions_last_check_at"])
r["created_at"] = TimeUTC.datetime_to_timestamp(r["created_at"])
if r["first_recorded_session_at"] is None:
if r["first_recorded_session_at"] is None \
and r["sessions_last_check_at"] is not None \
and (TimeUTC.now() - r["sessions_last_check_at"]) > TimeUTC.MS_HOUR:
u_values.append(f"(%(project_id_{i})s,to_timestamp(%(first_recorded_{i})s/1000))")
params[f"project_id_{i}"] = r["project_id"]
params[f"first_recorded_{i}"] = r["first_recorded"] if r["recorded"] else None
r.pop("first_recorded_session_at")
r.pop("first_recorded")
r.pop("sessions_last_check_at")
if len(u_values) > 0:
query = cur.mogrify(f"""UPDATE public.projects
SET sessions_last_check_at=(now() at time zone 'utc'), first_recorded_session_at=u.first_recorded
@ -120,26 +114,7 @@ def get_projects(tenant_id, recording_state=False, gdpr=None, recorded=False, st
else:
for r in rows:
r["created_at"] = TimeUTC.datetime_to_timestamp(r["created_at"])
if recording_state and len(rows) > 0:
project_ids = [f'({r["project_id"]})' for r in rows]
query = cur.mogrify(f"""SELECT projects.project_id, COALESCE(MAX(start_ts), 0) AS last
FROM (VALUES {",".join(project_ids)}) AS projects(project_id)
LEFT JOIN sessions USING (project_id)
WHERE sessions.start_ts >= %(startDate)s AND sessions.start_ts <= %(endDate)s
GROUP BY project_id;""",
{"startDate": TimeUTC.now(delta_days=-3), "endDate": TimeUTC.now(delta_days=1)})
cur.execute(query=query)
status = cur.fetchall()
for r in rows:
r["status"] = "red"
for s in status:
if s["project_id"] == r["project_id"]:
if TimeUTC.now(-2) <= s["last"] < TimeUTC.now(-1):
r["status"] = "yellow"
elif s["last"] >= TimeUTC.now(-1):
r["status"] = "green"
break
r.pop("sessions_last_check_at")
return helper.list_to_camel_case(rows)

View file

@ -1272,7 +1272,7 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None):
if project_id is None:
all_projects = projects.get_projects(tenant_id=tenant_id, recording_state=False)
all_projects = projects.get_projects(tenant_id=tenant_id)
else:
all_projects = [
projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False,

View file

@ -575,8 +575,7 @@ def set_password_invitation(tenant_id, user_id, new_password):
c = tenants.get_by_tenant_id(tenant_id)
c.pop("createdAt")
c["projects"] = projects.get_projects(tenant_id=tenant_id, recording_state=True, recorded=True,
stack_integrations=True, user_id=user_id)
c["projects"] = projects.get_projects(tenant_id=tenant_id, recorded=True, user_id=user_id)
c["smtp"] = helper.has_smtp()
c["iceServers"] = assist.get_ice_servers()
return {

View file

@ -159,8 +159,8 @@ async def search_sessions_by_metadata(key: str, value: str, projectId: Optional[
@app.get('/projects', tags=['projects'])
async def get_projects(context: schemas.CurrentContext = Depends(OR_context)):
return {"data": projects.get_projects(tenant_id=context.tenant_id, recording_state=True, gdpr=True, recorded=True,
stack_integrations=True, user_id=context.user_id)}
return {"data": projects.get_projects(tenant_id=context.tenant_id, gdpr=True,
recorded=True, user_id=context.user_id)}
# for backward compatibility