From 5a5df5f21861528ac2fb918f080ba9838950bb3c Mon Sep 17 00:00:00 2001 From: MauricioGarciaS <47052044+MauricioGarciaS@users.noreply.github.com> Date: Tue, 24 Oct 2023 14:06:26 +0200 Subject: [PATCH] fix(redshift-connector): Solved issue while inserting from s3 to redshift (events table) (#1543) --- ee/connectors/consumer_pool.py | 12 ------------ ee/connectors/db/models.py | 8 ++++---- ee/connectors/db/utils.py | 18 ++++++++++++------ ee/connectors/utils/worker.py | 4 ++++ 4 files changed, 20 insertions(+), 22 deletions(-) diff --git a/ee/connectors/consumer_pool.py b/ee/connectors/consumer_pool.py index e145b96c0..5a2dbf34e 100644 --- a/ee/connectors/consumer_pool.py +++ b/ee/connectors/consumer_pool.py @@ -17,21 +17,9 @@ def main(): except Exception as e: print('[WORKER WARN] Checkpoint not found') print(repr(e)) - # ssl_protocol = config('KAFKA_USE_SSL', default=True, cast=bool) - # consumer_settings = { - # "bootstrap.servers": config('KAFKA_SERVERS'), - # "group.id": f"connector_{DATABASE}", - # "auto.offset.reset": "earliest", - # "enable.auto.commit": False - # } - # if ssl_protocol: - # consumer_settings['security.protocol'] = 'SSL' - # consumer = Consumer(consumer_settings) - # consumer.subscribe(config("TOPICS", default="saas-raw").split(',')) print("[WORKER INFO] Kafka consumer subscribed") - # w_pool.run_workers(kafka_consumer=consumer, database_api=database_api) w_pool.run_workers(database_api=database_api) diff --git a/ee/connectors/db/models.py b/ee/connectors/db/models.py index 8bcc3171c..e381c572f 100644 --- a/ee/connectors/db/models.py +++ b/ee/connectors/db/models.py @@ -86,10 +86,6 @@ class Event(Base): consolelog_value = Column(VARCHAR(5000)) customevent_name = Column(VARCHAR(5000)) customevent_payload = Column(VARCHAR(5000)) - clickevent_hesitationtime = Column(BigInteger) - clickevent_messageid = Column(BigInteger) - clickevent_label = Column(VARCHAR(5000)) - clickevent_selector = Column(VARCHAR(5000)) jsexception_message = Column(VARCHAR(5000)) jsexception_name = Column(VARCHAR(5000)) jsexception_payload = Column(VARCHAR(5000)) @@ -113,6 +109,10 @@ class Event(Base): customissue_payload = Column(VARCHAR(5000)) received_at = Column(BigInteger) batch_order_number = Column(BigInteger) + clickevent_hesitationtime = Column(BigInteger) + clickevent_label = Column(VARCHAR(5000)) + clickevent_messageid = Column(BigInteger) + clickevent_selector = Column(VARCHAR(5000)) class DetailedEvent(Base): diff --git a/ee/connectors/db/utils.py b/ee/connectors/db/utils.py index ed66f1a71..207845543 100644 --- a/ee/connectors/db/utils.py +++ b/ee/connectors/db/utils.py @@ -5,10 +5,6 @@ dtypes_events = { 'sessionid': "Int64", 'consolelog_level': "string", 'consolelog_value': "string", - 'clickevent_hesitationtime': "Int64", - 'clickevent_label': "string", - 'clickevent_messageid': "Int64", - 'clickevent_selector': "string", 'customevent_name': "string", 'customevent_payload': "string", 'jsexception_message': "string", @@ -33,7 +29,11 @@ dtypes_events = { 'customissue_name': "string", 'customissue_payload': "string", 'received_at': "Int64", - 'batch_order_number': "Int64"} + 'batch_order_number': "Int64", + 'clickevent_hesitationtime': "Int64", + 'clickevent_label': "string", + 'clickevent_messageid': "Int64", + 'clickevent_selector': "string"} dtypes_detailed_events = { "sessionid": "Int64", "clickevent_hesitationtime": "Int64", @@ -198,10 +198,14 @@ def get_df_from_batch(batch, level): if level == 'normal': current_types = dtypes_events + #df['clickevent_hesitationtime'] = df['clickevent_hesitationtime'].fillna(0) + #df['clickevent_messageid'] = df['clickevent_messageid'].fillna(0) if level == 'detailed': current_types = dtypes_detailed_events df['inputevent_value'] = None df['customevent_payload'] = None + #df['clickevent_hesitationtime'] = df['clickevent_hesitationtime'].fillna(0) + #df['clickevent_messageid'] = df['clickevent_messageid'].fillna(0) if level == 'sessions': current_types = dtypes_sessions df['js_exceptions_count'] = df['js_exceptions_count'].fillna(0) @@ -218,12 +222,14 @@ def get_df_from_batch(batch, level): for x in df.columns: try: if df[x].dtype == "string" or current_types[x] == "string": - df[x] = df[x].fillna('NULL') + df[x] = df[x].fillna("NULL") if x == 'user_id' or x == 'user_anonymous_id': df[x] = df[x].str.slice(0, 7999) else: df[x] = df[x].str.slice(0, 255) df[x] = df[x].str.replace("|", "") + elif current_types[x] == 'Int64': + df[x] = df[x].fillna(0) except TypeError as e: print(repr(e)) if df[x].dtype == 'str': diff --git a/ee/connectors/utils/worker.py b/ee/connectors/utils/worker.py index fab10a101..a79e3dd3a 100644 --- a/ee/connectors/utils/worker.py +++ b/ee/connectors/utils/worker.py @@ -450,14 +450,18 @@ class WorkerPool: kafka_reader_process = Process(target=read_from_kafka, args=(reader_conn, kafka_task_params)) kafka_reader_process.start() current_loop_number = 0 + n_kafka_restarts = 0 while signal_handler.KEEP_PROCESSING: current_loop_number = (current_loop_number + 1) % self.n_of_loops # Setup of parameters for workers if not kafka_reader_process.is_alive(): + if n_kafka_restarts > 3: + break print('[WORKER-INFO] Restarting reader task') del kafka_reader_process kafka_reader_process = Process(target=read_from_kafka, args=(reader_conn, kafka_task_params)) kafka_reader_process.start() + n_kafka_restarts += 1 decoding_params = [{'flag': 'decoder', 'message': list(), 'memory': dict()} for _ in range(self.n_workers)