* Updated pg connector
* fix(player): fix first 8 byte checker
* fix(player): fix commit conflict
* Added pylint
* Removed pylint for incompatible license
* change(ui): check for sessions records state
* Patch/api v1.12.0 (#1299)
* fix(chalice): include metadata in sessions exp search
* fix(chalice): fixed sessions exp search wrong col name
* fix(chalice): removed cookies
* fix(chalice): changed base image to support SSO/xmlsec
* fix(chalice): changed Dockerfile to support SSO/xmlsec
* fix(chalice): changed Dockerfile to support SSO/xmlsec (cherry picked from commit 4b8cf9742c)
* fix(ui): project fallback to recorded variable
* Patch/api v1.12.0 (#1301)
* fix(chalice): changed base image to support SSO/xmlsec
* fix(chalice): fixed exp search null metadata (cherry picked from commit ab000751d2)
* change(ui): assist no content message styles and icons
* change(ui): revert menu disable
* fix(connector): added method to save state in S3 for Redshift if SIGTERM arises (see the sketch below)
* Rewriting Python code in Cython
* Added pyx module for messages
* Auto create pyx files
* Updated and fixed msgcodec.pyx
* Added new module to connector code
* Updated kafka lib for base image
* Cleaned Dockerfile and updated base image version for pandas
* Cleaned prints
* Added code to fetch data from db and add it into Redshift
* Updated consumer reading method: async multithreading over sessionId
* Added split for country (Country, State, City)
* Fixed decoding issue for uint
* Created service able to fix data in Redshift by reading from db
* Handle process exit on lost connection to pg; country set to country value only
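One entry above bears directly on this file: the connector now saves its state to S3 when it receives SIGTERM, so an interrupted Redshift load can resume instead of losing the batch. Below is a minimal sketch of how such a handler could look, assuming boto3 is available; the names here (install_sigterm_handler, get_state, bucket, key) are hypothetical illustrations, not the connector's actual API.

    import json
    import signal

    import boto3

    def install_sigterm_handler(get_state, bucket, key):
        """Persist in-flight state to S3 on SIGTERM (hypothetical sketch)."""
        def handler(signum, frame):
            # get_state() is assumed to return a JSON-serializable snapshot
            # of the batches that have not been flushed yet.
            body = json.dumps(get_state()).encode('utf-8')
            boto3.client('s3').put_object(Bucket=bucket, Key=key, Body=body)
            raise SystemExit(0)
        signal.signal(signal.SIGTERM, handler)

Registering the handler once at startup is enough; CPython always delivers signals to the main thread, where the handler then runs.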
from datetime import datetime
from time import sleep

from psycopg2 import InterfaceError

from db.writer import insert_batch, update_batch


def insertBatch(events_batch, sessions_insert_batch, sessions_update_batch, db,
                sessions_table_name, table_name, EVENT_TYPE):
    # Flush one consumer cycle: new sessions first, then session updates,
    # then the events that reference them.
    t1 = datetime.now().timestamp()
    print(f'[BG-INFO] Number of events to add {len(events_batch)}, '
          f'number of sessions to add {len(sessions_insert_batch)}, '
          f'number of sessions to update {len(sessions_update_batch)}')
    if sessions_insert_batch:
        attempt_session_insert(sessions_insert_batch, db, sessions_table_name)
    if sessions_update_batch:
        attempt_session_update(sessions_update_batch, db, sessions_table_name)
    if events_batch:
        attempt_batch_insert(events_batch, db, table_name, EVENT_TYPE)
    print(f'[BG-INFO] Uploaded into S3 in {datetime.now().timestamp() - t1} seconds')

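# Hedged usage sketch (not part of the original module): how a consumer
# cycle might hand its accumulated batches to insertBatch. The batch names
# and table names below are assumptions based on the signatures in this file.
#
#   insertBatch(events_batch=decoded_events,
#               sessions_insert_batch=new_sessions,
#               sessions_update_batch=changed_sessions,
#               db=db, sessions_table_name='sessions',
#               table_name='events', EVENT_TYPE='normal')
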
def attempt_session_insert(sess_batch, db, sessions_table_name, try_=0):
    if sess_batch:
        try:
            insert_batch(db, sess_batch, table=sessions_table_name, level='sessions')
        except TypeError as e:
            print('Type conversion error')
            print(repr(e))
        except ValueError as e:
            print('Message value could not be processed or inserted correctly')
            print(repr(e))
        except InterfaceError as e:
            # Connection-level failure: retry up to 3 times with linear
            # backoff (2s, 4s, 6s) before giving up.
            if try_ < 3:
                try_ += 1
                sleep(try_ * 2)
                attempt_session_insert(sess_batch, db, sessions_table_name, try_)
            else:
                # Previously the batch was dropped silently once retries were
                # exhausted; at least log the error.
                print(repr(e))
        except Exception as e:
            print(repr(e))

def attempt_session_update(sess_batch, db, sessions_table_name):
    if sess_batch:
        try:
            update_batch(db, sess_batch, table=sessions_table_name)
        except TypeError as e:
            print('Type conversion error')
            print(repr(e))
        except ValueError as e:
            print('Message value could not be processed or inserted correctly')
            print(repr(e))
        except InterfaceError as e:
            # Unlike the insert paths, updates are not retried; the failure
            # is only logged.
            print('Error while trying to update session into datawarehouse')
            print(repr(e))
        except Exception as e:
            print(repr(e))

def attempt_batch_insert(events_batch, db, table_name, EVENT_TYPE, try_=0):
    try:
        insert_batch(db=db, batch=events_batch, table=table_name, level=EVENT_TYPE)
    except TypeError as e:
        print('Type conversion error')
        print(repr(e))
    except ValueError as e:
        print('Message value could not be processed or inserted correctly')
        print(repr(e))
    except InterfaceError as e:
        # Connection-level failure: retry up to 3 times with linear
        # backoff (2s, 4s, 6s).
        if try_ < 3:
            try_ += 1
            sleep(try_ * 2)
            attempt_batch_insert(events_batch, db, table_name, EVENT_TYPE, try_)
        elif try_ == 3:
            # After three failed attempts, restart the DB connection and
            # make one final attempt.
            db.restart()
            sleep(2)
            attempt_batch_insert(events_batch, db, table_name, EVENT_TYPE, try_ + 1)
        else:
            print(repr(e))
    except Exception as e:
        print(repr(e))
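
# attempt_session_insert and attempt_batch_insert duplicate the same
# linear-backoff retry on InterfaceError. A sketch of a shared helper that
# could replace that duplication; the name retry_on_interface_error is
# hypothetical, not part of this module.
def retry_on_interface_error(action, max_tries=3, restart=None):
    # action: zero-argument callable performing the write.
    # restart: optional callable (e.g. db.restart) used for one last
    # attempt after the retries are exhausted.
    for try_ in range(1, max_tries + 1):
        try:
            return action()
        except InterfaceError as e:
            print(repr(e))
            sleep(try_ * 2)  # linear backoff: 2s, 4s, 6s
    if restart is not None:
        restart()
        sleep(2)
        return action()

# Example (hypothetical):
# retry_on_interface_error(
#     lambda: insert_batch(db=db, batch=events_batch,
#                          table=table_name, level=EVENT_TYPE),
#     restart=db.restart)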