diff --git a/backend/cmd/assets/main.go b/backend/cmd/assets/main.go index 629224da7..d62a25876 100644 --- a/backend/cmd/assets/main.go +++ b/backend/cmd/assets/main.go @@ -37,11 +37,19 @@ func main() { func(sessionID uint64, iter messages.Iterator, meta *types.Meta) { for iter.Next() { if iter.Type() == messages.MsgAssetCache { - msg := iter.Message().Decode().(*messages.AssetCache) + m := iter.Message().Decode() + if m == nil { + return + } + msg := m.(*messages.AssetCache) cacher.CacheURL(sessionID, msg.URL) totalAssets.Add(context.Background(), 1) } else if iter.Type() == messages.MsgErrorEvent { - msg := iter.Message().Decode().(*messages.ErrorEvent) + m := iter.Message().Decode() + if m == nil { + return + } + msg := m.(*messages.ErrorEvent) if msg.Source != "js_exception" { continue } diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index 2ea57b459..82e96dae6 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -69,6 +69,9 @@ func main() { continue } msg := iter.Message().Decode() + if msg == nil { + return + } // Just save session data into db without additional checks if err := saver.InsertMessage(sessionID, msg); err != nil { diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go index d247d17b2..f0b145fae 100644 --- a/backend/cmd/sink/main.go +++ b/backend/cmd/sink/main.go @@ -76,7 +76,11 @@ func main() { iter.Type() == MsgCSSInsertRuleURLBased || iter.Type() == MsgAdoptedSSReplaceURLBased || iter.Type() == MsgAdoptedSSInsertRuleURLBased { - msg = assetMessageHandler.ParseAssets(sessionID, msg.Decode()) // TODO: filter type only once (use iterator inide or bring ParseAssets out here). + m := msg.Decode() + if m == nil { + return + } + msg = assetMessageHandler.ParseAssets(sessionID, m) // TODO: filter type only once (use iterator inide or bring ParseAssets out here). } // Filter message diff --git a/backend/pkg/messages/batch.go b/backend/pkg/messages/batch.go index 7dd6172bd..58b177aea 100644 --- a/backend/pkg/messages/batch.go +++ b/backend/pkg/messages/batch.go @@ -93,7 +93,11 @@ func (i *iteratorImpl) Next() bool { log.Printf("Batch Metadata found at the end of the batch") return false } - m := i.msg.Decode().(*BatchMetadata) + msg := i.msg.Decode() + if msg == nil { + return false + } + m := msg.(*BatchMetadata) i.index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha) i.timestamp = m.Timestamp i.version = m.Version @@ -108,7 +112,11 @@ func (i *iteratorImpl) Next() bool { log.Printf("Batch Meta found at the end of the batch") return false } - m := i.msg.Decode().(*BatchMeta) + msg := i.msg.Decode() + if msg == nil { + return false + } + m := msg.(*BatchMeta) i.index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha) i.timestamp = m.Timestamp isBatchMeta = true @@ -118,24 +126,44 @@ func (i *iteratorImpl) Next() bool { log.Printf("Batch Meta found at the end of the batch") return false } - m := i.msg.Decode().(*IOSBatchMeta) + msg := i.msg.Decode() + if msg == nil { + return false + } + m := msg.(*IOSBatchMeta) i.index = m.FirstIndex i.timestamp = int64(m.Timestamp) isBatchMeta = true // continue readLoop case MsgTimestamp: - m := i.msg.Decode().(*Timestamp) + msg := i.msg.Decode() + if msg == nil { + return false + } + m := msg.(*Timestamp) i.timestamp = int64(m.Timestamp) // No skipping here for making it easy to encode back the same sequence of message // continue readLoop case MsgSessionStart: - m := i.msg.Decode().(*SessionStart) + msg := i.msg.Decode() + if msg == nil { + return false + } + m := msg.(*SessionStart) i.timestamp = int64(m.Timestamp) case MsgSessionEnd: - m := i.msg.Decode().(*SessionEnd) + msg := i.msg.Decode() + if msg == nil { + return false + } + m := msg.(*SessionEnd) i.timestamp = int64(m.Timestamp) case MsgSetPageLocation: - m := i.msg.Decode().(*SetPageLocation) + msg := i.msg.Decode() + if msg == nil { + return false + } + m := msg.(*SetPageLocation) i.url = m.URL } i.msg.Meta().Index = i.index