diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index f1a1dcf48..f9440a908 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -114,8 +114,8 @@ func main() { consumer := queue.NewConsumer( cfg.GroupDB, []string{ - cfg.TopicRawWeb, - cfg.TopicAnalytics, + cfg.TopicRawWeb, // from tracker + cfg.TopicAnalytics, // from heuristics }, messages.NewMessageIterator(msgHandler, msgFilter, true), false, @@ -131,30 +131,34 @@ func main() { // Send collected batches to db commitDBUpdates := func() { - start := time.Now() - pg.CommitBatches() - pgDur := time.Now().Sub(start).Milliseconds() - - start = time.Now() + // Commit collected batches and bulks of information to PG + pg.Commit() + // Commit collected batches of information to CH if err := saver.CommitStats(); err != nil { log.Printf("Error on stats commit: %v", err) } - chDur := time.Now().Sub(start).Milliseconds() - log.Printf("commit duration(ms), pg: %d, ch: %d", pgDur, chDur) - + // Commit current position in queue if err := consumer.Commit(); err != nil { log.Printf("Error on consumer commit: %v", err) } } + for { select { case sig := <-sigchan: log.Printf("Caught signal %s: terminating\n", sig.String()) commitDBUpdates() + if err := pg.Close(); err != nil { + log.Printf("db.Close error: %s", err) + } + if err := saver.Close(); err != nil { + log.Printf("saver.Close error: %s", err) + } consumer.Close() os.Exit(0) case <-commitTick: commitDBUpdates() + builderMap.ClearOldSessions() case msg := <-consumer.Rebalanced(): log.Println(msg) default: diff --git a/backend/internal/db/datasaver/stats.go b/backend/internal/db/datasaver/stats.go index b523ecdbe..c7daeb3dc 100644 --- a/backend/internal/db/datasaver/stats.go +++ b/backend/internal/db/datasaver/stats.go @@ -23,3 +23,7 @@ func (si *Saver) InsertStats(session *Session, msg Message) error { func (si *Saver) CommitStats() error { return nil } + +func (si *Saver) Close() error { + return nil +} diff --git a/backend/pkg/db/postgres/batches.go b/backend/pkg/db/postgres/batches.go new file mode 100644 index 000000000..c1283da10 --- /dev/null +++ b/backend/pkg/db/postgres/batches.go @@ -0,0 +1,241 @@ +package postgres + +import ( + "context" + "github.com/jackc/pgx/v4" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/instrument/syncfloat64" + "log" + "openreplay/backend/pkg/monitoring" + "strings" + "time" +) + +type batchItem struct { + query string + arguments []interface{} +} + +type SessionBatch struct { + sessID uint64 + batch *pgx.Batch + size int + items []*batchItem + updates *sessionUpdates +} + +func NewSessionBatch(sessionID uint64) *SessionBatch { + return &SessionBatch{ + sessID: sessionID, + batch: &pgx.Batch{}, + size: 0, + items: make([]*batchItem, 0), + updates: NewSessionUpdates(sessionID), + } +} + +func (b *SessionBatch) SessionID() uint64 { + return b.sessID +} + +func (b *SessionBatch) Queue(query string, arguments ...interface{}) { + b.batch.Queue(query, arguments...) + b.items = append(b.items, &batchItem{ + query: query, + arguments: arguments, + }) +} + +func (b *SessionBatch) Update(pages, events int) { + b.updates.addEvents(pages, events) +} + +func (b *SessionBatch) AddToSize(size int) { + b.size += size +} + +func (b *SessionBatch) Size() int { + return b.size +} + +func (b *SessionBatch) Len() int { + return b.batch.Len() +} + +func (b *SessionBatch) Prepare() { + sql, args := b.updates.request() + if sql != "" { + b.Queue(sql, args...) + } +} + +type batchesTask struct { + batches []*SessionBatch +} + +func NewBatchesTask(size int) *batchesTask { + return &batchesTask{batches: make([]*SessionBatch, 0, size)} +} + +type BatchSet struct { + c Pool + batches map[uint64]*SessionBatch + batchQueueLimit int + batchSizeLimit int + batchSizeBytes syncfloat64.Histogram + batchSizeLines syncfloat64.Histogram + sqlRequestTime syncfloat64.Histogram + sqlRequestCounter syncfloat64.Counter + updates map[uint64]*sessionUpdates + workerTask chan *batchesTask + done chan struct{} + finished chan struct{} +} + +func NewBatchSet(c Pool, queueLimit, sizeLimit int, metrics *monitoring.Metrics) *BatchSet { + bs := &BatchSet{ + c: c, + batches: make(map[uint64]*SessionBatch), + batchQueueLimit: queueLimit, + batchSizeLimit: sizeLimit, + workerTask: make(chan *batchesTask, 1), + done: make(chan struct{}), + finished: make(chan struct{}), + updates: make(map[uint64]*sessionUpdates), + } + bs.initMetrics(metrics) + go bs.worker() + return bs +} + +func (conn *BatchSet) initMetrics(metrics *monitoring.Metrics) { + var err error + conn.batchSizeBytes, err = metrics.RegisterHistogram("batch_size_bytes") + if err != nil { + log.Printf("can't create batchSizeBytes metric: %s", err) + } + conn.batchSizeLines, err = metrics.RegisterHistogram("batch_size_lines") + if err != nil { + log.Printf("can't create batchSizeLines metric: %s", err) + } + conn.sqlRequestTime, err = metrics.RegisterHistogram("sql_request_time") + if err != nil { + log.Printf("can't create sqlRequestTime metric: %s", err) + } + conn.sqlRequestCounter, err = metrics.RegisterCounter("sql_request_number") + if err != nil { + log.Printf("can't create sqlRequestNumber metric: %s", err) + } +} + +func (conn *BatchSet) getBatch(sessionID uint64) *SessionBatch { + sessionID = sessionID % 10 + if _, ok := conn.batches[sessionID]; !ok { + conn.batches[sessionID] = NewSessionBatch(sessionID) + } + return conn.batches[sessionID] +} + +func (conn *BatchSet) batchQueue(sessionID uint64, sql string, args ...interface{}) { + conn.getBatch(sessionID).Queue(sql, args...) +} + +func (conn *BatchSet) updateSessionEvents(sessionID uint64, events, pages int) { + upd, ok := conn.updates[sessionID] + if !ok { + upd = NewSessionUpdates(sessionID) + conn.updates[sessionID] = upd + } + upd.addEvents(pages, events) +} + +func (conn *BatchSet) updateSessionIssues(sessionID uint64, errors, issueScore int) { + upd, ok := conn.updates[sessionID] + if !ok { + upd = NewSessionUpdates(sessionID) + conn.updates[sessionID] = upd + } + upd.addIssues(errors, issueScore) +} + +func (conn *BatchSet) updateBatchSize(sessionID uint64, reqSize int) { + conn.getBatch(sessionID).AddToSize(reqSize) +} + +func (conn *BatchSet) Commit() { + newTask := NewBatchesTask(len(conn.batches) + 2) + // Copy batches + for _, b := range conn.batches { + newTask.batches = append(newTask.batches, b) + } + // Reset current batches + conn.batches = make(map[uint64]*SessionBatch) + + // common batch for user's updates + batch := NewSessionBatch(0) + for _, upd := range conn.updates { + if str, args := upd.request(); str != "" { + batch.Queue(str, args...) + } + } + newTask.batches = append(newTask.batches, batch) + conn.updates = make(map[uint64]*sessionUpdates) + + conn.workerTask <- newTask +} + +func (conn *BatchSet) Stop() { + conn.done <- struct{}{} + <-conn.finished +} + +func (conn *BatchSet) sendBatches(t *batchesTask) { + for _, batch := range t.batches { + // Append session update sql request to the end of batch + batch.Prepare() + // Record batch size in bytes and number of lines + conn.batchSizeBytes.Record(context.Background(), float64(batch.Size())) + conn.batchSizeLines.Record(context.Background(), float64(batch.Len())) + + start := time.Now() + isFailed := false + + // Send batch to db and execute + br := conn.c.SendBatch(batch.batch) + l := batch.Len() + for i := 0; i < l; i++ { + if _, err := br.Exec(); err != nil { + log.Printf("Error in PG batch (session: %d): %v \n", batch.SessionID(), err) + failedSql := batch.items[i] + query := strings.ReplaceAll(failedSql.query, "\n", " ") + log.Println("failed sql req:", query, failedSql.arguments) + isFailed = true + } + } + br.Close() // returns err + dur := time.Now().Sub(start).Milliseconds() + conn.sqlRequestTime.Record(context.Background(), float64(dur), + attribute.String("method", "batch"), attribute.Bool("failed", isFailed)) + conn.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", "batch"), attribute.Bool("failed", isFailed)) + } +} + +func (conn *BatchSet) worker() { + for { + select { + case t := <-conn.workerTask: + start := time.Now() + conn.sendBatches(t) + log.Printf("pg batches dur: %d", time.Now().Sub(start).Milliseconds()) + case <-conn.done: + if len(conn.workerTask) > 0 { + for t := range conn.workerTask { + conn.sendBatches(t) + } + } + conn.finished <- struct{}{} + return + } + } +} diff --git a/backend/pkg/db/postgres/bulk.go b/backend/pkg/db/postgres/bulk.go index 7b9bf90c8..8c6c42f78 100644 --- a/backend/pkg/db/postgres/bulk.go +++ b/backend/pkg/db/postgres/bulk.go @@ -21,6 +21,7 @@ const ( type Bulk interface { Append(args ...interface{}) error Send() error + Table() string } type bulkImpl struct { @@ -53,6 +54,10 @@ func (b *bulkImpl) Send() error { return b.send() } +func (b *bulkImpl) Table() string { + return b.table +} + func (b *bulkImpl) send() error { start := time.Now() size := len(b.values) / b.setSize diff --git a/backend/pkg/db/postgres/bulks.go b/backend/pkg/db/postgres/bulks.go new file mode 100644 index 000000000..5774ba184 --- /dev/null +++ b/backend/pkg/db/postgres/bulks.go @@ -0,0 +1,262 @@ +package postgres + +import ( + "log" + "openreplay/backend/pkg/monitoring" + "time" +) + +type bulksTask struct { + bulks []Bulk +} + +func NewBulksTask() *bulksTask { + return &bulksTask{bulks: make([]Bulk, 0, 14)} +} + +type BulkSet struct { + c Pool + autocompletes Bulk + requests Bulk + customEvents Bulk + webPageEvents Bulk + webInputEvents Bulk + webGraphQL Bulk + webErrors Bulk + webErrorEvents Bulk + webErrorTags Bulk + webIssues Bulk + webIssueEvents Bulk + webCustomEvents Bulk + webClickEvents Bulk + webNetworkRequest Bulk + metrics *monitoring.Metrics + workerTask chan *bulksTask + done chan struct{} + finished chan struct{} +} + +func NewBulkSet(c Pool, metrics *monitoring.Metrics) *BulkSet { + bs := &BulkSet{ + c: c, + metrics: metrics, + workerTask: make(chan *bulksTask, 1), + done: make(chan struct{}), + finished: make(chan struct{}), + } + bs.initBulks() + go bs.worker() + return bs +} + +func (conn *BulkSet) Get(name string) Bulk { + switch name { + case "autocompletes": + return conn.autocompletes + case "requests": + return conn.requests + case "customEvents": + return conn.customEvents + case "webPageEvents": + return conn.webPageEvents + case "webInputEvents": + return conn.webInputEvents + case "webGraphQL": + return conn.webGraphQL + case "webErrors": + return conn.webErrors + case "webErrorEvents": + return conn.webErrorEvents + case "webErrorTags": + return conn.webErrorTags + case "webIssues": + return conn.webIssues + case "webIssueEvents": + return conn.webIssueEvents + case "webCustomEvents": + return conn.webCustomEvents + case "webClickEvents": + return conn.webClickEvents + case "webNetworkRequest": + return conn.webNetworkRequest + default: + return nil + } +} + +func (conn *BulkSet) initBulks() { + var err error + conn.autocompletes, err = NewBulk(conn.c, conn.metrics, + "autocomplete", + "(value, type, project_id)", + "($%d, $%d, $%d)", + 3, 200) + if err != nil { + log.Fatalf("can't create autocomplete bulk: %s", err) + } + conn.requests, err = NewBulk(conn.c, conn.metrics, + "events_common.requests", + "(session_id, timestamp, seq_index, url, duration, success)", + "($%d, $%d, $%d, LEFT($%d, 8000), $%d, $%d)", + 6, 200) + if err != nil { + log.Fatalf("can't create requests bulk: %s", err) + } + conn.customEvents, err = NewBulk(conn.c, conn.metrics, + "events_common.customs", + "(session_id, timestamp, seq_index, name, payload)", + "($%d, $%d, $%d, LEFT($%d, 2000), $%d)", + 5, 200) + if err != nil { + log.Fatalf("can't create customEvents bulk: %s", err) + } + conn.webPageEvents, err = NewBulk(conn.c, conn.metrics, + "events.pages", + "(session_id, message_id, timestamp, referrer, base_referrer, host, path, query, dom_content_loaded_time, "+ + "load_time, response_end, first_paint_time, first_contentful_paint_time, speed_index, visually_complete, "+ + "time_to_interactive, response_time, dom_building_time)", + "($%d, $%d, $%d, LEFT($%d, 8000), LEFT($%d, 8000), LEFT($%d, 300), LEFT($%d, 2000), LEFT($%d, 8000), "+ + "NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0),"+ + " NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0))", + 18, 200) + if err != nil { + log.Fatalf("can't create webPageEvents bulk: %s", err) + } + conn.webInputEvents, err = NewBulk(conn.c, conn.metrics, + "events.inputs", + "(session_id, message_id, timestamp, value, label)", + "($%d, $%d, $%d, LEFT($%d, 2000), NULLIF(LEFT($%d, 2000),''))", + 5, 200) + if err != nil { + log.Fatalf("can't create webPageEvents bulk: %s", err) + } + conn.webGraphQL, err = NewBulk(conn.c, conn.metrics, + "events.graphql", + "(session_id, timestamp, message_id, name, request_body, response_body)", + "($%d, $%d, $%d, LEFT($%d, 2000), $%d, $%d)", + 6, 200) + if err != nil { + log.Fatalf("can't create webPageEvents bulk: %s", err) + } + conn.webErrors, err = NewBulk(conn.c, conn.metrics, + "errors", + "(error_id, project_id, source, name, message, payload)", + "($%d, $%d, $%d, $%d, $%d, $%d::jsonb)", + 6, 200) + if err != nil { + log.Fatalf("can't create webErrors bulk: %s", err) + } + conn.webErrorEvents, err = NewBulk(conn.c, conn.metrics, + "events.errors", + "(session_id, message_id, timestamp, error_id)", + "($%d, $%d, $%d, $%d)", + 4, 200) + if err != nil { + log.Fatalf("can't create webErrorEvents bulk: %s", err) + } + conn.webErrorTags, err = NewBulk(conn.c, conn.metrics, + "public.errors_tags", + "(session_id, message_id, error_id, key, value)", + "($%d, $%d, $%d, $%d, $%d)", + 5, 200) + if err != nil { + log.Fatalf("can't create webErrorEvents bulk: %s", err) + } + conn.webIssues, err = NewBulk(conn.c, conn.metrics, + "issues", + "(project_id, issue_id, type, context_string)", + "($%d, $%d, $%d, $%d)", + 4, 200) + if err != nil { + log.Fatalf("can't create webIssues bulk: %s", err) + } + conn.webIssueEvents, err = NewBulk(conn.c, conn.metrics, + "events_common.issues", + "(session_id, issue_id, timestamp, seq_index, payload)", + "($%d, $%d, $%d, $%d, CAST($%d AS jsonb))", + 5, 200) + if err != nil { + log.Fatalf("can't create webIssueEvents bulk: %s", err) + } + conn.webCustomEvents, err = NewBulk(conn.c, conn.metrics, + "events_common.customs", + "(session_id, seq_index, timestamp, name, payload, level)", + "($%d, $%d, $%d, LEFT($%d, 2000), $%d, $%d)", + 6, 200) + if err != nil { + log.Fatalf("can't create webCustomEvents bulk: %s", err) + } + conn.webClickEvents, err = NewBulk(conn.c, conn.metrics, + "events.clicks", + "(session_id, message_id, timestamp, label, selector, url, path)", + "($%d, $%d, $%d, NULLIF(LEFT($%d, 2000), ''), LEFT($%d, 8000), LEFT($%d, 2000), LEFT($%d, 2000))", + 7, 200) + if err != nil { + log.Fatalf("can't create webClickEvents bulk: %s", err) + } + conn.webNetworkRequest, err = NewBulk(conn.c, conn.metrics, + "events_common.requests", + "(session_id, timestamp, seq_index, url, host, path, query, request_body, response_body, status_code, method, duration, success)", + "($%d, $%d, $%d, LEFT($%d, 8000), LEFT($%d, 300), LEFT($%d, 2000), LEFT($%d, 8000), $%d, $%d, $%d::smallint, NULLIF($%d, '')::http_method, $%d, $%d)", + 13, 200) + if err != nil { + log.Fatalf("can't create webNetworkRequest bulk: %s", err) + } +} + +func (conn *BulkSet) Send() { + newTask := NewBulksTask() + + // Prepare set of bulks to send + newTask.bulks = append(newTask.bulks, conn.autocompletes) + newTask.bulks = append(newTask.bulks, conn.requests) + newTask.bulks = append(newTask.bulks, conn.customEvents) + newTask.bulks = append(newTask.bulks, conn.webPageEvents) + newTask.bulks = append(newTask.bulks, conn.webInputEvents) + newTask.bulks = append(newTask.bulks, conn.webGraphQL) + newTask.bulks = append(newTask.bulks, conn.webErrors) + newTask.bulks = append(newTask.bulks, conn.webErrorEvents) + newTask.bulks = append(newTask.bulks, conn.webErrorTags) + newTask.bulks = append(newTask.bulks, conn.webIssues) + newTask.bulks = append(newTask.bulks, conn.webIssueEvents) + newTask.bulks = append(newTask.bulks, conn.webCustomEvents) + newTask.bulks = append(newTask.bulks, conn.webClickEvents) + newTask.bulks = append(newTask.bulks, conn.webNetworkRequest) + + conn.workerTask <- newTask + + // Reset new bulks + conn.initBulks() +} + +func (conn *BulkSet) Stop() { + conn.done <- struct{}{} + <-conn.finished +} + +func (conn *BulkSet) sendBulks(t *bulksTask) { + for _, bulk := range t.bulks { + if err := bulk.Send(); err != nil { + log.Printf("%s bulk send err: %s", bulk.Table(), err) + } + } +} + +func (conn *BulkSet) worker() { + for { + select { + case t := <-conn.workerTask: + start := time.Now() + conn.sendBulks(t) + log.Printf("pg bulks dur: %d", time.Now().Sub(start).Milliseconds()) + case <-conn.done: + if len(conn.workerTask) > 0 { + for t := range conn.workerTask { + conn.sendBulks(t) + } + } + conn.finished <- struct{}{} + return + } + } +} diff --git a/backend/pkg/db/postgres/connector.go b/backend/pkg/db/postgres/connector.go index f2d016346..2e8f3d425 100644 --- a/backend/pkg/db/postgres/connector.go +++ b/backend/pkg/db/postgres/connector.go @@ -2,50 +2,22 @@ package postgres import ( "context" - "go.opentelemetry.io/otel/attribute" + "github.com/jackc/pgx/v4/pgxpool" "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "log" "openreplay/backend/pkg/db/types" "openreplay/backend/pkg/monitoring" - "strings" - "time" - - "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/pgxpool" ) type CH interface { InsertAutocomplete(session *types.Session, msgType, msgValue string) error } -type batchItem struct { - query string - arguments []interface{} -} - // Conn contains batches, bulks and cache for all sessions type Conn struct { c Pool - batches map[uint64]*pgx.Batch - batchSizes map[uint64]int - rawBatches map[uint64][]*batchItem - autocompletes Bulk - requests Bulk - customEvents Bulk - webPageEvents Bulk - webInputEvents Bulk - webGraphQL Bulk - webErrors Bulk - webErrorEvents Bulk - webErrorTags Bulk - webIssues Bulk - webIssueEvents Bulk - webCustomEvents Bulk - webClickEvents Bulk - webNetworkRequest Bulk - sessionUpdates map[uint64]*sessionUpdates - batchQueueLimit int - batchSizeLimit int + batches *BatchSet + bulks *BulkSet batchSizeBytes syncfloat64.Histogram batchSizeLines syncfloat64.Histogram sqlRequestTime syncfloat64.Histogram @@ -65,24 +37,20 @@ func NewConn(url string, queueLimit, sizeLimit int, metrics *monitoring.Metrics) if err != nil { log.Fatalf("pgxpool.Connect err: %s", err) } - conn := &Conn{ - batches: make(map[uint64]*pgx.Batch), - batchSizes: make(map[uint64]int), - rawBatches: make(map[uint64][]*batchItem), - sessionUpdates: make(map[uint64]*sessionUpdates), - batchQueueLimit: queueLimit, - batchSizeLimit: sizeLimit, - } + conn := &Conn{} conn.initMetrics(metrics) conn.c, err = NewPool(c, conn.sqlRequestTime, conn.sqlRequestCounter) if err != nil { log.Fatalf("can't create new pool wrapper: %s", err) } - conn.initBulks(metrics) + conn.bulks = NewBulkSet(conn.c, metrics) + conn.batches = NewBatchSet(conn.c, queueLimit, sizeLimit, metrics) return conn } func (conn *Conn) Close() error { + conn.bulks.Stop() + conn.batches.Stop() conn.c.Close() return nil } @@ -107,131 +75,11 @@ func (conn *Conn) initMetrics(metrics *monitoring.Metrics) { } } -func (conn *Conn) initBulks(metrics *monitoring.Metrics) { - var err error - conn.autocompletes, err = NewBulk(conn.c, metrics, - "autocomplete", - "(value, type, project_id)", - "($%d, $%d, $%d)", - 3, 200) - if err != nil { - log.Fatalf("can't create autocomplete bulk: %s", err) - } - conn.requests, err = NewBulk(conn.c, metrics, - "events_common.requests", - "(session_id, timestamp, seq_index, url, duration, success)", - "($%d, $%d, $%d, LEFT($%d, 8000), $%d, $%d)", - 6, 200) - if err != nil { - log.Fatalf("can't create requests bulk: %s", err) - } - conn.customEvents, err = NewBulk(conn.c, metrics, - "events_common.customs", - "(session_id, timestamp, seq_index, name, payload)", - "($%d, $%d, $%d, LEFT($%d, 2000), $%d)", - 5, 200) - if err != nil { - log.Fatalf("can't create customEvents bulk: %s", err) - } - conn.webPageEvents, err = NewBulk(conn.c, metrics, - "events.pages", - "(session_id, message_id, timestamp, referrer, base_referrer, host, path, query, dom_content_loaded_time, "+ - "load_time, response_end, first_paint_time, first_contentful_paint_time, speed_index, visually_complete, "+ - "time_to_interactive, response_time, dom_building_time)", - "($%d, $%d, $%d, LEFT($%d, 8000), LEFT($%d, 8000), LEFT($%d, 300), LEFT($%d, 2000), LEFT($%d, 8000), "+ - "NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0),"+ - " NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0))", - 18, 200) - if err != nil { - log.Fatalf("can't create webPageEvents bulk: %s", err) - } - conn.webInputEvents, err = NewBulk(conn.c, metrics, - "events.inputs", - "(session_id, message_id, timestamp, value, label)", - "($%d, $%d, $%d, LEFT($%d, 2000), NULLIF(LEFT($%d, 2000),''))", - 5, 200) - if err != nil { - log.Fatalf("can't create webPageEvents bulk: %s", err) - } - conn.webGraphQL, err = NewBulk(conn.c, metrics, - "events.graphql", - "(session_id, timestamp, message_id, name, request_body, response_body)", - "($%d, $%d, $%d, LEFT($%d, 2000), $%d, $%d)", - 6, 200) - if err != nil { - log.Fatalf("can't create webPageEvents bulk: %s", err) - } - conn.webErrors, err = NewBulk(conn.c, metrics, - "errors", - "(error_id, project_id, source, name, message, payload)", - "($%d, $%d, $%d, $%d, $%d, $%d::jsonb)", - 6, 200) - if err != nil { - log.Fatalf("can't create webErrors bulk: %s", err) - } - conn.webErrorEvents, err = NewBulk(conn.c, metrics, - "events.errors", - "(session_id, message_id, timestamp, error_id)", - "($%d, $%d, $%d, $%d)", - 4, 200) - if err != nil { - log.Fatalf("can't create webErrorEvents bulk: %s", err) - } - conn.webErrorTags, err = NewBulk(conn.c, metrics, - "public.errors_tags", - "(session_id, message_id, error_id, key, value)", - "($%d, $%d, $%d, $%d, $%d)", - 5, 200) - if err != nil { - log.Fatalf("can't create webErrorEvents bulk: %s", err) - } - conn.webIssues, err = NewBulk(conn.c, metrics, - "issues", - "(project_id, issue_id, type, context_string)", - "($%d, $%d, $%d, $%d)", - 4, 200) - if err != nil { - log.Fatalf("can't create webIssues bulk: %s", err) - } - conn.webIssueEvents, err = NewBulk(conn.c, metrics, - "events_common.issues", - "(session_id, issue_id, timestamp, seq_index, payload)", - "($%d, $%d, $%d, $%d, CAST($%d AS jsonb))", - 5, 200) - if err != nil { - log.Fatalf("can't create webIssueEvents bulk: %s", err) - } - conn.webCustomEvents, err = NewBulk(conn.c, metrics, - "events_common.customs", - "(session_id, seq_index, timestamp, name, payload, level)", - "($%d, $%d, $%d, LEFT($%d, 2000), $%d, $%d)", - 6, 200) - if err != nil { - log.Fatalf("can't create webCustomEvents bulk: %s", err) - } - conn.webClickEvents, err = NewBulk(conn.c, metrics, - "events.clicks", - "(session_id, message_id, timestamp, label, selector, url, path)", - "($%d, $%d, $%d, NULLIF(LEFT($%d, 2000), ''), LEFT($%d, 8000), LEFT($%d, 2000), LEFT($%d, 2000))", - 7, 200) - if err != nil { - log.Fatalf("can't create webClickEvents bulk: %s", err) - } - conn.webNetworkRequest, err = NewBulk(conn.c, metrics, - "events_common.requests", - "(session_id, timestamp, seq_index, url, host, path, query, request_body, response_body, status_code, method, duration, success)", - "($%d, $%d, $%d, LEFT($%d, 8000), LEFT($%d, 300), LEFT($%d, 2000), LEFT($%d, 8000), $%d, $%d, $%d::smallint, NULLIF($%d, '')::http_method, $%d, $%d)", - 13, 200) - if err != nil { - log.Fatalf("can't create webNetworkRequest bulk: %s", err) - } -} - func (conn *Conn) insertAutocompleteValue(sessionID uint64, projectID uint32, tp string, value string) { if len(value) == 0 { return } - if err := conn.autocompletes.Append(value, tp, projectID); err != nil { + if err := conn.bulks.Get("autocompletes").Append(value, tp, projectID); err != nil { log.Printf("autocomplete bulk err: %s", err) } if conn.chConn == nil { @@ -244,192 +92,22 @@ func (conn *Conn) insertAutocompleteValue(sessionID uint64, projectID uint32, tp } func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{}) { - batch, ok := conn.batches[sessionID] - if !ok { - conn.batches[sessionID] = &pgx.Batch{} - conn.rawBatches[sessionID] = make([]*batchItem, 0) - batch = conn.batches[sessionID] - } - batch.Queue(sql, args...) - conn.rawBatch(sessionID, sql, args...) -} - -func (conn *Conn) rawBatch(sessionID uint64, sql string, args ...interface{}) { - // Temp raw batch store - raw := conn.rawBatches[sessionID] - raw = append(raw, &batchItem{ - query: sql, - arguments: args, - }) - conn.rawBatches[sessionID] = raw + conn.batches.batchQueue(sessionID, sql, args...) } func (conn *Conn) updateSessionEvents(sessionID uint64, events, pages int) { - if _, ok := conn.sessionUpdates[sessionID]; !ok { - conn.sessionUpdates[sessionID] = NewSessionUpdates(sessionID) - } - conn.sessionUpdates[sessionID].addEvents(pages, events) + conn.batches.updateSessionEvents(sessionID, events, pages) } func (conn *Conn) updateSessionIssues(sessionID uint64, errors, issueScore int) { - if _, ok := conn.sessionUpdates[sessionID]; !ok { - conn.sessionUpdates[sessionID] = NewSessionUpdates(sessionID) - } - conn.sessionUpdates[sessionID].addIssues(errors, issueScore) + conn.batches.updateSessionIssues(sessionID, errors, issueScore) } -func (conn *Conn) sendBulks() { - if err := conn.autocompletes.Send(); err != nil { - log.Printf("autocomplete bulk send err: %s", err) - } - if err := conn.requests.Send(); err != nil { - log.Printf("requests bulk send err: %s", err) - } - if err := conn.customEvents.Send(); err != nil { - log.Printf("customEvents bulk send err: %s", err) - } - if err := conn.webPageEvents.Send(); err != nil { - log.Printf("webPageEvents bulk send err: %s", err) - } - if err := conn.webInputEvents.Send(); err != nil { - log.Printf("webInputEvents bulk send err: %s", err) - } - if err := conn.webGraphQL.Send(); err != nil { - log.Printf("webGraphQL bulk send err: %s", err) - } - if err := conn.webErrors.Send(); err != nil { - log.Printf("webErrors bulk send err: %s", err) - } - if err := conn.webErrorEvents.Send(); err != nil { - log.Printf("webErrorEvents bulk send err: %s", err) - } - if err := conn.webErrorTags.Send(); err != nil { - log.Printf("webErrorTags bulk send err: %s", err) - } - if err := conn.webIssues.Send(); err != nil { - log.Printf("webIssues bulk send err: %s", err) - } - if err := conn.webIssueEvents.Send(); err != nil { - log.Printf("webIssueEvents bulk send err: %s", err) - } - if err := conn.webCustomEvents.Send(); err != nil { - log.Printf("webCustomEvents bulk send err: %s", err) - } - if err := conn.webClickEvents.Send(); err != nil { - log.Printf("webClickEvents bulk send err: %s", err) - } - if err := conn.webNetworkRequest.Send(); err != nil { - log.Printf("webNetworkRequest bulk send err: %s", err) - } -} - -func (conn *Conn) CommitBatches() { - conn.sendBulks() - for sessID, b := range conn.batches { - // Append session update sql request to the end of batch - if update, ok := conn.sessionUpdates[sessID]; ok { - sql, args := update.request() - if sql != "" { - conn.batchQueue(sessID, sql, args...) - b, _ = conn.batches[sessID] - } - } - // Record batch size in bytes and number of lines - conn.batchSizeBytes.Record(context.Background(), float64(conn.batchSizes[sessID])) - conn.batchSizeLines.Record(context.Background(), float64(b.Len())) - - start := time.Now() - isFailed := false - - // Send batch to db and execute - br := conn.c.SendBatch(b) - l := b.Len() - for i := 0; i < l; i++ { - if ct, err := br.Exec(); err != nil { - log.Printf("Error in PG batch (command tag %s, session: %d): %v \n", ct.String(), sessID, err) - failedSql := conn.rawBatches[sessID][i] - query := strings.ReplaceAll(failedSql.query, "\n", " ") - log.Println("failed sql req:", query, failedSql.arguments) - isFailed = true - } - } - br.Close() // returns err - conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), - attribute.String("method", "batch"), attribute.Bool("failed", isFailed)) - conn.sqlRequestCounter.Add(context.Background(), 1, - attribute.String("method", "batch"), attribute.Bool("failed", isFailed)) - if !isFailed { - delete(conn.sessionUpdates, sessID) - } - } - conn.batches = make(map[uint64]*pgx.Batch) - conn.batchSizes = make(map[uint64]int) - conn.rawBatches = make(map[uint64][]*batchItem) - - // Session updates - for sessID, su := range conn.sessionUpdates { - sql, args := su.request() - if sql == "" { - continue - } - if err := conn.c.Exec(sql, args...); err != nil { - log.Printf("failed session update, sessID: %d, err: %s", sessID, err) - } - } - conn.sessionUpdates = make(map[uint64]*sessionUpdates) +func (conn *Conn) Commit() { + conn.bulks.Send() + conn.batches.Commit() } func (conn *Conn) updateBatchSize(sessionID uint64, reqSize int) { - conn.batchSizes[sessionID] += reqSize - if conn.batchSizes[sessionID] >= conn.batchSizeLimit || conn.batches[sessionID].Len() >= conn.batchQueueLimit { - conn.commitBatch(sessionID) - } -} - -// Send only one batch to pg -func (conn *Conn) commitBatch(sessionID uint64) { - b, ok := conn.batches[sessionID] - if !ok { - log.Printf("can't find batch for session: %d", sessionID) - return - } - // Append session update sql request to the end of batch - if update, ok := conn.sessionUpdates[sessionID]; ok { - sql, args := update.request() - if sql != "" { - conn.batchQueue(sessionID, sql, args...) - b, _ = conn.batches[sessionID] - } - } - // Record batch size in bytes and number of lines - conn.batchSizeBytes.Record(context.Background(), float64(conn.batchSizes[sessionID])) - conn.batchSizeLines.Record(context.Background(), float64(b.Len())) - - start := time.Now() - isFailed := false - - // Send batch to db and execute - br := conn.c.SendBatch(b) - l := b.Len() - for i := 0; i < l; i++ { - if ct, err := br.Exec(); err != nil { - log.Printf("Error in PG batch (command tag %s, session: %d): %v \n", ct.String(), sessionID, err) - failedSql := conn.rawBatches[sessionID][i] - query := strings.ReplaceAll(failedSql.query, "\n", " ") - log.Println("failed sql req:", query, failedSql.arguments) - isFailed = true - } - } - br.Close() - - conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), - attribute.String("method", "batch"), attribute.Bool("failed", isFailed)) - conn.sqlRequestCounter.Add(context.Background(), 1, - attribute.String("method", "batch"), attribute.Bool("failed", isFailed)) - - // Clean batch info - delete(conn.batches, sessionID) - delete(conn.batchSizes, sessionID) - delete(conn.rawBatches, sessionID) - delete(conn.sessionUpdates, sessionID) + conn.batches.updateBatchSize(sessionID, reqSize) } diff --git a/backend/pkg/db/postgres/messages-common.go b/backend/pkg/db/postgres/messages-common.go index 6bcd52cd4..aab74dc6b 100644 --- a/backend/pkg/db/postgres/messages-common.go +++ b/backend/pkg/db/postgres/messages-common.go @@ -104,14 +104,14 @@ func (conn *Conn) HandleSessionEnd(sessionID uint64) error { } func (conn *Conn) InsertRequest(sessionID uint64, timestamp uint64, index uint32, url string, duration uint64, success bool) error { - if err := conn.requests.Append(sessionID, timestamp, index, url, duration, success); err != nil { + if err := conn.bulks.Get("requests").Append(sessionID, timestamp, index, url, duration, success); err != nil { return fmt.Errorf("insert request in bulk err: %s", err) } return nil } func (conn *Conn) InsertCustomEvent(sessionID uint64, timestamp uint64, index uint32, name string, payload string) error { - if err := conn.customEvents.Append(sessionID, timestamp, index, name, payload); err != nil { + if err := conn.bulks.Get("customEvents").Append(sessionID, timestamp, index, name, payload); err != nil { return fmt.Errorf("insert custom event in bulk err: %s", err) } return nil @@ -153,15 +153,15 @@ func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messag payload = nil } - if err := conn.webIssues.Append(projectID, issueID, e.Type, e.ContextString); err != nil { + if err := conn.bulks.Get("webIssues").Append(projectID, issueID, e.Type, e.ContextString); err != nil { log.Printf("insert web issue err: %s", err) } - if err := conn.webIssueEvents.Append(sessionID, issueID, e.Timestamp, truncSqIdx(e.MessageID), payload); err != nil { + if err := conn.bulks.Get("webIssueEvents").Append(sessionID, issueID, e.Timestamp, truncSqIdx(e.MessageID), payload); err != nil { log.Printf("insert web issue event err: %s", err) } conn.updateSessionIssues(sessionID, 0, getIssueScore(e)) if e.Type == "custom" { - if err := conn.webCustomEvents.Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.ContextString, e.Payload, "error"); err != nil { + if err := conn.bulks.Get("webCustomEvents").Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.ContextString, e.Payload, "error"); err != nil { log.Printf("insert web custom event err: %s", err) } } diff --git a/backend/pkg/db/postgres/messages-web.go b/backend/pkg/db/postgres/messages-web.go index f96ac7230..08db4491e 100644 --- a/backend/pkg/db/postgres/messages-web.go +++ b/backend/pkg/db/postgres/messages-web.go @@ -44,7 +44,7 @@ func (conn *Conn) InsertWebPageEvent(sessionID uint64, projectID uint32, e *Page return err } // base_path is deprecated - if err = conn.webPageEvents.Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.Referrer, url.DiscardURLQuery(e.Referrer), + if err = conn.bulks.Get("webPageEvents").Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.Referrer, url.DiscardURLQuery(e.Referrer), host, path, query, e.DomContentLoadedEventEnd, e.LoadEventEnd, e.ResponseEnd, e.FirstPaint, e.FirstContentfulPaint, e.SpeedIndex, e.VisuallyComplete, e.TimeToInteractive, calcResponseTime(e), calcDomBuildingTime(e)); err != nil { log.Printf("insert web page event in bulk err: %s", err) @@ -60,7 +60,7 @@ func (conn *Conn) InsertWebPageEvent(sessionID uint64, projectID uint32, e *Page func (conn *Conn) InsertWebClickEvent(sessionID uint64, projectID uint32, e *ClickEvent) error { var host, path string host, path, _, _ = url.GetURLParts(e.Url) - if err := conn.webClickEvents.Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.Label, e.Selector, host+path, path); err != nil { + if err := conn.bulks.Get("webClickEvents").Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.Label, e.Selector, host+path, path); err != nil { log.Printf("insert web click err: %s", err) } // Accumulate session updates and exec inside batch with another sql commands @@ -78,7 +78,7 @@ func (conn *Conn) InsertWebInputEvent(sessionID uint64, projectID uint32, e *Inp if e.ValueMasked { value = nil } - if err := conn.webInputEvents.Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, value, e.Label); err != nil { + if err := conn.bulks.Get("webInputEvents").Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, value, e.Label); err != nil { log.Printf("insert web input event err: %s", err) } conn.updateSessionEvents(sessionID, 1, 0) @@ -88,15 +88,15 @@ func (conn *Conn) InsertWebInputEvent(sessionID uint64, projectID uint32, e *Inp func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *types.ErrorEvent) error { errorID := e.ID(projectID) - if err := conn.webErrors.Append(errorID, projectID, e.Source, e.Name, e.Message, e.Payload); err != nil { + if err := conn.bulks.Get("webErrors").Append(errorID, projectID, e.Source, e.Name, e.Message, e.Payload); err != nil { log.Printf("insert web error err: %s", err) } - if err := conn.webErrorEvents.Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, errorID); err != nil { + if err := conn.bulks.Get("webErrorEvents").Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, errorID); err != nil { log.Printf("insert web error event err: %s", err) } conn.updateSessionIssues(sessionID, 1, 1000) for key, value := range e.Tags { - if err := conn.webErrorTags.Append(sessionID, truncSqIdx(e.MessageID), errorID, key, value); err != nil { + if err := conn.bulks.Get("webErrorTags").Append(sessionID, truncSqIdx(e.MessageID), errorID, key, value); err != nil { log.Printf("insert web error token err: %s", err) } } @@ -114,7 +114,7 @@ func (conn *Conn) InsertWebNetworkRequest(sessionID uint64, projectID uint32, sa if err != nil { return err } - conn.webNetworkRequest.Append(sessionID, e.Meta().Timestamp, truncSqIdx(e.Meta().Index), e.URL, host, path, query, + conn.bulks.Get("webNetworkRequest").Append(sessionID, e.Meta().Timestamp, truncSqIdx(e.Meta().Index), e.URL, host, path, query, request, response, e.Status, url.EnsureMethod(e.Method), e.Duration, e.Status < 400) return nil } @@ -125,7 +125,7 @@ func (conn *Conn) InsertWebGraphQL(sessionID uint64, projectID uint32, savePaylo request = &e.Variables response = &e.Response } - if err := conn.webGraphQL.Append(sessionID, e.Meta().Timestamp, truncSqIdx(e.Meta().Index), e.OperationName, request, response); err != nil { + if err := conn.bulks.Get("webGraphQL").Append(sessionID, e.Meta().Timestamp, truncSqIdx(e.Meta().Index), e.OperationName, request, response); err != nil { log.Printf("insert web graphQL event err: %s", err) } conn.insertAutocompleteValue(sessionID, projectID, "GRAPHQL", e.OperationName) diff --git a/backend/pkg/sessions/builderMap.go b/backend/pkg/sessions/builderMap.go index bdf8e8686..85e787929 100644 --- a/backend/pkg/sessions/builderMap.go +++ b/backend/pkg/sessions/builderMap.go @@ -1,6 +1,7 @@ package sessions import ( + "log" "openreplay/backend/pkg/handlers" "time" @@ -37,6 +38,21 @@ func (m *builderMap) HandleMessage(msg Message) { b.handleMessage(msg, messageID) } +func (m *builderMap) ClearOldSessions() { + deleted := 0 + now := time.Now() + for id, sess := range m.sessions { + if sess.lastSystemTime.Add(FORCE_DELETE_TIMEOUT).Before(now) { + // Should delete zombie session + delete(m.sessions, id) + deleted++ + } + } + if deleted > 0 { + log.Printf("deleted %d sessions from message builder", deleted) + } +} + func (m *builderMap) iterateSessionReadyMessages(sessionID uint64, b *builder, iter func(msg Message)) { if b.ended || b.lastSystemTime.Add(FORCE_DELETE_TIMEOUT).Before(time.Now()) { for _, p := range b.processors { diff --git a/ee/backend/internal/db/datasaver/stats.go b/ee/backend/internal/db/datasaver/stats.go index 69ecf3a7d..049c319bd 100644 --- a/ee/backend/internal/db/datasaver/stats.go +++ b/ee/backend/internal/db/datasaver/stats.go @@ -50,3 +50,7 @@ func (si *Saver) InsertStats(session *types.Session, msg messages.Message) error func (si *Saver) CommitStats() error { return si.ch.Commit() } + +func (si *Saver) Close() error { + return si.ch.Stop() +} diff --git a/ee/backend/pkg/db/clickhouse/connector.go b/ee/backend/pkg/db/clickhouse/connector.go index a89b6003a..157d384b9 100644 --- a/ee/backend/pkg/db/clickhouse/connector.go +++ b/ee/backend/pkg/db/clickhouse/connector.go @@ -21,6 +21,7 @@ import ( type Connector interface { Prepare() error Commit() error + Stop() error InsertWebSession(session *types.Session) error InsertWebResourceEvent(session *types.Session, msg *messages.ResourceEvent) error InsertWebPageEvent(session *types.Session, msg *messages.PageEvent) error @@ -35,9 +36,20 @@ type Connector interface { InsertIssue(session *types.Session, msg *messages.IssueEvent) error } +type task struct { + bulks []Bulk +} + +func NewTask() *task { + return &task{bulks: make([]Bulk, 0, 14)} +} + type connectorImpl struct { - conn driver.Conn - batches map[string]Bulk //driver.Batch + conn driver.Conn + batches map[string]Bulk //driver.Batch + workerTask chan *task + done chan struct{} + finished chan struct{} } // Check env variables. If not present, return default value. @@ -76,9 +88,13 @@ func NewConnector(url string) Connector { } c := &connectorImpl{ - conn: conn, - batches: make(map[string]Bulk, 9), + conn: conn, + batches: make(map[string]Bulk, 13), + workerTask: make(chan *task, 1), + done: make(chan struct{}), + finished: make(chan struct{}), } + go c.worker() return c } @@ -117,12 +133,47 @@ func (c *connectorImpl) Prepare() error { } func (c *connectorImpl) Commit() error { + newTask := NewTask() for _, b := range c.batches { + newTask.bulks = append(newTask.bulks, b) + } + c.batches = make(map[string]Bulk, 13) + if err := c.Prepare(); err != nil { + log.Printf("can't prepare new CH batch set: %s", err) + } + c.workerTask <- newTask + return nil +} + +func (c *connectorImpl) Stop() error { + c.done <- struct{}{} + <-c.finished + return c.conn.Close() +} + +func (c *connectorImpl) sendBulks(t *task) { + for _, b := range t.bulks { if err := b.Send(); err != nil { - return fmt.Errorf("can't send batch: %s", err) + log.Printf("can't send batch: %s", err) + } + } +} + +func (c *connectorImpl) worker() { + for { + select { + case t := <-c.workerTask: + start := time.Now() + c.sendBulks(t) + log.Printf("ch bulks dur: %d", time.Now().Sub(start).Milliseconds()) + case <-c.done: + for t := range c.workerTask { + c.sendBulks(t) + } + c.finished <- struct{}{} + return } } - return nil } func (c *connectorImpl) checkError(name string, err error) {