openreplay/mobs/templates/ee~connectors~msgcodec~msgcodec.pyx.erb
MauricioGarciaS 91921d622e
fix(connectors): Created module in Cython to speed up message decoding (#1338)
* Updated pg connector

* fix(player): fix first 8 byte checker

* fix(player): fix commit conflict

* Added pylint

* Removed pylint for incompatible license

* change(ui): check for sessions records state

* Patch/api v1.12.0 (#1299)

* fix(chalice): include metadata in sessions exp search

* fix(chalice): fixed sessions exp search wrong col name

* fix(chalice): removed cookies

* fix(chalice): changed base image to support SSO/xmlsec

* fix(chalice): changed Dockerfile to support SSO/xmlsec

* fix(chalice): changed Dockerfile to support SSO/xmlsec

(cherry picked from commit 4b8cf9742c)

* fix(ui): project fallback to recorded variable

* Patch/api v1.12.0 (#1301)

* fix(chalice): changed base image to support SSO/xmlsec

* fix(chalice): fixed exp search null metadata

(cherry picked from commit ab000751d2)

* change(ui): assist no content message styles and icons

* change(ui): revert menu disable

* fix(connector): Added method to save state in s3 for redshift if sigterm arise

* Rewriting python code in cython

* Added pyx module for messages

* Auto create pyx files

* Updated and fixed msgcodec.pyx

* Added new module to connector code

* Updated kafka lib for base image

* cleaned Docker and updated base image version for pandas

* cleaned prints

* Added code to fetch data from db and add it into redshift

* Updated consumer reading method. Async multithreading over sessionId

* Added split for country (Country,State,City)

* Fixed decoding issue for uint

* Created service able to fix data from redshift by reading from db

* Handle when process ended because of lost connection to pg, country set to country value only
2023-06-23 14:49:39 +02:00

201 lines
5.8 KiB
Text

# Auto-generated, do not edit
from messages import *
#from io cimport BytesIO
from io import BytesIO
from libc.stdlib cimport abort
cdef extern from "Python.h":
int PyArg_ParseTupleAndKeywords(object args, object kwargs, char* format, char** keywords, ...)
cdef class PyMsg:
def __cinit__(self):
pass
ctypedef object PyBytesIO
cdef class MessageCodec:
"""
Implements encode/decode primitives
"""
cdef list msg_selector
def __init__(self, list msg_selector):
self.msg_selector = msg_selector
@staticmethod
cdef read_boolean(PyBytesIO reader):
cdef bint b
b = reader.read(1)[0]
return b == 1
@staticmethod
def read_bool_method(PyBytesIO reader):
return MessageCodec.read_boolean(reader)
@staticmethod
cdef read_uint(PyBytesIO reader):
cdef unsigned long x = 0 # the result
cdef unsigned int s = 0 # the shift (our result is big-ending)
cdef int i = 0 # n of byte (max 9 for uint64)
cdef bytes b
cdef unsigned long num
while True:
b = reader.read(1)
if len(b) == 0:
raise IndexError('bytes out of range')
num = int.from_bytes(b, "big", signed=False)
if num < 0x80:
if i > 9 or (i == 9 and num > 1):
raise OverflowError()
return int(x | num << s)
x |= (num & 0x7f) << s
s += 7
i += 1
@staticmethod
def read_size(PyBytesIO reader):
cdef unsigned long size = 0
cdef bytes b
cdef unsigned long num
for i in range(3):
b = reader.read(1)
num = int.from_bytes(b, "big", signed=False)
size += num << (8*i)
return size
@staticmethod
def read_int(PyBytesIO reader):
"""
ux, err := ReadUint(reader)
x := int64(ux >> 1)
if err != nil {
return x, err
}
if ux&1 != 0 {
x = ^x
}
return x, err
"""
cdef unsigned long ux = MessageCodec.read_uint(reader)
cdef long x = int(ux >> 1)
if ux & 1 != 0:
x = - x - 1
return x
@staticmethod
def read_string(PyBytesIO reader):
cdef unsigned long length = MessageCodec.read_uint(reader)
cdef bytes s
try:
s = reader.read(length)
except Exception as e:
print(f'Error while reading string of length {length}')
raise Exception(e)
try:
return s.decode("utf-8", errors="replace").replace("\x00", "\uFFFD")
except UnicodeDecodeError:
return None
@staticmethod
def read_message_id(PyBytesIO reader):
"""
Read and return the first byte where the message id is encoded
"""
cdef unsigned long id_ = MessageCodec.read_uint(reader)
return id_
@staticmethod
def encode(PyMsg m):
...
@staticmethod
def decode(bytes b):
cdef PyBytesIO reader = BytesIO(b)
return MessageCodec.read_head_message(reader)
@staticmethod
def check_message_id(bytes b):
"""
todo: make it static and without reader. It's just the first byte
Read and return the first byte where the message id is encoded
"""
cdef PyBytesIO reader = BytesIO(b)
cdef unsigned long id_ = MessageCodec.read_uint(reader)
return id_
@staticmethod
def decode_key(bytes b):
"""
Decode the message key (encoded with little endian)
"""
cdef unsigned long decoded
try:
decoded = int.from_bytes(b, "little", signed=False)
except Exception as e:
print(f"Error while decoding message key (SessionID) from {b}")
raise e
return decoded
def decode_detailed(self, bytes b):
cdef PyBytesIO reader = BytesIO(b)
cdef list messages_list
cdef int mode
try:
messages_list = [self.handler(reader, 0)]
except IndexError:
print('[WARN] Broken batch')
return list()
if isinstance(messages_list[0], BatchMeta):
# Old BatchMeta
mode = 0
elif isinstance(messages_list[0], BatchMetadata):
# New BatchMeta
if messages_list[0].version == 0:
mode = 0
else:
mode = 1
else:
return messages_list
while True:
try:
msg_decoded = self.handler(reader, mode)
if msg_decoded is not None:
messages_list.append(msg_decoded)
except IndexError:
break
return messages_list
def handler(self, PyBytesIO reader, int mode = 0):
cdef unsigned long message_id = MessageCodec.read_message_id(reader)
cdef int r_size
if mode == 1:
# We read the three bytes representing the length of message. It can be used to skip unwanted messages
r_size = MessageCodec.read_size(reader)
if message_id not in self.msg_selector:
reader.read(r_size)
return None
return MessageCodec.read_head_message(reader, message_id)
elif mode == 0:
# Old format with no bytes for message length
return MessageCodec.read_head_message(reader, message_id)
else:
raise IOError()
@staticmethod
def read_head_message(PyBytesIO reader, unsigned long message_id):
<% $messages.each do |msg| %>
if message_id == <%= msg.id %>:
return <%= msg.name %>(
<%= msg.attributes.map { |attr|
"#{attr.name.snake_case}=self.read_#{attr.type.to_s}(reader)" }
.join ",\n "
%>
)
<% end %>