commit 4b11c77bcf063f45883d2cf8d02dd422f5478e4c Author: mauricio garcia suarez Date: Wed Jul 27 13:39:04 2022 +0200 quickwit server taking inputs from Kafka quickwit-kafka topic and saving data in s3 quickwit bucket diff --git a/clean.sh b/clean.sh new file mode 100644 index 000000000..81469034c --- /dev/null +++ b/clean.sh @@ -0,0 +1,2 @@ +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/qwdata:/quickwit/qwdata -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml quickwit/quickwit source delete --index quickwit-kafka --source kafka-source --config s3-config.yaml +docker run -v /etc/hosts:/etc/hosts:ro -v -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml $(pwd)/qwdata:/quickwit/qwdata quickwit/quickwit index delete --index quickwit-kafka --config s3-config.yaml diff --git a/create_kafka_index.sh b/create_kafka_index.sh new file mode 100644 index 000000000..651b3983c --- /dev/null +++ b/create_kafka_index.sh @@ -0,0 +1 @@ +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/qwdata:/quickwit/qwdata -v $(pwd)/index-config.yaml:/quickwit/index-config.yaml -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 quickwit/quickwit index create --index-config index-config.yaml --config s3-config.yaml diff --git a/create_source.sh b/create_source.sh new file mode 100644 index 000000000..b1c37c020 --- /dev/null +++ b/create_source.sh @@ -0,0 +1 @@ +docker run -v /etc/hosts:/etc/hosts:ro -v /etc/hosts:/etc/hosts:ro -v $(pwd)/qwdata:/quickwit/qwdata -v $(pwd)/s3-config.yaml:/quickwit/s3-config.yaml -v $(pwd)/kafka-source.yaml:/quickwit/kafka-source.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 quickwit/quickwit source create --index quickwit-kafka --source-config kafka-source.yaml --config s3-config.yaml diff --git a/index-config.yaml b/index-config.yaml new file mode 100644 index 000000000..43f8d3324 --- /dev/null +++ b/index-config.yaml @@ -0,0 +1,21 @@ +# +# Index config file for gh-archive dataset. +# + +version: 0 + +index_id: quickwit-kafka + +doc_mapping: + field_mappings: + - name: title + type: text + tokenizer: default + record: position + - name: body + type: text + tokenizer: default + record: position + +search_settings: + default_search_fields: [title, body] diff --git a/kafka-source.yaml b/kafka-source.yaml new file mode 100644 index 000000000..89b6174cf --- /dev/null +++ b/kafka-source.yaml @@ -0,0 +1,9 @@ +# +# Kafka source config file. +# +source_id: kafka-source +source_type: kafka +params: + topic: quickwit-kafka + client_params: + bootstrap.servers: kafka-1.kafka-headless.db.svc.cluster.local:9092 diff --git a/kafka_sample.py b/kafka_sample.py new file mode 100644 index 000000000..9d58ae54d --- /dev/null +++ b/kafka_sample.py @@ -0,0 +1,119 @@ +import os +from time import sleep +from kafka import KafkaConsumer, KafkaProducer +from datetime import datetime +from collections import defaultdict + +from msgcodec.codec import MessageCodec +from msgcodec.messages import SessionEnd, Fetch, FetchEvent, PageEvent, SetCSSData, SetStyleData +import json + +import getopt, sys + +n = 0 + +def transform(data): + global n + n += 1 + return {'title': f'message {n}', 'body': data} + +def create_producer(): + producer = KafkaProducer(#security_protocol="SSL", + bootstrap_servers=os.environ['KAFKA_SERVER_2'], + # os.environ['KAFKA_SERVER_1']], + #ssl_cafile="./ca.pem", + #ssl_certfile="./service.cert", + #ssl_keyfile="./service.key", + value_serializer=lambda v: json.dumps(v).encode('ascii') + ) + return producer + +def create_consumer(): + consumer = KafkaConsumer(#security_protocol="SSL", + bootstrap_servers=os.environ['KAFKA_SERVER_2'], + # os.environ['KAFKA_SERVER_1']], + group_id=f"my_test52_connector", + auto_offset_reset="earliest", + enable_auto_commit=False + ) + return consumer + + +def consumer_producer_end(): + global n + batch_size = 4000 + sessions_batch_size = 400 + batch = [] + sessions = defaultdict(lambda: None) + sessions_batch = [] + + codec = MessageCodec() + consumer = create_consumer() + producer = create_producer() + + consumer.subscribe(topics=["raw", "raw_ios"]) + print("Kafka consumer subscribed") + escape = 0 + for msg in consumer: + messages = codec.decode_detailed(msg.value) + session_id = codec.decode_key(msg.key) + if messages is None: + print('-') + for message in messages: + send = True + if isinstance(message, Fetch): + data = message.response + elif isinstance(message, FetchEvent): + data = message.response + elif isinstance(message, PageEvent): + print(message.url) + elif isinstance(message, SetCSSData): + data = message.data + elif isinstance(message, SetStyleData): + data = message.data + else: + send = False + continue + if send: + producer.send('quickwit-kafka', value=transform(data)) + print(f'added message {n}') + sleep(5) + + + +def consumer_end(): + consumer = create_consumer() + consumer.subscribe(topics=['quickwit-kafka']) + for msg in consumer: + print(msg) + + +def handle_args(): + arguments = len(sys.argv)-1 + argument_list = sys.argv[1:] + pos = 1 + short_options = 'hm:' + long_options = ['help', 'method='] + try: + arguments, values = getopt.getopt(argument_list, short_options, long_options) + except getopt.error as err: + print(str(err)) + sys.exit(2) + + for arg, argv in arguments: + if arg in ('-h', '--help'): + print(""" Methods +--method, -m available methods: consumer, producer +--help, -h show help + """) + elif arg in ('-m', '--method'): + if argv == 'consumer': + consumer_end() + elif argv == 'producer': + consumer_producer_end() + else: + print('Method not found. Available methods: consumer, producer') + + +if __name__ == '__main__': + handle_args() diff --git a/msgcodec/codec.py b/msgcodec/codec.py new file mode 100644 index 000000000..7e1aabbf9 --- /dev/null +++ b/msgcodec/codec.py @@ -0,0 +1,819 @@ +import io +from typing import List +from msgcodec.messages import * + + +class Codec: + """ + Implements encode/decode primitives + """ + + @staticmethod + def read_boolean(reader: io.BytesIO): + b = reader.read(1) + return b == 1 + + @staticmethod + def read_uint(reader: io.BytesIO): + """ + The ending "big" doesn't play any role here, + since we're dealing with data per one byte + """ + x = 0 # the result + s = 0 # the shift (our result is big-ending) + i = 0 # n of byte (max 9 for uint64) + while True: + b = reader.read(1) + if len(b) == 0: + raise IndexError('bytes out of range') + num = int.from_bytes(b, "big", signed=False) + # print(i, x) + + if num < 0x80: + if i > 9 | i == 9 & num > 1: + raise OverflowError() + return int(x | num << s) + x |= (num & 0x7f) << s + s += 7 + i += 1 + + @staticmethod + def read_int(reader: io.BytesIO) -> int: + """ + ux, err := ReadUint(reader) + x := int64(ux >> 1) + if err != nil { + return x, err + } + if ux&1 != 0 { + x = ^x + } + return x, err + """ + ux = Codec.read_uint(reader) + x = int(ux >> 1) + + if ux & 1 != 0: + x = - x - 1 + return x + + @staticmethod + def read_string(reader: io.BytesIO) -> str: + length = Codec.read_uint(reader) + s = reader.read(length) + try: + return s.decode("utf-8", errors="replace").replace("\x00", "\uFFFD") + except UnicodeDecodeError: + return None + + +class MessageCodec(Codec): + + def encode(self, m: Message) -> bytes: + ... + + def decode(self, b: bytes) -> Message: + reader = io.BytesIO(b) + return self.read_head_message(reader) + + def decode_detailed(self, b: bytes) -> List[Message]: + reader = io.BytesIO(b) + messages_list = list() + while True: + try: + messages_list.append(self.read_head_message(reader)) + except IndexError: + break + return messages_list + + def read_head_message(self, reader: io.BytesIO) -> Message: + message_id = self.read_message_id(reader) + if message_id == 0: + return Timestamp( + timestamp=self.read_uint(reader) + ) + if message_id == 1: + return SessionStart( + timestamp=self.read_uint(reader), + project_id=self.read_uint(reader), + tracker_version=self.read_string(reader), + rev_id=self.read_string(reader), + user_uuid=self.read_string(reader), + user_agent=self.read_string(reader), + user_os=self.read_string(reader), + user_os_version=self.read_string(reader), + user_browser=self.read_string(reader), + user_browser_version=self.read_string(reader), + user_device=self.read_string(reader), + user_device_type=self.read_string(reader), + user_device_memory_size=self.read_uint(reader), + user_device_heap_size=self.read_uint(reader), + user_country=self.read_string(reader) + ) + + if message_id == 2: + return SessionDisconnect( + timestamp=self.read_uint(reader) + ) + + if message_id == 3: + return SessionEnd( + timestamp=self.read_uint(reader) + ) + + if message_id == 4: + return SetPageLocation( + url=self.read_string(reader), + referrer=self.read_string(reader), + navigation_start=self.read_uint(reader) + ) + + if message_id == 5: + return SetViewportSize( + width=self.read_uint(reader), + height=self.read_uint(reader) + ) + + if message_id == 6: + return SetViewportScroll( + x=self.read_int(reader), + y=self.read_int(reader) + ) + + if message_id == 7: + return CreateDocument() + + if message_id == 8: + return CreateElementNode( + id=self.read_uint(reader), + parent_id=self.read_uint(reader), + index=self.read_uint(reader), + tag=self.read_string(reader), + svg=self.read_boolean(reader), + ) + + if message_id == 9: + return CreateTextNode( + id=self.read_uint(reader), + parent_id=self.read_uint(reader), + index=self.read_uint(reader) + ) + + if message_id == 10: + return MoveNode( + id=self.read_uint(reader), + parent_id=self.read_uint(reader), + index=self.read_uint(reader) + ) + + if message_id == 11: + return RemoveNode( + id=self.read_uint(reader) + ) + + if message_id == 12: + return SetNodeAttribute( + id=self.read_uint(reader), + name=self.read_string(reader), + value=self.read_string(reader) + ) + + if message_id == 13: + return RemoveNodeAttribute( + id=self.read_uint(reader), + name=self.read_string(reader) + ) + + if message_id == 14: + return SetNodeData( + id=self.read_uint(reader), + data=self.read_string(reader) + ) + + if message_id == 15: + return SetCSSData( + id=self.read_uint(reader), + data=self.read_string(reader) + ) + + if message_id == 16: + return SetNodeScroll( + id=self.read_uint(reader), + x=self.read_int(reader), + y=self.read_int(reader), + ) + + if message_id == 17: + return SetInputTarget( + id=self.read_uint(reader), + label=self.read_string(reader) + ) + + if message_id == 18: + return SetInputValue( + id=self.read_uint(reader), + value=self.read_string(reader), + mask=self.read_int(reader), + ) + + if message_id == 19: + return SetInputChecked( + id=self.read_uint(reader), + checked=self.read_boolean(reader) + ) + + if message_id == 20: + return MouseMove( + x=self.read_uint(reader), + y=self.read_uint(reader) + ) + + if message_id == 21: + return MouseClickDepricated( + id=self.read_uint(reader), + hesitation_time=self.read_uint(reader), + label=self.read_string(reader) + ) + + if message_id == 22: + return ConsoleLog( + level=self.read_string(reader), + value=self.read_string(reader) + ) + + if message_id == 23: + return PageLoadTiming( + request_start=self.read_uint(reader), + response_start=self.read_uint(reader), + response_end=self.read_uint(reader), + dom_content_loaded_event_start=self.read_uint(reader), + dom_content_loaded_event_end=self.read_uint(reader), + load_event_start=self.read_uint(reader), + load_event_end=self.read_uint(reader), + first_paint=self.read_uint(reader), + first_contentful_paint=self.read_uint(reader) + ) + + if message_id == 24: + return PageRenderTiming( + speed_index=self.read_uint(reader), + visually_complete=self.read_uint(reader), + time_to_interactive=self.read_uint(reader), + ) + + if message_id == 25: + return JSException( + name=self.read_string(reader), + message=self.read_string(reader), + payload=self.read_string(reader) + ) + + if message_id == 26: + return RawErrorEvent( + timestamp=self.read_uint(reader), + source=self.read_string(reader), + name=self.read_string(reader), + message=self.read_string(reader), + payload=self.read_string(reader) + ) + + if message_id == 27: + return RawCustomEvent( + name=self.read_string(reader), + payload=self.read_string(reader) + ) + + if message_id == 28: + return UserID( + id=self.read_string(reader) + ) + + if message_id == 29: + return UserAnonymousID( + id=self.read_string(reader) + ) + + if message_id == 30: + return Metadata( + key=self.read_string(reader), + value=self.read_string(reader) + ) + + if message_id == 31: + return PageEvent( + message_id=self.read_uint(reader), + timestamp=self.read_uint(reader), + url=self.read_string(reader), + referrer=self.read_string(reader), + loaded=self.read_boolean(reader), + request_start=self.read_uint(reader), + response_start=self.read_uint(reader), + response_end=self.read_uint(reader), + dom_content_loaded_event_start=self.read_uint(reader), + dom_content_loaded_event_end=self.read_uint(reader), + load_event_start=self.read_uint(reader), + load_event_end=self.read_uint(reader), + first_paint=self.read_uint(reader), + first_contentful_paint=self.read_uint(reader), + speed_index=self.read_uint(reader), + visually_complete=self.read_uint(reader), + time_to_interactive=self.read_uint(reader) + ) + + if message_id == 32: + return InputEvent( + message_id=self.read_uint(reader), + timestamp=self.read_uint(reader), + value=self.read_string(reader), + value_masked=self.read_boolean(reader), + label=self.read_string(reader), + ) + + if message_id == 33: + return ClickEvent( + message_id=self.read_uint(reader), + timestamp=self.read_uint(reader), + hesitation_time=self.read_uint(reader), + label=self.read_string(reader) + ) + + if message_id == 34: + return ErrorEvent( + message_id=self.read_uint(reader), + timestamp=self.read_uint(reader), + source=self.read_string(reader), + name=self.read_string(reader), + message=self.read_string(reader), + payload=self.read_string(reader) + ) + + if message_id == 35: + + message_id = self.read_uint(reader) + ts = self.read_uint(reader) + if ts > 9999999999999: + ts = None + return ResourceEvent( + message_id=message_id, + timestamp=ts, + duration=self.read_uint(reader), + ttfb=self.read_uint(reader), + header_size=self.read_uint(reader), + encoded_body_size=self.read_uint(reader), + decoded_body_size=self.read_uint(reader), + url=self.read_string(reader), + type=self.read_string(reader), + success=self.read_boolean(reader), + method=self.read_string(reader), + status=self.read_uint(reader) + ) + + if message_id == 36: + return CustomEvent( + message_id=self.read_uint(reader), + timestamp=self.read_uint(reader), + name=self.read_string(reader), + payload=self.read_string(reader) + ) + + if message_id == 37: + return CSSInsertRule( + id=self.read_uint(reader), + rule=self.read_string(reader), + index=self.read_uint(reader) + ) + + if message_id == 38: + return CSSDeleteRule( + id=self.read_uint(reader), + index=self.read_uint(reader) + ) + + if message_id == 39: + return Fetch( + method=self.read_string(reader), + url=self.read_string(reader), + request=self.read_string(reader), + response=self.read_string(reader), + status=self.read_uint(reader), + timestamp=self.read_uint(reader), + duration=self.read_uint(reader) + ) + + if message_id == 40: + return Profiler( + name=self.read_string(reader), + duration=self.read_uint(reader), + args=self.read_string(reader), + result=self.read_string(reader) + ) + + if message_id == 41: + return OTable( + key=self.read_string(reader), + value=self.read_string(reader) + ) + + if message_id == 42: + return StateAction( + type=self.read_string(reader) + ) + + if message_id == 43: + return StateActionEvent( + message_id=self.read_uint(reader), + timestamp=self.read_uint(reader), + type=self.read_string(reader) + ) + + if message_id == 44: + return Redux( + action=self.read_string(reader), + state=self.read_string(reader), + duration=self.read_uint(reader) + ) + + if message_id == 45: + return Vuex( + mutation=self.read_string(reader), + state=self.read_string(reader), + ) + + if message_id == 46: + return MobX( + type=self.read_string(reader), + payload=self.read_string(reader), + ) + + if message_id == 47: + return NgRx( + action=self.read_string(reader), + state=self.read_string(reader), + duration=self.read_uint(reader) + ) + + if message_id == 48: + return GraphQL( + operation_kind=self.read_string(reader), + operation_name=self.read_string(reader), + variables=self.read_string(reader), + response=self.read_string(reader) + ) + + if message_id == 49: + return PerformanceTrack( + frames=self.read_int(reader), + ticks=self.read_int(reader), + total_js_heap_size=self.read_uint(reader), + used_js_heap_size=self.read_uint(reader) + ) + + if message_id == 50: + return GraphQLEvent( + message_id=self.read_uint(reader), + timestamp=self.read_uint(reader), + name=self.read_string(reader) + ) + + if message_id == 51: + return FetchEvent( + message_id=self.read_uint(reader), + timestamp=self.read_uint(reader), + method=self.read_string(reader), + url=self.read_string(reader), + request=self.read_string(reader), + response=self.read_string(reader), + status=self.read_uint(reader), + duration=self.read_uint(reader) + ) + + if message_id == 52: + return DomDrop( + timestamp=self.read_uint(reader) + ) + + if message_id == 53: + return ResourceTiming( + timestamp=self.read_uint(reader), + duration=self.read_uint(reader), + ttfb=self.read_uint(reader), + header_size=self.read_uint(reader), + encoded_body_size=self.read_uint(reader), + decoded_body_size=self.read_uint(reader), + url=self.read_string(reader), + initiator=self.read_string(reader) + ) + + if message_id == 54: + return ConnectionInformation( + downlink=self.read_uint(reader), + type=self.read_string(reader) + ) + + if message_id == 55: + return SetPageVisibility( + hidden=self.read_boolean(reader) + ) + + if message_id == 56: + return PerformanceTrackAggr( + timestamp_start=self.read_uint(reader), + timestamp_end=self.read_uint(reader), + min_fps=self.read_uint(reader), + avg_fps=self.read_uint(reader), + max_fps=self.read_uint(reader), + min_cpu=self.read_uint(reader), + avg_cpu=self.read_uint(reader), + max_cpu=self.read_uint(reader), + min_total_js_heap_size=self.read_uint(reader), + avg_total_js_heap_size=self.read_uint(reader), + max_total_js_heap_size=self.read_uint(reader), + min_used_js_heap_size=self.read_uint(reader), + avg_used_js_heap_size=self.read_uint(reader), + max_used_js_heap_size=self.read_uint(reader) + ) + + if message_id == 59: + return LongTask( + timestamp=self.read_uint(reader), + duration=self.read_uint(reader), + context=self.read_uint(reader), + container_type=self.read_uint(reader), + container_src=self.read_string(reader), + container_id=self.read_string(reader), + container_name=self.read_string(reader) + ) + + if message_id == 60: + return SetNodeURLBasedAttribute( + id=self.read_uint(reader), + name=self.read_string(reader), + value=self.read_string(reader), + base_url=self.read_string(reader) + ) + + if message_id == 61: + return SetStyleData( + id=self.read_uint(reader), + data=self.read_string(reader), + base_url=self.read_string(reader) + ) + + if message_id == 62: + return IssueEvent( + message_id=self.read_uint(reader), + timestamp=self.read_uint(reader), + type=self.read_string(reader), + context_string=self.read_string(reader), + context=self.read_string(reader), + payload=self.read_string(reader) + ) + + if message_id == 63: + return TechnicalInfo( + type=self.read_string(reader), + value=self.read_string(reader) + ) + + if message_id == 64: + return CustomIssue( + name=self.read_string(reader), + payload=self.read_string(reader) + ) + + if message_id == 65: + return PageClose() + + if message_id == 66: + return AssetCache( + url=self.read_string(reader) + ) + + if message_id == 67: + return CSSInsertRuleURLBased( + id=self.read_uint(reader), + rule=self.read_string(reader), + index=self.read_uint(reader), + base_url=self.read_string(reader) + ) + + if message_id == 69: + return MouseClick( + id=self.read_uint(reader), + hesitation_time=self.read_uint(reader), + label=self.read_string(reader), + selector=self.read_string(reader) + ) + + if message_id == 70: + return CreateIFrameDocument( + frame_id=self.read_uint(reader), + id=self.read_uint(reader) + ) + + if message_id == 80: + return BatchMeta( + page_no=self.read_uint(reader), + first_index=self.read_uint(reader), + timestamp=self.read_int(reader) + ) + + if message_id == 90: + return IOSSessionStart( + timestamp=self.read_uint(reader), + project_id=self.read_uint(reader), + tracker_version=self.read_string(reader), + rev_id=self.read_string(reader), + user_uuid=self.read_string(reader), + user_os=self.read_string(reader), + user_os_version=self.read_string(reader), + user_device=self.read_string(reader), + user_device_type=self.read_string(reader), + user_country=self.read_string(reader) + ) + + if message_id == 91: + return IOSSessionEnd( + timestamp=self.read_uint(reader) + ) + + if message_id == 92: + return IOSMetadata( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + key=self.read_string(reader), + value=self.read_string(reader) + ) + + if message_id == 93: + return IOSCustomEvent( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + name=self.read_string(reader), + payload=self.read_string(reader) + ) + + if message_id == 94: + return IOSUserID( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + value=self.read_string(reader) + ) + + if message_id == 95: + return IOSUserAnonymousID( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + value=self.read_string(reader) + ) + + if message_id == 96: + return IOSScreenChanges( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + x=self.read_uint(reader), + y=self.read_uint(reader), + width=self.read_uint(reader), + height=self.read_uint(reader) + ) + + if message_id == 97: + return IOSCrash( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + name=self.read_string(reader), + reason=self.read_string(reader), + stacktrace=self.read_string(reader) + ) + + if message_id == 98: + return IOSScreenEnter( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + title=self.read_string(reader), + view_name=self.read_string(reader) + ) + + if message_id == 99: + return IOSScreenLeave( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + title=self.read_string(reader), + view_name=self.read_string(reader) + ) + + if message_id == 100: + return IOSClickEvent( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + label=self.read_string(reader), + x=self.read_uint(reader), + y=self.read_uint(reader) + ) + + if message_id == 101: + return IOSInputEvent( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + value=self.read_string(reader), + value_masked=self.read_boolean(reader), + label=self.read_string(reader) + ) + + if message_id == 102: + return IOSPreformanceEvent( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + name=self.read_string(reader), + value=self.read_uint(reader) + ) + + if message_id == 103: + return IOSLog( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + severity=self.read_string(reader), + content=self.read_string(reader) + ) + + if message_id == 104: + return IOSInternalError( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + content=self.read_string(reader) + ) + + if message_id == 105: + return IOSNetworkCall( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + duration=self.read_uint(reader), + headers=self.read_string(reader), + body=self.read_string(reader), + url=self.read_string(reader), + success=self.read_boolean(reader), + method=self.read_string(reader), + status=self.read_uint(reader) + ) + + if message_id == 107: + return IOSBatchMeta( + timestamp=self.read_uint(reader), + length=self.read_uint(reader), + first_index=self.read_uint(reader) + ) + + if message_id == 110: + return IOSPerformanceAggregated( + timestamp_start=self.read_uint(reader), + timestamp_end=self.read_uint(reader), + min_fps=self.read_uint(reader), + avg_fps=self.read_uint(reader), + max_fps=self.read_uint(reader), + min_cpu=self.read_uint(reader), + avg_cpu=self.read_uint(reader), + max_cpu=self.read_uint(reader), + min_memory=self.read_uint(reader), + avg_memory=self.read_uint(reader), + max_memory=self.read_uint(reader), + min_battery=self.read_uint(reader), + avg_battery=self.read_uint(reader), + max_battery=self.read_uint(reader) + ) + if message_id == 111: + return IOSIssueEvent( + timestamp=self.read_uint(reader), + type=self.read_string(reader), + context_string=self.read_string(reader), + context=self.read_string(reader), + payload=self.read_string(reader) + ) + + def read_message_id(self, reader: io.BytesIO) -> int: + """ + Read and return the first byte where the message id is encoded + """ + id_ = self.read_uint(reader) + return id_ + + @staticmethod + def check_message_id(b: bytes) -> int: + """ + todo: make it static and without reader. It's just the first byte + Read and return the first byte where the message id is encoded + """ + reader = io.BytesIO(b) + id_ = Codec.read_uint(reader) + + return id_ + + @staticmethod + def decode_key(b) -> int: + """ + Decode the message key (encoded with little endian) + """ + try: + decoded = int.from_bytes(b, "little", signed=False) + except Exception as e: + raise UnicodeDecodeError(f"Error while decoding message key (SessionID) from {b}\n{e}") + return decoded diff --git a/msgcodec/messages.py b/msgcodec/messages.py new file mode 100644 index 000000000..bc451b287 --- /dev/null +++ b/msgcodec/messages.py @@ -0,0 +1,919 @@ +""" +Representations of Kafka messages +""" +from abc import ABC + + +class Message(ABC): + pass + + +class Timestamp(Message): + __id__ = 0 + + def __init__(self, timestamp): + self.timestamp = timestamp + + +class SessionStart(Message): + __id__ = 1 + + def __init__(self, timestamp, project_id, tracker_version, rev_id, user_uuid, + user_agent, user_os, user_os_version, user_browser, user_browser_version, + user_device, user_device_type, user_device_memory_size, user_device_heap_size, + user_country): + self.timestamp = timestamp + self.project_id = project_id + self.tracker_version = tracker_version + self.rev_id = rev_id + self.user_uuid = user_uuid + self.user_agent = user_agent + self.user_os = user_os + self.user_os_version = user_os_version + self.user_browser = user_browser + self.user_browser_version = user_browser_version + self.user_device = user_device + self.user_device_type = user_device_type + self.user_device_memory_size = user_device_memory_size + self.user_device_heap_size = user_device_heap_size + self.user_country = user_country + + +class SessionDisconnect(Message): + __id__ = 2 + + def __init__(self, timestamp): + self.timestamp = timestamp + + +class SessionEnd(Message): + __id__ = 3 + __name__ = 'SessionEnd' + + def __init__(self, timestamp): + self.timestamp = timestamp + + +class SetPageLocation(Message): + __id__ = 4 + + def __init__(self, url, referrer, navigation_start): + self.url = url + self.referrer = referrer + self.navigation_start = navigation_start + + +class SetViewportSize(Message): + __id__ = 5 + + def __init__(self, width, height): + self.width = width + self.height = height + + +class SetViewportScroll(Message): + __id__ = 6 + + def __init__(self, x, y): + self.x = x + self.y = y + + +class CreateDocument(Message): + __id__ = 7 + + +class CreateElementNode(Message): + __id__ = 8 + + def __init__(self, id, parent_id, index, tag, svg): + self.id = id + self.parent_id = parent_id, + self.index = index + self.tag = tag + self.svg = svg + + +class CreateTextNode(Message): + __id__ = 9 + + def __init__(self, id, parent_id, index): + self.id = id + self.parent_id = parent_id + self.index = index + + +class MoveNode(Message): + __id__ = 10 + + def __init__(self, id, parent_id, index): + self.id = id + self.parent_id = parent_id + self.index = index + + +class RemoveNode(Message): + __id__ = 11 + + def __init__(self, id): + self.id = id + + +class SetNodeAttribute(Message): + __id__ = 12 + + def __init__(self, id, name: str, value: str): + self.id = id + self.name = name + self.value = value + + +class RemoveNodeAttribute(Message): + __id__ = 13 + + def __init__(self, id, name: str): + self.id = id + self.name = name + + +class SetNodeData(Message): + __id__ = 14 + + def __init__(self, id, data: str): + self.id = id + self.data = data + + +class SetCSSData(Message): + __id__ = 15 + + def __init__(self, id, data: str): + self.id = id + self.data = data + + +class SetNodeScroll(Message): + __id__ = 16 + + def __init__(self, id, x: int, y: int): + self.id = id + self.x = x + self.y = y + + +class SetInputTarget(Message): + __id__ = 17 + + def __init__(self, id, label: str): + self.id = id + self.label = label + + +class SetInputValue(Message): + __id__ = 18 + + def __init__(self, id, value: str, mask: int): + self.id = id + self.value = value + self.mask = mask + + +class SetInputChecked(Message): + __id__ = 19 + + def __init__(self, id, checked: bool): + self.id = id + self.checked = checked + + +class MouseMove(Message): + __id__ = 20 + + def __init__(self, x, y): + self.x = x + self.y = y + + +class MouseClickDepricated(Message): + __id__ = 21 + + def __init__(self, id, hesitation_time, label: str): + self.id = id + self.hesitation_time = hesitation_time + self.label = label + + +class ConsoleLog(Message): + __id__ = 22 + + def __init__(self, level: str, value: str): + self.level = level + self.value = value + + +class PageLoadTiming(Message): + __id__ = 23 + + def __init__(self, request_start, response_start, response_end, dom_content_loaded_event_start, + dom_content_loaded_event_end, load_event_start, load_event_end, + first_paint, first_contentful_paint): + self.request_start = request_start + self.response_start = response_start + self.response_end = response_end + self.dom_content_loaded_event_start = dom_content_loaded_event_start + self.dom_content_loaded_event_end = dom_content_loaded_event_end + self.load_event_start = load_event_start + self.load_event_end = load_event_end + self.first_paint = first_paint + self.first_contentful_paint = first_contentful_paint + + +class PageRenderTiming(Message): + __id__ = 24 + + def __init__(self, speed_index, visually_complete, time_to_interactive): + self.speed_index = speed_index + self.visually_complete = visually_complete + self.time_to_interactive = time_to_interactive + +class JSException(Message): + __id__ = 25 + + def __init__(self, name: str, message: str, payload: str): + self.name = name + self.message = message + self.payload = payload + + +class RawErrorEvent(Message): + __id__ = 26 + + def __init__(self, timestamp, source: str, name: str, message: str, + payload: str): + self.timestamp = timestamp + self.source = source + self.name = name + self.message = message + self.payload = payload + + +class RawCustomEvent(Message): + __id__ = 27 + + def __init__(self, name: str, payload: str): + self.name = name + self.payload = payload + + +class UserID(Message): + __id__ = 28 + + def __init__(self, id: str): + self.id = id + + +class UserAnonymousID(Message): + __id__ = 29 + + def __init__(self, id: str): + self.id = id + + +class Metadata(Message): + __id__ = 30 + + def __init__(self, key: str, value: str): + self.key = key + self.value = value + + +class PerformanceTrack(Message): + __id__ = 49 + + def __init__(self, frames: int, ticks: int, total_js_heap_size, + used_js_heap_size): + self.frames = frames + self.ticks = ticks + self.total_js_heap_size = total_js_heap_size + self.used_js_heap_size = used_js_heap_size + + +class PageEvent(Message): + __id__ = 31 + + def __init__(self, message_id, timestamp, url: str, referrer: str, + loaded: bool, request_start, response_start, response_end, + dom_content_loaded_event_start, dom_content_loaded_event_end, + load_event_start, load_event_end, first_paint, first_contentful_paint, + speed_index, visually_complete, time_to_interactive): + self.message_id = message_id + self.timestamp = timestamp + self.url = url + self.referrer = referrer + self.loaded = loaded + self.request_start = request_start + self.response_start = response_start + self.response_end = response_end + self.dom_content_loaded_event_start = dom_content_loaded_event_start + self.dom_content_loaded_event_end = dom_content_loaded_event_end + self.load_event_start = load_event_start + self.load_event_end = load_event_end + self.first_paint = first_paint + self.first_contentful_paint = first_contentful_paint + self.speed_index = speed_index + self.visually_complete = visually_complete + self.time_to_interactive = time_to_interactive + + +class InputEvent(Message): + __id__ = 32 + + def __init__(self, message_id, timestamp, value: str, value_masked: bool, label: str): + self.message_id = message_id + self.timestamp = timestamp + self.value = value + self.value_masked = value_masked + self.label = label + + +class ClickEvent(Message): + __id__ = 33 + + def __init__(self, message_id, timestamp, hesitation_time, label: str): + self.message_id = message_id + self.timestamp = timestamp + self.hesitation_time = hesitation_time + self.label = label + + +class ErrorEvent(Message): + __id__ = 34 + + def __init__(self, message_id, timestamp, source: str, name: str, message: str, + payload: str): + self.message_id = message_id + self.timestamp = timestamp + self.source = source + self.name = name + self.message = message + self.payload = payload + + +class ResourceEvent(Message): + __id__ = 35 + + def __init__(self, message_id, timestamp, duration, ttfb, header_size, encoded_body_size, + decoded_body_size, url: str, type: str, success: bool, method: str, status): + self.message_id = message_id + self.timestamp = timestamp + self.duration = duration + self.ttfb = ttfb + self.header_size = header_size + self.encoded_body_size = encoded_body_size + self.decoded_body_size = decoded_body_size + self.url = url + self.type = type + self.success = success + self.method = method + self.status = status + + +class CustomEvent(Message): + __id__ = 36 + + def __init__(self, message_id, timestamp, name: str, payload: str): + self.message_id = message_id + self.timestamp = timestamp + self.name = name + self.payload = payload + + +class CSSInsertRule(Message): + __id__ = 37 + + def __init__(self, id, rule: str, index): + self.id = id + self.rule = rule + self.index = index + + +class CSSDeleteRule(Message): + __id__ = 38 + + def __init__(self, id, index): + self.id = id + self.index = index + + +class Fetch(Message): + __id__ = 39 + + def __init__(self, method: str, url: str, request: str, response: str, status, + timestamp, duration): + self.method = method + self.url = url + self.request = request + self.response = response + self.status = status + self.timestamp = timestamp + self.duration = duration + + +class Profiler(Message): + __id__ = 40 + + def __init__(self, name: str, duration, args: str, result: str): + self.name = name + self.duration = duration + self.args = args + self.result = result + + +class OTable(Message): + __id__ = 41 + + def __init__(self, key: str, value: str): + self.key = key + self.value = value + + +class StateAction(Message): + __id__ = 42 + + def __init__(self, type: str): + self.type = type + + +class StateActionEvent(Message): + __id__ = 43 + + def __init__(self, message_id, timestamp, type: str): + self.message_id = message_id + self.timestamp = timestamp + self.type = type + + +class Redux(Message): + __id__ = 44 + + def __init__(self, action: str, state: str, duration): + self.action = action + self.state = state + self.duration = duration + + +class Vuex(Message): + __id__ = 45 + + def __init__(self, mutation: str, state: str): + self.mutation = mutation + self.state = state + + +class MobX(Message): + __id__ = 46 + + def __init__(self, type: str, payload: str): + self.type = type + self.payload = payload + + +class NgRx(Message): + __id__ = 47 + + def __init__(self, action: str, state: str, duration): + self.action = action + self.state = state + self.duration = duration + + +class GraphQL(Message): + __id__ = 48 + + def __init__(self, operation_kind: str, operation_name: str, + variables: str, response: str): + self.operation_kind = operation_kind + self.operation_name = operation_name + self.variables = variables + self.response = response + + +class PerformanceTrack(Message): + __id__ = 49 + + def __init__(self, frames: int, ticks: int, + total_js_heap_size, used_js_heap_size): + self.frames = frames + self.ticks = ticks + self.total_js_heap_size = total_js_heap_size + self.used_js_heap_size = used_js_heap_size + + +class GraphQLEvent(Message): + __id__ = 50 + + def __init__(self, message_id, timestamp, name: str): + self.message_id = message_id + self.timestamp = timestamp + self.name = name + + +class FetchEvent(Message): + __id__ = 51 + + def __init__(self, message_id, timestamp, method: str, url, request, response: str, + status, duration): + self.message_id = message_id + self.timestamp = timestamp + self.method = method + self.url = url + self.request = request + self.response = response + self.status = status + self.duration = duration + + +class DomDrop(Message): + __id__ = 52 + + def __init__(self, timestamp): + self.timestamp = timestamp + + +class ResourceTiming(Message): + __id__ = 53 + + def __init__(self, timestamp, duration, ttfb, header_size, encoded_body_size, + decoded_body_size, url, initiator): + self.timestamp = timestamp + self.duration = duration + self.ttfb = ttfb + self.header_size = header_size + self.encoded_body_size = encoded_body_size + self.decoded_body_size = decoded_body_size + self.url = url + self.initiator = initiator + + +class ConnectionInformation(Message): + __id__ = 54 + + def __init__(self, downlink, type: str): + self.downlink = downlink + self.type = type + + +class SetPageVisibility(Message): + __id__ = 55 + + def __init__(self, hidden: bool): + self.hidden = hidden + + +class PerformanceTrackAggr(Message): + __id__ = 56 + + def __init__(self, timestamp_start, timestamp_end, min_fps, avg_fps, + max_fps, min_cpu, avg_cpu, max_cpu, + min_total_js_heap_size, avg_total_js_heap_size, + max_total_js_heap_size, min_used_js_heap_size, + avg_used_js_heap_size, max_used_js_heap_size + ): + self.timestamp_start = timestamp_start + self.timestamp_end = timestamp_end + self.min_fps = min_fps + self.avg_fps = avg_fps + self.max_fps = max_fps + self.min_cpu = min_cpu + self.avg_cpu = avg_cpu + self.max_cpu = max_cpu + self.min_total_js_heap_size = min_total_js_heap_size + self.avg_total_js_heap_size = avg_total_js_heap_size + self.max_total_js_heap_size = max_total_js_heap_size + self.min_used_js_heap_size = min_used_js_heap_size + self.avg_used_js_heap_size = avg_used_js_heap_size + self.max_used_js_heap_size = max_used_js_heap_size + + +class LongTask(Message): + __id__ = 59 + + def __init__(self, timestamp, duration, context, container_type, container_src: str, + container_id: str, container_name: str): + self.timestamp = timestamp + self.duration = duration + self.context = context + self.container_type = container_type + self.container_src = container_src + self.container_id = container_id + self.container_name = container_name + + +class SetNodeURLBasedAttribute(Message): + __id__ = 60 + + def __init__(self, id, name: str, value: str, base_url: str): + self.id = id + self.name = name + self.value = value + self.base_url = base_url + + +class SetStyleData(Message): + __id__ = 61 + + def __init__(self, id, data: str, base_url: str): + self.id = id + self.data = data + self.base_url = base_url + + +class IssueEvent(Message): + __id__ = 62 + + def __init__(self, message_id, timestamp, type: str, context_string: str, + context: str, payload: str): + self.message_id = message_id + self.timestamp = timestamp + self.type = type + self.context_string = context_string + self.context = context + self.payload = payload + + +class TechnicalInfo(Message): + __id__ = 63 + + def __init__(self, type: str, value: str): + self.type = type + self.value = value + + +class CustomIssue(Message): + __id__ = 64 + + def __init__(self, name: str, payload: str): + self.name = name + self.payload = payload + + +class PageClose(Message): + __id__ = 65 + + +class AssetCache(Message): + __id__ = 66 + + def __init__(self, url): + self.url = url + + +class CSSInsertRuleURLBased(Message): + __id__ = 67 + + def __init__(self, id, rule, index, base_url): + self.id = id + self.rule = rule + self.index = index + self.base_url = base_url + + +class MouseClick(Message): + __id__ = 69 + + def __init__(self, id, hesitation_time, label: str, selector): + self.id = id + self.hesitation_time = hesitation_time + self.label = label + self.selector = selector + + +class CreateIFrameDocument(Message): + __id__ = 70 + + def __init__(self, frame_id, id): + self.frame_id = frame_id + self.id = id + + +class BatchMeta(Message): + __id__ = 80 + + def __init__(self, page_no, first_index, timestamp): + self.page_no = page_no + self.first_index = first_index + self.timestamp = timestamp + +class IOSSessionStart(Message): + __id__ = 90 + + def __init__(self, timestamp, project_id, tracker_version: str, + rev_id: str, user_uuid: str, user_os: str, user_os_version: str, + user_device: str, user_device_type: str, user_country: str): + self.timestamp = timestamp + self.project_id = project_id + self.tracker_version = tracker_version + self.rev_id = rev_id + self.user_uuid = user_uuid + self.user_os = user_os + self.user_os_version = user_os_version + self.user_device = user_device + self.user_device_type = user_device_type + self.user_country = user_country + + +class IOSSessionEnd(Message): + __id__ = 91 + + def __init__(self, timestamp): + self.timestamp = timestamp + + +class IOSMetadata(Message): + __id__ = 92 + + def __init__(self, timestamp, length, key: str, value: str): + self.timestamp = timestamp + self.length = length + self.key = key + self.value = value + + +class IOSCustomEvent(Message): + __id__ = 93 + + def __init__(self, timestamp, length, name: str, payload: str): + self.timestamp = timestamp + self.length = length + self.name = name + self.payload = payload + + +class IOSUserID(Message): + __id__ = 94 + + def __init__(self, timestamp, length, value: str): + self.timestamp = timestamp + self.length = length + self.value = value + + +class IOSUserAnonymousID(Message): + __id__ = 95 + + def __init__(self, timestamp, length, value: str): + self.timestamp = timestamp + self.length = length + self.value = value + + +class IOSScreenChanges(Message): + __id__ = 96 + + def __init__(self, timestamp, length, x, y, width, height): + self.timestamp = timestamp + self.length = length + self.x = x + self.y = y + self.width = width + self.height = height + + +class IOSCrash(Message): + __id__ = 97 + + def __init__(self, timestamp, length, name: str, reason: str, stacktrace): + self.timestamp = timestamp + self.length = length + self.name = name + self.reason = reason + self.stacktrace = stacktrace + + +class IOSScreenEnter(Message): + __id__ = 98 + + def __init__(self, timestamp, length, title, view_name): + self.timestamp = timestamp + self.length = length + self.title = title + self.view_name = view_name + + +class IOSScreenLeave(Message): + __id__ = 99 + + def __init__(self, timestamp, length, title: str, view_name: str): + self.timestamp = timestamp + self.length = length + self.title = title + self.view_name = view_name + + +class IOSClickEvent(Message): + __id__ = 100 + + def __init__(self, timestamp, length, label, x, y): + self.timestamp = timestamp + self.length = length + self.label = label + self.x = x + self.y = y + + +class IOSInputEvent(Message): + __id__ = 101 + + def __init__(self, timestamp, length, value: str, value_masked: bool, label: str): + self.timestamp = timestamp + self.length = length + self.value_masked = value_masked + self.label = label + + +class IOSPerformanceEvent(Message): + __id__ = 102 + + def __init__(self, timestamp, length, name: str, value): + self.timestamp = timestamp + self.length = length + self.name = name + self.value = value + + +class IOSLog(Message): + __id__ = 103 + + def __init__(self, timestamp, length, severity: str, content: str): + self.timestamp = timestamp + self.length = length + self.severity = severity + self.content = content + + +class IOSInternalError(Message): + __id__ = 104 + + def __init__(self, timestamp, length, content: str): + self.timestamp = timestamp + self.length = length + self.content = content + + +class IOSNetworkCall(Message): + __id__ = 105 + + def __init__(self, timestamp, length, duration, headers, body, url, success: bool, method: str, status): + self.timestamp = timestamp + self.length = length + self.duration = duration + self.headers = headers + self.body = body + self.url = url + self.success = success + self.method = method + self.status = status + + +class IOSBatchMeta(Message): + __id__ = 107 + + def __init__(self, timestamp, length, first_index): + self.timestamp = timestamp + self.length = length + self.first_index = first_index + + +class IOSPerformanceAggregated(Message): + __id__ = 110 + + def __init__(self, timestamp_start, timestamp_end, min_fps, avg_fps, + max_fps, min_cpu, avg_cpu, max_cpu, + min_memory, avg_memory, max_memory, + min_battery, avg_battery, max_battery + ): + self.timestamp_start = timestamp_start + self.timestamp_end = timestamp_end + self.min_fps = min_fps + self.avg_fps = avg_fps + self.max_fps = max_fps + self.min_cpu = min_cpu + self.avg_cpu = avg_cpu + self.max_cpu = max_cpu + self.min_memory = min_memory + self.avg_memory = avg_memory + self.max_memory = max_memory + self.min_battery = min_battery + self.avg_battery = avg_battery + self.max_battery = max_battery + + +class IOSIssueEvent(Message): + __id__ = 111 + + def __init__(self, timestamp, type: str, context_string: str, context: str, payload: str): + self.timestamp = timestamp + self.type = type + self.context_string = context_string + self.context = context + self.payload = payload diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 000000000..d328a9142 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +kafka-python diff --git a/run_quickwit.sh b/run_quickwit.sh new file mode 100644 index 000000000..a00b1046d --- /dev/null +++ b/run_quickwit.sh @@ -0,0 +1 @@ +docker run -v /etc/hosts:/etc/hosts:ro -v $(pwd)/qwdata:/quickwit/qwdata -v $(pwd)/s3-config-listen.yaml:/quickwit/s3-config-listen.yaml -e AWS_ACCESS_KEY_ID=$aws_access_key_id -e AWS_SECRET_ACCESS_KEY=$aws_secret_access_key -e AWS_DEFAULT_REGION=eu-central-1 -e AWS_REGION=eu-central-1 -p 127.0.0.1:7280:7280 quickwit/quickwit run --config s3-config-listen.yaml diff --git a/s3-config-listen.yaml b/s3-config-listen.yaml new file mode 100644 index 000000000..524339f2f --- /dev/null +++ b/s3-config-listen.yaml @@ -0,0 +1,5 @@ +## In order to save data into S3 +version: 0 +metastore_uri: s3://quickwit/quickwit-indexes +default_index_root_uri: s3://quickwit/quickwit-indexes +listen_address: 0.0.0.0 diff --git a/s3-config.yaml b/s3-config.yaml new file mode 100644 index 000000000..5cf727b28 --- /dev/null +++ b/s3-config.yaml @@ -0,0 +1,4 @@ +## In order to save data into S3 +version: 0 +metastore_uri: s3://quickwit/quickwit-indexes +default_index_root_uri: s3://quickwit/quickwit-indexes