feat(backend/sink): add cache layer for assets URL rewriter (#785)

This commit is contained in:
Alexander 2022-10-25 12:16:45 +02:00 committed by GitHub
parent 0318d45455
commit c4f0252677
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 170 additions and 34 deletions

View file

@ -3,6 +3,7 @@ package main
import (
"context"
"log"
"openreplay/backend/pkg/pprof"
"os"
"os/signal"
"syscall"
@ -19,6 +20,8 @@ import (
)
func main() {
pprof.StartProfilingServer()
metrics := monitoring.New("sink")
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
@ -34,9 +37,10 @@ func main() {
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
defer producer.Close(cfg.ProducerCloseTimeout)
rewriter := assets.NewRewriter(cfg.AssetsOrigin)
assetMessageHandler := assetscache.New(cfg, rewriter, producer)
assetMessageHandler := assetscache.New(cfg, rewriter, producer, metrics)
counter := storage.NewLogCounter()
// Session message metrics
totalMessages, err := metrics.RegisterCounter("messages_total")
if err != nil {
log.Printf("can't create messages_total metric: %s", err)
@ -68,7 +72,12 @@ func main() {
msg.TypeID() == messages.MsgCSSInsertRuleURLBased ||
msg.TypeID() == messages.MsgAdoptedSSReplaceURLBased ||
msg.TypeID() == messages.MsgAdoptedSSInsertRuleURLBased {
msg = assetMessageHandler.ParseAssets(msg.Decode()) // TODO: filter type only once (use iterator inside or bring ParseAssets out here).
m := msg.Decode()
if m == nil {
log.Printf("assets decode err, info: %s", msg.Meta().Batch().Info())
return
}
msg = assetMessageHandler.ParseAssets(m)
}
// Filter message
@ -87,15 +96,19 @@ func main() {
// Write encoded message with index to session file
data := msg.EncodeWithIndex()
if data == nil {
log.Printf("can't encode with index, err: %s", err)
return
}
if messages.IsDOMType(msg.TypeID()) {
if err := writer.WriteDOM(msg.SessionID(), data); err != nil {
log.Printf("DOM Writer error: %v\n", err)
log.Printf("DOM Writer error: %s, info: %s", err, msg.Meta().Batch().Info())
}
}
if !messages.IsDOMType(msg.TypeID()) || msg.TypeID() == messages.MsgTimestamp {
// TODO: write only necessary timestamps
if err := writer.WriteDEV(msg.SessionID(), data); err != nil {
log.Printf("Devtools Writer error: %v\n", err)
log.Printf("Devtools Writer error: %s, info: %s", err, msg.Meta().Batch().Info())
}
}
@ -146,5 +159,4 @@ func main() {
}
}
}
}

View file

@ -17,6 +17,8 @@ type Config struct {
CacheAssets bool `env:"CACHE_ASSETS,required"`
AssetsOrigin string `env:"ASSETS_ORIGIN,required"`
ProducerCloseTimeout int `env:"PRODUCER_CLOSE_TIMEOUT,default=15000"`
CacheThreshold int64 `env:"CACHE_THRESHOLD,default=75"`
CacheExpiration int64 `env:"CACHE_EXPIRATION,default=120"`
}
func New() *Config {

View file

@ -1,24 +1,69 @@
package assetscache
import (
"context"
"crypto/md5"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"io"
"log"
"net/url"
"openreplay/backend/internal/config/sink"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/url/assets"
"time"
)
type AssetsCache struct {
cfg *sink.Config
rewriter *assets.Rewriter
producer types.Producer
// CachedAsset is one entry of the in-memory CSS rewrite cache: the already
// rewritten payload plus the time it was stored, which is compared against
// the configured cache expiration before the entry is reused.
type CachedAsset struct {
msg string // rewritten CSS payload
ts time.Time // insertion time, used for expiration checks
}
func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer) *AssetsCache {
// AssetsCache rewrites asset URLs inside session messages, keeps an
// in-memory cache of expensive CSS rewrites, and records metrics about
// how often the cache is hit, skipped or filled.
type AssetsCache struct {
cfg *sink.Config
rewriter *assets.Rewriter
producer types.Producer
cache map[string]*CachedAsset // md5(base origin + css) -> cached rewrite; NOTE(review): no mutex — assumes single-goroutine access, confirm
totalAssets syncfloat64.Counter
cachedAssets syncfloat64.Counter
skippedAssets syncfloat64.Counter
assetSize syncfloat64.Histogram
assetDuration syncfloat64.Histogram
}
// New builds an AssetsCache handler with an empty rewrite cache and
// registers the counters and histograms it reports on.
// Metric registration failures are logged and tolerated; the returned
// handler is still usable (the failed instrument stays nil — NOTE(review):
// confirm recording on a nil instrument is safe in monitoring.Metrics).
func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer, metrics *monitoring.Metrics) *AssetsCache {
	// Assets metrics
	totalAssets, err := metrics.RegisterCounter("assets_total")
	if err != nil {
		log.Printf("can't create assets_total metric: %s", err)
	}
	cachedAssets, err := metrics.RegisterCounter("assets_cached")
	if err != nil {
		log.Printf("can't create assets_cached metric: %s", err)
	}
	skippedAssets, err := metrics.RegisterCounter("assets_skipped")
	if err != nil {
		log.Printf("can't create assets_skipped metric: %s", err)
	}
	assetSize, err := metrics.RegisterHistogram("asset_size")
	if err != nil {
		log.Printf("can't create asset_size metric: %s", err)
	}
	assetDuration, err := metrics.RegisterHistogram("asset_duration")
	if err != nil {
		log.Printf("can't create asset_duration metric: %s", err)
	}
	return &AssetsCache{
		cfg:           cfg,
		rewriter:      rewriter,
		producer:      producer,
		cache:         make(map[string]*CachedAsset, 64),
		totalAssets:   totalAssets,
		cachedAssets:  cachedAssets,
		skippedAssets: skippedAssets,
		assetSize:     assetSize,
		assetDuration: assetDuration,
	}
}
@ -95,18 +140,67 @@ func (e *AssetsCache) sendAssetsForCacheFromCSS(sessionID uint64, baseURL string
}
}
func (e *AssetsCache) handleURL(sessionID uint64, baseURL string, url string) string {
func (e *AssetsCache) handleURL(sessionID uint64, baseURL string, urlVal string) string {
if e.cfg.CacheAssets {
e.sendAssetForCache(sessionID, baseURL, url)
return e.rewriter.RewriteURL(sessionID, baseURL, url)
e.sendAssetForCache(sessionID, baseURL, urlVal)
return e.rewriter.RewriteURL(sessionID, baseURL, urlVal)
} else {
return assets.ResolveURL(baseURL, urlVal)
}
return assets.ResolveURL(baseURL, url)
}
// handleCSS rewrites all asset URLs inside a CSS payload. Because CSS
// rewriting can be expensive, results that took longer than the configured
// threshold are cached in memory (keyed by md5 of the page origin + css)
// and served from cache until they expire.
func (e *AssetsCache) handleCSS(sessionID uint64, baseURL string, css string) string {
	ctx := context.Background()
	e.totalAssets.Add(ctx, 1)
	// Try to find asset in cache
	h := md5.New()
	// Cut first part of url (scheme + host); a query-string or path change
	// alone must not produce a different cache key.
	u, err := url.Parse(baseURL)
	if err != nil {
		// Unparsable base URL: fall through without caching.
		log.Printf("can't parse url: %s, err: %s", baseURL, err)
		if e.cfg.CacheAssets {
			e.sendAssetsForCacheFromCSS(sessionID, baseURL, css)
		}
		return e.getRewrittenCSS(sessionID, baseURL, css)
	}
	origin := u.Scheme + "://" + u.Host + "/"
	// Calculate hash sum of url + css
	io.WriteString(h, origin)
	io.WriteString(h, css)
	hash := string(h.Sum(nil))
	// Check the resulting hash in cache
	if cachedAsset, ok := e.cache[hash]; ok {
		if int64(time.Since(cachedAsset.ts).Minutes()) < e.cfg.CacheExpiration {
			e.skippedAssets.Add(ctx, 1)
			return cachedAsset.msg
		}
	}
	// Send asset to download in assets service
	if e.cfg.CacheAssets {
		e.sendAssetsForCacheFromCSS(sessionID, baseURL, css)
	}
	// Rewrite asset and measure how long it took
	start := time.Now()
	res := e.getRewrittenCSS(sessionID, baseURL, css)
	duration := time.Since(start).Milliseconds()
	e.assetSize.Record(ctx, float64(len(res)))
	e.assetDuration.Record(ctx, float64(duration))
	// Save asset to cache only if rewriting was slower than the threshold;
	// cheap rewrites are not worth the memory.
	if duration > e.cfg.CacheThreshold {
		e.cache[hash] = &CachedAsset{
			msg: res,
			ts:  time.Now(),
		}
		e.cachedAssets.Add(ctx, 1)
	}
	// Return rewritten asset
	return res
}
// getRewrittenCSS returns the CSS with its asset URLs rewritten to the
// assets origin when caching is enabled, or merely resolved against the
// base url when it is not.
func (e *AssetsCache) getRewrittenCSS(sessionID uint64, url, css string) string {
	if !e.cfg.CacheAssets {
		return assets.ResolveCSS(url, css)
	}
	return e.rewriter.RewriteCSS(sessionID, url, css)
}

View file

@ -23,6 +23,7 @@ type messageIteratorImpl struct {
version uint64
size uint64
canSkip bool
broken bool
messageInfo *message
batchInfo *BatchInfo
}
@ -47,6 +48,7 @@ func (i *messageIteratorImpl) prepareVars(batchInfo *BatchInfo) {
i.messageInfo = &message{batch: batchInfo}
i.version = 0
i.canSkip = false
i.broken = false
i.size = 0
}
@ -62,9 +64,14 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
// Increase message index (can be overwritten by batch info message)
i.messageInfo.Index++
if i.broken {
log.Printf("skipping broken batch, info: %s", i.batchInfo.Info())
return
}
if i.canSkip {
if _, err := reader.Seek(int64(i.size), io.SeekCurrent); err != nil {
log.Printf("seek err: %s", err)
log.Printf("can't skip message: %s, info: %s", err, i.batchInfo.Info())
return
}
}
@ -74,7 +81,7 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
msgType, err := ReadUint(reader)
if err != nil {
if err != io.EOF {
log.Printf("can't read message type: %s", err)
log.Printf("can't read message type: %s, info: %s", err, i.batchInfo.Info())
}
return
}
@ -85,7 +92,7 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
// Read message size if it is a new protocol version
i.size, err = ReadSize(reader)
if err != nil {
log.Printf("can't read message size: %s", err)
log.Printf("can't read message size: %s, info: %s", err, i.batchInfo.Info())
return
}
msg = &RawMessage{
@ -93,6 +100,7 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
size: i.size,
reader: reader,
skipped: &i.canSkip,
broken: &i.broken,
meta: i.messageInfo,
}
i.canSkip = true
@ -100,17 +108,18 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
msg, err = ReadMessage(msgType, reader)
if err != nil {
if err != io.EOF {
log.Printf("Batch Message decoding error on message with index %v, err: %s", i.messageInfo.Index, err)
log.Printf("can't read message body: %s, info: %s", err, i.batchInfo.Info())
}
return
}
msg = transformDeprecated(msg)
}
// Preprocess "system" messages
if _, ok := i.preFilter[msg.TypeID()]; ok {
msg = msg.Decode()
if msg == nil {
log.Printf("can't decode message")
log.Printf("decode error, type: %d, info: %s", msgType, i.batchInfo.Info())
return
}
if err := i.preprocessing(msg); err != nil {
@ -129,7 +138,7 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
if i.autoDecode {
msg = msg.Decode()
if msg == nil {
log.Printf("can't decode message")
log.Printf("decode error, type: %d, info: %s", msgType, i.batchInfo.Info())
return
}
}
@ -143,17 +152,17 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
}
// zeroTsLog reports a message of the given type that arrived with a zero
// timestamp, including the current batch info for debugging context.
func (i *messageIteratorImpl) zeroTsLog(msgType string) {
	log.Printf("zero timestamp in %s, info: %s", msgType, i.batchInfo.Info())
}
func (i *messageIteratorImpl) preprocessing(msg Message) error {
switch m := msg.(type) {
case *BatchMetadata:
if i.messageInfo.Index > 1 { // Might be several 0-0 BatchMeta in a row without an error though
return fmt.Errorf("batchMetadata found at the end of the batch")
return fmt.Errorf("batchMetadata found at the end of the batch, info: %s", i.batchInfo.Info())
}
if m.Version > 1 {
return fmt.Errorf("incorrect batch version: %d, skip current batch", i.version)
return fmt.Errorf("incorrect batch version: %d, skip current batch, info: %s", i.version, i.batchInfo.Info())
}
i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
i.messageInfo.Timestamp = m.Timestamp
@ -162,10 +171,11 @@ func (i *messageIteratorImpl) preprocessing(msg Message) error {
}
i.messageInfo.Url = m.Url
i.version = m.Version
i.batchInfo.version = m.Version
case *BatchMeta: // Is not required to be present in batch since IOS doesn't have it (though we might change it)
if i.messageInfo.Index > 1 { // Might be several 0-0 BatchMeta in a row without an error though
return fmt.Errorf("batchMeta found at the end of the batch")
return fmt.Errorf("batchMeta found at the end of the batch, info: %s", i.batchInfo.Info())
}
i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
i.messageInfo.Timestamp = m.Timestamp
@ -183,6 +193,8 @@ func (i *messageIteratorImpl) preprocessing(msg Message) error {
i.messageInfo.Timestamp = int64(m.Timestamp)
if m.Timestamp == 0 {
i.zeroTsLog("SessionStart")
log.Printf("zero session start, project: %d, UA: %s, tracker: %s, info: %s",
m.ProjectID, m.UserAgent, m.TrackerVersion, i.batchInfo.Info())
}
case *SessionEnd:

View file

@ -1,5 +1,7 @@
package messages
import "fmt"
type Message interface {
Encode() []byte
EncodeWithIndex() []byte
@ -14,14 +16,17 @@ type BatchInfo struct {
sessionID uint64
id uint64
topic string
partition uint64
timestamp int64
version uint64
}
func NewBatchInfo(sessID uint64, topic string, id uint64, ts int64) *BatchInfo {
func NewBatchInfo(sessID uint64, topic string, id, partition uint64, ts int64) *BatchInfo {
return &BatchInfo{
sessionID: sessID,
id: id,
topic: topic,
partition: partition,
timestamp: ts,
}
}
@ -38,6 +43,10 @@ func (b *BatchInfo) Timestamp() int64 {
return b.timestamp
}
// Info returns a one-line, human-readable description of the batch
// (session id, partition, offset and protocol version) for log messages.
func (b *BatchInfo) Info() string {
return fmt.Sprintf("session: %d, partition: %d, offset: %d, ver: %d", b.sessionID, b.partition, b.id, b.version)
}
type message struct {
Timestamp int64
Index uint64

View file

@ -16,6 +16,7 @@ type RawMessage struct {
meta *message
encoded bool
skipped *bool
broken *bool
}
func (m *RawMessage) Encode() []byte {
@ -28,7 +29,7 @@ func (m *RawMessage) Encode() []byte {
*m.skipped = false
_, err := io.ReadFull(m.reader, m.data[1:])
if err != nil {
log.Printf("message encode err: %s", err)
log.Printf("message encode err: %s, type: %d, sess: %d", err, m.tp, m.SessionID())
return nil
}
return m.data
@ -36,7 +37,10 @@ func (m *RawMessage) Encode() []byte {
func (m *RawMessage) EncodeWithIndex() []byte {
if !m.encoded {
m.Encode()
if m.Encode() == nil {
*m.broken = true
return nil
}
}
if IsIOSType(int(m.tp)) {
return m.data
@ -49,14 +53,17 @@ func (m *RawMessage) EncodeWithIndex() []byte {
// Decode materializes the raw message bytes into a typed Message.
// On any failure the enclosing batch is flagged as broken (via the shared
// broken pointer) and nil is returned so the iterator can stop.
func (m *RawMessage) Decode() Message {
	if !m.encoded {
		// Lazily pull the payload from the batch reader first; a failed
		// read poisons the whole batch.
		if m.Encode() == nil {
			*m.broken = true
			return nil
		}
	}
	msg, err := ReadMessage(m.tp, bytes.NewReader(m.data[1:]))
	if err != nil {
		log.Printf("decode err: %s", err)
		*m.broken = true
		return nil
	}
	msg = transformDeprecated(msg)
	msg.Meta().SetMeta(m.meta)
	return msg
}