Compare commits

...

2 commits

Author  SHA1  Message  Date
MauricioGarciaS  56af3f5d52  Fixed Docker run issues  2023-11-30 13:39:33 +01:00
MauricioGarciaS  fcf1247978  Changing methods to async and upload method to quickwit  2023-11-30 13:39:33 +01:00
7 changed files with 106 additions and 95 deletions

View file

@@ -1,34 +1,26 @@
+import asyncio
+from asyncio import Queue
 from decouple import config
 from confluent_kafka import Consumer
 from datetime import datetime
-import os as _os
-import queue
 import requests
 import json
-from time import time, sleep
+from time import time

 QUICKWIT_PORT = config('QUICKWIT_PORT', default=7280, cast=int)
-#decryption = config('encrypted', cast=bool)
-decryption = False
-MessageCodec = None
 max_retry=3
-Fetch, FetchEvent, PageEvent, GraphQ = None, None, None, None
-if decryption:
-    from msgcodec.msgcodec import MessageCodec
-    from msgcodec.messages import Fetch, FetchEvent, PageEvent, GraphQL
-    print("Enabled decryption mode")

-def _quickwit_ingest(index, data_list, retry=0):
+async def _quickwit_ingest(index, data_list, retry=0):
     try:
         res = requests.post(f'http://localhost:{QUICKWIT_PORT}/api/v1/{index}/ingest', data=__jsonify_data(data_list, index))
     except requests.exceptions.ConnectionError as e:
         retry += 1
         assert retry <= max_retry, f'[ENDPOINT CONNECTION FAIL] Failed to connect to endpoint http://localhost:{QUICKWIT_PORT}/api/v1/{index}/ingest\n{e}\n'
-        sleep(5*retry)
-        print(f"[ENDPOINT ERROR] Failed to connect to endpoint http://localhost:{QUICKWIT_PORT}/api/v1/{index}/ingest, retrying in {5*retry} seconds..\n")
-        return _quickwit_ingest(index, data_list, retry=retry)
+        await asyncio.sleep(3*retry)
+        print(f"[ENDPOINT ERROR] Failed to connect to endpoint http://localhost:{QUICKWIT_PORT}/api/v1/{index}/ingest, retrying in {3*retry} seconds..\n")
+        return await _quickwit_ingest(index, data_list, retry=retry)
     return res

 def __jsonify_data(data_list, msg_type):
@@ -70,121 +62,113 @@ def __jsonify_data(data_list, msg_type):
     return '\n'.join(res)

 def message_type(message):
-    if decryption:
-        if isinstance(message, FetchEvent) or isinstance(Fetch):
-            return 'fetchevent'
-        elif isinstance(message, PageEvent):
-            return 'pageevent'
-        elif isinstance(message, GraphQL):
-            return 'graphql'
-        else:
-            return 'default'
-    else:
-        if 'loaded' in message.keys():
-            return 'pageevent'
-        elif 'variables' in message.keys():
-            return 'graphql'
-        elif 'status' in message.keys():
-            return 'fetchevent'
-        else:
-            return 'default'
+    if 'loaded' in message.keys():
+        return 'pageevent'
+    elif 'variables' in message.keys():
+        return 'graphql'
+    elif 'status' in message.keys():
+        return 'fetchevent'
+    else:
+        return 'default'
 class KafkaFilter():
-    def __init__(self):
+    def __init__(self, uid):
+        self.uid = uid
         kafka_sources = config('KAFKA_SERVER')
         topic = config('QUICKWIT_TOPIC')
-        fetchevent_maxsize = config('fetch_maxsize', default=100, cast=int)
-        graphql_maxsize = config('graphql_maxsize', default=100, cast=int)
-        pageevent_maxsize = config('pageevent_maxsize', default=100, cast=int)
-        if decryption:
-            self.codec = MessageCodec()
-            self.consumer = Consumer({
-                "security.protocol": "SSL",
-                "bootstrap.servers": kafka_sources,
-                "group.id": config("group_id"),
-                "auto.offset.reset": "earliest",
-                "enable.auto.commit":False
-                })
-        else:
-            self.consumer = Consumer({
-                "security.protocol": "SSL",
-                "bootstrap.servers": kafka_sources,
-                "group.id": config("group_id"),
-                "auto.offset.reset": "earliest",
-                #value_deserializer=lambda m: json.loads(m.decode('utf-8')),
-                "enable.auto.commit": False
-            })
+        self.fetchevent_maxsize = config('fetch_maxsize', default=100, cast=int)
+        self.graphql_maxsize = config('graphql_maxsize', default=100, cast=int)
+        self.pageevent_maxsize = config('pageevent_maxsize', default=100, cast=int)
+        self.consumer = Consumer({
+            "security.protocol": "SSL",
+            "bootstrap.servers": kafka_sources,
+            "group.id": config("group_id"),
+            "auto.offset.reset": "earliest",
+            #value_deserializer=lambda m: json.loads(m.decode('utf-8')),
+            "enable.auto.commit": False
+        })
         self.consumer.subscribe([topic])
-        self.queues = {'fetchevent': queue.Queue(fetchevent_maxsize),
-                       'graphql': queue.Queue(graphql_maxsize),
-                       'pageevent': queue.Queue(pageevent_maxsize)
+        self.queues = {'Fetchevent': Queue(self.fetchevent_maxsize),
+                       'Graphql': Queue(self.graphql_maxsize),
+                       'Pageevent': Queue(self.pageevent_maxsize)
                        }
-    def add_to_queue(self, message):
+    async def add_to_queue(self, message):
+        # TODO: Fix this method
         associated_queue = message_type(message)
         if associated_queue == 'default':
             return
-        if self.queues[associated_queue].full():
-            self.flush_to_quickwit()
-        self.queues[associated_queue].put(message)
+        await self.queues[associated_queue].put(message)

-    def flush_to_quickwit(self):
-        one_queue_full = any([q.full() for q in self.queues.values()])
-        if not one_queue_full:
-            return
+    async def flush_to_quickwit(self):
+        # TODO: Fix this method
         for queue_name, _queue in self.queues.items():
             _list = list()
             unix_timestamp = int(datetime.now().timestamp())
             while not _queue.empty():
-                msg = _queue.get()
-                if decryption:
-                    value = msg.__dict__
-                else:
-                    value = dict(msg)
+                msg = await _queue.get()
+                value = dict(msg)
                 value['insertion_timestamp'] = unix_timestamp
                 if queue_name == 'fetchevent' and 'message_id' not in value.keys():
                     value['message_id'] = 0
                 _list.append(value)
             if len(_list) > 0:
-                _quickwit_ingest(queue_name, _list)
-                self.consumer.commit()
+                await _quickwit_ingest(queue_name, _list)
+                # self.consumer.commit() ## TODO: Find when to run commit
-    def run(self):
+    async def process_messages(self):
         _tmp_previous = None
         repeated = False
         while True:
             msg = self.consumer.poll(1.0)
             if msg is None:
-                continue
-            if msg.error():
-                print(f'[Consumer error] {msg.error()}')
+                await asyncio.sleep(0.1)
                 continue
             value = json.loads(msg.value().decode('utf-8'))
-            if decryption:
-                messages = self.codec.decode_detailed(value)
-            else:
-                messages = [value]
+            messages = [value]
             if _tmp_previous is None:
                 _tmp_previous = messages
-                if type(messages)==list:
+                if isinstance(messages, list):
                     for message in messages:
-                        self.add_to_queue(message)
+                        await self.add_to_queue(message)
                 else:
-                    self.add_to_queue(messages)
+                    await self.add_to_queue(messages)
             elif _tmp_previous != messages:
-                if type(messages)==list:
+                if isinstance(messages, list):
                     for message in messages:
-                        self.add_to_queue(message)
+                        await self.add_to_queue(message)
                 else:
-                    self.add_to_queue(messages)
+                    await self.add_to_queue(messages)
                 _tmp_previous = messages
                 repeated = False
             elif not repeated:
                 repeated = True
+    async def upload_messages(self):
+        while True:
+            await self.flush_to_quickwit()
+            await asyncio.sleep(1)
+
+    async def run(self):
+        loop = asyncio.get_event_loop()
+        loop.create_task(self.process_messages())
+        loop.create_task(self.upload_messages())
+        return
+
+    def __repr__(self):
+        return f"Class object KafkaConsumer id #{self.uid}"

 if __name__ == '__main__':
-    layer = KafkaFilter()
-    layer.run()
+    layer = KafkaFilter(uid=0)
+    asyncio.run(layer.run())
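
Note (not part of the diff): the new run() only schedules process_messages() and upload_messages() with loop.create_task() and then returns, so asyncio.run(layer.run()) shuts the event loop down, and cancels both tasks, almost immediately; the blocking consumer.poll(1.0) call inside a coroutine will also stall the loop while it waits. A minimal sketch of one way to keep both loops alive, assuming the file above is consumer.py as the entrypoint below suggests; run_forever() is a hypothetical helper, not part of the commit:

import asyncio

from consumer import KafkaFilter  # assumes the module above is consumer.py

async def run_forever(layer):
    # gather() returns only when both coroutines finish (normally never),
    # so asyncio.run() keeps the event loop, and both loops, alive.
    await asyncio.gather(
        layer.process_messages(),
        layer.upload_messages(),
    )

if __name__ == '__main__':
    asyncio.run(run_forever(KafkaFilter(uid=0)))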

View file

@@ -9,4 +9,13 @@ find /quickwit/ -type f -name "*.yaml" -exec sed -i "s/{{QUICKWIT_TOPIC}}/${QUIC
 find /quickwit/ -type f -name "*.yaml" -exec sed -i "s/{{QUICKWIT_PORT}}/${QUICKWIT_PORT}/g" {} \;
 find /quickwit/ -type f -name "*.yaml" -exec sed -i "s#{{data_dir_path}}#${data_dir_path}#g" {} \;
-./quickwit_start_task.sh & ./setup_indexes_and_worker.sh && fg
+./quickwit_start_task.sh & pid1=$!
+sleep 120
+echo "Creating indexes.."
+quickwit index create --index-config index-config-fetch.yaml
+quickwit index create --index-config index-config-graphql.yaml
+quickwit index create --index-config index-config-pageevent.yaml
+echo "Running kafka reader.."
+python3 -u consumer.py & pid2=$!
+wait $pid1 $pid2
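
Note (not part of the diff): the rewritten entrypoint backgrounds quickwit_start_task.sh, waits a fixed 120 seconds, creates the three indexes, then starts the consumer and waits on both PIDs. If the fixed sleep proves fragile, the consumer could instead poll Quickwit before subscribing, reusing the requests dependency it already has. A minimal sketch, assuming Quickwit exposes a readiness endpoint at /health/readyz (verify against the deployed version); wait_for_quickwit() is a hypothetical helper:

import time
import requests

QUICKWIT_PORT = 7280  # mirrors the config() default in consumer.py

def wait_for_quickwit(timeout=180, interval=5):
    # Poll the REST port until Quickwit answers, or give up after `timeout` seconds.
    url = f'http://localhost:{QUICKWIT_PORT}/health/readyz'
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            if requests.get(url, timeout=2).ok:
                return True
        except requests.exceptions.ConnectionError:
            pass  # server not listening yet
        time.sleep(interval)
    return False

if __name__ == '__main__':
    if not wait_for_quickwit():
        raise SystemExit('Quickwit did not become ready in time')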

View file

@@ -4,7 +4,7 @@
 version: 0.6
-index_id: "fetchevent"
+index_id: "Fetchevent"
 index_uri: "s3://openreplay-quickwit/quickwit-indexes/fetchevent"

 doc_mapping:
@@ -63,3 +63,9 @@ search_settings:
 retention:
   period: 30 days
   schedule: hourly
+
+indexing_settings:
+  merge_policy:
+    type: "stable_log"
+    min_level_num_docs: 40000
+    maturation_period: 12h

View file

@@ -4,7 +4,7 @@
 version: 0.6
-index_id: "graphql"
+index_id: "Graphql"
 index_uri: "s3://openreplay-quickwit/quickwit-indexes/graphql"

 doc_mapping:
@@ -50,3 +50,9 @@ search_settings:
 retention:
   period: 30 days
   schedule: hourly
+
+indexing_settings:
+  merge_policy:
+    type: "stable_log"
+    min_level_num_docs: 40000
+    maturation_period: 12h

View file

@@ -4,7 +4,7 @@
 version: 0.6
-index_id: "pageevent"
+index_id: "Pageevent"
 index_uri: "s3://openreplay-quickwit/quickwit-indexes/pageevent"

 doc_mapping:
@@ -84,3 +84,9 @@ search_settings:
 retention:
   period: 30 days
   schedule: hourly
+
+indexing_settings:
+  merge_policy:
+    type: "stable_log"
+    min_level_num_docs: 40000
+    maturation_period: 12h
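
Note (not part of the diff): the index_ids and queue keys are now capitalized (Fetchevent, Graphql, Pageevent), but message_type() in consumer.py still returns lowercase names and flush_to_quickwit() still compares queue_name == 'fetchevent', so the lookup self.queues[associated_queue] in add_to_queue() would raise KeyError. A minimal sketch of one way to bridge the two spellings with str.capitalize(); the queue_key() helper and the toy data are hypothetical, for illustration only:

# Illustration: map the lowercase names returned by message_type()
# onto the capitalized queue/index names used elsewhere.
queues = {'Fetchevent': [], 'Graphql': [], 'Pageevent': []}

def queue_key(message_kind: str) -> str:
    # 'fetchevent' -> 'Fetchevent', 'graphql' -> 'Graphql', 'pageevent' -> 'Pageevent'
    return message_kind.capitalize()

for kind in ('fetchevent', 'graphql', 'pageevent'):
    queues[queue_key(kind)].append({'type': kind})

assert all(len(q) == 1 for q in queues.values())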

View file

@@ -1,7 +1,7 @@
 ## In order to save data into S3
 # metastore also accepts s3://{bucket/path}#pooling_interval={seconds}s
 version: 0.6
-metastore_uri: s3://openreplay-quickwit/quickwit-indexes
-default_index_root_uri: s3://openreplay-quickwit/quickwit-indexes
+metastore_uri: s3://{{AWS_BUCKET}}/quickwit-indexes
+default_index_root_uri: s3://{{AWS_BUCKET}}/quickwit-indexes
 listen_address: 0.0.0.0
 rest_listen_port: {{QUICKWIT_PORT}}

View file

@@ -1,6 +1,6 @@
 ## In order to save data into S3
 # metastore also accepts s3://{bucket/path}#pooling_interval={seconds}s
 version: 0.6
-metastore_uri: s3://openreplay-quickwit/quickwit-indexes
-default_index_root_uri: s3://openreplay-quickwit/quickwit-indexes
+metastore_uri: s3://{{AWS_BUCKET}}/quickwit-indexes
+default_index_root_uri: s3://{{AWS_BUCKET}}/quickwit-indexes
 rest_listen_port: {{QUICKWIT_PORT}}