git-subtree-dir: ee/quickwit git-subtree-mainline: f1e4d60ea8 git-subtree-split: 2ad4f4ea12
132 lines
4.7 KiB
Python
132 lines
4.7 KiB
Python
import os
|
|
from time import sleep
|
|
from kafka import KafkaConsumer, KafkaProducer
|
|
from datetime import datetime
|
|
from collections import defaultdict
|
|
|
|
from msgcodec.codec import MessageCodec
|
|
from msgcodec.messages import Fetch, FetchEvent, PageEvent, GraphQL
|
|
import json
|
|
|
|
import getopt, sys
|
|
|
|
# Module-level count of messages converted so far. Each transform_* function
# increments it, and consumer_producer_end includes it in its progress output.
n: int = 0


def transform_fetch(data):
    """Convert a Fetch/FetchEvent message into a JSON-serialisable dict.

    Side effect: increments the module-level counter ``n``, which
    ``consumer_producer_end`` prints as progress.

    Args:
        data: decoded message exposing ``method``, ``url``, ``request``,
            ``response``, ``status``, ``timestamp`` and ``duration``.

    Returns:
        dict mapping each of those attribute names to its value.
    """
    global n
    n += 1
    return {
        'method': data.method,
        'url': data.url,
        # Each key must come from the same-named attribute; request/response
        # bodies are distinct payloads and must not be crossed with the URL.
        'request': data.request,
        'response': data.response,
        'status': data.status,
        'timestamp': data.timestamp,
        'duration': data.duration,
    }


def transform_graphql(data):
    """Flatten a GraphQL message into a plain dict for indexing.

    Side effect: bumps the module-level message counter ``n``.

    Args:
        data: decoded message exposing ``operation_kind``, ``operation_name``,
            ``variables`` and ``response``.

    Returns:
        dict keyed by those attribute names, in that order.
    """
    global n
    n += 1
    exported = ('operation_kind', 'operation_name', 'variables', 'response')
    return {attr: getattr(data, attr) for attr in exported}


def transform_pageevent(data):
    """Convert a PageEvent message into a JSON-serialisable dict.

    Side effect: increments the module-level counter ``n``, which
    ``consumer_producer_end`` prints as progress.

    Every output key is named after the message attribute it is read from.
    Deriving both from a single name list prevents key/value mismatches.
    Note: the id key is ``message_id``. An earlier revision emitted the
    misspelled ``massage_id``, so any index mapping that relied on the typo
    must be updated.

    Args:
        data: decoded PageEvent exposing the attributes listed below.

    Returns:
        dict mapping each listed attribute name to its value.
    """
    global n
    n += 1
    fields = (
        'message_id', 'timestamp', 'url', 'referrer', 'loaded',
        'request_start', 'response_start', 'response_end',
        'dom_content_loaded_event_start', 'dom_content_loaded_event_end',
        'load_event_start', 'load_event_end',
        'first_paint', 'first_contentful_paint',
        'speed_index', 'visually_complete', 'time_to_interactive',
    )
    return {field: getattr(data, field) for field in fields}


def create_producer():
    """Build a KafkaProducer that JSON-encodes every value it sends.

    The broker address comes from the ``KAFKA_SERVER_2`` environment variable
    (a missing variable raises ``KeyError``). No TLS options are configured.
    """
    def serialize(value):
        # json.dumps escapes non-ASCII characters by default, so the ASCII
        # encode cannot fail.
        return json.dumps(value).encode('ascii')

    servers = os.environ['KAFKA_SERVER_2']
    return KafkaProducer(bootstrap_servers=servers, value_serializer=serialize)


def create_consumer():
    """Build a KafkaConsumer in the ``quickwit_connector2`` consumer group.

    The broker address comes from the ``KAFKA_SERVER_2`` environment variable.
    With no committed position, the group starts from the earliest offset.
    Auto-commit is disabled, so committing offsets is left to the caller.
    """
    settings = {
        'bootstrap_servers': os.environ['KAFKA_SERVER_2'],
        'group_id': 'quickwit_connector2',
        'auto_offset_reset': 'earliest',
        'enable_auto_commit': False,
    }
    return KafkaConsumer(**settings)


def consumer_producer_end():
    """Forward Fetch, GraphQL and PageEvent messages to the quickwit-kafka topic.

    Consumes the ``raw`` and ``raw_ios`` topics and decodes each record with
    ``MessageCodec``. Supported messages are converted with the matching
    ``transform_*`` function and published to ``quickwit-kafka``. Loops until
    interrupted. The consumer and producer are closed on exit so pending sends
    get a chance to complete.
    """
    codec = MessageCodec()
    consumer = create_consumer()
    producer = create_producer()
    # (message types, transformer, label shown in the progress line).
    # No early exit: a message matching several entries is sent for each,
    # as the original independent `if` checks did.
    routes = (
        ((Fetch, FetchEvent), transform_fetch, 'Fetch'),
        ((GraphQL,), transform_graphql, 'GraphQL'),
        ((PageEvent,), transform_pageevent, 'PageEvent'),
    )

    try:
        consumer.subscribe(topics=["raw", "raw_ios"])
        print("Kafka consumer subscribed")
        for msg in consumer:
            messages = codec.decode_detailed(msg.value)
            if messages is None:
                # Nothing decoded (presumably a malformed record): log a
                # marker and skip instead of iterating None.
                print('-')
                continue
            for message in messages:
                for types, transform, label in routes:
                    if isinstance(message, types):
                        producer.send('quickwit-kafka', value=transform(message))
                        print(f'added message {n} type {label}')
                        # NOTE(review): 5 s pause per forwarded message looks
                        # like a debugging throttle -- confirm before prod use.
                        sleep(5)
    finally:
        consumer.close()
        producer.close()


def consumer_end():
    """Debug sink: print every record arriving on the ``quickwit-kafka`` topic.

    Iteration blocks waiting for new records, so this runs until interrupted.
    """
    reader = create_consumer()
    reader.subscribe(topics=['quickwit-kafka'])
    for record in reader:
        print(record)


def handle_args():
    """Parse command-line options from ``sys.argv`` and run the selected mode.

    Options:
        -h, --help             print usage.
        -m, --method METHOD    ``consumer`` prints records from the
                               ``quickwit-kafka`` topic; ``producer`` forwards
                               raw messages to it.

    An unknown METHOD prints a message listing the valid choices.

    Raises:
        SystemExit: with status 2 when getopt rejects the arguments.
    """
    try:
        options, _ = getopt.getopt(sys.argv[1:], 'hm:', ['help', 'method='])
    except getopt.GetoptError as err:
        print(str(err))
        sys.exit(2)

    for option, value in options:
        if option in ('-h', '--help'):
            print(""" Methods
    --method, -m available methods: consumer, producer
    --help, -h show help
    """)
        elif option in ('-m', '--method'):
            if value == 'consumer':
                consumer_end()
            elif value == 'producer':
                consumer_producer_end()
            else:
                print('Method not found. Available methods: consumer, producer')


# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == '__main__':
    handle_args()