* feat(backend): added new trigger topic for mobile video replay maker * feat(backend): mobile video maker with mix list support * feat(backend): fixed panic * feat(backend): removed commented code
309 lines
7.4 KiB
Go
309 lines
7.4 KiB
Go
package imagestorage
|
|
|
|
import (
|
|
"archive/tar"
|
|
"bytes"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"log"
|
|
"os"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
gzip "github.com/klauspost/pgzip"
|
|
config "openreplay/backend/internal/config/imagestorage"
|
|
)
|
|
|
|
type ImageType uint8
|
|
|
|
const (
|
|
screenshot ImageType = iota
|
|
canvas
|
|
)
|
|
|
|
type Task struct {
|
|
sessionID uint64 // to generate path
|
|
images map[string]*bytes.Buffer
|
|
imageType ImageType
|
|
isBreakTask bool
|
|
}
|
|
|
|
func NewBreakTask() *Task {
|
|
return &Task{isBreakTask: true}
|
|
}
|
|
|
|
type ImageStorage struct {
|
|
cfg *config.Config
|
|
writeToDiskTasks chan *Task
|
|
workersStopped chan struct{}
|
|
}
|
|
|
|
func New(cfg *config.Config) (*ImageStorage, error) {
|
|
switch {
|
|
case cfg == nil:
|
|
return nil, fmt.Errorf("config is empty")
|
|
}
|
|
newStorage := &ImageStorage{
|
|
cfg: cfg,
|
|
writeToDiskTasks: make(chan *Task, 1),
|
|
workersStopped: make(chan struct{}),
|
|
}
|
|
go newStorage.runWorker()
|
|
return newStorage, nil
|
|
}
|
|
|
|
func (v *ImageStorage) Wait() {
|
|
// send stop signal
|
|
v.writeToDiskTasks <- NewBreakTask()
|
|
// wait for workers to stop
|
|
<-v.workersStopped
|
|
}
|
|
|
|
func (v *ImageStorage) Process(sessID uint64, data []byte) error {
|
|
start := time.Now()
|
|
if err := v.extract(sessID, data); err != nil {
|
|
return err
|
|
}
|
|
log.Printf("sessID: %d, arch size: %d, extracted archive in: %s", sessID, len(data), time.Since(start))
|
|
return nil
|
|
}
|
|
|
|
func (v *ImageStorage) Prepare(sessID uint64) error {
|
|
path := v.cfg.FSDir + "/"
|
|
if v.cfg.ScreenshotsDir != "" {
|
|
path += v.cfg.ScreenshotsDir + "/"
|
|
}
|
|
path += strconv.FormatUint(sessID, 10) + "/"
|
|
|
|
// Check that the directory exists
|
|
files, err := ioutil.ReadDir(path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(files) == 0 {
|
|
return errors.New("no screenshots found")
|
|
}
|
|
|
|
images := make(map[int]string)
|
|
times := make([]int, 0, len(files))
|
|
|
|
// Build the list of canvas images sets
|
|
for _, file := range files {
|
|
name := strings.Split(file.Name(), ".")
|
|
parts := strings.Split(name[0], "_")
|
|
if len(name) != 2 || len(parts) != 3 {
|
|
log.Printf("unknown file name: %s, skipping", file.Name())
|
|
continue
|
|
}
|
|
screenshotTS, _ := strconv.Atoi(parts[2])
|
|
images[screenshotTS] = file.Name()
|
|
times = append(times, screenshotTS)
|
|
}
|
|
|
|
// Prepare screenshot lists for ffmpeg
|
|
|
|
mixName := fmt.Sprintf("%d-list", sessID)
|
|
mixList := path + mixName
|
|
outputFile, err := os.Create(mixList)
|
|
if err != nil {
|
|
log.Printf("can't create mix list, err: %s", err)
|
|
return err
|
|
}
|
|
|
|
sort.Ints(times)
|
|
count := 0
|
|
for i := 0; i < len(times)-1; i++ {
|
|
dur := float64(times[i+1]-times[i]) / 1000.0
|
|
line := fmt.Sprintf("file %s\nduration %.3f\n", images[times[i]], dur)
|
|
_, err := outputFile.WriteString(line)
|
|
if err != nil {
|
|
outputFile.Close()
|
|
log.Printf("%s", err)
|
|
continue
|
|
}
|
|
count++
|
|
}
|
|
outputFile.Close()
|
|
log.Printf("new canvas mix %s with %d images", mixList, count)
|
|
|
|
return nil
|
|
}
|
|
|
|
type ScreenshotMessage struct {
|
|
Name string
|
|
Data []byte
|
|
}
|
|
|
|
func (v *ImageStorage) PrepareCanvas(sessID uint64) ([]string, error) {
|
|
// Build the directory path to session's canvas images
|
|
path := v.cfg.FSDir + "/"
|
|
if v.cfg.CanvasDir != "" {
|
|
path += v.cfg.CanvasDir + "/"
|
|
}
|
|
path += strconv.FormatUint(sessID, 10) + "/"
|
|
|
|
// Check that the directory exists
|
|
files, err := ioutil.ReadDir(path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(files) == 0 {
|
|
return []string{}, nil
|
|
}
|
|
|
|
type canvasData struct {
|
|
files map[int]string
|
|
times []int
|
|
}
|
|
images := make(map[string]*canvasData)
|
|
|
|
// Build the list of canvas images sets
|
|
for _, file := range files {
|
|
name := strings.Split(file.Name(), ".")
|
|
parts := strings.Split(name[0], "_")
|
|
if len(name) != 2 || len(parts) != 3 {
|
|
log.Printf("unknown file name: %s, skipping", file.Name())
|
|
continue
|
|
}
|
|
canvasID := fmt.Sprintf("%s_%s", parts[0], parts[1])
|
|
canvasTS, _ := strconv.Atoi(parts[2])
|
|
if _, ok := images[canvasID]; !ok {
|
|
images[canvasID] = &canvasData{
|
|
files: make(map[int]string),
|
|
times: make([]int, 0),
|
|
}
|
|
}
|
|
images[canvasID].files[canvasTS] = file.Name()
|
|
images[canvasID].times = append(images[canvasID].times, canvasTS)
|
|
}
|
|
|
|
// Prepare screenshot lists for ffmpeg
|
|
namesList := make([]string, 0)
|
|
for name, cData := range images {
|
|
// Write to file
|
|
mixName := fmt.Sprintf("%s-list", name)
|
|
mixList := path + mixName
|
|
outputFile, err := os.Create(mixList)
|
|
if err != nil {
|
|
log.Printf("can't create mix list, err: %s", err)
|
|
continue
|
|
}
|
|
|
|
sort.Ints(cData.times)
|
|
count := 0
|
|
for i := 0; i < len(cData.times)-1; i++ {
|
|
dur := float64(cData.times[i+1]-cData.times[i]) / 1000.0
|
|
line := fmt.Sprintf("file %s\nduration %.3f\n", cData.files[cData.times[i]], dur)
|
|
_, err := outputFile.WriteString(line)
|
|
if err != nil {
|
|
outputFile.Close()
|
|
log.Printf("%s", err)
|
|
continue
|
|
}
|
|
count++
|
|
}
|
|
outputFile.Close()
|
|
log.Printf("new canvas mix %s with %d images", mixList, count)
|
|
namesList = append(namesList, mixName)
|
|
}
|
|
log.Printf("prepared %d canvas mixes for session %d", len(namesList), sessID)
|
|
return namesList, nil
|
|
}
|
|
|
|
func (v *ImageStorage) ProcessCanvas(sessID uint64, data []byte) error {
|
|
var msg = &ScreenshotMessage{}
|
|
if err := json.Unmarshal(data, msg); err != nil {
|
|
log.Printf("can't parse canvas message, err: %s", err)
|
|
}
|
|
// Use the same workflow
|
|
v.writeToDiskTasks <- &Task{sessionID: sessID, images: map[string]*bytes.Buffer{msg.Name: bytes.NewBuffer(msg.Data)}, imageType: canvas}
|
|
log.Printf("new canvas image, sessID: %d, name: %s, size: %3.3f mb", sessID, msg.Name, float64(len(msg.Data))/1024.0/1024.0)
|
|
return nil
|
|
}
|
|
|
|
func (v *ImageStorage) extract(sessID uint64, data []byte) error {
|
|
images := make(map[string]*bytes.Buffer)
|
|
uncompressedStream, err := gzip.NewReader(bytes.NewReader(data))
|
|
if err != nil {
|
|
return fmt.Errorf("can't create gzip reader: %s", err.Error())
|
|
}
|
|
tarReader := tar.NewReader(uncompressedStream)
|
|
|
|
for {
|
|
header, err := tarReader.Next()
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
return fmt.Errorf("can't read tar header: %s", err.Error())
|
|
}
|
|
|
|
if header.Typeflag == tar.TypeReg {
|
|
var buf bytes.Buffer
|
|
if _, err := buf.ReadFrom(tarReader); err != nil {
|
|
return fmt.Errorf("can't copy file: %s", err.Error())
|
|
}
|
|
images[header.Name] = &buf
|
|
} else {
|
|
log.Printf("ExtractTarGz: uknown type: %d in %s", header.Typeflag, header.Name)
|
|
}
|
|
}
|
|
|
|
v.writeToDiskTasks <- &Task{sessionID: sessID, images: images, imageType: screenshot}
|
|
return nil
|
|
}
|
|
|
|
func (v *ImageStorage) writeToDisk(task *Task) {
|
|
// Build the directory path
|
|
path := v.cfg.FSDir + "/"
|
|
if task.imageType == screenshot {
|
|
if v.cfg.ScreenshotsDir != "" {
|
|
path += v.cfg.ScreenshotsDir + "/"
|
|
}
|
|
} else {
|
|
if v.cfg.CanvasDir != "" {
|
|
path += v.cfg.CanvasDir + "/"
|
|
}
|
|
}
|
|
|
|
path += strconv.FormatUint(task.sessionID, 10) + "/"
|
|
|
|
// Ensure the directory exists
|
|
if err := os.MkdirAll(path, 0755); err != nil {
|
|
log.Fatalf("Error creating directories: %v", err)
|
|
}
|
|
|
|
// Write images to disk
|
|
saved := 0
|
|
for name, img := range task.images {
|
|
outFile, err := os.Create(path + name) // or open file in rewrite mode
|
|
if err != nil {
|
|
log.Printf("can't create file: %s", err.Error())
|
|
}
|
|
if _, err := io.Copy(outFile, img); err != nil {
|
|
log.Printf("can't copy file: %s", err.Error())
|
|
}
|
|
outFile.Close()
|
|
saved++
|
|
}
|
|
log.Printf("saved %d images to disk", saved)
|
|
return
|
|
}
|
|
|
|
func (v *ImageStorage) runWorker() {
|
|
for {
|
|
select {
|
|
case task := <-v.writeToDiskTasks:
|
|
if task.isBreakTask {
|
|
v.workersStopped <- struct{}{}
|
|
continue
|
|
}
|
|
v.writeToDisk(task)
|
|
}
|
|
}
|
|
}
|