consumer.py now working with new message types and BigQuery

This commit is contained in:
mauricio garcia suarez 2022-07-13 12:44:51 +02:00
parent c271a0b64e
commit 2e8f582bf7

View file

@ -30,59 +30,61 @@ def main():
codec = MessageCodec()
consumer = KafkaConsumer(security_protocol="SSL",
bootstrap_servers=[os.environ['KAFKA_SERVER_1'],
os.environ['KAFKA_SERVER_2']],
group_id=f"connector_{DATABASE}",
bootstrap_servers=os.environ['KAFKA_SERVER_2'],
os.environ['KAFKA_SERVER_1']],
group_id=f"my_test3_connector_{DATABASE}",
auto_offset_reset="earliest",
enable_auto_commit=False)
enable_auto_commit=False
)
consumer.subscribe(topics=["events", "messages"])
consumer.subscribe(topics=["raw", "raw_ios"])
print("Kafka consumer subscribed")
for msg in consumer:
message = codec.decode(msg.value)
if message is None:
messages = codec.decode_detailed(msg.value)
session_id = codec.decode_key(msg.key)
if messages is None:
print('-')
continue
for message in messages:
if LEVEL == 'detailed':
n = handle_message(message)
elif LEVEL == 'normal':
n = handle_normal_message(message)
if LEVEL == 'detailed':
n = handle_message(message)
elif LEVEL == 'normal':
n = handle_normal_message(message)
session_id = codec.decode_key(msg.key)
sessions[session_id] = handle_session(sessions[session_id], message)
if sessions[session_id]:
sessions[session_id].sessionid = session_id
# put in a batch for insertion if received a SessionEnd
if isinstance(message, SessionEnd):
#session_id = codec.decode_key(msg.key)
sessions[session_id] = handle_session(sessions[session_id], message)
if sessions[session_id]:
sessions_batch.append(sessions[session_id])
sessions[session_id].sessionid = session_id
# try to insert sessions
if len(sessions_batch) >= sessions_batch_size:
attempt_session_insert(sessions_batch)
for s in sessions_batch:
try:
del sessions[s.sessionid]
except KeyError as e:
print(repr(e))
sessions_batch = []
# put in a batch for insertion if received a SessionEnd
if isinstance(message, SessionEnd):
if sessions[session_id]:
sessions_batch.append(sessions[session_id])
if n:
n.sessionid = session_id
n.received_at = int(datetime.now().timestamp() * 1000)
n.batch_order_number = len(batch)
batch.append(n)
else:
continue
# try to insert sessions
if len(sessions_batch) >= sessions_batch_size:
attempt_session_insert(sessions_batch)
for s in sessions_batch:
try:
del sessions[s.sessionid]
except KeyError as e:
print(repr(e))
sessions_batch = []
# insert a batch of events
if len(batch) >= batch_size:
attempt_batch_insert(batch)
batch = []
consumer.commit()
print("sessions in cache:", len(sessions))
if n:
n.sessionid = session_id
n.received_at = int(datetime.now().timestamp() * 1000)
n.batch_order_number = len(batch)
batch.append(n)
else:
continue
# insert a batch of events
if len(batch) >= batch_size:
attempt_batch_insert(batch)
batch = []
consumer.commit()
print("sessions in cache:", len(sessions))
def attempt_session_insert(sess_batch):