openreplay/backend/internal/assets/cacher/pool.go
Alexander 45c956c489
Json logs format (#1952)
* feat(backend): try a new approach for logs formatting (http)

* feat(backend): added logger module

* feat(backend): added project/session info to /i endpoint

* feat(backend): found a solution for correct caller information

* feat(backend): finished logs for http handlers

* feat(backend): finished logs for mobile http handlers

* feat(backend): finished ender

* feat(backend): finished assets

* feat(backend): finished heuristics

* feat(backend): finished image-storage

* feat(backend): finished sink

* feat(backend): finished storage

* feat(backend): formatted logs in all services

* feat(backend): finished foss part

* feat(backend): added missed foss part

* feat(backend): fixed panic in memory manager and sink service

* feat(backend): connectors
2024-03-14 12:51:14 +01:00

77 lines
1.1 KiB
Go

package cacher
import (
"sync"
)
// Task is a single unit of work queued on a WorkerPool; the pool hands it
// unchanged to the configured Job.
//
// NOTE(review): apart from retries, the field descriptions below are inferred
// from the field names only; confirm against the code that builds and
// consumes tasks.
type Task struct {
	// requestURL is presumably the URL of the asset this task refers to.
	requestURL string

	// sessionID presumably identifies the session the asset belongs to.
	sessionID uint64

	// depth presumably tracks the nesting level of the asset.
	depth byte

	// urlContext is presumably the base URL used to resolve relative links.
	urlContext string

	// isJS presumably marks the asset as JavaScript.
	isJS bool

	// cachePath is presumably the destination path of the cached copy.
	cachePath string

	// retries is the remaining attempt budget; AddTask discards tasks whose
	// retries is zero or negative.
	retries int
}
// WorkerPool runs a fixed number of worker goroutines that receive tasks from
// a shared buffered queue and pass each one to a single Job callback.
type WorkerPool struct {
	// tasks is the buffered queue that feeds the workers.
	tasks chan *Task

	// wg counts the running workers so Stop can wait for them to exit.
	wg sync.WaitGroup

	// done is closed by Stop to tell every worker to return.
	done chan struct{}

	// term guarantees done is closed only once, however often Stop is called.
	term sync.Once

	// size is the number of worker goroutines started by init.
	size int

	// job is the callback a worker runs for every task it receives.
	job Job
}
// CanAddTask reports whether the task queue currently has at least one free
// buffer slot.
//
// The answer is only a snapshot: len and cap are read without coordinating
// with senders, so another goroutine may fill the queue before the caller
// reaches AddTask.
func (p *WorkerPool) CanAddTask() bool {
	return len(p.tasks) < cap(p.tasks)
}
// Job is the callback a WorkerPool worker invokes for every task it receives
// from the pool's queue.
type Job func(task *Task)
// NewPool builds a WorkerPool that runs job on queued tasks and starts its
// workers before returning.
//
// size is the number of worker goroutines to launch. The task queue is always
// created with room for 128 pending tasks, independent of size.
func NewPool(size int, job Job) *WorkerPool {
	const queueCapacity = 128

	pool := new(WorkerPool)
	pool.tasks = make(chan *Task, queueCapacity)
	pool.done = make(chan struct{})
	pool.size = size
	pool.job = job

	// Launch the workers only once every field is in place.
	pool.init()
	return pool
}
// init registers p.size workers with the wait group and then launches each of
// them in its own goroutine.
func (p *WorkerPool) init() {
	// The counter is raised once, up front, before any worker is started.
	p.wg.Add(p.size)
	for remaining := p.size; remaining > 0; remaining-- {
		go p.worker()
	}
}
// worker is the loop run by each pool goroutine. It passes every task it
// receives from the queue to p.job and returns once the done channel is
// closed.
//
// When a task and the stop signal are ready at the same time, select picks
// one of them at random, so tasks may still be sitting in the queue when the
// worker returns.
func (p *WorkerPool) worker() {
	// Deferred so the wait group is released on every exit path, including an
	// abnormal exit from p.job; otherwise Stop would wait forever.
	defer p.wg.Done()

	for {
		select {
		case task := <-p.tasks:
			p.job(task)
		case <-p.done:
			return
		}
	}
}
// AddTask queues task for the workers.
//
// A nil task, or one whose retries is zero or negative, is silently dropped.
// While the queue is full the call blocks until a worker frees a slot or the
// pool is stopped. Once the pool has been stopped the task may be discarded
// instead of queued, since no worker is left to run it.
func (p *WorkerPool) AddTask(task *Task) {
	if task == nil || task.retries <= 0 {
		return
	}
	select {
	case p.tasks <- task:
	case <-p.done:
		// The pool was stopped: give up rather than block forever on a queue
		// that nobody drains any more.
	}
}
// Stop tells every worker to exit and blocks until all of them have returned.
//
// It may be called more than once: the done channel is closed only on the
// first call, and every call waits on the wait group. Stop does not touch the
// task queue itself.
func (p *WorkerPool) Stop() {
	signalWorkers := func() {
		close(p.done)
	}
	p.term.Do(signalWorkers)
	p.wg.Wait()
}