openreplay/backend/pkg/queue/messages.go
2022-06-29 12:20:42 +02:00

19 lines
580 B
Go

package queue
import (
"bytes"
"log"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue/types"
)
// NewMessageConsumer returns a types.Consumer, built with NewConsumer, whose
// raw-payload callback decodes each received value and forwards the decoded
// messages to handler.
//
// group, topics, autoCommit and messageSizeLimit are passed through to
// NewConsumer unchanged. For every raw value delivered to the consumer, the
// bytes are wrapped in a reader and handed to messages.ReadBatchReader;
// handler is invoked with the originating sessionID, the decoded message and
// the delivery meta for each message ReadBatchReader passes to its callback.
//
// An error returned by ReadBatchReader is logged and not propagated to the
// caller.
func NewMessageConsumer(group string, topics []string, handler types.DecodedMessageHandler, autoCommit bool, messageSizeLimit int) types.Consumer {
	// onRaw adapts the byte-level consumer callback to the decoded-message handler.
	onRaw := func(sessionID uint64, value []byte, meta *types.Meta) {
		// deliver binds the per-delivery sessionID and meta to each decoded message.
		deliver := func(msg messages.Message) {
			handler(sessionID, msg, meta)
		}

		err := messages.ReadBatchReader(bytes.NewReader(value), deliver)
		if err == nil {
			return
		}
		log.Printf("Decode error: %v\n", err)
	}

	return NewConsumer(group, topics, onRaw, autoCommit, messageSizeLimit)
}