Crons v1.16.0 (#1776)

* refactor(chalice): moved db_request_handler to utils package

* fix(chalice): supported usability tests in EE

* refactor(crons): changed assist_events_aggregates_cron to have only 1 execution every hour
refactor(crons): optimized assist_events_aggregates_cron to use only 1 DB cursor for successive queries
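
The single-cursor optimization mentioned above replaces a nested PostgresClient with reuse of the already-open cursor for the fallback query. A minimal sketch of the pattern, with the first query and its table/column names assumed for illustration (only the MIN(timestamp) fallback query appears verbatim in the diff below):

    from chalicelib.utils import pg_client

    def last_run_end_timestamp_sketch():
        with pg_client.PostgresClient() as cur:
            # first query: latest aggregate run (table/column names assumed)
            cur.execute("SELECT MAX(time) AS last_run_time FROM assist_events_aggregates;")
            result = cur.fetchone()
            last_run_time = result['last_run_time'] if result else None
            if last_run_time is None:  # first run: fall back to the earliest raw event
                # the second query reuses the same cursor instead of opening a new PostgresClient
                cur.execute("SELECT MIN(timestamp) AS last_timestamp FROM assist_events;")
                result = cur.fetchone()
                last_run_time = result['last_timestamp'] if result else None
        return last_run_time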
Kraiem Taha Yassine 2023-12-13 18:06:33 +01:00 committed by GitHub
parent 34729e87ff
commit b2ac6ba0f8

3 changed files with 19 additions and 11 deletions


@@ -52,7 +52,7 @@ async def lifespan(app: FastAPI):
     await events_queue.init()
     app.schedule.start()
-    for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs + traces.cron_jobs + ee_crons.ee_cron_jobs:
+    for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs + traces.cron_jobs + ee_crons.cron_jobs:
         app.schedule.add_job(id=job["func"].__name__, **job)
     ap_logger.info(">Scheduled jobs:")
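
The loop above registers every job dict with the app's APScheduler instance; the EE module now exposes the same cron_jobs name as the core modules instead of ee_cron_jobs. A minimal sketch of what one registration expands to, assuming app.schedule is an AsyncIOScheduler (the job dict mirrors the ones defined in the EE crons file below):

    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    from apscheduler.triggers.interval import IntervalTrigger

    async def pg_events_queue() -> None:
        ...  # stand-in body; the real cron flushes the events queue

    job = {"func": pg_events_queue, "trigger": IntervalTrigger(minutes=5),
           "misfire_grace_time": 20, "max_instances": 1}

    schedule = AsyncIOScheduler()
    # equivalent to schedule.add_job(func=pg_events_queue, trigger=IntervalTrigger(minutes=5),
    #                                id="pg_events_queue", misfire_grace_time=20, max_instances=1)
    schedule.add_job(id=job["func"].__name__, **job)
    # schedule.start() would begin firing jobs once an asyncio event loop is running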

View file

@@ -1,11 +1,12 @@
 import logging
 from datetime import datetime
 from fastapi import HTTPException
 from chalicelib.utils import pg_client, helper
+from chalicelib.utils.TimeUTC import TimeUTC
 from schemas import AssistStatsSessionsRequest, AssistStatsSessionsResponse, AssistStatsTopMembersResponse

 logger = logging.getLogger(__name__)

 event_type_mapping = {
     "sessionsAssisted": "assist",
     "assistDuration": "assist",
@@ -17,12 +18,12 @@ event_type_mapping = {

 def insert_aggregated_data():
     try:
         logging.info("Assist Stats: Inserting aggregated data")
-        end_timestamp = int(datetime.timestamp(datetime.now())) * 1000
+        end_timestamp = TimeUTC.now()
         start_timestamp = __last_run_end_timestamp_from_aggregates()
         if start_timestamp is None:  # first run
             logging.info("Assist Stats: First run, inserting data for last 7 days")
-            start_timestamp = end_timestamp - (7 * 24 * 60 * 60 * 1000)
+            start_timestamp = end_timestamp - TimeUTC.MS_WEEK

         offset = 0
         chunk_size = 1000
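
The offset/chunk_size pair at the end of the hunk suggests the aggregation proceeds in batches of 1000 rows. A rough sketch of that pattern under stated assumptions (fetch_chunk and insert_chunk are hypothetical helpers, not functions from this codebase):

    offset = 0
    chunk_size = 1000
    while True:
        # hypothetical helper: page through events in [start_timestamp, end_timestamp)
        rows = fetch_chunk(start_timestamp, end_timestamp, limit=chunk_size, offset=offset)
        if not rows:
            break
        insert_chunk(rows)  # hypothetical helper: write one batch of aggregates
        offset += chunk_size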
@@ -103,9 +104,8 @@ def __last_run_end_timestamp_from_aggregates():
         result = cur.fetchone()
         last_run_time = result['last_run_time'] if result else None

-    if last_run_time is None:  # first run handle all data
-        sql = "SELECT MIN(timestamp) as last_timestamp FROM assist_events;"
-        with pg_client.PostgresClient() as cur:
+        if last_run_time is None:  # first run handle all data
+            sql = "SELECT MIN(timestamp) as last_timestamp FROM assist_events;"
             cur.execute(sql)
             result = cur.fetchone()
             last_run_time = result['last_timestamp'] if result else None
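
TimeUTC is the project's millisecond-epoch helper; switching to it also removes a subtle precision loss, since int(datetime.timestamp(datetime.now())) * 1000 truncated to whole seconds before scaling. A minimal sketch of the two members the new code relies on, assuming UTC millisecond semantics:

    from datetime import datetime, timezone

    class TimeUTC:
        MS_WEEK = 7 * 24 * 60 * 60 * 1000  # one week in milliseconds

        @staticmethod
        def now() -> int:
            # current UTC time as a millisecond epoch
            return int(datetime.now(timezone.utc).timestamp() * 1000)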


@@ -3,6 +3,8 @@ from apscheduler.triggers.interval import IntervalTrigger
 from chalicelib.utils import events_queue
 from chalicelib.core import assist_stats
+from decouple import config


 async def pg_events_queue() -> None:
     events_queue.global_queue.force_flush()
@@ -12,8 +14,14 @@ async def assist_events_aggregates_cron() -> None:
     assist_stats.insert_aggregated_data()

-ee_cron_jobs = [
-    {"func": pg_events_queue, "trigger": IntervalTrigger(minutes=5), "misfire_grace_time": 20, "max_instances": 1},
-    {"func": assist_events_aggregates_cron,
-     "trigger": IntervalTrigger(hours=1, start_date="2023-04-01 0:0:0", jitter=10), }
-]
+# SINGLE_CRONS are crons that will be run by the crons-service; they are singleton crons
+SINGLE_CRONS = [{"func": assist_events_aggregates_cron,
+                 "trigger": IntervalTrigger(hours=1, start_date="2023-04-01 0:0:0", jitter=10)}]
+
+# cron_jobs is the list of crons to run in the main API service (so you will have as many runs as there are API instances)
+cron_jobs = [
+    {"func": pg_events_queue, "trigger": IntervalTrigger(minutes=5), "misfire_grace_time": 20, "max_instances": 1}
+]
+
+if config("LOCAL_CRONS", default=False, cast=bool):
+    cron_jobs += SINGLE_CRONS

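
With this split, assist_events_aggregates_cron runs exactly once per hour in the dedicated crons-service, while a single-instance local setup can still attach it to the API scheduler by enabling the flag. A small sketch of how the flag is read, assuming python-decouple's usual environment/.env lookup (e.g. export LOCAL_CRONS=true before starting the API):

    from decouple import config

    # python-decouple checks os.environ first, then a .env file;
    # cast=bool accepts strings like "true"/"false"/"1"/"0"
    if config("LOCAL_CRONS", default=False, cast=bool):
        print("singleton crons will also run inside this API instance")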