openreplay/backend/pkg/spot/transcoder/streams.go
Alexander 840f133f73
SQL for Spot feature (#2513)
* feat(spot): added sql for spot feature

* feat(spot): added sql code to all related files

* feat(spot): added default value for created_at

* feat(spot): added spots schema

* feat(spot): applied a new spots schema to all sql requests
2024-09-05 15:57:39 +02:00

106 lines
3.1 KiB
Go

package transcoder
import (
"context"
"fmt"
"strings"
"time"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/objectstorage"
)
type Streams interface {
Add(spotID uint64, originalStream string) error
Get(spotID uint64) ([]byte, error)
}
type streamsImpl struct {
log logger.Logger
conn pool.Pool
storage objectstorage.ObjectStorage
}
func (s *streamsImpl) Add(spotID uint64, originalStream string) error {
lines := strings.Split(originalStream, "\n")
// Replace indexN.ts with pre-signed URLs
for i, line := range lines {
if strings.HasPrefix(line, "index") && strings.HasSuffix(line, ".ts") {
key := fmt.Sprintf("%d/%s", spotID, line)
presignedURL, err := s.storage.GetPreSignedDownloadUrl(key)
if err != nil {
fmt.Println("Error generating pre-signed URL:", err)
return err
}
lines[i] = presignedURL
}
}
modifiedContent := strings.Join(lines, "\n")
now := time.Now()
// Insert playlist to DB
sql := `INSERT INTO spots.streams (spot_id, original_playlist, modified_playlist, created_at, expired_at)
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (spot_id) DO UPDATE SET original_playlist = $2, modified_playlist = $3,
created_at = $4, expired_at = $5`
if err := s.conn.Exec(sql, spotID, originalStream, modifiedContent, now, now.Add(10*time.Minute)); err != nil {
fmt.Println("Error inserting playlist to DB:", err)
return err
}
return nil
}
func (s *streamsImpl) Get(spotID uint64) ([]byte, error) {
// Get modified playlist from DB
sql := `
SELECT
CASE
WHEN expired_at > $2 THEN modified_playlist
ELSE original_playlist
END AS playlist,
CASE
WHEN expired_at > $2 THEN 'modified'
ELSE 'original'
END AS playlist_type
FROM spots.streams
WHERE spot_id = $1`
var playlist, flag string
if err := s.conn.QueryRow(sql, spotID, time.Now()).Scan(&playlist, &flag); err != nil {
s.log.Error(context.Background(), "Error getting spot stream playlist: %v", err)
return []byte(""), err
}
if flag == "modified" {
return []byte(playlist), nil
}
// Have to generate a new modified playlist with updated pre-signed URLs for chunks
lines := strings.Split(playlist, "\n")
for i, line := range lines {
if strings.HasPrefix(line, "index") && strings.HasSuffix(line, ".ts") {
key := fmt.Sprintf("%d/%s", spotID, line)
presignedURL, err := s.storage.GetPreSignedDownloadUrl(key)
if err != nil {
s.log.Error(context.Background(), "Error generating pre-signed URL: %v", err)
return []byte(""), err
}
lines[i] = presignedURL
}
}
modifiedPlaylist := strings.Join(lines, "\n")
// Save modified playlist to DB
sql = `UPDATE spots.streams SET modified_playlist = $1, expired_at = $2 WHERE spot_id = $3`
if err := s.conn.Exec(sql, modifiedPlaylist, time.Now().Add(10*time.Minute), spotID); err != nil {
s.log.Warn(context.Background(), "Error updating modified playlist: %v", err)
}
return []byte(modifiedPlaylist), nil
}
func NewStreams(log logger.Logger, conn pool.Pool, storage objectstorage.ObjectStorage) Streams {
return &streamsImpl{
log: log,
conn: conn,
storage: storage,
}
}