Worker pool for image processing services (#1912)

* feat(backend): added worker pool for screenshot handler

* feat(backend): added worker pool to canvas handler

* feat(backend): added worker pool to canvas maker

* feat(backend): added missing method to canvas-maker service
This commit is contained in:
Alexander 2024-02-27 13:48:27 +01:00 committed by GitHub
parent 20f9da5e84
commit 36fb1a07f7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 140 additions and 138 deletions

View file

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"io" "io"
"log" "log"
"openreplay/backend/pkg/pool"
"os" "os"
"sort" "sort"
"strconv" "strconv"
@ -14,22 +15,16 @@ import (
config "openreplay/backend/internal/config/imagestorage" config "openreplay/backend/internal/config/imagestorage"
) )
type Task struct { type saveTask struct {
sessionID uint64 // to generate path sessionID uint64
name string name string
image *bytes.Buffer image *bytes.Buffer
isBreakTask bool
}
func NewBreakTask() *Task {
return &Task{isBreakTask: true}
} }
type ImageStorage struct { type ImageStorage struct {
cfg *config.Config cfg *config.Config
basePath string basePath string
writeToDiskTasks chan *Task saverPool pool.WorkerPool
imageWorkerStopped chan struct{}
} }
func New(cfg *config.Config) (*ImageStorage, error) { func New(cfg *config.Config) (*ImageStorage, error) {
@ -41,22 +36,16 @@ func New(cfg *config.Config) (*ImageStorage, error) {
if cfg.CanvasDir != "" { if cfg.CanvasDir != "" {
path += cfg.CanvasDir + "/" path += cfg.CanvasDir + "/"
} }
newStorage := &ImageStorage{ s := &ImageStorage{
cfg: cfg, cfg: cfg,
basePath: path, basePath: path,
writeToDiskTasks: make(chan *Task, 1),
imageWorkerStopped: make(chan struct{}),
} }
go newStorage.runWorker() s.saverPool = pool.NewPool(4, 8, s.writeToDisk)
return newStorage, nil return s, nil
} }
func (v *ImageStorage) Wait() { func (v *ImageStorage) Wait() {
// send stop signal v.saverPool.Pause()
v.writeToDiskTasks <- NewBreakTask()
// wait for workers to stop
<-v.imageWorkerStopped
} }
func (v *ImageStorage) PrepareCanvasList(sessID uint64) ([]string, error) { func (v *ImageStorage) PrepareCanvasList(sessID uint64) ([]string, error) {
@ -140,11 +129,12 @@ func (v *ImageStorage) SaveCanvasToDisk(sessID uint64, data []byte) error {
log.Printf("can't parse canvas message, err: %s", err) log.Printf("can't parse canvas message, err: %s", err)
} }
// Use the same workflow // Use the same workflow
v.writeToDiskTasks <- &Task{sessionID: sessID, name: msg.Name, image: bytes.NewBuffer(msg.Data)} v.saverPool.Submit(&saveTask{sessionID: sessID, name: msg.Name, image: bytes.NewBuffer(msg.Data)})
return nil return nil
} }
func (v *ImageStorage) writeToDisk(task *Task) { func (v *ImageStorage) writeToDisk(payload interface{}) {
task := payload.(*saveTask)
path := fmt.Sprintf("%s/%d/", v.basePath, task.sessionID) path := fmt.Sprintf("%s/%d/", v.basePath, task.sessionID)
// Ensure the directory exists // Ensure the directory exists
@ -165,16 +155,3 @@ func (v *ImageStorage) writeToDisk(task *Task) {
log.Printf("new canvas image, sessID: %d, name: %s, size: %3.3f mb", task.sessionID, task.name, float64(task.image.Len())/1024.0/1024.0) log.Printf("new canvas image, sessID: %d, name: %s, size: %3.3f mb", task.sessionID, task.name, float64(task.image.Len())/1024.0/1024.0)
return return
} }
func (v *ImageStorage) runWorker() {
for {
select {
case task := <-v.writeToDiskTasks:
if task.isBreakTask {
v.imageWorkerStopped <- struct{}{}
continue
}
v.writeToDisk(task)
}
}
}

View file

@ -7,6 +7,7 @@ import (
"io" "io"
"log" "log"
"openreplay/backend/pkg/objectstorage" "openreplay/backend/pkg/objectstorage"
"openreplay/backend/pkg/pool"
"os" "os"
"os/exec" "os/exec"
"strconv" "strconv"
@ -16,34 +17,22 @@ import (
config "openreplay/backend/internal/config/imagestorage" config "openreplay/backend/internal/config/imagestorage"
) )
type Task struct { type saveTask struct {
sessionID uint64 // to generate path sessionID uint64
images map[string]*bytes.Buffer images map[string]*bytes.Buffer
isBreakTask bool
} }
func NewBreakTask() *Task { type uploadTask struct {
return &Task{isBreakTask: true} sessionID string
} path string
name string
type UploadTask struct {
sessionID string
path string
name string
isBreakTask bool
}
func NewBreakUploadTask() *UploadTask {
return &UploadTask{isBreakTask: true}
} }
type ImageStorage struct { type ImageStorage struct {
cfg *config.Config cfg *config.Config
objStorage objectstorage.ObjectStorage objStorage objectstorage.ObjectStorage
writeToDiskTasks chan *Task saverPool pool.WorkerPool
sendToS3Tasks chan *UploadTask uploaderPool pool.WorkerPool
imageWorkerStopped chan struct{}
uploadWorkersStopped chan struct{}
} }
func New(cfg *config.Config, objStorage objectstorage.ObjectStorage) (*ImageStorage, error) { func New(cfg *config.Config, objStorage objectstorage.ObjectStorage) (*ImageStorage, error) {
@ -51,26 +40,18 @@ func New(cfg *config.Config, objStorage objectstorage.ObjectStorage) (*ImageStor
case cfg == nil: case cfg == nil:
return nil, fmt.Errorf("config is empty") return nil, fmt.Errorf("config is empty")
} }
newStorage := &ImageStorage{ s := &ImageStorage{
cfg: cfg, cfg: cfg,
objStorage: objStorage, objStorage: objStorage,
writeToDiskTasks: make(chan *Task, 1),
sendToS3Tasks: make(chan *UploadTask, 1),
imageWorkerStopped: make(chan struct{}),
uploadWorkersStopped: make(chan struct{}),
} }
go newStorage.runWorker() s.saverPool = pool.NewPool(4, 8, s.writeToDisk)
return newStorage, nil s.uploaderPool = pool.NewPool(4, 4, s.sendToS3)
return s, nil
} }
func (v *ImageStorage) Wait() { func (v *ImageStorage) Wait() {
// send stop signal v.saverPool.Pause()
v.writeToDiskTasks <- NewBreakTask() v.uploaderPool.Pause()
v.sendToS3Tasks <- NewBreakUploadTask()
// wait for workers to stop
<-v.imageWorkerStopped
<-v.uploadWorkersStopped
} }
func (v *ImageStorage) Process(sessID uint64, data []byte) error { func (v *ImageStorage) Process(sessID uint64, data []byte) error {
@ -102,12 +83,13 @@ func (v *ImageStorage) Process(sessID uint64, data []byte) error {
} }
} }
v.writeToDiskTasks <- &Task{sessionID: sessID, images: images}
log.Printf("sessID: %d, arch size: %d, extracted archive in: %s", sessID, len(data), time.Since(start)) log.Printf("sessID: %d, arch size: %d, extracted archive in: %s", sessID, len(data), time.Since(start))
v.saverPool.Submit(&saveTask{sessionID: sessID, images: images})
return nil return nil
} }
func (v *ImageStorage) writeToDisk(task *Task) { func (v *ImageStorage) writeToDisk(payload interface{}) {
task := payload.(*saveTask)
// Build the directory path // Build the directory path
path := v.cfg.FSDir + "/" path := v.cfg.FSDir + "/"
if v.cfg.ScreenshotsDir != "" { if v.cfg.ScreenshotsDir != "" {
@ -162,11 +144,12 @@ func (v *ImageStorage) PackScreenshots(sessID uint64, filesPath string) error {
} }
log.Printf("packed replay in %v", time.Since(start)) log.Printf("packed replay in %v", time.Since(start))
v.sendToS3Tasks <- &UploadTask{sessionID: sessionID, path: archPath, name: sessionID + "/replay.tar.zst"} v.uploaderPool.Submit(&uploadTask{sessionID: sessionID, path: archPath, name: sessionID + "/replay.tar.zst"})
return nil return nil
} }
func (v *ImageStorage) sendToS3(task *UploadTask) { func (v *ImageStorage) sendToS3(payload interface{}) {
task := payload.(*uploadTask)
start := time.Now() start := time.Now()
video, err := os.ReadFile(task.path) video, err := os.ReadFile(task.path)
if err != nil { if err != nil {
@ -178,22 +161,3 @@ func (v *ImageStorage) sendToS3(task *UploadTask) {
log.Printf("Replay file (size: %d) uploaded successfully in %v", len(video), time.Since(start)) log.Printf("Replay file (size: %d) uploaded successfully in %v", len(video), time.Since(start))
return return
} }
func (v *ImageStorage) runWorker() {
for {
select {
case task := <-v.writeToDiskTasks:
if task.isBreakTask {
v.imageWorkerStopped <- struct{}{}
continue
}
v.writeToDisk(task)
case task := <-v.sendToS3Tasks:
if task.isBreakTask {
v.uploadWorkersStopped <- struct{}{}
continue
}
v.sendToS3(task)
}
}
}

View file

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"log" "log"
"openreplay/backend/pkg/pool"
"os" "os"
"os/exec" "os/exec"
"strconv" "strconv"
@ -14,21 +15,15 @@ import (
"openreplay/backend/pkg/objectstorage" "openreplay/backend/pkg/objectstorage"
) )
type Task struct { type uploadTask struct {
path string path string
name string name string
isBreakTask bool
}
func NewBreakTask() *Task {
return &Task{isBreakTask: true}
} }
type VideoStorage struct { type VideoStorage struct {
cfg *config.Config cfg *config.Config
objStorage objectstorage.ObjectStorage objStorage objectstorage.ObjectStorage
sendToS3Tasks chan *Task uploaderPool pool.WorkerPool
workersStopped chan struct{}
} }
func New(cfg *config.Config, objStorage objectstorage.ObjectStorage) (*VideoStorage, error) { func New(cfg *config.Config, objStorage objectstorage.ObjectStorage) (*VideoStorage, error) {
@ -38,14 +33,12 @@ func New(cfg *config.Config, objStorage objectstorage.ObjectStorage) (*VideoStor
case objStorage == nil: case objStorage == nil:
return nil, fmt.Errorf("object storage is empty") return nil, fmt.Errorf("object storage is empty")
} }
newStorage := &VideoStorage{ s := &VideoStorage{
cfg: cfg, cfg: cfg,
objStorage: objStorage, objStorage: objStorage,
sendToS3Tasks: make(chan *Task, 1),
workersStopped: make(chan struct{}),
} }
go newStorage.runWorker() s.uploaderPool = pool.NewPool(4, 4, s.sendToS3)
return newStorage, nil return s, nil
} }
func (v *VideoStorage) Process(sessID uint64, filesPath, canvasMix string) error { func (v *VideoStorage) Process(sessID uint64, filesPath, canvasMix string) error {
@ -74,11 +67,12 @@ func (v *VideoStorage) Process(sessID uint64, filesPath, canvasMix string) error
return err return err
} }
log.Printf("made video replay in %v", time.Since(start)) log.Printf("made video replay in %v", time.Since(start))
v.sendToS3Tasks <- &Task{path: videoPath, name: sessionID + "/" + name + ".mp4"} v.uploaderPool.Submit(&uploadTask{path: videoPath, name: sessionID + "/" + name + ".mp4"})
return nil return nil
} }
func (v *VideoStorage) sendToS3(task *Task) { func (v *VideoStorage) sendToS3(payload interface{}) {
task := payload.(*uploadTask)
start := time.Now() start := time.Now()
video, err := os.ReadFile(task.path) video, err := os.ReadFile(task.path)
if err != nil { if err != nil {
@ -92,19 +86,5 @@ func (v *VideoStorage) sendToS3(task *Task) {
} }
func (v *VideoStorage) Wait() { func (v *VideoStorage) Wait() {
v.sendToS3Tasks <- NewBreakTask() v.uploaderPool.Pause()
<-v.workersStopped
}
func (v *VideoStorage) runWorker() {
for {
select {
case task := <-v.sendToS3Tasks:
if task.isBreakTask {
v.workersStopped <- struct{}{}
continue
}
v.sendToS3(task)
}
}
} }

View file

@ -0,0 +1,81 @@
package pool
import "sync"
// task is the internal envelope carried on the pool's queue. It holds
// either a user payload to be handled or a shutdown marker for a worker.
type task struct {
	Payload interface{}
	toStop  bool
}

// NewTask wraps a payload in a queue envelope for a worker to process.
func NewTask(payload interface{}) *task {
	t := task{Payload: payload}
	return &t
}

// NewStopSignal builds a sentinel task that tells one worker to exit.
func NewStopSignal() *task {
	t := task{toStop: true}
	return &t
}
// workerPoolImpl is the concrete WorkerPool implementation: a fixed
// number of goroutines all consuming from one buffered task channel.
type workerPoolImpl struct {
	wg              *sync.WaitGroup   // tracks live workers so stop() can wait for them
	tasks           chan *task        // buffered queue of pending payloads and stop sentinels
	numberOfWorkers int               // goroutines started by each runWorkers call
	handler         func(interface{}) // callback invoked once per submitted payload
}
// WorkerPool processes submitted payloads asynchronously on a fixed set
// of background worker goroutines.
type WorkerPool interface {
	// Submit enqueues a payload for processing. It blocks while the
	// task queue is full.
	Submit(payload interface{})
	// Pause waits until every already-queued task has been handled and
	// all workers have exited, then starts a fresh set of workers.
	// Despite the name it is a flush/drain, not a suspension.
	Pause()
	// Stop drains the queue, terminates the workers and closes the
	// task channel. Submit must not be called after Stop (it would
	// send on a closed channel and panic).
	Stop()
}
// NewPool creates a pool of numberOfWorkers goroutines reading from a
// task queue of capacity queueSize and starts the workers immediately.
// Each submitted payload is passed to handler exactly once.
func NewPool(numberOfWorkers, queueSize int, handler func(payload interface{})) *workerPoolImpl {
	var wg sync.WaitGroup
	p := &workerPoolImpl{
		wg:              &wg,
		tasks:           make(chan *task, queueSize),
		numberOfWorkers: numberOfWorkers,
		handler:         handler,
	}
	p.runWorkers()
	return p
}
// runWorkers registers numberOfWorkers goroutines with the wait group
// and launches them.
func (p *workerPoolImpl) runWorkers() {
	p.wg.Add(p.numberOfWorkers)
	for i := 0; i < p.numberOfWorkers; i++ {
		go p.worker()
	}
}
// Submit places payload on the task queue; it blocks while the queue is full.
func (p *workerPoolImpl) Submit(payload interface{}) {
	t := NewTask(payload)
	p.tasks <- t
}
// stop enqueues one stop sentinel per worker (behind any tasks already
// queued) and waits until every worker goroutine has exited.
func (p *workerPoolImpl) stop() {
	for n := p.numberOfWorkers; n > 0; n-- {
		p.tasks <- NewStopSignal()
	}
	p.wg.Wait()
}
// Pause drains the pool: it waits for every already-queued task to be
// processed and for all workers to exit, then spins up a fresh set of
// workers. Despite the name it is a flush/restart, not a suspension —
// new submissions are handled again as soon as Pause returns.
func (p *workerPoolImpl) Pause() {
	p.stop()
	p.runWorkers()
}
// Stop shuts the pool down permanently: queued tasks are drained, the
// workers are signalled to exit and waited for, then the task channel
// is closed. Calling Submit after Stop panics (send on closed channel).
func (p *workerPoolImpl) Stop() {
	p.stop()
	close(p.tasks)
}
// worker is the run loop of a single pool goroutine: it takes tasks off
// the queue and hands each payload to the handler until it receives a
// stop sentinel or the queue is closed.
//
// The original body used a select with a single case (staticcheck
// S1000); worse, if the channel were ever closed while a worker was
// still alive, the receive would yield a nil *task and the toStop
// dereference would panic. Ranging over the channel is identical for a
// live channel and exits cleanly on close.
func (p *workerPoolImpl) worker() {
	defer p.wg.Done()
	for t := range p.tasks {
		if t.toStop {
			return
		}
		p.handler(t.Payload)
	}
}