Merge pull request #946 from openreplay/quickwit_confluent
Changed module kafka-python to confluent-kafka
commit 07d33a96b1
7 changed files with 315 additions and 13 deletions
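The point of the change: kafka-python exposes an iterable KafkaConsumer that deserializes values for you, while confluent-kafka (a librdkafka binding) hands back raw Message objects from poll() and leaves error handling and decoding to the caller. A minimal before/after sketch of the two styles; the broker address, group id and topic are placeholders, not values from this PR:

# Before (kafka-python): the consumer is iterable and applies value_deserializer itself.
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    "some-topic",                                    # placeholder topic
    bootstrap_servers="localhost:9092",              # placeholder broker
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)
for record in consumer:
    print(record.value)

# After (confluent-kafka): poll() returns Message objects; errors and decoding are explicit.
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",           # placeholder broker
    "group.id": "example-group",                     # placeholder group id
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["some-topic"])
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(msg.error())
        continue
    print(json.loads(msg.value().decode("utf-8")))

The consumer.py added below follows the second pattern.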
20 ee/quickwit/Dockerfile Normal file
@@ -0,0 +1,20 @@
FROM quickwit/quickwit

COPY *.yaml /quickwit/
COPY entrypoint.sh /quickwit/
COPY consumer.py /quickwit/
COPY requirements.txt /quickwit/
COPY msgcodec /quickwit/msgcodec
WORKDIR /quickwit

RUN apt-get update
RUN apt-get install python3 python3-pip -y
RUN apt-get clean
RUN pip install -r requirements.txt

ENV filter="true" \
    encrypted="false"

EXPOSE 7280

ENTRYPOINT ./entrypoint.sh
190 ee/quickwit/consumer.py Normal file
@@ -0,0 +1,190 @@
from decouple import config
from confluent_kafka import Consumer
from datetime import datetime
import os as _os
import queue
import requests
import json

from time import time, sleep


# decryption = config('encrypted', cast=bool)
decryption = False
MessageCodec = None
max_retry = 3
Fetch, FetchEvent, PageEvent, GraphQL = None, None, None, None
if decryption:
    from msgcodec.msgcodec import MessageCodec
    from msgcodec.messages import Fetch, FetchEvent, PageEvent, GraphQL
    print("Enabled decryption mode")


def _quickwit_ingest(index, data_list, retry=0):
    # POST a batch of documents to the Quickwit ingest API, retrying on connection errors.
    try:
        res = requests.post(f'http://localhost:7280/api/v1/{index}/ingest', data=__jsonify_data(data_list, index))
    except requests.exceptions.ConnectionError as e:
        retry += 1
        assert retry <= max_retry, f'[ENDPOINT CONNECTION FAIL] Failed to connect to endpoint http://localhost:7280/api/v1/{index}/ingest\n{e}\n'
        print(f"[ENDPOINT ERROR] Failed to connect to endpoint http://localhost:7280/api/v1/{index}/ingest, retrying in {5*retry} seconds..\n")
        sleep(5*retry)
        return _quickwit_ingest(index, data_list, retry=retry)
    return res


def __jsonify_data(data_list, msg_type):
    # Serialize a batch of events into newline-delimited JSON, decoding nested JSON strings first.
    res = list()
    i = 0
    for data in data_list:
        if msg_type == 'fetchevent':
            try:
                _tmp = data['request']
                if _tmp != '':
                    data['request'] = json.loads(_tmp)
                else:
                    data['request'] = {}
                _tmp = data['response']
                if _tmp != '':
                    data['response'] = json.loads(_tmp)
                    if data['response']['body'][:1] == '{' or data['response']['body'][:2] == '[{':
                        data['response']['body'] = json.loads(data['response']['body'])
                else:
                    data['response'] = {}
            except Exception as e:
                print(f'Error {e}\tWhile decoding fetchevent\nEvent: {data}\n')
        elif msg_type == 'graphql':
            try:
                _tmp = data['variables']
                if _tmp != '':
                    data['variables'] = json.loads(_tmp)
                else:
                    data['variables'] = {}
                _tmp = data['response']
                if _tmp != '':
                    data['response'] = json.loads(_tmp)
                else:
                    data['response'] = {}
            except Exception as e:
                print(f'Error {e}\tWhile decoding graphql\nEvent: {data}\n')
        i += 1
        res.append(json.dumps(data))
    return '\n'.join(res)


def message_type(message):
    # Map a message to its target index: by class in decryption mode, by its keys otherwise.
    if decryption:
        if isinstance(message, FetchEvent) or isinstance(message, Fetch):
            return 'fetchevent'
        elif isinstance(message, PageEvent):
            return 'pageevent'
        elif isinstance(message, GraphQL):
            return 'graphql'
        else:
            return 'default'
    else:
        if 'loaded' in message.keys():
            return 'pageevent'
        elif 'variables' in message.keys():
            return 'graphql'
        elif 'status' in message.keys():
            return 'fetchevent'
        else:
            return 'default'


class KafkaFilter():

    def __init__(self):
        kafka_sources = config('KAFKA_SERVER')
        topic = config('QUICKWIT_TOPIC')

        fetchevent_maxsize = config('fetch_maxsize', default=100, cast=int)
        graphql_maxsize = config('graphql_maxsize', default=100, cast=int)
        pageevent_maxsize = config('pageevent_maxsize', default=100, cast=int)

        if decryption:
            self.codec = MessageCodec()
            self.consumer = Consumer({
                "security.protocol": "SSL",
                "bootstrap.servers": kafka_sources,
                "group.id": "saas-quickwit",
                "auto.offset.reset": "earliest",
                "enable.auto.commit": False
            })
        else:
            self.consumer = Consumer({
                "security.protocol": "SSL",
                "bootstrap.servers": kafka_sources,
                "group.id": "saas-quickwit",
                "auto.offset.reset": "earliest",
                # value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                "enable.auto.commit": False
            })
        self.consumer.subscribe([topic])
        self.queues = {'fetchevent': queue.Queue(fetchevent_maxsize),
                       'graphql': queue.Queue(graphql_maxsize),
                       'pageevent': queue.Queue(pageevent_maxsize)
                       }

    def add_to_queue(self, message):
        associated_queue = message_type(message)
        if associated_queue == 'default':
            return
        if self.queues[associated_queue].full():
            self.flush_to_quickwit()
        self.queues[associated_queue].put(message)

    def flush_to_quickwit(self):
        # Drain every queue into its index, then commit the consumed offsets.
        for queue_name, _queue in self.queues.items():
            _list = list()
            unix_timestamp = int(datetime.now().timestamp())
            while not _queue.empty():
                msg = _queue.get()
                if decryption:
                    value = msg.__dict__
                else:
                    value = dict(msg)
                value['insertion_timestamp'] = unix_timestamp
                if queue_name == 'fetchevent' and 'message_id' not in value.keys():
                    value['message_id'] = 0
                _list.append(value)
            if len(_list) > 0:
                _quickwit_ingest(queue_name, _list)
        self.consumer.commit()

    def run(self):
        _tmp_previous = None
        repeated = False
        while True:
            msg = self.consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print(f'[Consumer error] {msg.error()}')
                continue
            value = json.loads(msg.value().decode('utf-8'))
            if decryption:
                messages = self.codec.decode_detailed(value)
            else:
                messages = [value]

            # Queue each batch, skipping batches identical to the previous one (duplicates).
            if _tmp_previous is None:
                _tmp_previous = messages
                if type(messages) == list:
                    for message in messages:
                        self.add_to_queue(message)
                else:
                    self.add_to_queue(messages)
            elif _tmp_previous != messages:
                if type(messages) == list:
                    for message in messages:
                        self.add_to_queue(message)
                else:
                    self.add_to_queue(messages)
                _tmp_previous = messages
                repeated = False
            elif not repeated:
                repeated = True


if __name__ == '__main__':
    layer = KafkaFilter()
    layer.run()
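consumer.py routes each message to one of three in-memory queues based on the keys it carries, and flushes a full queue to Quickwit's ingest API. A small illustration of the routing done by message_type(), assuming it is run next to consumer.py with decryption disabled; the payloads are invented and only the inspected keys matter:

from consumer import message_type   # assumes this file sits next to consumer.py

# Hypothetical sample payloads; message_type() only looks at the keys.
page_event = {"loaded": True, "url": "https://example.com"}        # -> 'pageevent'
graphql_event = {"variables": "{}", "operation_name": "getUser"}   # -> 'graphql'
fetch_event = {"status": 200, "request": "", "response": ""}       # -> 'fetchevent'
other = {"foo": "bar"}                                              # -> 'default' (dropped)

for event in (page_event, graphql_event, fetch_event, other):
    print(message_type(event))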
27 ee/quickwit/entrypoint.sh Executable file
@@ -0,0 +1,27 @@
#!/bin/sh

# This script replaces the env variable values in the config files

ls config/
find /quickwit/ -type f -name "*.yaml" -exec sed -i "s#{{KAFKA_SERVER}}#${KAFKA_SERVER}#g" {} \;
find /quickwit/ -type f -name "*.yaml" -exec sed -i "s#{{AWS_BUCKET}}#${AWS_BUCKET}#g" {} \;
find /quickwit/ -type f -name "*.yaml" -exec sed -i "s/{{QUICKWIT_TOPIC}}/${QUICKWIT_TOPIC}/g" {} \;
find /quickwit/ -type f -name "*.yaml" -exec sed -i "s#{{data_dir_path}}#${data_dir_path}#g" {} \;

quickwit index create --index-config index-config-fetch.yaml --config s3-config.yaml
quickwit index create --index-config index-config-graphql.yaml --config s3-config.yaml
quickwit index create --index-config index-config-pageevent.yaml --config s3-config.yaml

quickwit source delete --index fetchevent --source fetch-kafka --config s3-config.yaml
quickwit source delete --index graphql --source graphql-kafka --config s3-config.yaml
quickwit source delete --index pageevent --source pageevent-kafka --config s3-config.yaml


if [ "${filter}" = "false" ]; then
    quickwit source create --index fetchevent --source-config source-fetch.yaml --config s3-config.yaml
    quickwit source create --index graphql --source-config source-graphql.yaml --config s3-config.yaml
    quickwit source create --index pageevent --source-config source-pageevent.yaml --config s3-config.yaml
    quickwit run --config s3-config-listen.yaml
else
    quickwit run --config s3-config-listen.yaml & python3 consumer.py && fg
fi
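When filter is not "false", the entrypoint runs Quickwit and the Python consumer side by side, so documents arrive through the ingest API instead of Quickwit-managed Kafka sources. A quick sanity check is to query an index after a flush; this sketch assumes the search endpoint lives under the same /api/v1 prefix as the ingest calls in consumer.py, and the index name and query are placeholders:

import requests

# Assumes the same Quickwit REST prefix as the ingest calls in consumer.py.
index = "fetchevent"   # placeholder: any of fetchevent / graphql / pageevent
resp = requests.get(
    f"http://localhost:7280/api/v1/{index}/search",
    params={"query": "*", "max_hits": 5},
)
resp.raise_for_status()
print(resp.json().get("num_hits"), "hits")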
ee/quickwit/index-config-fetch.yaml
@@ -2,13 +2,28 @@
 # Index config file for gh-archive dataset.
 #

-version: 0
+version: 0.4

 index_id: fetchevent

 doc_mapping:
-  mode: strict
+  mode: dynamic
   field_mappings:
+    - name: insertion_timestamp
+      type: datetime
+      input_formats:
+        - unix_timestamp
+      precision: seconds
+      fast: true
+    - name: project_id
+      type: i64
+      fast: true
+    - name: session_id
+      type: i64
+      fast: true
+    - name: message_id
+      type: i64
+      fast: true
     - name: method
       type: text
       tokenizer: default
@@ -18,11 +33,15 @@ doc_mapping:
       tokenizer: default
       record: position
     - name: request
-      type: text
+      type: json
+      stored: true
+      indexed: true
       tokenizer: default
       record: position
     - name: response
-      type: text
+      type: json
+      stored: true
+      indexed: true
       tokenizer: default
       record: position
     - name: status
@@ -35,6 +54,11 @@ doc_mapping:
     - name: duration
       type: i64
       fast: true
+  timestamp_field: insertion_timestamp

 search_settings:
-  default_search_fields: [url, request, response]
+  default_search_fields: [project_id, session_id, url, request]
+
+retention:
+  period: 30 days
+  schedule: hourly
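The reworked fetchevent mapping stores request and response as JSON objects, adds project_id/session_id/message_id, and declares insertion_timestamp (unix seconds) as the timestamp field, which lines up with what flush_to_quickwit() writes. A hand-built example of one NDJSON line as the consumer would ingest it; the field names come from the mapping above, the values are invented:

import json, time

doc = {
    "insertion_timestamp": int(time.time()),   # unix seconds, as set in flush_to_quickwit
    "project_id": 1,                            # invented example values
    "session_id": 42,
    "message_id": 0,                            # defaulted when missing, as in the consumer
    "method": "GET",
    "url": "https://example.com/api/items",
    "request": {},                              # JSON object after __jsonify_data decoding
    "response": {"status": 200, "body": {"ok": True}},
    "status": 200,
    "duration": 12,
}
print(json.dumps(doc))   # one line of the NDJSON body POSTed to /api/v1/fetchevent/ingest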
ee/quickwit/index-config-graphql.yaml
@@ -2,13 +2,25 @@
 # Index config file for gh-archive dataset.
 #

-version: 0
+version: 0.4

 index_id: graphql

 doc_mapping:
-  mode: strict
+  mode: dynamic
   field_mappings:
+    - name: insertion_timestamp
+      type: datetime
+      input_formats:
+        - unix_timestamp
+      precision: seconds
+      fast: true
+    - name: project_id
+      type: i64
+      fast: true
+    - name: session_id
+      type: i64
+      fast: true
     - name: operation_kind
       type: text
       tokenizer: default
@@ -18,13 +30,22 @@ doc_mapping:
       tokenizer: default
       record: position
     - name: variables
-      type: text
+      type: json
+      stored: true
+      indexed: true
       tokenizer: default
       record: position
     - name: response
-      type: text
+      type: json
+      stored: true
+      indexed: true
       tokenizer: default
       record: position
+  timestamp_field: insertion_timestamp

 search_settings:
-  default_search_fields: [operation_kind, operation_name, variables]
+  default_search_fields: [project_id, session_id, operation_kind, operation_name, variables]
+
+retention:
+  period: 30 days
+  schedule: hourly
ee/quickwit/index-config-pageevent.yaml
@@ -2,13 +2,25 @@
 # Index config file for gh-archive dataset.
 #

-version: 0
+version: 0.4

 index_id: pageevent

 doc_mapping:
   mode: strict
   field_mappings:
+    - name: insertion_timestamp
+      type: datetime
+      input_formats:
+        - unix_timestamp
+      precision: seconds
+      fast: true
+    - name: project_id
+      type: i64
+      fast: true
+    - name: session_id
+      type: i64
+      fast: true
     - name: message_id
       type: i64
       indexed: true
@@ -63,6 +75,11 @@ doc_mapping:
     - name: time_to_interactive
       type: i64
       fast: true
+  timestamp_field: insertion_timestamp

 search_settings:
-  default_search_fields: [url, referrer, visually_complete]
+  default_search_fields: [project_id, session_id, url, referrer, visually_complete]
+
+retention:
+  period: 30 days
+  schedule: hourly
ee/quickwit/requirements.txt
@@ -1 +1,4 @@
-kafka-python
+confluent-kafka
+python-decouple
+requests
+zstd