From aa3192b6c7e960d780583e8dcd1a1f56a2422f51 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 21 Jul 2022 17:22:43 +0200 Subject: [PATCH] Patch for db and ender services (#632) * feat(backend/db): implemented bulk inserts for db service * feat(backend/storage): added new reading_duration metric for storage service * feat(backend/ender): added check for sessionEnd duplicates --- backend/cmd/ender/main.go | 12 +- backend/internal/storage/storage.go | 14 +- backend/pkg/db/cache/messages-common.go | 8 +- backend/pkg/db/cache/messages-ios.go | 11 +- backend/pkg/db/cache/messages-web.go | 55 +++- backend/pkg/db/cache/pg-cache.go | 7 +- backend/pkg/db/postgres/bulk.go | 93 +++++++ backend/pkg/db/postgres/connector.go | 257 +++++++++++++------ backend/pkg/db/postgres/integration.go | 4 +- backend/pkg/db/postgres/messages-common.go | 88 +++---- backend/pkg/db/postgres/messages-ios.go | 22 +- backend/pkg/db/postgres/messages-web.go | 185 ++++--------- backend/pkg/db/postgres/pool.go | 175 +++++++++++++ backend/pkg/db/postgres/project.go | 4 +- backend/pkg/db/postgres/session-updates.go | 30 +++ backend/pkg/db/postgres/session.go | 71 +---- backend/pkg/db/postgres/unstarted-session.go | 2 +- 17 files changed, 657 insertions(+), 381 deletions(-) create mode 100644 backend/pkg/db/postgres/bulk.go create mode 100644 backend/pkg/db/postgres/pool.go create mode 100644 backend/pkg/db/postgres/session-updates.go diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index c0613fca0..1fd2f4e64 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -82,10 +82,20 @@ func main() { // Find ended sessions and send notification to other services sessions.HandleEndedSessions(func(sessionID uint64, timestamp int64) bool { msg := &messages.SessionEnd{Timestamp: uint64(timestamp)} - if err := pg.InsertSessionEnd(sessionID, msg.Timestamp); err != nil { + currDuration, err := pg.GetSessionDuration(sessionID) + if err != nil { + log.Printf("getSessionDuration failed, sessID: %d, err: %s", sessionID, err) + } + newDuration, err := pg.InsertSessionEnd(sessionID, msg.Timestamp) + if err != nil { log.Printf("can't save sessionEnd to database, sessID: %d, err: %s", sessionID, err) return false } + if currDuration == newDuration { + log.Printf("sessionEnd duplicate, sessID: %d, prevDur: %d, newDur: %d", sessionID, + currDuration, newDuration) + return true + } if err := producer.Produce(cfg.TopicRawWeb, sessionID, messages.Encode(msg)); err != nil { log.Printf("can't send sessionEnd to topic: %s; sessID: %d", err, sessionID) return false diff --git a/backend/internal/storage/storage.go b/backend/internal/storage/storage.go index abe8089be..8d79468db 100644 --- a/backend/internal/storage/storage.go +++ b/backend/internal/storage/storage.go @@ -21,6 +21,7 @@ type Storage struct { startBytes []byte totalSessions syncfloat64.Counter sessionSize syncfloat64.Histogram + readingTime syncfloat64.Histogram archivingTime syncfloat64.Histogram } @@ -40,6 +41,10 @@ func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Stor if err != nil { log.Printf("can't create session_size metric: %s", err) } + readingTime, err := metrics.RegisterHistogram("reading_duration") + if err != nil { + log.Printf("can't create reading_duration metric: %s", err) + } archivingTime, err := metrics.RegisterHistogram("archiving_duration") if err != nil { log.Printf("can't create archiving_duration metric: %s", err) @@ -50,16 +55,17 @@ func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) 
(*Stor startBytes: make([]byte, cfg.FileSplitSize), totalSessions: totalSessions, sessionSize: sessionSize, + readingTime: readingTime, archivingTime: archivingTime, }, nil } func (s *Storage) UploadKey(key string, retryCount int) error { - start := time.Now() if retryCount <= 0 { return nil } + start := time.Now() file, err := os.Open(s.cfg.FSDir + "/" + key) if err != nil { sessID, _ := strconv.ParseUint(key, 10, 64) @@ -84,6 +90,9 @@ func (s *Storage) UploadKey(key string, retryCount int) error { }) return nil } + s.readingTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds())) + + start = time.Now() startReader := bytes.NewBuffer(s.startBytes[:nRead]) if err := s.s3.Upload(s.gzipFile(startReader), key, "application/octet-stream", true); err != nil { log.Fatalf("Storage: start upload failed. %v\n", err) @@ -93,6 +102,7 @@ func (s *Storage) UploadKey(key string, retryCount int) error { log.Fatalf("Storage: end upload failed. %v\n", err) } } + s.archivingTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds())) // Save metrics var fileSize float64 = 0 @@ -103,7 +113,7 @@ func (s *Storage) UploadKey(key string, retryCount int) error { fileSize = float64(fileInfo.Size()) } ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*200) - s.archivingTime.Record(ctx, float64(time.Now().Sub(start).Milliseconds())) + s.sessionSize.Record(ctx, fileSize) s.totalSessions.Add(ctx, 1) return nil diff --git a/backend/pkg/db/cache/messages-common.go b/backend/pkg/db/cache/messages-common.go index cebdaf5e7..41cdb1895 100644 --- a/backend/pkg/db/cache/messages-common.go +++ b/backend/pkg/db/cache/messages-common.go @@ -7,12 +7,8 @@ import ( // . "openreplay/backend/pkg/db/types" ) -func (c *PGCache) InsertSessionEnd(sessionID uint64, timestamp uint64) error { - _, err := c.Conn.InsertSessionEnd(sessionID, timestamp) - if err != nil { - return err - } - return nil +func (c *PGCache) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, error) { + return c.Conn.InsertSessionEnd(sessionID, timestamp) } func (c *PGCache) HandleSessionEnd(sessionID uint64) error { diff --git a/backend/pkg/db/cache/messages-ios.go b/backend/pkg/db/cache/messages-ios.go index 4195976c3..e0463c431 100644 --- a/backend/pkg/db/cache/messages-ios.go +++ b/backend/pkg/db/cache/messages-ios.go @@ -32,7 +32,8 @@ func (c *PGCache) InsertIOSSessionStart(sessionID uint64, s *IOSSessionStart) er } func (c *PGCache) InsertIOSSessionEnd(sessionID uint64, e *IOSSessionEnd) error { - return c.InsertSessionEnd(sessionID, e.Timestamp) + _, err := c.InsertSessionEnd(sessionID, e.Timestamp) + return err } func (c *PGCache) InsertIOSScreenEnter(sessionID uint64, screenEnter *IOSScreenEnter) error { @@ -84,13 +85,5 @@ func (c *PGCache) InsertIOSCrash(sessionID uint64, crash *IOSCrash) error { } func (c *PGCache) InsertIOSIssueEvent(sessionID uint64, issueEvent *IOSIssueEvent) error { - // session, err := c.GetSession(sessionID) - // if err != nil { - // return err - // } - // TODO: unite IssueEvent message for the all platforms - // if err := c.Conn.InsertIssueEvent(sessionID, session.ProjectID, issueEvent); err != nil { - // return err - // } return nil } diff --git a/backend/pkg/db/cache/messages-web.go b/backend/pkg/db/cache/messages-web.go index 7da7006af..0a864e6d3 100644 --- a/backend/pkg/db/cache/messages-web.go +++ b/backend/pkg/db/cache/messages-web.go @@ -63,7 +63,8 @@ func (c *PGCache) HandleWebSessionStart(sessionID uint64, s *SessionStart) error } func (c *PGCache) 
InsertWebSessionEnd(sessionID uint64, e *SessionEnd) error { - return c.InsertSessionEnd(sessionID, e.Timestamp) + _, err := c.InsertSessionEnd(sessionID, e.Timestamp) + return err } func (c *PGCache) HandleWebSessionEnd(sessionID uint64, e *SessionEnd) error { @@ -91,7 +92,7 @@ func (c *PGCache) InsertWebFetchEvent(sessionID uint64, e *FetchEvent) error { if err != nil { return err } - return c.Conn.InsertWebFetchEvent(sessionID, project.SaveRequestPayloads, e) + return c.Conn.InsertWebFetchEvent(sessionID, session.ProjectID, project.SaveRequestPayloads, e) } func (c *PGCache) InsertWebGraphQLEvent(sessionID uint64, e *GraphQLEvent) error { @@ -103,5 +104,53 @@ func (c *PGCache) InsertWebGraphQLEvent(sessionID uint64, e *GraphQLEvent) error if err != nil { return err } - return c.Conn.InsertWebGraphQLEvent(sessionID, project.SaveRequestPayloads, e) + return c.Conn.InsertWebGraphQLEvent(sessionID, session.ProjectID, project.SaveRequestPayloads, e) +} + +func (c *PGCache) InsertWebCustomEvent(sessionID uint64, e *CustomEvent) error { + session, err := c.GetSession(sessionID) + if err != nil { + return err + } + return c.Conn.InsertWebCustomEvent(sessionID, session.ProjectID, e) +} + +func (c *PGCache) InsertWebUserID(sessionID uint64, userID *UserID) error { + session, err := c.GetSession(sessionID) + if err != nil { + return err + } + return c.Conn.InsertWebUserID(sessionID, session.ProjectID, userID) +} + +func (c *PGCache) InsertWebUserAnonymousID(sessionID uint64, userAnonymousID *UserAnonymousID) error { + session, err := c.GetSession(sessionID) + if err != nil { + return err + } + return c.Conn.InsertWebUserAnonymousID(sessionID, session.ProjectID, userAnonymousID) +} + +func (c *PGCache) InsertWebPageEvent(sessionID uint64, e *PageEvent) error { + session, err := c.GetSession(sessionID) + if err != nil { + return err + } + return c.Conn.InsertWebPageEvent(sessionID, session.ProjectID, e) +} + +func (c *PGCache) InsertWebClickEvent(sessionID uint64, e *ClickEvent) error { + session, err := c.GetSession(sessionID) + if err != nil { + return err + } + return c.Conn.InsertWebClickEvent(sessionID, session.ProjectID, e) +} + +func (c *PGCache) InsertWebInputEvent(sessionID uint64, e *InputEvent) error { + session, err := c.GetSession(sessionID) + if err != nil { + return err + } + return c.Conn.InsertWebInputEvent(sessionID, session.ProjectID, e) } diff --git a/backend/pkg/db/cache/pg-cache.go b/backend/pkg/db/cache/pg-cache.go index 6422209d4..ca31bcd82 100644 --- a/backend/pkg/db/cache/pg-cache.go +++ b/backend/pkg/db/cache/pg-cache.go @@ -29,10 +29,9 @@ type PGCache struct { // TODO: create conn automatically func NewPGCache(pgConn *postgres.Conn, projectExpirationTimeoutMs int64) *PGCache { return &PGCache{ - Conn: pgConn, - sessions: make(map[uint64]*Session), - projects: make(map[uint32]*ProjectMeta), - //projectsByKeys: make(map[string]*ProjectMeta), + Conn: pgConn, + sessions: make(map[uint64]*Session), + projects: make(map[uint32]*ProjectMeta), projectExpirationTimeout: time.Duration(1000 * projectExpirationTimeoutMs), } } diff --git a/backend/pkg/db/postgres/bulk.go b/backend/pkg/db/postgres/bulk.go new file mode 100644 index 000000000..16f59efcd --- /dev/null +++ b/backend/pkg/db/postgres/bulk.go @@ -0,0 +1,93 @@ +package postgres + +import ( + "bytes" + "errors" + "fmt" +) + +const ( + insertPrefix = `INSERT INTO ` + insertValues = ` VALUES ` + insertSuffix = ` ON CONFLICT DO NOTHING;` +) + +type Bulk interface { + Append(args ...interface{}) error + Send() error +} + +type 
bulkImpl struct { + conn Pool + table string + columns string + template string + setSize int + sizeLimit int + values []interface{} +} + +func (b *bulkImpl) Append(args ...interface{}) error { + if len(args) != b.setSize { + return fmt.Errorf("wrong number of arguments, expected: %d, got: %d", b.setSize, len(args)) + } + b.values = append(b.values, args...) + if len(b.values)/b.setSize >= b.sizeLimit { + return b.send() + } + return nil +} + +func (b *bulkImpl) Send() error { + if len(b.values) == 0 { + return nil + } + return b.send() +} + +func (b *bulkImpl) send() error { + request := bytes.NewBufferString(insertPrefix + b.table + b.columns + insertValues) + args := make([]interface{}, b.setSize) + for i := 0; i < len(b.values)/b.setSize; i++ { + for j := 0; j < b.setSize; j++ { + args[j] = i*b.setSize + j + 1 + } + if i > 0 { + request.WriteByte(',') + } + request.WriteString(fmt.Sprintf(b.template, args...)) + } + request.WriteString(insertSuffix) + err := b.conn.Exec(request.String(), b.values...) + b.values = make([]interface{}, 0, b.setSize*b.sizeLimit) + if err != nil { + return fmt.Errorf("send bulk err: %s", err) + } + return nil +} + +func NewBulk(conn Pool, table, columns, template string, setSize, sizeLimit int) (Bulk, error) { + switch { + case conn == nil: + return nil, errors.New("db conn is empty") + case table == "": + return nil, errors.New("table is empty") + case columns == "": + return nil, errors.New("columns is empty") + case template == "": + return nil, errors.New("template is empty") + case setSize <= 0: + return nil, errors.New("set size must be positive") + case sizeLimit <= 0: + return nil, errors.New("size limit must be positive") + } + return &bulkImpl{ + conn: conn, + table: table, + columns: columns, + template: template, + setSize: setSize, + sizeLimit: sizeLimit, + values: make([]interface{}, 0, setSize*sizeLimit), + }, nil +} diff --git a/backend/pkg/db/postgres/connector.go b/backend/pkg/db/postgres/connector.go index 4a85029fd..1cc537982 100644 --- a/backend/pkg/db/postgres/connector.go +++ b/backend/pkg/db/postgres/connector.go @@ -13,26 +13,30 @@ import ( "github.com/jackc/pgx/v4/pgxpool" ) -func getTimeoutContext() context.Context { - ctx, _ := context.WithTimeout(context.Background(), time.Duration(time.Second*30)) - return ctx -} - type batchItem struct { query string arguments []interface{} } +// Conn contains batches, bulks and cache for all sessions type Conn struct { - c *pgxpool.Pool // TODO: conditional usage of Pool/Conn (use interface?) 
- batches map[uint64]*pgx.Batch - batchSizes map[uint64]int - rawBatches map[uint64][]*batchItem - batchQueueLimit int - batchSizeLimit int - batchSizeBytes syncfloat64.Histogram - batchSizeLines syncfloat64.Histogram - sqlRequestTime syncfloat64.Histogram + c Pool + batches map[uint64]*pgx.Batch + batchSizes map[uint64]int + rawBatches map[uint64][]*batchItem + autocompletes Bulk + requests Bulk + customEvents Bulk + webPageEvents Bulk + webInputEvents Bulk + webGraphQLEvents Bulk + sessionUpdates map[uint64]*sessionUpdates + batchQueueLimit int + batchSizeLimit int + batchSizeBytes syncfloat64.Histogram + batchSizeLines syncfloat64.Histogram + sqlRequestTime syncfloat64.Histogram + sqlRequestCounter syncfloat64.Counter } func NewConn(url string, queueLimit, sizeLimit int, metrics *monitoring.Metrics) *Conn { @@ -45,14 +49,19 @@ func NewConn(url string, queueLimit, sizeLimit int, metrics *monitoring.Metrics) log.Fatalln("pgxpool.Connect Error") } conn := &Conn{ - c: c, batches: make(map[uint64]*pgx.Batch), batchSizes: make(map[uint64]int), rawBatches: make(map[uint64][]*batchItem), + sessionUpdates: make(map[uint64]*sessionUpdates), batchQueueLimit: queueLimit, batchSizeLimit: sizeLimit, } conn.initMetrics(metrics) + conn.c, err = NewPool(c, conn.sqlRequestTime, conn.sqlRequestCounter) + if err != nil { + log.Fatalf("can't create new pool wrapper: %s", err) + } + conn.initBulks() return conn } @@ -75,6 +84,74 @@ func (conn *Conn) initMetrics(metrics *monitoring.Metrics) { if err != nil { log.Printf("can't create sqlRequestTime metric: %s", err) } + conn.sqlRequestCounter, err = metrics.RegisterCounter("sql_request_number") + if err != nil { + log.Printf("can't create sqlRequestNumber metric: %s", err) + } +} + +func (conn *Conn) initBulks() { + var err error + conn.autocompletes, err = NewBulk(conn.c, + "autocomplete", + "(value, type, project_id)", + "($%d, $%d, $%d)", + 3, 100) + if err != nil { + log.Fatalf("can't create autocomplete bulk") + } + conn.requests, err = NewBulk(conn.c, + "events_common.requests", + "(session_id, timestamp, seq_index, url, duration, success)", + "($%d, $%d, $%d, left($%d, 2700), $%d, $%d)", + 6, 100) + if err != nil { + log.Fatalf("can't create requests bulk") + } + conn.customEvents, err = NewBulk(conn.c, + "events_common.customs", + "(session_id, timestamp, seq_index, name, payload)", + "($%d, $%d, $%d, left($%d, 2700), $%d)", + 5, 100) + if err != nil { + log.Fatalf("can't create customEvents bulk") + } + conn.webPageEvents, err = NewBulk(conn.c, + "events.pages", + "(session_id, message_id, timestamp, referrer, base_referrer, host, path, query, dom_content_loaded_time, "+ + "load_time, response_end, first_paint_time, first_contentful_paint_time, speed_index, visually_complete, "+ + "time_to_interactive, response_time, dom_building_time)", + "($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0),"+ + " NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0))", + 18, 100) + if err != nil { + log.Fatalf("can't create webPageEvents bulk") + } + conn.webInputEvents, err = NewBulk(conn.c, + "events.inputs", + "(session_id, message_id, timestamp, value, label)", + "($%d, $%d, $%d, $%d, NULLIF($%d,''))", + 5, 100) + if err != nil { + log.Fatalf("can't create webInputEvents bulk") + } + conn.webGraphQLEvents, err = NewBulk(conn.c, + "events.graphql", + "(session_id, timestamp, message_id, name, request_body, response_body)", + "($%d, $%d, $%d, left($%d, 2700), $%d, $%d)", + 
6, 100) + if err != nil { + log.Fatalf("can't create webGraphQLEvents bulk") + } +} + +func (conn *Conn) insertAutocompleteValue(sessionID uint64, projectID uint32, tp string, value string) { + if len(value) == 0 { + return + } + if err := conn.autocompletes.Append(value, tp, projectID); err != nil { + log.Printf("autocomplete bulk err: %s", err) + } } func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{}) { @@ -85,6 +162,10 @@ func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{}) batch = conn.batches[sessionID] } batch.Queue(sql, args...) + conn.rawBatch(sessionID, sql, args...) +} + +func (conn *Conn) rawBatch(sessionID uint64, sql string, args ...interface{}) { // Temp raw batch store raw := conn.rawBatches[sessionID] raw = append(raw, &batchItem{ @@ -94,13 +175,54 @@ func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{}) conn.rawBatches[sessionID] = raw } +func (conn *Conn) updateSessionEvents(sessionID uint64, events, pages int) { + if _, ok := conn.sessionUpdates[sessionID]; !ok { + conn.sessionUpdates[sessionID] = NewSessionUpdates(sessionID) + } + conn.sessionUpdates[sessionID].add(pages, events) +} + +func (conn *Conn) sendBulks() { + if err := conn.autocompletes.Send(); err != nil { + log.Printf("autocomplete bulk send err: %s", err) + } + if err := conn.requests.Send(); err != nil { + log.Printf("requests bulk send err: %s", err) + } + if err := conn.customEvents.Send(); err != nil { + log.Printf("customEvents bulk send err: %s", err) + } + if err := conn.webPageEvents.Send(); err != nil { + log.Printf("webPageEvents bulk send err: %s", err) + } + if err := conn.webInputEvents.Send(); err != nil { + log.Printf("webInputEvents bulk send err: %s", err) + } + if err := conn.webGraphQLEvents.Send(); err != nil { + log.Printf("webGraphQLEvents bulk send err: %s", err) + } +} + func (conn *Conn) CommitBatches() { + conn.sendBulks() for sessID, b := range conn.batches { + // Append session update sql request to the end of batch + if update, ok := conn.sessionUpdates[sessID]; ok { + sql, args := update.request() + if sql != "" { + conn.batchQueue(sessID, sql, args...) 
+ b, _ = conn.batches[sessID] + } + } // Record batch size in bytes and number of lines conn.batchSizeBytes.Record(context.Background(), float64(conn.batchSizes[sessID])) conn.batchSizeLines.Record(context.Background(), float64(b.Len())) + + start := time.Now() + isFailed := false + // Send batch to db and execute - br := conn.c.SendBatch(getTimeoutContext(), b) + br := conn.c.SendBatch(b) l := b.Len() for i := 0; i < l; i++ { if ct, err := br.Exec(); err != nil { @@ -108,13 +230,33 @@ func (conn *Conn) CommitBatches() { failedSql := conn.rawBatches[sessID][i] query := strings.ReplaceAll(failedSql.query, "\n", " ") log.Println("failed sql req:", query, failedSql.arguments) + isFailed = true } } br.Close() // returns err + conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), + attribute.String("method", "batch"), attribute.Bool("failed", isFailed)) + conn.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", "batch"), attribute.Bool("failed", isFailed)) + if !isFailed { + delete(conn.sessionUpdates, sessID) + } } conn.batches = make(map[uint64]*pgx.Batch) conn.batchSizes = make(map[uint64]int) conn.rawBatches = make(map[uint64][]*batchItem) + + // Session updates + for sessID, su := range conn.sessionUpdates { + sql, args := su.request() + if sql == "" { + continue + } + if err := conn.c.Exec(sql, args...); err != nil { + log.Printf("failed session update, sessID: %d, err: %s", sessID, err) + } + } + conn.sessionUpdates = make(map[uint64]*sessionUpdates) } func (conn *Conn) updateBatchSize(sessionID uint64, reqSize int) { @@ -131,11 +273,23 @@ func (conn *Conn) commitBatch(sessionID uint64) { log.Printf("can't find batch for session: %d", sessionID) return } + // Append session update sql request to the end of batch + if update, ok := conn.sessionUpdates[sessionID]; ok { + sql, args := update.request() + if sql != "" { + conn.batchQueue(sessionID, sql, args...) + b, _ = conn.batches[sessionID] + } + } // Record batch size in bytes and number of lines conn.batchSizeBytes.Record(context.Background(), float64(conn.batchSizes[sessionID])) conn.batchSizeLines.Record(context.Background(), float64(b.Len())) + + start := time.Now() + isFailed := false + // Send batch to db and execute - br := conn.c.SendBatch(getTimeoutContext(), b) + br := conn.c.SendBatch(b) l := b.Len() for i := 0; i < l; i++ { if ct, err := br.Exec(); err != nil { @@ -143,74 +297,19 @@ func (conn *Conn) commitBatch(sessionID uint64) { failedSql := conn.rawBatches[sessionID][i] query := strings.ReplaceAll(failedSql.query, "\n", " ") log.Println("failed sql req:", query, failedSql.arguments) + isFailed = true } } br.Close() + conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), + attribute.String("method", "batch"), attribute.Bool("failed", isFailed)) + conn.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", "batch"), attribute.Bool("failed", isFailed)) + // Clean batch info delete(conn.batches, sessionID) delete(conn.batchSizes, sessionID) delete(conn.rawBatches, sessionID) -} - -func (conn *Conn) query(sql string, args ...interface{}) (pgx.Rows, error) { - start := time.Now() - res, err := conn.c.Query(getTimeoutContext(), sql, args...) 
- conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", methodName(sql))) - return res, err -} - -func (conn *Conn) queryRow(sql string, args ...interface{}) pgx.Row { - start := time.Now() - res := conn.c.QueryRow(getTimeoutContext(), sql, args...) - conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", methodName(sql))) - return res -} - -func (conn *Conn) exec(sql string, args ...interface{}) error { - start := time.Now() - _, err := conn.c.Exec(getTimeoutContext(), sql, args...) - conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", methodName(sql))) - return err -} - -type _Tx struct { - pgx.Tx - sqlRequestTime syncfloat64.Histogram -} - -func (conn *Conn) begin() (_Tx, error) { - start := time.Now() - tx, err := conn.c.Begin(context.Background()) - conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", "begin")) - return _Tx{tx, conn.sqlRequestTime}, err -} - -func (tx _Tx) exec(sql string, args ...interface{}) error { - start := time.Now() - _, err := tx.Exec(context.Background(), sql, args...) - tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", methodName(sql))) - return err -} - -func (tx _Tx) rollback() error { - start := time.Now() - err := tx.Rollback(context.Background()) - tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", "rollback")) - return err -} - -func (tx _Tx) commit() error { - start := time.Now() - err := tx.Commit(context.Background()) - tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", "commit")) - return err -} - -func methodName(sql string) string { - method := "unknown" - if parts := strings.Split(sql, ""); len(parts) > 0 { - method = parts[0] - } - return strings.ToLower(method) + delete(conn.sessionUpdates, sessionID) } diff --git a/backend/pkg/db/postgres/integration.go b/backend/pkg/db/postgres/integration.go index e44bd726e..1556006c1 100644 --- a/backend/pkg/db/postgres/integration.go +++ b/backend/pkg/db/postgres/integration.go @@ -15,7 +15,7 @@ type Integration struct { } func (conn *Conn) IterateIntegrationsOrdered(iter func(integration *Integration, err error)) error { - rows, err := conn.query(` + rows, err := conn.c.Query(` SELECT project_id, provider, options, request_data FROM integrations `) @@ -40,7 +40,7 @@ func (conn *Conn) IterateIntegrationsOrdered(iter func(integration *Integration, } func (conn *Conn) UpdateIntegrationRequestData(i *Integration) error { - return conn.exec(` + return conn.c.Exec(` UPDATE integrations SET request_data = $1 WHERE project_id=$2 AND provider=$3`, diff --git a/backend/pkg/db/postgres/messages-common.go b/backend/pkg/db/postgres/messages-common.go index a68d2c814..2925acde3 100644 --- a/backend/pkg/db/postgres/messages-common.go +++ b/backend/pkg/db/postgres/messages-common.go @@ -18,31 +18,8 @@ func getAutocompleteType(baseType string, platform string) string { } -func (conn *Conn) insertAutocompleteValue(sessionID uint64, tp string, value string) { - if len(value) == 0 { - return - } - sqlRequest := ` - INSERT INTO autocomplete ( - value, - type, - project_id - ) (SELECT - $1, $2, project_id - FROM sessions - WHERE session_id = $3 - 
) ON CONFLICT DO NOTHING` - if err := conn.exec(sqlRequest, value, tp, sessionID); err != nil { - log.Printf("can't insert autocomplete: %s", err) - } - //conn.batchQueue(sessionID, sqlRequest, value, tp, sessionID) - - // Record approximate message size - //conn.updateBatchSize(sessionID, len(sqlRequest)+len(value)+len(tp)+8) -} - func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error { - return conn.exec(` + return conn.c.Exec(` INSERT INTO sessions ( session_id, project_id, start_ts, user_uuid, user_device, user_device_type, user_country, @@ -74,18 +51,26 @@ func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error { } func (conn *Conn) HandleSessionStart(sessionID uint64, s *types.Session) error { - conn.insertAutocompleteValue(sessionID, getAutocompleteType("USEROS", s.Platform), s.UserOS) - conn.insertAutocompleteValue(sessionID, getAutocompleteType("USERDEVICE", s.Platform), s.UserDevice) - conn.insertAutocompleteValue(sessionID, getAutocompleteType("USERCOUNTRY", s.Platform), s.UserCountry) - conn.insertAutocompleteValue(sessionID, getAutocompleteType("REVID", s.Platform), s.RevID) + conn.insertAutocompleteValue(sessionID, s.ProjectID, getAutocompleteType("USEROS", s.Platform), s.UserOS) + conn.insertAutocompleteValue(sessionID, s.ProjectID, getAutocompleteType("USERDEVICE", s.Platform), s.UserDevice) + conn.insertAutocompleteValue(sessionID, s.ProjectID, getAutocompleteType("USERCOUNTRY", s.Platform), s.UserCountry) + conn.insertAutocompleteValue(sessionID, s.ProjectID, getAutocompleteType("REVID", s.Platform), s.RevID) // s.Platform == "web" - conn.insertAutocompleteValue(sessionID, "USERBROWSER", s.UserBrowser) + conn.insertAutocompleteValue(sessionID, s.ProjectID, "USERBROWSER", s.UserBrowser) return nil } +func (conn *Conn) GetSessionDuration(sessionID uint64) (uint64, error) { + var dur uint64 + if err := conn.c.QueryRow("SELECT COALESCE( duration, 0 ) FROM sessions WHERE session_id=$1", sessionID).Scan(&dur); err != nil { + return 0, err + } + return dur, nil +} + func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, error) { var dur uint64 - if err := conn.queryRow(` + if err := conn.c.QueryRow(` UPDATE sessions SET duration=$2 - start_ts WHERE session_id=$1 RETURNING duration @@ -119,30 +104,16 @@ func (conn *Conn) HandleSessionEnd(sessionID uint64) error { } func (conn *Conn) InsertRequest(sessionID uint64, timestamp uint64, index uint64, url string, duration uint64, success bool) error { - sqlRequest := ` - INSERT INTO events_common.requests ( - session_id, timestamp, seq_index, url, duration, success - ) VALUES ( - $1, $2, $3, left($4, 2700), $5, $6 - )` - conn.batchQueue(sessionID, sqlRequest, sessionID, timestamp, getSqIdx(index), url, duration, success) - - // Record approximate message size - conn.updateBatchSize(sessionID, len(sqlRequest)+len(url)+8*4) + if err := conn.requests.Append(sessionID, timestamp, getSqIdx(index), url, duration, success); err != nil { + return fmt.Errorf("insert request in bulk err: %s", err) + } return nil } func (conn *Conn) InsertCustomEvent(sessionID uint64, timestamp uint64, index uint64, name string, payload string) error { - sqlRequest := ` - INSERT INTO events_common.customs ( - session_id, timestamp, seq_index, name, payload - ) VALUES ( - $1, $2, $3, left($4, 2700), $5 - )` - conn.batchQueue(sessionID, sqlRequest, sessionID, timestamp, getSqIdx(index), name, payload) - - // Record approximate message size - conn.updateBatchSize(sessionID, 
len(sqlRequest)+len(name)+len(payload)+8*3) + if err := conn.customEvents.Append(sessionID, timestamp, getSqIdx(index), name, payload); err != nil { + return fmt.Errorf("insert custom event in bulk err: %s", err) + } return nil } @@ -172,15 +143,21 @@ func (conn *Conn) InsertMetadata(sessionID uint64, keyNo uint, value string) err sqlRequest := ` UPDATE sessions SET metadata_%v = $1 WHERE session_id = $2` - return conn.exec(fmt.Sprintf(sqlRequest, keyNo), value, sessionID) + return conn.c.Exec(fmt.Sprintf(sqlRequest, keyNo), value, sessionID) } -func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messages.IssueEvent) error { - tx, err := conn.begin() +func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messages.IssueEvent) (err error) { + tx, err := conn.c.Begin() if err != nil { return err } - defer tx.rollback() + defer func() { + if err != nil { + if rollbackErr := tx.rollback(); rollbackErr != nil { + log.Printf("rollback err: %s", rollbackErr) + } + } + }() issueID := hashid.IssueID(projectID, e) // TEMP. TODO: nullable & json message field type @@ -237,5 +214,6 @@ func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messag return err } } - return tx.commit() + err = tx.commit() + return } diff --git a/backend/pkg/db/postgres/messages-ios.go b/backend/pkg/db/postgres/messages-ios.go index e75ff2acd..d7b2f58f3 100644 --- a/backend/pkg/db/postgres/messages-ios.go +++ b/backend/pkg/db/postgres/messages-ios.go @@ -9,7 +9,7 @@ import ( func (conn *Conn) InsertIOSCustomEvent(sessionID uint64, e *messages.IOSCustomEvent) error { err := conn.InsertCustomEvent(sessionID, e.Timestamp, e.Index, e.Name, e.Payload) if err == nil { - conn.insertAutocompleteValue(sessionID, "CUSTOM_IOS", e.Name) + conn.insertAutocompleteValue(sessionID, 0, "CUSTOM_IOS", e.Name) } return err } @@ -17,7 +17,7 @@ func (conn *Conn) InsertIOSCustomEvent(sessionID uint64, e *messages.IOSCustomEv func (conn *Conn) InsertIOSUserID(sessionID uint64, userID *messages.IOSUserID) error { err := conn.InsertUserID(sessionID, userID.Value) if err == nil { - conn.insertAutocompleteValue(sessionID, "USERID_IOS", userID.Value) + conn.insertAutocompleteValue(sessionID, 0, "USERID_IOS", userID.Value) } return err } @@ -25,7 +25,7 @@ func (conn *Conn) InsertIOSUserID(sessionID uint64, userID *messages.IOSUserID) func (conn *Conn) InsertIOSUserAnonymousID(sessionID uint64, userAnonymousID *messages.IOSUserAnonymousID) error { err := conn.InsertUserAnonymousID(sessionID, userAnonymousID.Value) if err == nil { - conn.insertAutocompleteValue(sessionID, "USERANONYMOUSID_IOS", userAnonymousID.Value) + conn.insertAutocompleteValue(sessionID, 0, "USERANONYMOUSID_IOS", userAnonymousID.Value) } return err } @@ -33,13 +33,13 @@ func (conn *Conn) InsertIOSUserAnonymousID(sessionID uint64, userAnonymousID *me func (conn *Conn) InsertIOSNetworkCall(sessionID uint64, e *messages.IOSNetworkCall) error { err := conn.InsertRequest(sessionID, e.Timestamp, e.Index, e.URL, e.Duration, e.Success) if err == nil { - conn.insertAutocompleteValue(sessionID, "REQUEST_IOS", url.DiscardURLQuery(e.URL)) + conn.insertAutocompleteValue(sessionID, 0, "REQUEST_IOS", url.DiscardURLQuery(e.URL)) } return err } func (conn *Conn) InsertIOSScreenEnter(sessionID uint64, screenEnter *messages.IOSScreenEnter) error { - tx, err := conn.begin() + tx, err := conn.c.Begin() if err != nil { return err } @@ -65,12 +65,12 @@ func (conn *Conn) InsertIOSScreenEnter(sessionID uint64, screenEnter *messages.I if err = 
tx.commit(); err != nil { return err } - conn.insertAutocompleteValue(sessionID, "VIEW_IOS", screenEnter.ViewName) + conn.insertAutocompleteValue(sessionID, 0, "VIEW_IOS", screenEnter.ViewName) return nil } func (conn *Conn) InsertIOSClickEvent(sessionID uint64, clickEvent *messages.IOSClickEvent) error { - tx, err := conn.begin() + tx, err := conn.c.Begin() if err != nil { return err } @@ -96,12 +96,12 @@ func (conn *Conn) InsertIOSClickEvent(sessionID uint64, clickEvent *messages.IOS if err = tx.commit(); err != nil { return err } - conn.insertAutocompleteValue(sessionID, "CLICK_IOS", clickEvent.Label) + conn.insertAutocompleteValue(sessionID, 0, "CLICK_IOS", clickEvent.Label) return nil } func (conn *Conn) InsertIOSInputEvent(sessionID uint64, inputEvent *messages.IOSInputEvent) error { - tx, err := conn.begin() + tx, err := conn.c.Begin() if err != nil { return err } @@ -132,13 +132,13 @@ func (conn *Conn) InsertIOSInputEvent(sessionID uint64, inputEvent *messages.IOS if err = tx.commit(); err != nil { return err } - conn.insertAutocompleteValue(sessionID, "INPUT_IOS", inputEvent.Label) + conn.insertAutocompleteValue(sessionID, 0, "INPUT_IOS", inputEvent.Label) // conn.insertAutocompleteValue(sessionID, "INPUT_VALUE", inputEvent.Label) return nil } func (conn *Conn) InsertIOSCrash(sessionID uint64, projectID uint32, crash *messages.IOSCrash) error { - tx, err := conn.begin() + tx, err := conn.c.Begin() if err != nil { return err } diff --git a/backend/pkg/db/postgres/messages-web.go b/backend/pkg/db/postgres/messages-web.go index e703ee933..c55344509 100644 --- a/backend/pkg/db/postgres/messages-web.go +++ b/backend/pkg/db/postgres/messages-web.go @@ -1,6 +1,7 @@ package postgres import ( + "log" "math" "openreplay/backend/pkg/hashid" @@ -13,104 +14,54 @@ func getSqIdx(messageID uint64) uint { return uint(messageID % math.MaxInt32) } -func (conn *Conn) InsertWebCustomEvent(sessionID uint64, e *CustomEvent) error { +func (conn *Conn) InsertWebCustomEvent(sessionID uint64, projectID uint32, e *CustomEvent) error { err := conn.InsertCustomEvent(sessionID, e.Timestamp, e.MessageID, e.Name, e.Payload) if err == nil { - conn.insertAutocompleteValue(sessionID, "CUSTOM", e.Name) + conn.insertAutocompleteValue(sessionID, projectID, "CUSTOM", e.Name) } return err } -func (conn *Conn) InsertWebUserID(sessionID uint64, userID *UserID) error { +func (conn *Conn) InsertWebUserID(sessionID uint64, projectID uint32, userID *UserID) error { err := conn.InsertUserID(sessionID, userID.ID) if err == nil { - conn.insertAutocompleteValue(sessionID, "USERID", userID.ID) + conn.insertAutocompleteValue(sessionID, projectID, "USERID", userID.ID) } return err } -func (conn *Conn) InsertWebUserAnonymousID(sessionID uint64, userAnonymousID *UserAnonymousID) error { +func (conn *Conn) InsertWebUserAnonymousID(sessionID uint64, projectID uint32, userAnonymousID *UserAnonymousID) error { err := conn.InsertUserAnonymousID(sessionID, userAnonymousID.ID) if err == nil { - conn.insertAutocompleteValue(sessionID, "USERANONYMOUSID", userAnonymousID.ID) + conn.insertAutocompleteValue(sessionID, projectID, "USERANONYMOUSID", userAnonymousID.ID) } return err } -// func (conn *Conn) InsertWebResourceEvent(sessionID uint64, e *ResourceEvent) error { -// if e.Type != "fetch" { -// return nil -// } -// err := conn.InsertRequest(sessionID, e.Timestamp, -// e.MessageID, -// e.URL, e.Duration, e.Success, -// ) -// if err == nil { -// conn.insertAutocompleteValue(sessionID, "REQUEST", url.DiscardURLQuery(e.URL)) -// } -// return err 
-// } - // TODO: fix column "dom_content_loaded_event_end" of relation "pages" -func (conn *Conn) InsertWebPageEvent(sessionID uint64, e *PageEvent) error { +func (conn *Conn) InsertWebPageEvent(sessionID uint64, projectID uint32, e *PageEvent) error { host, path, query, err := url.GetURLParts(e.URL) if err != nil { return err } - tx, err := conn.begin() - if err != nil { - return err + // base_path is deprecated + if err = conn.webPageEvents.Append(sessionID, e.MessageID, e.Timestamp, e.Referrer, url.DiscardURLQuery(e.Referrer), + host, path, query, e.DomContentLoadedEventEnd, e.LoadEventEnd, e.ResponseEnd, e.FirstPaint, e.FirstContentfulPaint, + e.SpeedIndex, e.VisuallyComplete, e.TimeToInteractive, calcResponseTime(e), calcDomBuildingTime(e)); err != nil { + log.Printf("insert web page event in bulk err: %s", err) } - defer tx.rollback() - // base_path is depricated - if err := tx.exec(` - INSERT INTO events.pages ( - session_id, message_id, timestamp, referrer, base_referrer, host, path, query, - dom_content_loaded_time, load_time, response_end, first_paint_time, first_contentful_paint_time, - speed_index, visually_complete, time_to_interactive, - response_time, dom_building_time - ) VALUES ( - $1, $2, $3, - $4, $5, - $6, $7, $8, - NULLIF($9, 0), NULLIF($10, 0), NULLIF($11, 0), NULLIF($12, 0), NULLIF($13, 0), - NULLIF($14, 0), NULLIF($15, 0), NULLIF($16, 0), - NULLIF($17, 0), NULLIF($18, 0) - ) - `, - sessionID, e.MessageID, e.Timestamp, - e.Referrer, url.DiscardURLQuery(e.Referrer), - host, path, query, - e.DomContentLoadedEventEnd, e.LoadEventEnd, e.ResponseEnd, e.FirstPaint, e.FirstContentfulPaint, - e.SpeedIndex, e.VisuallyComplete, e.TimeToInteractive, - calcResponseTime(e), calcDomBuildingTime(e), - ); err != nil { - return err - } - if err = tx.exec(` - UPDATE sessions SET (pages_count, events_count) = (pages_count + 1, events_count + 1) - WHERE session_id = $1`, - sessionID, - ); err != nil { - return err - } - if err = tx.commit(); err != nil { - return err - } - conn.insertAutocompleteValue(sessionID, "LOCATION", url.DiscardURLQuery(path)) - conn.insertAutocompleteValue(sessionID, "REFERRER", url.DiscardURLQuery(e.Referrer)) + // Accumulate session updates and exec inside batch with another sql commands + conn.updateSessionEvents(sessionID, 1, 1) + // Add new value set to autocomplete bulk + conn.insertAutocompleteValue(sessionID, projectID, "LOCATION", url.DiscardURLQuery(path)) + conn.insertAutocompleteValue(sessionID, projectID, "REFERRER", url.DiscardURLQuery(e.Referrer)) return nil } -func (conn *Conn) InsertWebClickEvent(sessionID uint64, e *ClickEvent) error { - tx, err := conn.begin() - if err != nil { - return err - } - defer tx.rollback() - if err = tx.exec(` +func (conn *Conn) InsertWebClickEvent(sessionID uint64, projectID uint32, e *ClickEvent) error { + sqlRequest := ` INSERT INTO events.clicks (session_id, message_id, timestamp, label, selector, url) (SELECT @@ -118,65 +69,40 @@ func (conn *Conn) InsertWebClickEvent(sessionID uint64, e *ClickEvent) error { FROM events.pages WHERE session_id = $1 AND timestamp <= $3 ORDER BY timestamp DESC LIMIT 1 ) - `, - sessionID, e.MessageID, e.Timestamp, e.Label, e.Selector, - ); err != nil { - return err - } - if err = tx.exec(` - UPDATE sessions SET events_count = events_count + 1 - WHERE session_id = $1`, - sessionID, - ); err != nil { - return err - } - if err = tx.commit(); err != nil { - return err - } - conn.insertAutocompleteValue(sessionID, "CLICK", e.Label) + ` + conn.batchQueue(sessionID, sqlRequest, sessionID, 
e.MessageID, e.Timestamp, e.Label, e.Selector) + // Accumulate session updates and exec inside batch with another sql commands + conn.updateSessionEvents(sessionID, 1, 0) + // Add new value set to autocomplete bulk + conn.insertAutocompleteValue(sessionID, projectID, "CLICK", e.Label) return nil } -func (conn *Conn) InsertWebInputEvent(sessionID uint64, e *InputEvent) error { - tx, err := conn.begin() - if err != nil { - return err - } - defer tx.rollback() +func (conn *Conn) InsertWebInputEvent(sessionID uint64, projectID uint32, e *InputEvent) error { value := &e.Value if e.ValueMasked { value = nil } - if err = tx.exec(` - INSERT INTO events.inputs - (session_id, message_id, timestamp, value, label) - VALUES - ($1, $2, $3, $4, NULLIF($5,'')) - `, - sessionID, e.MessageID, e.Timestamp, value, e.Label, - ); err != nil { - return err + if err := conn.webInputEvents.Append(sessionID, e.MessageID, e.Timestamp, value, e.Label); err != nil { + log.Printf("insert web input event err: %s", err) } - if err = tx.exec(` - UPDATE sessions SET events_count = events_count + 1 - WHERE session_id = $1`, - sessionID, - ); err != nil { - return err - } - if err = tx.commit(); err != nil { - return err - } - conn.insertAutocompleteValue(sessionID, "INPUT", e.Label) + conn.updateSessionEvents(sessionID, 1, 0) + conn.insertAutocompleteValue(sessionID, projectID, "INPUT", e.Label) return nil } -func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *ErrorEvent) error { - tx, err := conn.begin() +func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *ErrorEvent) (err error) { + tx, err := conn.c.Begin() if err != nil { return err } - defer tx.rollback() + defer func() { + if err != nil { + if rollbackErr := tx.rollback(); rollbackErr != nil { + log.Printf("rollback err: %s", rollbackErr) + } + } + }() errorID := hashid.WebErrorID(projectID, e) if err = tx.exec(` @@ -206,17 +132,18 @@ func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *Err ); err != nil { return err } - return tx.commit() + err = tx.commit() + return } -func (conn *Conn) InsertWebFetchEvent(sessionID uint64, savePayload bool, e *FetchEvent) error { +func (conn *Conn) InsertWebFetchEvent(sessionID uint64, projectID uint32, savePayload bool, e *FetchEvent) error { var request, response *string if savePayload { request = &e.Request response = &e.Response } host, path, query, err := url.GetURLParts(e.URL) - conn.insertAutocompleteValue(sessionID, "REQUEST", path) + conn.insertAutocompleteValue(sessionID, projectID, "REQUEST", path) if err != nil { return err } @@ -246,29 +173,15 @@ func (conn *Conn) InsertWebFetchEvent(sessionID uint64, savePayload bool, e *Fet return nil } -func (conn *Conn) InsertWebGraphQLEvent(sessionID uint64, savePayload bool, e *GraphQLEvent) error { +func (conn *Conn) InsertWebGraphQLEvent(sessionID uint64, projectID uint32, savePayload bool, e *GraphQLEvent) error { var request, response *string if savePayload { request = &e.Variables response = &e.Response } - conn.insertAutocompleteValue(sessionID, "GRAPHQL", e.OperationName) - - sqlRequest := ` - INSERT INTO events.graphql ( - session_id, timestamp, message_id, - name, - request_body, response_body - ) VALUES ( - $1, $2, $3, - left($4, 2700), - $5, $6 - ) ON CONFLICT DO NOTHING` - conn.batchQueue(sessionID, sqlRequest, sessionID, e.Timestamp, e.MessageID, - e.OperationName, request, response, - ) - - // Record approximate message size - conn.updateBatchSize(sessionID, 
len(sqlRequest)+len(e.OperationName)+len(e.Variables)+len(e.Response)+8*3) + if err := conn.webGraphQLEvents.Append(sessionID, e.Timestamp, e.MessageID, e.OperationName, request, response); err != nil { + log.Printf("insert web graphQL event err: %s", err) + } + conn.insertAutocompleteValue(sessionID, projectID, "GRAPHQL", e.OperationName) return nil } diff --git a/backend/pkg/db/postgres/pool.go b/backend/pkg/db/postgres/pool.go new file mode 100644 index 000000000..5f9cbaa29 --- /dev/null +++ b/backend/pkg/db/postgres/pool.go @@ -0,0 +1,175 @@ +package postgres + +import ( + "context" + "errors" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/instrument/syncfloat64" + "strings" + "time" +) + +// Pool is a pgx.Pool wrapper with metrics integration +type Pool interface { + Query(sql string, args ...interface{}) (pgx.Rows, error) + QueryRow(sql string, args ...interface{}) pgx.Row + Exec(sql string, arguments ...interface{}) error + SendBatch(b *pgx.Batch) pgx.BatchResults + Begin() (*_Tx, error) + Close() +} + +type poolImpl struct { + conn *pgxpool.Pool + sqlRequestTime syncfloat64.Histogram + sqlRequestCounter syncfloat64.Counter +} + +func (p *poolImpl) Query(sql string, args ...interface{}) (pgx.Rows, error) { + start := time.Now() + res, err := p.conn.Query(getTimeoutContext(), sql, args...) + method, table := methodName(sql) + p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), + attribute.String("method", method), attribute.String("table", table)) + p.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", method), attribute.String("table", table)) + return res, err +} + +func (p *poolImpl) QueryRow(sql string, args ...interface{}) pgx.Row { + start := time.Now() + res := p.conn.QueryRow(getTimeoutContext(), sql, args...) + method, table := methodName(sql) + p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), + attribute.String("method", method), attribute.String("table", table)) + p.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", method), attribute.String("table", table)) + return res +} + +func (p *poolImpl) Exec(sql string, arguments ...interface{}) error { + start := time.Now() + _, err := p.conn.Exec(getTimeoutContext(), sql, arguments...) 
+ method, table := methodName(sql) + p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), + attribute.String("method", method), attribute.String("table", table)) + p.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", method), attribute.String("table", table)) + return err +} + +func (p *poolImpl) SendBatch(b *pgx.Batch) pgx.BatchResults { + start := time.Now() + res := p.conn.SendBatch(getTimeoutContext(), b) + p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), + attribute.String("method", "sendBatch")) + p.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", "sendBatch")) + return res +} + +func (p *poolImpl) Begin() (*_Tx, error) { + start := time.Now() + tx, err := p.conn.Begin(context.Background()) + p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), + attribute.String("method", "begin")) + p.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", "begin")) + return &_Tx{tx, p.sqlRequestTime, p.sqlRequestCounter}, err +} + +func (p *poolImpl) Close() { + p.conn.Close() +} + +func NewPool(conn *pgxpool.Pool, sqlRequestTime syncfloat64.Histogram, sqlRequestCounter syncfloat64.Counter) (Pool, error) { + if conn == nil { + return nil, errors.New("conn is empty") + } + return &poolImpl{ + conn: conn, + sqlRequestTime: sqlRequestTime, + sqlRequestCounter: sqlRequestCounter, + }, nil +} + +// TX - start + +type _Tx struct { + pgx.Tx + sqlRequestTime syncfloat64.Histogram + sqlRequestCounter syncfloat64.Counter +} + +func (tx *_Tx) exec(sql string, args ...interface{}) error { + start := time.Now() + _, err := tx.Exec(context.Background(), sql, args...) + method, table := methodName(sql) + tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), + attribute.String("method", method), attribute.String("table", table)) + tx.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", method), attribute.String("table", table)) + return err +} + +func (tx *_Tx) rollback() error { + start := time.Now() + err := tx.Rollback(context.Background()) + tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), + attribute.String("method", "rollback")) + tx.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", "rollback")) + return err +} + +func (tx *_Tx) commit() error { + start := time.Now() + err := tx.Commit(context.Background()) + tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), + attribute.String("method", "commit")) + tx.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", "commit")) + return err +} + +// TX - end + +func getTimeoutContext() context.Context { + ctx, _ := context.WithTimeout(context.Background(), time.Second*30) + return ctx +} + +func methodName(sql string) (string, string) { + cmd, table := "unknown", "unknown" + + // Prepare sql request for parsing + sql = strings.TrimSpace(sql) + sql = strings.ReplaceAll(sql, "\n", " ") + sql = strings.ReplaceAll(sql, "\t", "") + sql = strings.ToLower(sql) + + // Get sql command name + parts := strings.Split(sql, " ") + if parts[0] == "" { + return cmd, table + } else { + cmd = strings.TrimSpace(parts[0]) + } + + // Get table name + switch cmd { + case "select": + for i, p := range parts { + if strings.TrimSpace(p) == "from" { + table = strings.TrimSpace(parts[i+1]) + } + } + case 
"update": + table = strings.TrimSpace(parts[1]) + case "insert": + table = strings.TrimSpace(parts[2]) + } + return cmd, table +} diff --git a/backend/pkg/db/postgres/project.go b/backend/pkg/db/postgres/project.go index 066339791..f38161885 100644 --- a/backend/pkg/db/postgres/project.go +++ b/backend/pkg/db/postgres/project.go @@ -6,7 +6,7 @@ import ( func (conn *Conn) GetProjectByKey(projectKey string) (*Project, error) { p := &Project{ProjectKey: projectKey} - if err := conn.queryRow(` + if err := conn.c.QueryRow(` SELECT max_session_duration, sample_rate, project_id FROM projects WHERE project_key=$1 AND active = true @@ -21,7 +21,7 @@ func (conn *Conn) GetProjectByKey(projectKey string) (*Project, error) { // TODO: logical separation of metadata func (conn *Conn) GetProject(projectID uint32) (*Project, error) { p := &Project{ProjectID: projectID} - if err := conn.queryRow(` + if err := conn.c.QueryRow(` SELECT project_key, max_session_duration, save_request_payloads, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10 diff --git a/backend/pkg/db/postgres/session-updates.go b/backend/pkg/db/postgres/session-updates.go new file mode 100644 index 000000000..14260c2c6 --- /dev/null +++ b/backend/pkg/db/postgres/session-updates.go @@ -0,0 +1,30 @@ +package postgres + +// Mechanism of combination several session updates into one +const sessionUpdateReq = `UPDATE sessions SET (pages_count, events_count) = (pages_count + $1, events_count + $2) WHERE session_id = $3` + +type sessionUpdates struct { + sessionID uint64 + pages int + events int +} + +func NewSessionUpdates(sessionID uint64) *sessionUpdates { + return &sessionUpdates{ + sessionID: sessionID, + pages: 0, + events: 0, + } +} + +func (su *sessionUpdates) add(pages, events int) { + su.pages += pages + su.events += events +} + +func (su *sessionUpdates) request() (string, []interface{}) { + if su.pages == 0 && su.events == 0 { + return "", nil + } + return sessionUpdateReq, []interface{}{su.pages, su.events, su.sessionID} +} diff --git a/backend/pkg/db/postgres/session.go b/backend/pkg/db/postgres/session.go index 7148d9871..9735cdc1a 100644 --- a/backend/pkg/db/postgres/session.go +++ b/backend/pkg/db/postgres/session.go @@ -1,14 +1,11 @@ package postgres -//import . "openreplay/backend/pkg/messages" import . 
"openreplay/backend/pkg/db/types" -//import "log" - func (conn *Conn) GetSession(sessionID uint64) (*Session, error) { s := &Session{SessionID: sessionID} var revID, userOSVersion *string - if err := conn.queryRow(` + if err := conn.c.QueryRow(` SELECT platform, duration, project_id, start_ts, user_uuid, user_os, user_os_version, @@ -39,69 +36,3 @@ func (conn *Conn) GetSession(sessionID uint64) (*Session, error) { } return s, nil } - -// func (conn *Conn) GetSessionClickEvents(sessionID uint64) (list []IOSClickEvent, err error) { -// rows, err := conn.query(` -// SELECT -// timestamp, seq_index, label -// FROM events_ios.clicks -// WHERE session_id=$1 -// `, sessionID) -// if err != nil { -// return err -// } -// defer rows.Close() -// for rows.Next() { -// e := new(IOSClickEvent) -// if err = rows.Scan(&e.Timestamp, &e.Index, &e.Label); err != nil { -// log.Printf("Error while scanning click events: %v", err) -// } else { -// list = append(list, e) -// } -// } -// return list -// } - -// func (conn *Conn) GetSessionInputEvents(sessionID uint64) (list []IOSInputEvent, err error) { -// rows, err := conn.query(` -// SELECT -// timestamp, seq_index, label, value -// FROM events_ios.inputs -// WHERE session_id=$1 -// `, sessionID) -// if err != nil { -// return err -// } -// defer rows.Close() -// for rows.Next() { -// e := new(IOSInputEvent) -// if err = rows.Scan(&e.Timestamp, &e.Index, &e.Label, &e.Value); err != nil { -// log.Printf("Error while scanning click events: %v", err) -// } else { -// list = append(list, e) -// } -// } -// return list -// } - -// func (conn *Conn) GetSessionCrashEvents(sessionID uint64) (list []IOSCrash, err error) { -// rows, err := conn.query(` -// SELECT -// timestamp, seq_index -// FROM events_ios.crashes -// WHERE session_id=$1 -// `, sessionID) -// if err != nil { -// return err -// } -// defer rows.Close() -// for rows.Next() { -// e := new(IOSCrash) -// if err = rows.Scan(&e.Timestamp, &e.Index, &e.Label, &e.Value); err != nil { -// log.Printf("Error while scanning click events: %v", err) -// } else { -// list = append(list, e) -// } -// } -// return list -// } diff --git a/backend/pkg/db/postgres/unstarted-session.go b/backend/pkg/db/postgres/unstarted-session.go index 2a9a71037..cc27e3f5d 100644 --- a/backend/pkg/db/postgres/unstarted-session.go +++ b/backend/pkg/db/postgres/unstarted-session.go @@ -16,7 +16,7 @@ type UnstartedSession struct { } func (conn *Conn) InsertUnstartedSession(s UnstartedSession) error { - return conn.exec(` + return conn.c.Exec(` INSERT INTO unstarted_sessions ( project_id, tracker_version, do_not_track,