db (feat): pg batching

ShiKhu 2022-02-18 20:53:13 +01:00
parent c0f11b3cf4
commit 6930b71c5f
7 changed files with 74 additions and 51 deletions
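Summary of the change, for readers skimming the diff below: instead of executing each per-session INSERT/UPDATE in its own round trip, Conn now buffers statements in a per-session pgx.Batch (batchQueue) and flushes them all at once (CommitBatches), which the db service triggers on its existing stats tick. What follows is a minimal, self-contained sketch of that pattern, assuming pgx v4; the type and helper names (batcher, queueStatement, flushBatches) and the one-minute timeout are illustrative, not the identifiers used in the actual Conn.

package example

import (
	"context"
	"log"
	"time"

	"github.com/jackc/pgx/v4"
	"github.com/jackc/pgx/v4/pgxpool"
)

// batcher accumulates statements per session and sends each batch in a
// single round trip. Illustrative sketch only, not the service's Conn type.
type batcher struct {
	pool    *pgxpool.Pool
	batches map[uint64]*pgx.Batch
}

// queueStatement appends a statement to the session's batch, creating the
// batch on first use.
func (b *batcher) queueStatement(sessionID uint64, sql string, args ...interface{}) {
	batch, ok := b.batches[sessionID]
	if !ok {
		batch = &pgx.Batch{}
		b.batches[sessionID] = batch
	}
	batch.Queue(sql, args...)
}

// flushBatches sends every queued batch, drains the results while logging
// per-statement errors instead of aborting the whole flush, then resets the
// buffer. The one-minute timeout is an arbitrary choice for this sketch.
func (b *batcher) flushBatches() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	for _, batch := range b.batches {
		res := b.pool.SendBatch(ctx, batch)
		for i := 0; i < batch.Len(); i++ {
			if _, err := res.Exec(); err != nil {
				log.Printf("pg batch exec error: %v", err)
			}
		}
		if err := res.Close(); err != nil {
			log.Printf("pg batch close error: %v", err)
		}
	}
	b.batches = make(map[uint64]*pgx.Batch)
}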

View file

@@ -65,7 +65,8 @@ func (c *PGCache) InsertMetadata(sessionID uint64, metadata *Metadata) error {
keyNo := project.GetMetadataNo(metadata.Key)
if keyNo == 0 {
// insert project metadata
// TODO: insert project metadata
return nil
}
if err := c.Conn.InsertMetadata(sessionID, keyNo, metadata.Value); err != nil {
return err

View file

@@ -29,7 +29,7 @@ func (c *PGCache) InsertWebSessionStart(sessionID uint64, s *SessionStart) error
UserDeviceType: s.UserDeviceType,
UserDeviceMemorySize: s.UserDeviceMemorySize,
UserDeviceHeapSize: s.UserDeviceHeapSize,
UserID: &s.UserID,
UserID: &s.UserID,
}
if err := c.Conn.InsertSessionStart(sessionID, c.sessions[sessionID]); err != nil {
c.sessions[sessionID] = nil

View file

@@ -15,7 +15,8 @@ func getTimeoutContext() context.Context {
}
type Conn struct {
c *pgxpool.Pool // TODO: conditional usage of Pool/Conn (use interface?)
c *pgxpool.Pool // TODO: conditional usage of Pool/Conn (use interface?)
batches map[uint64]*pgx.Batch
}
func NewConn(url string) *Conn {
@@ -24,7 +25,8 @@ func NewConn(url string) *Conn {
log.Println(err)
log.Fatalln("pgxpool.Connect Error")
}
return &Conn{c}
batches := make(map[uint64]*pgx.Batch)
return &Conn{c, batches}
}
func (conn *Conn) Close() error {
@@ -32,6 +34,31 @@ func (conn *Conn) Close() error {
return nil
}
func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{}) error {
batch, ok := conn.batches[sessionID]
if !ok {
conn.batches[sessionID] = &pgx.Batch{}
batch = conn.batches[sessionID]
}
batch.Queue(sql, args...)
return nil
}
func (conn *Conn) CommitBatches() {
for _, b := range conn.batches {
br := conn.c.SendBatch(getTimeoutContext(), b)
l := b.Len()
for i := 0; i < l; i++ {
if ct, err := br.Exec(); err != nil {
// TODO: ct info
log.Printf("Error in PG batch (command tag %v): %v \n", ct.String(), err)
}
}
br.Close() // returns err
}
conn.batches = make(map[uint64]*pgx.Batch)
}
func (conn *Conn) query(sql string, args ...interface{}) (pgx.Rows, error) {
return conn.c.Query(getTimeoutContext(), sql, args...)
}
@@ -56,7 +83,7 @@ func (conn *Conn) begin() (_Tx, error) {
func (tx _Tx) exec(sql string, args ...interface{}) error {
_, err := tx.Exec(context.Background(), sql, args...)
return err;
return err
}
func (tx _Tx) rollback() error {
@@ -66,5 +93,3 @@ func (tx _Tx) rollback() error {
func (tx _Tx) commit() error {
return tx.Commit(context.Background())
}

View file

@@ -1,13 +1,13 @@
package postgres
import (
"fmt"
"log"
"strings"
"fmt"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/hashid"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/db/types"
)
func getAutocompleteType(baseType string, platform string) string {
@@ -22,7 +22,7 @@ func (conn *Conn) insertAutocompleteValue(sessionID uint64, tp string, value str
if len(value) == 0 {
return
}
if err := conn.exec(`
if err := conn.batchQueue(sessionID, `
INSERT INTO autocomplete (
value,
type,
@@ -31,7 +31,7 @@ func (conn *Conn) insertAutocompleteValue(sessionID uint64, tp string, value str
$1, $2, project_id
FROM sessions
WHERE session_id = $3
) ON CONFLICT DO NOTHING`,
) ON CONFLICT DO NOTHING`,
value, tp, sessionID,
); err != nil {
log.Printf("Insert autocomplete error: %v", err)
@@ -59,16 +59,16 @@ func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error {
NULLIF($14, ''), NULLIF($15, ''), NULLIF($16, ''), NULLIF($17, 0), NULLIF($18, 0::bigint),
NULLIF($19, '')
)`,
sessionID, s.ProjectID, s.Timestamp,
sessionID, s.ProjectID, s.Timestamp,
s.UserUUID, s.UserDevice, s.UserDeviceType, s.UserCountry,
s.UserOS, s.UserOSVersion,
s.RevID,
s.RevID,
s.TrackerVersion, s.Timestamp/1000,
s.Platform,
s.UserAgent, s.UserBrowser, s.UserBrowserVersion, s.UserDeviceMemorySize, s.UserDeviceHeapSize,
s.UserID,
); err != nil {
return err;
return err
}
conn.insertAutocompleteValue(sessionID, getAutocompleteType("USEROS", s.Platform), s.UserOS)
conn.insertAutocompleteValue(sessionID, getAutocompleteType("USERDEVICE", s.Platform), s.UserDevice)
@@ -81,7 +81,7 @@ func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error {
func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, error) {
// Search acceleration
if err := conn.exec(`
if err := conn.batchQueue(sessionID, `
UPDATE sessions
SET issue_types=(SELECT
CASE WHEN errors_count > 0 THEN
@@ -96,7 +96,7 @@ func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64,
`,
sessionID,
); err != nil {
log.Printf("Error while updating issue_types %v", sessionID)
log.Printf("Error while updating issue_types: %v. SessionID: %v", err, sessionID)
}
var dur uint64
@@ -113,33 +113,33 @@ func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64,
}
func (conn *Conn) InsertRequest(sessionID uint64, timestamp uint64, index uint64, url string, duration uint64, success bool) error {
return conn.exec(`
return conn.batchQueue(sessionID, `
INSERT INTO events_common.requests (
session_id, timestamp, seq_index, url, duration, success
) VALUES (
$1, $2, $3, $4, $5, $6
)`,
sessionID, timestamp,
sessionID, timestamp,
getSqIdx(index),
url, duration, success,
)
}
func (conn *Conn) InsertCustomEvent(sessionID uint64, timestamp uint64, index uint64, name string, payload string) error {
return conn.exec(`
return conn.batchQueue(sessionID, `
INSERT INTO events_common.customs (
session_id, timestamp, seq_index, name, payload
) VALUES (
$1, $2, $3, $4, $5
)`,
sessionID, timestamp,
getSqIdx(index),
sessionID, timestamp,
getSqIdx(index),
name, payload,
)
}
func (conn *Conn) InsertUserID(sessionID uint64, userID string) error {
return conn.exec(`
return conn.batchQueue(sessionID, `
UPDATE sessions SET user_id = $1
WHERE session_id = $2`,
userID, sessionID,
@@ -147,16 +147,15 @@ func (conn *Conn) InsertUserID(sessionID uint64, userID string) error {
}
func (conn *Conn) InsertUserAnonymousID(sessionID uint64, userAnonymousID string) error {
return conn.exec(`
return conn.batchQueue(sessionID, `
UPDATE sessions SET user_anonymous_id = $1
WHERE session_id = $2`,
userAnonymousID, sessionID,
)
}
func (conn *Conn) InsertMetadata(sessionID uint64, keyNo uint, value string) error {
return conn.exec(fmt.Sprintf(`
return conn.batchQueue(sessionID, fmt.Sprintf(`
UPDATE sessions SET metadata_%v = $1
WHERE session_id = $2`, keyNo),
value, sessionID,
@@ -173,11 +172,11 @@ func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messag
issueID := hashid.IssueID(projectID, e)
// TEMP. TODO: nullable & json message field type
payload := &e.Payload;
payload := &e.Payload
if *payload == "" || *payload == "{}" {
payload = nil
}
context := &e.Context;
context := &e.Context
if *context == "" || *context == "{}" {
context = nil
}
@@ -189,7 +188,7 @@ func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messag
project_id, $2, $3, $4, CAST($5 AS jsonb)
FROM sessions
WHERE session_id = $1
)ON CONFLICT DO NOTHING`,
)ON CONFLICT DO NOTHING`,
sessionID, issueID, e.Type, e.ContextString, context,
); err != nil {
return err
@@ -199,8 +198,8 @@ func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messag
session_id, issue_id, timestamp, seq_index, payload
) VALUES (
$1, $2, $3, $4, CAST($5 AS jsonb)
)`,
sessionID, issueID, e.Timestamp,
)`,
sessionID, issueID, e.Timestamp,
getSqIdx(e.MessageID),
payload,
); err != nil {
@@ -228,5 +227,3 @@ func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messag
}
return tx.commit()
}

View file

@@ -68,16 +68,19 @@ func (conn *Conn) InsertWebPageEvent(sessionID uint64, e *PageEvent) error {
if err := tx.exec(`
INSERT INTO events.pages (
session_id, message_id, timestamp, referrer, base_referrer, host, path, base_path,
dom_content_loaded_time, load_time, response_end, first_paint_time, first_contentful_paint_time, speed_index, visually_complete, time_to_interactive,
dom_content_loaded_time, load_time, response_end, first_paint_time, first_contentful_paint_time,
speed_index, visually_complete, time_to_interactive,
response_time, dom_building_time
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8,
NULLIF($9, 0), NULLIF($10, 0), NULLIF($11, 0), NULLIF($12, 0), NULLIF($13, 0), NULLIF($14, 0), NULLIF($15, 0), NULLIF($16, 0),
NULLIF($9, 0), NULLIF($10, 0), NULLIF($11, 0), NULLIF($12, 0), NULLIF($13, 0),
NULLIF($14, 0), NULLIF($15, 0), NULLIF($16, 0),
NULLIF($17, 0), NULLIF($18, 0)
)
`,
sessionID, e.MessageID, e.Timestamp, e.Referrer, url.DiscardURLQuery(e.Referrer), host, path, url.DiscardURLQuery(path),
e.DomContentLoadedEventEnd, e.LoadEventEnd, e.ResponseEnd, e.FirstPaint, e.FirstContentfulPaint, e.SpeedIndex, e.VisuallyComplete, e.TimeToInteractive,
e.DomContentLoadedEventEnd, e.LoadEventEnd, e.ResponseEnd, e.FirstPaint, e.FirstContentfulPaint,
e.SpeedIndex, e.VisuallyComplete, e.TimeToInteractive,
calcResponseTime(e), calcDomBuildingTime(e),
); err != nil {
return err

View file

@@ -1,21 +1,18 @@
package postgres
package postgres
import (
"openreplay/backend/pkg/url"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/url"
)
func (conn *Conn) InsertWebStatsLongtask(sessionID uint64, l *LongTask) error {
return nil // Do we even use them?
// conn.exec(``);
return nil // Do we even use them?
// conn.exec(``);
}
func (conn *Conn) InsertWebStatsPerformance(sessionID uint64, p *PerformanceTrackAggr) error {
timestamp := (p.TimestampEnd + p.TimestampStart) /2
return conn.exec(`
timestamp := (p.TimestampEnd + p.TimestampStart) / 2
return conn.batchQueue(sessionID, `
INSERT INTO events.performance (
session_id, timestamp, message_id,
min_fps, avg_fps, max_fps,
@@ -34,7 +31,7 @@ func (conn *Conn) InsertWebStatsPerformance(sessionID uint64, p *PerformanceTrac
p.MinCPU, p.AvgCPU, p.MinCPU,
p.MinTotalJSHeapSize, p.AvgTotalJSHeapSize, p.MaxTotalJSHeapSize,
p.MinUsedJSHeapSize, p.AvgUsedJSHeapSize, p.MaxUsedJSHeapSize,
);
)
}
func (conn *Conn) InsertWebStatsResourceEvent(sessionID uint64, e *ResourceEvent) error {
@@ -42,7 +39,7 @@ func (conn *Conn) InsertWebStatsResourceEvent(sessionID uint64, e *ResourceEvent
if err != nil {
return err
}
return conn.exec(`
return conn.batchQueue(sessionID, `
INSERT INTO events.resources (
session_id, timestamp, message_id,
type,
@@ -58,11 +55,11 @@ func (conn *Conn) InsertWebStatsResourceEvent(sessionID uint64, e *ResourceEvent
NULLIF($10, '')::events.resource_method,
NULLIF($11, 0), NULLIF($12, 0), NULLIF($13, 0), NULLIF($14, 0), NULLIF($15, 0)
)`,
sessionID, e.Timestamp, e.MessageID,
sessionID, e.Timestamp, e.MessageID,
e.Type,
e.URL, host, url.DiscardURLQuery(e.URL),
e.Success, e.Status,
e.Success, e.Status,
url.EnsureMethod(e.Method),
e.Duration, e.TTFB, e.HeaderSize, e.EncodedBodySize, e.DecodedBodySize,
)
}
}

View file

@@ -11,11 +11,11 @@ import (
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/env"
logger "openreplay/backend/pkg/log"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/services/db/heuristics"
logger "openreplay/backend/pkg/log"
)
var pg *cache.PGCache
@@ -29,7 +29,6 @@ func main() {
heurFinder := heuristics.NewHandler()
statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"))
consumer := queue.NewMessageConsumer(
@@ -91,6 +90,7 @@ func main() {
consumer.Close()
os.Exit(0)
case <-tick:
pg.CommitBatches()
if err := commitStats(); err != nil {
log.Printf("Error on stats commit: %v", err)
}