feat(api): alerts notifications
feat(api): alerts dockerfile feat(api): alerts build script feat(backend): removed alerts
This commit is contained in:
parent
6b14f13e53
commit
f98d995906
15 changed files with 122 additions and 542 deletions
|
|
@ -1,67 +0,0 @@
|
|||
{
|
||||
"version": "2.0",
|
||||
"app_name": "parrot",
|
||||
"environment_variables": {
|
||||
},
|
||||
"stages": {
|
||||
"default-foss": {
|
||||
"api_gateway_stage": "default-fos",
|
||||
"manage_iam_role": false,
|
||||
"iam_role_arn": "",
|
||||
"autogen_policy": true,
|
||||
"environment_variables": {
|
||||
"isFOS": "true",
|
||||
"isEE": "false",
|
||||
"stage": "default-foss",
|
||||
"jwt_issuer": "openreplay-default-foss",
|
||||
"sentryURL": "",
|
||||
"pg_host": "postgresql.db.svc.cluster.local",
|
||||
"pg_port": "5432",
|
||||
"pg_dbname": "postgres",
|
||||
"pg_user": "postgres",
|
||||
"pg_password": "asayerPostgres",
|
||||
"alert_ntf": "http://127.0.0.1:8000/async/alerts/notifications/%s",
|
||||
"email_signup": "http://127.0.0.1:8000/async/email_signup/%s",
|
||||
"email_funnel": "http://127.0.0.1:8000/async/funnel/%s",
|
||||
"email_basic": "http://127.0.0.1:8000/async/basic/%s",
|
||||
"assign_link": "http://127.0.0.1:8000/async/email_assignment",
|
||||
"captcha_server": "",
|
||||
"captcha_key": "",
|
||||
"sessions_bucket": "mobs",
|
||||
"sessions_region": "us-east-1",
|
||||
"put_S3_TTL": "20",
|
||||
"sourcemaps_reader": "http://0.0.0.0:9000/sourcemaps",
|
||||
"sourcemaps_bucket": "sourcemaps",
|
||||
"js_cache_bucket": "sessions-assets",
|
||||
"peers": "http://0.0.0.0:9000/assist/peers",
|
||||
"async_Token": "",
|
||||
"EMAIL_HOST": "",
|
||||
"EMAIL_PORT": "587",
|
||||
"EMAIL_USER": "",
|
||||
"EMAIL_PASSWORD": "",
|
||||
"EMAIL_USE_TLS": "true",
|
||||
"EMAIL_USE_SSL": "false",
|
||||
"EMAIL_SSL_KEY": "",
|
||||
"EMAIL_SSL_CERT": "",
|
||||
"EMAIL_FROM": "OpenReplay<do-not-reply@openreplay.com>",
|
||||
"SITE_URL": "",
|
||||
"announcement_url": "",
|
||||
"jwt_secret": "",
|
||||
"jwt_algorithm": "HS512",
|
||||
"jwt_exp_delta_seconds": "2592000",
|
||||
"S3_HOST": "",
|
||||
"S3_KEY": "",
|
||||
"S3_SECRET": "",
|
||||
"invitation_link": "/api/users/invitation?token=%s",
|
||||
"change_password_link": "/reset-password?invitation=%s&&pass=%s",
|
||||
"version_number": "1.3.5"
|
||||
},
|
||||
"lambda_timeout": 150,
|
||||
"lambda_memory_size": 400,
|
||||
"subnet_ids": [
|
||||
],
|
||||
"security_group_ids": [
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,68 +0,0 @@
|
|||
{
|
||||
"version": "2.0",
|
||||
"app_name": "parrot",
|
||||
"environment_variables": {
|
||||
},
|
||||
"stages": {
|
||||
"default-foss": {
|
||||
"api_gateway_stage": "default-fos",
|
||||
"manage_iam_role": false,
|
||||
"iam_role_arn": "",
|
||||
"autogen_policy": true,
|
||||
"environment_variables": {
|
||||
"isFOS": "true",
|
||||
"isEE": "false",
|
||||
"stage": "default-foss",
|
||||
"jwt_issuer": "openreplay-default-foss",
|
||||
"sentryURL": "",
|
||||
"pg_host": "postgresql.db.svc.cluster.local",
|
||||
"pg_port": "5432",
|
||||
"pg_dbname": "postgres",
|
||||
"pg_user": "postgres",
|
||||
"pg_password": "asayerPostgres",
|
||||
"alert_ntf": "http://127.0.0.1:8000/async/alerts/notifications/%s",
|
||||
"email_signup": "http://127.0.0.1:8000/async/email_signup/%s",
|
||||
"email_funnel": "http://127.0.0.1:8000/async/funnel/%s",
|
||||
"email_basic": "http://127.0.0.1:8000/async/basic/%s",
|
||||
"assign_link": "http://127.0.0.1:8000/async/email_assignment",
|
||||
"captcha_server": "",
|
||||
"captcha_key": "",
|
||||
"sessions_bucket": "mobs",
|
||||
"sessions_region": "us-east-1",
|
||||
"put_S3_TTL": "20",
|
||||
"sourcemaps_reader": "http://utilities-openreplay.app.svc.cluster.local:9000/sourcemaps",
|
||||
"sourcemaps_bucket": "sourcemaps",
|
||||
"js_cache_bucket": "sessions-assets",
|
||||
"peers": "http://utilities-openreplay.app.svc.cluster.local:9000/assist/%s/peers",
|
||||
"async_Token": "",
|
||||
"EMAIL_HOST": "",
|
||||
"EMAIL_PORT": "587",
|
||||
"EMAIL_USER": "",
|
||||
"EMAIL_PASSWORD": "",
|
||||
"EMAIL_USE_TLS": "true",
|
||||
"EMAIL_USE_SSL": "false",
|
||||
"EMAIL_SSL_KEY": "",
|
||||
"EMAIL_SSL_CERT": "",
|
||||
"EMAIL_FROM": "OpenReplay<do-not-reply@openreplay.com>",
|
||||
"SITE_URL": "",
|
||||
"announcement_url": "",
|
||||
"jwt_secret": "",
|
||||
"jwt_algorithm": "HS512",
|
||||
"jwt_exp_delta_seconds": "2592000",
|
||||
"S3_HOST": "",
|
||||
"S3_KEY": "",
|
||||
"S3_SECRET": "",
|
||||
"invitation_link": "/api/users/invitation?token=%s",
|
||||
"change_password_link": "/reset-password?invitation=%s&&pass=%s",
|
||||
"iosBucket": "openreplay-ios-images",
|
||||
"version_number": "1.3.6"
|
||||
},
|
||||
"lambda_timeout": 150,
|
||||
"lambda_memory_size": 400,
|
||||
"subnet_ids": [
|
||||
],
|
||||
"security_group_ids": [
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
18
api/Dockerfile.alerts
Normal file
18
api/Dockerfile.alerts
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
FROM python:3.9.7-slim
|
||||
LABEL Maintainer="Rajesh Rajendran<rjshrjndrn@gmail.com>"
|
||||
LABEL Maintainer="KRAIEM Taha Yassine<tahayk2@gmail.com>"
|
||||
WORKDIR /work
|
||||
COPY . .
|
||||
RUN pip install -r requirements.txt
|
||||
RUN mv .env.default .env && rm app.py && ren app_alerts.py app.py
|
||||
ENV pg_minconn 2
|
||||
|
||||
# Add Tini
|
||||
# Startup daemon
|
||||
ENV TINI_VERSION v0.19.0
|
||||
ARG envarg
|
||||
ENV ENTERPRISE_BUILD ${envarg}
|
||||
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /tini
|
||||
RUN chmod +x /tini
|
||||
ENTRYPOINT ["/tini", "--"]
|
||||
CMD ./entrypoint.sh
|
||||
26
api/app_alerts.py
Normal file
26
api/app_alerts.py
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
import logging
|
||||
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
from fastapi import FastAPI
|
||||
|
||||
from chalicelib.core import alerts_processor
|
||||
|
||||
app = FastAPI()
|
||||
print("============= ALERTS =============")
|
||||
|
||||
|
||||
@app.get("/")
|
||||
async def root():
|
||||
return {"status": "Running"}
|
||||
|
||||
|
||||
app.schedule = AsyncIOScheduler()
|
||||
app.schedule.start()
|
||||
app.schedule.add_job(id="alerts_processor", **{"func": alerts_processor.process, "trigger": "interval", "minutes": 5,
|
||||
"misfire_grace_time": 20})
|
||||
|
||||
for job in app.schedule.get_jobs():
|
||||
print({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)})
|
||||
|
||||
logging.basicConfig()
|
||||
logging.getLogger('apscheduler').setLevel(logging.INFO)
|
||||
|
|
@ -22,15 +22,19 @@ function build_api(){
|
|||
# Copy enterprise code
|
||||
[[ $1 == "ee" ]] && {
|
||||
cp -rf ../ee/api/* ./
|
||||
cp -rf ../ee/api/.chalice/* ./.chalice/
|
||||
envarg="default-ee"
|
||||
tag="ee-"
|
||||
}
|
||||
docker build -f ./Dockerfile --build-arg envarg=$envarg -t ${DOCKER_REPO:-'local'}/api:${git_sha1} .
|
||||
docker build -f ./Dockerfile.alerts --build-arg envarg=$envarg -t ${DOCKER_REPO:-'local'}/alerts:${git_sha1} .
|
||||
[[ $PUSH_IMAGE -eq 1 ]] && {
|
||||
docker push ${DOCKER_REPO:-'local'}/api:${git_sha1}
|
||||
docker tag ${DOCKER_REPO:-'local'}/api:${git_sha1} ${DOCKER_REPO:-'local'}/api:${tag}latest
|
||||
docker push ${DOCKER_REPO:-'local'}/api:${tag}latest
|
||||
|
||||
docker push ${DOCKER_REPO:-'local'}/alerts:${git_sha1}
|
||||
docker tag ${DOCKER_REPO:-'local'}/alerts:${git_sha1} ${DOCKER_REPO:-'local'}/alerts:${tag}latest
|
||||
docker push ${DOCKER_REPO:-'local'}/alerts:${tag}latest
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,8 +1,6 @@
|
|||
import json
|
||||
import time
|
||||
|
||||
from fastapi import BackgroundTasks
|
||||
|
||||
import schemas
|
||||
from chalicelib.core import notifications, slack, webhook
|
||||
from chalicelib.utils import pg_client, helper, email_helper
|
||||
|
|
@ -125,7 +123,7 @@ def update(id, data: schemas.AlertSchema):
|
|||
return {"data": __process_circular(a)}
|
||||
|
||||
|
||||
def process_notifications(data, background_tasks: BackgroundTasks):
|
||||
def process_notifications(data):
|
||||
full = {}
|
||||
for n in data:
|
||||
if "message" in n["options"]:
|
||||
|
|
@ -150,11 +148,23 @@ def process_notifications(data, background_tasks: BackgroundTasks):
|
|||
notifications_list = full[t][i:i + BATCH_SIZE]
|
||||
|
||||
if t == "slack":
|
||||
background_tasks.add_task(slack.send_batch, notifications_list=notifications_list)
|
||||
try:
|
||||
slack.send_batch(notifications_list=notifications_list)
|
||||
except Exception as e:
|
||||
print("!!!Error while sending slack notifications batch")
|
||||
print(str(e))
|
||||
elif t == "email":
|
||||
background_tasks.add_task(send_by_email_batch, notifications_list=notifications_list)
|
||||
try:
|
||||
send_by_email_batch(notifications_list=notifications_list)
|
||||
except Exception as e:
|
||||
print("!!!Error while sending email notifications batch")
|
||||
print(str(e))
|
||||
elif t == "webhook":
|
||||
background_tasks.add_task(webhook.trigger_batch, data_list=notifications_list)
|
||||
try:
|
||||
webhook.trigger_batch(data_list=notifications_list)
|
||||
except Exception as e:
|
||||
print("!!!Error while sending webhook notifications batch")
|
||||
print(str(e))
|
||||
|
||||
|
||||
def send_by_email(notification, destination):
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
import schemas
|
||||
from chalicelib.core import sessions
|
||||
from chalicelib.core import sessions, alerts
|
||||
from chalicelib.utils import pg_client, helper
|
||||
from chalicelib.utils.TimeUTC import TimeUTC
|
||||
|
||||
|
|
@ -112,10 +112,8 @@ def Build(a):
|
|||
WHERE project_id = %(project_id)s
|
||||
{"AND " + colDef["condition"] if colDef.get("condition") is not None else ""}"""
|
||||
j_s = colDef.get("joinSessions", True)
|
||||
print(">>>>>>>>>>>>>")
|
||||
print(j_s)
|
||||
# q = sq.Select(fmt.Sprint("value, coalesce(value,0)", a.Query.Operator, a.Query.Right, " AS valid"))
|
||||
q = f"""SELECT value, coalesce(value,0) {a["query"]["operator"]} {a["query"]["right"]} AS valid"""
|
||||
|
||||
q = f"""SELECT coalesce(value,0) AS value, coalesce(value,0) {a["query"]["operator"]} {a["query"]["right"]} AS valid"""
|
||||
|
||||
# if len(colDef.group) > 0 {
|
||||
# subQ = subQ.Column(colDef.group + " AS group_value")
|
||||
|
|
@ -196,8 +194,10 @@ def Build(a):
|
|||
|
||||
|
||||
def process():
|
||||
notifications = []
|
||||
with pg_client.PostgresClient(long_query=True) as cur:
|
||||
query = """SELECT alert_id,
|
||||
query = """SELECT -1 AS tenant_id,
|
||||
alert_id,
|
||||
project_id,
|
||||
detection_method,
|
||||
query,
|
||||
|
|
@ -217,14 +217,49 @@ def process():
|
|||
ORDER BY alerts.created_at;"""
|
||||
cur.execute(query=query)
|
||||
all_alerts = helper.list_to_camel_case(cur.fetchall())
|
||||
|
||||
with pg_client.PostgresClient() as cur:
|
||||
for alert in all_alerts:
|
||||
if True or can_check(alert):
|
||||
if can_check(alert):
|
||||
print(f"Querying alertId:{alert['alertId']} name: {alert['name']}")
|
||||
query, params = Build(alert)
|
||||
query = cur.mogrify(query, params)
|
||||
# print(alert)
|
||||
# print(query)
|
||||
cur.execute(query)
|
||||
result = cur.fetchone()
|
||||
if result["valid"]:
|
||||
print("Valid alert, notifying users")
|
||||
try:
|
||||
cur.execute(query)
|
||||
result = cur.fetchone()
|
||||
if result["valid"]:
|
||||
print("Valid alert, notifying users")
|
||||
notifications.append({
|
||||
"alertId": alert["alertId"],
|
||||
"tenantId": alert["tenantId"],
|
||||
"title": alert["name"],
|
||||
"description": f"has been triggered, {alert['query']['left']} = {result['value']} ({alert['query']['operator']} {alert['query']['right']}).",
|
||||
"buttonText": "Check metrics for more details",
|
||||
"buttonUrl": f"/{alert['projectId']}/metrics",
|
||||
"imageUrl": None,
|
||||
"options": {"source": "ALERT", "sourceId": alert["alertId"],
|
||||
"sourceMeta": alert["detectionMethod"],
|
||||
"message": alert["options"]["message"], "projectId": alert["projectId"],
|
||||
"data": {"title": alert["name"],
|
||||
"limitValue": alert["query"]["right"], "actualValue": result["value"],
|
||||
"operator": alert["query"]["operator"],
|
||||
"trigger": alert["query"]["left"],
|
||||
"alertId": alert["alertId"],
|
||||
"detectionMethod": alert["detectionMethod"],
|
||||
"currentPeriod": alert["options"]["currentPeriod"],
|
||||
"previousPeriod": alert["options"]["previousPeriod"],
|
||||
"createdAt": TimeUTC.now()}},
|
||||
})
|
||||
except Exception as e:
|
||||
print(f"!!!Error while running alert query for alertId:{alert['alertId']}")
|
||||
print(str(e))
|
||||
print(query)
|
||||
if len(notifications) > 0:
|
||||
cur.execute(
|
||||
cur.mogrify(f"""UPDATE public.Alerts
|
||||
SET options = options||'{{"lastNotification":{TimeUTC.now()}}}'::jsonb
|
||||
WHERE alert_id IN %(ids)s;""", {"ids": tuple([n["alertId"] for n in notifications])}))
|
||||
if len(notifications) > 0:
|
||||
alerts.process_notifications(notifications)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import requests
|
||||
|
||||
from chalicelib.utils import pg_client, helper
|
||||
from chalicelib.utils.TimeUTC import TimeUTC
|
||||
import requests
|
||||
|
||||
|
||||
def get_by_id(webhook_id):
|
||||
|
|
@ -81,7 +82,7 @@ def update(tenant_id, webhook_id, changes, replace_none=False):
|
|||
SET {','.join(sub_query)}
|
||||
WHERE webhook_id =%(id)s AND deleted_at ISNULL
|
||||
RETURNING webhook_id AS integration_id, webhook_id AS id,*;""",
|
||||
{"id": webhook_id, **changes}))
|
||||
{"id": webhook_id, **changes}))
|
||||
cur.execute(
|
||||
cur.mogrify(f"""\
|
||||
UPDATE public.webhooks
|
||||
|
|
@ -150,22 +151,18 @@ def trigger_batch(data_list):
|
|||
for w in data_list:
|
||||
if w["destination"] not in webhooks_map:
|
||||
webhooks_map[w["destination"]] = get_by_id(webhook_id=w["destination"])
|
||||
__trigger(hook=webhooks_map[w["destination"]], data=w["data"])
|
||||
if webhooks_map[w["destination"]] is None:
|
||||
print(f"!!Error webhook not found: webhook_id={w['destination']}")
|
||||
else:
|
||||
__trigger(hook=webhooks_map[w["destination"]], data=w["data"])
|
||||
|
||||
|
||||
def __trigger(hook, data):
|
||||
if hook["type"] == 'webhook':
|
||||
if hook is not None and hook["type"] == 'webhook':
|
||||
headers = {}
|
||||
if hook["authHeader"] is not None and len(hook["authHeader"]) > 0:
|
||||
headers = {"Authorization": hook["authHeader"]}
|
||||
|
||||
# body = {
|
||||
# "webhookId": hook["id"],
|
||||
# "createdAt": TimeUTC.now(),
|
||||
# "event": event,
|
||||
# "data": data
|
||||
# }
|
||||
|
||||
r = requests.post(url=hook["endpoint"], json=data, headers=headers)
|
||||
if r.status_code != 200:
|
||||
print("=======> webhook: something went wrong")
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ from chalicelib.core import log_tool_rollbar, sourcemaps, events, sessions_assig
|
|||
log_tool_stackdriver, reset_password, sessions_favorite_viewed, \
|
||||
log_tool_cloudwatch, log_tool_sentry, log_tool_sumologic, log_tools, errors, sessions, \
|
||||
log_tool_newrelic, announcements, log_tool_bugsnag, weekly_report, integration_jira_cloud, integration_github, \
|
||||
assist, heatmaps, mobile, signup, tenants, errors_favorite_viewed, boarding, notifications, webhook, slack, users, \
|
||||
assist, heatmaps, mobile, signup, tenants, errors_favorite_viewed, boarding, notifications, webhook, users, \
|
||||
custom_metrics, saved_search
|
||||
from chalicelib.core.collaboration_slack import Slack
|
||||
from chalicelib.utils import email_helper
|
||||
|
|
@ -954,19 +954,6 @@ def add_remove_favorite_error(projectId: int, errorId: str, action: str, startDa
|
|||
return {"errors": ["undefined action"]}
|
||||
|
||||
|
||||
@public_app.post('/async/alerts/notifications/{step}', tags=["async", "alerts"])
|
||||
@public_app.put('/async/alerts/notifications/{step}', tags=["async", "alerts"])
|
||||
def send_alerts_notification_async(step: str, data: schemas.AlertNotificationSchema = Body(...)):
|
||||
if data.auth != config("async_Token"):
|
||||
return {"errors": ["missing auth"]}
|
||||
if step == "slack":
|
||||
slack.send_batch(notifications_list=data.notifications)
|
||||
elif step == "email":
|
||||
alerts.send_by_email_batch(notifications_list=data.notifications)
|
||||
elif step == "webhook":
|
||||
webhook.trigger_batch(data_list=data.notifications)
|
||||
|
||||
|
||||
@app.get('/notifications', tags=['notifications'])
|
||||
def get_notifications(context: schemas.CurrentContext = Depends(OR_context)):
|
||||
return {"data": notifications.get_all(tenant_id=context.tenant_id, user_id=context.user_id)}
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import schemas
|
|||
from chalicelib.core import assist
|
||||
from chalicelib.core import integrations_manager
|
||||
from chalicelib.core import sessions
|
||||
from chalicelib.core import tenants, users, metadata, projects, license, alerts
|
||||
from chalicelib.core import tenants, users, metadata, projects, license
|
||||
from chalicelib.core import webhook
|
||||
from chalicelib.core.collaboration_slack import Slack
|
||||
from chalicelib.utils import captcha
|
||||
|
|
@ -209,13 +209,6 @@ def get_current_plan(context: schemas.CurrentContext = Depends(OR_context)):
|
|||
}
|
||||
|
||||
|
||||
@public_app.post('/alerts/notifications', tags=["alerts"])
|
||||
@public_app.put('/alerts/notifications', tags=["alerts"])
|
||||
def send_alerts_notifications(background_tasks: BackgroundTasks, data: schemas.AlertNotificationSchema = Body(...)):
|
||||
# TODO: validate token
|
||||
return {"data": alerts.process_notifications(data.notifications, background_tasks=background_tasks)}
|
||||
|
||||
|
||||
@public_app.get('/general_stats', tags=["private"], include_in_schema=False)
|
||||
def get_general_stats():
|
||||
return {"data": {"sessions:": sessions.count_all()}}
|
||||
|
|
|
|||
|
|
@ -88,16 +88,6 @@ class SearchErrorsSchema(BaseModel):
|
|||
order: Optional[str] = Field(None)
|
||||
|
||||
|
||||
class EmailNotificationSchema(BaseModel):
|
||||
notification: str = Field(...)
|
||||
destination: str = Field(...)
|
||||
|
||||
|
||||
class AlertNotificationSchema(BaseModel):
|
||||
auth: str = Field(...)
|
||||
notifications: List[EmailNotificationSchema] = Field(...)
|
||||
|
||||
|
||||
class CreateNotificationSchema(BaseModel):
|
||||
token: str = Field(...)
|
||||
notifications: List = Field(...)
|
||||
|
|
|
|||
|
|
@ -1,86 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"openreplay/backend/pkg/db/postgres"
|
||||
"openreplay/backend/pkg/env"
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
func main() {
|
||||
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
|
||||
POSTGRES_STRING := env.String("POSTGRES_STRING")
|
||||
NOTIFICATIONS_STRING := env.String("ALERT_NOTIFICATION_STRING")
|
||||
log.Printf("Notifications: %s \nPG: %s\n", NOTIFICATIONS_STRING, POSTGRES_STRING)
|
||||
pg := postgres.NewConn(POSTGRES_STRING)
|
||||
defer pg.Close()
|
||||
|
||||
pgs, err := sql.Open("postgres", POSTGRES_STRING+ "?sslmode=disable")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer pgs.Close()
|
||||
|
||||
manager := NewManager(NOTIFICATIONS_STRING, POSTGRES_STRING, pgs, pg)
|
||||
if err := pg.IterateAlerts(func(a *postgres.Alert, err error) {
|
||||
if err != nil {
|
||||
log.Printf("Postgres error: %v\n", err)
|
||||
return
|
||||
}
|
||||
log.Printf("Alert initialization: %+v\n", *a)
|
||||
//log.Printf("CreatedAt: %s\n", *a.CreatedAt)
|
||||
err = manager.Update(a)
|
||||
if err != nil {
|
||||
log.Printf("Alert parse error: %v | Alert: %+v\n", err, *a)
|
||||
return
|
||||
}
|
||||
}); err != nil {
|
||||
log.Fatalf("Postgres error: %v\n", err)
|
||||
}
|
||||
|
||||
listener, err := postgres.NewAlertsListener(POSTGRES_STRING)
|
||||
if err != nil {
|
||||
log.Fatalf("Postgres listener error: %v\n", err)
|
||||
}
|
||||
defer listener.Close()
|
||||
|
||||
sigchan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
tickAlert := time.Tick(1 * time.Minute)
|
||||
|
||||
log.Printf("Alert service started\n")
|
||||
manager.RequestAll()
|
||||
//return
|
||||
for {
|
||||
select {
|
||||
case sig := <-sigchan:
|
||||
log.Printf("Caught signal %v: terminating\n", sig)
|
||||
listener.Close()
|
||||
pg.Close()
|
||||
os.Exit(0)
|
||||
case <-tickAlert:
|
||||
log.Printf("Requesting all...%d alerts\n", manager.Length())
|
||||
manager.RequestAll()
|
||||
case iPointer := <-listener.Alerts:
|
||||
log.Printf("Alert update: %+v\n", *iPointer)
|
||||
//log.Printf("CreatedAt: %s\n", *iPointer.CreatedAt)
|
||||
//log.Printf("Notification received for AlertId: %d\n", iPointer.AlertID)
|
||||
err := manager.Update(iPointer)
|
||||
if err != nil {
|
||||
log.Printf("Alert parse error: %+v | Alert: %v\n", err, *iPointer)
|
||||
}
|
||||
case err := <-listener.Errors:
|
||||
log.Printf("listener error: %v\n", err)
|
||||
if err.Error() == "conn closed" {
|
||||
panic("Listener conn lost")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,171 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"openreplay/backend/pkg/db/postgres"
|
||||
)
|
||||
|
||||
const PGParallelLimit = 2
|
||||
|
||||
var pgCount int64
|
||||
|
||||
type manager struct {
|
||||
postgresString string
|
||||
notificationsUrl string
|
||||
alertsCache map[uint32]*postgres.Alert
|
||||
cacheMutex sync.Mutex
|
||||
pgParallel chan bool
|
||||
pgs *sql.DB
|
||||
pg *postgres.Conn
|
||||
pgMutex sync.Mutex
|
||||
notifications map[uint32]*postgres.TenantNotification
|
||||
notificationsGo *sync.WaitGroup
|
||||
notificationsMutex sync.Mutex
|
||||
}
|
||||
|
||||
func NewManager(notificationsUrl string, postgresString string, pgs *sql.DB, pg *postgres.Conn) *manager {
|
||||
return &manager{
|
||||
postgresString: postgresString,
|
||||
notificationsUrl: notificationsUrl,
|
||||
alertsCache: make(map[uint32]*postgres.Alert),
|
||||
cacheMutex: sync.Mutex{},
|
||||
pgParallel: make(chan bool, PGParallelLimit),
|
||||
pgs: pgs,
|
||||
pg: pg,
|
||||
pgMutex: sync.Mutex{},
|
||||
notifications: make(map[uint32]*postgres.TenantNotification),
|
||||
notificationsGo: &sync.WaitGroup{},
|
||||
notificationsMutex: sync.Mutex{},
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (m *manager) Length() int {
|
||||
return len(m.alertsCache)
|
||||
}
|
||||
|
||||
func (m *manager) Update(a *postgres.Alert) error {
|
||||
m.cacheMutex.Lock()
|
||||
defer m.cacheMutex.Unlock()
|
||||
_, exists := m.alertsCache[a.AlertID]
|
||||
if exists && a.DeletedAt != nil {
|
||||
log.Println("deleting alert from memory")
|
||||
delete(m.alertsCache, a.AlertID)
|
||||
return nil
|
||||
} else {
|
||||
m.alertsCache[a.AlertID] = a
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (m *manager) processAlert(a *postgres.Alert) {
|
||||
defer func() {
|
||||
defer m.notificationsGo.Done()
|
||||
<-m.pgParallel
|
||||
}()
|
||||
if !a.CanCheck() {
|
||||
log.Printf("cannot check %s", a.Name)
|
||||
return
|
||||
}
|
||||
//log.Printf("checking %+v", a)
|
||||
log.Printf("quering %s", a.Name)
|
||||
//--- For stats:
|
||||
atomic.AddInt64(&pgCount, 1)
|
||||
q, err := a.Build()
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
rows, err := q.RunWith(m.pgs).Query()
|
||||
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var (
|
||||
value sql.NullFloat64
|
||||
valid bool
|
||||
)
|
||||
if err := rows.Scan(&value, &valid); err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
|
||||
if valid && value.Valid {
|
||||
log.Printf("%s: valid", a.Name)
|
||||
m.notificationsMutex.Lock()
|
||||
m.notifications[a.AlertID] = &postgres.TenantNotification{
|
||||
TenantId: a.TenantId,
|
||||
Title: a.Name,
|
||||
Description: fmt.Sprintf("has been triggered, %s = %.0f (%s %.0f).", a.Query.Left, value.Float64, a.Query.Operator, a.Query.Right),
|
||||
ButtonText: "Check metrics for more details",
|
||||
ButtonUrl: fmt.Sprintf("/%d/metrics", a.ProjectID),
|
||||
ImageUrl: nil,
|
||||
Options: map[string]interface{}{"source": "ALERT", "sourceId": a.AlertID, "sourceMeta": a.DetectionMethod, "message": a.Options.Message, "projectId": a.ProjectID, "data": map[string]interface{}{"title": a.Name, "limitValue": a.Query.Right, "actualValue": value.Float64, "operator": a.Query.Operator, "trigger": a.Query.Left, "alertId": a.AlertID, "detectionMethod": a.DetectionMethod, "currentPeriod": a.Options.CurrentPeriod, "previousPeriod": a.Options.PreviousPeriod, "createdAt": time.Now().Unix() * 1000}},
|
||||
}
|
||||
m.notificationsMutex.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
func (m *manager) RequestAll() {
|
||||
now := time.Now().Unix()
|
||||
m.cacheMutex.Lock()
|
||||
for _, a := range m.alertsCache {
|
||||
m.pgParallel <- true
|
||||
m.notificationsGo.Add(1)
|
||||
go m.processAlert(a)
|
||||
//m.processAlert(a)
|
||||
}
|
||||
//log.Println("releasing cache")
|
||||
m.cacheMutex.Unlock()
|
||||
//log.Println("waiting for all alerts to finish")
|
||||
m.notificationsGo.Wait()
|
||||
log.Printf("done %d PG queries in: %ds", pgCount, time.Now().Unix()-now)
|
||||
pgCount = 0
|
||||
//log.Printf("Processing %d Notifications", len(m.notifications))
|
||||
m.notificationsMutex.Lock()
|
||||
go m.ProcessNotifications(m.notifications)
|
||||
m.notificationsMutex.Unlock()
|
||||
m.notifications = make(map[uint32]*postgres.TenantNotification)
|
||||
//log.Printf("Notifications purged: %d", len(m.notifications))
|
||||
}
|
||||
|
||||
func (m *manager) ProcessNotifications(allNotifications map[uint32]*postgres.TenantNotification) {
|
||||
if len(allNotifications) == 0 {
|
||||
log.Println("No notifications to process")
|
||||
return
|
||||
}
|
||||
log.Printf("sending %d notifications", len(allNotifications))
|
||||
allIds := make([]uint32, 0, len(allNotifications))
|
||||
toSend := postgres.Notifications{
|
||||
Notifications: []*postgres.TenantNotification{},
|
||||
}
|
||||
for k, n := range allNotifications {
|
||||
//log.Printf("notification for %d", k)
|
||||
allIds = append(allIds, k)
|
||||
toSend.Notifications = append(toSend.Notifications, n)
|
||||
}
|
||||
toSend.Send(m.notificationsUrl)
|
||||
if err := m.pg.SaveLastNotification(allIds); err != nil {
|
||||
log.Printf("Error saving LastNotification time: %v", err)
|
||||
if err.Error() == "conn closed" {
|
||||
m.pg = postgres.NewConn(m.postgresString)
|
||||
//if err != nil {
|
||||
// panic(fmt.Sprintf("Postgres renew notifications connection error: %v\n", err))
|
||||
//}
|
||||
if err := m.pg.SaveLastNotification(allIds); err != nil {
|
||||
panic(fmt.Sprintf("Error saving LastNotification time, suicide: %v", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,81 +0,0 @@
|
|||
{
|
||||
"version": "2.0",
|
||||
"app_name": "parrot",
|
||||
"environment_variables": {
|
||||
},
|
||||
"stages": {
|
||||
"default-ee": {
|
||||
"api_gateway_stage": "default-ee",
|
||||
"manage_iam_role": false,
|
||||
"iam_role_arn": "",
|
||||
"autogen_policy": true,
|
||||
"environment_variables": {
|
||||
"isFOS": "false",
|
||||
"isEE": "true",
|
||||
"stage": "default-ee",
|
||||
"jwt_issuer": "openreplay-default-ee",
|
||||
"sentryURL": "",
|
||||
"pg_host": "127.0.0.1",
|
||||
"pg_port": "9202",
|
||||
"pg_dbname": "app",
|
||||
"pg_user": "",
|
||||
"pg_password": "",
|
||||
"ch_host": "",
|
||||
"ch_port": "",
|
||||
"alert_ntf": "http://127.0.0.1:8000/async/alerts/notifications/%s",
|
||||
"email_signup": "http://127.0.0.1:8000/async/email_signup/%s",
|
||||
"email_funnel": "http://127.0.0.1:8000/async/funnel/%s",
|
||||
"email_plans": "http://127.0.0.1:8000/async/plans/%s",
|
||||
"email_basic": "http://127.0.0.1:8000/async/basic/%s",
|
||||
"assign_link": "http://127.0.0.1:8000/async/email_assignment",
|
||||
"captcha_server": "",
|
||||
"captcha_key": "",
|
||||
"sessions_bucket": "mobs",
|
||||
"sessions_region": "us-east-1",
|
||||
"put_S3_TTL": "20",
|
||||
"sourcemaps_reader": "http://utilities-openreplay.app.svc.cluster.local:9000/sourcemaps",
|
||||
"sourcemaps_bucket": "sourcemaps",
|
||||
"peers": "http://utilities-openreplay.app.svc.cluster.local:9000/assist/%s/peers",
|
||||
"js_cache_bucket": "sessions-assets",
|
||||
"async_Token": "",
|
||||
"EMAIL_HOST": "",
|
||||
"EMAIL_PORT": "587",
|
||||
"EMAIL_USER": "",
|
||||
"EMAIL_PASSWORD": "",
|
||||
"EMAIL_USE_TLS": "true",
|
||||
"EMAIL_USE_SSL": "false",
|
||||
"EMAIL_SSL_KEY": "",
|
||||
"EMAIL_SSL_CERT": "",
|
||||
"EMAIL_FROM": "OpenReplay<do-not-reply@openreplay.com>",
|
||||
"SITE_URL": "",
|
||||
"announcement_url": "",
|
||||
"jwt_secret": "SET A RANDOM STRING HERE",
|
||||
"jwt_algorithm": "HS512",
|
||||
"jwt_exp_delta_seconds": "2592000",
|
||||
"S3_HOST": "",
|
||||
"S3_KEY": "",
|
||||
"S3_SECRET": "",
|
||||
"LICENSE_KEY": "",
|
||||
"SAML2_MD_URL": "",
|
||||
"idp_entityId": "",
|
||||
"idp_sso_url": "",
|
||||
"idp_x509cert": "",
|
||||
"idp_sls_url": "",
|
||||
"idp_name": "",
|
||||
"sso_exp_delta_seconds": "172800",
|
||||
"sso_landing": "/login?jwt=%s",
|
||||
"invitation_link": "/api/users/invitation?token=%s",
|
||||
"change_password_link": "/reset-password?invitation=%s&&pass=%s",
|
||||
"iosBucket": "openreplay-ios-images",
|
||||
"version_number": "1.3.6",
|
||||
"assist_secret": ""
|
||||
},
|
||||
"lambda_timeout": 150,
|
||||
"lambda_memory_size": 400,
|
||||
"subnet_ids": [
|
||||
],
|
||||
"security_group_ids": [
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -8,7 +8,7 @@ import schemas
|
|||
import schemas_ee
|
||||
from chalicelib.core import integrations_manager
|
||||
from chalicelib.core import sessions
|
||||
from chalicelib.core import tenants, users, metadata, projects, license, alerts, assist
|
||||
from chalicelib.core import tenants, users, metadata, projects, license, assist
|
||||
from chalicelib.core import webhook
|
||||
from chalicelib.core.collaboration_slack import Slack
|
||||
from chalicelib.utils import captcha, SAML2_helper
|
||||
|
|
@ -213,13 +213,6 @@ def get_current_plan(context: schemas.CurrentContext = Depends(OR_context)):
|
|||
}
|
||||
|
||||
|
||||
@public_app.post('/alerts/notifications', tags=["alerts"])
|
||||
@public_app.put('/alerts/notifications', tags=["alerts"])
|
||||
def send_alerts_notifications(background_tasks: BackgroundTasks, data: schemas.AlertNotificationSchema = Body(...)):
|
||||
# TODO: validate token
|
||||
return {"data": alerts.process_notifications(data.notifications, background_tasks=background_tasks)}
|
||||
|
||||
|
||||
@public_app.get('/general_stats', tags=["private"], include_in_schema=False)
|
||||
def get_general_stats():
|
||||
return {"data": {"sessions:": sessions.count_all()}}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue