diff --git a/index-config.yaml b/index-config.yaml index 43f8d3324..8e68eb73d 100644 --- a/index-config.yaml +++ b/index-config.yaml @@ -8,14 +8,34 @@ index_id: quickwit-kafka doc_mapping: field_mappings: - - name: title + - name: method type: text tokenizer: default record: position - - name: body + - name: url + type: text + tokenizer: default + record: position + - name: request + type: text + tokenizer: default + record: position + - name: response + type: text + tokenizer: default + record: position + - name: status + type: text + tokenizer: default + record: position + - name: timestamp + type: text + tokenizer: default + record: position + - name: duration type: text tokenizer: default record: position search_settings: - default_search_fields: [title, body] + default_search_fields: [url, request, response] diff --git a/kafka_sample.py b/kafka_sample.py index 9d58ae54d..0ebce304c 100644 --- a/kafka_sample.py +++ b/kafka_sample.py @@ -11,11 +11,14 @@ import json import getopt, sys n = 0 - +fetch_keys = ['method', 'url', 'request', 'response', 'status', 'timestamp', 'duration'] def transform(data): global n n += 1 - return {'title': f'message {n}', 'body': data} + return { + 'method': data.method, 'url': data.url, 'request': data.url, 'response': data.request, + 'status': data.status, 'timestamp': data.timestamp, 'duration': data.duration + } def create_producer(): producer = KafkaProducer(#security_protocol="SSL", @@ -32,7 +35,7 @@ def create_consumer(): consumer = KafkaConsumer(#security_protocol="SSL", bootstrap_servers=os.environ['KAFKA_SERVER_2'], # os.environ['KAFKA_SERVER_1']], - group_id=f"my_test52_connector", + group_id=f"quickwit_connector", auto_offset_reset="earliest", enable_auto_commit=False ) @@ -41,11 +44,6 @@ def create_consumer(): def consumer_producer_end(): global n - batch_size = 4000 - sessions_batch_size = 400 - batch = [] - sessions = defaultdict(lambda: None) - sessions_batch = [] codec = MessageCodec() consumer = create_consumer() @@ -60,27 +58,13 @@ def consumer_producer_end(): if messages is None: print('-') for message in messages: - send = True - if isinstance(message, Fetch): - data = message.response - elif isinstance(message, FetchEvent): - data = message.response - elif isinstance(message, PageEvent): - print(message.url) - elif isinstance(message, SetCSSData): - data = message.data - elif isinstance(message, SetStyleData): - data = message.data - else: - send = False - continue - if send: - producer.send('quickwit-kafka', value=transform(data)) + send = False + if isinstance(message, Fetch) or isinstance(message, FetchEvent): + producer.send('quickwit-kafka', value=transform(message)) print(f'added message {n}') sleep(5) - def consumer_end(): consumer = create_consumer() consumer.subscribe(topics=['quickwit-kafka'])