From 2088d16c79577bc2590c555c0a96dfabeb99503a Mon Sep 17 00:00:00 2001 From: Alex Kaminskii Date: Fri, 29 Jul 2022 18:27:17 +0200 Subject: [PATCH] feat(backend&mobs):new batch metadata + PartitionedMessage service msg --- backend/pkg/messages/messages.go | 44 +++++++++++++++++++ backend/pkg/messages/read-message.go | 33 ++++++++++++-- mobs/messages.rb | 18 ++++++++ .../backend~pkg~messages~read-message.go.erb | 4 -- 4 files changed, 91 insertions(+), 8 deletions(-) diff --git a/backend/pkg/messages/messages.go b/backend/pkg/messages/messages.go index 6c4d75bfc..9f0136ab0 100644 --- a/backend/pkg/messages/messages.go +++ b/backend/pkg/messages/messages.go @@ -22,6 +22,50 @@ func (msg *BatchMeta) TypeID() int { return 80 } +type BatchMetadata struct { + message + Version uint64 + PageNo uint64 + FirstIndex uint64 + Timestamp int64 + Location string +} + +func (msg *BatchMetadata) Encode() []byte { + buf := make([]byte, 51+len(msg.Location)) + buf[0] = 81 + p := 1 + p = WriteUint(msg.Version, buf, p) + p = WriteUint(msg.PageNo, buf, p) + p = WriteUint(msg.FirstIndex, buf, p) + p = WriteInt(msg.Timestamp, buf, p) + p = WriteString(msg.Location, buf, p) + return buf[:p] +} + +func (msg *BatchMetadata) TypeID() int { + return 81 +} + +type PartitionedMessage struct { + message + PartNo uint64 + PartTotal uint64 +} + +func (msg *PartitionedMessage) Encode() []byte { + buf := make([]byte, 21) + buf[0] = 82 + p := 1 + p = WriteUint(msg.PartNo, buf, p) + p = WriteUint(msg.PartTotal, buf, p) + return buf[:p] +} + +func (msg *PartitionedMessage) TypeID() int { + return 82 +} + type Timestamp struct { message Timestamp uint64 diff --git a/backend/pkg/messages/read-message.go b/backend/pkg/messages/read-message.go index 5009994f5..89b02bfcd 100644 --- a/backend/pkg/messages/read-message.go +++ b/backend/pkg/messages/read-message.go @@ -7,10 +7,6 @@ import ( ) func ReadMessage(reader io.Reader) (Message, error) { - t, err := ReadUint(reader) - if err != nil { - return nil, err - } switch t { case 80: @@ -26,6 +22,35 @@ func ReadMessage(reader io.Reader) (Message, error) { } return msg, nil + case 81: + msg := &BatchMetadata{} + if msg.Version, err = ReadUint(reader); err != nil { + return nil, err + } + if msg.PageNo, err = ReadUint(reader); err != nil { + return nil, err + } + if msg.FirstIndex, err = ReadUint(reader); err != nil { + return nil, err + } + if msg.Timestamp, err = ReadInt(reader); err != nil { + return nil, err + } + if msg.Location, err = ReadString(reader); err != nil { + return nil, err + } + return msg, nil + + case 82: + msg := &PartitionedMessage{} + if msg.PartNo, err = ReadUint(reader); err != nil { + return nil, err + } + if msg.PartTotal, err = ReadUint(reader); err != nil { + return nil, err + } + return msg, nil + case 0: msg := &Timestamp{} if msg.Timestamp, err = ReadUint(reader); err != nil { diff --git a/mobs/messages.rb b/mobs/messages.rb index 0f97f3db5..e50d02be4 100644 --- a/mobs/messages.rb +++ b/mobs/messages.rb @@ -1,9 +1,27 @@ # Special one for Batch Meta. Message id could define the version +# Depricated since tracker 3.6.0 message 80, 'BatchMeta', :replayer => false do uint 'PageNo' uint 'FirstIndex' int 'Timestamp' end + +# since tracker 3.6.0 +message 81, 'BatchMetadata', :replayer => false do + uint 'Version' + uint 'PageNo' + uint 'FirstIndex' + int 'Timestamp' + string 'Location' +end + +# since tracker 3.6.0 +message 82, 'PartitionedMessage', :replayer => false do + uint 'PartNo' + uint 'PartTotal' +end + + message 0, 'Timestamp' do uint 'Timestamp' end diff --git a/mobs/templates/backend~pkg~messages~read-message.go.erb b/mobs/templates/backend~pkg~messages~read-message.go.erb index 2e8920747..c797e2e74 100644 --- a/mobs/templates/backend~pkg~messages~read-message.go.erb +++ b/mobs/templates/backend~pkg~messages~read-message.go.erb @@ -7,10 +7,6 @@ import ( ) func ReadMessage(reader io.Reader) (Message, error) { - t, err := ReadUint(reader) - if err != nil { - return nil, err - } switch t { <% $messages.each do |msg| %> case <%= msg.id %>: