From e85933e96d1dd940310c23f2782921716517b396 Mon Sep 17 00:00:00 2001 From: MauricioGarciaS <47052044+MauricioGarciaS@users.noreply.github.com> Date: Fri, 20 Jan 2023 12:25:22 +0100 Subject: [PATCH] Changed python-kafka to confluent-kafka --- ee/connectors/consumer.py | 24 +++++++++++-------- .../deploy/requirements_bigquery.txt | 2 +- .../deploy/requirements_clickhouse.txt | 2 +- ee/connectors/deploy/requirements_pg.txt | 2 +- .../deploy/requirements_redshift.txt | 2 +- .../deploy/requirements_snowflake.txt | 2 +- 6 files changed, 19 insertions(+), 15 deletions(-) diff --git a/ee/connectors/consumer.py b/ee/connectors/consumer.py index c8d61ff7b..1c3488642 100644 --- a/ee/connectors/consumer.py +++ b/ee/connectors/consumer.py @@ -1,5 +1,5 @@ import os -from kafka import KafkaConsumer +from confluent_kafka import Consumer from datetime import datetime from collections import defaultdict @@ -29,17 +29,21 @@ def main(): sessions_batch = [] codec = MessageCodec() - consumer = KafkaConsumer(security_protocol="SSL", - bootstrap_servers=[os.environ['KAFKA_SERVER_2'], - os.environ['KAFKA_SERVER_1']], - group_id=f"my_test3_connector_{DATABASE}", - auto_offset_reset="earliest", - enable_auto_commit=False - ) + consumer = Consumer({ + "security.protocol": "SSL", + "bootstrap.servers": ",".join([os.environ['KAFKA_SERVER_1'], + os.environ['KAFKA_SERVER_2']]), + "group.id": f"connector_{DATABASE}", + "auto.offset.reset": "earliest", + "enable.auto.commit": False + }) - consumer.subscribe(topics=["raw", "raw_ios"]) + consumer.subscribe(["raw", "raw_ios"]) print("Kafka consumer subscribed") - for msg in consumer: + while True: + msg.consumer.poll(1.0) + if msg is None: + continue messages = codec.decode_detailed(msg.value) session_id = codec.decode_key(msg.key) if messages is None: diff --git a/ee/connectors/deploy/requirements_bigquery.txt b/ee/connectors/deploy/requirements_bigquery.txt index d4d63c953..1474c9ef4 100644 --- a/ee/connectors/deploy/requirements_bigquery.txt +++ b/ee/connectors/deploy/requirements_bigquery.txt @@ -1,4 +1,4 @@ -kafka-python==2.0.2 +confluent-kafka psycopg2-binary==2.9.3 SQLAlchemy==1.4.43 google-cloud-bigquery diff --git a/ee/connectors/deploy/requirements_clickhouse.txt b/ee/connectors/deploy/requirements_clickhouse.txt index b21ea36c5..8853a865f 100644 --- a/ee/connectors/deploy/requirements_clickhouse.txt +++ b/ee/connectors/deploy/requirements_clickhouse.txt @@ -3,7 +3,7 @@ chardet==5.0.0 clickhouse-driver==0.2.4 clickhouse-sqlalchemy==0.2.2 idna==3.4 -kafka-python==2.0.2 +confluent-kafka pandas==1.5.1 pytz==2022.6 requests==2.28.1 diff --git a/ee/connectors/deploy/requirements_pg.txt b/ee/connectors/deploy/requirements_pg.txt index 09bf8b34d..8354e61a9 100644 --- a/ee/connectors/deploy/requirements_pg.txt +++ b/ee/connectors/deploy/requirements_pg.txt @@ -1,7 +1,7 @@ certifi==2022.09.24 chardet==5.0.0 idna==3.4 -kafka-python==2.0.2 +confluent-kafka pandas==1.5.1 psycopg2-binary==2.9.3 pytz==2022.6 diff --git a/ee/connectors/deploy/requirements_redshift.txt b/ee/connectors/deploy/requirements_redshift.txt index e4182cb92..7bd5e3f08 100644 --- a/ee/connectors/deploy/requirements_redshift.txt +++ b/ee/connectors/deploy/requirements_redshift.txt @@ -3,7 +3,7 @@ chardet==5.0.0 clickhouse-driver==0.2.4 clickhouse-sqlalchemy==0.2.2 idna==3.4 -kafka-python==2.0.2 +confluent-kafka psycopg2-binary==2.9.3 pytz==2022.6 requests==2.28.1 diff --git a/ee/connectors/deploy/requirements_snowflake.txt b/ee/connectors/deploy/requirements_snowflake.txt index 895326b32..5a3aaca99 100644 --- a/ee/connectors/deploy/requirements_snowflake.txt +++ b/ee/connectors/deploy/requirements_snowflake.txt @@ -1,5 +1,5 @@ pandas==1.5.1 -kafka-python==2.0.2 +confluent-kafka SQLAlchemy==1.4.43 snowflake-connector-python==2.8.2 snowflake-sqlalchemy==1.4.4