diff --git a/backend/Dockerfile b/backend/Dockerfile index 08b7d871c..8de0b0279 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -50,6 +50,8 @@ ENV TZ=UTC \ FS_CLEAN_HRS=72 \ FILE_SPLIT_SIZE=300000 \ LOG_QUEUE_STATS_INTERVAL_SEC=60 \ + BATCH_QUEUE_LIMIT=20 \ + BATCH_SIZE_LIMIT=10000000 \ PARTITIONS_NUMBER=1 diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index 7928b4bb8..f64b9f5d5 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -122,6 +122,7 @@ func main() { consumer.Close() os.Exit(0) case <-commitTick: + // Send collected batches to db pg.CommitBatches() if err := saver.CommitStats(); err != nil { log.Printf("Error on stats commit: %v", err) @@ -131,6 +132,7 @@ func main() { log.Printf("Error on consumer commit: %v", err) } default: + // Handle new message from queue err := consumer.ConsumeNext() if err != nil { log.Fatalf("Error on consumption: %v", err) // TODO: is always fatal? diff --git a/backend/internal/config/db/config.go b/backend/internal/config/db/config.go index e074399dc..7dc262855 100644 --- a/backend/internal/config/db/config.go +++ b/backend/internal/config/db/config.go @@ -14,6 +14,8 @@ type Config struct { TopicRawIOS string TopicTrigger string CommitBatchTimeout time.Duration + BatchQueueLimit int + BatchSizeLimit int } func New() *Config { @@ -26,5 +28,7 @@ func New() *Config { TopicRawIOS: env.String("TOPIC_RAW_IOS"), TopicTrigger: env.String("TOPIC_TRIGGER"), CommitBatchTimeout: 15 * time.Second, + BatchQueueLimit: env.Int("BATCH_QUEUE_LIMIT"), + BatchSizeLimit: env.Int("BATCH_SIZE_LIMIT"), } } diff --git a/backend/pkg/db/postgres/connector.go b/backend/pkg/db/postgres/connector.go index 1eb29e04c..9b9724e58 100644 --- a/backend/pkg/db/postgres/connector.go +++ b/backend/pkg/db/postgres/connector.go @@ -15,8 +15,11 @@ func getTimeoutContext() context.Context { } type Conn struct { - c *pgxpool.Pool // TODO: conditional usage of Pool/Conn (use interface?) - batches map[uint64]*pgx.Batch + c *pgxpool.Pool // TODO: conditional usage of Pool/Conn (use interface?) + batches map[uint64]*pgx.Batch + batchSizes map[uint64]int + batchQueueLimit int + batchSizeLimit int } func NewConn(url string) *Conn { @@ -25,8 +28,11 @@ func NewConn(url string) *Conn { log.Println(err) log.Fatalln("pgxpool.Connect Error") } - batches := make(map[uint64]*pgx.Batch) - return &Conn{c, batches} + return &Conn{ + c: c, + batches: make(map[uint64]*pgx.Batch), + batchSizes: make(map[uint64]int), + } } func (conn *Conn) Close() error { @@ -34,29 +40,56 @@ func (conn *Conn) Close() error { return nil } -func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{}) error { +func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{}) { batch, ok := conn.batches[sessionID] if !ok { conn.batches[sessionID] = &pgx.Batch{} batch = conn.batches[sessionID] } batch.Queue(sql, args...) - return nil } func (conn *Conn) CommitBatches() { - for _, b := range conn.batches { + for sessID, b := range conn.batches { br := conn.c.SendBatch(getTimeoutContext(), b) l := b.Len() for i := 0; i < l; i++ { if ct, err := br.Exec(); err != nil { - // TODO: ct info - log.Printf("Error in PG batch (command tag %v): %v \n", ct.String(), err) + log.Printf("Error in PG batch (command tag %s, session: %d): %v \n", ct.String(), sessID, err) } } br.Close() // returns err } conn.batches = make(map[uint64]*pgx.Batch) + conn.batchSizes = make(map[uint64]int) +} + +func (conn *Conn) updateBatchSize(sessionID uint64, reqSize int) { + conn.batchSizes[sessionID] += reqSize + if conn.batchSizes[sessionID] >= conn.batchSizeLimit || conn.batches[sessionID].Len() >= conn.batchQueueLimit { + conn.commitBatch(sessionID) + } +} + +// Send only one batch to pg +func (conn *Conn) commitBatch(sessionID uint64) { + b, ok := conn.batches[sessionID] + if !ok { + log.Printf("can't find batch for session: %d", sessionID) + return + } + br := conn.c.SendBatch(getTimeoutContext(), b) + l := b.Len() + for i := 0; i < l; i++ { + if ct, err := br.Exec(); err != nil { + log.Printf("Error in PG batch (command tag %s, session: %d): %v \n", ct.String(), sessionID, err) + } + } + br.Close() + + // Clean batch info + delete(conn.batches, sessionID) + delete(conn.batchSizes, sessionID) } func (conn *Conn) query(sql string, args ...interface{}) (pgx.Rows, error) { diff --git a/backend/pkg/db/postgres/integration.go b/backend/pkg/db/postgres/integration.go index 94351b44d..e44bd726e 100644 --- a/backend/pkg/db/postgres/integration.go +++ b/backend/pkg/db/postgres/integration.go @@ -14,8 +14,8 @@ type Integration struct { Options json.RawMessage `json:"options"` } -func (pg *Conn) IterateIntegrationsOrdered(iter func(integration *Integration, err error)) error { - rows, err := pg.query(` +func (conn *Conn) IterateIntegrationsOrdered(iter func(integration *Integration, err error)) error { + rows, err := conn.query(` SELECT project_id, provider, options, request_data FROM integrations `) @@ -39,8 +39,8 @@ func (pg *Conn) IterateIntegrationsOrdered(iter func(integration *Integration, e return nil } -func (pg *Conn) UpdateIntegrationRequestData(i *Integration) error { - return pg.exec(` +func (conn *Conn) UpdateIntegrationRequestData(i *Integration) error { + return conn.exec(` UPDATE integrations SET request_data = $1 WHERE project_id=$2 AND provider=$3`, diff --git a/backend/pkg/db/postgres/messages-common.go b/backend/pkg/db/postgres/messages-common.go index 1f4f781ce..2fdb6ecfb 100644 --- a/backend/pkg/db/postgres/messages-common.go +++ b/backend/pkg/db/postgres/messages-common.go @@ -2,7 +2,6 @@ package postgres import ( "fmt" - "log" "strings" "openreplay/backend/pkg/db/types" @@ -22,7 +21,7 @@ func (conn *Conn) insertAutocompleteValue(sessionID uint64, tp string, value str if len(value) == 0 { return } - if err := conn.batchQueue(sessionID, ` + sqlRequest := ` INSERT INTO autocomplete ( value, type, @@ -31,11 +30,11 @@ func (conn *Conn) insertAutocompleteValue(sessionID uint64, tp string, value str $1, $2, project_id FROM sessions WHERE session_id = $3 - ) ON CONFLICT DO NOTHING`, - value, tp, sessionID, - ); err != nil { - log.Printf("Insert autocomplete error: %v", err) - } + ) ON CONFLICT DO NOTHING` + conn.batchQueue(sessionID, sqlRequest, value, tp, sessionID) + + // Record approximate message size + conn.updateBatchSize(sessionID, len(sqlRequest)+len(value)+len(tp)+8) } func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error { @@ -80,9 +79,9 @@ func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error { } func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, error) { - // Search acceleration - if err := conn.batchQueue(sessionID, ` - UPDATE sessions + // TODO: search acceleration? + sqlRequest := ` + UPDATE sessions SET issue_types=(SELECT CASE WHEN errors_count > 0 THEN (COALESCE(ARRAY_AGG(DISTINCT ps.type), '{}') || 'js_exception'::issue_type)::issue_type[] @@ -92,12 +91,11 @@ func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, FROM events_common.issues INNER JOIN issues AS ps USING (issue_id) WHERE session_id = $1) - WHERE session_id = $1 - `, - sessionID, - ); err != nil { - log.Printf("Error while updating issue_types: %v. SessionID: %v", err, sessionID) - } + WHERE session_id = $1` + conn.batchQueue(sessionID, sqlRequest, sessionID) + + // Record approximate message size + conn.updateBatchSize(sessionID, len(sqlRequest)+8) var dur uint64 if err := conn.queryRow(` @@ -113,54 +111,65 @@ func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, } func (conn *Conn) InsertRequest(sessionID uint64, timestamp uint64, index uint64, url string, duration uint64, success bool) error { - return conn.batchQueue(sessionID, ` + sqlRequest := ` INSERT INTO events_common.requests ( session_id, timestamp, seq_index, url, duration, success ) VALUES ( $1, $2, $3, $4, $5, $6 - )`, - sessionID, timestamp, - getSqIdx(index), - url, duration, success, - ) + )` + conn.batchQueue(sessionID, sqlRequest, sessionID, timestamp, getSqIdx(index), url, duration, success) + + // Record approximate message size + conn.updateBatchSize(sessionID, len(sqlRequest)+len(url)+8*4) + return nil } func (conn *Conn) InsertCustomEvent(sessionID uint64, timestamp uint64, index uint64, name string, payload string) error { - return conn.batchQueue(sessionID, ` + sqlRequest := ` INSERT INTO events_common.customs ( session_id, timestamp, seq_index, name, payload ) VALUES ( $1, $2, $3, $4, $5 - )`, - sessionID, timestamp, - getSqIdx(index), - name, payload, - ) + )` + conn.batchQueue(sessionID, sqlRequest, sessionID, timestamp, getSqIdx(index), name, payload) + + // Record approximate message size + conn.updateBatchSize(sessionID, len(sqlRequest)+len(name)+len(payload)+8*3) + return nil } func (conn *Conn) InsertUserID(sessionID uint64, userID string) error { - return conn.batchQueue(sessionID, ` + sqlRequest := ` UPDATE sessions SET user_id = $1 - WHERE session_id = $2`, - userID, sessionID, - ) + WHERE session_id = $2` + conn.batchQueue(sessionID, sqlRequest, userID, sessionID) + + // Record approximate message size + conn.updateBatchSize(sessionID, len(sqlRequest)+len(userID)+8) + return nil } func (conn *Conn) InsertUserAnonymousID(sessionID uint64, userAnonymousID string) error { - return conn.batchQueue(sessionID, ` + sqlRequest := ` UPDATE sessions SET user_anonymous_id = $1 - WHERE session_id = $2`, - userAnonymousID, sessionID, - ) + WHERE session_id = $2` + conn.batchQueue(sessionID, sqlRequest, userAnonymousID, sessionID) + + // Record approximate message size + conn.updateBatchSize(sessionID, len(sqlRequest)+len(userAnonymousID)+8) + return nil } func (conn *Conn) InsertMetadata(sessionID uint64, keyNo uint, value string) error { - return conn.batchQueue(sessionID, fmt.Sprintf(` + sqlRequest := ` UPDATE sessions SET metadata_%v = $1 - WHERE session_id = $2`, keyNo), - value, sessionID, - ) + WHERE session_id = $2` + conn.batchQueue(sessionID, fmt.Sprintf(sqlRequest, keyNo), value, sessionID) // conn.insertAutocompleteValue(sessionID, "METADATA", value) + + // Record approximate message size + conn.updateBatchSize(sessionID, len(sqlRequest)+len(value)+8*2) + return nil } func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messages.IssueEvent) error { diff --git a/backend/pkg/db/postgres/messages-web.go b/backend/pkg/db/postgres/messages-web.go index 197924fa9..495ca53e4 100644 --- a/backend/pkg/db/postgres/messages-web.go +++ b/backend/pkg/db/postgres/messages-web.go @@ -220,7 +220,8 @@ func (conn *Conn) InsertWebFetchEvent(sessionID uint64, savePayload bool, e *Fet if err != nil { return err } - return conn.batchQueue(sessionID, ` + + sqlRequest := ` INSERT INTO events_common.requests ( session_id, timestamp, seq_index, url, host, path, query, @@ -231,13 +232,18 @@ func (conn *Conn) InsertWebFetchEvent(sessionID uint64, savePayload bool, e *Fet $4, $5, $6, $7, $8, $9, $10::smallint, NULLIF($11, '')::http_method, $12, $13 - ) ON CONFLICT DO NOTHING`, + ) ON CONFLICT DO NOTHING` + conn.batchQueue(sessionID, sqlRequest, sessionID, e.Timestamp, getSqIdx(e.MessageID), e.URL, host, path, query, request, response, e.Status, url.EnsureMethod(e.Method), e.Duration, e.Status < 400, ) + // Record approximate message size + conn.updateBatchSize(sessionID, len(sqlRequest)+len(e.URL)+len(host)+len(path)+len(query)+ + len(*request)+len(*response)+len(url.EnsureMethod(e.Method))+8*5+1) + return nil } func (conn *Conn) InsertWebGraphQLEvent(sessionID uint64, savePayload bool, e *GraphQLEvent) error { @@ -247,7 +253,8 @@ func (conn *Conn) InsertWebGraphQLEvent(sessionID uint64, savePayload bool, e *G response = &e.Response } conn.insertAutocompleteValue(sessionID, "GRAPHQL", e.OperationName) - return conn.batchQueue(sessionID, ` + + sqlRequest := ` INSERT INTO events.graphql ( session_id, timestamp, message_id, name, @@ -256,9 +263,12 @@ func (conn *Conn) InsertWebGraphQLEvent(sessionID uint64, savePayload bool, e *G $1, $2, $3, $4, $5, $6 - ) ON CONFLICT DO NOTHING`, - sessionID, e.Timestamp, e.MessageID, - e.OperationName, - request, response, + ) ON CONFLICT DO NOTHING` + conn.batchQueue(sessionID, sqlRequest, sessionID, e.Timestamp, e.MessageID, + e.OperationName, request, response, ) + + // Record approximate message size + conn.updateBatchSize(sessionID, len(sqlRequest)+len(e.OperationName)+len(*request)+len(*response)+8*3) + return nil }