diff --git a/backend/internal/sink/assetscache/assets.go b/backend/internal/sink/assetscache/assets.go
index 96f225a45..aa2dccbba 100644
--- a/backend/internal/sink/assetscache/assets.go
+++ b/backend/internal/sink/assetscache/assets.go
@@ -12,6 +12,7 @@ import (
 	"openreplay/backend/pkg/monitoring"
 	"openreplay/backend/pkg/queue/types"
 	"openreplay/backend/pkg/url/assets"
+	"sync"
 	"time"
 )
 
@@ -21,6 +22,7 @@ type CachedAsset struct {
 }
 
 type AssetsCache struct {
+	mutex         sync.RWMutex
 	cfg           *sink.Config
 	rewriter      *assets.Rewriter
 	producer      types.Producer
@@ -54,7 +56,7 @@ func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer, m
 	if err != nil {
 		log.Printf("can't create asset_duration metric: %s", err)
 	}
-	return &AssetsCache{
+	assetsCache := &AssetsCache{
 		cfg:           cfg,
 		rewriter:      rewriter,
 		producer:      producer,
@@ -65,6 +67,40 @@ func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer, m
 		assetSize:     assetSize,
 		assetDuration: assetDuration,
 	}
+	go assetsCache.cleaner()
+	return assetsCache
+}
+
+// cleaner evicts expired cache entries every 30 minutes. It loops forever and
+// is started by New in its own goroutine.
+func (e *AssetsCache) cleaner() {
+	// time.Tick is acceptable here because the loop never exits, so the
+	// underlying ticker lives as long as the process does.
+	for range time.Tick(30 * time.Minute) {
+		e.clearCache()
+	}
+}
+
+// clearCache removes every cached asset whose age in whole minutes has
+// reached cfg.CacheExpiration, i.e. every entry handleCSS would no longer
+// serve, and logs how many entries were dropped.
+func (e *AssetsCache) clearCache() {
+	e.mutex.Lock()
+	defer e.mutex.Unlock()
+
+	now := time.Now()
+	cacheSize := len(e.cache)
+	deleted := 0
+
+	for id, cache := range e.cache {
+		// handleCSS only serves entries with age < CacheExpiration, so use >=
+		// here to evict everything it already treats as stale.
+		if int64(now.Sub(cache.ts).Minutes()) >= e.cfg.CacheExpiration {
+			deleted++
+			delete(e.cache, id)
+		}
+	}
+	log.Printf("cache cleaner: deleted %d/%d assets", deleted, cacheSize)
 }
 
 func (e *AssetsCache) ParseAssets(msg messages.Message) messages.Message {
@@ -169,7 +205,10 @@ func (e *AssetsCache) handleCSS(sessionID uint64, baseURL string, css string) st
 	io.WriteString(h, css)
 	hash := string(h.Sum(nil))
 	// Check the resulting hash in cache
-	if cachedAsset, ok := e.cache[hash]; ok {
+	e.mutex.RLock()
+	cachedAsset, ok := e.cache[hash]
+	e.mutex.RUnlock()
+	if ok {
 		if int64(time.Now().Sub(cachedAsset.ts).Minutes()) < e.cfg.CacheExpiration {
 			e.skippedAssets.Add(ctx, 1)
 			return cachedAsset.msg
@@ -187,10 +226,12 @@ func (e *AssetsCache) handleCSS(sessionID uint64, baseURL string, css string) st
 	e.assetDuration.Record(ctx, float64(duration))
 	// Save asset to cache if we spent more than threshold
 	if duration > e.cfg.CacheThreshold {
+		e.mutex.Lock()
 		e.cache[hash] = &CachedAsset{
 			msg: res,
 			ts:  time.Now(),
 		}
+		e.mutex.Unlock()
 		e.cachedAssets.Add(ctx, 1)
 	}
 	// Return rewritten asset