diff --git a/api/chalicelib/core/alerts_processor.py b/api/chalicelib/core/alerts_processor.py index be7ef4e00..dbc4aaf41 100644 --- a/api/chalicelib/core/alerts_processor.py +++ b/api/chalicelib/core/alerts_processor.py @@ -1,12 +1,16 @@ import decimal import logging +from decouple import config + import schemas from chalicelib.core import alerts_listener from chalicelib.core import sessions, alerts from chalicelib.utils import pg_client from chalicelib.utils.TimeUTC import TimeUTC +logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO)) + LeftToDb = { schemas.AlertColumn.performance__dom_content_loaded__average: { "table": "events.pages INNER JOIN public.sessions USING(session_id)", @@ -191,30 +195,7 @@ def process(): result = cur.fetchone() if result["valid"]: logging.info("Valid alert, notifying users") - notifications.append({ - "alertId": alert["alertId"], - "tenantId": alert["tenantId"], - "title": alert["name"], - "description": f"has been triggered, {alert['query']['left']} = {round(result['value'], 2)} ({alert['query']['operator']} {alert['query']['right']}).", - "buttonText": "Check metrics for more details", - "buttonUrl": f"/{alert['projectId']}/metrics", - "imageUrl": None, - "options": {"source": "ALERT", "sourceId": alert["alertId"], - "sourceMeta": alert["detectionMethod"], - "message": alert["options"]["message"], "projectId": alert["projectId"], - "data": {"title": alert["name"], - "limitValue": alert["query"]["right"], - "actualValue": float(result["value"]) \ - if isinstance(result["value"], decimal.Decimal) \ - else result["value"], - "operator": alert["query"]["operator"], - "trigger": alert["query"]["left"], - "alertId": alert["alertId"], - "detectionMethod": alert["detectionMethod"], - "currentPeriod": alert["options"]["currentPeriod"], - "previousPeriod": alert["options"]["previousPeriod"], - "createdAt": TimeUTC.now()}}, - }) + notifications.append(generate_notification(alert, result)) except Exception as e: logging.error(f"!!!Error while running alert query for alertId:{alert['alertId']}") logging.error(str(e)) @@ -226,3 +207,30 @@ def process(): WHERE alert_id IN %(ids)s;""", {"ids": tuple([n["alertId"] for n in notifications])})) if len(notifications) > 0: alerts.process_notifications(notifications) + + +def generate_notification(alert, result): + return { + "alertId": alert["alertId"], + "tenantId": alert["tenantId"], + "title": alert["name"], + "description": f"has been triggered, {alert['query']['left']} = {round(result['value'], 2)} ({alert['query']['operator']} {alert['query']['right']}).", + "buttonText": "Check metrics for more details", + "buttonUrl": f"/{alert['projectId']}/metrics", + "imageUrl": None, + "options": {"source": "ALERT", "sourceId": alert["alertId"], + "sourceMeta": alert["detectionMethod"], + "message": alert["options"]["message"], "projectId": alert["projectId"], + "data": {"title": alert["name"], + "limitValue": alert["query"]["right"], + "actualValue": float(result["value"]) \ + if isinstance(result["value"], decimal.Decimal) \ + else result["value"], + "operator": alert["query"]["operator"], + "trigger": alert["query"]["left"], + "alertId": alert["alertId"], + "detectionMethod": alert["detectionMethod"], + "currentPeriod": alert["options"]["currentPeriod"], + "previousPeriod": alert["options"]["previousPeriod"], + "createdAt": TimeUTC.now()}}, + } diff --git a/ee/api/chalicelib/core/alerts_listener.py b/ee/api/chalicelib/core/alerts_listener.py index 40241f51e..6a97daf93 100644 --- a/ee/api/chalicelib/core/alerts_listener.py +++ b/ee/api/chalicelib/core/alerts_listener.py @@ -12,7 +12,8 @@ def get_all_alerts(): (EXTRACT(EPOCH FROM alerts.created_at) * 1000)::BIGINT AS created_at, alerts.name, alerts.series_id, - filter + filter, + change FROM public.alerts LEFT JOIN metric_series USING (series_id) INNER JOIN projects USING (project_id) diff --git a/ee/api/chalicelib/core/alerts_processor_exp.py b/ee/api/chalicelib/core/alerts_processor_exp.py index b4b1fb406..66448b1d9 100644 --- a/ee/api/chalicelib/core/alerts_processor_exp.py +++ b/ee/api/chalicelib/core/alerts_processor_exp.py @@ -1,12 +1,15 @@ -import decimal import logging +from decouple import config + import schemas from chalicelib.core import alerts_listener, alerts_processor from chalicelib.core import sessions, alerts from chalicelib.utils import pg_client, ch_client, exp_ch_helper from chalicelib.utils.TimeUTC import TimeUTC +logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO)) + LeftToDb = { schemas.AlertColumn.performance__dom_content_loaded__average: { "table": lambda timestamp: f"{exp_ch_helper.get_main_events_table(timestamp)} AS pages", @@ -120,7 +123,7 @@ def Build(a): a["filter"]["order"] = schemas.SortOrderType.desc a["filter"]["startDate"] = -1 a["filter"]["endDate"] = TimeUTC.now() - full_args, query_part = sessions.search_query_parts( + full_args, query_part = sessions.search_query_parts_ch( data=schemas.SessionsSearchPayloadSchema.parse_obj(a["filter"]), error_status=None, errors_only=False, issue=None, project_id=a["projectId"], user_id=None, favorite_only=False) subQ = f"""SELECT COUNT(session_id) AS value @@ -130,7 +133,8 @@ def Build(a): params["event_type"] = LeftToDb[a["query"]["left"]].get("eventType") subQ = f"""SELECT {colDef["formula"]} AS value FROM {colDef["table"](now)} - WHERE project_id = %(project_id)s {"AND event_type=%(event_type)s" if params["event_type"] else ""} + WHERE project_id = %(project_id)s + {"AND event_type=%(event_type)s" if params["event_type"] else ""} {"AND " + colDef["condition"] if colDef.get("condition") is not None else ""}""" q = f"""SELECT coalesce(value,0) AS value, coalesce(value,0) {a["query"]["operator"]} {a["query"]["right"]} AS valid""" @@ -144,7 +148,7 @@ def Build(a): AND datetime<=toDateTime(%(now)s/1000) ) AS stat""" params = {**params, **full_args, "startDate": TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000} else: - if a["options"]["change"] == schemas.AlertDetectionChangeType.change: + if a["change"] == schemas.AlertDetectionType.change: if a["seriesId"] is not None: sub2 = subQ.replace("%(startDate)s", "%(timestamp_sub2)s").replace("%(endDate)s", "%(startDate)s") sub1 = f"SELECT (({subQ})-({sub2})) AS value" @@ -153,10 +157,11 @@ def Build(a): "startDate": TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000, "timestamp_sub2": TimeUTC.now() - 2 * a["options"]["currentPeriod"] * 60 * 1000} else: - sub1 = f"""{subQ} AND timestamp>=%(startDate)s""" + sub1 = f"""{subQ} AND datetime>=toDateTime(%(startDate)s/1000) + AND datetime<=toDateTime(%(now)s/1000)""" params["startDate"] = TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000 - sub2 = f"""{subQ} AND timestamp<%(startDate)s - AND timestamp>=%(timestamp_sub2)s""" + sub2 = f"""{subQ} AND datetime=toDateTime(%(timestamp_sub2)s/1000)""" params["timestamp_sub2"] = TimeUTC.now() - 2 * a["options"]["currentPeriod"] * 60 * 1000 sub1 = f"SELECT (( {sub1} )-( {sub2} )) AS value" q += f" FROM ( {sub1} ) AS stat" @@ -172,11 +177,11 @@ def Build(a): - (a["options"]["currentPeriod"] + a["options"]["currentPeriod"]) \ * 60 * 1000} else: - sub1 = f"""{subQ} AND timestamp>=%(startDate)s - {"AND sessions.start_ts >= %(startDate)s" if j_s else ""}""" + sub1 = f"""{subQ} AND datetime>=toDateTime(%(startDate)s/1000) + AND datetime<=toDateTime(%(now)s/1000)""" params["startDate"] = TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000 - sub2 = f"""{subQ} AND timestamp<%(startDate)s - AND timestamp>=%(timestamp_sub2)s""" + sub2 = f"""{subQ} AND datetime=toDateTime(%(timestamp_sub2)s/1000)""" params["timestamp_sub2"] = TimeUTC.now() \ - (a["options"]["currentPeriod"] + a["options"]["currentPeriod"]) * 60 * 1000 sub1 = f"SELECT (({sub1})/NULLIF(({sub2}),0)-1)*100 AS value" @@ -188,55 +193,24 @@ def Build(a): def process(): notifications = [] all_alerts = alerts_listener.get_all_alerts() - with pg_client.PostgresClient() as cur, ch_client.ClickHouseClient() as curc: + with pg_client.PostgresClient() as cur, ch_client.ClickHouseClient() as ch_cur: for alert in all_alerts: - if alert["query"]["left"] == "CUSTOM": + if alert["query"]["left"] != "CUSTOM": continue - if alert["query"]["left"] == schemas.AlertColumn.performance__dom_content_loaded__average: - alert["query"]["left"] = schemas.AlertColumn.errors__backend__count if True or alerts_processor.can_check(alert): logging.info(f"Querying alertId:{alert['alertId']} name: {alert['name']}") query, params = Build(alert) - query = curc.format(query, params) + query = ch_cur.format(query, params) logging.debug(alert) logging.debug(query) try: - print("------------------Alerts") - print(params) - print(alert) - print(query) - print("------------------") - # continue - result = curc.execute(query) + result = ch_cur.execute(query) if len(result) > 0: result = result[0] - continue + if result["valid"]: logging.info("Valid alert, notifying users") - notifications.append({ - "alertId": alert["alertId"], - "tenantId": alert["tenantId"], - "title": alert["name"], - "description": f"has been triggered, {alert['query']['left']} = {round(result['value'], 2)} ({alert['query']['operator']} {alert['query']['right']}).", - "buttonText": "Check metrics for more details", - "buttonUrl": f"/{alert['projectId']}/metrics", - "imageUrl": None, - "options": {"source": "ALERT", "sourceId": alert["alertId"], - "sourceMeta": alert["detectionMethod"], - "message": alert["options"]["message"], "projectId": alert["projectId"], - "data": {"title": alert["name"], - "limitValue": alert["query"]["right"], - "actualValue": float(result["value"]) \ - if isinstance(result["value"], decimal.Decimal) \ - else result["value"], - "operator": alert["query"]["operator"], - "trigger": alert["query"]["left"], - "alertId": alert["alertId"], - "detectionMethod": alert["detectionMethod"], - "currentPeriod": alert["options"]["currentPeriod"], - "previousPeriod": alert["options"]["previousPeriod"], - "createdAt": TimeUTC.now()}}, - }) + notifications.append(alerts_processor.generate_notification(alert, result)) except Exception as e: logging.error(f"!!!Error while running alert query for alertId:{alert['alertId']}") logging.error(str(e)) diff --git a/ee/api/chalicelib/core/sessions_exp.py b/ee/api/chalicelib/core/sessions_exp.py index 5973a3a8e..2b78f46f2 100644 --- a/ee/api/chalicelib/core/sessions_exp.py +++ b/ee/api/chalicelib/core/sessions_exp.py @@ -361,9 +361,9 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_ print("--------------------") main_query = cur.format(f"""SELECT DISTINCT er.error_id, COALESCE((SELECT TRUE - FROM final.user_viewed_errors AS ve - WHERE er.error_id = ve.error_id - AND ve.user_id = %(userId)s LIMIT 1), FALSE) AS viewed + FROM {exp_ch_helper.get_user_viewed_errors_table()} AS ve + WHERE er.error_id = ve.error_id + AND ve.user_id = %(userId)s LIMIT 1), FALSE) AS viewed {query_part};""", full_args) elif count_only: @@ -1252,9 +1252,9 @@ def search_query_parts_ch(data, error_status, errors_only, favorite_only, issue, "isNotNull(s.duration)" ] if favorite_only: - extra_constraints.append("""s.session_id IN (SELECT session_id - FROM final.user_favorite_sessions - WHERE user_id = %(userId)s)""") + extra_constraints.append(f"""s.session_id IN (SELECT session_id + FROM {exp_ch_helper.get_user_favorite_sessions_table()} AS user_favorite_sessions + WHERE user_id = %(userId)s)""") extra_from = "" events_query_part = "" __events_where_basic = ["project_id = %(projectId)s", @@ -1526,7 +1526,7 @@ def search_query_parts_ch(data, error_status, errors_only, favorite_only, issue, "main.datetime >= toDateTime(%(startDate)s/1000)", "main.datetime <= toDateTime(%(endDate)s/1000)"] if favorite_only and not errors_only: - event_from += "INNER JOIN final.user_favorite_sessions AS fs USING(session_id)" + event_from += f"INNER JOIN {exp_ch_helper.get_user_favorite_sessions_table()} AS fs USING(session_id)" event_where.append("fs.user_id = %(userId)s") # else: # event_from = "%s" @@ -1940,9 +1940,9 @@ def search_query_parts_ch(data, error_status, errors_only, favorite_only, issue, else: events_extra_join = f"LEFT JOIN ({events_extra_join}) AS main1 USING(error_id)" if favorite_only and user_id is not None: - events_conditions_where.append("""main.session_id IN (SELECT session_id - FROM final.user_favorite_sessions - WHERE user_id = %(userId)s)""") + events_conditions_where.append(f"""main.session_id IN (SELECT session_id + FROM {exp_ch_helper.get_user_favorite_sessions_table()} AS user_favorite_sessions + WHERE user_id = %(userId)s)""") if data.events_order in [schemas.SearchEventOrder._then, schemas.SearchEventOrder._and]: sequence_pattern = [f'(?{i + 1}){c.get("time", "")}' for i, c in enumerate(events_conditions)] @@ -2067,8 +2067,8 @@ def search_query_parts_ch(data, error_status, errors_only, favorite_only, issue, extra_from += """INNER JOIN (SELECT 1 AS session_id) AS favorite_sessions ON (TRUE)""" elif not favorite_only and not errors_only and user_id is not None: - extra_from += """LEFT JOIN (SELECT session_id - FROM final.user_favorite_sessions + extra_from += f"""LEFT JOIN (SELECT session_id + FROM {exp_ch_helper.get_user_favorite_sessions_table()} AS user_favorite_sessions WHERE user_id = %(userId)s) AS favorite_sessions ON (s.session_id=favorite_sessions.session_id)""" extra_join = "" diff --git a/ee/api/chalicelib/utils/exp_ch_helper.py b/ee/api/chalicelib/utils/exp_ch_helper.py index fb6781184..709b5e926 100644 --- a/ee/api/chalicelib/utils/exp_ch_helper.py +++ b/ee/api/chalicelib/utils/exp_ch_helper.py @@ -21,7 +21,7 @@ def get_main_sessions_table(timestamp): def get_main_resources_table(timestamp): - return "experimental.resources_l7s_mv" \ + return "experimental.resources_l7d_mv" \ if config("EXP_7D_MV", cast=bool, default=True) \ and timestamp >= TimeUTC.now(delta_days=-7) else "experimental.resources"