openreplay/backend/pkg/pool/worker-pool.go
Alexander 36fb1a07f7
Worker pool for image processing services (#1912)
* feat(backend): added worker pool for screenshot handler

* feat(backend): added worker pool to canvas handler

* feat(backend): added worker pool to canvas maker

* feat(backend): added missing method to canvas-maker service
2024-02-27 13:48:27 +01:00

81 lines
1.3 KiB
Go

package pool
import "sync"
type task struct {
Payload interface{}
toStop bool
}
func NewTask(payload interface{}) *task {
return &task{Payload: payload}
}
func NewStopSignal() *task {
return &task{toStop: true}
}
type workerPoolImpl struct {
wg *sync.WaitGroup
tasks chan *task
numberOfWorkers int
handler func(interface{})
}
type WorkerPool interface {
Submit(payload interface{})
Pause()
Stop()
}
func NewPool(numberOfWorkers, queueSize int, handler func(payload interface{})) *workerPoolImpl {
pool := &workerPoolImpl{
wg: &sync.WaitGroup{},
tasks: make(chan *task, queueSize),
numberOfWorkers: numberOfWorkers,
handler: handler,
}
pool.runWorkers()
return pool
}
func (p *workerPoolImpl) runWorkers() {
for i := 0; i < p.numberOfWorkers; i++ {
p.wg.Add(1)
go p.worker()
}
}
func (p *workerPoolImpl) Submit(payload interface{}) {
p.tasks <- NewTask(payload)
}
func (p *workerPoolImpl) stop() {
for i := 0; i < p.numberOfWorkers; i++ {
p.tasks <- NewStopSignal()
}
p.wg.Wait()
}
func (p *workerPoolImpl) Pause() {
p.stop()
p.runWorkers()
}
func (p *workerPoolImpl) Stop() {
p.stop()
close(p.tasks)
}
func (p *workerPoolImpl) worker() {
defer p.wg.Done()
for {
select {
case t := <-p.tasks:
if t.toStop {
return
}
p.handler(t.Payload)
}
}
}