* feat(backend): added worker pool for screenshot handler * feat(backend): added worker pool to canvas handler * feat(backend): added worker pool to canvas maker * feat(backend): added missing method to canvas-maker service
81 lines
1.3 KiB
Go
81 lines
1.3 KiB
Go
package pool
|
|
|
|
import "sync"
|
|
|
|
type task struct {
|
|
Payload interface{}
|
|
toStop bool
|
|
}
|
|
|
|
func NewTask(payload interface{}) *task {
|
|
return &task{Payload: payload}
|
|
}
|
|
|
|
func NewStopSignal() *task {
|
|
return &task{toStop: true}
|
|
}
|
|
|
|
type workerPoolImpl struct {
|
|
wg *sync.WaitGroup
|
|
tasks chan *task
|
|
numberOfWorkers int
|
|
handler func(interface{})
|
|
}
|
|
|
|
type WorkerPool interface {
|
|
Submit(payload interface{})
|
|
Pause()
|
|
Stop()
|
|
}
|
|
|
|
func NewPool(numberOfWorkers, queueSize int, handler func(payload interface{})) *workerPoolImpl {
|
|
pool := &workerPoolImpl{
|
|
wg: &sync.WaitGroup{},
|
|
tasks: make(chan *task, queueSize),
|
|
numberOfWorkers: numberOfWorkers,
|
|
handler: handler,
|
|
}
|
|
pool.runWorkers()
|
|
return pool
|
|
}
|
|
|
|
func (p *workerPoolImpl) runWorkers() {
|
|
for i := 0; i < p.numberOfWorkers; i++ {
|
|
p.wg.Add(1)
|
|
go p.worker()
|
|
}
|
|
}
|
|
|
|
func (p *workerPoolImpl) Submit(payload interface{}) {
|
|
p.tasks <- NewTask(payload)
|
|
}
|
|
|
|
func (p *workerPoolImpl) stop() {
|
|
for i := 0; i < p.numberOfWorkers; i++ {
|
|
p.tasks <- NewStopSignal()
|
|
}
|
|
p.wg.Wait()
|
|
}
|
|
|
|
func (p *workerPoolImpl) Pause() {
|
|
p.stop()
|
|
p.runWorkers()
|
|
}
|
|
|
|
func (p *workerPoolImpl) Stop() {
|
|
p.stop()
|
|
close(p.tasks)
|
|
}
|
|
|
|
func (p *workerPoolImpl) worker() {
|
|
defer p.wg.Done()
|
|
for {
|
|
select {
|
|
case t := <-p.tasks:
|
|
if t.toStop {
|
|
return
|
|
}
|
|
p.handler(t.Payload)
|
|
}
|
|
}
|
|
}
|