feat(backend): added assets retrier
parent 2a1242ef7d
commit 72b7a9c869
2 changed files with 36 additions and 20 deletions
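Summary (inferred from the diff below): Task gains a retries field, initialized to 5 at every construction site; cacheURL now receives the Task directly instead of its unpacked fields, spends one retry per attempt, and re-queues the task on an HTTP 403; WorkerPool.AddTask drops any task whose retry budget is exhausted.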
@@ -70,33 +70,42 @@ func NewCacher(cfg *config.Config, metrics *monitoring.Metrics) *cacher {
 }
 
 func (c *cacher) CacheFile(task *Task) {
-	c.cacheURL(task.requestURL, task.sessionID, task.depth, task.urlContext, task.cachePath)
+	c.cacheURL(task)
 }
 
-func (c *cacher) cacheURL(requestURL string, sessionID uint64, depth byte, urlContext, cachePath string) {
-	req, _ := http.NewRequest("GET", requestURL, nil)
+func (c *cacher) cacheURL(t *Task) {
+	t.retries--
+	//requestURL sessionID uint64, depth byte, urlContext, cachePath string) {
+	req, _ := http.NewRequest("GET", t.requestURL, nil)
 	req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20100101 Firefox/31.0")
 	for k, v := range c.requestHeaders {
 		req.Header.Set(k, v)
 	}
 	res, err := c.httpClient.Do(req)
 	if err != nil {
-		c.Errors <- errors.Wrap(err, urlContext)
+		c.Errors <- errors.Wrap(err, t.urlContext)
 		return
 	}
 	defer res.Body.Close()
 	if res.StatusCode >= 400 {
-		// TODO: retry
-		c.Errors <- errors.Wrap(fmt.Errorf("Status code is %v, ", res.StatusCode), urlContext)
+		printErr := true
+		// Retry 403 error
+		if res.StatusCode == 403 && t.retries > 0 {
+			c.workers.AddTask(t)
+			printErr = false
+		}
+		if printErr {
+			c.Errors <- errors.Wrap(fmt.Errorf("Status code is %v, ", res.StatusCode), t.urlContext)
+		}
 		return
 	}
 	data, err := ioutil.ReadAll(io.LimitReader(res.Body, int64(c.sizeLimit+1)))
 	if err != nil {
-		c.Errors <- errors.Wrap(err, urlContext)
+		c.Errors <- errors.Wrap(err, t.urlContext)
 		return
 	}
 	if len(data) > c.sizeLimit {
-		c.Errors <- errors.Wrap(errors.New("Maximum size exceeded"), urlContext)
+		c.Errors <- errors.Wrap(errors.New("Maximum size exceeded"), t.urlContext)
 		return
 	}
 
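For reference, the retry flow introduced here spans both changed files: every Task carries a retry budget, cacheURL spends one unit at the top of each attempt, a 403 response re-queues the task while budget remains, and AddTask silently drops tasks whose budget is exhausted. Below is a minimal, self-contained sketch of that pattern; the task/pool names and the standalone main are illustrative, not the project's actual types.

package main

import (
	"fmt"
	"net/http"
)

// task mirrors the commit's Task: a URL plus a retry budget.
type task struct {
	url     string
	retries int
}

// pool mirrors WorkerPool: addTask drops tasks whose budget is spent.
type pool struct {
	tasks chan *task
}

func (p *pool) addTask(t *task) {
	if t.retries <= 0 {
		return // budget exhausted: drop silently, as the commit does
	}
	p.tasks <- t
}

// fetch mirrors the shape of cacheURL: spend one attempt up front,
// re-queue on 403 while attempts remain, report other failures.
func (p *pool) fetch(t *task) {
	t.retries--
	res, err := http.Get(t.url)
	if err != nil {
		fmt.Printf("%s: %v\n", t.url, err)
		return
	}
	defer res.Body.Close()
	if res.StatusCode >= 400 {
		if res.StatusCode == http.StatusForbidden && t.retries > 0 {
			p.addTask(t) // the one retryable case: 403 with budget left
			return
		}
		fmt.Printf("%s: status %d\n", t.url, res.StatusCode)
		return
	}
	fmt.Printf("%s: ok\n", t.url)
}

func main() {
	p := &pool{tasks: make(chan *task, 16)}
	p.addTask(&task{url: "https://example.com", retries: 5})
	for {
		select {
		case t := <-p.tasks:
			p.fetch(t)
		default:
			return // queue drained
		}
	}
}

A task enqueued with retries: 5 therefore makes at most five HTTP attempts: the fifth attempt decrements the budget to zero, so the 403 branch is skipped and no further re-queue happens.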
@@ -108,36 +117,37 @@ func (c *cacher) cacheURL(requestURL string, sessionID uint64, depth byte, urlContext, cachePath string) {
 
 	strData := string(data)
 	if isCSS {
-		strData = c.rewriter.RewriteCSS(sessionID, requestURL, strData) // TODO: one method for rewrite and return list
+		strData = c.rewriter.RewriteCSS(t.sessionID, t.requestURL, strData) // TODO: one method for rewrite and return list
 	}
 
 	// TODO: implement in streams
-	err = c.s3.Upload(strings.NewReader(strData), cachePath, contentType, false)
+	err = c.s3.Upload(strings.NewReader(strData), t.cachePath, contentType, false)
 	if err != nil {
-		c.Errors <- errors.Wrap(err, urlContext)
+		c.Errors <- errors.Wrap(err, t.urlContext)
 		return
 	}
 	c.downloadedAssets.Add(context.Background(), 1)
 
 	if isCSS {
-		if depth > 0 {
+		if t.depth > 0 {
 			for _, extractedURL := range assets.ExtractURLsFromCSS(string(data)) {
-				if fullURL, cachable := assets.GetFullCachableURL(requestURL, extractedURL); cachable {
+				if fullURL, cachable := assets.GetFullCachableURL(t.requestURL, extractedURL); cachable {
 					c.checkTask(&Task{
 						requestURL: fullURL,
-						sessionID:  sessionID,
-						depth:      depth - 1,
-						urlContext: urlContext + "\n -> " + fullURL,
+						sessionID:  t.sessionID,
+						depth:      t.depth - 1,
+						urlContext: t.urlContext + "\n -> " + fullURL,
 						isJS:       false,
+						retries:    5,
 					})
 				}
 			}
 			if err != nil {
-				c.Errors <- errors.Wrap(err, urlContext)
+				c.Errors <- errors.Wrap(err, t.urlContext)
 				return
 			}
 		} else {
-			c.Errors <- errors.Wrap(errors.New("Maximum recursion cache depth exceeded"), urlContext)
+			c.Errors <- errors.Wrap(errors.New("Maximum recursion cache depth exceeded"), t.urlContext)
 			return
 		}
 	}
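This hunk also bounds recursion through stylesheets: every asset URL extracted from cached CSS is enqueued with depth: t.depth - 1 and a fresh retries: 5, so nested imports terminate after MAX_CACHE_DEPTH levels while each child task keeps its own retry budget. A small, self-contained sketch of just that depth-budget idea, with hypothetical names (cssTask, enqueueChildren) standing in for the project's types:

package main

import "fmt"

// cssTask carries the same two budgets as the commit's Task:
// depth bounds recursion, retries bounds re-fetch attempts.
type cssTask struct {
	url     string
	depth   byte
	retries int
}

// enqueueChildren mirrors the CSS branch of cacheURL: children inherit
// depth-1 and start with a fresh retry budget of 5.
func enqueueChildren(parent cssTask, childURLs []string, queue chan<- cssTask) {
	if parent.depth == 0 {
		fmt.Println("maximum recursion cache depth exceeded:", parent.url)
		return
	}
	for _, u := range childURLs {
		queue <- cssTask{url: u, depth: parent.depth - 1, retries: 5}
	}
}

func main() {
	queue := make(chan cssTask, 8)
	parent := cssTask{url: "https://example.com/a.css", depth: 2, retries: 5}
	enqueueChildren(parent, []string{"https://example.com/b.css"}, queue)
	fmt.Println(len(queue), "child task(s) enqueued")
}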
@@ -172,6 +182,7 @@ func (c *cacher) CacheJSFile(sourceURL string) {
 		depth:      0,
 		urlContext: sourceURL,
 		isJS:       true,
+		retries:    5,
 	})
 }
 
@@ -182,6 +193,7 @@ func (c *cacher) CacheURL(sessionID uint64, fullURL string) {
 		depth:      MAX_CACHE_DEPTH,
 		urlContext: fullURL,
 		isJS:       false,
+		retries:    5,
 	})
 }
 
@@ -12,6 +12,7 @@ type Task struct {
 	urlContext string
 	isJS       bool
 	cachePath  string
+	retries    int
 }
 
 type WorkerPool struct {
@@ -62,8 +63,11 @@ func (p *WorkerPool) worker() {
 	}
 }
 
-func (p *WorkerPool) AddTask(newTask *Task) {
-	p.tasks <- newTask
+func (p *WorkerPool) AddTask(task *Task) {
+	if task.retries <= 0 {
+		return
+	}
+	p.tasks <- task
 }
 
 func (p *WorkerPool) Stop() {
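A note on how the two guards interact: cacheURL only re-queues a 403 while t.retries > 0, and AddTask independently rejects any task whose budget is already spent, so an exhausted task cannot re-enter the queue from any call site. On the final attempt the 403 branch is skipped, printErr stays true, and the failure is reported on c.Errors; AddTask itself drops silently, which only matters for tasks constructed with retries <= 0.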