From f065e8faade73a89d037e9e7ea09480ab5f37546 Mon Sep 17 00:00:00 2001
From: Alexander Zavorotynskiy
Date: Thu, 30 Jun 2022 15:25:42 +0200
Subject: [PATCH] feat(backend/assets): added new metrics assets_downloaded

---
 backend/cmd/assets/main.go               |  1 +
 backend/internal/assets/cacher/cacher.go | 64 ++++++++++++------------
 2 files changed, 34 insertions(+), 31 deletions(-)

diff --git a/backend/cmd/assets/main.go b/backend/cmd/assets/main.go
index c7b612fc8..5627d0be8 100644
--- a/backend/cmd/assets/main.go
+++ b/backend/cmd/assets/main.go
@@ -29,6 +29,7 @@ func main() {
 		cfg.S3BucketAssets,
 		cfg.AssetsOrigin,
 		cfg.AssetsSizeLimit,
+		metrics,
 	)
 
 	totalAssets, err := metrics.RegisterCounter("assets_total")
diff --git a/backend/internal/assets/cacher/cacher.go b/backend/internal/assets/cacher/cacher.go
index 70ea31928..fe8cc7ef8 100644
--- a/backend/internal/assets/cacher/cacher.go
+++ b/backend/internal/assets/cacher/cacher.go
@@ -1,12 +1,16 @@
 package cacher
 
 import (
+	"context"
 	"crypto/tls"
 	"fmt"
+	"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
 	"io"
 	"io/ioutil"
+	"log"
 	"mime"
 	"net/http"
+	"openreplay/backend/pkg/monitoring"
 	"path/filepath"
 	"strings"
 	"time"
@@ -20,16 +24,24 @@ import (
 const MAX_CACHE_DEPTH = 5
 
 type cacher struct {
-	timeoutMap *timeoutMap      // Concurrency implemented
-	s3         *storage.S3      // AWS Docs: "These clients are safe to use concurrently."
-	httpClient *http.Client     // Docs: "Clients are safe for concurrent use by multiple goroutines."
-	rewriter   *assets.Rewriter // Read only
-	Errors     chan error
-	sizeLimit  int
+	timeoutMap       *timeoutMap      // Concurrency implemented
+	s3               *storage.S3      // AWS Docs: "These clients are safe to use concurrently."
+	httpClient       *http.Client     // Docs: "Clients are safe for concurrent use by multiple goroutines."
+	rewriter         *assets.Rewriter // Read only
+	Errors           chan error
+	sizeLimit        int
+	downloadedAssets syncfloat64.Counter
 }
 
-func NewCacher(region string, bucket string, origin string, sizeLimit int) *cacher {
+func NewCacher(region string, bucket string, origin string, sizeLimit int, metrics *monitoring.Metrics) *cacher {
 	rewriter := assets.NewRewriter(origin)
+	if metrics == nil {
+		log.Fatalf("metrics are empty")
+	}
+	downloadedAssets, err := metrics.RegisterCounter("assets_downloaded")
+	if err != nil {
+		log.Printf("can't create assets_downloaded metric: %s", err)
+	}
 	return &cacher{
 		timeoutMap: newTimeoutMap(),
 		s3:         storage.NewS3(region, bucket),
@@ -39,13 +51,14 @@ func NewCacher(region string, bucket string, origin string, sizeLimit int) *cach
 				TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
 			},
 		},
-		rewriter:  rewriter,
-		Errors:    make(chan error),
-		sizeLimit: sizeLimit,
+		rewriter:         rewriter,
+		Errors:           make(chan error),
+		sizeLimit:        sizeLimit,
+		downloadedAssets: downloadedAssets,
 	}
 }
 
-func (c *cacher) cacheURL(requestURL string, sessionID uint64, depth byte, context string, isJS bool) {
+func (c *cacher) cacheURL(requestURL string, sessionID uint64, depth byte, urlContext string, isJS bool) {
 	var cachePath string
 	if isJS {
 		cachePath = assets.GetCachePathForJS(requestURL)
@@ -65,22 +78,22 @@ func (c *cacher) cacheURL(requestURL string, sessionID uint64, depth byte, conte
 	req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20100101 Firefox/31.0")
 	res, err := c.httpClient.Do(req)
 	if err != nil {
-		c.Errors <- errors.Wrap(err, context)
+		c.Errors <- errors.Wrap(err, urlContext)
 		return
 	}
 	defer res.Body.Close()
 	if res.StatusCode != 200 { // TODO: retry
-		c.Errors <- errors.Wrap(fmt.Errorf("Status code is %v, ", res.StatusCode), context)
+		c.Errors <- errors.Wrap(fmt.Errorf("Status code is %v, ", res.StatusCode), urlContext)
 		return
 	}
 	data, err := ioutil.ReadAll(io.LimitReader(res.Body, int64(c.sizeLimit+1)))
 	if err != nil {
-		c.Errors <- errors.Wrap(err, context)
+		c.Errors <- errors.Wrap(err, urlContext)
 		return
 	}
 	if len(data) > c.sizeLimit {
-		c.Errors <- errors.Wrap(errors.New("Maximum size exceeded"), context)
+		c.Errors <- errors.Wrap(errors.New("Maximum size exceeded"), urlContext)
 		return
 	}
 
@@ -98,23 +111,24 @@ func (c *cacher) cacheURL(requestURL string, sessionID uint64, depth byte, conte
 	// TODO: implement in streams
 	err = c.s3.Upload(strings.NewReader(strData), cachePath, contentType, false)
 	if err != nil {
-		c.Errors <- errors.Wrap(err, context)
+		c.Errors <- errors.Wrap(err, urlContext)
 		return
 	}
+	c.downloadedAssets.Add(context.Background(), 1)
 	if isCSS {
 		if depth > 0 {
 			for _, extractedURL := range assets.ExtractURLsFromCSS(string(data)) {
 				if fullURL, cachable := assets.GetFullCachableURL(requestURL, extractedURL); cachable {
-					go c.cacheURL(fullURL, sessionID, depth-1, context+"\n -> "+fullURL, false)
+					go c.cacheURL(fullURL, sessionID, depth-1, urlContext+"\n -> "+fullURL, false)
 				}
 			}
 			if err != nil {
-				c.Errors <- errors.Wrap(err, context)
+				c.Errors <- errors.Wrap(err, urlContext)
 				return
 			}
 		} else {
-			c.Errors <- errors.Wrap(errors.New("Maximum recursion cache depth exceeded"), context)
+			c.Errors <- errors.Wrap(errors.New("Maximum recursion cache depth exceeded"), urlContext)
 			return
 		}
 	}
@@ -129,18 +143,6 @@ func (c *cacher) CacheURL(sessionID uint64, fullURL string) {
 	go c.cacheURL(fullURL, sessionID, MAX_CACHE_DEPTH, fullURL, false)
 }
 
-// func (c *cacher) CacheURL(sessionID uint64, baseURL string, relativeURL string) {
-// 	if fullURL, cachable := assets.GetFullCachableURL(baseURL, relativeURL); cachable {
-// 		c.CacheURL(sessionID, fullURL)
-// 	}
-// }
-
-// func (c *cacher) CacheCSSLinks(baseURL string, css string, sessionID uint64) {
-// 	for _, extractedURL := range assets.ExtractURLsFromCSS(css) {
-// 		c.CacheURL(sessionID, baseURL, extractedURL)
-// 	}
-// }
-
 func (c *cacher) UpdateTimeouts() {
 	c.timeoutMap.deleteOutdated()
 }
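
For readers without the rest of the tree: the patch leans on `openreplay/backend/pkg/monitoring`, whose internals are not part of this diff. The sketch below is a minimal stand-in, assuming `RegisterCounter` wraps the OpenTelemetry Go metric API (the `v0.30`-era packages matching the `syncfloat64` import above); the real wrapper may differ, and the `New` constructor and its `serviceName` parameter are illustrative only.

```go
// Hypothetical sketch of the monitoring.Metrics wrapper assumed by this patch.
// Not the actual openreplay/backend/pkg/monitoring code — just the minimal
// shape that makes NewCacher(..., metrics) and RegisterCounter("assets_downloaded")
// work, built on the OpenTelemetry Go metric API that the diff imports.
package monitoring

import (
	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/metric/global"
	"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
)

type Metrics struct {
	meter metric.Meter
}

// New returns a Metrics handle backed by the globally registered MeterProvider.
// A real service would first install a provider wired to an exporter
// (e.g. Prometheus); without one, the global provider is a no-op.
func New(serviceName string) *Metrics {
	return &Metrics{meter: global.MeterProvider().Meter(serviceName)}
}

// RegisterCounter creates a monotonic float64 counter, such as the existing
// "assets_total" counter or the "assets_downloaded" counter added here.
func (m *Metrics) RegisterCounter(name string) (syncfloat64.Counter, error) {
	return m.meter.SyncFloat64().Counter(name)
}
```

Under that assumption, note where the increment lands: `c.downloadedAssets.Add(context.Background(), 1)` runs only after `c.s3.Upload` succeeds, and every failure path returns via `c.Errors` first, so `assets_downloaded` counts assets actually persisted to S3 rather than download attempts.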