* Updated pg connector * fix(player): fix first 8 byte checker * fix(player): fix commit conflict * Added pylint * Removed pylint for incompatible license * change(ui): check for sessions records state * Patch/api v1.12.0 (#1299) * fix(chalice): include metadata in sessions exp search * fix(chalice): fixed sessions exp search wrong col name * fix(chalice): removed cookies * fix(chalice): changed base image to support SSO/xmlsec * fix(chalice): changed Dockerfile to support SSO/xmlsec * fix(chalice): changed Dockerfile to support SSO/xmlsec (cherry picked from commit4b8cf9742c) * fix(ui): project fallback to recorded variable * Patch/api v1.12.0 (#1301) * fix(chalice): changed base image to support SSO/xmlsec * fix(chalice): fixed exp search null metadata (cherry picked from commitab000751d2) * change(ui): assist no content message styles and icons * change(ui): revert menu disable * fix(connector): Added method to save state in s3 for redshift if sigterm arise * Rewriting python code in cython * Added pyx module for messages * Auto create pyx files * Updated and fixed msgcodec.pyx * Added new module to connector code * Updated kafka lib for base image * cleaned Docker and updated base image version for pandas * cleaned prints * Added code to fetch data from db and add it into redshift * Updated consumer reading method. Async multithreading over sessionId * Added split for country (Country,State,City) * Fixed decoding issue for uint * Created service able to fix data from redshift by reading from db * Handle when process ended because of lost connection to pg, country set to country value only
92 lines
2.3 KiB
Cython
92 lines
2.3 KiB
Cython
cimport io
|
|
from libc.stdlib cimport abort
|
|
|
|
cdef extern from "Python.h":
|
|
int PyArg_ParseTupleAndKeywords(object args, object kwargs, char* format, char** keywords, ...)
|
|
|
|
ctypedef object PyBytesIO
|
|
|
|
cdef class Codec:
|
|
"""
|
|
Implements encode/decode primitives
|
|
"""
|
|
|
|
@staticmethod
|
|
cdef read_boolean(PyBytesIO reader):
|
|
cdef bint b
|
|
b = reader.read(1)[0]
|
|
return b == 1
|
|
|
|
@staticmethod
|
|
def read_bool_method(PyBytesIO reader):
|
|
return Codec.read_boolean(reader)
|
|
|
|
@staticmethod
|
|
cdef read_uint(PyBytesIO reader):
|
|
cdef int x = 0 # the result
|
|
cdef int s = 0 # the shift (our result is big-ending)
|
|
cdef int i = 0 # n of byte (max 9 for uint64)
|
|
cdef bytes b
|
|
cdef int num
|
|
|
|
while True:
|
|
b = reader.read(1)
|
|
if len(b) == 0:
|
|
raise IndexError('bytes out of range')
|
|
|
|
num = int.from_bytes(b, "big", signed=False)
|
|
|
|
if num < 0x80:
|
|
if i > 9 or (i == 9 and num > 1):
|
|
raise OverflowError()
|
|
return int(x | num << s)
|
|
x |= (num & 0x7f) << s
|
|
s += 7
|
|
i += 1
|
|
|
|
@staticmethod
|
|
def read_size(PyBytesIO reader):
|
|
cdef int size = 0
|
|
cdef bytes b
|
|
cdef int num
|
|
for i in range(3):
|
|
b = reader.read(1)
|
|
num = int.from_bytes(b, "big", signed=False)
|
|
size += num << (8*i)
|
|
return size
|
|
|
|
|
|
@staticmethod
|
|
def read_int(PyBytesIO reader):
|
|
"""
|
|
ux, err := ReadUint(reader)
|
|
x := int64(ux >> 1)
|
|
if err != nil {
|
|
return x, err
|
|
}
|
|
if ux&1 != 0 {
|
|
x = ^x
|
|
}
|
|
return x, err
|
|
"""
|
|
cdef int ux = Codec.read_uint(reader)
|
|
cdef int x = int(ux >> 1)
|
|
|
|
if ux & 1 != 0:
|
|
x = - x - 1
|
|
return x
|
|
|
|
@staticmethod
|
|
def read_string(PyBytesIO reader):
|
|
cdef int length = Codec.read_uint(reader)
|
|
cdef int s
|
|
try:
|
|
s = reader.read(length)
|
|
except Exception as e:
|
|
print(f'Error while reading string of length {length}')
|
|
raise Exception(e)
|
|
try:
|
|
return s.decode("utf-8", errors="replace").replace("\x00", "\uFFFD")
|
|
except UnicodeDecodeError:
|
|
return None
|
|
|