Canvas refactoring (#1836)

* feat(backend): added new topic for canvasToVideo communication, moved some logic

* feat(backend): enabled canvas recording

* feat(backend): fixed canvas service main logic

* feat(backend): fixed sessionEnd detector

* feat(backend): send canvas mix lists instead of just a sessID

* feat(backend): enabled canvas recording

* feat(backend): removed old logs from video-storage

* feat(backend): default low setting for canvas recording
This commit is contained in:
Alexander 2024-01-17 14:39:12 +01:00 committed by GitHub
parent 5938fd95de
commit b4b6ceb025
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 223 additions and 118 deletions

View file

@ -72,6 +72,7 @@ ENV TZ=UTC \
TOPIC_TRIGGER=trigger \
TOPIC_MOBILE_TRIGGER=mobile-trigger \
TOPIC_CANVAS_IMAGES=canvas-images \
TOPIC_CANVAS_TRIGGER=canvas-trigger \
GROUP_SINK=sink \
GROUP_STORAGE=storage \
GROUP_DB=db \
@ -112,7 +113,8 @@ ENV TZ=UTC \
# Use to set compression threshold for tracker requests (20kb by default)
COMPRESSION_THRESHOLD="20000" \
# Set Access-Control-* headers for tracker requests if true
USE_CORS=false
USE_CORS=false \
RECORD_CANVAS=true
RUN if [ "$SERVICE_NAME" = "http" ]; then \

View file

@ -181,9 +181,13 @@ func main() {
}
} else {
if err := producer.Produce(cfg.TopicRawWeb, sessionID, msg.Encode()); err != nil {
log.Printf("can't send sessionEnd to topic: %s; sessID: %d", err, sessionID)
log.Printf("can't send sessionEnd to raw topic: %s; sessID: %d", err, sessionID)
return false, 0
}
// Inform canvas service about session end
if err := producer.Produce(cfg.TopicCanvasImages, sessionID, msg.Encode()); err != nil {
log.Printf("can't send sessionEnd signal to canvas topic: %s; sessID: %d", err, sessionID)
}
}
if currDuration != 0 {

View file

@ -1,6 +1,7 @@
package main
import (
"fmt"
"log"
"os"
"os/signal"
@ -29,19 +30,21 @@ func main() {
return
}
consumer := queue.NewConsumer(
cfg.GroupImageStorage,
[]string{
cfg.TopicRawImages,
},
messages.NewImagesMessageIterator(func(data []byte, sessID uint64) {
if err := srv.Process(sessID, data); err != nil {
log.Printf("can't process image: %s", err)
}
}, nil, true),
false,
cfg.MessageSizeLimit,
)
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
//consumer := queue.NewConsumer(
// cfg.GroupImageStorage,
// []string{
// cfg.TopicRawImages,
// },
// messages.NewImagesMessageIterator(func(data []byte, sessID uint64) {
// if err := srv.Process(sessID, data); err != nil {
// log.Printf("can't process image: %s", err)
// }
// }, nil, true),
// false,
// cfg.MessageSizeLimit,
//)
canvasConsumer := queue.NewConsumer(
cfg.GroupImageStorage,
@ -49,8 +52,39 @@ func main() {
cfg.TopicCanvasImages,
},
messages.NewImagesMessageIterator(func(data []byte, sessID uint64) {
if err := srv.ProcessCanvas(sessID, data); err != nil {
log.Printf("can't process canvas image: %s", err)
checkSessionEnd := func(data []byte) (messages.Message, error) {
reader := messages.NewBytesReader(data)
msgType, err := reader.ReadUint()
if err != nil {
return nil, err
}
if msgType != messages.MsgSessionEnd {
return nil, fmt.Errorf("not a session end message")
}
msg, err := messages.ReadMessage(msgType, reader)
if err != nil {
return nil, fmt.Errorf("read message err: %s", err)
}
return msg, nil
}
if msg, err := checkSessionEnd(data); err == nil {
sessEnd := msg.(*messages.SessionEnd)
// Received session end
if list, err := srv.PrepareCanvas(sessID); err != nil {
log.Printf("can't prepare canvas: %s", err)
} else {
for _, name := range list {
sessEnd.EncryptionKey = name
if err := producer.Produce(cfg.TopicCanvasTrigger, sessID, sessEnd.Encode()); err != nil {
log.Printf("can't send session end signal to video service: %s", err)
}
}
}
} else {
if err := srv.ProcessCanvas(sessID, data); err != nil {
log.Printf("can't process canvas image: %s", err)
}
}
}, nil, true),
false,
@ -68,23 +102,27 @@ func main() {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
srv.Wait()
consumer.Close()
// close all consumers
//consumer.Close()
canvasConsumer.Close()
os.Exit(0)
case <-counterTick:
srv.Wait()
if err := consumer.Commit(); err != nil {
log.Printf("can't commit messages: %s", err)
}
//if err := consumer.Commit(); err != nil {
// log.Printf("can't commit messages: %s", err)
//}
if err := canvasConsumer.Commit(); err != nil {
log.Printf("can't commit messages: %s", err)
}
case msg := <-consumer.Rebalanced():
//case msg := <-consumer.Rebalanced():
// log.Println(msg)
case msg := <-canvasConsumer.Rebalanced():
log.Println(msg)
default:
err := consumer.ConsumeNext()
if err != nil {
log.Fatalf("Error on images consumption: %v", err)
}
//err := consumer.ConsumeNext()
//if err != nil {
// log.Fatalf("Error on images consumption: %v", err)
//}
err = canvasConsumer.ConsumeNext()
if err != nil {
log.Fatalf("Error on images consumption: %v", err)

View file

@ -38,38 +38,50 @@ func main() {
workDir := cfg.FSDir
consumer := queue.NewConsumer(
cfg.GroupVideoStorage,
[]string{
cfg.TopicMobileTrigger,
},
messages.NewMessageIterator(
func(msg messages.Message) {
sesEnd := msg.(*messages.IOSSessionEnd)
log.Printf("recieved mobile session end: %d", sesEnd.SessionID())
if err := srv.Process(sesEnd.SessionID(), workDir+"/screenshots/"+strconv.FormatUint(sesEnd.SessionID(), 10)+"/", false); err != nil {
log.Printf("upload session err: %s, sessID: %d", err, msg.SessionID())
}
},
[]int{messages.MsgIOSSessionEnd},
true,
),
false,
cfg.MessageSizeLimit,
)
//consumer := queue.NewConsumer(
// cfg.GroupVideoStorage,
// []string{
// cfg.TopicMobileTrigger,
// },
// messages.NewMessageIterator(
// func(msg messages.Message) {
// sesEnd := msg.(*messages.IOSSessionEnd)
// log.Printf("skipped mobile session end: %d", sesEnd.SessionID())
// //log.Printf("recieved mobile session end: %d", sesEnd.SessionID())
// //if err := srv.Process(sesEnd.SessionID(), workDir+"/screenshots/"+strconv.FormatUint(sesEnd.SessionID(), 10)+"/", false); err != nil {
// // log.Printf("upload session err: %s, sessID: %d", err, msg.SessionID())
// //}
// },
// []int{messages.MsgIOSSessionEnd},
// true,
// ),
// false,
// cfg.MessageSizeLimit,
//)
// Debug: global counter for canvases
canvasCount := 0
canvasConsumer := queue.NewConsumer(
cfg.GroupVideoStorage,
[]string{
cfg.TopicTrigger,
cfg.TopicCanvasTrigger,
},
messages.NewMessageIterator(
func(msg messages.Message) {
sesEnd := msg.(*messages.SessionEnd)
if err := srv.Process(sesEnd.SessionID(), workDir+"/canvas/"+strconv.FormatUint(sesEnd.SessionID(), 10)+"/", true); err != nil {
filePath := workDir + "/canvas/" + strconv.FormatUint(sesEnd.SessionID(), 10) + "/"
canvasMix := sesEnd.EncryptionKey // dirty hack to use encryption key as canvas mix holder (only between canvas handler and canvas maker)
if canvasMix == "" {
log.Printf("no canvas mix for session: %d", sesEnd.SessionID())
return
}
if err := srv.Process(sesEnd.SessionID(), filePath, canvasMix); err != nil {
if !strings.Contains(err.Error(), "no such file or directory") {
log.Printf("upload session err: %s, sessID: %d", err, msg.SessionID())
}
} else {
canvasCount++
}
},
[]int{messages.MsgSessionEnd},
@ -90,23 +102,29 @@ func main() {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
srv.Wait()
consumer.Close()
//consumer.Close()
canvasConsumer.Close()
os.Exit(0)
case <-counterTick:
srv.Wait()
if err := consumer.Commit(); err != nil {
log.Printf("can't commit messages: %s", err)
}
//if err := consumer.Commit(); err != nil {
// log.Printf("can't commit messages: %s", err)
//}
if err := canvasConsumer.Commit(); err != nil {
log.Printf("can't commit messages: %s", err)
}
case msg := <-consumer.Rebalanced():
// Debug log
log.Printf("canvasCount: %d", canvasCount)
canvasCount = 0
//case msg := <-consumer.Rebalanced():
// log.Println(msg)
case msg := <-canvasConsumer.Rebalanced():
log.Println(msg)
default:
err = consumer.ConsumeNext()
if err != nil {
log.Fatalf("Error on end event consumption: %v", err)
}
//err = consumer.ConsumeNext()
//if err != nil {
// log.Fatalf("Error on end event consumption: %v", err)
//}
err = canvasConsumer.ConsumeNext()
if err != nil {
log.Fatalf("Error on end event consumption: %v", err)

View file

@ -16,6 +16,7 @@ type Config struct {
LoggerTimeout int `env:"LOG_QUEUE_STATS_INTERVAL_SEC,required"`
TopicRawWeb string `env:"TOPIC_RAW_WEB,required"`
TopicRawIOS string `env:"TOPIC_RAW_IOS,required"`
TopicCanvasImages string `env:"TOPIC_CANVAS_IMAGES,required"`
ProducerTimeout int `env:"PRODUCER_TIMEOUT,default=2000"`
PartitionsNumber int `env:"PARTITIONS_NUMBER,required"`
UseEncryption bool `env:"USE_ENCRYPTION,default=false"`

View file

@ -32,8 +32,8 @@ type Config struct {
UseAccessControlHeaders bool `env:"USE_CORS,default=false"`
ProjectExpiration time.Duration `env:"PROJECT_EXPIRATION,default=10m"`
RecordCanvas bool `env:"RECORD_CANVAS,default=false"`
CanvasQuality string `env:"CANVAS_QUALITY,default=medium"`
CanvasFps int `env:"CANVAS_FPS,default=2"`
CanvasQuality string `env:"CANVAS_QUALITY,default=low"`
CanvasFps int `env:"CANVAS_FPS,default=1"`
WorkerID uint16
}

View file

@ -7,13 +7,14 @@ import (
type Config struct {
common.Config
FSDir string `env:"FS_DIR,required"`
ScreenshotsDir string `env:"SCREENSHOTS_DIR,default=screenshots"`
CanvasDir string `env:"CANVAS_DIR,default=canvas"`
TopicRawImages string `env:"TOPIC_RAW_IMAGES,required"`
TopicCanvasImages string `env:"TOPIC_CANVAS_IMAGES,required"`
GroupImageStorage string `env:"GROUP_IMAGE_STORAGE,required"`
UseProfiler bool `env:"PROFILER_ENABLED,default=false"`
FSDir string `env:"FS_DIR,required"`
ScreenshotsDir string `env:"SCREENSHOTS_DIR,default=screenshots"`
CanvasDir string `env:"CANVAS_DIR,default=canvas"`
TopicRawImages string `env:"TOPIC_RAW_IMAGES,required"`
TopicCanvasImages string `env:"TOPIC_CANVAS_IMAGES,required"`
TopicCanvasTrigger string `env:"TOPIC_CANVAS_TRIGGER,required"`
GroupImageStorage string `env:"GROUP_IMAGE_STORAGE,required"`
UseProfiler bool `env:"PROFILER_ENABLED,default=false"`
}
func New() *Config {

View file

@ -13,6 +13,7 @@ type Config struct {
GroupVideoStorage string `env:"GROUP_VIDEO_STORAGE,required"`
TopicMobileTrigger string `env:"TOPIC_MOBILE_TRIGGER,required"`
TopicTrigger string `env:"TOPIC_TRIGGER,required"`
TopicCanvasTrigger string `env:"TOPIC_CANVAS_TRIGGER,required"`
VideoReplayFPS int `env:"VIDEO_REPLAY_FPS,default=3"`
UseProfiler bool `env:"PROFILER_ENABLED,default=false"`
}

View file

@ -6,9 +6,12 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"sort"
"strconv"
"strings"
"time"
gzip "github.com/klauspost/pgzip"
@ -74,6 +77,80 @@ type ScreenshotMessage struct {
Data []byte
}
func (v *ImageStorage) PrepareCanvas(sessID uint64) ([]string, error) {
// Build the directory path to session's canvas images
path := v.cfg.FSDir + "/"
if v.cfg.CanvasDir != "" {
path += v.cfg.CanvasDir + "/"
}
path += strconv.FormatUint(sessID, 10) + "/"
// Check that the directory exists
files, err := ioutil.ReadDir(path)
if err != nil {
return nil, err
}
if len(files) == 0 {
return []string{}, nil
}
log.Printf("There are %d canvas images of session %d\n", len(files), sessID)
type canvasData struct {
files map[int]string
times []int
}
images := make(map[string]*canvasData)
// Build the list of canvas images sets
for _, file := range files {
name := strings.Split(file.Name(), ".")
parts := strings.Split(name[0], "_")
if len(name) != 2 || len(parts) != 3 {
log.Printf("unknown file name: %s, skipping", file.Name())
continue
}
canvasID := fmt.Sprintf("%s_%s", parts[0], parts[1])
canvasTS, _ := strconv.Atoi(parts[2])
if _, ok := images[canvasID]; !ok {
images[canvasID] = &canvasData{
files: make(map[int]string),
times: make([]int, 0),
}
}
images[canvasID].files[canvasTS] = file.Name()
images[canvasID].times = append(images[canvasID].times, canvasTS)
}
// Prepare screenshot lists for ffmpeg
namesList := make([]string, 0)
for name, cData := range images {
// Write to file
mixName := fmt.Sprintf("%s-list", name)
mixList := path + mixName
outputFile, err := os.Create(mixList)
if err != nil {
log.Printf("can't create mix list, err: %s", err)
continue
}
sort.Ints(cData.times)
for i := 0; i < len(cData.times)-1; i++ {
dur := float64(cData.times[i+1]-cData.times[i]) / 1000.0
line := fmt.Sprintf("file %s\nduration %.3f\n", cData.files[cData.times[i]], dur)
_, err := outputFile.WriteString(line)
if err != nil {
outputFile.Close()
log.Printf("%s", err)
continue
}
}
outputFile.Close()
log.Printf("made canvas list %s", mixList)
namesList = append(namesList, mixName)
}
return namesList, nil
}
func (v *ImageStorage) ProcessCanvas(sessID uint64, data []byte) error {
var msg = &ScreenshotMessage{}
if err := json.Unmarshal(data, msg); err != nil {
@ -81,7 +158,7 @@ func (v *ImageStorage) ProcessCanvas(sessID uint64, data []byte) error {
}
// Use the same workflow
v.writeToDiskTasks <- &Task{sessionID: sessID, images: map[string]*bytes.Buffer{msg.Name: bytes.NewBuffer(msg.Data)}, imageType: canvas}
log.Printf("new canvas image, sessID: %d, name: %s, size: %d mb", sessID, msg.Name, len(msg.Data)/1024/1024)
log.Printf("new canvas image, sessID: %d, name: %s, size: %3.3f mb", sessID, msg.Name, float64(len(msg.Data))/1024.0/1024.0)
return nil
}
@ -138,6 +215,7 @@ func (v *ImageStorage) writeToDisk(task *Task) {
}
// Write images to disk
saved := 0
for name, img := range task.images {
outFile, err := os.Create(path + name) // or open file in rewrite mode
if err != nil {
@ -147,7 +225,9 @@ func (v *ImageStorage) writeToDisk(task *Task) {
log.Printf("can't copy file: %s", err.Error())
}
outFile.Close()
saved++
}
log.Printf("saved %d images to disk", saved)
return
}

View file

@ -7,9 +7,7 @@ import (
"log"
config "openreplay/backend/internal/config/videostorage"
"openreplay/backend/pkg/objectstorage"
"os"
"os/exec"
"sort"
"strconv"
"strings"
"time"
@ -81,7 +79,7 @@ func (v *VideoStorage) makeVideo(sessID uint64, filesPath string) error {
return nil
}
func (v *VideoStorage) makeCanvasVideo(sessID uint64, filesPath string) error {
func (v *VideoStorage) makeCanvasVideo(sessID uint64, filesPath, canvasMix string) error {
files, err := ioutil.ReadDir(filesPath)
if err != nil {
return err
@ -89,61 +87,23 @@ func (v *VideoStorage) makeCanvasVideo(sessID uint64, filesPath string) error {
if len(files) == 0 {
return nil
}
log.Printf("There are %d canvas images of session %d\n", len(files), 0)
type canvasData struct {
files map[int]string
times []int
}
images := make(map[string]*canvasData)
log.Printf("There are %d mix lists of session %d\n", len(files), sessID)
for _, file := range files {
name := strings.Split(file.Name(), ".")
parts := strings.Split(name[0], "_")
if len(name) != 2 || len(parts) != 3 {
log.Printf("unknown file name: %s, skipping", file.Name())
continue
}
canvasID := fmt.Sprintf("%s_%s", parts[0], parts[1])
canvasTS, _ := strconv.Atoi(parts[2])
log.Printf("%s : %d", canvasID, canvasTS)
if _, ok := images[canvasID]; !ok {
images[canvasID] = &canvasData{
files: make(map[int]string),
times: make([]int, 0),
}
}
images[canvasID].files[canvasTS] = file.Name()
images[canvasID].times = append(images[canvasID].times, canvasTS)
}
for name, cData := range images {
// Write to file
mixList := fmt.Sprintf("%s/%s-list", filesPath, name)
outputFile, err := os.Create(mixList)
if err != nil {
log.Printf("can't create mix list, err: %s", err)
if !strings.HasSuffix(file.Name(), "-list") {
continue
}
sort.Ints(cData.times)
for i := 0; i < len(cData.times)-1; i++ {
dur := float64(cData.times[i+1]-cData.times[i]) / 1000.0
line := fmt.Sprintf("file %s\nduration %.3f\n", cData.files[cData.times[i]], dur)
_, err := outputFile.WriteString(line)
if err != nil {
outputFile.Close()
log.Printf("%s", err)
continue
}
log.Printf(line)
}
outputFile.Close()
name := strings.TrimSuffix(file.Name(), "-list")
mixList := fmt.Sprintf("%s%s-list", filesPath, name)
videoPath := fmt.Sprintf("%s%s.mp4", filesPath, name)
// Run ffmpeg to build video
start := time.Now()
sessionID := strconv.FormatUint(sessID, 10)
videoPath := fmt.Sprintf("%s/%s.mp4", filesPath, name)
cmd := exec.Command("ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", mixList, "-vsync", "vfr",
cmd := exec.Command("ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", mixList, "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2", "-vsync", "vfr",
"-pix_fmt", "yuv420p", "-preset", "ultrafast", videoPath)
// ffmpeg -f concat -safe 0 -i input.txt -vsync vfr -pix_fmt yuv420p output.mp4
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
@ -180,9 +140,9 @@ func (v *VideoStorage) sendToS3(task *Task) {
return
}
func (v *VideoStorage) Process(sessID uint64, filesPath string, isCanvas bool) error {
if isCanvas {
return v.makeCanvasVideo(sessID, filesPath)
func (v *VideoStorage) Process(sessID uint64, filesPath string, canvasMix string) error {
if canvasMix != "" {
return v.makeCanvasVideo(sessID, filesPath, canvasMix)
}
return v.makeVideo(sessID, filesPath)
}