- Alerts worker uses PG only
This commit is contained in:
KRAIEM Taha Yassine 2021-05-11 01:37:18 +02:00
parent 445cd871c2
commit 85041ac278
3 changed files with 47 additions and 72 deletions

View file

@@ -106,27 +106,25 @@ type columnDefinition struct {
}
var LeftToDb = map[string]columnDefinition{
"performance.dom_content_loaded.average": {table: "pages", formula: "COALESCE(AVG(NULLIF(dom_content_loaded_event_start ,0)),0)"},
"performance.first_meaningful_paint.average": {table: "pages", formula: "COALESCE(AVG(NULLIF(first_contentful_paint,0)),0)"},
"performance.page_load_time.average": {table: "pages", formula: "AVG(NULLIF(load_event_end ,0))"},
"performance.dom_build_time.average": {table: "pages", formula: "AVG(NULLIF(dom_building_time,0))"},
"performance.speed_index.average": {table: "pages", formula: "AVG(NULLIF(speed_index,0))"},
//"avgSpeedIndexByLocation": {table: "pages", formula: "AVG(NULLIF(speed_index,0))", group: "user_country"},
"performance.page_response_time.average": {table: "pages", formula: "AVG(NULLIF(response_time,0))"},
"performance.ttfb.average": {table: "pages", formula: "AVG(NULLIF(first_paint,0))"},
//"avgDomContentLoaded": {table: "pages", formula: "AVG(NULLIF(dom_content_loaded_event_time,0))"},
"performance.time_to_render.averag": {table: "pages", formula: "AVG(NULLIF(visually_complete,0))"},
"performance.image_load_time.average": {table: "resources", formula: "AVG(NULLIF(duration,0))", condition: "type=='img'"},
"performance.request_load_time.average": {table: "resources", formula: "AVG(NULLIF(duration,0))", condition: "type=='fetch'"},
"resources.load_time.average": {table: "resources", formula: "AVG(NULLIF(duration,0))"},
"resources.missing.count": {table: "resources", formula: "COUNT(DISTINCT url_hostpath)", condition: "success==0"},
"errors.4xx_5xx.count": {table: "resources", formula: "COUNT(session_id)", condition: "intDiv(status, 100)!=2"},
"errors.4xx.count": {table: "resources", formula: "COUNT(session_id)", condition: "intDiv(status, 100)==4"},
"errors.5xx.count": {table: "resources", formula: "COUNT(session_id)", condition: "intDiv(status, 100)==5"},
"errors.javascript.impacted_sessions.count": {table: "resources", formula: "COUNT(DISTINCT session_id)", condition: "success=0 AND type='script'"},
"performance.crashes.count": {table: "sessions", formula: "COUNT(DISTINCT session_id)", condition: "errors_count > 0"},
"errors.javascript.count": {table: "errors", formula: "COUNT(DISTINCT session_id)", condition: "source=='js_exception'"},
"errors.backend.count": {table: "errors", formula: "COUNT(DISTINCT session_id)", condition: "source!='js_exception'"},
"performance.dom_content_loaded.average": {table: "events.pages", formula: "COALESCE(AVG(NULLIF(dom_content_loaded_time ,0)),0)"},
"performance.first_meaningful_paint.average": {table: "events.pages", formula: "COALESCE(AVG(NULLIF(first_contentful_paint_time,0)),0)"},
"performance.page_load_time.average": {table: "events.pages", formula: "AVG(NULLIF(load_time ,0))"},
"performance.dom_build_time.average": {table: "events.pages", formula: "AVG(NULLIF(dom_building_time,0))"},
"performance.speed_index.average": {table: "events.pages", formula: "AVG(NULLIF(speed_index,0))"},
"performance.page_response_time.average": {table: "events.pages", formula: "AVG(NULLIF(response_time,0))"},
"performance.ttfb.average": {table: "events.pages", formula: "AVG(NULLIF(first_paint_time,0))"},
"performance.time_to_render.averag": {table: "events.pages", formula: "AVG(NULLIF(visually_complete,0))"},
"performance.image_load_time.average": {table: "events.resources", formula: "AVG(NULLIF(duration,0))", condition: "type=='img'"},
"performance.request_load_time.average": {table: "events.resources", formula: "AVG(NULLIF(duration,0))", condition: "type=='fetch'"},
"resources.load_time.average": {table: "events.resources", formula: "AVG(NULLIF(duration,0))"},
"resources.missing.count": {table: "events.resources", formula: "COUNT(DISTINCT url_hostpath)", condition: "success==0"},
"errors.4xx_5xx.count": {table: "events.resources", formula: "COUNT(session_id)", condition: "status/100!=2"},
"errors.4xx.count": {table: "events.resources", formula: "COUNT(session_id)", condition: "status/100==4"},
"errors.5xx.count": {table: "events.resources", formula: "COUNT(session_id)", condition: "status/100==5"},
"errors.javascript.impacted_sessions.count": {table: "events.resources", formula: "COUNT(DISTINCT session_id)", condition: "success= FALSE AND type='script'"},
"performance.crashes.count": {table: "public.sessions", formula: "COUNT(DISTINCT session_id)", condition: "errors_count > 0"},
"errors.javascript.count": {table: "events.errors INNER JOIN public.errors AS m_errors USING (error_id)", formula: "COUNT(DISTINCT session_id)", condition: "source=='js_exception'"},
"errors.backend.count": {table: "events.errors INNER JOIN public.errors AS m_errors USING (error_id)", formula: "COUNT(DISTINCT session_id)", condition: "source!='js_exception'"},
}
//This is the frequency of execution for each threshold
@@ -151,20 +149,6 @@ func (a *Alert) CanCheck() bool {
log.Printf("repetitionBase: %d NOT FOUND", repetitionBase)
return false
}
//for i := int64(0); i <= 10; i++ {
// now += 60 * 1000
// log.Printf("%s: ((now-*a.CreatedAt)%%TimeInterval[repetitionBase]*60*1000) < 60*1000: %t", a.Name, ((now-*a.CreatedAt)%(TimeInterval[repetitionBase]*60*1000)) < 60*1000)
// log.Printf("now: %d", now)
// log.Printf("*a.CreatedAt: %d", *a.CreatedAt)
// log.Printf("now-*a.CreatedAt: %d", now-*a.CreatedAt)
// log.Printf("(now-*a.CreatedAt)%%TimeInterval[repetitionBase]*60*1000: %d", (now-*a.CreatedAt)%TimeInterval[repetitionBase]*60*1000)
//}
//return false
//log.Printf("%s: a.Options.RenotifyInterval<=0: %t", a.Name, a.Options.RenotifyInterval <= 0)
//log.Printf("%s: a.Options.LastNotification <= 0: %t", a.Name, a.Options.LastNotification <= 0)
//log.Printf("%s: (now-a.Options.LastNotification) > a.Options.RenotifyInterval*60*1000: %t", a.Name, (now-a.Options.LastNotification) > a.Options.RenotifyInterval*60*1000)
//log.Printf("%s: ((now-*a.CreatedAt)%%TimeInterval[repetitionBase]*60*1000) < 60*1000: %t", a.Name, ((now-*a.CreatedAt)%TimeInterval[repetitionBase]*60*1000) < 60*1000)
//log.Printf("%s: TimeInterval[repetitionBase]: %d", a.Name, TimeInterval[repetitionBase])
return a.DeletedAt == nil && a.Active &&
(a.Options.RenotifyInterval <= 0 ||
a.Options.LastNotification <= 0 ||

View file

@@ -8,7 +8,6 @@ import (
"syscall"
"time"
"github.com/ClickHouse/clickhouse-go"
"openreplay/backend/pkg/env"
"openreplay/backend/pkg/db/postgres"
)
@@ -16,26 +15,18 @@ import (
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
POSTGRES_STRING := env.String("POSTGRES_STRING")
CLICKHOUSE_STRING := env.String("CLICKHOUSE_STRING")
NOTIFICATIONS_STRING := env.String("ALERT_NOTIFICATION_STRING")
log.Printf("Notifications: %s \nCH: %s\n", NOTIFICATIONS_STRING, CLICKHOUSE_STRING)
log.Printf("Notifications: %s \nPG: %s\n", NOTIFICATIONS_STRING, POSTGRES_STRING)
pg := postgres.NewConn(POSTGRES_STRING)
defer pg.Close()
ch, err := sql.Open("clickhouse", CLICKHOUSE_STRING)
pgs, err := sql.Open("postgres", POSTGRES_STRING)
if err != nil {
log.Fatal(err)
}
if err := ch.Ping(); err != nil {
if exception, ok := err.(*clickhouse.Exception); ok {
log.Printf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
} else {
log.Println(err)
}
return
}
manager := NewManager(NOTIFICATIONS_STRING, ch, pg)
manager := NewManager(NOTIFICATIONS_STRING, pgs, pg)
if err := pg.IterateAlerts(func(a *postgres.Alert, err error) {
if err != nil {
log.Printf("Postgres error: %v\n", err)

View file

@@ -11,16 +11,16 @@ import (
"openreplay/backend/pkg/db/postgres"
)
const CHParallelLimit = 2
const PGParallelLimit = 2
var chCount int64
var pgCount int64
type manager struct {
notificationsUrl string
alertsCache map[uint32]*postgres.Alert
cacheMutex sync.Mutex
chParallel chan bool
ch *sql.DB
pgParallel chan bool
pgs *sql.DB
pg *postgres.Conn
pgMutex sync.Mutex
notifications map[uint32]*postgres.TenantNotification
@@ -28,13 +28,13 @@ type manager struct {
notificationsMutex sync.Mutex
}
func NewManager(notificationsUrl string, ch *sql.DB, pg *postgres.Conn) *manager {
func NewManager(notificationsUrl string, pg *sql.DB, pg *postgres.Conn) *manager {
return &manager{
notificationsUrl: notificationsUrl,
alertsCache: make(map[uint32]*postgres.Alert),
cacheMutex: sync.Mutex{},
chParallel: make(chan bool, CHParallelLimit),
ch: ch,
pgParallel: make(chan bool, PGParallelLimit),
pgs: pgs,
pg: pg,
pgMutex: sync.Mutex{},
notifications: make(map[uint32]*postgres.TenantNotification),
@@ -64,29 +64,23 @@ func (m *manager) Update(a *postgres.Alert) error {
func (m *manager) processAlert(a *postgres.Alert) {
defer func() {
defer m.notificationsGo.Done()
<-m.chParallel
<-m.pgParallel
}()
if !a.CanCheck() {
//log.Printf("cannot check %+v", a)
//log.Printf("cannot check alertId %d", a.AlertID)
log.Printf("cannot check %s", a.Name)
return
}
//log.Printf("checking %+v", a)
log.Printf("quering %s", a.Name)
//--- For stats:
atomic.AddInt64(&chCount, 1)
atomic.AddInt64(&pgCount, 1)
q, err := a.Build()
if err != nil {
log.Println(err)
return
}
//sub1, args, _ := q.ToSql()
//log.Println(sub1)
//log.Println(args)
//return
rows, err := q.RunWith(m.ch).Query()
rows, err := q.RunWith(m.pgs).Query()
if err != nil {
log.Println(err)
@@ -103,8 +97,7 @@ func (m *manager) processAlert(a *postgres.Alert) {
log.Println(err)
continue
}
//log.Println(valid)
//log.Println(value.Valid)
if valid && value.Valid {
log.Printf("%s: valid", a.Name)
m.notificationsMutex.Lock()
@@ -115,7 +108,7 @@ func (m *manager) processAlert(a *postgres.Alert) {
ButtonText: "Check metrics for more details",
ButtonUrl: fmt.Sprintf("/%d/metrics", a.ProjectID),
ImageUrl: nil,
Options: map[string]interface{}{"source": "ALERT", "sourceId": a.AlertID, "sourceMeta": a.DetectionMethod, "message": a.Options.Message, "projectId": a.ProjectID},
Options: map[string]interface{}{"source": "ALERT", "sourceId": a.AlertID, "sourceMeta": a.DetectionMethod, "message": a.Options.Message, "projectId": a.ProjectID, "data": map[string]interface{}{"title": a.Name, "limitValue": a.Query.Right, "actualValue": value.Float64, "operator": a.Query.Operator, "trigger": a.Query.Left, "alertId": a.AlertID, "detectionMethod": a.DetectionMethod, "currentPeriod": a.Options.CurrentPeriod, "previousPeriod": a.Options.PreviousPeriod, "createdAt": time.Now().Unix() * 1000}},
}
m.notificationsMutex.Unlock()
}
@@ -126,7 +119,7 @@ func (m *manager) RequestAll() {
now := time.Now().Unix()
m.cacheMutex.Lock()
for _, a := range m.alertsCache {
m.chParallel <- true
m.pgParallel <- true
m.notificationsGo.Add(1)
go m.processAlert(a)
//m.processAlert(a)
@@ -135,8 +128,8 @@ func (m *manager) RequestAll() {
m.cacheMutex.Unlock()
//log.Println("waiting for all alerts to finish")
m.notificationsGo.Wait()
log.Printf("done %d CH queries in: %ds", chCount, time.Now().Unix()-now)
chCount = 0
log.Printf("done %d PG queries in: %ds", pgCount, time.Now().Unix()-now)
pgCount = 0
//log.Printf("Processing %d Notifications", len(m.notifications))
m.notificationsMutex.Lock()
go m.ProcessNotifications(m.notifications)
@@ -146,7 +139,6 @@ func (m *manager) RequestAll() {
}
func (m *manager) ProcessNotifications(allNotifications map[uint32]*postgres.TenantNotification) {
//return
if len(allNotifications) == 0 {
log.Println("No notifications to process")
return
@@ -164,6 +156,14 @@ func (m *manager) ProcessNotifications(allNotifications map[uint32]*postgres.Ten
toSend.Send(m.notificationsUrl)
if err := m.pg.SaveLastNotification(allIds); err != nil {
log.Printf("Error saving LastNotification time: %v", err)
return
if err.Error() == "conn closed" {
m.pg, err = postgres.NewPostgres(POSTGRES_STRING)
if err != nil {
panic(fmt.Sprintf("Postgres renew notifications connection error: %v\n", err))
}
if err := m.pg.SaveLastNotification(allIds); err != nil {
panic(fmt.Sprintf("Error saving LastNotification time, suicide: %v", err))
}
}
}
}