diff --git a/ee/quickwit/Dockerfile b/ee/quickwit/Dockerfile
new file mode 100644
index 000000000..41af8ccae
--- /dev/null
+++ b/ee/quickwit/Dockerfile
@@ -0,0 +1,22 @@
+FROM quickwit/quickwit
+
+COPY *.yaml /quickwit/
+COPY entrypoint.sh /quickwit/
+COPY consumer.py /quickwit/
+COPY requirements.txt /quickwit/
+COPY msgcodec /quickwit/msgcodec
+WORKDIR /quickwit
+
+# Install Python in a single layer so the apt cleanup actually shrinks the image.
+RUN apt-get update && \
+    apt-get install -y python3 python3-pip && \
+    apt-get clean && \
+    rm -rf /var/lib/apt/lists/*
+RUN pip3 install -r requirements.txt
+
+ENV filter="true" \
+    encrypted="false"
+
+EXPOSE 7280
+
+ENTRYPOINT ["./entrypoint.sh"]
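# --- Usage sketch (annotation, not part of the patch) ---
# A minimal example of building and running this image. The image tag and the
# broker/bucket/topic values are illustrative placeholders; KAFKA_SERVER,
# QUICKWIT_TOPIC, AWS_BUCKET and data_dir_path are the variables consumed by
# entrypoint.sh below, and filter/encrypted are the toggles defined above.
#
#   docker build -t quickwit-ee ee/quickwit
#   docker run -p 7280:7280 \
#     -e KAFKA_SERVER=broker-1:9092 \
#     -e QUICKWIT_TOPIC=quickwit-events \
#     -e AWS_BUCKET=my-bucket \
#     -e data_dir_path=/quickwit/qwdata \
#     quickwit-ee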
diff --git a/ee/quickwit/consumer.py b/ee/quickwit/consumer.py
new file mode 100644
index 000000000..e293475ca
--- /dev/null
+++ b/ee/quickwit/consumer.py
@@ -0,0 +1,159 @@
+from decouple import config
+from confluent_kafka import Consumer
+from datetime import datetime
+from time import sleep
+import queue
+import requests
+import json
+
+decryption = config('encrypted', default=False, cast=bool)
+MessageCodec = None
+max_retry = 3
+Fetch, FetchEvent, PageEvent, GraphQL = None, None, None, None
+if decryption:
+    from msgcodec.msgcodec import MessageCodec
+    from msgcodec.messages import Fetch, FetchEvent, PageEvent, GraphQL
+    print("Enabled decryption mode")
+
+
+def _quickwit_ingest(index, data_list, retry=0):
+    """Ingest a batch of documents into a Quickwit index, retrying on connection errors."""
+    try:
+        res = requests.post(f'http://localhost:7280/api/v1/{index}/ingest', data=__jsonify_data(data_list, index))
+    except requests.exceptions.ConnectionError as e:
+        retry += 1
+        assert retry <= max_retry, f'[ENDPOINT CONNECTION FAIL] Failed to connect to endpoint http://localhost:7280/api/v1/{index}/ingest\n{e}\n'
+        print(f"[ENDPOINT ERROR] Failed to connect to endpoint http://localhost:7280/api/v1/{index}/ingest, retrying in {5 * retry} seconds..\n")
+        sleep(5 * retry)
+        return _quickwit_ingest(index, data_list, retry=retry)
+    return res
+
+
+def __jsonify_data(data_list, msg_type):
+    """Serialize documents to newline-delimited JSON, decoding nested JSON strings first."""
+    res = list()
+    for data in data_list:
+        if msg_type == 'fetchevent':
+            try:
+                _tmp = data['request']
+                data['request'] = json.loads(_tmp) if _tmp != '' else {}
+                _tmp = data['response']
+                if _tmp != '':
+                    data['response'] = json.loads(_tmp)
+                    body = data['response']['body']
+                    if body[:1] == '{' or body[:2] == '[{':
+                        data['response']['body'] = json.loads(body)
+                else:
+                    data['response'] = {}
+            except Exception as e:
+                print(f'Error {e}\tWhile decoding fetchevent\nEvent: {data}\n')
+        elif msg_type == 'graphql':
+            try:
+                _tmp = data['variables']
+                data['variables'] = json.loads(_tmp) if _tmp != '' else {}
+                _tmp = data['response']
+                data['response'] = json.loads(_tmp) if _tmp != '' else {}
+            except Exception as e:
+                print(f'Error {e}\tWhile decoding graphql\nEvent: {data}\n')
+        res.append(json.dumps(data))
+    return '\n'.join(res)
+
+
+def message_type(message):
+    """Map a message to the name of the Quickwit index it belongs to."""
+    if decryption:
+        if isinstance(message, (FetchEvent, Fetch)):
+            return 'fetchevent'
+        elif isinstance(message, PageEvent):
+            return 'pageevent'
+        elif isinstance(message, GraphQL):
+            return 'graphql'
+        else:
+            return 'default'
+    else:
+        if 'loaded' in message.keys():
+            return 'pageevent'
+        elif 'variables' in message.keys():
+            return 'graphql'
+        elif 'status' in message.keys():
+            return 'fetchevent'
+        else:
+            return 'default'
+
+
+class KafkaFilter():
+
+    def __init__(self):
+        kafka_sources = config('KAFKA_SERVER')
+        topic = config('QUICKWIT_TOPIC')
+
+        fetchevent_maxsize = config('fetch_maxsize', default=100, cast=int)
+        graphql_maxsize = config('graphql_maxsize', default=100, cast=int)
+        pageevent_maxsize = config('pageevent_maxsize', default=100, cast=int)
+
+        if decryption:
+            self.codec = MessageCodec()
+        self.consumer = Consumer({
+            "security.protocol": "SSL",
+            "bootstrap.servers": kafka_sources,
+            "group.id": "saas-quickwit",
+            "auto.offset.reset": "earliest",
+            "enable.auto.commit": False
+        })
+        self.consumer.subscribe([topic])
+        self.queues = {'fetchevent': queue.Queue(fetchevent_maxsize),
+                       'graphql': queue.Queue(graphql_maxsize),
+                       'pageevent': queue.Queue(pageevent_maxsize)}
+
+    def add_to_queue(self, message):
+        associated_queue = message_type(message)
+        if associated_queue == 'default':
+            return
+        if self.queues[associated_queue].full():
+            self.flush_to_quickwit()
+        self.queues[associated_queue].put(message)
+
+    def flush_to_quickwit(self):
+        # Drain every queue into its index, then commit the consumed offsets.
+        for queue_name, _queue in self.queues.items():
+            _list = list()
+            unix_timestamp = int(datetime.now().timestamp())
+            while not _queue.empty():
+                msg = _queue.get()
+                value = msg.__dict__ if decryption else dict(msg)
+                value['insertion_timestamp'] = unix_timestamp
+                if queue_name == 'fetchevent' and 'message_id' not in value.keys():
+                    value['message_id'] = 0
+                _list.append(value)
+            if len(_list) > 0:
+                _quickwit_ingest(queue_name, _list)
+        self.consumer.commit()
+
+    def run(self):
+        _tmp_previous = None
+        while True:
+            msg = self.consumer.poll(1.0)
+            if msg is None:
+                continue
+            if msg.error():
+                print(f'[Consumer error] {msg.error()}')
+                continue
+            value = json.loads(msg.value().decode('utf-8'))
+            if decryption:
+                messages = self.codec.decode_detailed(value)
+            else:
+                messages = [value]
+            # Skip payloads that repeat the previous one verbatim.
+            if messages == _tmp_previous:
+                continue
+            if isinstance(messages, list):
+                for message in messages:
+                    self.add_to_queue(message)
+            else:
+                self.add_to_queue(messages)
+            _tmp_previous = messages
+
+
+if __name__ == '__main__':
+    layer = KafkaFilter()
+    layer.run()
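# --- Usage sketch (annotation, not part of the patch) ---
# consumer.py batches documents per index and POSTs them as newline-delimited
# JSON to Quickwit's ingest endpoint. The same call issued by hand, with a
# hypothetical pageevent document, would look roughly like:
#
#   curl -X POST 'http://localhost:7280/api/v1/pageevent/ingest' \
#     --data '{"project_id": 1, "session_id": 42, "url": "/checkout", "insertion_timestamp": 1700000000}'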
diff --git a/ee/quickwit/entrypoint.sh b/ee/quickwit/entrypoint.sh
new file mode 100755
index 000000000..549cdbd12
--- /dev/null
+++ b/ee/quickwit/entrypoint.sh
@@ -0,0 +1,29 @@
+#!/bin/sh
+
+# This script substitutes the environment variable values into the config files.
+
+find /quickwit/ -type f -name "*.yaml" -exec sed -i "s#{{KAFKA_SERVER}}#${KAFKA_SERVER}#g" {} \;
+find /quickwit/ -type f -name "*.yaml" -exec sed -i "s#{{AWS_BUCKET}}#${AWS_BUCKET}#g" {} \;
+find /quickwit/ -type f -name "*.yaml" -exec sed -i "s/{{QUICKWIT_TOPIC}}/${QUICKWIT_TOPIC}/g" {} \;
+find /quickwit/ -type f -name "*.yaml" -exec sed -i "s#{{data_dir_path}}#${data_dir_path}#g" {} \;
+
+quickwit index create --index-config index-config-fetch.yaml --config s3-config.yaml
+quickwit index create --index-config index-config-graphql.yaml --config s3-config.yaml
+quickwit index create --index-config index-config-pageevent.yaml --config s3-config.yaml
+
+quickwit source delete --index fetchevent --source fetch-kafka --config s3-config.yaml
+quickwit source delete --index graphql --source graphql-kafka --config s3-config.yaml
+quickwit source delete --index pageevent --source pageevent-kafka --config s3-config.yaml
+
+
+if [ "${filter}" = "false" ]; then
+    quickwit source create --index fetchevent --source-config source-fetch.yaml --config s3-config.yaml
+    quickwit source create --index graphql --source-config source-graphql.yaml --config s3-config.yaml
+    quickwit source create --index pageevent --source-config source-pageevent.yaml --config s3-config.yaml
+    quickwit run --config s3-config-listen.yaml
+else
+    # Run the Quickwit server in the background and the Kafka consumer in the foreground.
+    quickwit run --config s3-config-listen.yaml &
+    python3 consumer.py
+    wait
+fi
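# --- Usage sketch (annotation, not part of the patch) ---
# The find/sed calls above fill the {{PLACEHOLDER}} tokens in the copied yaml
# files at container start. A self-contained illustration with a throwaway file:
#
#   echo 'bootstrap_servers: {{KAFKA_SERVER}}' > /tmp/demo.yaml
#   KAFKA_SERVER=broker-1:9092
#   sed -i "s#{{KAFKA_SERVER}}#${KAFKA_SERVER}#g" /tmp/demo.yaml
#   cat /tmp/demo.yaml   # prints: bootstrap_servers: broker-1:9092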
diff --git a/ee/quickwit/index-config-fetch.yaml b/ee/quickwit/index-config-fetch.yaml
index 1d89f72c9..55cced160 100644
--- a/ee/quickwit/index-config-fetch.yaml
+++ b/ee/quickwit/index-config-fetch.yaml
@@ -2,13 +2,28 @@
 # Index config file for gh-archive dataset.
 #
-version: 0
+version: 0.4
 index_id: fetchevent
 
 doc_mapping:
-  mode: strict
+  mode: dynamic
   field_mappings:
+    - name: insertion_timestamp
+      type: datetime
+      input_formats:
+        - unix_timestamp
+      precision: seconds
+      fast: true
+    - name: project_id
+      type: i64
+      fast: true
+    - name: session_id
+      type: i64
+      fast: true
+    - name: message_id
+      type: i64
+      fast: true
     - name: method
       type: text
       tokenizer: default
@@ -18,11 +33,15 @@ doc_mapping:
       tokenizer: default
       record: position
     - name: request
-      type: text
+      type: json
+      stored: true
+      indexed: true
       tokenizer: default
       record: position
     - name: response
-      type: text
+      type: json
+      stored: true
+      indexed: true
       tokenizer: default
       record: position
     - name: status
@@ -35,6 +54,11 @@ doc_mapping:
     - name: duration
       type: i64
       fast: true
+  timestamp_field: insertion_timestamp
 
 search_settings:
-  default_search_fields: [url, request, response]
+  default_search_fields: [project_id, session_id, url, request]
+
+retention:
+  period: 30 days
+  schedule: hourly
diff --git a/ee/quickwit/index-config-graphql.yaml b/ee/quickwit/index-config-graphql.yaml
index bac1d8406..b94c5d4a6 100644
--- a/ee/quickwit/index-config-graphql.yaml
+++ b/ee/quickwit/index-config-graphql.yaml
@@ -2,13 +2,25 @@
 # Index config file for gh-archive dataset.
 #
-version: 0
+version: 0.4
 index_id: graphql
 
 doc_mapping:
-  mode: strict
+  mode: dynamic
   field_mappings:
+    - name: insertion_timestamp
+      type: datetime
+      input_formats:
+        - unix_timestamp
+      precision: seconds
+      fast: true
+    - name: project_id
+      type: i64
+      fast: true
+    - name: session_id
+      type: i64
+      fast: true
     - name: operation_kind
       type: text
       tokenizer: default
@@ -18,13 +30,22 @@ doc_mapping:
       tokenizer: default
       record: position
     - name: variables
-      type: text
+      type: json
+      stored: true
+      indexed: true
       tokenizer: default
       record: position
     - name: response
-      type: text
+      type: json
+      stored: true
+      indexed: true
       tokenizer: default
       record: position
+  timestamp_field: insertion_timestamp
 
 search_settings:
-  default_search_fields: [operation_kind, operation_name, variables]
+  default_search_fields: [project_id, session_id, operation_kind, operation_name, variables]
+
+retention:
+  period: 30 days
+  schedule: hourly
diff --git a/ee/quickwit/index-config-pageevent.yaml b/ee/quickwit/index-config-pageevent.yaml
index e47dd6a1d..1ffdae9f0 100644
--- a/ee/quickwit/index-config-pageevent.yaml
+++ b/ee/quickwit/index-config-pageevent.yaml
@@ -2,13 +2,25 @@
 # Index config file for gh-archive dataset.
 #
-version: 0
+version: 0.4
 index_id: pageevent
 
 doc_mapping:
   mode: strict
   field_mappings:
+    - name: insertion_timestamp
+      type: datetime
+      input_formats:
+        - unix_timestamp
+      precision: seconds
+      fast: true
+    - name: project_id
+      type: i64
+      fast: true
+    - name: session_id
+      type: i64
+      fast: true
     - name: message_id
       type: i64
       indexed: true
@@ -63,6 +75,11 @@ doc_mapping:
     - name: time_to_interactive
       type: i64
       fast: true
+  timestamp_field: insertion_timestamp
 
 search_settings:
-  default_search_fields: [url, referrer, visually_complete]
+  default_search_fields: [project_id, session_id, url, referrer, visually_complete]
+
+retention:
+  period: 30 days
+  schedule: hourly
diff --git a/ee/quickwit/requirements.txt b/ee/quickwit/requirements.txt
index d328a9142..78fb272f5 100644
--- a/ee/quickwit/requirements.txt
+++ b/ee/quickwit/requirements.txt
@@ -1 +1,4 @@
-kafka-python
+confluent-kafka
+python-decouple
+requests
+zstd
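# --- Usage sketch (annotation, not part of the patch) ---
# With insertion_timestamp declared as the timestamp field, each index can be
# queried through Quickwit's search API, optionally bounded by the
# start_timestamp/end_timestamp parameters. An illustrative query:
#
#   curl 'http://localhost:7280/api/v1/fetchevent/search?query=session_id:42&start_timestamp=1700000000'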