diff --git a/ee/connectors/Dockerfile_fill b/ee/connectors/Dockerfile_fill new file mode 100644 index 000000000..33a877eba --- /dev/null +++ b/ee/connectors/Dockerfile_fill @@ -0,0 +1,13 @@ +FROM amancevice/pandas:2.0.2-alpine + +WORKDIR app + +COPY requirements-fill.txt . +RUN apk add --no-cache --virtual .build-deps gcc g++ musl-dev postgresql-dev && \ + pip install -r requirements-fill.txt --no-cache-dir && \ + apk --purge del .build-deps + +COPY utils utils +COPY fill_from_db.py . +COPY entrypoint-fill.sh . +ENTRYPOINT ./entrypoint-fill.sh diff --git a/ee/connectors/build_modules.sh b/ee/connectors/build_modules.sh new file mode 100755 index 000000000..197e037ca --- /dev/null +++ b/ee/connectors/build_modules.sh @@ -0,0 +1,9 @@ +pip install cython +cd msgcodec +python3 setup-messages.py build_ext --inplace +python3 setup-msgcodec.py build_ext --inplace +python3 setup-messages.py install +python3 setup-msgcodec.py install +cd .. +rm -rf msgcodec +pip uninstall cython -y diff --git a/ee/connectors/consumer_async.py b/ee/connectors/consumer_async.py index 9609eadb6..1753d641f 100644 --- a/ee/connectors/consumer_async.py +++ b/ee/connectors/consumer_async.py @@ -6,8 +6,8 @@ import asyncio from time import time, sleep from copy import deepcopy -from msgcodec.msgcodec import MessageCodec -from msgcodec.messages import SessionEnd +from msgcodec import MessageCodec +from messages import SessionEnd from db.api import DBConnection from db.models import events_detailed_table_name, events_table_name, sessions_table_name from db.writer import insert_batch, update_batch @@ -18,18 +18,18 @@ from utils import pg_client from psycopg2 import InterfaceError from utils.signal_handler import signal_handler -def process_message(msg, codec, sessions, batch, sessions_batch, interesting_sessions, interesting_events, EVENT_TYPE, projectFilter): +def process_message(msg, codec, sessions, batch, sessions_batch, interesting_sessions, interesting_events, EVENT_TYPE, projectFilter, 
broken_batchs = 0): if msg is None: return messages = codec.decode_detailed(msg.value()) try: session_id = codec.decode_key(msg.key()) except Exception as e: - print('[WARN] Broken sessionid') - print(e) + broken_batchs = broken_batchs + 1 + # print('[WARN] Broken sessionid') + # print(e) return if messages is None: - print('-') return elif not projectFilter.is_valid(session_id): # We check using projectFilter if session_id is from the selected projects @@ -60,7 +60,7 @@ def process_message(msg, codec, sessions, batch, sessions_batch, interesting_ses try: del sessions[sess_id] except KeyError: - print('[INFO] Session already deleted') + ... else: print('[WARN] Session not started received SessionEnd message') del sessions[session_id] @@ -78,9 +78,9 @@ def process_message(msg, codec, sessions, batch, sessions_batch, interesting_ses def attempt_session_insert(sess_batch, db, sessions_table_name, try_=0): if sess_batch: try: - print("inserting sessions...") + #print("inserting sessions...") insert_batch(db, sess_batch, table=sessions_table_name, level='sessions') - print("inserted sessions succesfully") + #print("inserted sessions succesfully") except TypeError as e: print("Type conversion error") print(repr(e)) @@ -99,7 +99,7 @@ def attempt_session_insert(sess_batch, db, sessions_table_name, try_=0): def attempt_session_update(sess_batch, db, sessions_table_name): if sess_batch: try: - print('updating sessions') + #print('updating sessions') update_batch(db, sess_batch, table=sessions_table_name) except TypeError as e: print('Type conversion error') @@ -117,9 +117,9 @@ def attempt_session_update(sess_batch, db, sessions_table_name): def attempt_batch_insert(batch, db, table_name, EVENT_TYPE, try_=0): # insert a batch try: - print("inserting...") + #print("inserting...") insert_batch(db=db, batch=batch, table=table_name, level=EVENT_TYPE) - print("inserted succesfully") + #print("inserted succesfully") except TypeError as e: print("Type conversion error") 
print(repr(e)) @@ -148,7 +148,8 @@ def decode_key(b) -> int: try: decoded = int.from_bytes(b, "little", signed=False) except Exception as e: - raise UnicodeDecodeError(f"Error while decoding message key (SessionID) from {b}\n{e}") + print(f'Error while decoding message key (SessionId) from {b}') + raise e return decoded @@ -200,12 +201,16 @@ async def main(): c_time = time() read_msgs = 0 + broken_batchs = 0 while signal_handler.KEEP_PROCESSING: msg = consumer.poll(1.0) - process_message(msg, codec, sessions, batch, sessions_batch, sessions_events_selection, selected_events, EVENT_TYPE, project_filter) + process_message(msg, codec, sessions, batch, sessions_batch, sessions_events_selection, selected_events, EVENT_TYPE, project_filter, broken_batchs) read_msgs += 1 if time() - c_time > upload_rate: - print(f'[INFO] {read_msgs} kafka messages read in {upload_rate} seconds') + #print(f'[INFO] {read_msgs} kafka messages read in {upload_rate} seconds') + if broken_batchs > 0: + print(f'[WARN] {broken_batchs} broken sessionIds') + broken_batchs = 0 await insertBatch(deepcopy(sessions_batch), deepcopy(batch), db, sessions_table_name, table_name, EVENT_TYPE) consumer.commit() try: @@ -231,7 +236,7 @@ async def insertBatch(sessions_batch, batch, db, sessions_table_name, table_name updated_sessions.append(session_in_batch) else: new_sessions.append(session_in_batch) - print(f'[DEBUG] Number of new sessions {len(new_sessions)}, number of sessions to update {len(updated_sessions)}') + #print(f'[DEBUG] Number of new sessions {len(new_sessions)}, number of sessions to update {len(updated_sessions)}') if new_sessions != []: attempt_session_insert(new_sessions, db, sessions_table_name) diff --git a/ee/connectors/consumer_test.py b/ee/connectors/consumer_test.py new file mode 100644 index 000000000..137910928 --- /dev/null +++ b/ee/connectors/consumer_test.py @@ -0,0 +1,41 @@ +from decouple import config, Csv +import asyncio +from db.api import DBConnection +from utils import 
pg_client +from utils.worker import WorkerPool + + +def main(): + DATABASE = config('CLOUD_SERVICE') + database_api = DBConnection(DATABASE) + + allowed_projects = config('PROJECT_IDS', default=None, cast=Csv(int)) + w_pool = WorkerPool(n_workers=60, + project_filter=allowed_projects) + try: + w_pool.load_checkpoint(database_api) + except Exception as e: + print('[WARN] Checkpoint not found') + print(repr(e)) + # ssl_protocol = config('KAFKA_USE_SSL', default=True, cast=bool) + # consumer_settings = { + # "bootstrap.servers": config('KAFKA_SERVERS'), + # "group.id": f"connector_{DATABASE}", + # "auto.offset.reset": "earliest", + # "enable.auto.commit": False + # } + # if ssl_protocol: + # consumer_settings['security.protocol'] = 'SSL' + # consumer = Consumer(consumer_settings) + + # consumer.subscribe(config("TOPICS", default="saas-raw").split(',')) + print("[INFO] Kafka consumer subscribed") + + # w_pool.run_workers(kafka_consumer=consumer, database_api=database_api) + w_pool.run_workers(database_api=database_api) + + +if __name__ == '__main__': + asyncio.run(pg_client.init()) + main() + diff --git a/ee/connectors/db/api.py b/ee/connectors/db/api.py index 4c449105b..d3c294042 100644 --- a/ee/connectors/db/api.py +++ b/ee/connectors/db/api.py @@ -156,6 +156,7 @@ class DBConnection: self.close() self.__init__(config=self.config) + def save_binary(self, binary_data, name, **kwargs): if self.config == 'redshift': try: diff --git a/ee/connectors/deploy/Dockerfile_redshift b/ee/connectors/deploy/Dockerfile_redshift index 0cefbc553..a459491a1 100644 --- a/ee/connectors/deploy/Dockerfile_redshift +++ b/ee/connectors/deploy/Dockerfile_redshift @@ -1,4 +1,4 @@ -FROM public.ecr.aws/p1t3u8a3/connectors/redshift:base +FROM redshift_connector_base ENV CLOUD_SERVICE=redshift \ CONNECTION_STRING=postgresql+psycopg2://{USER}:{PASSWORD}@{HOST}:{PORT}/{DBNAME} \ @@ -7,9 +7,17 @@ ENV CLOUD_SERVICE=redshift \ PG_MAXCONN=10 RUN apk add --no-cache postgresql-libs lz4-libs zstd-libs COPY 
deploy/requirements_redshift.txt . +COPY msgcodec msgcodec +COPY build_modules.sh . RUN apk add --no-cache --virtual .build-deps gcc g++ musl-dev postgresql-dev && \ - python3 -m pip install -r requirements_redshift.txt --no-cache-dir && \ + ./build_modules.sh && python3 -m pip install -r requirements_redshift.txt --no-cache-dir && \ apk --purge del .build-deps -COPY . . +COPY utils utils +COPY db db +COPY sql sql +COPY handler.py . +COPY consumer_async.py . +COPY fill_from_db.py . +COPY entrypoint.sh . ENTRYPOINT ./entrypoint.sh diff --git a/ee/connectors/deploy/Dockerfile_redshift_base b/ee/connectors/deploy/Dockerfile_redshift_base index 57100d8f2..2fc81d7f0 100644 --- a/ee/connectors/deploy/Dockerfile_redshift_base +++ b/ee/connectors/deploy/Dockerfile_redshift_base @@ -1,7 +1,7 @@ -FROM amancevice/pandas:2.0.0-alpine +FROM amancevice/pandas:2.0.2-alpine WORKDIR /usr/src/app -ENV LIBRD_VER=2.0.2 +ENV LIBRD_VER=2.1.1 WORKDIR /work RUN apk add --no-cache --virtual .make-deps postgresql-dev gcc python3-dev \ musl-dev linux-headers g++ libc-dev libffi-dev make cmake py-pip build-base \ diff --git a/ee/connectors/entrypoint-fill.sh b/ee/connectors/entrypoint-fill.sh new file mode 100755 index 000000000..7ef257b17 --- /dev/null +++ b/ee/connectors/entrypoint-fill.sh @@ -0,0 +1 @@ +python -u fill_from_db.py diff --git a/ee/connectors/fill_from_db.py b/ee/connectors/fill_from_db.py new file mode 100644 index 000000000..9bd7fe18b --- /dev/null +++ b/ee/connectors/fill_from_db.py @@ -0,0 +1,60 @@ +import pandas_redshift as pdredshift +from utils import pg_client +from decouple import config, Choices +import asyncio + +DATABASE = config('CLOUD_SERVICE') +sessions_table_name = config('SESSIONS_TABLE', default='connector_user_sessions') +table = sessions_table_name +sslmode = config('DB_SSLMODE', + cast=Choices(['disable', 'allow', 'prefer', 'require', 'verify-ca', 'verify-full']), + default='allow' +) +ci = config('cluster_info', default='') +cluster_info = dict() +if ci 
== '': + cluster_info['USER'] = config('USER') + cluster_info['HOST'] = config('HOST') + cluster_info['PORT'] = config('PORT') + cluster_info['PASSWORD'] = config('PASSWORD') + cluster_info['DBNAME'] = config('DBNAME') +else: + ci = ci.split(' ') + cluster_info = dict() + for _d in ci: + k,v = _d.split('=') + cluster_info[k]=v +pdredshift.connect_to_redshift(dbname=cluster_info['DBNAME'], + host=cluster_info['HOST'], + port=cluster_info['PORT'], + user=cluster_info['USER'], + password=cluster_info['PASSWORD'], + sslmode=sslmode) + +async def main(limit = 100): + query = "SELECT sessionid FROM {table} WHERE user_id = 'NULL' LIMIT {limit}" + try: + res = pdredshift.redshift_to_pandas(query.format(table=table, limit=limit)) + except Exception as e: + print(repr(e)) + if len(res) == 0: + return + sessionids = list(map(lambda k: str(k), res['sessionid'])) + + await pg_client.init() + with pg_client.PostgresClient() as conn: + conn.execute('SELECT session_id, user_id FROM sessions WHERE session_id IN ({session_id_list})'.format( + session_id_list = ','.join(sessionids)) + ) + pg_res = conn.fetchall() + + base_query = "UPDATE {table} SET user_id = '{user_id}' WHERE sessionid = {session_id}" + for e in pg_res: + query = base_query.format(table=table, user_id=e['user_id'], session_id=e['session_id']) + try: + pdredshift.exec_commit(query) + except Exception as e: + print(repr(e)) + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/ee/connectors/handler.py b/ee/connectors/handler.py index 9843eb085..9edfbd34e 100644 --- a/ee/connectors/handler.py +++ b/ee/connectors/handler.py @@ -1,7 +1,7 @@ from typing import Optional, Union from db.models import Event, DetailedEvent, Session -from msgcodec.messages import * +from messages import * def handle_normal_message(message: Message) -> Optional[Event]: @@ -75,7 +75,8 @@ def handle_normal_message(message: Message) -> Optional[Event]: return n if isinstance(message, UserID): - n.user_id = message.id + if message.id != 
'': + n.user_id = message.id return n if isinstance(message, IssueEvent): @@ -112,7 +113,7 @@ def handle_session(n: Session, message: Message) -> Optional[Session]: n.user_device_type = message.user_device_type n.user_device_memory_size = message.user_device_memory_size n.user_device_heap_size = message.user_device_heap_size - n.user_country = message.user_country + n.user_country = message.user_country.split('|')[0] return n if isinstance(message, SessionEnd): @@ -185,7 +186,12 @@ def handle_session(n: Session, message: Message) -> Optional[Session]: return n if isinstance(message, UserID): - n.user_id = message.id + try: + n.user_id = message.id + except AttributeError as e: + print(f'Session current type: {type(n)}') + print(f'Message id: {message.id}') + raise e return n if isinstance(message, UserAnonymousID): diff --git a/ee/connectors/msgcodec/abc.pxd b/ee/connectors/msgcodec/abc.pxd new file mode 100644 index 000000000..b4debcbad --- /dev/null +++ b/ee/connectors/msgcodec/abc.pxd @@ -0,0 +1,12 @@ +cdef extern from "Python.h": + ctypedef struct PyObject: + pass + +#cdef extern from "cpython/abc.pxd": +# cdef class ABC: +# pass + +cdef class ABC: + pass +# Optionally, you can include other declarations from abc.pxd if needed + diff --git a/ee/connectors/msgcodec/codec.pyx b/ee/connectors/msgcodec/codec.pyx new file mode 100644 index 000000000..688aa7632 --- /dev/null +++ b/ee/connectors/msgcodec/codec.pyx @@ -0,0 +1,92 @@ +cimport io +from libc.stdlib cimport abort + +cdef extern from "Python.h": + int PyArg_ParseTupleAndKeywords(object args, object kwargs, char* format, char** keywords, ...) 
+ +ctypedef object PyBytesIO + +cdef class Codec: + """ + Implements encode/decode primitives + """ + + @staticmethod + cdef read_boolean(PyBytesIO reader): + cdef bint b + b = reader.read(1)[0] + return b == 1 + + @staticmethod + def read_bool_method(PyBytesIO reader): + return Codec.read_boolean(reader) + + @staticmethod + cdef read_uint(PyBytesIO reader): + cdef int x = 0 # the result + cdef int s = 0 # the shift (our result is big-ending) + cdef int i = 0 # n of byte (max 9 for uint64) + cdef bytes b + cdef int num + + while True: + b = reader.read(1) + if len(b) == 0: + raise IndexError('bytes out of range') + + num = int.from_bytes(b, "big", signed=False) + + if num < 0x80: + if i > 9 or (i == 9 and num > 1): + raise OverflowError() + return int(x | num << s) + x |= (num & 0x7f) << s + s += 7 + i += 1 + + @staticmethod + def read_size(PyBytesIO reader): + cdef int size = 0 + cdef bytes b + cdef int num + for i in range(3): + b = reader.read(1) + num = int.from_bytes(b, "big", signed=False) + size += num << (8*i) + return size + + + @staticmethod + def read_int(PyBytesIO reader): + """ + ux, err := ReadUint(reader) + x := int64(ux >> 1) + if err != nil { + return x, err + } + if ux&1 != 0 { + x = ^x + } + return x, err + """ + cdef int ux = Codec.read_uint(reader) + cdef int x = int(ux >> 1) + + if ux & 1 != 0: + x = - x - 1 + return x + + @staticmethod + def read_string(PyBytesIO reader): + cdef int length = Codec.read_uint(reader) + cdef int s + try: + s = reader.read(length) + except Exception as e: + print(f'Error while reading string of length {length}') + raise Exception(e) + try: + return s.decode("utf-8", errors="replace").replace("\x00", "\uFFFD") + except UnicodeDecodeError: + return None + diff --git a/ee/connectors/msgcodec/io.pxd b/ee/connectors/msgcodec/io.pxd new file mode 100644 index 000000000..8436e1dfe --- /dev/null +++ b/ee/connectors/msgcodec/io.pxd @@ -0,0 +1,21 @@ +cdef extern from "Python.h": + ctypedef struct PyBytesIO: + pass + +cdef 
PyBytesIO* PyBytesIO_New() +cdef void PyBytesIO_Init(PyBytesIO* self, object buf) +cdef object PyBytesIO_GetValue(PyBytesIO* self) +cdef void PyBytesIO_SetValue(PyBytesIO* self, object buf) +cdef void PyBytesIO_Write(PyBytesIO* self, const char* s, Py_ssize_t size) +cdef void PyBytesIO_WriteObject(PyBytesIO* self, object o) + +cdef object PyBytesIO_Read(PyBytesIO* self, Py_ssize_t n) +cdef object PyBytesIO_Readline(PyBytesIO* self, Py_ssize_t n) +cdef object PyBytesIO_Readlines(PyBytesIO* self, Py_ssize_t n) +cdef void PyBytesIO_Seek(PyBytesIO* self, Py_ssize_t pos, int whence) +cdef Py_ssize_t PyBytesIO_Tell(PyBytesIO* self) +cdef void PyBytesIO_Truncate(PyBytesIO* self, Py_ssize_t size) +cdef void PyBytesIO_Flush(PyBytesIO* self) +cdef object PyBytesIO_GetSize(PyBytesIO* self) +cdef void PyBytesIO_Close(PyBytesIO* self) + diff --git a/ee/connectors/msgcodec/messages.pyx b/ee/connectors/msgcodec/messages.pyx new file mode 100644 index 000000000..b73479dba --- /dev/null +++ b/ee/connectors/msgcodec/messages.pyx @@ -0,0 +1,1491 @@ +# Auto-generated, do not edit + +from abc cimport ABC + +class Message(ABC): + pass + +cdef class PyMessage: + def __cinit__(self): + pass + + +cdef class Timestamp(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + + def __init__(self, unsigned long timestamp): + self.__id__ = 0 + self.timestamp = timestamp + + +cdef class SessionStart(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long project_id + cdef public str tracker_version + cdef public str rev_id + cdef public str user_uuid + cdef public str user_agent + cdef public str user_os + cdef public str user_os_version + cdef public str user_browser + cdef public str user_browser_version + cdef public str user_device + cdef public str user_device_type + cdef public unsigned long user_device_memory_size + cdef public unsigned long user_device_heap_size + cdef public str user_country + cdef public str user_id + + 
def __init__(self, unsigned long timestamp, unsigned long project_id, str tracker_version, str rev_id, str user_uuid, str user_agent, str user_os, str user_os_version, str user_browser, str user_browser_version, str user_device, str user_device_type, unsigned long user_device_memory_size, unsigned long user_device_heap_size, str user_country, str user_id): + self.__id__ = 1 + self.timestamp = timestamp + self.project_id = project_id + self.tracker_version = tracker_version + self.rev_id = rev_id + self.user_uuid = user_uuid + self.user_agent = user_agent + self.user_os = user_os + self.user_os_version = user_os_version + self.user_browser = user_browser + self.user_browser_version = user_browser_version + self.user_device = user_device + self.user_device_type = user_device_type + self.user_device_memory_size = user_device_memory_size + self.user_device_heap_size = user_device_heap_size + self.user_country = user_country + self.user_id = user_id + + +cdef class SessionEndDeprecated(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + + def __init__(self, unsigned long timestamp): + self.__id__ = 3 + self.timestamp = timestamp + + +cdef class SetPageLocation(PyMessage): + cdef public int __id__ + cdef public str url + cdef public str referrer + cdef public unsigned long navigation_start + + def __init__(self, str url, str referrer, unsigned long navigation_start): + self.__id__ = 4 + self.url = url + self.referrer = referrer + self.navigation_start = navigation_start + + +cdef class SetViewportSize(PyMessage): + cdef public int __id__ + cdef public unsigned long width + cdef public unsigned long height + + def __init__(self, unsigned long width, unsigned long height): + self.__id__ = 5 + self.width = width + self.height = height + + +cdef class SetViewportScroll(PyMessage): + cdef public int __id__ + cdef public long x + cdef public long y + + def __init__(self, long x, long y): + self.__id__ = 6 + self.x = x + self.y = y + + +cdef class 
CreateDocument(PyMessage): + cdef public int __id__ + + + def __init__(self, ): + self.__id__ = 7 + + + +cdef class CreateElementNode(PyMessage): + cdef public int __id__ + cdef public unsigned long id + cdef public unsigned long parent_id + cdef public unsigned long index + cdef public str tag + cdef public bint svg + + def __init__(self, unsigned long id, unsigned long parent_id, unsigned long index, str tag, bint svg): + self.__id__ = 8 + self.id = id + self.parent_id = parent_id + self.index = index + self.tag = tag + self.svg = svg + + +cdef class CreateTextNode(PyMessage): + cdef public int __id__ + cdef public unsigned long id + cdef public unsigned long parent_id + cdef public unsigned long index + + def __init__(self, unsigned long id, unsigned long parent_id, unsigned long index): + self.__id__ = 9 + self.id = id + self.parent_id = parent_id + self.index = index + + +cdef class MoveNode(PyMessage): + cdef public int __id__ + cdef public unsigned long id + cdef public unsigned long parent_id + cdef public unsigned long index + + def __init__(self, unsigned long id, unsigned long parent_id, unsigned long index): + self.__id__ = 10 + self.id = id + self.parent_id = parent_id + self.index = index + + +cdef class RemoveNode(PyMessage): + cdef public int __id__ + cdef public unsigned long id + + def __init__(self, unsigned long id): + self.__id__ = 11 + self.id = id + + +cdef class SetNodeAttribute(PyMessage): + cdef public int __id__ + cdef public unsigned long id + cdef public str name + cdef public str value + + def __init__(self, unsigned long id, str name, str value): + self.__id__ = 12 + self.id = id + self.name = name + self.value = value + + +cdef class RemoveNodeAttribute(PyMessage): + cdef public int __id__ + cdef public unsigned long id + cdef public str name + + def __init__(self, unsigned long id, str name): + self.__id__ = 13 + self.id = id + self.name = name + + +cdef class SetNodeData(PyMessage): + cdef public int __id__ + cdef public unsigned 
long id + cdef public str data + + def __init__(self, unsigned long id, str data): + self.__id__ = 14 + self.id = id + self.data = data + + +cdef class SetCSSData(PyMessage): + cdef public int __id__ + cdef public unsigned long id + cdef public str data + + def __init__(self, unsigned long id, str data): + self.__id__ = 15 + self.id = id + self.data = data + + +cdef class SetNodeScroll(PyMessage): + cdef public int __id__ + cdef public unsigned long id + cdef public long x + cdef public long y + + def __init__(self, unsigned long id, long x, long y): + self.__id__ = 16 + self.id = id + self.x = x + self.y = y + + +cdef class SetInputTarget(PyMessage): + cdef public int __id__ + cdef public unsigned long id + cdef public str label + + def __init__(self, unsigned long id, str label): + self.__id__ = 17 + self.id = id + self.label = label + + +cdef class SetInputValue(PyMessage): + cdef public int __id__ + cdef public unsigned long id + cdef public str value + cdef public long mask + + def __init__(self, unsigned long id, str value, long mask): + self.__id__ = 18 + self.id = id + self.value = value + self.mask = mask + + +cdef class SetInputChecked(PyMessage): + cdef public int __id__ + cdef public unsigned long id + cdef public bint checked + + def __init__(self, unsigned long id, bint checked): + self.__id__ = 19 + self.id = id + self.checked = checked + + +cdef class MouseMove(PyMessage): + cdef public int __id__ + cdef public unsigned long x + cdef public unsigned long y + + def __init__(self, unsigned long x, unsigned long y): + self.__id__ = 20 + self.x = x + self.y = y + + +cdef class NetworkRequest(PyMessage): + cdef public int __id__ + cdef public str type + cdef public str method + cdef public str url + cdef public str request + cdef public str response + cdef public unsigned long status + cdef public unsigned long timestamp + cdef public unsigned long duration + + def __init__(self, str type, str method, str url, str request, str response, unsigned long 
status, unsigned long timestamp, unsigned long duration): + self.__id__ = 21 + self.type = type + self.method = method + self.url = url + self.request = request + self.response = response + self.status = status + self.timestamp = timestamp + self.duration = duration + + +cdef class ConsoleLog(PyMessage): + cdef public int __id__ + cdef public str level + cdef public str value + + def __init__(self, str level, str value): + self.__id__ = 22 + self.level = level + self.value = value + + +cdef class PageLoadTiming(PyMessage): + cdef public int __id__ + cdef public unsigned long request_start + cdef public unsigned long response_start + cdef public unsigned long response_end + cdef public unsigned long dom_content_loaded_event_start + cdef public unsigned long dom_content_loaded_event_end + cdef public unsigned long load_event_start + cdef public unsigned long load_event_end + cdef public unsigned long first_paint + cdef public unsigned long first_contentful_paint + + def __init__(self, unsigned long request_start, unsigned long response_start, unsigned long response_end, unsigned long dom_content_loaded_event_start, unsigned long dom_content_loaded_event_end, unsigned long load_event_start, unsigned long load_event_end, unsigned long first_paint, unsigned long first_contentful_paint): + self.__id__ = 23 + self.request_start = request_start + self.response_start = response_start + self.response_end = response_end + self.dom_content_loaded_event_start = dom_content_loaded_event_start + self.dom_content_loaded_event_end = dom_content_loaded_event_end + self.load_event_start = load_event_start + self.load_event_end = load_event_end + self.first_paint = first_paint + self.first_contentful_paint = first_contentful_paint + + +cdef class PageRenderTiming(PyMessage): + cdef public int __id__ + cdef public unsigned long speed_index + cdef public unsigned long visually_complete + cdef public unsigned long time_to_interactive + + def __init__(self, unsigned long speed_index, 
unsigned long visually_complete, unsigned long time_to_interactive): + self.__id__ = 24 + self.speed_index = speed_index + self.visually_complete = visually_complete + self.time_to_interactive = time_to_interactive + + +cdef class JSExceptionDeprecated(PyMessage): + cdef public int __id__ + cdef public str name + cdef public str message + cdef public str payload + + def __init__(self, str name, str message, str payload): + self.__id__ = 25 + self.name = name + self.message = message + self.payload = payload + + +cdef class IntegrationEvent(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public str source + cdef public str name + cdef public str message + cdef public str payload + + def __init__(self, unsigned long timestamp, str source, str name, str message, str payload): + self.__id__ = 26 + self.timestamp = timestamp + self.source = source + self.name = name + self.message = message + self.payload = payload + + +cdef class CustomEvent(PyMessage): + cdef public int __id__ + cdef public str name + cdef public str payload + + def __init__(self, str name, str payload): + self.__id__ = 27 + self.name = name + self.payload = payload + + +cdef class UserID(PyMessage): + cdef public int __id__ + cdef public str id + + def __init__(self, str id): + self.__id__ = 28 + self.id = id + + +cdef class UserAnonymousID(PyMessage): + cdef public int __id__ + cdef public str id + + def __init__(self, str id): + self.__id__ = 29 + self.id = id + + +cdef class Metadata(PyMessage): + cdef public int __id__ + cdef public str key + cdef public str value + + def __init__(self, str key, str value): + self.__id__ = 30 + self.key = key + self.value = value + + +cdef class PageEvent(PyMessage): + cdef public int __id__ + cdef public unsigned long message_id + cdef public unsigned long timestamp + cdef public str url + cdef public str referrer + cdef public bint loaded + cdef public unsigned long request_start + cdef public unsigned long response_start + 
cdef public unsigned long response_end + cdef public unsigned long dom_content_loaded_event_start + cdef public unsigned long dom_content_loaded_event_end + cdef public unsigned long load_event_start + cdef public unsigned long load_event_end + cdef public unsigned long first_paint + cdef public unsigned long first_contentful_paint + cdef public unsigned long speed_index + cdef public unsigned long visually_complete + cdef public unsigned long time_to_interactive + + def __init__(self, unsigned long message_id, unsigned long timestamp, str url, str referrer, bint loaded, unsigned long request_start, unsigned long response_start, unsigned long response_end, unsigned long dom_content_loaded_event_start, unsigned long dom_content_loaded_event_end, unsigned long load_event_start, unsigned long load_event_end, unsigned long first_paint, unsigned long first_contentful_paint, unsigned long speed_index, unsigned long visually_complete, unsigned long time_to_interactive): + self.__id__ = 31 + self.message_id = message_id + self.timestamp = timestamp + self.url = url + self.referrer = referrer + self.loaded = loaded + self.request_start = request_start + self.response_start = response_start + self.response_end = response_end + self.dom_content_loaded_event_start = dom_content_loaded_event_start + self.dom_content_loaded_event_end = dom_content_loaded_event_end + self.load_event_start = load_event_start + self.load_event_end = load_event_end + self.first_paint = first_paint + self.first_contentful_paint = first_contentful_paint + self.speed_index = speed_index + self.visually_complete = visually_complete + self.time_to_interactive = time_to_interactive + + +cdef class InputEvent(PyMessage): + cdef public int __id__ + cdef public unsigned long message_id + cdef public unsigned long timestamp + cdef public str value + cdef public bint value_masked + cdef public str label + + def __init__(self, unsigned long message_id, unsigned long timestamp, str value, bint value_masked, str 
label): + self.__id__ = 32 + self.message_id = message_id + self.timestamp = timestamp + self.value = value + self.value_masked = value_masked + self.label = label + + +cdef class CSSInsertRule(PyMessage): + cdef public int __id__ + cdef public unsigned long id + cdef public str rule + cdef public unsigned long index + + def __init__(self, unsigned long id, str rule, unsigned long index): + self.__id__ = 37 + self.id = id + self.rule = rule + self.index = index + + +cdef class CSSDeleteRule(PyMessage): + cdef public int __id__ + cdef public unsigned long id + cdef public unsigned long index + + def __init__(self, unsigned long id, unsigned long index): + self.__id__ = 38 + self.id = id + self.index = index + + +cdef class Fetch(PyMessage): + cdef public int __id__ + cdef public str method + cdef public str url + cdef public str request + cdef public str response + cdef public unsigned long status + cdef public unsigned long timestamp + cdef public unsigned long duration + + def __init__(self, str method, str url, str request, str response, unsigned long status, unsigned long timestamp, unsigned long duration): + self.__id__ = 39 + self.method = method + self.url = url + self.request = request + self.response = response + self.status = status + self.timestamp = timestamp + self.duration = duration + + +cdef class Profiler(PyMessage): + cdef public int __id__ + cdef public str name + cdef public unsigned long duration + cdef public str args + cdef public str result + + def __init__(self, str name, unsigned long duration, str args, str result): + self.__id__ = 40 + self.name = name + self.duration = duration + self.args = args + self.result = result + + +cdef class OTable(PyMessage): + cdef public int __id__ + cdef public str key + cdef public str value + + def __init__(self, str key, str value): + self.__id__ = 41 + self.key = key + self.value = value + + +cdef class StateAction(PyMessage): + cdef public int __id__ + cdef public str type + + def __init__(self, str 
type): + self.__id__ = 42 + self.type = type + + +cdef class Redux(PyMessage): + cdef public int __id__ + cdef public str action + cdef public str state + cdef public unsigned long duration + + def __init__(self, str action, str state, unsigned long duration): + self.__id__ = 44 + self.action = action + self.state = state + self.duration = duration + + +cdef class Vuex(PyMessage): + cdef public int __id__ + cdef public str mutation + cdef public str state + + def __init__(self, str mutation, str state): + self.__id__ = 45 + self.mutation = mutation + self.state = state + + +cdef class MobX(PyMessage): + cdef public int __id__ + cdef public str type + cdef public str payload + + def __init__(self, str type, str payload): + self.__id__ = 46 + self.type = type + self.payload = payload + + +cdef class NgRx(PyMessage): + cdef public int __id__ + cdef public str action + cdef public str state + cdef public unsigned long duration + + def __init__(self, str action, str state, unsigned long duration): + self.__id__ = 47 + self.action = action + self.state = state + self.duration = duration + + +cdef class GraphQL(PyMessage): + cdef public int __id__ + cdef public str operation_kind + cdef public str operation_name + cdef public str variables + cdef public str response + + def __init__(self, str operation_kind, str operation_name, str variables, str response): + self.__id__ = 48 + self.operation_kind = operation_kind + self.operation_name = operation_name + self.variables = variables + self.response = response + + +cdef class PerformanceTrack(PyMessage): + cdef public int __id__ + cdef public long frames + cdef public long ticks + cdef public unsigned long total_js_heap_size + cdef public unsigned long used_js_heap_size + + def __init__(self, long frames, long ticks, unsigned long total_js_heap_size, unsigned long used_js_heap_size): + self.__id__ = 49 + self.frames = frames + self.ticks = ticks + self.total_js_heap_size = total_js_heap_size + self.used_js_heap_size = 
used_js_heap_size + + +cdef class StringDict(PyMessage): + cdef public int __id__ + cdef public unsigned long key + cdef public str value + + def __init__(self, unsigned long key, str value): + self.__id__ = 50 + self.key = key + self.value = value + + +cdef class SetNodeAttributeDict(PyMessage): + cdef public int __id__ + cdef public unsigned long id + cdef public unsigned long name_key + cdef public unsigned long value_key + + def __init__(self, unsigned long id, unsigned long name_key, unsigned long value_key): + self.__id__ = 51 + self.id = id + self.name_key = name_key + self.value_key = value_key + + +cdef class ResourceTimingDeprecated(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long duration + cdef public unsigned long ttfb + cdef public unsigned long header_size + cdef public unsigned long encoded_body_size + cdef public unsigned long decoded_body_size + cdef public str url + cdef public str initiator + + def __init__(self, unsigned long timestamp, unsigned long duration, unsigned long ttfb, unsigned long header_size, unsigned long encoded_body_size, unsigned long decoded_body_size, str url, str initiator): + self.__id__ = 53 + self.timestamp = timestamp + self.duration = duration + self.ttfb = ttfb + self.header_size = header_size + self.encoded_body_size = encoded_body_size + self.decoded_body_size = decoded_body_size + self.url = url + self.initiator = initiator + + +cdef class ConnectionInformation(PyMessage): + cdef public int __id__ + cdef public unsigned long downlink + cdef public str type + + def __init__(self, unsigned long downlink, str type): + self.__id__ = 54 + self.downlink = downlink + self.type = type + + +cdef class SetPageVisibility(PyMessage): + cdef public int __id__ + cdef public bint hidden + + def __init__(self, bint hidden): + self.__id__ = 55 + self.hidden = hidden + + +cdef class PerformanceTrackAggr(PyMessage): + cdef public int __id__ + cdef public unsigned long 
timestamp_start + cdef public unsigned long timestamp_end + cdef public unsigned long min_fps + cdef public unsigned long avg_fps + cdef public unsigned long max_fps + cdef public unsigned long min_cpu + cdef public unsigned long avg_cpu + cdef public unsigned long max_cpu + cdef public unsigned long min_total_js_heap_size + cdef public unsigned long avg_total_js_heap_size + cdef public unsigned long max_total_js_heap_size + cdef public unsigned long min_used_js_heap_size + cdef public unsigned long avg_used_js_heap_size + cdef public unsigned long max_used_js_heap_size + + def __init__(self, unsigned long timestamp_start, unsigned long timestamp_end, unsigned long min_fps, unsigned long avg_fps, unsigned long max_fps, unsigned long min_cpu, unsigned long avg_cpu, unsigned long max_cpu, unsigned long min_total_js_heap_size, unsigned long avg_total_js_heap_size, unsigned long max_total_js_heap_size, unsigned long min_used_js_heap_size, unsigned long avg_used_js_heap_size, unsigned long max_used_js_heap_size): + self.__id__ = 56 + self.timestamp_start = timestamp_start + self.timestamp_end = timestamp_end + self.min_fps = min_fps + self.avg_fps = avg_fps + self.max_fps = max_fps + self.min_cpu = min_cpu + self.avg_cpu = avg_cpu + self.max_cpu = max_cpu + self.min_total_js_heap_size = min_total_js_heap_size + self.avg_total_js_heap_size = avg_total_js_heap_size + self.max_total_js_heap_size = max_total_js_heap_size + self.min_used_js_heap_size = min_used_js_heap_size + self.avg_used_js_heap_size = avg_used_js_heap_size + self.max_used_js_heap_size = max_used_js_heap_size + + +cdef class LoadFontFace(PyMessage): + cdef public int __id__ + cdef public unsigned long parent_id + cdef public str family + cdef public str source + cdef public str descriptors + + def __init__(self, unsigned long parent_id, str family, str source, str descriptors): + self.__id__ = 57 + self.parent_id = parent_id + self.family = family + self.source = source + self.descriptors = descriptors + + 
+cdef class SetNodeFocus(PyMessage): + cdef public int __id__ + cdef public long id + + def __init__(self, long id): + self.__id__ = 58 + self.id = id + + +cdef class LongTask(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long duration + cdef public unsigned long context + cdef public unsigned long container_type + cdef public str container_src + cdef public str container_id + cdef public str container_name + + def __init__(self, unsigned long timestamp, unsigned long duration, unsigned long context, unsigned long container_type, str container_src, str container_id, str container_name): + self.__id__ = 59 + self.timestamp = timestamp + self.duration = duration + self.context = context + self.container_type = container_type + self.container_src = container_src + self.container_id = container_id + self.container_name = container_name + + +cdef class SetNodeAttributeURLBased(PyMessage): + cdef public int __id__ + cdef public unsigned long id + cdef public str name + cdef public str value + cdef public str base_url + + def __init__(self, unsigned long id, str name, str value, str base_url): + self.__id__ = 60 + self.id = id + self.name = name + self.value = value + self.base_url = base_url + + +cdef class SetCSSDataURLBased(PyMessage): + cdef public int __id__ + cdef public unsigned long id + cdef public str data + cdef public str base_url + + def __init__(self, unsigned long id, str data, str base_url): + self.__id__ = 61 + self.id = id + self.data = data + self.base_url = base_url + + +cdef class IssueEventDeprecated(PyMessage): + cdef public int __id__ + cdef public unsigned long message_id + cdef public unsigned long timestamp + cdef public str type + cdef public str context_string + cdef public str context + cdef public str payload + + def __init__(self, unsigned long message_id, unsigned long timestamp, str type, str context_string, str context, str payload): + self.__id__ = 62 + self.message_id = message_id + 
self.timestamp = timestamp + self.type = type + self.context_string = context_string + self.context = context + self.payload = payload + + +cdef class TechnicalInfo(PyMessage): + cdef public int __id__ + cdef public str type + cdef public str value + + def __init__(self, str type, str value): + self.__id__ = 63 + self.type = type + self.value = value + + +cdef class CustomIssue(PyMessage): + cdef public int __id__ + cdef public str name + cdef public str payload + + def __init__(self, str name, str payload): + self.__id__ = 64 + self.name = name + self.payload = payload + + +cdef class AssetCache(PyMessage): + cdef public int __id__ + cdef public str url + + def __init__(self, str url): + self.__id__ = 66 + self.url = url + + +cdef class CSSInsertRuleURLBased(PyMessage): + cdef public int __id__ + cdef public unsigned long id + cdef public str rule + cdef public unsigned long index + cdef public str base_url + + def __init__(self, unsigned long id, str rule, unsigned long index, str base_url): + self.__id__ = 67 + self.id = id + self.rule = rule + self.index = index + self.base_url = base_url + + +cdef class MouseClick(PyMessage): + cdef public int __id__ + cdef public unsigned long id + cdef public unsigned long hesitation_time + cdef public str label + cdef public str selector + + def __init__(self, unsigned long id, unsigned long hesitation_time, str label, str selector): + self.__id__ = 69 + self.id = id + self.hesitation_time = hesitation_time + self.label = label + self.selector = selector + + +cdef class CreateIFrameDocument(PyMessage): + cdef public int __id__ + cdef public unsigned long frame_id + cdef public unsigned long id + + def __init__(self, unsigned long frame_id, unsigned long id): + self.__id__ = 70 + self.frame_id = frame_id + self.id = id + + +cdef class AdoptedSSReplaceURLBased(PyMessage): + cdef public int __id__ + cdef public unsigned long sheet_id + cdef public str text + cdef public str base_url + + def __init__(self, unsigned long 
sheet_id, str text, str base_url): + self.__id__ = 71 + self.sheet_id = sheet_id + self.text = text + self.base_url = base_url + + +cdef class AdoptedSSReplace(PyMessage): + cdef public int __id__ + cdef public unsigned long sheet_id + cdef public str text + + def __init__(self, unsigned long sheet_id, str text): + self.__id__ = 72 + self.sheet_id = sheet_id + self.text = text + + +cdef class AdoptedSSInsertRuleURLBased(PyMessage): + cdef public int __id__ + cdef public unsigned long sheet_id + cdef public str rule + cdef public unsigned long index + cdef public str base_url + + def __init__(self, unsigned long sheet_id, str rule, unsigned long index, str base_url): + self.__id__ = 73 + self.sheet_id = sheet_id + self.rule = rule + self.index = index + self.base_url = base_url + + +cdef class AdoptedSSInsertRule(PyMessage): + cdef public int __id__ + cdef public unsigned long sheet_id + cdef public str rule + cdef public unsigned long index + + def __init__(self, unsigned long sheet_id, str rule, unsigned long index): + self.__id__ = 74 + self.sheet_id = sheet_id + self.rule = rule + self.index = index + + +cdef class AdoptedSSDeleteRule(PyMessage): + cdef public int __id__ + cdef public unsigned long sheet_id + cdef public unsigned long index + + def __init__(self, unsigned long sheet_id, unsigned long index): + self.__id__ = 75 + self.sheet_id = sheet_id + self.index = index + + +cdef class AdoptedSSAddOwner(PyMessage): + cdef public int __id__ + cdef public unsigned long sheet_id + cdef public unsigned long id + + def __init__(self, unsigned long sheet_id, unsigned long id): + self.__id__ = 76 + self.sheet_id = sheet_id + self.id = id + + +cdef class AdoptedSSRemoveOwner(PyMessage): + cdef public int __id__ + cdef public unsigned long sheet_id + cdef public unsigned long id + + def __init__(self, unsigned long sheet_id, unsigned long id): + self.__id__ = 77 + self.sheet_id = sheet_id + self.id = id + + +cdef class JSException(PyMessage): + cdef public int __id__ 
+ cdef public str name + cdef public str message + cdef public str payload + cdef public str metadata + + def __init__(self, str name, str message, str payload, str metadata): + self.__id__ = 78 + self.name = name + self.message = message + self.payload = payload + self.metadata = metadata + + +cdef class Zustand(PyMessage): + cdef public int __id__ + cdef public str mutation + cdef public str state + + def __init__(self, str mutation, str state): + self.__id__ = 79 + self.mutation = mutation + self.state = state + + +cdef class BatchMeta(PyMessage): + cdef public int __id__ + cdef public unsigned long page_no + cdef public unsigned long first_index + cdef public long timestamp + + def __init__(self, unsigned long page_no, unsigned long first_index, long timestamp): + self.__id__ = 80 + self.page_no = page_no + self.first_index = first_index + self.timestamp = timestamp + + +cdef class BatchMetadata(PyMessage): + cdef public int __id__ + cdef public unsigned long version + cdef public unsigned long page_no + cdef public unsigned long first_index + cdef public long timestamp + cdef public str location + + def __init__(self, unsigned long version, unsigned long page_no, unsigned long first_index, long timestamp, str location): + self.__id__ = 81 + self.version = version + self.page_no = page_no + self.first_index = first_index + self.timestamp = timestamp + self.location = location + + +cdef class PartitionedMessage(PyMessage): + cdef public int __id__ + cdef public unsigned long part_no + cdef public unsigned long part_total + + def __init__(self, unsigned long part_no, unsigned long part_total): + self.__id__ = 82 + self.part_no = part_no + self.part_total = part_total + + +cdef class InputChange(PyMessage): + cdef public int __id__ + cdef public unsigned long id + cdef public str value + cdef public bint value_masked + cdef public str label + cdef public long hesitation_time + cdef public long input_duration + + def __init__(self, unsigned long id, str value, bint 
value_masked, str label, long hesitation_time, long input_duration): + self.__id__ = 112 + self.id = id + self.value = value + self.value_masked = value_masked + self.label = label + self.hesitation_time = hesitation_time + self.input_duration = input_duration + + +cdef class SelectionChange(PyMessage): + cdef public int __id__ + cdef public unsigned long selection_start + cdef public unsigned long selection_end + cdef public str selection + + def __init__(self, unsigned long selection_start, unsigned long selection_end, str selection): + self.__id__ = 113 + self.selection_start = selection_start + self.selection_end = selection_end + self.selection = selection + + +cdef class MouseThrashing(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + + def __init__(self, unsigned long timestamp): + self.__id__ = 114 + self.timestamp = timestamp + + +cdef class UnbindNodes(PyMessage): + cdef public int __id__ + cdef public unsigned long total_removed_percent + + def __init__(self, unsigned long total_removed_percent): + self.__id__ = 115 + self.total_removed_percent = total_removed_percent + + +cdef class ResourceTiming(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long duration + cdef public unsigned long ttfb + cdef public unsigned long header_size + cdef public unsigned long encoded_body_size + cdef public unsigned long decoded_body_size + cdef public str url + cdef public str initiator + cdef public unsigned long transferred_size + cdef public bint cached + + def __init__(self, unsigned long timestamp, unsigned long duration, unsigned long ttfb, unsigned long header_size, unsigned long encoded_body_size, unsigned long decoded_body_size, str url, str initiator, unsigned long transferred_size, bint cached): + self.__id__ = 116 + self.timestamp = timestamp + self.duration = duration + self.ttfb = ttfb + self.header_size = header_size + self.encoded_body_size = encoded_body_size + 
self.decoded_body_size = decoded_body_size + self.url = url + self.initiator = initiator + self.transferred_size = transferred_size + self.cached = cached + + +cdef class IssueEvent(PyMessage): + cdef public int __id__ + cdef public unsigned long message_id + cdef public unsigned long timestamp + cdef public str type + cdef public str context_string + cdef public str context + cdef public str payload + cdef public str url + + def __init__(self, unsigned long message_id, unsigned long timestamp, str type, str context_string, str context, str payload, str url): + self.__id__ = 125 + self.message_id = message_id + self.timestamp = timestamp + self.type = type + self.context_string = context_string + self.context = context + self.payload = payload + self.url = url + + +cdef class SessionEnd(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public str encryption_key + + def __init__(self, unsigned long timestamp, str encryption_key): + self.__id__ = 126 + self.timestamp = timestamp + self.encryption_key = encryption_key + + +cdef class SessionSearch(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long partition + + def __init__(self, unsigned long timestamp, unsigned long partition): + self.__id__ = 127 + self.timestamp = timestamp + self.partition = partition + + +cdef class IOSBatchMeta(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long length + cdef public unsigned long first_index + + def __init__(self, unsigned long timestamp, unsigned long length, unsigned long first_index): + self.__id__ = 107 + self.timestamp = timestamp + self.length = length + self.first_index = first_index + + +cdef class IOSSessionStart(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long project_id + cdef public str tracker_version + cdef public str rev_id + cdef public str user_uuid + cdef public str user_os 
+ cdef public str user_os_version + cdef public str user_device + cdef public str user_device_type + cdef public str user_country + + def __init__(self, unsigned long timestamp, unsigned long project_id, str tracker_version, str rev_id, str user_uuid, str user_os, str user_os_version, str user_device, str user_device_type, str user_country): + self.__id__ = 90 + self.timestamp = timestamp + self.project_id = project_id + self.tracker_version = tracker_version + self.rev_id = rev_id + self.user_uuid = user_uuid + self.user_os = user_os + self.user_os_version = user_os_version + self.user_device = user_device + self.user_device_type = user_device_type + self.user_country = user_country + + +cdef class IOSSessionEnd(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + + def __init__(self, unsigned long timestamp): + self.__id__ = 91 + self.timestamp = timestamp + + +cdef class IOSMetadata(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long length + cdef public str key + cdef public str value + + def __init__(self, unsigned long timestamp, unsigned long length, str key, str value): + self.__id__ = 92 + self.timestamp = timestamp + self.length = length + self.key = key + self.value = value + + +cdef class IOSCustomEvent(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long length + cdef public str name + cdef public str payload + + def __init__(self, unsigned long timestamp, unsigned long length, str name, str payload): + self.__id__ = 93 + self.timestamp = timestamp + self.length = length + self.name = name + self.payload = payload + + +cdef class IOSUserID(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long length + cdef public str value + + def __init__(self, unsigned long timestamp, unsigned long length, str value): + self.__id__ = 94 + self.timestamp = timestamp + self.length = length + 
self.value = value + + +cdef class IOSUserAnonymousID(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long length + cdef public str value + + def __init__(self, unsigned long timestamp, unsigned long length, str value): + self.__id__ = 95 + self.timestamp = timestamp + self.length = length + self.value = value + + +cdef class IOSScreenChanges(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long length + cdef public unsigned long x + cdef public unsigned long y + cdef public unsigned long width + cdef public unsigned long height + + def __init__(self, unsigned long timestamp, unsigned long length, unsigned long x, unsigned long y, unsigned long width, unsigned long height): + self.__id__ = 96 + self.timestamp = timestamp + self.length = length + self.x = x + self.y = y + self.width = width + self.height = height + + +cdef class IOSCrash(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long length + cdef public str name + cdef public str reason + cdef public str stacktrace + + def __init__(self, unsigned long timestamp, unsigned long length, str name, str reason, str stacktrace): + self.__id__ = 97 + self.timestamp = timestamp + self.length = length + self.name = name + self.reason = reason + self.stacktrace = stacktrace + + +cdef class IOSScreenEnter(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long length + cdef public str title + cdef public str view_name + + def __init__(self, unsigned long timestamp, unsigned long length, str title, str view_name): + self.__id__ = 98 + self.timestamp = timestamp + self.length = length + self.title = title + self.view_name = view_name + + +cdef class IOSScreenLeave(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long length + cdef public str title + cdef public str view_name + + def 
__init__(self, unsigned long timestamp, unsigned long length, str title, str view_name): + self.__id__ = 99 + self.timestamp = timestamp + self.length = length + self.title = title + self.view_name = view_name + + +cdef class IOSClickEvent(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long length + cdef public str label + cdef public unsigned long x + cdef public unsigned long y + + def __init__(self, unsigned long timestamp, unsigned long length, str label, unsigned long x, unsigned long y): + self.__id__ = 100 + self.timestamp = timestamp + self.length = length + self.label = label + self.x = x + self.y = y + + +cdef class IOSInputEvent(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long length + cdef public str value + cdef public bint value_masked + cdef public str label + + def __init__(self, unsigned long timestamp, unsigned long length, str value, bint value_masked, str label): + self.__id__ = 101 + self.timestamp = timestamp + self.length = length + self.value = value + self.value_masked = value_masked + self.label = label + + +cdef class IOSPerformanceEvent(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long length + cdef public str name + cdef public unsigned long value + + def __init__(self, unsigned long timestamp, unsigned long length, str name, unsigned long value): + self.__id__ = 102 + self.timestamp = timestamp + self.length = length + self.name = name + self.value = value + + +cdef class IOSLog(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long length + cdef public str severity + cdef public str content + + def __init__(self, unsigned long timestamp, unsigned long length, str severity, str content): + self.__id__ = 103 + self.timestamp = timestamp + self.length = length + self.severity = severity + self.content = content + + +cdef class 
IOSInternalError(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long length + cdef public str content + + def __init__(self, unsigned long timestamp, unsigned long length, str content): + self.__id__ = 104 + self.timestamp = timestamp + self.length = length + self.content = content + + +cdef class IOSNetworkCall(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public unsigned long length + cdef public unsigned long duration + cdef public str headers + cdef public str body + cdef public str url + cdef public bint success + cdef public str method + cdef public unsigned long status + + def __init__(self, unsigned long timestamp, unsigned long length, unsigned long duration, str headers, str body, str url, bint success, str method, unsigned long status): + self.__id__ = 105 + self.timestamp = timestamp + self.length = length + self.duration = duration + self.headers = headers + self.body = body + self.url = url + self.success = success + self.method = method + self.status = status + + +cdef class IOSPerformanceAggregated(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp_start + cdef public unsigned long timestamp_end + cdef public unsigned long min_fps + cdef public unsigned long avg_fps + cdef public unsigned long max_fps + cdef public unsigned long min_cpu + cdef public unsigned long avg_cpu + cdef public unsigned long max_cpu + cdef public unsigned long min_memory + cdef public unsigned long avg_memory + cdef public unsigned long max_memory + cdef public unsigned long min_battery + cdef public unsigned long avg_battery + cdef public unsigned long max_battery + + def __init__(self, unsigned long timestamp_start, unsigned long timestamp_end, unsigned long min_fps, unsigned long avg_fps, unsigned long max_fps, unsigned long min_cpu, unsigned long avg_cpu, unsigned long max_cpu, unsigned long min_memory, unsigned long avg_memory, unsigned long max_memory, 
unsigned long min_battery, unsigned long avg_battery, unsigned long max_battery): + self.__id__ = 110 + self.timestamp_start = timestamp_start + self.timestamp_end = timestamp_end + self.min_fps = min_fps + self.avg_fps = avg_fps + self.max_fps = max_fps + self.min_cpu = min_cpu + self.avg_cpu = avg_cpu + self.max_cpu = max_cpu + self.min_memory = min_memory + self.avg_memory = avg_memory + self.max_memory = max_memory + self.min_battery = min_battery + self.avg_battery = avg_battery + self.max_battery = max_battery + + +cdef class IOSIssueEvent(PyMessage): + cdef public int __id__ + cdef public unsigned long timestamp + cdef public str type + cdef public str context_string + cdef public str context + cdef public str payload + + def __init__(self, unsigned long timestamp, str type, str context_string, str context, str payload): + self.__id__ = 111 + self.timestamp = timestamp + self.type = type + self.context_string = context_string + self.context = context + self.payload = payload + + diff --git a/ee/connectors/msgcodec/msgcodec.pyx b/ee/connectors/msgcodec/msgcodec.pyx new file mode 100644 index 000000000..bf44236ee --- /dev/null +++ b/ee/connectors/msgcodec/msgcodec.pyx @@ -0,0 +1,993 @@ +# Auto-generated, do not edit + +from messages import * +#from io cimport BytesIO +from io import BytesIO +import copy +from libc.stdlib cimport abort + +cdef extern from "Python.h": + int PyArg_ParseTupleAndKeywords(object args, object kwargs, char* format, char** keywords, ...) 
cdef class PyMsg:
    # Empty extension type; only used as the argument type of
    # MessageCodec.encode.
    def __cinit__(self):
        pass

# Readers are handled as plain Python objects (io.BytesIO instances are
# created by the decode entry points below).
ctypedef object PyBytesIO
# Id of the message most recently read by MessageCodec.handler.
cdef unsigned long c_message_id

cdef class MessageCodec:
    """
    Implements encode/decode primitives.

    ``msg_selector`` holds the message ids that ``handler`` decodes when a
    batch carries per-message sizes (mode 1); every other id is skipped.
    """
    cdef list msg_selector

    def __init__(self, list msg_selector):
        self.msg_selector = msg_selector

    @staticmethod
    cdef read_boolean(PyBytesIO reader):
        # Reads a single byte. Indexing the empty result of an exhausted
        # reader raises IndexError, the same signal read_uint uses.
        cdef bint b
        b = reader.read(1)[0]
        return b == 1

    @staticmethod
    def read_bool_method(PyBytesIO reader):
        """Python-visible wrapper around the cdef ``read_boolean``."""
        return MessageCodec.read_boolean(reader)

    @staticmethod
    cdef read_uint(PyBytesIO reader):
        # Decodes an unsigned varint: 7 payload bits per byte, least
        # significant group first; a byte below 0x80 ends the value.
        # Raises IndexError when the reader runs dry and OverflowError when
        # the encoding does not fit in 64 bits.
        cdef unsigned long x = 0  # the result
        cdef unsigned int s = 0   # bit offset of the next 7-bit group
        cdef int i = 0            # index of the current byte (max 9 for uint64)
        cdef bytes b
        cdef unsigned long num

        while True:
            b = reader.read(1)
            if len(b) == 0:
                raise IndexError('bytes out of range')

            num = int.from_bytes(b, "big", signed=False)

            if num < 0x80:
                if i > 9 or (i == 9 and num > 1):
                    raise OverflowError()
                # Only reached with i <= 9, hence s <= 63.
                return int(x | num << s)
            if i < 10:
                # Stop accumulating once the shift would reach 64 bits:
                # shifting a C unsigned long that far is undefined. The
                # over-long value is still rejected at its final byte above.
                x |= (num & 0x7f) << s
                s += 7
            i += 1

    @staticmethod
    def read_size(PyBytesIO reader):
        """
        Read the 3-byte little-endian message size.

        Raises IndexError if fewer than three bytes are left, instead of
        silently treating the missing bytes as zero.
        """
        cdef bytes b = reader.read(3)
        if len(b) < 3:
            raise IndexError('bytes out of range')
        return int.from_bytes(b, "little", signed=False)

    @staticmethod
    def read_int(PyBytesIO reader):
        """
        Read a signed integer stored as an unsigned varint whose lowest bit
        is the sign flag: the remaining bits are the value, bitwise-inverted
        when the flag is set.
        """
        cdef unsigned long ux = MessageCodec.read_uint(reader)
        cdef long x = int(ux >> 1)

        if ux & 1 != 0:
            x = - x - 1  # same as ~x
        return x

    @staticmethod
    def read_string(PyBytesIO reader):
        """
        Read a varint length followed by that many bytes and return them as
        text. Invalid UTF-8 sequences and NUL characters become U+FFFD.

        Raises IndexError when the payload is shorter than announced.
        """
        cdef unsigned long length = MessageCodec.read_uint(reader)
        cdef bytes s
        try:
            s = reader.read(length)
        except Exception:
            print(f'Error while reading string of length {length}')
            # Re-raise the original error so its type and traceback survive.
            raise
        if <unsigned long>len(s) < length:
            raise IndexError('bytes out of range')
        # errors="replace" never raises UnicodeDecodeError, so no handler is
        # needed around the decode.
        return s.decode("utf-8", errors="replace").replace("\x00", "\uFFFD")

    @staticmethod
    def read_message_id(PyBytesIO reader):
        """
        Read and return the message id, encoded as a varint at the current
        reader position.
        """
        cdef unsigned long id_ = MessageCodec.read_uint(reader)
        return id_

    @staticmethod
    def encode(PyMsg m):
        """Not implemented."""
        ...

    @staticmethod
    def decode(bytes b):
        """
        Decode a single message from ``b``: the message id first, then the
        fields that belong to that id.
        """
        cdef PyBytesIO reader = BytesIO(b)
        # read_head_message needs the id; it was previously called without
        # it, which made every call fail with TypeError.
        cdef unsigned long message_id = MessageCodec.read_message_id(reader)
        return MessageCodec.read_head_message(reader, message_id)

    @staticmethod
    def check_message_id(bytes b):
        """
        Return the message id encoded at the start of ``b``.

        todo: make it work without a reader.
        """
        cdef PyBytesIO reader = BytesIO(b)
        cdef unsigned long id_ = MessageCodec.read_uint(reader)

        return id_

    @staticmethod
    def decode_key(bytes b):
        """
        Decode the message key (encoded with little endian)
        """
        cdef unsigned long decoded
        try:
            decoded = int.from_bytes(b, "little", signed=False)
        except Exception:
            print(f"Error while decoding message key (SessionID) from {b}")
            raise
        return decoded

    def decode_detailed(self, bytes b):
        """
        Decode a whole batch and return the list of decoded messages.

        The first message selects the format of the rest: ``BatchMeta``, or
        ``BatchMetadata`` with version 0, means messages follow without a
        size prefix (mode 0); any other ``BatchMetadata`` version means each
        message id is followed by a 3-byte size (mode 1). If the first
        message is of another type it is returned alone. An empty list is
        returned when the first message cannot be read or decoded.
        """
        cdef PyBytesIO reader = BytesIO(b)
        cdef list messages_list
        cdef int mode
        try:
            messages_list = [self.handler(reader, 0)]
        except IndexError:
            print('[WARN] Broken batch')
            return list()
        first = messages_list[0]
        if first is None:
            # handler already reported the broken message. Without a header
            # the format is unknown, and None entries are filtered out for
            # every other message as well.
            return list()
        if isinstance(first, BatchMeta):
            # Old BatchMeta
            mode = 0
        elif isinstance(first, BatchMetadata):
            # New BatchMeta
            mode = 0 if first.version == 0 else 1
        else:
            return messages_list
        while True:
            try:
                msg_decoded = self.handler(reader, mode)
            except IndexError:
                # Raised by handler when no message id is left to read.
                break
            if msg_decoded is not None:
                messages_list.append(msg_decoded)
        return messages_list

    def handler(self, PyBytesIO reader, int mode = 0):
        """
        Read one message from ``reader``.

        Returns the decoded message, or None when the message is skipped
        (mode 1 and its id is not in ``msg_selector``) or cannot be decoded.
        The id is read outside the ``try``, so the IndexError raised at the
        end of the stream reaches the caller.
        """
        global c_message_id
        cdef int r_size
        cdef unsigned long message_id = MessageCodec.read_message_id(reader)
        c_message_id = message_id
        try:
            if mode == 1:
                # We read the three bytes representing the length of message. It can be used to skip unwanted messages
                r_size = MessageCodec.read_size(reader)
                if message_id not in self.msg_selector:
                    reader.read(r_size)
                    return None
                return MessageCodec.read_head_message(reader, message_id)
            elif mode == 0:
                # Old format with no bytes for message length
                return MessageCodec.read_head_message(reader, message_id)
            else:
                # Unknown mode: reported through the handler below.
                raise IOError()
        except Exception:
            print(f'[Error-inside] Broken message id {message_id}')
            return None

    # read_head_message(reader, message_id) continues here, unchanged.
SetViewportScroll( + x=MessageCodec.read_int(reader), + y=MessageCodec.read_int(reader) + ) + + if message_id == 7: + return CreateDocument( + + ) + + if message_id == 8: + return CreateElementNode( + id=MessageCodec.read_uint(reader), + parent_id=MessageCodec.read_uint(reader), + index=MessageCodec.read_uint(reader), + tag=MessageCodec.read_string(reader), + svg=MessageCodec.read_boolean(reader) + ) + + if message_id == 9: + return CreateTextNode( + id=MessageCodec.read_uint(reader), + parent_id=MessageCodec.read_uint(reader), + index=MessageCodec.read_uint(reader) + ) + + if message_id == 10: + return MoveNode( + id=MessageCodec.read_uint(reader), + parent_id=MessageCodec.read_uint(reader), + index=MessageCodec.read_uint(reader) + ) + + if message_id == 11: + return RemoveNode( + id=MessageCodec.read_uint(reader) + ) + + if message_id == 12: + return SetNodeAttribute( + id=MessageCodec.read_uint(reader), + name=MessageCodec.read_string(reader), + value=MessageCodec.read_string(reader) + ) + + if message_id == 13: + return RemoveNodeAttribute( + id=MessageCodec.read_uint(reader), + name=MessageCodec.read_string(reader) + ) + + if message_id == 14: + return SetNodeData( + id=MessageCodec.read_uint(reader), + data=MessageCodec.read_string(reader) + ) + + if message_id == 15: + return SetCSSData( + id=MessageCodec.read_uint(reader), + data=MessageCodec.read_string(reader) + ) + + if message_id == 16: + return SetNodeScroll( + id=MessageCodec.read_uint(reader), + x=MessageCodec.read_int(reader), + y=MessageCodec.read_int(reader) + ) + + if message_id == 17: + return SetInputTarget( + id=MessageCodec.read_uint(reader), + label=MessageCodec.read_string(reader) + ) + + if message_id == 18: + return SetInputValue( + id=MessageCodec.read_uint(reader), + value=MessageCodec.read_string(reader), + mask=MessageCodec.read_int(reader) + ) + + if message_id == 19: + return SetInputChecked( + id=MessageCodec.read_uint(reader), + checked=MessageCodec.read_boolean(reader) + ) + + if 
message_id == 20: + return MouseMove( + x=MessageCodec.read_uint(reader), + y=MessageCodec.read_uint(reader) + ) + + if message_id == 21: + return NetworkRequest( + type=MessageCodec.read_string(reader), + method=MessageCodec.read_string(reader), + url=MessageCodec.read_string(reader), + request=MessageCodec.read_string(reader), + response=MessageCodec.read_string(reader), + status=MessageCodec.read_uint(reader), + timestamp=MessageCodec.read_uint(reader), + duration=MessageCodec.read_uint(reader) + ) + + if message_id == 22: + return ConsoleLog( + level=MessageCodec.read_string(reader), + value=MessageCodec.read_string(reader) + ) + + if message_id == 23: + return PageLoadTiming( + request_start=MessageCodec.read_uint(reader), + response_start=MessageCodec.read_uint(reader), + response_end=MessageCodec.read_uint(reader), + dom_content_loaded_event_start=MessageCodec.read_uint(reader), + dom_content_loaded_event_end=MessageCodec.read_uint(reader), + load_event_start=MessageCodec.read_uint(reader), + load_event_end=MessageCodec.read_uint(reader), + first_paint=MessageCodec.read_uint(reader), + first_contentful_paint=MessageCodec.read_uint(reader) + ) + + if message_id == 24: + return PageRenderTiming( + speed_index=MessageCodec.read_uint(reader), + visually_complete=MessageCodec.read_uint(reader), + time_to_interactive=MessageCodec.read_uint(reader) + ) + + if message_id == 25: + return JSExceptionDeprecated( + name=MessageCodec.read_string(reader), + message=MessageCodec.read_string(reader), + payload=MessageCodec.read_string(reader) + ) + + if message_id == 26: + return IntegrationEvent( + timestamp=MessageCodec.read_uint(reader), + source=MessageCodec.read_string(reader), + name=MessageCodec.read_string(reader), + message=MessageCodec.read_string(reader), + payload=MessageCodec.read_string(reader) + ) + + if message_id == 27: + return CustomEvent( + name=MessageCodec.read_string(reader), + payload=MessageCodec.read_string(reader) + ) + + if message_id == 28: + 
return UserID( + id=MessageCodec.read_string(reader) + ) + + if message_id == 29: + return UserAnonymousID( + id=MessageCodec.read_string(reader) + ) + + if message_id == 30: + return Metadata( + key=MessageCodec.read_string(reader), + value=MessageCodec.read_string(reader) + ) + + if message_id == 31: + return PageEvent( + message_id=MessageCodec.read_uint(reader), + timestamp=MessageCodec.read_uint(reader), + url=MessageCodec.read_string(reader), + referrer=MessageCodec.read_string(reader), + loaded=MessageCodec.read_boolean(reader), + request_start=MessageCodec.read_uint(reader), + response_start=MessageCodec.read_uint(reader), + response_end=MessageCodec.read_uint(reader), + dom_content_loaded_event_start=MessageCodec.read_uint(reader), + dom_content_loaded_event_end=MessageCodec.read_uint(reader), + load_event_start=MessageCodec.read_uint(reader), + load_event_end=MessageCodec.read_uint(reader), + first_paint=MessageCodec.read_uint(reader), + first_contentful_paint=MessageCodec.read_uint(reader), + speed_index=MessageCodec.read_uint(reader), + visually_complete=MessageCodec.read_uint(reader), + time_to_interactive=MessageCodec.read_uint(reader) + ) + + if message_id == 32: + return InputEvent( + message_id=MessageCodec.read_uint(reader), + timestamp=MessageCodec.read_uint(reader), + value=MessageCodec.read_string(reader), + value_masked=MessageCodec.read_boolean(reader), + label=MessageCodec.read_string(reader) + ) + + if message_id == 37: + return CSSInsertRule( + id=MessageCodec.read_uint(reader), + rule=MessageCodec.read_string(reader), + index=MessageCodec.read_uint(reader) + ) + + if message_id == 38: + return CSSDeleteRule( + id=MessageCodec.read_uint(reader), + index=MessageCodec.read_uint(reader) + ) + + if message_id == 39: + return Fetch( + method=MessageCodec.read_string(reader), + url=MessageCodec.read_string(reader), + request=MessageCodec.read_string(reader), + response=MessageCodec.read_string(reader), + status=MessageCodec.read_uint(reader), + 
timestamp=MessageCodec.read_uint(reader), + duration=MessageCodec.read_uint(reader) + ) + + if message_id == 40: + return Profiler( + name=MessageCodec.read_string(reader), + duration=MessageCodec.read_uint(reader), + args=MessageCodec.read_string(reader), + result=MessageCodec.read_string(reader) + ) + + if message_id == 41: + return OTable( + key=MessageCodec.read_string(reader), + value=MessageCodec.read_string(reader) + ) + + if message_id == 42: + return StateAction( + type=MessageCodec.read_string(reader) + ) + + if message_id == 44: + return Redux( + action=MessageCodec.read_string(reader), + state=MessageCodec.read_string(reader), + duration=MessageCodec.read_uint(reader) + ) + + if message_id == 45: + return Vuex( + mutation=MessageCodec.read_string(reader), + state=MessageCodec.read_string(reader) + ) + + if message_id == 46: + return MobX( + type=MessageCodec.read_string(reader), + payload=MessageCodec.read_string(reader) + ) + + if message_id == 47: + return NgRx( + action=MessageCodec.read_string(reader), + state=MessageCodec.read_string(reader), + duration=MessageCodec.read_uint(reader) + ) + + if message_id == 48: + return GraphQL( + operation_kind=MessageCodec.read_string(reader), + operation_name=MessageCodec.read_string(reader), + variables=MessageCodec.read_string(reader), + response=MessageCodec.read_string(reader) + ) + + if message_id == 49: + return PerformanceTrack( + frames=MessageCodec.read_int(reader), + ticks=MessageCodec.read_int(reader), + total_js_heap_size=MessageCodec.read_uint(reader), + used_js_heap_size=MessageCodec.read_uint(reader) + ) + + if message_id == 50: + return StringDict( + key=MessageCodec.read_uint(reader), + value=MessageCodec.read_string(reader) + ) + + if message_id == 51: + return SetNodeAttributeDict( + id=MessageCodec.read_uint(reader), + name_key=MessageCodec.read_uint(reader), + value_key=MessageCodec.read_uint(reader) + ) + + if message_id == 53: + return ResourceTimingDeprecated( + 
timestamp=MessageCodec.read_uint(reader), + duration=MessageCodec.read_uint(reader), + ttfb=MessageCodec.read_uint(reader), + header_size=MessageCodec.read_uint(reader), + encoded_body_size=MessageCodec.read_uint(reader), + decoded_body_size=MessageCodec.read_uint(reader), + url=MessageCodec.read_string(reader), + initiator=MessageCodec.read_string(reader) + ) + + if message_id == 54: + return ConnectionInformation( + downlink=MessageCodec.read_uint(reader), + type=MessageCodec.read_string(reader) + ) + + if message_id == 55: + return SetPageVisibility( + hidden=MessageCodec.read_boolean(reader) + ) + + if message_id == 56: + return PerformanceTrackAggr( + timestamp_start=MessageCodec.read_uint(reader), + timestamp_end=MessageCodec.read_uint(reader), + min_fps=MessageCodec.read_uint(reader), + avg_fps=MessageCodec.read_uint(reader), + max_fps=MessageCodec.read_uint(reader), + min_cpu=MessageCodec.read_uint(reader), + avg_cpu=MessageCodec.read_uint(reader), + max_cpu=MessageCodec.read_uint(reader), + min_total_js_heap_size=MessageCodec.read_uint(reader), + avg_total_js_heap_size=MessageCodec.read_uint(reader), + max_total_js_heap_size=MessageCodec.read_uint(reader), + min_used_js_heap_size=MessageCodec.read_uint(reader), + avg_used_js_heap_size=MessageCodec.read_uint(reader), + max_used_js_heap_size=MessageCodec.read_uint(reader) + ) + + if message_id == 57: + return LoadFontFace( + parent_id=MessageCodec.read_uint(reader), + family=MessageCodec.read_string(reader), + source=MessageCodec.read_string(reader), + descriptors=MessageCodec.read_string(reader) + ) + + if message_id == 58: + return SetNodeFocus( + id=MessageCodec.read_int(reader) + ) + + if message_id == 59: + return LongTask( + timestamp=MessageCodec.read_uint(reader), + duration=MessageCodec.read_uint(reader), + context=MessageCodec.read_uint(reader), + container_type=MessageCodec.read_uint(reader), + container_src=MessageCodec.read_string(reader), + container_id=MessageCodec.read_string(reader), + 
container_name=MessageCodec.read_string(reader) + ) + + if message_id == 60: + return SetNodeAttributeURLBased( + id=MessageCodec.read_uint(reader), + name=MessageCodec.read_string(reader), + value=MessageCodec.read_string(reader), + base_url=MessageCodec.read_string(reader) + ) + + if message_id == 61: + return SetCSSDataURLBased( + id=MessageCodec.read_uint(reader), + data=MessageCodec.read_string(reader), + base_url=MessageCodec.read_string(reader) + ) + + if message_id == 62: + return IssueEventDeprecated( + message_id=MessageCodec.read_uint(reader), + timestamp=MessageCodec.read_uint(reader), + type=MessageCodec.read_string(reader), + context_string=MessageCodec.read_string(reader), + context=MessageCodec.read_string(reader), + payload=MessageCodec.read_string(reader) + ) + + if message_id == 63: + return TechnicalInfo( + type=MessageCodec.read_string(reader), + value=MessageCodec.read_string(reader) + ) + + if message_id == 64: + return CustomIssue( + name=MessageCodec.read_string(reader), + payload=MessageCodec.read_string(reader) + ) + + if message_id == 66: + return AssetCache( + url=MessageCodec.read_string(reader) + ) + + if message_id == 67: + return CSSInsertRuleURLBased( + id=MessageCodec.read_uint(reader), + rule=MessageCodec.read_string(reader), + index=MessageCodec.read_uint(reader), + base_url=MessageCodec.read_string(reader) + ) + + if message_id == 69: + return MouseClick( + id=MessageCodec.read_uint(reader), + hesitation_time=MessageCodec.read_uint(reader), + label=MessageCodec.read_string(reader), + selector=MessageCodec.read_string(reader) + ) + + if message_id == 70: + return CreateIFrameDocument( + frame_id=MessageCodec.read_uint(reader), + id=MessageCodec.read_uint(reader) + ) + + if message_id == 71: + return AdoptedSSReplaceURLBased( + sheet_id=MessageCodec.read_uint(reader), + text=MessageCodec.read_string(reader), + base_url=MessageCodec.read_string(reader) + ) + + if message_id == 72: + return AdoptedSSReplace( + 
sheet_id=MessageCodec.read_uint(reader), + text=MessageCodec.read_string(reader) + ) + + if message_id == 73: + return AdoptedSSInsertRuleURLBased( + sheet_id=MessageCodec.read_uint(reader), + rule=MessageCodec.read_string(reader), + index=MessageCodec.read_uint(reader), + base_url=MessageCodec.read_string(reader) + ) + + if message_id == 74: + return AdoptedSSInsertRule( + sheet_id=MessageCodec.read_uint(reader), + rule=MessageCodec.read_string(reader), + index=MessageCodec.read_uint(reader) + ) + + if message_id == 75: + return AdoptedSSDeleteRule( + sheet_id=MessageCodec.read_uint(reader), + index=MessageCodec.read_uint(reader) + ) + + if message_id == 76: + return AdoptedSSAddOwner( + sheet_id=MessageCodec.read_uint(reader), + id=MessageCodec.read_uint(reader) + ) + + if message_id == 77: + return AdoptedSSRemoveOwner( + sheet_id=MessageCodec.read_uint(reader), + id=MessageCodec.read_uint(reader) + ) + + if message_id == 78: + return JSException( + name=MessageCodec.read_string(reader), + message=MessageCodec.read_string(reader), + payload=MessageCodec.read_string(reader), + metadata=MessageCodec.read_string(reader) + ) + + if message_id == 79: + return Zustand( + mutation=MessageCodec.read_string(reader), + state=MessageCodec.read_string(reader) + ) + + if message_id == 80: + return BatchMeta( + page_no=MessageCodec.read_uint(reader), + first_index=MessageCodec.read_uint(reader), + timestamp=MessageCodec.read_int(reader) + ) + + if message_id == 81: + return BatchMetadata( + version=MessageCodec.read_uint(reader), + page_no=MessageCodec.read_uint(reader), + first_index=MessageCodec.read_uint(reader), + timestamp=MessageCodec.read_int(reader), + location=MessageCodec.read_string(reader) + ) + + if message_id == 82: + return PartitionedMessage( + part_no=MessageCodec.read_uint(reader), + part_total=MessageCodec.read_uint(reader) + ) + + if message_id == 112: + return InputChange( + id=MessageCodec.read_uint(reader), + value=MessageCodec.read_string(reader), + 
value_masked=MessageCodec.read_boolean(reader), + label=MessageCodec.read_string(reader), + hesitation_time=MessageCodec.read_int(reader), + input_duration=MessageCodec.read_int(reader) + ) + + if message_id == 113: + return SelectionChange( + selection_start=MessageCodec.read_uint(reader), + selection_end=MessageCodec.read_uint(reader), + selection=MessageCodec.read_string(reader) + ) + + if message_id == 114: + return MouseThrashing( + timestamp=MessageCodec.read_uint(reader) + ) + + if message_id == 115: + return UnbindNodes( + total_removed_percent=MessageCodec.read_uint(reader) + ) + + if message_id == 116: + return ResourceTiming( + timestamp=MessageCodec.read_uint(reader), + duration=MessageCodec.read_uint(reader), + ttfb=MessageCodec.read_uint(reader), + header_size=MessageCodec.read_uint(reader), + encoded_body_size=MessageCodec.read_uint(reader), + decoded_body_size=MessageCodec.read_uint(reader), + url=MessageCodec.read_string(reader), + initiator=MessageCodec.read_string(reader), + transferred_size=MessageCodec.read_uint(reader), + cached=MessageCodec.read_boolean(reader) + ) + + if message_id == 125: + return IssueEvent( + message_id=MessageCodec.read_uint(reader), + timestamp=MessageCodec.read_uint(reader), + type=MessageCodec.read_string(reader), + context_string=MessageCodec.read_string(reader), + context=MessageCodec.read_string(reader), + payload=MessageCodec.read_string(reader), + url=MessageCodec.read_string(reader) + ) + + if message_id == 126: + return SessionEnd( + timestamp=MessageCodec.read_uint(reader), + encryption_key=MessageCodec.read_string(reader) + ) + + if message_id == 127: + return SessionSearch( + timestamp=MessageCodec.read_uint(reader), + partition=MessageCodec.read_uint(reader) + ) + + if message_id == 107: + return IOSBatchMeta( + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + first_index=MessageCodec.read_uint(reader) + ) + + if message_id == 90: + return IOSSessionStart( + 
timestamp=MessageCodec.read_uint(reader), + project_id=MessageCodec.read_uint(reader), + tracker_version=MessageCodec.read_string(reader), + rev_id=MessageCodec.read_string(reader), + user_uuid=MessageCodec.read_string(reader), + user_os=MessageCodec.read_string(reader), + user_os_version=MessageCodec.read_string(reader), + user_device=MessageCodec.read_string(reader), + user_device_type=MessageCodec.read_string(reader), + user_country=MessageCodec.read_string(reader) + ) + + if message_id == 91: + return IOSSessionEnd( + timestamp=MessageCodec.read_uint(reader) + ) + + if message_id == 92: + return IOSMetadata( + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + key=MessageCodec.read_string(reader), + value=MessageCodec.read_string(reader) + ) + + if message_id == 93: + return IOSCustomEvent( + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + name=MessageCodec.read_string(reader), + payload=MessageCodec.read_string(reader) + ) + + if message_id == 94: + return IOSUserID( + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + value=MessageCodec.read_string(reader) + ) + + if message_id == 95: + return IOSUserAnonymousID( + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + value=MessageCodec.read_string(reader) + ) + + if message_id == 96: + return IOSScreenChanges( + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + x=MessageCodec.read_uint(reader), + y=MessageCodec.read_uint(reader), + width=MessageCodec.read_uint(reader), + height=MessageCodec.read_uint(reader) + ) + + if message_id == 97: + return IOSCrash( + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + name=MessageCodec.read_string(reader), + reason=MessageCodec.read_string(reader), + stacktrace=MessageCodec.read_string(reader) + ) + + if message_id == 98: + return IOSScreenEnter( + 
timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + title=MessageCodec.read_string(reader), + view_name=MessageCodec.read_string(reader) + ) + + if message_id == 99: + return IOSScreenLeave( + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + title=MessageCodec.read_string(reader), + view_name=MessageCodec.read_string(reader) + ) + + if message_id == 100: + return IOSClickEvent( + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + label=MessageCodec.read_string(reader), + x=MessageCodec.read_uint(reader), + y=MessageCodec.read_uint(reader) + ) + + if message_id == 101: + return IOSInputEvent( + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + value=MessageCodec.read_string(reader), + value_masked=MessageCodec.read_boolean(reader), + label=MessageCodec.read_string(reader) + ) + + if message_id == 102: + return IOSPerformanceEvent( + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + name=MessageCodec.read_string(reader), + value=MessageCodec.read_uint(reader) + ) + + if message_id == 103: + return IOSLog( + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + severity=MessageCodec.read_string(reader), + content=MessageCodec.read_string(reader) + ) + + if message_id == 104: + return IOSInternalError( + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + content=MessageCodec.read_string(reader) + ) + + if message_id == 105: + return IOSNetworkCall( + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + duration=MessageCodec.read_uint(reader), + headers=MessageCodec.read_string(reader), + body=MessageCodec.read_string(reader), + url=MessageCodec.read_string(reader), + success=MessageCodec.read_boolean(reader), + method=MessageCodec.read_string(reader), + status=MessageCodec.read_uint(reader) + ) + + if 
message_id == 110: + return IOSPerformanceAggregated( + timestamp_start=MessageCodec.read_uint(reader), + timestamp_end=MessageCodec.read_uint(reader), + min_fps=MessageCodec.read_uint(reader), + avg_fps=MessageCodec.read_uint(reader), + max_fps=MessageCodec.read_uint(reader), + min_cpu=MessageCodec.read_uint(reader), + avg_cpu=MessageCodec.read_uint(reader), + max_cpu=MessageCodec.read_uint(reader), + min_memory=MessageCodec.read_uint(reader), + avg_memory=MessageCodec.read_uint(reader), + max_memory=MessageCodec.read_uint(reader), + min_battery=MessageCodec.read_uint(reader), + avg_battery=MessageCodec.read_uint(reader), + max_battery=MessageCodec.read_uint(reader) + ) + + if message_id == 111: + return IOSIssueEvent( + timestamp=MessageCodec.read_uint(reader), + type=MessageCodec.read_string(reader), + context_string=MessageCodec.read_string(reader), + context=MessageCodec.read_string(reader), + payload=MessageCodec.read_string(reader) + ) + diff --git a/ee/connectors/requirements-fill.txt b/ee/connectors/requirements-fill.txt new file mode 100644 index 000000000..1d69f0ca8 --- /dev/null +++ b/ee/connectors/requirements-fill.txt @@ -0,0 +1,13 @@ +chardet==5.1.0 +idna==3.4 +psycopg2-binary==2.9.6 +python-decouple==3.8 +pytz==2022.6 +requests==2.28.1 +SQLAlchemy==1.4.48 +tzlocal==5.0.1 +urllib3==1.26.15 +sqlalchemy-redshift==0.8.14 +redshift-connector==2.0.911 +pandas-redshift==2.0.5 +PyYAML==6.0 diff --git a/ee/connectors/utils/cache.py b/ee/connectors/utils/cache.py index 2f4050a52..b2e92ca51 100644 --- a/ee/connectors/utils/cache.py +++ b/ee/connectors/utils/cache.py @@ -25,7 +25,7 @@ class CachedSessions: env: MAX_SESSION_LIFE: cache lifespan of session (default 7200 seconds)""" self.session_project = dict() - self.max_alive_time = config('MAX_SESSION_LIFE', default=7200, cast=int) # Default 2 hours + self.max_alive_time = config('MAX_SESSION_LIFE', default=7800, cast=int) # Default 2 hours def create(self, sessionid): """Saves a new session with status OPEN 
and set its insertion time""" @@ -121,4 +121,3 @@ class ProjectFilter: def terminate(self, db): # self.save_checkpoint(db) db.close() - diff --git a/ee/connectors/utils/pg_client.py b/ee/connectors/utils/pg_client.py index efd40b52f..1bfad6d36 100644 --- a/ee/connectors/utils/pg_client.py +++ b/ee/connectors/utils/pg_client.py @@ -19,6 +19,8 @@ if conn_str == '': "port": config("pg_port", cast=int), "application_name": config("APP_NAME", default="PY")} else: + import urllib.parse + conn_str = urllib.parse.unquote(conn_str) usr_info, host_info = conn_str.split('@') i = usr_info.find('://') pg_user, pg_password = usr_info[i+3:].split(':') diff --git a/ee/connectors/utils/uploader.py b/ee/connectors/utils/uploader.py new file mode 100644 index 000000000..803ff15b5 --- /dev/null +++ b/ee/connectors/utils/uploader.py @@ -0,0 +1,79 @@ +from datetime import datetime +from db.writer import insert_batch, update_batch +from psycopg2 import InterfaceError +from time import sleep + + +def insertBatch(events_batch, sessions_insert_batch, sessions_update_batch, db, sessions_table_name, table_name, EVENT_TYPE): + t1 = datetime.now().timestamp() + print(f'[BG-INFO] Number of events to add {len(events_batch)}, number of sessions to add {len(sessions_insert_batch)}, number of sessions to update {len(sessions_update_batch)}') + if sessions_insert_batch: + attempt_session_insert(sessions_insert_batch, db, sessions_table_name) + + if sessions_update_batch: + attempt_session_update(sessions_update_batch, db, sessions_table_name) + + if events_batch: + attempt_batch_insert(events_batch, db, table_name, EVENT_TYPE) + print(f'[BG-INFO] Uploaded into S3 in {datetime.now().timestamp()-t1} seconds') + + +def attempt_session_insert(sess_batch, db, sessions_table_name, try_=0): + if sess_batch: + try: + insert_batch(db, sess_batch, table=sessions_table_name, level='sessions') + except TypeError as e: + print("Type conversion error") + print(repr(e)) + except ValueError as e: + print("Message 
value could not be processed or inserted correctly") + print(repr(e)) + except InterfaceError as e: + if try_ < 3: + try_ += 1 + sleep(try_*2) + attempt_session_insert(sess_batch, db, sessions_table_name, try_) + except Exception as e: + print(repr(e)) + + +def attempt_session_update(sess_batch, db, sessions_table_name): + if sess_batch: + try: + update_batch(db, sess_batch, table=sessions_table_name) + except TypeError as e: + print('Type conversion error') + print(repr(e)) + except ValueError as e: + print('Message value could not be processed or inserted correctly') + print(repr(e)) + except InterfaceError as e: + print('Error while trying to update session into datawarehouse') + print(repr(e)) + except Exception as e: + print(repr(e)) + + +def attempt_batch_insert(events_batch, db, table_name, EVENT_TYPE, try_=0): + try: + insert_batch(db=db, batch=events_batch, table=table_name, level=EVENT_TYPE) + except TypeError as e: + print("Type conversion error") + print(repr(e)) + except ValueError as e: + print("Message value could not be processed or inserted correctly") + print(repr(e)) + except InterfaceError as e: + if try_ < 3: + try_ += 1 + sleep(try_*2) + attempt_batch_insert(events_batch, db, table_name, EVENT_TYPE, try_) + elif try_ == 3: + db.restart() + sleep(2) + attempt_batch_insert(events_batch, db, table_name, EVENT_TYPE, try_ + 1) + else: + print(repr(e)) + except Exception as e: + print(repr(e)) + diff --git a/ee/connectors/utils/worker.py b/ee/connectors/utils/worker.py new file mode 100644 index 000000000..9ae0633ee --- /dev/null +++ b/ee/connectors/utils/worker.py @@ -0,0 +1,377 @@ +from multiprocessing import Pool, Process, Pipe, TimeoutError +from multiprocessing.connection import Connection +from msgcodec import MessageCodec +from messages import SessionEnd +from utils.uploader import insertBatch +from utils.cache import CachedSessions +from db.models import DetailedEvent, Event, Session, events_detailed_table_name, events_table_name, 
sessions_table_name +from handler import handle_normal_message, handle_message, handle_session +from datetime import datetime +from decouple import config +from utils import pg_client +from utils.signal_handler import signal_handler +from copy import deepcopy +from confluent_kafka import Consumer +import json +import asyncio + +EVENT_TYPE = config('EVENT_TYPE') +DATABASE = config('CLOUD_SERVICE') +UPLOAD_RATE = config('upload_rate', default=30, cast=int) +if EVENT_TYPE == 'detailed': + table_name = events_detailed_table_name +elif EVENT_TYPE == 'normal': + table_name = events_table_name + +TOPICS = config("TOPICS", default="saas-raw").split(',') +ssl_protocol = config('KAFKA_USE_SSL', default=True, cast=bool) +consumer_settings = { + "bootstrap.servers": config('KAFKA_SERVERS'), + "group.id": f"connector_{DATABASE}", + "auto.offset.reset": "earliest", + "enable.auto.commit": False +} +if ssl_protocol: + consumer_settings['security.protocol'] = 'SSL' + +session_messages = [1, 25, 28, 29, 30, 31, 32, 54, 56, 62, 69, 78, 125, 126] +if EVENT_TYPE == 'normal': + events_messages = [21, 22, 25, 27, 64, 78, 125] +elif EVENT_TYPE == 'detailed': + events_messages = [1, 4, 21, 22, 25, 27, 31, 32, 39, 48, 59, 64, 69, 78, 125, 126] +allowed_messages = list(set(session_messages + events_messages)) +codec = MessageCodec(allowed_messages) + + +def init_consumer(): + global DATABASE, consumer_settings + consumer = Consumer(consumer_settings) + consumer.subscribe(TOPICS) + return consumer + + +def close_consumer(consumer): + consumer.unsubscribe() + consumer.close() + + +def session_to_dict(sess: Session): + _dict = sess.__dict__ + try: + del _dict['_sa_instance_state'] + except KeyError: + ... + return _dict + + +def dict_to_session(session_dict: dict): + n = Session() + n.__dict__ |= session_dict + return n + + +def event_to_dict(event: Event | DetailedEvent): + _dict = event.__dict__ + try: + del _dict['_sa_instance_state'] + except KeyError: + ... 
+ return _dict + + +def dict_to_event(event_dict: dict): + global EVENT_TYPE + if EVENT_TYPE == 'detailed': + n = DetailedEvent() + else: + n = Event() + n.__dict__ |= event_dict + return n + +class ProjectFilter: + def __init__(self, project_filter): + self.max_lifespan = config('MAX_UNWANTED_SESSION_LIFE', default=7800, cast=int) + self.project_filter = project_filter + self.sessions_lifespan = CachedSessions() + self.non_valid_sessions_cache = dict() + + def is_valid(self, sessionId): + if len(self.project_filter) == 0: + return True + elif sessionId in self.sessions_lifespan.session_project.keys(): + return True + elif sessionId in self.non_valid_sessions_cache.keys(): + return False + else: + projetId = project_from_session(sessionId) + if projetId not in self.project_filter: + self.non_valid_sessions_cache[sessionId] = int(datetime.now().timestamp()) + return False + else: + return True + + def handle_clean(self): + if len(self.project_filter) == 0: + return + else: + current_timestamp = datetime.now().timestamp() + self.non_valid_sessions_cache = {sessionId: start_timestamp for sessionId, start_timestamp in + self.non_valid_sessions_cache.items() if + self.max_lifespan > current_timestamp - start_timestamp} + + +def read_from_kafka(pipe: Connection, params: dict): + global UPLOAD_RATE + try: + asyncio.run(pg_client.init()) + kafka_consumer = init_consumer() + project_filter = params['project_filter'] + while True: + to_decode = list() + sessionIds = list() + start_time = datetime.now().timestamp() + broken_batchs = 0 + n_messages = 0 + while datetime.now().timestamp() - start_time < UPLOAD_RATE: + msg = kafka_consumer.poll(5.0) + if msg is None: + continue + n_messages += 1 + try: + sessionId = codec.decode_key(msg.key()) + except Exception: + broken_batchs += 1 + continue + if project_filter.is_valid(sessionId): + to_decode.append(msg.value()) + sessionIds.append(sessionId) + if n_messages != 0: + print( + f'[INFO] Found {broken_batchs} broken batch over 
{n_messages} read messages ({100 * broken_batchs / n_messages:.2f}%)') + else: + print('[WARN] No messages read') + non_valid_updated = project_filter.non_valid_sessions_cache + pipe.send((non_valid_updated, sessionIds, to_decode)) + continue_signal = pipe.recv() + if continue_signal == 'CLOSE': + break + kafka_consumer.commit() + close_consumer(kafka_consumer) + asyncio.run(pg_client.terminate()) + except Exception as e: + print('[WARN]', repr(e)) + + +def into_batch(batch: list[Event | DetailedEvent], session_id: int, n: Session): + n.sessionid = session_id + n.received_at = int(datetime.now().timestamp() * 1000) + n.batch_order_number = len(batch) + batch.append(n) + return batch + + +def project_from_session(sessionId: int): + """Search projectId of requested sessionId in PG table sessions""" + with pg_client.PostgresClient() as conn: + conn.execute( + conn.mogrify("SELECT project_id FROM sessions WHERE session_id=%(sessionId)s LIMIT 1", + {'sessionId': sessionId}) + ) + res = conn.fetchone() + if res is None: + print(f'[WARN] sessionid {sessionId} not found in sessions table') + return None + return res['project_id'] + + +def decode_message(params: dict): + global codec, session_messages, events_messages, EVENT_TYPE + if len(params['message']) == 0: + return list(), None, list() + memory = {sessId: dict_to_session(sessObj) for sessId, sessObj in params['memory'].items()} + events_worker_batch = list() + sessionid_ended = list() + for session_id, encoded_message in params['message']: + messages = codec.decode_detailed(encoded_message) + if messages is None: + continue + for message in messages: + if message is None: + continue + if message.__id__ in events_messages and EVENT_TYPE != 'detailed': + n = handle_normal_message(message) + if n: + events_worker_batch = into_batch(batch=events_worker_batch, session_id=session_id, n=n) + elif message.__id__ in events_messages and EVENT_TYPE == 'detailed': + n = handle_message(message) + if n: + events_worker_batch = 
class WorkerPool:
    """Coordinate a pool of decoder processes and the per-session state.

    Each session id is pinned to one worker slot (round-robin on first
    sight), the results returned by the workers are merged back into
    ``self.sessions``, and a JSON checkpoint of that state is written through
    the ``database_api`` object handed to the public methods.
    """

    def __init__(self, n_workers: int, project_filter: list[int]):
        """Create the process pool and the bookkeeping structures.

        Args:
            n_workers: size of the process pool; also the modulus used by the
                round-robin pointer in ``get_worker``.
            project_filter: forwarded unchanged to ``ProjectFilter``.
        """
        self.pool = Pool(n_workers)
        # session_id -> session object rebuilt from the workers' results
        self.sessions = dict()
        # session_id -> index of the worker slot the session is pinned to
        self.assigned_worker = dict()
        # next worker slot handed to a session that has no assignment yet
        self.pointer = 0
        self.n_workers = n_workers
        self.project_filter_class = ProjectFilter(project_filter)

    def get_worker(self, session_id: int) -> int:
        """Return the worker slot for ``session_id``, assigning one if needed.

        An unseen session receives the slot under ``self.pointer``, which is
        then advanced modulo ``self.n_workers``; later calls for the same
        session return the remembered slot.
        """
        try:
            return self.assigned_worker[session_id]
        except KeyError:
            worker_id = self.pointer
            self.pointer = (self.pointer + 1) % self.n_workers
            self.assigned_worker[session_id] = worker_id
            return worker_id

    def _pool_response_handler(self, pool_results):
        """Merge decoder/reader results into the pool state.

        Args:
            pool_results: iterable of dicts carrying a ``'flag'`` key
                (``'decoder'`` or ``'reader'``) and a ``'value'`` key. The
                ``'flag'`` key is popped from every dict.

        Returns:
            ``(events_batch, sessions_insert_batch, sessions_update_batch,
            session_ids, messages)``. ``session_ids`` and ``messages`` come
            from the reader result and are empty lists when none is present.

        Raises:
            Exception: if more than one ``'reader'`` result is supplied.
        """
        events_batch = list()
        sessions_update_batch = list()
        sessions_insert_batch = list()
        # Defaults so the return statement is always well defined, even when
        # ``pool_results`` holds no reader response.
        session_ids = list()
        messages = list()
        reader_count = 0
        lifespan = self.project_filter_class.sessions_lifespan
        for js_response in pool_results:
            flag = js_response.pop('flag')
            if flag == 'decoder':
                worker_events, worker_memory, end_sessions = js_response['value']
                if worker_memory is None:
                    continue
                events_batch += worker_events
                for session_id, session_dict in worker_memory.items():
                    self.sessions[session_id] = dict_to_session(session_dict)
                    lifespan.add(session_id)
                for session_id in end_sessions:
                    if self.sessions[session_id].session_start_timestamp:
                        old_status = lifespan.close(session_id)
                        if old_status == 'UPDATE':
                            sessions_update_batch.append(deepcopy(self.sessions[session_id]))
                        elif old_status == 'OPEN':
                            sessions_insert_batch.append(deepcopy(self.sessions[session_id]))
                        else:
                            print('[WARN] Closed session should not be closed again')
            elif flag == 'reader':
                reader_count += 1
                if reader_count > 1:
                    raise Exception('Pool only accepts one reader task')
                non_valid_updated, session_ids, messages = js_response['value']
                self.project_filter_class.non_valid_sessions_cache = non_valid_updated

        self.project_filter_class.handle_clean()
        # Forget every session the lifespan tracker reports as clearable;
        # either mapping may already lack the key.
        for sess_id in lifespan.clear_sessions():
            self.sessions.pop(sess_id, None)
            self.assigned_worker.pop(sess_id, None)
        return events_batch, sessions_insert_batch, sessions_update_batch, session_ids, messages

    def run_workers(self, database_api):
        """Run the read -> decode -> insert loop until processing is stopped.

        A separate process runs ``read_from_kafka`` and hands batches over a
        pipe; every batch is split between the pool workers, the merged result
        is written with ``insertBatch`` and a snapshot is saved. The loop runs
        while ``signal_handler.KEEP_PROCESSING`` is truthy.

        Args:
            database_api: object passed to ``insertBatch``, ``save_snapshot``
                and ``terminate``.
        """
        global sessions_table_name, table_name, EVENT_TYPE
        session_ids = list()
        messages = list()
        main_conn, reader_conn = Pipe()
        kafka_task_params = {'flag': 'reader',
                             'project_filter': self.project_filter_class}
        kafka_reader_process = Process(target=read_from_kafka, args=(reader_conn, kafka_task_params))
        kafka_reader_process.start()
        while signal_handler.KEEP_PROCESSING:
            # Restart the reader process if it died since the last iteration.
            if not kafka_reader_process.is_alive():
                kafka_reader_process = Process(target=read_from_kafka, args=(reader_conn, kafka_task_params))
                kafka_reader_process.start()

            # One task description per worker slot.
            decoding_params = [{'flag': 'decoder',
                                'message': list(),
                                'memory': dict()} for _ in range(self.n_workers)]
            for session_id, message in zip(session_ids, messages):
                task = decoding_params[self.get_worker(session_id)]
                task['message'].append([session_id, message])
                if session_id not in task['memory']:
                    try:
                        task['memory'][session_id] = session_to_dict(self.sessions[session_id])
                    except KeyError:
                        # Session not known yet: the worker starts it from scratch.
                        pass

            # Hand tasks to the workers; slots without messages are skipped.
            async_results = [self.pool.apply_async(work_assigner, args=[params])
                             for params in decoding_params if params['message']]
            results = [{'flag': 'reader', 'value': main_conn.recv()}]
            for async_result in async_results:
                try:
                    results.append(async_result.get(timeout=32 * UPLOAD_RATE))
                # NOTE(review): AsyncResult.get raises multiprocessing.TimeoutError,
                # which is not the builtin TimeoutError - confirm which one this
                # module imports under that name.
                except TimeoutError as e:
                    print('[TimeoutError] Decoding of messages is taking longer than expected')
                    raise e
            events_batch, sessions_insert_batch, sessions_update_batch, session_ids, messages = \
                self._pool_response_handler(pool_results=results)

            insertBatch(events_batch, sessions_insert_batch, sessions_update_batch, database_api,
                        sessions_table_name, table_name, EVENT_TYPE)
            self.save_snapshot(database_api)
            main_conn.send('CONTINUE')
        main_conn.send('CLOSE')
        # terminate() requires the database handle; calling it without one
        # raised TypeError and left the reader process running.
        self.terminate(database_api)
        kafka_reader_process.terminate()

    def load_checkpoint(self, database_api):
        """Restore the pool state from the binary named ``'checkpoint'``.

        Two layouts are understood: one without a ``'version'`` key (only the
        filter caches are restored) and ``'v1.0'`` (sessions plus the cached
        session/project mapping).

        Raises:
            Exception: if the checkpoint declares any other version.
        """
        buffer = database_api.load_binary(name='checkpoint')
        try:
            checkpoint = json.loads(buffer.getvalue().decode('utf-8'))
        finally:
            # Release the buffer even when the payload is not valid JSON.
            buffer.close()
        if 'version' not in checkpoint:
            sessions_cache_list = checkpoint['cache']
            reload_default_time = datetime.now().timestamp()
            # Keep only entries whose second field is falsy and stamp them
            # with the reload time.
            self.project_filter_class.non_valid_sessions_cache = {
                sessId: reload_default_time
                for sessId, value in sessions_cache_list.items() if not value[1]
            }
            self.project_filter_class.sessions_lifespan.session_project = checkpoint['cached_sessions']
        elif checkpoint['version'] == 'v1.0':
            for sessionId, session_dict in checkpoint['sessions']:
                self.sessions[sessionId] = dict_to_session(session_dict)
            self.project_filter_class.sessions_lifespan.session_project = checkpoint['cached_sessions']
        else:
            raise Exception('Error in version of snapshot')

    def terminate(self, database_api):
        """Close the pool, write a final snapshot and close ``database_api``."""
        self.pool.close()
        self.save_snapshot(database_api)
        database_api.close()

    def save_snapshot(self, database_api):
        """Serialise the current sessions as a ``'v1.0'`` JSON checkpoint.

        The payload is UTF-8 encoded and handed to
        ``database_api.save_binary`` under the name ``'checkpoint'``.
        """
        session_snapshot = [[sessionId, session_to_dict(session)]
                            for sessionId, session in self.sessions.items()]
        checkpoint = {
            'version': 'v1.0',
            'sessions': session_snapshot,
            'cached_sessions': self.project_filter_class.sessions_lifespan.session_project,
        }
        database_api.save_binary(binary_data=json.dumps(checkpoint).encode('utf-8'), name='checkpoint')
cdef class MessageCodec:
    """
    Implements encode/decode primitives

    ``msg_selector`` is the list of message ids that ``handler`` keeps when
    the batch uses the size-prefixed format (mode 1); other ids are skipped.
    """
    cdef list msg_selector

    def __init__(self, list msg_selector):
        self.msg_selector = msg_selector

    @staticmethod
    cdef read_boolean(PyBytesIO reader):
        """Read one byte and return it as a boolean."""
        # NOTE(review): the byte is coerced through `bint` before the
        # comparison, so any non-zero byte presumably reads as True - confirm
        # this is intended.
        cdef bint b
        b = reader.read(1)[0]
        return b == 1

    @staticmethod
    def read_bool_method(PyBytesIO reader):
        """Python-visible wrapper around the cdef ``read_boolean``."""
        return MessageCodec.read_boolean(reader)

    @staticmethod
    cdef read_uint(PyBytesIO reader):
        """
        Read a variable-length unsigned integer: 7 payload bits per byte,
        least-significant group first, the high bit marks continuation.

        Raises IndexError when the reader is exhausted and OverflowError when
        the value does not fit the byte budget checked below.
        """
        cdef unsigned long x = 0  # the result
        cdef unsigned int s = 0  # the shift (our result is big-ending)
        cdef int i = 0  # n of byte (max 9 for uint64)
        cdef bytes b
        cdef unsigned long num

        while True:
            b = reader.read(1)
            if len(b) == 0:
                raise IndexError('bytes out of range')

            num = int.from_bytes(b, "big", signed=False)

            if num < 0x80:
                if i > 9 or (i == 9 and num > 1):
                    raise OverflowError()
                return int(x | num << s)
            x |= (num & 0x7f) << s
            s += 7
            i += 1

    @staticmethod
    def read_size(PyBytesIO reader):
        """Read three bytes and combine them as a little-endian size."""
        cdef unsigned long size = 0
        cdef bytes b
        cdef unsigned long num
        for i in range(3):
            b = reader.read(1)
            num = int.from_bytes(b, "big", signed=False)
            size += num << (8*i)
        return size

    @staticmethod
    def read_int(PyBytesIO reader):
        """
        ux, err := ReadUint(reader)
        x := int64(ux >> 1)
        if err != nil {
            return x, err
        }
        if ux&1 != 0 {
            x = ^x
        }
        return x, err
        """
        cdef unsigned long ux = MessageCodec.read_uint(reader)
        cdef long x = int(ux >> 1)

        if ux & 1 != 0:
            # Equivalent of the bitwise complement in the reference above.
            x = - x - 1
        return x

    @staticmethod
    def read_string(PyBytesIO reader):
        """Read a length-prefixed UTF-8 string; NUL characters are replaced."""
        cdef unsigned long length = MessageCodec.read_uint(reader)
        cdef bytes s
        try:
            s = reader.read(length)
        except Exception as e:
            print(f'Error while reading string of length {length}')
            raise Exception(e)
        try:
            return s.decode("utf-8", errors="replace").replace("\x00", "\uFFFD")
        except UnicodeDecodeError:
            return None

    @staticmethod
    def read_message_id(PyBytesIO reader):
        """
        Read and return the first byte where the message id is encoded
        """
        cdef unsigned long id_ = MessageCodec.read_uint(reader)
        return id_

    @staticmethod
    def encode(PyMsg m):
        ...

    @staticmethod
    def decode(bytes b):
        """Decode a single message: its id followed by its attributes."""
        cdef PyBytesIO reader = BytesIO(b)
        # read_head_message needs the id explicitly; the previous call omitted
        # it and could only fail with a TypeError.
        cdef unsigned long message_id = MessageCodec.read_message_id(reader)
        return MessageCodec.read_head_message(reader, message_id)

    @staticmethod
    def check_message_id(bytes b):
        """
        todo: make it static and without reader. It's just the first byte
        Read and return the first byte where the message id is encoded
        """
        cdef PyBytesIO reader = BytesIO(b)
        cdef unsigned long id_ = MessageCodec.read_uint(reader)

        return id_

    @staticmethod
    def decode_key(bytes b):
        """
        Decode the message key (encoded with little endian)
        """
        cdef unsigned long decoded
        try:
            decoded = int.from_bytes(b, "little", signed=False)
        except Exception as e:
            print(f"Error while decoding message key (SessionID) from {b}")
            raise e
        return decoded

    def decode_detailed(self, bytes b):
        """
        Decode every message of a batch.

        The first message selects the framing: an old ``BatchMeta`` or a
        ``BatchMetadata`` with version 0 means no size prefix (mode 0), any
        other ``BatchMetadata`` version means size-prefixed messages (mode 1).
        Any other first message is returned alone. An empty list is returned
        when the first message cannot be read.
        """
        cdef PyBytesIO reader = BytesIO(b)
        cdef list messages_list
        cdef int mode
        try:
            messages_list = [self.handler(reader, 0)]
        except IndexError:
            print('[WARN] Broken batch')
            return list()
        if isinstance(messages_list[0], BatchMeta):
            # Old BatchMeta
            mode = 0
        elif isinstance(messages_list[0], BatchMetadata):
            # New BatchMeta
            if messages_list[0].version == 0:
                mode = 0
            else:
                mode = 1
        else:
            return messages_list
        while True:
            try:
                msg_decoded = self.handler(reader, mode)
                if msg_decoded is not None:
                    messages_list.append(msg_decoded)
            except IndexError:
                # The reader is exhausted: end of batch.
                break
        return messages_list

    def handler(self, PyBytesIO reader, int mode = 0):
        """
        Read one message from ``reader``.

        Returns None in mode 1 when the message id is not in
        ``msg_selector``. Raises IOError for an unknown mode.
        """
        cdef unsigned long message_id = MessageCodec.read_message_id(reader)
        cdef int r_size
        if mode == 1:
            # We read the three bytes representing the length of message. It can be used to skip unwanted messages
            r_size = MessageCodec.read_size(reader)
            if message_id not in self.msg_selector:
                reader.read(r_size)
                return None
            return MessageCodec.read_head_message(reader, message_id)
        elif mode == 0:
            # Old format with no bytes for message length
            return MessageCodec.read_head_message(reader, message_id)
        else:
            raise IOError()

    @staticmethod
    def read_head_message(PyBytesIO reader, unsigned long message_id):
        # Generated dispatch: one branch per message definition. The readers
        # are addressed through the class because a static method has no
        # `self`.
        # NOTE(review): assumes every attribute type used by the message
        # definitions has a matching read_<type> reader on this class - confirm.
<% $messages.each do |msg| %>
        if message_id == <%= msg.id %>:
            return <%= msg.name %>(
                <%= msg.attributes.map { |attr|
                    "#{attr.name.snake_case}=MessageCodec.read_#{attr.type.to_s}(reader)" }
                        .join ",\n                "
                %>
            )
<% end %>