refactor(chalice): remove peers from health checks and fix formatting

Updated health.py to remove the peers-openreplay service from health
checks and applied consistent formatting throughout the file. This
includes proper line breaks, trailing commas for multi-line data
structures, and consistent indentation patterns.

Signed-off-by: rjshrjndrn <rjshrjndrn@gmail.com>
Author: rjshrjndrn
Date:   2025-03-13 11:47:21 +01:00
parent 221bee70f5
commit fb49c715cb
3 changed files with 214 additions and 170 deletions

View file

@@ -13,15 +13,18 @@ def get_state(tenant_id):
if len(pids) > 0: if len(pids) > 0:
cur.execute( cur.execute(
cur.mogrify("""SELECT EXISTS(( SELECT 1 cur.mogrify(
"""SELECT EXISTS(( SELECT 1
FROM public.sessions AS s FROM public.sessions AS s
WHERE s.project_id IN %(ids)s)) AS exists;""", WHERE s.project_id IN %(ids)s)) AS exists;""",
{"ids": tuple(pids)}) {"ids": tuple(pids)},
)
) )
recorded = cur.fetchone()["exists"] recorded = cur.fetchone()["exists"]
meta = False meta = False
if recorded: if recorded:
query = cur.mogrify(f"""SELECT EXISTS((SELECT 1 query = cur.mogrify(
f"""SELECT EXISTS((SELECT 1
FROM public.projects AS p FROM public.projects AS p
LEFT JOIN LATERAL ( SELECT 1 LEFT JOIN LATERAL ( SELECT 1
FROM public.sessions FROM public.sessions
@@ -36,26 +39,35 @@ def get_state(tenant_id):
OR p.metadata_8 IS NOT NULL OR p.metadata_9 IS NOT NULL OR p.metadata_8 IS NOT NULL OR p.metadata_9 IS NOT NULL
OR p.metadata_10 IS NOT NULL ) OR p.metadata_10 IS NOT NULL )
)) AS exists;""", )) AS exists;""",
{"tenant_id": tenant_id}) {"tenant_id": tenant_id},
)
cur.execute(query) cur.execute(query)
meta = cur.fetchone()["exists"] meta = cur.fetchone()["exists"]
return [ return [
{"task": "Install OpenReplay", {
"done": recorded, "task": "Install OpenReplay",
"URL": "https://docs.openreplay.com/getting-started/quick-start"}, "done": recorded,
{"task": "Identify Users", "URL": "https://docs.openreplay.com/getting-started/quick-start",
"done": meta, },
"URL": "https://docs.openreplay.com/data-privacy-security/metadata"}, {
{"task": "Invite Team Members", "task": "Identify Users",
"done": len(users.get_members(tenant_id=tenant_id)) > 1, "done": meta,
"URL": "https://app.openreplay.com/client/manage-users"}, "URL": "https://docs.openreplay.com/data-privacy-security/metadata",
{"task": "Integrations", },
"done": len(datadog.get_all(tenant_id=tenant_id)) > 0 \ {
or len(sentry.get_all(tenant_id=tenant_id)) > 0 \ "task": "Invite Team Members",
or len(stackdriver.get_all(tenant_id=tenant_id)) > 0, "done": len(users.get_members(tenant_id=tenant_id)) > 1,
"URL": "https://docs.openreplay.com/integrations"} "URL": "https://app.openreplay.com/client/manage-users",
},
{
"task": "Integrations",
"done": len(datadog.get_all(tenant_id=tenant_id)) > 0
or len(sentry.get_all(tenant_id=tenant_id)) > 0
or len(stackdriver.get_all(tenant_id=tenant_id)) > 0,
"URL": "https://docs.openreplay.com/integrations",
},
] ]
@@ -66,21 +78,26 @@ def get_state_installing(tenant_id):
if len(pids) > 0: if len(pids) > 0:
cur.execute( cur.execute(
cur.mogrify("""SELECT EXISTS(( SELECT 1 cur.mogrify(
"""SELECT EXISTS(( SELECT 1
FROM public.sessions AS s FROM public.sessions AS s
WHERE s.project_id IN %(ids)s)) AS exists;""", WHERE s.project_id IN %(ids)s)) AS exists;""",
{"ids": tuple(pids)}) {"ids": tuple(pids)},
)
) )
recorded = cur.fetchone()["exists"] recorded = cur.fetchone()["exists"]
return {"task": "Install OpenReplay", return {
"done": recorded, "task": "Install OpenReplay",
"URL": "https://docs.openreplay.com/getting-started/quick-start"} "done": recorded,
"URL": "https://docs.openreplay.com/getting-started/quick-start",
}
def get_state_identify_users(tenant_id): def get_state_identify_users(tenant_id):
with pg_client.PostgresClient() as cur: with pg_client.PostgresClient() as cur:
query = cur.mogrify(f"""SELECT EXISTS((SELECT 1 query = cur.mogrify(
f"""SELECT EXISTS((SELECT 1
FROM public.projects AS p FROM public.projects AS p
LEFT JOIN LATERAL ( SELECT 1 LEFT JOIN LATERAL ( SELECT 1
FROM public.sessions FROM public.sessions
@@ -95,25 +112,32 @@ def get_state_identify_users(tenant_id):
OR p.metadata_8 IS NOT NULL OR p.metadata_9 IS NOT NULL OR p.metadata_8 IS NOT NULL OR p.metadata_9 IS NOT NULL
OR p.metadata_10 IS NOT NULL ) OR p.metadata_10 IS NOT NULL )
)) AS exists;""", )) AS exists;""",
{"tenant_id": tenant_id}) {"tenant_id": tenant_id},
)
cur.execute(query) cur.execute(query)
meta = cur.fetchone()["exists"] meta = cur.fetchone()["exists"]
return {"task": "Identify Users", return {
"done": meta, "task": "Identify Users",
"URL": "https://docs.openreplay.com/data-privacy-security/metadata"} "done": meta,
"URL": "https://docs.openreplay.com/data-privacy-security/metadata",
}
def get_state_manage_users(tenant_id): def get_state_manage_users(tenant_id):
return {"task": "Invite Team Members", return {
"done": len(users.get_members(tenant_id=tenant_id)) > 1, "task": "Invite Team Members",
"URL": "https://app.openreplay.com/client/manage-users"} "done": len(users.get_members(tenant_id=tenant_id)) > 1,
"URL": "https://app.openreplay.com/client/manage-users",
}
def get_state_integrations(tenant_id): def get_state_integrations(tenant_id):
return {"task": "Integrations", return {
"done": len(datadog.get_all(tenant_id=tenant_id)) > 0 \ "task": "Integrations",
or len(sentry.get_all(tenant_id=tenant_id)) > 0 \ "done": len(datadog.get_all(tenant_id=tenant_id)) > 0
or len(stackdriver.get_all(tenant_id=tenant_id)) > 0, or len(sentry.get_all(tenant_id=tenant_id)) > 0
"URL": "https://docs.openreplay.com/integrations"} or len(stackdriver.get_all(tenant_id=tenant_id)) > 0,
"URL": "https://docs.openreplay.com/integrations",
}

View file

@@ -27,7 +27,6 @@ HEALTH_ENDPOINTS = {
"http": app_connection_string("http-openreplay", 8888, "metrics"), "http": app_connection_string("http-openreplay", 8888, "metrics"),
"ingress-nginx": app_connection_string("ingress-nginx-openreplay", 80, "healthz"), "ingress-nginx": app_connection_string("ingress-nginx-openreplay", 80, "healthz"),
"integrations": app_connection_string("integrations-openreplay", 8888, "metrics"), "integrations": app_connection_string("integrations-openreplay", 8888, "metrics"),
"peers": app_connection_string("peers-openreplay", 8888, "health"),
"sink": app_connection_string("sink-openreplay", 8888, "metrics"), "sink": app_connection_string("sink-openreplay", 8888, "metrics"),
"sourcemapreader": app_connection_string( "sourcemapreader": app_connection_string(
"sourcemapreader-openreplay", 8888, "health" "sourcemapreader-openreplay", 8888, "health"
@@ -39,9 +38,7 @@ HEALTH_ENDPOINTS = {
def __check_database_pg(*_): def __check_database_pg(*_):
fail_response = { fail_response = {
"health": False, "health": False,
"details": { "details": {"errors": ["Postgres health-check failed"]},
"errors": ["Postgres health-check failed"]
}
} }
with pg_client.PostgresClient() as cur: with pg_client.PostgresClient() as cur:
try: try:
@@ -63,29 +60,26 @@ def __check_database_pg(*_):
"details": { "details": {
# "version": server_version["server_version"], # "version": server_version["server_version"],
# "schema": schema_version["version"] # "schema": schema_version["version"]
} },
} }
def __always_healthy(*_): def __always_healthy(*_):
return { return {"health": True, "details": {}}
"health": True,
"details": {}
}
def __check_be_service(service_name): def __check_be_service(service_name):
def fn(*_): def fn(*_):
fail_response = { fail_response = {
"health": False, "health": False,
"details": { "details": {"errors": ["server health-check failed"]},
"errors": ["server health-check failed"]
}
} }
try: try:
results = requests.get(HEALTH_ENDPOINTS.get(service_name), timeout=2) results = requests.get(HEALTH_ENDPOINTS.get(service_name), timeout=2)
if results.status_code != 200: if results.status_code != 200:
logger.error(f"!! issue with the {service_name}-health code:{results.status_code}") logger.error(
f"!! issue with the {service_name}-health code:{results.status_code}"
)
logger.error(results.text) logger.error(results.text)
# fail_response["details"]["errors"].append(results.text) # fail_response["details"]["errors"].append(results.text)
return fail_response return fail_response
@@ -103,10 +97,7 @@ def __check_be_service(service_name):
logger.error("couldn't get response") logger.error("couldn't get response")
# fail_response["details"]["errors"].append(str(e)) # fail_response["details"]["errors"].append(str(e))
return fail_response return fail_response
return { return {"health": True, "details": {}}
"health": True,
"details": {}
}
return fn return fn
@@ -114,7 +105,7 @@ def __check_be_service(service_name):
def __check_redis(*_): def __check_redis(*_):
fail_response = { fail_response = {
"health": False, "health": False,
"details": {"errors": ["server health-check failed"]} "details": {"errors": ["server health-check failed"]},
} }
if config("REDIS_STRING", default=None) is None: if config("REDIS_STRING", default=None) is None:
# fail_response["details"]["errors"].append("REDIS_STRING not defined in env-vars") # fail_response["details"]["errors"].append("REDIS_STRING not defined in env-vars")
@@ -133,16 +124,14 @@ def __check_redis(*_):
"health": True, "health": True,
"details": { "details": {
# "version": r.execute_command('INFO')['redis_version'] # "version": r.execute_command('INFO')['redis_version']
} },
} }
def __check_SSL(*_): def __check_SSL(*_):
fail_response = { fail_response = {
"health": False, "health": False,
"details": { "details": {"errors": ["SSL Certificate health-check failed"]},
"errors": ["SSL Certificate health-check failed"]
}
} }
try: try:
requests.get(config("SITE_URL"), verify=True, allow_redirects=True) requests.get(config("SITE_URL"), verify=True, allow_redirects=True)
@@ -150,36 +139,28 @@ def __check_SSL(*_):
logger.error("!! health failed: SSL Certificate") logger.error("!! health failed: SSL Certificate")
logger.exception(e) logger.exception(e)
return fail_response return fail_response
return { return {"health": True, "details": {}}
"health": True,
"details": {}
}
def __get_sessions_stats(*_): def __get_sessions_stats(*_):
with pg_client.PostgresClient() as cur: with pg_client.PostgresClient() as cur:
constraints = ["projects.deleted_at IS NULL"] constraints = ["projects.deleted_at IS NULL"]
query = cur.mogrify(f"""SELECT COALESCE(SUM(sessions_count),0) AS s_c, query = cur.mogrify(
f"""SELECT COALESCE(SUM(sessions_count),0) AS s_c,
COALESCE(SUM(events_count),0) AS e_c COALESCE(SUM(events_count),0) AS e_c
FROM public.projects_stats FROM public.projects_stats
INNER JOIN public.projects USING(project_id) INNER JOIN public.projects USING(project_id)
WHERE {" AND ".join(constraints)};""") WHERE {" AND ".join(constraints)};"""
)
cur.execute(query) cur.execute(query)
row = cur.fetchone() row = cur.fetchone()
return { return {"numberOfSessionsCaptured": row["s_c"], "numberOfEventCaptured": row["e_c"]}
"numberOfSessionsCaptured": row["s_c"],
"numberOfEventCaptured": row["e_c"]
}
def get_health(tenant_id=None): def get_health(tenant_id=None):
health_map = { health_map = {
"databases": { "databases": {"postgres": __check_database_pg},
"postgres": __check_database_pg "ingestionPipeline": {"redis": __check_redis},
},
"ingestionPipeline": {
"redis": __check_redis
},
"backendServices": { "backendServices": {
"alerts": __check_be_service("alerts"), "alerts": __check_be_service("alerts"),
"assets": __check_be_service("assets"), "assets": __check_be_service("assets"),
@@ -192,13 +173,12 @@ def get_health(tenant_id=None):
"http": __check_be_service("http"), "http": __check_be_service("http"),
"ingress-nginx": __always_healthy, "ingress-nginx": __always_healthy,
"integrations": __check_be_service("integrations"), "integrations": __check_be_service("integrations"),
"peers": __check_be_service("peers"),
"sink": __check_be_service("sink"), "sink": __check_be_service("sink"),
"sourcemapreader": __check_be_service("sourcemapreader"), "sourcemapreader": __check_be_service("sourcemapreader"),
"storage": __check_be_service("storage") "storage": __check_be_service("storage"),
}, },
"details": __get_sessions_stats, "details": __get_sessions_stats,
"ssl": __check_SSL "ssl": __check_SSL,
} }
return __process_health(health_map=health_map) return __process_health(health_map=health_map)
@@ -210,10 +190,16 @@ def __process_health(health_map):
response.pop(parent_key) response.pop(parent_key)
elif isinstance(health_map[parent_key], dict): elif isinstance(health_map[parent_key], dict):
for element_key in health_map[parent_key]: for element_key in health_map[parent_key]:
if config(f"SKIP_H_{parent_key.upper()}_{element_key.upper()}", cast=bool, default=False): if config(
f"SKIP_H_{parent_key.upper()}_{element_key.upper()}",
cast=bool,
default=False,
):
response[parent_key].pop(element_key) response[parent_key].pop(element_key)
else: else:
response[parent_key][element_key] = health_map[parent_key][element_key]() response[parent_key][element_key] = health_map[parent_key][
element_key
]()
else: else:
response[parent_key] = health_map[parent_key]() response[parent_key] = health_map[parent_key]()
return response return response
@@ -221,7 +207,8 @@ def __process_health(health_map):
def cron(): def cron():
with pg_client.PostgresClient() as cur: with pg_client.PostgresClient() as cur:
query = cur.mogrify("""SELECT projects.project_id, query = cur.mogrify(
"""SELECT projects.project_id,
projects.created_at, projects.created_at,
projects.sessions_last_check_at, projects.sessions_last_check_at,
projects.first_recorded_session_at, projects.first_recorded_session_at,
@@ -229,7 +216,8 @@ def cron():
FROM public.projects FROM public.projects
LEFT JOIN public.projects_stats USING (project_id) LEFT JOIN public.projects_stats USING (project_id)
WHERE projects.deleted_at IS NULL WHERE projects.deleted_at IS NULL
ORDER BY project_id;""") ORDER BY project_id;"""
)
cur.execute(query) cur.execute(query)
rows = cur.fetchall() rows = cur.fetchall()
for r in rows: for r in rows:
@@ -250,20 +238,24 @@ def cron():
count_start_from = r["last_update_at"] count_start_from = r["last_update_at"]
count_start_from = TimeUTC.datetime_to_timestamp(count_start_from) count_start_from = TimeUTC.datetime_to_timestamp(count_start_from)
params = {"project_id": r["project_id"], params = {
"start_ts": count_start_from, "project_id": r["project_id"],
"end_ts": TimeUTC.now(), "start_ts": count_start_from,
"sessions_count": 0, "end_ts": TimeUTC.now(),
"events_count": 0} "sessions_count": 0,
"events_count": 0,
}
query = cur.mogrify("""SELECT COUNT(1) AS sessions_count, query = cur.mogrify(
"""SELECT COUNT(1) AS sessions_count,
COALESCE(SUM(events_count),0) AS events_count COALESCE(SUM(events_count),0) AS events_count
FROM public.sessions FROM public.sessions
WHERE project_id=%(project_id)s WHERE project_id=%(project_id)s
AND start_ts>=%(start_ts)s AND start_ts>=%(start_ts)s
AND start_ts<=%(end_ts)s AND start_ts<=%(end_ts)s
AND duration IS NOT NULL;""", AND duration IS NOT NULL;""",
params) params,
)
cur.execute(query) cur.execute(query)
row = cur.fetchone() row = cur.fetchone()
if row is not None: if row is not None:
@@ -271,56 +263,68 @@ def cron():
params["events_count"] = row["events_count"] params["events_count"] = row["events_count"]
if insert: if insert:
query = cur.mogrify("""INSERT INTO public.projects_stats(project_id, sessions_count, events_count, last_update_at) query = cur.mogrify(
"""INSERT INTO public.projects_stats(project_id, sessions_count, events_count, last_update_at)
VALUES (%(project_id)s, %(sessions_count)s, %(events_count)s, (now() AT TIME ZONE 'utc'::text));""", VALUES (%(project_id)s, %(sessions_count)s, %(events_count)s, (now() AT TIME ZONE 'utc'::text));""",
params) params,
)
else: else:
query = cur.mogrify("""UPDATE public.projects_stats query = cur.mogrify(
"""UPDATE public.projects_stats
SET sessions_count=sessions_count+%(sessions_count)s, SET sessions_count=sessions_count+%(sessions_count)s,
events_count=events_count+%(events_count)s, events_count=events_count+%(events_count)s,
last_update_at=(now() AT TIME ZONE 'utc'::text) last_update_at=(now() AT TIME ZONE 'utc'::text)
WHERE project_id=%(project_id)s;""", WHERE project_id=%(project_id)s;""",
params) params,
)
cur.execute(query) cur.execute(query)
# this cron is used to correct the sessions&events count every week # this cron is used to correct the sessions&events count every week
def weekly_cron(): def weekly_cron():
with pg_client.PostgresClient(long_query=True) as cur: with pg_client.PostgresClient(long_query=True) as cur:
query = cur.mogrify("""SELECT project_id, query = cur.mogrify(
"""SELECT project_id,
projects_stats.last_update_at projects_stats.last_update_at
FROM public.projects FROM public.projects
LEFT JOIN public.projects_stats USING (project_id) LEFT JOIN public.projects_stats USING (project_id)
WHERE projects.deleted_at IS NULL WHERE projects.deleted_at IS NULL
ORDER BY project_id;""") ORDER BY project_id;"""
)
cur.execute(query) cur.execute(query)
rows = cur.fetchall() rows = cur.fetchall()
for r in rows: for r in rows:
if r["last_update_at"] is None: if r["last_update_at"] is None:
continue continue
params = {"project_id": r["project_id"], params = {
"end_ts": TimeUTC.now(), "project_id": r["project_id"],
"sessions_count": 0, "end_ts": TimeUTC.now(),
"events_count": 0} "sessions_count": 0,
"events_count": 0,
}
query = cur.mogrify("""SELECT COUNT(1) AS sessions_count, query = cur.mogrify(
"""SELECT COUNT(1) AS sessions_count,
COALESCE(SUM(events_count),0) AS events_count COALESCE(SUM(events_count),0) AS events_count
FROM public.sessions FROM public.sessions
WHERE project_id=%(project_id)s WHERE project_id=%(project_id)s
AND start_ts<=%(end_ts)s AND start_ts<=%(end_ts)s
AND duration IS NOT NULL;""", AND duration IS NOT NULL;""",
params) params,
)
cur.execute(query) cur.execute(query)
row = cur.fetchone() row = cur.fetchone()
if row is not None: if row is not None:
params["sessions_count"] = row["sessions_count"] params["sessions_count"] = row["sessions_count"]
params["events_count"] = row["events_count"] params["events_count"] = row["events_count"]
query = cur.mogrify("""UPDATE public.projects_stats query = cur.mogrify(
"""UPDATE public.projects_stats
SET sessions_count=%(sessions_count)s, SET sessions_count=%(sessions_count)s,
events_count=%(events_count)s, events_count=%(events_count)s,
last_update_at=(now() AT TIME ZONE 'utc'::text) last_update_at=(now() AT TIME ZONE 'utc'::text)
WHERE project_id=%(project_id)s;""", WHERE project_id=%(project_id)s;""",
params) params,
)
cur.execute(query) cur.execute(query)

View file

@@ -2,6 +2,7 @@ import logging
import redis import redis
import requests import requests
# from confluent_kafka.admin import AdminClient # from confluent_kafka.admin import AdminClient
from decouple import config from decouple import config
@@ -28,7 +29,6 @@ HEALTH_ENDPOINTS = {
"http": app_connection_string("http-openreplay", 8888, "metrics"), "http": app_connection_string("http-openreplay", 8888, "metrics"),
"ingress-nginx": app_connection_string("ingress-nginx-openreplay", 80, "healthz"), "ingress-nginx": app_connection_string("ingress-nginx-openreplay", 80, "healthz"),
"integrations": app_connection_string("integrations-openreplay", 8888, "metrics"), "integrations": app_connection_string("integrations-openreplay", 8888, "metrics"),
"peers": app_connection_string("peers-openreplay", 8888, "health"),
"sink": app_connection_string("sink-openreplay", 8888, "metrics"), "sink": app_connection_string("sink-openreplay", 8888, "metrics"),
"sourcemapreader": app_connection_string( "sourcemapreader": app_connection_string(
"sourcemapreader-openreplay", 8888, "health" "sourcemapreader-openreplay", 8888, "health"
@@ -40,9 +40,7 @@ HEALTH_ENDPOINTS = {
def __check_database_pg(*_): def __check_database_pg(*_):
fail_response = { fail_response = {
"health": False, "health": False,
"details": { "details": {"errors": ["Postgres health-check failed"]},
"errors": ["Postgres health-check failed"]
}
} }
with pg_client.PostgresClient() as cur: with pg_client.PostgresClient() as cur:
try: try:
@@ -64,29 +62,26 @@ def __check_database_pg(*_):
"details": { "details": {
# "version": server_version["server_version"], # "version": server_version["server_version"],
# "schema": schema_version["version"] # "schema": schema_version["version"]
} },
} }
def __always_healthy(*_): def __always_healthy(*_):
return { return {"health": True, "details": {}}
"health": True,
"details": {}
}
def __check_be_service(service_name): def __check_be_service(service_name):
def fn(*_): def fn(*_):
fail_response = { fail_response = {
"health": False, "health": False,
"details": { "details": {"errors": ["server health-check failed"]},
"errors": ["server health-check failed"]
}
} }
try: try:
results = requests.get(HEALTH_ENDPOINTS.get(service_name), timeout=2) results = requests.get(HEALTH_ENDPOINTS.get(service_name), timeout=2)
if results.status_code != 200: if results.status_code != 200:
logger.error(f"!! issue with the {service_name}-health code:{results.status_code}") logger.error(
f"!! issue with the {service_name}-health code:{results.status_code}"
)
logger.error(results.text) logger.error(results.text)
# fail_response["details"]["errors"].append(results.text) # fail_response["details"]["errors"].append(results.text)
return fail_response return fail_response
@@ -104,10 +99,7 @@ def __check_be_service(service_name):
logger.error("couldn't get response") logger.error("couldn't get response")
# fail_response["details"]["errors"].append(str(e)) # fail_response["details"]["errors"].append(str(e))
return fail_response return fail_response
return { return {"health": True, "details": {}}
"health": True,
"details": {}
}
return fn return fn
@@ -115,7 +107,7 @@ def __check_be_service(service_name):
def __check_redis(*_): def __check_redis(*_):
fail_response = { fail_response = {
"health": False, "health": False,
"details": {"errors": ["server health-check failed"]} "details": {"errors": ["server health-check failed"]},
} }
if config("REDIS_STRING", default=None) is None: if config("REDIS_STRING", default=None) is None:
# fail_response["details"]["errors"].append("REDIS_STRING not defined in env-vars") # fail_response["details"]["errors"].append("REDIS_STRING not defined in env-vars")
@@ -134,16 +126,14 @@ def __check_redis(*_):
"health": True, "health": True,
"details": { "details": {
# "version": r.execute_command('INFO')['redis_version'] # "version": r.execute_command('INFO')['redis_version']
} },
} }
def __check_SSL(*_): def __check_SSL(*_):
fail_response = { fail_response = {
"health": False, "health": False,
"details": { "details": {"errors": ["SSL Certificate health-check failed"]},
"errors": ["SSL Certificate health-check failed"]
}
} }
try: try:
requests.get(config("SITE_URL"), verify=True, allow_redirects=True) requests.get(config("SITE_URL"), verify=True, allow_redirects=True)
@@ -151,10 +141,7 @@ def __check_SSL(*_):
logger.error("!! health failed: SSL Certificate") logger.error("!! health failed: SSL Certificate")
logger.exception(e) logger.exception(e)
return fail_response return fail_response
return { return {"health": True, "details": {}}
"health": True,
"details": {}
}
def __get_sessions_stats(tenant_id, *_): def __get_sessions_stats(tenant_id, *_):
@@ -162,31 +149,34 @@ def __get_sessions_stats(tenant_id, *_):
constraints = ["projects.deleted_at IS NULL"] constraints = ["projects.deleted_at IS NULL"]
if tenant_id: if tenant_id:
constraints.append("tenant_id=%(tenant_id)s") constraints.append("tenant_id=%(tenant_id)s")
query = cur.mogrify(f"""SELECT COALESCE(SUM(sessions_count),0) AS s_c, query = cur.mogrify(
f"""SELECT COALESCE(SUM(sessions_count),0) AS s_c,
COALESCE(SUM(events_count),0) AS e_c COALESCE(SUM(events_count),0) AS e_c
FROM public.projects_stats FROM public.projects_stats
INNER JOIN public.projects USING(project_id) INNER JOIN public.projects USING(project_id)
WHERE {" AND ".join(constraints)};""", WHERE {" AND ".join(constraints)};""",
{"tenant_id": tenant_id}) {"tenant_id": tenant_id},
)
cur.execute(query) cur.execute(query)
row = cur.fetchone() row = cur.fetchone()
return { return {"numberOfSessionsCaptured": row["s_c"], "numberOfEventCaptured": row["e_c"]}
"numberOfSessionsCaptured": row["s_c"],
"numberOfEventCaptured": row["e_c"]
}
def get_health(tenant_id=None): def get_health(tenant_id=None):
health_map = { health_map = {
"databases": { "databases": {
"postgres": __check_database_pg, "postgres": __check_database_pg,
"clickhouse": __check_database_ch "clickhouse": __check_database_ch,
}, },
"ingestionPipeline": { "ingestionPipeline": {
**({"redis": __check_redis} if config("REDIS_STRING", default=None) **(
and len(config("REDIS_STRING")) > 0 else {}), {"redis": __check_redis}
if config("REDIS_STRING", default=None)
and len(config("REDIS_STRING")) > 0
else {}
),
# "kafka": __check_kafka # "kafka": __check_kafka
"kafka": __always_healthy "kafka": __always_healthy,
}, },
"backendServices": { "backendServices": {
"alerts": __check_be_service("alerts"), "alerts": __check_be_service("alerts"),
@@ -200,14 +190,13 @@ def get_health(tenant_id=None):
"http": __check_be_service("http"), "http": __check_be_service("http"),
"ingress-nginx": __always_healthy, "ingress-nginx": __always_healthy,
"integrations": __check_be_service("integrations"), "integrations": __check_be_service("integrations"),
"peers": __check_be_service("peers"),
# "quickwit": __check_be_service("quickwit"), # "quickwit": __check_be_service("quickwit"),
"sink": __check_be_service("sink"), "sink": __check_be_service("sink"),
"sourcemapreader": __check_be_service("sourcemapreader"), "sourcemapreader": __check_be_service("sourcemapreader"),
"storage": __check_be_service("storage") "storage": __check_be_service("storage"),
}, },
"details": __get_sessions_stats, "details": __get_sessions_stats,
"ssl": __check_SSL "ssl": __check_SSL,
} }
return __process_health(tenant_id=tenant_id, health_map=health_map) return __process_health(tenant_id=tenant_id, health_map=health_map)
@@ -219,10 +208,16 @@ def __process_health(tenant_id, health_map):
response.pop(parent_key) response.pop(parent_key)
elif isinstance(health_map[parent_key], dict): elif isinstance(health_map[parent_key], dict):
for element_key in health_map[parent_key]: for element_key in health_map[parent_key]:
if config(f"SKIP_H_{parent_key.upper()}_{element_key.upper()}", cast=bool, default=False): if config(
f"SKIP_H_{parent_key.upper()}_{element_key.upper()}",
cast=bool,
default=False,
):
response[parent_key].pop(element_key) response[parent_key].pop(element_key)
else: else:
response[parent_key][element_key] = health_map[parent_key][element_key](tenant_id) response[parent_key][element_key] = health_map[parent_key][
element_key
](tenant_id)
else: else:
response[parent_key] = health_map[parent_key](tenant_id) response[parent_key] = health_map[parent_key](tenant_id)
return response return response
@@ -230,7 +225,8 @@ def __process_health(tenant_id, health_map):
def cron(): def cron():
with pg_client.PostgresClient() as cur: with pg_client.PostgresClient() as cur:
query = cur.mogrify("""SELECT projects.project_id, query = cur.mogrify(
"""SELECT projects.project_id,
projects.created_at, projects.created_at,
projects.sessions_last_check_at, projects.sessions_last_check_at,
projects.first_recorded_session_at, projects.first_recorded_session_at,
@@ -238,7 +234,8 @@ def cron():
FROM public.projects FROM public.projects
LEFT JOIN public.projects_stats USING (project_id) LEFT JOIN public.projects_stats USING (project_id)
WHERE projects.deleted_at IS NULL WHERE projects.deleted_at IS NULL
ORDER BY project_id;""") ORDER BY project_id;"""
)
cur.execute(query) cur.execute(query)
rows = cur.fetchall() rows = cur.fetchall()
for r in rows: for r in rows:
@@ -259,20 +256,24 @@ def cron():
count_start_from = r["last_update_at"] count_start_from = r["last_update_at"]
count_start_from = TimeUTC.datetime_to_timestamp(count_start_from) count_start_from = TimeUTC.datetime_to_timestamp(count_start_from)
params = {"project_id": r["project_id"], params = {
"start_ts": count_start_from, "project_id": r["project_id"],
"end_ts": TimeUTC.now(), "start_ts": count_start_from,
"sessions_count": 0, "end_ts": TimeUTC.now(),
"events_count": 0} "sessions_count": 0,
"events_count": 0,
}
query = cur.mogrify("""SELECT COUNT(1) AS sessions_count, query = cur.mogrify(
"""SELECT COUNT(1) AS sessions_count,
COALESCE(SUM(events_count),0) AS events_count COALESCE(SUM(events_count),0) AS events_count
FROM public.sessions FROM public.sessions
WHERE project_id=%(project_id)s WHERE project_id=%(project_id)s
AND start_ts>=%(start_ts)s AND start_ts>=%(start_ts)s
AND start_ts<=%(end_ts)s AND start_ts<=%(end_ts)s
AND duration IS NOT NULL;""", AND duration IS NOT NULL;""",
params) params,
)
cur.execute(query) cur.execute(query)
row = cur.fetchone() row = cur.fetchone()
if row is not None: if row is not None:
@@ -280,65 +281,77 @@ def cron():
params["events_count"] = row["events_count"] params["events_count"] = row["events_count"]
if insert: if insert:
query = cur.mogrify("""INSERT INTO public.projects_stats(project_id, sessions_count, events_count, last_update_at) query = cur.mogrify(
"""INSERT INTO public.projects_stats(project_id, sessions_count, events_count, last_update_at)
VALUES (%(project_id)s, %(sessions_count)s, %(events_count)s, (now() AT TIME ZONE 'utc'::text));""", VALUES (%(project_id)s, %(sessions_count)s, %(events_count)s, (now() AT TIME ZONE 'utc'::text));""",
params) params,
)
else: else:
query = cur.mogrify("""UPDATE public.projects_stats query = cur.mogrify(
"""UPDATE public.projects_stats
SET sessions_count=sessions_count+%(sessions_count)s, SET sessions_count=sessions_count+%(sessions_count)s,
events_count=events_count+%(events_count)s, events_count=events_count+%(events_count)s,
last_update_at=(now() AT TIME ZONE 'utc'::text) last_update_at=(now() AT TIME ZONE 'utc'::text)
WHERE project_id=%(project_id)s;""", WHERE project_id=%(project_id)s;""",
params) params,
)
cur.execute(query) cur.execute(query)
# this cron is used to correct the sessions&events count every week # this cron is used to correct the sessions&events count every week
def weekly_cron(): def weekly_cron():
with pg_client.PostgresClient(long_query=True) as cur: with pg_client.PostgresClient(long_query=True) as cur:
query = cur.mogrify("""SELECT project_id, query = cur.mogrify(
"""SELECT project_id,
projects_stats.last_update_at projects_stats.last_update_at
FROM public.projects FROM public.projects
LEFT JOIN public.projects_stats USING (project_id) LEFT JOIN public.projects_stats USING (project_id)
WHERE projects.deleted_at IS NULL WHERE projects.deleted_at IS NULL
ORDER BY project_id;""") ORDER BY project_id;"""
)
cur.execute(query) cur.execute(query)
rows = cur.fetchall() rows = cur.fetchall()
for r in rows: for r in rows:
if r["last_update_at"] is None: if r["last_update_at"] is None:
continue continue
params = {"project_id": r["project_id"], params = {
"end_ts": TimeUTC.now(), "project_id": r["project_id"],
"sessions_count": 0, "end_ts": TimeUTC.now(),
"events_count": 0} "sessions_count": 0,
"events_count": 0,
}
query = cur.mogrify("""SELECT COUNT(1) AS sessions_count, query = cur.mogrify(
"""SELECT COUNT(1) AS sessions_count,
COALESCE(SUM(events_count),0) AS events_count COALESCE(SUM(events_count),0) AS events_count
FROM public.sessions FROM public.sessions
WHERE project_id=%(project_id)s WHERE project_id=%(project_id)s
AND start_ts<=%(end_ts)s AND start_ts<=%(end_ts)s
AND duration IS NOT NULL;""", AND duration IS NOT NULL;""",
params) params,
)
cur.execute(query) cur.execute(query)
row = cur.fetchone() row = cur.fetchone()
if row is not None: if row is not None:
params["sessions_count"] = row["sessions_count"] params["sessions_count"] = row["sessions_count"]
params["events_count"] = row["events_count"] params["events_count"] = row["events_count"]
query = cur.mogrify("""UPDATE public.projects_stats query = cur.mogrify(
"""UPDATE public.projects_stats
SET sessions_count=%(sessions_count)s, SET sessions_count=%(sessions_count)s,
events_count=%(events_count)s, events_count=%(events_count)s,
last_update_at=(now() AT TIME ZONE 'utc'::text) last_update_at=(now() AT TIME ZONE 'utc'::text)
WHERE project_id=%(project_id)s;""", WHERE project_id=%(project_id)s;""",
params) params,
)
cur.execute(query) cur.execute(query)
def __check_database_ch(*_): def __check_database_ch(*_):
fail_response = { fail_response = {
"health": False, "health": False,
"details": {"errors": ["server health-check failed"]} "details": {"errors": ["server health-check failed"]},
} }
with ch_client.ClickHouseClient() as ch: with ch_client.ClickHouseClient() as ch:
try: try:
@@ -348,9 +361,11 @@ def __check_database_ch(*_):
logger.exception(e) logger.exception(e)
return fail_response return fail_response
schema_version = ch.execute("""SELECT 1 schema_version = ch.execute(
"""SELECT 1
FROM system.functions FROM system.functions
WHERE name = 'openreplay_version';""") WHERE name = 'openreplay_version';"""
)
if len(schema_version) > 0: if len(schema_version) > 0:
schema_version = ch.execute("SELECT openreplay_version() AS version;") schema_version = ch.execute("SELECT openreplay_version() AS version;")
schema_version = schema_version[0]["version"] schema_version = schema_version[0]["version"]
@@ -365,9 +380,10 @@ def __check_database_ch(*_):
# "version": server_version[0]["server_version"], # "version": server_version[0]["server_version"],
# "schema": schema_version, # "schema": schema_version,
# **errors # **errors
} },
} }
# def __check_kafka(*_): # def __check_kafka(*_):
# fail_response = { # fail_response = {
# "health": False, # "health": False,