* feat(backend): removed duplicate code in ender message iterator * feat(backend): removed duplicate code in sink message iterator
31 lines
773 B
Go
31 lines
773 B
Go
package messages
|
|
|
|
import (
|
|
"openreplay/backend/pkg/metrics/sink"
|
|
)
|
|
|
|
type sinkIteratorImpl struct {
|
|
coreIterator MessageIterator
|
|
handler MessageHandler
|
|
}
|
|
|
|
func NewSinkMessageIterator(messageHandler MessageHandler, messageFilter []int, autoDecode bool) MessageIterator {
|
|
iter := &sinkIteratorImpl{
|
|
handler: messageHandler,
|
|
}
|
|
iter.coreIterator = NewMessageIterator(iter.handle, messageFilter, autoDecode)
|
|
return iter
|
|
}
|
|
|
|
func (i *sinkIteratorImpl) handle(message Message) {
|
|
i.handler(message)
|
|
}
|
|
|
|
func (i *sinkIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
|
|
sink.RecordBatchSize(float64(len(batchData)))
|
|
sink.IncreaseTotalBatches()
|
|
// Call core iterator
|
|
i.coreIterator.Iterate(batchData, batchInfo)
|
|
// Send batch end signal
|
|
i.handler(nil)
|
|
}
|