From 6930b71c5fc207cd2cefefc65742520e9c157ac2 Mon Sep 17 00:00:00 2001 From: ShiKhu Date: Fri, 18 Feb 2022 20:53:13 +0100 Subject: [PATCH] db (feat): pg batching --- backend/pkg/db/cache/messages_common.go | 3 +- backend/pkg/db/cache/messages_web.go | 2 +- backend/pkg/db/postgres/connector.go | 35 ++++++++++++-- backend/pkg/db/postgres/messages_common.go | 47 +++++++++---------- backend/pkg/db/postgres/messages_web.go | 9 ++-- backend/pkg/db/postgres/messages_web_stats.go | 25 +++++----- backend/services/db/main.go | 4 +- 7 files changed, 74 insertions(+), 51 deletions(-) diff --git a/backend/pkg/db/cache/messages_common.go b/backend/pkg/db/cache/messages_common.go index 65c8bf4e1..3983982fe 100644 --- a/backend/pkg/db/cache/messages_common.go +++ b/backend/pkg/db/cache/messages_common.go @@ -65,7 +65,8 @@ func (c *PGCache) InsertMetadata(sessionID uint64, metadata *Metadata) error { keyNo := project.GetMetadataNo(metadata.Key) if keyNo == 0 { - // insert project metadata + // TODO: insert project metadata + return nil } if err := c.Conn.InsertMetadata(sessionID, keyNo, metadata.Value); err != nil { return err diff --git a/backend/pkg/db/cache/messages_web.go b/backend/pkg/db/cache/messages_web.go index 21b3ac866..4aa4dfa7b 100644 --- a/backend/pkg/db/cache/messages_web.go +++ b/backend/pkg/db/cache/messages_web.go @@ -29,7 +29,7 @@ func (c *PGCache) InsertWebSessionStart(sessionID uint64, s *SessionStart) error UserDeviceType: s.UserDeviceType, UserDeviceMemorySize: s.UserDeviceMemorySize, UserDeviceHeapSize: s.UserDeviceHeapSize, - UserID: &s.UserID, + UserID: &s.UserID, } if err := c.Conn.InsertSessionStart(sessionID, c.sessions[sessionID]); err != nil { c.sessions[sessionID] = nil diff --git a/backend/pkg/db/postgres/connector.go b/backend/pkg/db/postgres/connector.go index cfa8f28f8..9e4e82633 100644 --- a/backend/pkg/db/postgres/connector.go +++ b/backend/pkg/db/postgres/connector.go @@ -15,7 +15,8 @@ func getTimeoutContext() context.Context { } type Conn struct { - c *pgxpool.Pool // TODO: conditional usage of Pool/Conn (use interface?) + c *pgxpool.Pool // TODO: conditional usage of Pool/Conn (use interface?) + batches map[uint64]*pgx.Batch } func NewConn(url string) *Conn { @@ -24,7 +25,8 @@ func NewConn(url string) *Conn { log.Println(err) log.Fatalln("pgxpool.Connect Error") } - return &Conn{c} + batches := make(map[uint64]*pgx.Batch) + return &Conn{c, batches} } func (conn *Conn) Close() error { @@ -32,6 +34,31 @@ func (conn *Conn) Close() error { return nil } +func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{}) error { + batch, ok := conn.batches[sessionID] + if !ok { + conn.batches[sessionID] = &pgx.Batch{} + batch = conn.batches[sessionID] + } + batch.Queue(sql, args...) + return nil +} + +func (conn *Conn) CommitBatches() { + for _, b := range conn.batches { + br := conn.c.SendBatch(getTimeoutContext(), b) + l := b.Len() + for i := 0; i < l; i++ { + if ct, err := br.Exec(); err != nil { + // TODO: ct info + log.Printf("Error in PG batch (command tag %v): %v \n", ct.String(), err) + } + } + br.Close() // returns err + } + conn.batches = make(map[uint64]*pgx.Batch) +} + func (conn *Conn) query(sql string, args ...interface{}) (pgx.Rows, error) { return conn.c.Query(getTimeoutContext(), sql, args...) } @@ -56,7 +83,7 @@ func (conn *Conn) begin() (_Tx, error) { func (tx _Tx) exec(sql string, args ...interface{}) error { _, err := tx.Exec(context.Background(), sql, args...) - return err; + return err } func (tx _Tx) rollback() error { @@ -66,5 +93,3 @@ func (tx _Tx) rollback() error { func (tx _Tx) commit() error { return tx.Commit(context.Background()) } - - diff --git a/backend/pkg/db/postgres/messages_common.go b/backend/pkg/db/postgres/messages_common.go index df539e05c..1f4f781ce 100644 --- a/backend/pkg/db/postgres/messages_common.go +++ b/backend/pkg/db/postgres/messages_common.go @@ -1,13 +1,13 @@ package postgres import ( + "fmt" "log" "strings" - "fmt" + "openreplay/backend/pkg/db/types" "openreplay/backend/pkg/hashid" "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/db/types" ) func getAutocompleteType(baseType string, platform string) string { @@ -22,7 +22,7 @@ func (conn *Conn) insertAutocompleteValue(sessionID uint64, tp string, value str if len(value) == 0 { return } - if err := conn.exec(` + if err := conn.batchQueue(sessionID, ` INSERT INTO autocomplete ( value, type, @@ -31,7 +31,7 @@ func (conn *Conn) insertAutocompleteValue(sessionID uint64, tp string, value str $1, $2, project_id FROM sessions WHERE session_id = $3 - ) ON CONFLICT DO NOTHING`, + ) ON CONFLICT DO NOTHING`, value, tp, sessionID, ); err != nil { log.Printf("Insert autocomplete error: %v", err) @@ -59,16 +59,16 @@ func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error { NULLIF($14, ''), NULLIF($15, ''), NULLIF($16, ''), NULLIF($17, 0), NULLIF($18, 0::bigint), NULLIF($19, '') )`, - sessionID, s.ProjectID, s.Timestamp, + sessionID, s.ProjectID, s.Timestamp, s.UserUUID, s.UserDevice, s.UserDeviceType, s.UserCountry, s.UserOS, s.UserOSVersion, - s.RevID, + s.RevID, s.TrackerVersion, s.Timestamp/1000, s.Platform, s.UserAgent, s.UserBrowser, s.UserBrowserVersion, s.UserDeviceMemorySize, s.UserDeviceHeapSize, s.UserID, ); err != nil { - return err; + return err } conn.insertAutocompleteValue(sessionID, getAutocompleteType("USEROS", s.Platform), s.UserOS) conn.insertAutocompleteValue(sessionID, getAutocompleteType("USERDEVICE", s.Platform), s.UserDevice) @@ -81,7 +81,7 @@ func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error { func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, error) { // Search acceleration - if err := conn.exec(` + if err := conn.batchQueue(sessionID, ` UPDATE sessions SET issue_types=(SELECT CASE WHEN errors_count > 0 THEN @@ -96,7 +96,7 @@ func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, `, sessionID, ); err != nil { - log.Printf("Error while updating issue_types %v", sessionID) + log.Printf("Error while updating issue_types: %v. SessionID: %v", err, sessionID) } var dur uint64 @@ -113,33 +113,33 @@ func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, } func (conn *Conn) InsertRequest(sessionID uint64, timestamp uint64, index uint64, url string, duration uint64, success bool) error { - return conn.exec(` + return conn.batchQueue(sessionID, ` INSERT INTO events_common.requests ( session_id, timestamp, seq_index, url, duration, success ) VALUES ( $1, $2, $3, $4, $5, $6 )`, - sessionID, timestamp, + sessionID, timestamp, getSqIdx(index), url, duration, success, ) } func (conn *Conn) InsertCustomEvent(sessionID uint64, timestamp uint64, index uint64, name string, payload string) error { - return conn.exec(` + return conn.batchQueue(sessionID, ` INSERT INTO events_common.customs ( session_id, timestamp, seq_index, name, payload ) VALUES ( $1, $2, $3, $4, $5 )`, - sessionID, timestamp, - getSqIdx(index), + sessionID, timestamp, + getSqIdx(index), name, payload, ) } func (conn *Conn) InsertUserID(sessionID uint64, userID string) error { - return conn.exec(` + return conn.batchQueue(sessionID, ` UPDATE sessions SET user_id = $1 WHERE session_id = $2`, userID, sessionID, @@ -147,16 +147,15 @@ func (conn *Conn) InsertUserID(sessionID uint64, userID string) error { } func (conn *Conn) InsertUserAnonymousID(sessionID uint64, userAnonymousID string) error { - return conn.exec(` + return conn.batchQueue(sessionID, ` UPDATE sessions SET user_anonymous_id = $1 WHERE session_id = $2`, userAnonymousID, sessionID, ) } - func (conn *Conn) InsertMetadata(sessionID uint64, keyNo uint, value string) error { - return conn.exec(fmt.Sprintf(` + return conn.batchQueue(sessionID, fmt.Sprintf(` UPDATE sessions SET metadata_%v = $1 WHERE session_id = $2`, keyNo), value, sessionID, @@ -173,11 +172,11 @@ func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messag issueID := hashid.IssueID(projectID, e) // TEMP. TODO: nullable & json message field type - payload := &e.Payload; + payload := &e.Payload if *payload == "" || *payload == "{}" { payload = nil } - context := &e.Context; + context := &e.Context if *context == "" || *context == "{}" { context = nil } @@ -189,7 +188,7 @@ func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messag project_id, $2, $3, $4, CAST($5 AS jsonb) FROM sessions WHERE session_id = $1 - )ON CONFLICT DO NOTHING`, + )ON CONFLICT DO NOTHING`, sessionID, issueID, e.Type, e.ContextString, context, ); err != nil { return err @@ -199,8 +198,8 @@ func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messag session_id, issue_id, timestamp, seq_index, payload ) VALUES ( $1, $2, $3, $4, CAST($5 AS jsonb) - )`, - sessionID, issueID, e.Timestamp, + )`, + sessionID, issueID, e.Timestamp, getSqIdx(e.MessageID), payload, ); err != nil { @@ -228,5 +227,3 @@ func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messag } return tx.commit() } - - diff --git a/backend/pkg/db/postgres/messages_web.go b/backend/pkg/db/postgres/messages_web.go index f7354252b..64d7ba561 100644 --- a/backend/pkg/db/postgres/messages_web.go +++ b/backend/pkg/db/postgres/messages_web.go @@ -68,16 +68,19 @@ func (conn *Conn) InsertWebPageEvent(sessionID uint64, e *PageEvent) error { if err := tx.exec(` INSERT INTO events.pages ( session_id, message_id, timestamp, referrer, base_referrer, host, path, base_path, - dom_content_loaded_time, load_time, response_end, first_paint_time, first_contentful_paint_time, speed_index, visually_complete, time_to_interactive, + dom_content_loaded_time, load_time, response_end, first_paint_time, first_contentful_paint_time, + speed_index, visually_complete, time_to_interactive, response_time, dom_building_time ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, - NULLIF($9, 0), NULLIF($10, 0), NULLIF($11, 0), NULLIF($12, 0), NULLIF($13, 0), NULLIF($14, 0), NULLIF($15, 0), NULLIF($16, 0), + NULLIF($9, 0), NULLIF($10, 0), NULLIF($11, 0), NULLIF($12, 0), NULLIF($13, 0), + NULLIF($14, 0), NULLIF($15, 0), NULLIF($16, 0), NULLIF($17, 0), NULLIF($18, 0) ) `, sessionID, e.MessageID, e.Timestamp, e.Referrer, url.DiscardURLQuery(e.Referrer), host, path, url.DiscardURLQuery(path), - e.DomContentLoadedEventEnd, e.LoadEventEnd, e.ResponseEnd, e.FirstPaint, e.FirstContentfulPaint, e.SpeedIndex, e.VisuallyComplete, e.TimeToInteractive, + e.DomContentLoadedEventEnd, e.LoadEventEnd, e.ResponseEnd, e.FirstPaint, e.FirstContentfulPaint, + e.SpeedIndex, e.VisuallyComplete, e.TimeToInteractive, calcResponseTime(e), calcDomBuildingTime(e), ); err != nil { return err diff --git a/backend/pkg/db/postgres/messages_web_stats.go b/backend/pkg/db/postgres/messages_web_stats.go index 9e3b5bc77..933442b0b 100644 --- a/backend/pkg/db/postgres/messages_web_stats.go +++ b/backend/pkg/db/postgres/messages_web_stats.go @@ -1,21 +1,18 @@ -package postgres +package postgres import ( - "openreplay/backend/pkg/url" . "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/url" ) - - func (conn *Conn) InsertWebStatsLongtask(sessionID uint64, l *LongTask) error { - return nil // Do we even use them? - // conn.exec(``); + return nil // Do we even use them? + // conn.exec(``); } - func (conn *Conn) InsertWebStatsPerformance(sessionID uint64, p *PerformanceTrackAggr) error { - timestamp := (p.TimestampEnd + p.TimestampStart) /2 - return conn.exec(` + timestamp := (p.TimestampEnd + p.TimestampStart) / 2 + return conn.batchQueue(sessionID, ` INSERT INTO events.performance ( session_id, timestamp, message_id, min_fps, avg_fps, max_fps, @@ -34,7 +31,7 @@ func (conn *Conn) InsertWebStatsPerformance(sessionID uint64, p *PerformanceTrac p.MinCPU, p.AvgCPU, p.MinCPU, p.MinTotalJSHeapSize, p.AvgTotalJSHeapSize, p.MaxTotalJSHeapSize, p.MinUsedJSHeapSize, p.AvgUsedJSHeapSize, p.MaxUsedJSHeapSize, - ); + ) } func (conn *Conn) InsertWebStatsResourceEvent(sessionID uint64, e *ResourceEvent) error { @@ -42,7 +39,7 @@ func (conn *Conn) InsertWebStatsResourceEvent(sessionID uint64, e *ResourceEvent if err != nil { return err } - return conn.exec(` + return conn.batchQueue(sessionID, ` INSERT INTO events.resources ( session_id, timestamp, message_id, type, @@ -58,11 +55,11 @@ func (conn *Conn) InsertWebStatsResourceEvent(sessionID uint64, e *ResourceEvent NULLIF($10, '')::events.resource_method, NULLIF($11, 0), NULLIF($12, 0), NULLIF($13, 0), NULLIF($14, 0), NULLIF($15, 0) )`, - sessionID, e.Timestamp, e.MessageID, + sessionID, e.Timestamp, e.MessageID, e.Type, e.URL, host, url.DiscardURLQuery(e.URL), - e.Success, e.Status, + e.Success, e.Status, url.EnsureMethod(e.Method), e.Duration, e.TTFB, e.HeaderSize, e.EncodedBodySize, e.DecodedBodySize, ) -} \ No newline at end of file +} diff --git a/backend/services/db/main.go b/backend/services/db/main.go index b60f7e7db..d6190a4f0 100644 --- a/backend/services/db/main.go +++ b/backend/services/db/main.go @@ -11,11 +11,11 @@ import ( "openreplay/backend/pkg/db/cache" "openreplay/backend/pkg/db/postgres" "openreplay/backend/pkg/env" + logger "openreplay/backend/pkg/log" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/queue/types" "openreplay/backend/services/db/heuristics" - logger "openreplay/backend/pkg/log" ) var pg *cache.PGCache @@ -29,7 +29,6 @@ func main() { heurFinder := heuristics.NewHandler() - statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC")) consumer := queue.NewMessageConsumer( @@ -91,6 +90,7 @@ func main() { consumer.Close() os.Exit(0) case <-tick: + pg.CommitBatches() if err := commitStats(); err != nil { log.Printf("Error on stats commit: %v", err) }