No ffmpeg solution (#1905)
* feat(video-storage): added zstd library to the machine * feat(video-storage): added new method to pack screenshots into compressed tar arch * feat(video-storage): try to split command into 2 * feat(video-storage): try a new approad to avoid no file error * feat(api): added support to a new pre-signed url for screenshots archive * feat(video-storage): fixed an issue in extension check * feat(video-storage): correct file name * feat(backend): removed video-storage and splitted logic * feat(backend): removed video-storage from helm charts * feat(backend): split canvas and screenshot handlers * feat(canvas): clean up canvas-handler * feat(api): changed mobile replay url (screenshots instead of video) * feat(backend): removed msg.SessID() call * feat(backend): clean up code in imagestorage main
This commit is contained in:
parent
6465f206be
commit
a073ce498d
26 changed files with 414 additions and 1022 deletions
|
|
@ -50,7 +50,7 @@ S3_HOST=
|
||||||
S3_KEY=
|
S3_KEY=
|
||||||
S3_SECRET=
|
S3_SECRET=
|
||||||
SCH_DELETE_DAYS=30
|
SCH_DELETE_DAYS=30
|
||||||
SESSION_IOS_VIDEO_PATTERN=%(sessionId)s/replay.mp4
|
SESSION_IOS_VIDEO_PATTERN=%(sessionId)s/replay.tar.zst
|
||||||
SESSION_MOB_PATTERN_E=%(sessionId)s/dom.mobe
|
SESSION_MOB_PATTERN_E=%(sessionId)s/dom.mobe
|
||||||
SESSION_MOB_PATTERN_S=%(sessionId)s/dom.mobs
|
SESSION_MOB_PATTERN_S=%(sessionId)s/dom.mobs
|
||||||
sessions_bucket=mobs
|
sessions_bucket=mobs
|
||||||
|
|
|
||||||
|
|
@ -51,7 +51,7 @@ root_path=''
|
||||||
S3_HOST=
|
S3_HOST=
|
||||||
S3_KEY=
|
S3_KEY=
|
||||||
S3_SECRET=
|
S3_SECRET=
|
||||||
SESSION_IOS_VIDEO_PATTERN=%(sessionId)s/replay.mp4
|
SESSION_IOS_VIDEO_PATTERN=%(sessionId)s/replay.tar.zst
|
||||||
SESSION_MOB_PATTERN_E=%(sessionId)s/dom.mobe
|
SESSION_MOB_PATTERN_E=%(sessionId)s/dom.mobe
|
||||||
SESSION_MOB_PATTERN_S=%(sessionId)s/dom.mobs
|
SESSION_MOB_PATTERN_S=%(sessionId)s/dom.mobs
|
||||||
sessions_bucket=mobs
|
sessions_bucket=mobs
|
||||||
|
|
|
||||||
|
|
@ -61,7 +61,6 @@ ENV TZ=UTC \
|
||||||
TOPIC_MOBILE_TRIGGER=mobile-trigger \
|
TOPIC_MOBILE_TRIGGER=mobile-trigger \
|
||||||
TOPIC_CANVAS_IMAGES=canvas-images \
|
TOPIC_CANVAS_IMAGES=canvas-images \
|
||||||
TOPIC_CANVAS_TRIGGER=canvas-trigger \
|
TOPIC_CANVAS_TRIGGER=canvas-trigger \
|
||||||
TOPIC_REPLAY_TRIGGER=replay-trigger \
|
|
||||||
GROUP_SINK=sink \
|
GROUP_SINK=sink \
|
||||||
GROUP_STORAGE=storage \
|
GROUP_STORAGE=storage \
|
||||||
GROUP_DB=db \
|
GROUP_DB=db \
|
||||||
|
|
@ -69,7 +68,6 @@ ENV TZ=UTC \
|
||||||
GROUP_CACHE=cache \
|
GROUP_CACHE=cache \
|
||||||
GROUP_HEURISTICS=heuristics \
|
GROUP_HEURISTICS=heuristics \
|
||||||
GROUP_IMAGE_STORAGE=image-storage \
|
GROUP_IMAGE_STORAGE=image-storage \
|
||||||
GROUP_VIDEO_STORAGE=video-storage \
|
|
||||||
GROUP_CANVAS_IMAGE=canvas-image \
|
GROUP_CANVAS_IMAGE=canvas-image \
|
||||||
GROUP_CANVAS_VIDEO=canvas-video \
|
GROUP_CANVAS_VIDEO=canvas-video \
|
||||||
AWS_REGION_WEB=eu-central-1 \
|
AWS_REGION_WEB=eu-central-1 \
|
||||||
|
|
@ -111,8 +109,8 @@ ENV TZ=UTC \
|
||||||
RUN if [ "$SERVICE_NAME" = "http" ]; then \
|
RUN if [ "$SERVICE_NAME" = "http" ]; then \
|
||||||
wget https://raw.githubusercontent.com/ua-parser/uap-core/master/regexes.yaml -O "$UAPARSER_FILE" &&\
|
wget https://raw.githubusercontent.com/ua-parser/uap-core/master/regexes.yaml -O "$UAPARSER_FILE" &&\
|
||||||
wget https://static.openreplay.com/geoip/GeoLite2-City.mmdb -O "$MAXMINDDB_FILE"; \
|
wget https://static.openreplay.com/geoip/GeoLite2-City.mmdb -O "$MAXMINDDB_FILE"; \
|
||||||
elif [ "$SERVICE_NAME" = "videostorage" ]; then \
|
elif [ "$SERVICE_NAME" = "imagestorage" ]; then \
|
||||||
apk add --no-cache ffmpeg; \
|
apk add --no-cache zstd; \
|
||||||
elif [ "$SERVICE_NAME" = "canvas-maker" ]; then \
|
elif [ "$SERVICE_NAME" = "canvas-maker" ]; then \
|
||||||
apk add --no-cache ffmpeg; \
|
apk add --no-cache ffmpeg; \
|
||||||
fi
|
fi
|
||||||
|
|
|
||||||
|
|
@ -8,8 +8,8 @@ import (
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"openreplay/backend/internal/canvas-handler"
|
||||||
config "openreplay/backend/internal/config/imagestorage"
|
config "openreplay/backend/internal/config/imagestorage"
|
||||||
"openreplay/backend/internal/imagestorage"
|
|
||||||
"openreplay/backend/pkg/messages"
|
"openreplay/backend/pkg/messages"
|
||||||
"openreplay/backend/pkg/metrics"
|
"openreplay/backend/pkg/metrics"
|
||||||
storageMetrics "openreplay/backend/pkg/metrics/imagestorage"
|
storageMetrics "openreplay/backend/pkg/metrics/imagestorage"
|
||||||
|
|
@ -24,7 +24,7 @@ func main() {
|
||||||
|
|
||||||
cfg := config.New()
|
cfg := config.New()
|
||||||
|
|
||||||
srv, err := imagestorage.New(cfg)
|
srv, err := canvas_handler.New(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("can't init storage service: %s", err)
|
log.Printf("can't init storage service: %s", err)
|
||||||
return
|
return
|
||||||
|
|
@ -57,7 +57,7 @@ func main() {
|
||||||
if msg, err := checkSessionEnd(data); err == nil {
|
if msg, err := checkSessionEnd(data); err == nil {
|
||||||
sessEnd := msg.(*messages.SessionEnd)
|
sessEnd := msg.(*messages.SessionEnd)
|
||||||
// Received session end
|
// Received session end
|
||||||
if list, err := srv.PrepareCanvas(sessID); err != nil {
|
if list, err := srv.PrepareCanvasList(sessID); err != nil {
|
||||||
log.Printf("can't prepare canvas: %s", err)
|
log.Printf("can't prepare canvas: %s", err)
|
||||||
} else {
|
} else {
|
||||||
for _, name := range list {
|
for _, name := range list {
|
||||||
|
|
@ -68,7 +68,7 @@ func main() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if err := srv.ProcessCanvas(sessID, data); err != nil {
|
if err := srv.SaveCanvasToDisk(sessID, data); err != nil {
|
||||||
log.Printf("can't process canvas image: %s", err)
|
log.Printf("can't process canvas image: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
config "openreplay/backend/internal/config/videostorage"
|
config "openreplay/backend/internal/config/videostorage"
|
||||||
"openreplay/backend/internal/videostorage"
|
"openreplay/backend/internal/video-maker"
|
||||||
"openreplay/backend/pkg/messages"
|
"openreplay/backend/pkg/messages"
|
||||||
"openreplay/backend/pkg/metrics"
|
"openreplay/backend/pkg/metrics"
|
||||||
storageMetrics "openreplay/backend/pkg/metrics/videostorage"
|
storageMetrics "openreplay/backend/pkg/metrics/videostorage"
|
||||||
|
|
@ -30,7 +30,7 @@ func main() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("can't init object storage: %s", err)
|
log.Fatalf("can't init object storage: %s", err)
|
||||||
}
|
}
|
||||||
srv, err := videostorage.New(cfg, objStore)
|
srv, err := video_maker.New(cfg, objStore)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("can't init storage service: %s", err)
|
log.Printf("can't init storage service: %s", err)
|
||||||
return
|
return
|
||||||
|
|
|
||||||
|
|
@ -3,13 +3,15 @@ package main
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"openreplay/backend/pkg/objectstorage/store"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"strconv"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
config "openreplay/backend/internal/config/imagestorage"
|
config "openreplay/backend/internal/config/imagestorage"
|
||||||
"openreplay/backend/internal/imagestorage"
|
"openreplay/backend/internal/screenshot-handler"
|
||||||
"openreplay/backend/pkg/messages"
|
"openreplay/backend/pkg/messages"
|
||||||
"openreplay/backend/pkg/metrics"
|
"openreplay/backend/pkg/metrics"
|
||||||
storageMetrics "openreplay/backend/pkg/metrics/imagestorage"
|
storageMetrics "openreplay/backend/pkg/metrics/imagestorage"
|
||||||
|
|
@ -24,13 +26,18 @@ func main() {
|
||||||
|
|
||||||
cfg := config.New()
|
cfg := config.New()
|
||||||
|
|
||||||
srv, err := imagestorage.New(cfg)
|
objStore, err := store.NewStore(&cfg.ObjectsConfig)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("can't init object storage: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
srv, err := screenshot_handler.New(cfg, objStore)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("can't init storage service: %s", err)
|
log.Printf("can't init storage service: %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
|
workDir := cfg.FSDir
|
||||||
|
|
||||||
consumer := queue.NewConsumer(
|
consumer := queue.NewConsumer(
|
||||||
cfg.GroupImageStorage,
|
cfg.GroupImageStorage,
|
||||||
|
|
@ -38,6 +45,7 @@ func main() {
|
||||||
cfg.TopicRawImages,
|
cfg.TopicRawImages,
|
||||||
},
|
},
|
||||||
messages.NewImagesMessageIterator(func(data []byte, sessID uint64) {
|
messages.NewImagesMessageIterator(func(data []byte, sessID uint64) {
|
||||||
|
log.Printf("Received message for session %d\n", sessID)
|
||||||
checkSessionEnd := func(data []byte) (messages.Message, error) {
|
checkSessionEnd := func(data []byte) (messages.Message, error) {
|
||||||
reader := messages.NewBytesReader(data)
|
reader := messages.NewBytesReader(data)
|
||||||
msgType, err := reader.ReadUint()
|
msgType, err := reader.ReadUint()
|
||||||
|
|
@ -54,17 +62,13 @@ func main() {
|
||||||
return msg, nil
|
return msg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if msg, err := checkSessionEnd(data); err == nil {
|
if _, err := checkSessionEnd(data); err == nil {
|
||||||
sessEnd := msg.(*messages.IOSSessionEnd)
|
// Pack all screenshots from mobile session, compress and upload to object storage
|
||||||
// Received session end
|
if err := srv.PackScreenshots(sessID, workDir+"/screenshots/"+strconv.FormatUint(sessID, 10)+"/"); err != nil {
|
||||||
if err := srv.Prepare(sessID); err != nil {
|
log.Printf("upload session err: %s, sessID: %d", err, sessID)
|
||||||
log.Printf("can't prepare mobile session: %s", err)
|
|
||||||
} else {
|
|
||||||
if err := producer.Produce(cfg.TopicReplayTrigger, sessID, sessEnd.Encode()); err != nil {
|
|
||||||
log.Printf("can't send session end signal to video service: %s", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
// Unpack new screenshots package from mobile session
|
||||||
if err := srv.Process(sessID, data); err != nil {
|
if err := srv.Process(sessID, data); err != nil {
|
||||||
log.Printf("can't process mobile screenshots: %s", err)
|
log.Printf("can't process mobile screenshots: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,87 +0,0 @@
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"os/signal"
|
|
||||||
"strconv"
|
|
||||||
"syscall"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
config "openreplay/backend/internal/config/videostorage"
|
|
||||||
"openreplay/backend/internal/videostorage"
|
|
||||||
"openreplay/backend/pkg/messages"
|
|
||||||
"openreplay/backend/pkg/metrics"
|
|
||||||
storageMetrics "openreplay/backend/pkg/metrics/videostorage"
|
|
||||||
"openreplay/backend/pkg/objectstorage/store"
|
|
||||||
"openreplay/backend/pkg/queue"
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
m := metrics.New()
|
|
||||||
m.Register(storageMetrics.List())
|
|
||||||
|
|
||||||
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
|
|
||||||
|
|
||||||
cfg := config.New()
|
|
||||||
|
|
||||||
objStore, err := store.NewStore(&cfg.ObjectsConfig)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("can't init object storage: %s", err)
|
|
||||||
}
|
|
||||||
srv, err := videostorage.New(cfg, objStore)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("can't init storage service: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
workDir := cfg.FSDir
|
|
||||||
|
|
||||||
consumer := queue.NewConsumer(
|
|
||||||
cfg.GroupVideoStorage,
|
|
||||||
[]string{
|
|
||||||
cfg.TopicReplayTrigger,
|
|
||||||
},
|
|
||||||
messages.NewMessageIterator(
|
|
||||||
func(msg messages.Message) {
|
|
||||||
sesEnd := msg.(*messages.IOSSessionEnd)
|
|
||||||
log.Printf("recieved mobile session end: %d", sesEnd.SessionID())
|
|
||||||
if err := srv.Process(sesEnd.SessionID(), workDir+"/screenshots/"+strconv.FormatUint(sesEnd.SessionID(), 10)+"/", ""); err != nil {
|
|
||||||
log.Printf("upload session err: %s, sessID: %d", err, msg.SessionID())
|
|
||||||
}
|
|
||||||
},
|
|
||||||
[]int{messages.MsgIOSSessionEnd},
|
|
||||||
true,
|
|
||||||
),
|
|
||||||
false,
|
|
||||||
cfg.MessageSizeLimit,
|
|
||||||
)
|
|
||||||
|
|
||||||
log.Printf("Video storage service started\n")
|
|
||||||
|
|
||||||
sigchan := make(chan os.Signal, 1)
|
|
||||||
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
|
|
||||||
|
|
||||||
counterTick := time.Tick(time.Second * 30)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case sig := <-sigchan:
|
|
||||||
log.Printf("Caught signal %v: terminating\n", sig)
|
|
||||||
srv.Wait()
|
|
||||||
consumer.Close()
|
|
||||||
os.Exit(0)
|
|
||||||
case <-counterTick:
|
|
||||||
srv.Wait()
|
|
||||||
if err := consumer.Commit(); err != nil {
|
|
||||||
log.Printf("can't commit messages: %s", err)
|
|
||||||
}
|
|
||||||
case msg := <-consumer.Rebalanced():
|
|
||||||
log.Println(msg)
|
|
||||||
default:
|
|
||||||
err = consumer.ConsumeNext()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Error on end event consumption: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
180
backend/internal/canvas-handler/service.go
Normal file
180
backend/internal/canvas-handler/service.go
Normal file
|
|
@ -0,0 +1,180 @@
|
||||||
|
package canvas_handler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"sort"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
config "openreplay/backend/internal/config/imagestorage"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Task struct {
|
||||||
|
sessionID uint64 // to generate path
|
||||||
|
name string
|
||||||
|
image *bytes.Buffer
|
||||||
|
isBreakTask bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBreakTask() *Task {
|
||||||
|
return &Task{isBreakTask: true}
|
||||||
|
}
|
||||||
|
|
||||||
|
type ImageStorage struct {
|
||||||
|
cfg *config.Config
|
||||||
|
basePath string
|
||||||
|
writeToDiskTasks chan *Task
|
||||||
|
imageWorkerStopped chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(cfg *config.Config) (*ImageStorage, error) {
|
||||||
|
switch {
|
||||||
|
case cfg == nil:
|
||||||
|
return nil, fmt.Errorf("config is empty")
|
||||||
|
}
|
||||||
|
path := cfg.FSDir + "/"
|
||||||
|
if cfg.CanvasDir != "" {
|
||||||
|
path += cfg.CanvasDir + "/"
|
||||||
|
}
|
||||||
|
newStorage := &ImageStorage{
|
||||||
|
cfg: cfg,
|
||||||
|
basePath: path,
|
||||||
|
writeToDiskTasks: make(chan *Task, 1),
|
||||||
|
imageWorkerStopped: make(chan struct{}),
|
||||||
|
}
|
||||||
|
go newStorage.runWorker()
|
||||||
|
return newStorage, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ImageStorage) Wait() {
|
||||||
|
// send stop signal
|
||||||
|
v.writeToDiskTasks <- NewBreakTask()
|
||||||
|
|
||||||
|
// wait for workers to stop
|
||||||
|
<-v.imageWorkerStopped
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ImageStorage) PrepareCanvasList(sessID uint64) ([]string, error) {
|
||||||
|
path := fmt.Sprintf("%s/%d/", v.basePath, sessID)
|
||||||
|
|
||||||
|
// Check that the directory exists
|
||||||
|
files, err := os.ReadDir(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(files) == 0 {
|
||||||
|
return []string{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type canvasData struct {
|
||||||
|
files map[int]string
|
||||||
|
times []int
|
||||||
|
}
|
||||||
|
images := make(map[string]*canvasData)
|
||||||
|
|
||||||
|
// Build the list of canvas images sets
|
||||||
|
for _, file := range files {
|
||||||
|
name := strings.Split(file.Name(), ".")
|
||||||
|
parts := strings.Split(name[0], "_")
|
||||||
|
if len(name) != 2 || len(parts) != 3 {
|
||||||
|
log.Printf("unknown file name: %s, skipping", file.Name())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
canvasID := fmt.Sprintf("%s_%s", parts[0], parts[1])
|
||||||
|
canvasTS, _ := strconv.Atoi(parts[2])
|
||||||
|
if _, ok := images[canvasID]; !ok {
|
||||||
|
images[canvasID] = &canvasData{
|
||||||
|
files: make(map[int]string),
|
||||||
|
times: make([]int, 0),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
images[canvasID].files[canvasTS] = file.Name()
|
||||||
|
images[canvasID].times = append(images[canvasID].times, canvasTS)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prepare screenshot lists for ffmpeg
|
||||||
|
namesList := make([]string, 0)
|
||||||
|
for name, cData := range images {
|
||||||
|
// Write to file
|
||||||
|
mixName := fmt.Sprintf("%s-list", name)
|
||||||
|
mixList := path + mixName
|
||||||
|
outputFile, err := os.Create(mixList)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("can't create mix list, err: %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Ints(cData.times)
|
||||||
|
count := 0
|
||||||
|
for i := 0; i < len(cData.times)-1; i++ {
|
||||||
|
dur := float64(cData.times[i+1]-cData.times[i]) / 1000.0
|
||||||
|
line := fmt.Sprintf("file %s\nduration %.3f\n", cData.files[cData.times[i]], dur)
|
||||||
|
_, err := outputFile.WriteString(line)
|
||||||
|
if err != nil {
|
||||||
|
outputFile.Close()
|
||||||
|
log.Printf("%s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
outputFile.Close()
|
||||||
|
log.Printf("new canvas mix %s with %d images", mixList, count)
|
||||||
|
namesList = append(namesList, mixName)
|
||||||
|
}
|
||||||
|
log.Printf("prepared %d canvas mixes for session %d", len(namesList), sessID)
|
||||||
|
return namesList, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ImageStorage) SaveCanvasToDisk(sessID uint64, data []byte) error {
|
||||||
|
type canvasData struct {
|
||||||
|
Name string
|
||||||
|
Data []byte
|
||||||
|
}
|
||||||
|
var msg = &canvasData{}
|
||||||
|
if err := json.Unmarshal(data, msg); err != nil {
|
||||||
|
log.Printf("can't parse canvas message, err: %s", err)
|
||||||
|
}
|
||||||
|
// Use the same workflow
|
||||||
|
v.writeToDiskTasks <- &Task{sessionID: sessID, name: msg.Name, image: bytes.NewBuffer(msg.Data)}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ImageStorage) writeToDisk(task *Task) {
|
||||||
|
path := fmt.Sprintf("%s/%d/", v.basePath, task.sessionID)
|
||||||
|
|
||||||
|
// Ensure the directory exists
|
||||||
|
if err := os.MkdirAll(path, 0755); err != nil {
|
||||||
|
log.Fatalf("Error creating directories: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write images to disk
|
||||||
|
outFile, err := os.Create(path + task.name) // or open file in rewrite mode
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("can't create file: %s", err.Error())
|
||||||
|
}
|
||||||
|
if _, err := io.Copy(outFile, task.image); err != nil {
|
||||||
|
log.Printf("can't copy file: %s", err.Error())
|
||||||
|
}
|
||||||
|
outFile.Close()
|
||||||
|
|
||||||
|
log.Printf("new canvas image, sessID: %d, name: %s, size: %3.3f mb", task.sessionID, task.name, float64(task.image.Len())/1024.0/1024.0)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ImageStorage) runWorker() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case task := <-v.writeToDiskTasks:
|
||||||
|
if task.isBreakTask {
|
||||||
|
v.imageWorkerStopped <- struct{}{}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
v.writeToDisk(task)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -3,17 +3,18 @@ package imagestorage
|
||||||
import (
|
import (
|
||||||
"openreplay/backend/internal/config/common"
|
"openreplay/backend/internal/config/common"
|
||||||
"openreplay/backend/internal/config/configurator"
|
"openreplay/backend/internal/config/configurator"
|
||||||
|
"openreplay/backend/internal/config/objectstorage"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
common.Config
|
common.Config
|
||||||
|
objectstorage.ObjectsConfig
|
||||||
FSDir string `env:"FS_DIR,required"`
|
FSDir string `env:"FS_DIR,required"`
|
||||||
ScreenshotsDir string `env:"SCREENSHOTS_DIR,default=screenshots"`
|
ScreenshotsDir string `env:"SCREENSHOTS_DIR,default=screenshots"`
|
||||||
CanvasDir string `env:"CANVAS_DIR,default=canvas"`
|
CanvasDir string `env:"CANVAS_DIR,default=canvas"`
|
||||||
TopicRawImages string `env:"TOPIC_RAW_IMAGES,required"`
|
TopicRawImages string `env:"TOPIC_RAW_IMAGES,required"`
|
||||||
TopicCanvasImages string `env:"TOPIC_CANVAS_IMAGES,required"`
|
TopicCanvasImages string `env:"TOPIC_CANVAS_IMAGES,required"`
|
||||||
TopicCanvasTrigger string `env:"TOPIC_CANVAS_TRIGGER,required"`
|
TopicCanvasTrigger string `env:"TOPIC_CANVAS_TRIGGER,required"`
|
||||||
TopicReplayTrigger string `env:"TOPIC_REPLAY_TRIGGER,required"`
|
|
||||||
GroupImageStorage string `env:"GROUP_IMAGE_STORAGE,required"`
|
GroupImageStorage string `env:"GROUP_IMAGE_STORAGE,required"`
|
||||||
GroupCanvasImage string `env:"GROUP_CANVAS_IMAGE,required"`
|
GroupCanvasImage string `env:"GROUP_CANVAS_IMAGE,required"`
|
||||||
UseProfiler bool `env:"PROFILER_ENABLED,default=false"`
|
UseProfiler bool `env:"PROFILER_ENABLED,default=false"`
|
||||||
|
|
|
||||||
|
|
@ -10,12 +10,8 @@ type Config struct {
|
||||||
common.Config
|
common.Config
|
||||||
objectstorage.ObjectsConfig
|
objectstorage.ObjectsConfig
|
||||||
FSDir string `env:"FS_DIR,required"`
|
FSDir string `env:"FS_DIR,required"`
|
||||||
GroupVideoStorage string `env:"GROUP_VIDEO_STORAGE,required"`
|
|
||||||
GroupCanvasVideo string `env:"GROUP_CANVAS_VIDEO,required"`
|
GroupCanvasVideo string `env:"GROUP_CANVAS_VIDEO,required"`
|
||||||
TopicReplayTrigger string `env:"TOPIC_REPLAY_TRIGGER,required"`
|
|
||||||
TopicCanvasTrigger string `env:"TOPIC_CANVAS_TRIGGER,required"`
|
TopicCanvasTrigger string `env:"TOPIC_CANVAS_TRIGGER,required"`
|
||||||
VideoReplayFPS int `env:"VIDEO_REPLAY_FPS,default=3"`
|
|
||||||
UseProfiler bool `env:"PROFILER_ENABLED,default=false"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func New() *Config {
|
func New() *Config {
|
||||||
|
|
|
||||||
|
|
@ -1,309 +0,0 @@
|
||||||
package imagestorage
|
|
||||||
|
|
||||||
import (
|
|
||||||
"archive/tar"
|
|
||||||
"bytes"
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"io/ioutil"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"sort"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
gzip "github.com/klauspost/pgzip"
|
|
||||||
config "openreplay/backend/internal/config/imagestorage"
|
|
||||||
)
|
|
||||||
|
|
||||||
type ImageType uint8
|
|
||||||
|
|
||||||
const (
|
|
||||||
screenshot ImageType = iota
|
|
||||||
canvas
|
|
||||||
)
|
|
||||||
|
|
||||||
type Task struct {
|
|
||||||
sessionID uint64 // to generate path
|
|
||||||
images map[string]*bytes.Buffer
|
|
||||||
imageType ImageType
|
|
||||||
isBreakTask bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewBreakTask() *Task {
|
|
||||||
return &Task{isBreakTask: true}
|
|
||||||
}
|
|
||||||
|
|
||||||
type ImageStorage struct {
|
|
||||||
cfg *config.Config
|
|
||||||
writeToDiskTasks chan *Task
|
|
||||||
workersStopped chan struct{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func New(cfg *config.Config) (*ImageStorage, error) {
|
|
||||||
switch {
|
|
||||||
case cfg == nil:
|
|
||||||
return nil, fmt.Errorf("config is empty")
|
|
||||||
}
|
|
||||||
newStorage := &ImageStorage{
|
|
||||||
cfg: cfg,
|
|
||||||
writeToDiskTasks: make(chan *Task, 1),
|
|
||||||
workersStopped: make(chan struct{}),
|
|
||||||
}
|
|
||||||
go newStorage.runWorker()
|
|
||||||
return newStorage, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *ImageStorage) Wait() {
|
|
||||||
// send stop signal
|
|
||||||
v.writeToDiskTasks <- NewBreakTask()
|
|
||||||
// wait for workers to stop
|
|
||||||
<-v.workersStopped
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *ImageStorage) Process(sessID uint64, data []byte) error {
|
|
||||||
start := time.Now()
|
|
||||||
if err := v.extract(sessID, data); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
log.Printf("sessID: %d, arch size: %d, extracted archive in: %s", sessID, len(data), time.Since(start))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *ImageStorage) Prepare(sessID uint64) error {
|
|
||||||
path := v.cfg.FSDir + "/"
|
|
||||||
if v.cfg.ScreenshotsDir != "" {
|
|
||||||
path += v.cfg.ScreenshotsDir + "/"
|
|
||||||
}
|
|
||||||
path += strconv.FormatUint(sessID, 10) + "/"
|
|
||||||
|
|
||||||
// Check that the directory exists
|
|
||||||
files, err := ioutil.ReadDir(path)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if len(files) == 0 {
|
|
||||||
return errors.New("no screenshots found")
|
|
||||||
}
|
|
||||||
|
|
||||||
images := make(map[int]string)
|
|
||||||
times := make([]int, 0, len(files))
|
|
||||||
|
|
||||||
// Build the list of canvas images sets
|
|
||||||
for _, file := range files {
|
|
||||||
name := strings.Split(file.Name(), ".")
|
|
||||||
parts := strings.Split(name[0], "_")
|
|
||||||
if len(name) != 2 || len(parts) != 3 {
|
|
||||||
log.Printf("unknown file name: %s, skipping", file.Name())
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
screenshotTS, _ := strconv.Atoi(parts[2])
|
|
||||||
images[screenshotTS] = file.Name()
|
|
||||||
times = append(times, screenshotTS)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Prepare screenshot lists for ffmpeg
|
|
||||||
|
|
||||||
mixName := fmt.Sprintf("%d-list", sessID)
|
|
||||||
mixList := path + mixName
|
|
||||||
outputFile, err := os.Create(mixList)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("can't create mix list, err: %s", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
sort.Ints(times)
|
|
||||||
count := 0
|
|
||||||
for i := 0; i < len(times)-1; i++ {
|
|
||||||
dur := float64(times[i+1]-times[i]) / 1000.0
|
|
||||||
line := fmt.Sprintf("file %s\nduration %.3f\n", images[times[i]], dur)
|
|
||||||
_, err := outputFile.WriteString(line)
|
|
||||||
if err != nil {
|
|
||||||
outputFile.Close()
|
|
||||||
log.Printf("%s", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
count++
|
|
||||||
}
|
|
||||||
outputFile.Close()
|
|
||||||
log.Printf("new canvas mix %s with %d images", mixList, count)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type ScreenshotMessage struct {
|
|
||||||
Name string
|
|
||||||
Data []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *ImageStorage) PrepareCanvas(sessID uint64) ([]string, error) {
|
|
||||||
// Build the directory path to session's canvas images
|
|
||||||
path := v.cfg.FSDir + "/"
|
|
||||||
if v.cfg.CanvasDir != "" {
|
|
||||||
path += v.cfg.CanvasDir + "/"
|
|
||||||
}
|
|
||||||
path += strconv.FormatUint(sessID, 10) + "/"
|
|
||||||
|
|
||||||
// Check that the directory exists
|
|
||||||
files, err := ioutil.ReadDir(path)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if len(files) == 0 {
|
|
||||||
return []string{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type canvasData struct {
|
|
||||||
files map[int]string
|
|
||||||
times []int
|
|
||||||
}
|
|
||||||
images := make(map[string]*canvasData)
|
|
||||||
|
|
||||||
// Build the list of canvas images sets
|
|
||||||
for _, file := range files {
|
|
||||||
name := strings.Split(file.Name(), ".")
|
|
||||||
parts := strings.Split(name[0], "_")
|
|
||||||
if len(name) != 2 || len(parts) != 3 {
|
|
||||||
log.Printf("unknown file name: %s, skipping", file.Name())
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
canvasID := fmt.Sprintf("%s_%s", parts[0], parts[1])
|
|
||||||
canvasTS, _ := strconv.Atoi(parts[2])
|
|
||||||
if _, ok := images[canvasID]; !ok {
|
|
||||||
images[canvasID] = &canvasData{
|
|
||||||
files: make(map[int]string),
|
|
||||||
times: make([]int, 0),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
images[canvasID].files[canvasTS] = file.Name()
|
|
||||||
images[canvasID].times = append(images[canvasID].times, canvasTS)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Prepare screenshot lists for ffmpeg
|
|
||||||
namesList := make([]string, 0)
|
|
||||||
for name, cData := range images {
|
|
||||||
// Write to file
|
|
||||||
mixName := fmt.Sprintf("%s-list", name)
|
|
||||||
mixList := path + mixName
|
|
||||||
outputFile, err := os.Create(mixList)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("can't create mix list, err: %s", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
sort.Ints(cData.times)
|
|
||||||
count := 0
|
|
||||||
for i := 0; i < len(cData.times)-1; i++ {
|
|
||||||
dur := float64(cData.times[i+1]-cData.times[i]) / 1000.0
|
|
||||||
line := fmt.Sprintf("file %s\nduration %.3f\n", cData.files[cData.times[i]], dur)
|
|
||||||
_, err := outputFile.WriteString(line)
|
|
||||||
if err != nil {
|
|
||||||
outputFile.Close()
|
|
||||||
log.Printf("%s", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
count++
|
|
||||||
}
|
|
||||||
outputFile.Close()
|
|
||||||
log.Printf("new canvas mix %s with %d images", mixList, count)
|
|
||||||
namesList = append(namesList, mixName)
|
|
||||||
}
|
|
||||||
log.Printf("prepared %d canvas mixes for session %d", len(namesList), sessID)
|
|
||||||
return namesList, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *ImageStorage) ProcessCanvas(sessID uint64, data []byte) error {
|
|
||||||
var msg = &ScreenshotMessage{}
|
|
||||||
if err := json.Unmarshal(data, msg); err != nil {
|
|
||||||
log.Printf("can't parse canvas message, err: %s", err)
|
|
||||||
}
|
|
||||||
// Use the same workflow
|
|
||||||
v.writeToDiskTasks <- &Task{sessionID: sessID, images: map[string]*bytes.Buffer{msg.Name: bytes.NewBuffer(msg.Data)}, imageType: canvas}
|
|
||||||
log.Printf("new canvas image, sessID: %d, name: %s, size: %3.3f mb", sessID, msg.Name, float64(len(msg.Data))/1024.0/1024.0)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *ImageStorage) extract(sessID uint64, data []byte) error {
|
|
||||||
images := make(map[string]*bytes.Buffer)
|
|
||||||
uncompressedStream, err := gzip.NewReader(bytes.NewReader(data))
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can't create gzip reader: %s", err.Error())
|
|
||||||
}
|
|
||||||
tarReader := tar.NewReader(uncompressedStream)
|
|
||||||
|
|
||||||
for {
|
|
||||||
header, err := tarReader.Next()
|
|
||||||
if err != nil {
|
|
||||||
if err == io.EOF {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
return fmt.Errorf("can't read tar header: %s", err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
if header.Typeflag == tar.TypeReg {
|
|
||||||
var buf bytes.Buffer
|
|
||||||
if _, err := buf.ReadFrom(tarReader); err != nil {
|
|
||||||
return fmt.Errorf("can't copy file: %s", err.Error())
|
|
||||||
}
|
|
||||||
images[header.Name] = &buf
|
|
||||||
} else {
|
|
||||||
log.Printf("ExtractTarGz: uknown type: %d in %s", header.Typeflag, header.Name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
v.writeToDiskTasks <- &Task{sessionID: sessID, images: images, imageType: screenshot}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *ImageStorage) writeToDisk(task *Task) {
|
|
||||||
// Build the directory path
|
|
||||||
path := v.cfg.FSDir + "/"
|
|
||||||
if task.imageType == screenshot {
|
|
||||||
if v.cfg.ScreenshotsDir != "" {
|
|
||||||
path += v.cfg.ScreenshotsDir + "/"
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if v.cfg.CanvasDir != "" {
|
|
||||||
path += v.cfg.CanvasDir + "/"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
path += strconv.FormatUint(task.sessionID, 10) + "/"
|
|
||||||
|
|
||||||
// Ensure the directory exists
|
|
||||||
if err := os.MkdirAll(path, 0755); err != nil {
|
|
||||||
log.Fatalf("Error creating directories: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write images to disk
|
|
||||||
saved := 0
|
|
||||||
for name, img := range task.images {
|
|
||||||
outFile, err := os.Create(path + name) // or open file in rewrite mode
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("can't create file: %s", err.Error())
|
|
||||||
}
|
|
||||||
if _, err := io.Copy(outFile, img); err != nil {
|
|
||||||
log.Printf("can't copy file: %s", err.Error())
|
|
||||||
}
|
|
||||||
outFile.Close()
|
|
||||||
saved++
|
|
||||||
}
|
|
||||||
log.Printf("saved %d images to disk", saved)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *ImageStorage) runWorker() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case task := <-v.writeToDiskTasks:
|
|
||||||
if task.isBreakTask {
|
|
||||||
v.workersStopped <- struct{}{}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
v.writeToDisk(task)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
199
backend/internal/screenshot-handler/service.go
Normal file
199
backend/internal/screenshot-handler/service.go
Normal file
|
|
@ -0,0 +1,199 @@
|
||||||
|
package screenshot_handler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"archive/tar"
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"openreplay/backend/pkg/objectstorage"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
gzip "github.com/klauspost/pgzip"
|
||||||
|
config "openreplay/backend/internal/config/imagestorage"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Task struct {
|
||||||
|
sessionID uint64 // to generate path
|
||||||
|
images map[string]*bytes.Buffer
|
||||||
|
isBreakTask bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBreakTask() *Task {
|
||||||
|
return &Task{isBreakTask: true}
|
||||||
|
}
|
||||||
|
|
||||||
|
type UploadTask struct {
|
||||||
|
sessionID string
|
||||||
|
path string
|
||||||
|
name string
|
||||||
|
isBreakTask bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBreakUploadTask() *UploadTask {
|
||||||
|
return &UploadTask{isBreakTask: true}
|
||||||
|
}
|
||||||
|
|
||||||
|
type ImageStorage struct {
|
||||||
|
cfg *config.Config
|
||||||
|
objStorage objectstorage.ObjectStorage
|
||||||
|
writeToDiskTasks chan *Task
|
||||||
|
sendToS3Tasks chan *UploadTask
|
||||||
|
imageWorkerStopped chan struct{}
|
||||||
|
uploadWorkersStopped chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(cfg *config.Config, objStorage objectstorage.ObjectStorage) (*ImageStorage, error) {
|
||||||
|
switch {
|
||||||
|
case cfg == nil:
|
||||||
|
return nil, fmt.Errorf("config is empty")
|
||||||
|
}
|
||||||
|
newStorage := &ImageStorage{
|
||||||
|
cfg: cfg,
|
||||||
|
objStorage: objStorage,
|
||||||
|
writeToDiskTasks: make(chan *Task, 1),
|
||||||
|
sendToS3Tasks: make(chan *UploadTask, 1),
|
||||||
|
imageWorkerStopped: make(chan struct{}),
|
||||||
|
uploadWorkersStopped: make(chan struct{}),
|
||||||
|
}
|
||||||
|
go newStorage.runWorker()
|
||||||
|
return newStorage, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ImageStorage) Wait() {
|
||||||
|
// send stop signal
|
||||||
|
v.writeToDiskTasks <- NewBreakTask()
|
||||||
|
v.sendToS3Tasks <- NewBreakUploadTask()
|
||||||
|
|
||||||
|
// wait for workers to stop
|
||||||
|
<-v.imageWorkerStopped
|
||||||
|
<-v.uploadWorkersStopped
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ImageStorage) Process(sessID uint64, data []byte) error {
|
||||||
|
start := time.Now()
|
||||||
|
images := make(map[string]*bytes.Buffer)
|
||||||
|
uncompressedStream, err := gzip.NewReader(bytes.NewReader(data))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't create gzip reader: %s", err.Error())
|
||||||
|
}
|
||||||
|
tarReader := tar.NewReader(uncompressedStream)
|
||||||
|
|
||||||
|
for {
|
||||||
|
header, err := tarReader.Next()
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return fmt.Errorf("can't read tar header: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if header.Typeflag == tar.TypeReg {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
if _, err := buf.ReadFrom(tarReader); err != nil {
|
||||||
|
return fmt.Errorf("can't copy file: %s", err.Error())
|
||||||
|
}
|
||||||
|
images[header.Name] = &buf
|
||||||
|
} else {
|
||||||
|
log.Printf("ExtractTarGz: uknown type: %d in %s", header.Typeflag, header.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
v.writeToDiskTasks <- &Task{sessionID: sessID, images: images}
|
||||||
|
log.Printf("sessID: %d, arch size: %d, extracted archive in: %s", sessID, len(data), time.Since(start))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ImageStorage) writeToDisk(task *Task) {
|
||||||
|
// Build the directory path
|
||||||
|
path := v.cfg.FSDir + "/"
|
||||||
|
if v.cfg.ScreenshotsDir != "" {
|
||||||
|
path += v.cfg.ScreenshotsDir + "/"
|
||||||
|
}
|
||||||
|
|
||||||
|
path += strconv.FormatUint(task.sessionID, 10) + "/"
|
||||||
|
|
||||||
|
// Ensure the directory exists
|
||||||
|
if err := os.MkdirAll(path, 0755); err != nil {
|
||||||
|
log.Fatalf("Error creating directories: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write images to disk
|
||||||
|
saved := 0
|
||||||
|
for name, img := range task.images {
|
||||||
|
outFile, err := os.Create(path + name) // or open file in rewrite mode
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("can't create file: %s", err.Error())
|
||||||
|
}
|
||||||
|
if _, err := io.Copy(outFile, img); err != nil {
|
||||||
|
log.Printf("can't copy file: %s", err.Error())
|
||||||
|
}
|
||||||
|
outFile.Close()
|
||||||
|
saved++
|
||||||
|
}
|
||||||
|
log.Printf("saved %d images to disk", saved)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ImageStorage) PackScreenshots(sessID uint64, filesPath string) error {
|
||||||
|
// Temporarily disabled for tests
|
||||||
|
if v.objStorage == nil {
|
||||||
|
return fmt.Errorf("object storage is empty")
|
||||||
|
}
|
||||||
|
start := time.Now()
|
||||||
|
sessionID := strconv.FormatUint(sessID, 10)
|
||||||
|
selector := fmt.Sprintf("%s*.jpeg", filesPath)
|
||||||
|
archPath := filesPath + "replay.tar.zst"
|
||||||
|
|
||||||
|
// tar cf - ./*.jpeg | zstd -o replay.tar.zst
|
||||||
|
fullCmd := fmt.Sprintf("tar cf - %s | zstd -o %s", selector, archPath)
|
||||||
|
cmd := exec.Command("sh", "-c", fullCmd)
|
||||||
|
var stdout, stderr bytes.Buffer
|
||||||
|
cmd.Stdout = &stdout
|
||||||
|
cmd.Stderr = &stderr
|
||||||
|
|
||||||
|
err := cmd.Run()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to execute command: %v, stderr: %v", err, stderr.String())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
log.Printf("packed replay in %v", time.Since(start))
|
||||||
|
|
||||||
|
v.sendToS3Tasks <- &UploadTask{sessionID: sessionID, path: archPath, name: sessionID + "/replay.tar.zst"}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ImageStorage) sendToS3(task *UploadTask) {
|
||||||
|
start := time.Now()
|
||||||
|
video, err := os.ReadFile(task.path)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to read video file: %v", err)
|
||||||
|
}
|
||||||
|
if err := v.objStorage.Upload(bytes.NewReader(video), task.name, "application/octet-stream", objectstorage.Zstd); err != nil {
|
||||||
|
log.Fatalf("Storage: start uploading replay failed. %s", err)
|
||||||
|
}
|
||||||
|
log.Printf("Replay file (size: %d) uploaded successfully in %v", len(video), time.Since(start))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ImageStorage) runWorker() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case task := <-v.writeToDiskTasks:
|
||||||
|
if task.isBreakTask {
|
||||||
|
v.imageWorkerStopped <- struct{}{}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
v.writeToDisk(task)
|
||||||
|
case task := <-v.sendToS3Tasks:
|
||||||
|
if task.isBreakTask {
|
||||||
|
v.uploadWorkersStopped <- struct{}{}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
v.sendToS3(task)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
package videostorage
|
package video_maker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
|
@ -15,7 +15,6 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Task struct {
|
type Task struct {
|
||||||
sessionID string
|
|
||||||
path string
|
path string
|
||||||
name string
|
name string
|
||||||
isBreakTask bool
|
isBreakTask bool
|
||||||
|
|
@ -27,7 +26,6 @@ func NewBreakTask() *Task {
|
||||||
|
|
||||||
type VideoStorage struct {
|
type VideoStorage struct {
|
||||||
cfg *config.Config
|
cfg *config.Config
|
||||||
framerate string
|
|
||||||
objStorage objectstorage.ObjectStorage
|
objStorage objectstorage.ObjectStorage
|
||||||
sendToS3Tasks chan *Task
|
sendToS3Tasks chan *Task
|
||||||
workersStopped chan struct{}
|
workersStopped chan struct{}
|
||||||
|
|
@ -39,12 +37,9 @@ func New(cfg *config.Config, objStorage objectstorage.ObjectStorage) (*VideoStor
|
||||||
return nil, fmt.Errorf("config is empty")
|
return nil, fmt.Errorf("config is empty")
|
||||||
case objStorage == nil:
|
case objStorage == nil:
|
||||||
return nil, fmt.Errorf("object storage is empty")
|
return nil, fmt.Errorf("object storage is empty")
|
||||||
case cfg.VideoReplayFPS <= 0:
|
|
||||||
return nil, fmt.Errorf("video replay fps is invalid: %d", cfg.VideoReplayFPS)
|
|
||||||
}
|
}
|
||||||
newStorage := &VideoStorage{
|
newStorage := &VideoStorage{
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
framerate: strconv.Itoa(cfg.VideoReplayFPS),
|
|
||||||
objStorage: objStorage,
|
objStorage: objStorage,
|
||||||
sendToS3Tasks: make(chan *Task, 1),
|
sendToS3Tasks: make(chan *Task, 1),
|
||||||
workersStopped: make(chan struct{}),
|
workersStopped: make(chan struct{}),
|
||||||
|
|
@ -53,36 +48,7 @@ func New(cfg *config.Config, objStorage objectstorage.ObjectStorage) (*VideoStor
|
||||||
return newStorage, nil
|
return newStorage, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *VideoStorage) makeVideo(sessID uint64, filesPath string) error {
|
func (v *VideoStorage) Process(sessID uint64, filesPath, canvasMix string) error {
|
||||||
files, err := os.ReadDir(filesPath)
|
|
||||||
if err != nil || len(files) == 0 {
|
|
||||||
return err // nil error is there is no screenshots
|
|
||||||
}
|
|
||||||
log.Printf("There are %d screenshot of session %d\n", len(files), sessID)
|
|
||||||
|
|
||||||
// Try to call ffmpeg and print the result
|
|
||||||
start := time.Now()
|
|
||||||
sessionID := strconv.FormatUint(sessID, 10)
|
|
||||||
mixList := fmt.Sprintf("%s%s", filesPath, sessionID+"-list")
|
|
||||||
videoPath := "/mnt/efs/screenshots/" + sessionID + "/replay.mp4"
|
|
||||||
cmd := exec.Command("ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", mixList, "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2", "-vsync", "vfr",
|
|
||||||
"-pix_fmt", "yuv420p", "-preset", "ultrafast", videoPath)
|
|
||||||
|
|
||||||
var stdout, stderr bytes.Buffer
|
|
||||||
cmd.Stdout = &stdout
|
|
||||||
cmd.Stderr = &stderr
|
|
||||||
|
|
||||||
err = cmd.Run()
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Failed to execute command: %v, stderr: %v", err, stderr.String())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
log.Printf("made video replay in %v", time.Since(start))
|
|
||||||
v.sendToS3Tasks <- &Task{sessionID: sessionID, path: videoPath}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *VideoStorage) makeCanvasVideo(sessID uint64, filesPath, canvasMix string) error {
|
|
||||||
name := strings.TrimSuffix(canvasMix, "-list")
|
name := strings.TrimSuffix(canvasMix, "-list")
|
||||||
mixList := fmt.Sprintf("%s%s", filesPath, canvasMix)
|
mixList := fmt.Sprintf("%s%s", filesPath, canvasMix)
|
||||||
// check that mixList exists
|
// check that mixList exists
|
||||||
|
|
@ -108,38 +74,23 @@ func (v *VideoStorage) makeCanvasVideo(sessID uint64, filesPath, canvasMix strin
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Printf("made video replay in %v", time.Since(start))
|
log.Printf("made video replay in %v", time.Since(start))
|
||||||
v.sendToS3Tasks <- &Task{sessionID: sessionID, path: videoPath, name: "/" + name + ".mp4"}
|
v.sendToS3Tasks <- &Task{path: videoPath, name: sessionID + "/" + name + ".mp4"}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *VideoStorage) sendToS3(task *Task) {
|
func (v *VideoStorage) sendToS3(task *Task) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
// Read video file from disk
|
|
||||||
video, err := os.ReadFile(task.path)
|
video, err := os.ReadFile(task.path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to read video file: %v", err)
|
log.Fatalf("Failed to read video file: %v", err)
|
||||||
}
|
}
|
||||||
// Upload video file to S3
|
if err := v.objStorage.Upload(bytes.NewReader(video), task.name, "video/mp4", objectstorage.NoCompression); err != nil {
|
||||||
key := task.sessionID
|
log.Fatalf("Storage: start uploading replay failed. %s", err)
|
||||||
if task.name != "" {
|
|
||||||
key += task.name
|
|
||||||
} else {
|
|
||||||
key += "/replay.mp4"
|
|
||||||
}
|
}
|
||||||
if err := v.objStorage.Upload(bytes.NewReader(video), key, "video/mp4", objectstorage.NoCompression); err != nil {
|
log.Printf("Viode file (size: %d) uploaded successfully in %v", len(video), time.Since(start))
|
||||||
log.Fatalf("Storage: start upload video replay failed. %s", err)
|
|
||||||
}
|
|
||||||
log.Printf("Video file (size: %d) uploaded successfully in %v", len(video), time.Since(start))
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *VideoStorage) Process(sessID uint64, filesPath string, canvasMix string) error {
|
|
||||||
if canvasMix != "" {
|
|
||||||
return v.makeCanvasVideo(sessID, filesPath, canvasMix)
|
|
||||||
}
|
|
||||||
return v.makeVideo(sessID, filesPath)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *VideoStorage) Wait() {
|
func (v *VideoStorage) Wait() {
|
||||||
v.sendToS3Tasks <- NewBreakTask()
|
v.sendToS3Tasks <- NewBreakTask()
|
||||||
<-v.workersStopped
|
<-v.workersStopped
|
||||||
|
|
@ -69,7 +69,7 @@ root_path=''
|
||||||
S3_HOST=
|
S3_HOST=
|
||||||
S3_KEY=
|
S3_KEY=
|
||||||
S3_SECRET=
|
S3_SECRET=
|
||||||
SESSION_IOS_VIDEO_PATTERN=%(sessionId)s/replay.mp4
|
SESSION_IOS_VIDEO_PATTERN=%(sessionId)s/replay.tar.zst
|
||||||
SESSION_MOB_PATTERN_E=%(sessionId)s/dom.mobe
|
SESSION_MOB_PATTERN_E=%(sessionId)s/dom.mobe
|
||||||
SESSION_MOB_PATTERN_S=%(sessionId)s/dom.mobs
|
SESSION_MOB_PATTERN_S=%(sessionId)s/dom.mobs
|
||||||
sessions_bucket=mobs
|
sessions_bucket=mobs
|
||||||
|
|
|
||||||
|
|
@ -1,23 +0,0 @@
|
||||||
# Patterns to ignore when building packages.
|
|
||||||
# This supports shell glob matching, relative path matching, and
|
|
||||||
# negation (prefixed with !). Only one pattern per line.
|
|
||||||
.DS_Store
|
|
||||||
# Common VCS dirs
|
|
||||||
.git/
|
|
||||||
.gitignore
|
|
||||||
.bzr/
|
|
||||||
.bzrignore
|
|
||||||
.hg/
|
|
||||||
.hgignore
|
|
||||||
.svn/
|
|
||||||
# Common backup files
|
|
||||||
*.swp
|
|
||||||
*.bak
|
|
||||||
*.tmp
|
|
||||||
*.orig
|
|
||||||
*~
|
|
||||||
# Various IDEs
|
|
||||||
.project
|
|
||||||
.idea/
|
|
||||||
*.tmproj
|
|
||||||
.vscode/
|
|
||||||
|
|
@ -1,24 +0,0 @@
|
||||||
apiVersion: v2
|
|
||||||
name: videostorage
|
|
||||||
description: A Helm chart for Kubernetes
|
|
||||||
|
|
||||||
# A chart can be either an 'application' or a 'library' chart.
|
|
||||||
#
|
|
||||||
# Application charts are a collection of templates that can be packaged into versioned archives
|
|
||||||
# to be deployed.
|
|
||||||
#
|
|
||||||
# Library charts provide useful utilities or functions for the chart developer. They're included as
|
|
||||||
# a dependency of application charts to inject those utilities and functions into the rendering
|
|
||||||
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
|
|
||||||
type: application
|
|
||||||
|
|
||||||
# This is the chart version. This version number should be incremented each time you make changes
|
|
||||||
# to the chart and its templates, including the app version.
|
|
||||||
# Versions are expected to follow Semantic Versioning (https://semver.org/)
|
|
||||||
version: 0.1.1
|
|
||||||
|
|
||||||
# This is the version number of the application being deployed. This version number should be
|
|
||||||
# incremented each time you make changes to the application. Versions are not expected to
|
|
||||||
# follow Semantic Versioning. They should reflect the version the application is using.
|
|
||||||
# It is recommended to use it with quotes.
|
|
||||||
AppVersion: "v1.17.0"
|
|
||||||
|
|
@ -1,22 +0,0 @@
|
||||||
1. Get the application URL by running these commands:
|
|
||||||
{{- if .Values.ingress.enabled }}
|
|
||||||
{{- range $host := .Values.ingress.hosts }}
|
|
||||||
{{- range .paths }}
|
|
||||||
http{{ if $.Values.ingress.tls }}s{{ end }}://{{ $host.host }}{{ .path }}
|
|
||||||
{{- end }}
|
|
||||||
{{- end }}
|
|
||||||
{{- else if contains "NodePort" .Values.service.type }}
|
|
||||||
export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ include "videostorage.fullname" . }})
|
|
||||||
export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}")
|
|
||||||
echo http://$NODE_IP:$NODE_PORT
|
|
||||||
{{- else if contains "LoadBalancer" .Values.service.type }}
|
|
||||||
NOTE: It may take a few minutes for the LoadBalancer IP to be available.
|
|
||||||
You can watch the status of by running 'kubectl get --namespace {{ .Release.Namespace }} svc -w {{ include "videostorage.fullname" . }}'
|
|
||||||
export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ include "videostorage.fullname" . }} --template "{{"{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}"}}")
|
|
||||||
echo http://$SERVICE_IP:{{ .Values.service.port }}
|
|
||||||
{{- else if contains "ClusterIP" .Values.service.type }}
|
|
||||||
export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "videostorage.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")
|
|
||||||
export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}")
|
|
||||||
echo "Visit http://127.0.0.1:8080 to use your application"
|
|
||||||
kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT
|
|
||||||
{{- end }}
|
|
||||||
|
|
@ -1,62 +0,0 @@
|
||||||
{{/*
|
|
||||||
Expand the name of the chart.
|
|
||||||
*/}}
|
|
||||||
{{- define "videostorage.name" -}}
|
|
||||||
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
|
|
||||||
{{- end }}
|
|
||||||
|
|
||||||
{{/*
|
|
||||||
Create a default fully qualified app name.
|
|
||||||
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
|
|
||||||
If release name contains chart name it will be used as a full name.
|
|
||||||
*/}}
|
|
||||||
{{- define "videostorage.fullname" -}}
|
|
||||||
{{- if .Values.fullnameOverride }}
|
|
||||||
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
|
|
||||||
{{- else }}
|
|
||||||
{{- $name := default .Chart.Name .Values.nameOverride }}
|
|
||||||
{{- if contains $name .Release.Name }}
|
|
||||||
{{- .Release.Name | trunc 63 | trimSuffix "-" }}
|
|
||||||
{{- else }}
|
|
||||||
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
|
|
||||||
{{- end }}
|
|
||||||
{{- end }}
|
|
||||||
{{- end }}
|
|
||||||
|
|
||||||
{{/*
|
|
||||||
Create chart name and version as used by the chart label.
|
|
||||||
*/}}
|
|
||||||
{{- define "videostorage.chart" -}}
|
|
||||||
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
|
|
||||||
{{- end }}
|
|
||||||
|
|
||||||
{{/*
|
|
||||||
Common labels
|
|
||||||
*/}}
|
|
||||||
{{- define "videostorage.labels" -}}
|
|
||||||
helm.sh/chart: {{ include "videostorage.chart" . }}
|
|
||||||
{{ include "videostorage.selectorLabels" . }}
|
|
||||||
{{- if .Chart.AppVersion }}
|
|
||||||
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
|
|
||||||
{{- end }}
|
|
||||||
app.kubernetes.io/managed-by: {{ .Release.Service }}
|
|
||||||
{{- end }}
|
|
||||||
|
|
||||||
{{/*
|
|
||||||
Selector labels
|
|
||||||
*/}}
|
|
||||||
{{- define "videostorage.selectorLabels" -}}
|
|
||||||
app.kubernetes.io/name: {{ include "videostorage.name" . }}
|
|
||||||
app.kubernetes.io/instance: {{ .Release.Name }}
|
|
||||||
{{- end }}
|
|
||||||
|
|
||||||
{{/*
|
|
||||||
Create the name of the service account to use
|
|
||||||
*/}}
|
|
||||||
{{- define "videostorage.serviceAccountName" -}}
|
|
||||||
{{- if .Values.serviceAccount.create }}
|
|
||||||
{{- default (include "videostorage.fullname" .) .Values.serviceAccount.name }}
|
|
||||||
{{- else }}
|
|
||||||
{{- default "default" .Values.serviceAccount.name }}
|
|
||||||
{{- end }}
|
|
||||||
{{- end }}
|
|
||||||
|
|
@ -1,131 +0,0 @@
|
||||||
apiVersion: apps/v1
|
|
||||||
kind: Deployment
|
|
||||||
metadata:
|
|
||||||
name: {{ include "videostorage.fullname" . }}
|
|
||||||
namespace: {{ .Release.Namespace }}
|
|
||||||
labels:
|
|
||||||
{{- include "videostorage.labels" . | nindent 4 }}
|
|
||||||
spec:
|
|
||||||
{{- if not .Values.autoscaling.enabled }}
|
|
||||||
replicas: {{ .Values.replicaCount }}
|
|
||||||
{{- end }}
|
|
||||||
selector:
|
|
||||||
matchLabels:
|
|
||||||
{{- include "videostorage.selectorLabels" . | nindent 6 }}
|
|
||||||
template:
|
|
||||||
metadata:
|
|
||||||
{{- with .Values.podAnnotations }}
|
|
||||||
annotations:
|
|
||||||
{{- toYaml . | nindent 8 }}
|
|
||||||
{{- end }}
|
|
||||||
labels:
|
|
||||||
{{- include "videostorage.selectorLabels" . | nindent 8 }}
|
|
||||||
spec:
|
|
||||||
{{- with .Values.imagePullSecrets }}
|
|
||||||
imagePullSecrets:
|
|
||||||
{{- toYaml . | nindent 8 }}
|
|
||||||
{{- end }}
|
|
||||||
serviceAccountName: {{ include "videostorage.serviceAccountName" . }}
|
|
||||||
securityContext:
|
|
||||||
{{- toYaml .Values.podSecurityContext | nindent 8 }}
|
|
||||||
shareProcessNamespace: true
|
|
||||||
containers:
|
|
||||||
- name: {{ .Chart.Name }}
|
|
||||||
securityContext:
|
|
||||||
{{- toYaml .Values.securityContext | nindent 12 }}
|
|
||||||
{{- if .Values.global.enterpriseEditionLicense }}
|
|
||||||
image: "{{ tpl .Values.image.repository . }}:{{ .Values.image.tag | default .Chart.AppVersion }}-ee"
|
|
||||||
{{- else }}
|
|
||||||
image: "{{ tpl .Values.image.repository . }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
|
|
||||||
{{- end }}
|
|
||||||
imagePullPolicy: {{ .Values.image.pullPolicy }}
|
|
||||||
{{- if .Values.healthCheck}}
|
|
||||||
{{- .Values.healthCheck | toYaml | nindent 10}}
|
|
||||||
{{- end}}
|
|
||||||
env:
|
|
||||||
- name: AWS_ACCESS_KEY_ID
|
|
||||||
{{- if .Values.global.s3.existingSecret }}
|
|
||||||
valueFrom:
|
|
||||||
secretKeyRef:
|
|
||||||
name: {{ .Values.global.s3.existingSecret }}
|
|
||||||
key: access-key
|
|
||||||
{{- else }}
|
|
||||||
value: {{ .Values.global.s3.accessKey }}
|
|
||||||
{{- end }}
|
|
||||||
- name: AWS_SECRET_ACCESS_KEY
|
|
||||||
{{- if .Values.global.s3.existingSecret }}
|
|
||||||
valueFrom:
|
|
||||||
secretKeyRef:
|
|
||||||
name: {{ .Values.global.s3.existingSecret }}
|
|
||||||
key: secret-key
|
|
||||||
{{- else }}
|
|
||||||
value: {{ .Values.global.s3.secretKey }}
|
|
||||||
{{- end }}
|
|
||||||
- name: AWS_ENDPOINT
|
|
||||||
value: '{{ .Values.global.s3.endpoint }}'
|
|
||||||
- name: AWS_REGION
|
|
||||||
value: '{{ .Values.global.s3.region }}'
|
|
||||||
- name: BUCKET_NAME
|
|
||||||
value: {{ .Values.global.s3.recordingsBucket }}
|
|
||||||
- name: LICENSE_KEY
|
|
||||||
value: '{{ .Values.global.enterpriseEditionLicense }}'
|
|
||||||
- name: KAFKA_SERVERS
|
|
||||||
value: '{{ .Values.global.kafka.kafkaHost }}:{{ .Values.global.kafka.kafkaPort }}'
|
|
||||||
- name: KAFKA_USE_SSL
|
|
||||||
value: '{{ .Values.global.kafka.kafkaUseSsl }}'
|
|
||||||
{{- include "openreplay.env.redis_string" .Values.global.redis | nindent 12 }}
|
|
||||||
{{- range $key, $val := .Values.global.env }}
|
|
||||||
- name: {{ $key }}
|
|
||||||
value: '{{ $val }}'
|
|
||||||
{{- end }}
|
|
||||||
{{- range $key, $val := .Values.env }}
|
|
||||||
- name: {{ $key }}
|
|
||||||
value: '{{ $val }}'
|
|
||||||
{{- end}}
|
|
||||||
ports:
|
|
||||||
{{- range $key, $val := .Values.service.ports }}
|
|
||||||
- name: {{ $key }}
|
|
||||||
containerPort: {{ $val }}
|
|
||||||
protocol: TCP
|
|
||||||
{{- end }}
|
|
||||||
resources:
|
|
||||||
{{- toYaml .Values.resources | nindent 12 }}
|
|
||||||
volumeMounts:
|
|
||||||
- name: datadir
|
|
||||||
mountPath: /mnt/efs
|
|
||||||
{{- include "openreplay.volume.redis_ca_certificate.mount" .Values.global.redis | nindent 10 }}
|
|
||||||
{{- with .Values.persistence.mounts }}
|
|
||||||
{{- toYaml . | nindent 10 }}
|
|
||||||
{{- end }}
|
|
||||||
{{- if eq (tpl .Values.pvc.name . ) "hostPath" }}
|
|
||||||
volumes:
|
|
||||||
{{- with .Values.persistence.volumes }}
|
|
||||||
{{- toYaml . | nindent 6 }}
|
|
||||||
{{- end }}
|
|
||||||
- name: datadir
|
|
||||||
hostPath:
|
|
||||||
# Ensure the file directory is created.
|
|
||||||
path: {{ .Values.pvc.hostMountPath }}
|
|
||||||
type: DirectoryOrCreate
|
|
||||||
{{- else }}
|
|
||||||
volumes:
|
|
||||||
{{- with .Values.persistence.volumes }}
|
|
||||||
{{- toYaml . | nindent 8 }}
|
|
||||||
{{- end }}
|
|
||||||
- name: datadir
|
|
||||||
persistentVolumeClaim:
|
|
||||||
claimName: "{{ tpl .Values.pvc.name . }}"
|
|
||||||
{{- end }}
|
|
||||||
{{- include "openreplay.volume.redis_ca_certificate" .Values.global.redis | nindent 6 }}
|
|
||||||
{{- with .Values.nodeSelector }}
|
|
||||||
nodeSelector:
|
|
||||||
{{- toYaml . | nindent 8 }}
|
|
||||||
{{- end }}
|
|
||||||
{{- with .Values.affinity }}
|
|
||||||
affinity:
|
|
||||||
{{- toYaml . | nindent 8 }}
|
|
||||||
{{- end }}
|
|
||||||
{{- with .Values.tolerations }}
|
|
||||||
tolerations:
|
|
||||||
{{- toYaml . | nindent 8 }}
|
|
||||||
{{- end }}
|
|
||||||
|
|
@ -1,29 +0,0 @@
|
||||||
{{- if .Values.autoscaling.enabled }}
|
|
||||||
apiVersion: autoscaling/v2beta1
|
|
||||||
kind: HorizontalPodAutoscaler
|
|
||||||
metadata:
|
|
||||||
name: {{ include "videostorage.fullname" . }}
|
|
||||||
namespace: {{ .Release.Namespace }}
|
|
||||||
labels:
|
|
||||||
{{- include "videostorage.labels" . | nindent 4 }}
|
|
||||||
spec:
|
|
||||||
scaleTargetRef:
|
|
||||||
apiVersion: apps/v1
|
|
||||||
kind: Deployment
|
|
||||||
name: {{ include "videostorage.fullname" . }}
|
|
||||||
minReplicas: {{ .Values.autoscaling.minReplicas }}
|
|
||||||
maxReplicas: {{ .Values.autoscaling.maxReplicas }}
|
|
||||||
metrics:
|
|
||||||
{{- if .Values.autoscaling.targetCPUUtilizationPercentage }}
|
|
||||||
- type: Resource
|
|
||||||
resource:
|
|
||||||
name: cpu
|
|
||||||
targetAverageUtilization: {{ .Values.autoscaling.targetCPUUtilizationPercentage }}
|
|
||||||
{{- end }}
|
|
||||||
{{- if .Values.autoscaling.targetMemoryUtilizationPercentage }}
|
|
||||||
- type: Resource
|
|
||||||
resource:
|
|
||||||
name: memory
|
|
||||||
targetAverageUtilization: {{ .Values.autoscaling.targetMemoryUtilizationPercentage }}
|
|
||||||
{{- end }}
|
|
||||||
{{- end }}
|
|
||||||
|
|
@ -1,62 +0,0 @@
|
||||||
{{- if .Values.ingress.enabled -}}
|
|
||||||
{{- $fullName := include "videostorage.fullname" . -}}
|
|
||||||
{{- $svcPort := .Values.service.ports.http -}}
|
|
||||||
{{- if and .Values.ingress.className (not (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion)) }}
|
|
||||||
{{- if not (hasKey .Values.ingress.annotations "kubernetes.io/ingress.class") }}
|
|
||||||
{{- $_ := set .Values.ingress.annotations "kubernetes.io/ingress.class" .Values.ingress.className}}
|
|
||||||
{{- end }}
|
|
||||||
{{- end }}
|
|
||||||
{{- if semverCompare ">=1.19-0" .Capabilities.KubeVersion.GitVersion -}}
|
|
||||||
apiVersion: networking.k8s.io/v1
|
|
||||||
{{- else if semverCompare ">=1.14-0" .Capabilities.KubeVersion.GitVersion -}}
|
|
||||||
apiVersion: networking.k8s.io/v1beta1
|
|
||||||
{{- else -}}
|
|
||||||
apiVersion: extensions/v1beta1
|
|
||||||
{{- end }}
|
|
||||||
kind: Ingress
|
|
||||||
metadata:
|
|
||||||
name: {{ $fullName }}
|
|
||||||
namespace: {{ .Release.Namespace }}
|
|
||||||
labels:
|
|
||||||
{{- include "videostorage.labels" . | nindent 4 }}
|
|
||||||
{{- with .Values.ingress.annotations }}
|
|
||||||
annotations:
|
|
||||||
{{- toYaml . | nindent 4 }}
|
|
||||||
{{- end }}
|
|
||||||
spec:
|
|
||||||
{{- if and .Values.ingress.className (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion) }}
|
|
||||||
ingressClassName: {{ .Values.ingress.className }}
|
|
||||||
{{- end }}
|
|
||||||
{{- if .Values.ingress.tls }}
|
|
||||||
tls:
|
|
||||||
{{- range .Values.ingress.tls }}
|
|
||||||
- hosts:
|
|
||||||
{{- range .hosts }}
|
|
||||||
- {{ . | quote }}
|
|
||||||
{{- end }}
|
|
||||||
secretName: {{ .secretName }}
|
|
||||||
{{- end }}
|
|
||||||
{{- end }}
|
|
||||||
rules:
|
|
||||||
{{- range .Values.ingress.hosts }}
|
|
||||||
- host: {{ .host | quote }}
|
|
||||||
http:
|
|
||||||
paths:
|
|
||||||
{{- range .paths }}
|
|
||||||
- path: {{ .path }}
|
|
||||||
{{- if and .pathType (semverCompare ">=1.18-0" $.Capabilities.KubeVersion.GitVersion) }}
|
|
||||||
pathType: {{ .pathType }}
|
|
||||||
{{- end }}
|
|
||||||
backend:
|
|
||||||
{{- if semverCompare ">=1.19-0" $.Capabilities.KubeVersion.GitVersion }}
|
|
||||||
service:
|
|
||||||
name: {{ $fullName }}
|
|
||||||
port:
|
|
||||||
number: {{ $svcPort }}
|
|
||||||
{{- else }}
|
|
||||||
serviceName: {{ $fullName }}
|
|
||||||
servicePort: {{ $svcPort }}
|
|
||||||
{{- end }}
|
|
||||||
{{- end }}
|
|
||||||
{{- end }}
|
|
||||||
{{- end }}
|
|
||||||
|
|
@ -1,18 +0,0 @@
|
||||||
apiVersion: v1
|
|
||||||
kind: Service
|
|
||||||
metadata:
|
|
||||||
name: {{ include "videostorage.fullname" . }}
|
|
||||||
namespace: {{ .Release.Namespace }}
|
|
||||||
labels:
|
|
||||||
{{- include "videostorage.labels" . | nindent 4 }}
|
|
||||||
spec:
|
|
||||||
type: {{ .Values.service.type }}
|
|
||||||
ports:
|
|
||||||
{{- range $key, $val := .Values.service.ports }}
|
|
||||||
- port: {{ $val }}
|
|
||||||
targetPort: {{ $key }}
|
|
||||||
protocol: TCP
|
|
||||||
name: {{ $key }}
|
|
||||||
{{- end}}
|
|
||||||
selector:
|
|
||||||
{{- include "videostorage.selectorLabels" . | nindent 4 }}
|
|
||||||
|
|
@ -1,18 +0,0 @@
|
||||||
{{- if and ( .Capabilities.APIVersions.Has "monitoring.coreos.com/v1" ) ( .Values.serviceMonitor.enabled ) }}
|
|
||||||
apiVersion: monitoring.coreos.com/v1
|
|
||||||
kind: ServiceMonitor
|
|
||||||
metadata:
|
|
||||||
name: {{ include "videostorage.fullname" . }}
|
|
||||||
namespace: {{ .Release.Namespace }}
|
|
||||||
labels:
|
|
||||||
{{- include "videostorage.labels" . | nindent 4 }}
|
|
||||||
{{- if .Values.serviceMonitor.additionalLabels }}
|
|
||||||
{{- toYaml .Values.serviceMonitor.additionalLabels | nindent 4 }}
|
|
||||||
{{- end }}
|
|
||||||
spec:
|
|
||||||
endpoints:
|
|
||||||
{{- .Values.serviceMonitor.scrapeConfigs | toYaml | nindent 4 }}
|
|
||||||
selector:
|
|
||||||
matchLabels:
|
|
||||||
{{- include "videostorage.selectorLabels" . | nindent 6 }}
|
|
||||||
{{- end }}
|
|
||||||
|
|
@ -1,13 +0,0 @@
|
||||||
{{- if .Values.serviceAccount.create -}}
|
|
||||||
apiVersion: v1
|
|
||||||
kind: ServiceAccount
|
|
||||||
metadata:
|
|
||||||
name: {{ include "videostorage.serviceAccountName" . }}
|
|
||||||
namespace: {{ .Release.Namespace }}
|
|
||||||
labels:
|
|
||||||
{{- include "videostorage.labels" . | nindent 4 }}
|
|
||||||
{{- with .Values.serviceAccount.annotations }}
|
|
||||||
annotations:
|
|
||||||
{{- toYaml . | nindent 4 }}
|
|
||||||
{{- end }}
|
|
||||||
{{- end }}
|
|
||||||
|
|
@ -1,15 +0,0 @@
|
||||||
apiVersion: v1
|
|
||||||
kind: Pod
|
|
||||||
metadata:
|
|
||||||
name: "{{ include "videostorage.fullname" . }}-test-connection"
|
|
||||||
labels:
|
|
||||||
{{- include "videostorage.labels" . | nindent 4 }}
|
|
||||||
annotations:
|
|
||||||
"helm.sh/hook": test
|
|
||||||
spec:
|
|
||||||
containers:
|
|
||||||
- name: wget
|
|
||||||
image: busybox
|
|
||||||
command: ['wget']
|
|
||||||
args: ['{{ include "videostorage.fullname" . }}:{{ .Values.service.port }}']
|
|
||||||
restartPolicy: Never
|
|
||||||
|
|
@ -1,124 +0,0 @@
|
||||||
# Default values for openreplay.
|
|
||||||
# This is a YAML-formatted file.
|
|
||||||
# Declare variables to be passed into your templates.
|
|
||||||
|
|
||||||
replicaCount: 1
|
|
||||||
|
|
||||||
image:
|
|
||||||
repository: "{{ .Values.global.openReplayContainerRegistry }}/videostorage"
|
|
||||||
pullPolicy: IfNotPresent
|
|
||||||
# Overrides the image tag whose default is the chart appVersion.
|
|
||||||
tag: ""
|
|
||||||
|
|
||||||
imagePullSecrets: []
|
|
||||||
nameOverride: "videostorage"
|
|
||||||
fullnameOverride: "videostorage-openreplay"
|
|
||||||
|
|
||||||
serviceAccount:
|
|
||||||
# Specifies whether a service account should be created
|
|
||||||
create: true
|
|
||||||
# Annotations to add to the service account
|
|
||||||
annotations: {}
|
|
||||||
# The name of the service account to use.
|
|
||||||
# If not set and create is true, a name is generated using the fullname template
|
|
||||||
name: ""
|
|
||||||
|
|
||||||
podAnnotations: {}
|
|
||||||
|
|
||||||
securityContext:
|
|
||||||
runAsUser: 1001
|
|
||||||
runAsGroup: 1001
|
|
||||||
podSecurityContext:
|
|
||||||
runAsUser: 1001
|
|
||||||
runAsGroup: 1001
|
|
||||||
fsGroup: 1001
|
|
||||||
fsGroupChangePolicy: "OnRootMismatch"
|
|
||||||
|
|
||||||
#securityContext: {}
|
|
||||||
# capabilities:
|
|
||||||
# drop:
|
|
||||||
# - ALL
|
|
||||||
# readOnlyRootFilesystem: true
|
|
||||||
# runAsNonRoot: true
|
|
||||||
# runAsUser: 1000
|
|
||||||
|
|
||||||
service:
|
|
||||||
type: ClusterIP
|
|
||||||
ports:
|
|
||||||
http: 9000
|
|
||||||
metrics: 8888
|
|
||||||
|
|
||||||
serviceMonitor:
|
|
||||||
enabled: true
|
|
||||||
additionalLabels:
|
|
||||||
release: observability
|
|
||||||
scrapeConfigs:
|
|
||||||
- port: metrics
|
|
||||||
honorLabels: true
|
|
||||||
interval: 15s
|
|
||||||
path: /metrics
|
|
||||||
scheme: http
|
|
||||||
scrapeTimeout: 10s
|
|
||||||
|
|
||||||
ingress:
|
|
||||||
enabled: false
|
|
||||||
className: ""
|
|
||||||
annotations: {}
|
|
||||||
# kubernetes.io/ingress.class: nginx
|
|
||||||
# kubernetes.io/tls-acme: "true"
|
|
||||||
hosts:
|
|
||||||
- host: chart-example.local
|
|
||||||
paths:
|
|
||||||
- path: /
|
|
||||||
pathType: ImplementationSpecific
|
|
||||||
tls: []
|
|
||||||
# - secretName: chart-example-tls
|
|
||||||
# hosts:
|
|
||||||
# - chart-example.local
|
|
||||||
|
|
||||||
resources: {}
|
|
||||||
# We usually recommend not to specify default resources and to leave this as a conscious
|
|
||||||
# choice for the user. This also increases chances charts run on environments with little
|
|
||||||
# resources, such as Minikube. If you do want to specify resources, uncomment the following
|
|
||||||
# lines, adjust them as necessary, and remove the curly braces after 'resources:'.
|
|
||||||
# limits:
|
|
||||||
# cpu: 100m
|
|
||||||
# memory: 128Mi
|
|
||||||
# requests:
|
|
||||||
# cpu: 100m
|
|
||||||
# memory: 128Mi
|
|
||||||
|
|
||||||
autoscaling:
|
|
||||||
enabled: false
|
|
||||||
minReplicas: 1
|
|
||||||
maxReplicas: 5
|
|
||||||
targetCPUUtilizationPercentage: 80
|
|
||||||
# targetMemoryUtilizationPercentage: 80
|
|
||||||
|
|
||||||
env:
|
|
||||||
FS_CLEAN_HRS: 24
|
|
||||||
|
|
||||||
pvc:
|
|
||||||
# This can be either persistentVolumeClaim or hostPath.
|
|
||||||
# In case of pvc, you'll have to provide the pvc name.
|
|
||||||
# For example
|
|
||||||
# name: openreplay-efs
|
|
||||||
name: "{{ .Values.global.pvcRWXName }}"
|
|
||||||
hostMountPath: /openreplay/storage/nfs
|
|
||||||
|
|
||||||
persistence: {}
|
|
||||||
# # Spec of spec.template.spec.containers[*].volumeMounts
|
|
||||||
# mounts:
|
|
||||||
# - name: kafka-ssl
|
|
||||||
# mountPath: /opt/kafka/ssl
|
|
||||||
# # Spec of spec.template.spec.volumes
|
|
||||||
# volumes:
|
|
||||||
# - name: kafka-ssl
|
|
||||||
# secret:
|
|
||||||
# secretName: kafka-ssl
|
|
||||||
|
|
||||||
nodeSelector: {}
|
|
||||||
|
|
||||||
tolerations: []
|
|
||||||
|
|
||||||
affinity: {}
|
|
||||||
Loading…
Add table
Reference in a new issue