103 lines
3.3 KiB
Text
103 lines
3.3 KiB
Text
# Auto-generated, do not edit
|
|
|
|
from msgcodec.codec import Codec
from msgcodec.messages import *
# Explicit imports stay after the star import so these names cannot be shadowed by it
from typing import List, Optional
import io
|
|
|
|
class MessageCodec(Codec):
|
|
|
|
def __init__(self, msg_selector: List[int] = list()):
|
|
self.msg_selector = msg_selector
|
|
|
|
def read_message_id(self, reader: io.BytesIO) -> int:
|
|
"""
|
|
Read and return the first byte where the message id is encoded
|
|
"""
|
|
id_ = self.read_uint(reader)
|
|
return id_
|
|
|
|
def encode(self, m: Message) -> bytes:
|
|
...
|
|
|
|
def decode(self, b: bytes) -> Message:
|
|
reader = io.BytesIO(b)
|
|
return self.read_head_message(reader)
|
|
|
|
@staticmethod
|
|
def check_message_id(b: bytes) -> int:
|
|
"""
|
|
todo: make it static and without reader. It's just the first byte
|
|
Read and return the first byte where the message id is encoded
|
|
"""
|
|
reader = io.BytesIO(b)
|
|
id_ = Codec.read_uint(reader)
|
|
|
|
return id_
|
|
|
|
@staticmethod
|
|
def decode_key(b) -> int:
|
|
"""
|
|
Decode the message key (encoded with little endian)
|
|
"""
|
|
try:
|
|
decoded = int.from_bytes(b, "little", signed=False)
|
|
except Exception as e:
|
|
raise UnicodeDecodeError(f"Error while decoding message key (SessionID) from {b}\n{e}")
|
|
return decoded
|
|
|
|
def decode_detailed(self, b: bytes) -> List[Message]:
|
|
reader = io.BytesIO(b)
|
|
messages_list = list()
|
|
try:
|
|
messages_list.append(self.handler(reader, 0))
|
|
except IndexError:
|
|
print('[WARN] Broken batch')
|
|
return list()
|
|
if isinstance(messages_list[0], BatchMeta):
|
|
# Old BatchMeta
|
|
mode = 0
|
|
elif isinstance(messages_list[0], BatchMetadata):
|
|
# New BatchMeta
|
|
if messages_list[0].version == 0:
|
|
mode = 0
|
|
else:
|
|
mode = 1
|
|
else:
|
|
return messages_list
|
|
while True:
|
|
try:
|
|
msg_decoded = self.handler(reader, mode)
|
|
if msg_decoded is not None:
|
|
messages_list.append(msg_decoded)
|
|
except IndexError:
|
|
break
|
|
return messages_list
|
|
|
|
def handler(self, reader: io.BytesIO, mode=0) -> Message:
|
|
message_id = self.read_message_id(reader)
|
|
#print(f'[INFO-context] Current mode {mode}')
|
|
#print(f'[INFO] Currently processing message type {message_id}')
|
|
if mode == 1:
|
|
# We read the three bytes representing the length of message. It can be used to skip unwanted messages
|
|
r_size = self.read_size(reader)
|
|
if message_id not in self.msg_selector:
|
|
reader.read(r_size)
|
|
return None
|
|
return self.read_head_message(reader, message_id)
|
|
elif mode == 0:
|
|
# Old format with no bytes for message length
|
|
return self.read_head_message(reader, message_id)
|
|
else:
|
|
raise IOError()
|
|
|
|
def read_head_message(self, reader: io.BytesIO, message_id) -> Message:
|
|
<% $messages.each do |msg| %>
|
|
if message_id == <%= msg.id %>:
|
|
return <%= msg.name %>(
|
|
<%= msg.attributes.map { |attr|
|
|
"#{attr.name.snake_case}=self.read_#{attr.type.to_s}(reader)" }
|
|
.join ",\n "
|
|
%>
|
|
)
|
|
<% end %>
|