diff --git a/ee/connectors/consumer_async.py b/ee/connectors/consumer_async.py index 5e0d57610..99ae5a087 100644 --- a/ee/connectors/consumer_async.py +++ b/ee/connectors/consumer_async.py @@ -127,8 +127,12 @@ def attempt_batch_insert(batch, db, table_name, EVENT_TYPE, try_=0): try_ += 1 sleep(try_*2) attempt_batch_insert(batch, db, table_name, EVENT_TYPE, try_) - else: + elif try_ == 3: # TODO: Restart redshift + db.restart() + sleep(2) + attempt_batch_insert(batch, db, table_name, EVENT_TYPE, try_ + 1) + else: print(repr(e)) except Exception as e: print(repr(e)) diff --git a/ee/connectors/utils/pg_client.py b/ee/connectors/utils/pg_client.py index 64ca1719f..efd40b52f 100644 --- a/ee/connectors/utils/pg_client.py +++ b/ee/connectors/utils/pg_client.py @@ -10,12 +10,33 @@ from psycopg2 import pool logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO)) logging.getLogger('apscheduler').setLevel(config("LOGLEVEL", default=logging.INFO)) -_PG_CONFIG = {"host": config("pg_host"), +conn_str = config('string_connection', default='') +if conn_str == '': + _PG_CONFIG = {"host": config("pg_host"), "database": config("pg_dbname"), "user": config("pg_user"), "password": config("pg_password"), "port": config("pg_port", cast=int), "application_name": config("APP_NAME", default="PY")} +else: + usr_info, host_info = conn_str.split('@') + i = usr_info.find('://') + pg_user, pg_password = usr_info[i+3:].split(':') + host_info, pg_dbname = host_info.split('/') + i = host_info.find(':') + if i == -1: + pg_host = host_info + pg_port = 5432 + else: + pg_host, pg_port = host_info.split(':') + pg_port = int(pg_port) + _PG_CONFIG = {"host": pg_host, + "database": pg_dbname, + "user": pg_user, + "password": pg_password, + "port": pg_port, + "application_name": config("APP_NAME", default="PY")} + PG_CONFIG = dict(_PG_CONFIG) if config("PG_TIMEOUT", cast=int, default=0) > 0: PG_CONFIG["options"] = f"-c statement_timeout={config('PG_TIMEOUT', cast=int) * 1000}"