feat(chalice): changed projects stats for health-check
feat(crons): cahnged projects stats for health-check chore(helm): projectStats cron every 18 min chore(helm): projectStats-fix cron every Sunday at 5am
This commit is contained in:
parent
be4161887b
commit
2b4e5a5121
7 changed files with 174 additions and 30 deletions
|
|
@ -161,9 +161,12 @@ def __check_SSL(*_):
|
|||
|
||||
def __get_sessions_stats(*_):
|
||||
with pg_client.PostgresClient() as cur:
|
||||
query = cur.mogrify("""SELECT COALESCE(SUM(sessions_count),0) AS s_c,
|
||||
COALESCE(SUM(events_count),0) AS e_c
|
||||
FROM public.projects_stats;""")
|
||||
constraints = ["projects.deteled_at IS NULL"]
|
||||
query = cur.mogrify(f"""SELECT COALESCE(SUM(sessions_count),0) AS s_c,
|
||||
COALESCE(SUM(events_count),0) AS e_c
|
||||
FROM public.projects_stats
|
||||
INNER JOIN public.projects USING(project_id)
|
||||
WHERE {" AND ".join(constraints)};""")
|
||||
cur.execute(query)
|
||||
row = cur.fetchone()
|
||||
return {
|
||||
|
|
|
|||
|
|
@ -1,20 +1,3 @@
|
|||
from apscheduler.triggers.cron import CronTrigger
|
||||
from apscheduler.triggers.interval import IntervalTrigger
|
||||
|
||||
from chalicelib.core import health
|
||||
|
||||
|
||||
async def health_cron() -> None:
|
||||
health.cron()
|
||||
|
||||
|
||||
async def weekly_health_cron() -> None:
|
||||
health.weekly_cron()
|
||||
|
||||
|
||||
cron_jobs = [
|
||||
{"func": health_cron, "trigger": IntervalTrigger(hours=0, minutes=30, start_date="2023-01-01 0:0:0", jitter=300),
|
||||
"misfire_grace_time": 60 * 60, "max_instances": 1},
|
||||
{"func": weekly_health_cron, "trigger": CronTrigger(day_of_week="sun", hour=5),
|
||||
"misfire_grace_time": 60 * 60, "max_instances": 1}
|
||||
|
||||
]
|
||||
|
|
|
|||
|
|
@ -1,7 +1,8 @@
|
|||
from apscheduler.triggers.cron import CronTrigger
|
||||
from apscheduler.triggers.interval import IntervalTrigger
|
||||
|
||||
from chalicelib.core import telemetry
|
||||
from chalicelib.core import weekly_report, jobs
|
||||
from chalicelib.core import weekly_report, jobs, health
|
||||
|
||||
|
||||
async def run_scheduled_jobs() -> None:
|
||||
|
|
@ -16,11 +17,23 @@ async def telemetry_cron() -> None:
|
|||
telemetry.compute()
|
||||
|
||||
|
||||
async def health_cron() -> None:
|
||||
health.cron()
|
||||
|
||||
|
||||
async def weekly_health_cron() -> None:
|
||||
health.weekly_cron()
|
||||
|
||||
|
||||
cron_jobs = [
|
||||
{"func": telemetry_cron, "trigger": CronTrigger(day_of_week="*"),
|
||||
"misfire_grace_time": 60 * 60, "max_instances": 1},
|
||||
{"func": run_scheduled_jobs, "trigger": CronTrigger(day_of_week="*", hour=0, minute=15),
|
||||
"misfire_grace_time": 20, "max_instances": 1},
|
||||
{"func": weekly_report_cron, "trigger": CronTrigger(day_of_week="mon", hour=5),
|
||||
"misfire_grace_time": 60 * 60, "max_instances": 1},
|
||||
{"func": health_cron, "trigger": IntervalTrigger(hours=0, minutes=30, start_date="2023-04-01 0:0:0", jitter=300),
|
||||
"misfire_grace_time": 60 * 60, "max_instances": 1},
|
||||
{"func": weekly_health_cron, "trigger": CronTrigger(day_of_week="sun", hour=5),
|
||||
"misfire_grace_time": 60 * 60, "max_instances": 1}
|
||||
]
|
||||
|
|
|
|||
|
|
@ -7,7 +7,9 @@ from routers.crons import core_dynamic_crons
|
|||
ACTIONS = {
|
||||
"TELEMETRY": core_dynamic_crons.telemetry_cron,
|
||||
"JOB": core_dynamic_crons.run_scheduled_jobs,
|
||||
"REPORT": core_dynamic_crons.weekly_report
|
||||
"REPORT": core_dynamic_crons.weekly_report,
|
||||
"PROJECTS_STATS": core_dynamic_crons.health_cron,
|
||||
"FIX_PROJECTS_STATS": core_dynamic_crons.weekly_health_cron
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import requests
|
|||
from decouple import config
|
||||
|
||||
from chalicelib.utils import pg_client, ch_client
|
||||
from chalicelib.utils.TimeUTC import TimeUTC
|
||||
|
||||
|
||||
def app_connection_string(name, port, path):
|
||||
|
|
@ -161,13 +162,14 @@ def __check_SSL(*_):
|
|||
|
||||
def __get_sessions_stats(tenant_id, *_):
|
||||
with pg_client.PostgresClient() as cur:
|
||||
extra_query = ""
|
||||
constraints = ["projects.deteled_at IS NULL"]
|
||||
if tenant_id:
|
||||
extra_query = """INNER JOIN public.projects USING(project_id)
|
||||
WHERE tenant_id=%(tenant_id)s"""
|
||||
constraints.append("tenant_id=%(tenant_id)s")
|
||||
query = cur.mogrify(f"""SELECT COALESCE(SUM(sessions_count),0) AS s_c,
|
||||
COALESCE(SUM(events_count),0) AS e_c
|
||||
FROM public.sessions_count {extra_query};""",
|
||||
FROM public.projects_stats
|
||||
INNER JOIN public.projects USING(project_id)
|
||||
WHERE {" AND ".join(constraints)};""",
|
||||
{"tenant_id": tenant_id})
|
||||
cur.execute(query)
|
||||
row = cur.fetchone()
|
||||
|
|
@ -219,6 +221,112 @@ def get_health(tenant_id=None):
|
|||
return health_map
|
||||
|
||||
|
||||
def cron():
|
||||
with pg_client.PostgresClient() as cur:
|
||||
query = cur.mogrify("""SELECT projects.project_id,
|
||||
projects.created_at,
|
||||
projects.sessions_last_check_at,
|
||||
projects.first_recorded_session_at,
|
||||
projects_stats.last_update_at
|
||||
FROM public.projects
|
||||
LEFT JOIN public.projects_stats USING (project_id)
|
||||
WHERE projects.deleted_at IS NULL
|
||||
ORDER BY project_id;""")
|
||||
cur.execute(query)
|
||||
rows = cur.fetchall()
|
||||
for r in rows:
|
||||
insert = False
|
||||
if r["last_update_at"] is None:
|
||||
# never counted before, must insert
|
||||
insert = True
|
||||
if r["first_recorded_session_at"] is None:
|
||||
if r["sessions_last_check_at"] is None:
|
||||
count_start_from = r["created_at"]
|
||||
else:
|
||||
count_start_from = r["sessions_last_check_at"]
|
||||
else:
|
||||
count_start_from = r["first_recorded_session_at"]
|
||||
|
||||
else:
|
||||
# counted before, must update
|
||||
count_start_from = r["last_update_at"]
|
||||
|
||||
count_start_from = TimeUTC.datetime_to_timestamp(count_start_from)
|
||||
params = {"project_id": r["project_id"],
|
||||
"start_ts": count_start_from,
|
||||
"end_ts": TimeUTC.now(),
|
||||
"sessions_count": 0,
|
||||
"events_count": 0}
|
||||
|
||||
query = cur.mogrify("""SELECT COUNT(1) AS sessions_count,
|
||||
COALESCE(SUM(events_count),0) AS events_count
|
||||
FROM public.sessions
|
||||
WHERE project_id=%(project_id)s
|
||||
AND start_ts>=%(start_ts)s
|
||||
AND start_ts<=%(end_ts)s
|
||||
AND duration IS NOT NULL;""",
|
||||
params)
|
||||
cur.execute(query)
|
||||
row = cur.fetchone()
|
||||
if row is not None:
|
||||
params["sessions_count"] = row["sessions_count"]
|
||||
params["events_count"] = row["events_count"]
|
||||
|
||||
if insert:
|
||||
query = cur.mogrify("""INSERT INTO public.projects_stats(project_id, sessions_count, events_count, last_update_at)
|
||||
VALUES (%(project_id)s, %(sessions_count)s, %(events_count)s, (now() AT TIME ZONE 'utc'::text));""",
|
||||
params)
|
||||
else:
|
||||
query = cur.mogrify("""UPDATE public.projects_stats
|
||||
SET sessions_count=sessions_count+%(sessions_count)s,
|
||||
events_count=events_count+%(events_count)s,
|
||||
last_update_at=(now() AT TIME ZONE 'utc'::text)
|
||||
WHERE project_id=%(project_id)s;""",
|
||||
params)
|
||||
cur.execute(query)
|
||||
|
||||
|
||||
# this cron is used to correct the sessions&events count every week
|
||||
def weekly_cron():
|
||||
with pg_client.PostgresClient(long_query=True) as cur:
|
||||
query = cur.mogrify("""SELECT project_id
|
||||
projects_stats.last_update_at
|
||||
FROM public.projects
|
||||
LEFT JOIN public.projects_stats USING (project_id)
|
||||
WHERE projects.deleted_at IS NULL
|
||||
ORDER BY project_id;""")
|
||||
cur.execute(query)
|
||||
rows = cur.fetchall()
|
||||
for r in rows:
|
||||
if r["last_update_at"] is None:
|
||||
continue
|
||||
|
||||
params = {"project_id": r["project_id"],
|
||||
"end_ts": TimeUTC.now(),
|
||||
"sessions_count": 0,
|
||||
"events_count": 0}
|
||||
|
||||
query = cur.mogrify("""SELECT COUNT(1) AS sessions_count,
|
||||
SUM(events_count) AS events_count
|
||||
FROM public.sessions
|
||||
WHERE project_id=%(project_id)s
|
||||
AND start_ts<=%(end_ts)s
|
||||
AND duration IS NOT NULL;""",
|
||||
params)
|
||||
cur.execute(query)
|
||||
row = cur.fetchone()
|
||||
if row is not None:
|
||||
params["sessions_count"] = row["sessions_count"]
|
||||
params["events_count"] = row["events_count"]
|
||||
|
||||
query = cur.mogrify("""UPDATE public.projects_stats
|
||||
SET sessions_count=%(sessions_count)s,
|
||||
events_count=%(events_count)s,
|
||||
last_update_at=(now() AT TIME ZONE 'utc'::text)
|
||||
WHERE project_id=%(project_id)s;""",
|
||||
params)
|
||||
cur.execute(query)
|
||||
|
||||
def __check_database_ch(*_):
|
||||
fail_response = {
|
||||
"health": False,
|
||||
|
|
|
|||
|
|
@ -1,9 +1,10 @@
|
|||
from apscheduler.triggers.cron import CronTrigger
|
||||
from apscheduler.triggers.interval import IntervalTrigger
|
||||
from decouple import config
|
||||
|
||||
from chalicelib.core import jobs
|
||||
from chalicelib.core import telemetry, unlock
|
||||
from chalicelib.core import weekly_report as weekly_report_script
|
||||
from chalicelib.core import weekly_report as weekly_report_script, health
|
||||
|
||||
|
||||
async def run_scheduled_jobs() -> None:
|
||||
|
|
@ -18,12 +19,20 @@ async def telemetry_cron() -> None:
|
|||
telemetry.compute()
|
||||
|
||||
|
||||
def unlock_cron() -> None:
|
||||
async def unlock_cron() -> None:
|
||||
print("validating license")
|
||||
unlock.check()
|
||||
print(f"valid: {unlock.is_valid()}")
|
||||
|
||||
|
||||
async def health_cron() -> None:
|
||||
health.cron()
|
||||
|
||||
|
||||
async def weekly_health_cron() -> None:
|
||||
health.weekly_cron()
|
||||
|
||||
|
||||
cron_jobs = [
|
||||
{"func": unlock_cron, "trigger": CronTrigger(day="*")},
|
||||
]
|
||||
|
|
@ -34,6 +43,10 @@ SINGLE_CRONS = [
|
|||
{"func": run_scheduled_jobs, "trigger": CronTrigger(day_of_week="*", hour=0, minute=15),
|
||||
"misfire_grace_time": 20, "max_instances": 1},
|
||||
{"func": weekly_report, "trigger": CronTrigger(day_of_week="mon", hour=5),
|
||||
"misfire_grace_time": 60 * 60, "max_instances": 1},
|
||||
{"func": health_cron, "trigger": IntervalTrigger(hours=0, minutes=30, start_date="2023-04-01 0:0:0", jitter=300),
|
||||
"misfire_grace_time": 60 * 60, "max_instances": 1},
|
||||
{"func": weekly_health_cron, "trigger": CronTrigger(day_of_week="sun", hour=5),
|
||||
"misfire_grace_time": 60 * 60, "max_instances": 1}
|
||||
]
|
||||
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ telemetry:
|
|||
env:
|
||||
ACTION: "TELEMETRY"
|
||||
report:
|
||||
# Ref: https://crontab.guru/#0_5_*_*_1
|
||||
# https://crontab.guru/#0_5_*_*_1
|
||||
# Monday morning 5am
|
||||
cron: "0 5 * * 1"
|
||||
image:
|
||||
|
|
@ -47,6 +47,28 @@ sessionsCleaner:
|
|||
tag: ""
|
||||
env:
|
||||
ACTION: "JOB"
|
||||
projectsStats:
|
||||
# https://crontab.guru/#*/18_*_*_*_*
|
||||
# Every 18 min
|
||||
cron: "*/18 * * * *"
|
||||
image:
|
||||
repository: "{{ .Values.global.openReplayContainerRegistry }}/crons"
|
||||
pullPolicy: Always
|
||||
# Overrides the image tag whose default is the chart appVersion.
|
||||
tag: ""
|
||||
env:
|
||||
ACTION: "PROJECTS_STATS"
|
||||
fixProjectsStats:
|
||||
# https://crontab.guru/#0_5_*_*_0
|
||||
# Sunday at 5am
|
||||
cron: "0 5 * * 0"
|
||||
image:
|
||||
repository: "{{ .Values.global.openReplayContainerRegistry }}/crons"
|
||||
pullPolicy: Always
|
||||
# Overrides the image tag whose default is the chart appVersion.
|
||||
tag: ""
|
||||
env:
|
||||
ACTION: "FIX_PROJECTS_STATS"
|
||||
|
||||
# Common env values are from chalice for the crons
|
||||
chalice:
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue