diff --git a/backend/Dockerfile b/backend/Dockerfile index 94152aa39..67fc0cff0 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -73,6 +73,7 @@ ENV TZ=UTC \ TOPIC_MOBILE_TRIGGER=mobile-trigger \ TOPIC_CANVAS_IMAGES=canvas-images \ TOPIC_CANVAS_TRIGGER=canvas-trigger \ + TOPIC_REPLAY_TRIGGER=replay-trigger \ GROUP_SINK=sink \ GROUP_STORAGE=storage \ GROUP_DB=db \ diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index 8940d1dcc..962cbc2a6 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -179,6 +179,10 @@ func main() { log.Printf("can't send iOSSessionEnd to topic: %s; sessID: %d", err, sessionID) return false, 0 } + // Inform canvas service about session end + if err := producer.Produce(cfg.TopicRawImages, sessionID, msg.Encode()); err != nil { + log.Printf("can't send sessionEnd signal to mobile images topic: %s; sessID: %d", err, sessionID) + } } else { if err := producer.Produce(cfg.TopicRawWeb, sessionID, msg.Encode()); err != nil { log.Printf("can't send sessionEnd to raw topic: %s; sessID: %d", err, sessionID) diff --git a/backend/cmd/imagestorage/main.go b/backend/cmd/imagestorage/main.go index 338463f33..e7f652679 100644 --- a/backend/cmd/imagestorage/main.go +++ b/backend/cmd/imagestorage/main.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "log" "os" "os/signal" @@ -29,14 +30,44 @@ func main() { return } + producer := queue.NewProducer(cfg.MessageSizeLimit, true) + consumer := queue.NewConsumer( cfg.GroupImageStorage, []string{ cfg.TopicRawImages, }, messages.NewImagesMessageIterator(func(data []byte, sessID uint64) { - if err := srv.Process(sessID, data); err != nil { - log.Printf("can't process image: %s", err) + checkSessionEnd := func(data []byte) (messages.Message, error) { + reader := messages.NewBytesReader(data) + msgType, err := reader.ReadUint() + if err != nil { + return nil, err + } + if msgType != messages.MsgIOSSessionEnd { + return nil, fmt.Errorf("not a mobile session end message") + } + msg, err := messages.ReadMessage(msgType, reader) + if err != nil { + return nil, fmt.Errorf("read message err: %s", err) + } + return msg, nil + } + + if msg, err := checkSessionEnd(data); err == nil { + sessEnd := msg.(*messages.IOSSessionEnd) + // Received session end + if err := srv.Prepare(sessID); err != nil { + log.Printf("can't prepare mobile session: %s", err) + } else { + if err := producer.Produce(cfg.TopicReplayTrigger, sessID, sessEnd.Encode()); err != nil { + log.Printf("can't send session end signal to video service: %s", err) + } + } + } else { + if err := srv.Process(sessID, data); err != nil { + log.Printf("can't process mobile screenshots: %s", err) + } } }, nil, true), false, diff --git a/backend/cmd/videostorage/main.go b/backend/cmd/videostorage/main.go index 948a1ca11..1fd8b764c 100644 --- a/backend/cmd/videostorage/main.go +++ b/backend/cmd/videostorage/main.go @@ -40,7 +40,7 @@ func main() { consumer := queue.NewConsumer( cfg.GroupVideoStorage, []string{ - cfg.TopicMobileTrigger, + cfg.TopicReplayTrigger, }, messages.NewMessageIterator( func(msg messages.Message) { diff --git a/backend/internal/config/ender/config.go b/backend/internal/config/ender/config.go index bdbc4a74f..2a318f015 100644 --- a/backend/internal/config/ender/config.go +++ b/backend/internal/config/ender/config.go @@ -17,6 +17,7 @@ type Config struct { TopicRawWeb string `env:"TOPIC_RAW_WEB,required"` TopicRawIOS string `env:"TOPIC_RAW_IOS,required"` TopicCanvasImages string `env:"TOPIC_CANVAS_IMAGES,required"` + TopicRawImages string `env:"TOPIC_RAW_IMAGES,required"` ProducerTimeout int `env:"PRODUCER_TIMEOUT,default=2000"` PartitionsNumber int `env:"PARTITIONS_NUMBER,required"` UseEncryption bool `env:"USE_ENCRYPTION,default=false"` diff --git a/backend/internal/config/imagestorage/config.go b/backend/internal/config/imagestorage/config.go index c8b5ff5c2..fad646ae5 100644 --- a/backend/internal/config/imagestorage/config.go +++ b/backend/internal/config/imagestorage/config.go @@ -13,6 +13,7 @@ type Config struct { TopicRawImages string `env:"TOPIC_RAW_IMAGES,required"` TopicCanvasImages string `env:"TOPIC_CANVAS_IMAGES,required"` TopicCanvasTrigger string `env:"TOPIC_CANVAS_TRIGGER,required"` + TopicReplayTrigger string `env:"TOPIC_REPLAY_TRIGGER,required"` GroupImageStorage string `env:"GROUP_IMAGE_STORAGE,required"` GroupCanvasImage string `env:"GROUP_CANVAS_IMAGE,required"` UseProfiler bool `env:"PROFILER_ENABLED,default=false"` diff --git a/backend/internal/config/videostorage/config.go b/backend/internal/config/videostorage/config.go index 8b1d8924c..d5bc2afbf 100644 --- a/backend/internal/config/videostorage/config.go +++ b/backend/internal/config/videostorage/config.go @@ -12,8 +12,7 @@ type Config struct { FSDir string `env:"FS_DIR,required"` GroupVideoStorage string `env:"GROUP_VIDEO_STORAGE,required"` GroupCanvasVideo string `env:"GROUP_CANVAS_VIDEO,required"` - TopicMobileTrigger string `env:"TOPIC_MOBILE_TRIGGER,required"` - TopicTrigger string `env:"TOPIC_TRIGGER,required"` + TopicReplayTrigger string `env:"TOPIC_REPLAY_TRIGGER,required"` TopicCanvasTrigger string `env:"TOPIC_CANVAS_TRIGGER,required"` VideoReplayFPS int `env:"VIDEO_REPLAY_FPS,default=3"` UseProfiler bool `env:"PROFILER_ENABLED,default=false"` diff --git a/backend/internal/http/router/handlers-mobile.go b/backend/internal/http/router/handlers-mobile.go index 744aec9fd..123ecf899 100644 --- a/backend/internal/http/router/handlers-mobile.go +++ b/backend/internal/http/router/handlers-mobile.go @@ -55,6 +55,11 @@ func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) return } + if !checkMobileTrackerVersion(req.TrackerVersion) { + ResponseWithError(w, http.StatusUpgradeRequired, errors.New("tracker version not supported"), startTime, r.URL.Path, 0) + return + } + userUUID := uuid.GetUUID(req.UserUUID) tokenData, err := e.services.Tokenizer.Parse(req.Token) diff --git a/backend/internal/http/router/handlers-web.go b/backend/internal/http/router/handlers-web.go index 3635fe9ca..6a88eed09 100644 --- a/backend/internal/http/router/handlers-web.go +++ b/backend/internal/http/router/handlers-web.go @@ -61,6 +61,23 @@ func (e *Router) readBody(w http.ResponseWriter, r *http.Request, limit int64) ( return bodyBytes, nil } +func checkMobileTrackerVersion(ver string) bool { + c, err := semver.NewConstraint(">=1.0.9") + if err != nil { + return false + } + // Check for beta version + parts := strings.Split(ver, "-") + if len(parts) > 1 { + ver = parts[0] + } + v, err := semver.NewVersion(ver) + if err != nil { + return false + } + return c.Check(v) +} + func getSessionTimestamp(req *StartSessionRequest, startTimeMili int64) (ts uint64) { ts = uint64(req.Timestamp) if req.IsOffline { diff --git a/backend/internal/imagestorage/service.go b/backend/internal/imagestorage/service.go index f7e57a9eb..dba8532b6 100644 --- a/backend/internal/imagestorage/service.go +++ b/backend/internal/imagestorage/service.go @@ -4,6 +4,7 @@ import ( "archive/tar" "bytes" "encoding/json" + "errors" "fmt" "io" "io/ioutil" @@ -72,6 +73,67 @@ func (v *ImageStorage) Process(sessID uint64, data []byte) error { return nil } +func (v *ImageStorage) Prepare(sessID uint64) error { + path := v.cfg.FSDir + "/" + if v.cfg.ScreenshotsDir != "" { + path += v.cfg.ScreenshotsDir + "/" + } + path += strconv.FormatUint(sessID, 10) + "/" + + // Check that the directory exists + files, err := ioutil.ReadDir(path) + if err != nil { + return err + } + if len(files) == 0 { + return errors.New("no screenshots found") + } + + images := make(map[int]string) + times := make([]int, 0, len(files)) + + // Build the list of canvas images sets + for _, file := range files { + name := strings.Split(file.Name(), ".") + parts := strings.Split(name[0], "_") + if len(name) != 2 || len(parts) != 3 { + log.Printf("unknown file name: %s, skipping", file.Name()) + continue + } + screenshotTS, _ := strconv.Atoi(parts[2]) + images[screenshotTS] = file.Name() + times = append(times, screenshotTS) + } + + // Prepare screenshot lists for ffmpeg + + mixName := fmt.Sprintf("%d-list", sessID) + mixList := path + mixName + outputFile, err := os.Create(mixList) + if err != nil { + log.Printf("can't create mix list, err: %s", err) + return err + } + + sort.Ints(times) + count := 0 + for i := 0; i < len(times)-1; i++ { + dur := float64(times[i+1]-times[i]) / 1000.0 + line := fmt.Sprintf("file %s\nduration %.3f\n", images[times[i]], dur) + _, err := outputFile.WriteString(line) + if err != nil { + outputFile.Close() + log.Printf("%s", err) + continue + } + count++ + } + outputFile.Close() + log.Printf("new canvas mix %s with %d images", mixList, count) + + return nil +} + type ScreenshotMessage struct { Name string Data []byte @@ -93,7 +155,6 @@ func (v *ImageStorage) PrepareCanvas(sessID uint64) ([]string, error) { if len(files) == 0 { return []string{}, nil } - log.Printf("There are %d canvas images of session %d\n", len(files), sessID) type canvasData struct { files map[int]string @@ -134,6 +195,7 @@ func (v *ImageStorage) PrepareCanvas(sessID uint64) ([]string, error) { } sort.Ints(cData.times) + count := 0 for i := 0; i < len(cData.times)-1; i++ { dur := float64(cData.times[i+1]-cData.times[i]) / 1000.0 line := fmt.Sprintf("file %s\nduration %.3f\n", cData.files[cData.times[i]], dur) @@ -143,11 +205,13 @@ func (v *ImageStorage) PrepareCanvas(sessID uint64) ([]string, error) { log.Printf("%s", err) continue } + count++ } outputFile.Close() - log.Printf("made canvas list %s", mixList) + log.Printf("new canvas mix %s with %d images", mixList, count) namesList = append(namesList, mixName) } + log.Printf("prepared %d canvas mixes for session %d", len(namesList), sessID) return namesList, nil } diff --git a/backend/internal/videostorage/service.go b/backend/internal/videostorage/service.go index 7a3eae9db..e5e6ddd11 100644 --- a/backend/internal/videostorage/service.go +++ b/backend/internal/videostorage/service.go @@ -53,26 +53,28 @@ func New(cfg *config.Config, objStorage objectstorage.ObjectStorage) (*VideoStor } func (v *VideoStorage) makeVideo(sessID uint64, filesPath string) error { - files, _ := ioutil.ReadDir(filesPath) + files, err := ioutil.ReadDir(filesPath) + if err != nil || len(files) == 0 { + return err // nil error is there is no screenshots + } log.Printf("There are %d screenshot of session %d\n", len(files), sessID) // Try to call ffmpeg and print the result start := time.Now() sessionID := strconv.FormatUint(sessID, 10) - imagesPath := "/mnt/efs/screenshots/" + sessionID + "/%06d.jpeg" + mixList := fmt.Sprintf("%s%s", filesPath, sessionID+"-list") videoPath := "/mnt/efs/screenshots/" + sessionID + "/replay.mp4" - cmd := exec.Command("ffmpeg", "-y", "-f", "image2", "-framerate", v.framerate, "-start_number", "000000", "-i", - imagesPath, "-vf", "scale=-2:1064", "-c:v", "libx264", "-preset", "ultrafast", "-crf", "23", - videoPath) - // ffmpeg -y -f concat -safe 0 -i 1699978964098_29-list -vf --pix_fmt yuv420p -preset ultrafast canvas.mp4 + cmd := exec.Command("ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", mixList, "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2", "-vsync", "vfr", + "-pix_fmt", "yuv420p", "-preset", "ultrafast", videoPath) var stdout, stderr bytes.Buffer cmd.Stdout = &stdout cmd.Stderr = &stderr - err := cmd.Run() + err = cmd.Run() if err != nil { - log.Fatalf("Failed to execute command: %v, stderr: %v", err, stderr.String()) + log.Printf("Failed to execute command: %v, stderr: %v", err, stderr.String()) + return err } log.Printf("made video replay in %v", time.Since(start)) v.sendToS3Tasks <- &Task{sessionID: sessionID, path: videoPath} @@ -80,42 +82,32 @@ func (v *VideoStorage) makeVideo(sessID uint64, filesPath string) error { } func (v *VideoStorage) makeCanvasVideo(sessID uint64, filesPath, canvasMix string) error { - files, err := ioutil.ReadDir(filesPath) - if err != nil { + name := strings.TrimSuffix(canvasMix, "-list") + mixList := fmt.Sprintf("%s%s", filesPath, canvasMix) + // check that mixList exists + if _, err := ioutil.ReadFile(mixList); err != nil { return err } - if len(files) == 0 { - return nil - } - log.Printf("There are %d mix lists of session %d\n", len(files), sessID) - - for _, file := range files { - if !strings.HasSuffix(file.Name(), "-list") { - continue - } - - name := strings.TrimSuffix(file.Name(), "-list") - mixList := fmt.Sprintf("%s%s-list", filesPath, name) - videoPath := fmt.Sprintf("%s%s.mp4", filesPath, name) - - // Run ffmpeg to build video - start := time.Now() - sessionID := strconv.FormatUint(sessID, 10) - - cmd := exec.Command("ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", mixList, "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2", "-vsync", "vfr", - "-pix_fmt", "yuv420p", "-preset", "ultrafast", videoPath) - - var stdout, stderr bytes.Buffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr - - err = cmd.Run() - if err != nil { - log.Fatalf("Failed to execute command: %v, stderr: %v", err, stderr.String()) - } - log.Printf("made video replay in %v", time.Since(start)) - v.sendToS3Tasks <- &Task{sessionID: sessionID, path: videoPath, name: "/" + name + ".mp4"} + videoPath := fmt.Sprintf("%s%s.mp4", filesPath, name) + + // Run ffmpeg to build video + start := time.Now() + sessionID := strconv.FormatUint(sessID, 10) + + cmd := exec.Command("ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", mixList, "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2", "-vsync", "vfr", + "-pix_fmt", "yuv420p", "-preset", "ultrafast", videoPath) + + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + err := cmd.Run() + if err != nil { + log.Printf("Failed to execute command: %v, stderr: %v", err, stderr.String()) + return err } + log.Printf("made video replay in %v", time.Since(start)) + v.sendToS3Tasks <- &Task{sessionID: sessionID, path: videoPath, name: "/" + name + ".mp4"} return nil }