diff --git a/backend/cmd/analytics/main.go b/backend/cmd/analytics/main.go index 1a4b099dd..cb53cb338 100644 --- a/backend/cmd/analytics/main.go +++ b/backend/cmd/analytics/main.go @@ -8,8 +8,9 @@ import ( "openreplay/backend/pkg/db/postgres/pool" "openreplay/backend/pkg/logger" "openreplay/backend/pkg/metrics" - analyticsMetrics "openreplay/backend/pkg/metrics/analytics" - databaseMetrics "openreplay/backend/pkg/metrics/database" + //analyticsMetrics "openreplay/backend/pkg/metrics/analytics" + //databaseMetrics "openreplay/backend/pkg/metrics/database" + "openreplay/backend/pkg/metrics/database" "openreplay/backend/pkg/metrics/web" "openreplay/backend/pkg/server" "openreplay/backend/pkg/server/api" @@ -20,9 +21,10 @@ func main() { log := logger.New() cfg := analyticsConfig.New(log) webMetrics := web.New("analytics") - metrics.New(log, append(webMetrics.List(), append(analyticsMetrics.List(), databaseMetrics.List()...)...)) + dbMetrics := database.New("analytics") + metrics.New(log, append(webMetrics.List(), dbMetrics.List()...)) - pgConn, err := pool.New(cfg.Postgres.String()) + pgConn, err := pool.New(dbMetrics, cfg.Postgres.String()) if err != nil { log.Fatal(ctx, "can't init postgres connection: %s", err) } @@ -34,7 +36,7 @@ func main() { } defer chConn.Stop() - builder, err := analytics.NewServiceBuilder(log, cfg, webMetrics, pgConn, chConn) + builder, err := analytics.NewServiceBuilder(log, cfg, webMetrics, dbMetrics, pgConn, chConn) if err != nil { log.Fatal(ctx, "can't init services: %s", err) } diff --git a/backend/pkg/analytics/cards/cards.go b/backend/pkg/analytics/cards/cards.go index 1425a61f0..794db1c98 100644 --- a/backend/pkg/analytics/cards/cards.go +++ b/backend/pkg/analytics/cards/cards.go @@ -6,7 +6,6 @@ import ( "fmt" "strings" - "github.com/jackc/pgx/v4" "github.com/lib/pq" "openreplay/backend/pkg/db/postgres/pool" @@ -48,12 +47,12 @@ func (s *cardsImpl) Create(projectId int, userID uint64, req *CardCreateRequest) ctx := context.Background() defer func() { if err != nil { - tx.Rollback(ctx) + err := tx.TxRollback() if err != nil { return } } else { - err := tx.Commit(ctx) + err := tx.TxCommit() if err != nil { return } @@ -67,8 +66,8 @@ func (s *cardsImpl) Create(projectId int, userID uint64, req *CardCreateRequest) RETURNING metric_id, project_id, user_id, name, metric_type, view_type, metric_of, metric_value, metric_format, is_public, created_at, edited_at` card := &CardGetResponse{} - err = tx.QueryRow( - ctx, sql, + err = tx.TxQueryRow( + sql, projectId, userID, req.Name, req.MetricType, req.ViewType, req.MetricOf, req.MetricValue, req.MetricFormat, req.IsPublic, ).Scan( &card.CardID, @@ -98,7 +97,7 @@ func (s *cardsImpl) Create(projectId int, userID uint64, req *CardCreateRequest) return card, nil } -func (s *cardsImpl) CreateSeries(ctx context.Context, tx pgx.Tx, metricId int64, series []CardSeriesBase) []CardSeries { +func (s *cardsImpl) CreateSeries(ctx context.Context, tx *pool.Tx, metricId int64, series []CardSeriesBase) []CardSeries { if len(series) == 0 { return nil // No series to create } @@ -126,7 +125,7 @@ func (s *cardsImpl) CreateSeries(ctx context.Context, tx pgx.Tx, metricId int64, query := fmt.Sprintf(sql, strings.Join(values, ",")) s.log.Info(ctx, "Executing query: %s with args: %v", query, args) - rows, err := tx.Query(ctx, query, args...) + rows, err := tx.TxQuery(query, args...) if err != nil { s.log.Error(ctx, "failed to execute batch insert for series: %v", err) return nil @@ -359,12 +358,12 @@ func (s *cardsImpl) Update(projectId int, cardID int64, userID uint64, req *Card ctx := context.Background() defer func() { if err != nil { - tx.Rollback(ctx) + err := tx.TxRollback() if err != nil { return } } else { - err := tx.Commit(ctx) + err := tx.TxCommit() if err != nil { return } @@ -379,7 +378,7 @@ func (s *cardsImpl) Update(projectId int, cardID int64, userID uint64, req *Card RETURNING metric_id, project_id, user_id, name, metric_type, view_type, metric_of, metric_value, metric_format, is_public, created_at, edited_at` card := &CardGetResponse{} - err = tx.QueryRow(ctx, sql, + err = tx.TxQueryRow(sql, req.Name, req.MetricType, req.ViewType, req.MetricOf, req.MetricValue, req.MetricFormat, req.IsPublic, cardID, projectId, ).Scan( &card.CardID, &card.ProjectID, &card.UserID, &card.Name, &card.MetricType, &card.ViewType, &card.MetricOf, diff --git a/backend/pkg/analytics/dashboards/dashboards.go b/backend/pkg/analytics/dashboards/dashboards.go index cb260f572..057c9c830 100644 --- a/backend/pkg/analytics/dashboards/dashboards.go +++ b/backend/pkg/analytics/dashboards/dashboards.go @@ -1,7 +1,6 @@ package dashboards import ( - "context" "encoding/json" "errors" "fmt" @@ -336,15 +335,14 @@ func (s *dashboardsImpl) AddCards(projectId int, dashboardId int, userId uint64, return fmt.Errorf("failed to start transaction: %w", err) } - ctx := context.Background() defer func() { if err != nil { - tx.Rollback(ctx) + err := tx.TxRollback() if err != nil { return } } else { - err := tx.Commit(ctx) + err := tx.TxCommit() if err != nil { return } @@ -356,7 +354,7 @@ func (s *dashboardsImpl) AddCards(projectId int, dashboardId int, userId uint64, for _, metricID := range req.MetricIDs { // Check if the widget already exists var exists bool - err := tx.QueryRow(ctx, ` + err := tx.TxQueryRow(` SELECT EXISTS ( SELECT 1 FROM public.dashboard_widgets WHERE dashboard_id = $1 AND metric_id = $2 @@ -371,10 +369,9 @@ func (s *dashboardsImpl) AddCards(projectId int, dashboardId int, userId uint64, } // Insert new widget - _, err = tx.Exec(ctx, ` - INSERT INTO public.dashboard_widgets (dashboard_id, metric_id, user_id, config) - VALUES ($1, $2, $3, $4) - `, dashboardId, metricID, userId, req.Config) + query := `INSERT INTO public.dashboard_widgets (dashboard_id, metric_id, user_id, config) + VALUES ($1, $2, $3, $4)` + err = tx.TxExec(query, dashboardId, metricID, userId, req.Config) if err != nil { return fmt.Errorf("failed to insert widget: %w", err) } @@ -382,7 +379,7 @@ func (s *dashboardsImpl) AddCards(projectId int, dashboardId int, userId uint64, } // Commit transaction - if err := tx.Commit(ctx); err != nil { + if err := tx.TxCommit(); err != nil { return fmt.Errorf("failed to commit transaction: %w", err) } diff --git a/backend/pkg/db/postgres/pool/pool.go b/backend/pkg/db/postgres/pool/pool.go index f6d82e6c3..747654695 100644 --- a/backend/pkg/db/postgres/pool/pool.go +++ b/backend/pkg/db/postgres/pool/pool.go @@ -110,6 +110,15 @@ func (tx *Tx) TxExec(sql string, args ...interface{}) error { return err } +func (tx *Tx) TxQuery(sql string, args ...interface{}) (pgx.Rows, error) { + start := time.Now() + res, err := tx.origTx.Query(getTimeoutContext(), sql, args...) + method, table := methodName(sql) + tx.metrics.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), method, table) + tx.metrics.IncreaseTotalRequests(method, table) + return res, err +} + func (tx *Tx) TxQueryRow(sql string, args ...interface{}) pgx.Row { start := time.Now() res := tx.origTx.QueryRow(context.Background(), sql, args...)