feat(backend): control batch size and number of SQL requests in db service for more accurate management of data inserts (#540)

Co-authored-by: Alexander Zavorotynskiy <alexander@openreplay.com>
This commit is contained in:
Alexander 2022-06-15 12:57:09 +02:00 committed by GitHub
parent 6b5d9d3799
commit 3c6bd9613c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 120 additions and 60 deletions

View file

@ -50,6 +50,8 @@ ENV TZ=UTC \
FS_CLEAN_HRS=72 \
FILE_SPLIT_SIZE=300000 \
LOG_QUEUE_STATS_INTERVAL_SEC=60 \
BATCH_QUEUE_LIMIT=20 \
BATCH_SIZE_LIMIT=10000000 \
PARTITIONS_NUMBER=1

View file

@ -122,6 +122,7 @@ func main() {
consumer.Close()
os.Exit(0)
case <-commitTick:
// Send collected batches to db
pg.CommitBatches()
if err := saver.CommitStats(); err != nil {
log.Printf("Error on stats commit: %v", err)
@ -131,6 +132,7 @@ func main() {
log.Printf("Error on consumer commit: %v", err)
}
default:
// Handle new message from queue
err := consumer.ConsumeNext()
if err != nil {
log.Fatalf("Error on consumption: %v", err) // TODO: is always fatal?

View file

@ -14,6 +14,8 @@ type Config struct {
TopicRawIOS string
TopicTrigger string
CommitBatchTimeout time.Duration
BatchQueueLimit int
BatchSizeLimit int
}
func New() *Config {
@ -26,5 +28,7 @@ func New() *Config {
TopicRawIOS: env.String("TOPIC_RAW_IOS"),
TopicTrigger: env.String("TOPIC_TRIGGER"),
CommitBatchTimeout: 15 * time.Second,
BatchQueueLimit: env.Int("BATCH_QUEUE_LIMIT"),
BatchSizeLimit: env.Int("BATCH_SIZE_LIMIT"),
}
}

View file

@ -15,8 +15,11 @@ func getTimeoutContext() context.Context {
}
type Conn struct {
c *pgxpool.Pool // TODO: conditional usage of Pool/Conn (use interface?)
batches map[uint64]*pgx.Batch
c *pgxpool.Pool // TODO: conditional usage of Pool/Conn (use interface?)
batches map[uint64]*pgx.Batch
batchSizes map[uint64]int
batchQueueLimit int
batchSizeLimit int
}
func NewConn(url string) *Conn {
@ -25,8 +28,11 @@ func NewConn(url string) *Conn {
log.Println(err)
log.Fatalln("pgxpool.Connect Error")
}
batches := make(map[uint64]*pgx.Batch)
return &Conn{c, batches}
return &Conn{
c: c,
batches: make(map[uint64]*pgx.Batch),
batchSizes: make(map[uint64]int),
}
}
func (conn *Conn) Close() error {
@ -34,29 +40,56 @@ func (conn *Conn) Close() error {
return nil
}
func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{}) error {
func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{}) {
batch, ok := conn.batches[sessionID]
if !ok {
conn.batches[sessionID] = &pgx.Batch{}
batch = conn.batches[sessionID]
}
batch.Queue(sql, args...)
return nil
}
func (conn *Conn) CommitBatches() {
for _, b := range conn.batches {
for sessID, b := range conn.batches {
br := conn.c.SendBatch(getTimeoutContext(), b)
l := b.Len()
for i := 0; i < l; i++ {
if ct, err := br.Exec(); err != nil {
// TODO: ct info
log.Printf("Error in PG batch (command tag %v): %v \n", ct.String(), err)
log.Printf("Error in PG batch (command tag %s, session: %d): %v \n", ct.String(), sessID, err)
}
}
br.Close() // returns err
}
conn.batches = make(map[uint64]*pgx.Batch)
conn.batchSizes = make(map[uint64]int)
}
// updateBatchSize adds reqSize (an approximate request payload size in bytes)
// to the per-session counter and flushes that session's batch early when
// either the accumulated size or the number of queued statements reaches the
// configured limit (BATCH_SIZE_LIMIT / BATCH_QUEUE_LIMIT).
// NOTE(review): assumes batchQueue was already called for this sessionID —
// conn.batches[sessionID] is dereferenced without a presence check.
func (conn *Conn) updateBatchSize(sessionID uint64, reqSize int) {
conn.batchSizes[sessionID] += reqSize
// Flush on whichever limit is crossed first: total size or queue length.
if conn.batchSizes[sessionID] >= conn.batchSizeLimit || conn.batches[sessionID].Len() >= conn.batchQueueLimit {
conn.commitBatch(sessionID)
}
}
// Send only one batch to pg
// commitBatch sends the queued pgx.Batch for a single session to Postgres,
// logs any per-statement execution errors, and then deletes the session's
// batch bookkeeping so a fresh batch starts with the next queued statement.
func (conn *Conn) commitBatch(sessionID uint64) {
b, ok := conn.batches[sessionID]
if !ok {
// Nothing queued for this session; nothing to send.
log.Printf("can't find batch for session: %d", sessionID)
return
}
br := conn.c.SendBatch(getTimeoutContext(), b)
l := b.Len()
// Drain one result per queued statement; errors are logged, not returned.
for i := 0; i < l; i++ {
if ct, err := br.Exec(); err != nil {
log.Printf("Error in PG batch (command tag %s, session: %d): %v \n", ct.String(), sessionID, err)
}
}
br.Close()
// Clean batch info
delete(conn.batches, sessionID)
delete(conn.batchSizes, sessionID)
}
func (conn *Conn) query(sql string, args ...interface{}) (pgx.Rows, error) {

View file

@ -14,8 +14,8 @@ type Integration struct {
Options json.RawMessage `json:"options"`
}
func (pg *Conn) IterateIntegrationsOrdered(iter func(integration *Integration, err error)) error {
rows, err := pg.query(`
func (conn *Conn) IterateIntegrationsOrdered(iter func(integration *Integration, err error)) error {
rows, err := conn.query(`
SELECT project_id, provider, options, request_data
FROM integrations
`)
@ -39,8 +39,8 @@ func (pg *Conn) IterateIntegrationsOrdered(iter func(integration *Integration, e
return nil
}
func (pg *Conn) UpdateIntegrationRequestData(i *Integration) error {
return pg.exec(`
func (conn *Conn) UpdateIntegrationRequestData(i *Integration) error {
return conn.exec(`
UPDATE integrations
SET request_data = $1
WHERE project_id=$2 AND provider=$3`,

View file

@ -2,7 +2,6 @@ package postgres
import (
"fmt"
"log"
"strings"
"openreplay/backend/pkg/db/types"
@ -22,7 +21,7 @@ func (conn *Conn) insertAutocompleteValue(sessionID uint64, tp string, value str
if len(value) == 0 {
return
}
if err := conn.batchQueue(sessionID, `
sqlRequest := `
INSERT INTO autocomplete (
value,
type,
@ -31,11 +30,11 @@ func (conn *Conn) insertAutocompleteValue(sessionID uint64, tp string, value str
$1, $2, project_id
FROM sessions
WHERE session_id = $3
) ON CONFLICT DO NOTHING`,
value, tp, sessionID,
); err != nil {
log.Printf("Insert autocomplete error: %v", err)
}
) ON CONFLICT DO NOTHING`
conn.batchQueue(sessionID, sqlRequest, value, tp, sessionID)
// Record approximate message size
conn.updateBatchSize(sessionID, len(sqlRequest)+len(value)+len(tp)+8)
}
func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error {
@ -80,9 +79,9 @@ func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error {
}
func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, error) {
// Search acceleration
if err := conn.batchQueue(sessionID, `
UPDATE sessions
// TODO: search acceleration?
sqlRequest := `
UPDATE sessions
SET issue_types=(SELECT
CASE WHEN errors_count > 0 THEN
(COALESCE(ARRAY_AGG(DISTINCT ps.type), '{}') || 'js_exception'::issue_type)::issue_type[]
@ -92,12 +91,11 @@ func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64,
FROM events_common.issues
INNER JOIN issues AS ps USING (issue_id)
WHERE session_id = $1)
WHERE session_id = $1
`,
sessionID,
); err != nil {
log.Printf("Error while updating issue_types: %v. SessionID: %v", err, sessionID)
}
WHERE session_id = $1`
conn.batchQueue(sessionID, sqlRequest, sessionID)
// Record approximate message size
conn.updateBatchSize(sessionID, len(sqlRequest)+8)
var dur uint64
if err := conn.queryRow(`
@ -113,54 +111,65 @@ func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64,
}
func (conn *Conn) InsertRequest(sessionID uint64, timestamp uint64, index uint64, url string, duration uint64, success bool) error {
return conn.batchQueue(sessionID, `
sqlRequest := `
INSERT INTO events_common.requests (
session_id, timestamp, seq_index, url, duration, success
) VALUES (
$1, $2, $3, $4, $5, $6
)`,
sessionID, timestamp,
getSqIdx(index),
url, duration, success,
)
)`
conn.batchQueue(sessionID, sqlRequest, sessionID, timestamp, getSqIdx(index), url, duration, success)
// Record approximate message size
conn.updateBatchSize(sessionID, len(sqlRequest)+len(url)+8*4)
return nil
}
func (conn *Conn) InsertCustomEvent(sessionID uint64, timestamp uint64, index uint64, name string, payload string) error {
return conn.batchQueue(sessionID, `
sqlRequest := `
INSERT INTO events_common.customs (
session_id, timestamp, seq_index, name, payload
) VALUES (
$1, $2, $3, $4, $5
)`,
sessionID, timestamp,
getSqIdx(index),
name, payload,
)
)`
conn.batchQueue(sessionID, sqlRequest, sessionID, timestamp, getSqIdx(index), name, payload)
// Record approximate message size
conn.updateBatchSize(sessionID, len(sqlRequest)+len(name)+len(payload)+8*3)
return nil
}
func (conn *Conn) InsertUserID(sessionID uint64, userID string) error {
return conn.batchQueue(sessionID, `
sqlRequest := `
UPDATE sessions SET user_id = $1
WHERE session_id = $2`,
userID, sessionID,
)
WHERE session_id = $2`
conn.batchQueue(sessionID, sqlRequest, userID, sessionID)
// Record approximate message size
conn.updateBatchSize(sessionID, len(sqlRequest)+len(userID)+8)
return nil
}
func (conn *Conn) InsertUserAnonymousID(sessionID uint64, userAnonymousID string) error {
return conn.batchQueue(sessionID, `
sqlRequest := `
UPDATE sessions SET user_anonymous_id = $1
WHERE session_id = $2`,
userAnonymousID, sessionID,
)
WHERE session_id = $2`
conn.batchQueue(sessionID, sqlRequest, userAnonymousID, sessionID)
// Record approximate message size
conn.updateBatchSize(sessionID, len(sqlRequest)+len(userAnonymousID)+8)
return nil
}
func (conn *Conn) InsertMetadata(sessionID uint64, keyNo uint, value string) error {
return conn.batchQueue(sessionID, fmt.Sprintf(`
sqlRequest := `
UPDATE sessions SET metadata_%v = $1
WHERE session_id = $2`, keyNo),
value, sessionID,
)
WHERE session_id = $2`
conn.batchQueue(sessionID, fmt.Sprintf(sqlRequest, keyNo), value, sessionID)
// conn.insertAutocompleteValue(sessionID, "METADATA", value)
// Record approximate message size
conn.updateBatchSize(sessionID, len(sqlRequest)+len(value)+8*2)
return nil
}
func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messages.IssueEvent) error {

View file

@ -220,7 +220,8 @@ func (conn *Conn) InsertWebFetchEvent(sessionID uint64, savePayload bool, e *Fet
if err != nil {
return err
}
return conn.batchQueue(sessionID, `
sqlRequest := `
INSERT INTO events_common.requests (
session_id, timestamp, seq_index,
url, host, path, query,
@ -231,13 +232,18 @@ func (conn *Conn) InsertWebFetchEvent(sessionID uint64, savePayload bool, e *Fet
$4, $5, $6, $7,
$8, $9, $10::smallint, NULLIF($11, '')::http_method,
$12, $13
) ON CONFLICT DO NOTHING`,
) ON CONFLICT DO NOTHING`
conn.batchQueue(sessionID, sqlRequest,
sessionID, e.Timestamp, getSqIdx(e.MessageID),
e.URL, host, path, query,
request, response, e.Status, url.EnsureMethod(e.Method),
e.Duration, e.Status < 400,
)
// Record approximate message size
conn.updateBatchSize(sessionID, len(sqlRequest)+len(e.URL)+len(host)+len(path)+len(query)+
len(*request)+len(*response)+len(url.EnsureMethod(e.Method))+8*5+1)
return nil
}
func (conn *Conn) InsertWebGraphQLEvent(sessionID uint64, savePayload bool, e *GraphQLEvent) error {
@ -247,7 +253,8 @@ func (conn *Conn) InsertWebGraphQLEvent(sessionID uint64, savePayload bool, e *G
response = &e.Response
}
conn.insertAutocompleteValue(sessionID, "GRAPHQL", e.OperationName)
return conn.batchQueue(sessionID, `
sqlRequest := `
INSERT INTO events.graphql (
session_id, timestamp, message_id,
name,
@ -256,9 +263,12 @@ func (conn *Conn) InsertWebGraphQLEvent(sessionID uint64, savePayload bool, e *G
$1, $2, $3,
$4,
$5, $6
) ON CONFLICT DO NOTHING`,
sessionID, e.Timestamp, e.MessageID,
e.OperationName,
request, response,
) ON CONFLICT DO NOTHING`
conn.batchQueue(sessionID, sqlRequest, sessionID, e.Timestamp, e.MessageID,
e.OperationName, request, response,
)
// Record approximate message size
conn.updateBatchSize(sessionID, len(sqlRequest)+len(e.OperationName)+len(*request)+len(*response)+8*3)
return nil
}