diff --git a/ee/connectors/handler.py b/ee/connectors/handler.py index 6f6316545..1442c9148 100644 --- a/ee/connectors/handler.py +++ b/ee/connectors/handler.py @@ -146,6 +146,16 @@ def handle_session(n: Session, message: Message) -> Optional[Session]: n.batchmeta_timestamp = message.timestamp return n + if isinstance(message, BatchMetadata): + n.batchmeta_page_no = message.page_no + n.batchmeta_first_index = message.first_index + n.batchmeta_timestamp = message.timestamp + return n + + if isinstance(message, PartitionedMessage): + n.part_no = message.part_no + n.part_total = message.part_total + # if isinstance(message, IOSBatchMeta): # n.iosbatchmeta_page_no = message.page_no # n.iosbatchmeta_first_index = message.first_index diff --git a/ee/connectors/msgcodec/codec.py b/ee/connectors/msgcodec/codec.py index 5aeb0e4ed..577f02f48 100644 --- a/ee/connectors/msgcodec/codec.py +++ b/ee/connectors/msgcodec/codec.py @@ -21,6 +21,8 @@ class Codec: i = 0 # n of byte (max 9 for uint64) while True: b = reader.read(1) + if len(b) == 0: + raise IndexError('bytes out of range') num = int.from_bytes(b, "big", signed=False) # print(i, x) diff --git a/ee/connectors/msgcodec/msgcodec.py b/ee/connectors/msgcodec/msgcodec.py index 3cd6a846a..03515316e 100644 --- a/ee/connectors/msgcodec/msgcodec.py +++ b/ee/connectors/msgcodec/msgcodec.py @@ -18,7 +18,36 @@ class MessageCodec(Codec): def decode(self, b: bytes) -> Message: reader = io.BytesIO(b) + return self.read_head_message(reader) + + def decode_detailed(self, b: bytes): + reader = io.BytesIO(b) + messages_list = list() + messages_list.append(self.handler(reader, 0)) + if isinstance(messages_list[0], BatchMeta): + mode = 0 + elif isinstance(messages_list[0], BatchMetadata): + mode = 1 + else: + return messages_list + while True: + try: + messages_list.append(self.handler(reader, mode)) + except IndexError: + break + return messages_list + + def handler(self, reader: io.BytesIO, mode=0): message_id = self.read_message_id(reader) + if mode == 1: + reader.read(3) + return self.read_head_message(reader, message_id) + elif mode == 0: + return self.read_head_message(reader, message_id) + else: + raise IOError() + + def read_head_message(self, reader: io.BytesIO, message_id: int): if message_id == 80: return BatchMeta(