* fix(chalice): fixed Math-operators validation
refactor(chalice): search for sessions that have events for heatmaps

* refactor(chalice): search for sessions that have at least 1 location event for heatmaps

* fix(chalice): fixed Math-operators validation
refactor(chalice): search for sessions that have events for heatmaps

* refactor(chalice): search for sessions that have at least 1 location event for heatmaps

* feat(chalice): autocomplete return top 10 with stats

* fix(chalice): fixed autocomplete top 10 meta-filters

* refactor(chalice): refactord alerts

* refactor(alerts): refactord alerts
refactor(alerts): moved CH
This commit is contained in:
Kraiem Taha Yassine 2024-12-10 18:19:12 +01:00 committed by GitHub
parent dab822e772
commit e74effe24d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 62 additions and 289 deletions

View file

@ -5,7 +5,7 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler
from decouple import config
from fastapi import FastAPI
from chalicelib.core import alerts_processor
from chalicelib.core.alerts import alerts_processor
from chalicelib.utils import pg_client

View file

@ -0,0 +1,10 @@
import logging
from decouple import config
logger = logging.getLogger(__name__)
if config("EXP_ALERTS", cast=bool, default=False):
logging.info(">>> Using experimental alerts")
from . import alerts_processor_ch as alerts_processor
else:
from . import alerts_processor as alerts_processor

View file

@ -4,13 +4,14 @@ import logging
from pydantic_core._pydantic_core import ValidationError
import schemas
from chalicelib.core import alerts
from chalicelib.core import alerts_listener
from chalicelib.core import sessions
from chalicelib.core.alerts import alerts
from chalicelib.core.alerts import alerts_listener
from chalicelib.core.alerts import sessions
from chalicelib.utils import pg_client
from chalicelib.utils.TimeUTC import TimeUTC
logger = logging.getLogger(__name__)
LeftToDb = {
schemas.AlertColumn.PERFORMANCE__DOM_CONTENT_LOADED__AVERAGE: {
"table": "events.pages INNER JOIN public.sessions USING(session_id)",

View file

@ -3,9 +3,9 @@ import logging
from pydantic_core._pydantic_core import ValidationError
import schemas
from chalicelib.core import alerts
from chalicelib.core import alerts_listener, alerts_processor
from chalicelib.core import sessions_exp as sessions
from chalicelib.core.alerts import alerts
from chalicelib.core.alerts import alerts_listener, alerts_processor
from chalicelib.core.alerts import sessions
from chalicelib.utils import pg_client, ch_client, exp_ch_helper
from chalicelib.utils.TimeUTC import TimeUTC

View file

@ -0,0 +1,6 @@
from decouple import config
if config("EXP_ALERTS", cast=bool, default=False):
from chalicelib.core.sessions_ch import *
else:
from chalicelib.core.sessions import *

View file

@ -3,11 +3,12 @@ import logging
from typing import List, Union
import schemas
from chalicelib.core import events, metadata, projects, performance_event, metrics, sessions_favorite, sessions_legacy
from chalicelib.core import events, metadata, projects, performance_event, metrics, sessions_favorite, sessions
from chalicelib.utils import pg_client, helper, metrics_helper, ch_client, exp_ch_helper
from chalicelib.utils import sql_helper as sh
logger = logging.getLogger(__name__)
SESSION_PROJECTION_COLS_CH = """\
s.project_id,
s.session_id AS session_id,
@ -1690,24 +1691,4 @@ def check_recording_status(project_id: int) -> dict:
# TODO: rewrite this function to use ClickHouse
def search_sessions_by_ids(project_id: int, session_ids: list, sort_by: str = 'session_id',
ascending: bool = False) -> dict:
if session_ids is None or len(session_ids) == 0:
return {"total": 0, "sessions": []}
with pg_client.PostgresClient() as cur:
meta_keys = metadata.get(project_id=project_id)
params = {"project_id": project_id, "session_ids": tuple(session_ids)}
order_direction = 'ASC' if ascending else 'DESC'
main_query = cur.mogrify(f"""SELECT {sessions_legacy.SESSION_PROJECTION_BASE_COLS}
{"," if len(meta_keys) > 0 else ""}{",".join([f'metadata_{m["index"]}' for m in meta_keys])}
FROM public.sessions AS s
WHERE project_id=%(project_id)s
AND session_id IN %(session_ids)s
ORDER BY {sort_by} {order_direction};""", params)
cur.execute(main_query)
rows = cur.fetchall()
if len(meta_keys) > 0:
for s in rows:
s["metadata"] = {}
for m in meta_keys:
s["metadata"][m["key"]] = s.pop(f'metadata_{m["index"]}')
return {"total": len(rows), "sessions": helper.list_to_camel_case(rows)}
return sessions.search_sessions_by_ids(project_id, session_ids, sort_by, ascending)

View file

@ -71,4 +71,5 @@ sourcemaps_reader=http://sourcemapreader-openreplay.app.svc.cluster.local:9000/s
STAGE=default-foss
TZ=UTC
EXP_CH_DRIVER=true
EXP_AUTOCOMPLETE=true
EXP_AUTOCOMPLETE=true
EXP_ALERTS=true

6
ee/api/.gitignore vendored
View file

@ -184,7 +184,6 @@ Pipfile.lock
/build.sh
/build_alerts.sh
/build_crons.sh
/chalicelib/core/alerts.py
/chalicelib/core/announcements.py
/chalicelib/core/assist.py
/chalicelib/core/authorizers.py
@ -286,3 +285,8 @@ Pipfile.lock
/chalicelib/utils/ch_client.py
/chalicelib/utils/ch_client_exp.py
/routers/subs/product_anaytics.py
/chalicelib/core/alerts/__init__.py
/chalicelib/core/alerts/alerts.py
/chalicelib/core/alerts/alerts_processor.py
/chalicelib/core/alerts/alerts_processor_ch.py
/chalicelib/core/sessions_ch.py

View file

@ -11,7 +11,7 @@ from . import metrics as metrics_legacy
if config("EXP_SESSIONS_SEARCH", cast=bool, default=False):
logging.info(">>> Using experimental sessions search")
from . import sessions_exp as sessions
from . import sessions_ch as sessions
else:
from . import sessions as sessions
@ -34,12 +34,6 @@ else:
if config("EXP_SESSIONS_SEARCH_METRIC", cast=bool, default=False):
logging.info(">>> Using experimental sessions search for metrics")
if config("EXP_ALERTS", cast=bool, default=False):
logging.info(">>> Using experimental alerts")
from . import alerts_processor_exp as alerts_processor
else:
from . import alerts_processor as alerts_processor
if config("EXP_FUNNELS", cast=bool, default=False):
logging.info(">>> Using experimental funnels")
if not config("EXP_SESSIONS_SEARCH", cast=bool, default=False):

View file

@ -0,0 +1,12 @@
from decouple import config
if config("EXP_ALERTS", cast=bool, default=False):
if config("EXP_SESSIONS_SEARCH", cast=bool, default=False):
from chalicelib.core.sessions import *
else:
from chalicelib.core.sessions_legacy import *
else:
if config("EXP_SESSIONS_SEARCH", cast=bool, default=False):
from chalicelib.core.sessions_legacy import *
else:
from chalicelib.core.sessions import *

View file

@ -1,242 +0,0 @@
import decimal
import logging
from decouple import config
from pydantic_core._pydantic_core import ValidationError
import schemas
from chalicelib.core import alerts
from chalicelib.core import alerts_listener
from chalicelib.utils import pg_client
from chalicelib.utils.TimeUTC import TimeUTC
if config("EXP_SESSIONS_SEARCH", cast=bool, default=False):
from chalicelib.core import sessions_legacy as sessions
else:
from chalicelib.core import sessions
logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO))
LeftToDb = {
schemas.AlertColumn.PERFORMANCE__DOM_CONTENT_LOADED__AVERAGE: {
"table": "events.pages INNER JOIN public.sessions USING(session_id)",
"formula": "COALESCE(AVG(NULLIF(dom_content_loaded_time ,0)),0)"},
schemas.AlertColumn.PERFORMANCE__FIRST_MEANINGFUL_PAINT__AVERAGE: {
"table": "events.pages INNER JOIN public.sessions USING(session_id)",
"formula": "COALESCE(AVG(NULLIF(first_contentful_paint_time,0)),0)"},
schemas.AlertColumn.PERFORMANCE__PAGE_LOAD_TIME__AVERAGE: {
"table": "events.pages INNER JOIN public.sessions USING(session_id)", "formula": "AVG(NULLIF(load_time ,0))"},
schemas.AlertColumn.PERFORMANCE__DOM_BUILD_TIME__AVERAGE: {
"table": "events.pages INNER JOIN public.sessions USING(session_id)",
"formula": "AVG(NULLIF(dom_building_time,0))"},
schemas.AlertColumn.PERFORMANCE__SPEED_INDEX__AVERAGE: {
"table": "events.pages INNER JOIN public.sessions USING(session_id)", "formula": "AVG(NULLIF(speed_index,0))"},
schemas.AlertColumn.PERFORMANCE__PAGE_RESPONSE_TIME__AVERAGE: {
"table": "events.pages INNER JOIN public.sessions USING(session_id)",
"formula": "AVG(NULLIF(response_time,0))"},
schemas.AlertColumn.PERFORMANCE__TTFB__AVERAGE: {
"table": "events.pages INNER JOIN public.sessions USING(session_id)",
"formula": "AVG(NULLIF(first_paint_time,0))"},
schemas.AlertColumn.PERFORMANCE__TIME_TO_RENDER__AVERAGE: {
"table": "events.pages INNER JOIN public.sessions USING(session_id)",
"formula": "AVG(NULLIF(visually_complete,0))"},
schemas.AlertColumn.PERFORMANCE__CRASHES__COUNT: {
"table": "public.sessions",
"formula": "COUNT(DISTINCT session_id)",
"condition": "errors_count > 0 AND duration>0"},
schemas.AlertColumn.ERRORS__JAVASCRIPT__COUNT: {
"table": "events.errors INNER JOIN public.errors AS m_errors USING (error_id)",
"formula": "COUNT(DISTINCT session_id)", "condition": "source='js_exception'", "joinSessions": False},
schemas.AlertColumn.ERRORS__BACKEND__COUNT: {
"table": "events.errors INNER JOIN public.errors AS m_errors USING (error_id)",
"formula": "COUNT(DISTINCT session_id)", "condition": "source!='js_exception'", "joinSessions": False},
}
# This is the frequency of execution for each threshold
TimeInterval = {
15: 3,
30: 5,
60: 10,
120: 20,
240: 30,
1440: 60,
}
def can_check(a) -> bool:
now = TimeUTC.now()
repetitionBase = a["options"]["currentPeriod"] \
if a["detectionMethod"] == schemas.AlertDetectionMethod.CHANGE \
and a["options"]["currentPeriod"] > a["options"]["previousPeriod"] \
else a["options"]["previousPeriod"]
if TimeInterval.get(repetitionBase) is None:
logging.error(f"repetitionBase: {repetitionBase} NOT FOUND")
return False
return (a["options"]["renotifyInterval"] <= 0 or
a["options"].get("lastNotification") is None or
a["options"]["lastNotification"] <= 0 or
((now - a["options"]["lastNotification"]) > a["options"]["renotifyInterval"] * 60 * 1000)) \
and ((now - a["createdAt"]) % (TimeInterval[repetitionBase] * 60 * 1000)) < 60 * 1000
def Build(a):
now = TimeUTC.now()
params = {"project_id": a["projectId"], "now": now}
full_args = {}
j_s = True
main_table = ""
if a["seriesId"] is not None:
a["filter"]["sort"] = "session_id"
a["filter"]["order"] = schemas.SortOrderType.DESC
a["filter"]["startDate"] = 0
a["filter"]["endDate"] = TimeUTC.now()
try:
data = schemas.SessionsSearchPayloadSchema.model_validate(a["filter"])
except ValidationError:
logging.warning("Validation error for:")
logging.warning(a["filter"])
raise
full_args, query_part = sessions.search_query_parts(data=data, error_status=None, errors_only=False,
issue=None, project_id=a["projectId"], user_id=None,
favorite_only=False)
subQ = f"""SELECT COUNT(session_id) AS value
{query_part}"""
else:
colDef = LeftToDb[a["query"]["left"]]
subQ = f"""SELECT {colDef["formula"]} AS value
FROM {colDef["table"]}
WHERE project_id = %(project_id)s
{"AND " + colDef["condition"] if colDef.get("condition") else ""}"""
j_s = colDef.get("joinSessions", True)
main_table = colDef["table"]
is_ss = main_table == "public.sessions"
q = f"""SELECT coalesce(value,0) AS value, coalesce(value,0) {a["query"]["operator"]} {a["query"]["right"]} AS valid"""
if a["detectionMethod"] == schemas.AlertDetectionMethod.THRESHOLD:
if a["seriesId"] is not None:
q += f""" FROM ({subQ}) AS stat"""
else:
q += f""" FROM ({subQ} {"AND timestamp >= %(startDate)s AND timestamp <= %(now)s" if not is_ss else ""}
{"AND start_ts >= %(startDate)s AND start_ts <= %(now)s" if j_s else ""}) AS stat"""
params = {**params, **full_args, "startDate": TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000}
else:
if a["change"] == schemas.AlertDetectionType.CHANGE:
if a["seriesId"] is not None:
sub2 = subQ.replace("%(startDate)s", "%(timestamp_sub2)s").replace("%(endDate)s", "%(startDate)s")
sub1 = f"SELECT (({subQ})-({sub2})) AS value"
q += f" FROM ( {sub1} ) AS stat"
params = {**params, **full_args,
"startDate": TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000,
"timestamp_sub2": TimeUTC.now() - 2 * a["options"]["currentPeriod"] * 60 * 1000}
else:
sub1 = f"""{subQ} {"AND timestamp >= %(startDate)s AND timestamp <= %(now)s" if not is_ss else ""}
{"AND start_ts >= %(startDate)s AND start_ts <= %(now)s" if j_s else ""}"""
params["startDate"] = TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000
sub2 = f"""{subQ} {"AND timestamp < %(startDate)s AND timestamp >= %(timestamp_sub2)s" if not is_ss else ""}
{"AND start_ts < %(startDate)s AND start_ts >= %(timestamp_sub2)s" if j_s else ""}"""
params["timestamp_sub2"] = TimeUTC.now() - 2 * a["options"]["currentPeriod"] * 60 * 1000
sub1 = f"SELECT (( {sub1} )-( {sub2} )) AS value"
q += f" FROM ( {sub1} ) AS stat"
else:
if a["seriesId"] is not None:
sub2 = subQ.replace("%(startDate)s", "%(timestamp_sub2)s").replace("%(endDate)s", "%(startDate)s")
sub1 = f"SELECT (({subQ})/NULLIF(({sub2}),0)-1)*100 AS value"
q += f" FROM ({sub1}) AS stat"
params = {**params, **full_args,
"startDate": TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000,
"timestamp_sub2": TimeUTC.now() \
- (a["options"]["currentPeriod"] + a["options"]["currentPeriod"]) \
* 60 * 1000}
else:
sub1 = f"""{subQ} {"AND timestamp >= %(startDate)s AND timestamp <= %(now)s" if not is_ss else ""}
{"AND start_ts >= %(startDate)s AND start_ts <= %(now)s" if j_s else ""}"""
params["startDate"] = TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000
sub2 = f"""{subQ} {"AND timestamp < %(startDate)s AND timestamp >= %(timestamp_sub2)s" if not is_ss else ""}
{"AND start_ts < %(startDate)s AND start_ts >= %(timestamp_sub2)s" if j_s else ""}"""
params["timestamp_sub2"] = TimeUTC.now() \
- (a["options"]["currentPeriod"] + a["options"]["currentPeriod"]) * 60 * 1000
sub1 = f"SELECT (({sub1})/NULLIF(({sub2}),0)-1)*100 AS value"
q += f" FROM ({sub1}) AS stat"
return q, params
def process():
notifications = []
all_alerts = alerts_listener.get_all_alerts()
with pg_client.PostgresClient() as cur:
for alert in all_alerts:
if can_check(alert):
query, params = Build(alert)
try:
query = cur.mogrify(query, params)
except Exception as e:
logging.error(
f"!!!Error while building alert query for alertId:{alert['alertId']} name: {alert['name']}")
logging.error(e)
continue
logging.debug(alert)
logging.debug(query)
try:
cur.execute(query)
result = cur.fetchone()
if result["valid"]:
logging.info(f"Valid alert, notifying users, alertId:{alert['alertId']} name: {alert['name']}")
notifications.append(generate_notification(alert, result))
except Exception as e:
logging.error(
f"!!!Error while running alert query for alertId:{alert['alertId']} name: {alert['name']}")
logging.error(query)
logging.error(e)
cur = cur.recreate(rollback=True)
if len(notifications) > 0:
cur.execute(
cur.mogrify(f"""UPDATE public.alerts
SET options = options||'{{"lastNotification":{TimeUTC.now()}}}'::jsonb
WHERE alert_id IN %(ids)s;""", {"ids": tuple([n["alertId"] for n in notifications])}))
if len(notifications) > 0:
alerts.process_notifications(notifications)
def __format_value(x):
if x % 1 == 0:
x = int(x)
else:
x = round(x, 2)
return f"{x:,}"
def generate_notification(alert, result):
left = __format_value(result['value'])
right = __format_value(alert['query']['right'])
return {
"alertId": alert["alertId"],
"tenantId": alert["tenantId"],
"title": alert["name"],
"description": f"{alert['seriesName']} = {left} ({alert['query']['operator']} {right}).",
"buttonText": "Check metrics for more details",
"buttonUrl": f"/{alert['projectId']}/metrics",
"imageUrl": None,
"projectId": alert["projectId"],
"projectName": alert["projectName"],
"options": {"source": "ALERT", "sourceId": alert["alertId"],
"sourceMeta": alert["detectionMethod"],
"message": alert["options"]["message"], "projectId": alert["projectId"],
"data": {"title": alert["name"],
"limitValue": alert["query"]["right"],
"actualValue": float(result["value"]) \
if isinstance(result["value"], decimal.Decimal) \
else result["value"],
"operator": alert["query"]["operator"],
"trigger": alert["query"]["left"],
"alertId": alert["alertId"],
"detectionMethod": alert["detectionMethod"],
"currentPeriod": alert["options"]["currentPeriod"],
"previousPeriod": alert["options"]["previousPeriod"],
"createdAt": TimeUTC.now()}},
}

View file

@ -11,11 +11,13 @@ from chalicelib.utils import helper, pg_client
from chalicelib.utils.TimeUTC import TimeUTC
from chalicelib.utils.storage import extra
if config("EXP_ERRORS_SEARCH", cast=bool, default=False):
logging.info(">>> Using experimental error search")
from . import errors_exp as errors
else:
from . import errors as errors
# TODO: fix this import
from . import errors as errors
# if config("EXP_ERRORS_SEARCH", cast=bool, default=False):
# logging.info(">>> Using experimental error search")
# from . import errors_exp as errors
# else:
# from . import errors as errors
if config("EXP_SESSIONS_SEARCH_METRIC", cast=bool, default=False):
from chalicelib.core import sessions

View file

@ -7,7 +7,7 @@ from chalicelib.core import sessions_mobs, events
from chalicelib.utils import sql_helper as sh
if config("EXP_SESSIONS_SEARCH", cast=bool, default=False):
from chalicelib.core import sessions_exp as sessions
from chalicelib.core import sessions_ch as sessions
else:
from chalicelib.core import sessions

View file

@ -1,8 +1,8 @@
from typing import List
import schemas
from chalicelib.core.metrics import __get_basic_constraints, __get_meta_constraint
from chalicelib.core.metrics import __get_constraint_values, __complete_missing_steps
from chalicelib.core.metrics_ch import __get_basic_constraints, __get_meta_constraint
from chalicelib.core.metrics_ch import __get_constraint_values, __complete_missing_steps
from chalicelib.utils import ch_client, exp_ch_helper
from chalicelib.utils import helper, dev
from chalicelib.utils.TimeUTC import TimeUTC

View file

@ -6,7 +6,6 @@ rm -rf ./auth/auth_apikey.py
rm -rf ./build.sh
rm -rf ./build_alerts.sh
rm -rf ./build_crons.sh
rm -rf ./chalicelib/core/alerts.py
rm -rf ./chalicelib/core/announcements.py
rm -rf ./chalicelib/core/assist.py
rm -rf ./chalicelib/core/authorizers.py
@ -105,3 +104,8 @@ rm -rf ./chalicelib/core/product_anaytics2.py
rm -rf ./chalicelib/utils/ch_client.py
rm -rf ./chalicelib/utils/ch_client_exp.py
rm -rf ./routers/subs/product_anaytics.py
rm -rf ./chalicelib/core/alerts/__init__.py
rm -rf ./chalicelib/core/alerts/alerts.py
rm -rf ./chalicelib/core/alerts/alerts_processor.py
rm -rf ./chalicelib/core/alerts/alerts_processor_ch.py
rm -rf ./chalicelib/core/sessions_ch.py