From 8bf1dcba182d5ca551469b18fdf58ba0f5b93179 Mon Sep 17 00:00:00 2001
From: Alexander
Date: Thu, 6 Oct 2022 14:10:25 +0200
Subject: [PATCH] Workers pool for assets (#763)

* feat(backend): added workers pool to assets
---
 backend/cmd/assets/main.go               |  5 +-
 backend/internal/assets/cacher/cacher.go | 77 ++++++++++++++++++------
 backend/internal/assets/cacher/pool.go   | 76 +++++++++++++++++++++++
 3 files changed, 138 insertions(+), 20 deletions(-)
 create mode 100644 backend/internal/assets/cacher/pool.go

diff --git a/backend/cmd/assets/main.go b/backend/cmd/assets/main.go
index 04dc5e634..0e5f80e55 100644
--- a/backend/cmd/assets/main.go
+++ b/backend/cmd/assets/main.go
@@ -68,7 +68,7 @@ func main() {
 		select {
 		case sig := <-sigchan:
 			log.Printf("Caught signal %v: terminating\n", sig)
-			// TODO: wait assets workers here
+			cacher.Stop()
 			msgConsumer.Close()
 			os.Exit(0)
 		case err := <-cacher.Errors:
@@ -76,6 +76,9 @@
 		case <-tick:
 			cacher.UpdateTimeouts()
 		default:
+			if !cacher.CanCache() {
+				continue
+			}
 			if err := msgConsumer.ConsumeNext(); err != nil {
 				log.Fatalf("Error on consumption: %v", err)
 			}
diff --git a/backend/internal/assets/cacher/cacher.go b/backend/internal/assets/cacher/cacher.go
index 619c28c7c..4d14a7dc8 100644
--- a/backend/internal/assets/cacher/cacher.go
+++ b/backend/internal/assets/cacher/cacher.go
@@ -33,6 +33,11 @@ type cacher struct {
 	sizeLimit        int
 	downloadedAssets syncfloat64.Counter
 	requestHeaders   map[string]string
+	workers          *WorkerPool
+}
+
+func (c *cacher) CanCache() bool {
+	return c.workers.CanAddTask()
 }
 
 func NewCacher(cfg *config.Config, metrics *monitoring.Metrics) *cacher {
@@ -44,7 +49,7 @@
 	if err != nil {
 		log.Printf("can't create downloaded_assets metric: %s", err)
 	}
-	return &cacher{
+	c := &cacher{
 		timeoutMap: newTimeoutMap(),
 		s3:         storage.NewS3(cfg.AWSRegion, cfg.S3BucketAssets),
 		httpClient: &http.Client{
@@ -60,24 +65,15 @@
 		downloadedAssets: downloadedAssets,
 		requestHeaders:   cfg.AssetsRequestHeaders,
 	}
+	c.workers = NewPool(32, c.CacheFile)
+	return c
 }
 
-func (c *cacher) cacheURL(requestURL string, sessionID uint64, depth byte, urlContext string, isJS bool) {
-	var cachePath string
-	if isJS {
-		cachePath = assets.GetCachePathForJS(requestURL)
-	} else {
-		cachePath = assets.GetCachePathForAssets(sessionID, requestURL)
-	}
-	if c.timeoutMap.contains(cachePath) {
-		return
-	}
-	c.timeoutMap.add(cachePath)
-	crTime := c.s3.GetCreationTime(cachePath)
-	if crTime != nil && crTime.After(time.Now().Add(-MAX_STORAGE_TIME)) { // recently uploaded
-		return
-	}
+func (c *cacher) CacheFile(task *Task) {
+	c.cacheURL(task.requestURL, task.sessionID, task.depth, task.urlContext, task.cachePath)
+}
 
+func (c *cacher) cacheURL(requestURL string, sessionID uint64, depth byte, urlContext, cachePath string) {
 	req, _ := http.NewRequest("GET", requestURL, nil)
 	req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20100101 Firefox/31.0")
 	for k, v := range c.requestHeaders {
@@ -127,7 +123,13 @@
 	if depth > 0 {
 		for _, extractedURL := range assets.ExtractURLsFromCSS(string(data)) {
 			if fullURL, cachable := assets.GetFullCachableURL(requestURL, extractedURL); cachable {
-				go c.cacheURL(fullURL, sessionID, depth-1, urlContext+"\n -> "+fullURL, false)
+				c.checkTask(&Task{
+					requestURL: fullURL,
+					sessionID:  sessionID,
+					depth:      depth - 1,
+					urlContext: urlContext + "\n -> " + fullURL,
+					isJS:       false,
+				})
 			}
 		}
 		if err != nil {
@@ -142,14 +144,51 @@
 	return
 }
 
+func (c *cacher) checkTask(newTask *Task) {
+	// check if file was recently uploaded
+	var cachePath string
+	if newTask.isJS {
+		cachePath = assets.GetCachePathForJS(newTask.requestURL)
+	} else {
+		cachePath = assets.GetCachePathForAssets(newTask.sessionID, newTask.requestURL)
+	}
+	if c.timeoutMap.contains(cachePath) {
+		return
+	}
+	c.timeoutMap.add(cachePath)
+	crTime := c.s3.GetCreationTime(cachePath)
+	if crTime != nil && crTime.After(time.Now().Add(-MAX_STORAGE_TIME)) {
+		return
+	}
+	// add new file in queue to download
+	newTask.cachePath = cachePath
+	c.workers.AddTask(newTask)
+}
+
 func (c *cacher) CacheJSFile(sourceURL string) {
-	go c.cacheURL(sourceURL, 0, 0, sourceURL, true)
+	c.checkTask(&Task{
+		requestURL: sourceURL,
+		sessionID:  0,
+		depth:      0,
+		urlContext: sourceURL,
+		isJS:       true,
+	})
 }
 
 func (c *cacher) CacheURL(sessionID uint64, fullURL string) {
-	go c.cacheURL(fullURL, sessionID, MAX_CACHE_DEPTH, fullURL, false)
+	c.checkTask(&Task{
+		requestURL: fullURL,
+		sessionID:  sessionID,
+		depth:      MAX_CACHE_DEPTH,
+		urlContext: fullURL,
+		isJS:       false,
+	})
 }
 
 func (c *cacher) UpdateTimeouts() {
 	c.timeoutMap.deleteOutdated()
 }
+
+func (c *cacher) Stop() {
+	c.workers.Stop()
+}
diff --git a/backend/internal/assets/cacher/pool.go b/backend/internal/assets/cacher/pool.go
new file mode 100644
index 000000000..00ce1a63d
--- /dev/null
+++ b/backend/internal/assets/cacher/pool.go
@@ -0,0 +1,76 @@
+package cacher
+
+import (
+	"log"
+	"sync"
+)
+
+type Task struct {
+	requestURL string
+	sessionID  uint64
+	depth      byte
+	urlContext string
+	isJS       bool
+	cachePath  string
+}
+
+type WorkerPool struct {
+	tasks chan *Task
+	wg    sync.WaitGroup
+	done  chan struct{}
+	term  sync.Once
+	size  int
+	job   Job
+}
+
+func (p *WorkerPool) CanAddTask() bool {
+	if len(p.tasks) < cap(p.tasks) {
+		return true
+	}
+	return false
+}
+
+type Job func(task *Task)
+
+func NewPool(size int, job Job) *WorkerPool {
+	newPool := &WorkerPool{
+		tasks: make(chan *Task, 128),
+		done:  make(chan struct{}),
+		size:  size,
+		job:   job,
+	}
+	newPool.init()
+	return newPool
+}
+
+func (p *WorkerPool) init() {
+	p.wg.Add(p.size)
+	for i := 0; i < p.size; i++ {
+		go p.worker()
+	}
+}
+
+func (p *WorkerPool) worker() {
+	for {
+		select {
+		case newTask := <-p.tasks:
+			p.job(newTask)
+		case <-p.done:
+			p.wg.Done()
+			return
+		}
+	}
+}
+
+func (p *WorkerPool) AddTask(newTask *Task) {
+	p.tasks <- newTask
+}
+
+func (p *WorkerPool) Stop() {
+	log.Printf("stopping workers")
+	p.term.Do(func() {
+		close(p.done)
+	})
+	p.wg.Wait()
+	log.Printf("all workers have been stopped")
+}
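
Note for reviewers (not part of the patch): a minimal sketch of how the new
WorkerPool is driven. Because Task's fields and the pool are unexported, the
snippet assumes it sits in package cacher; the function name drivePool, the
worker count, and the URL are illustrative only.

    package cacher

    import "log"

    // drivePool mirrors the flow in cmd/assets/main.go: the consumer loop
    // now checks cacher.CanCache() -- i.e. WorkerPool.CanAddTask() -- before
    // pulling the next message, so the 128-slot tasks channel acts as a
    // backpressure buffer instead of spawning an unbounded goroutine per URL.
    func drivePool() {
    	pool := NewPool(4, func(task *Task) {
    		log.Printf("would cache %s (depth %d)", task.requestURL, task.depth)
    	})
    	if pool.CanAddTask() { // room left in the buffered channel
    		pool.AddTask(&Task{
    			requestURL: "https://example.com/app.css",
    			depth:      MAX_CACHE_DEPTH,
    		})
    	}
    	// On SIGTERM, main.go now calls cacher.Stop() -> workers.Stop():
    	// done is closed exactly once (sync.Once), and wg.Wait() blocks
    	// until every worker goroutine has returned.
    	pool.Stop()
    }

One semantic detail: worker() selects between the tasks and done channels, so
tasks still buffered when done closes may exit unprocessed. Stop() guarantees
that the workers have stopped, not that the queue has drained.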