From ecbc42a03fddcd08c878c4553ccd91f736421de5 Mon Sep 17 00:00:00 2001 From: MauricioGarciaS <47052044+MauricioGarciaS@users.noreply.github.com> Date: Tue, 11 Jul 2023 13:59:46 +0200 Subject: [PATCH] fix(connectors): Updated sql to varchar 8000 (#1402) * fix(connectors): Changed sql from varchar to varchar 8000 * fix(connectors): changed cropping to size 8000 for str in worker task --- ee/connectors/db/utils.py | 4 +- ee/connectors/sql/redshift_events.sql | 40 ++++---- .../sql/redshift_events_detailed.sql | 98 +++++++++---------- ee/connectors/sql/redshift_sessions.sql | 30 +++--- ee/connectors/utils/worker.py | 19 ++-- 5 files changed, 98 insertions(+), 93 deletions(-) diff --git a/ee/connectors/db/utils.py b/ee/connectors/db/utils.py index e19399340..cda5232f1 100644 --- a/ee/connectors/db/utils.py +++ b/ee/connectors/db/utils.py @@ -215,11 +215,11 @@ def get_df_from_batch(batch, level): try: if df[x].dtype == "string" or current_types[x] == "string": df[x] = df[x].fillna('NULL') - df[x] = df[x].str.slice(0, 255) + df[x] = df[x].str.slice(0, 8000) df[x] = df[x].str.replace("|", "") except TypeError as e: print(repr(e)) if df[x].dtype == 'str': - df[x] = df[x].str.slice(0, 255) + df[x] = df[x].str.slice(0, 8000) df[x] = df[x].str.replace("|", "") return df diff --git a/ee/connectors/sql/redshift_events.sql b/ee/connectors/sql/redshift_events.sql index 7b04e3d29..48213a50a 100644 --- a/ee/connectors/sql/redshift_events.sql +++ b/ee/connectors/sql/redshift_events.sql @@ -1,31 +1,31 @@ CREATE TABLE IF NOT EXISTS connector_events ( sessionid BIGINT, - consolelog_level VARCHAR(5000), - consolelog_value VARCHAR(5000), - customevent_name VARCHAR(5000), - customevent_payload VARCHAR(5000), - jsexception_message VARCHAR(5000), - jsexception_name VARCHAR(5000), - jsexception_payload VARCHAR(5000), - jsexception_metadata VARCHAR(5000), - networkrequest_type VARCHAR(5000), - networkrequest_method VARCHAR(5000), - networkrequest_url VARCHAR(5000), - networkrequest_request VARCHAR(5000), - networkrequest_response VARCHAR(5000), + consolelog_level VARCHAR(8000), + consolelog_value VARCHAR(8000), + customevent_name VARCHAR(8000), + customevent_payload VARCHAR(8000), + jsexception_message VARCHAR(8000), + jsexception_name VARCHAR(8000), + jsexception_payload VARCHAR(8000), + jsexception_metadata VARCHAR(8000), + networkrequest_type VARCHAR(8000), + networkrequest_method VARCHAR(8000), + networkrequest_url VARCHAR(8000), + networkrequest_request VARCHAR(8000), + networkrequest_response VARCHAR(8000), networkrequest_status BIGINT, networkrequest_timestamp BIGINT, networkrequest_duration BIGINT, issueevent_message_id BIGINT, issueevent_timestamp BIGINT, - issueevent_type VARCHAR(5000), - issueevent_context_string VARCHAR(5000), - issueevent_context VARCHAR(5000), - issueevent_payload VARCHAR(5000), - issueevent_url VARCHAR(5000), - customissue_name VARCHAR(5000), - customissue_payload VARCHAR(5000), + issueevent_type VARCHAR(8000), + issueevent_context_string VARCHAR(8000), + issueevent_context VARCHAR(8000), + issueevent_payload VARCHAR(8000), + issueevent_url VARCHAR(8000), + customissue_name VARCHAR(8000), + customissue_payload VARCHAR(8000), received_at BIGINT, batch_order_number BIGINT ); diff --git a/ee/connectors/sql/redshift_events_detailed.sql b/ee/connectors/sql/redshift_events_detailed.sql index 1c9449dad..699e4bf56 100644 --- a/ee/connectors/sql/redshift_events_detailed.sql +++ b/ee/connectors/sql/redshift_events_detailed.sql @@ -2,43 +2,43 @@ CREATE TABLE IF NOT EXISTS connector_events_detailed ( sessionid BIGINT, clickevent_hesitationtime BIGINT, -clickevent_label VARCHAR(5000), +clickevent_label VARCHAR(8000), clickevent_messageid BIGINT, clickevent_timestamp BIGINT, connectioninformation_downlink BIGINT, -connectioninformation_type VARCHAR(5000), -consolelog_level VARCHAR(5000), -consolelog_value VARCHAR(5000), -customevent_name VARCHAR(5000), -customevent_payload VARCHAR(5000), +connectioninformation_type VARCHAR(8000), +consolelog_level VARCHAR(8000), +consolelog_value VARCHAR(8000), +customevent_name VARCHAR(8000), +customevent_payload VARCHAR(8000), fetch_duration BIGINT, -fetch_method VARCHAR(5000), -fetch_request VARCHAR(5000), -fetch_response VARCHAR(5000), +fetch_method VARCHAR(8000), +fetch_request VARCHAR(8000), +fetch_response VARCHAR(8000), fetch_status BIGINT, fetch_timestamp BIGINT, -fetch_url VARCHAR(5000), -graphql_operationkind VARCHAR(5000), -graphql_operationname VARCHAR(5000), -graphql_response VARCHAR(5000), -graphql_variables VARCHAR(5000), -inputevent_label VARCHAR(5000), +fetch_url VARCHAR(8000), +graphql_operationkind VARCHAR(8000), +graphql_operationname VARCHAR(8000), +graphql_response VARCHAR(8000), +graphql_variables VARCHAR(8000), +inputevent_label VARCHAR(8000), inputevent_messageid BIGINT, inputevent_timestamp BIGINT, -inputevent_value VARCHAR(5000), +inputevent_value VARCHAR(8000), inputevent_valuemasked BOOLEAN, -jsexception_message VARCHAR(5000), -jsexception_name VARCHAR(5000), -jsexception_payload VARCHAR(5000), -jsexception_metadata VARCHAR(5000), +jsexception_message VARCHAR(8000), +jsexception_name VARCHAR(8000), +jsexception_payload VARCHAR(8000), +jsexception_metadata VARCHAR(8000), mouseclick_id BIGINT, mouseclick_hesitationtime BIGINT, -mouseclick_label VARCHAR(5000), -networkrequest_type VARCHAR(5000), -networkrequest_method VARCHAR(5000), -networkrequest_url VARCHAR(5000), -networkrequest_request VARCHAR(5000), -networkrequest_response VARCHAR(5000), +mouseclick_label VARCHAR(8000), +networkrequest_type VARCHAR(8000), +networkrequest_method VARCHAR(8000), +networkrequest_url VARCHAR(8000), +networkrequest_request VARCHAR(8000), +networkrequest_response VARCHAR(8000), networkrequest_status BIGINT, networkrequest_timestamp BIGINT, networkrequest_duration BIGINT, @@ -50,42 +50,42 @@ pageevent_loaded BOOLEAN, pageevent_loadeventend BIGINT, pageevent_loadeventstart BIGINT, pageevent_messageid BIGINT, -pageevent_referrer VARCHAR(5000), +pageevent_referrer VARCHAR(8000), pageevent_requeststart BIGINT, pageevent_responseend BIGINT, pageevent_responsestart BIGINT, pageevent_speedindex BIGINT, pageevent_timestamp BIGINT, -pageevent_url VARCHAR(5000), +pageevent_url VARCHAR(8000), sessionend_timestamp BIGINT, -sessionend_encryption_key VARCHAR(5000), +sessionend_encryption_key VARCHAR(8000), sessionstart_projectid BIGINT, -sessionstart_revid VARCHAR(5000), +sessionstart_revid VARCHAR(8000), sessionstart_timestamp BIGINT, -sessionstart_trackerversion VARCHAR(5000), -sessionstart_useragent VARCHAR(5000), -sessionstart_userbrowser VARCHAR(5000), -sessionstart_userbrowserversion VARCHAR(5000), -sessionstart_usercountry VARCHAR(5000), -sessionstart_userdevice VARCHAR(5000), +sessionstart_trackerversion VARCHAR(8000), +sessionstart_useragent VARCHAR(8000), +sessionstart_userbrowser VARCHAR(8000), +sessionstart_userbrowserversion VARCHAR(8000), +sessionstart_usercountry VARCHAR(8000), +sessionstart_userdevice VARCHAR(8000), sessionstart_userdeviceheapsize BIGINT, sessionstart_userdevicememorysize BIGINT, -sessionstart_userdevicetype VARCHAR(5000), -sessionstart_useros VARCHAR(5000), -sessionstart_userosversion VARCHAR(5000), -sessionstart_useruuid VARCHAR(5000), +sessionstart_userdevicetype VARCHAR(8000), +sessionstart_useros VARCHAR(8000), +sessionstart_userosversion VARCHAR(8000), +sessionstart_useruuid VARCHAR(8000), setpagelocation_navigationstart BIGINT, -setpagelocation_referrer VARCHAR(5000), -setpagelocation_url VARCHAR(5000), +setpagelocation_referrer VARCHAR(8000), +setpagelocation_url VARCHAR(8000), issueevent_message_id BIGINT, issueevent_timestamp BIGINT, -issueevent_type VARCHAR(5000), -issueevent_context_string VARCHAR(5000), -issueevent_context VARCHAR(5000), -issueevent_payload VARCHAR(5000), -issueevent_url VARCHAR(5000), -customissue_name VARCHAR(5000), -customissue_payload VARCHAR(5000), +issueevent_type VARCHAR(8000), +issueevent_context_string VARCHAR(8000), +issueevent_context VARCHAR(8000), +issueevent_payload VARCHAR(8000), +issueevent_url VARCHAR(8000), +customissue_name VARCHAR(8000), +customissue_payload VARCHAR(8000), received_at BIGINT, batch_order_number BIGINT ); diff --git a/ee/connectors/sql/redshift_sessions.sql b/ee/connectors/sql/redshift_sessions.sql index d7ad8a603..6f2355de3 100644 --- a/ee/connectors/sql/redshift_sessions.sql +++ b/ee/connectors/sql/redshift_sessions.sql @@ -2,24 +2,24 @@ CREATE TABLE IF NOT EXISTS connector_user_sessions ( -- SESSION METADATA sessionid bigint, - user_agent VARCHAR, - user_browser VARCHAR, - user_browser_version VARCHAR, - user_country VARCHAR, - user_device VARCHAR, + user_agent VARCHAR(8000), + user_browser VARCHAR(8000), + user_browser_version VARCHAR(8000), + user_country VARCHAR(8000), + user_device VARCHAR(8000), user_device_heap_size bigint, user_device_memory_size bigint, - user_device_type VARCHAR, - user_os VARCHAR, - user_os_version VARCHAR, - user_uuid VARCHAR, + user_device_type VARCHAR(8000), + user_os VARCHAR(8000), + user_os_version VARCHAR(8000), + user_uuid VARCHAR(8000), connection_effective_bandwidth bigint, -- Downlink - connection_type VARCHAR, --"bluetooth", "cellular", "ethernet", "none", "wifi", "wimax", "other", "unknown" - metadata_key VARCHAR, - metadata_value VARCHAR, - referrer VARCHAR, - user_anonymous_id VARCHAR, - user_id VARCHAR, + connection_type VARCHAR(8000), --"bluetooth", "cellular", "ethernet", "none", "wifi", "wimax", "other", "unknown" + metadata_key VARCHAR(8000), + metadata_value VARCHAR(8000), + referrer VARCHAR(8000), + user_anonymous_id VARCHAR(8000), + user_id VARCHAR(8000), -- TIME session_start_timestamp bigint, session_end_timestamp bigint, diff --git a/ee/connectors/utils/worker.py b/ee/connectors/utils/worker.py index 5692ce234..164cebff7 100644 --- a/ee/connectors/utils/worker.py +++ b/ee/connectors/utils/worker.py @@ -307,7 +307,7 @@ def fix_missing_redshift(): try: res = database_api.pdredshift.redshift_to_pandas(query.format(table=table, limit=limit)) except Exception as e: - logging.error(f'[ERROR] Error while executing query {repr(e)}') + # logging.error(f'[ERROR] Error while selecting query. {repr(e)}') database_api.close() return if res is None: @@ -321,11 +321,16 @@ def fix_missing_redshift(): # logging.info(f'[FILL INFO] {len(res)} length response') sessionids = list(map(lambda k: str(k), res['sessionid'])) asyncio.run(pg_client.init()) - with pg_client.PostgresClient() as conn: - conn.execute('SELECT session_id, user_id FROM sessions WHERE session_id IN ({session_id_list})'.format( - session_id_list=','.join(sessionids)) - ) - pg_res = conn.fetchall() + try: + with pg_client.PostgresClient() as conn: + conn.execute('SELECT session_id, user_id FROM sessions WHERE session_id IN ({session_id_list})'.format( + session_id_list=','.join(sessionids)) + ) + pg_res = conn.fetchall() + except Exception as e: + #logging.error(f'[ERROR] Error while selecting from pg: {repr(e)}') + asyncio.run(pg_client.terminate()) + return logging.info(f'response from pg, length {len(pg_res)}') df = pd.DataFrame(pg_res) df.fillna('NN', inplace=True) @@ -351,7 +356,7 @@ def fix_missing_redshift(): try: database_api.pdredshift.exec_commit(base_query) except Exception as e: - logging.error(f'[ERROR] Error while executing query {repr(e)}') + logging.error(f'[ERROR] Error while executing query. {repr(e)}') database_api.close() asyncio.run(pg_client.terminate()) return