From 6f116353de949ceedcea2bffb13480da468b712b Mon Sep 17 00:00:00 2001 From: KRAIEM Taha Yassine Date: Tue, 11 May 2021 01:37:18 +0200 Subject: [PATCH] Changes: - Alerts worker use PG only --- backend/pkg/db/postgres/alert.go | 54 +++++++++++------------------- backend/services/alerts/main.go | 17 +++------- backend/services/alerts/manager.go | 48 +++++++++++++------------- 3 files changed, 47 insertions(+), 72 deletions(-) diff --git a/backend/pkg/db/postgres/alert.go b/backend/pkg/db/postgres/alert.go index e00ecf953..9511e10d6 100644 --- a/backend/pkg/db/postgres/alert.go +++ b/backend/pkg/db/postgres/alert.go @@ -106,27 +106,25 @@ type columnDefinition struct { } var LeftToDb = map[string]columnDefinition{ - "performance.dom_content_loaded.average": {table: "pages", formula: "COALESCE(AVG(NULLIF(dom_content_loaded_event_start ,0)),0)"}, - "performance.first_meaningful_paint.average": {table: "pages", formula: "COALESCE(AVG(NULLIF(first_contentful_paint,0)),0)"}, - "performance.page_load_time.average": {table: "pages", formula: "AVG(NULLIF(load_event_end ,0))"}, - "performance.dom_build_time.average": {table: "pages", formula: "AVG(NULLIF(dom_building_time,0))"}, - "performance.speed_index.average": {table: "pages", formula: "AVG(NULLIF(speed_index,0))"}, - //"avgSpeedIndexByLocation": {table: "pages", formula: "AVG(NULLIF(speed_index,0))", group: "user_country"}, - "performance.page_response_time.average": {table: "pages", formula: "AVG(NULLIF(response_time,0))"}, - "performance.ttfb.average": {table: "pages", formula: "AVG(NULLIF(first_paint,0))"}, - //"avgDomContentLoaded": {table: "pages", formula: "AVG(NULLIF(dom_content_loaded_event_time,0))"}, - "performance.time_to_render.averag": {table: "pages", formula: "AVG(NULLIF(visually_complete,0))"}, - "performance.image_load_time.average": {table: "resources", formula: "AVG(NULLIF(duration,0))", condition: "type=='img'"}, - "performance.request_load_time.average": {table: "resources", formula: "AVG(NULLIF(duration,0))", condition: "type=='fetch'"}, - "resources.load_time.average": {table: "resources", formula: "AVG(NULLIF(duration,0))"}, - "resources.missing.count": {table: "resources", formula: "COUNT(DISTINCT url_hostpath)", condition: "success==0"}, - "errors.4xx_5xx.count": {table: "resources", formula: "COUNT(session_id)", condition: "intDiv(status, 100)!=2"}, - "errors.4xx.count": {table: "resources", formula: "COUNT(session_id)", condition: "intDiv(status, 100)==4"}, - "errors.5xx.count": {table: "resources", formula: "COUNT(session_id)", condition: "intDiv(status, 100)==5"}, - "errors.javascript.impacted_sessions.count": {table: "resources", formula: "COUNT(DISTINCT session_id)", condition: "success=0 AND type='script'"}, - "performance.crashes.count": {table: "sessions", formula: "COUNT(DISTINCT session_id)", condition: "errors_count > 0"}, - "errors.javascript.count": {table: "errors", formula: "COUNT(DISTINCT session_id)", condition: "source=='js_exception'"}, - "errors.backend.count": {table: "errors", formula: "COUNT(DISTINCT session_id)", condition: "source!='js_exception'"}, + "performance.dom_content_loaded.average": {table: "events.pages", formula: "COALESCE(AVG(NULLIF(dom_content_loaded_time ,0)),0)"}, + "performance.first_meaningful_paint.average": {table: "events.pages", formula: "COALESCE(AVG(NULLIF(first_contentful_paint_time,0)),0)"}, + "performance.page_load_time.average": {table: "events.pages", formula: "AVG(NULLIF(load_time ,0))"}, + "performance.dom_build_time.average": {table: "events.pages", formula: "AVG(NULLIF(dom_building_time,0))"}, + "performance.speed_index.average": {table: "events.pages", formula: "AVG(NULLIF(speed_index,0))"}, + "performance.page_response_time.average": {table: "events.pages", formula: "AVG(NULLIF(response_time,0))"}, + "performance.ttfb.average": {table: "events.pages", formula: "AVG(NULLIF(first_paint_time,0))"}, + "performance.time_to_render.averag": {table: "events.pages", formula: "AVG(NULLIF(visually_complete,0))"}, + "performance.image_load_time.average": {table: "events.resources", formula: "AVG(NULLIF(duration,0))", condition: "type=='img'"}, + "performance.request_load_time.average": {table: "events.resources", formula: "AVG(NULLIF(duration,0))", condition: "type=='fetch'"}, + "resources.load_time.average": {table: "events.resources", formula: "AVG(NULLIF(duration,0))"}, + "resources.missing.count": {table: "events.resources", formula: "COUNT(DISTINCT url_hostpath)", condition: "success==0"}, + "errors.4xx_5xx.count": {table: "events.resources", formula: "COUNT(session_id)", condition: "status/100!=2"}, + "errors.4xx.count": {table: "events.resources", formula: "COUNT(session_id)", condition: "status/100==4"}, + "errors.5xx.count": {table: "events.resources", formula: "COUNT(session_id)", condition: "status/100==5"}, + "errors.javascript.impacted_sessions.count": {table: "events.resources", formula: "COUNT(DISTINCT session_id)", condition: "success= FALSE AND type='script'"}, + "performance.crashes.count": {table: "public.sessions", formula: "COUNT(DISTINCT session_id)", condition: "errors_count > 0"}, + "errors.javascript.count": {table: "events.errors INNER JOIN public.errors AS m_errors USING (error_id)", formula: "COUNT(DISTINCT session_id)", condition: "source=='js_exception'"}, + "errors.backend.count": {table: "events.errors INNER JOIN public.errors AS m_errors USING (error_id)", formula: "COUNT(DISTINCT session_id)", condition: "source!='js_exception'"}, } //This is the frequency of execution for each threshold @@ -151,20 +149,6 @@ func (a *Alert) CanCheck() bool { log.Printf("repetitionBase: %d NOT FOUND", repetitionBase) return false } - //for i := int64(0); i <= 10; i++ { - // now += 60 * 1000 - // log.Printf("%s: ((now-*a.CreatedAt)%%TimeInterval[repetitionBase]*60*1000) < 60*1000: %t", a.Name, ((now-*a.CreatedAt)%(TimeInterval[repetitionBase]*60*1000)) < 60*1000) - // log.Printf("now: %d", now) - // log.Printf("*a.CreatedAt: %d", *a.CreatedAt) - // log.Printf("now-*a.CreatedAt: %d", now-*a.CreatedAt) - // log.Printf("(now-*a.CreatedAt)%%TimeInterval[repetitionBase]*60*1000: %d", (now-*a.CreatedAt)%TimeInterval[repetitionBase]*60*1000) - //} - //return false - //log.Printf("%s: a.Options.RenotifyInterval<=0: %t", a.Name, a.Options.RenotifyInterval <= 0) - //log.Printf("%s: a.Options.LastNotification <= 0: %t", a.Name, a.Options.LastNotification <= 0) - //log.Printf("%s: (now-a.Options.LastNotification) > a.Options.RenotifyInterval*60*1000: %t", a.Name, (now-a.Options.LastNotification) > a.Options.RenotifyInterval*60*1000) - //log.Printf("%s: ((now-*a.CreatedAt)%%TimeInterval[repetitionBase]*60*1000) < 60*1000: %t", a.Name, ((now-*a.CreatedAt)%TimeInterval[repetitionBase]*60*1000) < 60*1000) - //log.Printf("%s: TimeInterval[repetitionBase]: %d", a.Name, TimeInterval[repetitionBase]) return a.DeletedAt == nil && a.Active && (a.Options.RenotifyInterval <= 0 || a.Options.LastNotification <= 0 || diff --git a/backend/services/alerts/main.go b/backend/services/alerts/main.go index c4398f470..2c0ff09f2 100644 --- a/backend/services/alerts/main.go +++ b/backend/services/alerts/main.go @@ -8,7 +8,6 @@ import ( "syscall" "time" - "github.com/ClickHouse/clickhouse-go" "openreplay/backend/pkg/env" "openreplay/backend/pkg/db/postgres" ) @@ -16,26 +15,18 @@ import ( func main() { log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) POSTGRES_STRING := env.String("POSTGRES_STRING") - CLICKHOUSE_STRING := env.String("CLICKHOUSE_STRING") NOTIFICATIONS_STRING := env.String("ALERT_NOTIFICATION_STRING") - log.Printf("Notifications: %s \nCH: %s\n", NOTIFICATIONS_STRING, CLICKHOUSE_STRING) + log.Printf("Notifications: %s \nPG: %s\n", NOTIFICATIONS_STRING, POSTGRES_STRING) pg := postgres.NewConn(POSTGRES_STRING) defer pg.Close() - ch, err := sql.Open("clickhouse", CLICKHOUSE_STRING) + pgs, err := sql.Open("postgres", POSTGRES_STRING) if err != nil { log.Fatal(err) } - if err := ch.Ping(); err != nil { - if exception, ok := err.(*clickhouse.Exception); ok { - log.Printf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace) - } else { - log.Println(err) - } - return - } - manager := NewManager(NOTIFICATIONS_STRING, ch, pg) + + manager := NewManager(NOTIFICATIONS_STRING, pgs, pg) if err := pg.IterateAlerts(func(a *postgres.Alert, err error) { if err != nil { log.Printf("Postgres error: %v\n", err) diff --git a/backend/services/alerts/manager.go b/backend/services/alerts/manager.go index 3f9e39851..9e8d9cde2 100644 --- a/backend/services/alerts/manager.go +++ b/backend/services/alerts/manager.go @@ -11,16 +11,16 @@ import ( "openreplay/backend/pkg/db/postgres" ) -const CHParallelLimit = 2 +const PGParallelLimit = 2 -var chCount int64 +var pgCount int64 type manager struct { notificationsUrl string alertsCache map[uint32]*postgres.Alert cacheMutex sync.Mutex - chParallel chan bool - ch *sql.DB + pgParallel chan bool + pgs *sql.DB pg *postgres.Conn pgMutex sync.Mutex notifications map[uint32]*postgres.TenantNotification @@ -28,13 +28,13 @@ type manager struct { notificationsMutex sync.Mutex } -func NewManager(notificationsUrl string, ch *sql.DB, pg *postgres.Conn) *manager { +func NewManager(notificationsUrl string, pg *sql.DB, pg *postgres.Conn) *manager { return &manager{ notificationsUrl: notificationsUrl, alertsCache: make(map[uint32]*postgres.Alert), cacheMutex: sync.Mutex{}, - chParallel: make(chan bool, CHParallelLimit), - ch: ch, + pgParallel: make(chan bool, PGParallelLimit), + pgs: pgs, pg: pg, pgMutex: sync.Mutex{}, notifications: make(map[uint32]*postgres.TenantNotification), @@ -64,29 +64,23 @@ func (m *manager) Update(a *postgres.Alert) error { func (m *manager) processAlert(a *postgres.Alert) { defer func() { defer m.notificationsGo.Done() - <-m.chParallel + <-m.pgParallel }() if !a.CanCheck() { - //log.Printf("cannot check %+v", a) - //log.Printf("cannot check alertId %d", a.AlertID) log.Printf("cannot check %s", a.Name) return } //log.Printf("checking %+v", a) log.Printf("quering %s", a.Name) //--- For stats: - atomic.AddInt64(&chCount, 1) + atomic.AddInt64(&pgCount, 1) q, err := a.Build() if err != nil { log.Println(err) return } - //sub1, args, _ := q.ToSql() - //log.Println(sub1) - //log.Println(args) - //return - rows, err := q.RunWith(m.ch).Query() + rows, err := q.RunWith(m.pgs).Query() if err != nil { log.Println(err) @@ -103,8 +97,7 @@ func (m *manager) processAlert(a *postgres.Alert) { log.Println(err) continue } - //log.Println(valid) - //log.Println(value.Valid) + if valid && value.Valid { log.Printf("%s: valid", a.Name) m.notificationsMutex.Lock() @@ -115,7 +108,7 @@ func (m *manager) processAlert(a *postgres.Alert) { ButtonText: "Check metrics for more details", ButtonUrl: fmt.Sprintf("/%d/metrics", a.ProjectID), ImageUrl: nil, - Options: map[string]interface{}{"source": "ALERT", "sourceId": a.AlertID, "sourceMeta": a.DetectionMethod, "message": a.Options.Message, "projectId": a.ProjectID}, + Options: map[string]interface{}{"source": "ALERT", "sourceId": a.AlertID, "sourceMeta": a.DetectionMethod, "message": a.Options.Message, "projectId": a.ProjectID, "data": map[string]interface{}{"title": a.Name, "limitValue": a.Query.Right, "actualValue": value.Float64, "operator": a.Query.Operator, "trigger": a.Query.Left, "alertId": a.AlertID, "detectionMethod": a.DetectionMethod, "currentPeriod": a.Options.CurrentPeriod, "previousPeriod": a.Options.PreviousPeriod, "createdAt": time.Now().Unix() * 1000}}, } m.notificationsMutex.Unlock() } @@ -126,7 +119,7 @@ func (m *manager) RequestAll() { now := time.Now().Unix() m.cacheMutex.Lock() for _, a := range m.alertsCache { - m.chParallel <- true + m.pgParallel <- true m.notificationsGo.Add(1) go m.processAlert(a) //m.processAlert(a) @@ -135,8 +128,8 @@ func (m *manager) RequestAll() { m.cacheMutex.Unlock() //log.Println("waiting for all alerts to finish") m.notificationsGo.Wait() - log.Printf("done %d CH queries in: %ds", chCount, time.Now().Unix()-now) - chCount = 0 + log.Printf("done %d PG queries in: %ds", pgCount, time.Now().Unix()-now) + pgCount = 0 //log.Printf("Processing %d Notifications", len(m.notifications)) m.notificationsMutex.Lock() go m.ProcessNotifications(m.notifications) @@ -146,7 +139,6 @@ func (m *manager) RequestAll() { } func (m *manager) ProcessNotifications(allNotifications map[uint32]*postgres.TenantNotification) { - //return if len(allNotifications) == 0 { log.Println("No notifications to process") return @@ -164,6 +156,14 @@ func (m *manager) ProcessNotifications(allNotifications map[uint32]*postgres.Ten toSend.Send(m.notificationsUrl) if err := m.pg.SaveLastNotification(allIds); err != nil { log.Printf("Error saving LastNotification time: %v", err) - return + if err.Error() == "conn closed" { + m.pg, err = postgres.NewPostgres(POSTGRES_STRING) + if err != nil { + panic(fmt.Sprintf("Postgres renew notifications connection error: %v\n", err)) + } + if err := m.pg.SaveLastNotification(allIds); err != nil { + panic(fmt.Sprintf("Error saving LastNotification time, suicide: %v", err)) + } + } } }