fix(connectors): Updated sql to varchar 8000 (#1402)
* fix(connectors): Changed sql from varchar to varchar 8000 * fix(connectors): changed cropping to size 8000 for str in worker task
This commit is contained in:
parent
05571b6813
commit
ecbc42a03f
5 changed files with 98 additions and 93 deletions
|
|
@ -215,11 +215,11 @@ def get_df_from_batch(batch, level):
|
|||
try:
|
||||
if df[x].dtype == "string" or current_types[x] == "string":
|
||||
df[x] = df[x].fillna('NULL')
|
||||
df[x] = df[x].str.slice(0, 255)
|
||||
df[x] = df[x].str.slice(0, 8000)
|
||||
df[x] = df[x].str.replace("|", "")
|
||||
except TypeError as e:
|
||||
print(repr(e))
|
||||
if df[x].dtype == 'str':
|
||||
df[x] = df[x].str.slice(0, 255)
|
||||
df[x] = df[x].str.slice(0, 8000)
|
||||
df[x] = df[x].str.replace("|", "")
|
||||
return df
|
||||
|
|
|
|||
|
|
@ -1,31 +1,31 @@
|
|||
CREATE TABLE IF NOT EXISTS connector_events
|
||||
(
|
||||
sessionid BIGINT,
|
||||
consolelog_level VARCHAR(5000),
|
||||
consolelog_value VARCHAR(5000),
|
||||
customevent_name VARCHAR(5000),
|
||||
customevent_payload VARCHAR(5000),
|
||||
jsexception_message VARCHAR(5000),
|
||||
jsexception_name VARCHAR(5000),
|
||||
jsexception_payload VARCHAR(5000),
|
||||
jsexception_metadata VARCHAR(5000),
|
||||
networkrequest_type VARCHAR(5000),
|
||||
networkrequest_method VARCHAR(5000),
|
||||
networkrequest_url VARCHAR(5000),
|
||||
networkrequest_request VARCHAR(5000),
|
||||
networkrequest_response VARCHAR(5000),
|
||||
consolelog_level VARCHAR(8000),
|
||||
consolelog_value VARCHAR(8000),
|
||||
customevent_name VARCHAR(8000),
|
||||
customevent_payload VARCHAR(8000),
|
||||
jsexception_message VARCHAR(8000),
|
||||
jsexception_name VARCHAR(8000),
|
||||
jsexception_payload VARCHAR(8000),
|
||||
jsexception_metadata VARCHAR(8000),
|
||||
networkrequest_type VARCHAR(8000),
|
||||
networkrequest_method VARCHAR(8000),
|
||||
networkrequest_url VARCHAR(8000),
|
||||
networkrequest_request VARCHAR(8000),
|
||||
networkrequest_response VARCHAR(8000),
|
||||
networkrequest_status BIGINT,
|
||||
networkrequest_timestamp BIGINT,
|
||||
networkrequest_duration BIGINT,
|
||||
issueevent_message_id BIGINT,
|
||||
issueevent_timestamp BIGINT,
|
||||
issueevent_type VARCHAR(5000),
|
||||
issueevent_context_string VARCHAR(5000),
|
||||
issueevent_context VARCHAR(5000),
|
||||
issueevent_payload VARCHAR(5000),
|
||||
issueevent_url VARCHAR(5000),
|
||||
customissue_name VARCHAR(5000),
|
||||
customissue_payload VARCHAR(5000),
|
||||
issueevent_type VARCHAR(8000),
|
||||
issueevent_context_string VARCHAR(8000),
|
||||
issueevent_context VARCHAR(8000),
|
||||
issueevent_payload VARCHAR(8000),
|
||||
issueevent_url VARCHAR(8000),
|
||||
customissue_name VARCHAR(8000),
|
||||
customissue_payload VARCHAR(8000),
|
||||
received_at BIGINT,
|
||||
batch_order_number BIGINT
|
||||
);
|
||||
|
|
|
|||
|
|
@ -2,43 +2,43 @@ CREATE TABLE IF NOT EXISTS connector_events_detailed
|
|||
(
|
||||
sessionid BIGINT,
|
||||
clickevent_hesitationtime BIGINT,
|
||||
clickevent_label VARCHAR(5000),
|
||||
clickevent_label VARCHAR(8000),
|
||||
clickevent_messageid BIGINT,
|
||||
clickevent_timestamp BIGINT,
|
||||
connectioninformation_downlink BIGINT,
|
||||
connectioninformation_type VARCHAR(5000),
|
||||
consolelog_level VARCHAR(5000),
|
||||
consolelog_value VARCHAR(5000),
|
||||
customevent_name VARCHAR(5000),
|
||||
customevent_payload VARCHAR(5000),
|
||||
connectioninformation_type VARCHAR(8000),
|
||||
consolelog_level VARCHAR(8000),
|
||||
consolelog_value VARCHAR(8000),
|
||||
customevent_name VARCHAR(8000),
|
||||
customevent_payload VARCHAR(8000),
|
||||
fetch_duration BIGINT,
|
||||
fetch_method VARCHAR(5000),
|
||||
fetch_request VARCHAR(5000),
|
||||
fetch_response VARCHAR(5000),
|
||||
fetch_method VARCHAR(8000),
|
||||
fetch_request VARCHAR(8000),
|
||||
fetch_response VARCHAR(8000),
|
||||
fetch_status BIGINT,
|
||||
fetch_timestamp BIGINT,
|
||||
fetch_url VARCHAR(5000),
|
||||
graphql_operationkind VARCHAR(5000),
|
||||
graphql_operationname VARCHAR(5000),
|
||||
graphql_response VARCHAR(5000),
|
||||
graphql_variables VARCHAR(5000),
|
||||
inputevent_label VARCHAR(5000),
|
||||
fetch_url VARCHAR(8000),
|
||||
graphql_operationkind VARCHAR(8000),
|
||||
graphql_operationname VARCHAR(8000),
|
||||
graphql_response VARCHAR(8000),
|
||||
graphql_variables VARCHAR(8000),
|
||||
inputevent_label VARCHAR(8000),
|
||||
inputevent_messageid BIGINT,
|
||||
inputevent_timestamp BIGINT,
|
||||
inputevent_value VARCHAR(5000),
|
||||
inputevent_value VARCHAR(8000),
|
||||
inputevent_valuemasked BOOLEAN,
|
||||
jsexception_message VARCHAR(5000),
|
||||
jsexception_name VARCHAR(5000),
|
||||
jsexception_payload VARCHAR(5000),
|
||||
jsexception_metadata VARCHAR(5000),
|
||||
jsexception_message VARCHAR(8000),
|
||||
jsexception_name VARCHAR(8000),
|
||||
jsexception_payload VARCHAR(8000),
|
||||
jsexception_metadata VARCHAR(8000),
|
||||
mouseclick_id BIGINT,
|
||||
mouseclick_hesitationtime BIGINT,
|
||||
mouseclick_label VARCHAR(5000),
|
||||
networkrequest_type VARCHAR(5000),
|
||||
networkrequest_method VARCHAR(5000),
|
||||
networkrequest_url VARCHAR(5000),
|
||||
networkrequest_request VARCHAR(5000),
|
||||
networkrequest_response VARCHAR(5000),
|
||||
mouseclick_label VARCHAR(8000),
|
||||
networkrequest_type VARCHAR(8000),
|
||||
networkrequest_method VARCHAR(8000),
|
||||
networkrequest_url VARCHAR(8000),
|
||||
networkrequest_request VARCHAR(8000),
|
||||
networkrequest_response VARCHAR(8000),
|
||||
networkrequest_status BIGINT,
|
||||
networkrequest_timestamp BIGINT,
|
||||
networkrequest_duration BIGINT,
|
||||
|
|
@ -50,42 +50,42 @@ pageevent_loaded BOOLEAN,
|
|||
pageevent_loadeventend BIGINT,
|
||||
pageevent_loadeventstart BIGINT,
|
||||
pageevent_messageid BIGINT,
|
||||
pageevent_referrer VARCHAR(5000),
|
||||
pageevent_referrer VARCHAR(8000),
|
||||
pageevent_requeststart BIGINT,
|
||||
pageevent_responseend BIGINT,
|
||||
pageevent_responsestart BIGINT,
|
||||
pageevent_speedindex BIGINT,
|
||||
pageevent_timestamp BIGINT,
|
||||
pageevent_url VARCHAR(5000),
|
||||
pageevent_url VARCHAR(8000),
|
||||
sessionend_timestamp BIGINT,
|
||||
sessionend_encryption_key VARCHAR(5000),
|
||||
sessionend_encryption_key VARCHAR(8000),
|
||||
sessionstart_projectid BIGINT,
|
||||
sessionstart_revid VARCHAR(5000),
|
||||
sessionstart_revid VARCHAR(8000),
|
||||
sessionstart_timestamp BIGINT,
|
||||
sessionstart_trackerversion VARCHAR(5000),
|
||||
sessionstart_useragent VARCHAR(5000),
|
||||
sessionstart_userbrowser VARCHAR(5000),
|
||||
sessionstart_userbrowserversion VARCHAR(5000),
|
||||
sessionstart_usercountry VARCHAR(5000),
|
||||
sessionstart_userdevice VARCHAR(5000),
|
||||
sessionstart_trackerversion VARCHAR(8000),
|
||||
sessionstart_useragent VARCHAR(8000),
|
||||
sessionstart_userbrowser VARCHAR(8000),
|
||||
sessionstart_userbrowserversion VARCHAR(8000),
|
||||
sessionstart_usercountry VARCHAR(8000),
|
||||
sessionstart_userdevice VARCHAR(8000),
|
||||
sessionstart_userdeviceheapsize BIGINT,
|
||||
sessionstart_userdevicememorysize BIGINT,
|
||||
sessionstart_userdevicetype VARCHAR(5000),
|
||||
sessionstart_useros VARCHAR(5000),
|
||||
sessionstart_userosversion VARCHAR(5000),
|
||||
sessionstart_useruuid VARCHAR(5000),
|
||||
sessionstart_userdevicetype VARCHAR(8000),
|
||||
sessionstart_useros VARCHAR(8000),
|
||||
sessionstart_userosversion VARCHAR(8000),
|
||||
sessionstart_useruuid VARCHAR(8000),
|
||||
setpagelocation_navigationstart BIGINT,
|
||||
setpagelocation_referrer VARCHAR(5000),
|
||||
setpagelocation_url VARCHAR(5000),
|
||||
setpagelocation_referrer VARCHAR(8000),
|
||||
setpagelocation_url VARCHAR(8000),
|
||||
issueevent_message_id BIGINT,
|
||||
issueevent_timestamp BIGINT,
|
||||
issueevent_type VARCHAR(5000),
|
||||
issueevent_context_string VARCHAR(5000),
|
||||
issueevent_context VARCHAR(5000),
|
||||
issueevent_payload VARCHAR(5000),
|
||||
issueevent_url VARCHAR(5000),
|
||||
customissue_name VARCHAR(5000),
|
||||
customissue_payload VARCHAR(5000),
|
||||
issueevent_type VARCHAR(8000),
|
||||
issueevent_context_string VARCHAR(8000),
|
||||
issueevent_context VARCHAR(8000),
|
||||
issueevent_payload VARCHAR(8000),
|
||||
issueevent_url VARCHAR(8000),
|
||||
customissue_name VARCHAR(8000),
|
||||
customissue_payload VARCHAR(8000),
|
||||
received_at BIGINT,
|
||||
batch_order_number BIGINT
|
||||
);
|
||||
|
|
|
|||
|
|
@ -2,24 +2,24 @@ CREATE TABLE IF NOT EXISTS connector_user_sessions
|
|||
(
|
||||
-- SESSION METADATA
|
||||
sessionid bigint,
|
||||
user_agent VARCHAR,
|
||||
user_browser VARCHAR,
|
||||
user_browser_version VARCHAR,
|
||||
user_country VARCHAR,
|
||||
user_device VARCHAR,
|
||||
user_agent VARCHAR(8000),
|
||||
user_browser VARCHAR(8000),
|
||||
user_browser_version VARCHAR(8000),
|
||||
user_country VARCHAR(8000),
|
||||
user_device VARCHAR(8000),
|
||||
user_device_heap_size bigint,
|
||||
user_device_memory_size bigint,
|
||||
user_device_type VARCHAR,
|
||||
user_os VARCHAR,
|
||||
user_os_version VARCHAR,
|
||||
user_uuid VARCHAR,
|
||||
user_device_type VARCHAR(8000),
|
||||
user_os VARCHAR(8000),
|
||||
user_os_version VARCHAR(8000),
|
||||
user_uuid VARCHAR(8000),
|
||||
connection_effective_bandwidth bigint, -- Downlink
|
||||
connection_type VARCHAR, --"bluetooth", "cellular", "ethernet", "none", "wifi", "wimax", "other", "unknown"
|
||||
metadata_key VARCHAR,
|
||||
metadata_value VARCHAR,
|
||||
referrer VARCHAR,
|
||||
user_anonymous_id VARCHAR,
|
||||
user_id VARCHAR,
|
||||
connection_type VARCHAR(8000), --"bluetooth", "cellular", "ethernet", "none", "wifi", "wimax", "other", "unknown"
|
||||
metadata_key VARCHAR(8000),
|
||||
metadata_value VARCHAR(8000),
|
||||
referrer VARCHAR(8000),
|
||||
user_anonymous_id VARCHAR(8000),
|
||||
user_id VARCHAR(8000),
|
||||
-- TIME
|
||||
session_start_timestamp bigint,
|
||||
session_end_timestamp bigint,
|
||||
|
|
|
|||
|
|
@ -307,7 +307,7 @@ def fix_missing_redshift():
|
|||
try:
|
||||
res = database_api.pdredshift.redshift_to_pandas(query.format(table=table, limit=limit))
|
||||
except Exception as e:
|
||||
logging.error(f'[ERROR] Error while executing query {repr(e)}')
|
||||
# logging.error(f'[ERROR] Error while selecting query. {repr(e)}')
|
||||
database_api.close()
|
||||
return
|
||||
if res is None:
|
||||
|
|
@ -321,11 +321,16 @@ def fix_missing_redshift():
|
|||
# logging.info(f'[FILL INFO] {len(res)} length response')
|
||||
sessionids = list(map(lambda k: str(k), res['sessionid']))
|
||||
asyncio.run(pg_client.init())
|
||||
with pg_client.PostgresClient() as conn:
|
||||
conn.execute('SELECT session_id, user_id FROM sessions WHERE session_id IN ({session_id_list})'.format(
|
||||
session_id_list=','.join(sessionids))
|
||||
)
|
||||
pg_res = conn.fetchall()
|
||||
try:
|
||||
with pg_client.PostgresClient() as conn:
|
||||
conn.execute('SELECT session_id, user_id FROM sessions WHERE session_id IN ({session_id_list})'.format(
|
||||
session_id_list=','.join(sessionids))
|
||||
)
|
||||
pg_res = conn.fetchall()
|
||||
except Exception as e:
|
||||
#logging.error(f'[ERROR] Error while selecting from pg: {repr(e)}')
|
||||
asyncio.run(pg_client.terminate())
|
||||
return
|
||||
logging.info(f'response from pg, length {len(pg_res)}')
|
||||
df = pd.DataFrame(pg_res)
|
||||
df.fillna('NN', inplace=True)
|
||||
|
|
@ -351,7 +356,7 @@ def fix_missing_redshift():
|
|||
try:
|
||||
database_api.pdredshift.exec_commit(base_query)
|
||||
except Exception as e:
|
||||
logging.error(f'[ERROR] Error while executing query {repr(e)}')
|
||||
logging.error(f'[ERROR] Error while executing query. {repr(e)}')
|
||||
database_api.close()
|
||||
asyncio.run(pg_client.terminate())
|
||||
return
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue