* Updated pg connector * fix(player): fix first 8 byte checker * fix(player): fix commit conflict * Added pylint * Removed pylint for incompatible license * change(ui): check for sessions records state * Patch/api v1.12.0 (#1299) * fix(chalice): include metadata in sessions exp search * fix(chalice): fixed sessions exp search wrong col name * fix(chalice): removed cookies * fix(chalice): changed base image to support SSO/xmlsec * fix(chalice): changed Dockerfile to support SSO/xmlsec * fix(chalice): changed Dockerfile to support SSO/xmlsec (cherry picked from commit4b8cf9742c) * fix(ui): project fallback to recorded variable * Patch/api v1.12.0 (#1301) * fix(chalice): changed base image to support SSO/xmlsec * fix(chalice): fixed exp search null metadata (cherry picked from commitab000751d2) * change(ui): assist no content message styles and icons * change(ui): revert menu disable * fix(connector): Added method to save state in s3 for redshift if sigterm arise * Rewriting python code in cython * Added pyx module for messages * Auto create pyx files * Updated and fixed msgcodec.pyx * Added new module to connector code * Updated kafka lib for base image * cleaned Docker and updated base image version for pandas * cleaned prints * Added code to fetch data from db and add it into redshift * Updated consumer reading method. Async multithreading over sessionId * Added split for country (Country,State,City) * Fixed decoding issue for uint * Created service able to fix data from redshift by reading from db * Handle when process ended because of lost connection to pg, country set to country value only
201 lines
5.8 KiB
Text
201 lines
5.8 KiB
Text
# Auto-generated, do not edit
|
|
|
|
from messages import *
|
|
#from io cimport BytesIO
|
|
from io import BytesIO
|
|
from libc.stdlib cimport abort
|
|
|
|
cdef extern from "Python.h":
|
|
int PyArg_ParseTupleAndKeywords(object args, object kwargs, char* format, char** keywords, ...)
|
|
|
|
cdef class PyMsg:
    """Bare extension type; it is the declared argument type of ``MessageCodec.encode``."""

    def __cinit__(self):
        # No C-level state to set up.
        pass
|
|
|
|
ctypedef object PyBytesIO
|
|
|
|
cdef class MessageCodec:
|
|
"""
|
|
Implements encode/decode primitives
|
|
"""
|
|
cdef list msg_selector
|
|
|
|
def __init__(self, list msg_selector):
    """Remember which message ids should be decoded.

    ``handler`` checks this list in mode 1: ids that are not in it are
    skipped instead of being decoded.
    """
    self.msg_selector = msg_selector
|
|
|
|
@staticmethod
cdef read_boolean(PyBytesIO reader):
    """Read one byte from *reader* and report whether it is exactly 1.

    Raises:
        IndexError: when ``reader.read(1)`` returns an empty bytes object
            (indexing it fails), i.e. the reader is exhausted.
    """
    # Keep the raw byte value. Storing it in a ``bint`` coerces by truthiness,
    # which turns every non-zero byte into 1 and makes the comparison below
    # succeed for bytes such as 0x02.
    cdef unsigned char raw = reader.read(1)[0]
    return raw == 1
|
|
|
|
@staticmethod
def read_bool_method(PyBytesIO reader):
    """Python-callable wrapper that forwards to the C-level ``read_boolean``."""
    flag = MessageCodec.read_boolean(reader)
    return flag
|
|
|
|
@staticmethod
cdef read_uint(PyBytesIO reader):
    """Decode one base-128 varint from *reader* and return it as a Python int.

    Bytes are consumed one at a time. Each byte contributes its low 7 bits,
    least-significant group first; a byte below 0x80 ends the number.

    Raises:
        IndexError: the reader ran out of bytes before a terminating byte.
        OverflowError: the terminating byte is the 11th or later, or it is
            the 10th byte and larger than 1 (the value would not fit in
            64 bits).
    """
    cdef unsigned long x = 0   # value accumulated so far
    cdef unsigned int s = 0    # bit offset of the next 7-bit group
    cdef int i = 0             # index of the byte being decoded (0..9 are valid)
    cdef bytes b
    cdef unsigned long num

    while True:
        b = reader.read(1)
        if not b:
            raise IndexError('bytes out of range')

        num = b[0]

        if num < 0x80:
            if i > 9 or (i == 9 and num > 1):
                raise OverflowError()
            return int(x | (num << s))

        # Continuation byte. Once ten bytes have been folded in, the value is
        # already certain to be rejected, so stop accumulating: shifting a
        # 64-bit C integer by 70 or more bits is undefined behaviour. The
        # remaining bytes are still consumed so that the same exception as
        # before (OverflowError on a terminator, IndexError on EOF) is raised.
        if i <= 9:
            x |= (num & 0x7f) << s
            s += 7
            i += 1
|
|
|
|
@staticmethod
def read_size(PyBytesIO reader):
    """Read the 3-byte little-endian length field and return it as an int.

    Raises:
        IndexError: fewer than three bytes are left in *reader*. Without
            this check a truncated field would be decoded as if the missing
            bytes were zero and a wrong size would be returned silently.
            The exception and message match the end-of-data signal raised
            by ``read_uint``.
    """
    cdef bytes raw = reader.read(3)
    if len(raw) < 3:
        raise IndexError('bytes out of range')
    # Byte 0 is the least significant one (original: num << (8 * i)).
    return int.from_bytes(raw, "little", signed=False)
|
|
|
|
|
|
@staticmethod
def read_int(PyBytesIO reader):
    """Read a varint with ``read_uint`` and decode it as a signed integer.

    The lowest bit of the unsigned value is the sign flag and the remaining
    bits are the payload: an even value ``u`` decodes to ``u >> 1``, an odd
    value decodes to ``-(u >> 1) - 1`` (the bitwise complement of the
    payload).
    """
    cdef unsigned long raw = MessageCodec.read_uint(reader)
    cdef long payload = int(raw >> 1)

    if raw & 1:
        return -payload - 1
    return payload
|
|
|
|
@staticmethod
def read_string(PyBytesIO reader):
    """Read a length-prefixed UTF-8 string from *reader*.

    The length is a varint decoded by ``read_uint``; that many bytes are
    then read and decoded. Invalid UTF-8 sequences and NUL characters are
    both replaced with U+FFFD.

    Raises:
        Exception: wraps any error raised by ``reader.read``; the original
            error is attached as the cause.
    """
    cdef unsigned long length = MessageCodec.read_uint(reader)
    cdef bytes s
    try:
        s = reader.read(length)
    except Exception as e:
        print(f'Error while reading string of length {length}')
        # Same exception type as before; ``from e`` keeps the original
        # error reachable as the explicit cause.
        raise Exception(e) from e
    # errors="replace" never raises UnicodeDecodeError, so the former
    # ``except UnicodeDecodeError: return None`` branch could not run.
    return s.decode("utf-8", errors="replace").replace("\x00", "\uFFFD")
|
|
|
|
@staticmethod
def read_message_id(PyBytesIO reader):
    """Decode the varint at the reader's current position and return it as the message id."""
    cdef unsigned long message_id = MessageCodec.read_uint(reader)
    return message_id
|
|
|
|
@staticmethod
def encode(PyMsg m):
    """Placeholder: encoding is not implemented, the call does nothing and returns None."""
    pass
|
|
|
|
@staticmethod
def decode(bytes b):
    """Decode the single message serialized in *b*.

    The message id is read first (a varint at the start of *b*), then the
    body is decoded by ``read_head_message``. Whatever that method returns
    is returned unchanged.
    """
    cdef PyBytesIO reader = BytesIO(b)
    # read_head_message requires the already-decoded id as its second
    # argument; calling it with the reader alone can only raise TypeError.
    cdef unsigned long message_id = MessageCodec.read_message_id(reader)
    return MessageCodec.read_head_message(reader, message_id)
|
|
|
|
@staticmethod
def check_message_id(bytes b):
    """Return the message id encoded at the start of *b*.

    Only the leading varint is decoded; the rest of *b* is not inspected.

    TODO: decode the id without wrapping *b* in a reader.
    """
    cdef unsigned long message_id = MessageCodec.read_uint(BytesIO(b))
    return message_id
|
|
|
|
@staticmethod
def decode_key(bytes b):
    """Interpret *b* as an unsigned little-endian integer and return it.

    On failure a diagnostic line is printed and the error is re-raised.
    """
    cdef unsigned long key
    try:
        key = int.from_bytes(b, byteorder="little", signed=False)
    except Exception as e:
        print(f"Error while decoding message key (SessionID) from {b}")
        raise e
    return key
|
|
|
|
def decode_detailed(self, bytes b):
    """Decode every message of the batch *b* and return them as a list.

    The first message is always decoded in mode 0 and decides how the rest
    is read:

    * ``BatchMeta``: the rest is read in mode 0;
    * ``BatchMetadata``: mode 0 when its ``version`` is 0, otherwise mode 1;
    * anything else: the one-element list is returned immediately.

    Messages for which ``handler`` returns None are left out. Reading stops
    at the first IndexError. If the very first message already raises
    IndexError, a warning is printed and an empty list is returned.
    """
    cdef PyBytesIO reader = BytesIO(b)
    cdef list messages_list
    cdef int mode

    try:
        messages_list = [self.handler(reader, 0)]
    except IndexError:
        print('[WARN] Broken batch')
        return list()

    head = messages_list[0]
    if isinstance(head, BatchMeta):
        # Old BatchMeta
        mode = 0
    elif isinstance(head, BatchMetadata):
        # New BatchMeta
        mode = 0 if head.version == 0 else 1
    else:
        return messages_list

    try:
        while True:
            msg_decoded = self.handler(reader, mode)
            if msg_decoded is not None:
                messages_list.append(msg_decoded)
    except IndexError:
        # Raised by the read primitives when the data runs out.
        pass
    return messages_list
|
|
|
|
def handler(self, PyBytesIO reader, int mode = 0):
    """Decode the next message from *reader* according to *mode*.

    mode 0: old format, the message body directly follows the id.
    mode 1: a 3-byte length follows the id; messages whose id is not in
        ``self.msg_selector`` are skipped by consuming that many bytes and
        None is returned.

    Raises:
        IOError: *mode* is neither 0 nor 1 (the id has been consumed by then).
    """
    cdef unsigned long message_id = MessageCodec.read_message_id(reader)
    cdef int r_size

    if mode == 0:
        # Old format with no bytes for message length
        return MessageCodec.read_head_message(reader, message_id)
    if mode != 1:
        raise IOError()

    # The three length bytes let unwanted messages be skipped without decoding.
    r_size = MessageCodec.read_size(reader)
    if message_id in self.msg_selector:
        return MessageCodec.read_head_message(reader, message_id)
    reader.read(r_size)
    return None
|
|
|
|
@staticmethod
def read_head_message(PyBytesIO reader, unsigned long message_id):
    """Build the message object for *message_id* by reading its fields from *reader*.

    The body is generated by the ERB loop below: one ``if`` per known
    message, each reading the attributes in declaration order with the
    matching ``read_<type>`` primitive. An id that matches none of the
    branches falls through and the function returns None.
    """
    # The readers are called through the class: this is a static method, so
    # there is no ``self`` to call them on.
<% $messages.each do |msg| %>
    if message_id == <%= msg.id %>:
        return <%= msg.name %>(
            <%= msg.attributes.map { |attr|
                    "#{attr.name.snake_case}=MessageCodec.read_#{attr.type.to_s}(reader)" }
                .join ",\n            "
            %>
        )
<% end %>
|