diff --git a/backend/Dockerfile b/backend/Dockerfile index 3471d7428..d14919f91 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -72,6 +72,7 @@ ENV TZ=UTC \ TOPIC_TRIGGER=trigger \ TOPIC_MOBILE_TRIGGER=mobile-trigger \ TOPIC_CANVAS_IMAGES=canvas-images \ + TOPIC_CANVAS_TRIGGER=canvas-trigger \ GROUP_SINK=sink \ GROUP_STORAGE=storage \ GROUP_DB=db \ @@ -112,7 +113,8 @@ ENV TZ=UTC \ # Use to set compression threshold for tracker requests (20kb by default) COMPRESSION_THRESHOLD="20000" \ # Set Access-Control-* headers for tracker requests if true - USE_CORS=false + USE_CORS=false \ + RECORD_CANVAS=true RUN if [ "$SERVICE_NAME" = "http" ]; then \ diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index 0476018fe..8940d1dcc 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -181,9 +181,13 @@ func main() { } } else { if err := producer.Produce(cfg.TopicRawWeb, sessionID, msg.Encode()); err != nil { - log.Printf("can't send sessionEnd to topic: %s; sessID: %d", err, sessionID) + log.Printf("can't send sessionEnd to raw topic: %s; sessID: %d", err, sessionID) return false, 0 } + // Inform canvas service about session end + if err := producer.Produce(cfg.TopicCanvasImages, sessionID, msg.Encode()); err != nil { + log.Printf("can't send sessionEnd signal to canvas topic: %s; sessID: %d", err, sessionID) + } } if currDuration != 0 { diff --git a/backend/cmd/imagestorage/main.go b/backend/cmd/imagestorage/main.go index 07d2a20da..9b3cacc95 100644 --- a/backend/cmd/imagestorage/main.go +++ b/backend/cmd/imagestorage/main.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "log" "os" "os/signal" @@ -29,6 +30,8 @@ func main() { return } + producer := queue.NewProducer(cfg.MessageSizeLimit, true) + consumer := queue.NewConsumer( cfg.GroupImageStorage, []string{ @@ -49,8 +52,39 @@ func main() { cfg.TopicCanvasImages, }, messages.NewImagesMessageIterator(func(data []byte, sessID uint64) { - if err := srv.ProcessCanvas(sessID, data); err != nil { - log.Printf("can't process canvas image: %s", err) + checkSessionEnd := func(data []byte) (messages.Message, error) { + reader := messages.NewBytesReader(data) + msgType, err := reader.ReadUint() + if err != nil { + return nil, err + } + if msgType != messages.MsgSessionEnd { + return nil, fmt.Errorf("not a session end message") + } + msg, err := messages.ReadMessage(msgType, reader) + if err != nil { + return nil, fmt.Errorf("read message err: %s", err) + } + return msg, nil + } + + if msg, err := checkSessionEnd(data); err == nil { + sessEnd := msg.(*messages.SessionEnd) + // Received session end + if list, err := srv.PrepareCanvas(sessID); err != nil { + log.Printf("can't prepare canvas: %s", err) + } else { + for _, name := range list { + sessEnd.EncryptionKey = name + if err := producer.Produce(cfg.TopicCanvasTrigger, sessID, sessEnd.Encode()); err != nil { + log.Printf("can't send session end signal to video service: %s", err) + } + } + } + } else { + if err := srv.ProcessCanvas(sessID, data); err != nil { + log.Printf("can't process canvas image: %s", err) + } } }, nil, true), false, @@ -68,7 +102,9 @@ func main() { case sig := <-sigchan: log.Printf("Caught signal %v: terminating\n", sig) srv.Wait() + // close all consumers consumer.Close() + canvasConsumer.Close() os.Exit(0) case <-counterTick: srv.Wait() @@ -80,6 +116,8 @@ func main() { } case msg := <-consumer.Rebalanced(): log.Println(msg) + case msg := <-canvasConsumer.Rebalanced(): + log.Println(msg) default: err := consumer.ConsumeNext() if err != nil { diff --git a/backend/cmd/videostorage/main.go b/backend/cmd/videostorage/main.go index 3d2650cfb..df38249c3 100644 --- a/backend/cmd/videostorage/main.go +++ b/backend/cmd/videostorage/main.go @@ -47,7 +47,7 @@ func main() { func(msg messages.Message) { sesEnd := msg.(*messages.IOSSessionEnd) log.Printf("recieved mobile session end: %d", sesEnd.SessionID()) - if err := srv.Process(sesEnd.SessionID(), workDir+"/screenshots/"+strconv.FormatUint(sesEnd.SessionID(), 10)+"/", false); err != nil { + if err := srv.Process(sesEnd.SessionID(), workDir+"/screenshots/"+strconv.FormatUint(sesEnd.SessionID(), 10)+"/", ""); err != nil { log.Printf("upload session err: %s, sessID: %d", err, msg.SessionID()) } }, @@ -61,12 +61,18 @@ func main() { canvasConsumer := queue.NewConsumer( cfg.GroupVideoStorage, []string{ - cfg.TopicTrigger, + cfg.TopicCanvasTrigger, }, messages.NewMessageIterator( func(msg messages.Message) { sesEnd := msg.(*messages.SessionEnd) - if err := srv.Process(sesEnd.SessionID(), workDir+"/canvas/"+strconv.FormatUint(sesEnd.SessionID(), 10)+"/", true); err != nil { + filePath := workDir + "/canvas/" + strconv.FormatUint(sesEnd.SessionID(), 10) + "/" + canvasMix := sesEnd.EncryptionKey // dirty hack to use encryption key as canvas mix holder (only between canvas handler and canvas maker) + if canvasMix == "" { + log.Printf("no canvas mix for session: %d", sesEnd.SessionID()) + return + } + if err := srv.Process(sesEnd.SessionID(), filePath, canvasMix); err != nil { if !strings.Contains(err.Error(), "no such file or directory") { log.Printf("upload session err: %s, sessID: %d", err, msg.SessionID()) } @@ -91,6 +97,7 @@ func main() { log.Printf("Caught signal %v: terminating\n", sig) srv.Wait() consumer.Close() + canvasConsumer.Close() os.Exit(0) case <-counterTick: srv.Wait() @@ -102,6 +109,8 @@ func main() { } case msg := <-consumer.Rebalanced(): log.Println(msg) + case msg := <-canvasConsumer.Rebalanced(): + log.Println(msg) default: err = consumer.ConsumeNext() if err != nil { diff --git a/backend/internal/config/ender/config.go b/backend/internal/config/ender/config.go index b3182567a..bdbc4a74f 100644 --- a/backend/internal/config/ender/config.go +++ b/backend/internal/config/ender/config.go @@ -16,6 +16,7 @@ type Config struct { LoggerTimeout int `env:"LOG_QUEUE_STATS_INTERVAL_SEC,required"` TopicRawWeb string `env:"TOPIC_RAW_WEB,required"` TopicRawIOS string `env:"TOPIC_RAW_IOS,required"` + TopicCanvasImages string `env:"TOPIC_CANVAS_IMAGES,required"` ProducerTimeout int `env:"PRODUCER_TIMEOUT,default=2000"` PartitionsNumber int `env:"PARTITIONS_NUMBER,required"` UseEncryption bool `env:"USE_ENCRYPTION,default=false"` diff --git a/backend/internal/config/http/config.go b/backend/internal/config/http/config.go index fe2126573..824766c75 100644 --- a/backend/internal/config/http/config.go +++ b/backend/internal/config/http/config.go @@ -32,8 +32,8 @@ type Config struct { UseAccessControlHeaders bool `env:"USE_CORS,default=false"` ProjectExpiration time.Duration `env:"PROJECT_EXPIRATION,default=10m"` RecordCanvas bool `env:"RECORD_CANVAS,default=false"` - CanvasQuality string `env:"CANVAS_QUALITY,default=medium"` - CanvasFps int `env:"CANVAS_FPS,default=2"` + CanvasQuality string `env:"CANVAS_QUALITY,default=low"` + CanvasFps int `env:"CANVAS_FPS,default=1"` WorkerID uint16 } diff --git a/backend/internal/config/imagestorage/config.go b/backend/internal/config/imagestorage/config.go index dcae17997..3dabfa06b 100644 --- a/backend/internal/config/imagestorage/config.go +++ b/backend/internal/config/imagestorage/config.go @@ -7,13 +7,14 @@ import ( type Config struct { common.Config - FSDir string `env:"FS_DIR,required"` - ScreenshotsDir string `env:"SCREENSHOTS_DIR,default=screenshots"` - CanvasDir string `env:"CANVAS_DIR,default=canvas"` - TopicRawImages string `env:"TOPIC_RAW_IMAGES,required"` - TopicCanvasImages string `env:"TOPIC_CANVAS_IMAGES,required"` - GroupImageStorage string `env:"GROUP_IMAGE_STORAGE,required"` - UseProfiler bool `env:"PROFILER_ENABLED,default=false"` + FSDir string `env:"FS_DIR,required"` + ScreenshotsDir string `env:"SCREENSHOTS_DIR,default=screenshots"` + CanvasDir string `env:"CANVAS_DIR,default=canvas"` + TopicRawImages string `env:"TOPIC_RAW_IMAGES,required"` + TopicCanvasImages string `env:"TOPIC_CANVAS_IMAGES,required"` + TopicCanvasTrigger string `env:"TOPIC_CANVAS_TRIGGER,required"` + GroupImageStorage string `env:"GROUP_IMAGE_STORAGE,required"` + UseProfiler bool `env:"PROFILER_ENABLED,default=false"` } func New() *Config { diff --git a/backend/internal/config/videostorage/config.go b/backend/internal/config/videostorage/config.go index 32841f758..41fbed038 100644 --- a/backend/internal/config/videostorage/config.go +++ b/backend/internal/config/videostorage/config.go @@ -13,6 +13,7 @@ type Config struct { GroupVideoStorage string `env:"GROUP_VIDEO_STORAGE,required"` TopicMobileTrigger string `env:"TOPIC_MOBILE_TRIGGER,required"` TopicTrigger string `env:"TOPIC_TRIGGER,required"` + TopicCanvasTrigger string `env:"TOPIC_CANVAS_TRIGGER,required"` VideoReplayFPS int `env:"VIDEO_REPLAY_FPS,default=3"` UseProfiler bool `env:"PROFILER_ENABLED,default=false"` } diff --git a/backend/internal/imagestorage/service.go b/backend/internal/imagestorage/service.go index b2a6a125d..f7e57a9eb 100644 --- a/backend/internal/imagestorage/service.go +++ b/backend/internal/imagestorage/service.go @@ -6,9 +6,12 @@ import ( "encoding/json" "fmt" "io" + "io/ioutil" "log" "os" + "sort" "strconv" + "strings" "time" gzip "github.com/klauspost/pgzip" @@ -74,6 +77,80 @@ type ScreenshotMessage struct { Data []byte } +func (v *ImageStorage) PrepareCanvas(sessID uint64) ([]string, error) { + // Build the directory path to session's canvas images + path := v.cfg.FSDir + "/" + if v.cfg.CanvasDir != "" { + path += v.cfg.CanvasDir + "/" + } + path += strconv.FormatUint(sessID, 10) + "/" + + // Check that the directory exists + files, err := ioutil.ReadDir(path) + if err != nil { + return nil, err + } + if len(files) == 0 { + return []string{}, nil + } + log.Printf("There are %d canvas images of session %d\n", len(files), sessID) + + type canvasData struct { + files map[int]string + times []int + } + images := make(map[string]*canvasData) + + // Build the list of canvas images sets + for _, file := range files { + name := strings.Split(file.Name(), ".") + parts := strings.Split(name[0], "_") + if len(name) != 2 || len(parts) != 3 { + log.Printf("unknown file name: %s, skipping", file.Name()) + continue + } + canvasID := fmt.Sprintf("%s_%s", parts[0], parts[1]) + canvasTS, _ := strconv.Atoi(parts[2]) + if _, ok := images[canvasID]; !ok { + images[canvasID] = &canvasData{ + files: make(map[int]string), + times: make([]int, 0), + } + } + images[canvasID].files[canvasTS] = file.Name() + images[canvasID].times = append(images[canvasID].times, canvasTS) + } + + // Prepare screenshot lists for ffmpeg + namesList := make([]string, 0) + for name, cData := range images { + // Write to file + mixName := fmt.Sprintf("%s-list", name) + mixList := path + mixName + outputFile, err := os.Create(mixList) + if err != nil { + log.Printf("can't create mix list, err: %s", err) + continue + } + + sort.Ints(cData.times) + for i := 0; i < len(cData.times)-1; i++ { + dur := float64(cData.times[i+1]-cData.times[i]) / 1000.0 + line := fmt.Sprintf("file %s\nduration %.3f\n", cData.files[cData.times[i]], dur) + _, err := outputFile.WriteString(line) + if err != nil { + outputFile.Close() + log.Printf("%s", err) + continue + } + } + outputFile.Close() + log.Printf("made canvas list %s", mixList) + namesList = append(namesList, mixName) + } + return namesList, nil +} + func (v *ImageStorage) ProcessCanvas(sessID uint64, data []byte) error { var msg = &ScreenshotMessage{} if err := json.Unmarshal(data, msg); err != nil { @@ -81,7 +158,7 @@ func (v *ImageStorage) ProcessCanvas(sessID uint64, data []byte) error { } // Use the same workflow v.writeToDiskTasks <- &Task{sessionID: sessID, images: map[string]*bytes.Buffer{msg.Name: bytes.NewBuffer(msg.Data)}, imageType: canvas} - log.Printf("new canvas image, sessID: %d, name: %s, size: %d mb", sessID, msg.Name, len(msg.Data)/1024/1024) + log.Printf("new canvas image, sessID: %d, name: %s, size: %3.3f mb", sessID, msg.Name, float64(len(msg.Data))/1024.0/1024.0) return nil } @@ -138,6 +215,7 @@ func (v *ImageStorage) writeToDisk(task *Task) { } // Write images to disk + saved := 0 for name, img := range task.images { outFile, err := os.Create(path + name) // or open file in rewrite mode if err != nil { @@ -147,7 +225,9 @@ func (v *ImageStorage) writeToDisk(task *Task) { log.Printf("can't copy file: %s", err.Error()) } outFile.Close() + saved++ } + log.Printf("saved %d images to disk", saved) return } diff --git a/backend/internal/videostorage/service.go b/backend/internal/videostorage/service.go index a428d1964..7a3eae9db 100644 --- a/backend/internal/videostorage/service.go +++ b/backend/internal/videostorage/service.go @@ -7,9 +7,7 @@ import ( "log" config "openreplay/backend/internal/config/videostorage" "openreplay/backend/pkg/objectstorage" - "os" "os/exec" - "sort" "strconv" "strings" "time" @@ -81,7 +79,7 @@ func (v *VideoStorage) makeVideo(sessID uint64, filesPath string) error { return nil } -func (v *VideoStorage) makeCanvasVideo(sessID uint64, filesPath string) error { +func (v *VideoStorage) makeCanvasVideo(sessID uint64, filesPath, canvasMix string) error { files, err := ioutil.ReadDir(filesPath) if err != nil { return err @@ -89,61 +87,23 @@ func (v *VideoStorage) makeCanvasVideo(sessID uint64, filesPath string) error { if len(files) == 0 { return nil } - log.Printf("There are %d canvas images of session %d\n", len(files), 0) - type canvasData struct { - files map[int]string - times []int - } - images := make(map[string]*canvasData) + log.Printf("There are %d mix lists of session %d\n", len(files), sessID) + for _, file := range files { - name := strings.Split(file.Name(), ".") - parts := strings.Split(name[0], "_") - if len(name) != 2 || len(parts) != 3 { - log.Printf("unknown file name: %s, skipping", file.Name()) - continue - } - canvasID := fmt.Sprintf("%s_%s", parts[0], parts[1]) - canvasTS, _ := strconv.Atoi(parts[2]) - log.Printf("%s : %d", canvasID, canvasTS) - if _, ok := images[canvasID]; !ok { - images[canvasID] = &canvasData{ - files: make(map[int]string), - times: make([]int, 0), - } - } - images[canvasID].files[canvasTS] = file.Name() - images[canvasID].times = append(images[canvasID].times, canvasTS) - } - for name, cData := range images { - // Write to file - mixList := fmt.Sprintf("%s/%s-list", filesPath, name) - outputFile, err := os.Create(mixList) - if err != nil { - log.Printf("can't create mix list, err: %s", err) + if !strings.HasSuffix(file.Name(), "-list") { continue } - sort.Ints(cData.times) - for i := 0; i < len(cData.times)-1; i++ { - dur := float64(cData.times[i+1]-cData.times[i]) / 1000.0 - line := fmt.Sprintf("file %s\nduration %.3f\n", cData.files[cData.times[i]], dur) - _, err := outputFile.WriteString(line) - if err != nil { - outputFile.Close() - log.Printf("%s", err) - continue - } - log.Printf(line) - } - outputFile.Close() + name := strings.TrimSuffix(file.Name(), "-list") + mixList := fmt.Sprintf("%s%s-list", filesPath, name) + videoPath := fmt.Sprintf("%s%s.mp4", filesPath, name) // Run ffmpeg to build video start := time.Now() sessionID := strconv.FormatUint(sessID, 10) - videoPath := fmt.Sprintf("%s/%s.mp4", filesPath, name) - cmd := exec.Command("ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", mixList, "-vsync", "vfr", + + cmd := exec.Command("ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", mixList, "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2", "-vsync", "vfr", "-pix_fmt", "yuv420p", "-preset", "ultrafast", videoPath) - // ffmpeg -f concat -safe 0 -i input.txt -vsync vfr -pix_fmt yuv420p output.mp4 var stdout, stderr bytes.Buffer cmd.Stdout = &stdout @@ -180,9 +140,9 @@ func (v *VideoStorage) sendToS3(task *Task) { return } -func (v *VideoStorage) Process(sessID uint64, filesPath string, isCanvas bool) error { - if isCanvas { - return v.makeCanvasVideo(sessID, filesPath) +func (v *VideoStorage) Process(sessID uint64, filesPath string, canvasMix string) error { + if canvasMix != "" { + return v.makeCanvasVideo(sessID, filesPath, canvasMix) } return v.makeVideo(sessID, filesPath) }