From 4b11c77bcf063f45883d2cf8d02dd422f5478e4c Mon Sep 17 00:00:00 2001 From: mauricio garcia suarez Date: Wed, 27 Jul 2022 13:39:04 +0200 Subject: [PATCH 01/13] quickwit server taking inputs from Kafka quickwit-kafka topic and saving data in s3 quickwit bucket --- clean.sh | 2 + create_kafka_index.sh | 1 + create_source.sh | 1 + index-config.yaml | 21 + kafka-source.yaml | 9 + kafka_sample.py | 119 ++++++ msgcodec/codec.py | 819 +++++++++++++++++++++++++++++++++++++ msgcodec/messages.py | 919 ++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 1 + run_quickwit.sh | 1 + s3-config-listen.yaml | 5 + s3-config.yaml | 4 + 12 files changed, 1902 insertions(+) create mode 100644 clean.sh create mode 100644 create_kafka_index.sh create mode 100644 create_source.sh create mode 100644 index-config.yaml create mode 100644 kafka-source.yaml create mode 100644 kafka_sample.py create mode 100644 msgcodec/codec.py create mode 100644 msgcodec/messages.py create mode 100644 requirements.txt create mode 100644 run_quickwit.sh create mode 100644 s3-config-listen.yaml create mode 100644 s3-config.yaml diff --git a/clean.sh b/clean.sh new file mode 100644 index 000000000..81469034c --- /dev/null +++ b/clean.sh @@ -0,0 +1,2 @@ +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/qwdata:/quickwit/qwdata -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml quickwit/quickwit source delete --index quickwit-kafka --source kafka-source --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml $(pwd)/qwdata:/quickwit/qwdata quickwit/quickwit index delete --index quickwit-kafka --config s3-config.yaml diff --git a/create_kafka_index.sh b/create_kafka_index.sh new file mode 100644 index 000000000..651b3983c --- /dev/null +++ b/create_kafka_index.sh @@ -0,0 +1 @@ +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/qwdata:/quickwit/qwdata -v $(pwd)/index-config.yaml:/quickwit/index-config.yaml -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 quickwit/quickwit index create --index-config index-config.yaml --config s3-config.yaml diff --git a/create_source.sh b/create_source.sh new file mode 100644 index 000000000..b1c37c020 --- /dev/null +++ b/create_source.sh @@ -0,0 +1 @@ +docker run -v /etc/hosts:/etc/hosts:ro -v /etc/hosts:/etc/hosts:ro -v $(pwd)/qwdata:/quickwit/qwdata -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -v $(pwd)/kafka-source.yaml:/quickwit/kafka-source.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 quickwit/quickwit source create --index quickwit-kafka --source-config kafka-source.yaml --config s3-config.yaml diff --git a/index-config.yaml b/index-config.yaml new file mode 100644 index 000000000..43f8d3324 --- /dev/null +++ b/index-config.yaml @@ -0,0 +1,21 @@ +# +# Index config file for gh-archive dataset. +# + +version: 0 + +index_id: quickwit-kafka + +doc_mapping: + field_mappings: + - name: title + type: text + tokenizer: default + record: position + - name: body + type: text + tokenizer: default + record: position + +search_settings: + default_search_fields: [title, body] diff --git a/kafka-source.yaml b/kafka-source.yaml new file mode 100644 index 000000000..89b6174cf --- /dev/null +++ b/kafka-source.yaml @@ -0,0 +1,9 @@ +# +# Kafka source config file. +# +source_id: kafka-source +source_type: kafka +params: + topic: quickwit-kafka + client_params: + bootstrap.servers: kafka-1.kafka-headless.db.svc.cluster.local:9092 diff --git a/kafka_sample.py b/kafka_sample.py new file mode 100644 index 000000000..9d58ae54d --- /dev/null +++ b/kafka_sample.py @@ -0,0 +1,119 @@ +import os +from time import sleep +from kafka import KafkaConsumer, KafkaProducer +from datetime import datetime +from collections import defaultdict + +from msgcodec.codec import MessageCodec +from msgcodec.messages import SessionEnd, Fetch, FetchEvent, PageEvent, SetCSSData, SetStyleData +import json + +import getopt, sys + +n = 0 + +def transform(data): + global n + n += 1 + return {'title': f'message {n}', 'body': data} + +def create_producer(): + producer = KafkaProducer(#security_protocol="SSL", + bootstrap_servers=os.environ['KAFKA_SERVER_2'], + # os.environ['KAFKA_SERVER_1']], + #ssl_cafile="./ca.pem", + #ssl_certfile="./service.cert", + #ssl_keyfile="./service.key", + value_serializer=lambda v: json.dumps(v).encode('ascii') + ) + return producer + +def create_consumer(): + consumer = KafkaConsumer(#security_protocol="SSL", + bootstrap_servers=os.environ['KAFKA_SERVER_2'], + # os.environ['KAFKA_SERVER_1']], + group_id=f"my_test52_connector", + auto_offset_reset="earliest", + enable_auto_commit=False + ) + return consumer + + +def consumer_producer_end(): + global n + batch_size = 4000 + sessions_batch_size = 400 + batch = [] + sessions = defaultdict(lambda: None) + sessions_batch = [] + + codec = MessageCodec() + consumer = create_consumer() + producer = create_producer() + + consumer.subscribe(topics=["raw", "raw_ios"]) + print("Kafka consumer subscribed") + escape = 0 + for msg in consumer: + messages = codec.decode_detailed(msg.value) + session_id = codec.decode_key(msg.key) + if messages is None: + print('-') + for message in messages: + send = True + if isinstance(message, Fetch): + data = message.response + elif isinstance(message, FetchEvent): + data = message.response + elif isinstance(message, PageEvent): + print(message.url) + elif isinstance(message, SetCSSData): + data = message.data + elif isinstance(message, SetStyleData): + data = message.data + else: + send = False + continue + if send: + producer.send('quickwit-kafka', value=transform(data)) + print(f'added message {n}') + sleep(5) + + + +def consumer_end(): + consumer = create_consumer() + consumer.subscribe(topics=['quickwit-kafka']) + for msg in consumer: + print(msg) + + +def handle_args(): + arguments = len(sys.argv)-1 + argument_list = sys.argv[1:] + pos = 1 + short_options = 'hm:' + long_options = ['help', 'method='] + try: + arguments, values = getopt.getopt(argument_list, short_options, long_options) + except getopt.error as err: + print(str(err)) + sys.exit(2) + + for arg, argv in arguments: + if arg in ('-h', '--help'): + print(""" Methods +--method, -m available methods: consumer, producer +--help, -h show help + """) + elif arg in ('-m', '--method'): + if argv == 'consumer': + consumer_end() + elif argv == 'producer': + consumer_producer_end() + else: + print('Method not found. Available methods: consumer, producer') + + +if __name__ == '__main__': + handle_args() diff --git a/msgcodec/codec.py b/msgcodec/codec.py new file mode 100644 index 000000000..7e1aabbf9 --- /dev/null +++ b/msgcodec/codec.py @@ -0,0 +1,819 @@ +import io +from typing import List +from msgcodec.messages import * + + +class Codec: + """ + Implements encode/decode primitives + """ + + @staticmethod + def read_boolean(reader: io.BytesIO): + b = reader.read(1) + return b == 1 + + @staticmethod + def read_uint(reader: io.BytesIO): + """ + The ending "big" doesn't play any role here, + since we're dealing with data per one byte + """ + x = 0 # the result + s = 0 # the shift (our result is big-ending) + i = 0 # n of byte (max 9 for uint64) + while True: + b = reader.read(1) + if len(b) == 0: + raise IndexError('bytes out of range') + num = int.from_bytes(b, "big", signed=False) + # print(i, x) + + if num < 0x80: + if i > 9 | i == 9 & num > 1: + raise OverflowError() + return int(x | num << s) + x |= (num & 0x7f) << s + s += 7 + i += 1 + + @staticmethod + def read_int(reader: io.BytesIO) -> int: + """ + ux, err := ReadUint(reader) + x := int64(ux >> 1) + if err != nil { + return x, err + } + if ux&1 != 0 { + x = ^x + } + return x, err + """ + ux = Codec.read_uint(reader) + x = int(ux >> 1) + + if ux & 1 != 0: + x = - x - 1 + return x + + @staticmethod + def read_string(reader: io.BytesIO) -> str: + length = Codec.read_uint(reader) + s = reader.read(length) + try: + return s.decode("utf-8", errors="replace").replace("\x00", "\uFFFD") + except UnicodeDecodeError: + return None + + +class MessageCodec(Codec): + + def encode(self, m: Message) -> bytes: + ... + + def decode(self, b: bytes) -> Message: + reader = io.BytesIO(b) + return self.read_head_message(reader) + + def decode_detailed(self, b: bytes) -> List[Message]: + reader = io.BytesIO(b) + messages_list = list() + while True: + try: + messages_list.append(self.read_head_message(reader)) + except IndexError: + break + return messages_list + + def read_head_message(self, reader: io.BytesIO) -> Message: + message_id = self.read_message_id(reader) + if message_id == 0: + return Timestamp( + timestamp=self.read_uint(reader) + ) + if message_id == 1: + return SessionStart( + timestamp=self.read_uint(reader), + project_id=self.read_uint(reader), + tracker_version=self.read_string(reader), + rev_id=self.read_string(reader), + user_uuid=self.read_string(reader), + user_agent=self.read_string(reader), + user_os=self.read_string(reader), + user_os_version=self.read_string(reader), + user_browser=self.read_string(reader), + user_browser_version=self.read_string(reader), + user_device=self.read_string(reader), + user_device_type=self.read_string(reader), + user_device_memory_size=self.read_uint(reader), + user_device_heap_size=self.read_uint(reader), + user_country=self.read_string(reader) + ) + + if message_id == 2: + return SessionDisconnect( + timestamp=self.read_uint(reader) + ) + + if message_id == 3: + return SessionEnd( + timestamp=self.read_uint(reader) + ) + + if message_id == 4: + return SetPageLocation( + url=self.read_string(reader), + referrer=self.read_string(reader), + navigation_start=self.read_uint(reader) + ) + + if message_id == 5: + return SetViewportSize( + width=self.read_uint(reader), + height=self.read_uint(reader) + ) + + if message_id == 6: + return SetViewportScroll( + x=self.read_int(reader), + y=self.read_int(reader) + ) + + if message_id == 7: + return CreateDocument() + + if message_id == 8: + return CreateElementNode( + id=self.read_uint(reader), + parent_id=self.read_uint(reader), + index=self.read_uint(reader), + tag=self.read_string(reader), + svg=self.read_boolean(reader), + ) + + if message_id == 9: + return CreateTextNode( + id=self.read_uint(reader), + parent_id=self.read_uint(reader), + index=self.read_uint(reader) + ) + + if message_id == 10: + return MoveNode( + id=self.read_uint(reader), + parent_id=self.read_uint(reader), + index=self.read_uint(reader) + ) + + if message_id == 11: + return RemoveNode( + id=self.read_uint(reader) + ) + + if message_id == 12: + return SetNodeAttribute( + id=self.read_uint(reader), + name=self.read_string(reader), + value=self.read_string(reader) + ) + + if message_id == 13: + return RemoveNodeAttribute( + id=self.read_uint(reader), + name=self.read_string(reader) + ) + + if message_id == 14: + return SetNodeData( + id=self.read_uint(reader), + data=self.read_string(reader) + ) + + if message_id == 15: + return SetCSSData( + id=self.read_uint(reader), + data=self.read_string(reader) + ) + + if message_id == 16: + return SetNodeScroll( + id=self.read_uint(reader), + x=self.read_int(reader), + y=self.read_int(reader), + ) + + if message_id == 17: + return SetInputTarget( + id=self.read_uint(reader), + label=self.read_string(reader) + ) + + if message_id == 18: + return SetInputValue( + id=self.read_uint(reader), + value=self.read_string(reader), + mask=self.read_int(reader), + ) + + if message_id == 19: + return SetInputChecked( + id=self.read_uint(reader), + checked=self.read_boolean(reader) + ) + + if message_id == 20: + return MouseMove( + x=self.read_uint(reader), + y=self.read_uint(reader) + ) + + if message_id == 21: + return MouseClickDepricated( + id=self.read_uint(reader), + hesitation_time=self.read_uint(reader), + label=self.read_string(reader) + ) + + if message_id == 22: + return ConsoleLog( + level=self.read_string(reader), + value=self.read_string(reader) + ) + + if message_id == 23: + return PageLoadTiming( + request_start=self.read_uint(reader), + response_start=self.read_uint(reader), + response_end=self.read_uint(reader), + dom_content_loaded_event_start=self.read_uint(reader), + dom_content_loaded_event_end=self.read_uint(reader), + load_event_start=self.read_uint(reader), + load_event_end=self.read_uint(reader), + first_paint=self.read_uint(reader), + first_contentful_paint=self.read_uint(reader) + ) + + if message_id == 24: + return PageRenderTiming( + speed_index=self.read_uint(reader), + visually_complete=self.read_uint(reader), + time_to_interactive=self.read_uint(reader), + ) + + if message_id == 25: + return JSException( + name=self.read_string(reader), + message=self.read_string(reader), + payload=self.read_string(reader) + ) + + if message_id == 26: + return RawErrorEvent( + timestamp=self.read_uint(reader), + source=self.read_string(reader), + name=self.read_string(reader), + message=self.read_string(reader), + payload=self.read_string(reader) + ) + + if message_id == 27: + return RawCustomEvent( + name=self.read_string(reader), + payload=self.read_string(reader) + ) + + if message_id == 28: + return UserID( + id=self.read_string(reader) + ) + + if message_id == 29: + return UserAnonymousID( + id=self.read_string(reader) + ) + + if message_id == 30: + return Metadata( + key=self.read_string(reader), + value=self.read_string(reader) + ) + + if message_id == 31: + return PageEvent( + message_id=self.read_uint(reader), + timestamp=self.read_uint(reader), + url=self.read_string(reader), + referrer=self.read_string(reader), + loaded=self.read_boolean(reader), + request_start=self.read_uint(reader), + response_start=self.read_uint(reader), + response_end=self.read_uint(reader), + dom_content_loaded_event_start=self.read_uint(reader), + dom_content_loaded_event_end=self.read_uint(reader), + load_event_start=self.read_uint(reader), + load_event_end=self.read_uint(reader), + first_paint=self.read_uint(reader), + first_contentful_paint=self.read_uint(reader), + speed_index=self.read_uint(reader), + visually_complete=self.read_uint(reader), + time_to_interactive=self.read_uint(reader) + ) + + if message_id == 32: + return InputEvent( + message_id=self.read_uint(reader), + timestamp=self.read_uint(reader), + value=self.read_string(reader), + value_masked=self.read_boolean(reader), + label=self.read_string(reader), + ) + + if message_id == 33: + return ClickEvent( + message_id=self.read_uint(reader), + timestamp=self.read_uint(reader), + hesitation_time=self.read_uint(reader), + label=self.read_string(reader) + ) + + if message_id == 34: + return ErrorEvent( + message_id=self.read_uint(reader), + timestamp=self.read_uint(reader), + source=self.read_string(reader), + name=self.read_string(reader), + message=self.read_string(reader), + payload=self.read_string(reader) + ) + + if message_id == 35: + + message_id = self.read_uint(reader) + ts = self.read_uint(reader) + if ts > 9999999999999: + ts = None + return ResourceEvent( + message_id=message_id, + timestamp=ts, + duration=self.read_uint(reader), + ttfb=self.read_uint(reader), + header_size=self.read_uint(reader), + encoded_body_size=self.read_uint(reader), + decoded_body_size=self.read_uint(reader), + url=self.read_string(reader), + type=self.read_string(reader), + success=self.read_boolean(reader), + method=self.read_string(reader), + status=self.read_uint(reader) + ) + + if message_id == 36: + return CustomEvent( + message_id=self.read_uint(reader), + timestamp=self.read_uint(reader), + name=self.read_string(reader), + payload=self.read_string(reader) + ) + + if message_id == 37: + return CSSInsertRule( + id=self.read_uint(reader), + rule=self.read_string(reader), + index=self.read_uint(reader) + ) + + if message_id == 38: + return CSSDeleteRule( + id=self.read_uint(reader), + index=self.read_uint(reader) + ) + + if message_id == 39: + return Fetch( + method=self.read_string(reader), + url=self.read_string(reader), + request=self.read_string(reader), + response=self.read_string(reader), + status=self.read_uint(reader), + timestamp=self.read_uint(reader), + duration=self.read_uint(reader) + ) + + if message_id == 40: + return Profiler( + name=self.read_string(reader), + duration=self.read_uint(reader), + args=self.read_string(reader), + result=self.read_string(reader) + ) + + if message_id == 41: + return OTable( + key=self.read_string(reader), + value=self.read_string(reader) + ) + + if message_id == 42: + return StateAction( + type=self.read_string(reader) + ) + + if message_id == 43: + return StateActionEvent( + message_id=self.read_uint(reader), + timestamp=self.read_uint(reader), + type=self.read_string(reader) + ) + + if message_id == 44: + return Redux( + action=self.read_string(reader), + state=self.read_string(reader), + duration=self.read_uint(reader) + ) + + if message_id == 45: + return Vuex( + mutation=self.read_string(reader), + state=self.read_string(reader), + ) + + if message_id == 46: + return MobX( + type=self.read_string(reader), + payload=self.read_string(reader), + ) + + if message_id == 47: + return NgRx( + action=self.read_string(reader), + state=self.read_string(reader), + duration=self.read_uint(reader) + ) + + if message_id == 48: + return GraphQL( + operation_kind=self.read_string(reader), + operation_name=self.read_string(reader), + variables=self.read_string(reader), + response=self.read_string(reader) + ) + + if message_id == 49: + return PerformanceTrack( + frames=self.read_int(reader), + ticks=self.read_int(reader), + total_js_heap_size=self.read_uint(reader), + used_js_heap_size=self.read_uint(reader) + ) + + if message_id == 50: + return GraphQLEvent( + message_id=self.read_uint(reader), + timestamp=self.read_uint(reader), + name=self.read_string(reader) + ) + + if message_id == 51: + return FetchEvent( + message_id=self.read_uint(reader), + timestamp=self.read_uint(reader), + method=self.read_string(reader), + url=self.read_string(reader), + request=self.read_string(reader), + response=self.read_string(reader), + status=self.read_uint(reader), + duration=self.read_uint(reader) + ) + + if message_id == 52: + return DomDrop( + timestamp=self.read_uint(reader) + ) + + if message_id == 53: + return ResourceTiming( + timestamp=self.read_uint(reader), + duration=self.read_uint(reader), + ttfb=self.read_uint(reader), + header_size=self.read_uint(reader), + encoded_body_size=self.read_uint(reader), + decoded_body_size=self.read_uint(reader), + url=self.read_string(reader), + initiator=self.read_string(reader) + ) + + if message_id == 54: + return ConnectionInformation( + downlink=self.read_uint(reader), + type=self.read_string(reader) + ) + + if message_id == 55: + return SetPageVisibility( + hidden=self.read_boolean(reader) + ) + + if message_id == 56: + return PerformanceTrackAggr( + timestamp_start=self.read_uint(reader), + timestamp_end=self.read_uint(reader), + min_fps=self.read_uint(reader), + avg_fps=self.read_uint(reader), + max_fps=self.read_uint(reader), + min_cpu=self.read_uint(reader), + avg_cpu=self.read_uint(reader), + max_cpu=self.read_uint(reader), + min_total_js_heap_size=self.read_uint(reader), + avg_total_js_heap_size=self.read_uint(reader), + max_total_js_heap_size=self.read_uint(reader), + min_used_js_heap_size=self.read_uint(reader), + avg_used_js_heap_size=self.read_uint(reader), + max_used_js_heap_size=self.read_uint(reader) + ) + + if message_id == 59: + return LongTask( + timestamp=self.read_uint(reader), + duration=self.read_uint(reader), + context=self.read_uint(reader), + container_type=self.read_uint(reader), + container_src=self.read_string(reader), + container_id=self.read_string(reader), + container_name=self.read_string(reader) + ) + + if message_id == 60: + return SetNodeURLBasedAttribute( + id=self.read_uint(reader), + name=self.read_string(reader), + value=self.read_string(reader), + base_url=self.read_string(reader) + ) + + if message_id == 61: + return SetStyleData( + id=self.read_uint(reader), + data=self.read_string(reader), + base_url=self.read_string(reader) + ) + + if message_id == 62: + return IssueEvent( + message_id=self.read_uint(reader), + timestamp=self.read_uint(reader), + type=self.read_string(reader), + context_string=self.read_string(reader), + context=self.read_string(reader), + payload=self.read_string(reader) + ) + + if message_id == 63: + return TechnicalInfo( + type=self.read_string(reader), + value=self.read_string(reader) + ) + + if message_id == 64: + return CustomIssue( + name=self.read_string(reader), + payload=self.read_string(reader) + ) + + if message_id == 65: + return PageClose() + + if message_id == 66: + return AssetCache( + url=self.read_string(reader) + ) + + if message_id == 67: + return CSSInsertRuleURLBased( + id=self.read_uint(reader), + rule=self.read_string(reader), + index=self.read_uint(reader), + base_url=self.read_string(reader) + ) + + if message_id == 69: + return MouseClick( + id=self.read_uint(reader), + hesitation_time=self.read_uint(reader), + label=self.read_string(reader), + selector=self.read_string(reader) + ) + + if message_id == 70: + return CreateIFrameDocument( + frame_id=self.read_uint(reader), + id=self.read_uint(reader) + ) + + if message_id == 80: + return BatchMeta( + page_no=self.read_uint(reader), + first_index=self.read_uint(reader), + timestamp=self.read_int(reader) + ) + + if message_id == 90: + return IOSSessionStart( + timestamp=self.read_uint(reader), + project_id=self.read_uint(reader), + tracker_version=self.read_string(reader), + rev_id=self.read_string(reader), + user_uuid=self.read_string(reader), + user_os=self.read_string(reader), + user_os_version=self.read_string(reader), + user_device=self.read_string(reader), + user_device_type=self.read_string(reader), + user_country=self.read_string(reader) + ) + + if message_id == 91: + return IOSSessionEnd( + timestamp=self.read_uint(reader) + ) + + if message_id == 92: + return IOSMetadata( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + key=self.read_string(reader), + value=self.read_string(reader) + ) + + if message_id == 93: + return IOSCustomEvent( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + name=self.read_string(reader), + payload=self.read_string(reader) + ) + + if message_id == 94: + return IOSUserID( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + value=self.read_string(reader) + ) + + if message_id == 95: + return IOSUserAnonymousID( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + value=self.read_string(reader) + ) + + if message_id == 96: + return IOSScreenChanges( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + x=self.read_uint(reader), + y=self.read_uint(reader), + width=self.read_uint(reader), + height=self.read_uint(reader) + ) + + if message_id == 97: + return IOSCrash( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + name=self.read_string(reader), + reason=self.read_string(reader), + stacktrace=self.read_string(reader) + ) + + if message_id == 98: + return IOSScreenEnter( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + title=self.read_string(reader), + view_name=self.read_string(reader) + ) + + if message_id == 99: + return IOSScreenLeave( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + title=self.read_string(reader), + view_name=self.read_string(reader) + ) + + if message_id == 100: + return IOSClickEvent( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + label=self.read_string(reader), + x=self.read_uint(reader), + y=self.read_uint(reader) + ) + + if message_id == 101: + return IOSInputEvent( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + value=self.read_string(reader), + value_masked=self.read_boolean(reader), + label=self.read_string(reader) + ) + + if message_id == 102: + return IOSPreformanceEvent( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + name=self.read_string(reader), + value=self.read_uint(reader) + ) + + if message_id == 103: + return IOSLog( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + severity=self.read_string(reader), + content=self.read_string(reader) + ) + + if message_id == 104: + return IOSInternalError( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + content=self.read_string(reader) + ) + + if message_id == 105: + return IOSNetworkCall( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + duration=self.read_uint(reader), + headers=self.read_string(reader), + body=self.read_string(reader), + url=self.read_string(reader), + success=self.read_boolean(reader), + method=self.read_string(reader), + status=self.read_uint(reader) + ) + + if message_id == 107: + return IOSBatchMeta( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + first_index=self.read_uint(reader) + ) + + if message_id == 110: + return IOSPerformanceAggregated( + timestamp_start=self.read_uint(reader), + timestamp_end=self.read_uint(reader), + min_fps=self.read_uint(reader), + avg_fps=self.read_uint(reader), + max_fps=self.read_uint(reader), + min_cpu=self.read_uint(reader), + avg_cpu=self.read_uint(reader), + max_cpu=self.read_uint(reader), + min_memory=self.read_uint(reader), + avg_memory=self.read_uint(reader), + max_memory=self.read_uint(reader), + min_battery=self.read_uint(reader), + avg_battery=self.read_uint(reader), + max_battery=self.read_uint(reader) + ) + if message_id == 111: + return IOSIssueEvent( + timestamp=self.read_uint(reader), + type=self.read_string(reader), + context_string=self.read_string(reader), + context=self.read_string(reader), + payload=self.read_string(reader) + ) + + def read_message_id(self, reader: io.BytesIO) -> int: + """ + Read and return the first byte where the message id is encoded + """ + id_ = self.read_uint(reader) + return id_ + + @staticmethod + def check_message_id(b: bytes) -> int: + """ + todo: make it static and without reader. It's just the first byte + Read and return the first byte where the message id is encoded + """ + reader = io.BytesIO(b) + id_ = Codec.read_uint(reader) + + return id_ + + @staticmethod + def decode_key(b) -> int: + """ + Decode the message key (encoded with little endian) + """ + try: + decoded = int.from_bytes(b, "little", signed=False) + except Exception as e: + raise UnicodeDecodeError(f"Error while decoding message key (SessionID) from {b}\n{e}") + return decoded diff --git a/msgcodec/messages.py b/msgcodec/messages.py new file mode 100644 index 000000000..bc451b287 --- /dev/null +++ b/msgcodec/messages.py @@ -0,0 +1,919 @@ +""" +Representations of Kafka messages +""" +from abc import ABC + + +class Message(ABC): + pass + + +class Timestamp(Message): + __id__ = 0 + + def __init__(self, timestamp): + self.timestamp = timestamp + + +class SessionStart(Message): + __id__ = 1 + + def __init__(self, timestamp, project_id, tracker_version, rev_id, user_uuid, + user_agent, user_os, user_os_version, user_browser, user_browser_version, + user_device, user_device_type, user_device_memory_size, user_device_heap_size, + user_country): + self.timestamp = timestamp + self.project_id = project_id + self.tracker_version = tracker_version + self.rev_id = rev_id + self.user_uuid = user_uuid + self.user_agent = user_agent + self.user_os = user_os + self.user_os_version = user_os_version + self.user_browser = user_browser + self.user_browser_version = user_browser_version + self.user_device = user_device + self.user_device_type = user_device_type + self.user_device_memory_size = user_device_memory_size + self.user_device_heap_size = user_device_heap_size + self.user_country = user_country + + +class SessionDisconnect(Message): + __id__ = 2 + + def __init__(self, timestamp): + self.timestamp = timestamp + + +class SessionEnd(Message): + __id__ = 3 + __name__ = 'SessionEnd' + + def __init__(self, timestamp): + self.timestamp = timestamp + + +class SetPageLocation(Message): + __id__ = 4 + + def __init__(self, url, referrer, navigation_start): + self.url = url + self.referrer = referrer + self.navigation_start = navigation_start + + +class SetViewportSize(Message): + __id__ = 5 + + def __init__(self, width, height): + self.width = width + self.height = height + + +class SetViewportScroll(Message): + __id__ = 6 + + def __init__(self, x, y): + self.x = x + self.y = y + + +class CreateDocument(Message): + __id__ = 7 + + +class CreateElementNode(Message): + __id__ = 8 + + def __init__(self, id, parent_id, index, tag, svg): + self.id = id + self.parent_id = parent_id, + self.index = index + self.tag = tag + self.svg = svg + + +class CreateTextNode(Message): + __id__ = 9 + + def __init__(self, id, parent_id, index): + self.id = id + self.parent_id = parent_id + self.index = index + + +class MoveNode(Message): + __id__ = 10 + + def __init__(self, id, parent_id, index): + self.id = id + self.parent_id = parent_id + self.index = index + + +class RemoveNode(Message): + __id__ = 11 + + def __init__(self, id): + self.id = id + + +class SetNodeAttribute(Message): + __id__ = 12 + + def __init__(self, id, name: str, value: str): + self.id = id + self.name = name + self.value = value + + +class RemoveNodeAttribute(Message): + __id__ = 13 + + def __init__(self, id, name: str): + self.id = id + self.name = name + + +class SetNodeData(Message): + __id__ = 14 + + def __init__(self, id, data: str): + self.id = id + self.data = data + + +class SetCSSData(Message): + __id__ = 15 + + def __init__(self, id, data: str): + self.id = id + self.data = data + + +class SetNodeScroll(Message): + __id__ = 16 + + def __init__(self, id, x: int, y: int): + self.id = id + self.x = x + self.y = y + + +class SetInputTarget(Message): + __id__ = 17 + + def __init__(self, id, label: str): + self.id = id + self.label = label + + +class SetInputValue(Message): + __id__ = 18 + + def __init__(self, id, value: str, mask: int): + self.id = id + self.value = value + self.mask = mask + + +class SetInputChecked(Message): + __id__ = 19 + + def __init__(self, id, checked: bool): + self.id = id + self.checked = checked + + +class MouseMove(Message): + __id__ = 20 + + def __init__(self, x, y): + self.x = x + self.y = y + + +class MouseClickDepricated(Message): + __id__ = 21 + + def __init__(self, id, hesitation_time, label: str): + self.id = id + self.hesitation_time = hesitation_time + self.label = label + + +class ConsoleLog(Message): + __id__ = 22 + + def __init__(self, level: str, value: str): + self.level = level + self.value = value + + +class PageLoadTiming(Message): + __id__ = 23 + + def __init__(self, request_start, response_start, response_end, dom_content_loaded_event_start, + dom_content_loaded_event_end, load_event_start, load_event_end, + first_paint, first_contentful_paint): + self.request_start = request_start + self.response_start = response_start + self.response_end = response_end + self.dom_content_loaded_event_start = dom_content_loaded_event_start + self.dom_content_loaded_event_end = dom_content_loaded_event_end + self.load_event_start = load_event_start + self.load_event_end = load_event_end + self.first_paint = first_paint + self.first_contentful_paint = first_contentful_paint + + +class PageRenderTiming(Message): + __id__ = 24 + + def __init__(self, speed_index, visually_complete, time_to_interactive): + self.speed_index = speed_index + self.visually_complete = visually_complete + self.time_to_interactive = time_to_interactive + +class JSException(Message): + __id__ = 25 + + def __init__(self, name: str, message: str, payload: str): + self.name = name + self.message = message + self.payload = payload + + +class RawErrorEvent(Message): + __id__ = 26 + + def __init__(self, timestamp, source: str, name: str, message: str, + payload: str): + self.timestamp = timestamp + self.source = source + self.name = name + self.message = message + self.payload = payload + + +class RawCustomEvent(Message): + __id__ = 27 + + def __init__(self, name: str, payload: str): + self.name = name + self.payload = payload + + +class UserID(Message): + __id__ = 28 + + def __init__(self, id: str): + self.id = id + + +class UserAnonymousID(Message): + __id__ = 29 + + def __init__(self, id: str): + self.id = id + + +class Metadata(Message): + __id__ = 30 + + def __init__(self, key: str, value: str): + self.key = key + self.value = value + + +class PerformanceTrack(Message): + __id__ = 49 + + def __init__(self, frames: int, ticks: int, total_js_heap_size, + used_js_heap_size): + self.frames = frames + self.ticks = ticks + self.total_js_heap_size = total_js_heap_size + self.used_js_heap_size = used_js_heap_size + + +class PageEvent(Message): + __id__ = 31 + + def __init__(self, message_id, timestamp, url: str, referrer: str, + loaded: bool, request_start, response_start, response_end, + dom_content_loaded_event_start, dom_content_loaded_event_end, + load_event_start, load_event_end, first_paint, first_contentful_paint, + speed_index, visually_complete, time_to_interactive): + self.message_id = message_id + self.timestamp = timestamp + self.url = url + self.referrer = referrer + self.loaded = loaded + self.request_start = request_start + self.response_start = response_start + self.response_end = response_end + self.dom_content_loaded_event_start = dom_content_loaded_event_start + self.dom_content_loaded_event_end = dom_content_loaded_event_end + self.load_event_start = load_event_start + self.load_event_end = load_event_end + self.first_paint = first_paint + self.first_contentful_paint = first_contentful_paint + self.speed_index = speed_index + self.visually_complete = visually_complete + self.time_to_interactive = time_to_interactive + + +class InputEvent(Message): + __id__ = 32 + + def __init__(self, message_id, timestamp, value: str, value_masked: bool, label: str): + self.message_id = message_id + self.timestamp = timestamp + self.value = value + self.value_masked = value_masked + self.label = label + + +class ClickEvent(Message): + __id__ = 33 + + def __init__(self, message_id, timestamp, hesitation_time, label: str): + self.message_id = message_id + self.timestamp = timestamp + self.hesitation_time = hesitation_time + self.label = label + + +class ErrorEvent(Message): + __id__ = 34 + + def __init__(self, message_id, timestamp, source: str, name: str, message: str, + payload: str): + self.message_id = message_id + self.timestamp = timestamp + self.source = source + self.name = name + self.message = message + self.payload = payload + + +class ResourceEvent(Message): + __id__ = 35 + + def __init__(self, message_id, timestamp, duration, ttfb, header_size, encoded_body_size, + decoded_body_size, url: str, type: str, success: bool, method: str, status): + self.message_id = message_id + self.timestamp = timestamp + self.duration = duration + self.ttfb = ttfb + self.header_size = header_size + self.encoded_body_size = encoded_body_size + self.decoded_body_size = decoded_body_size + self.url = url + self.type = type + self.success = success + self.method = method + self.status = status + + +class CustomEvent(Message): + __id__ = 36 + + def __init__(self, message_id, timestamp, name: str, payload: str): + self.message_id = message_id + self.timestamp = timestamp + self.name = name + self.payload = payload + + +class CSSInsertRule(Message): + __id__ = 37 + + def __init__(self, id, rule: str, index): + self.id = id + self.rule = rule + self.index = index + + +class CSSDeleteRule(Message): + __id__ = 38 + + def __init__(self, id, index): + self.id = id + self.index = index + + +class Fetch(Message): + __id__ = 39 + + def __init__(self, method: str, url: str, request: str, response: str, status, + timestamp, duration): + self.method = method + self.url = url + self.request = request + self.response = response + self.status = status + self.timestamp = timestamp + self.duration = duration + + +class Profiler(Message): + __id__ = 40 + + def __init__(self, name: str, duration, args: str, result: str): + self.name = name + self.duration = duration + self.args = args + self.result = result + + +class OTable(Message): + __id__ = 41 + + def __init__(self, key: str, value: str): + self.key = key + self.value = value + + +class StateAction(Message): + __id__ = 42 + + def __init__(self, type: str): + self.type = type + + +class StateActionEvent(Message): + __id__ = 43 + + def __init__(self, message_id, timestamp, type: str): + self.message_id = message_id + self.timestamp = timestamp + self.type = type + + +class Redux(Message): + __id__ = 44 + + def __init__(self, action: str, state: str, duration): + self.action = action + self.state = state + self.duration = duration + + +class Vuex(Message): + __id__ = 45 + + def __init__(self, mutation: str, state: str): + self.mutation = mutation + self.state = state + + +class MobX(Message): + __id__ = 46 + + def __init__(self, type: str, payload: str): + self.type = type + self.payload = payload + + +class NgRx(Message): + __id__ = 47 + + def __init__(self, action: str, state: str, duration): + self.action = action + self.state = state + self.duration = duration + + +class GraphQL(Message): + __id__ = 48 + + def __init__(self, operation_kind: str, operation_name: str, + variables: str, response: str): + self.operation_kind = operation_kind + self.operation_name = operation_name + self.variables = variables + self.response = response + + +class PerformanceTrack(Message): + __id__ = 49 + + def __init__(self, frames: int, ticks: int, + total_js_heap_size, used_js_heap_size): + self.frames = frames + self.ticks = ticks + self.total_js_heap_size = total_js_heap_size + self.used_js_heap_size = used_js_heap_size + + +class GraphQLEvent(Message): + __id__ = 50 + + def __init__(self, message_id, timestamp, name: str): + self.message_id = message_id + self.timestamp = timestamp + self.name = name + + +class FetchEvent(Message): + __id__ = 51 + + def __init__(self, message_id, timestamp, method: str, url, request, response: str, + status, duration): + self.message_id = message_id + self.timestamp = timestamp + self.method = method + self.url = url + self.request = request + self.response = response + self.status = status + self.duration = duration + + +class DomDrop(Message): + __id__ = 52 + + def __init__(self, timestamp): + self.timestamp = timestamp + + +class ResourceTiming(Message): + __id__ = 53 + + def __init__(self, timestamp, duration, ttfb, header_size, encoded_body_size, + decoded_body_size, url, initiator): + self.timestamp = timestamp + self.duration = duration + self.ttfb = ttfb + self.header_size = header_size + self.encoded_body_size = encoded_body_size + self.decoded_body_size = decoded_body_size + self.url = url + self.initiator = initiator + + +class ConnectionInformation(Message): + __id__ = 54 + + def __init__(self, downlink, type: str): + self.downlink = downlink + self.type = type + + +class SetPageVisibility(Message): + __id__ = 55 + + def __init__(self, hidden: bool): + self.hidden = hidden + + +class PerformanceTrackAggr(Message): + __id__ = 56 + + def __init__(self, timestamp_start, timestamp_end, min_fps, avg_fps, + max_fps, min_cpu, avg_cpu, max_cpu, + min_total_js_heap_size, avg_total_js_heap_size, + max_total_js_heap_size, min_used_js_heap_size, + avg_used_js_heap_size, max_used_js_heap_size + ): + self.timestamp_start = timestamp_start + self.timestamp_end = timestamp_end + self.min_fps = min_fps + self.avg_fps = avg_fps + self.max_fps = max_fps + self.min_cpu = min_cpu + self.avg_cpu = avg_cpu + self.max_cpu = max_cpu + self.min_total_js_heap_size = min_total_js_heap_size + self.avg_total_js_heap_size = avg_total_js_heap_size + self.max_total_js_heap_size = max_total_js_heap_size + self.min_used_js_heap_size = min_used_js_heap_size + self.avg_used_js_heap_size = avg_used_js_heap_size + self.max_used_js_heap_size = max_used_js_heap_size + + +class LongTask(Message): + __id__ = 59 + + def __init__(self, timestamp, duration, context, container_type, container_src: str, + container_id: str, container_name: str): + self.timestamp = timestamp + self.duration = duration + self.context = context + self.container_type = container_type + self.container_src = container_src + self.container_id = container_id + self.container_name = container_name + + +class SetNodeURLBasedAttribute(Message): + __id__ = 60 + + def __init__(self, id, name: str, value: str, base_url: str): + self.id = id + self.name = name + self.value = value + self.base_url = base_url + + +class SetStyleData(Message): + __id__ = 61 + + def __init__(self, id, data: str, base_url: str): + self.id = id + self.data = data + self.base_url = base_url + + +class IssueEvent(Message): + __id__ = 62 + + def __init__(self, message_id, timestamp, type: str, context_string: str, + context: str, payload: str): + self.message_id = message_id + self.timestamp = timestamp + self.type = type + self.context_string = context_string + self.context = context + self.payload = payload + + +class TechnicalInfo(Message): + __id__ = 63 + + def __init__(self, type: str, value: str): + self.type = type + self.value = value + + +class CustomIssue(Message): + __id__ = 64 + + def __init__(self, name: str, payload: str): + self.name = name + self.payload = payload + + +class PageClose(Message): + __id__ = 65 + + +class AssetCache(Message): + __id__ = 66 + + def __init__(self, url): + self.url = url + + +class CSSInsertRuleURLBased(Message): + __id__ = 67 + + def __init__(self, id, rule, index, base_url): + self.id = id + self.rule = rule + self.index = index + self.base_url = base_url + + +class MouseClick(Message): + __id__ = 69 + + def __init__(self, id, hesitation_time, label: str, selector): + self.id = id + self.hesitation_time = hesitation_time + self.label = label + self.selector = selector + + +class CreateIFrameDocument(Message): + __id__ = 70 + + def __init__(self, frame_id, id): + self.frame_id = frame_id + self.id = id + + +class BatchMeta(Message): + __id__ = 80 + + def __init__(self, page_no, first_index, timestamp): + self.page_no = page_no + self.first_index = first_index + self.timestamp = timestamp + +class IOSSessionStart(Message): + __id__ = 90 + + def __init__(self, timestamp, project_id, tracker_version: str, + rev_id: str, user_uuid: str, user_os: str, user_os_version: str, + user_device: str, user_device_type: str, user_country: str): + self.timestamp = timestamp + self.project_id = project_id + self.tracker_version = tracker_version + self.rev_id = rev_id + self.user_uuid = user_uuid + self.user_os = user_os + self.user_os_version = user_os_version + self.user_device = user_device + self.user_device_type = user_device_type + self.user_country = user_country + + +class IOSSessionEnd(Message): + __id__ = 91 + + def __init__(self, timestamp): + self.timestamp = timestamp + + +class IOSMetadata(Message): + __id__ = 92 + + def __init__(self, timestamp, length, key: str, value: str): + self.timestamp = timestamp + self.length = length + self.key = key + self.value = value + + +class IOSCustomEvent(Message): + __id__ = 93 + + def __init__(self, timestamp, length, name: str, payload: str): + self.timestamp = timestamp + self.length = length + self.name = name + self.payload = payload + + +class IOSUserID(Message): + __id__ = 94 + + def __init__(self, timestamp, length, value: str): + self.timestamp = timestamp + self.length = length + self.value = value + + +class IOSUserAnonymousID(Message): + __id__ = 95 + + def __init__(self, timestamp, length, value: str): + self.timestamp = timestamp + self.length = length + self.value = value + + +class IOSScreenChanges(Message): + __id__ = 96 + + def __init__(self, timestamp, length, x, y, width, height): + self.timestamp = timestamp + self.length = length + self.x = x + self.y = y + self.width = width + self.height = height + + +class IOSCrash(Message): + __id__ = 97 + + def __init__(self, timestamp, length, name: str, reason: str, stacktrace): + self.timestamp = timestamp + self.length = length + self.name = name + self.reason = reason + self.stacktrace = stacktrace + + +class IOSScreenEnter(Message): + __id__ = 98 + + def __init__(self, timestamp, length, title, view_name): + self.timestamp = timestamp + self.length = length + self.title = title + self.view_name = view_name + + +class IOSScreenLeave(Message): + __id__ = 99 + + def __init__(self, timestamp, length, title: str, view_name: str): + self.timestamp = timestamp + self.length = length + self.title = title + self.view_name = view_name + + +class IOSClickEvent(Message): + __id__ = 100 + + def __init__(self, timestamp, length, label, x, y): + self.timestamp = timestamp + self.length = length + self.label = label + self.x = x + self.y = y + + +class IOSInputEvent(Message): + __id__ = 101 + + def __init__(self, timestamp, length, value: str, value_masked: bool, label: str): + self.timestamp = timestamp + self.length = length + self.value_masked = value_masked + self.label = label + + +class IOSPerformanceEvent(Message): + __id__ = 102 + + def __init__(self, timestamp, length, name: str, value): + self.timestamp = timestamp + self.length = length + self.name = name + self.value = value + + +class IOSLog(Message): + __id__ = 103 + + def __init__(self, timestamp, length, severity: str, content: str): + self.timestamp = timestamp + self.length = length + self.severity = severity + self.content = content + + +class IOSInternalError(Message): + __id__ = 104 + + def __init__(self, timestamp, length, content: str): + self.timestamp = timestamp + self.length = length + self.content = content + + +class IOSNetworkCall(Message): + __id__ = 105 + + def __init__(self, timestamp, length, duration, headers, body, url, success: bool, method: str, status): + self.timestamp = timestamp + self.length = length + self.duration = duration + self.headers = headers + self.body = body + self.url = url + self.success = success + self.method = method + self.status = status + + +class IOSBatchMeta(Message): + __id__ = 107 + + def __init__(self, timestamp, length, first_index): + self.timestamp = timestamp + self.length = length + self.first_index = first_index + + +class IOSPerformanceAggregated(Message): + __id__ = 110 + + def __init__(self, timestamp_start, timestamp_end, min_fps, avg_fps, + max_fps, min_cpu, avg_cpu, max_cpu, + min_memory, avg_memory, max_memory, + min_battery, avg_battery, max_battery + ): + self.timestamp_start = timestamp_start + self.timestamp_end = timestamp_end + self.min_fps = min_fps + self.avg_fps = avg_fps + self.max_fps = max_fps + self.min_cpu = min_cpu + self.avg_cpu = avg_cpu + self.max_cpu = max_cpu + self.min_memory = min_memory + self.avg_memory = avg_memory + self.max_memory = max_memory + self.min_battery = min_battery + self.avg_battery = avg_battery + self.max_battery = max_battery + + +class IOSIssueEvent(Message): + __id__ = 111 + + def __init__(self, timestamp, type: str, context_string: str, context: str, payload: str): + self.timestamp = timestamp + self.type = type + self.context_string = context_string + self.context = context + self.payload = payload diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 000000000..d328a9142 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +kafka-python diff --git a/run_quickwit.sh b/run_quickwit.sh new file mode 100644 index 000000000..a00b1046d --- /dev/null +++ b/run_quickwit.sh @@ -0,0 +1 @@ +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/qwdata:/quickwit/qwdata -v $(pwd)/s3-config-listen.yaml:/quickwit/s3-config-listen.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 -p 127.0.0.1:7280:7280 quickwit/quickwit run --config s3-config-listen.yaml diff --git a/s3-config-listen.yaml b/s3-config-listen.yaml new file mode 100644 index 000000000..524339f2f --- /dev/null +++ b/s3-config-listen.yaml @@ -0,0 +1,5 @@ +## In order to save data into S3 +version: 0 +metastore_uri: s3://quickwit/quickwit-indexes +default_index_root_uri: s3://quickwit/quickwit-indexes +listen_address: 0.0.0.0 diff --git a/s3-config.yaml b/s3-config.yaml new file mode 100644 index 000000000..5cf727b28 --- /dev/null +++ b/s3-config.yaml @@ -0,0 +1,4 @@ +## In order to save data into S3 +version: 0 +metastore_uri: s3://quickwit/quickwit-indexes +default_index_root_uri: s3://quickwit/quickwit-indexes From 42a9d80a0de90bf0617b79ca017106d1ade6677e Mon Sep 17 00:00:00 2001 From: mauricio garcia suarez Date: Wed, 27 Jul 2022 13:59:44 +0200 Subject: [PATCH 02/13] Fixed clean error --- clean.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clean.sh b/clean.sh index 81469034c..6101f890b 100644 --- a/clean.sh +++ b/clean.sh @@ -1,2 +1,2 @@ -docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/qwdata:/quickwit/qwdata -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml quickwit/quickwit source delete --index quickwit-kafka --source kafka-source --config s3-config.yaml -docker run -v /etc/hosts:/etc/hosts:ro -v -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml $(pwd)/qwdata:/quickwit/qwdata quickwit/quickwit index delete --index quickwit-kafka --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/qwdata:/quickwit/qwdata -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 quickwit/quickwit source delete --index quickwit-kafka --source kafka-source --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -v $(pwd)/qwdata:/quickwit/qwdata -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 quickwit/quickwit index delete --index quickwit-kafka --config s3-config.yaml From c1620db523cc1d2d1c283eb5c59645bdad17209c Mon Sep 17 00:00:00 2001 From: mauricio garcia suarez Date: Wed, 27 Jul 2022 14:06:23 +0200 Subject: [PATCH 03/13] updated bash files: deleted unnecessary commands --- clean.sh | 4 ++-- create_kafka_index.sh | 2 +- create_source.sh | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/clean.sh b/clean.sh index 6101f890b..9dbf56926 100644 --- a/clean.sh +++ b/clean.sh @@ -1,2 +1,2 @@ -docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/qwdata:/quickwit/qwdata -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 quickwit/quickwit source delete --index quickwit-kafka --source kafka-source --config s3-config.yaml -docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -v $(pwd)/qwdata:/quickwit/qwdata -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 quickwit/quickwit index delete --index quickwit-kafka --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 quickwit/quickwit source delete --index quickwit-kafka --source kafka-source --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 quickwit/quickwit index delete --index quickwit-kafka --config s3-config.yaml diff --git a/create_kafka_index.sh b/create_kafka_index.sh index 651b3983c..e2669c6ca 100644 --- a/create_kafka_index.sh +++ b/create_kafka_index.sh @@ -1 +1 @@ -docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/qwdata:/quickwit/qwdata -v $(pwd)/index-config.yaml:/quickwit/index-config.yaml -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 quickwit/quickwit index create --index-config index-config.yaml --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/index-config.yaml:/quickwit/index-config.yaml -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 quickwit/quickwit index create --index-config index-config.yaml --config s3-config.yaml diff --git a/create_source.sh b/create_source.sh index b1c37c020..767c60342 100644 --- a/create_source.sh +++ b/create_source.sh @@ -1 +1 @@ -docker run -v /etc/hosts:/etc/hosts:ro -v /etc/hosts:/etc/hosts:ro -v $(pwd)/qwdata:/quickwit/qwdata -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -v $(pwd)/kafka-source.yaml:/quickwit/kafka-source.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 quickwit/quickwit source create --index quickwit-kafka --source-config kafka-source.yaml --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -v $(pwd)/kafka-source.yaml:/quickwit/kafka-source.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 quickwit/quickwit source create --index quickwit-kafka --source-config kafka-source.yaml --config s3-config.yaml From e9961b16037039a1dd5a59b8b722dba24e3da707 Mon Sep 17 00:00:00 2001 From: mauricio garcia suarez Date: Wed, 27 Jul 2022 14:19:06 +0200 Subject: [PATCH 04/13] Added some comments and notes --- kafka-source.yaml | 1 + s3-config-listen.yaml | 1 + s3-config.yaml | 1 + 3 files changed, 3 insertions(+) diff --git a/kafka-source.yaml b/kafka-source.yaml index 89b6174cf..12f667dee 100644 --- a/kafka-source.yaml +++ b/kafka-source.yaml @@ -7,3 +7,4 @@ params: topic: quickwit-kafka client_params: bootstrap.servers: kafka-1.kafka-headless.db.svc.cluster.local:9092 + # security.protocol: SSL diff --git a/s3-config-listen.yaml b/s3-config-listen.yaml index 524339f2f..f6065e927 100644 --- a/s3-config-listen.yaml +++ b/s3-config-listen.yaml @@ -1,4 +1,5 @@ ## In order to save data into S3 +# metastore also accepts s3://{bucket/path}#pooling_interval={seconds}s version: 0 metastore_uri: s3://quickwit/quickwit-indexes default_index_root_uri: s3://quickwit/quickwit-indexes diff --git a/s3-config.yaml b/s3-config.yaml index 5cf727b28..2fa1e20d7 100644 --- a/s3-config.yaml +++ b/s3-config.yaml @@ -1,4 +1,5 @@ ## In order to save data into S3 +# metastore also accepts s3://{bucket/path}#pooling_interval={seconds}s version: 0 metastore_uri: s3://quickwit/quickwit-indexes default_index_root_uri: s3://quickwit/quickwit-indexes From ea21040ccc364e66879bb86b540edf4ab677e083 Mon Sep 17 00:00:00 2001 From: mauricio garcia suarez Date: Wed, 27 Jul 2022 15:56:33 +0200 Subject: [PATCH 05/13] aws_region set as environment variable --- clean.sh | 4 ++-- create_kafka_index.sh | 2 +- create_source.sh | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/clean.sh b/clean.sh index 9dbf56926..502db2a2e 100644 --- a/clean.sh +++ b/clean.sh @@ -1,2 +1,2 @@ -docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 quickwit/quickwit source delete --index quickwit-kafka --source kafka-source --config s3-config.yaml -docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 quickwit/quickwit index delete --index quickwit-kafka --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region quickwit/quickwit source delete --index quickwit-kafka --source kafka-source --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region quickwit/quickwit index delete --index quickwit-kafka --config s3-config.yaml diff --git a/create_kafka_index.sh b/create_kafka_index.sh index e2669c6ca..24e76803a 100644 --- a/create_kafka_index.sh +++ b/create_kafka_index.sh @@ -1 +1 @@ -docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/index-config.yaml:/quickwit/index-config.yaml -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 quickwit/quickwit index create --index-config index-config.yaml --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/index-config.yaml:/quickwit/index-config.yaml -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region quickwit/quickwit index create --index-config index-config.yaml --config s3-config.yaml diff --git a/create_source.sh b/create_source.sh index 767c60342..77aab1937 100644 --- a/create_source.sh +++ b/create_source.sh @@ -1 +1 @@ -docker run -v /etc/hosts:/etc/hosts:ro -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -v $(pwd)/kafka-source.yaml:/quickwit/kafka-source.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 quickwit/quickwit source create --index quickwit-kafka --source-config kafka-source.yaml --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -v $(pwd)/kafka-source.yaml:/quickwit/kafka-source.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region quickwit/quickwit source create --index quickwit-kafka --source-config kafka-source.yaml --config s3-config.yaml From df468f38e5f520dd4bdbc2ff99d31bc00d880647 Mon Sep 17 00:00:00 2001 From: mauricio garcia suarez Date: Wed, 27 Jul 2022 16:58:56 +0200 Subject: [PATCH 06/13] Added README.md --- README.md | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 000000000..7b566ed13 --- /dev/null +++ b/README.md @@ -0,0 +1,39 @@ +# Quickwit for kafka messages (S3 storage) + +## index +1. [Setup](#setup) +2. [Deploy](#deploy) + +## Setup +This setup is made using Docker, make changes to the files accordingly to run it locally. + +In order to connect to AWS S3 service the aws credentials must be defined in the environment +```bash +export aws_access_key_id={your_aws_access_key_id} +export aws_secret_access_key={your_aws_secret_access_key} +export aws_region={bucket_region} +``` +In the file kafka-source.yaml replace the bootstap.server with the address of your kafka service and uncomment to activate ssl protocol if needed. +## Deploy + +To create the index 'quickwit-kafka' run the command: +```bash +bash create_kafka_index.sh +``` +Having the topic 'quickwit-kafka' in the kafka server defined in the kafka-source.yaml, the connection between the created index and the topic can be achieved by running the command: +```bash +bash create_source.sh +``` +To delete both the index and the source connection run the command: +```bash +bash clean.sh +``` + +To deploy the indexer, search and UI services run the command: +```bash +bash run_quickwit.sh +``` +UI server will start at localhost:7280. The api can also be called through the url http://127.0.0.1:7280/api/v1/quickwit-kafka/search?query={your_query} for example +```angular2html +curl "http://127.0.0.1:7280/api/v1/quickwit-kafka/search?query=body:error" +``` \ No newline at end of file From 7b77a0e90e7e4743ada5437887fa7d5068b75dc6 Mon Sep 17 00:00:00 2001 From: mauricio garcia suarez Date: Wed, 27 Jul 2022 17:57:25 +0200 Subject: [PATCH 07/13] Updated JSON keys and index conf --- index-config.yaml | 26 +++++++++++++++++++++++--- kafka_sample.py | 34 +++++++++------------------------- 2 files changed, 32 insertions(+), 28 deletions(-) diff --git a/index-config.yaml b/index-config.yaml index 43f8d3324..8e68eb73d 100644 --- a/index-config.yaml +++ b/index-config.yaml @@ -8,14 +8,34 @@ index_id: quickwit-kafka doc_mapping: field_mappings: - - name: title + - name: method type: text tokenizer: default record: position - - name: body + - name: url + type: text + tokenizer: default + record: position + - name: request + type: text + tokenizer: default + record: position + - name: response + type: text + tokenizer: default + record: position + - name: status + type: text + tokenizer: default + record: position + - name: timestamp + type: text + tokenizer: default + record: position + - name: duration type: text tokenizer: default record: position search_settings: - default_search_fields: [title, body] + default_search_fields: [url, request, response] diff --git a/kafka_sample.py b/kafka_sample.py index 9d58ae54d..0ebce304c 100644 --- a/kafka_sample.py +++ b/kafka_sample.py @@ -11,11 +11,14 @@ import json import getopt, sys n = 0 - +fetch_keys = ['method', 'url', 'request', 'response', 'status', 'timestamp', 'duration'] def transform(data): global n n += 1 - return {'title': f'message {n}', 'body': data} + return { + 'method': data.method, 'url': data.url, 'request': data.url, 'response': data.request, + 'status': data.status, 'timestamp': data.timestamp, 'duration': data.duration + } def create_producer(): producer = KafkaProducer(#security_protocol="SSL", @@ -32,7 +35,7 @@ def create_consumer(): consumer = KafkaConsumer(#security_protocol="SSL", bootstrap_servers=os.environ['KAFKA_SERVER_2'], # os.environ['KAFKA_SERVER_1']], - group_id=f"my_test52_connector", + group_id=f"quickwit_connector", auto_offset_reset="earliest", enable_auto_commit=False ) @@ -41,11 +44,6 @@ def create_consumer(): def consumer_producer_end(): global n - batch_size = 4000 - sessions_batch_size = 400 - batch = [] - sessions = defaultdict(lambda: None) - sessions_batch = [] codec = MessageCodec() consumer = create_consumer() @@ -60,27 +58,13 @@ def consumer_producer_end(): if messages is None: print('-') for message in messages: - send = True - if isinstance(message, Fetch): - data = message.response - elif isinstance(message, FetchEvent): - data = message.response - elif isinstance(message, PageEvent): - print(message.url) - elif isinstance(message, SetCSSData): - data = message.data - elif isinstance(message, SetStyleData): - data = message.data - else: - send = False - continue - if send: - producer.send('quickwit-kafka', value=transform(data)) + send = False + if isinstance(message, Fetch) or isinstance(message, FetchEvent): + producer.send('quickwit-kafka', value=transform(message)) print(f'added message {n}') sleep(5) - def consumer_end(): consumer = create_consumer() consumer.subscribe(topics=['quickwit-kafka']) From c38255308a7564f6ab2f796a797c24b693592d76 Mon Sep 17 00:00:00 2001 From: mauricio garcia suarez Date: Fri, 29 Jul 2022 12:13:47 +0200 Subject: [PATCH 08/13] Added new Indexes, updated source and python test of quickwit --- README.md | 2 +- index-config.yaml => index-config-fetch.yaml | 17 +++-- index-config-graphql.yaml | 30 +++++++++ index-config-pageevent.yaml | 68 ++++++++++++++++++++ kafka_sample.py | 39 +++++++++-- sources.yaml | 29 +++++++++ 6 files changed, 170 insertions(+), 15 deletions(-) rename index-config.yaml => index-config-fetch.yaml (74%) create mode 100644 index-config-graphql.yaml create mode 100644 index-config-pageevent.yaml create mode 100644 sources.yaml diff --git a/README.md b/README.md index 7b566ed13..f5110c7ee 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,6 @@ To deploy the indexer, search and UI services run the command: bash run_quickwit.sh ``` UI server will start at localhost:7280. The api can also be called through the url http://127.0.0.1:7280/api/v1/quickwit-kafka/search?query={your_query} for example -```angular2html +```bash curl "http://127.0.0.1:7280/api/v1/quickwit-kafka/search?query=body:error" ``` \ No newline at end of file diff --git a/index-config.yaml b/index-config-fetch.yaml similarity index 74% rename from index-config.yaml rename to index-config-fetch.yaml index 8e68eb73d..1a7577450 100644 --- a/index-config.yaml +++ b/index-config-fetch.yaml @@ -4,9 +4,10 @@ version: 0 -index_id: quickwit-kafka +index_id: fetch doc_mapping: + mode: strict field_mappings: - name: method type: text @@ -25,17 +26,15 @@ doc_mapping: tokenizer: default record: position - name: status - type: text - tokenizer: default + type: i64 + fast: true record: position - name: timestamp - type: text - tokenizer: default - record: position + type: i64 + fast: true - name: duration - type: text - tokenizer: default - record: position + type: i64 + fast: true search_settings: default_search_fields: [url, request, response] diff --git a/index-config-graphql.yaml b/index-config-graphql.yaml new file mode 100644 index 000000000..bac1d8406 --- /dev/null +++ b/index-config-graphql.yaml @@ -0,0 +1,30 @@ +# +# Index config file for gh-archive dataset. +# + +version: 0 + +index_id: graphql + +doc_mapping: + mode: strict + field_mappings: + - name: operation_kind + type: text + tokenizer: default + record: position + - name: operation_name + type: text + tokenizer: default + record: position + - name: variables + type: text + tokenizer: default + record: position + - name: response + type: text + tokenizer: default + record: position + +search_settings: + default_search_fields: [operation_kind, operation_name, variables] diff --git a/index-config-pageevent.yaml b/index-config-pageevent.yaml new file mode 100644 index 000000000..90d8d152f --- /dev/null +++ b/index-config-pageevent.yaml @@ -0,0 +1,68 @@ +# +# Index config file for gh-archive dataset. +# + +version: 0 + +index_id: pageevent + +doc_mapping: + mode: strict + field_mappings: + - name: message_id + type: i64 + fast: true + record: position + - name: timestamp + type: i64 + fast: true + - name: url + type: text + tokenizer: default + record: position + - name: referrer + type: text + tokenizer: default + record: position + - name: loaded + type: i64 + fast: true + - name: request_start + type: i64 + fast: true + - name: response_start + type: i64 + fast: true + - name: response_end + type: i64 + fast: true + - name: dom_content_loaded_event_start + type: i64 + fast: true + - name: dom_content_loaded_event_end + type: i64 + fast: true + - name: load_event_start + type: i64 + fast: true + - name: load_event_end + type: i64 + fast: true + - name: first_paint + type: i64 + fast: true + - name: first_contentful_paint + type: i64 + fast: true + - name: speed_index + type: i64 + fast: true + - name: visually_complete + type: i64 + fast: true + - name: time_to_interactive + type: i64 + fast: true + +search_settings: + default_search_fields: [url, referrer, visually_complete] diff --git a/kafka_sample.py b/kafka_sample.py index 0ebce304c..47e9f8001 100644 --- a/kafka_sample.py +++ b/kafka_sample.py @@ -5,14 +5,13 @@ from datetime import datetime from collections import defaultdict from msgcodec.codec import MessageCodec -from msgcodec.messages import SessionEnd, Fetch, FetchEvent, PageEvent, SetCSSData, SetStyleData +from msgcodec.messages import Fetch, FetchEvent, PageEvent, GraphQL import json import getopt, sys n = 0 -fetch_keys = ['method', 'url', 'request', 'response', 'status', 'timestamp', 'duration'] -def transform(data): +def transform_fetch(data): global n n += 1 return { @@ -20,6 +19,28 @@ def transform(data): 'status': data.status, 'timestamp': data.timestamp, 'duration': data.duration } +def transform_graphql(data): + global n + n += 1 + return { + 'operation_kind': data.operation_kind, 'operation_name': data.operation_name, + 'variables': data.variables, 'response': data.response + } + +def transform_pageevent(data): + global n + n += 1 + return {'massage_id': data.message_id, 'timestamp': data.timestamp, 'url': data.timestamp, + 'referrer': data.referrer, 'loaded': data.loaded, 'request_start': data.request_start, + 'response_start': data.response_start, 'response_end': data.response_end, + 'dom_content_loaded_event_start': data.dom_content_loaded_event_start, + 'dom_content_loaded_event_end': data.dom_content_loaded_event_end, + 'load_event_start': data.load_event_start, 'load_event_end': data.load_event_end, + 'first_paint': data.first_paint, 'first_contentful_paint': data.first_contentful_paint, + 'speed_index': data.speed_index, 'visually_complete': data.visually_complete, + 'time_to_interactive': data.time_to_interactive + } + def create_producer(): producer = KafkaProducer(#security_protocol="SSL", bootstrap_servers=os.environ['KAFKA_SERVER_2'], @@ -60,8 +81,16 @@ def consumer_producer_end(): for message in messages: send = False if isinstance(message, Fetch) or isinstance(message, FetchEvent): - producer.send('quickwit-kafka', value=transform(message)) - print(f'added message {n}') + producer.send('quickwit-kafka', value=transform_fetch(message)) + print(f'added message {n} type Fetch') + sleep(5) + if isinstance(message, GraphQL): + producer.send('quickwit-kafka', value=transform_graphql(message)) + print(f'added message {n} type GraphQL') + sleep(5) + if isinstance(message, PageEvent): + producer.send('quickwit-kafka', value=transform_pageevent(message)) + print(f'added message {n} type PageEvent') sleep(5) diff --git a/sources.yaml b/sources.yaml new file mode 100644 index 000000000..7dad76619 --- /dev/null +++ b/sources.yaml @@ -0,0 +1,29 @@ +# +# Source config file. +# + +sources: + - fetch: fetch-kafka + source_type: kafka + params: + topic: quickwit-kafka + client_params: + bootstrap.servers: kafka-1.kafka-headless.db.svc.cluster.local:9092 + group.id: fetch-consumer + # security.protocol: SSL + - graphql: graphql-kafka + source_type: kafka + params: + topic: quickwit-kafka + client_params: + bootstrap.servers: kafka-1.kafka-headless.db.svc.cluster.local:9092 + group.id: graphql-consumer + # security.protocol: SSL + - graphql: graphql-pageevent + source_type: kafka + params: + topic: quickwit-kafka + client_params: + bootstrap.servers: kafka-1.kafka-headless.db.svc.cluster.local:9092 + group.id: pageevent-consumer + # security.protocol: SSL From 125e5289be880b5d64f1b2668bb1d93fe0f98c24 Mon Sep 17 00:00:00 2001 From: mauricio garcia suarez Date: Fri, 29 Jul 2022 16:46:49 +0200 Subject: [PATCH 09/13] Updated methods. New indexes created in quickwit reading from same topic in kafka with different group id --- clean.sh | 10 +++++++-- create_kafka_index.sh | 4 +++- create_source.sh | 1 - create_sources.sh | 3 +++ index-config-fetch.yaml | 4 ++-- index-config-pageevent.yaml | 2 +- run_quickwit.sh | 2 +- kafka-source.yaml => source-fetch.yaml | 6 ++++-- source-graphql.yaml | 12 +++++++++++ source-pageevent.yaml | 12 +++++++++++ sources.yaml | 29 -------------------------- 11 files changed, 46 insertions(+), 39 deletions(-) delete mode 100644 create_source.sh create mode 100644 create_sources.sh rename kafka-source.yaml => source-fetch.yaml (69%) create mode 100644 source-graphql.yaml create mode 100644 source-pageevent.yaml delete mode 100644 sources.yaml diff --git a/clean.sh b/clean.sh index 502db2a2e..9ec3a5f7e 100644 --- a/clean.sh +++ b/clean.sh @@ -1,2 +1,8 @@ -docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region quickwit/quickwit source delete --index quickwit-kafka --source kafka-source --config s3-config.yaml -docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region quickwit/quickwit index delete --index quickwit-kafka --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region quickwit/quickwit source delete --index fetchevent --source fetch-kafka --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region quickwit/quickwit index delete --index fetchevent --config s3-config.yaml + +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region quickwit/quickwit source delete --index graphql --source graphql-kafka --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region quickwit/quickwit index delete --index graphql --config s3-config.yaml + +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region quickwit/quickwit source delete --index pageevent --source pageevent-kafka --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region quickwit/quickwit index delete --index pageevent --config s3-config.yaml diff --git a/create_kafka_index.sh b/create_kafka_index.sh index 24e76803a..efb165db6 100644 --- a/create_kafka_index.sh +++ b/create_kafka_index.sh @@ -1 +1,3 @@ -docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/index-config.yaml:/quickwit/index-config.yaml -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region quickwit/quickwit index create --index-config index-config.yaml --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/index-config-fetch.yaml:/quickwit/index-config-fetch.yaml -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region quickwit/quickwit index create --index-config index-config-fetch.yaml --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/index-config-graphql.yaml:/quickwit/index-config-graphql.yaml -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region quickwit/quickwit index create --index-config index-config-graphql.yaml --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/index-config-pageevent.yaml:/quickwit/index-config-pageevent.yaml -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region quickwit/quickwit index create --index-config index-config-pageevent.yaml --config s3-config.yaml diff --git a/create_source.sh b/create_source.sh deleted file mode 100644 index 77aab1937..000000000 --- a/create_source.sh +++ /dev/null @@ -1 +0,0 @@ -docker run -v /etc/hosts:/etc/hosts:ro -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -v $(pwd)/kafka-source.yaml:/quickwit/kafka-source.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region quickwit/quickwit source create --index quickwit-kafka --source-config kafka-source.yaml --config s3-config.yaml diff --git a/create_sources.sh b/create_sources.sh new file mode 100644 index 000000000..c854899c3 --- /dev/null +++ b/create_sources.sh @@ -0,0 +1,3 @@ +docker run -v /etc/hosts:/etc/hosts:ro -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -v $(pwd)/source-fetch.yaml:/quickwit/source-fetch.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region quickwit/quickwit source create --index fetchevent --source-config source-fetch.yaml --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -v $(pwd)/source-graphql.yaml:/quickwit/source-graphql.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region quickwit/quickwit source create --index graphql --source-config source-graphql.yaml --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -v $(pwd)/source-pageevent.yaml:/quickwit/source-pageevent.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region quickwit/quickwit source create --index pageevent --source-config source-pageevent.yaml --config s3-config.yaml diff --git a/index-config-fetch.yaml b/index-config-fetch.yaml index 1a7577450..1d89f72c9 100644 --- a/index-config-fetch.yaml +++ b/index-config-fetch.yaml @@ -4,7 +4,7 @@ version: 0 -index_id: fetch +index_id: fetchevent doc_mapping: mode: strict @@ -27,8 +27,8 @@ doc_mapping: record: position - name: status type: i64 + indexed: true fast: true - record: position - name: timestamp type: i64 fast: true diff --git a/index-config-pageevent.yaml b/index-config-pageevent.yaml index 90d8d152f..36a0a69fe 100644 --- a/index-config-pageevent.yaml +++ b/index-config-pageevent.yaml @@ -11,8 +11,8 @@ doc_mapping: field_mappings: - name: message_id type: i64 + indexed: true fast: true - record: position - name: timestamp type: i64 fast: true diff --git a/run_quickwit.sh b/run_quickwit.sh index a00b1046d..b2afeb011 100644 --- a/run_quickwit.sh +++ b/run_quickwit.sh @@ -1 +1 @@ -docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/qwdata:/quickwit/qwdata -v $(pwd)/s3-config-listen.yaml:/quickwit/s3-config-listen.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 -p 127.0.0.1:7280:7280 quickwit/quickwit run --config s3-config-listen.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/s3-config-listen.yaml:/quickwit/s3-config-listen.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=$aws_region -e AWS_REGION=$aws_region -p 127.0.0.1:7280:7280 quickwit/quickwit run --config s3-config-listen.yaml diff --git a/kafka-source.yaml b/source-fetch.yaml similarity index 69% rename from kafka-source.yaml rename to source-fetch.yaml index 12f667dee..506c234f6 100644 --- a/kafka-source.yaml +++ b/source-fetch.yaml @@ -1,10 +1,12 @@ # -# Kafka source config file. +# Source config file. # -source_id: kafka-source + +source_id: fetch-kafka source_type: kafka params: topic: quickwit-kafka client_params: bootstrap.servers: kafka-1.kafka-headless.db.svc.cluster.local:9092 + group.id: fetch-consumer # security.protocol: SSL diff --git a/source-graphql.yaml b/source-graphql.yaml new file mode 100644 index 000000000..900e0f92b --- /dev/null +++ b/source-graphql.yaml @@ -0,0 +1,12 @@ +# +# Source config file. +# + +source_id: graphql-kafka +source_type: kafka +params: + topic: quickwit-kafka + client_params: + bootstrap.servers: kafka-1.kafka-headless.db.svc.cluster.local:9092 + group.id: graphql-consumer + # security.protocol: SSL diff --git a/source-pageevent.yaml b/source-pageevent.yaml new file mode 100644 index 000000000..6c5582a96 --- /dev/null +++ b/source-pageevent.yaml @@ -0,0 +1,12 @@ +# +# Source config file. +# + +source_id: pageevent-kafka +source_type: kafka +params: + topic: quickwit-kafka + client_params: + bootstrap.servers: kafka-1.kafka-headless.db.svc.cluster.local:9092 + group.id: pageevent-consumer + # security.protocol: SSL diff --git a/sources.yaml b/sources.yaml deleted file mode 100644 index 7dad76619..000000000 --- a/sources.yaml +++ /dev/null @@ -1,29 +0,0 @@ -# -# Source config file. -# - -sources: - - fetch: fetch-kafka - source_type: kafka - params: - topic: quickwit-kafka - client_params: - bootstrap.servers: kafka-1.kafka-headless.db.svc.cluster.local:9092 - group.id: fetch-consumer - # security.protocol: SSL - - graphql: graphql-kafka - source_type: kafka - params: - topic: quickwit-kafka - client_params: - bootstrap.servers: kafka-1.kafka-headless.db.svc.cluster.local:9092 - group.id: graphql-consumer - # security.protocol: SSL - - graphql: graphql-pageevent - source_type: kafka - params: - topic: quickwit-kafka - client_params: - bootstrap.servers: kafka-1.kafka-headless.db.svc.cluster.local:9092 - group.id: pageevent-consumer - # security.protocol: SSL From c198f30039d532619c05b1a60b7a3ecbc8325250 Mon Sep 17 00:00:00 2001 From: mauricio garcia suarez Date: Fri, 29 Jul 2022 16:47:29 +0200 Subject: [PATCH 10/13] quickwit test script updated --- kafka_sample.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka_sample.py b/kafka_sample.py index 47e9f8001..cba7869ab 100644 --- a/kafka_sample.py +++ b/kafka_sample.py @@ -56,7 +56,7 @@ def create_consumer(): consumer = KafkaConsumer(#security_protocol="SSL", bootstrap_servers=os.environ['KAFKA_SERVER_2'], # os.environ['KAFKA_SERVER_1']], - group_id=f"quickwit_connector", + group_id=f"quickwit_connector2", auto_offset_reset="earliest", enable_auto_commit=False ) From 5f669bf275625073cbd1f038b0836c4603b660a0 Mon Sep 17 00:00:00 2001 From: mauricio garcia suarez Date: Fri, 29 Jul 2022 17:25:51 +0200 Subject: [PATCH 11/13] added gitignore --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..bf7d42865 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea +Pip* + From 2ad4f4ea124232274bc09ab1f1faf2581053e619 Mon Sep 17 00:00:00 2001 From: mauricio garcia suarez Date: Mon, 1 Aug 2022 10:48:14 +0200 Subject: [PATCH 12/13] changed topic name to quickwit, and kafka source uri for localhost --- source-fetch.yaml | 6 +++--- source-graphql.yaml | 6 +++--- source-pageevent.yaml | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/source-fetch.yaml b/source-fetch.yaml index 506c234f6..9ea448e83 100644 --- a/source-fetch.yaml +++ b/source-fetch.yaml @@ -5,8 +5,8 @@ source_id: fetch-kafka source_type: kafka params: - topic: quickwit-kafka + topic: quickwit client_params: - bootstrap.servers: kafka-1.kafka-headless.db.svc.cluster.local:9092 + bootstrap.servers: localhost:9092 group.id: fetch-consumer - # security.protocol: SSL + security.protocol: SSL diff --git a/source-graphql.yaml b/source-graphql.yaml index 900e0f92b..0f0d8fa02 100644 --- a/source-graphql.yaml +++ b/source-graphql.yaml @@ -5,8 +5,8 @@ source_id: graphql-kafka source_type: kafka params: - topic: quickwit-kafka + topic: quickwit client_params: - bootstrap.servers: kafka-1.kafka-headless.db.svc.cluster.local:9092 + bootstrap.servers: localhost:9092 group.id: graphql-consumer - # security.protocol: SSL + security.protocol: SSL diff --git a/source-pageevent.yaml b/source-pageevent.yaml index 6c5582a96..0eca56956 100644 --- a/source-pageevent.yaml +++ b/source-pageevent.yaml @@ -5,8 +5,8 @@ source_id: pageevent-kafka source_type: kafka params: - topic: quickwit-kafka + topic: quickwit client_params: - bootstrap.servers: kafka-1.kafka-headless.db.svc.cluster.local:9092 + bootstrap.servers: localhost:9092 group.id: pageevent-consumer - # security.protocol: SSL + security.protocol: SSL From 4ca07f44002d3ac3e56a37503f8a93d0ad092f45 Mon Sep 17 00:00:00 2001 From: mauricio garcia suarez Date: Mon, 1 Aug 2022 14:48:15 +0200 Subject: [PATCH 13/13] Changed type loaded from Fetch from i64 to bool --- ee/quickwit/index-config-pageevent.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ee/quickwit/index-config-pageevent.yaml b/ee/quickwit/index-config-pageevent.yaml index 36a0a69fe..e47dd6a1d 100644 --- a/ee/quickwit/index-config-pageevent.yaml +++ b/ee/quickwit/index-config-pageevent.yaml @@ -25,7 +25,7 @@ doc_mapping: tokenizer: default record: position - name: loaded - type: i64 + type: bool fast: true - name: request_start type: i64