diff --git a/mobs/templates/ee~connectors~msgcodec~msgcodec.py.erb b/mobs/templates/ee~connectors~msgcodec~msgcodec.py.erb index b55764ee9..97d238ea8 100644 --- a/mobs/templates/ee~connectors~msgcodec~msgcodec.py.erb +++ b/mobs/templates/ee~connectors~msgcodec~msgcodec.py.erb @@ -7,6 +7,9 @@ import io class MessageCodec(Codec): + def __init__(self, msg_selector: List[int] = list()): + self.msg_selector = msg_selector + def read_message_id(self, reader: io.BytesIO) -> int: """ Read and return the first byte where the message id is encoded @@ -46,27 +49,41 @@ class MessageCodec(Codec): def decode_detailed(self, b: bytes) -> List[Message]: reader = io.BytesIO(b) messages_list = list() - messages_list.append(self.handler(reader, 0)) + try: + messages_list.append(self.handler(reader, 0)) + except IndexError: + print('[WARN] Broken batch') + return list() if isinstance(messages_list[0], BatchMeta): # Old BatchMeta mode = 0 elif isinstance(messages_list[0], BatchMetadata): # New BatchMeta - mode = 1 + if messages_list[0].version == 0: + mode = 0 + else: + mode = 1 else: return messages_list while True: try: - messages_list.append(self.handler(reader, mode)) + msg_decoded = self.handler(reader, mode) + if msg_decoded is not None: + messages_list.append(msg_decoded) except IndexError: break return messages_list def handler(self, reader: io.BytesIO, mode=0) -> Message: message_id = self.read_message_id(reader) + #print(f'[INFO-context] Current mode {mode}') + #print(f'[INFO] Currently processing message type {message_id}') if mode == 1: - # We skip the three bytes representing the length of message. It can be used to skip unwanted messages - reader.read(3) + # We read the three bytes representing the length of message. It can be used to skip unwanted messages + r_size = self.read_size(reader) + if message_id not in self.msg_selector: + reader.read(r_size) + return None return self.read_head_message(reader, message_id) elif mode == 0: # Old format with no bytes for message length