224 lines
6.4 KiB
Go
224 lines
6.4 KiB
Go
package canvases
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"os/exec"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
config "openreplay/backend/internal/config/canvases"
|
|
"openreplay/backend/pkg/logger"
|
|
"openreplay/backend/pkg/messages"
|
|
"openreplay/backend/pkg/metrics/canvas"
|
|
"openreplay/backend/pkg/objectstorage"
|
|
"openreplay/backend/pkg/pool"
|
|
"openreplay/backend/pkg/queue/types"
|
|
)
|
|
|
|
type ImageStorage struct {
|
|
cfg *config.Config
|
|
log logger.Logger
|
|
basePath string
|
|
saverPool pool.WorkerPool
|
|
packerPool pool.WorkerPool
|
|
uploaderPool pool.WorkerPool
|
|
objStorage objectstorage.ObjectStorage
|
|
producer types.Producer
|
|
metrics canvas.Canvas
|
|
}
|
|
|
|
// saveTask is the payload handed to the saver pool: a single canvas image
// that writeToDisk stores under the session's directory.
type saveTask struct {
	ctx       context.Context // passed to the logger calls made while handling the task
	sessionID uint64          // selects the per-session directory
	name      string          // file name of the image inside that directory
	image     *bytes.Buffer   // image bytes to be written
}
|
|
|
|
// packTask is the payload handed to the packer pool: it identifies one canvas
// of one session whose images packCanvas bundles into an archive.
type packTask struct {
	ctx       context.Context // passed to the logger calls made while handling the task
	sessionID uint64          // used to build the name of the uploaded object

	// path is the directory searched for images and name is the file-name
	// prefix of the canvas; the archive is written to path + name + ".tar.zst".
	path, name string
}
|
|
|
|
// uploadTask is the payload handed to the uploader pool: one archive file
// that sendToS3 reads from disk and pushes to object storage.
type uploadTask struct {
	ctx context.Context // passed to the logger calls made while handling the task

	// path is the local archive file; name is the key it is uploaded under.
	path, name string
}
|
|
|
|
func New(cfg *config.Config, log logger.Logger, objStorage objectstorage.ObjectStorage, producer types.Producer, metrics canvas.Canvas) (*ImageStorage, error) {
|
|
switch {
|
|
case cfg == nil:
|
|
return nil, fmt.Errorf("config is empty")
|
|
case log == nil:
|
|
return nil, fmt.Errorf("logger is empty")
|
|
case objStorage == nil:
|
|
return nil, fmt.Errorf("objectStorage is empty")
|
|
case producer == nil:
|
|
return nil, fmt.Errorf("producer is empty")
|
|
case metrics == nil:
|
|
return nil, fmt.Errorf("metrics is empty")
|
|
}
|
|
path := cfg.FSDir + "/"
|
|
if cfg.CanvasDir != "" {
|
|
path += cfg.CanvasDir + "/"
|
|
}
|
|
s := &ImageStorage{
|
|
cfg: cfg,
|
|
log: log,
|
|
basePath: path,
|
|
objStorage: objStorage,
|
|
producer: producer,
|
|
metrics: metrics,
|
|
}
|
|
s.saverPool = pool.NewPool(2, 2, s.writeToDisk)
|
|
s.packerPool = pool.NewPool(8, 16, s.packCanvas)
|
|
s.uploaderPool = pool.NewPool(8, 16, s.sendToS3)
|
|
return s, nil
|
|
}
|
|
|
|
func (v *ImageStorage) Wait() {
|
|
v.saverPool.Pause()
|
|
v.uploaderPool.Pause()
|
|
}
|
|
|
|
func (v *ImageStorage) SaveCanvasToDisk(ctx context.Context, sessID uint64, data []byte) error {
|
|
type canvasData struct {
|
|
Name string
|
|
Data []byte
|
|
}
|
|
var msg = &canvasData{}
|
|
if err := json.Unmarshal(data, msg); err != nil {
|
|
return fmt.Errorf("can't parse canvas message, err: %s", err)
|
|
}
|
|
v.saverPool.Submit(&saveTask{ctx: ctx, sessionID: sessID, name: msg.Name, image: bytes.NewBuffer(msg.Data)})
|
|
return nil
|
|
}
|
|
|
|
func (v *ImageStorage) writeToDisk(payload interface{}) {
|
|
task := payload.(*saveTask)
|
|
path := fmt.Sprintf("%s%d/", v.basePath, task.sessionID)
|
|
|
|
// Ensure the directory exists
|
|
if err := os.MkdirAll(path, 0755); err != nil {
|
|
v.log.Fatal(task.ctx, "can't create a dir, err: %s", err)
|
|
}
|
|
|
|
// Write images to disk
|
|
outFile, err := os.Create(path + task.name)
|
|
if err != nil {
|
|
v.log.Fatal(task.ctx, "can't create an image: %s", err)
|
|
}
|
|
if _, err := io.Copy(outFile, task.image); err != nil {
|
|
v.log.Fatal(task.ctx, "can't copy data to image: %s", err)
|
|
}
|
|
if outFile != nil {
|
|
if err := outFile.Close(); err != nil {
|
|
v.log.Warn(task.ctx, "can't close out file: %s", err)
|
|
}
|
|
}
|
|
v.metrics.RecordCanvasImageSize(float64(task.image.Len()))
|
|
v.metrics.IncreaseTotalSavedImages()
|
|
|
|
v.log.Debug(task.ctx, "canvas image saved, name: %s, size: %3.3f mb", task.name, float64(task.image.Len())/1024.0/1024.0)
|
|
return
|
|
}
|
|
|
|
func (v *ImageStorage) PrepareSessionCanvases(ctx context.Context, sessID uint64) error {
|
|
start := time.Now()
|
|
path := fmt.Sprintf("%s%d/", v.basePath, sessID)
|
|
|
|
// Check that the directory exists
|
|
files, err := os.ReadDir(path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(files) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Build the list of canvas images sets
|
|
names := make(map[string]int)
|
|
for _, file := range files {
|
|
if strings.HasSuffix(file.Name(), ".tar.zst") {
|
|
continue // Skip already created archives
|
|
}
|
|
name := strings.Split(file.Name(), ".")
|
|
parts := strings.Split(name[0], "_")
|
|
if len(name) != 2 || len(parts) != 3 {
|
|
v.log.Warn(ctx, "unknown file name: %s, skipping", file.Name())
|
|
continue
|
|
}
|
|
canvasID := fmt.Sprintf("%s_%s", parts[0], parts[1])
|
|
names[canvasID]++
|
|
}
|
|
|
|
for name, number := range names {
|
|
msg := &messages.CustomEvent{
|
|
Name: name,
|
|
Payload: path,
|
|
}
|
|
if err := v.producer.Produce(v.cfg.TopicCanvasTrigger, sessID, msg.Encode()); err != nil {
|
|
v.log.Error(ctx, "can't send canvas trigger: %s", err)
|
|
}
|
|
v.metrics.RecordImagesPerCanvas(float64(number))
|
|
}
|
|
v.metrics.RecordCanvasesPerSession(float64(len(names)))
|
|
v.metrics.RecordPreparingDuration(time.Since(start).Seconds())
|
|
|
|
v.log.Debug(ctx, "session canvases (%d) prepared in %.3fs, session: %d", len(names), time.Since(start).Seconds(), sessID)
|
|
return nil
|
|
}
|
|
|
|
func (v *ImageStorage) ProcessSessionCanvas(ctx context.Context, sessID uint64, path, name string) error {
|
|
v.packerPool.Submit(&packTask{ctx: ctx, sessionID: sessID, path: path, name: name})
|
|
return nil
|
|
}
|
|
|
|
func (v *ImageStorage) packCanvas(payload interface{}) {
|
|
task := payload.(*packTask)
|
|
start := time.Now()
|
|
sessionID := strconv.FormatUint(task.sessionID, 10)
|
|
|
|
// Save to archives
|
|
archPath := fmt.Sprintf("%s%s.tar.zst", task.path, task.name)
|
|
fullCmd := fmt.Sprintf("find %s -type f -name '%s*' ! -name '*.tar.zst' | tar -cf - --files-from=- | zstd -f -o %s",
|
|
task.path, task.name, archPath)
|
|
cmd := exec.Command("sh", "-c", fullCmd)
|
|
var stdout, stderr bytes.Buffer
|
|
cmd.Stdout = &stdout
|
|
cmd.Stderr = &stderr
|
|
|
|
err := cmd.Run()
|
|
if err != nil {
|
|
v.log.Fatal(task.ctx, "failed to execute command, err: %s, stderr: %v", err, stderr.String())
|
|
}
|
|
v.metrics.RecordArchivingDuration(time.Since(start).Seconds())
|
|
v.metrics.IncreaseTotalCreatedArchives()
|
|
|
|
v.log.Debug(task.ctx, "canvas packed successfully in %.3fs, session: %d", time.Since(start).Seconds(), task.sessionID)
|
|
v.uploaderPool.Submit(&uploadTask{ctx: task.ctx, path: archPath, name: sessionID + "/" + task.name + ".tar.zst"})
|
|
}
|
|
|
|
func (v *ImageStorage) sendToS3(payload interface{}) {
|
|
task := payload.(*uploadTask)
|
|
start := time.Now()
|
|
video, err := os.ReadFile(task.path)
|
|
if err != nil {
|
|
v.log.Fatal(task.ctx, "failed to read canvas archive: %s", err)
|
|
}
|
|
if err := v.objStorage.Upload(bytes.NewReader(video), task.name, "application/octet-stream", objectstorage.NoContentEncoding, objectstorage.Zstd); err != nil {
|
|
v.log.Fatal(task.ctx, "failed to upload canvas to storage: %s", err)
|
|
}
|
|
v.metrics.RecordUploadingDuration(time.Since(start).Seconds())
|
|
v.metrics.RecordArchiveSize(float64(len(video)))
|
|
|
|
v.log.Debug(task.ctx, "replay file (size: %d) uploaded successfully in %.3fs", len(video), time.Since(start).Seconds())
|
|
}
|