Compare commits

2 commits: main...quickwit-m

Commits: 56af3f5d52, fcf1247978

7 changed files with 106 additions and 95 deletions
@@ -1,34 +1,26 @@
+import asyncio
+from asyncio import Queue
 
 from decouple import config
 from confluent_kafka import Consumer
 from datetime import datetime
-import os as _os
-import queue
 import requests
 import json
 
 
-from time import time, sleep
+from time import time
 QUICKWIT_PORT = config('QUICKWIT_PORT', default=7280, cast=int)
 
-#decryption = config('encrypted', cast=bool)
-decryption = False
-MessageCodec = None
 max_retry=3
-Fetch, FetchEvent, PageEvent, GraphQ = None, None, None, None
-if decryption:
-    from msgcodec.msgcodec import MessageCodec
-    from msgcodec.messages import Fetch, FetchEvent, PageEvent, GraphQL
-    print("Enabled decryption mode")
 
-def _quickwit_ingest(index, data_list, retry=0):
+async def _quickwit_ingest(index, data_list, retry=0):
     try:
         res = requests.post(f'http://localhost:{QUICKWIT_PORT}/api/v1/{index}/ingest', data=__jsonify_data(data_list, index))
     except requests.exceptions.ConnectionError as e:
         retry += 1
         assert retry <= max_retry, f'[ENDPOINT CONNECTION FAIL] Failed to connect to endpoint http://localhost:{QUICKWIT_PORT}/api/v1/{index}/ingest\n{e}\n'
-        sleep(5*retry)
-        print(f"[ENDPOINT ERROR] Failed to connect to endpoint http://localhost:{QUICKWIT_PORT}/api/v1/{index}/ingest, retrying in {5*retry} seconds..\n")
-        return _quickwit_ingest(index, data_list, retry=retry)
+        await asyncio.sleep(3*retry)
+        print(f"[ENDPOINT ERROR] Failed to connect to endpoint http://localhost:{QUICKWIT_PORT}/api/v1/{index}/ingest, retrying in {3*retry} seconds..\n")
+        return await _quickwit_ingest(index, data_list, retry=retry)
     return res
 
 def __jsonify_data(data_list, msg_type):
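(Aside: the endpoint `_quickwit_ingest` posts to is Quickwit's NDJSON ingest API — one JSON document per line, sent to `/api/v1/{index}/ingest`, which is what `__jsonify_data`'s `'\n'.join(...)` produces. A minimal standalone sketch of that call follows; the index name and document fields here are illustrative, not taken from this diff.)

import json
import requests

QUICKWIT_PORT = 7280  # same default as consumer.py's config('QUICKWIT_PORT', ...)

# Hypothetical documents; consumer.py builds these from Kafka messages and
# stamps them with 'insertion_timestamp' (and 'message_id' for fetchevent).
docs = [{"message_id": 0, "insertion_timestamp": 1700000000}]
payload = '\n'.join(json.dumps(d) for d in docs)  # newline-delimited JSON

res = requests.post(f'http://localhost:{QUICKWIT_PORT}/api/v1/fetchevent/ingest', data=payload)
print(res.status_code, res.text)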
@@ -70,121 +62,113 @@ def __jsonify_data(data_list, msg_type):
     return '\n'.join(res)
 
 def message_type(message):
-    if decryption:
-        if isinstance(message, FetchEvent) or isinstance(Fetch):
-            return 'fetchevent'
-        elif isinstance(message, PageEvent):
-            return 'pageevent'
-        elif isinstance(message, GraphQL):
-            return 'graphql'
-        else:
-            return 'default'
+    if 'loaded' in message.keys():
+        return 'pageevent'
+    elif 'variables' in message.keys():
+        return 'graphql'
+    elif 'status' in message.keys():
+        return 'fetchevent'
     else:
-        if 'loaded' in message.keys():
-            return 'pageevent'
-        elif 'variables' in message.keys():
-            return 'graphql'
-        elif 'status' in message.keys():
-            return 'fetchevent'
-        else:
-            return 'default'
+        return 'default'
 
 
 class KafkaFilter():
 
-    def __init__(self):
+    def __init__(self, uid):
+        self.uid = uid
         kafka_sources = config('KAFKA_SERVER')
         topic = config('QUICKWIT_TOPIC')
 
-        fetchevent_maxsize = config('fetch_maxsize', default=100, cast=int)
-        graphql_maxsize = config('graphql_maxsize', default=100, cast=int)
-        pageevent_maxsize = config('pageevent_maxsize', default=100, cast=int)
+        self.fetchevent_maxsize = config('fetch_maxsize', default=100, cast=int)
+        self.graphql_maxsize = config('graphql_maxsize', default=100, cast=int)
+        self.pageevent_maxsize = config('pageevent_maxsize', default=100, cast=int)
 
-        if decryption:
-            self.codec = MessageCodec()
-            self.consumer = Consumer({
-                "security.protocol": "SSL",
-                "bootstrap.servers": kafka_sources,
-                "group.id": config("group_id"),
-                "auto.offset.reset": "earliest",
-                "enable.auto.commit":False
-            })
-        else:
-            self.consumer = Consumer({
-                "security.protocol": "SSL",
-                "bootstrap.servers": kafka_sources,
-                "group.id": config("group_id"),
-                "auto.offset.reset": "earliest",
-                #value_deserializer=lambda m: json.loads(m.decode('utf-8')),
-                "enable.auto.commit": False
-            })
+        self.consumer = Consumer({
+            "security.protocol": "SSL",
+            "bootstrap.servers": kafka_sources,
+            "group.id": config("group_id"),
+            "auto.offset.reset": "earliest",
+            #value_deserializer=lambda m: json.loads(m.decode('utf-8')),
+            "enable.auto.commit": False
+        })
         self.consumer.subscribe([topic])
-        self.queues = {'fetchevent': queue.Queue(fetchevent_maxsize),
-                       'graphql': queue.Queue(graphql_maxsize),
-                       'pageevent': queue.Queue(pageevent_maxsize)
-                       }
+        self.queues = {'Fetchevent': Queue(self.fetchevent_maxsize),
+                       'Graphql': Queue(self.graphql_maxsize),
+                       'Pageevent': Queue(self.pageevent_maxsize)
+                       }
 
-    def add_to_queue(self, message):
+    async def add_to_queue(self, message):
+        # TODO: Fix this method
         associated_queue = message_type(message)
         if associated_queue == 'default':
             return
-        if self.queues[associated_queue].full():
-            self.flush_to_quickwit()
-        self.queues[associated_queue].put(message)
+        await self.queues[associated_queue].put(message)
 
-    def flush_to_quickwit(self):
+    async def flush_to_quickwit(self):
+        # TODO: Fix this method
+        one_queue_full = any([q.full() for q in self.queues.values()])
+        if not one_queue_full:
+            return
         for queue_name, _queue in self.queues.items():
             _list = list()
             unix_timestamp = int(datetime.now().timestamp())
             while not _queue.empty():
-                msg = _queue.get()
-                if decryption:
-                    value = msg.__dict__
-                else:
-                    value = dict(msg)
+                msg = await _queue.get()
+                value = dict(msg)
                 value['insertion_timestamp'] = unix_timestamp
                 if queue_name == 'fetchevent' and 'message_id' not in value.keys():
                     value['message_id'] = 0
                 _list.append(value)
             if len(_list) > 0:
-                _quickwit_ingest(queue_name, _list)
-                self.consumer.commit()
+                await _quickwit_ingest(queue_name, _list)
+                # self.consumer.commit() ## TODO: Find when to run commit
 
-    def run(self):
+    async def process_messages(self):
         _tmp_previous = None
         repeated = False
         while True:
             msg = self.consumer.poll(1.0)
             if msg is None:
-                continue
-            if msg.error():
-                print(f'[Consumer error] {msg.error()}')
+                await asyncio.sleep(0.1)
                 continue
             value = json.loads(msg.value().decode('utf-8'))
-            if decryption:
-                messages = self.codec.decode_detailed(value)
-            else:
-                messages = [value]
+            messages = [value]
 
             if _tmp_previous is None:
                 _tmp_previous = messages
-                if type(messages)==list:
+                if isinstance(messages, list):
                     for message in messages:
-                        self.add_to_queue(message)
+                        await self.add_to_queue(message)
                 else:
-                    self.add_to_queue(messages)
+                    await self.add_to_queue(messages)
 
             elif _tmp_previous != messages:
-                if type(messages)==list:
+                if isinstance(messages, list):
                     for message in messages:
-                        self.add_to_queue(message)
+                        await self.add_to_queue(message)
                 else:
-                    self.add_to_queue(messages)
+                    await self.add_to_queue(messages)
                 _tmp_previous = messages
                 repeated = False
             elif not repeated:
                 repeated = True
 
+    async def upload_messages(self):
+        while True:
+            await self.flush_to_quickwit()
+            await asyncio.sleep(1)
+
+    async def run(self):
+        loop = asyncio.get_event_loop()
+        loop.create_task(self.process_messages())
+        loop.create_task(self.upload_messages())
+        return
+
+    def __repr__(self):
+        return f"Class object KafkaConsumer id #{self.uid}"
 
 
 if __name__ == '__main__':
-    layer = KafkaFilter()
-    layer.run()
+    layer = KafkaFilter(uid=0)
+    asyncio.run(layer.run())
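(Aside on the new entry point: as written, `run()` only schedules `process_messages()` and `upload_messages()` as tasks and then returns, so `asyncio.run(layer.run())` would tear the loop down, cancelling both tasks, almost immediately. A minimal sketch of one way to keep both coroutines alive, assuming the `KafkaFilter` class from this diff is importable; this is an illustration, not part of these commits.)

import asyncio

async def main():
    layer = KafkaFilter(uid=0)
    # Run the Kafka polling loop and the periodic Quickwit flush concurrently
    # and keep the event loop alive for as long as they run.
    await asyncio.gather(
        layer.process_messages(),
        layer.upload_messages(),
    )

if __name__ == '__main__':
    asyncio.run(main())

Note also that the new queue keys ('Fetchevent', 'Graphql', 'Pageevent') match the renamed index_ids in the YAML configs below, while `message_type()` still returns lowercase names, so the two would need to be reconciled for `add_to_queue()` to find a queue.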
@@ -9,4 +9,13 @@ find /quickwit/ -type f -name "*.yaml" -exec sed -i "s/{{QUICKWIT_TOPIC}}/${QUIC
 find /quickwit/ -type f -name "*.yaml" -exec sed -i "s/{{QUICKWIT_PORT}}/${QUICKWIT_PORT}/g" {} \;
 find /quickwit/ -type f -name "*.yaml" -exec sed -i "s#{{data_dir_path}}#${data_dir_path}#g" {} \;
 
-./quickwit_start_task.sh & ./setup_indexes_and_worker.sh && fg
+./quickwit_start_task.sh & pid1=$!
+sleep 120
+echo "Creating indexes.."
+quickwit index create --index-config index-config-fetch.yaml
+quickwit index create --index-config index-config-graphql.yaml
+quickwit index create --index-config index-config-pageevent.yaml
+echo "Running kafka reader.."
+python3 -u consumer.py & pid2=$!
+wait $pid1 $pid2
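(Aside: the fixed `sleep 120` gives the backgrounded Quickwit server time to come up before the `quickwit index create` calls run. A hedged alternative is to poll the REST API until it answers; the sketch below assumes Quickwit's `GET /api/v1/indexes` listing endpoint is reachable on the same port used above, and is not part of this change.)

import time
import requests

QUICKWIT_PORT = 7280  # assumed; the script substitutes {{QUICKWIT_PORT}} into the configs

def wait_for_quickwit(timeout=180):
    # Poll the REST API until the server responds instead of sleeping a fixed 120 s.
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            if requests.get(f'http://localhost:{QUICKWIT_PORT}/api/v1/indexes').ok:
                return True
        except requests.exceptions.ConnectionError:
            pass
        time.sleep(2)
    return False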
@@ -4,7 +4,7 @@
 version: 0.6
 
-index_id: "fetchevent"
+index_id: "Fetchevent"
 index_uri: "s3://openreplay-quickwit/quickwit-indexes/fetchevent"
 
 doc_mapping:
@@ -63,3 +63,9 @@ search_settings:
 retention:
   period: 30 days
   schedule: hourly
+
+indexing_settings:
+  merge_policy:
+    type: "stable_log"
+    min_level_num_docs: 40000
+    maturation_period: 12h
@@ -4,7 +4,7 @@
 version: 0.6
 
-index_id: "graphql"
+index_id: "Graphql"
 index_uri: "s3://openreplay-quickwit/quickwit-indexes/graphql"
 
 doc_mapping:
@@ -50,3 +50,9 @@ search_settings:
 retention:
   period: 30 days
   schedule: hourly
+
+indexing_settings:
+  merge_policy:
+    type: "stable_log"
+    min_level_num_docs: 40000
+    maturation_period: 12h
@@ -4,7 +4,7 @@
 version: 0.6
 
-index_id: "pageevent"
+index_id: "Pageevent"
 index_uri: "s3://openreplay-quickwit/quickwit-indexes/pageevent"
 
 doc_mapping:
@@ -84,3 +84,9 @@ search_settings:
 retention:
   period: 30 days
   schedule: hourly
+
+indexing_settings:
+  merge_policy:
+    type: "stable_log"
+    min_level_num_docs: 40000
+    maturation_period: 12h
@@ -1,7 +1,7 @@
 ## In order to save data into S3
 # metastore also accepts s3://{bucket/path}#pooling_interval={seconds}s
 version: 0.6
-metastore_uri: s3://openreplay-quickwit/quickwit-indexes
-default_index_root_uri: s3://openreplay-quickwit/quickwit-indexes
+metastore_uri: s3://{{AWS_BUCKET}}/quickwit-indexes
+default_index_root_uri: s3://{{AWS_BUCKET}}/quickwit-indexes
 listen_address: 0.0.0.0
 rest_listen_port: {{QUICKWIT_PORT}}
@@ -1,6 +1,6 @@
 ## In order to save data into S3
 # metastore also accepts s3://{bucket/path}#pooling_interval={seconds}s
 version: 0.6
-metastore_uri: s3://openreplay-quickwit/quickwit-indexes
-default_index_root_uri: s3://openreplay-quickwit/quickwit-indexes
+metastore_uri: s3://{{AWS_BUCKET}}/quickwit-indexes
+default_index_root_uri: s3://{{AWS_BUCKET}}/quickwit-indexes
 rest_listen_port: {{QUICKWIT_PORT}}