Merge remote-tracking branch 'origin/dev' into api-v1.5.5

This commit is contained in:
Taha Yassine Kraiem 2022-04-06 16:48:58 +02:00
commit 8a1a34fbf2
3 changed files with 31 additions and 30 deletions

View file

@ -1,31 +1,31 @@
package cacher
import (
	"crypto/tls"
	"fmt"
	"io"
	"io/ioutil"
	"mime"
	"net/http"
	"path/filepath"
	"strings"
	"time"

	"github.com/pkg/errors"

	"openreplay/backend/pkg/storage"
	"openreplay/backend/pkg/url/assets"
)
// MAX_CACHE_DEPTH bounds the recursion when caching assets referenced
// from CSS (each nested stylesheet decrements the depth by one).
const MAX_CACHE_DEPTH = 5

// cacher downloads remote assets over HTTP and uploads them to S3,
// rewriting CSS so nested references point at cached copies.
// Concurrency notes are given per field.
type cacher struct {
	timeoutMap *timeoutMap      // Concurrency implemented
	s3         *storage.S3      // AWS Docs: "These clients are safe to use concurrently."
	httpClient *http.Client     // Docs: "Clients are safe for concurrent use by multiple goroutines."
	rewriter   *assets.Rewriter // Read only
	Errors     chan error       // consumer-facing stream of caching errors
	sizeLimit  int              // maximum asset size accepted for caching, in bytes
}
func NewCacher(region string, bucket string, origin string, sizeLimit int) *cacher {
@ -36,26 +36,26 @@ func NewCacher(region string, bucket string, origin string, sizeLimit int) *cach
httpClient: &http.Client{
Timeout: time.Duration(6) * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
},
rewriter: rewriter,
Errors: make(chan error),
sizeLimit: sizeLimit,
rewriter: rewriter,
Errors: make(chan error),
sizeLimit: sizeLimit,
}
}
func (c *cacher) cacheURL(requestURL string, sessionID uint64, depth byte, context string, isJS bool) {
if c.timeoutMap.contains(requestURL) {
return
}
c.timeoutMap.add(requestURL)
var cachePath string
if (isJS) {
if isJS {
cachePath = assets.GetCachePathForJS(requestURL)
} else {
cachePath = assets.GetCachePathForAssets(sessionID, requestURL)
}
if c.timeoutMap.contains(cachePath) {
return
}
c.timeoutMap.add(cachePath)
if c.s3.Exists(cachePath) {
return
}
@ -94,20 +94,19 @@ func (c *cacher) cacheURL(requestURL string, sessionID uint64, depth byte, conte
if isCSS {
strData = c.rewriter.RewriteCSS(sessionID, requestURL, strData) // TODO: one method for reqrite and return list
}
// TODO: implement in streams
// TODO: implement in streams
err = c.s3.Upload(strings.NewReader(strData), cachePath, contentType, false)
if err != nil {
c.Errors <- errors.Wrap(err, context)
return
}
c.timeoutMap.add(requestURL)
if isCSS {
if depth > 0 {
for _, extractedURL := range assets.ExtractURLsFromCSS(string(data)) {
if fullURL, cachable := assets.GetFullCachableURL(requestURL, extractedURL); cachable {
go c.cacheURL(fullURL, sessionID, depth-1, context + "\n -> " + fullURL, false)
if fullURL, cachable := assets.GetFullCachableURL(requestURL, extractedURL); cachable {
go c.cacheURL(fullURL, sessionID, depth-1, context+"\n -> "+fullURL, false)
}
}
if err != nil {

View file

@ -5,30 +5,30 @@ import (
"time"
)
// MAX_STORAGE_TIME is how long an entry remains in the map before
// deleteOutdated evicts it.
const MAX_STORAGE_TIME = 24 * time.Hour

// If problem with cache contention (>=4 core) look at sync.Map
// timeoutMap is a mutex-guarded set of string keys, each stamped with
// its insertion time so stale entries can be evicted later.
type timeoutMap struct {
	mx sync.RWMutex
	m  map[string]time.Time
}

// newTimeoutMap returns an empty, ready-to-use timeoutMap.
func newTimeoutMap() *timeoutMap {
	return &timeoutMap{
		m: make(map[string]time.Time),
	}
}

// add records key with the current time, overwriting any previous stamp.
func (tm *timeoutMap) add(key string) {
	tm.mx.Lock()
	defer tm.mx.Unlock()
	tm.m[key] = time.Now()
}

// contains reports whether key is currently present in the map.
func (tm *timeoutMap) contains(key string) bool {
	tm.mx.RLock()
	defer tm.mx.RUnlock()
	_, ok := tm.m[key]
	return ok
}
@ -36,7 +36,7 @@ func (tm *timeoutMap) contains(key string) bool {
func (tm *timeoutMap) deleteOutdated() {
now := time.Now()
tm.mx.Lock()
defer tm.mx.Unlock()
defer tm.mx.Unlock()
for key, t := range tm.m {
if now.Sub(t) > MAX_STORAGE_TIME {
delete(tm.m, key)

View file

// Socket event names exchanged with agent and session clients.
const AGENT_DISCONNECT = "AGENT_DISCONNECTED";
const AGENTS_CONNECTED = "AGENTS_CONNECTED";
const NO_SESSIONS = "SESSION_DISCONNECTED";
const SESSION_ALREADY_CONNECTED = "SESSION_ALREADY_CONNECTED";
const SESSION_RECONNECTED = "SESSION_RECONNECTED";

// Socket.IO server instance, assigned at startup.
let io;
// Verbose logging is enabled by running with env var debug=1.
const debug = process.env.debug === "1" || false;
@ -258,6 +259,7 @@ module.exports = {
debug && console.log(`notifying new session about agent-existence`);
let agents_ids = await get_all_agents_ids(io, socket);
io.to(socket.id).emit(AGENTS_CONNECTED, agents_ids);
socket.to(socket.peerId).emit(SESSION_RECONNECTED, socket.id);
}
} else if (c_sessions <= 0) {