Changed python-kafka to confluent-kafka
This commit is contained in:
parent
ed5a8fbd69
commit
e85933e96d
6 changed files with 19 additions and 15 deletions
|
|
@ -1,5 +1,5 @@
|
|||
import os
|
||||
from kafka import KafkaConsumer
|
||||
from confluent_kafka import Consumer
|
||||
from datetime import datetime
|
||||
from collections import defaultdict
|
||||
|
||||
|
|
@ -29,17 +29,21 @@ def main():
|
|||
sessions_batch = []
|
||||
|
||||
codec = MessageCodec()
|
||||
consumer = KafkaConsumer(security_protocol="SSL",
|
||||
bootstrap_servers=[os.environ['KAFKA_SERVER_2'],
|
||||
os.environ['KAFKA_SERVER_1']],
|
||||
group_id=f"my_test3_connector_{DATABASE}",
|
||||
auto_offset_reset="earliest",
|
||||
enable_auto_commit=False
|
||||
)
|
||||
consumer = Consumer({
|
||||
"security.protocol": "SSL",
|
||||
"bootstrap.servers": ",".join([os.environ['KAFKA_SERVER_1'],
|
||||
os.environ['KAFKA_SERVER_2']]),
|
||||
"group.id": f"connector_{DATABASE}",
|
||||
"auto.offset.reset": "earliest",
|
||||
"enable.auto.commit": False
|
||||
})
|
||||
|
||||
consumer.subscribe(topics=["raw", "raw_ios"])
|
||||
consumer.subscribe(["raw", "raw_ios"])
|
||||
print("Kafka consumer subscribed")
|
||||
for msg in consumer:
|
||||
while True:
|
||||
msg.consumer.poll(1.0)
|
||||
if msg is None:
|
||||
continue
|
||||
messages = codec.decode_detailed(msg.value)
|
||||
session_id = codec.decode_key(msg.key)
|
||||
if messages is None:
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
kafka-python==2.0.2
|
||||
confluent-kafka
|
||||
psycopg2-binary==2.9.3
|
||||
SQLAlchemy==1.4.43
|
||||
google-cloud-bigquery
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ chardet==5.0.0
|
|||
clickhouse-driver==0.2.4
|
||||
clickhouse-sqlalchemy==0.2.2
|
||||
idna==3.4
|
||||
kafka-python==2.0.2
|
||||
confluent-kafka
|
||||
pandas==1.5.1
|
||||
pytz==2022.6
|
||||
requests==2.28.1
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
certifi==2022.09.24
|
||||
chardet==5.0.0
|
||||
idna==3.4
|
||||
kafka-python==2.0.2
|
||||
confluent-kafka
|
||||
pandas==1.5.1
|
||||
psycopg2-binary==2.9.3
|
||||
pytz==2022.6
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ chardet==5.0.0
|
|||
clickhouse-driver==0.2.4
|
||||
clickhouse-sqlalchemy==0.2.2
|
||||
idna==3.4
|
||||
kafka-python==2.0.2
|
||||
confluent-kafka
|
||||
psycopg2-binary==2.9.3
|
||||
pytz==2022.6
|
||||
requests==2.28.1
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
pandas==1.5.1
|
||||
kafka-python==2.0.2
|
||||
confluent-kafka
|
||||
SQLAlchemy==1.4.43
|
||||
snowflake-connector-python==2.8.2
|
||||
snowflake-sqlalchemy==1.4.4
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue