From f98d995906e7116f88890233c6bbd5b21970fcc9 Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Sat, 8 Jan 2022 17:58:26 +0100 Subject: [PATCH] feat(api): alerts notifications feat(api): alerts dockerfile feat(api): alerts build script feat(backend): removed alerts --- api/.chalice/config.bundle.json | 67 ---------- api/.chalice/config.json | 68 ---------- api/Dockerfile.alerts | 18 +++ api/app_alerts.py | 26 ++++ api/build.sh | 6 +- api/chalicelib/core/alerts.py | 22 ++- api/chalicelib/core/alerts_processor.py | 57 ++++++-- api/chalicelib/core/webhook.py | 19 ++- api/routers/core.py | 15 +-- api/routers/core_dynamic.py | 9 +- api/schemas.py | 10 -- backend/services/alerts/main.go | 86 ------------ backend/services/alerts/manager.go | 171 ------------------------ ee/api/.chalice/config.json | 81 ----------- ee/api/routers/core_dynamic.py | 9 +- 15 files changed, 122 insertions(+), 542 deletions(-) delete mode 100644 api/.chalice/config.bundle.json delete mode 100644 api/.chalice/config.json create mode 100644 api/Dockerfile.alerts create mode 100644 api/app_alerts.py delete mode 100644 backend/services/alerts/main.go delete mode 100644 backend/services/alerts/manager.go delete mode 100644 ee/api/.chalice/config.json diff --git a/api/.chalice/config.bundle.json b/api/.chalice/config.bundle.json deleted file mode 100644 index 95b29ab50..000000000 --- a/api/.chalice/config.bundle.json +++ /dev/null @@ -1,67 +0,0 @@ -{ - "version": "2.0", - "app_name": "parrot", - "environment_variables": { - }, - "stages": { - "default-foss": { - "api_gateway_stage": "default-fos", - "manage_iam_role": false, - "iam_role_arn": "", - "autogen_policy": true, - "environment_variables": { - "isFOS": "true", - "isEE": "false", - "stage": "default-foss", - "jwt_issuer": "openreplay-default-foss", - "sentryURL": "", - "pg_host": "postgresql.db.svc.cluster.local", - "pg_port": "5432", - "pg_dbname": "postgres", - "pg_user": "postgres", - "pg_password": "asayerPostgres", - "alert_ntf": "http://127.0.0.1:8000/async/alerts/notifications/%s", - "email_signup": "http://127.0.0.1:8000/async/email_signup/%s", - "email_funnel": "http://127.0.0.1:8000/async/funnel/%s", - "email_basic": "http://127.0.0.1:8000/async/basic/%s", - "assign_link": "http://127.0.0.1:8000/async/email_assignment", - "captcha_server": "", - "captcha_key": "", - "sessions_bucket": "mobs", - "sessions_region": "us-east-1", - "put_S3_TTL": "20", - "sourcemaps_reader": "http://0.0.0.0:9000/sourcemaps", - "sourcemaps_bucket": "sourcemaps", - "js_cache_bucket": "sessions-assets", - "peers": "http://0.0.0.0:9000/assist/peers", - "async_Token": "", - "EMAIL_HOST": "", - "EMAIL_PORT": "587", - "EMAIL_USER": "", - "EMAIL_PASSWORD": "", - "EMAIL_USE_TLS": "true", - "EMAIL_USE_SSL": "false", - "EMAIL_SSL_KEY": "", - "EMAIL_SSL_CERT": "", - "EMAIL_FROM": "OpenReplay", - "SITE_URL": "", - "announcement_url": "", - "jwt_secret": "", - "jwt_algorithm": "HS512", - "jwt_exp_delta_seconds": "2592000", - "S3_HOST": "", - "S3_KEY": "", - "S3_SECRET": "", - "invitation_link": "/api/users/invitation?token=%s", - "change_password_link": "/reset-password?invitation=%s&&pass=%s", - "version_number": "1.3.5" - }, - "lambda_timeout": 150, - "lambda_memory_size": 400, - "subnet_ids": [ - ], - "security_group_ids": [ - ] - } - } -} diff --git a/api/.chalice/config.json b/api/.chalice/config.json deleted file mode 100644 index d1fe6c36c..000000000 --- a/api/.chalice/config.json +++ /dev/null @@ -1,68 +0,0 @@ -{ - "version": "2.0", - "app_name": "parrot", - "environment_variables": { - }, - "stages": { - "default-foss": { - "api_gateway_stage": "default-fos", - "manage_iam_role": false, - "iam_role_arn": "", - "autogen_policy": true, - "environment_variables": { - "isFOS": "true", - "isEE": "false", - "stage": "default-foss", - "jwt_issuer": "openreplay-default-foss", - "sentryURL": "", - "pg_host": "postgresql.db.svc.cluster.local", - "pg_port": "5432", - "pg_dbname": "postgres", - "pg_user": "postgres", - "pg_password": "asayerPostgres", - "alert_ntf": "http://127.0.0.1:8000/async/alerts/notifications/%s", - "email_signup": "http://127.0.0.1:8000/async/email_signup/%s", - "email_funnel": "http://127.0.0.1:8000/async/funnel/%s", - "email_basic": "http://127.0.0.1:8000/async/basic/%s", - "assign_link": "http://127.0.0.1:8000/async/email_assignment", - "captcha_server": "", - "captcha_key": "", - "sessions_bucket": "mobs", - "sessions_region": "us-east-1", - "put_S3_TTL": "20", - "sourcemaps_reader": "http://utilities-openreplay.app.svc.cluster.local:9000/sourcemaps", - "sourcemaps_bucket": "sourcemaps", - "js_cache_bucket": "sessions-assets", - "peers": "http://utilities-openreplay.app.svc.cluster.local:9000/assist/%s/peers", - "async_Token": "", - "EMAIL_HOST": "", - "EMAIL_PORT": "587", - "EMAIL_USER": "", - "EMAIL_PASSWORD": "", - "EMAIL_USE_TLS": "true", - "EMAIL_USE_SSL": "false", - "EMAIL_SSL_KEY": "", - "EMAIL_SSL_CERT": "", - "EMAIL_FROM": "OpenReplay", - "SITE_URL": "", - "announcement_url": "", - "jwt_secret": "", - "jwt_algorithm": "HS512", - "jwt_exp_delta_seconds": "2592000", - "S3_HOST": "", - "S3_KEY": "", - "S3_SECRET": "", - "invitation_link": "/api/users/invitation?token=%s", - "change_password_link": "/reset-password?invitation=%s&&pass=%s", - "iosBucket": "openreplay-ios-images", - "version_number": "1.3.6" - }, - "lambda_timeout": 150, - "lambda_memory_size": 400, - "subnet_ids": [ - ], - "security_group_ids": [ - ] - } - } -} \ No newline at end of file diff --git a/api/Dockerfile.alerts b/api/Dockerfile.alerts new file mode 100644 index 000000000..69e2a1184 --- /dev/null +++ b/api/Dockerfile.alerts @@ -0,0 +1,18 @@ +FROM python:3.9.7-slim +LABEL Maintainer="Rajesh Rajendran" +LABEL Maintainer="KRAIEM Taha Yassine" +WORKDIR /work +COPY . . +RUN pip install -r requirements.txt +RUN mv .env.default .env && rm app.py && ren app_alerts.py app.py +ENV pg_minconn 2 + +# Add Tini +# Startup daemon +ENV TINI_VERSION v0.19.0 +ARG envarg +ENV ENTERPRISE_BUILD ${envarg} +ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /tini +RUN chmod +x /tini +ENTRYPOINT ["/tini", "--"] +CMD ./entrypoint.sh \ No newline at end of file diff --git a/api/app_alerts.py b/api/app_alerts.py new file mode 100644 index 000000000..0cba8986a --- /dev/null +++ b/api/app_alerts.py @@ -0,0 +1,26 @@ +import logging + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from fastapi import FastAPI + +from chalicelib.core import alerts_processor + +app = FastAPI() +print("============= ALERTS =============") + + +@app.get("/") +async def root(): + return {"status": "Running"} + + +app.schedule = AsyncIOScheduler() +app.schedule.start() +app.schedule.add_job(id="alerts_processor", **{"func": alerts_processor.process, "trigger": "interval", "minutes": 5, + "misfire_grace_time": 20}) + +for job in app.schedule.get_jobs(): + print({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)}) + +logging.basicConfig() +logging.getLogger('apscheduler').setLevel(logging.INFO) diff --git a/api/build.sh b/api/build.sh index ee7cec3ab..c0ede73da 100644 --- a/api/build.sh +++ b/api/build.sh @@ -22,15 +22,19 @@ function build_api(){ # Copy enterprise code [[ $1 == "ee" ]] && { cp -rf ../ee/api/* ./ - cp -rf ../ee/api/.chalice/* ./.chalice/ envarg="default-ee" tag="ee-" } docker build -f ./Dockerfile --build-arg envarg=$envarg -t ${DOCKER_REPO:-'local'}/api:${git_sha1} . + docker build -f ./Dockerfile.alerts --build-arg envarg=$envarg -t ${DOCKER_REPO:-'local'}/alerts:${git_sha1} . [[ $PUSH_IMAGE -eq 1 ]] && { docker push ${DOCKER_REPO:-'local'}/api:${git_sha1} docker tag ${DOCKER_REPO:-'local'}/api:${git_sha1} ${DOCKER_REPO:-'local'}/api:${tag}latest docker push ${DOCKER_REPO:-'local'}/api:${tag}latest + + docker push ${DOCKER_REPO:-'local'}/alerts:${git_sha1} + docker tag ${DOCKER_REPO:-'local'}/alerts:${git_sha1} ${DOCKER_REPO:-'local'}/alerts:${tag}latest + docker push ${DOCKER_REPO:-'local'}/alerts:${tag}latest } } diff --git a/api/chalicelib/core/alerts.py b/api/chalicelib/core/alerts.py index 4fc628408..ef35d48b3 100644 --- a/api/chalicelib/core/alerts.py +++ b/api/chalicelib/core/alerts.py @@ -1,8 +1,6 @@ import json import time -from fastapi import BackgroundTasks - import schemas from chalicelib.core import notifications, slack, webhook from chalicelib.utils import pg_client, helper, email_helper @@ -125,7 +123,7 @@ def update(id, data: schemas.AlertSchema): return {"data": __process_circular(a)} -def process_notifications(data, background_tasks: BackgroundTasks): +def process_notifications(data): full = {} for n in data: if "message" in n["options"]: @@ -150,11 +148,23 @@ def process_notifications(data, background_tasks: BackgroundTasks): notifications_list = full[t][i:i + BATCH_SIZE] if t == "slack": - background_tasks.add_task(slack.send_batch, notifications_list=notifications_list) + try: + slack.send_batch(notifications_list=notifications_list) + except Exception as e: + print("!!!Error while sending slack notifications batch") + print(str(e)) elif t == "email": - background_tasks.add_task(send_by_email_batch, notifications_list=notifications_list) + try: + send_by_email_batch(notifications_list=notifications_list) + except Exception as e: + print("!!!Error while sending email notifications batch") + print(str(e)) elif t == "webhook": - background_tasks.add_task(webhook.trigger_batch, data_list=notifications_list) + try: + webhook.trigger_batch(data_list=notifications_list) + except Exception as e: + print("!!!Error while sending webhook notifications batch") + print(str(e)) def send_by_email(notification, destination): diff --git a/api/chalicelib/core/alerts_processor.py b/api/chalicelib/core/alerts_processor.py index d811624f8..281064abf 100644 --- a/api/chalicelib/core/alerts_processor.py +++ b/api/chalicelib/core/alerts_processor.py @@ -1,5 +1,5 @@ import schemas -from chalicelib.core import sessions +from chalicelib.core import sessions, alerts from chalicelib.utils import pg_client, helper from chalicelib.utils.TimeUTC import TimeUTC @@ -112,10 +112,8 @@ def Build(a): WHERE project_id = %(project_id)s {"AND " + colDef["condition"] if colDef.get("condition") is not None else ""}""" j_s = colDef.get("joinSessions", True) - print(">>>>>>>>>>>>>") - print(j_s) - # q = sq.Select(fmt.Sprint("value, coalesce(value,0)", a.Query.Operator, a.Query.Right, " AS valid")) - q = f"""SELECT value, coalesce(value,0) {a["query"]["operator"]} {a["query"]["right"]} AS valid""" + + q = f"""SELECT coalesce(value,0) AS value, coalesce(value,0) {a["query"]["operator"]} {a["query"]["right"]} AS valid""" # if len(colDef.group) > 0 { # subQ = subQ.Column(colDef.group + " AS group_value") @@ -196,8 +194,10 @@ def Build(a): def process(): + notifications = [] with pg_client.PostgresClient(long_query=True) as cur: - query = """SELECT alert_id, + query = """SELECT -1 AS tenant_id, + alert_id, project_id, detection_method, query, @@ -217,14 +217,49 @@ def process(): ORDER BY alerts.created_at;""" cur.execute(query=query) all_alerts = helper.list_to_camel_case(cur.fetchall()) + + with pg_client.PostgresClient() as cur: for alert in all_alerts: - if True or can_check(alert): + if can_check(alert): print(f"Querying alertId:{alert['alertId']} name: {alert['name']}") query, params = Build(alert) query = cur.mogrify(query, params) # print(alert) # print(query) - cur.execute(query) - result = cur.fetchone() - if result["valid"]: - print("Valid alert, notifying users") + try: + cur.execute(query) + result = cur.fetchone() + if result["valid"]: + print("Valid alert, notifying users") + notifications.append({ + "alertId": alert["alertId"], + "tenantId": alert["tenantId"], + "title": alert["name"], + "description": f"has been triggered, {alert['query']['left']} = {result['value']} ({alert['query']['operator']} {alert['query']['right']}).", + "buttonText": "Check metrics for more details", + "buttonUrl": f"/{alert['projectId']}/metrics", + "imageUrl": None, + "options": {"source": "ALERT", "sourceId": alert["alertId"], + "sourceMeta": alert["detectionMethod"], + "message": alert["options"]["message"], "projectId": alert["projectId"], + "data": {"title": alert["name"], + "limitValue": alert["query"]["right"], "actualValue": result["value"], + "operator": alert["query"]["operator"], + "trigger": alert["query"]["left"], + "alertId": alert["alertId"], + "detectionMethod": alert["detectionMethod"], + "currentPeriod": alert["options"]["currentPeriod"], + "previousPeriod": alert["options"]["previousPeriod"], + "createdAt": TimeUTC.now()}}, + }) + except Exception as e: + print(f"!!!Error while running alert query for alertId:{alert['alertId']}") + print(str(e)) + print(query) + if len(notifications) > 0: + cur.execute( + cur.mogrify(f"""UPDATE public.Alerts + SET options = options||'{{"lastNotification":{TimeUTC.now()}}}'::jsonb + WHERE alert_id IN %(ids)s;""", {"ids": tuple([n["alertId"] for n in notifications])})) + if len(notifications) > 0: + alerts.process_notifications(notifications) diff --git a/api/chalicelib/core/webhook.py b/api/chalicelib/core/webhook.py index 653a2b513..254a46c97 100644 --- a/api/chalicelib/core/webhook.py +++ b/api/chalicelib/core/webhook.py @@ -1,6 +1,7 @@ +import requests + from chalicelib.utils import pg_client, helper from chalicelib.utils.TimeUTC import TimeUTC -import requests def get_by_id(webhook_id): @@ -81,7 +82,7 @@ def update(tenant_id, webhook_id, changes, replace_none=False): SET {','.join(sub_query)} WHERE webhook_id =%(id)s AND deleted_at ISNULL RETURNING webhook_id AS integration_id, webhook_id AS id,*;""", - {"id": webhook_id, **changes})) + {"id": webhook_id, **changes})) cur.execute( cur.mogrify(f"""\ UPDATE public.webhooks @@ -150,22 +151,18 @@ def trigger_batch(data_list): for w in data_list: if w["destination"] not in webhooks_map: webhooks_map[w["destination"]] = get_by_id(webhook_id=w["destination"]) - __trigger(hook=webhooks_map[w["destination"]], data=w["data"]) + if webhooks_map[w["destination"]] is None: + print(f"!!Error webhook not found: webhook_id={w['destination']}") + else: + __trigger(hook=webhooks_map[w["destination"]], data=w["data"]) def __trigger(hook, data): - if hook["type"] == 'webhook': + if hook is not None and hook["type"] == 'webhook': headers = {} if hook["authHeader"] is not None and len(hook["authHeader"]) > 0: headers = {"Authorization": hook["authHeader"]} - # body = { - # "webhookId": hook["id"], - # "createdAt": TimeUTC.now(), - # "event": event, - # "data": data - # } - r = requests.post(url=hook["endpoint"], json=data, headers=headers) if r.status_code != 200: print("=======> webhook: something went wrong") diff --git a/api/routers/core.py b/api/routers/core.py index 327cb2eb6..78170298a 100644 --- a/api/routers/core.py +++ b/api/routers/core.py @@ -10,7 +10,7 @@ from chalicelib.core import log_tool_rollbar, sourcemaps, events, sessions_assig log_tool_stackdriver, reset_password, sessions_favorite_viewed, \ log_tool_cloudwatch, log_tool_sentry, log_tool_sumologic, log_tools, errors, sessions, \ log_tool_newrelic, announcements, log_tool_bugsnag, weekly_report, integration_jira_cloud, integration_github, \ - assist, heatmaps, mobile, signup, tenants, errors_favorite_viewed, boarding, notifications, webhook, slack, users, \ + assist, heatmaps, mobile, signup, tenants, errors_favorite_viewed, boarding, notifications, webhook, users, \ custom_metrics, saved_search from chalicelib.core.collaboration_slack import Slack from chalicelib.utils import email_helper @@ -954,19 +954,6 @@ def add_remove_favorite_error(projectId: int, errorId: str, action: str, startDa return {"errors": ["undefined action"]} -@public_app.post('/async/alerts/notifications/{step}', tags=["async", "alerts"]) -@public_app.put('/async/alerts/notifications/{step}', tags=["async", "alerts"]) -def send_alerts_notification_async(step: str, data: schemas.AlertNotificationSchema = Body(...)): - if data.auth != config("async_Token"): - return {"errors": ["missing auth"]} - if step == "slack": - slack.send_batch(notifications_list=data.notifications) - elif step == "email": - alerts.send_by_email_batch(notifications_list=data.notifications) - elif step == "webhook": - webhook.trigger_batch(data_list=data.notifications) - - @app.get('/notifications', tags=['notifications']) def get_notifications(context: schemas.CurrentContext = Depends(OR_context)): return {"data": notifications.get_all(tenant_id=context.tenant_id, user_id=context.user_id)} diff --git a/api/routers/core_dynamic.py b/api/routers/core_dynamic.py index 0cd1b98da..d61d2ebfb 100644 --- a/api/routers/core_dynamic.py +++ b/api/routers/core_dynamic.py @@ -8,7 +8,7 @@ import schemas from chalicelib.core import assist from chalicelib.core import integrations_manager from chalicelib.core import sessions -from chalicelib.core import tenants, users, metadata, projects, license, alerts +from chalicelib.core import tenants, users, metadata, projects, license from chalicelib.core import webhook from chalicelib.core.collaboration_slack import Slack from chalicelib.utils import captcha @@ -209,13 +209,6 @@ def get_current_plan(context: schemas.CurrentContext = Depends(OR_context)): } -@public_app.post('/alerts/notifications', tags=["alerts"]) -@public_app.put('/alerts/notifications', tags=["alerts"]) -def send_alerts_notifications(background_tasks: BackgroundTasks, data: schemas.AlertNotificationSchema = Body(...)): - # TODO: validate token - return {"data": alerts.process_notifications(data.notifications, background_tasks=background_tasks)} - - @public_app.get('/general_stats', tags=["private"], include_in_schema=False) def get_general_stats(): return {"data": {"sessions:": sessions.count_all()}} diff --git a/api/schemas.py b/api/schemas.py index 975ea1eb9..6a33c20a4 100644 --- a/api/schemas.py +++ b/api/schemas.py @@ -88,16 +88,6 @@ class SearchErrorsSchema(BaseModel): order: Optional[str] = Field(None) -class EmailNotificationSchema(BaseModel): - notification: str = Field(...) - destination: str = Field(...) - - -class AlertNotificationSchema(BaseModel): - auth: str = Field(...) - notifications: List[EmailNotificationSchema] = Field(...) - - class CreateNotificationSchema(BaseModel): token: str = Field(...) notifications: List = Field(...) diff --git a/backend/services/alerts/main.go b/backend/services/alerts/main.go deleted file mode 100644 index b11d3ae04..000000000 --- a/backend/services/alerts/main.go +++ /dev/null @@ -1,86 +0,0 @@ -package main - -import ( - "database/sql" - "log" - "os" - "os/signal" - "syscall" - "time" - - "openreplay/backend/pkg/db/postgres" - "openreplay/backend/pkg/env" - _ "github.com/lib/pq" -) - -func main() { - log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) - POSTGRES_STRING := env.String("POSTGRES_STRING") - NOTIFICATIONS_STRING := env.String("ALERT_NOTIFICATION_STRING") - log.Printf("Notifications: %s \nPG: %s\n", NOTIFICATIONS_STRING, POSTGRES_STRING) - pg := postgres.NewConn(POSTGRES_STRING) - defer pg.Close() - - pgs, err := sql.Open("postgres", POSTGRES_STRING+ "?sslmode=disable") - if err != nil { - log.Fatal(err) - } - defer pgs.Close() - - manager := NewManager(NOTIFICATIONS_STRING, POSTGRES_STRING, pgs, pg) - if err := pg.IterateAlerts(func(a *postgres.Alert, err error) { - if err != nil { - log.Printf("Postgres error: %v\n", err) - return - } - log.Printf("Alert initialization: %+v\n", *a) - //log.Printf("CreatedAt: %s\n", *a.CreatedAt) - err = manager.Update(a) - if err != nil { - log.Printf("Alert parse error: %v | Alert: %+v\n", err, *a) - return - } - }); err != nil { - log.Fatalf("Postgres error: %v\n", err) - } - - listener, err := postgres.NewAlertsListener(POSTGRES_STRING) - if err != nil { - log.Fatalf("Postgres listener error: %v\n", err) - } - defer listener.Close() - - sigchan := make(chan os.Signal, 1) - signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - - tickAlert := time.Tick(1 * time.Minute) - - log.Printf("Alert service started\n") - manager.RequestAll() - //return - for { - select { - case sig := <-sigchan: - log.Printf("Caught signal %v: terminating\n", sig) - listener.Close() - pg.Close() - os.Exit(0) - case <-tickAlert: - log.Printf("Requesting all...%d alerts\n", manager.Length()) - manager.RequestAll() - case iPointer := <-listener.Alerts: - log.Printf("Alert update: %+v\n", *iPointer) - //log.Printf("CreatedAt: %s\n", *iPointer.CreatedAt) - //log.Printf("Notification received for AlertId: %d\n", iPointer.AlertID) - err := manager.Update(iPointer) - if err != nil { - log.Printf("Alert parse error: %+v | Alert: %v\n", err, *iPointer) - } - case err := <-listener.Errors: - log.Printf("listener error: %v\n", err) - if err.Error() == "conn closed" { - panic("Listener conn lost") - } - } - } -} diff --git a/backend/services/alerts/manager.go b/backend/services/alerts/manager.go deleted file mode 100644 index 11ddb9363..000000000 --- a/backend/services/alerts/manager.go +++ /dev/null @@ -1,171 +0,0 @@ -package main - -import ( - "database/sql" - "fmt" - "log" - "sync" - "sync/atomic" - "time" - - "openreplay/backend/pkg/db/postgres" -) - -const PGParallelLimit = 2 - -var pgCount int64 - -type manager struct { - postgresString string - notificationsUrl string - alertsCache map[uint32]*postgres.Alert - cacheMutex sync.Mutex - pgParallel chan bool - pgs *sql.DB - pg *postgres.Conn - pgMutex sync.Mutex - notifications map[uint32]*postgres.TenantNotification - notificationsGo *sync.WaitGroup - notificationsMutex sync.Mutex -} - -func NewManager(notificationsUrl string, postgresString string, pgs *sql.DB, pg *postgres.Conn) *manager { - return &manager{ - postgresString: postgresString, - notificationsUrl: notificationsUrl, - alertsCache: make(map[uint32]*postgres.Alert), - cacheMutex: sync.Mutex{}, - pgParallel: make(chan bool, PGParallelLimit), - pgs: pgs, - pg: pg, - pgMutex: sync.Mutex{}, - notifications: make(map[uint32]*postgres.TenantNotification), - notificationsGo: &sync.WaitGroup{}, - notificationsMutex: sync.Mutex{}, - } - -} - -func (m *manager) Length() int { - return len(m.alertsCache) -} - -func (m *manager) Update(a *postgres.Alert) error { - m.cacheMutex.Lock() - defer m.cacheMutex.Unlock() - _, exists := m.alertsCache[a.AlertID] - if exists && a.DeletedAt != nil { - log.Println("deleting alert from memory") - delete(m.alertsCache, a.AlertID) - return nil - } else { - m.alertsCache[a.AlertID] = a - } - return nil -} -func (m *manager) processAlert(a *postgres.Alert) { - defer func() { - defer m.notificationsGo.Done() - <-m.pgParallel - }() - if !a.CanCheck() { - log.Printf("cannot check %s", a.Name) - return - } - //log.Printf("checking %+v", a) - log.Printf("quering %s", a.Name) - //--- For stats: - atomic.AddInt64(&pgCount, 1) - q, err := a.Build() - if err != nil { - log.Println(err) - return - } - - rows, err := q.RunWith(m.pgs).Query() - - if err != nil { - log.Println(err) - return - } - defer rows.Close() - - for rows.Next() { - var ( - value sql.NullFloat64 - valid bool - ) - if err := rows.Scan(&value, &valid); err != nil { - log.Println(err) - continue - } - - if valid && value.Valid { - log.Printf("%s: valid", a.Name) - m.notificationsMutex.Lock() - m.notifications[a.AlertID] = &postgres.TenantNotification{ - TenantId: a.TenantId, - Title: a.Name, - Description: fmt.Sprintf("has been triggered, %s = %.0f (%s %.0f).", a.Query.Left, value.Float64, a.Query.Operator, a.Query.Right), - ButtonText: "Check metrics for more details", - ButtonUrl: fmt.Sprintf("/%d/metrics", a.ProjectID), - ImageUrl: nil, - Options: map[string]interface{}{"source": "ALERT", "sourceId": a.AlertID, "sourceMeta": a.DetectionMethod, "message": a.Options.Message, "projectId": a.ProjectID, "data": map[string]interface{}{"title": a.Name, "limitValue": a.Query.Right, "actualValue": value.Float64, "operator": a.Query.Operator, "trigger": a.Query.Left, "alertId": a.AlertID, "detectionMethod": a.DetectionMethod, "currentPeriod": a.Options.CurrentPeriod, "previousPeriod": a.Options.PreviousPeriod, "createdAt": time.Now().Unix() * 1000}}, - } - m.notificationsMutex.Unlock() - } - } - -} -func (m *manager) RequestAll() { - now := time.Now().Unix() - m.cacheMutex.Lock() - for _, a := range m.alertsCache { - m.pgParallel <- true - m.notificationsGo.Add(1) - go m.processAlert(a) - //m.processAlert(a) - } - //log.Println("releasing cache") - m.cacheMutex.Unlock() - //log.Println("waiting for all alerts to finish") - m.notificationsGo.Wait() - log.Printf("done %d PG queries in: %ds", pgCount, time.Now().Unix()-now) - pgCount = 0 - //log.Printf("Processing %d Notifications", len(m.notifications)) - m.notificationsMutex.Lock() - go m.ProcessNotifications(m.notifications) - m.notificationsMutex.Unlock() - m.notifications = make(map[uint32]*postgres.TenantNotification) - //log.Printf("Notifications purged: %d", len(m.notifications)) -} - -func (m *manager) ProcessNotifications(allNotifications map[uint32]*postgres.TenantNotification) { - if len(allNotifications) == 0 { - log.Println("No notifications to process") - return - } - log.Printf("sending %d notifications", len(allNotifications)) - allIds := make([]uint32, 0, len(allNotifications)) - toSend := postgres.Notifications{ - Notifications: []*postgres.TenantNotification{}, - } - for k, n := range allNotifications { - //log.Printf("notification for %d", k) - allIds = append(allIds, k) - toSend.Notifications = append(toSend.Notifications, n) - } - toSend.Send(m.notificationsUrl) - if err := m.pg.SaveLastNotification(allIds); err != nil { - log.Printf("Error saving LastNotification time: %v", err) - if err.Error() == "conn closed" { - m.pg = postgres.NewConn(m.postgresString) - //if err != nil { - // panic(fmt.Sprintf("Postgres renew notifications connection error: %v\n", err)) - //} - if err := m.pg.SaveLastNotification(allIds); err != nil { - panic(fmt.Sprintf("Error saving LastNotification time, suicide: %v", err)) - } - } - } -} diff --git a/ee/api/.chalice/config.json b/ee/api/.chalice/config.json deleted file mode 100644 index db58c76ba..000000000 --- a/ee/api/.chalice/config.json +++ /dev/null @@ -1,81 +0,0 @@ -{ - "version": "2.0", - "app_name": "parrot", - "environment_variables": { - }, - "stages": { - "default-ee": { - "api_gateway_stage": "default-ee", - "manage_iam_role": false, - "iam_role_arn": "", - "autogen_policy": true, - "environment_variables": { - "isFOS": "false", - "isEE": "true", - "stage": "default-ee", - "jwt_issuer": "openreplay-default-ee", - "sentryURL": "", - "pg_host": "127.0.0.1", - "pg_port": "9202", - "pg_dbname": "app", - "pg_user": "", - "pg_password": "", - "ch_host": "", - "ch_port": "", - "alert_ntf": "http://127.0.0.1:8000/async/alerts/notifications/%s", - "email_signup": "http://127.0.0.1:8000/async/email_signup/%s", - "email_funnel": "http://127.0.0.1:8000/async/funnel/%s", - "email_plans": "http://127.0.0.1:8000/async/plans/%s", - "email_basic": "http://127.0.0.1:8000/async/basic/%s", - "assign_link": "http://127.0.0.1:8000/async/email_assignment", - "captcha_server": "", - "captcha_key": "", - "sessions_bucket": "mobs", - "sessions_region": "us-east-1", - "put_S3_TTL": "20", - "sourcemaps_reader": "http://utilities-openreplay.app.svc.cluster.local:9000/sourcemaps", - "sourcemaps_bucket": "sourcemaps", - "peers": "http://utilities-openreplay.app.svc.cluster.local:9000/assist/%s/peers", - "js_cache_bucket": "sessions-assets", - "async_Token": "", - "EMAIL_HOST": "", - "EMAIL_PORT": "587", - "EMAIL_USER": "", - "EMAIL_PASSWORD": "", - "EMAIL_USE_TLS": "true", - "EMAIL_USE_SSL": "false", - "EMAIL_SSL_KEY": "", - "EMAIL_SSL_CERT": "", - "EMAIL_FROM": "OpenReplay", - "SITE_URL": "", - "announcement_url": "", - "jwt_secret": "SET A RANDOM STRING HERE", - "jwt_algorithm": "HS512", - "jwt_exp_delta_seconds": "2592000", - "S3_HOST": "", - "S3_KEY": "", - "S3_SECRET": "", - "LICENSE_KEY": "", - "SAML2_MD_URL": "", - "idp_entityId": "", - "idp_sso_url": "", - "idp_x509cert": "", - "idp_sls_url": "", - "idp_name": "", - "sso_exp_delta_seconds": "172800", - "sso_landing": "/login?jwt=%s", - "invitation_link": "/api/users/invitation?token=%s", - "change_password_link": "/reset-password?invitation=%s&&pass=%s", - "iosBucket": "openreplay-ios-images", - "version_number": "1.3.6", - "assist_secret": "" - }, - "lambda_timeout": 150, - "lambda_memory_size": 400, - "subnet_ids": [ - ], - "security_group_ids": [ - ] - } - } -} diff --git a/ee/api/routers/core_dynamic.py b/ee/api/routers/core_dynamic.py index a887237b3..bab3689ff 100644 --- a/ee/api/routers/core_dynamic.py +++ b/ee/api/routers/core_dynamic.py @@ -8,7 +8,7 @@ import schemas import schemas_ee from chalicelib.core import integrations_manager from chalicelib.core import sessions -from chalicelib.core import tenants, users, metadata, projects, license, alerts, assist +from chalicelib.core import tenants, users, metadata, projects, license, assist from chalicelib.core import webhook from chalicelib.core.collaboration_slack import Slack from chalicelib.utils import captcha, SAML2_helper @@ -213,13 +213,6 @@ def get_current_plan(context: schemas.CurrentContext = Depends(OR_context)): } -@public_app.post('/alerts/notifications', tags=["alerts"]) -@public_app.put('/alerts/notifications', tags=["alerts"]) -def send_alerts_notifications(background_tasks: BackgroundTasks, data: schemas.AlertNotificationSchema = Body(...)): - # TODO: validate token - return {"data": alerts.process_notifications(data.notifications, background_tasks=background_tasks)} - - @public_app.get('/general_stats', tags=["private"], include_in_schema=False) def get_general_stats(): return {"data": {"sessions:": sessions.count_all()}}