diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go index ffcaa0b94..1f34760e1 100644 --- a/backend/cmd/sink/main.go +++ b/backend/cmd/sink/main.go @@ -3,6 +3,7 @@ package main import ( "context" "log" + "openreplay/backend/pkg/pprof" "os" "os/signal" "syscall" @@ -19,6 +20,8 @@ import ( ) func main() { + pprof.StartProfilingServer() + metrics := monitoring.New("sink") log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) @@ -34,9 +37,10 @@ func main() { producer := queue.NewProducer(cfg.MessageSizeLimit, true) defer producer.Close(cfg.ProducerCloseTimeout) rewriter := assets.NewRewriter(cfg.AssetsOrigin) - assetMessageHandler := assetscache.New(cfg, rewriter, producer) + assetMessageHandler := assetscache.New(cfg, rewriter, producer, metrics) counter := storage.NewLogCounter() + // Session message metrics totalMessages, err := metrics.RegisterCounter("messages_total") if err != nil { log.Printf("can't create messages_total metric: %s", err) @@ -68,7 +72,12 @@ func main() { msg.TypeID() == messages.MsgCSSInsertRuleURLBased || msg.TypeID() == messages.MsgAdoptedSSReplaceURLBased || msg.TypeID() == messages.MsgAdoptedSSInsertRuleURLBased { - msg = assetMessageHandler.ParseAssets(msg.Decode()) // TODO: filter type only once (use iterator inside or bring ParseAssets out here). + m := msg.Decode() + if m == nil { + log.Printf("assets decode err, info: %s", msg.Meta().Batch().Info()) + return + } + msg = assetMessageHandler.ParseAssets(m) } // Filter message @@ -87,15 +96,19 @@ func main() { // Write encoded message with index to session file data := msg.EncodeWithIndex() + if data == nil { + log.Printf("can't encode with index, err: %s", err) + return + } if messages.IsDOMType(msg.TypeID()) { if err := writer.WriteDOM(msg.SessionID(), data); err != nil { - log.Printf("DOM Writer error: %v\n", err) + log.Printf("DOM Writer error: %s, info: %s", err, msg.Meta().Batch().Info()) } } if !messages.IsDOMType(msg.TypeID()) || msg.TypeID() == messages.MsgTimestamp { // TODO: write only necessary timestamps if err := writer.WriteDEV(msg.SessionID(), data); err != nil { - log.Printf("Devtools Writer error: %v\n", err) + log.Printf("Devtools Writer error: %s, info: %s", err, msg.Meta().Batch().Info()) } } @@ -146,5 +159,4 @@ func main() { } } } - } diff --git a/backend/internal/config/sink/config.go b/backend/internal/config/sink/config.go index 215484082..a7481f93a 100644 --- a/backend/internal/config/sink/config.go +++ b/backend/internal/config/sink/config.go @@ -17,6 +17,8 @@ type Config struct { CacheAssets bool `env:"CACHE_ASSETS,required"` AssetsOrigin string `env:"ASSETS_ORIGIN,required"` ProducerCloseTimeout int `env:"PRODUCER_CLOSE_TIMEOUT,default=15000"` + CacheThreshold int64 `env:"CACHE_THRESHOLD,default=75"` + CacheExpiration int64 `env:"CACHE_EXPIRATION,default=120"` } func New() *Config { diff --git a/backend/internal/sink/assetscache/assets.go b/backend/internal/sink/assetscache/assets.go index 478141453..96f225a45 100644 --- a/backend/internal/sink/assetscache/assets.go +++ b/backend/internal/sink/assetscache/assets.go @@ -1,24 +1,69 @@ package assetscache import ( + "context" + "crypto/md5" + "go.opentelemetry.io/otel/metric/instrument/syncfloat64" + "io" "log" + "net/url" "openreplay/backend/internal/config/sink" "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/monitoring" "openreplay/backend/pkg/queue/types" "openreplay/backend/pkg/url/assets" + "time" ) -type AssetsCache struct { - cfg *sink.Config - rewriter *assets.Rewriter - producer types.Producer +type CachedAsset struct { + msg string + ts time.Time } -func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer) *AssetsCache { +type AssetsCache struct { + cfg *sink.Config + rewriter *assets.Rewriter + producer types.Producer + cache map[string]*CachedAsset + totalAssets syncfloat64.Counter + cachedAssets syncfloat64.Counter + skippedAssets syncfloat64.Counter + assetSize syncfloat64.Histogram + assetDuration syncfloat64.Histogram +} + +func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer, metrics *monitoring.Metrics) *AssetsCache { + // Assets metrics + totalAssets, err := metrics.RegisterCounter("assets_total") + if err != nil { + log.Printf("can't create assets_total metric: %s", err) + } + cachedAssets, err := metrics.RegisterCounter("assets_cached") + if err != nil { + log.Printf("can't create assets_cached metric: %s", err) + } + skippedAssets, err := metrics.RegisterCounter("assets_skipped") + if err != nil { + log.Printf("can't create assets_skipped metric: %s", err) + } + assetSize, err := metrics.RegisterHistogram("asset_size") + if err != nil { + log.Printf("can't create asset_size metric: %s", err) + } + assetDuration, err := metrics.RegisterHistogram("asset_duration") + if err != nil { + log.Printf("can't create asset_duration metric: %s", err) + } return &AssetsCache{ - cfg: cfg, - rewriter: rewriter, - producer: producer, + cfg: cfg, + rewriter: rewriter, + producer: producer, + cache: make(map[string]*CachedAsset, 64), + totalAssets: totalAssets, + cachedAssets: cachedAssets, + skippedAssets: skippedAssets, + assetSize: assetSize, + assetDuration: assetDuration, } } @@ -95,18 +140,67 @@ func (e *AssetsCache) sendAssetsForCacheFromCSS(sessionID uint64, baseURL string } } -func (e *AssetsCache) handleURL(sessionID uint64, baseURL string, url string) string { +func (e *AssetsCache) handleURL(sessionID uint64, baseURL string, urlVal string) string { if e.cfg.CacheAssets { - e.sendAssetForCache(sessionID, baseURL, url) - return e.rewriter.RewriteURL(sessionID, baseURL, url) + e.sendAssetForCache(sessionID, baseURL, urlVal) + return e.rewriter.RewriteURL(sessionID, baseURL, urlVal) + } else { + return assets.ResolveURL(baseURL, urlVal) } - return assets.ResolveURL(baseURL, url) } func (e *AssetsCache) handleCSS(sessionID uint64, baseURL string, css string) string { + ctx := context.Background() + e.totalAssets.Add(ctx, 1) + // Try to find asset in cache + h := md5.New() + // Cut first part of url (scheme + host) + u, err := url.Parse(baseURL) + if err != nil { + log.Printf("can't parse url: %s, err: %s", baseURL, err) + if e.cfg.CacheAssets { + e.sendAssetsForCacheFromCSS(sessionID, baseURL, css) + } + return e.getRewrittenCSS(sessionID, baseURL, css) + } + justUrl := u.Scheme + "://" + u.Host + "/" + // Calculate hash sum of url + css + io.WriteString(h, justUrl) + io.WriteString(h, css) + hash := string(h.Sum(nil)) + // Check the resulting hash in cache + if cachedAsset, ok := e.cache[hash]; ok { + if int64(time.Now().Sub(cachedAsset.ts).Minutes()) < e.cfg.CacheExpiration { + e.skippedAssets.Add(ctx, 1) + return cachedAsset.msg + } + } + // Send asset to download in assets service if e.cfg.CacheAssets { e.sendAssetsForCacheFromCSS(sessionID, baseURL, css) - return e.rewriter.RewriteCSS(sessionID, baseURL, css) } - return assets.ResolveCSS(baseURL, css) + // Rewrite asset + start := time.Now() + res := e.getRewrittenCSS(sessionID, baseURL, css) + duration := time.Now().Sub(start).Milliseconds() + e.assetSize.Record(ctx, float64(len(res))) + e.assetDuration.Record(ctx, float64(duration)) + // Save asset to cache if we spent more than threshold + if duration > e.cfg.CacheThreshold { + e.cache[hash] = &CachedAsset{ + msg: res, + ts: time.Now(), + } + e.cachedAssets.Add(ctx, 1) + } + // Return rewritten asset + return res +} + +func (e *AssetsCache) getRewrittenCSS(sessionID uint64, url, css string) string { + if e.cfg.CacheAssets { + return e.rewriter.RewriteCSS(sessionID, url, css) + } else { + return assets.ResolveCSS(url, css) + } } diff --git a/backend/pkg/messages/iterator.go b/backend/pkg/messages/iterator.go index cd29293b0..100497de3 100644 --- a/backend/pkg/messages/iterator.go +++ b/backend/pkg/messages/iterator.go @@ -23,6 +23,7 @@ type messageIteratorImpl struct { version uint64 size uint64 canSkip bool + broken bool messageInfo *message batchInfo *BatchInfo } @@ -47,6 +48,7 @@ func (i *messageIteratorImpl) prepareVars(batchInfo *BatchInfo) { i.messageInfo = &message{batch: batchInfo} i.version = 0 i.canSkip = false + i.broken = false i.size = 0 } @@ -62,9 +64,14 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) { // Increase message index (can be overwritten by batch info message) i.messageInfo.Index++ + if i.broken { + log.Printf("skipping broken batch, info: %s", i.batchInfo.Info()) + return + } + if i.canSkip { if _, err := reader.Seek(int64(i.size), io.SeekCurrent); err != nil { - log.Printf("seek err: %s", err) + log.Printf("can't skip message: %s, info: %s", err, i.batchInfo.Info()) return } } @@ -74,7 +81,7 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) { msgType, err := ReadUint(reader) if err != nil { if err != io.EOF { - log.Printf("can't read message type: %s", err) + log.Printf("can't read message type: %s, info: %s", err, i.batchInfo.Info()) } return } @@ -85,7 +92,7 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) { // Read message size if it is a new protocol version i.size, err = ReadSize(reader) if err != nil { - log.Printf("can't read message size: %s", err) + log.Printf("can't read message size: %s, info: %s", err, i.batchInfo.Info()) return } msg = &RawMessage{ @@ -93,6 +100,7 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) { size: i.size, reader: reader, skipped: &i.canSkip, + broken: &i.broken, meta: i.messageInfo, } i.canSkip = true @@ -100,17 +108,18 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) { msg, err = ReadMessage(msgType, reader) if err != nil { if err != io.EOF { - log.Printf("Batch Message decoding error on message with index %v, err: %s", i.messageInfo.Index, err) + log.Printf("can't read message body: %s, info: %s", err, i.batchInfo.Info()) } return } + msg = transformDeprecated(msg) } // Preprocess "system" messages if _, ok := i.preFilter[msg.TypeID()]; ok { msg = msg.Decode() if msg == nil { - log.Printf("can't decode message") + log.Printf("decode error, type: %d, info: %s", msgType, i.batchInfo.Info()) return } if err := i.preprocessing(msg); err != nil { @@ -129,7 +138,7 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) { if i.autoDecode { msg = msg.Decode() if msg == nil { - log.Printf("can't decode message") + log.Printf("decode error, type: %d, info: %s", msgType, i.batchInfo.Info()) return } } @@ -143,17 +152,17 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) { } func (i *messageIteratorImpl) zeroTsLog(msgType string) { - log.Printf("zero timestamp in %s, sess: %d", msgType, i.batchInfo.SessionID()) + log.Printf("zero timestamp in %s, info: %s", msgType, i.batchInfo.Info()) } func (i *messageIteratorImpl) preprocessing(msg Message) error { switch m := msg.(type) { case *BatchMetadata: if i.messageInfo.Index > 1 { // Might be several 0-0 BatchMeta in a row without an error though - return fmt.Errorf("batchMetadata found at the end of the batch") + return fmt.Errorf("batchMetadata found at the end of the batch, info: %s", i.batchInfo.Info()) } if m.Version > 1 { - return fmt.Errorf("incorrect batch version: %d, skip current batch", i.version) + return fmt.Errorf("incorrect batch version: %d, skip current batch, info: %s", i.version, i.batchInfo.Info()) } i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha) i.messageInfo.Timestamp = m.Timestamp @@ -162,10 +171,11 @@ func (i *messageIteratorImpl) preprocessing(msg Message) error { } i.messageInfo.Url = m.Url i.version = m.Version + i.batchInfo.version = m.Version case *BatchMeta: // Is not required to be present in batch since IOS doesn't have it (though we might change it) if i.messageInfo.Index > 1 { // Might be several 0-0 BatchMeta in a row without an error though - return fmt.Errorf("batchMeta found at the end of the batch") + return fmt.Errorf("batchMeta found at the end of the batch, info: %s", i.batchInfo.Info()) } i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha) i.messageInfo.Timestamp = m.Timestamp @@ -183,6 +193,8 @@ func (i *messageIteratorImpl) preprocessing(msg Message) error { i.messageInfo.Timestamp = int64(m.Timestamp) if m.Timestamp == 0 { i.zeroTsLog("SessionStart") + log.Printf("zero session start, project: %d, UA: %s, tracker: %s, info: %s", + m.ProjectID, m.UserAgent, m.TrackerVersion, i.batchInfo.Info()) } case *SessionEnd: diff --git a/backend/pkg/messages/message.go b/backend/pkg/messages/message.go index ad21dbf6a..7bb2572eb 100644 --- a/backend/pkg/messages/message.go +++ b/backend/pkg/messages/message.go @@ -1,5 +1,7 @@ package messages +import "fmt" + type Message interface { Encode() []byte EncodeWithIndex() []byte @@ -14,14 +16,17 @@ type BatchInfo struct { sessionID uint64 id uint64 topic string + partition uint64 timestamp int64 + version uint64 } -func NewBatchInfo(sessID uint64, topic string, id uint64, ts int64) *BatchInfo { +func NewBatchInfo(sessID uint64, topic string, id, partition uint64, ts int64) *BatchInfo { return &BatchInfo{ sessionID: sessID, id: id, topic: topic, + partition: partition, timestamp: ts, } } @@ -38,6 +43,10 @@ func (b *BatchInfo) Timestamp() int64 { return b.timestamp } +func (b *BatchInfo) Info() string { + return fmt.Sprintf("session: %d, partition: %d, offset: %d, ver: %d", b.sessionID, b.partition, b.id, b.version) +} + type message struct { Timestamp int64 Index uint64 diff --git a/backend/pkg/messages/raw.go b/backend/pkg/messages/raw.go index 608d0c711..814b509e6 100644 --- a/backend/pkg/messages/raw.go +++ b/backend/pkg/messages/raw.go @@ -16,6 +16,7 @@ type RawMessage struct { meta *message encoded bool skipped *bool + broken *bool } func (m *RawMessage) Encode() []byte { @@ -28,7 +29,7 @@ func (m *RawMessage) Encode() []byte { *m.skipped = false _, err := io.ReadFull(m.reader, m.data[1:]) if err != nil { - log.Printf("message encode err: %s", err) + log.Printf("message encode err: %s, type: %d, sess: %d", err, m.tp, m.SessionID()) return nil } return m.data @@ -36,7 +37,10 @@ func (m *RawMessage) Encode() []byte { func (m *RawMessage) EncodeWithIndex() []byte { if !m.encoded { - m.Encode() + if m.Encode() == nil { + *m.broken = true + return nil + } } if IsIOSType(int(m.tp)) { return m.data @@ -49,14 +53,17 @@ func (m *RawMessage) EncodeWithIndex() []byte { func (m *RawMessage) Decode() Message { if !m.encoded { - m.Encode() + if m.Encode() == nil { + *m.broken = true + return nil + } } msg, err := ReadMessage(m.tp, bytes.NewReader(m.data[1:])) if err != nil { log.Printf("decode err: %s", err) + *m.broken = true return nil } - msg = transformDeprecated(msg) msg.Meta().SetMeta(m.meta) return msg }