import decimal
import logging

from pydantic_core._pydantic_core import ValidationError

import schemas
from chalicelib.core import alerts
from chalicelib.core import alerts_listener
from chalicelib.core import sessions
from chalicelib.utils import pg_client
from chalicelib.utils.TimeUTC import TimeUTC

logger = logging.getLogger(__name__)

# Maps an alert's "left" column to the SQL pieces used to compute its value:
#   table        - FROM-clause source
#   formula      - aggregate expression producing the metric value
#   condition    - optional extra WHERE predicate
#   joinSessions - False when the table does not join public.sessions
#                  (defaults to True when absent)
LeftToDb = {
    schemas.AlertColumn.PERFORMANCE__DOM_CONTENT_LOADED__AVERAGE: {
        "table": "events.pages INNER JOIN public.sessions USING(session_id)",
        "formula": "COALESCE(AVG(NULLIF(dom_content_loaded_time ,0)),0)"},
    # NOTE(review): FIRST_MEANINGFUL_PAINT is backed by
    # first_contentful_paint_time — confirm this mapping is intentional.
    schemas.AlertColumn.PERFORMANCE__FIRST_MEANINGFUL_PAINT__AVERAGE: {
        "table": "events.pages INNER JOIN public.sessions USING(session_id)",
        "formula": "COALESCE(AVG(NULLIF(first_contentful_paint_time,0)),0)"},
    schemas.AlertColumn.PERFORMANCE__PAGE_LOAD_TIME__AVERAGE: {
        "table": "events.pages INNER JOIN public.sessions USING(session_id)",
        "formula": "AVG(NULLIF(load_time ,0))"},
    schemas.AlertColumn.PERFORMANCE__DOM_BUILD_TIME__AVERAGE: {
        "table": "events.pages INNER JOIN public.sessions USING(session_id)",
        "formula": "AVG(NULLIF(dom_building_time,0))"},
    schemas.AlertColumn.PERFORMANCE__SPEED_INDEX__AVERAGE: {
        "table": "events.pages INNER JOIN public.sessions USING(session_id)",
        "formula": "AVG(NULLIF(speed_index,0))"},
    schemas.AlertColumn.PERFORMANCE__PAGE_RESPONSE_TIME__AVERAGE: {
        "table": "events.pages INNER JOIN public.sessions USING(session_id)",
        "formula": "AVG(NULLIF(response_time,0))"},
    # NOTE(review): TTFB is backed by first_paint_time — confirm this mapping
    # is intentional.
    schemas.AlertColumn.PERFORMANCE__TTFB__AVERAGE: {
        "table": "events.pages INNER JOIN public.sessions USING(session_id)",
        "formula": "AVG(NULLIF(first_paint_time,0))"},
    schemas.AlertColumn.PERFORMANCE__TIME_TO_RENDER__AVERAGE: {
        "table": "events.pages INNER JOIN public.sessions USING(session_id)",
        "formula": "AVG(NULLIF(visually_complete,0))"},
    schemas.AlertColumn.PERFORMANCE__IMAGE_LOAD_TIME__AVERAGE: {
        "table": "events.resources INNER JOIN public.sessions USING(session_id)",
        "formula": "AVG(NULLIF(resources.duration,0))",
        "condition": "type='img'"},
    schemas.AlertColumn.PERFORMANCE__REQUEST_LOAD_TIME__AVERAGE: {
        "table": "events.resources INNER JOIN public.sessions USING(session_id)",
        "formula": "AVG(NULLIF(resources.duration,0))",
        "condition": "type='fetch'"},
    schemas.AlertColumn.RESOURCES__LOAD_TIME__AVERAGE: {
        "table": "events.resources INNER JOIN public.sessions USING(session_id)",
        "formula": "AVG(NULLIF(resources.duration,0))"},
    schemas.AlertColumn.RESOURCES__MISSING__COUNT: {
        "table": "events.resources INNER JOIN public.sessions USING(session_id)",
        "formula": "COUNT(DISTINCT url_hostpath)",
        "condition": "success= FALSE AND type='img'"},
    schemas.AlertColumn.ERRORS__4XX_5XX__COUNT: {
        "table": "events.resources INNER JOIN public.sessions USING(session_id)",
        "formula": "COUNT(session_id)",
        "condition": "status/100!=2"},
    schemas.AlertColumn.ERRORS__4XX__COUNT: {
        "table": "events.resources INNER JOIN public.sessions USING(session_id)",
        "formula": "COUNT(session_id)",
        "condition": "status/100=4"},
    schemas.AlertColumn.ERRORS__5XX__COUNT: {
        "table": "events.resources INNER JOIN public.sessions USING(session_id)",
        "formula": "COUNT(session_id)",
        "condition": "status/100=5"},
    schemas.AlertColumn.ERRORS__JAVASCRIPT__IMPACTED_SESSIONS__COUNT: {
        "table": "events.resources INNER JOIN public.sessions USING(session_id)",
        "formula": "COUNT(DISTINCT session_id)",
        "condition": "success= FALSE AND type='script'"},
    schemas.AlertColumn.PERFORMANCE__CRASHES__COUNT: {
        "table": "public.sessions",
        "formula": "COUNT(DISTINCT session_id)",
        "condition": "errors_count > 0 AND duration>0"},
    schemas.AlertColumn.ERRORS__JAVASCRIPT__COUNT: {
        "table": "events.errors INNER JOIN public.errors AS m_errors USING (error_id)",
        "formula": "COUNT(DISTINCT session_id)",
        "condition": "source='js_exception'",
        "joinSessions": False},
    schemas.AlertColumn.ERRORS__BACKEND__COUNT: {
        "table": "events.errors INNER JOIN public.errors AS m_errors USING (error_id)",
        "formula": "COUNT(DISTINCT session_id)",
        "condition": "source!='js_exception'",
        "joinSessions": False},
}

# This is the frequency of execution for each threshold:
# alert period length (minutes) -> check interval (minutes)
TimeInterval = {
    15: 3,
    30: 5,
    60: 10,
    120: 20,
    240: 30,
    1440: 60,
}


def can_check(a) -> bool:
    """Return True when alert `a` is due for evaluation right now.

    An alert may be checked when both hold:
      * it is not throttled by ``renotifyInterval`` (or has never notified), and
      * the current time falls within the first minute of a check-interval
        slot derived from its detection period (see ``TimeInterval``).
    """
    now = TimeUTC.now()

    # For change-detection alerts the larger of the two periods drives the
    # schedule; otherwise the previous period does.
    if a["detectionMethod"] == schemas.AlertDetectionMethod.CHANGE \
            and a["options"]["currentPeriod"] > a["options"]["previousPeriod"]:
        repetitionBase = a["options"]["currentPeriod"]
    else:
        repetitionBase = a["options"]["previousPeriod"]

    if repetitionBase not in TimeInterval:
        logger.error("repetitionBase: %s NOT FOUND", repetitionBase)
        return False

    # Not throttled: renotify disabled, never notified, or the renotify
    # window (minutes -> ms) has elapsed since the last notification.
    not_throttled = (a["options"]["renotifyInterval"] <= 0
                     or a["options"].get("lastNotification") is None
                     or a["options"]["lastNotification"] <= 0
                     or ((now - a["options"]["lastNotification"])
                         > a["options"]["renotifyInterval"] * 60 * 1000))
    # Due when we are inside the first minute of a check-interval slot,
    # measured from the alert's creation time.
    on_schedule = ((now - a["createdAt"])
                   % (TimeInterval[repetitionBase] * 60 * 1000)) < 60 * 1000
    return not_throttled and on_schedule


def Build(a):
    """Build the SQL query and parameters used to evaluate alert `a`.

    Returns ``(query, params)`` where ``query`` contains ``%(name)s``
    placeholders resolved from ``params``. The query yields one row with:
      value - the computed metric over the relevant time window(s)
      valid - boolean: does ``value`` satisfy the alert's operator/threshold
    """
    now = TimeUTC.now()
    params = {"project_id": a["projectId"], "now": now}
    full_args = {}
    j_s = True  # whether the metric table joins public.sessions
    main_table = ""

    if a["seriesId"] is not None:
        # Alert on a saved series: count matching sessions via the shared
        # sessions search-query builder.
        a["filter"]["sort"] = "session_id"
        a["filter"]["order"] = schemas.SortOrderType.DESC
        a["filter"]["startDate"] = 0
        a["filter"]["endDate"] = TimeUTC.now()
        try:
            data = schemas.SessionsSearchPayloadSchema.model_validate(a["filter"])
        except ValidationError:
            logger.warning("Validation error for:")
            logger.warning(a["filter"])
            raise
        full_args, query_part = sessions.search_query_parts(
            data=data, error_status=None, errors_only=False, issue=None,
            project_id=a["projectId"], user_id=None, favorite_only=False)
        subQ = f"""SELECT COUNT(session_id) AS value {query_part}"""
    else:
        # Alert on a predefined metric column (see LeftToDb).
        colDef = LeftToDb[a["query"]["left"]]
        subQ = f"""SELECT {colDef["formula"]} AS value
                   FROM {colDef["table"]}
                   WHERE project_id = %(project_id)s
                         {"AND " + colDef["condition"] if colDef.get("condition") else ""}"""
        j_s = colDef.get("joinSessions", True)
        main_table = colDef["table"]
    is_ss = main_table == "public.sessions"

    # NOTE(review): operator/right come from the alert definition and are
    # interpolated into SQL — presumably validated upstream by the schema;
    # confirm.
    q = f"""SELECT coalesce(value,0) AS value,
                   coalesce(value,0) {a["query"]["operator"]} {a["query"]["right"]} AS valid"""

    if a["detectionMethod"] == schemas.AlertDetectionMethod.THRESHOLD:
        # Single window [startDate, now].
        if a["seriesId"] is not None:
            q += f""" FROM ({subQ}) AS stat"""
        else:
            q += f""" FROM ({subQ}
                            {"AND timestamp >= %(startDate)s AND timestamp <= %(now)s" if not is_ss else ""}
                            {"AND start_ts >= %(startDate)s AND start_ts <= %(now)s" if j_s else ""}) AS stat"""
        params = {**params, **full_args,
                  "startDate": TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000}
    else:
        if a["change"] == schemas.AlertDetectionType.CHANGE:
            # Absolute change: current window minus previous window.
            if a["seriesId"] is not None:
                # Re-use the series sub-query over the previous window by
                # shifting the date placeholders one period back.
                sub2 = subQ.replace("%(startDate)s", "%(timestamp_sub2)s") \
                    .replace("%(endDate)s", "%(startDate)s")
                sub1 = f"SELECT (({subQ})-({sub2})) AS value"
                q += f" FROM ( {sub1} ) AS stat"
                params = {**params, **full_args,
                          "startDate": TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000,
                          "timestamp_sub2": TimeUTC.now() - 2 * a["options"]["currentPeriod"] * 60 * 1000}
            else:
                sub1 = f"""{subQ}
                           {"AND timestamp >= %(startDate)s AND timestamp <= %(now)s" if not is_ss else ""}
                           {"AND start_ts >= %(startDate)s AND start_ts <= %(now)s" if j_s else ""}"""
                params["startDate"] = TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000
                sub2 = f"""{subQ}
                           {"AND timestamp < %(startDate)s AND timestamp >= %(timestamp_sub2)s" if not is_ss else ""}
                           {"AND start_ts < %(startDate)s AND start_ts >= %(timestamp_sub2)s" if j_s else ""}"""
                params["timestamp_sub2"] = TimeUTC.now() - 2 * a["options"]["currentPeriod"] * 60 * 1000
                sub1 = f"SELECT (( {sub1} )-( {sub2} )) AS value"
                q += f" FROM ( {sub1} ) AS stat"
        else:
            # Percentage change: ((current / previous) - 1) * 100.
            if a["seriesId"] is not None:
                sub2 = subQ.replace("%(startDate)s", "%(timestamp_sub2)s") \
                    .replace("%(endDate)s", "%(startDate)s")
                sub1 = f"SELECT (({subQ})/NULLIF(({sub2}),0)-1)*100 AS value"
                q += f" FROM ({sub1}) AS stat"
                # NOTE(review): the comparison window goes back
                # currentPeriod + currentPeriod (not previousPeriod) —
                # confirm this is intentional.
                params = {**params, **full_args,
                          "startDate": TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000,
                          "timestamp_sub2": TimeUTC.now()
                                            - (a["options"]["currentPeriod"] + a["options"]["currentPeriod"])
                                            * 60 * 1000}
            else:
                sub1 = f"""{subQ}
                           {"AND timestamp >= %(startDate)s AND timestamp <= %(now)s" if not is_ss else ""}
                           {"AND start_ts >= %(startDate)s AND start_ts <= %(now)s" if j_s else ""}"""
                params["startDate"] = TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000
                sub2 = f"""{subQ}
                           {"AND timestamp < %(startDate)s AND timestamp >= %(timestamp_sub2)s" if not is_ss else ""}
                           {"AND start_ts < %(startDate)s AND start_ts >= %(timestamp_sub2)s" if j_s else ""}"""
                params["timestamp_sub2"] = TimeUTC.now() \
                                           - (a["options"]["currentPeriod"] + a["options"]["currentPeriod"]) \
                                           * 60 * 1000
                sub1 = f"SELECT (({sub1})/NULLIF(({sub2}),0)-1)*100 AS value"
                q += f" FROM ({sub1}) AS stat"

    return q, params


def process():
    """Evaluate every due alert and dispatch notifications for those that fire."""
    notifications = []
    all_alerts = alerts_listener.get_all_alerts()
    with pg_client.PostgresClient() as cur:
        for alert in all_alerts:
            if not can_check(alert):
                continue
            query, params = Build(alert)
            try:
                query = cur.mogrify(query, params)
            except Exception:
                logger.exception(
                    f"!!!Error while building alert query for alertId:{alert['alertId']} name: {alert['name']}")
                continue
            logger.debug(alert)
            logger.debug(query)
            try:
                cur.execute(query)
                result = cur.fetchone()
                if result["valid"]:
                    logger.info(f"Valid alert, notifying users, alertId:{alert['alertId']} name: {alert['name']}")
                    notifications.append(generate_notification(alert, result))
            except Exception:
                logger.exception(
                    f"!!!Error while running alert query for alertId:{alert['alertId']} name: {alert['name']}")
                logger.error(query)
                # The failed statement aborted the transaction; recreate the
                # connection so the remaining alerts can still be evaluated.
                cur = cur.recreate(rollback=True)
        if notifications:
            # Stamp lastNotification so renotifyInterval throttling applies.
            cur.execute(
                cur.mogrify(f"""UPDATE public.alerts
                                SET options = options||'{{"lastNotification":{TimeUTC.now()}}}'::jsonb
                                WHERE alert_id IN %(ids)s;""",
                            {"ids": tuple(n["alertId"] for n in notifications)}))
    if notifications:
        alerts.process_notifications(notifications)


def __format_value(x):
    """Format a number for display: drop the decimal part on whole values,
    round to 2 decimals otherwise, and add thousands separators."""
    if x % 1 == 0:
        x = int(x)
    else:
        x = round(x, 2)
    return f"{x:,}"


def generate_notification(alert, result):
    """Build the notification payload for a fired alert from its query result."""
    left = __format_value(result['value'])
    right = __format_value(alert['query']['right'])
    return {
        "alertId": alert["alertId"],
        "tenantId": alert["tenantId"],
        "title": alert["name"],
        "description": f"{alert['seriesName']} = {left} ({alert['query']['operator']} {right}).",
        "buttonText": "Check metrics for more details",
        "buttonUrl": f"/{alert['projectId']}/metrics",
        "imageUrl": None,
        "projectId": alert["projectId"],
        "projectName": alert["projectName"],
        "options": {"source": "ALERT", "sourceId": alert["alertId"],
                    "sourceMeta": alert["detectionMethod"],
                    "message": alert["options"]["message"],
                    "projectId": alert["projectId"],
                    "data": {"title": alert["name"],
                             "limitValue": alert["query"]["right"],
                             # DB numerics arrive as Decimal, which is not
                             # JSON-serializable — convert to float.
                             "actualValue": float(result["value"])
                             if isinstance(result["value"], decimal.Decimal)
                             else result["value"],
                             "operator": alert["query"]["operator"],
                             "trigger": alert["query"]["left"],
                             "alertId": alert["alertId"],
                             "detectionMethod": alert["detectionMethod"],
                             "currentPeriod": alert["options"]["currentPeriod"],
                             "previousPeriod": alert["options"]["previousPeriod"],
                             "createdAt": TimeUTC.now()}},
    }