feat(alerts): optimized alerts
feat(alerts): refactored alerts feat(alerts): moved EE alerts to CH
This commit is contained in:
parent
a1a47c8e84
commit
4c2fa46931
5 changed files with 69 additions and 86 deletions
|
|
@ -1,12 +1,16 @@
|
|||
import decimal
|
||||
import logging
|
||||
|
||||
from decouple import config
|
||||
|
||||
import schemas
|
||||
from chalicelib.core import alerts_listener
|
||||
from chalicelib.core import sessions, alerts
|
||||
from chalicelib.utils import pg_client
|
||||
from chalicelib.utils.TimeUTC import TimeUTC
|
||||
|
||||
logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO))
|
||||
|
||||
LeftToDb = {
|
||||
schemas.AlertColumn.performance__dom_content_loaded__average: {
|
||||
"table": "events.pages INNER JOIN public.sessions USING(session_id)",
|
||||
|
|
@ -191,30 +195,7 @@ def process():
|
|||
result = cur.fetchone()
|
||||
if result["valid"]:
|
||||
logging.info("Valid alert, notifying users")
|
||||
notifications.append({
|
||||
"alertId": alert["alertId"],
|
||||
"tenantId": alert["tenantId"],
|
||||
"title": alert["name"],
|
||||
"description": f"has been triggered, {alert['query']['left']} = {round(result['value'], 2)} ({alert['query']['operator']} {alert['query']['right']}).",
|
||||
"buttonText": "Check metrics for more details",
|
||||
"buttonUrl": f"/{alert['projectId']}/metrics",
|
||||
"imageUrl": None,
|
||||
"options": {"source": "ALERT", "sourceId": alert["alertId"],
|
||||
"sourceMeta": alert["detectionMethod"],
|
||||
"message": alert["options"]["message"], "projectId": alert["projectId"],
|
||||
"data": {"title": alert["name"],
|
||||
"limitValue": alert["query"]["right"],
|
||||
"actualValue": float(result["value"]) \
|
||||
if isinstance(result["value"], decimal.Decimal) \
|
||||
else result["value"],
|
||||
"operator": alert["query"]["operator"],
|
||||
"trigger": alert["query"]["left"],
|
||||
"alertId": alert["alertId"],
|
||||
"detectionMethod": alert["detectionMethod"],
|
||||
"currentPeriod": alert["options"]["currentPeriod"],
|
||||
"previousPeriod": alert["options"]["previousPeriod"],
|
||||
"createdAt": TimeUTC.now()}},
|
||||
})
|
||||
notifications.append(generate_notification(alert, result))
|
||||
except Exception as e:
|
||||
logging.error(f"!!!Error while running alert query for alertId:{alert['alertId']}")
|
||||
logging.error(str(e))
|
||||
|
|
@ -226,3 +207,30 @@ def process():
|
|||
WHERE alert_id IN %(ids)s;""", {"ids": tuple([n["alertId"] for n in notifications])}))
|
||||
if len(notifications) > 0:
|
||||
alerts.process_notifications(notifications)
|
||||
|
||||
|
||||
def generate_notification(alert, result):
|
||||
return {
|
||||
"alertId": alert["alertId"],
|
||||
"tenantId": alert["tenantId"],
|
||||
"title": alert["name"],
|
||||
"description": f"has been triggered, {alert['query']['left']} = {round(result['value'], 2)} ({alert['query']['operator']} {alert['query']['right']}).",
|
||||
"buttonText": "Check metrics for more details",
|
||||
"buttonUrl": f"/{alert['projectId']}/metrics",
|
||||
"imageUrl": None,
|
||||
"options": {"source": "ALERT", "sourceId": alert["alertId"],
|
||||
"sourceMeta": alert["detectionMethod"],
|
||||
"message": alert["options"]["message"], "projectId": alert["projectId"],
|
||||
"data": {"title": alert["name"],
|
||||
"limitValue": alert["query"]["right"],
|
||||
"actualValue": float(result["value"]) \
|
||||
if isinstance(result["value"], decimal.Decimal) \
|
||||
else result["value"],
|
||||
"operator": alert["query"]["operator"],
|
||||
"trigger": alert["query"]["left"],
|
||||
"alertId": alert["alertId"],
|
||||
"detectionMethod": alert["detectionMethod"],
|
||||
"currentPeriod": alert["options"]["currentPeriod"],
|
||||
"previousPeriod": alert["options"]["previousPeriod"],
|
||||
"createdAt": TimeUTC.now()}},
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,7 +12,8 @@ def get_all_alerts():
|
|||
(EXTRACT(EPOCH FROM alerts.created_at) * 1000)::BIGINT AS created_at,
|
||||
alerts.name,
|
||||
alerts.series_id,
|
||||
filter
|
||||
filter,
|
||||
change
|
||||
FROM public.alerts
|
||||
LEFT JOIN metric_series USING (series_id)
|
||||
INNER JOIN projects USING (project_id)
|
||||
|
|
|
|||
|
|
@ -1,12 +1,15 @@
|
|||
import decimal
|
||||
import logging
|
||||
|
||||
from decouple import config
|
||||
|
||||
import schemas
|
||||
from chalicelib.core import alerts_listener, alerts_processor
|
||||
from chalicelib.core import sessions, alerts
|
||||
from chalicelib.utils import pg_client, ch_client, exp_ch_helper
|
||||
from chalicelib.utils.TimeUTC import TimeUTC
|
||||
|
||||
logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO))
|
||||
|
||||
LeftToDb = {
|
||||
schemas.AlertColumn.performance__dom_content_loaded__average: {
|
||||
"table": lambda timestamp: f"{exp_ch_helper.get_main_events_table(timestamp)} AS pages",
|
||||
|
|
@ -120,7 +123,7 @@ def Build(a):
|
|||
a["filter"]["order"] = schemas.SortOrderType.desc
|
||||
a["filter"]["startDate"] = -1
|
||||
a["filter"]["endDate"] = TimeUTC.now()
|
||||
full_args, query_part = sessions.search_query_parts(
|
||||
full_args, query_part = sessions.search_query_parts_ch(
|
||||
data=schemas.SessionsSearchPayloadSchema.parse_obj(a["filter"]), error_status=None, errors_only=False,
|
||||
issue=None, project_id=a["projectId"], user_id=None, favorite_only=False)
|
||||
subQ = f"""SELECT COUNT(session_id) AS value
|
||||
|
|
@ -130,7 +133,8 @@ def Build(a):
|
|||
params["event_type"] = LeftToDb[a["query"]["left"]].get("eventType")
|
||||
subQ = f"""SELECT {colDef["formula"]} AS value
|
||||
FROM {colDef["table"](now)}
|
||||
WHERE project_id = %(project_id)s {"AND event_type=%(event_type)s" if params["event_type"] else ""}
|
||||
WHERE project_id = %(project_id)s
|
||||
{"AND event_type=%(event_type)s" if params["event_type"] else ""}
|
||||
{"AND " + colDef["condition"] if colDef.get("condition") is not None else ""}"""
|
||||
|
||||
q = f"""SELECT coalesce(value,0) AS value, coalesce(value,0) {a["query"]["operator"]} {a["query"]["right"]} AS valid"""
|
||||
|
|
@ -144,7 +148,7 @@ def Build(a):
|
|||
AND datetime<=toDateTime(%(now)s/1000) ) AS stat"""
|
||||
params = {**params, **full_args, "startDate": TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000}
|
||||
else:
|
||||
if a["options"]["change"] == schemas.AlertDetectionChangeType.change:
|
||||
if a["change"] == schemas.AlertDetectionType.change:
|
||||
if a["seriesId"] is not None:
|
||||
sub2 = subQ.replace("%(startDate)s", "%(timestamp_sub2)s").replace("%(endDate)s", "%(startDate)s")
|
||||
sub1 = f"SELECT (({subQ})-({sub2})) AS value"
|
||||
|
|
@ -153,10 +157,11 @@ def Build(a):
|
|||
"startDate": TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000,
|
||||
"timestamp_sub2": TimeUTC.now() - 2 * a["options"]["currentPeriod"] * 60 * 1000}
|
||||
else:
|
||||
sub1 = f"""{subQ} AND timestamp>=%(startDate)s"""
|
||||
sub1 = f"""{subQ} AND datetime>=toDateTime(%(startDate)s/1000)
|
||||
AND datetime<=toDateTime(%(now)s/1000)"""
|
||||
params["startDate"] = TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000
|
||||
sub2 = f"""{subQ} AND timestamp<%(startDate)s
|
||||
AND timestamp>=%(timestamp_sub2)s"""
|
||||
sub2 = f"""{subQ} AND datetime<toDateTime(%(startDate)s/1000)
|
||||
AND datetime>=toDateTime(%(timestamp_sub2)s/1000)"""
|
||||
params["timestamp_sub2"] = TimeUTC.now() - 2 * a["options"]["currentPeriod"] * 60 * 1000
|
||||
sub1 = f"SELECT (( {sub1} )-( {sub2} )) AS value"
|
||||
q += f" FROM ( {sub1} ) AS stat"
|
||||
|
|
@ -172,11 +177,11 @@ def Build(a):
|
|||
- (a["options"]["currentPeriod"] + a["options"]["currentPeriod"]) \
|
||||
* 60 * 1000}
|
||||
else:
|
||||
sub1 = f"""{subQ} AND timestamp>=%(startDate)s
|
||||
{"AND sessions.start_ts >= %(startDate)s" if j_s else ""}"""
|
||||
sub1 = f"""{subQ} AND datetime>=toDateTime(%(startDate)s/1000)
|
||||
AND datetime<=toDateTime(%(now)s/1000)"""
|
||||
params["startDate"] = TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000
|
||||
sub2 = f"""{subQ} AND timestamp<%(startDate)s
|
||||
AND timestamp>=%(timestamp_sub2)s"""
|
||||
sub2 = f"""{subQ} AND datetime<toDateTime(%(startDate)s/1000)
|
||||
AND datetime>=toDateTime(%(timestamp_sub2)s/1000)"""
|
||||
params["timestamp_sub2"] = TimeUTC.now() \
|
||||
- (a["options"]["currentPeriod"] + a["options"]["currentPeriod"]) * 60 * 1000
|
||||
sub1 = f"SELECT (({sub1})/NULLIF(({sub2}),0)-1)*100 AS value"
|
||||
|
|
@ -188,55 +193,24 @@ def Build(a):
|
|||
def process():
|
||||
notifications = []
|
||||
all_alerts = alerts_listener.get_all_alerts()
|
||||
with pg_client.PostgresClient() as cur, ch_client.ClickHouseClient() as curc:
|
||||
with pg_client.PostgresClient() as cur, ch_client.ClickHouseClient() as ch_cur:
|
||||
for alert in all_alerts:
|
||||
if alert["query"]["left"] == "CUSTOM":
|
||||
if alert["query"]["left"] != "CUSTOM":
|
||||
continue
|
||||
if alert["query"]["left"] == schemas.AlertColumn.performance__dom_content_loaded__average:
|
||||
alert["query"]["left"] = schemas.AlertColumn.errors__backend__count
|
||||
if True or alerts_processor.can_check(alert):
|
||||
logging.info(f"Querying alertId:{alert['alertId']} name: {alert['name']}")
|
||||
query, params = Build(alert)
|
||||
query = curc.format(query, params)
|
||||
query = ch_cur.format(query, params)
|
||||
logging.debug(alert)
|
||||
logging.debug(query)
|
||||
try:
|
||||
print("------------------Alerts")
|
||||
print(params)
|
||||
print(alert)
|
||||
print(query)
|
||||
print("------------------")
|
||||
# continue
|
||||
result = curc.execute(query)
|
||||
result = ch_cur.execute(query)
|
||||
if len(result) > 0:
|
||||
result = result[0]
|
||||
continue
|
||||
|
||||
if result["valid"]:
|
||||
logging.info("Valid alert, notifying users")
|
||||
notifications.append({
|
||||
"alertId": alert["alertId"],
|
||||
"tenantId": alert["tenantId"],
|
||||
"title": alert["name"],
|
||||
"description": f"has been triggered, {alert['query']['left']} = {round(result['value'], 2)} ({alert['query']['operator']} {alert['query']['right']}).",
|
||||
"buttonText": "Check metrics for more details",
|
||||
"buttonUrl": f"/{alert['projectId']}/metrics",
|
||||
"imageUrl": None,
|
||||
"options": {"source": "ALERT", "sourceId": alert["alertId"],
|
||||
"sourceMeta": alert["detectionMethod"],
|
||||
"message": alert["options"]["message"], "projectId": alert["projectId"],
|
||||
"data": {"title": alert["name"],
|
||||
"limitValue": alert["query"]["right"],
|
||||
"actualValue": float(result["value"]) \
|
||||
if isinstance(result["value"], decimal.Decimal) \
|
||||
else result["value"],
|
||||
"operator": alert["query"]["operator"],
|
||||
"trigger": alert["query"]["left"],
|
||||
"alertId": alert["alertId"],
|
||||
"detectionMethod": alert["detectionMethod"],
|
||||
"currentPeriod": alert["options"]["currentPeriod"],
|
||||
"previousPeriod": alert["options"]["previousPeriod"],
|
||||
"createdAt": TimeUTC.now()}},
|
||||
})
|
||||
notifications.append(alerts_processor.generate_notification(alert, result))
|
||||
except Exception as e:
|
||||
logging.error(f"!!!Error while running alert query for alertId:{alert['alertId']}")
|
||||
logging.error(str(e))
|
||||
|
|
|
|||
|
|
@ -361,9 +361,9 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_
|
|||
print("--------------------")
|
||||
main_query = cur.format(f"""SELECT DISTINCT er.error_id,
|
||||
COALESCE((SELECT TRUE
|
||||
FROM final.user_viewed_errors AS ve
|
||||
WHERE er.error_id = ve.error_id
|
||||
AND ve.user_id = %(userId)s LIMIT 1), FALSE) AS viewed
|
||||
FROM {exp_ch_helper.get_user_viewed_errors_table()} AS ve
|
||||
WHERE er.error_id = ve.error_id
|
||||
AND ve.user_id = %(userId)s LIMIT 1), FALSE) AS viewed
|
||||
{query_part};""", full_args)
|
||||
|
||||
elif count_only:
|
||||
|
|
@ -1252,9 +1252,9 @@ def search_query_parts_ch(data, error_status, errors_only, favorite_only, issue,
|
|||
"isNotNull(s.duration)"
|
||||
]
|
||||
if favorite_only:
|
||||
extra_constraints.append("""s.session_id IN (SELECT session_id
|
||||
FROM final.user_favorite_sessions
|
||||
WHERE user_id = %(userId)s)""")
|
||||
extra_constraints.append(f"""s.session_id IN (SELECT session_id
|
||||
FROM {exp_ch_helper.get_user_favorite_sessions_table()} AS user_favorite_sessions
|
||||
WHERE user_id = %(userId)s)""")
|
||||
extra_from = ""
|
||||
events_query_part = ""
|
||||
__events_where_basic = ["project_id = %(projectId)s",
|
||||
|
|
@ -1526,7 +1526,7 @@ def search_query_parts_ch(data, error_status, errors_only, favorite_only, issue,
|
|||
"main.datetime >= toDateTime(%(startDate)s/1000)",
|
||||
"main.datetime <= toDateTime(%(endDate)s/1000)"]
|
||||
if favorite_only and not errors_only:
|
||||
event_from += "INNER JOIN final.user_favorite_sessions AS fs USING(session_id)"
|
||||
event_from += f"INNER JOIN {exp_ch_helper.get_user_favorite_sessions_table()} AS fs USING(session_id)"
|
||||
event_where.append("fs.user_id = %(userId)s")
|
||||
# else:
|
||||
# event_from = "%s"
|
||||
|
|
@ -1940,9 +1940,9 @@ def search_query_parts_ch(data, error_status, errors_only, favorite_only, issue,
|
|||
else:
|
||||
events_extra_join = f"LEFT JOIN ({events_extra_join}) AS main1 USING(error_id)"
|
||||
if favorite_only and user_id is not None:
|
||||
events_conditions_where.append("""main.session_id IN (SELECT session_id
|
||||
FROM final.user_favorite_sessions
|
||||
WHERE user_id = %(userId)s)""")
|
||||
events_conditions_where.append(f"""main.session_id IN (SELECT session_id
|
||||
FROM {exp_ch_helper.get_user_favorite_sessions_table()} AS user_favorite_sessions
|
||||
WHERE user_id = %(userId)s)""")
|
||||
|
||||
if data.events_order in [schemas.SearchEventOrder._then, schemas.SearchEventOrder._and]:
|
||||
sequence_pattern = [f'(?{i + 1}){c.get("time", "")}' for i, c in enumerate(events_conditions)]
|
||||
|
|
@ -2067,8 +2067,8 @@ def search_query_parts_ch(data, error_status, errors_only, favorite_only, issue,
|
|||
extra_from += """INNER JOIN (SELECT 1 AS session_id) AS favorite_sessions
|
||||
ON (TRUE)"""
|
||||
elif not favorite_only and not errors_only and user_id is not None:
|
||||
extra_from += """LEFT JOIN (SELECT session_id
|
||||
FROM final.user_favorite_sessions
|
||||
extra_from += f"""LEFT JOIN (SELECT session_id
|
||||
FROM {exp_ch_helper.get_user_favorite_sessions_table()} AS user_favorite_sessions
|
||||
WHERE user_id = %(userId)s) AS favorite_sessions
|
||||
ON (s.session_id=favorite_sessions.session_id)"""
|
||||
extra_join = ""
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ def get_main_sessions_table(timestamp):
|
|||
|
||||
|
||||
def get_main_resources_table(timestamp):
|
||||
return "experimental.resources_l7s_mv" \
|
||||
return "experimental.resources_l7d_mv" \
|
||||
if config("EXP_7D_MV", cast=bool, default=True) \
|
||||
and timestamp >= TimeUTC.now(delta_days=-7) else "experimental.resources"
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue