Spots (#2305)

* feat(spot): first version to test http endpoints

* fix(helm): changed nginx path prefix

* fix(spots): added missing BUCKET_NAME env var

* fix(spots): added services init check

* feat(spots): removed geo module

* feat(spots): removed uaparser

* feat(spots): added more detailed authorization error log

* feat(spots): changed the authorization middleware

* feat(spots): extended http body size limit to 128kb

* feat(spots): added s3 error log

* feat(spots): added new handler for uploaded event

* feat(backend): small api changes in spot service

* feat(backend): rewrote request parameters grabber for getSpot handler

* feat(backend): added tenantID to auth struct

* feat(backend): added pre-signed download urls for preview, mob and video files

* feat(backend): added user's email to spots table, and getSpot responses

* feat(backend): returning spotID as a string

* feat(spot): added transcoder pipeline

* fix(spot): return spotID as a string

* feat(spot): added volume mount to spot service

* feat(spot): fixed volume mounting

* feat(spot): helm fix

* feat(spot): helm another fix

* fix(spot): correct video.webm path

* fix(spot): correct pre-signed url for download original video

* feat(spot): added PATCH and DELETE methods to CORS

* feat(spot): use string format for spotIDs in delete method

* feat(spot): added public key implemented

* fix(spot): correct public-key parser

* fix(spot): fixed query params issue + user's tenantID

* fix(spot): use 1 as a default tenant

* feat(spot): added correct total spots calculation

* fix(spot): fixed offset calculation

* feat(spot): added extra check in auth method

* fix(spot): removed / from video file name

* fix(spot): divided codec flag into 2 parts

* feat(spot): use fixed tenantID = 1 for oss users

* feat(spot): return 404 for public key not found issue

* feat(spots): added spots folder to minio path rule

* feat(spot): added spot video streaming support

* fix(spot): fixed an sql request for spot streams

* feat(spot): return playlist file in getSpot response

* feat(spot): try to use aac audio codec

* feat(spot): added permissions support (oss/ee)

* feat(spot): added authorizer method

* feat(spot): added license check

* feat(spot): added spot preview for get response

* fix(spot): fixed a problem with permissions

* feat(spot): added crop feature

* feat(spot): upload cropped video back to s3

* feat(spot): manage expired modified playlist file

* feat(backend): hack with video formats

* feat(backend): removed space

* feat(spot): req tracing

* feat(spot): manual method's name mapping

* feat(spot): added a second method to public key auth support

* feat(spot): metrics

* feat(spot): added rate limiter per user

* feat(spot): added ping endpoint for spot jwt token check

* feat(spot): getStatus endpoint

* feat(spot): added missing import

* feat(spot): transcoding issue fix

* feat(spot): temp remove tasks

* feat(spot): better error log message

* feat(spot): set default jwt_secret value

* feat(spot): debug auth

* feat(spot): 2 diff jwt tokens support

* feat(spot): pg tasks with process status

* feat(spot): more logs

* feat(spot): improved defer for GetTask method

* feat(spot): keep only failed tasks

* feat(spot): removing temp dir with spot files

* feat(spot): added several workers for transcoding module

* feat(spot): fixed spot path for temp video files

* feat(spot): use custom statusWriter to track response code in middleware

* feat(spot): added body and parameter parser for auditrail feature

* feat(spot): fixed IsAuth method signature

* feat(spot): fixed ee service builder

* feat(spot): added import

* feat(spot): fix data type for payload and parameters jsonb fields

* feat(spot): typo fix

* feat(spot): moved out consts

* feat(spot): new table's name

* feat(spot): added missing imports in go.mod

* feat(spot): added a check for the number of comments (20 by default)
This commit is contained in:
Alexander 2024-08-29 16:08:33 +02:00 committed by GitHub
parent 7c52b83a63
commit 345f316b27
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
47 changed files with 3365 additions and 15 deletions

View file

@ -103,7 +103,10 @@ ENV TZ=UTC \
COMPRESSION_THRESHOLD="20000" \
# Set Access-Control-* headers for tracker requests if true
USE_CORS=false \
RECORD_CANVAS=true
RECORD_CANVAS=true \
JWT_SECRET="SECRET" \
JWT_SPOT_SECRET="SECRET" \
BUCKET_NAME="spots"
RUN if [ "$SERVICE_NAME" = "http" ]; then \
@ -113,7 +116,7 @@ RUN if [ "$SERVICE_NAME" = "http" ]; then \
apk add --no-cache zstd; \
elif [ "$SERVICE_NAME" = "canvas-handler" ]; then \
apk add --no-cache zstd; \
elif [ "$SERVICE_NAME" = "canvas-maker" ]; then \
elif [ "$SERVICE_NAME" = "spot" ]; then \
apk add --no-cache ffmpeg; \
fi

60
backend/cmd/spot/main.go Normal file
View file

@ -0,0 +1,60 @@
package main
import (
"context"
"openreplay/backend/pkg/spot"
"openreplay/backend/pkg/spot/api"
"os"
"os/signal"
"syscall"
spotConfig "openreplay/backend/internal/config/spot"
"openreplay/backend/internal/http/server"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/metrics"
databaseMetrics "openreplay/backend/pkg/metrics/database"
spotMetrics "openreplay/backend/pkg/metrics/spot"
)
// main boots the Spot service: it loads configuration, registers metrics,
// opens the Postgres pool, builds the service layer and HTTP router, then
// serves HTTP until a SIGINT/SIGTERM signal arrives.
func main() {
	ctx := context.Background()
	log := logger.New()
	cfg := spotConfig.New(log)
	// Register both spot-level and database-level Prometheus collectors.
	metrics.New(log, append(spotMetrics.List(), databaseMetrics.List()...))
	pgConn, err := pool.New(cfg.Postgres.String())
	if err != nil {
		log.Fatal(ctx, "can't init postgres connection: %s", err)
	}
	defer pgConn.Close()
	services, err := spot.NewServiceBuilder(log, cfg, pgConn)
	if err != nil {
		log.Fatal(ctx, "can't init services: %s", err)
	}
	router, err := api.NewRouter(cfg, log, services)
	if err != nil {
		log.Fatal(ctx, "failed while creating router: %s", err)
	}
	spotServer, err := server.New(router.GetHandler(), cfg.HTTPHost, cfg.HTTPPort, cfg.HTTPTimeout)
	if err != nil {
		log.Fatal(ctx, "failed while creating server: %s", err)
	}
	// Serve in the background; any server error terminates the process.
	go func() {
		if err := spotServer.Start(); err != nil {
			log.Fatal(ctx, "http server error: %s", err)
		}
	}()
	log.Info(ctx, "server successfully started on port %s", cfg.HTTPPort)
	// Wait stop signal to shut down server gracefully
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
	<-sigchan
	log.Info(ctx, "shutting down the server")
	spotServer.Stop()
}

View file

@ -17,6 +17,7 @@ require (
github.com/elastic/go-elasticsearch/v7 v7.13.1
github.com/elastic/go-elasticsearch/v8 v8.13.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.1
github.com/jackc/pgconn v1.14.3
@ -29,6 +30,7 @@ require (
github.com/oschwald/maxminddb-golang v1.7.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.16.0
github.com/rs/xid v1.2.1
github.com/sethvargo/go-envconfig v0.7.0
github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce
github.com/ua-parser/uap-go v0.0.0-20200325213135-e1c09f13e2fe

View file

@ -199,6 +199,8 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69
github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c=
github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg=
github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk=
github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
@ -460,6 +462,7 @@ github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJ
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc=

View file

@ -0,0 +1,37 @@
package spot
import (
"time"
"openreplay/backend/internal/config/common"
"openreplay/backend/internal/config/configurator"
"openreplay/backend/internal/config/objectstorage"
"openreplay/backend/internal/config/redis"
"openreplay/backend/pkg/env"
"openreplay/backend/pkg/logger"
)
// Config holds all environment-driven settings for the Spot service. It
// embeds the shared service, Postgres, Redis and object-storage configs and
// adds Spot-specific HTTP, JWT and processing options.
type Config struct {
	common.Config
	common.Postgres
	redis.Redis
	objectstorage.ObjectsConfig
	// FSDir is the local filesystem root used for temporary spot files.
	FSDir string `env:"FS_DIR,required"`
	// SpotsDir is the sub-directory for spot artifacts (presumably also the
	// object-storage prefix — confirm against usage).
	SpotsDir    string        `env:"SPOTS_DIR,default=spots"`
	HTTPHost    string        `env:"HTTP_HOST,default="`
	HTTPPort    string        `env:"HTTP_PORT,required"`
	HTTPTimeout time.Duration `env:"HTTP_TIMEOUT,default=60s"`
	// JsonSizeLimit caps accepted JSON request bodies, in bytes.
	JsonSizeLimit int64 `env:"JSON_SIZE_LIMIT,default=131072"` // 128KB
	// UseAccessControlHeaders enables CORS response headers when true.
	UseAccessControlHeaders bool          `env:"USE_CORS,default=false"`
	ProjectExpiration       time.Duration `env:"PROJECT_EXPIRATION,default=10m"`
	// Two separate secrets: one for regular user JWTs, one for spot JWTs.
	JWTSecret     string `env:"JWT_SECRET,required"`
	JWTSpotSecret string `env:"JWT_SPOT_SECRET,required"`
	// MinimumStreamDuration is expressed in milliseconds.
	MinimumStreamDuration int `env:"MINIMUM_STREAM_DURATION,default=15000"` // 15s
	// WorkerID identifies this instance; filled from env.WorkerID(), not envconfig.
	WorkerID uint16
}

// New builds a Config, populating WorkerID from the environment and the
// remaining fields via configurator.Process using the provided logger.
func New(log logger.Logger) *Config {
	cfg := &Config{WorkerID: env.WorkerID()}
	configurator.Process(log, cfg)
	return cfg
}

View file

@ -18,7 +18,7 @@ type Pool interface {
QueryRow(sql string, args ...interface{}) pgx.Row
Exec(sql string, arguments ...interface{}) error
SendBatch(b *pgx.Batch) pgx.BatchResults
Begin() (*_Tx, error)
Begin() (*Tx, error)
Close()
}
@ -62,12 +62,12 @@ func (p *poolImpl) SendBatch(b *pgx.Batch) pgx.BatchResults {
return res
}
func (p *poolImpl) Begin() (*_Tx, error) {
func (p *poolImpl) Begin() (*Tx, error) {
start := time.Now()
tx, err := p.conn.Begin(context.Background())
database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), "begin", "")
database.IncreaseTotalRequests("begin", "")
return &_Tx{tx}, err
return &Tx{tx}, err
}
func (p *poolImpl) Close() {
@ -91,11 +91,11 @@ func New(url string) (Pool, error) {
// TX - start
type _Tx struct {
type Tx struct {
pgx.Tx
}
func (tx *_Tx) exec(sql string, args ...interface{}) error {
func (tx *Tx) TxExec(sql string, args ...interface{}) error {
start := time.Now()
_, err := tx.Exec(context.Background(), sql, args...)
method, table := methodName(sql)
@ -104,7 +104,16 @@ func (tx *_Tx) exec(sql string, args ...interface{}) error {
return err
}
func (tx *_Tx) rollback() error {
func (tx *Tx) TxQueryRow(sql string, args ...interface{}) pgx.Row {
start := time.Now()
res := tx.QueryRow(context.Background(), sql, args...)
method, table := methodName(sql)
database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), method, table)
database.IncreaseTotalRequests(method, table)
return res
}
func (tx *Tx) TxRollback() error {
start := time.Now()
err := tx.Rollback(context.Background())
database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), "rollback", "")
@ -112,7 +121,7 @@ func (tx *_Tx) rollback() error {
return err
}
func (tx *_Tx) commit() error {
func (tx *Tx) TxCommit() error {
start := time.Now()
err := tx.Commit(context.Background())
database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), "commit", "")

View file

@ -9,3 +9,7 @@ var DefaultSizeBuckets = []float64{1, 10, 50, 100, 250, 500, 1000, 2500, 5000, 1
// DefaultBuckets is a set of buckets from 1 to 1_000_000 elements
var DefaultBuckets = []float64{1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10_000, 50_000, 100_000, 1_000_000}
// VideoSizeBuckets is a set of buckets from 1_000 bytes (~1 Kb) to 500_000_000 bytes (~500 Mb)
var VideoSizeBuckets = []float64{1_000, 10_000, 100_000, 1_000_000, 5_000_000, 10_000_000, 20_000_000, 30_000_000,
40_000_000, 50_000_000, 75_000_000, 100_000_000, 250_000_000, 500_000_000}

View file

@ -0,0 +1,194 @@
package spot
import (
"strconv"
"github.com/prometheus/client_golang/prometheus"
"openreplay/backend/pkg/metrics/common"
)
var spotRequestSize = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "spot",
		Name:      "request_size_bytes",
		Help:      "A histogram displaying the size of each HTTP request in bytes.",
		Buckets:   common.DefaultSizeBuckets,
	},
	[]string{"url", "response_code"},
)

// RecordRequestSize observes an HTTP request body size, labeled by URL and
// response code.
func RecordRequestSize(size float64, url string, code int) {
	spotRequestSize.WithLabelValues(url, strconv.Itoa(code)).Observe(size)
}

var spotRequestDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "spot",
		Name:      "request_duration_seconds",
		Help:      "A histogram displaying the duration of each HTTP request in seconds.",
		Buckets:   common.DefaultDurationBuckets,
	},
	[]string{"url", "response_code"},
)

// RecordRequestDuration observes a request duration given in milliseconds
// (converted to seconds), labeled by URL and response code.
func RecordRequestDuration(durMillis float64, url string, code int) {
	spotRequestDuration.WithLabelValues(url, strconv.Itoa(code)).Observe(durMillis / 1000.0)
}

var spotTotalRequests = prometheus.NewCounter(
	prometheus.CounterOpts{
		Namespace: "spot",
		Name:      "requests_total",
		// Typo fix: was "the number all HTTP requests".
		Help: "A counter displaying the number of all HTTP requests.",
	},
)

// IncreaseTotalRequests increments the total HTTP request counter.
func IncreaseTotalRequests() {
	spotTotalRequests.Inc()
}

var spotOriginalVideoSize = prometheus.NewHistogram(
	prometheus.HistogramOpts{
		Namespace: "spot",
		Name:      "original_video_size_bytes",
		Help:      "A histogram displaying the size of each original video in bytes.",
		Buckets:   common.VideoSizeBuckets,
	},
)

// RecordOriginalVideoSize observes the size (bytes) of an uploaded original video.
func RecordOriginalVideoSize(size float64) {
	spotOriginalVideoSize.Observe(size)
}

var spotCroppedVideoSize = prometheus.NewHistogram(
	prometheus.HistogramOpts{
		Namespace: "spot",
		Name:      "cropped_video_size_bytes",
		Help:      "A histogram displaying the size of each cropped video in bytes.",
		Buckets:   common.VideoSizeBuckets,
	},
)

// RecordCroppedVideoSize observes the size (bytes) of a cropped video.
func RecordCroppedVideoSize(size float64) {
	spotCroppedVideoSize.Observe(size)
}

var spotVideosTotal = prometheus.NewCounter(
	prometheus.CounterOpts{
		Namespace: "spot",
		Name:      "videos_total",
		Help:      "A counter displaying the total number of all processed videos.",
	},
)

// IncreaseVideosTotal increments the processed-videos counter.
func IncreaseVideosTotal() {
	spotVideosTotal.Inc()
}

var spotVideosCropped = prometheus.NewCounter(
	prometheus.CounterOpts{
		Namespace: "spot",
		Name:      "videos_cropped_total",
		Help:      "A counter displaying the total number of all cropped videos.",
	},
)

// IncreaseVideosCropped increments the cropped-videos counter.
func IncreaseVideosCropped() {
	spotVideosCropped.Inc()
}

var spotVideosTranscoded = prometheus.NewCounter(
	prometheus.CounterOpts{
		Namespace: "spot",
		Name:      "videos_transcoded_total",
		Help:      "A counter displaying the total number of all transcoded videos.",
	},
)

// IncreaseVideosTranscoded increments the transcoded-videos counter.
func IncreaseVideosTranscoded() {
	spotVideosTranscoded.Inc()
}

var spotOriginalVideoDownloadDuration = prometheus.NewHistogram(
	prometheus.HistogramOpts{
		Namespace: "spot",
		Name:      "original_video_download_duration_seconds",
		Help:      "A histogram displaying the duration of downloading each original video in seconds.",
		Buckets:   common.DefaultDurationBuckets,
	},
)

// RecordOriginalVideoDownloadDuration observes an original-video download
// duration given in milliseconds (converted to seconds).
func RecordOriginalVideoDownloadDuration(durMillis float64) {
	spotOriginalVideoDownloadDuration.Observe(durMillis / 1000.0)
}

var spotCroppingDuration = prometheus.NewHistogram(
	prometheus.HistogramOpts{
		Namespace: "spot",
		Name:      "cropping_duration_seconds",
		Help:      "A histogram displaying the duration of cropping each video in seconds.",
		Buckets:   common.DefaultDurationBuckets,
	},
)

// RecordCroppingDuration observes a video-cropping duration given in
// milliseconds (converted to seconds).
func RecordCroppingDuration(durMillis float64) {
	spotCroppingDuration.Observe(durMillis / 1000.0)
}

var spotCroppedVideoUploadDuration = prometheus.NewHistogram(
	prometheus.HistogramOpts{
		Namespace: "spot",
		Name:      "cropped_video_upload_duration_seconds",
		Help:      "A histogram displaying the duration of uploading each cropped video in seconds.",
		Buckets:   common.DefaultDurationBuckets,
	},
)

// RecordCroppedVideoUploadDuration observes a cropped-video upload duration
// given in milliseconds (converted to seconds).
func RecordCroppedVideoUploadDuration(durMillis float64) {
	spotCroppedVideoUploadDuration.Observe(durMillis / 1000.0)
}

var spotTranscodingDuration = prometheus.NewHistogram(
	prometheus.HistogramOpts{
		Namespace: "spot",
		Name:      "transcoding_duration_seconds",
		Help:      "A histogram displaying the duration of transcoding each video in seconds.",
		Buckets:   common.DefaultDurationBuckets,
	},
)

// RecordTranscodingDuration observes a transcoding duration given in
// milliseconds (converted to seconds).
func RecordTranscodingDuration(durMillis float64) {
	spotTranscodingDuration.Observe(durMillis / 1000.0)
}

var spotTranscodedVideoUploadDuration = prometheus.NewHistogram(
	prometheus.HistogramOpts{
		Namespace: "spot",
		Name:      "transcoded_video_upload_duration_seconds",
		Help:      "A histogram displaying the duration of uploading each transcoded video in seconds.",
		Buckets:   common.DefaultDurationBuckets,
	},
)

// RecordTranscodedVideoUploadDuration observes a transcoded-video upload
// duration given in milliseconds (converted to seconds).
func RecordTranscodedVideoUploadDuration(durMillis float64) {
	spotTranscodedVideoUploadDuration.Observe(durMillis / 1000.0)
}

// List returns every collector in this package so the caller can register
// them with a Prometheus registry in one shot.
func List() []prometheus.Collector {
	return []prometheus.Collector{
		spotRequestSize,
		spotRequestDuration,
		spotTotalRequests,
		spotOriginalVideoSize,
		spotCroppedVideoSize,
		spotVideosTotal,
		spotVideosCropped,
		spotVideosTranscoded,
		spotOriginalVideoDownloadDuration,
		spotCroppingDuration,
		spotCroppedVideoUploadDuration,
		spotTranscodingDuration,
		spotTranscodedVideoUploadDuration,
	}
}

View file

@ -20,4 +20,5 @@ type ObjectStorage interface {
Exists(key string) bool
GetCreationTime(key string) *time.Time
GetPreSignedUploadUrl(key string) (string, error)
GetPreSignedDownloadUrl(key string) (string, error)
}

View file

@ -206,3 +206,15 @@ func (s *storageImpl) GetPreSignedUploadUrl(key string) (string, error) {
}
return urlStr, nil
}
// GetPreSignedDownloadUrl returns a pre-signed S3 GET URL for the given
// object key, valid for 15 minutes.
func (s *storageImpl) GetPreSignedDownloadUrl(key string) (string, error) {
	// The second return value is the eventual output struct, not an error;
	// request-construction problems surface from Presign below.
	req, _ := s.svc.GetObjectRequest(&s3.GetObjectInput{
		Bucket: aws.String(*s.bucket),
		Key:    aws.String(key),
	})
	urlStr, err := req.Presign(15 * time.Minute)
	if err != nil {
		return "", err
	}
	return urlStr, nil
}

View file

@ -7,11 +7,11 @@ type task struct {
toStop bool
}
func NewTask(payload interface{}) *task {
func newTask(payload interface{}) *task {
return &task{Payload: payload}
}
func NewStopSignal() *task {
func newStopSignal() *task {
return &task{toStop: true}
}
@ -28,7 +28,7 @@ type WorkerPool interface {
Stop()
}
func NewPool(numberOfWorkers, queueSize int, handler func(payload interface{})) *workerPoolImpl {
func NewPool(numberOfWorkers, queueSize int, handler func(payload interface{})) WorkerPool {
pool := &workerPoolImpl{
wg: &sync.WaitGroup{},
tasks: make(chan *task, queueSize),
@ -47,12 +47,12 @@ func (p *workerPoolImpl) runWorkers() {
}
func (p *workerPoolImpl) Submit(payload interface{}) {
p.tasks <- NewTask(payload)
p.tasks <- newTask(payload)
}
func (p *workerPoolImpl) stop() {
for i := 0; i < p.numberOfWorkers; i++ {
p.tasks <- NewStopSignal()
p.tasks <- newStopSignal()
}
p.wg.Wait()
}

View file

@ -0,0 +1,605 @@
package api
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/gorilla/mux"
metrics "openreplay/backend/pkg/metrics/spot"
"openreplay/backend/pkg/objectstorage"
"openreplay/backend/pkg/spot/auth"
"openreplay/backend/pkg/spot/service"
)
// createSpot handles spot creation: it validates the JSON body, persists a
// new spot for the authenticated user, uploads the base64 preview image to
// object storage, and returns pre-signed upload URLs for the mob (events)
// and video files so the client can upload them directly to storage.
func (e *Router) createSpot(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	bodySize := 0
	bodyBytes, err := e.readBody(w, r, e.cfg.JsonSizeLimit)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize)
		return
	}
	bodySize = len(bodyBytes)
	req := &CreateSpotRequest{}
	if err := json.Unmarshal(bodyBytes, req); err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	// Create a spot record owned by the authenticated user.
	currUser := r.Context().Value("userData").(*auth.User)
	newSpot, err := e.services.Spots.Add(currUser, req.Name, req.Comment, req.Duration, req.Crop)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	// Parse the preview (expected "<meta>,<base64>" data-URI form) and upload it.
	previewImage, err := getSpotPreview(req.Preview)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	previewName := fmt.Sprintf("%d/preview.jpeg", newSpot.ID)
	if err = e.services.ObjStorage.Upload(bytes.NewReader(previewImage), previewName, "image/jpeg", objectstorage.NoCompression); err != nil {
		e.log.Error(r.Context(), "can't upload preview image: %s", err)
		e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, errors.New("can't upload preview image"), startTime, r.URL.Path, bodySize)
		return
	}
	// Pre-signed PUT URLs so the client uploads mob/video straight to storage.
	mobURL, err := e.getUploadMobURL(newSpot.ID)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	videoURL, err := e.getUploadVideoURL(newSpot.ID)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	// The spot ID is serialized as a string in the JSON response.
	resp := &CreateSpotResponse{
		ID:       strconv.Itoa(int(newSpot.ID)),
		MobURL:   mobURL,
		VideoURL: videoURL,
	}
	e.ResponseWithJSON(r.Context(), w, resp, startTime, r.URL.Path, bodySize)
}
// getSpotPreview decodes a preview image sent as a two-part data URI
// ("<meta>,<base64-payload>") and returns the raw image bytes.
func getSpotPreview(preview string) ([]byte, error) {
	segments := strings.Split(preview, ",")
	if len(segments) != 2 {
		return nil, fmt.Errorf("invalid preview format")
	}
	decoded, err := base64.StdEncoding.DecodeString(segments[1])
	if err != nil {
		return nil, fmt.Errorf("can't decode base64 preview: %s", err)
	}
	return decoded, nil
}
// getUploadMobURL builds a pre-signed upload URL for the spot's events.mob file.
func (e *Router) getUploadMobURL(spotID uint64) (string, error) {
	key := fmt.Sprintf("%d/events.mob", spotID)
	uploadURL, err := e.services.ObjStorage.GetPreSignedUploadUrl(key)
	if err != nil {
		return "", fmt.Errorf("can't get mob URL: %s", err)
	}
	return uploadURL, nil
}
// getUploadVideoURL builds a pre-signed upload URL for the spot's video.webm file.
func (e *Router) getUploadVideoURL(spotID uint64) (string, error) {
	videoKey := fmt.Sprintf("%d/video.webm", spotID)
	uploadURL, err := e.services.ObjStorage.GetPreSignedUploadUrl(videoKey)
	if err != nil {
		return "", fmt.Errorf("can't get video URL: %s", err)
	}
	return uploadURL, nil
}
// getSpotID extracts and validates the numeric spot ID from the request's
// {id} path variable. It returns an error for a missing, non-numeric or
// zero ID.
func getSpotID(r *http.Request) (uint64, error) {
	vars := mux.Vars(r)
	idStr := vars["id"]
	if idStr == "" {
		return 0, fmt.Errorf("empty spot id")
	}
	id, err := strconv.ParseUint(idStr, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid spot id")
	}
	// id is a uint64, so "id <= 0" could only ever mean zero; spell that out.
	if id == 0 {
		return 0, fmt.Errorf("invalid spot id")
	}
	return id, nil
}
// getSpotsRequest builds a GetSpotsRequest from the URL query string.
// Unparseable page/limit values silently fall back to zero; the error
// return is currently always nil (kept for interface stability).
func getSpotsRequest(r *http.Request) (*GetSpotsRequest, error) {
	q := r.URL.Query()
	pageNum, _ := strconv.ParseUint(q.Get("page"), 10, 64)
	limitNum, _ := strconv.ParseUint(q.Get("limit"), 10, 64)
	return &GetSpotsRequest{
		Query:    q.Get("query"),
		FilterBy: q.Get("filterBy"),
		Order:    q.Get("order"),
		Page:     pageNum,
		Limit:    limitNum,
	}, nil
}
// getPreviewURL builds a pre-signed download URL for the spot's preview image.
func (e *Router) getPreviewURL(spotID uint64) (string, error) {
	key := fmt.Sprintf("%d/preview.jpeg", spotID)
	downloadURL, err := e.services.ObjStorage.GetPreSignedDownloadUrl(key)
	if err != nil {
		return "", fmt.Errorf("can't get preview URL: %s", err)
	}
	return downloadURL, nil
}
// getMobURL returns a pre-signed download URL for the spot's events.mob file.
func (e *Router) getMobURL(spotID uint64) (string, error) {
	mobKey := fmt.Sprintf("%d/events.mob", spotID)
	mobURL, err := e.services.ObjStorage.GetPreSignedDownloadUrl(mobKey)
	if err != nil {
		return "", fmt.Errorf("can't get mob URL: %s", err)
	}
	return mobURL, nil
}

// getVideoURL returns a pre-signed download URL for the spot's original
// video.webm file (not the HLS playlist — see the TODO below).
func (e *Router) getVideoURL(spotID uint64) (string, error) {
	mobKey := fmt.Sprintf("%d/video.webm", spotID) // TODO: later return url to m3u8 file
	mobURL, err := e.services.ObjStorage.GetPreSignedDownloadUrl(mobKey)
	if err != nil {
		return "", fmt.Errorf("can't get video URL: %s", err)
	}
	return mobURL, nil
}
// getSpot returns full details of a single spot: metadata, comments, and
// pre-signed preview/mob/video download URLs. A missing stream playlist is
// non-fatal (the StreamFile field is simply omitted); a missing preview URL
// is only logged.
func (e *Router) getSpot(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	bodySize := 0
	id, err := getSpotID(r)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	user := r.Context().Value("userData").(*auth.User)
	res, err := e.services.Spots.GetByID(user, id)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	if res == nil {
		e.ResponseWithError(r.Context(), w, http.StatusNotFound, fmt.Errorf("spot not found"), startTime, r.URL.Path, bodySize)
		return
	}
	// Preview failures are tolerated: the spot is still usable without one.
	previewUrl, err := e.getPreviewURL(id)
	if err != nil {
		e.log.Error(r.Context(), "can't get preview URL: %s", err)
	}
	mobURL, err := e.getMobURL(id)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	videoURL, err := e.getVideoURL(id)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	spotInfo := &Info{
		Name:       res.Name,
		UserEmail:  res.UserEmail,
		Duration:   res.Duration,
		Comments:   res.Comments,
		CreatedAt:  res.CreatedAt,
		PreviewURL: previewUrl,
		MobURL:     mobURL,
		VideoURL:   videoURL,
	}
	// The HLS playlist is attached base64-encoded when transcoding produced one.
	playlist, err := e.services.Transcoder.GetSpotStreamPlaylist(id)
	if err != nil {
		e.log.Warn(r.Context(), "can't get stream playlist: %s", err)
	} else {
		spotInfo.StreamFile = base64.StdEncoding.EncodeToString(playlist)
	}
	e.ResponseWithJSON(r.Context(), w, &GetSpotResponse{Spot: spotInfo}, startTime, r.URL.Path, bodySize)
}
// updateSpot renames an existing spot: it parses the {id} path variable and
// the JSON body, then updates the spot's name on behalf of the current user.
func (e *Router) updateSpot(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	bodySize := 0
	id, err := getSpotID(r)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	bodyBytes, err := e.readBody(w, r, e.cfg.JsonSizeLimit)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize)
		return
	}
	bodySize = len(bodyBytes)
	req := &UpdateSpotRequest{}
	if err := json.Unmarshal(bodyBytes, req); err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	user := r.Context().Value("userData").(*auth.User)
	// Only the name is updatable here; the returned spot value is discarded.
	_, err = e.services.Spots.UpdateName(user, id, req.Name)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	e.ResponseOK(r.Context(), w, startTime, r.URL.Path, bodySize)
}
// getSpots lists spots with paging, ordering and name filtering.
// filterBy=own restricts results to the current user's spots; any other
// value lists all spots of the user's tenant. A failed preview-URL lookup
// is logged but does not fail the listing.
func (e *Router) getSpots(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	bodySize := 0
	req, err := getSpotsRequest(r)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	user := r.Context().Value("userData").(*auth.User)
	opts := &service.GetOpts{
		NameFilter: req.Query, Order: req.Order, Page: req.Page, Limit: req.Limit}
	// Scope the query: personal spots vs. the whole tenant.
	switch req.FilterBy {
	case "own":
		opts.UserID = user.ID
	default:
		opts.TenantID = user.TenantID
	}
	spots, total, err := e.services.Spots.Get(user, opts)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	res := make([]ShortInfo, 0, len(spots))
	for _, spot := range spots {
		previewUrl, err := e.getPreviewURL(spot.ID)
		if err != nil {
			e.log.Error(r.Context(), "can't get preview URL: %s", err)
		}
		res = append(res, ShortInfo{
			ID:         strconv.Itoa(int(spot.ID)), // IDs are serialized as strings
			Name:       spot.Name,
			UserEmail:  spot.UserEmail,
			Duration:   spot.Duration,
			CreatedAt:  spot.CreatedAt,
			PreviewURL: previewUrl,
		})
	}
	e.ResponseWithJSON(r.Context(), w, &GetSpotsResponse{Spots: res, Total: total}, startTime, r.URL.Path, bodySize)
}
// deleteSpots bulk-deletes spots. The body carries spot IDs as decimal
// strings; every ID must parse or the whole request is rejected with 400
// (no partial deletion happens in that case).
func (e *Router) deleteSpots(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	bodySize := 0
	bodyBytes, err := e.readBody(w, r, e.cfg.JsonSizeLimit)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize)
		return
	}
	bodySize = len(bodyBytes)
	req := &DeleteSpotRequest{}
	if err := json.Unmarshal(bodyBytes, req); err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	// Convert string IDs to uint64 before touching the service layer.
	spotsToDelete := make([]uint64, 0, len(req.SpotIDs))
	for _, idStr := range req.SpotIDs {
		id, err := strconv.ParseUint(idStr, 10, 64)
		if err != nil {
			e.ResponseWithError(r.Context(), w, http.StatusBadRequest, fmt.Errorf("invalid spot id: %s", idStr), startTime, r.URL.Path, bodySize)
			return
		}
		spotsToDelete = append(spotsToDelete, id)
	}
	user := r.Context().Value("userData").(*auth.User)
	if err := e.services.Spots.Delete(user, spotsToDelete); err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	e.ResponseOK(r.Context(), w, startTime, r.URL.Path, bodySize)
}
// addComment appends a comment to a spot and responds with the refreshed
// spot info, including freshly generated pre-signed mob/video download URLs.
func (e *Router) addComment(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	bodySize := 0
	id, err := getSpotID(r)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	bodyBytes, err := e.readBody(w, r, e.cfg.JsonSizeLimit)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize)
		return
	}
	bodySize = len(bodyBytes)
	req := &AddCommentRequest{}
	if err := json.Unmarshal(bodyBytes, req); err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	user := r.Context().Value("userData").(*auth.User)
	updatedSpot, err := e.services.Spots.AddComment(user, id, &service.Comment{UserName: req.UserName, Text: req.Comment})
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	mobURL, err := e.getMobURL(id)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	videoURL, err := e.getVideoURL(id)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	// NOTE(review): unlike getSpot, no UserEmail/PreviewURL here — confirm
	// whether that asymmetry is intentional.
	spotInfo := &Info{
		Name:      updatedSpot.Name,
		Duration:  updatedSpot.Duration,
		Comments:  updatedSpot.Comments,
		CreatedAt: updatedSpot.CreatedAt,
		MobURL:    mobURL,
		VideoURL:  videoURL,
	}
	e.ResponseWithJSON(r.Context(), w, &GetSpotResponse{Spot: spotInfo}, startTime, r.URL.Path, bodySize)
}
// uploadedSpot is invoked by the client after the spot files were uploaded
// to storage: it verifies the spot exists and queues it for transcoding.
// A transcoding-enqueue failure is only logged — the client still gets 200.
func (e *Router) uploadedSpot(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	bodySize := 0
	id, err := getSpotID(r)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	user := r.Context().Value("userData").(*auth.User)
	spot, err := e.services.Spots.GetByID(user, id) // check if spot exists
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	e.log.Info(r.Context(), "uploaded spot %+v, from user: %+v", spot, user)
	if err := e.services.Transcoder.Process(spot); err != nil {
		e.log.Error(r.Context(), "can't add transcoding task: %s", err)
	}
	e.ResponseOK(r.Context(), w, startTime, r.URL.Path, bodySize)
}
// getSpotVideo responds with a JSON object carrying a pre-signed download
// URL for the spot's original <id>/video.webm object.
func (e *Router) getSpotVideo(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	bodySize := 0
	id, err := getSpotID(r)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	key := fmt.Sprintf("%d/video.webm", id)
	videoURL, err := e.services.ObjStorage.GetPreSignedDownloadUrl(key)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	resp := map[string]interface{}{
		"url": videoURL,
	}
	e.ResponseWithJSON(r.Context(), w, resp, startTime, r.URL.Path, bodySize)
}
// getSpotStream serves the spot's HLS playlist (index.m3u8) as a file
// attachment, fetched from the transcoder service.
func (e *Router) getSpotStream(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	bodySize := 0
	id, err := getSpotID(r)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	streamPlaylist, err := e.services.Transcoder.GetSpotStreamPlaylist(id)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	// Set the headers for the response
	w.Header().Set("Content-Disposition", "attachment; filename=index.m3u8")
	w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") //"application/octet-stream")
	// BUG FIX: string(len(...)) converts the int length to a rune (e.g. 65
	// becomes "A"); Content-Length must be the decimal byte count.
	w.Header().Set("Content-Length", strconv.Itoa(len(streamPlaylist)))
	// Write the playlist bytes to the response writer.
	buffer := bytes.NewBuffer(streamPlaylist)
	if _, err := buffer.WriteTo(w); err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
}
// getPublicKey returns the spot's public sharing key, 404 when none exists.
func (e *Router) getPublicKey(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	bodySize := 0
	spotID, err := getSpotID(r)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	currUser := r.Context().Value("userData").(*auth.User)
	publicKey, err := e.services.Keys.Get(spotID, currUser)
	if err != nil {
		// Map "not found" errors to 404, everything else to 500.
		code := http.StatusInternalServerError
		if strings.Contains(err.Error(), "not found") {
			code = http.StatusNotFound
		}
		e.ResponseWithError(r.Context(), w, code, err, startTime, r.URL.Path, bodySize)
		return
	}
	e.ResponseWithJSON(r.Context(), w, map[string]interface{}{"key": publicKey}, startTime, r.URL.Path, bodySize)
}
// updatePublicKey creates, refreshes or revokes a spot's public sharing key,
// depending on the requested expiration in the JSON body.
func (e *Router) updatePublicKey(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	bodySize := 0
	spotID, err := getSpotID(r)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	bodyBytes, err := e.readBody(w, r, e.cfg.JsonSizeLimit)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize)
		return
	}
	bodySize = len(bodyBytes)
	req := &UpdateSpotPublicKeyRequest{}
	if err := json.Unmarshal(bodyBytes, req); err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	currUser := r.Context().Value("userData").(*auth.User)
	newKey, err := e.services.Keys.Set(spotID, req.Expiration, currUser)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	e.ResponseWithJSON(r.Context(), w, map[string]interface{}{"key": newKey}, startTime, r.URL.Path, bodySize)
}
// spotStatus reports the processing status of a spot (e.g. transcoding state).
func (e *Router) spotStatus(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	bodySize := 0
	spotID, err := getSpotID(r)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	currUser := r.Context().Value("userData").(*auth.User)
	spotState, err := e.services.Spots.GetStatus(currUser, spotID)
	if err != nil {
		e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	e.ResponseWithJSON(r.Context(), w, map[string]interface{}{"status": spotState}, startTime, r.URL.Path, bodySize)
}
// recordMetrics reports request size (when a body was read), total request
// count, and request duration for the given URL/status-code pair.
func recordMetrics(requestStart time.Time, url string, code, bodySize int) {
	if bodySize > 0 {
		metrics.RecordRequestSize(float64(bodySize), url, code)
	}
	metrics.IncreaseTotalRequests()
	// time.Since is the idiomatic (and staticcheck-clean) replacement for
	// time.Now().Sub(requestStart).
	metrics.RecordRequestDuration(float64(time.Since(requestStart).Milliseconds()), url, code)
}
// readBody reads at most limit bytes of the request body and always closes
// it afterwards, logging (but not failing on) close errors.
func (e *Router) readBody(w http.ResponseWriter, r *http.Request, limit int64) ([]byte, error) {
	limited := http.MaxBytesReader(w, r.Body, limit)
	data, readErr := io.ReadAll(limited)
	if closeErr := limited.Close(); closeErr != nil {
		e.log.Warn(r.Context(), "error while closing request body: %s", closeErr)
	}
	if readErr != nil {
		return nil, readErr
	}
	return data, nil
}
// ResponseOK writes an empty 200 response and records request metrics.
func (e *Router) ResponseOK(ctx context.Context, w http.ResponseWriter, requestStart time.Time, url string, bodySize int) {
w.WriteHeader(http.StatusOK)
e.log.Info(ctx, "response ok")
recordMetrics(requestStart, url, http.StatusOK, bodySize)
}
// ResponseWithJSON marshals res and writes it with a 200 status and a JSON
// content type, recording request metrics.
func (e *Router) ResponseWithJSON(ctx context.Context, w http.ResponseWriter, res interface{}, requestStart time.Time, url string, bodySize int) {
	e.log.Info(ctx, "response ok")
	body, err := json.Marshal(res)
	if err != nil {
		// BUG FIX: previously a marshal failure was logged but an empty 200
		// was still sent; report a 500 so clients see the failure.
		e.log.Error(ctx, "can't marshal response: %s", err)
		w.WriteHeader(http.StatusInternalServerError)
		recordMetrics(requestStart, url, http.StatusInternalServerError, bodySize)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	if _, err := w.Write(body); err != nil {
		// Headers are already sent; the write failure can only be logged.
		e.log.Error(ctx, "can't write response body: %s", err)
	}
	recordMetrics(requestStart, url, http.StatusOK, bodySize)
}
// response is the JSON envelope used for error replies: {"error": "..."}.
type response struct {
Error string `json:"error"`
}
// ResponseWithError writes err as a JSON {"error": "..."} payload with the
// given status code and records request metrics.
func (e *Router) ResponseWithError(ctx context.Context, w http.ResponseWriter, code int, err error, requestStart time.Time, url string, bodySize int) {
	e.log.Error(ctx, "response error, code: %d, error: %s", code, err)
	// Use a separate variable so the caller's error is not clobbered.
	body, marshalErr := json.Marshal(&response{err.Error()})
	if marshalErr != nil {
		e.log.Error(ctx, "can't marshal response: %s", marshalErr)
	}
	// CONSISTENCY FIX: declare the JSON content type (as ResponseWithJSON
	// does); it must be set before WriteHeader to take effect.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)
	w.Write(body)
	recordMetrics(requestStart, url, code, bodySize)
}

View file

@ -0,0 +1,88 @@
package api
import (
"sync"
"time"
)
// RateLimiter is a mutex-guarded token bucket: it refills `rate` tokens per
// second up to `burst` and spends one token per allowed request.
type RateLimiter struct {
	rate      int       // tokens credited per second
	burst     int       // maximum number of stored tokens
	tokens    int       // currently available tokens
	lastToken time.Time // last instant tokens were credited
	lastUsed  time.Time // last Allow call (used by idle cleanup)
	mu        sync.Mutex
}

// NewRateLimiter returns a limiter whose bucket starts full.
func NewRateLimiter(rate int, burst int) *RateLimiter {
	return &RateLimiter{
		rate:      rate,
		burst:     burst,
		tokens:    burst,
		lastToken: time.Now(),
		lastUsed:  time.Now(),
	}
}

// Allow reports whether a request may proceed, consuming one token if so.
func (rl *RateLimiter) Allow() bool {
	rl.mu.Lock()
	defer rl.mu.Unlock()
	now := time.Now()
	// BUG FIX: the previous version set lastToken = now on every call, so a
	// caller hitting the limiter more than once per second always truncated
	// the elapsed time to 0 whole seconds and the bucket never refilled.
	// Credit whole seconds only, and advance lastToken by exactly the time
	// accounted for, preserving the fractional remainder for the next call.
	elapsedSeconds := int(now.Sub(rl.lastToken).Seconds())
	if elapsedSeconds > 0 {
		rl.tokens += elapsedSeconds * rl.rate
		if rl.tokens > rl.burst {
			rl.tokens = rl.burst
		}
		rl.lastToken = rl.lastToken.Add(time.Duration(elapsedSeconds) * time.Second)
	}
	rl.lastUsed = now
	if rl.tokens > 0 {
		rl.tokens--
		return true
	}
	return false
}
// UserRateLimiter keeps one RateLimiter per user ID and periodically evicts
// limiters that have been idle longer than maxIdleTime.
type UserRateLimiter struct {
	rateLimiters    sync.Map // user ID (uint64) -> *RateLimiter
	rate            int
	burst           int
	cleanupInterval time.Duration
	maxIdleTime     time.Duration
}

// NewUserRateLimiter starts the background cleanup loop. Note the goroutine
// runs for the lifetime of the process — there is no Stop.
func NewUserRateLimiter(rate int, burst int, cleanupInterval time.Duration, maxIdleTime time.Duration) *UserRateLimiter {
	url := &UserRateLimiter{
		rate:            rate,
		burst:           burst,
		cleanupInterval: cleanupInterval,
		maxIdleTime:     maxIdleTime,
	}
	go url.cleanup()
	return url
}

// GetRateLimiter returns the limiter for the given user, creating it on
// first use.
func (url *UserRateLimiter) GetRateLimiter(user uint64) *RateLimiter {
	// PERF FIX: fast path avoids allocating a throwaway RateLimiter on every
	// call — LoadOrStore constructs its argument even when the key exists.
	if value, ok := url.rateLimiters.Load(user); ok {
		return value.(*RateLimiter)
	}
	value, _ := url.rateLimiters.LoadOrStore(user, NewRateLimiter(url.rate, url.burst))
	return value.(*RateLimiter)
}

// cleanup drops limiters whose lastUsed timestamp is older than maxIdleTime.
func (url *UserRateLimiter) cleanup() {
	for {
		time.Sleep(url.cleanupInterval)
		now := time.Now()
		url.rateLimiters.Range(func(key, value interface{}) bool {
			rl := value.(*RateLimiter)
			rl.mu.Lock()
			if now.Sub(rl.lastUsed) > url.maxIdleTime {
				url.rateLimiters.Delete(key)
			}
			rl.mu.Unlock()
			return true
		})
	}
}

View file

@ -0,0 +1,75 @@
package api
import (
"openreplay/backend/pkg/spot/service"
"time"
)
// CreateSpotRequest is the POST /v1/spots payload sent by the extension.
type CreateSpotRequest struct {
Name string `json:"name"`
Comment string `json:"comment"`
Duration int `json:"duration"`
Crop []int `json:"crop"`
Preview string `json:"preview"`
}
// CreateSpotResponse returns the new spot ID (as a string) plus pre-signed
// upload URLs for the mob and video files.
type CreateSpotResponse struct {
ID string `json:"id"`
MobURL string `json:"mobURL"`
VideoURL string `json:"videoURL"`
}
// Info is the full spot payload returned by GET /v1/spots/{id}.
type Info struct {
Name string `json:"name"`
UserEmail string `json:"userEmail"`
Duration int `json:"duration"`
Comments []service.Comment `json:"comments"`
CreatedAt time.Time `json:"createdAt"`
MobURL string `json:"mobURL"`
PreviewURL string `json:"previewURL"`
VideoURL string `json:"videoURL"`
StreamFile string `json:"streamFile"`
}
// GetSpotResponse wraps a single spot Info.
type GetSpotResponse struct {
Spot *Info `json:"spot"`
}
// GetSpotsRequest carries the listing filters and pagination parameters.
type GetSpotsRequest struct {
Query string `json:"query"` // for search by name (optional)
FilterBy string `json:"filterBy"` // "own", "all", "shared"
Order string `json:"order"`
Page uint64 `json:"page"`
Limit uint64 `json:"limit"`
}
// ShortInfo is the abbreviated per-spot entry used in list responses.
type ShortInfo struct {
ID string `json:"id"`
Name string `json:"name"`
UserEmail string `json:"userEmail"`
Duration int `json:"duration"`
CreatedAt time.Time `json:"createdAt"`
PreviewURL string `json:"previewURL"`
}
// GetSpotsResponse is a page of spots plus the total match count.
type GetSpotsResponse struct {
Spots []ShortInfo `json:"spots"`
Total uint64 `json:"total"`
}
// UpdateSpotRequest renames a spot (PATCH /v1/spots/{id}).
type UpdateSpotRequest struct {
Name string `json:"name"`
}
// AddCommentRequest appends a comment to a spot.
type AddCommentRequest struct {
UserName string `json:"userName"`
Comment string `json:"comment"`
}
// DeleteSpotRequest lists spot IDs (string form) to soft-delete.
type DeleteSpotRequest struct {
SpotIDs []string `json:"spotIDs"`
}
// UpdateSpotPublicKeyRequest sets the sharing-key lifetime; 0 revokes it.
type UpdateSpotPublicKeyRequest struct {
Expiration uint64 `json:"expiration"` // in seconds
}

View file

@ -0,0 +1,5 @@
package api
// getPermissions returns the permission set required to access the given URL
// path. Currently a stub returning nil; note the visible authImpl.IsAuthorized
// implementation ignores its permissions argument anyway.
func getPermissions(urlPath string) []string {
return nil
}

View file

@ -0,0 +1,205 @@
package api
import (
"bytes"
"fmt"
"io"
"net/http"
"openreplay/backend/pkg/spot"
"openreplay/backend/pkg/spot/auth"
"sync"
"time"
"github.com/docker/distribution/context"
"github.com/gorilla/mux"
spotConfig "openreplay/backend/internal/config/spot"
"openreplay/backend/internal/http/util"
"openreplay/backend/pkg/logger"
)
// Router wires the spot service's HTTP endpoints, middleware chain and
// per-user rate limiting on top of gorilla/mux.
type Router struct {
log logger.Logger
cfg *spotConfig.Config
router *mux.Router
mutex *sync.RWMutex
services *spot.ServicesBuilder
limiter *UserRateLimiter
}
// NewRouter validates its dependencies and returns a fully initialized router.
func NewRouter(cfg *spotConfig.Config, log logger.Logger, services *spot.ServicesBuilder) (*Router, error) {
	if cfg == nil {
		return nil, fmt.Errorf("config is empty")
	}
	if services == nil {
		return nil, fmt.Errorf("services is empty")
	}
	if log == nil {
		return nil, fmt.Errorf("logger is empty")
	}
	// 10 req/s with a burst of 30 per user; idle limiters are evicted after
	// 5 minutes, checked every minute.
	newRouter := &Router{
		log:      log,
		cfg:      cfg,
		mutex:    &sync.RWMutex{},
		services: services,
		limiter:  NewUserRateLimiter(10, 30, 1*time.Minute, 5*time.Minute),
	}
	newRouter.init()
	return newRouter, nil
}
// init registers all routes and the middleware chain. Middleware runs in the
// order added: CORS -> auth -> rate limit -> action (body buffering/logging).
func (e *Router) init() {
e.router = mux.NewRouter()
// Root route
e.router.HandleFunc("/", e.root)
// Spot routes
e.router.HandleFunc("/v1/spots", e.createSpot).Methods("POST", "OPTIONS")
e.router.HandleFunc("/v1/spots/{id}", e.getSpot).Methods("GET", "OPTIONS")
e.router.HandleFunc("/v1/spots/{id}", e.updateSpot).Methods("PATCH", "OPTIONS")
e.router.HandleFunc("/v1/spots", e.getSpots).Methods("GET", "OPTIONS")
e.router.HandleFunc("/v1/spots", e.deleteSpots).Methods("DELETE", "OPTIONS")
e.router.HandleFunc("/v1/spots/{id}/comment", e.addComment).Methods("POST", "OPTIONS")
e.router.HandleFunc("/v1/spots/{id}/uploaded", e.uploadedSpot).Methods("POST", "OPTIONS")
e.router.HandleFunc("/v1/spots/{id}/video", e.getSpotVideo).Methods("GET", "OPTIONS")
e.router.HandleFunc("/v1/spots/{id}/public-key", e.getPublicKey).Methods("GET", "OPTIONS")
e.router.HandleFunc("/v1/spots/{id}/public-key", e.updatePublicKey).Methods("PATCH", "OPTIONS")
e.router.HandleFunc("/v1/spots/{id}/status", e.spotStatus).Methods("GET", "OPTIONS")
e.router.HandleFunc("/v1/ping", e.ping).Methods("GET", "OPTIONS")
// CORS middleware
e.router.Use(e.corsMiddleware)
e.router.Use(e.authMiddleware)
e.router.Use(e.rateLimitMiddleware)
e.router.Use(e.actionMiddleware)
}
// root answers health checks on "/" with an empty 200.
func (e *Router) root(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
// ping is the liveness endpoint ("/v1/ping") used by the extension.
func (e *Router) ping(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
// corsMiddleware optionally emits permissive CORS headers, short-circuits
// OPTIONS preflight requests, and stamps the request context with the method
// and sanitized URL for downstream logging.
func (e *Router) corsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if e.cfg.UseAccessControlHeaders {
// Prepare headers for preflight requests
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST,GET,PATCH,DELETE")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type,Authorization,Content-Encoding")
}
if r.Method == http.MethodOptions {
// Preflight answers are cacheable for a day; skip the rest of the chain.
w.Header().Set("Cache-Control", "max-age=86400")
w.WriteHeader(http.StatusOK)
return
}
r = r.WithContext(context.WithValues(r.Context(), map[string]interface{}{"httpMethod": r.Method, "url": util.SafeString(r.URL.Path)}))
next.ServeHTTP(w, r)
})
}
// authMiddleware authenticates every request except "/": it first tries JWT
// authorization (using the extension secret for extension-originated routes)
// and, for the two shareable endpoints, falls back to the "key" query
// parameter holding a spot public key.
func (e *Router) authMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path == "/" {
			next.ServeHTTP(w, r)
			// BUG FIX: without this return the handler fell through, ran the
			// whole auth flow and served (or 401'd) the request a second time.
			return
		}
		isExtension := false
		pathTemplate, err := mux.CurrentRoute(r).GetPathTemplate()
		if err != nil {
			e.log.Error(r.Context(), "failed to get path template: %s", err)
		} else {
			// These routes are called by the browser extension, whose tokens
			// are signed with the spot JWT secret instead of the regular one.
			if pathTemplate == "/v1/ping" ||
				(pathTemplate == "/v1/spots" && r.Method == "POST") ||
				(pathTemplate == "/v1/spots/{id}/uploaded" && r.Method == "POST") {
				isExtension = true
			}
		}
		// Check if the request is authorized
		user, err := e.services.Auth.IsAuthorized(r.Header.Get("Authorization"), getPermissions(r.URL.Path), isExtension)
		if err != nil {
			e.log.Warn(r.Context(), "Unauthorized request: %s", err)
			if !isSpotWithKeyRequest(r) {
				w.WriteHeader(http.StatusUnauthorized)
				return
			}
			// Shareable endpoints may authenticate via public key instead.
			user, err = e.services.Keys.IsValid(r.URL.Query().Get("key"))
			if err != nil {
				e.log.Warn(r.Context(), "Wrong public key: %s", err)
				w.WriteHeader(http.StatusUnauthorized)
				return
			}
		}
		r = r.WithContext(context.WithValues(r.Context(), map[string]interface{}{"userData": user}))
		next.ServeHTTP(w, r)
	})
}
// isSpotWithKeyRequest reports whether the request targets one of the two
// endpoints that accept a public sharing key instead of a JWT.
func isSpotWithKeyRequest(r *http.Request) bool {
	tmpl, err := mux.CurrentRoute(r).GetPathTemplate()
	if err != nil {
		return false
	}
	switch {
	case tmpl == "/v1/spots/{id}" && r.Method == "GET":
		return true // public spot view
	case tmpl == "/v1/spots/{id}/comment" && r.Method == "POST":
		return true // public comment
	default:
		return false
	}
}
// rateLimitMiddleware applies the per-user token-bucket limiter, replying
// 429 when the bucket is empty. It assumes authMiddleware has already stored
// a non-nil *auth.User under "userData" — the type assertion panics otherwise
// (middleware ordering in init guarantees this for routed requests).
func (e *Router) rateLimitMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
user := r.Context().Value("userData").(*auth.User)
rl := e.limiter.GetRateLimiter(user.ID)
if !rl.Allow() {
http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
return
}
next.ServeHTTP(w, r)
})
}
type statusWriter struct {
http.ResponseWriter
statusCode int
}
func (w *statusWriter) WriteHeader(statusCode int) {
w.statusCode = statusCode
w.ResponseWriter.WriteHeader(statusCode)
}
func (w *statusWriter) Write(b []byte) (int, error) {
if w.statusCode == 0 {
w.statusCode = http.StatusOK // Default status code is 200
}
return w.ResponseWriter.Write(b)
}
// actionMiddleware buffers the request body (so handlers can re-read it) and
// captures the final status code for request logging.
func (e *Router) actionMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		payload, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "can't read body", http.StatusBadRequest)
			return
		}
		// Restore the body so downstream handlers see an untouched reader.
		r.Body = io.NopCloser(bytes.NewBuffer(payload))
		// Wrap the writer to learn which status code the handler sent.
		recorder := &statusWriter{ResponseWriter: w}
		next.ServeHTTP(recorder, r)
		e.logRequest(r, payload, recorder.statusCode)
	})
}
// GetHandler exposes the configured mux router to the HTTP server.
func (e *Router) GetHandler() http.Handler {
return e.router
}

View file

@ -0,0 +1,7 @@
package api
import (
"net/http"
)
// logRequest is a no-op placeholder for request audit logging; actionMiddleware
// supplies the buffered body and the final status code.
func (e *Router) logRequest(r *http.Request, bodyBytes []byte, statusCode int) {}

View file

@ -0,0 +1,53 @@
package auth
import (
"fmt"
"strings"
"github.com/golang-jwt/jwt/v5"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/logger"
)
// Auth validates incoming JWT credentials and resolves them to a User.
type Auth interface {
// IsAuthorized checks the Authorization header; isExtension selects the
// spot-specific JWT secret used by the browser extension.
IsAuthorized(authHeader string, permissions []string, isExtension bool) (*User, error)
}
// authImpl verifies HMAC-signed JWTs against one of two secrets and loads
// the user record from Postgres.
type authImpl struct {
log logger.Logger
secret string
spotSecret string
pgconn pool.Pool
}
// NewAuth builds the JWT-based Auth implementation.
func NewAuth(log logger.Logger, jwtSecret, jwtSpotSecret string, conn pool.Pool) Auth {
return &authImpl{
log: log,
secret: jwtSecret,
spotSecret: jwtSpotSecret,
pgconn: conn,
}
}
// parseJWT validates a "Bearer <token>" Authorization header against the
// given HMAC secret and returns the decoded claims.
func parseJWT(authHeader, secret string) (*JWTClaims, error) {
	if authHeader == "" {
		return nil, fmt.Errorf("authorization header missing")
	}
	tokenParts := strings.Split(authHeader, "Bearer ")
	if len(tokenParts) != 2 {
		return nil, fmt.Errorf("invalid authorization header")
	}
	tokenString := tokenParts[1]
	claims := &JWTClaims{}
	token, err := jwt.ParseWithClaims(tokenString, claims,
		func(token *jwt.Token) (interface{}, error) {
			return []byte(secret), nil
		})
	// BUG FIX: replaced a stray fmt.Printf debug print to stdout with a
	// wrapped error, so the parse failure reason reaches the caller's log
	// instead of the process's stdout.
	if err != nil {
		return nil, fmt.Errorf("invalid token: %w", err)
	}
	if !token.Valid {
		return nil, fmt.Errorf("invalid token")
	}
	return claims, nil
}

View file

@ -0,0 +1,13 @@
package auth
// IsAuthorized parses the Bearer token (choosing the extension-specific
// secret when isExtension is set) and loads the corresponding user from the
// database, validating the token's issued-at against the stored value.
// NOTE(review): the permissions argument is currently unused here.
func (a *authImpl) IsAuthorized(authHeader string, permissions []string, isExtension bool) (*User, error) {
secret := a.secret
if isExtension {
secret = a.spotSecret
}
jwtInfo, err := parseJWT(authHeader, secret)
if err != nil {
return nil, err
}
return authUser(a.pgconn, jwtInfo.UserId, jwtInfo.TenantID, int(jwtInfo.IssuedAt.Unix()))
}

View file

@ -0,0 +1,34 @@
package auth
import "github.com/golang-jwt/jwt/v5"
// JWTClaims is the custom claim set carried by spot service tokens, on top
// of the standard registered claims (exp, iat, ...).
type JWTClaims struct {
UserId int `json:"userId"`
TenantID int `json:"tenantId"`
jwt.RegisteredClaims
}
// User is the authenticated principal attached to each request context,
// either from a JWT ("jwt") or a spot public key ("public-key").
type User struct {
	ID          uint64          `json:"id"`
	Name        string          `json:"name"`
	Email       string          `json:"email"`
	TenantID    uint64          `json:"tenantId"`
	JwtIat      int             `json:"jwtIat"`
	Permissions map[string]bool `json:"permissions"`
	AuthMethod  string
}

// HasPermission reports whether the user holds the named permission.
// A nil permission map means the user is unrestricted (every check passes);
// otherwise the mere presence of the key grants access.
func (u *User) HasPermission(perm string) bool {
	if u.Permissions == nil {
		// no permissions recorded -> unrestricted
		return true
	}
	_, granted := u.Permissions[perm]
	return granted
}
// abs returns the absolute value of x.
func abs(x int) int {
	if x >= 0 {
		return x
	}
	return -x
}

View file

@ -0,0 +1,23 @@
package auth
import (
"fmt"
"openreplay/backend/pkg/db/postgres/pool"
)
// authUser loads the user row matching a validated JWT and enforces that the
// token's issued-at matches the stored spot_jwt_iat within 1 second (so a
// re-login invalidates older tokens). TenantID is fixed to 1: this is the
// single-tenant OSS build, and the tenantID argument is intentionally unused.
func authUser(conn pool.Pool, userID, tenantID, jwtIAT int) (*User, error) {
sql := `
SELECT user_id, name, email, EXTRACT(epoch FROM spot_jwt_iat)::BIGINT AS spot_jwt_iat
FROM public.users
WHERE user_id = $1 AND deleted_at IS NULL
LIMIT 1;`
user := &User{TenantID: 1, AuthMethod: "jwt"}
if err := conn.QueryRow(sql, userID).Scan(&user.ID, &user.Name, &user.Email, &user.JwtIat); err != nil {
return nil, fmt.Errorf("user not found")
}
// JwtIat == 0 means no spot login recorded; a mismatch beyond 1s means the
// token predates (or postdates) the last recorded login.
if user.JwtIat == 0 || abs(jwtIAT-user.JwtIat) > 1 {
return nil, fmt.Errorf("token expired")
}
return user, nil
}

View file

@ -0,0 +1,39 @@
package spot
import (
"openreplay/backend/internal/config/spot"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/flakeid"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/objectstorage"
"openreplay/backend/pkg/objectstorage/store"
"openreplay/backend/pkg/spot/auth"
"openreplay/backend/pkg/spot/service"
"openreplay/backend/pkg/spot/transcoder"
)
// ServicesBuilder aggregates every service the spot router depends on.
type ServicesBuilder struct {
Flaker *flakeid.Flaker
ObjStorage objectstorage.ObjectStorage
Auth auth.Auth
Spots service.Spots
Keys service.Keys
Transcoder transcoder.Transcoder
}
// NewServiceBuilder constructs the object store, ID generator and all
// domain services from config and the shared Postgres pool.
func NewServiceBuilder(log logger.Logger, cfg *spot.Config, pgconn pool.Pool) (*ServicesBuilder, error) {
objStore, err := store.NewStore(&cfg.ObjectsConfig)
if err != nil {
return nil, err
}
// WorkerID keeps flake IDs unique across service replicas.
flaker := flakeid.NewFlaker(cfg.WorkerID)
spots := service.NewSpots(log, pgconn, flaker)
return &ServicesBuilder{
Flaker: flaker,
ObjStorage: objStore,
Auth: auth.NewAuth(log, cfg.JWTSecret, cfg.JWTSpotSecret, pgconn),
Spots: spots,
Keys: service.NewKeys(log, pgconn),
Transcoder: transcoder.NewTranscoder(cfg, log, objStore, pgconn, spots),
}, nil
}

View file

@ -0,0 +1,146 @@
package service
import (
"context"
"fmt"
"github.com/rs/xid"
"openreplay/backend/pkg/spot/auth"
"time"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/logger"
)
// Key is a spot's public sharing key; Value is the secret handed to clients
// and ExpiredAt the absolute expiry computed from Expiration.
type Key struct {
SpotID uint64 `json:"-"`
UserID uint64 `json:"-"` // to track who generated the key
TenantID uint64 `json:"-"` // to check availability
Value string `json:"value"`
Expiration uint64 `json:"expiration"` // in seconds
ExpiredAt time.Time `json:"-"`
}
// Keys manages public sharing keys: create/refresh/revoke (Set), read (Get)
// and resolve a presented key back to its owning user (IsValid).
type Keys interface {
Set(spotID, expiration uint64, user *auth.User) (*Key, error)
Get(spotID uint64, user *auth.User) (*Key, error)
IsValid(key string) (*auth.User, error)
}
// keysImpl is the Postgres-backed Keys implementation.
type keysImpl struct {
log logger.Logger
conn pool.Pool
}
// Set creates, refreshes or revokes a spot's sharing key. expiration == 0
// revokes (expires the key immediately and returns nil, nil); the maximum
// lifetime is 7 days. An existing unexpired key keeps its value; an expired
// one is replaced with a freshly generated xid.
func (k *keysImpl) Set(spotID, expiration uint64, user *auth.User) (*Key, error) {
switch {
case spotID == 0:
return nil, fmt.Errorf("spotID is required")
case expiration > 604800: // 7 days in seconds
return nil, fmt.Errorf("expiration should be less than 7 days")
case user == nil:
return nil, fmt.Errorf("user is required")
}
now := time.Now()
if expiration == 0 {
// Revoke: mark the key expired as of now.
sql := `UPDATE spots_keys SET expired_at = $1, expiration = 0 WHERE spot_id = $2`
if err := k.conn.Exec(sql, now, spotID); err != nil {
k.log.Error(context.Background(), "failed to set key: %v", err)
return nil, fmt.Errorf("key not updated")
}
return nil, nil
}
newKey := xid.New().String()
expiredAt := now.Add(time.Duration(expiration) * time.Second)
// Upsert: update the existing row (regenerating spot_key only when it has
// expired), otherwise insert a new row; either branch returns the key.
sql := `
WITH updated AS (
UPDATE spots_keys
SET
spot_key = CASE
WHEN expired_at < $1 THEN $2
ELSE spot_key
END,
user_id = $3,
expiration = $4,
expired_at = $5,
updated_at = $1
WHERE spot_id = $6
RETURNING spot_key, expiration, expired_at
),
inserted AS (
INSERT INTO spots_keys (spot_key, spot_id, user_id, tenant_id, expiration, created_at, expired_at)
SELECT $2, $6, $3, $7, $4, $1, $5
WHERE NOT EXISTS (SELECT 1 FROM updated)
RETURNING spot_key, expiration, expired_at
)
SELECT spot_key, expiration, expired_at FROM updated
UNION ALL
SELECT spot_key, expiration, expired_at FROM inserted;
`
key := &Key{}
if err := k.conn.QueryRow(sql, now, newKey, user.ID, expiration, expiredAt, spotID, user.TenantID).
Scan(&key.Value, &key.Expiration, &key.ExpiredAt); err != nil {
k.log.Error(context.Background(), "failed to set key: %v", err)
return nil, fmt.Errorf("key not updated")
}
return key, nil
}
// Get returns the spot's active key within the user's tenant; the returned
// Expiration is recomputed as the remaining seconds until expiry.
func (k *keysImpl) Get(spotID uint64, user *auth.User) (*Key, error) {
switch {
case spotID == 0:
return nil, fmt.Errorf("spotID is required")
case user == nil:
return nil, fmt.Errorf("user is required")
}
//
key := &Key{}
sql := `SELECT spot_key, expiration, expired_at FROM spots_keys WHERE spot_id = $1 AND tenant_id = $2`
if err := k.conn.QueryRow(sql, spotID, user.TenantID).Scan(&key.Value, &key.Expiration, &key.ExpiredAt); err != nil {
k.log.Error(context.Background(), "failed to get key: %v", err)
return nil, fmt.Errorf("key not found")
}
now := time.Now()
if key.ExpiredAt.Before(now) {
return nil, fmt.Errorf("key is expired")
}
key.Expiration = uint64(key.ExpiredAt.Sub(now).Seconds())
return key, nil
}
// IsValid resolves a presented sharing key to its owning user, rejecting
// expired keys. The user's TenantID is fixed to 1 (single-tenant OSS build,
// see the SELECT 1 below) and AuthMethod is set to "public-key".
func (k *keysImpl) IsValid(key string) (*auth.User, error) {
if key == "" {
return nil, fmt.Errorf("key is required")
}
var (
userID uint64
expiredAt time.Time
)
// Get userID if key is valid
sql := `SELECT user_id, expired_at FROM spots_keys WHERE spot_key = $1`
if err := k.conn.QueryRow(sql, key).Scan(&userID, &expiredAt); err != nil {
k.log.Error(context.Background(), "failed to get key: %v", err)
return nil, fmt.Errorf("key not found")
}
now := time.Now()
if expiredAt.Before(now) {
return nil, fmt.Errorf("key is expired")
}
// Get user info by userID
user := &auth.User{ID: userID, AuthMethod: "public-key"}
// We don't need tenantID here
sql = `SELECT 1, name, email FROM public.users WHERE user_id = $1 AND deleted_at IS NULL LIMIT 1`
if err := k.conn.QueryRow(sql, userID).Scan(&user.TenantID, &user.Name, &user.Email); err != nil {
k.log.Error(context.Background(), "failed to get user: %v", err)
return nil, fmt.Errorf("user not found")
}
return user, nil
}
// NewKeys builds the Postgres-backed Keys service.
func NewKeys(log logger.Logger, conn pool.Pool) Keys {
return &keysImpl{
log: log,
conn: conn,
}
}

View file

@ -0,0 +1,366 @@
package service
import (
"context"
"encoding/json"
"fmt"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/flakeid"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/spot/auth"
"time"
)
// MaxCommentLength caps the length of a single spot comment.
const MaxCommentLength = 120
// MaxNumberOfComments caps how many comments one spot can accumulate.
const MaxNumberOfComments = 20
// Spot is the core record describing one recorded spot.
type Spot struct {
ID uint64 `json:"id"`
Name string `json:"name"`
UserID uint64 `json:"userID"`
UserEmail string `json:"userEmail"`
TenantID uint64 `json:"tenantID"`
Duration int `json:"duration"`
Crop []int `json:"crop"`
Comments []Comment `json:"comments"`
CreatedAt time.Time `json:"createdAt"`
}
// Comment is one user comment; stored in Postgres as a JSON string inside
// the spots.comments text array.
type Comment struct {
UserName string `json:"user"`
Text string `json:"text"`
CreatedAt time.Time `json:"createdAt"`
}
// GetOpts carries listing filters and pagination; Offset is derived from
// Page and Limit in Get.
type GetOpts struct {
SpotID uint64 // grab particular spot by ID
UserID uint64 // for filtering by user
TenantID uint64 // for filtering by all users in tenant
NameFilter string // for filtering by name (substring)
Order string // sorting ("asc" or "desc")
Limit uint64 // pagination (limit for page)
Offset uint64 // pagination (offset for page)
Page uint64
}
// spotsImpl is the Postgres-backed Spots implementation.
type spotsImpl struct {
log logger.Logger
pgconn pool.Pool
flaker *flakeid.Flaker
}
// Update describes a pending spot mutation (rename and/or new comment).
type Update struct {
ID uint64 `json:"id"`
NewName string `json:"newName"`
NewComment *Comment `json:"newComment"`
}
// Spots is the spot CRUD service; all read/write operations are scoped to
// the calling user's tenant.
type Spots interface {
Add(user *auth.User, name, comment string, duration int, crop []int) (*Spot, error)
GetByID(user *auth.User, spotID uint64) (*Spot, error)
Get(user *auth.User, opts *GetOpts) ([]*Spot, uint64, error)
UpdateName(user *auth.User, spotID uint64, newName string) (*Spot, error)
AddComment(user *auth.User, spotID uint64, comment *Comment) (*Spot, error)
Delete(user *auth.User, spotIds []uint64) error
SetStatus(spotID uint64, status string) error
GetStatus(user *auth.User, spotID uint64) (string, error)
}
// NewSpots builds the Postgres-backed Spots service.
func NewSpots(log logger.Logger, pgconn pool.Pool, flaker *flakeid.Flaker) Spots {
return &spotsImpl{
log: log,
pgconn: pgconn,
flaker: flaker,
}
}
// Add validates input and stores a new spot owned by the given user. The
// optional initial comment is capped at MaxCommentLength characters,
// matching AddComment's limit.
func (s *spotsImpl) Add(user *auth.User, name, comment string, duration int, crop []int) (*Spot, error) {
	switch {
	case user == nil:
		return nil, fmt.Errorf("user is required")
	case name == "":
		return nil, fmt.Errorf("name is required")
	case duration <= 0:
		return nil, fmt.Errorf("duration should be greater than 0")
	}
	createdAt := time.Now()
	// Spot IDs are time-ordered flake IDs derived from the creation time.
	spotID, err := s.flaker.Compose(uint64(createdAt.UnixMilli()))
	if err != nil {
		return nil, err
	}
	newSpot := &Spot{
		ID:        spotID,
		Name:      name,
		UserID:    user.ID,
		UserEmail: user.Email,
		TenantID:  user.TenantID,
		Duration:  duration,
		Crop:      crop,
		CreatedAt: createdAt,
	}
	if comment != "" {
		// CONSISTENCY FIX: enforce the same comment length cap as AddComment
		// (rune-based so multi-byte characters are never split).
		if runes := []rune(comment); len(runes) > MaxCommentLength {
			comment = string(runes[:MaxCommentLength])
		}
		newSpot.Comments = append(newSpot.Comments, Comment{
			UserName:  user.Name,
			Text:      comment,
			CreatedAt: createdAt,
		})
	}
	if err = s.add(newSpot); err != nil {
		return nil, err
	}
	return newSpot, nil
}
// encodeComment serializes a comment to its JSON storage form; on failure it
// logs a warning and returns "", which callers treat as "skip this comment".
func (s *spotsImpl) encodeComment(comment *Comment) string {
	data, err := json.Marshal(comment)
	if err != nil {
		s.log.Warn(context.Background(), "failed to encode comment: %v, err: %s", comment, err)
		return ""
	}
	return string(data)
}
// add inserts the spot row; comments are stored as a text[] of JSON strings
// and the initial status is always "pending".
func (s *spotsImpl) add(spot *Spot) error {
	sql := `INSERT INTO spots (spot_id, name, user_id, user_email, tenant_id, duration, crop, comments, status, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`
	// Keep comments nil (not an empty slice) when there is nothing to store,
	// preserving how the driver encodes the array parameter.
	var comments []string
	for i := range spot.Comments {
		if encoded := s.encodeComment(&spot.Comments[i]); encoded != "" {
			comments = append(comments, encoded)
		}
	}
	return s.pgconn.Exec(sql, spot.ID, spot.Name, spot.UserID, spot.UserEmail, spot.TenantID, spot.Duration, spot.Crop,
		comments, "pending", spot.CreatedAt)
}
// GetByID returns a tenant-scoped spot after basic argument validation.
func (s *spotsImpl) GetByID(user *auth.User, spotID uint64) (*Spot, error) {
	if user == nil {
		return nil, fmt.Errorf("user is required")
	}
	if spotID == 0 {
		return nil, fmt.Errorf("spot id is required")
	}
	return s.getByID(spotID, user)
}
// getByID loads one non-deleted spot scoped to the user's tenant and decodes
// its JSON-encoded comments; undecodable comments are logged and skipped.
func (s *spotsImpl) getByID(spotID uint64, user *auth.User) (*Spot, error) {
sql := `SELECT name, user_email, duration, crop, comments, created_at FROM spots
WHERE spot_id = $1 AND tenant_id = $2 AND deleted_at IS NULL`
spot := &Spot{ID: spotID}
var comments []string
err := s.pgconn.QueryRow(sql, spotID, user.TenantID).Scan(&spot.Name, &spot.UserEmail, &spot.Duration, &spot.Crop,
&comments, &spot.CreatedAt)
if err != nil {
return nil, err
}
for _, comment := range comments {
var decodedComment Comment
if err = json.Unmarshal([]byte(comment), &decodedComment); err != nil {
s.log.Warn(context.Background(), "failed to decode comment: %s", err)
continue
}
spot.Comments = append(spot.Comments, decodedComment)
}
return spot, nil
}
// Get validates and normalizes listing options, then fetches the page.
// Defaults: order "desc", limit 9 (valid range 1..10), page 1.
func (s *spotsImpl) Get(user *auth.User, opts *GetOpts) ([]*Spot, uint64, error) {
	switch {
	case user == nil:
		return nil, 0, fmt.Errorf("user is required")
	case opts == nil:
		return nil, 0, fmt.Errorf("get options are required")
	case user.TenantID == 0: // Tenant ID is required even for public get functions
		return nil, 0, fmt.Errorf("tenant id is required")
	}
	// Show the latest spots first by default
	if opts.Order != "asc" && opts.Order != "desc" {
		opts.Order = "desc"
	}
	// FIX: Limit is unsigned, so the old "<= 0" could only ever mean "== 0";
	// spell it that way to keep go vet/staticcheck clean (same behavior).
	if opts.Limit == 0 || opts.Limit > 10 {
		opts.Limit = 9
	}
	if opts.Page < 1 {
		opts.Page = 1
	}
	opts.Offset = (opts.Page - 1) * opts.Limit
	return s.getAll(user, opts)
}
// getAll builds and runs the filtered, paged listing query. COUNT(1) OVER ()
// is a window function returning the total match count on every row, so the
// page and the total come back in a single query.
func (s *spotsImpl) getAll(user *auth.User, opts *GetOpts) ([]*Spot, uint64, error) {
sql := `SELECT COUNT(1) OVER () AS total, spot_id, name, user_email, duration, created_at FROM spots
WHERE tenant_id = $1 AND deleted_at IS NULL`
args := []interface{}{user.TenantID}
if opts.UserID != 0 {
sql += ` AND user_id = ` + fmt.Sprintf("$%d", len(args)+1)
args = append(args, opts.UserID)
}
if opts.NameFilter != "" {
sql += ` AND name ILIKE ` + fmt.Sprintf("$%d", len(args)+1)
args = append(args, "%"+opts.NameFilter+"%")
}
// opts.Order is normalized to "asc"/"desc" upstream (Get), so this string
// concatenation is not an injection vector.
if opts.Order != "" {
sql += ` ORDER BY created_at ` + opts.Order
}
if opts.Limit != 0 {
sql += ` LIMIT ` + fmt.Sprintf("$%d", len(args)+1)
args = append(args, opts.Limit)
}
// Offset 0 simply omits the OFFSET clause (equivalent result).
if opts.Offset != 0 {
sql += ` OFFSET ` + fmt.Sprintf("$%d", len(args)+1)
args = append(args, opts.Offset)
}
//s.log.Info(context.Background(), "sql: %s, args: %v", sql, args)
rows, err := s.pgconn.Query(sql, args...)
if err != nil {
return nil, 0, err
}
defer rows.Close()
var total uint64
var spots []*Spot
for rows.Next() {
spot := &Spot{}
if err = rows.Scan(&total, &spot.ID, &spot.Name, &spot.UserEmail, &spot.Duration, &spot.CreatedAt); err != nil {
return nil, 0, err
}
spots = append(spots, spot)
}
return spots, total, nil
}
// UpdateName renames a spot after validating the arguments.
func (s *spotsImpl) UpdateName(user *auth.User, spotID uint64, newName string) (*Spot, error) {
	if user == nil {
		return nil, fmt.Errorf("user is required")
	}
	if spotID == 0 {
		return nil, fmt.Errorf("spot id is required")
	}
	if newName == "" {
		return nil, fmt.Errorf("new name is required")
	}
	return s.updateName(spotID, newName, user)
}
// updateName performs the tenant-scoped rename; the CTE counts affected rows
// so a zero count distinguishes "no such spot / wrong tenant" from success.
func (s *spotsImpl) updateName(spotID uint64, newName string, user *auth.User) (*Spot, error) {
sql := `WITH updated AS (
UPDATE spots SET name = $1, updated_at = $2
WHERE spot_id = $3 AND tenant_id = $4 AND deleted_at IS NULL RETURNING *)
SELECT COUNT(*) FROM updated`
updated := 0
if err := s.pgconn.QueryRow(sql, newName, time.Now(), spotID, user.TenantID).Scan(&updated); err != nil {
return nil, err
}
if updated == 0 {
return nil, fmt.Errorf("not allowed to update name")
}
return &Spot{ID: spotID, Name: newName}, nil
}
// AddComment validates and timestamps a comment, truncating overlong text to
// MaxCommentLength characters, then appends it to the spot's comment array.
func (s *spotsImpl) AddComment(user *auth.User, spotID uint64, comment *Comment) (*Spot, error) {
	switch {
	case user == nil:
		return nil, fmt.Errorf("user is required")
	case spotID == 0:
		return nil, fmt.Errorf("spot id is required")
	case comment == nil:
		return nil, fmt.Errorf("comment is required")
	case comment.UserName == "":
		return nil, fmt.Errorf("user name is required")
	case comment.Text == "":
		return nil, fmt.Errorf("comment text is required")
	}
	// BUG FIX: the previous byte-based slice (Text[:MaxCommentLength]) could
	// cut a multi-byte UTF-8 character in half, producing invalid text that
	// would then be JSON-encoded; truncate on rune boundaries instead.
	if runes := []rune(comment.Text); len(runes) > MaxCommentLength {
		comment.Text = string(runes[:MaxCommentLength])
	}
	comment.CreatedAt = time.Now()
	return s.addComment(spotID, comment, user)
}
// addComment appends the JSON-encoded comment to the spot's comments array,
// but only while the spot holds fewer than MaxNumberOfComments; the CTE's
// row count distinguishes "cap reached / wrong tenant / deleted" (0) from
// success.
func (s *spotsImpl) addComment(spotID uint64, newComment *Comment, user *auth.User) (*Spot, error) {
sql := `WITH updated AS (
UPDATE spots
SET comments = array_append(comments, $1), updated_at = $2
WHERE spot_id = $3 AND tenant_id = $4 AND deleted_at IS NULL AND array_length(comments, 1) < $5
RETURNING *)
SELECT COUNT(*) FROM updated`
encodedComment := s.encodeComment(newComment)
if encodedComment == "" {
return nil, fmt.Errorf("failed to encode comment")
}
updated := 0
if err := s.pgconn.QueryRow(sql, encodedComment, time.Now(), spotID, user.TenantID, MaxNumberOfComments).Scan(&updated); err != nil {
return nil, err
}
if updated == 0 {
return nil, fmt.Errorf("not allowed to add comment")
}
return &Spot{ID: spotID}, nil
}
// Delete soft-deletes the given spots on behalf of the user.
func (s *spotsImpl) Delete(user *auth.User, spotIds []uint64) error {
	if user == nil {
		return fmt.Errorf("user is required")
	}
	if len(spotIds) == 0 {
		return fmt.Errorf("spot ids are required")
	}
	return s.deleteSpots(spotIds, user)
}
// deleteSpots soft-deletes (sets deleted_at) the tenant's spots using a
// positionally-built IN list, and verifies every requested ID was affected.
func (s *spotsImpl) deleteSpots(spotIds []uint64, user *auth.User) error {
sql := `WITH updated AS (UPDATE spots SET deleted_at = NOW() WHERE tenant_id = $1 AND spot_id IN (`
args := []interface{}{user.TenantID}
// Build $2,$3,... placeholders, one per spot ID.
for i, spotID := range spotIds {
sql += fmt.Sprintf("$%d,", i+2)
args = append(args, spotID)
}
// Trim the trailing comma and close the statement.
sql = sql[:len(sql)-1] + `) RETURNING *) SELECT COUNT(*) FROM updated`
count := 0
if err := s.pgconn.QueryRow(sql, args...).Scan(&count); err != nil {
return err
}
if count == 0 {
return fmt.Errorf("not allowed to delete spots")
}
// Partial deletion means some IDs were missing or not in this tenant.
if count != len(spotIds) {
s.log.Warn(context.Background(), "deleted %d spots, but expected to delete %d", count, len(spotIds))
return fmt.Errorf("failed to delete all requested spots")
}
return nil
}
// SetStatus updates the processing status of a live spot. Note that there is
// no tenant filter here: this is used by internal pipelines (e.g. the
// transcoder), which address spots by ID only.
func (s *spotsImpl) SetStatus(spotID uint64, status string) error {
	if spotID == 0 {
		return fmt.Errorf("spot id is required")
	}
	if status == "" {
		return fmt.Errorf("status is required")
	}
	query := `UPDATE spots SET status = $1, updated_at = $2 WHERE spot_id = $3 AND deleted_at IS NULL`
	return s.pgconn.Exec(query, status, time.Now(), spotID)
}
// GetStatus returns the current status of a spot scoped to the user's tenant.
func (s *spotsImpl) GetStatus(user *auth.User, spotID uint64) (string, error) {
	if user == nil {
		return "", fmt.Errorf("user is required")
	}
	if spotID == 0 {
		return "", fmt.Errorf("spot id is required")
	}
	query := `SELECT status FROM spots WHERE spot_id = $1 AND tenant_id = $2 AND deleted_at IS NULL`
	var status string
	err := s.pgconn.QueryRow(query, spotID, user.TenantID).Scan(&status)
	if err != nil {
		return "", err
	}
	return status, nil
}

View file

@ -0,0 +1,106 @@
package transcoder
import (
"context"
"fmt"
"strings"
"time"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/objectstorage"
)
// Streams persists HLS playlists for transcoded spot videos and serves them
// with chunk references replaced by pre-signed download URLs.
type Streams interface {
	// Add stores the original playlist and a URL-substituted copy for spotID.
	Add(spotID uint64, originalStream string) error
	// Get returns a playlist whose pre-signed chunk URLs are still valid,
	// regenerating them if the stored copy has expired.
	Get(spotID uint64) ([]byte, error)
}
// streamsImpl is the Postgres + object-storage backed Streams implementation.
type streamsImpl struct {
	log     logger.Logger               // structured logger
	conn    pool.Pool                   // Postgres connection pool (spots_streams table)
	storage objectstorage.ObjectStorage // source of pre-signed chunk URLs
}
// Add replaces every indexN.ts chunk reference in the playlist with a
// pre-signed download URL and upserts both the original and modified
// playlists into spots_streams. The modified copy expires in 10 minutes,
// matching the lifetime of the embedded URLs.
func (s *streamsImpl) Add(spotID uint64, originalStream string) error {
	lines := strings.Split(originalStream, "\n")
	for i, line := range lines {
		if strings.HasPrefix(line, "index") && strings.HasSuffix(line, ".ts") {
			key := fmt.Sprintf("%d/%s", spotID, line)
			presignedURL, err := s.storage.GetPreSignedDownloadUrl(key)
			if err != nil {
				// Use the structured logger (was fmt.Println) for consistency with Get.
				s.log.Error(context.Background(), "Error generating pre-signed URL: %v", err)
				return err
			}
			lines[i] = presignedURL
		}
	}
	modifiedContent := strings.Join(lines, "\n")
	now := time.Now()
	sql := `INSERT INTO spots_streams (spot_id, original_playlist, modified_playlist, created_at, expired_at)
	VALUES ($1, $2, $3, $4, $5) ON CONFLICT (spot_id) DO UPDATE SET original_playlist = $2, modified_playlist = $3,
	created_at = $4, expired_at = $5`
	if err := s.conn.Exec(sql, spotID, originalStream, modifiedContent, now, now.Add(10*time.Minute)); err != nil {
		s.log.Error(context.Background(), "Error inserting playlist to DB: %v", err)
		return err
	}
	return nil
}
// Get returns the HLS playlist for a spot. If the stored modified playlist
// (with pre-signed chunk URLs) has not expired it is returned as-is;
// otherwise fresh pre-signed URLs are generated from the original playlist
// and the regenerated copy is cached back with a new 10-minute expiry.
func (s *streamsImpl) Get(spotID uint64) ([]byte, error) {
	// One round trip: pick the modified playlist while fresh, else fall back
	// to the original, and report which one was chosen.
	sql := `
	SELECT
	CASE
	WHEN expired_at > $2 THEN modified_playlist
	ELSE original_playlist
	END AS playlist,
	CASE
	WHEN expired_at > $2 THEN 'modified'
	ELSE 'original'
	END AS playlist_type
	FROM spots_streams
	WHERE spot_id = $1`
	var playlist, flag string
	if err := s.conn.QueryRow(sql, spotID, time.Now()).Scan(&playlist, &flag); err != nil {
		s.log.Error(context.Background(), "Error getting spot stream playlist: %v", err)
		return []byte(""), err
	}
	if flag == "modified" {
		// Cached URLs are still valid.
		return []byte(playlist), nil
	}
	// Have to generate a new modified playlist with updated pre-signed URLs for chunks
	lines := strings.Split(playlist, "\n")
	for i, line := range lines {
		if strings.HasPrefix(line, "index") && strings.HasSuffix(line, ".ts") {
			key := fmt.Sprintf("%d/%s", spotID, line)
			presignedURL, err := s.storage.GetPreSignedDownloadUrl(key)
			if err != nil {
				s.log.Error(context.Background(), "Error generating pre-signed URL: %v", err)
				return []byte(""), err
			}
			lines[i] = presignedURL
		}
	}
	modifiedPlaylist := strings.Join(lines, "\n")
	// Best-effort cache update: failure is only logged, the fresh playlist is
	// still served to the caller.
	sql = `UPDATE spots_streams SET modified_playlist = $1, expired_at = $2 WHERE spot_id = $3`
	if err := s.conn.Exec(sql, modifiedPlaylist, time.Now().Add(10*time.Minute), spotID); err != nil {
		s.log.Warn(context.Background(), "Error updating modified playlist: %v", err)
	}
	return []byte(modifiedPlaylist), nil
}
// NewStreams builds the default Streams implementation backed by Postgres
// and the given object storage.
func NewStreams(log logger.Logger, conn pool.Pool, storage objectstorage.ObjectStorage) Streams {
	s := &streamsImpl{log: log, conn: conn, storage: storage}
	return s
}

View file

@ -0,0 +1,100 @@
package transcoder
import (
"errors"
"time"
"github.com/jackc/pgx/v4"
"openreplay/backend/pkg/db/postgres/pool"
)
// Tasks is a Postgres-backed work queue for transcoding jobs.
type Tasks interface {
	// Add enqueues a pending task for spotID with optional crop range and duration (ms).
	Add(spotID uint64, crop []int, duration int) error
	// Get claims the oldest pending task inside an open transaction; the task
	// must be finished with Done or Failed to release the row lock.
	Get() (*Task, error)
	// Done removes the completed task and commits its transaction.
	Done(task *Task) error
	// Failed marks the task as failed with the given error and commits.
	Failed(task *Task, taskErr error) error
}
// tasksImpl implements Tasks on top of a Postgres connection pool.
type tasksImpl struct {
	conn pool.Pool
}
// NewTasks returns a Tasks queue backed by the given connection pool.
func NewTasks(conn pool.Pool) Tasks {
	return &tasksImpl{conn: conn}
}
// Task is one transcoding job claimed from the queue.
type Task struct {
	SpotID   uint64  // spot this job belongs to
	Crop     []int   // optional [start, end] trim range in milliseconds
	Duration int     // video duration in milliseconds
	Status   string
	Path     string  // local working directory for downloaded/produced files
	tx       pool.Tx // open transaction holding the claimed row lock (see Tasks.Get)
}
// HasToTrim reports whether the task carries a [start, end] crop range.
// len() of a nil slice is 0, so no separate nil check is needed.
func (t *Task) HasToTrim() bool {
	return len(t.Crop) == 2
}
// HasToTranscode reports whether the video is long enough to be worth
// converting to HLS (duration is in milliseconds, threshold 15s).
func (t *Task) HasToTranscode() bool {
	return t.Duration > 15000
}
// Add enqueues a pending transcoding task for the given spot.
// Fix: the table is spots_streams' sibling "spots_tasks" — every other query
// in this file (Get/Done/Failed) uses that name; the previous "spot_tasks"
// typo meant inserted rows were never picked up by the worker.
func (t *tasksImpl) Add(spotID uint64, crop []int, duration int) error {
	sql := `INSERT INTO spots_tasks (id, crop, duration, status, added_time) VALUES ($1, $2, $3, $4, $5)`
	if err := t.conn.Exec(sql, spotID, crop, duration, "pending", time.Now()); err != nil {
		return err
	}
	return nil
}
// NoTasksError signals that the queue is currently empty; callers treat it
// as "back off and poll again" rather than a failure.
type NoTasksError struct{}

func (NoTasksError) Error() string {
	return "no tasks"
}
// Get claims the oldest pending task. It opens a transaction and selects a
// row with FOR UPDATE SKIP LOCKED so concurrent workers never claim the same
// task; the transaction stays open inside the returned Task and must be
// finished via Done or Failed. Returns NoTasksError when the queue is empty.
func (t *tasksImpl) Get() (task *Task, err error) {
	tx, err := t.conn.Begin()
	if err != nil {
		return nil, err
	}
	// Named return err: any failure below rolls the transaction back.
	defer func() {
		if err != nil {
			tx.TxRollback()
		}
	}()
	task = &Task{tx: pool.Tx{Tx: tx}}
	// NOTE(review): locking clause is written before LIMIT; Postgres accepts
	// either order, but LIMIT-before-FOR UPDATE is the documented form.
	sql := `SELECT id, crop, duration FROM spots_tasks WHERE status = 'pending' ORDER BY added_time FOR UPDATE SKIP LOCKED LIMIT 1`
	err = tx.TxQueryRow(sql).Scan(&task.SpotID, &task.Crop, &task.Duration)
	if err != nil {
		if errors.Is(err, pgx.ErrNoRows) {
			return nil, NoTasksError{}
		}
		return nil, err
	}
	return task, nil
}
// Done deletes the completed task row and commits the transaction opened by
// Get; on delete failure the transaction is rolled back instead.
func (t *tasksImpl) Done(task *Task) error {
	sql := `DELETE FROM spots_tasks WHERE id = $1`
	err := task.tx.TxExec(sql, task.SpotID)
	if err != nil {
		task.tx.TxRollback()
		return err
	}
	return task.tx.TxCommit()
}
// Failed keeps the task row but marks it 'failed' with the error message,
// then commits the transaction opened by Get (rollback on update failure).
func (t *tasksImpl) Failed(task *Task, taskErr error) error {
	sql := `UPDATE spots_tasks SET status = 'failed', error = $2 WHERE id = $1`
	err := task.tx.TxExec(sql, task.SpotID, taskErr.Error())
	if err != nil {
		task.tx.TxRollback()
		return err
	}
	return task.tx.TxCommit()
}

View file

@ -0,0 +1,349 @@
package transcoder
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"strconv"
"strings"
"time"
"openreplay/backend/internal/config/spot"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/logger"
metrics "openreplay/backend/pkg/metrics/spot"
"openreplay/backend/pkg/objectstorage"
workers "openreplay/backend/pkg/pool"
"openreplay/backend/pkg/spot/service"
)
// Transcoder turns uploaded spot videos into trimmed/HLS-transcoded assets
// and serves the resulting stream playlists.
type Transcoder interface {
	// Process enqueues a spot for trimming/transcoding, or marks it processed
	// immediately when no work is needed.
	Process(spot *service.Spot) error
	// GetSpotStreamPlaylist returns the HLS playlist for a transcoded spot.
	GetSpotStreamPlaylist(spotID uint64) ([]byte, error)
	// Close stops the polling loop and the worker pools.
	Close()
}
// transcoderImpl wires the task queue, stream store, and two worker pools:
// one for download/crop preparation and one for the ffmpeg HLS transcode.
type transcoderImpl struct {
	cfg              *spot.Config
	log              logger.Logger
	close            chan interface{} // signals mainLoop to stop
	objStorage       objectstorage.ObjectStorage
	conn             pool.Pool
	tasks            Tasks   // pending-job queue
	streams          Streams // playlist persistence
	spots            service.Spots
	prepareWorkers   workers.WorkerPool // download + optional crop
	transcodeWorkers workers.WorkerPool // ffmpeg HLS conversion + upload
}
// NewTranscoder builds a Transcoder, starts its worker pools (2 workers,
// queue size 4 each), and launches the background polling loop.
func NewTranscoder(cfg *spot.Config, log logger.Logger, objStorage objectstorage.ObjectStorage, conn pool.Pool, spots service.Spots) Transcoder {
	tnsc := &transcoderImpl{
		cfg:        cfg,
		log:        log,
		close:      make(chan interface{}, 1),
		objStorage: objStorage,
		conn:       conn,
		tasks:      NewTasks(conn),
		streams:    NewStreams(log, conn, objStorage),
		spots:      spots,
	}
	tnsc.prepareWorkers = workers.NewPool(2, 4, tnsc.prepare)
	tnsc.transcodeWorkers = workers.NewPool(2, 4, tnsc.transcode)
	go tnsc.mainLoop()
	return tnsc
}
// Process decides whether a freshly uploaded spot needs work. Spots that are
// shorter than the configured minimum and have no crop range are marked
// "processed" right away; everything else is enqueued as a transcoding task.
func (t *transcoderImpl) Process(spot *service.Spot) error {
	if spot.Crop == nil && spot.Duration < t.cfg.MinimumStreamDuration {
		// Skip this spot and set processed status
		t.log.Info(context.Background(), "Spot video %+v is too short for transcoding and without crop values", spot)
		if err := t.spots.SetStatus(spot.ID, "processed"); err != nil {
			// Status update failure is logged but not fatal for the caller.
			t.log.Error(context.Background(), "Error updating spot status: %v", err)
		}
		return nil
	}
	return t.tasks.Add(spot.ID, spot.Crop, spot.Duration)
}
// mainLoop polls the task queue until Close is signalled, dispatching each
// claimed task to the preparation pipeline. An empty queue triggers a
// one-second back-off before the next poll.
func (t *transcoderImpl) mainLoop() {
	for {
		select {
		case closeEvent := <-t.close:
			t.log.Info(context.Background(), "Transcoder is closing: %v", closeEvent)
			return
		default:
			task, err := t.tasks.Get()
			if err != nil {
				if errors.Is(err, NoTasksError{}) {
					// Nothing pending — avoid a hot loop against the DB.
					time.Sleep(1 * time.Second)
				} else {
					t.log.Error(context.Background(), "Error getting task: %v", err)
				}
				continue
			}
			t.process(task)
		}
	}
}
// failedTask records the failure in the queue (committing the task's
// transaction) and cleans up the local working directory.
func (t *transcoderImpl) failedTask(task *Task, err error) {
	t.log.Error(context.Background(), "Task failed: %v", err)
	if err := t.tasks.Failed(task, err); err != nil {
		t.log.Error(context.Background(), "Error marking task as failed: %v", err)
	}
	if err := os.RemoveAll(task.Path); err != nil {
		t.log.Error(context.Background(), "Error removing directory: %v", err)
	}
}
// doneTask finalizes a successful task (commit + queue row removal) and
// cleans up the local working directory.
func (t *transcoderImpl) doneTask(task *Task) {
	if err := t.tasks.Done(task); err != nil {
		t.log.Error(context.Background(), "Error marking task as done: %v", err)
	}
	if err := os.RemoveAll(task.Path); err != nil {
		t.log.Error(context.Background(), "Error removing directory: %v", err)
	}
}
// process assigns the task a local working directory under the configured
// FS root and hands it to the preparation worker pool.
func (t *transcoderImpl) process(task *Task) {
	metrics.IncreaseVideosTotal()
	// Fix: SpotID is a uint64, so the verb must be %d (the old %s printed
	// "%!s(uint64=...)" in the log).
	t.log.Info(context.Background(), "Processing spot %d", task.SpotID)
	// Working dir layout: <FSDir>[/<SpotsDir>]/<spotID>/
	path := t.cfg.FSDir + "/"
	if t.cfg.SpotsDir != "" {
		path += t.cfg.SpotsDir + "/"
	}
	task.Path = path + strconv.FormatUint(task.SpotID, 10) + "/"
	t.prepareWorkers.Submit(task)
}
// Download original video, crop if needed (and upload cropped).
// prepare is the first pipeline stage, run on the prepare worker pool.
// Short videos that need no HLS transcode finish here; the rest are handed
// to the transcode pool.
func (t *transcoderImpl) prepare(payload interface{}) {
	// Worker pools pass payloads as interface{}; this stage only receives *Task.
	task := payload.(*Task)
	// Download video from S3
	if err := t.downloadSpotVideo(task.SpotID, task.Path); err != nil {
		t.failedTask(task, fmt.Errorf("can't download video, spot: %d, err: %s", task.SpotID, err.Error()))
		return
	}
	if task.HasToTrim() {
		if err := t.cropSpotVideo(task.SpotID, task.Crop, task.Path); err != nil {
			t.failedTask(task, fmt.Errorf("can't crop video, spot: %d, err: %s", task.SpotID, err.Error()))
			return
		}
	}
	if !task.HasToTranscode() {
		t.log.Info(context.Background(), "Spot video %d is too short for transcoding", task.SpotID)
		t.doneTask(task)
	} else {
		t.transcodeWorkers.Submit(task)
	}
}
// Transcode video, upload to S3, save playlist to DB, delete local files.
// transcode is the second pipeline stage, run on the transcode worker pool.
func (t *transcoderImpl) transcode(payload interface{}) {
	task := payload.(*Task)
	// Transcode spot video to HLS format
	streamPlaylist, err := t.transcodeSpotVideo(task.SpotID, task.Path)
	if err != nil {
		t.failedTask(task, fmt.Errorf("can't transcode video, spot: %d, err: %s", task.SpotID, err.Error()))
		return
	}
	// Save stream playlist to DB
	if err := t.streams.Add(task.SpotID, streamPlaylist); err != nil {
		t.failedTask(task, fmt.Errorf("can't insert playlist to DB, spot: %d, err: %s", task.SpotID, err.Error()))
		return
	}
	// Status failure is non-fatal: the stream is already stored.
	if err := t.spots.SetStatus(task.SpotID, "processed"); err != nil {
		t.log.Error(context.Background(), "Error updating spot status: %v", err)
	}
	t.doneTask(task)
	t.log.Info(context.Background(), "Transcoded spot %d, have to upload chunks to S3", task.SpotID)
}
// downloadSpotVideo fetches <spotID>/video.webm from object storage and
// saves it as origin.webm in the task's working directory.
func (t *transcoderImpl) downloadSpotVideo(spotID uint64, path string) error {
	start := time.Now()
	// Fix: a MkdirAll failure previously called log.Fatal, which killed the
	// whole service for a single bad task. Fail the task instead.
	if err := os.MkdirAll(path, 0755); err != nil {
		return fmt.Errorf("can't create directories: %s", err.Error())
	}
	video, err := t.objStorage.Get(fmt.Sprintf("%d/video.webm", spotID))
	if err != nil {
		return err
	}
	defer video.Close()
	// Save file to disk
	originVideo, err := os.Create(path + "origin.webm")
	if err != nil {
		return fmt.Errorf("can't create file: %s", err.Error())
	}
	// Fix: close on every exit path (the old code leaked the handle when
	// io.Copy failed).
	defer originVideo.Close()
	if _, err := io.Copy(originVideo, video); err != nil {
		return fmt.Errorf("can't copy file: %s", err.Error())
	}
	if fileInfo, err := originVideo.Stat(); err != nil {
		t.log.Error(context.Background(), "Failed to get file info: %v", err)
	} else {
		metrics.RecordOriginalVideoSize(float64(fileInfo.Size()))
	}
	metrics.RecordOriginalVideoDownloadDuration(time.Since(start).Seconds())
	t.log.Info(context.Background(), "Saved origin video to disk, spot: %d in %v sec", spotID, time.Since(start).Seconds())
	return nil
}
// cropSpotVideo trims origin.webm to the [crop[0], crop[1]] range (ms) with
// a stream-copy ffmpeg pass, replaces the original file with the trimmed
// result, and uploads the trimmed video back to object storage.
// Equivalent command: ffmpeg -i input.webm -ss 5 -to 20 -c copy output.webm
func (t *transcoderImpl) cropSpotVideo(spotID uint64, crop []int, path string) error {
	start := time.Now()
	cmd := exec.Command("ffmpeg", "-i", path+"origin.webm",
		"-ss", fmt.Sprintf("%.2f", float64(crop[0])/1000.0),
		"-to", fmt.Sprintf("%.2f", float64(crop[1])/1000.0),
		"-c", "copy", path+"cropped.mp4")
	var stdout, stderr bytes.Buffer
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr
	if err := cmd.Run(); err != nil {
		return fmt.Errorf("failed to execute command: %v, stderr: %v", err, stderr.String())
	}
	metrics.IncreaseVideosCropped()
	metrics.RecordCroppingDuration(time.Since(start).Seconds())
	t.log.Info(context.Background(), "Cropped spot %d in %v", spotID, time.Since(start).Seconds())
	// Replace the original with the cropped result.
	// Fix: the rename error was previously assigned and silently ignored.
	if err := os.Rename(path+"cropped.mp4", path+"origin.webm"); err != nil {
		return fmt.Errorf("failed to replace original video with cropped one: %v", err)
	}
	// upload cropped video back to s3
	start = time.Now()
	video, err := os.Open(path + "origin.webm")
	if err != nil {
		return fmt.Errorf("failed to open cropped video: %v", err)
	}
	defer video.Close()
	if fileInfo, err := video.Stat(); err != nil {
		t.log.Error(context.Background(), "Failed to get file info: %v", err)
	} else {
		metrics.RecordCroppedVideoSize(float64(fileInfo.Size()))
	}
	err = t.objStorage.Upload(video, fmt.Sprintf("%d/video.webm", spotID), "video/webm", objectstorage.NoCompression)
	if err != nil {
		return fmt.Errorf("failed to upload cropped video: %v", err)
	}
	metrics.RecordCroppedVideoUploadDuration(time.Since(start).Seconds())
	t.log.Info(context.Background(), "Uploaded cropped spot %d in %v", spotID, time.Since(start).Seconds())
	return nil
}
// transcodeSpotVideo converts origin.webm to HLS (index.m3u8 + indexN.ts),
// uploads every chunk to object storage, and returns the playlist content.
// Equivalent command:
// ffmpeg -i origin.webm -c:v copy -c:a aac -b:a 96k -start_number 0 -hls_time 10 -hls_list_size 0 -f hls index.m3u8
func (t *transcoderImpl) transcodeSpotVideo(spotID uint64, path string) (string, error) {
	start := time.Now()
	videoPath := path + "origin.webm"
	playlistPath := path + "index.m3u8"
	cmd := exec.Command("ffmpeg", "-i", videoPath, "-c:v", "copy", "-c:a", "aac", "-b:a", "96k",
		"-start_number", "0", "-hls_time", "10", "-hls_list_size", "0", "-f", "hls", playlistPath)
	var stdout, stderr bytes.Buffer
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr
	if err := cmd.Run(); err != nil {
		t.log.Error(context.Background(), "Failed to execute command: %v, stderr: %v", err, stderr.String())
		return "", err
	}
	metrics.IncreaseVideosTranscoded()
	metrics.RecordTranscodingDuration(time.Since(start).Seconds())
	t.log.Info(context.Background(), "Transcoded spot %d in %v", spotID, time.Since(start).Seconds())
	start = time.Now()
	// Read the generated playlist; collect the chunk names (indexN.ts lines).
	// Fix: errors now go to the structured logger instead of fmt.Println, and
	// the redundant originalLines copy of the playlist was removed.
	file, err := os.Open(playlistPath)
	if err != nil {
		t.log.Error(context.Background(), "Error opening file: %v", err)
		return "", err
	}
	defer file.Close()
	var lines []string
	var chunks []string
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		line := scanner.Text()
		lines = append(lines, line)
		if strings.HasPrefix(line, "index") && strings.HasSuffix(line, ".ts") {
			chunks = append(chunks, line)
		}
	}
	if err := scanner.Err(); err != nil {
		t.log.Error(context.Background(), "Error reading file: %v", err)
		return "", err
	}
	// Upload stream chunks to S3. Each upload runs in a helper so the chunk
	// file is closed per iteration (the old defer-in-loop kept every chunk
	// open until the function returned).
	for _, chunk := range chunks {
		if err := t.uploadChunk(spotID, path, chunk); err != nil {
			return "", err
		}
	}
	metrics.RecordTranscodedVideoUploadDuration(time.Since(start).Seconds())
	t.log.Info(context.Background(), "Uploaded chunks for spot %d in %v", spotID, time.Since(start).Seconds())
	return strings.Join(lines, "\n"), nil
}

// uploadChunk uploads a single HLS chunk file to object storage and closes it.
func (t *transcoderImpl) uploadChunk(spotID uint64, path, chunk string) error {
	chunkFile, err := os.Open(path + chunk)
	if err != nil {
		t.log.Error(context.Background(), "Error opening file: %v", err)
		return err
	}
	defer chunkFile.Close()
	key := fmt.Sprintf("%d/%s", spotID, chunk)
	if err := t.objStorage.Upload(chunkFile, key, "video/mp2t", objectstorage.NoCompression); err != nil {
		t.log.Error(context.Background(), "Error uploading file: %v", err)
		return err
	}
	return nil
}
// GetSpotStreamPlaylist returns the HLS playlist for a spot, delegating to
// the streams store (which refreshes expired pre-signed URLs).
func (t *transcoderImpl) GetSpotStreamPlaylist(spotID uint64) ([]byte, error) {
	return t.streams.Get(spotID)
}
// Wait pauses both worker pools.
// NOTE(review): not part of the Transcoder interface and never called in the
// visible code — confirm it is used elsewhere before relying on it.
func (t *transcoderImpl) Wait() {
	t.prepareWorkers.Pause()
	t.transcodeWorkers.Pause()
}
// Close signals mainLoop to stop (buffered channel, so this does not block)
// and shuts down both worker pools.
func (t *transcoderImpl) Close() {
	t.close <- nil
	t.prepareWorkers.Stop()
	t.transcodeWorkers.Stop()
}

View file

@ -140,6 +140,10 @@ func (s *storageImpl) GetPreSignedUploadUrl(key string) (string, error) {
return sasURL, nil
}
// GetPreSignedDownloadUrl is not supported by this storage backend yet.
// Callers relying on pre-signed downloads (e.g. the spot stream playlists)
// will receive this error for every key.
func (s *storageImpl) GetPreSignedDownloadUrl(key string) (string, error) {
	return "", errors.New("not implemented")
}
func loadFileTag() map[string]string {
// Load file tag from env
key := "retention"

View file

@ -0,0 +1,11 @@
package api
import "strings"
// getPermissions returns the permission set required to access the given URL
// path. Every spot route needs "SPOT"; public-key routes additionally
// require "SPOT_PUBLIC".
func getPermissions(urlPath string) []string {
	perms := []string{"SPOT"}
	if strings.Contains(urlPath, "public-key") {
		perms = append(perms, "SPOT_PUBLIC")
	}
	return perms
}

View file

@ -0,0 +1,57 @@
package api
import (
"encoding/json"
"net/http"
"github.com/gorilla/mux"
"openreplay/backend/pkg/spot/auth"
"openreplay/backend/pkg/spot/service"
)
// routeMatch maps "METHOD + mux path template" to a human-readable action
// name recorded in the audit trace (see logRequest).
var routeMatch = map[string]string{
	"POST" + "/v1/spots":                  "createSpot",
	"GET" + "/v1/spots/{id}":              "getSpot",
	"PATCH" + "/v1/spots/{id}":            "updateSpot",
	"GET" + "/v1/spots":                   "getSpots",
	"DELETE" + "/v1/spots":                "deleteSpots",
	"POST" + "/v1/spots/{id}/comment":     "addComment",
	"GET" + "/v1/spots/{id}/video":        "getSpotVideo",
	"PATCH" + "/v1/spots/{id}/public-key": "updatePublicKey",
}
// logRequest records an audit trace for the finished request: the matched
// route template, query parameters (JSON-encoded), request body, and
// response status, attributed to the authenticated user from the context.
func (e *Router) logRequest(r *http.Request, bodyBytes []byte, statusCode int) {
	// Fix: mux.CurrentRoute can return nil for unmatched requests; the old
	// code dereferenced it unconditionally and could panic.
	pathTemplate := ""
	if route := mux.CurrentRoute(r); route != nil {
		var err error
		pathTemplate, err = route.GetPathTemplate()
		if err != nil {
			e.log.Error(r.Context(), "failed to get path template: %s", err)
		}
	}
	e.log.Info(r.Context(), "path template: %s", pathTemplate)
	// Convert the query parameters to json; single-value keys are flattened.
	query := r.URL.Query()
	params := make(map[string]interface{})
	for key, values := range query {
		if len(values) > 1 {
			params[key] = values
		} else {
			params[key] = values[0]
		}
	}
	jsonData, err := json.Marshal(params)
	if err != nil {
		e.log.Error(r.Context(), "failed to marshal query parameters: %s", err)
	}
	requestData := &service.RequestData{
		Action:     routeMatch[r.Method+pathTemplate],
		Method:     r.Method,
		PathFormat: pathTemplate,
		Endpoint:   r.URL.Path,
		Payload:    bodyBytes,
		Parameters: jsonData,
		Status:     statusCode,
	}
	// Fix: use a comma-ok assertion — an unchecked .(*auth.User) panicked
	// whenever the auth middleware had not populated the context.
	userData, ok := r.Context().Value("userData").(*auth.User)
	if !ok {
		e.log.Error(r.Context(), "failed to get user data from context")
		return
	}
	if err := e.services.Tracer.Trace(userData, requestData); err != nil {
		e.log.Error(r.Context(), "failed to trace request: %s", err)
	}
	// DEBUG
	e.log.Info(r.Context(), "request data: %v", requestData)
}

View file

@ -0,0 +1,25 @@
package auth
import "fmt"
// IsAuthorized validates the JWT in authHeader and checks that the resolved
// user holds every requested permission. Extension clients are verified
// against the dedicated spot secret instead of the main API secret.
func (a *authImpl) IsAuthorized(authHeader string, permissions []string, isExtension bool) (*User, error) {
	key := a.secret
	if isExtension {
		key = a.spotSecret
	}
	jwtInfo, err := parseJWT(authHeader, key)
	if err != nil {
		return nil, err
	}
	user, err := authUser(a.pgconn, jwtInfo.UserId, jwtInfo.TenantID, int(jwtInfo.IssuedAt.Unix()))
	if err != nil {
		return nil, err
	}
	// All requested permissions are mandatory; reject on the first miss.
	for _, permission := range permissions {
		if !user.HasPermission(permission) {
			return nil, fmt.Errorf("user has no permission")
		}
	}
	return user, nil
}

View file

@ -0,0 +1,32 @@
package auth
import (
"fmt"
"openreplay/backend/pkg/db/postgres/pool"
)
// authUser loads the user identified by the JWT claims and validates the
// token freshness: the token's iat must match the stored spot_jwt_iat within
// one second, so re-issuing a token invalidates older ones. The user must
// also hold the base "SPOT" permission.
func authUser(conn pool.Pool, userID, tenantID, jwtIAT int) (*User, error) {
	sql := `SELECT user_id, users.tenant_id, users.name, email, EXTRACT(epoch FROM spot_jwt_iat)::BIGINT AS spot_jwt_iat, roles.permissions
	FROM users
	JOIN tenants on users.tenant_id = tenants.tenant_id
	JOIN roles on users.role_id = roles.role_id
	WHERE users.user_id = $1 AND users.tenant_id = $2 AND users.deleted_at IS NULL ;`
	user := &User{}
	var permissions []string
	if err := conn.QueryRow(sql, userID, tenantID).
		Scan(&user.ID, &user.TenantID, &user.Name, &user.Email, &user.JwtIat, &permissions); err != nil {
		// Details are deliberately hidden from the caller.
		return nil, fmt.Errorf("user not found")
	}
	// JwtIat == 0 means no token was ever issued; abs is a package-local helper.
	if user.JwtIat == 0 || abs(jwtIAT-user.JwtIat) > 1 {
		return nil, fmt.Errorf("token expired")
	}
	user.Permissions = make(map[string]bool)
	for _, perm := range permissions {
		user.Permissions[perm] = true
	}
	if _, ok := user.Permissions["SPOT"]; !ok {
		return nil, fmt.Errorf("user has no permissions")
	}
	return user, nil
}

View file

@ -0,0 +1,45 @@
package spot
import (
"openreplay/backend/internal/config/spot"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/flakeid"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/objectstorage"
"openreplay/backend/pkg/objectstorage/store"
"openreplay/backend/pkg/spot/auth"
"openreplay/backend/pkg/spot/service"
"openreplay/backend/pkg/spot/transcoder"
)
// ServicesBuilder bundles every service the spot HTTP API depends on.
type ServicesBuilder struct {
	Flaker     *flakeid.Flaker             // unique ID generator
	ObjStorage objectstorage.ObjectStorage // spot video/preview storage
	Auth       auth.Auth                   // JWT authentication & permissions
	Spots      service.Spots               // spot CRUD
	Keys       service.Keys                // public-key management
	Transcoder transcoder.Transcoder       // background video pipeline
	Tracer     service.Tracer              // request audit trail
}
// NewServiceBuilder wires up object storage, the ID generator, the tracer,
// and all spot services from the given config and DB pool. Any constructor
// failure aborts the whole build.
func NewServiceBuilder(log logger.Logger, cfg *spot.Config, pgconn pool.Pool) (*ServicesBuilder, error) {
	objStore, err := store.NewStore(&cfg.ObjectsConfig)
	if err != nil {
		return nil, err
	}
	flaker := flakeid.NewFlaker(cfg.WorkerID)
	tracer, err := service.NewTracer(log, pgconn)
	if err != nil {
		return nil, err
	}
	spots := service.NewSpots(log, pgconn, flaker)
	return &ServicesBuilder{
		Flaker:     flaker,
		ObjStorage: objStore,
		Auth:       auth.NewAuth(log, cfg.JWTSecret, cfg.JWTSpotSecret, pgconn),
		Spots:      spots,
		Keys:       service.NewKeys(log, pgconn),
		// The transcoder starts its own background workers immediately.
		Transcoder: transcoder.NewTranscoder(cfg, log, objStore, pgconn, spots),
		Tracer:     tracer,
	}, nil
}

View file

@ -0,0 +1,104 @@
package service
import (
"context"
"errors"
"openreplay/backend/pkg/db/postgres"
db "openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/pool"
"openreplay/backend/pkg/spot/auth"
)
// Tracer records an audit trail of API requests into the traces table.
type Tracer interface {
	// Trace asynchronously persists one request record for the given user.
	Trace(user *auth.User, data *RequestData) error
	// Close stops the background saver and flushes buffered traces.
	Close() error
}
// tracerImpl buffers traces via a bulk inserter fed by a background worker.
type tracerImpl struct {
	log    logger.Logger
	conn   db.Pool
	traces postgres.Bulk   // batched INSERT into traces
	saver  pool.WorkerPool // single worker draining submitted Tasks
}
// NewTracer validates its dependencies, prepares the bulk inserter, and
// starts a single background worker (queue size 200) that writes traces.
func NewTracer(log logger.Logger, conn db.Pool) (Tracer, error) {
	switch {
	case log == nil:
		return nil, errors.New("logger is required")
	case conn == nil:
		return nil, errors.New("connection is required")
	}
	tracer := &tracerImpl{
		log:  log,
		conn: conn,
	}
	if err := tracer.initBulk(); err != nil {
		return nil, err
	}
	tracer.saver = pool.NewPool(1, 200, tracer.sendTraces)
	return tracer, nil
}
// initBulk prepares the batched INSERT for the traces table
// (10 columns per row, flushed in batches of up to 50 rows).
func (t *tracerImpl) initBulk() (err error) {
	t.traces, err = postgres.NewBulk(t.conn,
		"traces",
		"(user_id, tenant_id, auth, action, method, path_format, endpoint, payload, parameters, status)",
		"($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)",
		10, 50)
	if err != nil {
		return err
	}
	return nil
}
// Task is one queued trace record handed to the background saver.
type Task struct {
	UserID   *uint64      // authenticated user, if any
	TenantID uint64
	Auth     *string      // authentication method used for the request
	Data     *RequestData // the request being recorded
}
// sendTraces is the worker callback: it appends one queued trace to the bulk
// inserter (which flushes on its own batch schedule).
// NOTE(review): this logs every trace, including the raw payload, at Info
// level — likely debug output that should be removed or demoted.
func (t *tracerImpl) sendTraces(payload interface{}) {
	rec := payload.(*Task)
	t.log.Info(context.Background(), "Sending traces, %v", rec)
	if err := t.traces.Append(rec.UserID, rec.TenantID, rec.Auth, rec.Data.Action, rec.Data.Method, rec.Data.PathFormat,
		rec.Data.Endpoint, rec.Data.Payload, rec.Data.Parameters, rec.Data.Status); err != nil {
		t.log.Error(context.Background(), "can't append trace: %s", err)
	}
}
// RequestData captures everything recorded about a single API request.
type RequestData struct {
	Action     string // logical action name resolved from the route (may be empty)
	Method     string // HTTP method
	PathFormat string // mux route template, e.g. /v1/spots/{id}
	Endpoint   string // actual request path
	Payload    []byte // request body
	Parameters []byte // JSON-encoded query parameters
	Status     int    // response status code
}
// Trace validates its inputs and hands the record to the background saver;
// the actual DB write happens asynchronously in sendTraces.
func (t *tracerImpl) Trace(user *auth.User, data *RequestData) error {
	switch {
	case user == nil:
		return errors.New("user is required")
	case data == nil:
		return errors.New("request is required")
	}
	trace := &Task{
		UserID:   &user.ID,
		TenantID: user.TenantID,
		Auth:     &user.AuthMethod,
		Data:     data,
	}
	t.saver.Submit(trace)
	return nil
}
// Close stops the saver worker and flushes any traces still buffered in the
// bulk inserter.
func (t *tracerImpl) Close() error {
	t.saver.Stop()
	if err := t.traces.Send(); err != nil {
		return err
	}
	return nil
}

View file

@ -57,7 +57,7 @@ spec:
name: minio
port:
number: 9000
path: /(minio|mobs|sessions-assets|frontend|static|sourcemaps|ios-images|records|uxtesting-records)/
path: /(minio|mobs|sessions-assets|frontend|static|sourcemaps|ios-images|records|uxtesting-records|spots)/
tls:
- hosts:
- {{ .Values.global.domainName }}

View file

@ -0,0 +1,23 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/

View file

@ -0,0 +1,24 @@
apiVersion: v2
name: spot
description: A Helm chart for Kubernetes
# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.1
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "v1.18.0"

View file

@ -0,0 +1,22 @@
1. Get the application URL by running these commands:
{{- if .Values.ingress.enabled }}
{{- range $host := .Values.ingress.hosts }}
{{- range .paths }}
http{{ if $.Values.ingress.tls }}s{{ end }}://{{ $host.host }}{{ .path }}
{{- end }}
{{- end }}
{{- else if contains "NodePort" .Values.service.type }}
export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ include "spot.fullname" . }})
export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}")
echo http://$NODE_IP:$NODE_PORT
{{- else if contains "LoadBalancer" .Values.service.type }}
NOTE: It may take a few minutes for the LoadBalancer IP to be available.
You can watch the status of by running 'kubectl get --namespace {{ .Release.Namespace }} svc -w {{ include "spot.fullname" . }}'
export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ include "spot.fullname" . }} --template "{{"{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}"}}")
echo http://$SERVICE_IP:{{ .Values.service.port }}
{{- else if contains "ClusterIP" .Values.service.type }}
export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "spot.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")
export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}")
echo "Visit http://127.0.0.1:8080 to use your application"
kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT
{{- end }}

View file

@ -0,0 +1,62 @@
{{/*
Expand the name of the chart.
*/}}
{{- define "spot.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
If release name contains chart name it will be used as a full name.
*/}}
{{- define "spot.fullname" -}}
{{- if .Values.fullnameOverride }}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- $name := default .Chart.Name .Values.nameOverride }}
{{- if contains $name .Release.Name }}
{{- .Release.Name | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
{{- end }}
{{- end }}
{{- end }}
{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "spot.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Common labels
*/}}
{{- define "spot.labels" -}}
helm.sh/chart: {{ include "spot.chart" . }}
{{ include "spot.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end }}
{{/*
Selector labels
*/}}
{{- define "spot.selectorLabels" -}}
app.kubernetes.io/name: {{ include "spot.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}
{{/*
Create the name of the service account to use
*/}}
{{- define "spot.serviceAccountName" -}}
{{- if .Values.serviceAccount.create }}
{{- default (include "spot.fullname" .) .Values.serviceAccount.name }}
{{- else }}
{{- default "default" .Values.serviceAccount.name }}
{{- end }}
{{- end }}

View file

@ -0,0 +1,140 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "spot.fullname" . }}
namespace: {{ .Release.Namespace }}
labels:
{{- include "spot.labels" . | nindent 4 }}
spec:
{{- if not .Values.autoscaling.enabled }}
replicas: {{ .Values.replicaCount }}
{{- end }}
selector:
matchLabels:
{{- include "spot.selectorLabels" . | nindent 6 }}
template:
metadata:
{{- with .Values.podAnnotations }}
annotations:
{{- toYaml . | nindent 8 }}
{{- end }}
labels:
{{- include "spot.selectorLabels" . | nindent 8 }}
spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
serviceAccountName: {{ include "spot.serviceAccountName" . }}
securityContext:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
shareProcessNamespace: true
containers:
- name: {{ .Chart.Name }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
{{- if .Values.global.enterpriseEditionLicense }}
image: "{{ tpl .Values.image.repository . }}:{{ .Values.image.tag | default .Chart.AppVersion }}-ee"
{{- else }}
image: "{{ tpl .Values.image.repository . }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
{{- end }}
imagePullPolicy: {{ .Values.image.pullPolicy }}
{{- if .Values.healthCheck}}
{{- .Values.healthCheck | toYaml | nindent 10}}
{{- end}}
env:
{{- range $key, $val := .Values.env }}
- name: {{ $key }}
value: '{{ $val }}'
{{- end}}
{{- range $key, $val := .Values.global.env }}
- name: {{ $key }}
value: '{{ $val }}'
{{- end }}
- name: AWS_ACCESS_KEY_ID
{{- if .Values.global.s3.existingSecret }}
valueFrom:
secretKeyRef:
name: {{ .Values.global.s3.existingSecret }}
key: access-key
{{- else }}
value: {{ .Values.global.s3.accessKey }}
{{- end }}
- name: AWS_SECRET_ACCESS_KEY
{{- if .Values.global.s3.existingSecret }}
valueFrom:
secretKeyRef:
name: {{ .Values.global.s3.existingSecret }}
key: secret-key
{{- else }}
value: {{ .Values.global.s3.secretKey }}
{{- end }}
- name: AWS_REGION
value: '{{ .Values.global.s3.region }}'
- name: AWS_ENDPOINT
value: '{{- include "openreplay.s3Endpoint" . }}'
- name: LICENSE_KEY
value: '{{ .Values.global.enterpriseEditionLicense }}'
- name: KAFKA_SERVERS
value: '{{ .Values.global.kafka.kafkaHost }}:{{ .Values.global.kafka.kafkaPort }}'
- name: KAFKA_USE_SSL
value: '{{ .Values.global.kafka.kafkaUseSsl }}'
- name: pg_password
{{- if .Values.global.postgresql.existingSecret }}
valueFrom:
secretKeyRef:
name: {{ .Values.global.postgresql.existingSecret }}
key: postgresql-postgres-password
{{- else }}
value: '{{ .Values.global.postgresql.postgresqlPassword }}'
{{- end}}
- name: POSTGRES_STRING
value: 'postgres://{{ .Values.global.postgresql.postgresqlUser }}:$(pg_password)@{{ .Values.global.postgresql.postgresqlHost }}:{{ .Values.global.postgresql.postgresqlPort }}/{{ .Values.global.postgresql.postgresqlDatabase }}'
{{- include "openreplay.env.redis_string" .Values.global.redis | nindent 12 }}
ports:
{{- range $key, $val := .Values.service.ports }}
- name: {{ $key }}
containerPort: {{ $val }}
protocol: TCP
{{- end }}
volumeMounts:
- name: datadir
mountPath: /mnt/efs
{{- include "openreplay.volume.redis_ca_certificate.mount" .Values.global.redis | nindent 12 }}
{{- with .Values.persistence.mounts }}
{{- toYaml . | nindent 12 }}
{{- end }}
resources:
{{- toYaml .Values.resources | nindent 12 }}
{{- if eq (tpl .Values.pvc.name . ) "hostPath" }}
volumes:
{{- with .Values.persistence.volumes }}
{{- toYaml . | nindent 6 }}
{{- end }}
- name: datadir
hostPath:
# Ensure the file directory is created.
path: {{ tpl .Values.pvc.hostMountPath . }}
type: DirectoryOrCreate
{{- else }}
volumes:
{{- with .Values.persistence.volumes }}
{{- toYaml . | nindent 8 }}
{{- end }}
- name: datadir
persistentVolumeClaim:
claimName: "{{ tpl .Values.pvc.name . }}"
{{- end }}
{{- include "openreplay.volume.redis_ca_certificate" .Values.global.redis | nindent 6 }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View file

@ -0,0 +1,29 @@
{{- if .Values.autoscaling.enabled }}
# HorizontalPodAutoscaler for the spot deployment.
# Uses the GA autoscaling/v2 API (available since Kubernetes 1.23); the
# former v2beta1 API was removed in 1.25, which also changes the metric
# schema from `targetAverageUtilization` to `target.averageUtilization`.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: {{ include "spot.fullname" . }}
  namespace: {{ .Release.Namespace }}
  labels:
    {{- include "spot.labels" . | nindent 4 }}
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: {{ include "spot.fullname" . }}
  minReplicas: {{ .Values.autoscaling.minReplicas }}
  maxReplicas: {{ .Values.autoscaling.maxReplicas }}
  metrics:
    {{- if .Values.autoscaling.targetCPUUtilizationPercentage }}
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: {{ .Values.autoscaling.targetCPUUtilizationPercentage }}
    {{- end }}
    {{- if .Values.autoscaling.targetMemoryUtilizationPercentage }}
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: {{ .Values.autoscaling.targetMemoryUtilizationPercentage }}
    {{- end }}
{{- end }}

View file

@ -0,0 +1,36 @@
{{- if .Values.ingress.enabled }}
{{- $fullName := include "spot.fullname" . -}}
{{- $svcPort := .Values.service.ports.http -}}
# Ingress exposing the spot HTTP API under https://<domain>/spot/...
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: {{ $fullName }}
  namespace: {{ .Release.Namespace }}
  labels:
    {{- include "spot.labels" . | nindent 4 }}
  annotations:
    {{- with .Values.ingress.annotations }}
    {{- toYaml . | nindent 4 }}
    {{- end }}
    # Strip the /spot/ prefix before proxying: the capture group in `path`
    # below is substituted as $1 by ingress-nginx.
    nginx.ingress.kubernetes.io/rewrite-target: /$1
    # Pin requests from the same client IP to the same backend pod.
    nginx.ingress.kubernetes.io/upstream-hash-by: $http_x_forwarded_for
spec:
  ingressClassName: "{{ tpl .Values.ingress.className . }}"
  tls:
    - hosts:
        - {{ .Values.global.domainName }}
      {{- if .Values.ingress.tls.secretName }}
      secretName: {{ .Values.ingress.tls.secretName }}
      {{- end }}
  rules:
    - host: {{ .Values.global.domainName }}
      http:
        paths:
          # The path contains a regex capture group consumed by
          # rewrite-target, so per ingress-nginx docs the pathType must be
          # ImplementationSpecific; `Prefix` semantics are undefined for
          # regular-expression paths.
          - pathType: ImplementationSpecific
            backend:
              service:
                name: {{ $fullName }}
                port:
                  number: {{ $svcPort }}
            path: /spot/(.*)
{{- end }}

View file

@ -0,0 +1,18 @@
# Service fronting the spot pods; one port per entry in .Values.service.ports.
apiVersion: v1
kind: Service
metadata:
  name: {{ include "spot.fullname" . }}
  namespace: {{ .Release.Namespace }}
  labels:
    {{- include "spot.labels" . | nindent 4 }}
spec:
  type: {{ .Values.service.type }}
  ports:
    # `targetPort` refers to the container port of the same name
    # (the deployment names its containerPorts from the same map).
    {{- range $key, $val := .Values.service.ports }}
    - port: {{ $val }}
      targetPort: {{ $key }}
      protocol: TCP
      name: {{ $key }}
    {{- end }}
  selector:
    {{- include "spot.selectorLabels" . | nindent 4 }}

View file

@ -0,0 +1,18 @@
{{- /* Prometheus-operator ServiceMonitor; rendered only when the CRD API
is present in the cluster and monitoring is enabled in values. */}}
{{- if and ( .Capabilities.APIVersions.Has "monitoring.coreos.com/v1" ) ( .Values.serviceMonitor.enabled ) }}
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: {{ include "spot.fullname" . }}
  namespace: {{ .Release.Namespace }}
  labels:
    {{- include "spot.labels" . | nindent 4 }}
    {{- if .Values.serviceMonitor.additionalLabels }}
    {{- toYaml .Values.serviceMonitor.additionalLabels | nindent 4 }}
    {{- end }}
spec:
  endpoints:
    # Scrape endpoints come verbatim from values (port/interval/path/...).
    {{- .Values.serviceMonitor.scrapeConfigs | toYaml | nindent 4 }}
  selector:
    matchLabels:
      {{- include "spot.selectorLabels" . | nindent 6 }}
{{- end }}

View file

@ -0,0 +1,13 @@
{{- /* ServiceAccount for the spot pods, created only on demand. */}}
{{- if .Values.serviceAccount.create -}}
apiVersion: v1
kind: ServiceAccount
metadata:
  name: {{ include "spot.serviceAccountName" . }}
  namespace: {{ .Release.Namespace }}
  labels:
    {{- include "spot.labels" . | nindent 4 }}
  {{- with .Values.serviceAccount.annotations }}
  annotations:
    {{- toYaml . | nindent 4 }}
  {{- end }}
{{- end }}

View file

@ -0,0 +1,15 @@
apiVersion: v1
kind: Pod
metadata:
name: "{{ include "spot.fullname" . }}-test-connection"
labels:
{{- include "spot.labels" . | nindent 4 }}
annotations:
"helm.sh/hook": test
spec:
containers:
- name: wget
image: busybox
command: ['wget']
args: ['{{ include "spot.fullname" . }}:{{ .Values.service.port }}']
restartPolicy: Never

View file

@ -0,0 +1,133 @@
# Default values for openreplay.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.
replicaCount: 1

image:
  # Rendered through tpl, hence the template reference to the global registry.
  repository: "{{ .Values.global.openReplayContainerRegistry }}/spot"
  pullPolicy: IfNotPresent
  # Overrides the image tag whose default is the chart appVersion.
  tag: ""

imagePullSecrets: []
nameOverride: "spot"
fullnameOverride: "spot-openreplay"

serviceAccount:
  # Specifies whether a service account should be created
  create: true
  # Annotations to add to the service account
  annotations: {}
  # The name of the service account to use.
  # If not set and create is true, a name is generated using the fullname template
  name: ""

podAnnotations: {}

# Run as a non-root user/group both at the container and pod level.
securityContext:
  runAsUser: 1001
  runAsGroup: 1001

podSecurityContext:
  runAsUser: 1001
  runAsGroup: 1001
  fsGroup: 1001
  fsGroupChangePolicy: "OnRootMismatch"

# podSecurityContext: {}
#   fsGroup: 2000
# securityContext: {}
#   capabilities:
#     drop:
#       - ALL
#   readOnlyRootFilesystem: true
#   runAsNonRoot: true
#   runAsUser: 1000

service:
  type: ClusterIP
  # Port names here drive the container ports, the Service ports and the
  # helm test hook (which probes `http`).
  ports:
    http: 8080
    metrics: 8888

serviceMonitor:
  enabled: true
  additionalLabels:
    release: observability
  scrapeConfigs:
    - port: metrics
      honorLabels: true
      interval: 15s
      path: /metrics
      scheme: http
      scrapeTimeout: 10s

ingress:
  enabled: true
  className: "{{ .Values.global.ingress.controller.ingressClassResource.name }}"
  annotations:
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
    nginx.ingress.kubernetes.io/proxy-connect-timeout: "120"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "300"
    nginx.ingress.kubernetes.io/proxy-read-timeout: "300"
    nginx.ingress.kubernetes.io/cors-allow-methods: POST,PATCH,DELETE
    nginx.ingress.kubernetes.io/cors-allow-headers: Content-Type,Authorization,Content-Encoding,X-Openreplay-Batch
    nginx.ingress.kubernetes.io/cors-allow-origin: '*'
    nginx.ingress.kubernetes.io/enable-cors: "true"
    nginx.ingress.kubernetes.io/cors-expose-headers: "Content-Length"
    # kubernetes.io/ingress.class: nginx
    # kubernetes.io/tls-acme: "true"
  tls:
    secretName: openreplay-ssl

resources: {}
  # We usually recommend not to specify default resources and to leave this as a conscious
  # choice for the user. This also increases chances charts run on environments with little
  # resources, such as Minikube. If you do want to specify resources, uncomment the following
  # lines, adjust them as necessary, and remove the curly braces after 'resources:'.
  # limits:
  #   cpu: 100m
  #   memory: 128Mi
  # requests:
  #   cpu: 100m
  #   memory: 128Mi

autoscaling:
  enabled: false
  minReplicas: 1
  maxReplicas: 5
  targetCPUUtilizationPercentage: 80
  # targetMemoryUtilizationPercentage: 80

env:
  TOKEN_SECRET: secret_token_string # TODO: generate on build
  CACHE_ASSETS: true
  FS_CLEAN_HRS: 24

pvc:
  # This can be either persistentVolumeClaim or hostPath.
  # In case of pvc, you'll have to provide the pvc name.
  # For example
  # name: openreplay-efs
  name: "{{ .Values.global.pvcRWXName }}"
  hostMountPath: "{{ .Values.global.orTmpDir }}"

nodeSelector: {}
tolerations: []
affinity: {}

persistence: {}
#  # Spec of spec.template.spec.containers[*].volumeMounts
#  mounts:
#    - name: kafka-ssl
#      mountPath: /opt/kafka/ssl
#  # Spec of spec.template.spec.volumes
#  volumes:
#    - name: kafka-ssl
#      secret:
#        secretName: kafka-ssl