Workers pool for assets (#763)

* feat(backend): added workers pool to assets
Alexander 2022-10-06 14:10:25 +02:00 committed by GitHub
parent 6eb3e82516
commit 8bf1dcba18
3 changed files with 138 additions and 20 deletions

@@ -68,7 +68,7 @@ func main() {
     select {
     case sig := <-sigchan:
         log.Printf("Caught signal %v: terminating\n", sig)
+        // TODO: wait assets workers here
+        cacher.Stop()
         msgConsumer.Close()
         os.Exit(0)
     case err := <-cacher.Errors:
@@ -76,6 +76,9 @@ func main() {
     case <-tick:
         cacher.UpdateTimeouts()
     default:
+        if !cacher.CanCache() {
+            continue
+        }
         if err := msgConsumer.ConsumeNext(); err != nil {
             log.Fatalf("Error on consumption: %v", err)
         }

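The main loop now applies backpressure: before pulling the next message, it asks the cacher whether the worker pool can accept another task, and the default: branch makes this a non-blocking poll. A minimal standalone sketch of the pattern, reusing the commit's names; the sleep is an added assumption to avoid spinning while the queue is full:

    for {
        select {
        case sig := <-sigchan:
            log.Printf("Caught signal %v: terminating\n", sig)
            cacher.Stop() // waits for the pool workers to exit
            msgConsumer.Close()
            os.Exit(0)
        default:
            if !cacher.CanCache() {
                time.Sleep(10 * time.Millisecond) // not in the commit, which retries immediately
                continue
            }
            if err := msgConsumer.ConsumeNext(); err != nil {
                log.Fatalf("Error on consumption: %v", err)
            }
        }
    }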
@@ -33,6 +33,11 @@ type cacher struct {
     sizeLimit        int
     downloadedAssets syncfloat64.Counter
     requestHeaders   map[string]string
+    workers          *WorkerPool
 }
 
+func (c *cacher) CanCache() bool {
+    return c.workers.CanAddTask()
+}
+
 func NewCacher(cfg *config.Config, metrics *monitoring.Metrics) *cacher {
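CanCache forwards to the pool's CanAddTask, a len/cap check on the buffered task channel. With a single caller (the service's consume loop above) the check guarantees the subsequent enqueue will not block; with several producers it would be advisory only, since the buffer can fill between the check and the send. A minimal sketch of the intended single-producer contract, reusing the commit's names:

    // Single producer: the free slot observed here cannot be taken by
    // another goroutine, so the enqueue below cannot block.
    if cacher.CanCache() {
        cacher.CacheURL(sessionID, fullURL) // may end in workers.AddTask via checkTask
    }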
@@ -44,7 +49,7 @@ func NewCacher(cfg *config.Config, metrics *monitoring.Metrics) *cacher {
     if err != nil {
         log.Printf("can't create downloaded_assets metric: %s", err)
     }
-    return &cacher{
+    c := &cacher{
         timeoutMap: newTimeoutMap(),
         s3:         storage.NewS3(cfg.AWSRegion, cfg.S3BucketAssets),
         httpClient: &http.Client{
@@ -60,24 +65,15 @@ func NewCacher(cfg *config.Config, metrics *monitoring.Metrics) *cacher {
         downloadedAssets: downloadedAssets,
         requestHeaders:   cfg.AssetsRequestHeaders,
     }
+    c.workers = NewPool(32, c.CacheFile)
+    return c
 }
 
-func (c *cacher) cacheURL(requestURL string, sessionID uint64, depth byte, urlContext string, isJS bool) {
-    var cachePath string
-    if isJS {
-        cachePath = assets.GetCachePathForJS(requestURL)
-    } else {
-        cachePath = assets.GetCachePathForAssets(sessionID, requestURL)
-    }
-    if c.timeoutMap.contains(cachePath) {
-        return
-    }
-    c.timeoutMap.add(cachePath)
-    crTime := c.s3.GetCreationTime(cachePath)
-    if crTime != nil && crTime.After(time.Now().Add(-MAX_STORAGE_TIME)) { // recently uploaded
-        return
-    }
-
+func (c *cacher) CacheFile(task *Task) {
+    c.cacheURL(task.requestURL, task.sessionID, task.depth, task.urlContext, task.cachePath)
+}
+
+func (c *cacher) cacheURL(requestURL string, sessionID uint64, depth byte, urlContext, cachePath string) {
     req, _ := http.NewRequest("GET", requestURL, nil)
     req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20100101 Firefox/31.0")
     for k, v := range c.requestHeaders {
@@ -127,7 +123,13 @@ func (c *cacher) cacheURL(requestURL string, sessionID uint64, depth byte, urlCo
     if depth > 0 {
         for _, extractedURL := range assets.ExtractURLsFromCSS(string(data)) {
             if fullURL, cachable := assets.GetFullCachableURL(requestURL, extractedURL); cachable {
-                go c.cacheURL(fullURL, sessionID, depth-1, urlContext+"\n -> "+fullURL, false)
+                c.checkTask(&Task{
+                    requestURL: fullURL,
+                    sessionID:  sessionID,
+                    depth:      depth - 1,
+                    urlContext: urlContext + "\n -> " + fullURL,
+                    isJS:       false,
+                })
             }
         }
         if err != nil {
@@ -142,14 +144,51 @@ func (c *cacher) cacheURL(requestURL string, sessionID uint64, depth byte, urlCo
     return
 }
 
+func (c *cacher) checkTask(newTask *Task) {
+    // check if file was recently uploaded
+    var cachePath string
+    if newTask.isJS {
+        cachePath = assets.GetCachePathForJS(newTask.requestURL)
+    } else {
+        cachePath = assets.GetCachePathForAssets(newTask.sessionID, newTask.requestURL)
+    }
+    if c.timeoutMap.contains(cachePath) {
+        return
+    }
+    c.timeoutMap.add(cachePath)
+    crTime := c.s3.GetCreationTime(cachePath)
+    if crTime != nil && crTime.After(time.Now().Add(-MAX_STORAGE_TIME)) {
+        return
+    }
+    // add new file in queue to download
+    newTask.cachePath = cachePath
+    c.workers.AddTask(newTask)
+}
+
 func (c *cacher) CacheJSFile(sourceURL string) {
-    go c.cacheURL(sourceURL, 0, 0, sourceURL, true)
+    c.checkTask(&Task{
+        requestURL: sourceURL,
+        sessionID:  0,
+        depth:      0,
+        urlContext: sourceURL,
+        isJS:       true,
+    })
 }
 
 func (c *cacher) CacheURL(sessionID uint64, fullURL string) {
-    go c.cacheURL(fullURL, sessionID, MAX_CACHE_DEPTH, fullURL, false)
+    c.checkTask(&Task{
+        requestURL: fullURL,
+        sessionID:  sessionID,
+        depth:      MAX_CACHE_DEPTH,
+        urlContext: fullURL,
+        isJS:       false,
+    })
 }
 
 func (c *cacher) UpdateTimeouts() {
     c.timeoutMap.deleteOutdated()
 }
+
+func (c *cacher) Stop() {
+    c.workers.Stop()
+}

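checkTask now centralizes the de-duplication that previously lived at the top of cacheURL: it derives the cache path (JS files are keyed globally, session assets per session), returns early when the path is already tracked in timeoutMap or was uploaded to S3 within MAX_STORAGE_TIME, and only then enqueues the task, cachePath precomputed, on the worker pool. Recursive CSS fetches go through the same gate, so download concurrency is bounded by the pool instead of spawning one goroutine per URL.

One detail relates to the TODO left in main.go: Stop waits for workers to finish their current task, but tasks still buffered in the channel can be left unprocessed. A hedged sketch of a draining alternative, assuming all producers have stopped first (neither helper is part of this commit):

    // StopAndDrain: closing the task channel lets workers finish the queued
    // backlog before exiting. Safe only once AddTask can no longer be called.
    func (p *WorkerPool) StopAndDrain() {
        close(p.tasks)
        p.wg.Wait()
    }

    // ...paired with a worker variant that exits once the channel is drained:
    func (p *WorkerPool) drainWorker() {
        defer p.wg.Done()
        for task := range p.tasks {
            p.job(task)
        }
    }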
@@ -0,0 +1,76 @@
package cacher

import (
    "log"
    "sync"
)

// Task describes one asset to download and cache.
type Task struct {
    requestURL string
    sessionID  uint64
    depth      byte
    urlContext string
    isJS       bool
    cachePath  string
}

// WorkerPool distributes tasks to a fixed number of worker goroutines
// through a buffered channel.
type WorkerPool struct {
    tasks chan *Task
    wg    sync.WaitGroup
    done  chan struct{}
    term  sync.Once
    size  int
    job   Job
}

// CanAddTask reports whether the task queue has spare capacity,
// i.e. whether AddTask would currently not block.
func (p *WorkerPool) CanAddTask() bool {
    return len(p.tasks) < cap(p.tasks)
}

// Job is the function every worker applies to each task.
type Job func(task *Task)

func NewPool(size int, job Job) *WorkerPool {
    newPool := &WorkerPool{
        tasks: make(chan *Task, 128),
        done:  make(chan struct{}),
        size:  size,
        job:   job,
    }
    newPool.init()
    return newPool
}

func (p *WorkerPool) init() {
    p.wg.Add(p.size)
    for i := 0; i < p.size; i++ {
        go p.worker()
    }
}

func (p *WorkerPool) worker() {
    for {
        select {
        case newTask := <-p.tasks:
            p.job(newTask)
        case <-p.done:
            p.wg.Done()
            return
        }
    }
}

// AddTask enqueues a task; it blocks while the queue is full.
func (p *WorkerPool) AddTask(newTask *Task) {
    p.tasks <- newTask
}

// Stop signals all workers to exit and waits until they have.
func (p *WorkerPool) Stop() {
    log.Printf("stopping workers")
    p.term.Do(func() {
        close(p.done)
    })
    p.wg.Wait()
    log.Printf("all workers have been stopped")
}
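A standalone usage sketch of the pool (the job body and URLs are illustrative, not from the commit):

    pool := NewPool(4, func(t *Task) {
        log.Printf("caching %s", t.requestURL)
    })
    for _, u := range []string{"https://example.com/a.css", "https://example.com/b.js"} {
        if pool.CanAddTask() {
            pool.AddTask(&Task{requestURL: u})
        }
    }
    pool.Stop()

Two design notes. NewPool fixes the queue capacity at 128 regardless of the worker count, so the assets service runs 32 workers behind a 128-slot buffer. And because each worker selects over both tasks and done, a worker with both channels ready picks one at random: after Stop closes done, buffered tasks may or may not be processed before the workers exit. For this cacher that costs at most a cache miss, and the asset can be cached again once its timeoutMap entry expires.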