feat(backend): added metrics for postgres bulks (size and insert duration)

This commit is contained in:
Alexander Zavorotynskiy 2022-12-27 18:58:46 +01:00
parent d151c20ed8
commit ee64d91a7e
3 changed files with 63 additions and 37 deletions

View file

@ -2,8 +2,14 @@ package postgres
import (
"bytes"
"context"
"errors"
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"log"
"openreplay/backend/pkg/monitoring"
"time"
)
const (
@ -25,6 +31,8 @@ type bulkImpl struct {
setSize int
sizeLimit int
values []interface{}
bulkSize syncfloat64.Histogram
bulkDuration syncfloat64.Histogram
}
func (b *bulkImpl) Append(args ...interface{}) error {
@ -46,6 +54,8 @@ func (b *bulkImpl) Send() error {
}
func (b *bulkImpl) send() error {
start := time.Now()
size := len(b.values) / b.setSize
request := bytes.NewBufferString(insertPrefix + b.table + b.columns + insertValues)
args := make([]interface{}, b.setSize)
for i := 0; i < len(b.values)/b.setSize; i++ {
@ -63,13 +73,19 @@ func (b *bulkImpl) send() error {
if err != nil {
return fmt.Errorf("send bulk err: %s", err)
}
// Save bulk metrics
ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*200)
b.bulkDuration.Record(ctx, float64(time.Now().Sub(start).Milliseconds()), attribute.String("table", b.table))
b.bulkSize.Record(ctx, float64(size), attribute.String("table", b.table))
return nil
}
func NewBulk(conn Pool, table, columns, template string, setSize, sizeLimit int) (Bulk, error) {
func NewBulk(conn Pool, metrics *monitoring.Metrics, table, columns, template string, setSize, sizeLimit int) (Bulk, error) {
switch {
case conn == nil:
return nil, errors.New("db conn is empty")
case metrics == nil:
return nil, errors.New("metrics is empty")
case table == "":
return nil, errors.New("table is empty")
case columns == "":
@ -81,6 +97,14 @@ func NewBulk(conn Pool, table, columns, template string, setSize, sizeLimit int)
case sizeLimit <= 0:
return nil, errors.New("size limit is wrong")
}
messagesInBulk, err := metrics.RegisterHistogram("messages_in_bulk")
if err != nil {
log.Printf("can't create messages_size metric: %s", err)
}
bulkInsertDuration, err := metrics.RegisterHistogram("bulk_insert_duration")
if err != nil {
log.Printf("can't create messages_size metric: %s", err)
}
return &bulkImpl{
conn: conn,
table: table,
@ -89,5 +113,7 @@ func NewBulk(conn Pool, table, columns, template string, setSize, sizeLimit int)
setSize: setSize,
sizeLimit: sizeLimit,
values: make([]interface{}, 0, setSize*sizeLimit),
bulkSize: messagesInBulk,
bulkDuration: bulkInsertDuration,
}, nil
}

View file

@ -78,7 +78,7 @@ func NewConn(url string, queueLimit, sizeLimit int, metrics *monitoring.Metrics)
if err != nil {
log.Fatalf("can't create new pool wrapper: %s", err)
}
conn.initBulks()
conn.initBulks(metrics)
return conn
}
@ -107,9 +107,9 @@ func (conn *Conn) initMetrics(metrics *monitoring.Metrics) {
}
}
func (conn *Conn) initBulks() {
func (conn *Conn) initBulks(metrics *monitoring.Metrics) {
var err error
conn.autocompletes, err = NewBulk(conn.c,
conn.autocompletes, err = NewBulk(conn.c, metrics,
"autocomplete",
"(value, type, project_id)",
"($%d, $%d, $%d)",
@ -117,7 +117,7 @@ func (conn *Conn) initBulks() {
if err != nil {
log.Fatalf("can't create autocomplete bulk: %s", err)
}
conn.requests, err = NewBulk(conn.c,
conn.requests, err = NewBulk(conn.c, metrics,
"events_common.requests",
"(session_id, timestamp, seq_index, url, duration, success)",
"($%d, $%d, $%d, left($%d, 2700), $%d, $%d)",
@ -125,7 +125,7 @@ func (conn *Conn) initBulks() {
if err != nil {
log.Fatalf("can't create requests bulk: %s", err)
}
conn.customEvents, err = NewBulk(conn.c,
conn.customEvents, err = NewBulk(conn.c, metrics,
"events_common.customs",
"(session_id, timestamp, seq_index, name, payload)",
"($%d, $%d, $%d, left($%d, 2700), $%d)",
@ -133,7 +133,7 @@ func (conn *Conn) initBulks() {
if err != nil {
log.Fatalf("can't create customEvents bulk: %s", err)
}
conn.webPageEvents, err = NewBulk(conn.c,
conn.webPageEvents, err = NewBulk(conn.c, metrics,
"events.pages",
"(session_id, message_id, timestamp, referrer, base_referrer, host, path, query, dom_content_loaded_time, "+
"load_time, response_end, first_paint_time, first_contentful_paint_time, speed_index, visually_complete, "+
@ -144,7 +144,7 @@ func (conn *Conn) initBulks() {
if err != nil {
log.Fatalf("can't create webPageEvents bulk: %s", err)
}
conn.webInputEvents, err = NewBulk(conn.c,
conn.webInputEvents, err = NewBulk(conn.c, metrics,
"events.inputs",
"(session_id, message_id, timestamp, value, label)",
"($%d, $%d, $%d, $%d, NULLIF($%d,''))",
@ -152,7 +152,7 @@ func (conn *Conn) initBulks() {
if err != nil {
log.Fatalf("can't create webPageEvents bulk: %s", err)
}
conn.webGraphQL, err = NewBulk(conn.c,
conn.webGraphQL, err = NewBulk(conn.c, metrics,
"events.graphql",
"(session_id, timestamp, message_id, name, request_body, response_body)",
"($%d, $%d, $%d, left($%d, 2700), $%d, $%d)",
@ -160,7 +160,7 @@ func (conn *Conn) initBulks() {
if err != nil {
log.Fatalf("can't create webPageEvents bulk: %s", err)
}
conn.webErrors, err = NewBulk(conn.c,
conn.webErrors, err = NewBulk(conn.c, metrics,
"errors",
"(error_id, project_id, source, name, message, payload)",
"($%d, $%d, $%d, $%d, $%d, $%d::jsonb)",
@ -168,7 +168,7 @@ func (conn *Conn) initBulks() {
if err != nil {
log.Fatalf("can't create webErrors bulk: %s", err)
}
conn.webErrorEvents, err = NewBulk(conn.c,
conn.webErrorEvents, err = NewBulk(conn.c, metrics,
"events.errors",
"(session_id, message_id, timestamp, error_id)",
"($%d, $%d, $%d, $%d)",
@ -176,7 +176,7 @@ func (conn *Conn) initBulks() {
if err != nil {
log.Fatalf("can't create webErrorEvents bulk: %s", err)
}
conn.webErrorTags, err = NewBulk(conn.c,
conn.webErrorTags, err = NewBulk(conn.c, metrics,
"public.errors_tags",
"(session_id, message_id, error_id, key, value)",
"($%d, $%d, $%d, $%d, $%d)",
@ -184,7 +184,7 @@ func (conn *Conn) initBulks() {
if err != nil {
log.Fatalf("can't create webErrorEvents bulk: %s", err)
}
conn.webIssues, err = NewBulk(conn.c,
conn.webIssues, err = NewBulk(conn.c, metrics,
"issues",
"(project_id, issue_id, type, context_string)",
"($%d, $%d, $%d, $%d)",
@ -192,7 +192,7 @@ func (conn *Conn) initBulks() {
if err != nil {
log.Fatalf("can't create webIssues bulk: %s", err)
}
conn.webIssueEvents, err = NewBulk(conn.c,
conn.webIssueEvents, err = NewBulk(conn.c, metrics,
"events_common.issues",
"(session_id, issue_id, timestamp, seq_index, payload)",
"($%d, $%d, $%d, $%d, CAST($%d AS jsonb))",
@ -200,7 +200,7 @@ func (conn *Conn) initBulks() {
if err != nil {
log.Fatalf("can't create webIssueEvents bulk: %s", err)
}
conn.webCustomEvents, err = NewBulk(conn.c,
conn.webCustomEvents, err = NewBulk(conn.c, metrics,
"events_common.customs",
"(session_id, seq_index, timestamp, name, payload, level)",
"($%d, $%d, $%d, left($%d, 2700), $%d, $%d)",
@ -208,7 +208,7 @@ func (conn *Conn) initBulks() {
if err != nil {
log.Fatalf("can't create webCustomEvents bulk: %s", err)
}
conn.webClickEvents, err = NewBulk(conn.c,
conn.webClickEvents, err = NewBulk(conn.c, metrics,
"events.clicks",
"(session_id, message_id, timestamp, label, selector, url, path)",
"($%d, $%d, $%d, NULLIF($%d, ''), $%d, $%d, $%d)",
@ -216,7 +216,7 @@ func (conn *Conn) initBulks() {
if err != nil {
log.Fatalf("can't create webClickEvents bulk: %s", err)
}
conn.webNetworkRequest, err = NewBulk(conn.c,
conn.webNetworkRequest, err = NewBulk(conn.c, metrics,
"events_common.requests",
"(session_id, timestamp, seq_index, url, host, path, query, request_body, response_body, status_code, method, duration, success)",
"($%d, $%d, $%d, left($%d, 2700), $%d, $%d, $%d, $%d, $%d, $%d::smallint, NULLIF($%d, '')::http_method, $%d, $%d)",

View file

@ -76,8 +76,8 @@ Counter is a synchronous instrument that measures additive non-decreasing values
*/
func (m *Metrics) RegisterCounter(name string) (syncfloat64.Counter, error) {
if _, ok := m.counters[name]; ok {
return nil, fmt.Errorf("counter %s already exists", name)
if counter, ok := m.counters[name]; ok {
return counter, nil
}
counter, err := m.meter.SyncFloat64().Counter(name)
if err != nil {
@ -100,8 +100,8 @@ for example, the number of:
*/
func (m *Metrics) RegisterUpDownCounter(name string) (syncfloat64.UpDownCounter, error) {
if _, ok := m.upDownCounters[name]; ok {
return nil, fmt.Errorf("upDownCounter %s already exists", name)
if counter, ok := m.upDownCounters[name]; ok {
return counter, nil
}
counter, err := m.meter.SyncFloat64().UpDownCounter(name)
if err != nil {
@ -122,8 +122,8 @@ Histogram is a synchronous instrument that produces a histogram from recorded va
*/
func (m *Metrics) RegisterHistogram(name string) (syncfloat64.Histogram, error) {
if _, ok := m.histograms[name]; ok {
return nil, fmt.Errorf("histogram %s already exists", name)
if hist, ok := m.histograms[name]; ok {
return hist, nil
}
hist, err := m.meter.SyncFloat64().Histogram(name)
if err != nil {