diff --git a/ee/quickwit/consumer.py b/ee/quickwit/consumer.py
index 5cf3c7818..be83d00e9 100644
--- a/ee/quickwit/consumer.py
+++ b/ee/quickwit/consumer.py
@@ -1,34 +1,26 @@
+import asyncio
+from asyncio import Queue
+
 from decouple import config
 from confluent_kafka import Consumer
 from datetime import datetime
-import os as _os
-import queue
 import requests
 import json
-from time import time, sleep
+from time import time
 
 QUICKWIT_PORT = config('QUICKWIT_PORT', default=7280, cast=int)
-
-#decryption = config('encrypted', cast=bool)
-decryption = False
-MessageCodec = None
 max_retry=3
-Fetch, FetchEvent, PageEvent, GraphQ = None, None, None, None
-if decryption:
-    from msgcodec.msgcodec import MessageCodec
-    from msgcodec.messages import Fetch, FetchEvent, PageEvent, GraphQL
-    print("Enabled decryption mode")
 
-def _quickwit_ingest(index, data_list, retry=0):
+async def _quickwit_ingest(index, data_list, retry=0):
     try:
         res = requests.post(f'http://localhost:{QUICKWIT_PORT}/api/v1/{index}/ingest', data=__jsonify_data(data_list, index))
     except requests.exceptions.ConnectionError as e:
         retry += 1
         assert retry <= max_retry, f'[ENDPOINT CONNECTION FAIL] Failed to connect to endpoint http://localhost:{QUICKWIT_PORT}/api/v1/{index}/ingest\n{e}\n'
-        sleep(5*retry)
-        print(f"[ENDPOINT ERROR] Failed to connect to endpoint http://localhost:{QUICKWIT_PORT}/api/v1/{index}/ingest, retrying in {5*retry} seconds..\n")
-        return _quickwit_ingest(index, data_list, retry=retry)
+        await asyncio.sleep(3*retry)
+        print(f"[ENDPOINT ERROR] Failed to connect to endpoint http://localhost:{QUICKWIT_PORT}/api/v1/{index}/ingest, retrying in {3*retry} seconds..\n")
+        return await _quickwit_ingest(index, data_list, retry=retry)
     return res
 
 def __jsonify_data(data_list, msg_type):
@@ -70,121 +62,113 @@ def __jsonify_data(data_list, msg_type):
     return '\n'.join(res)
 
 def message_type(message):
-    if decryption:
-        if isinstance(message, FetchEvent) or isinstance(Fetch):
-            return 'fetchevent'
-        elif isinstance(message, PageEvent):
-            return 'pageevent'
-        elif isinstance(message, GraphQL):
-            return 'graphql'
-        else:
-            return 'default'
+    if 'loaded' in message.keys():
+        return 'pageevent'
+    elif 'variables' in message.keys():
+        return 'graphql'
+    elif 'status' in message.keys():
+        return 'fetchevent'
     else:
-        if 'loaded' in message.keys():
-            return 'pageevent'
-        elif 'variables' in message.keys():
-            return 'graphql'
-        elif 'status' in message.keys():
-            return 'fetchevent'
-        else:
-            return 'default'
+        return 'default'
 
 class KafkaFilter():
-    def __init__(self):
+    def __init__(self, uid):
+        self.uid = uid
         kafka_sources = config('KAFKA_SERVER')
         topic = config('QUICKWIT_TOPIC')
-        fetchevent_maxsize = config('fetch_maxsize', default=100, cast=int)
-        graphql_maxsize = config('graphql_maxsize', default=100, cast=int)
-        pageevent_maxsize = config('pageevent_maxsize', default=100, cast=int)
+        self.fetchevent_maxsize = config('fetch_maxsize', default=100, cast=int)
+        self.graphql_maxsize = config('graphql_maxsize', default=100, cast=int)
+        self.pageevent_maxsize = config('pageevent_maxsize', default=100, cast=int)
 
-        if decryption:
-            self.codec = MessageCodec()
-            self.consumer = Consumer({
-                "security.protocol": "SSL",
-                "bootstrap.servers": kafka_sources,
-                "group.id": config("group_id"),
-                "auto.offset.reset": "earliest",
-                "enable.auto.commit":False
-            })
-        else:
-            self.consumer = Consumer({
-                "security.protocol": "SSL",
-                "bootstrap.servers": kafka_sources,
-                "group.id": config("group_id"),
-                "auto.offset.reset": "earliest",
-                #value_deserializer=lambda m: json.loads(m.decode('utf-8')),
-                "enable.auto.commit": False
-            })
+        self.consumer = Consumer({
+            "security.protocol": "SSL",
+            "bootstrap.servers": kafka_sources,
+            "group.id": config("group_id"),
+            "auto.offset.reset": "earliest",
+            #value_deserializer=lambda m: json.loads(m.decode('utf-8')),
+            "enable.auto.commit": False
+        })
         self.consumer.subscribe([topic])
-        self.queues = {'fetchevent': queue.Queue(fetchevent_maxsize),
-                       'graphql': queue.Queue(graphql_maxsize),
-                       'pageevent': queue.Queue(pageevent_maxsize)
+        self.queues = {'fetchevent': Queue(self.fetchevent_maxsize),
+                       'graphql': Queue(self.graphql_maxsize),
+                       'pageevent': Queue(self.pageevent_maxsize)
                        }
 
-    def add_to_queue(self, message):
+    async def add_to_queue(self, message):
+        # TODO: Fix this method
         associated_queue = message_type(message)
         if associated_queue == 'default':
             return
-        if self.queues[associated_queue].full():
-            self.flush_to_quickwit()
-        self.queues[associated_queue].put(message)
+        await self.queues[associated_queue].put(message)
 
-    def flush_to_quickwit(self):
+    async def flush_to_quickwit(self):
+        # TODO: Fix this method
+        one_queue_full = any(q.full() for q in self.queues.values())
+        if not one_queue_full:
+            return
         for queue_name, _queue in self.queues.items():
             _list = list()
             unix_timestamp = int(datetime.now().timestamp())
             while not _queue.empty():
-                msg = _queue.get()
-                if decryption:
-                    value = msg.__dict__
-                else:
-                    value = dict(msg)
+                msg = await _queue.get()
+                value = dict(msg)
                 value['insertion_timestamp'] = unix_timestamp
                 if queue_name == 'fetchevent' and 'message_id' not in value.keys():
                     value['message_id'] = 0
                 _list.append(value)
             if len(_list) > 0:
-                _quickwit_ingest(queue_name, _list)
-        self.consumer.commit()
+                await _quickwit_ingest(queue_name, _list)
+        # self.consumer.commit() ## TODO: Find when to run commit
 
-    def run(self):
+
+    async def process_messages(self):
         _tmp_previous = None
         repeated = False
         while True:
             msg = self.consumer.poll(1.0)
             if msg is None:
-                continue
-            if msg.error():
-                print(f'[Consumer error] {msg.error()}')
+                await asyncio.sleep(0.1)
                 continue
             value = json.loads(msg.value().decode('utf-8'))
-            if decryption:
-                messages = self.codec.decode_detailed(value)
-            else:
-                messages = [value]
-
+            messages = [value]
+
             if _tmp_previous is None:
                 _tmp_previous = messages
-                if type(messages)==list:
+                if isinstance(messages, list):
                     for message in messages:
-                        self.add_to_queue(message)
+                        await self.add_to_queue(message)
                 else:
-                    self.add_to_queue(messages)
+                    await self.add_to_queue(messages)
+
             elif _tmp_previous != messages:
-                if type(messages)==list:
+                if isinstance(messages, list):
                     for message in messages:
-                        self.add_to_queue(message)
+                        await self.add_to_queue(message)
                 else:
-                    self.add_to_queue(messages)
+                    await self.add_to_queue(messages)
                 _tmp_previous = messages
                 repeated = False
             elif not repeated:
                 repeated = True
 
+    async def upload_messages(self):
+        while True:
+            await self.flush_to_quickwit()
+            await asyncio.sleep(1)
+
+    async def run(self):
+        # Await both tasks; returning right after create_task() would let
+        # asyncio.run() close the loop and cancel them before any work runs.
+        await asyncio.gather(self.process_messages(),
+                             self.upload_messages())
+
+    def __repr__(self):
+        return f"Class object KafkaFilter id #{self.uid}"
+
 if __name__ == '__main__':
-    layer = KafkaFilter()
-    layer.run()
+    layer = KafkaFilter(uid=0)
+    asyncio.run(layer.run())
diff --git a/ee/quickwit/s3-config-listen.yaml b/ee/quickwit/s3-config-listen.yaml
index bd27ec951..183e473f9 100644
--- a/ee/quickwit/s3-config-listen.yaml
+++ b/ee/quickwit/s3-config-listen.yaml
@@ -1,7 +1,7 @@
 ## In order to save data into S3
 # metastore also accepts s3://{bucket/path}#pooling_interval={seconds}s
 version: 0.6
-metastore_uri: s3://openreplay-quickwit/quickwit-indexes
-default_index_root_uri: s3://openreplay-quickwit/quickwit-indexes
+metastore_uri: s3://{{AWS_BUCKET}}/quickwit-indexes
+default_index_root_uri: s3://{{AWS_BUCKET}}/quickwit-indexes
 listen_address: 0.0.0.0
 rest_listen_port: {{QUICKWIT_PORT}}
diff --git a/ee/quickwit/s3-config.yaml b/ee/quickwit/s3-config.yaml
index 466ec56b3..554c8495c 100644
--- a/ee/quickwit/s3-config.yaml
+++ b/ee/quickwit/s3-config.yaml
@@ -1,6 +1,6 @@
 ## In order to save data into S3
 # metastore also accepts s3://{bucket/path}#pooling_interval={seconds}s
 version: 0.6
-metastore_uri: s3://openreplay-quickwit/quickwit-indexes
-default_index_root_uri: s3://openreplay-quickwit/quickwit-indexes
+metastore_uri: s3://{{AWS_BUCKET}}/quickwit-indexes
+default_index_root_uri: s3://{{AWS_BUCKET}}/quickwit-indexes
 rest_listen_port: {{QUICKWIT_PORT}}
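
Note on the new structure: the consumer is now split into two coroutines sharing a set of per-type `asyncio.Queue`s. `process_messages` polls Kafka and buckets each payload by `message_type`, while `upload_messages` wakes every second and, once any queue is full, drains all of them into Quickwit's `/api/v1/{index}/ingest` endpoint as newline-delimited JSON. A minimal, self-contained sketch of that fan-out pattern follows; every name in it is an illustrative stand-in (a fake async source replaces `consumer.poll`, a print replaces the ingest POST):

```python
import asyncio
import json


async def fake_source():
    # Stand-in for consumer.poll(): yields decoded Kafka payloads.
    for i in range(10):
        yield {'status': 200, 'message_id': i}  # 'status' routes to 'fetchevent'


async def ingest(index, batch):
    # Stand-in for the Quickwit ingest POST, which accepts NDJSON.
    print(f"POST /api/v1/{index}/ingest")
    print('\n'.join(json.dumps(doc) for doc in batch))


async def produce(queues):
    # Bucket each incoming message into its per-type queue.
    async for msg in fake_source():
        await queues['fetchevent'].put(msg)


async def flush(queues):
    # Periodically drain every queue and ship non-empty batches.
    while True:
        for name, q in queues.items():
            batch = []
            while not q.empty():
                batch.append(q.get_nowait())
            if batch:
                await ingest(name, batch)
        await asyncio.sleep(1)


async def main():
    queues = {'fetchevent': asyncio.Queue(100)}
    flusher = asyncio.create_task(flush(queues))
    await produce(queues)
    await asyncio.sleep(1.5)  # give the flusher one more pass
    flusher.cancel()


asyncio.run(main())
```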
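
The retry path in `_quickwit_ingest` is a linear backoff: the coroutine sleeps `3*retry` seconds between attempts (3s, then 6s, then 9s) and the `assert` aborts once `max_retry` connection failures have accumulated. The same schedule written iteratively, as a sketch (`send` is a hypothetical async callable, not part of this PR):

```python
import asyncio

max_retry = 3


async def post_with_backoff(send, payload):
    for attempt in range(1, max_retry + 2):
        try:
            return await send(payload)
        except ConnectionError:
            if attempt > max_retry:
                raise  # out of retries: surface the failure
            # Linear backoff: 3s after the 1st failure, then 6s, then 9s.
            await asyncio.sleep(3 * attempt)
```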
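
Both YAML files now carry `{{AWS_BUCKET}}` and `{{QUICKWIT_PORT}}` placeholders instead of a hard-coded bucket and port. The render step is not part of this diff; one plausible sketch, assuming the values come from environment variables of the same name, is:

```python
import os
import re


def render(template_path, out_path):
    # Substitute every {{NAME}} placeholder with os.environ['NAME'].
    with open(template_path) as f:
        text = f.read()
    rendered = re.sub(r'\{\{(\w+)\}\}', lambda m: os.environ[m.group(1)], text)
    with open(out_path, 'w') as f:
        f.write(rendered)


# Usage (hypothetical): AWS_BUCKET=my-bucket QUICKWIT_PORT=7280 python render.py
render('ee/quickwit/s3-config.yaml', 'quickwit-config.yaml')
```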