Merge pull request #1154 from openreplay/v1.11.0-patch

V1.11.0 patch
Authored by Kraiem Taha Yassine on 2023-04-12 16:50:05 +01:00, committed by GitHub
commit 83ea01762d
12 changed files with 61 additions and 99 deletions

View file

@@ -10,6 +10,7 @@ on:
     branches:
       - dev
       - api-*
+      - v1.11.0-patch
     paths:
       - "ee/api/**"
      - "api/**"

View file

@@ -10,6 +10,7 @@ on:
     branches:
       - dev
       - api-*
+      - v1.11.0-patch
     paths:
       - "api/**"
       - "!api/.gitignore"

View file

@@ -10,6 +10,7 @@ on:
     branches:
       - dev
       - api-*
+      - v1.11.0-patch
     paths:
       - "ee/api/**"
       - "api/**"

View file

@@ -1,6 +1,6 @@
 from chalicelib.utils import pg_client, helper
 from chalicelib.utils.TimeUTC import TimeUTC
-from chalicelib.core import sessions, sessions_mobs, sessions_devtool
+from chalicelib.core import sessions_mobs, sessions_devtool

 class Actions:
@@ -47,13 +47,14 @@ def get_all(project_id):
     return helper.list_to_camel_case(data)

-def create(project_id, data):
+def create(project_id, user_id):
     with pg_client.PostgresClient() as cur:
-        job = {
-            "status": "scheduled",
-            "project_id": project_id,
-            **data
-        }
+        job = {"status": "scheduled",
+               "project_id": project_id,
+               "action": Actions.DELETE_USER_DATA,
+               "reference_id": user_id,
+               "description": f"Delete user sessions of userId = {user_id}",
+               "start_at": TimeUTC.to_human_readable(TimeUTC.midnight(1))}
         query = cur.mogrify(
             """INSERT INTO public.jobs(project_id, description, status, action,reference_id, start_at)
@@ -103,6 +104,30 @@ def format_datetime(r):
     r["start_at"] = TimeUTC.datetime_to_timestamp(r["start_at"])

+def __get_session_ids_by_user_ids(project_id, user_ids):
+    with pg_client.PostgresClient() as cur:
+        query = cur.mogrify(
+            """SELECT session_id
+               FROM public.sessions
+               WHERE project_id = %(project_id)s
+                 AND user_id IN %(userId)s
+               LIMIT 1000;""",
+            {"project_id": project_id, "userId": tuple(user_ids)})
+        cur.execute(query=query)
+        ids = cur.fetchall()
+    return [s["session_id"] for s in ids]
+
+def __delete_sessions_by_session_ids(session_ids):
+    with pg_client.PostgresClient(unlimited_query=True) as cur:
+        query = cur.mogrify(
+            """DELETE FROM public.sessions
+               WHERE session_id IN %(session_ids)s""",
+            {"session_ids": tuple(session_ids)}
+        )
+        cur.execute(query=query)

 def get_scheduled_jobs():
     with pg_client.PostgresClient() as cur:
         query = cur.mogrify(
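Note: both new helpers bind a Python tuple into the IN clause and rely on psycopg2's tuple adaptation. A hedged micro-example of what mogrify() produces (DSN invented; any reachable database works):

import psycopg2

conn = psycopg2.connect("dbname=openreplay")  # hypothetical DSN
with conn.cursor() as cur:
    sql = cur.mogrify("SELECT 1 WHERE session_id IN %(ids)s",
                      {"ids": tuple([101, 102, 103])})
    print(sql)  # b'SELECT 1 WHERE session_id IN (101, 102, 103)'
# Caveat: an empty tuple renders as "IN ()" and fails, so callers must check
# for empty id lists first; execute_jobs below guards with len(session_ids) > 0.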
@@ -122,11 +147,11 @@ def execute_jobs():
        print(f"Executing jobId:{job['jobId']}")
        try:
            if job["action"] == Actions.DELETE_USER_DATA:
-                session_ids = sessions.get_session_ids_by_user_ids(project_id=job["projectId"],
-                                                                   user_ids=[job["referenceId"]])
+                session_ids = __get_session_ids_by_user_ids(project_id=job["projectId"],
+                                                            user_ids=[job["referenceId"]])
                if len(session_ids) > 0:
                    print(f"Deleting {len(session_ids)} sessions")
-                    sessions.delete_sessions_by_session_ids(session_ids)
+                    __delete_sessions_by_session_ids(session_ids)
                    sessions_mobs.delete_mobs(session_ids=session_ids, project_id=job["projectId"])
                    sessions_devtool.delete_mobs(session_ids=session_ids, project_id=job["projectId"])
                else:
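Note: because __get_session_ids_by_user_ids caps each batch at 1000 rows, one run of execute_jobs deletes at most 1000 sessions per job; the truncated else: branch presumably marks the job done once a run finds nothing left. A hedged sketch of the resulting drain behavior (the loop is illustrative only; in the patch each scheduled run performs a single iteration):

def drain_user_sessions(job):
    # Illustrative only: the job stays scheduled and is re-run by the cron
    # until a run finds no remaining sessions for this user.
    while True:
        session_ids = __get_session_ids_by_user_ids(project_id=job["projectId"],
                                                    user_ids=[job["referenceId"]])
        if len(session_ids) == 0:
            break
        __delete_sessions_by_session_ids(session_ids)
        sessions_mobs.delete_mobs(session_ids=session_ids, project_id=job["projectId"])
        sessions_devtool.delete_mobs(session_ids=session_ids, project_id=job["projectId"])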

View file

@@ -1065,31 +1065,6 @@ def get_session_user(project_id, user_id):
     return helper.dict_to_camel_case(data)

-def get_session_ids_by_user_ids(project_id, user_ids):
-    with pg_client.PostgresClient() as cur:
-        query = cur.mogrify(
-            """SELECT session_id
-               FROM public.sessions
-               WHERE project_id = %(project_id)s
-                 AND user_id IN %(userId)s;""",
-            {"project_id": project_id, "userId": tuple(user_ids)})
-        cur.execute(query=query)
-        ids = cur.fetchall()
-    return [s["session_id"] for s in ids]
-
-def delete_sessions_by_session_ids(session_ids):
-    with pg_client.PostgresClient(unlimited_query=True) as cur:
-        query = cur.mogrify(
-            """DELETE FROM public.sessions
-               WHERE session_id IN %(session_ids)s;""",
-            {"session_ids": tuple(session_ids)}
-        )
-        cur.execute(query=query)
-    return True

 def count_all():
     with pg_client.PostgresClient(unlimited_query=True) as cur:
         cur.execute(query="SELECT COUNT(session_id) AS count FROM public.sessions")

View file

@@ -115,7 +115,7 @@ def schedule_for_deletion(bucket, key):
     s3 = __get_s3_resource()
     s3_object = s3.Object(bucket, key)
     s3_object.copy_from(CopySource={'Bucket': bucket, 'Key': key},
-                        Expires=datetime.utcnow() + timedelta(days=config("SCH_DELETE_DAYS", cast=int, default=7)),
+                        Expires=datetime.utcnow() + timedelta(days=config("SCH_DELETE_DAYS", cast=int, default=30)),
                         MetadataDirective='REPLACE')
     return True
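Note: copying the object onto itself with MetadataDirective='REPLACE' rewrites its metadata without re-uploading the body; Expires itself is only an HTTP response header, so some downstream cleanup presumably keys off it. Only the default retention window changes here, from 7 to 30 days. A hedged check of the new expiry (bucket and key invented):

import boto3

s3 = boto3.client("s3")
head = s3.head_object(Bucket="openreplay-mobs", Key="1/abc123")  # hypothetical names
print(head.get("Expires"))  # now roughly 30 days out by default instead of 7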

View file

@@ -54,4 +54,4 @@ ASSIST_JWT_EXPIRATION=144000
 ASSIST_JWT_SECRET=
 PYTHONUNBUFFERED=1
 REDIS_STRING=redis://redis-master.db.svc.cluster.local:6379
-SCH_DELETE_DAYS=7
+SCH_DELETE_DAYS=30

View file

@@ -1,5 +1,4 @@
 from apscheduler.triggers.cron import CronTrigger
-from apscheduler.triggers.interval import IntervalTrigger
 from chalicelib.core import telemetry
 from chalicelib.core import weekly_report, jobs
@@ -20,7 +19,7 @@ async def telemetry_cron() -> None:
 cron_jobs = [
     {"func": telemetry_cron, "trigger": CronTrigger(day_of_week="*"),
      "misfire_grace_time": 60 * 60, "max_instances": 1},
-    {"func": run_scheduled_jobs, "trigger": IntervalTrigger(minutes=1),
+    {"func": run_scheduled_jobs, "trigger": CronTrigger(day_of_week="*", hour=0, minute=15),
      "misfire_grace_time": 20, "max_instances": 1},
     {"func": weekly_report2, "trigger": CronTrigger(day_of_week="mon", hour=5),
      "misfire_grace_time": 60 * 60, "max_instances": 1}

View file

@@ -2,7 +2,6 @@ from fastapi import Depends, Body
 import schemas
 from chalicelib.core import sessions, events, jobs, projects
-from chalicelib.utils.TimeUTC import TimeUTC
 from or_dependencies import OR_context
 from routers.base import get_routers
@@ -15,7 +14,7 @@ async def get_user_sessions(projectKey: str, userId: str, start_date: int = None
     if projectId is None:
         return {"errors": ["invalid projectKey"]}
     return {
-        'data': sessions.get_user_sessions(
+        "data": sessions.get_user_sessions(
             project_id=projectId,
             user_id=userId,
             start_date=start_date,
@@ -30,7 +29,7 @@ async def get_session_events(projectKey: str, sessionId: int):
     if projectId is None:
         return {"errors": ["invalid projectKey"]}
     return {
-        'data': events.get_by_session_id(
+        "data": events.get_by_session_id(
             project_id=projectId,
             session_id=sessionId
         )
@@ -43,7 +42,7 @@ async def get_user_details(projectKey: str, userId: str):
     if projectId is None:
         return {"errors": ["invalid projectKey"]}
     return {
-        'data': sessions.get_session_user(
+        "data": sessions.get_session_user(
             project_id=projectId,
             user_id=userId
         )
@@ -55,14 +54,8 @@ async def schedule_to_delete_user_data(projectKey: str, userId: str):
     projectId = projects.get_internal_project_id(projectKey)
     if projectId is None:
         return {"errors": ["invalid projectKey"]}
-    data = {"action": "delete_user_data",
-            "reference_id": userId,
-            "description": f"Delete user sessions of userId = {userId}",
-            "start_at": TimeUTC.to_human_readable(TimeUTC.midnight(1))}
-    record = jobs.create(project_id=projectId, data=data)
-    return {
-        'data': record
-    }
+    record = jobs.create(project_id=projectId, user_id=userId)
+    return {"data": record}

 @app_apikey.get('/v1/{projectKey}/jobs', tags=["api"])
@@ -70,16 +63,12 @@ async def get_jobs(projectKey: str):
     projectId = projects.get_internal_project_id(projectKey)
     if projectId is None:
         return {"errors": ["invalid projectKey"]}
-    return {
-        'data': jobs.get_all(project_id=projectId)
-    }
+    return {"data": jobs.get_all(project_id=projectId)}

 @app_apikey.get('/v1/{projectKey}/jobs/{jobId}', tags=["api"])
 async def get_job(projectKey: str, jobId: int):
-    return {
-        'data': jobs.get(job_id=jobId)
-    }
+    return {"data": jobs.get(job_id=jobId)}

 @app_apikey.delete('/v1/{projectKey}/jobs/{jobId}', tags=["api"])
@@ -93,9 +82,7 @@ async def cancel_job(projectKey: str, jobId: int):
         return {"errors": ["The request job has already been canceled/completed."]}
     job["status"] = "cancelled"
-    return {
-        'data': jobs.update(job_id=jobId, job=job)
-    }
+    return {"data": jobs.update(job_id=jobId, job=job)}

 @app_apikey.get('/v1/projects', tags=["api"])
@@ -104,15 +91,13 @@ async def get_projects(context: schemas.CurrentContext = Depends(OR_context)):
     for record in records:
         del record['projectId']
-    return {
-        'data': records
-    }
+    return {"data": records}

 @app_apikey.get('/v1/projects/{projectKey}', tags=["api"])
 async def get_project(projectKey: str, context: schemas.CurrentContext = Depends(OR_context)):
     return {
-        'data': projects.get_project_by_key(tenant_id=context.tenant_id, project_key=projectKey)
+        "data": projects.get_project_by_key(tenant_id=context.tenant_id, project_key=projectKey)
     }
@@ -125,5 +110,5 @@ async def create_project(data: schemas.CreateProjectSchema = Body(...),
         data=data,
         skip_authorization=True
     )
-    del record['data']['projectId']
+    del record["data"]['projectId']
     return record
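Note: every endpoint in this router now returns the same compact, double-quoted {"data": ...} envelope, and the GDPR route delegates job construction to jobs.create. A hedged usage sketch against the jobs routes visible above (base URL and auth header name are assumptions, not taken from this diff):

import requests

BASE = "https://openreplay.example.com/api"      # hypothetical deployment URL
HEADERS = {"Authorization": "<organization API key>"}  # assumed auth scheme

jobs_list = requests.get(f"{BASE}/v1/PROJECT_KEY/jobs", headers=HEADERS).json()["data"]
one_job = requests.get(f"{BASE}/v1/PROJECT_KEY/jobs/1", headers=HEADERS).json()["data"]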

View file

@@ -1396,31 +1396,6 @@ def get_session_user(project_id, user_id):
     return helper.dict_to_camel_case(data)

-def get_session_ids_by_user_ids(project_id, user_ids):
-    with pg_client.PostgresClient() as cur:
-        query = cur.mogrify(
-            """SELECT session_id
-               FROM public.sessions
-               WHERE project_id = %(project_id)s
-                 AND user_id IN %(userId)s;""",
-            {"project_id": project_id, "userId": tuple(user_ids)})
-        cur.execute(query=query)
-        ids = cur.fetchall()
-    return [s["session_id"] for s in ids]
-
-def delete_sessions_by_session_ids(session_ids):
-    with pg_client.PostgresClient(unlimited_query=True) as cur:
-        query = cur.mogrify(
-            """DELETE FROM public.sessions
-               WHERE session_id IN %(session_ids)s;""",
-            {"session_ids": tuple(session_ids)}
-        )
-        cur.execute(query=query)
-    return True

 def count_all():
     with ch_client.ClickHouseClient() as cur:
         row = cur.execute(query=f"SELECT COUNT(session_id) AS count FROM {exp_ch_helper.get_main_sessions_table()}")

View file

@@ -74,4 +74,4 @@ ASSIST_JWT_EXPIRATION=144000
 ASSIST_JWT_SECRET=
 KAFKA_SERVERS=kafka.db.svc.cluster.local:9092
 KAFKA_USE_SSL=false
-SCH_DELETE_DAYS=7
+SCH_DELETE_DAYS=30

View file

@@ -1,5 +1,4 @@
 from apscheduler.triggers.cron import CronTrigger
-from apscheduler.triggers.interval import IntervalTrigger
 from decouple import config
 from chalicelib.core import jobs
@@ -29,13 +28,14 @@ cron_jobs = [
     {"func": unlock_cron, "trigger": CronTrigger(day="*")},
 ]
-SINGLE_CRONS = [{"func": telemetry_cron, "trigger": CronTrigger(day_of_week="*"),
-                 "misfire_grace_time": 60 * 60, "max_instances": 1},
-                {"func": run_scheduled_jobs, "trigger": IntervalTrigger(minutes=60),
-                 "misfire_grace_time": 20, "max_instances": 1},
-                {"func": weekly_report, "trigger": CronTrigger(day_of_week="mon", hour=5),
-                 "misfire_grace_time": 60 * 60, "max_instances": 1}
-                ]
+SINGLE_CRONS = [
+    {"func": telemetry_cron, "trigger": CronTrigger(day_of_week="*"),
+     "misfire_grace_time": 60 * 60, "max_instances": 1},
+    {"func": run_scheduled_jobs, "trigger": CronTrigger(day_of_week="*", hour=0, minute=15),
+     "misfire_grace_time": 20, "max_instances": 1},
+    {"func": weekly_report, "trigger": CronTrigger(day_of_week="mon", hour=5),
+     "misfire_grace_time": 60 * 60, "max_instances": 1}
+]
 if config("LOCAL_CRONS", default=False, cast=bool):
     cron_jobs += SINGLE_CRONS
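Note: as on the non-EE side, the singleton crons only attach when LOCAL_CRONS is set, presumably because clustered deployments run them in a dedicated crons service instead. A minimal sketch for a single-instance setup (python-decouple resolves this from the environment or a .env file):

# .env (hypothetical) -- opt this API instance into running the singleton crons
LOCAL_CRONS=True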