added BatchMetadata and handle the new message format within it

This commit is contained in:
mauricio garcia suarez 2022-08-25 12:10:45 +02:00
parent 3381e3a081
commit 846c167db3
3 changed files with 41 additions and 0 deletions

View file

@ -146,6 +146,16 @@ def handle_session(n: Session, message: Message) -> Optional[Session]:
n.batchmeta_timestamp = message.timestamp
return n
if isinstance(message, BatchMetadata):
n.batchmeta_page_no = message.page_no
n.batchmeta_first_index = message.first_index
n.batchmeta_timestamp = message.timestamp
return n
if isinstance(message, PartitionedMessage):
n.part_no = message.part_no
n.part_total = message.part_total
# if isinstance(message, IOSBatchMeta):
# n.iosbatchmeta_page_no = message.page_no
# n.iosbatchmeta_first_index = message.first_index

View file

@ -21,6 +21,8 @@ class Codec:
i = 0 # n of byte (max 9 for uint64)
while True:
b = reader.read(1)
if len(b) == 0:
raise IndexError('bytes out of range')
num = int.from_bytes(b, "big", signed=False)
# print(i, x)

View file

@ -18,7 +18,36 @@ class MessageCodec(Codec):
def decode(self, b: bytes) -> Message:
reader = io.BytesIO(b)
return self.read_head_message(reader)
def decode_detailed(self, b: bytes):
reader = io.BytesIO(b)
messages_list = list()
messages_list.append(self.handler(reader, 0))
if isinstance(messages_list[0], BatchMeta):
mode = 0
elif isinstance(messages_list[0], BatchMetadata):
mode = 1
else:
return messages_list
while True:
try:
messages_list.append(self.handler(reader, mode))
except IndexError:
break
return messages_list
def handler(self, reader: io.BytesIO, mode=0):
message_id = self.read_message_id(reader)
if mode == 1:
reader.read(3)
return self.read_head_message(reader, message_id)
elif mode == 0:
return self.read_head_message(reader, message_id)
else:
raise IOError()
def read_head_message(self, reader: io.BytesIO, message_id: int):
if message_id == 80:
return BatchMeta(