feat(mobs,backend):PartitionedMessage & BatchMetadata messages
This commit is contained in:
parent
d0e486233a
commit
820994b55f
4 changed files with 91 additions and 8 deletions
|
|
@ -22,6 +22,50 @@ func (msg *BatchMeta) TypeID() int {
|
|||
return 80
|
||||
}
|
||||
|
||||
type BatchMetadata struct {
|
||||
message
|
||||
Version uint64
|
||||
PageNo uint64
|
||||
FirstIndex uint64
|
||||
Timestamp int64
|
||||
Location string
|
||||
}
|
||||
|
||||
func (msg *BatchMetadata) Encode() []byte {
|
||||
buf := make([]byte, 51+len(msg.Location))
|
||||
buf[0] = 81
|
||||
p := 1
|
||||
p = WriteUint(msg.Version, buf, p)
|
||||
p = WriteUint(msg.PageNo, buf, p)
|
||||
p = WriteUint(msg.FirstIndex, buf, p)
|
||||
p = WriteInt(msg.Timestamp, buf, p)
|
||||
p = WriteString(msg.Location, buf, p)
|
||||
return buf[:p]
|
||||
}
|
||||
|
||||
func (msg *BatchMetadata) TypeID() int {
|
||||
return 81
|
||||
}
|
||||
|
||||
type PartitionedMessage struct {
|
||||
message
|
||||
PartNo uint64
|
||||
PartTotal uint64
|
||||
}
|
||||
|
||||
func (msg *PartitionedMessage) Encode() []byte {
|
||||
buf := make([]byte, 21)
|
||||
buf[0] = 82
|
||||
p := 1
|
||||
p = WriteUint(msg.PartNo, buf, p)
|
||||
p = WriteUint(msg.PartTotal, buf, p)
|
||||
return buf[:p]
|
||||
}
|
||||
|
||||
func (msg *PartitionedMessage) TypeID() int {
|
||||
return 82
|
||||
}
|
||||
|
||||
type Timestamp struct {
|
||||
message
|
||||
Timestamp uint64
|
||||
|
|
|
|||
|
|
@ -7,10 +7,6 @@ import (
|
|||
)
|
||||
|
||||
func ReadMessage(reader io.Reader) (Message, error) {
|
||||
t, err := ReadUint(reader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
switch t {
|
||||
|
||||
case 80:
|
||||
|
|
@ -26,6 +22,35 @@ func ReadMessage(reader io.Reader) (Message, error) {
|
|||
}
|
||||
return msg, nil
|
||||
|
||||
case 81:
|
||||
msg := &BatchMetadata{}
|
||||
if msg.Version, err = ReadUint(reader); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if msg.PageNo, err = ReadUint(reader); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if msg.FirstIndex, err = ReadUint(reader); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if msg.Timestamp, err = ReadInt(reader); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if msg.Location, err = ReadString(reader); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return msg, nil
|
||||
|
||||
case 82:
|
||||
msg := &PartitionedMessage{}
|
||||
if msg.PartNo, err = ReadUint(reader); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if msg.PartTotal, err = ReadUint(reader); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return msg, nil
|
||||
|
||||
case 0:
|
||||
msg := &Timestamp{}
|
||||
if msg.Timestamp, err = ReadUint(reader); err != nil {
|
||||
|
|
|
|||
|
|
@ -1,9 +1,27 @@
|
|||
# Special one for Batch Meta. Message id could define the version
|
||||
# Depricated since tracker 3.6.0
|
||||
message 80, 'BatchMeta', :replayer => false do
|
||||
uint 'PageNo'
|
||||
uint 'FirstIndex'
|
||||
int 'Timestamp'
|
||||
end
|
||||
|
||||
# since tracker 3.6.0
|
||||
message 81, 'BatchMetadata', :replayer => false do
|
||||
uint 'Version'
|
||||
uint 'PageNo'
|
||||
uint 'FirstIndex'
|
||||
int 'Timestamp'
|
||||
string 'Location'
|
||||
end
|
||||
|
||||
# since tracker 3.6.0
|
||||
message 82, 'PartitionedMessage', :replayer => false do
|
||||
uint 'PartNo'
|
||||
uint 'PartTotal'
|
||||
end
|
||||
|
||||
|
||||
message 0, 'Timestamp' do
|
||||
uint 'Timestamp'
|
||||
end
|
||||
|
|
|
|||
|
|
@ -7,10 +7,6 @@ import (
|
|||
)
|
||||
|
||||
func ReadMessage(reader io.Reader) (Message, error) {
|
||||
t, err := ReadUint(reader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
switch t {
|
||||
<% $messages.each do |msg| %>
|
||||
case <%= msg.id %>:
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue