Mobile replay (#1853)

* feat(backend): added new trigger topic for mobile video replay maker

* feat(backend): mobile video maker with mix list support

* feat(backend): fixed panic

* feat(backend): removed commented code
This commit is contained in:
Alexander 2024-01-25 14:30:40 +01:00 committed by GitHub
parent 731f3fd6ac
commit 255f879476
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 163 additions and 48 deletions

View file

@ -73,6 +73,7 @@ ENV TZ=UTC \
TOPIC_MOBILE_TRIGGER=mobile-trigger \
TOPIC_CANVAS_IMAGES=canvas-images \
TOPIC_CANVAS_TRIGGER=canvas-trigger \
TOPIC_REPLAY_TRIGGER=replay-trigger \
GROUP_SINK=sink \
GROUP_STORAGE=storage \
GROUP_DB=db \

View file

@ -179,6 +179,10 @@ func main() {
log.Printf("can't send iOSSessionEnd to topic: %s; sessID: %d", err, sessionID)
return false, 0
}
// Inform canvas service about session end
if err := producer.Produce(cfg.TopicRawImages, sessionID, msg.Encode()); err != nil {
log.Printf("can't send sessionEnd signal to mobile images topic: %s; sessID: %d", err, sessionID)
}
} else {
if err := producer.Produce(cfg.TopicRawWeb, sessionID, msg.Encode()); err != nil {
log.Printf("can't send sessionEnd to raw topic: %s; sessID: %d", err, sessionID)

View file

@ -1,6 +1,7 @@
package main
import (
"fmt"
"log"
"os"
"os/signal"
@ -29,14 +30,44 @@ func main() {
return
}
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
consumer := queue.NewConsumer(
cfg.GroupImageStorage,
[]string{
cfg.TopicRawImages,
},
messages.NewImagesMessageIterator(func(data []byte, sessID uint64) {
if err := srv.Process(sessID, data); err != nil {
log.Printf("can't process image: %s", err)
checkSessionEnd := func(data []byte) (messages.Message, error) {
reader := messages.NewBytesReader(data)
msgType, err := reader.ReadUint()
if err != nil {
return nil, err
}
if msgType != messages.MsgIOSSessionEnd {
return nil, fmt.Errorf("not a mobile session end message")
}
msg, err := messages.ReadMessage(msgType, reader)
if err != nil {
return nil, fmt.Errorf("read message err: %s", err)
}
return msg, nil
}
if msg, err := checkSessionEnd(data); err == nil {
sessEnd := msg.(*messages.IOSSessionEnd)
// Received session end
if err := srv.Prepare(sessID); err != nil {
log.Printf("can't prepare mobile session: %s", err)
} else {
if err := producer.Produce(cfg.TopicReplayTrigger, sessID, sessEnd.Encode()); err != nil {
log.Printf("can't send session end signal to video service: %s", err)
}
}
} else {
if err := srv.Process(sessID, data); err != nil {
log.Printf("can't process mobile screenshots: %s", err)
}
}
}, nil, true),
false,

View file

@ -40,7 +40,7 @@ func main() {
consumer := queue.NewConsumer(
cfg.GroupVideoStorage,
[]string{
cfg.TopicMobileTrigger,
cfg.TopicReplayTrigger,
},
messages.NewMessageIterator(
func(msg messages.Message) {

View file

@ -17,6 +17,7 @@ type Config struct {
TopicRawWeb string `env:"TOPIC_RAW_WEB,required"`
TopicRawIOS string `env:"TOPIC_RAW_IOS,required"`
TopicCanvasImages string `env:"TOPIC_CANVAS_IMAGES,required"`
TopicRawImages string `env:"TOPIC_RAW_IMAGES,required"`
ProducerTimeout int `env:"PRODUCER_TIMEOUT,default=2000"`
PartitionsNumber int `env:"PARTITIONS_NUMBER,required"`
UseEncryption bool `env:"USE_ENCRYPTION,default=false"`

View file

@ -13,6 +13,7 @@ type Config struct {
TopicRawImages string `env:"TOPIC_RAW_IMAGES,required"`
TopicCanvasImages string `env:"TOPIC_CANVAS_IMAGES,required"`
TopicCanvasTrigger string `env:"TOPIC_CANVAS_TRIGGER,required"`
TopicReplayTrigger string `env:"TOPIC_REPLAY_TRIGGER,required"`
GroupImageStorage string `env:"GROUP_IMAGE_STORAGE,required"`
GroupCanvasImage string `env:"GROUP_CANVAS_IMAGE,required"`
UseProfiler bool `env:"PROFILER_ENABLED,default=false"`

View file

@ -12,8 +12,7 @@ type Config struct {
FSDir string `env:"FS_DIR,required"`
GroupVideoStorage string `env:"GROUP_VIDEO_STORAGE,required"`
GroupCanvasVideo string `env:"GROUP_CANVAS_VIDEO,required"`
TopicMobileTrigger string `env:"TOPIC_MOBILE_TRIGGER,required"`
TopicTrigger string `env:"TOPIC_TRIGGER,required"`
TopicReplayTrigger string `env:"TOPIC_REPLAY_TRIGGER,required"`
TopicCanvasTrigger string `env:"TOPIC_CANVAS_TRIGGER,required"`
VideoReplayFPS int `env:"VIDEO_REPLAY_FPS,default=3"`
UseProfiler bool `env:"PROFILER_ENABLED,default=false"`

View file

@ -55,6 +55,11 @@ func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request)
return
}
if !checkMobileTrackerVersion(req.TrackerVersion) {
ResponseWithError(w, http.StatusUpgradeRequired, errors.New("tracker version not supported"), startTime, r.URL.Path, 0)
return
}
userUUID := uuid.GetUUID(req.UserUUID)
tokenData, err := e.services.Tokenizer.Parse(req.Token)

View file

@ -61,6 +61,23 @@ func (e *Router) readBody(w http.ResponseWriter, r *http.Request, limit int64) (
return bodyBytes, nil
}
func checkMobileTrackerVersion(ver string) bool {
c, err := semver.NewConstraint(">=1.0.9")
if err != nil {
return false
}
// Check for beta version
parts := strings.Split(ver, "-")
if len(parts) > 1 {
ver = parts[0]
}
v, err := semver.NewVersion(ver)
if err != nil {
return false
}
return c.Check(v)
}
func getSessionTimestamp(req *StartSessionRequest, startTimeMili int64) (ts uint64) {
ts = uint64(req.Timestamp)
if req.IsOffline {

View file

@ -4,6 +4,7 @@ import (
"archive/tar"
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
@ -72,6 +73,67 @@ func (v *ImageStorage) Process(sessID uint64, data []byte) error {
return nil
}
func (v *ImageStorage) Prepare(sessID uint64) error {
path := v.cfg.FSDir + "/"
if v.cfg.ScreenshotsDir != "" {
path += v.cfg.ScreenshotsDir + "/"
}
path += strconv.FormatUint(sessID, 10) + "/"
// Check that the directory exists
files, err := ioutil.ReadDir(path)
if err != nil {
return err
}
if len(files) == 0 {
return errors.New("no screenshots found")
}
images := make(map[int]string)
times := make([]int, 0, len(files))
// Build the list of canvas images sets
for _, file := range files {
name := strings.Split(file.Name(), ".")
parts := strings.Split(name[0], "_")
if len(name) != 2 || len(parts) != 3 {
log.Printf("unknown file name: %s, skipping", file.Name())
continue
}
screenshotTS, _ := strconv.Atoi(parts[2])
images[screenshotTS] = file.Name()
times = append(times, screenshotTS)
}
// Prepare screenshot lists for ffmpeg
mixName := fmt.Sprintf("%d-list", sessID)
mixList := path + mixName
outputFile, err := os.Create(mixList)
if err != nil {
log.Printf("can't create mix list, err: %s", err)
return err
}
sort.Ints(times)
count := 0
for i := 0; i < len(times)-1; i++ {
dur := float64(times[i+1]-times[i]) / 1000.0
line := fmt.Sprintf("file %s\nduration %.3f\n", images[times[i]], dur)
_, err := outputFile.WriteString(line)
if err != nil {
outputFile.Close()
log.Printf("%s", err)
continue
}
count++
}
outputFile.Close()
log.Printf("new canvas mix %s with %d images", mixList, count)
return nil
}
type ScreenshotMessage struct {
Name string
Data []byte
@ -93,7 +155,6 @@ func (v *ImageStorage) PrepareCanvas(sessID uint64) ([]string, error) {
if len(files) == 0 {
return []string{}, nil
}
log.Printf("There are %d canvas images of session %d\n", len(files), sessID)
type canvasData struct {
files map[int]string
@ -134,6 +195,7 @@ func (v *ImageStorage) PrepareCanvas(sessID uint64) ([]string, error) {
}
sort.Ints(cData.times)
count := 0
for i := 0; i < len(cData.times)-1; i++ {
dur := float64(cData.times[i+1]-cData.times[i]) / 1000.0
line := fmt.Sprintf("file %s\nduration %.3f\n", cData.files[cData.times[i]], dur)
@ -143,11 +205,13 @@ func (v *ImageStorage) PrepareCanvas(sessID uint64) ([]string, error) {
log.Printf("%s", err)
continue
}
count++
}
outputFile.Close()
log.Printf("made canvas list %s", mixList)
log.Printf("new canvas mix %s with %d images", mixList, count)
namesList = append(namesList, mixName)
}
log.Printf("prepared %d canvas mixes for session %d", len(namesList), sessID)
return namesList, nil
}

View file

@ -53,26 +53,28 @@ func New(cfg *config.Config, objStorage objectstorage.ObjectStorage) (*VideoStor
}
func (v *VideoStorage) makeVideo(sessID uint64, filesPath string) error {
files, _ := ioutil.ReadDir(filesPath)
files, err := ioutil.ReadDir(filesPath)
if err != nil || len(files) == 0 {
return err // nil error is there is no screenshots
}
log.Printf("There are %d screenshot of session %d\n", len(files), sessID)
// Try to call ffmpeg and print the result
start := time.Now()
sessionID := strconv.FormatUint(sessID, 10)
imagesPath := "/mnt/efs/screenshots/" + sessionID + "/%06d.jpeg"
mixList := fmt.Sprintf("%s%s", filesPath, sessionID+"-list")
videoPath := "/mnt/efs/screenshots/" + sessionID + "/replay.mp4"
cmd := exec.Command("ffmpeg", "-y", "-f", "image2", "-framerate", v.framerate, "-start_number", "000000", "-i",
imagesPath, "-vf", "scale=-2:1064", "-c:v", "libx264", "-preset", "ultrafast", "-crf", "23",
videoPath)
// ffmpeg -y -f concat -safe 0 -i 1699978964098_29-list -vf --pix_fmt yuv420p -preset ultrafast canvas.mp4
cmd := exec.Command("ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", mixList, "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2", "-vsync", "vfr",
"-pix_fmt", "yuv420p", "-preset", "ultrafast", videoPath)
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
err := cmd.Run()
err = cmd.Run()
if err != nil {
log.Fatalf("Failed to execute command: %v, stderr: %v", err, stderr.String())
log.Printf("Failed to execute command: %v, stderr: %v", err, stderr.String())
return err
}
log.Printf("made video replay in %v", time.Since(start))
v.sendToS3Tasks <- &Task{sessionID: sessionID, path: videoPath}
@ -80,42 +82,32 @@ func (v *VideoStorage) makeVideo(sessID uint64, filesPath string) error {
}
func (v *VideoStorage) makeCanvasVideo(sessID uint64, filesPath, canvasMix string) error {
files, err := ioutil.ReadDir(filesPath)
if err != nil {
name := strings.TrimSuffix(canvasMix, "-list")
mixList := fmt.Sprintf("%s%s", filesPath, canvasMix)
// check that mixList exists
if _, err := ioutil.ReadFile(mixList); err != nil {
return err
}
if len(files) == 0 {
return nil
}
log.Printf("There are %d mix lists of session %d\n", len(files), sessID)
for _, file := range files {
if !strings.HasSuffix(file.Name(), "-list") {
continue
}
name := strings.TrimSuffix(file.Name(), "-list")
mixList := fmt.Sprintf("%s%s-list", filesPath, name)
videoPath := fmt.Sprintf("%s%s.mp4", filesPath, name)
// Run ffmpeg to build video
start := time.Now()
sessionID := strconv.FormatUint(sessID, 10)
cmd := exec.Command("ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", mixList, "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2", "-vsync", "vfr",
"-pix_fmt", "yuv420p", "-preset", "ultrafast", videoPath)
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
err = cmd.Run()
if err != nil {
log.Fatalf("Failed to execute command: %v, stderr: %v", err, stderr.String())
}
log.Printf("made video replay in %v", time.Since(start))
v.sendToS3Tasks <- &Task{sessionID: sessionID, path: videoPath, name: "/" + name + ".mp4"}
videoPath := fmt.Sprintf("%s%s.mp4", filesPath, name)
// Run ffmpeg to build video
start := time.Now()
sessionID := strconv.FormatUint(sessID, 10)
cmd := exec.Command("ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", mixList, "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2", "-vsync", "vfr",
"-pix_fmt", "yuv420p", "-preset", "ultrafast", videoPath)
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
err := cmd.Run()
if err != nil {
log.Printf("Failed to execute command: %v, stderr: %v", err, stderr.String())
return err
}
log.Printf("made video replay in %v", time.Since(start))
v.sendToS3Tasks <- &Task{sessionID: sessionID, path: videoPath, name: "/" + name + ".mp4"}
return nil
}