From 2e8f582bf77b54fe99abf4751b5e168246648da3 Mon Sep 17 00:00:00 2001
From: mauricio garcia suarez
Date: Wed, 13 Jul 2022 12:44:51 +0200
Subject: [PATCH] consumer.py now working with new message types and BigQuery

Adapt the Kafka consumer loop in ee/connectors/consumer.py:

- subscribe to the "raw" and "raw_ios" topics instead of "events" and
  "messages";
- decode each Kafka record with codec.decode_detailed(), whose result is
  iterated as a collection of messages, and run the existing per-message
  handling (event batching, session tracking, batch inserts, offset
  commit) once per decoded message;
- decode the session id from the record key before the per-message loop;
- use the consumer group "my_test3_connector_{DATABASE}" and list
  KAFKA_SERVER_2 before KAFKA_SERVER_1 in bootstrap_servers.
---
 ee/connectors/consumer.py | 86 ++++++++++++++++++++-------------------
 1 file changed, 44 insertions(+), 42 deletions(-)

diff --git a/ee/connectors/consumer.py b/ee/connectors/consumer.py
index dfa856501..f65633f7d 100644
--- a/ee/connectors/consumer.py
+++ b/ee/connectors/consumer.py
@@ -30,59 +30,61 @@ def main():
 
     codec = MessageCodec()
     consumer = KafkaConsumer(security_protocol="SSL",
-                             bootstrap_servers=[os.environ['KAFKA_SERVER_1'],
-                                                os.environ['KAFKA_SERVER_2']],
-                             group_id=f"connector_{DATABASE}",
+                             bootstrap_servers=[os.environ['KAFKA_SERVER_2'],
+                                                os.environ['KAFKA_SERVER_1']],
+                             group_id=f"my_test3_connector_{DATABASE}",
                              auto_offset_reset="earliest",
-                             enable_auto_commit=False)
+                             enable_auto_commit=False
+                             )
 
-    consumer.subscribe(topics=["events", "messages"])
+    consumer.subscribe(topics=["raw", "raw_ios"])
     print("Kafka consumer subscribed")
     for msg in consumer:
-        message = codec.decode(msg.value)
-        if message is None:
+        messages = codec.decode_detailed(msg.value)
+        session_id = codec.decode_key(msg.key)
+        if messages is None:
             print('-')
             continue
+        for message in messages:
+            if LEVEL == 'detailed':
+                n = handle_message(message)
+            elif LEVEL == 'normal':
+                n = handle_normal_message(message)
 
-        if LEVEL == 'detailed':
-            n = handle_message(message)
-        elif LEVEL == 'normal':
-            n = handle_normal_message(message)
-
-        session_id = codec.decode_key(msg.key)
-        sessions[session_id] = handle_session(sessions[session_id], message)
-        if sessions[session_id]:
-            sessions[session_id].sessionid = session_id
-
-        # put in a batch for insertion if received a SessionEnd
-        if isinstance(message, SessionEnd):
+            #session_id = codec.decode_key(msg.key)
+            sessions[session_id] = handle_session(sessions[session_id], message)
             if sessions[session_id]:
-                sessions_batch.append(sessions[session_id])
+                sessions[session_id].sessionid = session_id
 
-        # try to insert sessions
-        if len(sessions_batch) >= sessions_batch_size:
-            attempt_session_insert(sessions_batch)
-            for s in sessions_batch:
-                try:
-                    del sessions[s.sessionid]
-                except KeyError as e:
-                    print(repr(e))
-            sessions_batch = []
+            # put in a batch for insertion if received a SessionEnd
+            if isinstance(message, SessionEnd):
+                if sessions[session_id]:
+                    sessions_batch.append(sessions[session_id])
 
-        if n:
-            n.sessionid = session_id
-            n.received_at = int(datetime.now().timestamp() * 1000)
-            n.batch_order_number = len(batch)
-            batch.append(n)
-        else:
-            continue
+            # try to insert sessions
+            if len(sessions_batch) >= sessions_batch_size:
+                attempt_session_insert(sessions_batch)
+                for s in sessions_batch:
+                    try:
+                        del sessions[s.sessionid]
+                    except KeyError as e:
+                        print(repr(e))
+                sessions_batch = []
 
-        # insert a batch of events
-        if len(batch) >= batch_size:
-            attempt_batch_insert(batch)
-            batch = []
-            consumer.commit()
-            print("sessions in cache:", len(sessions))
+            if n:
+                n.sessionid = session_id
+                n.received_at = int(datetime.now().timestamp() * 1000)
+                n.batch_order_number = len(batch)
+                batch.append(n)
+            else:
+                continue
+
+            # insert a batch of events
+            if len(batch) >= batch_size:
+                attempt_batch_insert(batch)
+                batch = []
+                consumer.commit()
+                print("sessions in cache:", len(sessions))
 
 
 def attempt_session_insert(sess_batch):