diff --git a/backend/internal/canvas-handler/service.go b/backend/internal/canvas-handler/service.go index 204978d7d..982719856 100644 --- a/backend/internal/canvas-handler/service.go +++ b/backend/internal/canvas-handler/service.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "log" + "openreplay/backend/pkg/pool" "os" "sort" "strconv" @@ -14,22 +15,16 @@ import ( config "openreplay/backend/internal/config/imagestorage" ) -type Task struct { - sessionID uint64 // to generate path - name string - image *bytes.Buffer - isBreakTask bool -} - -func NewBreakTask() *Task { - return &Task{isBreakTask: true} +type saveTask struct { + sessionID uint64 + name string + image *bytes.Buffer } type ImageStorage struct { - cfg *config.Config - basePath string - writeToDiskTasks chan *Task - imageWorkerStopped chan struct{} + cfg *config.Config + basePath string + saverPool pool.WorkerPool } func New(cfg *config.Config) (*ImageStorage, error) { @@ -41,22 +36,16 @@ func New(cfg *config.Config) (*ImageStorage, error) { if cfg.CanvasDir != "" { path += cfg.CanvasDir + "/" } - newStorage := &ImageStorage{ - cfg: cfg, - basePath: path, - writeToDiskTasks: make(chan *Task, 1), - imageWorkerStopped: make(chan struct{}), + s := &ImageStorage{ + cfg: cfg, + basePath: path, } - go newStorage.runWorker() - return newStorage, nil + s.saverPool = pool.NewPool(4, 8, s.writeToDisk) + return s, nil } func (v *ImageStorage) Wait() { - // send stop signal - v.writeToDiskTasks <- NewBreakTask() - - // wait for workers to stop - <-v.imageWorkerStopped + v.saverPool.Pause() } func (v *ImageStorage) PrepareCanvasList(sessID uint64) ([]string, error) { @@ -140,11 +129,12 @@ func (v *ImageStorage) SaveCanvasToDisk(sessID uint64, data []byte) error { log.Printf("can't parse canvas message, err: %s", err) } // Use the same workflow - v.writeToDiskTasks <- &Task{sessionID: sessID, name: msg.Name, image: bytes.NewBuffer(msg.Data)} + v.saverPool.Submit(&saveTask{sessionID: sessID, name: msg.Name, image: bytes.NewBuffer(msg.Data)}) return nil } -func (v *ImageStorage) writeToDisk(task *Task) { +func (v *ImageStorage) writeToDisk(payload interface{}) { + task := payload.(*saveTask) path := fmt.Sprintf("%s/%d/", v.basePath, task.sessionID) // Ensure the directory exists @@ -165,16 +155,3 @@ func (v *ImageStorage) writeToDisk(task *Task) { log.Printf("new canvas image, sessID: %d, name: %s, size: %3.3f mb", task.sessionID, task.name, float64(task.image.Len())/1024.0/1024.0) return } - -func (v *ImageStorage) runWorker() { - for { - select { - case task := <-v.writeToDiskTasks: - if task.isBreakTask { - v.imageWorkerStopped <- struct{}{} - continue - } - v.writeToDisk(task) - } - } -} diff --git a/backend/internal/screenshot-handler/service.go b/backend/internal/screenshot-handler/service.go index 202638ff5..90771a1ae 100644 --- a/backend/internal/screenshot-handler/service.go +++ b/backend/internal/screenshot-handler/service.go @@ -7,6 +7,7 @@ import ( "io" "log" "openreplay/backend/pkg/objectstorage" + "openreplay/backend/pkg/pool" "os" "os/exec" "strconv" @@ -16,34 +17,22 @@ import ( config "openreplay/backend/internal/config/imagestorage" ) -type Task struct { - sessionID uint64 // to generate path - images map[string]*bytes.Buffer - isBreakTask bool +type saveTask struct { + sessionID uint64 + images map[string]*bytes.Buffer } -func NewBreakTask() *Task { - return &Task{isBreakTask: true} -} - -type UploadTask struct { - sessionID string - path string - name string - isBreakTask bool -} - -func NewBreakUploadTask() *UploadTask { - return &UploadTask{isBreakTask: true} +type uploadTask struct { + sessionID string + path string + name string } type ImageStorage struct { - cfg *config.Config - objStorage objectstorage.ObjectStorage - writeToDiskTasks chan *Task - sendToS3Tasks chan *UploadTask - imageWorkerStopped chan struct{} - uploadWorkersStopped chan struct{} + cfg *config.Config + objStorage objectstorage.ObjectStorage + saverPool pool.WorkerPool + uploaderPool pool.WorkerPool } func New(cfg *config.Config, objStorage objectstorage.ObjectStorage) (*ImageStorage, error) { @@ -51,26 +40,18 @@ func New(cfg *config.Config, objStorage objectstorage.ObjectStorage) (*ImageStor case cfg == nil: return nil, fmt.Errorf("config is empty") } - newStorage := &ImageStorage{ - cfg: cfg, - objStorage: objStorage, - writeToDiskTasks: make(chan *Task, 1), - sendToS3Tasks: make(chan *UploadTask, 1), - imageWorkerStopped: make(chan struct{}), - uploadWorkersStopped: make(chan struct{}), + s := &ImageStorage{ + cfg: cfg, + objStorage: objStorage, } - go newStorage.runWorker() - return newStorage, nil + s.saverPool = pool.NewPool(4, 8, s.writeToDisk) + s.uploaderPool = pool.NewPool(4, 4, s.sendToS3) + return s, nil } func (v *ImageStorage) Wait() { - // send stop signal - v.writeToDiskTasks <- NewBreakTask() - v.sendToS3Tasks <- NewBreakUploadTask() - - // wait for workers to stop - <-v.imageWorkerStopped - <-v.uploadWorkersStopped + v.saverPool.Pause() + v.uploaderPool.Pause() } func (v *ImageStorage) Process(sessID uint64, data []byte) error { @@ -102,12 +83,13 @@ func (v *ImageStorage) Process(sessID uint64, data []byte) error { } } - v.writeToDiskTasks <- &Task{sessionID: sessID, images: images} log.Printf("sessID: %d, arch size: %d, extracted archive in: %s", sessID, len(data), time.Since(start)) + v.saverPool.Submit(&saveTask{sessionID: sessID, images: images}) return nil } -func (v *ImageStorage) writeToDisk(task *Task) { +func (v *ImageStorage) writeToDisk(payload interface{}) { + task := payload.(*saveTask) // Build the directory path path := v.cfg.FSDir + "/" if v.cfg.ScreenshotsDir != "" { @@ -162,11 +144,12 @@ func (v *ImageStorage) PackScreenshots(sessID uint64, filesPath string) error { } log.Printf("packed replay in %v", time.Since(start)) - v.sendToS3Tasks <- &UploadTask{sessionID: sessionID, path: archPath, name: sessionID + "/replay.tar.zst"} + v.uploaderPool.Submit(&uploadTask{sessionID: sessionID, path: archPath, name: sessionID + "/replay.tar.zst"}) return nil } -func (v *ImageStorage) sendToS3(task *UploadTask) { +func (v *ImageStorage) sendToS3(payload interface{}) { + task := payload.(*uploadTask) start := time.Now() video, err := os.ReadFile(task.path) if err != nil { @@ -178,22 +161,3 @@ func (v *ImageStorage) sendToS3(task *UploadTask) { log.Printf("Replay file (size: %d) uploaded successfully in %v", len(video), time.Since(start)) return } - -func (v *ImageStorage) runWorker() { - for { - select { - case task := <-v.writeToDiskTasks: - if task.isBreakTask { - v.imageWorkerStopped <- struct{}{} - continue - } - v.writeToDisk(task) - case task := <-v.sendToS3Tasks: - if task.isBreakTask { - v.uploadWorkersStopped <- struct{}{} - continue - } - v.sendToS3(task) - } - } -} diff --git a/backend/internal/video-maker/service.go b/backend/internal/video-maker/service.go index 5a1db93b7..4182a76de 100644 --- a/backend/internal/video-maker/service.go +++ b/backend/internal/video-maker/service.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "log" + "openreplay/backend/pkg/pool" "os" "os/exec" "strconv" @@ -14,21 +15,15 @@ import ( "openreplay/backend/pkg/objectstorage" ) -type Task struct { - path string - name string - isBreakTask bool -} - -func NewBreakTask() *Task { - return &Task{isBreakTask: true} +type uploadTask struct { + path string + name string } type VideoStorage struct { - cfg *config.Config - objStorage objectstorage.ObjectStorage - sendToS3Tasks chan *Task - workersStopped chan struct{} + cfg *config.Config + objStorage objectstorage.ObjectStorage + uploaderPool pool.WorkerPool } func New(cfg *config.Config, objStorage objectstorage.ObjectStorage) (*VideoStorage, error) { @@ -38,14 +33,12 @@ func New(cfg *config.Config, objStorage objectstorage.ObjectStorage) (*VideoStor case objStorage == nil: return nil, fmt.Errorf("object storage is empty") } - newStorage := &VideoStorage{ - cfg: cfg, - objStorage: objStorage, - sendToS3Tasks: make(chan *Task, 1), - workersStopped: make(chan struct{}), + s := &VideoStorage{ + cfg: cfg, + objStorage: objStorage, } - go newStorage.runWorker() - return newStorage, nil + s.uploaderPool = pool.NewPool(4, 4, s.sendToS3) + return s, nil } func (v *VideoStorage) Process(sessID uint64, filesPath, canvasMix string) error { @@ -74,11 +67,12 @@ func (v *VideoStorage) Process(sessID uint64, filesPath, canvasMix string) error return err } log.Printf("made video replay in %v", time.Since(start)) - v.sendToS3Tasks <- &Task{path: videoPath, name: sessionID + "/" + name + ".mp4"} + v.uploaderPool.Submit(&uploadTask{path: videoPath, name: sessionID + "/" + name + ".mp4"}) return nil } -func (v *VideoStorage) sendToS3(task *Task) { +func (v *VideoStorage) sendToS3(payload interface{}) { + task := payload.(*uploadTask) start := time.Now() video, err := os.ReadFile(task.path) if err != nil { @@ -92,19 +86,5 @@ func (v *VideoStorage) sendToS3(task *Task) { } func (v *VideoStorage) Wait() { - v.sendToS3Tasks <- NewBreakTask() - <-v.workersStopped -} - -func (v *VideoStorage) runWorker() { - for { - select { - case task := <-v.sendToS3Tasks: - if task.isBreakTask { - v.workersStopped <- struct{}{} - continue - } - v.sendToS3(task) - } - } + v.uploaderPool.Pause() } diff --git a/backend/pkg/pool/worker-pool.go b/backend/pkg/pool/worker-pool.go new file mode 100644 index 000000000..791b0bb3b --- /dev/null +++ b/backend/pkg/pool/worker-pool.go @@ -0,0 +1,81 @@ +package pool + +import "sync" + +type task struct { + Payload interface{} + toStop bool +} + +func NewTask(payload interface{}) *task { + return &task{Payload: payload} +} + +func NewStopSignal() *task { + return &task{toStop: true} +} + +type workerPoolImpl struct { + wg *sync.WaitGroup + tasks chan *task + numberOfWorkers int + handler func(interface{}) +} + +type WorkerPool interface { + Submit(payload interface{}) + Pause() + Stop() +} + +func NewPool(numberOfWorkers, queueSize int, handler func(payload interface{})) *workerPoolImpl { + pool := &workerPoolImpl{ + wg: &sync.WaitGroup{}, + tasks: make(chan *task, queueSize), + numberOfWorkers: numberOfWorkers, + handler: handler, + } + pool.runWorkers() + return pool +} + +func (p *workerPoolImpl) runWorkers() { + for i := 0; i < p.numberOfWorkers; i++ { + p.wg.Add(1) + go p.worker() + } +} + +func (p *workerPoolImpl) Submit(payload interface{}) { + p.tasks <- NewTask(payload) +} + +func (p *workerPoolImpl) stop() { + for i := 0; i < p.numberOfWorkers; i++ { + p.tasks <- NewStopSignal() + } + p.wg.Wait() +} + +func (p *workerPoolImpl) Pause() { + p.stop() + p.runWorkers() +} + +func (p *workerPoolImpl) Stop() { + p.stop() + close(p.tasks) +} + +func (p *workerPoolImpl) worker() { + defer p.wg.Done() + for { + select { + case t := <-p.tasks: + if t.toStop { + return + } + p.handler(t.Payload) + } + } +}