diff --git a/api/app.py b/api/app.py index 959f1ef8f..4fd042d1a 100644 --- a/api/app.py +++ b/api/app.py @@ -13,7 +13,7 @@ from routers.crons import core_crons from routers.crons import core_dynamic_crons from routers.subs import dashboard, insights, metrics, v1_api -app = FastAPI(root_path="/api") +app = FastAPI(root_path="/api", docs_url=config("docs_url", default=""), redoc_url=config("redoc_url", default="")) @app.middleware('http') diff --git a/api/app_alerts.py b/api/app_alerts.py index 57bfcd55d..4e05ab1a8 100644 --- a/api/app_alerts.py +++ b/api/app_alerts.py @@ -6,7 +6,7 @@ from fastapi import FastAPI from chalicelib.core import alerts_processor -app = FastAPI() +app = FastAPI(root_path="/alerts", docs_url=config("docs_url", default=""), redoc_url=config("redoc_url", default="")) print("============= ALERTS =============") diff --git a/api/chalicelib/utils/pg_client.py b/api/chalicelib/utils/pg_client.py index 2abc9f6c7..eda7747f8 100644 --- a/api/chalicelib/utils/pg_client.py +++ b/api/chalicelib/utils/pg_client.py @@ -75,9 +75,11 @@ class PostgresClient: connection = None cursor = None long_query = False + unlimited_query = False def __init__(self, long_query=False, unlimited_query=False): self.long_query = long_query + self.unlimited_query = unlimited_query if unlimited_query: long_config = dict(_PG_CONFIG) long_config["application_name"] += "-UNLIMITED" @@ -85,7 +87,7 @@ class PostgresClient: elif long_query: long_config = dict(_PG_CONFIG) long_config["application_name"] += "-LONG" - long_config["options"] = f"-c statement_timeout={config('pg_long_timeout', cast=int, default=5*60) * 1000}" + long_config["options"] = f"-c statement_timeout={config('pg_long_timeout', cast=int, default=5 * 60) * 1000}" self.connection = psycopg2.connect(**long_config) else: self.connection = postgreSQL_pool.getconn() @@ -99,11 +101,11 @@ class PostgresClient: try: self.connection.commit() self.cursor.close() - if self.long_query: + if self.long_query or self.unlimited_query: self.connection.close() except Exception as error: print("Error while committing/closing PG-connection", error) - if str(error) == "connection already closed": + if str(error) == "connection already closed" and not self.long_query and not self.unlimited_query: print("Recreating the connexion pool") make_pool() else: diff --git a/api/routers/core.py b/api/routers/core.py index df98c1c09..2c3ff5b90 100644 --- a/api/routers/core.py +++ b/api/routers/core.py @@ -1171,4 +1171,5 @@ def get_limits(context: schemas.CurrentContext = Depends(OR_context)): @public_app.put('/', tags=["health"]) @public_app.delete('/', tags=["health"]) def health_check(): - return {"data": f"live {config('version_number', default='')}"} + return {"data": {"stage": f"live {config('version_number', default='')}", + "internalCrons": config("LOCAL_CRONS", default=False, cast=bool)}} diff --git a/backend/Dockerfile b/backend/Dockerfile index 4f060587d..28bedcb40 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -28,7 +28,7 @@ ENV TZ=UTC \ FS_DIR=/mnt/efs \ MAXMINDDB_FILE=/home/openreplay/geoip.mmdb \ UAPARSER_FILE=/home/openreplay/regexes.yaml \ - HTTP_PORT=80 \ + HTTP_PORT=8080 \ KAFKA_USE_SSL=true \ KAFKA_MAX_POLL_INTERVAL_MS=400000 \ REDIS_STREAMS_MAX_LEN=10000 \ diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index c0613fca0..1fd2f4e64 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -82,10 +82,20 @@ func main() { // Find ended sessions and send notification to other services 
sessions.HandleEndedSessions(func(sessionID uint64, timestamp int64) bool { msg := &messages.SessionEnd{Timestamp: uint64(timestamp)} - if err := pg.InsertSessionEnd(sessionID, msg.Timestamp); err != nil { + currDuration, err := pg.GetSessionDuration(sessionID) + if err != nil { + log.Printf("getSessionDuration failed, sessID: %d, err: %s", sessionID, err) + } + newDuration, err := pg.InsertSessionEnd(sessionID, msg.Timestamp) + if err != nil { log.Printf("can't save sessionEnd to database, sessID: %d, err: %s", sessionID, err) return false } + if currDuration == newDuration { + log.Printf("sessionEnd duplicate, sessID: %d, prevDur: %d, newDur: %d", sessionID, + currDuration, newDuration) + return true + } if err := producer.Produce(cfg.TopicRawWeb, sessionID, messages.Encode(msg)); err != nil { log.Printf("can't send sessionEnd to topic: %s; sessID: %d", err, sessionID) return false diff --git a/backend/internal/storage/storage.go b/backend/internal/storage/storage.go index abe8089be..8d79468db 100644 --- a/backend/internal/storage/storage.go +++ b/backend/internal/storage/storage.go @@ -21,6 +21,7 @@ type Storage struct { startBytes []byte totalSessions syncfloat64.Counter sessionSize syncfloat64.Histogram + readingTime syncfloat64.Histogram archivingTime syncfloat64.Histogram } @@ -40,6 +41,10 @@ func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Stor if err != nil { log.Printf("can't create session_size metric: %s", err) } + readingTime, err := metrics.RegisterHistogram("reading_duration") + if err != nil { + log.Printf("can't create reading_duration metric: %s", err) + } archivingTime, err := metrics.RegisterHistogram("archiving_duration") if err != nil { log.Printf("can't create archiving_duration metric: %s", err) @@ -50,16 +55,17 @@ func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Stor startBytes: make([]byte, cfg.FileSplitSize), totalSessions: totalSessions, sessionSize: sessionSize, + readingTime: readingTime, archivingTime: archivingTime, }, nil } func (s *Storage) UploadKey(key string, retryCount int) error { - start := time.Now() if retryCount <= 0 { return nil } + start := time.Now() file, err := os.Open(s.cfg.FSDir + "/" + key) if err != nil { sessID, _ := strconv.ParseUint(key, 10, 64) @@ -84,6 +90,9 @@ func (s *Storage) UploadKey(key string, retryCount int) error { }) return nil } + s.readingTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds())) + + start = time.Now() startReader := bytes.NewBuffer(s.startBytes[:nRead]) if err := s.s3.Upload(s.gzipFile(startReader), key, "application/octet-stream", true); err != nil { log.Fatalf("Storage: start upload failed. %v\n", err) @@ -93,6 +102,7 @@ func (s *Storage) UploadKey(key string, retryCount int) error { log.Fatalf("Storage: end upload failed. 
%v\n", err) } } + s.archivingTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds())) // Save metrics var fileSize float64 = 0 @@ -103,7 +113,7 @@ func (s *Storage) UploadKey(key string, retryCount int) error { fileSize = float64(fileInfo.Size()) } ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*200) - s.archivingTime.Record(ctx, float64(time.Now().Sub(start).Milliseconds())) + s.sessionSize.Record(ctx, fileSize) s.totalSessions.Add(ctx, 1) return nil diff --git a/backend/pkg/db/cache/messages-common.go b/backend/pkg/db/cache/messages-common.go index cebdaf5e7..41cdb1895 100644 --- a/backend/pkg/db/cache/messages-common.go +++ b/backend/pkg/db/cache/messages-common.go @@ -7,12 +7,8 @@ import ( // . "openreplay/backend/pkg/db/types" ) -func (c *PGCache) InsertSessionEnd(sessionID uint64, timestamp uint64) error { - _, err := c.Conn.InsertSessionEnd(sessionID, timestamp) - if err != nil { - return err - } - return nil +func (c *PGCache) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, error) { + return c.Conn.InsertSessionEnd(sessionID, timestamp) } func (c *PGCache) HandleSessionEnd(sessionID uint64) error { diff --git a/backend/pkg/db/cache/messages-ios.go b/backend/pkg/db/cache/messages-ios.go index 4195976c3..e0463c431 100644 --- a/backend/pkg/db/cache/messages-ios.go +++ b/backend/pkg/db/cache/messages-ios.go @@ -32,7 +32,8 @@ func (c *PGCache) InsertIOSSessionStart(sessionID uint64, s *IOSSessionStart) er } func (c *PGCache) InsertIOSSessionEnd(sessionID uint64, e *IOSSessionEnd) error { - return c.InsertSessionEnd(sessionID, e.Timestamp) + _, err := c.InsertSessionEnd(sessionID, e.Timestamp) + return err } func (c *PGCache) InsertIOSScreenEnter(sessionID uint64, screenEnter *IOSScreenEnter) error { @@ -84,13 +85,5 @@ func (c *PGCache) InsertIOSCrash(sessionID uint64, crash *IOSCrash) error { } func (c *PGCache) InsertIOSIssueEvent(sessionID uint64, issueEvent *IOSIssueEvent) error { - // session, err := c.GetSession(sessionID) - // if err != nil { - // return err - // } - // TODO: unite IssueEvent message for the all platforms - // if err := c.Conn.InsertIssueEvent(sessionID, session.ProjectID, issueEvent); err != nil { - // return err - // } return nil } diff --git a/backend/pkg/db/cache/messages-web.go b/backend/pkg/db/cache/messages-web.go index 7da7006af..0a864e6d3 100644 --- a/backend/pkg/db/cache/messages-web.go +++ b/backend/pkg/db/cache/messages-web.go @@ -63,7 +63,8 @@ func (c *PGCache) HandleWebSessionStart(sessionID uint64, s *SessionStart) error } func (c *PGCache) InsertWebSessionEnd(sessionID uint64, e *SessionEnd) error { - return c.InsertSessionEnd(sessionID, e.Timestamp) + _, err := c.InsertSessionEnd(sessionID, e.Timestamp) + return err } func (c *PGCache) HandleWebSessionEnd(sessionID uint64, e *SessionEnd) error { @@ -91,7 +92,7 @@ func (c *PGCache) InsertWebFetchEvent(sessionID uint64, e *FetchEvent) error { if err != nil { return err } - return c.Conn.InsertWebFetchEvent(sessionID, project.SaveRequestPayloads, e) + return c.Conn.InsertWebFetchEvent(sessionID, session.ProjectID, project.SaveRequestPayloads, e) } func (c *PGCache) InsertWebGraphQLEvent(sessionID uint64, e *GraphQLEvent) error { @@ -103,5 +104,53 @@ func (c *PGCache) InsertWebGraphQLEvent(sessionID uint64, e *GraphQLEvent) error if err != nil { return err } - return c.Conn.InsertWebGraphQLEvent(sessionID, project.SaveRequestPayloads, e) + return c.Conn.InsertWebGraphQLEvent(sessionID, session.ProjectID, project.SaveRequestPayloads, e) +} + 
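The `InsertSessionEnd` change above (it now returns the computed duration instead of a bare error) exists to support the duplicate-end guard added to ender/main.go earlier in this diff: since `UPDATE sessions SET duration = $2 - start_ts` is idempotent, a re-delivered `SessionEnd` yields the same duration as the one already stored, and the ender can acknowledge it without producing a second event. A minimal sketch of that flow, assuming a simplified store interface (`sessionStore` and `handleEnd` are illustrative names, not identifiers from this PR):

```go
package ender

import "log"

// sessionStore is a hypothetical stand-in for the two pg methods the ender uses.
type sessionStore interface {
	GetSessionDuration(sessionID uint64) (uint64, error)
	InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, error)
}

// handleEnd reports whether the incoming SessionEnd can be committed: either it
// was recorded for the first time and forwarded, or it is a duplicate to skip.
func handleEnd(store sessionStore, sessionID, timestamp uint64, produce func() error) bool {
	currDuration, err := store.GetSessionDuration(sessionID)
	if err != nil {
		log.Printf("getSessionDuration failed, sessID: %d, err: %s", sessionID, err)
	}
	newDuration, err := store.InsertSessionEnd(sessionID, timestamp)
	if err != nil {
		return false // not saved; retry on the next pass
	}
	if currDuration == newDuration {
		return true // duplicate end: ack without re-emitting SessionEnd
	}
	return produce() == nil
}
```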
+func (c *PGCache) InsertWebCustomEvent(sessionID uint64, e *CustomEvent) error { + session, err := c.GetSession(sessionID) + if err != nil { + return err + } + return c.Conn.InsertWebCustomEvent(sessionID, session.ProjectID, e) +} + +func (c *PGCache) InsertWebUserID(sessionID uint64, userID *UserID) error { + session, err := c.GetSession(sessionID) + if err != nil { + return err + } + return c.Conn.InsertWebUserID(sessionID, session.ProjectID, userID) +} + +func (c *PGCache) InsertWebUserAnonymousID(sessionID uint64, userAnonymousID *UserAnonymousID) error { + session, err := c.GetSession(sessionID) + if err != nil { + return err + } + return c.Conn.InsertWebUserAnonymousID(sessionID, session.ProjectID, userAnonymousID) +} + +func (c *PGCache) InsertWebPageEvent(sessionID uint64, e *PageEvent) error { + session, err := c.GetSession(sessionID) + if err != nil { + return err + } + return c.Conn.InsertWebPageEvent(sessionID, session.ProjectID, e) +} + +func (c *PGCache) InsertWebClickEvent(sessionID uint64, e *ClickEvent) error { + session, err := c.GetSession(sessionID) + if err != nil { + return err + } + return c.Conn.InsertWebClickEvent(sessionID, session.ProjectID, e) +} + +func (c *PGCache) InsertWebInputEvent(sessionID uint64, e *InputEvent) error { + session, err := c.GetSession(sessionID) + if err != nil { + return err + } + return c.Conn.InsertWebInputEvent(sessionID, session.ProjectID, e) } diff --git a/backend/pkg/db/cache/pg-cache.go b/backend/pkg/db/cache/pg-cache.go index 6422209d4..ca31bcd82 100644 --- a/backend/pkg/db/cache/pg-cache.go +++ b/backend/pkg/db/cache/pg-cache.go @@ -29,10 +29,9 @@ type PGCache struct { // TODO: create conn automatically func NewPGCache(pgConn *postgres.Conn, projectExpirationTimeoutMs int64) *PGCache { return &PGCache{ - Conn: pgConn, - sessions: make(map[uint64]*Session), - projects: make(map[uint32]*ProjectMeta), - //projectsByKeys: make(map[string]*ProjectMeta), + Conn: pgConn, + sessions: make(map[uint64]*Session), + projects: make(map[uint32]*ProjectMeta), projectExpirationTimeout: time.Duration(1000 * projectExpirationTimeoutMs), } } diff --git a/backend/pkg/db/postgres/bulk.go b/backend/pkg/db/postgres/bulk.go new file mode 100644 index 000000000..16f59efcd --- /dev/null +++ b/backend/pkg/db/postgres/bulk.go @@ -0,0 +1,93 @@ +package postgres + +import ( + "bytes" + "errors" + "fmt" +) + +const ( + insertPrefix = `INSERT INTO ` + insertValues = ` VALUES ` + insertSuffix = ` ON CONFLICT DO NOTHING;` +) + +type Bulk interface { + Append(args ...interface{}) error + Send() error +} + +type bulkImpl struct { + conn Pool + table string + columns string + template string + setSize int + sizeLimit int + values []interface{} +} + +func (b *bulkImpl) Append(args ...interface{}) error { + if len(args) != b.setSize { + return fmt.Errorf("wrong number of arguments, waited: %d, got: %d", b.setSize, len(args)) + } + b.values = append(b.values, args...) 
+ if len(b.values)/b.setSize >= b.sizeLimit { + return b.send() + } + return nil +} + +func (b *bulkImpl) Send() error { + if len(b.values) == 0 { + return nil + } + return b.send() +} + +func (b *bulkImpl) send() error { + request := bytes.NewBufferString(insertPrefix + b.table + b.columns + insertValues) + args := make([]interface{}, b.setSize) + for i := 0; i < len(b.values)/b.setSize; i++ { + for j := 0; j < b.setSize; j++ { + args[j] = i*b.setSize + j + 1 + } + if i > 0 { + request.WriteByte(',') + } + request.WriteString(fmt.Sprintf(b.template, args...)) + } + request.WriteString(insertSuffix) + err := b.conn.Exec(request.String(), b.values...) + b.values = make([]interface{}, 0, b.setSize*b.sizeLimit) + if err != nil { + return fmt.Errorf("send bulk err: %s", err) + } + return nil +} + +func NewBulk(conn Pool, table, columns, template string, setSize, sizeLimit int) (Bulk, error) { + switch { + case conn == nil: + return nil, errors.New("db conn is empty") + case table == "": + return nil, errors.New("table is empty") + case columns == "": + return nil, errors.New("columns is empty") + case template == "": + return nil, errors.New("template is empty") + case setSize <= 0: + return nil, errors.New("set size is wrong") + case sizeLimit <= 0: + return nil, errors.New("size limit is wrong") + } + return &bulkImpl{ + conn: conn, + table: table, + columns: columns, + template: template, + setSize: setSize, + sizeLimit: sizeLimit, + values: make([]interface{}, 0, setSize*sizeLimit), + }, nil +} diff --git a/backend/pkg/db/postgres/connector.go b/backend/pkg/db/postgres/connector.go index 4a85029fd..1cc537982 100644 --- a/backend/pkg/db/postgres/connector.go +++ b/backend/pkg/db/postgres/connector.go @@ -13,26 +13,30 @@ import ( "github.com/jackc/pgx/v4/pgxpool" ) -func getTimeoutContext() context.Context { - ctx, _ := context.WithTimeout(context.Background(), time.Duration(time.Second*30)) - return ctx -} - type batchItem struct { query string arguments []interface{} } +// Conn contains batches, bulks and cache for all sessions type Conn struct { - c *pgxpool.Pool // TODO: conditional usage of Pool/Conn (use interface?) 
-	batches         map[uint64]*pgx.Batch
-	batchSizes      map[uint64]int
-	rawBatches      map[uint64][]*batchItem
-	batchQueueLimit int
-	batchSizeLimit  int
-	batchSizeBytes  syncfloat64.Histogram
-	batchSizeLines  syncfloat64.Histogram
-	sqlRequestTime  syncfloat64.Histogram
+	c                 Pool
+	batches           map[uint64]*pgx.Batch
+	batchSizes        map[uint64]int
+	rawBatches        map[uint64][]*batchItem
+	autocompletes     Bulk
+	requests          Bulk
+	customEvents      Bulk
+	webPageEvents     Bulk
+	webInputEvents    Bulk
+	webGraphQLEvents  Bulk
+	sessionUpdates    map[uint64]*sessionUpdates
+	batchQueueLimit   int
+	batchSizeLimit    int
+	batchSizeBytes    syncfloat64.Histogram
+	batchSizeLines    syncfloat64.Histogram
+	sqlRequestTime    syncfloat64.Histogram
+	sqlRequestCounter syncfloat64.Counter
 }
 
 func NewConn(url string, queueLimit, sizeLimit int, metrics *monitoring.Metrics) *Conn {
@@ -45,14 +49,19 @@ func NewConn(url string, queueLimit, sizeLimit int, metrics *monitoring.Metrics)
 		log.Fatalln("pgxpool.Connect Error")
 	}
 	conn := &Conn{
-		c:               c,
 		batches:         make(map[uint64]*pgx.Batch),
 		batchSizes:      make(map[uint64]int),
 		rawBatches:      make(map[uint64][]*batchItem),
+		sessionUpdates:  make(map[uint64]*sessionUpdates),
 		batchQueueLimit: queueLimit,
 		batchSizeLimit:  sizeLimit,
 	}
 	conn.initMetrics(metrics)
+	conn.c, err = NewPool(c, conn.sqlRequestTime, conn.sqlRequestCounter)
+	if err != nil {
+		log.Fatalf("can't create new pool wrapper: %s", err)
+	}
+	conn.initBulks()
 	return conn
 }
 
@@ -75,6 +84,74 @@ func (conn *Conn) initMetrics(metrics *monitoring.Metrics) {
 	if err != nil {
 		log.Printf("can't create sqlRequestTime metric: %s", err)
 	}
+	conn.sqlRequestCounter, err = metrics.RegisterCounter("sql_request_number")
+	if err != nil {
+		log.Printf("can't create sqlRequestNumber metric: %s", err)
+	}
+}
+
+func (conn *Conn) initBulks() {
+	var err error
+	conn.autocompletes, err = NewBulk(conn.c,
+		"autocomplete",
+		"(value, type, project_id)",
+		"($%d, $%d, $%d)",
+		3, 100)
+	if err != nil {
+		log.Fatalf("can't create autocomplete bulk")
+	}
+	conn.requests, err = NewBulk(conn.c,
+		"events_common.requests",
+		"(session_id, timestamp, seq_index, url, duration, success)",
+		"($%d, $%d, $%d, left($%d, 2700), $%d, $%d)",
+		6, 100)
+	if err != nil {
+		log.Fatalf("can't create requests bulk")
+	}
+	conn.customEvents, err = NewBulk(conn.c,
+		"events_common.customs",
+		"(session_id, timestamp, seq_index, name, payload)",
+		"($%d, $%d, $%d, left($%d, 2700), $%d)",
+		5, 100)
+	if err != nil {
+		log.Fatalf("can't create customEvents bulk")
+	}
+	conn.webPageEvents, err = NewBulk(conn.c,
+		"events.pages",
+		"(session_id, message_id, timestamp, referrer, base_referrer, host, path, query, dom_content_loaded_time, "+
+			"load_time, response_end, first_paint_time, first_contentful_paint_time, speed_index, visually_complete, "+
+			"time_to_interactive, response_time, dom_building_time)",
+		"($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0),"+
+			" NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0))",
+		18, 100)
+	if err != nil {
+		log.Fatalf("can't create webPageEvents bulk")
+	}
+	conn.webInputEvents, err = NewBulk(conn.c,
+		"events.inputs",
+		"(session_id, message_id, timestamp, value, label)",
+		"($%d, $%d, $%d, $%d, NULLIF($%d,''))",
+		5, 100)
+	if err != nil {
+		log.Fatalf("can't create webInputEvents bulk")
+	}
+	conn.webGraphQLEvents, err = NewBulk(conn.c,
+		"events.graphql",
+		"(session_id, timestamp, message_id, name, request_body, response_body)",
+		"($%d, $%d, $%d, left($%d, 2700), $%d, $%d)",
+		6, 100)
+	if err != nil {
+		log.Fatalf("can't create webGraphQLEvents bulk")
+	}
+}
+
+func (conn *Conn) insertAutocompleteValue(sessionID uint64, projectID uint32, tp string, value string) {
+	if len(value) == 0 {
+		return
+	}
+	if err := conn.autocompletes.Append(value, tp, projectID); err != nil {
+		log.Printf("autocomplete bulk err: %s", err)
+	}
 }
 
 func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{}) {
@@ -85,6 +162,10 @@ func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{})
 		batch = conn.batches[sessionID]
 	}
 	batch.Queue(sql, args...)
+	conn.rawBatch(sessionID, sql, args...)
+}
+
+func (conn *Conn) rawBatch(sessionID uint64, sql string, args ...interface{}) {
 	// Temp raw batch store
 	raw := conn.rawBatches[sessionID]
 	raw = append(raw, &batchItem{
@@ -94,13 +175,54 @@ func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{})
 	conn.rawBatches[sessionID] = raw
 }
 
+func (conn *Conn) updateSessionEvents(sessionID uint64, events, pages int) {
+	if _, ok := conn.sessionUpdates[sessionID]; !ok {
+		conn.sessionUpdates[sessionID] = NewSessionUpdates(sessionID)
+	}
+	conn.sessionUpdates[sessionID].add(pages, events)
+}
+
+func (conn *Conn) sendBulks() {
+	if err := conn.autocompletes.Send(); err != nil {
+		log.Printf("autocomplete bulk send err: %s", err)
+	}
+	if err := conn.requests.Send(); err != nil {
+		log.Printf("requests bulk send err: %s", err)
+	}
+	if err := conn.customEvents.Send(); err != nil {
+		log.Printf("customEvents bulk send err: %s", err)
+	}
+	if err := conn.webPageEvents.Send(); err != nil {
+		log.Printf("webPageEvents bulk send err: %s", err)
+	}
+	if err := conn.webInputEvents.Send(); err != nil {
+		log.Printf("webInputEvents bulk send err: %s", err)
+	}
+	if err := conn.webGraphQLEvents.Send(); err != nil {
+		log.Printf("webGraphQLEvents bulk send err: %s", err)
+	}
+}
+
 func (conn *Conn) CommitBatches() {
+	conn.sendBulks()
 	for sessID, b := range conn.batches {
+		// Append session update sql request to the end of batch
+		if update, ok := conn.sessionUpdates[sessID]; ok {
+			sql, args := update.request()
+			if sql != "" {
+				conn.batchQueue(sessID, sql, args...)
+ b, _ = conn.batches[sessID] + } + } // Record batch size in bytes and number of lines conn.batchSizeBytes.Record(context.Background(), float64(conn.batchSizes[sessID])) conn.batchSizeLines.Record(context.Background(), float64(b.Len())) + + start := time.Now() + isFailed := false + // Send batch to db and execute - br := conn.c.SendBatch(getTimeoutContext(), b) + br := conn.c.SendBatch(b) l := b.Len() for i := 0; i < l; i++ { if ct, err := br.Exec(); err != nil { @@ -108,13 +230,33 @@ func (conn *Conn) CommitBatches() { failedSql := conn.rawBatches[sessID][i] query := strings.ReplaceAll(failedSql.query, "\n", " ") log.Println("failed sql req:", query, failedSql.arguments) + isFailed = true } } br.Close() // returns err + conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), + attribute.String("method", "batch"), attribute.Bool("failed", isFailed)) + conn.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", "batch"), attribute.Bool("failed", isFailed)) + if !isFailed { + delete(conn.sessionUpdates, sessID) + } } conn.batches = make(map[uint64]*pgx.Batch) conn.batchSizes = make(map[uint64]int) conn.rawBatches = make(map[uint64][]*batchItem) + + // Session updates + for sessID, su := range conn.sessionUpdates { + sql, args := su.request() + if sql == "" { + continue + } + if err := conn.c.Exec(sql, args...); err != nil { + log.Printf("failed session update, sessID: %d, err: %s", sessID, err) + } + } + conn.sessionUpdates = make(map[uint64]*sessionUpdates) } func (conn *Conn) updateBatchSize(sessionID uint64, reqSize int) { @@ -131,11 +273,23 @@ func (conn *Conn) commitBatch(sessionID uint64) { log.Printf("can't find batch for session: %d", sessionID) return } + // Append session update sql request to the end of batch + if update, ok := conn.sessionUpdates[sessionID]; ok { + sql, args := update.request() + if sql != "" { + conn.batchQueue(sessionID, sql, args...) + b, _ = conn.batches[sessionID] + } + } // Record batch size in bytes and number of lines conn.batchSizeBytes.Record(context.Background(), float64(conn.batchSizes[sessionID])) conn.batchSizeLines.Record(context.Background(), float64(b.Len())) + + start := time.Now() + isFailed := false + // Send batch to db and execute - br := conn.c.SendBatch(getTimeoutContext(), b) + br := conn.c.SendBatch(b) l := b.Len() for i := 0; i < l; i++ { if ct, err := br.Exec(); err != nil { @@ -143,74 +297,19 @@ func (conn *Conn) commitBatch(sessionID uint64) { failedSql := conn.rawBatches[sessionID][i] query := strings.ReplaceAll(failedSql.query, "\n", " ") log.Println("failed sql req:", query, failedSql.arguments) + isFailed = true } } br.Close() + conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), + attribute.String("method", "batch"), attribute.Bool("failed", isFailed)) + conn.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", "batch"), attribute.Bool("failed", isFailed)) + // Clean batch info delete(conn.batches, sessionID) delete(conn.batchSizes, sessionID) delete(conn.rawBatches, sessionID) -} - -func (conn *Conn) query(sql string, args ...interface{}) (pgx.Rows, error) { - start := time.Now() - res, err := conn.c.Query(getTimeoutContext(), sql, args...) 
- conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", methodName(sql))) - return res, err -} - -func (conn *Conn) queryRow(sql string, args ...interface{}) pgx.Row { - start := time.Now() - res := conn.c.QueryRow(getTimeoutContext(), sql, args...) - conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", methodName(sql))) - return res -} - -func (conn *Conn) exec(sql string, args ...interface{}) error { - start := time.Now() - _, err := conn.c.Exec(getTimeoutContext(), sql, args...) - conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", methodName(sql))) - return err -} - -type _Tx struct { - pgx.Tx - sqlRequestTime syncfloat64.Histogram -} - -func (conn *Conn) begin() (_Tx, error) { - start := time.Now() - tx, err := conn.c.Begin(context.Background()) - conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", "begin")) - return _Tx{tx, conn.sqlRequestTime}, err -} - -func (tx _Tx) exec(sql string, args ...interface{}) error { - start := time.Now() - _, err := tx.Exec(context.Background(), sql, args...) - tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", methodName(sql))) - return err -} - -func (tx _Tx) rollback() error { - start := time.Now() - err := tx.Rollback(context.Background()) - tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", "rollback")) - return err -} - -func (tx _Tx) commit() error { - start := time.Now() - err := tx.Commit(context.Background()) - tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), attribute.String("method", "commit")) - return err -} - -func methodName(sql string) string { - method := "unknown" - if parts := strings.Split(sql, ""); len(parts) > 0 { - method = parts[0] - } - return strings.ToLower(method) + delete(conn.sessionUpdates, sessionID) } diff --git a/backend/pkg/db/postgres/integration.go b/backend/pkg/db/postgres/integration.go index e44bd726e..1556006c1 100644 --- a/backend/pkg/db/postgres/integration.go +++ b/backend/pkg/db/postgres/integration.go @@ -15,7 +15,7 @@ type Integration struct { } func (conn *Conn) IterateIntegrationsOrdered(iter func(integration *Integration, err error)) error { - rows, err := conn.query(` + rows, err := conn.c.Query(` SELECT project_id, provider, options, request_data FROM integrations `) @@ -40,7 +40,7 @@ func (conn *Conn) IterateIntegrationsOrdered(iter func(integration *Integration, } func (conn *Conn) UpdateIntegrationRequestData(i *Integration) error { - return conn.exec(` + return conn.c.Exec(` UPDATE integrations SET request_data = $1 WHERE project_id=$2 AND provider=$3`, diff --git a/backend/pkg/db/postgres/messages-common.go b/backend/pkg/db/postgres/messages-common.go index a68d2c814..2925acde3 100644 --- a/backend/pkg/db/postgres/messages-common.go +++ b/backend/pkg/db/postgres/messages-common.go @@ -18,31 +18,8 @@ func getAutocompleteType(baseType string, platform string) string { } -func (conn *Conn) insertAutocompleteValue(sessionID uint64, tp string, value string) { - if len(value) == 0 { - return - } - sqlRequest := ` - INSERT INTO autocomplete ( - value, - type, - project_id - ) (SELECT - $1, $2, project_id - FROM sessions - WHERE session_id = $3 - 
) ON CONFLICT DO NOTHING` - if err := conn.exec(sqlRequest, value, tp, sessionID); err != nil { - log.Printf("can't insert autocomplete: %s", err) - } - //conn.batchQueue(sessionID, sqlRequest, value, tp, sessionID) - - // Record approximate message size - //conn.updateBatchSize(sessionID, len(sqlRequest)+len(value)+len(tp)+8) -} - func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error { - return conn.exec(` + return conn.c.Exec(` INSERT INTO sessions ( session_id, project_id, start_ts, user_uuid, user_device, user_device_type, user_country, @@ -74,18 +51,26 @@ func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error { } func (conn *Conn) HandleSessionStart(sessionID uint64, s *types.Session) error { - conn.insertAutocompleteValue(sessionID, getAutocompleteType("USEROS", s.Platform), s.UserOS) - conn.insertAutocompleteValue(sessionID, getAutocompleteType("USERDEVICE", s.Platform), s.UserDevice) - conn.insertAutocompleteValue(sessionID, getAutocompleteType("USERCOUNTRY", s.Platform), s.UserCountry) - conn.insertAutocompleteValue(sessionID, getAutocompleteType("REVID", s.Platform), s.RevID) + conn.insertAutocompleteValue(sessionID, s.ProjectID, getAutocompleteType("USEROS", s.Platform), s.UserOS) + conn.insertAutocompleteValue(sessionID, s.ProjectID, getAutocompleteType("USERDEVICE", s.Platform), s.UserDevice) + conn.insertAutocompleteValue(sessionID, s.ProjectID, getAutocompleteType("USERCOUNTRY", s.Platform), s.UserCountry) + conn.insertAutocompleteValue(sessionID, s.ProjectID, getAutocompleteType("REVID", s.Platform), s.RevID) // s.Platform == "web" - conn.insertAutocompleteValue(sessionID, "USERBROWSER", s.UserBrowser) + conn.insertAutocompleteValue(sessionID, s.ProjectID, "USERBROWSER", s.UserBrowser) return nil } +func (conn *Conn) GetSessionDuration(sessionID uint64) (uint64, error) { + var dur uint64 + if err := conn.c.QueryRow("SELECT COALESCE( duration, 0 ) FROM sessions WHERE session_id=$1", sessionID).Scan(&dur); err != nil { + return 0, err + } + return dur, nil +} + func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, error) { var dur uint64 - if err := conn.queryRow(` + if err := conn.c.QueryRow(` UPDATE sessions SET duration=$2 - start_ts WHERE session_id=$1 RETURNING duration @@ -119,30 +104,16 @@ func (conn *Conn) HandleSessionEnd(sessionID uint64) error { } func (conn *Conn) InsertRequest(sessionID uint64, timestamp uint64, index uint64, url string, duration uint64, success bool) error { - sqlRequest := ` - INSERT INTO events_common.requests ( - session_id, timestamp, seq_index, url, duration, success - ) VALUES ( - $1, $2, $3, left($4, 2700), $5, $6 - )` - conn.batchQueue(sessionID, sqlRequest, sessionID, timestamp, getSqIdx(index), url, duration, success) - - // Record approximate message size - conn.updateBatchSize(sessionID, len(sqlRequest)+len(url)+8*4) + if err := conn.requests.Append(sessionID, timestamp, getSqIdx(index), url, duration, success); err != nil { + return fmt.Errorf("insert request in bulk err: %s", err) + } return nil } func (conn *Conn) InsertCustomEvent(sessionID uint64, timestamp uint64, index uint64, name string, payload string) error { - sqlRequest := ` - INSERT INTO events_common.customs ( - session_id, timestamp, seq_index, name, payload - ) VALUES ( - $1, $2, $3, left($4, 2700), $5 - )` - conn.batchQueue(sessionID, sqlRequest, sessionID, timestamp, getSqIdx(index), name, payload) - - // Record approximate message size - conn.updateBatchSize(sessionID, 
len(sqlRequest)+len(name)+len(payload)+8*3) + if err := conn.customEvents.Append(sessionID, timestamp, getSqIdx(index), name, payload); err != nil { + return fmt.Errorf("insert custom event in bulk err: %s", err) + } return nil } @@ -172,15 +143,21 @@ func (conn *Conn) InsertMetadata(sessionID uint64, keyNo uint, value string) err sqlRequest := ` UPDATE sessions SET metadata_%v = $1 WHERE session_id = $2` - return conn.exec(fmt.Sprintf(sqlRequest, keyNo), value, sessionID) + return conn.c.Exec(fmt.Sprintf(sqlRequest, keyNo), value, sessionID) } -func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messages.IssueEvent) error { - tx, err := conn.begin() +func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messages.IssueEvent) (err error) { + tx, err := conn.c.Begin() if err != nil { return err } - defer tx.rollback() + defer func() { + if err != nil { + if rollbackErr := tx.rollback(); rollbackErr != nil { + log.Printf("rollback err: %s", rollbackErr) + } + } + }() issueID := hashid.IssueID(projectID, e) // TEMP. TODO: nullable & json message field type @@ -237,5 +214,6 @@ func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messag return err } } - return tx.commit() + err = tx.commit() + return } diff --git a/backend/pkg/db/postgres/messages-ios.go b/backend/pkg/db/postgres/messages-ios.go index e75ff2acd..d7b2f58f3 100644 --- a/backend/pkg/db/postgres/messages-ios.go +++ b/backend/pkg/db/postgres/messages-ios.go @@ -9,7 +9,7 @@ import ( func (conn *Conn) InsertIOSCustomEvent(sessionID uint64, e *messages.IOSCustomEvent) error { err := conn.InsertCustomEvent(sessionID, e.Timestamp, e.Index, e.Name, e.Payload) if err == nil { - conn.insertAutocompleteValue(sessionID, "CUSTOM_IOS", e.Name) + conn.insertAutocompleteValue(sessionID, 0, "CUSTOM_IOS", e.Name) } return err } @@ -17,7 +17,7 @@ func (conn *Conn) InsertIOSCustomEvent(sessionID uint64, e *messages.IOSCustomEv func (conn *Conn) InsertIOSUserID(sessionID uint64, userID *messages.IOSUserID) error { err := conn.InsertUserID(sessionID, userID.Value) if err == nil { - conn.insertAutocompleteValue(sessionID, "USERID_IOS", userID.Value) + conn.insertAutocompleteValue(sessionID, 0, "USERID_IOS", userID.Value) } return err } @@ -25,7 +25,7 @@ func (conn *Conn) InsertIOSUserID(sessionID uint64, userID *messages.IOSUserID) func (conn *Conn) InsertIOSUserAnonymousID(sessionID uint64, userAnonymousID *messages.IOSUserAnonymousID) error { err := conn.InsertUserAnonymousID(sessionID, userAnonymousID.Value) if err == nil { - conn.insertAutocompleteValue(sessionID, "USERANONYMOUSID_IOS", userAnonymousID.Value) + conn.insertAutocompleteValue(sessionID, 0, "USERANONYMOUSID_IOS", userAnonymousID.Value) } return err } @@ -33,13 +33,13 @@ func (conn *Conn) InsertIOSUserAnonymousID(sessionID uint64, userAnonymousID *me func (conn *Conn) InsertIOSNetworkCall(sessionID uint64, e *messages.IOSNetworkCall) error { err := conn.InsertRequest(sessionID, e.Timestamp, e.Index, e.URL, e.Duration, e.Success) if err == nil { - conn.insertAutocompleteValue(sessionID, "REQUEST_IOS", url.DiscardURLQuery(e.URL)) + conn.insertAutocompleteValue(sessionID, 0, "REQUEST_IOS", url.DiscardURLQuery(e.URL)) } return err } func (conn *Conn) InsertIOSScreenEnter(sessionID uint64, screenEnter *messages.IOSScreenEnter) error { - tx, err := conn.begin() + tx, err := conn.c.Begin() if err != nil { return err } @@ -65,12 +65,12 @@ func (conn *Conn) InsertIOSScreenEnter(sessionID uint64, screenEnter *messages.I if err = 
tx.commit(); err != nil { return err } - conn.insertAutocompleteValue(sessionID, "VIEW_IOS", screenEnter.ViewName) + conn.insertAutocompleteValue(sessionID, 0, "VIEW_IOS", screenEnter.ViewName) return nil } func (conn *Conn) InsertIOSClickEvent(sessionID uint64, clickEvent *messages.IOSClickEvent) error { - tx, err := conn.begin() + tx, err := conn.c.Begin() if err != nil { return err } @@ -96,12 +96,12 @@ func (conn *Conn) InsertIOSClickEvent(sessionID uint64, clickEvent *messages.IOS if err = tx.commit(); err != nil { return err } - conn.insertAutocompleteValue(sessionID, "CLICK_IOS", clickEvent.Label) + conn.insertAutocompleteValue(sessionID, 0, "CLICK_IOS", clickEvent.Label) return nil } func (conn *Conn) InsertIOSInputEvent(sessionID uint64, inputEvent *messages.IOSInputEvent) error { - tx, err := conn.begin() + tx, err := conn.c.Begin() if err != nil { return err } @@ -132,13 +132,13 @@ func (conn *Conn) InsertIOSInputEvent(sessionID uint64, inputEvent *messages.IOS if err = tx.commit(); err != nil { return err } - conn.insertAutocompleteValue(sessionID, "INPUT_IOS", inputEvent.Label) + conn.insertAutocompleteValue(sessionID, 0, "INPUT_IOS", inputEvent.Label) // conn.insertAutocompleteValue(sessionID, "INPUT_VALUE", inputEvent.Label) return nil } func (conn *Conn) InsertIOSCrash(sessionID uint64, projectID uint32, crash *messages.IOSCrash) error { - tx, err := conn.begin() + tx, err := conn.c.Begin() if err != nil { return err } diff --git a/backend/pkg/db/postgres/messages-web.go b/backend/pkg/db/postgres/messages-web.go index e703ee933..c55344509 100644 --- a/backend/pkg/db/postgres/messages-web.go +++ b/backend/pkg/db/postgres/messages-web.go @@ -1,6 +1,7 @@ package postgres import ( + "log" "math" "openreplay/backend/pkg/hashid" @@ -13,104 +14,54 @@ func getSqIdx(messageID uint64) uint { return uint(messageID % math.MaxInt32) } -func (conn *Conn) InsertWebCustomEvent(sessionID uint64, e *CustomEvent) error { +func (conn *Conn) InsertWebCustomEvent(sessionID uint64, projectID uint32, e *CustomEvent) error { err := conn.InsertCustomEvent(sessionID, e.Timestamp, e.MessageID, e.Name, e.Payload) if err == nil { - conn.insertAutocompleteValue(sessionID, "CUSTOM", e.Name) + conn.insertAutocompleteValue(sessionID, projectID, "CUSTOM", e.Name) } return err } -func (conn *Conn) InsertWebUserID(sessionID uint64, userID *UserID) error { +func (conn *Conn) InsertWebUserID(sessionID uint64, projectID uint32, userID *UserID) error { err := conn.InsertUserID(sessionID, userID.ID) if err == nil { - conn.insertAutocompleteValue(sessionID, "USERID", userID.ID) + conn.insertAutocompleteValue(sessionID, projectID, "USERID", userID.ID) } return err } -func (conn *Conn) InsertWebUserAnonymousID(sessionID uint64, userAnonymousID *UserAnonymousID) error { +func (conn *Conn) InsertWebUserAnonymousID(sessionID uint64, projectID uint32, userAnonymousID *UserAnonymousID) error { err := conn.InsertUserAnonymousID(sessionID, userAnonymousID.ID) if err == nil { - conn.insertAutocompleteValue(sessionID, "USERANONYMOUSID", userAnonymousID.ID) + conn.insertAutocompleteValue(sessionID, projectID, "USERANONYMOUSID", userAnonymousID.ID) } return err } -// func (conn *Conn) InsertWebResourceEvent(sessionID uint64, e *ResourceEvent) error { -// if e.Type != "fetch" { -// return nil -// } -// err := conn.InsertRequest(sessionID, e.Timestamp, -// e.MessageID, -// e.URL, e.Duration, e.Success, -// ) -// if err == nil { -// conn.insertAutocompleteValue(sessionID, "REQUEST", url.DiscardURLQuery(e.URL)) -// } -// return err 
-// } - // TODO: fix column "dom_content_loaded_event_end" of relation "pages" -func (conn *Conn) InsertWebPageEvent(sessionID uint64, e *PageEvent) error { +func (conn *Conn) InsertWebPageEvent(sessionID uint64, projectID uint32, e *PageEvent) error { host, path, query, err := url.GetURLParts(e.URL) if err != nil { return err } - tx, err := conn.begin() - if err != nil { - return err + // base_path is deprecated + if err = conn.webPageEvents.Append(sessionID, e.MessageID, e.Timestamp, e.Referrer, url.DiscardURLQuery(e.Referrer), + host, path, query, e.DomContentLoadedEventEnd, e.LoadEventEnd, e.ResponseEnd, e.FirstPaint, e.FirstContentfulPaint, + e.SpeedIndex, e.VisuallyComplete, e.TimeToInteractive, calcResponseTime(e), calcDomBuildingTime(e)); err != nil { + log.Printf("insert web page event in bulk err: %s", err) } - defer tx.rollback() - // base_path is depricated - if err := tx.exec(` - INSERT INTO events.pages ( - session_id, message_id, timestamp, referrer, base_referrer, host, path, query, - dom_content_loaded_time, load_time, response_end, first_paint_time, first_contentful_paint_time, - speed_index, visually_complete, time_to_interactive, - response_time, dom_building_time - ) VALUES ( - $1, $2, $3, - $4, $5, - $6, $7, $8, - NULLIF($9, 0), NULLIF($10, 0), NULLIF($11, 0), NULLIF($12, 0), NULLIF($13, 0), - NULLIF($14, 0), NULLIF($15, 0), NULLIF($16, 0), - NULLIF($17, 0), NULLIF($18, 0) - ) - `, - sessionID, e.MessageID, e.Timestamp, - e.Referrer, url.DiscardURLQuery(e.Referrer), - host, path, query, - e.DomContentLoadedEventEnd, e.LoadEventEnd, e.ResponseEnd, e.FirstPaint, e.FirstContentfulPaint, - e.SpeedIndex, e.VisuallyComplete, e.TimeToInteractive, - calcResponseTime(e), calcDomBuildingTime(e), - ); err != nil { - return err - } - if err = tx.exec(` - UPDATE sessions SET (pages_count, events_count) = (pages_count + 1, events_count + 1) - WHERE session_id = $1`, - sessionID, - ); err != nil { - return err - } - if err = tx.commit(); err != nil { - return err - } - conn.insertAutocompleteValue(sessionID, "LOCATION", url.DiscardURLQuery(path)) - conn.insertAutocompleteValue(sessionID, "REFERRER", url.DiscardURLQuery(e.Referrer)) + // Accumulate session updates and exec inside batch with another sql commands + conn.updateSessionEvents(sessionID, 1, 1) + // Add new value set to autocomplete bulk + conn.insertAutocompleteValue(sessionID, projectID, "LOCATION", url.DiscardURLQuery(path)) + conn.insertAutocompleteValue(sessionID, projectID, "REFERRER", url.DiscardURLQuery(e.Referrer)) return nil } -func (conn *Conn) InsertWebClickEvent(sessionID uint64, e *ClickEvent) error { - tx, err := conn.begin() - if err != nil { - return err - } - defer tx.rollback() - if err = tx.exec(` +func (conn *Conn) InsertWebClickEvent(sessionID uint64, projectID uint32, e *ClickEvent) error { + sqlRequest := ` INSERT INTO events.clicks (session_id, message_id, timestamp, label, selector, url) (SELECT @@ -118,65 +69,40 @@ func (conn *Conn) InsertWebClickEvent(sessionID uint64, e *ClickEvent) error { FROM events.pages WHERE session_id = $1 AND timestamp <= $3 ORDER BY timestamp DESC LIMIT 1 ) - `, - sessionID, e.MessageID, e.Timestamp, e.Label, e.Selector, - ); err != nil { - return err - } - if err = tx.exec(` - UPDATE sessions SET events_count = events_count + 1 - WHERE session_id = $1`, - sessionID, - ); err != nil { - return err - } - if err = tx.commit(); err != nil { - return err - } - conn.insertAutocompleteValue(sessionID, "CLICK", e.Label) + ` + conn.batchQueue(sessionID, sqlRequest, sessionID, 
e.MessageID, e.Timestamp, e.Label, e.Selector) + // Accumulate session updates and exec inside batch with another sql commands + conn.updateSessionEvents(sessionID, 1, 0) + // Add new value set to autocomplete bulk + conn.insertAutocompleteValue(sessionID, projectID, "CLICK", e.Label) return nil } -func (conn *Conn) InsertWebInputEvent(sessionID uint64, e *InputEvent) error { - tx, err := conn.begin() - if err != nil { - return err - } - defer tx.rollback() +func (conn *Conn) InsertWebInputEvent(sessionID uint64, projectID uint32, e *InputEvent) error { value := &e.Value if e.ValueMasked { value = nil } - if err = tx.exec(` - INSERT INTO events.inputs - (session_id, message_id, timestamp, value, label) - VALUES - ($1, $2, $3, $4, NULLIF($5,'')) - `, - sessionID, e.MessageID, e.Timestamp, value, e.Label, - ); err != nil { - return err + if err := conn.webInputEvents.Append(sessionID, e.MessageID, e.Timestamp, value, e.Label); err != nil { + log.Printf("insert web input event err: %s", err) } - if err = tx.exec(` - UPDATE sessions SET events_count = events_count + 1 - WHERE session_id = $1`, - sessionID, - ); err != nil { - return err - } - if err = tx.commit(); err != nil { - return err - } - conn.insertAutocompleteValue(sessionID, "INPUT", e.Label) + conn.updateSessionEvents(sessionID, 1, 0) + conn.insertAutocompleteValue(sessionID, projectID, "INPUT", e.Label) return nil } -func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *ErrorEvent) error { - tx, err := conn.begin() +func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *ErrorEvent) (err error) { + tx, err := conn.c.Begin() if err != nil { return err } - defer tx.rollback() + defer func() { + if err != nil { + if rollbackErr := tx.rollback(); rollbackErr != nil { + log.Printf("rollback err: %s", rollbackErr) + } + } + }() errorID := hashid.WebErrorID(projectID, e) if err = tx.exec(` @@ -206,17 +132,18 @@ func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *Err ); err != nil { return err } - return tx.commit() + err = tx.commit() + return } -func (conn *Conn) InsertWebFetchEvent(sessionID uint64, savePayload bool, e *FetchEvent) error { +func (conn *Conn) InsertWebFetchEvent(sessionID uint64, projectID uint32, savePayload bool, e *FetchEvent) error { var request, response *string if savePayload { request = &e.Request response = &e.Response } host, path, query, err := url.GetURLParts(e.URL) - conn.insertAutocompleteValue(sessionID, "REQUEST", path) + conn.insertAutocompleteValue(sessionID, projectID, "REQUEST", path) if err != nil { return err } @@ -246,29 +173,15 @@ func (conn *Conn) InsertWebFetchEvent(sessionID uint64, savePayload bool, e *Fet return nil } -func (conn *Conn) InsertWebGraphQLEvent(sessionID uint64, savePayload bool, e *GraphQLEvent) error { +func (conn *Conn) InsertWebGraphQLEvent(sessionID uint64, projectID uint32, savePayload bool, e *GraphQLEvent) error { var request, response *string if savePayload { request = &e.Variables response = &e.Response } - conn.insertAutocompleteValue(sessionID, "GRAPHQL", e.OperationName) - - sqlRequest := ` - INSERT INTO events.graphql ( - session_id, timestamp, message_id, - name, - request_body, response_body - ) VALUES ( - $1, $2, $3, - left($4, 2700), - $5, $6 - ) ON CONFLICT DO NOTHING` - conn.batchQueue(sessionID, sqlRequest, sessionID, e.Timestamp, e.MessageID, - e.OperationName, request, response, - ) - - // Record approximate message size - conn.updateBatchSize(sessionID, 
len(sqlRequest)+len(e.OperationName)+len(e.Variables)+len(e.Response)+8*3) + if err := conn.webGraphQLEvents.Append(sessionID, e.Timestamp, e.MessageID, e.OperationName, request, response); err != nil { + log.Printf("insert web graphQL event err: %s", err) + } + conn.insertAutocompleteValue(sessionID, projectID, "GRAPHQL", e.OperationName) return nil } diff --git a/backend/pkg/db/postgres/pool.go b/backend/pkg/db/postgres/pool.go new file mode 100644 index 000000000..5f9cbaa29 --- /dev/null +++ b/backend/pkg/db/postgres/pool.go @@ -0,0 +1,175 @@ +package postgres + +import ( + "context" + "errors" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/instrument/syncfloat64" + "strings" + "time" +) + +// Pool is a pgx.Pool wrapper with metrics integration +type Pool interface { + Query(sql string, args ...interface{}) (pgx.Rows, error) + QueryRow(sql string, args ...interface{}) pgx.Row + Exec(sql string, arguments ...interface{}) error + SendBatch(b *pgx.Batch) pgx.BatchResults + Begin() (*_Tx, error) + Close() +} + +type poolImpl struct { + conn *pgxpool.Pool + sqlRequestTime syncfloat64.Histogram + sqlRequestCounter syncfloat64.Counter +} + +func (p *poolImpl) Query(sql string, args ...interface{}) (pgx.Rows, error) { + start := time.Now() + res, err := p.conn.Query(getTimeoutContext(), sql, args...) + method, table := methodName(sql) + p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), + attribute.String("method", method), attribute.String("table", table)) + p.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", method), attribute.String("table", table)) + return res, err +} + +func (p *poolImpl) QueryRow(sql string, args ...interface{}) pgx.Row { + start := time.Now() + res := p.conn.QueryRow(getTimeoutContext(), sql, args...) + method, table := methodName(sql) + p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), + attribute.String("method", method), attribute.String("table", table)) + p.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", method), attribute.String("table", table)) + return res +} + +func (p *poolImpl) Exec(sql string, arguments ...interface{}) error { + start := time.Now() + _, err := p.conn.Exec(getTimeoutContext(), sql, arguments...) 
+ method, table := methodName(sql) + p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), + attribute.String("method", method), attribute.String("table", table)) + p.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", method), attribute.String("table", table)) + return err +} + +func (p *poolImpl) SendBatch(b *pgx.Batch) pgx.BatchResults { + start := time.Now() + res := p.conn.SendBatch(getTimeoutContext(), b) + p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), + attribute.String("method", "sendBatch")) + p.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", "sendBatch")) + return res +} + +func (p *poolImpl) Begin() (*_Tx, error) { + start := time.Now() + tx, err := p.conn.Begin(context.Background()) + p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), + attribute.String("method", "begin")) + p.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", "begin")) + return &_Tx{tx, p.sqlRequestTime, p.sqlRequestCounter}, err +} + +func (p *poolImpl) Close() { + p.conn.Close() +} + +func NewPool(conn *pgxpool.Pool, sqlRequestTime syncfloat64.Histogram, sqlRequestCounter syncfloat64.Counter) (Pool, error) { + if conn == nil { + return nil, errors.New("conn is empty") + } + return &poolImpl{ + conn: conn, + sqlRequestTime: sqlRequestTime, + sqlRequestCounter: sqlRequestCounter, + }, nil +} + +// TX - start + +type _Tx struct { + pgx.Tx + sqlRequestTime syncfloat64.Histogram + sqlRequestCounter syncfloat64.Counter +} + +func (tx *_Tx) exec(sql string, args ...interface{}) error { + start := time.Now() + _, err := tx.Exec(context.Background(), sql, args...) + method, table := methodName(sql) + tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), + attribute.String("method", method), attribute.String("table", table)) + tx.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", method), attribute.String("table", table)) + return err +} + +func (tx *_Tx) rollback() error { + start := time.Now() + err := tx.Rollback(context.Background()) + tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), + attribute.String("method", "rollback")) + tx.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", "rollback")) + return err +} + +func (tx *_Tx) commit() error { + start := time.Now() + err := tx.Commit(context.Background()) + tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()), + attribute.String("method", "commit")) + tx.sqlRequestCounter.Add(context.Background(), 1, + attribute.String("method", "commit")) + return err +} + +// TX - end + +func getTimeoutContext() context.Context { + ctx, _ := context.WithTimeout(context.Background(), time.Second*30) + return ctx +} + +func methodName(sql string) (string, string) { + cmd, table := "unknown", "unknown" + + // Prepare sql request for parsing + sql = strings.TrimSpace(sql) + sql = strings.ReplaceAll(sql, "\n", " ") + sql = strings.ReplaceAll(sql, "\t", "") + sql = strings.ToLower(sql) + + // Get sql command name + parts := strings.Split(sql, " ") + if parts[0] == "" { + return cmd, table + } else { + cmd = strings.TrimSpace(parts[0]) + } + + // Get table name + switch cmd { + case "select": + for i, p := range parts { + if strings.TrimSpace(p) == "from" { + table = strings.TrimSpace(parts[i+1]) + } + } + case 
"update": + table = strings.TrimSpace(parts[1]) + case "insert": + table = strings.TrimSpace(parts[2]) + } + return cmd, table +} diff --git a/backend/pkg/db/postgres/project.go b/backend/pkg/db/postgres/project.go index 066339791..f38161885 100644 --- a/backend/pkg/db/postgres/project.go +++ b/backend/pkg/db/postgres/project.go @@ -6,7 +6,7 @@ import ( func (conn *Conn) GetProjectByKey(projectKey string) (*Project, error) { p := &Project{ProjectKey: projectKey} - if err := conn.queryRow(` + if err := conn.c.QueryRow(` SELECT max_session_duration, sample_rate, project_id FROM projects WHERE project_key=$1 AND active = true @@ -21,7 +21,7 @@ func (conn *Conn) GetProjectByKey(projectKey string) (*Project, error) { // TODO: logical separation of metadata func (conn *Conn) GetProject(projectID uint32) (*Project, error) { p := &Project{ProjectID: projectID} - if err := conn.queryRow(` + if err := conn.c.QueryRow(` SELECT project_key, max_session_duration, save_request_payloads, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10 diff --git a/backend/pkg/db/postgres/session-updates.go b/backend/pkg/db/postgres/session-updates.go new file mode 100644 index 000000000..14260c2c6 --- /dev/null +++ b/backend/pkg/db/postgres/session-updates.go @@ -0,0 +1,30 @@ +package postgres + +// Mechanism of combination several session updates into one +const sessionUpdateReq = `UPDATE sessions SET (pages_count, events_count) = (pages_count + $1, events_count + $2) WHERE session_id = $3` + +type sessionUpdates struct { + sessionID uint64 + pages int + events int +} + +func NewSessionUpdates(sessionID uint64) *sessionUpdates { + return &sessionUpdates{ + sessionID: sessionID, + pages: 0, + events: 0, + } +} + +func (su *sessionUpdates) add(pages, events int) { + su.pages += pages + su.events += events +} + +func (su *sessionUpdates) request() (string, []interface{}) { + if su.pages == 0 && su.events == 0 { + return "", nil + } + return sessionUpdateReq, []interface{}{su.pages, su.events, su.sessionID} +} diff --git a/backend/pkg/db/postgres/session.go b/backend/pkg/db/postgres/session.go index 7148d9871..9735cdc1a 100644 --- a/backend/pkg/db/postgres/session.go +++ b/backend/pkg/db/postgres/session.go @@ -1,14 +1,11 @@ package postgres -//import . "openreplay/backend/pkg/messages" import . 
"openreplay/backend/pkg/db/types" -//import "log" - func (conn *Conn) GetSession(sessionID uint64) (*Session, error) { s := &Session{SessionID: sessionID} var revID, userOSVersion *string - if err := conn.queryRow(` + if err := conn.c.QueryRow(` SELECT platform, duration, project_id, start_ts, user_uuid, user_os, user_os_version, @@ -39,69 +36,3 @@ func (conn *Conn) GetSession(sessionID uint64) (*Session, error) { } return s, nil } - -// func (conn *Conn) GetSessionClickEvents(sessionID uint64) (list []IOSClickEvent, err error) { -// rows, err := conn.query(` -// SELECT -// timestamp, seq_index, label -// FROM events_ios.clicks -// WHERE session_id=$1 -// `, sessionID) -// if err != nil { -// return err -// } -// defer rows.Close() -// for rows.Next() { -// e := new(IOSClickEvent) -// if err = rows.Scan(&e.Timestamp, &e.Index, &e.Label); err != nil { -// log.Printf("Error while scanning click events: %v", err) -// } else { -// list = append(list, e) -// } -// } -// return list -// } - -// func (conn *Conn) GetSessionInputEvents(sessionID uint64) (list []IOSInputEvent, err error) { -// rows, err := conn.query(` -// SELECT -// timestamp, seq_index, label, value -// FROM events_ios.inputs -// WHERE session_id=$1 -// `, sessionID) -// if err != nil { -// return err -// } -// defer rows.Close() -// for rows.Next() { -// e := new(IOSInputEvent) -// if err = rows.Scan(&e.Timestamp, &e.Index, &e.Label, &e.Value); err != nil { -// log.Printf("Error while scanning click events: %v", err) -// } else { -// list = append(list, e) -// } -// } -// return list -// } - -// func (conn *Conn) GetSessionCrashEvents(sessionID uint64) (list []IOSCrash, err error) { -// rows, err := conn.query(` -// SELECT -// timestamp, seq_index -// FROM events_ios.crashes -// WHERE session_id=$1 -// `, sessionID) -// if err != nil { -// return err -// } -// defer rows.Close() -// for rows.Next() { -// e := new(IOSCrash) -// if err = rows.Scan(&e.Timestamp, &e.Index, &e.Label, &e.Value); err != nil { -// log.Printf("Error while scanning click events: %v", err) -// } else { -// list = append(list, e) -// } -// } -// return list -// } diff --git a/backend/pkg/db/postgres/unstarted-session.go b/backend/pkg/db/postgres/unstarted-session.go index 2a9a71037..cc27e3f5d 100644 --- a/backend/pkg/db/postgres/unstarted-session.go +++ b/backend/pkg/db/postgres/unstarted-session.go @@ -16,7 +16,7 @@ type UnstartedSession struct { } func (conn *Conn) InsertUnstartedSession(s UnstartedSession) error { - return conn.exec(` + return conn.c.Exec(` INSERT INTO unstarted_sessions ( project_id, tracker_version, do_not_track, diff --git a/ee/api/app.py b/ee/api/app.py index 505f1393c..1e12e6015 100644 --- a/ee/api/app.py +++ b/ee/api/app.py @@ -16,7 +16,7 @@ from routers.crons import core_crons from routers.crons import core_dynamic_crons from routers.subs import dashboard, insights, metrics, v1_api_ee -app = FastAPI(root_path="/api") +app = FastAPI(root_path="/api", docs_url=config("docs_url", default=""), redoc_url=config("redoc_url", default="")) @app.middleware('http') diff --git a/frontend/Dockerfile b/frontend/Dockerfile index 2ab0312ab..bfa86857d 100644 --- a/frontend/Dockerfile +++ b/frontend/Dockerfile @@ -18,6 +18,7 @@ RUN apk upgrade busybox --no-cache --repository=http://dl-cdn.alpinelinux.org/al COPY --from=builder /work/public /var/www/openreplay COPY nginx.conf /etc/nginx/conf.d/default.conf +EXPOSE 8080 RUN chown -R nginx:nginx /var/cache/nginx && \ chown -R nginx:nginx /var/log/nginx && \ chown -R nginx:nginx /etc/nginx/conf.d && \ 
diff --git a/frontend/app/components/BugFinder/SessionList/SessionListHeader.js b/frontend/app/components/BugFinder/SessionList/SessionListHeader.js
index bf4bf8b55..5e2702639 100644
--- a/frontend/app/components/BugFinder/SessionList/SessionListHeader.js
+++ b/frontend/app/components/BugFinder/SessionList/SessionListHeader.js
@@ -4,7 +4,10 @@ import SortDropdown from '../Filters/SortDropdown';
 import { numberWithCommas } from 'App/utils';
 import SelectDateRange from 'Shared/SelectDateRange';
 import { applyFilter } from 'Duck/search';
-import Period from 'Types/app/period';
+import Record from 'Types/app/period';
+import { useStore } from 'App/mstore';
+import { useObserver } from 'mobx-react-lite';
+import { moment } from 'App/dateRange';
 
 const sortOptionsMap = {
   'startTs-desc': 'Newest',
@@ -15,13 +18,30 @@ const sortOptionsMap = {
 const sortOptions = Object.entries(sortOptionsMap).map(([value, label]) => ({ value, label }));
 
 function SessionListHeader({ activeTab, count, applyFilter, filter }) {
+  const { settingsStore } = useStore();
+
+  const label = useObserver(() => settingsStore.sessionSettings.timezone.label);
+  const getTimeZoneOffset = React.useCallback(() => {
+    return label.slice(-6);
+  }, [label]);
+
   const { startDate, endDate, rangeValue } = filter;
-  const period = new Period({ start: startDate, end: endDate, rangeName: rangeValue });
+  const period = new Record({ start: startDate, end: endDate, rangeName: rangeValue });
 
   const onDateChange = (e) => {
     const dateValues = e.toJSON();
+    dateValues.startDate = moment(dateValues.startDate).utcOffset(getTimeZoneOffset(), true).valueOf();
+    dateValues.endDate = moment(dateValues.endDate).utcOffset(getTimeZoneOffset(), true).valueOf();
     applyFilter(dateValues);
   };
+
+  React.useEffect(() => {
+    const dateValues = period.toJSON();
+    dateValues.startDate = moment(dateValues.startDate).startOf('day').utcOffset(getTimeZoneOffset(), true).valueOf();
+    dateValues.endDate = moment(dateValues.endDate).endOf('day').utcOffset(getTimeZoneOffset(), true).valueOf();
+    applyFilter(dateValues);
+  }, [label]);
+
   return (
@@ -32,7 +52,7 @@ function SessionListHeader({ activeTab, count, applyFilter, filter }) { {
Sessions Captured in - +
}
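
A note on the SessionListHeader hunk above: the component reads the saved timezone label from `settingsStore`, takes the trailing six characters as a UTC offset (so labels are assumed to look like `GMT +02:00`), and re-anchors the filter's epoch range with moment's keep-local-time flag whenever the label changes. Below is a minimal standalone sketch of that conversion; it uses moment.js directly rather than the `App/dateRange` re-export, and `offsetFromLabel`/`rangeInTimezone` are illustrative names, not identifiers from this patch:

```ts
import moment from 'moment';

// Assumes the settings label ends in an offset, e.g. "GMT +02:00",
// which is what the `.slice(-6)` in SessionListHeader relies on.
function offsetFromLabel(label: string): string {
  return label.slice(-6); // -> "+02:00"
}

// Re-anchor a day range onto the selected offset. Passing `true` as the
// second argument to utcOffset keeps the wall-clock time and shifts the
// underlying instant, so "start of day" means midnight in that timezone.
function rangeInTimezone(startDate: number, endDate: number, label: string) {
  const offset = offsetFromLabel(label);
  return {
    startDate: moment(startDate).startOf('day').utcOffset(offset, true).valueOf(),
    endDate: moment(endDate).endOf('day').utcOffset(offset, true).valueOf(),
  };
}

// Example: today's midnight-to-midnight window expressed in GMT+02:00.
const range = rangeInTimezone(Date.now(), Date.now(), 'GMT +02:00');
```
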
diff --git a/frontend/app/components/Errors/Error/DistributionBar.js b/frontend/app/components/Errors/Error/DistributionBar.js
index 6df611d0a..e6cc38ca5 100644
--- a/frontend/app/components/Errors/Error/DistributionBar.js
+++ b/frontend/app/components/Errors/Error/DistributionBar.js
@@ -6,52 +6,55 @@ import cls from './distributionBar.module.css';
 import { colorScale } from 'App/utils';
 
 function DistributionBar({ className, title, partitions }) {
-  if (partitions.length === 0) {
-    return null;
-  }
+    if (partitions.length === 0) {
+        return null;
+    }
 
-  const values = Array(partitions.length).fill().map((element, index) => index + 0);
-  const colors = colorScale(values, Styles.colors);
+    const values = Array(partitions.length)
+        .fill()
+        .map((element, index) => index + 0);
+    const colors = colorScale(values, Styles.colors);
 
-  return (
-
-
-
{ title }
-
-
- -
-
{ `${ Math.round(partitions[0].prc) }% ` }
-
-
-
- { partitions.map((p, index) => - - { p.label }
- {`${ Math.round(p.prc) }%`} -
- } - className="w-full" - > -
- - )} -
-
- ); + return ( +
+
+
{title}
+
+
+ +
+
{`${Math.round(partitions[0].prc)}% `}
+
+
+
+ {partitions.map((p, index) => ( + + {p.label} +
+ {`${Math.round(p.prc)}%`} +
+ } + style={{ + marginLeft: '1px', + width: `${p.prc}%`, + backgroundColor: colors(index), + }} + > +
+ + ))} +
+
+ ); } -DistributionBar.displayName = "DistributionBar"; -export default DistributionBar; \ No newline at end of file +DistributionBar.displayName = 'DistributionBar'; +export default DistributionBar; diff --git a/frontend/app/components/Session_/PageInsightsPanel/PageInsightsPanel.tsx b/frontend/app/components/Session_/PageInsightsPanel/PageInsightsPanel.tsx index cec3e1af2..03d74a247 100644 --- a/frontend/app/components/Session_/PageInsightsPanel/PageInsightsPanel.tsx +++ b/frontend/app/components/Session_/PageInsightsPanel/PageInsightsPanel.tsx @@ -1,6 +1,5 @@ import React, { useEffect, useState } from 'react'; -import { Dropdown, Loader, Icon } from 'UI'; -import DateRange from 'Shared/DateRange'; +import { Loader, Icon } from 'UI'; import { connect } from 'react-redux'; import { fetchInsights } from 'Duck/sessions'; import SelectorsList from './components/SelectorsList/SelectorsList'; @@ -11,100 +10,103 @@ import Period from 'Types/app/period'; const JUMP_OFFSET = 1000; interface Props { - filters: any - fetchInsights: (filters: Record) => void - urls: [] - insights: any - events: Array - urlOptions: Array - loading: boolean - host: string - setActiveTab: (tab: string) => void + filters: any; + fetchInsights: (filters: Record) => void; + urls: []; + insights: any; + events: Array; + urlOptions: Array; + loading: boolean; + host: string; + setActiveTab: (tab: string) => void; } -function PageInsightsPanel({ - filters, fetchInsights, events = [], insights, urlOptions, host, loading = true, setActiveTab -}: Props) { - const [insightsFilters, setInsightsFilters] = useState(filters) - const defaultValue = (urlOptions && urlOptions[0]) ? urlOptions[0].value : '' +function PageInsightsPanel({ filters, fetchInsights, events = [], insights, urlOptions, host, loading = true, setActiveTab }: Props) { + const [insightsFilters, setInsightsFilters] = useState(filters); + const defaultValue = urlOptions && urlOptions[0] ? urlOptions[0].value : ''; - const period = new Period({ - start: insightsFilters.startDate, - end: insightsFilters.endDate, - rangeName: insightsFilters.rangeValue - }); + const period = Period({ + start: insightsFilters.startDate, + end: insightsFilters.endDate, + rangeName: insightsFilters.rangeValue, + }); - const onDateChange = (e) => { - const { startDate, endDate, rangeValue } = e.toJSON(); - setInsightsFilters({ ...insightsFilters, startDate, endDate, rangeValue }) - } + const onDateChange = (e: any) => { + const { startDate, endDate, rangeValue } = e.toJSON(); + setInsightsFilters({ ...insightsFilters, startDate, endDate, rangeValue }); + }; - useEffect(() => { - markTargets(insights.toJS()); - return () => { - markTargets(null) - } - }, [insights]) + useEffect(() => { + markTargets(insights.toJS()); + return () => { + markTargets(null); + }; + }, [insights]); - useEffect(() => { - if (urlOptions && urlOptions[0]) { - const url = insightsFilters.url ? insightsFilters.url : host + urlOptions[0].value; - Player.pause(); - fetchInsights({ ...insightsFilters, url }) - } - }, [insightsFilters]) + useEffect(() => { + if (urlOptions && urlOptions[0]) { + const url = insightsFilters.url ? 
insightsFilters.url : host + urlOptions[0].value;
+      Player.pause();
+      fetchInsights({ ...insightsFilters, url });
+    }
+  }, [insightsFilters]);
 
-  const onPageSelect = ({ value }: { value: Array }) => {
-    const event = events.find(item => item.url === value)
-    Player.jump(event.time + JUMP_OFFSET)
-    setInsightsFilters({ ...insightsFilters, url: host + value })
-    markTargets([])
-  };
+  const onPageSelect = ({ value }: any) => {
+    const event = events.find((item) => item.url === value.value);
+    Player.jump(event.time + JUMP_OFFSET);
+    setInsightsFilters({ ...insightsFilters, url: host + value.value });
+    markTargets([]);
+  };
 
-  return (
-
-
- Clicks - + return ( +
+
+
+ Clicks + +
+
{ + setActiveTab(''); + }} + className="ml-auto flex items-center justify-center bg-white cursor-pointer" + > + +
+
+
+
In Page
+ -
- - - -
- ) + ); } -export default connect(state => { - const events = state.getIn([ 'sessions', 'visitedEvents' ]) - return { - filters: state.getIn(['sessions', 'insightFilters']), - host: state.getIn([ 'sessions', 'host' ]), - insights: state.getIn([ 'sessions', 'insights' ]), - events: events, - urlOptions: events.map(({ url, host }: any) => ({ label: url, value: url, host })), - loading: state.getIn([ 'sessions', 'fetchInsightsRequest', 'loading' ]), - } -}, { fetchInsights })(PageInsightsPanel); +export default connect( + (state) => { + const events = state.getIn(['sessions', 'visitedEvents']); + return { + filters: state.getIn(['sessions', 'insightFilters']), + host: state.getIn(['sessions', 'host']), + insights: state.getIn(['sessions', 'insights']), + events: events, + urlOptions: events.map(({ url, host }: any) => ({ label: url, value: url, host })), + loading: state.getIn(['sessions', 'fetchInsightsRequest', 'loading']), + }; + }, + { fetchInsights } +)(PageInsightsPanel); diff --git a/frontend/app/components/Session_/PageInsightsPanel/components/SelectorCard/SelectorCard.tsx b/frontend/app/components/Session_/PageInsightsPanel/components/SelectorCard/SelectorCard.tsx index 9007b6684..9a3ecc210 100644 --- a/frontend/app/components/Session_/PageInsightsPanel/components/SelectorCard/SelectorCard.tsx +++ b/frontend/app/components/Session_/PageInsightsPanel/components/SelectorCard/SelectorCard.tsx @@ -1,30 +1,34 @@ -import React, { useState } from 'react' -import stl from './SelectorCard.module.css' +import React, { useState } from 'react'; +import stl from './SelectorCard.module.css'; import cn from 'classnames'; import type { MarkedTarget } from 'Player/MessageDistributor/StatedScreen/StatedScreen'; import { activeTarget } from 'Player'; import { Tooltip } from 'react-tippy'; interface Props { - index?: number, - target: MarkedTarget, - showContent: boolean + index?: number; + target: MarkedTarget; + showContent: boolean; } -export default function SelectorCard({ index = 1, target, showContent } : Props) { - return ( -
activeTarget(index)}> -
- {/* @ts-ignore */} -
{index + 1}
-
{target.selector}
-
- { showContent && ( -
-
{target.count} Clicks - {target.percent}%
-
TOTAL CLICKS
+export default function SelectorCard({ index = 1, target, showContent }: Props) { + return ( +
activeTarget(index)}> +
+ {/* @ts-ignore */} + +
{index + 1}
+
+
{target.selector}
+
+ {showContent && ( +
+
+ {target.count} Clicks - {target.percent}% +
+
TOTAL CLICKS
+
+ )}
- ) } -
- ) + ); } diff --git a/frontend/app/components/Session_/PageInsightsPanel/components/SelectorsList/SelectorsList.tsx b/frontend/app/components/Session_/PageInsightsPanel/components/SelectorsList/SelectorsList.tsx index aceefb3b7..86274baba 100644 --- a/frontend/app/components/Session_/PageInsightsPanel/components/SelectorsList/SelectorsList.tsx +++ b/frontend/app/components/Session_/PageInsightsPanel/components/SelectorsList/SelectorsList.tsx @@ -1,33 +1,26 @@ -import React, { useState } from 'react' -import { NoContent } from 'UI' +import React from 'react'; +import { NoContent } from 'UI'; import { connectPlayer } from 'Player/store'; import SelectorCard from '../SelectorCard/SelectorCard'; import type { MarkedTarget } from 'Player/MessageDistributor/StatedScreen/StatedScreen'; -import stl from './selectorList.module.css' +import stl from './selectorList.module.css'; interface Props { - targets: Array, - activeTargetIndex: number + targets: Array; + activeTargetIndex: number; } -function SelectorsList({ targets, activeTargetIndex }: Props) { - return ( - -
- { targets && targets.map((target, index) => ( - - ))} -
-
- ) +function SelectorsList({ targets, activeTargetIndex }: Props) { + return ( + +
+ {targets && targets.map((target, index) => )} +
+
+ ); } - -export default connectPlayer(state => ({ - targets: state.markedTargets, - activeTargetIndex: state.activeTargetIndex, -}))(SelectorsList) +export default connectPlayer((state: any) => ({ + targets: state.markedTargets, + activeTargetIndex: state.activeTargetIndex, +}))(SelectorsList); diff --git a/frontend/app/components/shared/Filters/FilterItem/FilterItem.tsx b/frontend/app/components/shared/Filters/FilterItem/FilterItem.tsx index 154e862a7..3d82dae7c 100644 --- a/frontend/app/components/shared/Filters/FilterItem/FilterItem.tsx +++ b/frontend/app/components/shared/Filters/FilterItem/FilterItem.tsx @@ -10,7 +10,7 @@ import SubFilterItem from '../SubFilterItem'; interface Props { filterIndex: number; filter: any; // event/filter - onUpdate: (filter) => void; + onUpdate: (filter: any) => void; onRemoveFilter: () => void; isFilter?: boolean; saveRequestPayloads?: boolean; @@ -20,26 +20,26 @@ function FilterItem(props: Props) { const canShowValues = !(filter.operator === 'isAny' || filter.operator === 'onAny' || filter.operator === 'isUndefined'); const isSubFilter = filter.type === FilterType.SUB_FILTERS; - const replaceFilter = (filter) => { + const replaceFilter = (filter: any) => { props.onUpdate({ ...filter, value: [''], - filters: filter.filters ? filter.filters.map((i) => ({ ...i, value: [''] })) : [], + filters: filter.filters ? filter.filters.map((i: any) => ({ ...i, value: [''] })) : [], }); }; - const onOperatorChange = (e, { name, value }) => { + const onOperatorChange = (e: any, { name, value }: any) => { props.onUpdate({ ...filter, operator: value.value }); }; - const onSourceOperatorChange = (e, { name, value }) => { + const onSourceOperatorChange = (e: any, { name, value }: any) => { props.onUpdate({ ...filter, sourceOperator: value.value }); }; - const onUpdateSubFilter = (subFilter, subFilterIndex) => { + const onUpdateSubFilter = (subFilter: any, subFilterIndex: any) => { props.onUpdate({ ...filter, - filters: filter.filters.map((i, index) => { + filters: filter.filters.map((i: any, index: any) => { if (index === subFilterIndex) { return subFilter; } @@ -90,8 +90,8 @@ function FilterItem(props: Props) { {isSubFilter && (
{filter.filters
-            .filter((i) => (i.key !== FilterKey.FETCH_REQUEST_BODY && i.key !== FilterKey.FETCH_RESPONSE_BODY) || saveRequestPayloads)
-            .map((subFilter, subFilterIndex) => (
+            .filter((i: any) => (i.key !== FilterKey.FETCH_REQUEST_BODY && i.key !== FilterKey.FETCH_RESPONSE_BODY) || saveRequestPayloads)
+            .map((subFilter: any, subFilterIndex: any) => (
 void;
+  filter: any;
+  onUpdate: (filter: any) => void;
 }
 
 function FilterSource(props: Props) {
-  const { filter } = props;
-  const [value, setValue] = useState(filter.source[0] || '');
+  const { filter } = props;
+  const [value, setValue] = useState(filter.source[0] || '');
+  const debounceUpdate: any = React.useCallback(debounce(props.onUpdate, 1000), [props.onUpdate]);
 
-  const onChange = ({ target: { value, name } }) => {
-    props.onUpdate({ ...filter, [name]: [value] })
-  }
+  useEffect(() => {
+    setValue(filter.source[0] || '');
+  }, [filter]);
 
-  useEffect(() => {
-    setValue(filter.source[0] || '');
-  }, [filter])
+  useEffect(() => {
+    debounceUpdate({ ...filter, source: [value] });
+  }, [value]);
 
-  useEffect(() => {
-    props.onUpdate({ ...filter, source: [value] })
-  }, [value])
+  const write = ({ target: { value, name } }: any) => setValue(value);
 
-  const write = ({ target: { value, name } }) => setValue(value)
+  const renderFiled = () => {
+    switch (filter.sourceType) {
+      case FilterType.NUMBER:
+        return (
+
+ +
{filter.sourceUnit}
+
+ ); + } + }; - const renderFiled = () => { - switch(filter.sourceType) { - case FilterType.NUMBER: - return ( - - ) - } - } - - return ( -
- { renderFiled()} -
- ); + return
{renderFiled()}
; } -export default FilterSource; \ No newline at end of file +export default FilterSource; diff --git a/frontend/app/components/shared/Filters/FilterValue/FilterValue.tsx b/frontend/app/components/shared/Filters/FilterValue/FilterValue.tsx index 29dce323d..5638f9a1d 100644 --- a/frontend/app/components/shared/Filters/FilterValue/FilterValue.tsx +++ b/frontend/app/components/shared/Filters/FilterValue/FilterValue.tsx @@ -6,6 +6,7 @@ import FilterValueDropdown from '../FilterValueDropdown'; import FilterDuration from '../FilterDuration'; import { debounce } from 'App/utils'; import { assist as assistRoute, isRoute } from 'App/routes'; +import cn from 'classnames'; const ASSIST_ROUTE = assistRoute(); @@ -172,7 +173,8 @@ function FilterValue(props: Props) { }; return ( -
+ // +
{filter.type === FilterType.DURATION
           ? renderValueFiled(filter.value, 0)
           : filter.value &&
diff --git a/frontend/app/components/shared/SelectDateRange/SelectDateRange.tsx b/frontend/app/components/shared/SelectDateRange/SelectDateRange.tsx
index 12b6ba016..da8a940a5 100644
--- a/frontend/app/components/shared/SelectDateRange/SelectDateRange.tsx
+++ b/frontend/app/components/shared/SelectDateRange/SelectDateRange.tsx
@@ -6,17 +6,19 @@ import { components } from 'react-select';
 import DateRangePopup from 'Shared/DateRangeDropdown/DateRangePopup';
 import OutsideClickDetectingDiv from 'Shared/OutsideClickDetectingDiv';
 import cn from 'classnames';
+import { observer } from 'mobx-react-lite';
 
 interface Props {
   period: any;
   onChange: (data: any) => void;
   disableCustom?: boolean;
   right?: boolean;
+  timezone?: string;
   [x: string]: any;
 }
 
 function SelectDateRange(props: Props) {
   const [isCustom, setIsCustom] = React.useState(false);
-  const { right = false, period, disableCustom = false, ...rest } = props;
+  const { right = false, period, disableCustom = false, timezone, ...rest } = props;
   let selectedValue = DATE_RANGE_OPTIONS.find((obj: any) => obj.value === period.rangeName);
   const options = DATE_RANGE_OPTIONS.filter((obj: any) => (disableCustom ? obj.value !== CUSTOM_RANGE : true));
@@ -24,15 +26,20 @@
     if (value === CUSTOM_RANGE) {
       setIsCustom(true);
     } else {
+      // @ts-ignore
       props.onChange(new Period({ rangeName: value }));
     }
   };
 
   const onApplyDateRange = (value: any) => {
-    props.onChange(new Period({ rangeName: CUSTOM_RANGE, start: value.start, end: value.end }));
+    // @ts-ignore
+    const range = new Period({ rangeName: CUSTOM_RANGE, start: value.start, end: value.end })
+    props.onChange(range);
     setIsCustom(false);
   };
 
+  const isCustomRange = period.rangeName === CUSTOM_RANGE;
+  const customRange = isCustomRange ? period.rangeFormatted(undefined, timezone) : '';
 
   return (
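
A note on the FilterSource hunk earlier in this patch: the direct `onUpdate` call on every keystroke is replaced by a debounced one, with the raw input mirrored into local state so typing stays responsive while the filter refresh fires only after a pause. Below is a minimal sketch of the same pattern, under the assumption that `debounce` from 'App/utils' behaves like the trailing-edge version reimplemented here; the component and prop names are illustrative, not from this patch:

```tsx
import React, { useCallback, useEffect, useState } from 'react';

// Trailing-edge debounce, standing in for the one imported from 'App/utils'.
function debounce<T extends (...args: any[]) => void>(fn: T, wait: number): T {
  let timer: ReturnType<typeof setTimeout> | undefined;
  return ((...args: any[]) => {
    if (timer) clearTimeout(timer);
    timer = setTimeout(() => fn(...args), wait);
  }) as T;
}

interface Props {
  source: string;
  onUpdate: (source: string) => void;
}

function DebouncedSource({ source, onUpdate }: Props) {
  const [value, setValue] = useState(source);
  // Memoized so the same timer survives re-renders, as in FilterSource.
  const debouncedUpdate = useCallback(debounce(onUpdate, 1000), [onUpdate]);

  // Follow external resets of the filter value.
  useEffect(() => setValue(source), [source]);
  // Propagate local edits upward once the user stops typing for 1s.
  useEffect(() => debouncedUpdate(value), [value]);

  return <input value={value} onChange={(e) => setValue(e.target.value)} />;
}

export default DebouncedSource;
```
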