feat(api): dev rebase

This commit is contained in:
Shekar Siri 2025-04-18 14:39:19 +02:00
parent 8711648ac7
commit c077841b4e
4 changed files with 32 additions and 25 deletions

View file

@ -8,8 +8,9 @@ import (
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/metrics"
analyticsMetrics "openreplay/backend/pkg/metrics/analytics"
databaseMetrics "openreplay/backend/pkg/metrics/database"
//analyticsMetrics "openreplay/backend/pkg/metrics/analytics"
//databaseMetrics "openreplay/backend/pkg/metrics/database"
"openreplay/backend/pkg/metrics/database"
"openreplay/backend/pkg/metrics/web"
"openreplay/backend/pkg/server"
"openreplay/backend/pkg/server/api"
@ -20,9 +21,10 @@ func main() {
log := logger.New()
cfg := analyticsConfig.New(log)
webMetrics := web.New("analytics")
metrics.New(log, append(webMetrics.List(), append(analyticsMetrics.List(), databaseMetrics.List()...)...))
dbMetrics := database.New("analytics")
metrics.New(log, append(webMetrics.List(), dbMetrics.List()...))
pgConn, err := pool.New(cfg.Postgres.String())
pgConn, err := pool.New(dbMetrics, cfg.Postgres.String())
if err != nil {
log.Fatal(ctx, "can't init postgres connection: %s", err)
}
@ -34,7 +36,7 @@ func main() {
}
defer chConn.Stop()
builder, err := analytics.NewServiceBuilder(log, cfg, webMetrics, pgConn, chConn)
builder, err := analytics.NewServiceBuilder(log, cfg, webMetrics, dbMetrics, pgConn, chConn)
if err != nil {
log.Fatal(ctx, "can't init services: %s", err)
}

View file

@ -6,7 +6,6 @@ import (
"fmt"
"strings"
"github.com/jackc/pgx/v4"
"github.com/lib/pq"
"openreplay/backend/pkg/db/postgres/pool"
@ -48,12 +47,12 @@ func (s *cardsImpl) Create(projectId int, userID uint64, req *CardCreateRequest)
ctx := context.Background()
defer func() {
if err != nil {
tx.Rollback(ctx)
err := tx.TxRollback()
if err != nil {
return
}
} else {
err := tx.Commit(ctx)
err := tx.TxCommit()
if err != nil {
return
}
@ -67,8 +66,8 @@ func (s *cardsImpl) Create(projectId int, userID uint64, req *CardCreateRequest)
RETURNING metric_id, project_id, user_id, name, metric_type, view_type, metric_of, metric_value, metric_format, is_public, created_at, edited_at`
card := &CardGetResponse{}
err = tx.QueryRow(
ctx, sql,
err = tx.TxQueryRow(
sql,
projectId, userID, req.Name, req.MetricType, req.ViewType, req.MetricOf, req.MetricValue, req.MetricFormat, req.IsPublic,
).Scan(
&card.CardID,
@ -98,7 +97,7 @@ func (s *cardsImpl) Create(projectId int, userID uint64, req *CardCreateRequest)
return card, nil
}
func (s *cardsImpl) CreateSeries(ctx context.Context, tx pgx.Tx, metricId int64, series []CardSeriesBase) []CardSeries {
func (s *cardsImpl) CreateSeries(ctx context.Context, tx *pool.Tx, metricId int64, series []CardSeriesBase) []CardSeries {
if len(series) == 0 {
return nil // No series to create
}
@ -126,7 +125,7 @@ func (s *cardsImpl) CreateSeries(ctx context.Context, tx pgx.Tx, metricId int64,
query := fmt.Sprintf(sql, strings.Join(values, ","))
s.log.Info(ctx, "Executing query: %s with args: %v", query, args)
rows, err := tx.Query(ctx, query, args...)
rows, err := tx.TxQuery(query, args...)
if err != nil {
s.log.Error(ctx, "failed to execute batch insert for series: %v", err)
return nil
@ -359,12 +358,12 @@ func (s *cardsImpl) Update(projectId int, cardID int64, userID uint64, req *Card
ctx := context.Background()
defer func() {
if err != nil {
tx.Rollback(ctx)
err := tx.TxRollback()
if err != nil {
return
}
} else {
err := tx.Commit(ctx)
err := tx.TxCommit()
if err != nil {
return
}
@ -379,7 +378,7 @@ func (s *cardsImpl) Update(projectId int, cardID int64, userID uint64, req *Card
RETURNING metric_id, project_id, user_id, name, metric_type, view_type, metric_of, metric_value, metric_format, is_public, created_at, edited_at`
card := &CardGetResponse{}
err = tx.QueryRow(ctx, sql,
err = tx.TxQueryRow(sql,
req.Name, req.MetricType, req.ViewType, req.MetricOf, req.MetricValue, req.MetricFormat, req.IsPublic, cardID, projectId,
).Scan(
&card.CardID, &card.ProjectID, &card.UserID, &card.Name, &card.MetricType, &card.ViewType, &card.MetricOf,

View file

@ -1,7 +1,6 @@
package dashboards
import (
"context"
"encoding/json"
"errors"
"fmt"
@ -336,15 +335,14 @@ func (s *dashboardsImpl) AddCards(projectId int, dashboardId int, userId uint64,
return fmt.Errorf("failed to start transaction: %w", err)
}
ctx := context.Background()
defer func() {
if err != nil {
tx.Rollback(ctx)
err := tx.TxRollback()
if err != nil {
return
}
} else {
err := tx.Commit(ctx)
err := tx.TxCommit()
if err != nil {
return
}
@ -356,7 +354,7 @@ func (s *dashboardsImpl) AddCards(projectId int, dashboardId int, userId uint64,
for _, metricID := range req.MetricIDs {
// Check if the widget already exists
var exists bool
err := tx.QueryRow(ctx, `
err := tx.TxQueryRow(`
SELECT EXISTS (
SELECT 1 FROM public.dashboard_widgets
WHERE dashboard_id = $1 AND metric_id = $2
@ -371,10 +369,9 @@ func (s *dashboardsImpl) AddCards(projectId int, dashboardId int, userId uint64,
}
// Insert new widget
_, err = tx.Exec(ctx, `
INSERT INTO public.dashboard_widgets (dashboard_id, metric_id, user_id, config)
VALUES ($1, $2, $3, $4)
`, dashboardId, metricID, userId, req.Config)
query := `INSERT INTO public.dashboard_widgets (dashboard_id, metric_id, user_id, config)
VALUES ($1, $2, $3, $4)`
err = tx.TxExec(query, dashboardId, metricID, userId, req.Config)
if err != nil {
return fmt.Errorf("failed to insert widget: %w", err)
}
@ -382,7 +379,7 @@ func (s *dashboardsImpl) AddCards(projectId int, dashboardId int, userId uint64,
}
// Commit transaction
if err := tx.Commit(ctx); err != nil {
if err := tx.TxCommit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

View file

@ -110,6 +110,15 @@ func (tx *Tx) TxExec(sql string, args ...interface{}) error {
return err
}
func (tx *Tx) TxQuery(sql string, args ...interface{}) (pgx.Rows, error) {
start := time.Now()
res, err := tx.origTx.Query(getTimeoutContext(), sql, args...)
method, table := methodName(sql)
tx.metrics.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), method, table)
tx.metrics.IncreaseTotalRequests(method, table)
return res, err
}
func (tx *Tx) TxQueryRow(sql string, args ...interface{}) pgx.Row {
start := time.Now()
res := tx.origTx.QueryRow(context.Background(), sql, args...)