feat(backend): added cache cleaner for assets in sink

This commit is contained in:
Alexander Zavorotynskiy 2022-11-04 10:38:25 +01:00
parent b129b526f3
commit 1df54774ad

View file

@ -12,6 +12,7 @@ import (
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/url/assets"
"sync"
"time"
)
@ -21,6 +22,7 @@ type CachedAsset struct {
}
type AssetsCache struct {
mutex sync.RWMutex
cfg *sink.Config
rewriter *assets.Rewriter
producer types.Producer
@ -54,7 +56,7 @@ func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer, m
if err != nil {
log.Printf("can't create asset_duration metric: %s", err)
}
return &AssetsCache{
assetsCache := &AssetsCache{
cfg: cfg,
rewriter: rewriter,
producer: producer,
@ -65,6 +67,35 @@ func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer, m
assetSize: assetSize,
assetDuration: assetDuration,
}
go assetsCache.cleaner()
return assetsCache
}
func (e *AssetsCache) cleaner() {
cleanTick := time.Tick(time.Minute * 30)
for {
select {
case <-cleanTick:
e.clearCache()
}
}
}
func (e *AssetsCache) clearCache() {
e.mutex.Lock()
defer e.mutex.Unlock()
now := time.Now()
cacheSize := len(e.cache)
deleted := 0
for id, cache := range e.cache {
if int64(now.Sub(cache.ts).Minutes()) > e.cfg.CacheExpiration {
deleted++
delete(e.cache, id)
}
}
log.Printf("cache cleaner: deleted %d/%d assets", deleted, cacheSize)
}
func (e *AssetsCache) ParseAssets(msg messages.Message) messages.Message {
@ -169,7 +200,10 @@ func (e *AssetsCache) handleCSS(sessionID uint64, baseURL string, css string) st
io.WriteString(h, css)
hash := string(h.Sum(nil))
// Check the resulting hash in cache
if cachedAsset, ok := e.cache[hash]; ok {
e.mutex.RLock()
cachedAsset, ok := e.cache[hash]
e.mutex.RUnlock()
if ok {
if int64(time.Now().Sub(cachedAsset.ts).Minutes()) < e.cfg.CacheExpiration {
e.skippedAssets.Add(ctx, 1)
return cachedAsset.msg
@ -187,10 +221,12 @@ func (e *AssetsCache) handleCSS(sessionID uint64, baseURL string, css string) st
e.assetDuration.Record(ctx, float64(duration))
// Save asset to cache if we spent more than threshold
if duration > e.cfg.CacheThreshold {
e.mutex.Lock()
e.cache[hash] = &CachedAsset{
msg: res,
ts: time.Now(),
}
e.mutex.Unlock()
e.cachedAssets.Add(ctx, 1)
}
// Return rewritten asset