From 164232377d3ece75d405e19cec1c4c06bacdffab Mon Sep 17 00:00:00 2001 From: MauricioGarciaS <47052044+MauricioGarciaS@users.noreply.github.com> Date: Wed, 31 May 2023 11:42:49 +0200 Subject: [PATCH] fix(connector): Fixed small issues and added checkpoint method (#1251) * fix(connector): fixed bug of cache dict size error * fix(connector): Added method to save state in s3 for redshift if sigterm arise * fix(connector): Added exit signal handler and checkpoint method * Added sslmode selection for connection to database, added use_ssl parameter for S3 connection * fix(connector): Handle error when broken session_id --- ee/connectors/consumer_async.py | 7 ++++++- ee/connectors/msgcodec/msgcodec.py | 3 ++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/ee/connectors/consumer_async.py b/ee/connectors/consumer_async.py index 2e6054606..9609eadb6 100644 --- a/ee/connectors/consumer_async.py +++ b/ee/connectors/consumer_async.py @@ -22,7 +22,12 @@ def process_message(msg, codec, sessions, batch, sessions_batch, interesting_ses if msg is None: return messages = codec.decode_detailed(msg.value()) - session_id = codec.decode_key(msg.key()) + try: + session_id = codec.decode_key(msg.key()) + except Exception as e: + print('[WARN] Broken sessionid') + print(e) + return if messages is None: print('-') return diff --git a/ee/connectors/msgcodec/msgcodec.py b/ee/connectors/msgcodec/msgcodec.py index 857ed303a..64d094521 100644 --- a/ee/connectors/msgcodec/msgcodec.py +++ b/ee/connectors/msgcodec/msgcodec.py @@ -43,7 +43,8 @@ class MessageCodec(Codec): try: decoded = int.from_bytes(b, "little", signed=False) except Exception as e: - raise UnicodeDecodeError(f"Error while decoding message key (SessionID) from {b}\n{e}") + print(f"Error while decoding message key (SessionID) from {b}\n{e}") + raise e return decoded def decode_detailed(self, b: bytes) -> List[Message]: