openreplay/ee/connectors/utils/uploader.py
MauricioGarciaS 91921d622e
fix(connectors): Created module in Cython to speed up message decoding (#1338)
* Updated pg connector

* fix(player): fix first 8 byte checker

* fix(player): fix commit conflict

* Added pylint

* Removed pylint for incompatible license

* change(ui): check for sessions records state

* Patch/api v1.12.0 (#1299)

* fix(chalice): include metadata in sessions exp search

* fix(chalice): fixed sessions exp search wrong col name

* fix(chalice): removed cookies

* fix(chalice): changed base image to support SSO/xmlsec

* fix(chalice): changed Dockerfile to support SSO/xmlsec

* fix(chalice): changed Dockerfile to support SSO/xmlsec

(cherry picked from commit 4b8cf9742c)

* fix(ui): project fallback to recorded variable

* Patch/api v1.12.0 (#1301)

* fix(chalice): changed base image to support SSO/xmlsec

* fix(chalice): fixed exp search null metadata

(cherry picked from commit ab000751d2)

* change(ui): assist no content message styles and icons

* change(ui): revert menu disable

* fix(connector): Added method to save state in s3 for redshift if sigterm arise

* Rewriting python code in cython

* Added pyx module for messages

* Auto create pyx files

* Updated and fixed msgcodec.pyx

* Added new module to connector code

* Updated kafka lib for base image

* cleaned Docker and updated base image version for pandas

* cleaned prints

* Added code to fetch data from db and add it into redshift

* Updated consumer reading method. Async multithreading over sessionId

* Added split for country (Country,State,City)

* Fixed decoding issue for uint

* Created service able to fix data from redshift by reading from db

* Handle when process ended because of lost connection to pg, country set to country value only
2023-06-23 14:49:39 +02:00

79 lines
2.9 KiB
Python

from datetime import datetime
from db.writer import insert_batch, update_batch
from psycopg2 import InterfaceError
from time import sleep
def insertBatch(events_batch, sessions_insert_batch, sessions_update_batch, db, sessions_table_name, table_name, EVENT_TYPE):
t1 = datetime.now().timestamp()
print(f'[BG-INFO] Number of events to add {len(events_batch)}, number of sessions to add {len(sessions_insert_batch)}, number of sessions to update {len(sessions_update_batch)}')
if sessions_insert_batch:
attempt_session_insert(sessions_insert_batch, db, sessions_table_name)
if sessions_update_batch:
attempt_session_update(sessions_update_batch, db, sessions_table_name)
if events_batch:
attempt_batch_insert(events_batch, db, table_name, EVENT_TYPE)
print(f'[BG-INFO] Uploaded into S3 in {datetime.now().timestamp()-t1} seconds')
def attempt_session_insert(sess_batch, db, sessions_table_name, try_=0):
if sess_batch:
try:
insert_batch(db, sess_batch, table=sessions_table_name, level='sessions')
except TypeError as e:
print("Type conversion error")
print(repr(e))
except ValueError as e:
print("Message value could not be processed or inserted correctly")
print(repr(e))
except InterfaceError as e:
if try_ < 3:
try_ += 1
sleep(try_*2)
attempt_session_insert(sess_batch, db, sessions_table_name, try_)
except Exception as e:
print(repr(e))
def attempt_session_update(sess_batch, db, sessions_table_name):
if sess_batch:
try:
update_batch(db, sess_batch, table=sessions_table_name)
except TypeError as e:
print('Type conversion error')
print(repr(e))
except ValueError as e:
print('Message value could not be processed or inserted correctly')
print(repr(e))
except InterfaceError as e:
print('Error while trying to update session into datawarehouse')
print(repr(e))
except Exception as e:
print(repr(e))
def attempt_batch_insert(events_batch, db, table_name, EVENT_TYPE, try_=0):
try:
insert_batch(db=db, batch=events_batch, table=table_name, level=EVENT_TYPE)
except TypeError as e:
print("Type conversion error")
print(repr(e))
except ValueError as e:
print("Message value could not be processed or inserted correctly")
print(repr(e))
except InterfaceError as e:
if try_ < 3:
try_ += 1
sleep(try_*2)
attempt_batch_insert(events_batch, db, table_name, EVENT_TYPE, try_)
elif try_ == 3:
db.restart()
sleep(2)
attempt_batch_insert(events_batch, db, table_name, EVENT_TYPE, try_ + 1)
else:
print(repr(e))
except Exception as e:
print(repr(e))