feat(backend/assets): add new metric assets_downloaded
parent e3ead3ebb1
commit f065e8faad

2 changed files with 34 additions and 31 deletions
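The assets cacher now counts successfully cached assets: NewCacher takes a *monitoring.Metrics handle, registers an assets_downloaded counter on it, and increments the counter after each successful S3 upload. The cacheURL parameter context is renamed to urlContext so the context package can be imported without being shadowed, and a block of commented-out dead code is removed.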
@@ -29,6 +29,7 @@ func main() {
 		cfg.S3BucketAssets,
 		cfg.AssetsOrigin,
 		cfg.AssetsSizeLimit,
+		metrics,
 	)
 
 	totalAssets, err := metrics.RegisterCounter("assets_total")
@@ -1,12 +1,16 @@
 package cacher
 
 import (
+	"context"
 	"crypto/tls"
 	"fmt"
+	"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
 	"io"
 	"io/ioutil"
+	"log"
 	"mime"
 	"net/http"
+	"openreplay/backend/pkg/monitoring"
 	"path/filepath"
 	"strings"
 	"time"
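Both files lean on monitoring.Metrics.RegisterCounter, whose implementation is not part of this diff. A minimal sketch of what it might look like under the otel version imported above; the meter name "openreplay" and the empty struct body are assumptions, not code from this commit:

    package monitoring

    import (
    	"go.opentelemetry.io/otel/metric/global"
    	"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
    )

    // Metrics is a sketch; the real struct likely carries meter/exporter state.
    type Metrics struct{}

    // RegisterCounter returns a monotonically increasing float64 counter.
    func (m *Metrics) RegisterCounter(name string) (syncfloat64.Counter, error) {
    	// global.MeterProvider() falls back to a no-op provider until the
    	// service installs a real one, so this is safe to call at startup.
    	return global.MeterProvider().Meter("openreplay").SyncFloat64().Counter(name)
    }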
@@ -20,16 +24,24 @@ import (
 const MAX_CACHE_DEPTH = 5
 
 type cacher struct {
-	timeoutMap *timeoutMap      // Concurrency implemented
-	s3         *storage.S3      // AWS Docs: "These clients are safe to use concurrently."
-	httpClient *http.Client     // Docs: "Clients are safe for concurrent use by multiple goroutines."
-	rewriter   *assets.Rewriter // Read only
-	Errors     chan error
-	sizeLimit  int
+	timeoutMap       *timeoutMap      // Concurrency implemented
+	s3               *storage.S3      // AWS Docs: "These clients are safe to use concurrently."
+	httpClient       *http.Client     // Docs: "Clients are safe for concurrent use by multiple goroutines."
+	rewriter         *assets.Rewriter // Read only
+	Errors           chan error
+	sizeLimit        int
+	downloadedAssets syncfloat64.Counter
 }
 
-func NewCacher(region string, bucket string, origin string, sizeLimit int) *cacher {
+func NewCacher(region string, bucket string, origin string, sizeLimit int, metrics *monitoring.Metrics) *cacher {
 	rewriter := assets.NewRewriter(origin)
+	if metrics == nil {
+		log.Fatalf("metrics are empty")
+	}
+	downloadedAssets, err := metrics.RegisterCounter("assets_downloaded")
+	if err != nil {
+		log.Printf("can't create downloaded_assets metric: %s", err)
+	}
 	return &cacher{
 		timeoutMap: newTimeoutMap(),
 		s3:         storage.NewS3(region, bucket),
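For reference, a hypothetical caller of the new signature, mirroring the main.go hunk in file 1. The cfg.AWSRegion field, the config type, the cacher import path, and the monitoring setup are all assumptions; this commit does not show how the *monitoring.Metrics value is constructed:

    package main

    import (
    	"log"

    	"openreplay/backend/pkg/monitoring"
    	"openreplay/backend/services/assets/cacher" // assumed import path
    )

    func run(cfg *config, metrics *monitoring.Metrics) {
    	c := cacher.NewCacher(cfg.AWSRegion, cfg.S3BucketAssets, cfg.AssetsOrigin, cfg.AssetsSizeLimit, metrics)
    	go func() {
    		for err := range c.Errors { // the cacher reports download/cache failures here
    			log.Println(err)
    		}
    	}()
    }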
@@ -39,13 +51,14 @@ func NewCacher(region string, bucket string, origin string, sizeLimit int) *cacher {
 				TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
 			},
 		},
-		rewriter:  rewriter,
-		Errors:    make(chan error),
-		sizeLimit: sizeLimit,
+		rewriter:         rewriter,
+		Errors:           make(chan error),
+		sizeLimit:        sizeLimit,
+		downloadedAssets: downloadedAssets,
 	}
 }
 
-func (c *cacher) cacheURL(requestURL string, sessionID uint64, depth byte, context string, isJS bool) {
+func (c *cacher) cacheURL(requestURL string, sessionID uint64, depth byte, urlContext string, isJS bool) {
 	var cachePath string
 	if isJS {
 		cachePath = assets.GetCachePathForJS(requestURL)
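The context → urlContext rename is what makes the new counter call possible: in Go, a parameter named context shadows the context package for the whole function body, so the context.Background() call added further down could not compile until the rename. A minimal illustration:

    // A parameter named `context` shadows the imported context package:
    func broken(context string) {
    	_ = context.Background() // compile error: context.Background undefined
    	//                          (type string has no field or method Background)
    }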
@@ -65,22 +78,22 @@ func (c *cacher) cacheURL(requestURL string, sessionID uint64, depth byte, context string, isJS bool) {
 	req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20100101 Firefox/31.0")
 	res, err := c.httpClient.Do(req)
 	if err != nil {
-		c.Errors <- errors.Wrap(err, context)
+		c.Errors <- errors.Wrap(err, urlContext)
 		return
 	}
 	defer res.Body.Close()
 	if res.StatusCode != 200 {
 		// TODO: retry
-		c.Errors <- errors.Wrap(fmt.Errorf("Status code is %v, ", res.StatusCode), context)
+		c.Errors <- errors.Wrap(fmt.Errorf("Status code is %v, ", res.StatusCode), urlContext)
 		return
 	}
 	data, err := ioutil.ReadAll(io.LimitReader(res.Body, int64(c.sizeLimit+1)))
 	if err != nil {
-		c.Errors <- errors.Wrap(err, context)
+		c.Errors <- errors.Wrap(err, urlContext)
 		return
 	}
 	if len(data) > c.sizeLimit {
-		c.Errors <- errors.Wrap(errors.New("Maximum size exceeded"), context)
+		c.Errors <- errors.Wrap(errors.New("Maximum size exceeded"), urlContext)
 		return
 	}
 
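Note the sizeLimit+1 read cap: by allowing exactly one byte more than the limit, the subsequent len(data) > c.sizeLimit check can tell an asset that is precisely at the limit apart from one that exceeds it, without ever buffering an unbounded response body.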
@@ -98,23 +111,24 @@ func (c *cacher) cacheURL(requestURL string, sessionID uint64, depth byte, context string, isJS bool) {
 	// TODO: implement in streams
 	err = c.s3.Upload(strings.NewReader(strData), cachePath, contentType, false)
 	if err != nil {
-		c.Errors <- errors.Wrap(err, context)
+		c.Errors <- errors.Wrap(err, urlContext)
 		return
 	}
+	c.downloadedAssets.Add(context.Background(), 1)
 
 	if isCSS {
 		if depth > 0 {
 			for _, extractedURL := range assets.ExtractURLsFromCSS(string(data)) {
 				if fullURL, cachable := assets.GetFullCachableURL(requestURL, extractedURL); cachable {
-					go c.cacheURL(fullURL, sessionID, depth-1, context+"\n -> "+fullURL, false)
+					go c.cacheURL(fullURL, sessionID, depth-1, urlContext+"\n -> "+fullURL, false)
 				}
 			}
 			if err != nil {
-				c.Errors <- errors.Wrap(err, context)
+				c.Errors <- errors.Wrap(err, urlContext)
 				return
 			}
 		} else {
-			c.Errors <- errors.Wrap(errors.New("Maximum recursion cache depth exceeded"), context)
+			c.Errors <- errors.Wrap(errors.New("Maximum recursion cache depth exceeded"), urlContext)
 			return
 		}
 	}
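The new Add call sits after the S3 upload succeeds, so assets_downloaded counts assets actually persisted to the cache rather than download attempts; every failure path above it returns first. The syncfloat64 API would also accept attributes should the metric ever need dimensions, e.g. (hypothetical, not in this commit):

    // requires importing go.opentelemetry.io/otel/attribute
    c.downloadedAssets.Add(context.Background(), 1,
    	attribute.String("content_type", contentType))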
@@ -129,18 +143,6 @@ func (c *cacher) CacheURL(sessionID uint64, fullURL string) {
 	go c.cacheURL(fullURL, sessionID, MAX_CACHE_DEPTH, fullURL, false)
 }
 
-// func (c *cacher) CacheURL(sessionID uint64, baseURL string, relativeURL string) {
-// 	if fullURL, cachable := assets.GetFullCachableURL(baseURL, relativeURL); cachable {
-// 		c.CacheURL(sessionID, fullURL)
-// 	}
-// }
-
-// func (c *cacher) CacheCSSLinks(baseURL string, css string, sessionID uint64) {
-// 	for _, extractedURL := range assets.ExtractURLsFromCSS(css) {
-// 		c.CacheURL(sessionID, baseURL, extractedURL)
-// 	}
-// }
-
 func (c *cacher) UpdateTimeouts() {
 	c.timeoutMap.deleteOutdated()
 }