openreplay/ee/quickwit/kafka_sample.py
mauricio garcia suarez 1da78afa01 Add 'ee/quickwit/' from commit '2ad4f4ea124232274bc09ab1f1faf2581053e619'
git-subtree-dir: ee/quickwit
git-subtree-mainline: f1e4d60ea8
git-subtree-split: 2ad4f4ea12
2022-08-01 11:27:57 +02:00

132 lines
4.7 KiB
Python

import os
from time import sleep
from kafka import KafkaConsumer, KafkaProducer
from datetime import datetime
from collections import defaultdict
from msgcodec.codec import MessageCodec
from msgcodec.messages import Fetch, FetchEvent, PageEvent, GraphQL
import json
import getopt, sys
n = 0
def transform_fetch(data):
global n
n += 1
return {
'method': data.method, 'url': data.url, 'request': data.url, 'response': data.request,
'status': data.status, 'timestamp': data.timestamp, 'duration': data.duration
}
def transform_graphql(data):
global n
n += 1
return {
'operation_kind': data.operation_kind, 'operation_name': data.operation_name,
'variables': data.variables, 'response': data.response
}
def transform_pageevent(data):
global n
n += 1
return {'massage_id': data.message_id, 'timestamp': data.timestamp, 'url': data.timestamp,
'referrer': data.referrer, 'loaded': data.loaded, 'request_start': data.request_start,
'response_start': data.response_start, 'response_end': data.response_end,
'dom_content_loaded_event_start': data.dom_content_loaded_event_start,
'dom_content_loaded_event_end': data.dom_content_loaded_event_end,
'load_event_start': data.load_event_start, 'load_event_end': data.load_event_end,
'first_paint': data.first_paint, 'first_contentful_paint': data.first_contentful_paint,
'speed_index': data.speed_index, 'visually_complete': data.visually_complete,
'time_to_interactive': data.time_to_interactive
}
def create_producer():
producer = KafkaProducer(#security_protocol="SSL",
bootstrap_servers=os.environ['KAFKA_SERVER_2'],
# os.environ['KAFKA_SERVER_1']],
#ssl_cafile="./ca.pem",
#ssl_certfile="./service.cert",
#ssl_keyfile="./service.key",
value_serializer=lambda v: json.dumps(v).encode('ascii')
)
return producer
def create_consumer():
consumer = KafkaConsumer(#security_protocol="SSL",
bootstrap_servers=os.environ['KAFKA_SERVER_2'],
# os.environ['KAFKA_SERVER_1']],
group_id=f"quickwit_connector2",
auto_offset_reset="earliest",
enable_auto_commit=False
)
return consumer
def consumer_producer_end():
global n
codec = MessageCodec()
consumer = create_consumer()
producer = create_producer()
consumer.subscribe(topics=["raw", "raw_ios"])
print("Kafka consumer subscribed")
escape = 0
for msg in consumer:
messages = codec.decode_detailed(msg.value)
session_id = codec.decode_key(msg.key)
if messages is None:
print('-')
for message in messages:
send = False
if isinstance(message, Fetch) or isinstance(message, FetchEvent):
producer.send('quickwit-kafka', value=transform_fetch(message))
print(f'added message {n} type Fetch')
sleep(5)
if isinstance(message, GraphQL):
producer.send('quickwit-kafka', value=transform_graphql(message))
print(f'added message {n} type GraphQL')
sleep(5)
if isinstance(message, PageEvent):
producer.send('quickwit-kafka', value=transform_pageevent(message))
print(f'added message {n} type PageEvent')
sleep(5)
def consumer_end():
consumer = create_consumer()
consumer.subscribe(topics=['quickwit-kafka'])
for msg in consumer:
print(msg)
def handle_args():
arguments = len(sys.argv)-1
argument_list = sys.argv[1:]
pos = 1
short_options = 'hm:'
long_options = ['help', 'method=']
try:
arguments, values = getopt.getopt(argument_list, short_options, long_options)
except getopt.error as err:
print(str(err))
sys.exit(2)
for arg, argv in arguments:
if arg in ('-h', '--help'):
print(""" Methods
--method, -m available methods: consumer, producer
--help, -h show help
""")
elif arg in ('-m', '--method'):
if argv == 'consumer':
consumer_end()
elif argv == 'producer':
consumer_producer_end()
else:
print('Method not found. Available methods: consumer, producer')
if __name__ == '__main__':
handle_args()