diff --git a/ee/connectors/consumer_async.py b/ee/connectors/consumer_async.py index 2e6054606..9609eadb6 100644 --- a/ee/connectors/consumer_async.py +++ b/ee/connectors/consumer_async.py @@ -22,7 +22,12 @@ def process_message(msg, codec, sessions, batch, sessions_batch, interesting_ses if msg is None: return messages = codec.decode_detailed(msg.value()) - session_id = codec.decode_key(msg.key()) + try: + session_id = codec.decode_key(msg.key()) + except Exception as e: + print('[WARN] Broken sessionid') + print(e) + return if messages is None: print('-') return diff --git a/ee/connectors/msgcodec/msgcodec.py b/ee/connectors/msgcodec/msgcodec.py index 857ed303a..64d094521 100644 --- a/ee/connectors/msgcodec/msgcodec.py +++ b/ee/connectors/msgcodec/msgcodec.py @@ -43,7 +43,8 @@ class MessageCodec(Codec): try: decoded = int.from_bytes(b, "little", signed=False) except Exception as e: - raise UnicodeDecodeError(f"Error while decoding message key (SessionID) from {b}\n{e}") + print(f"Error while decoding message key (SessionID) from {b}\n{e}") + raise e return decoded def decode_detailed(self, b: bytes) -> List[Message]: