openreplay/backend/pkg/db/postgres/bulk.go
Alexander 6830c8879f
web module refactoring (#2725)
* feat(server): moved an http server object into a pkg subdir to be reusable for http, spots, and integrations

* feat(web): isolated web module (server, router, middleware, utils) used in spots and new integrations

* feat(web): removed possible panic

* feat(web): split all handlers from http service into different packages for better management.

* feat(web): changed router's method signature

* feat(web): added missing handlers interface

* feat(web): added health middleware to remove unnecessary checks

* feat(web): customizable middleware set for web servers

* feat(web): simplified the handler's structure

* feat(web): created an unified server.Run method for all web services (http, spot, integrations)

* feat(web): fixed a json size limit issue

* feat(web): removed Keys and PG connection from router

* feat(web): simplified integration's main file

* feat(web): simplified spot's main file

* feat(web): simplified http's main file (builder)

* feat(web): refactored audit trail functionality

* feat(web): added ee version of audit trail

* feat(web): added ee version of conditions module

* feat(web): moved ee version of some web session structs

* feat(web): new format of web metrics

* feat(web): added new web metrics to all handlers

* feat(web): added justExpired feature to web ingest handler

* feat(web): added small integrations improvements
2024-11-21 17:48:04 +01:00

107 lines
2.5 KiB
Go

package postgres
import (
"bytes"
"errors"
"fmt"
"time"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/metrics/database"
)
const (
insertPrefix = `INSERT INTO `
insertValues = ` VALUES `
insertSuffix = ` ON CONFLICT DO NOTHING;`
)
type Bulk interface {
Append(args ...interface{}) error
Send() error
Table() string
}
type bulkImpl struct {
conn pool.Pool
table string
columns string
template string
setSize int
sizeLimit int
values []interface{}
}
func (b *bulkImpl) Append(args ...interface{}) error {
if len(args) != b.setSize {
return fmt.Errorf("wrong number of arguments, waited: %d, got: %d", b.setSize, len(args))
}
b.values = append(b.values, args...)
if len(b.values)/b.setSize >= b.sizeLimit {
return b.send()
}
return nil
}
func (b *bulkImpl) Send() error {
if len(b.values) == 0 {
return nil
}
return b.send()
}
func (b *bulkImpl) Table() string {
return b.table
}
func (b *bulkImpl) send() error {
start := time.Now()
size := len(b.values) / b.setSize
request := bytes.NewBufferString(insertPrefix + b.table + b.columns + insertValues)
args := make([]interface{}, b.setSize)
for i := 0; i < len(b.values)/b.setSize; i++ {
for j := 0; j < b.setSize; j++ {
args[j] = i*b.setSize + j + 1
}
if i > 0 {
request.WriteByte(',')
}
request.WriteString(fmt.Sprintf(b.template, args...))
}
request.WriteString(insertSuffix)
err := b.conn.Exec(request.String(), b.values...)
b.values = make([]interface{}, 0, b.setSize*b.sizeLimit)
if err != nil {
return fmt.Errorf("send bulk err: %s", err)
}
// Save bulk metrics
database.RecordBulkElements(float64(size), "pg", b.table)
database.RecordBulkInsertDuration(float64(time.Now().Sub(start).Milliseconds()), "pg", b.table)
return nil
}
func NewBulk(conn pool.Pool, table, columns, template string, setSize, sizeLimit int) (Bulk, error) {
switch {
case conn == nil:
return nil, errors.New("db conn is empty")
case table == "":
return nil, errors.New("table is empty")
case columns == "":
return nil, errors.New("columns is empty")
case template == "":
return nil, errors.New("template is empty")
case setSize <= 0:
return nil, errors.New("set size is wrong")
case sizeLimit <= 0:
return nil, errors.New("size limit is wrong")
}
return &bulkImpl{
conn: conn,
table: table,
columns: columns,
template: template,
setSize: setSize,
sizeLimit: sizeLimit,
values: make([]interface{}, 0, setSize*sizeLimit),
}, nil
}