* feat(spot): added sql for spot feature * feat(spot): added sql code to all related files * feat(spot): added default value for created_at * feat(spot): added spots schema * feat(spot): applied a new spots schema to all sql requests
106 lines
3.1 KiB
Go
106 lines
3.1 KiB
Go
package transcoder
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"openreplay/backend/pkg/db/postgres/pool"
|
|
"openreplay/backend/pkg/logger"
|
|
"openreplay/backend/pkg/objectstorage"
|
|
)
|
|
|
|
type Streams interface {
|
|
Add(spotID uint64, originalStream string) error
|
|
Get(spotID uint64) ([]byte, error)
|
|
}
|
|
|
|
type streamsImpl struct {
|
|
log logger.Logger
|
|
conn pool.Pool
|
|
storage objectstorage.ObjectStorage
|
|
}
|
|
|
|
func (s *streamsImpl) Add(spotID uint64, originalStream string) error {
|
|
lines := strings.Split(originalStream, "\n")
|
|
|
|
// Replace indexN.ts with pre-signed URLs
|
|
for i, line := range lines {
|
|
if strings.HasPrefix(line, "index") && strings.HasSuffix(line, ".ts") {
|
|
key := fmt.Sprintf("%d/%s", spotID, line)
|
|
presignedURL, err := s.storage.GetPreSignedDownloadUrl(key)
|
|
if err != nil {
|
|
fmt.Println("Error generating pre-signed URL:", err)
|
|
return err
|
|
}
|
|
lines[i] = presignedURL
|
|
}
|
|
}
|
|
|
|
modifiedContent := strings.Join(lines, "\n")
|
|
now := time.Now()
|
|
// Insert playlist to DB
|
|
sql := `INSERT INTO spots.streams (spot_id, original_playlist, modified_playlist, created_at, expired_at)
|
|
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (spot_id) DO UPDATE SET original_playlist = $2, modified_playlist = $3,
|
|
created_at = $4, expired_at = $5`
|
|
if err := s.conn.Exec(sql, spotID, originalStream, modifiedContent, now, now.Add(10*time.Minute)); err != nil {
|
|
fmt.Println("Error inserting playlist to DB:", err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *streamsImpl) Get(spotID uint64) ([]byte, error) {
|
|
// Get modified playlist from DB
|
|
sql := `
|
|
SELECT
|
|
CASE
|
|
WHEN expired_at > $2 THEN modified_playlist
|
|
ELSE original_playlist
|
|
END AS playlist,
|
|
CASE
|
|
WHEN expired_at > $2 THEN 'modified'
|
|
ELSE 'original'
|
|
END AS playlist_type
|
|
FROM spots.streams
|
|
WHERE spot_id = $1`
|
|
var playlist, flag string
|
|
if err := s.conn.QueryRow(sql, spotID, time.Now()).Scan(&playlist, &flag); err != nil {
|
|
s.log.Error(context.Background(), "Error getting spot stream playlist: %v", err)
|
|
return []byte(""), err
|
|
}
|
|
if flag == "modified" {
|
|
return []byte(playlist), nil
|
|
}
|
|
|
|
// Have to generate a new modified playlist with updated pre-signed URLs for chunks
|
|
lines := strings.Split(playlist, "\n")
|
|
for i, line := range lines {
|
|
if strings.HasPrefix(line, "index") && strings.HasSuffix(line, ".ts") {
|
|
key := fmt.Sprintf("%d/%s", spotID, line)
|
|
presignedURL, err := s.storage.GetPreSignedDownloadUrl(key)
|
|
if err != nil {
|
|
s.log.Error(context.Background(), "Error generating pre-signed URL: %v", err)
|
|
return []byte(""), err
|
|
}
|
|
lines[i] = presignedURL
|
|
}
|
|
}
|
|
modifiedPlaylist := strings.Join(lines, "\n")
|
|
|
|
// Save modified playlist to DB
|
|
sql = `UPDATE spots.streams SET modified_playlist = $1, expired_at = $2 WHERE spot_id = $3`
|
|
if err := s.conn.Exec(sql, modifiedPlaylist, time.Now().Add(10*time.Minute), spotID); err != nil {
|
|
s.log.Warn(context.Background(), "Error updating modified playlist: %v", err)
|
|
}
|
|
return []byte(modifiedPlaylist), nil
|
|
}
|
|
|
|
func NewStreams(log logger.Logger, conn pool.Pool, storage objectstorage.ObjectStorage) Streams {
|
|
return &streamsImpl{
|
|
log: log,
|
|
conn: conn,
|
|
storage: storage,
|
|
}
|
|
}
|