# Auto-generated, do not edit from msgcodec.codec import Codec from msgcodec.messages import * from typing import List import io class MessageCodec(Codec): def __init__(self, msg_selector: List[int] = list()): self.msg_selector = msg_selector def read_message_id(self, reader: io.BytesIO) -> int: """ Read and return the first byte where the message id is encoded """ id_ = self.read_uint(reader) return id_ def encode(self, m: Message) -> bytes: ... def decode(self, b: bytes) -> Message: reader = io.BytesIO(b) return self.read_head_message(reader) @staticmethod def check_message_id(b: bytes) -> int: """ todo: make it static and without reader. It's just the first byte Read and return the first byte where the message id is encoded """ reader = io.BytesIO(b) id_ = Codec.read_uint(reader) return id_ @staticmethod def decode_key(b) -> int: """ Decode the message key (encoded with little endian) """ try: decoded = int.from_bytes(b, "little", signed=False) except Exception as e: raise UnicodeDecodeError(f"Error while decoding message key (SessionID) from {b}\n{e}") return decoded def decode_detailed(self, b: bytes) -> List[Message]: reader = io.BytesIO(b) messages_list = list() try: messages_list.append(self.handler(reader, 0)) except IndexError: print('[WARN] Broken batch') return list() if isinstance(messages_list[0], BatchMeta): # Old BatchMeta mode = 0 elif isinstance(messages_list[0], BatchMetadata): # New BatchMeta if messages_list[0].version == 0: mode = 0 else: mode = 1 else: return messages_list while True: try: msg_decoded = self.handler(reader, mode) if msg_decoded is not None: messages_list.append(msg_decoded) except IndexError: break return messages_list def handler(self, reader: io.BytesIO, mode=0) -> Message: message_id = self.read_message_id(reader) #print(f'[INFO-context] Current mode {mode}') #print(f'[INFO] Currently processing message type {message_id}') if mode == 1: # We read the three bytes representing the length of message. It can be used to skip unwanted messages r_size = self.read_size(reader) if message_id not in self.msg_selector: reader.read(r_size) return None return self.read_head_message(reader, message_id) elif mode == 0: # Old format with no bytes for message length return self.read_head_message(reader, message_id) else: raise IOError() def read_head_message(self, reader: io.BytesIO, message_id) -> Message: <% $messages.each do |msg| %> if message_id == <%= msg.id %>: return <%= msg.name %>( <%= msg.attributes.map { |attr| "#{attr.name.snake_case}=self.read_#{attr.type.to_s}(reader)" } .join ",\n " %> ) <% end %>