feat(backend): added extra information for db metrics
This commit is contained in:
parent
6d45a41c18
commit
bae347c4ff
1 changed files with 98 additions and 23 deletions
|
|
@ -24,15 +24,16 @@ type batchItem struct {
|
|||
}
|
||||
|
||||
type Conn struct {
|
||||
c *pgxpool.Pool // TODO: conditional usage of Pool/Conn (use interface?)
|
||||
batches map[uint64]*pgx.Batch
|
||||
batchSizes map[uint64]int
|
||||
rawBatches map[uint64][]*batchItem
|
||||
batchQueueLimit int
|
||||
batchSizeLimit int
|
||||
batchSizeBytes syncfloat64.Histogram
|
||||
batchSizeLines syncfloat64.Histogram
|
||||
sqlRequestTime syncfloat64.Histogram
|
||||
c *pgxpool.Pool // TODO: conditional usage of Pool/Conn (use interface?)
|
||||
batches map[uint64]*pgx.Batch
|
||||
batchSizes map[uint64]int
|
||||
rawBatches map[uint64][]*batchItem
|
||||
batchQueueLimit int
|
||||
batchSizeLimit int
|
||||
batchSizeBytes syncfloat64.Histogram
|
||||
batchSizeLines syncfloat64.Histogram
|
||||
sqlRequestTime syncfloat64.Histogram
|
||||
sqlRequestCounter syncfloat64.Counter
|
||||
}
|
||||
|
||||
func NewConn(url string, queueLimit, sizeLimit int, metrics *monitoring.Metrics) *Conn {
|
||||
|
|
@ -75,6 +76,10 @@ func (conn *Conn) initMetrics(metrics *monitoring.Metrics) {
|
|||
if err != nil {
|
||||
log.Printf("can't create sqlRequestTime metric: %s", err)
|
||||
}
|
||||
conn.sqlRequestCounter, err = metrics.RegisterCounter("sql_request_number")
|
||||
if err != nil {
|
||||
log.Printf("can't create sqlRequestNumber metric: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{}) {
|
||||
|
|
@ -99,6 +104,10 @@ func (conn *Conn) CommitBatches() {
|
|||
// Record batch size in bytes and number of lines
|
||||
conn.batchSizeBytes.Record(context.Background(), float64(conn.batchSizes[sessID]))
|
||||
conn.batchSizeLines.Record(context.Background(), float64(b.Len()))
|
||||
|
||||
start := time.Now()
|
||||
isFailed := false
|
||||
|
||||
// Send batch to db and execute
|
||||
br := conn.c.SendBatch(getTimeoutContext(), b)
|
||||
l := b.Len()
|
||||
|
|
@ -108,9 +117,14 @@ func (conn *Conn) CommitBatches() {
|
|||
failedSql := conn.rawBatches[sessID][i]
|
||||
query := strings.ReplaceAll(failedSql.query, "\n", " ")
|
||||
log.Println("failed sql req:", query, failedSql.arguments)
|
||||
isFailed = true
|
||||
}
|
||||
}
|
||||
br.Close() // returns err
|
||||
conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
|
||||
attribute.String("method", "batch"), attribute.Bool("failed", isFailed))
|
||||
conn.sqlRequestCounter.Add(context.Background(), 1,
|
||||
attribute.String("method", "batch"), attribute.Bool("failed", isFailed))
|
||||
}
|
||||
conn.batches = make(map[uint64]*pgx.Batch)
|
||||
conn.batchSizes = make(map[uint64]int)
|
||||
|
|
@ -134,6 +148,10 @@ func (conn *Conn) commitBatch(sessionID uint64) {
|
|||
// Record batch size in bytes and number of lines
|
||||
conn.batchSizeBytes.Record(context.Background(), float64(conn.batchSizes[sessionID]))
|
||||
conn.batchSizeLines.Record(context.Background(), float64(b.Len()))
|
||||
|
||||
start := time.Now()
|
||||
isFailed := false
|
||||
|
||||
// Send batch to db and execute
|
||||
br := conn.c.SendBatch(getTimeoutContext(), b)
|
||||
l := b.Len()
|
||||
|
|
@ -143,10 +161,16 @@ func (conn *Conn) commitBatch(sessionID uint64) {
|
|||
failedSql := conn.rawBatches[sessionID][i]
|
||||
query := strings.ReplaceAll(failedSql.query, "\n", " ")
|
||||
log.Println("failed sql req:", query, failedSql.arguments)
|
||||
isFailed = true
|
||||
}
|
||||
}
|
||||
br.Close()
|
||||
|
||||
conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
|
||||
attribute.String("method", "batch"), attribute.Bool("failed", isFailed))
|
||||
conn.sqlRequestCounter.Add(context.Background(), 1,
|
||||
attribute.String("method", "batch"), attribute.Bool("failed", isFailed))
|
||||
|
||||
// Clean batch info
|
||||
delete(conn.batches, sessionID)
|
||||
delete(conn.batchSizes, sessionID)
|
||||
|
|
@ -156,61 +180,112 @@ func (conn *Conn) commitBatch(sessionID uint64) {
|
|||
func (conn *Conn) query(sql string, args ...interface{}) (pgx.Rows, error) {
|
||||
start := time.Now()
|
||||
res, err := conn.c.Query(getTimeoutContext(), sql, args...)
|
||||
conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", methodName(sql)))
|
||||
method, table := methodName(sql)
|
||||
conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
|
||||
attribute.String("method", method), attribute.String("table", table))
|
||||
conn.sqlRequestCounter.Add(context.Background(), 1,
|
||||
attribute.String("method", method), attribute.String("table", table))
|
||||
return res, err
|
||||
}
|
||||
|
||||
func (conn *Conn) queryRow(sql string, args ...interface{}) pgx.Row {
|
||||
start := time.Now()
|
||||
res := conn.c.QueryRow(getTimeoutContext(), sql, args...)
|
||||
conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", methodName(sql)))
|
||||
method, table := methodName(sql)
|
||||
conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
|
||||
attribute.String("method", method), attribute.String("table", table))
|
||||
conn.sqlRequestCounter.Add(context.Background(), 1,
|
||||
attribute.String("method", method), attribute.String("table", table))
|
||||
return res
|
||||
}
|
||||
|
||||
func (conn *Conn) exec(sql string, args ...interface{}) error {
|
||||
start := time.Now()
|
||||
_, err := conn.c.Exec(getTimeoutContext(), sql, args...)
|
||||
conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", methodName(sql)))
|
||||
method, table := methodName(sql)
|
||||
conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
|
||||
attribute.String("method", method), attribute.String("table", table))
|
||||
conn.sqlRequestCounter.Add(context.Background(), 1,
|
||||
attribute.String("method", method), attribute.String("table", table))
|
||||
return err
|
||||
}
|
||||
|
||||
type _Tx struct {
|
||||
pgx.Tx
|
||||
sqlRequestTime syncfloat64.Histogram
|
||||
sqlRequestTime syncfloat64.Histogram
|
||||
sqlRequestCounter syncfloat64.Counter
|
||||
}
|
||||
|
||||
func (conn *Conn) begin() (_Tx, error) {
|
||||
start := time.Now()
|
||||
tx, err := conn.c.Begin(context.Background())
|
||||
conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", "begin"))
|
||||
return _Tx{tx, conn.sqlRequestTime}, err
|
||||
conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
|
||||
attribute.String("method", "begin"))
|
||||
conn.sqlRequestCounter.Add(context.Background(), 1,
|
||||
attribute.String("method", "begin"))
|
||||
return _Tx{tx, conn.sqlRequestTime, conn.sqlRequestCounter}, err
|
||||
}
|
||||
|
||||
func (tx _Tx) exec(sql string, args ...interface{}) error {
|
||||
start := time.Now()
|
||||
_, err := tx.Exec(context.Background(), sql, args...)
|
||||
tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", methodName(sql)))
|
||||
method, table := methodName(sql)
|
||||
tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
|
||||
attribute.String("method", method), attribute.String("table", table))
|
||||
tx.sqlRequestCounter.Add(context.Background(), 1,
|
||||
attribute.String("method", method), attribute.String("table", table))
|
||||
return err
|
||||
}
|
||||
|
||||
func (tx _Tx) rollback() error {
|
||||
start := time.Now()
|
||||
err := tx.Rollback(context.Background())
|
||||
tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", "rollback"))
|
||||
tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
|
||||
attribute.String("method", "rollback"))
|
||||
tx.sqlRequestCounter.Add(context.Background(), 1,
|
||||
attribute.String("method", "rollback"))
|
||||
return err
|
||||
}
|
||||
|
||||
func (tx _Tx) commit() error {
|
||||
start := time.Now()
|
||||
err := tx.Commit(context.Background())
|
||||
tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", "commit"))
|
||||
tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
|
||||
attribute.String("method", "commit"))
|
||||
tx.sqlRequestCounter.Add(context.Background(), 1,
|
||||
attribute.String("method", "commit"))
|
||||
return err
|
||||
}
|
||||
|
||||
func methodName(sql string) string {
|
||||
method := "unknown"
|
||||
if parts := strings.Split(sql, ""); len(parts) > 0 {
|
||||
method = parts[0]
|
||||
func methodName(sql string) (string, string) {
|
||||
cmd, table := "unknown", "unknown"
|
||||
|
||||
// Prepare sql request for parsing
|
||||
sql = strings.TrimSpace(sql)
|
||||
sql = strings.ReplaceAll(sql, "\n", " ")
|
||||
sql = strings.ReplaceAll(sql, "\t", "")
|
||||
sql = strings.ToLower(sql)
|
||||
|
||||
// Get sql command name
|
||||
parts := strings.Split(sql, " ")
|
||||
if parts[0] == "" {
|
||||
return cmd, table
|
||||
} else {
|
||||
cmd = strings.TrimSpace(parts[0])
|
||||
}
|
||||
return strings.ToLower(method)
|
||||
|
||||
// Get table name
|
||||
switch cmd {
|
||||
case "select":
|
||||
for i, p := range parts {
|
||||
if strings.TrimSpace(p) == "from" {
|
||||
table = strings.TrimSpace(parts[i+1])
|
||||
}
|
||||
}
|
||||
case "update":
|
||||
table = strings.TrimSpace(parts[1])
|
||||
case "insert":
|
||||
table = strings.TrimSpace(parts[2])
|
||||
}
|
||||
return cmd, table
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue