fix(backend-assets): use cachePath in timeout map instead of original url
This commit is contained in:
parent
e75f7ddb4f
commit
011cbd006b
2 changed files with 29 additions and 30 deletions
|
|
@ -1,31 +1,31 @@
|
|||
package cacher
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"mime"
|
||||
"net/http"
|
||||
"crypto/tls"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"openreplay/backend/pkg/url/assets"
|
||||
|
||||
"openreplay/backend/pkg/storage"
|
||||
"openreplay/backend/pkg/url/assets"
|
||||
)
|
||||
|
||||
const MAX_CACHE_DEPTH = 5
|
||||
|
||||
type cacher struct {
|
||||
timeoutMap *timeoutMap // Concurrency implemented
|
||||
s3 *storage.S3 // AWS Docs: "These clients are safe to use concurrently."
|
||||
httpClient *http.Client // Docs: "Clients are safe for concurrent use by multiple goroutines."
|
||||
rewriter *assets.Rewriter // Read only
|
||||
Errors chan error
|
||||
sizeLimit int
|
||||
timeoutMap *timeoutMap // Concurrency implemented
|
||||
s3 *storage.S3 // AWS Docs: "These clients are safe to use concurrently."
|
||||
httpClient *http.Client // Docs: "Clients are safe for concurrent use by multiple goroutines."
|
||||
rewriter *assets.Rewriter // Read only
|
||||
Errors chan error
|
||||
sizeLimit int
|
||||
}
|
||||
|
||||
func NewCacher(region string, bucket string, origin string, sizeLimit int) *cacher {
|
||||
|
|
@ -36,26 +36,26 @@ func NewCacher(region string, bucket string, origin string, sizeLimit int) *cach
|
|||
httpClient: &http.Client{
|
||||
Timeout: time.Duration(6) * time.Second,
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
||||
},
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
||||
},
|
||||
},
|
||||
rewriter: rewriter,
|
||||
Errors: make(chan error),
|
||||
sizeLimit: sizeLimit,
|
||||
rewriter: rewriter,
|
||||
Errors: make(chan error),
|
||||
sizeLimit: sizeLimit,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cacher) cacheURL(requestURL string, sessionID uint64, depth byte, context string, isJS bool) {
|
||||
if c.timeoutMap.contains(requestURL) {
|
||||
return
|
||||
}
|
||||
c.timeoutMap.add(requestURL)
|
||||
var cachePath string
|
||||
if (isJS) {
|
||||
if isJS {
|
||||
cachePath = assets.GetCachePathForJS(requestURL)
|
||||
} else {
|
||||
cachePath = assets.GetCachePathForAssets(sessionID, requestURL)
|
||||
}
|
||||
if c.timeoutMap.contains(cachePath) {
|
||||
return
|
||||
}
|
||||
c.timeoutMap.add(cachePath)
|
||||
if c.s3.Exists(cachePath) {
|
||||
return
|
||||
}
|
||||
|
|
@ -94,20 +94,19 @@ func (c *cacher) cacheURL(requestURL string, sessionID uint64, depth byte, conte
|
|||
if isCSS {
|
||||
strData = c.rewriter.RewriteCSS(sessionID, requestURL, strData) // TODO: one method for reqrite and return list
|
||||
}
|
||||
|
||||
// TODO: implement in streams
|
||||
|
||||
// TODO: implement in streams
|
||||
err = c.s3.Upload(strings.NewReader(strData), cachePath, contentType, false)
|
||||
if err != nil {
|
||||
c.Errors <- errors.Wrap(err, context)
|
||||
return
|
||||
}
|
||||
c.timeoutMap.add(requestURL)
|
||||
|
||||
if isCSS {
|
||||
if depth > 0 {
|
||||
for _, extractedURL := range assets.ExtractURLsFromCSS(string(data)) {
|
||||
if fullURL, cachable := assets.GetFullCachableURL(requestURL, extractedURL); cachable {
|
||||
go c.cacheURL(fullURL, sessionID, depth-1, context + "\n -> " + fullURL, false)
|
||||
if fullURL, cachable := assets.GetFullCachableURL(requestURL, extractedURL); cachable {
|
||||
go c.cacheURL(fullURL, sessionID, depth-1, context+"\n -> "+fullURL, false)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -5,30 +5,30 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
const MAX_STORAGE_TIME = 18 * time.Hour
|
||||
const MAX_STORAGE_TIME = 24 * time.Hour
|
||||
|
||||
// If cache contention becomes a problem (>= 4 cores), consider using sync.Map.
|
||||
|
||||
// timeoutMap remembers the time at which each key was last added.
// Every method takes mx before touching m, so a single *timeoutMap can be
// shared between goroutines.
type timeoutMap struct {
	mx sync.RWMutex
	m  map[string]time.Time // key -> time of the most recent add
}

// newTimeoutMap returns an empty timeoutMap whose map is already allocated,
// so add can be called immediately.
func newTimeoutMap() *timeoutMap {
	return &timeoutMap{
		m: make(map[string]time.Time),
	}
}

// add stores key with the current time, replacing any earlier timestamp.
func (tm *timeoutMap) add(key string) {
	tm.mx.Lock()
	// Exactly one deferred Unlock: unlocking an already-unlocked mutex panics.
	defer tm.mx.Unlock()
	tm.m[key] = time.Now()
}

// contains reports whether key is currently present in the map.
func (tm *timeoutMap) contains(key string) bool {
	tm.mx.RLock()
	// Exactly one deferred RUnlock, matching the single RLock above.
	defer tm.mx.RUnlock()
	_, ok := tm.m[key]
	return ok
}
|
||||
|
|
@ -36,7 +36,7 @@ func (tm *timeoutMap) contains(key string) bool {
|
|||
func (tm *timeoutMap) deleteOutdated() {
|
||||
now := time.Now()
|
||||
tm.mx.Lock()
|
||||
defer tm.mx.Unlock()
|
||||
defer tm.mx.Unlock()
|
||||
for key, t := range tm.m {
|
||||
if now.Sub(t) > MAX_STORAGE_TIME {
|
||||
delete(tm.m, key)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue