From 585d8930630e86721c195976609e5a137910a1e6 Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Wed, 12 Apr 2023 15:23:50 +0100 Subject: [PATCH 1/5] feat(chalice): refactored Jobs feat(chalice): added limits on Jobs --- api/chalicelib/core/jobs.py | 34 +++++++++++++++++++++++--- api/chalicelib/core/sessions.py | 25 ------------------- api/chalicelib/utils/s3.py | 2 +- api/env.default | 2 +- ee/api/chalicelib/core/sessions_exp.py | 25 ------------------- ee/api/env.default | 2 +- 6 files changed, 33 insertions(+), 57 deletions(-) diff --git a/api/chalicelib/core/jobs.py b/api/chalicelib/core/jobs.py index bc5ce81ab..8ca556fdf 100644 --- a/api/chalicelib/core/jobs.py +++ b/api/chalicelib/core/jobs.py @@ -1,6 +1,6 @@ from chalicelib.utils import pg_client, helper from chalicelib.utils.TimeUTC import TimeUTC -from chalicelib.core import sessions, sessions_mobs, sessions_devtool +from chalicelib.core import sessions_mobs, sessions_devtool class Actions: @@ -103,6 +103,32 @@ def format_datetime(r): r["start_at"] = TimeUTC.datetime_to_timestamp(r["start_at"]) +def __get_session_ids_by_user_ids(project_id, user_ids): + with pg_client.PostgresClient() as cur: + query = cur.mogrify( + """SELECT session_id + FROM public.sessions + WHERE project_id = %(project_id)s + AND user_id IN %(userId)s;""", + {"project_id": project_id, "userId": tuple(user_ids)}) + cur.execute(query=query) + ids = cur.fetchall() + return [s["session_id"] for s in ids] + + +def __delete_sessions_by_session_ids(session_ids): + with pg_client.PostgresClient(unlimited_query=True) as cur: + query = cur.mogrify( + """DELETE FROM public.sessions + WHERE session_id IN %(session_ids)s + LIMIT 1000;""", + {"session_ids": tuple(session_ids)} + ) + cur.execute(query=query) + + return True + + def get_scheduled_jobs(): with pg_client.PostgresClient() as cur: query = cur.mogrify( @@ -122,11 +148,11 @@ def execute_jobs(): print(f"Executing jobId:{job['jobId']}") try: if job["action"] == 
Actions.DELETE_USER_DATA: - session_ids = sessions.get_session_ids_by_user_ids(project_id=job["projectId"], - user_ids=[job["referenceId"]]) + session_ids = __get_session_ids_by_user_ids(project_id=job["projectId"], + user_ids=[job["referenceId"]]) if len(session_ids) > 0: print(f"Deleting {len(session_ids)} sessions") - sessions.delete_sessions_by_session_ids(session_ids) + __delete_sessions_by_session_ids(session_ids) sessions_mobs.delete_mobs(session_ids=session_ids, project_id=job["projectId"]) sessions_devtool.delete_mobs(session_ids=session_ids, project_id=job["projectId"]) else: diff --git a/api/chalicelib/core/sessions.py b/api/chalicelib/core/sessions.py index 7de8253da..e7283755e 100644 --- a/api/chalicelib/core/sessions.py +++ b/api/chalicelib/core/sessions.py @@ -1065,31 +1065,6 @@ def get_session_user(project_id, user_id): return helper.dict_to_camel_case(data) -def get_session_ids_by_user_ids(project_id, user_ids): - with pg_client.PostgresClient() as cur: - query = cur.mogrify( - """SELECT session_id - FROM public.sessions - WHERE project_id = %(project_id)s - AND user_id IN %(userId)s;""", - {"project_id": project_id, "userId": tuple(user_ids)}) - cur.execute(query=query) - ids = cur.fetchall() - return [s["session_id"] for s in ids] - - -def delete_sessions_by_session_ids(session_ids): - with pg_client.PostgresClient(unlimited_query=True) as cur: - query = cur.mogrify( - """DELETE FROM public.sessions - WHERE session_id IN %(session_ids)s;""", - {"session_ids": tuple(session_ids)} - ) - cur.execute(query=query) - - return True - - def count_all(): with pg_client.PostgresClient(unlimited_query=True) as cur: cur.execute(query="SELECT COUNT(session_id) AS count FROM public.sessions") diff --git a/api/chalicelib/utils/s3.py b/api/chalicelib/utils/s3.py index cdd22aa4e..6eeacd261 100644 --- a/api/chalicelib/utils/s3.py +++ b/api/chalicelib/utils/s3.py @@ -115,7 +115,7 @@ def schedule_for_deletion(bucket, key): s3 = __get_s3_resource() s3_object = 
s3.Object(bucket, key) s3_object.copy_from(CopySource={'Bucket': bucket, 'Key': key}, - Expires=datetime.utcnow() + timedelta(days=config("SCH_DELETE_DAYS", cast=int, default=7)), + Expires=datetime.utcnow() + timedelta(days=config("SCH_DELETE_DAYS", cast=int, default=30)), MetadataDirective='REPLACE') return True diff --git a/api/env.default b/api/env.default index e0560619f..7d5266470 100644 --- a/api/env.default +++ b/api/env.default @@ -54,4 +54,4 @@ ASSIST_JWT_EXPIRATION=144000 ASSIST_JWT_SECRET= PYTHONUNBUFFERED=1 REDIS_STRING=redis://redis-master.db.svc.cluster.local:6379 -SCH_DELETE_DAYS=7 \ No newline at end of file +SCH_DELETE_DAYS=30 \ No newline at end of file diff --git a/ee/api/chalicelib/core/sessions_exp.py b/ee/api/chalicelib/core/sessions_exp.py index 1c4d20883..b7539c3c2 100644 --- a/ee/api/chalicelib/core/sessions_exp.py +++ b/ee/api/chalicelib/core/sessions_exp.py @@ -1396,31 +1396,6 @@ def get_session_user(project_id, user_id): return helper.dict_to_camel_case(data) -def get_session_ids_by_user_ids(project_id, user_ids): - with pg_client.PostgresClient() as cur: - query = cur.mogrify( - """SELECT session_id - FROM public.sessions - WHERE project_id = %(project_id)s - AND user_id IN %(userId)s;""", - {"project_id": project_id, "userId": tuple(user_ids)}) - cur.execute(query=query) - ids = cur.fetchall() - return [s["session_id"] for s in ids] - - -def delete_sessions_by_session_ids(session_ids): - with pg_client.PostgresClient(unlimited_query=True) as cur: - query = cur.mogrify( - """DELETE FROM public.sessions - WHERE session_id IN %(session_ids)s;""", - {"session_ids": tuple(session_ids)} - ) - cur.execute(query=query) - - return True - - def count_all(): with ch_client.ClickHouseClient() as cur: row = cur.execute(query=f"SELECT COUNT(session_id) AS count FROM {exp_ch_helper.get_main_sessions_table()}") diff --git a/ee/api/env.default b/ee/api/env.default index 603c291b0..a35f6f451 100644 --- a/ee/api/env.default +++ b/ee/api/env.default @@ 
-74,4 +74,4 @@ ASSIST_JWT_EXPIRATION=144000 ASSIST_JWT_SECRET= KAFKA_SERVERS=kafka.db.svc.cluster.local:9092 KAFKA_USE_SSL=false -SCH_DELETE_DAYS=7 \ No newline at end of file +SCH_DELETE_DAYS=30 \ No newline at end of file From 92f4ffa1fbaef24fd3505dba1cb20a95f2b3c948 Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Wed, 12 Apr 2023 15:24:42 +0100 Subject: [PATCH 2/5] chore(build): test patch branch --- .github/workflows/api-ee.yaml | 1 + .github/workflows/api.yaml | 1 + 2 files changed, 2 insertions(+) diff --git a/.github/workflows/api-ee.yaml b/.github/workflows/api-ee.yaml index f9a1730f1..ed81c8972 100644 --- a/.github/workflows/api-ee.yaml +++ b/.github/workflows/api-ee.yaml @@ -10,6 +10,7 @@ on: branches: - dev - api-* + - v1.11.0-patch paths: - "ee/api/**" - "api/**" diff --git a/.github/workflows/api.yaml b/.github/workflows/api.yaml index 8e2f7fa7b..451ae64b5 100644 --- a/.github/workflows/api.yaml +++ b/.github/workflows/api.yaml @@ -10,6 +10,7 @@ on: branches: - dev - api-* + - v1.11.0-patch paths: - "api/**" - "!api/.gitignore" From 86017ec2cff814d5d7be15e778cbcabc8e81787a Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Wed, 12 Apr 2023 15:59:17 +0100 Subject: [PATCH 3/5] feat(chalice): fixing Jobs --- api/chalicelib/core/jobs.py | 21 ++++++++++----------- api/routers/subs/v1_api.py | 37 +++++++++++-------------------------- 2 files changed, 21 insertions(+), 37 deletions(-) diff --git a/api/chalicelib/core/jobs.py b/api/chalicelib/core/jobs.py index 8ca556fdf..3c812ad33 100644 --- a/api/chalicelib/core/jobs.py +++ b/api/chalicelib/core/jobs.py @@ -47,13 +47,14 @@ def get_all(project_id): return helper.list_to_camel_case(data) -def create(project_id, data): +def create(project_id, user_id): with pg_client.PostgresClient() as cur: - job = { - "status": "scheduled", - "project_id": project_id, - **data - } + job = {"status": "scheduled", + "project_id": project_id, + "action": Actions.DELETE_USER_DATA, + "reference_id": user_id, + 
"description": f"Delete user sessions of userId = {user_id}", + "start_at": TimeUTC.to_human_readable(TimeUTC.midnight(1))} query = cur.mogrify( """INSERT INTO public.jobs(project_id, description, status, action,reference_id, start_at) @@ -109,7 +110,8 @@ def __get_session_ids_by_user_ids(project_id, user_ids): """SELECT session_id FROM public.sessions WHERE project_id = %(project_id)s - AND user_id IN %(userId)s;""", + AND user_id IN %(userId)s + LIMIT 1000;""", {"project_id": project_id, "userId": tuple(user_ids)}) cur.execute(query=query) ids = cur.fetchall() @@ -120,14 +122,11 @@ def __delete_sessions_by_session_ids(session_ids): with pg_client.PostgresClient(unlimited_query=True) as cur: query = cur.mogrify( """DELETE FROM public.sessions - WHERE session_id IN %(session_ids)s - LIMIT 1000;""", + WHERE session_id IN %(session_ids)s""", {"session_ids": tuple(session_ids)} ) cur.execute(query=query) - return True - def get_scheduled_jobs(): with pg_client.PostgresClient() as cur: diff --git a/api/routers/subs/v1_api.py b/api/routers/subs/v1_api.py index 0759d0c31..ca98aaf5f 100644 --- a/api/routers/subs/v1_api.py +++ b/api/routers/subs/v1_api.py @@ -2,7 +2,6 @@ from fastapi import Depends, Body import schemas from chalicelib.core import sessions, events, jobs, projects -from chalicelib.utils.TimeUTC import TimeUTC from or_dependencies import OR_context from routers.base import get_routers @@ -15,7 +14,7 @@ async def get_user_sessions(projectKey: str, userId: str, start_date: int = None if projectId is None: return {"errors": ["invalid projectKey"]} return { - 'data': sessions.get_user_sessions( + "data": sessions.get_user_sessions( project_id=projectId, user_id=userId, start_date=start_date, @@ -30,7 +29,7 @@ async def get_session_events(projectKey: str, sessionId: int): if projectId is None: return {"errors": ["invalid projectKey"]} return { - 'data': events.get_by_session_id( + "data": events.get_by_session_id( project_id=projectId, session_id=sessionId ) @@ 
-43,7 +42,7 @@ async def get_user_details(projectKey: str, userId: str): if projectId is None: return {"errors": ["invalid projectKey"]} return { - 'data': sessions.get_session_user( + "data": sessions.get_session_user( project_id=projectId, user_id=userId ) @@ -55,14 +54,8 @@ async def schedule_to_delete_user_data(projectKey: str, userId: str): projectId = projects.get_internal_project_id(projectKey) if projectId is None: return {"errors": ["invalid projectKey"]} - data = {"action": "delete_user_data", - "reference_id": userId, - "description": f"Delete user sessions of userId = {userId}", - "start_at": TimeUTC.to_human_readable(TimeUTC.midnight(1))} - record = jobs.create(project_id=projectId, data=data) - return { - 'data': record - } + record = jobs.create(project_id=projectId, user_id=userId) + return {"data": record} @app_apikey.get('/v1/{projectKey}/jobs', tags=["api"]) @@ -70,16 +63,12 @@ async def get_jobs(projectKey: str): projectId = projects.get_internal_project_id(projectKey) if projectId is None: return {"errors": ["invalid projectKey"]} - return { - 'data': jobs.get_all(project_id=projectId) - } + return {"data": jobs.get_all(project_id=projectId)} @app_apikey.get('/v1/{projectKey}/jobs/{jobId}', tags=["api"]) async def get_job(projectKey: str, jobId: int): - return { - 'data': jobs.get(job_id=jobId) - } + return {"data": jobs.get(job_id=jobId)} @app_apikey.delete('/v1/{projectKey}/jobs/{jobId}', tags=["api"]) @@ -93,9 +82,7 @@ async def cancel_job(projectKey: str, jobId: int): return {"errors": ["The request job has already been canceled/completed."]} job["status"] = "cancelled" - return { - 'data': jobs.update(job_id=jobId, job=job) - } + return {"data": jobs.update(job_id=jobId, job=job)} @app_apikey.get('/v1/projects', tags=["api"]) @@ -104,15 +91,13 @@ async def get_projects(context: schemas.CurrentContext = Depends(OR_context)): for record in records: del record['projectId'] - return { - 'data': records - } + return {"data": records} 
@app_apikey.get('/v1/projects/{projectKey}', tags=["api"]) async def get_project(projectKey: str, context: schemas.CurrentContext = Depends(OR_context)): return { - 'data': projects.get_project_by_key(tenant_id=context.tenant_id, project_key=projectKey) + "data": projects.get_project_by_key(tenant_id=context.tenant_id, project_key=projectKey) } @@ -125,5 +110,5 @@ async def create_project(data: schemas.CreateProjectSchema = Body(...), data=data, skip_authorization=True ) - del record['data']['projectId'] + del record["data"]['projectId'] return record From b67300b462a2d3cfe0fca6dfd4285d1dddb764c9 Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Wed, 12 Apr 2023 16:19:33 +0100 Subject: [PATCH 4/5] feat(chalice): changed cron-Job execution time --- api/routers/crons/core_dynamic_crons.py | 3 +-- ee/api/routers/crons/core_dynamic_crons.py | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/api/routers/crons/core_dynamic_crons.py b/api/routers/crons/core_dynamic_crons.py index a79ce705d..0fdeb242a 100644 --- a/api/routers/crons/core_dynamic_crons.py +++ b/api/routers/crons/core_dynamic_crons.py @@ -1,5 +1,4 @@ from apscheduler.triggers.cron import CronTrigger -from apscheduler.triggers.interval import IntervalTrigger from chalicelib.core import telemetry from chalicelib.core import weekly_report, jobs @@ -20,7 +19,7 @@ async def telemetry_cron() -> None: cron_jobs = [ {"func": telemetry_cron, "trigger": CronTrigger(day_of_week="*"), "misfire_grace_time": 60 * 60, "max_instances": 1}, - {"func": run_scheduled_jobs, "trigger": IntervalTrigger(minutes=1), + {"func": run_scheduled_jobs, "trigger": CronTrigger(day_of_week="*", hour=0, minute=15), "misfire_grace_time": 20, "max_instances": 1}, {"func": weekly_report2, "trigger": CronTrigger(day_of_week="mon", hour=5), "misfire_grace_time": 60 * 60, "max_instances": 1} ] diff --git a/ee/api/routers/crons/core_dynamic_crons.py b/ee/api/routers/crons/core_dynamic_crons.py index 5d13c90d1..9febbe18a
100644 --- a/ee/api/routers/crons/core_dynamic_crons.py +++ b/ee/api/routers/crons/core_dynamic_crons.py @@ -1,5 +1,4 @@ from apscheduler.triggers.cron import CronTrigger -from apscheduler.triggers.interval import IntervalTrigger from decouple import config from chalicelib.core import jobs @@ -31,7 +30,7 @@ cron_jobs = [ SINGLE_CRONS = [{"func": telemetry_cron, "trigger": CronTrigger(day_of_week="*"), "misfire_grace_time": 60 * 60, "max_instances": 1}, - {"func": run_scheduled_jobs, "trigger": IntervalTrigger(minutes=60), + {"func": run_scheduled_jobs, "trigger": CronTrigger(day_of_week="*", hour=0, minute=15), "misfire_grace_time": 20, "max_instances": 1}, {"func": weekly_report, "trigger": CronTrigger(day_of_week="mon", hour=5), "misfire_grace_time": 60 * 60, "max_instances": 1} From a229f915010e40b7849afec5d541b113fc774237 Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Wed, 12 Apr 2023 16:36:51 +0100 Subject: [PATCH 5/5] chore(build): testing EE cron-Jobs --- .github/workflows/crons-ee.yaml | 1 + ee/api/routers/crons/core_dynamic_crons.py | 15 ++++++++------- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/.github/workflows/crons-ee.yaml b/.github/workflows/crons-ee.yaml index 77c098e4e..b357fea65 100644 --- a/.github/workflows/crons-ee.yaml +++ b/.github/workflows/crons-ee.yaml @@ -10,6 +10,7 @@ on: branches: - dev - api-* + - v1.11.0-patch paths: - "ee/api/**" - "api/**" diff --git a/ee/api/routers/crons/core_dynamic_crons.py b/ee/api/routers/crons/core_dynamic_crons.py index 9febbe18a..89846366b 100644 --- a/ee/api/routers/crons/core_dynamic_crons.py +++ b/ee/api/routers/crons/core_dynamic_crons.py @@ -28,13 +28,14 @@ cron_jobs = [ {"func": unlock_cron, "trigger": CronTrigger(day="*")}, ] -SINGLE_CRONS = [{"func": telemetry_cron, "trigger": CronTrigger(day_of_week="*"), - "misfire_grace_time": 60 * 60, "max_instances": 1}, - {"func": run_scheduled_jobs, "trigger": CronTrigger(day_of_week="*", hour=0, minute=15), - "misfire_grace_time": 
20, "max_instances": 1}, - {"func": weekly_report, "trigger": CronTrigger(day_of_week="mon", hour=5), - "misfire_grace_time": 60 * 60, "max_instances": 1} - ] +SINGLE_CRONS = [ + {"func": telemetry_cron, "trigger": CronTrigger(day_of_week="*"), + "misfire_grace_time": 60 * 60, "max_instances": 1}, + {"func": run_scheduled_jobs, "trigger": CronTrigger(day_of_week="*", hour=0, minute=15), + "misfire_grace_time": 20, "max_instances": 1}, + {"func": weekly_report, "trigger": CronTrigger(day_of_week="mon", hour=5), + "misfire_grace_time": 60 * 60, "max_instances": 1} +] if config("LOCAL_CRONS", default=False, cast=bool): cron_jobs += SINGLE_CRONS