feat(spot): small sql improvements
This commit is contained in:
parent
1d47d77dda
commit
b471d7f260
3 changed files with 37 additions and 22 deletions
|
|
@ -3,18 +3,19 @@ package service
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/rs/xid"
|
||||
"openreplay/backend/pkg/spot/auth"
|
||||
"time"
|
||||
|
||||
"github.com/rs/xid"
|
||||
|
||||
"openreplay/backend/pkg/db/postgres/pool"
|
||||
"openreplay/backend/pkg/logger"
|
||||
"openreplay/backend/pkg/spot/auth"
|
||||
)
|
||||
|
||||
type Key struct {
|
||||
SpotID uint64 `json:"-"`
|
||||
UserID uint64 `json:"-"` // to track who generated the key
|
||||
TenantID uint64 `json:"-"` // to check availability
|
||||
TenantID uint64 `json:"-"` // to check spot availability
|
||||
Value string `json:"value"`
|
||||
Expiration uint64 `json:"expiration"` // in seconds
|
||||
ExpiredAt time.Time `json:"-"`
|
||||
|
|
@ -40,6 +41,7 @@ func (k *keysImpl) Set(spotID, expiration uint64, user *auth.User) (*Key, error)
|
|||
case user == nil:
|
||||
return nil, fmt.Errorf("user is required")
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
if expiration == 0 {
|
||||
sql := `UPDATE spots_keys SET expired_at = $1, expiration = 0 WHERE spot_id = $2`
|
||||
|
|
@ -68,7 +70,7 @@ func (k *keysImpl) Set(spotID, expiration uint64, user *auth.User) (*Key, error)
|
|||
),
|
||||
|
||||
inserted AS (
|
||||
INSERT INTO spots_keys (spot_key, spot_id, user_id, tenant_id, expiration, created_at, expired_at)
|
||||
INSERT INTO spots_keys (spot_key, spot_id, user_id, expiration, created_at, expired_at)
|
||||
SELECT $2, $6, $3, $7, $4, $1, $5
|
||||
WHERE NOT EXISTS (SELECT 1 FROM updated)
|
||||
RETURNING spot_key, expiration, expired_at
|
||||
|
|
@ -79,7 +81,7 @@ func (k *keysImpl) Set(spotID, expiration uint64, user *auth.User) (*Key, error)
|
|||
SELECT spot_key, expiration, expired_at FROM inserted;
|
||||
`
|
||||
key := &Key{}
|
||||
if err := k.conn.QueryRow(sql, now, newKey, user.ID, expiration, expiredAt, spotID, user.TenantID).
|
||||
if err := k.conn.QueryRow(sql, now, newKey, user.ID, expiration, expiredAt, spotID).
|
||||
Scan(&key.Value, &key.Expiration, &key.ExpiredAt); err != nil {
|
||||
k.log.Error(context.Background(), "failed to set key: %v", err)
|
||||
return nil, fmt.Errorf("key not updated")
|
||||
|
|
@ -94,9 +96,12 @@ func (k *keysImpl) Get(spotID uint64, user *auth.User) (*Key, error) {
|
|||
case user == nil:
|
||||
return nil, fmt.Errorf("user is required")
|
||||
}
|
||||
//
|
||||
|
||||
key := &Key{}
|
||||
sql := `SELECT spot_key, expiration, expired_at FROM spots_keys WHERE spot_id = $1 AND tenant_id = $2`
|
||||
sql := `SELECT k.spot_key, k.expiration, k.expired_at
|
||||
FROM spots_keys k
|
||||
JOIN spots s ON s.spot_id = k.spot_id
|
||||
WHERE k.spot_id = $1 AND s.tenant_id = $2`
|
||||
if err := k.conn.QueryRow(sql, spotID, user.TenantID).Scan(&key.Value, &key.Expiration, &key.ExpiredAt); err != nil {
|
||||
k.log.Error(context.Background(), "failed to get key: %v", err)
|
||||
return nil, fmt.Errorf("key not found")
|
||||
|
|
|
|||
|
|
@ -4,15 +4,17 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"openreplay/backend/pkg/db/postgres/pool"
|
||||
"openreplay/backend/pkg/flakeid"
|
||||
"openreplay/backend/pkg/logger"
|
||||
"openreplay/backend/pkg/spot/auth"
|
||||
"time"
|
||||
)
|
||||
|
||||
const MaxNameLength = 64
|
||||
const MaxCommentLength = 120
|
||||
const MaxNumberOfComments = 20
|
||||
const MaxNumberOfComments = 25
|
||||
|
||||
type Spot struct {
|
||||
ID uint64 `json:"id"`
|
||||
|
|
@ -83,6 +85,9 @@ func (s *spotsImpl) Add(user *auth.User, name, comment string, duration int, cro
|
|||
case duration <= 0:
|
||||
return nil, fmt.Errorf("duration should be greater than 0")
|
||||
}
|
||||
if len(name) > MaxNameLength {
|
||||
name = name[:MaxNameLength]
|
||||
}
|
||||
|
||||
createdAt := time.Now()
|
||||
spotID, err := s.flaker.Compose(uint64(createdAt.UnixMilli()))
|
||||
|
|
@ -122,16 +127,15 @@ func (s *spotsImpl) encodeComment(comment *Comment) string {
|
|||
}
|
||||
|
||||
func (s *spotsImpl) add(spot *Spot) error {
|
||||
sql := `INSERT INTO spots (spot_id, name, user_id, user_email, tenant_id, duration, crop, comments, status, created_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`
|
||||
sql := `INSERT INTO spots (spot_id, name, user_id, tenant_id, duration, crop, comments, created_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`
|
||||
var comments []string
|
||||
for _, comment := range spot.Comments {
|
||||
if encodedComment := s.encodeComment(&comment); encodedComment != "" {
|
||||
comments = append(comments, encodedComment)
|
||||
}
|
||||
}
|
||||
err := s.pgconn.Exec(sql, spot.ID, spot.Name, spot.UserID, spot.UserEmail, spot.TenantID, spot.Duration, spot.Crop,
|
||||
comments, "pending", spot.CreatedAt)
|
||||
err := s.pgconn.Exec(sql, spot.ID, spot.Name, spot.UserID, spot.TenantID, spot.Duration, spot.Crop, comments, spot.CreatedAt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -149,8 +153,10 @@ func (s *spotsImpl) GetByID(user *auth.User, spotID uint64) (*Spot, error) {
|
|||
}
|
||||
|
||||
func (s *spotsImpl) getByID(spotID uint64, user *auth.User) (*Spot, error) {
|
||||
sql := `SELECT name, user_email, duration, crop, comments, created_at FROM spots
|
||||
WHERE spot_id = $1 AND tenant_id = $2 AND deleted_at IS NULL`
|
||||
sql := `SELECT s.name, u.email, s.duration, s.crop, s.comments, s.created_at
|
||||
FROM spots s
|
||||
JOIN users u ON s.user_id = u.user_id
|
||||
WHERE s.spot_id = $1 AND s.tenant_id = $2 AND s.deleted_at IS NULL`
|
||||
spot := &Spot{ID: spotID}
|
||||
var comments []string
|
||||
err := s.pgconn.QueryRow(sql, spotID, user.TenantID).Scan(&spot.Name, &spot.UserEmail, &spot.Duration, &spot.Crop,
|
||||
|
|
@ -195,8 +201,10 @@ func (s *spotsImpl) Get(user *auth.User, opts *GetOpts) ([]*Spot, uint64, error)
|
|||
}
|
||||
|
||||
func (s *spotsImpl) getAll(user *auth.User, opts *GetOpts) ([]*Spot, uint64, error) {
|
||||
sql := `SELECT COUNT(1) OVER () AS total, spot_id, name, user_email, duration, created_at FROM spots
|
||||
WHERE tenant_id = $1 AND deleted_at IS NULL`
|
||||
sql := `SELECT COUNT(1) OVER () AS total, s.spot_id, s.name, u.email, s.duration, s.created_at
|
||||
FROM spots s
|
||||
JOIN users u ON s.user_id = u.user_id
|
||||
WHERE s.tenant_id = $1 AND s.deleted_at IS NULL`
|
||||
args := []interface{}{user.TenantID}
|
||||
if opts.UserID != 0 {
|
||||
sql += ` AND user_id = ` + fmt.Sprintf("$%d", len(args)+1)
|
||||
|
|
@ -217,7 +225,6 @@ func (s *spotsImpl) getAll(user *auth.User, opts *GetOpts) ([]*Spot, uint64, err
|
|||
sql += ` OFFSET ` + fmt.Sprintf("$%d", len(args)+1)
|
||||
args = append(args, opts.Offset)
|
||||
}
|
||||
//s.log.Info(context.Background(), "sql: %s, args: %v", sql, args)
|
||||
rows, err := s.pgconn.Query(sql, args...)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
|
|
@ -245,6 +252,9 @@ func (s *spotsImpl) UpdateName(user *auth.User, spotID uint64, newName string) (
|
|||
case newName == "":
|
||||
return nil, fmt.Errorf("new name is required")
|
||||
}
|
||||
if len(newName) > MaxNameLength {
|
||||
newName = newName[:MaxNameLength]
|
||||
}
|
||||
return s.updateName(spotID, newName, user)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ func (t *Task) HasToTranscode() bool {
|
|||
}
|
||||
|
||||
func (t *tasksImpl) Add(spotID uint64, crop []int, duration int) error {
|
||||
sql := `INSERT INTO spots_tasks (id, crop, duration, status, added_time) VALUES ($1, $2, $3, $4, $5)`
|
||||
sql := `INSERT INTO spots_tasks (spot_id, crop, duration, status, added_time) VALUES ($1, $2, $3, $4, $5)`
|
||||
if err := t.conn.Exec(sql, spotID, crop, duration, "pending", time.Now()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -66,7 +66,7 @@ func (t *tasksImpl) Get() (task *Task, err error) {
|
|||
}()
|
||||
|
||||
task = &Task{tx: pool.Tx{Tx: tx}}
|
||||
sql := `SELECT id, crop, duration FROM spots_tasks WHERE status = 'pending' ORDER BY added_time FOR UPDATE SKIP LOCKED LIMIT 1`
|
||||
sql := `SELECT spot_id, crop, duration FROM spots_tasks WHERE status = 'pending' ORDER BY added_time FOR UPDATE SKIP LOCKED LIMIT 1`
|
||||
err = tx.TxQueryRow(sql).Scan(&task.SpotID, &task.Crop, &task.Duration)
|
||||
if err != nil {
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
|
|
@ -78,7 +78,7 @@ func (t *tasksImpl) Get() (task *Task, err error) {
|
|||
}
|
||||
|
||||
func (t *tasksImpl) Done(task *Task) error {
|
||||
sql := `DELETE FROM spots_tasks WHERE id = $1`
|
||||
sql := `DELETE FROM spots_tasks WHERE spot_id = $1`
|
||||
err := task.tx.TxExec(sql, task.SpotID)
|
||||
if err != nil {
|
||||
task.tx.TxRollback()
|
||||
|
|
@ -89,7 +89,7 @@ func (t *tasksImpl) Done(task *Task) error {
|
|||
}
|
||||
|
||||
func (t *tasksImpl) Failed(task *Task, taskErr error) error {
|
||||
sql := `UPDATE spots_tasks SET status = 'failed', error = $2 WHERE id = $1`
|
||||
sql := `UPDATE spots_tasks SET status = 'failed', error = $2 WHERE spot_id = $1`
|
||||
err := task.tx.TxExec(sql, task.SpotID, taskErr.Error())
|
||||
if err != nil {
|
||||
task.tx.TxRollback()
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue