* Updated dependencies for the Redshift connector; replaced the os module with python-decouple for configuration
* Updated service and images
* Updated the message protocol; added a special case for BatchMetadata when version is 0 (the old read method is applied)
* Fixed a load error from S3 to Redshift: null values for string columns are now written as empty strings ("")
* Added test file consumer_async.py: reads raw Kafka every 3 minutes and sends a background task that uploads to the cloud
* Added a method to skip messages that were not inserted into the cloud
* Added logs to consumer_async. Changed the urls and issues columns in the sessions table from list to string
* Split messages between the sessions table and the events table
* Updated redshift tables
* Fixed a small issue in the redshift_sessions.sql query
* Updated Dockerfiles. Cleaned up consumer_async logs. Updated/fixed tables. NaN values are now written as NULL for VARCHAR columns (see the pandas sketch after this message)
* Added an error handler for dropped SQL connections
* chore(docker): Optimize docker builds
Signed-off-by: rjshrjndrn <rjshrjndrn@gmail.com>
* Variables renamed
* Adding compression libraries
* Set the default event count to 0 (instead of NULL) when the event did not occur
* Added support for tracking specific projects. Added a PG handler to connect to the sessions table
* Added a method to update values over the DB connection for ended and restarted sessions
* Removing intelligent file copying
* chore(connector): Build file
Signed-off-by: rjshrjndrn <rjshrjndrn@gmail.com>
* Adding a connection pool for PG (see the pool sketch after this message)
* Renaming and optimizing
* Fixed an issue with missing session information
---------
Signed-off-by: rjshrjndrn <rjshrjndrn@gmail.com>
Co-authored-by: rjshrjndrn <rjshrjndrn@gmail.com>
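Two illustrative sketches of the changes above follow. First, a minimal sketch of the NaN handling for VARCHAR columns, assuming batches are staged as pandas DataFrames before the S3 upload; the function name, the column list, and the NULL marker are assumptions, not the connector's actual API:

import pandas as pd

# Assumed marker: must match the NULL AS option of the Redshift COPY command.
NULL_MARKER = '\\N'

def normalize_varchar_columns(df, varchar_cols):
    """Replace NaN in VARCHAR columns so Redshift loads a real NULL."""
    for col in varchar_cols:
        # NaN would otherwise serialize as the literal string 'nan' in the CSV.
        df[col] = df[col].fillna(NULL_MARKER)
    return df

Second, a minimal sketch of the PG connection pool, using psycopg2's ThreadedConnectionPool; the variable names read via python-decouple are assumptions:

from decouple import config
from psycopg2 import pool

pg_pool = pool.ThreadedConnectionPool(
    minconn=1,
    maxconn=config('PG_POOL_SIZE', default=4, cast=int),  # assumed variable name
    host=config('pg_host'),
    port=config('pg_port', cast=int),
    dbname=config('pg_dbname'),
    user=config('pg_user'),
    password=config('pg_password'),
)

# Borrow a connection, use it, and always return it to the pool.
conn = pg_pool.getconn()
try:
    with conn.cursor() as cur:
        cur.execute('SELECT 1')
    conn.commit()
finally:
    pg_pool.putconn(conn)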
from pathlib import Path

from decouple import config

# Project root: this file lives one level below it.
base_path = Path(__file__).parent.parent

# 'normal' creates the base events/sessions tables; 'detailed' creates the
# *_detailed variants instead.
EVENT_TYPE = config('EVENT_TYPE', default='normal')
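# Note: python-decouple checks os.environ first and then falls back to a .env
# or settings.ini file, so an entry such as EVENT_TYPE=detailed in .env (an
# illustrative example) selects the *_detailed tables below.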

def _run_sql_file(db, filename, table_name):
    """Read a DDL script from the sql/ directory and run it on a live session."""
    with open(base_path / 'sql' / filename) as f:
        q = f.read()
    with db.get_live_session() as conn:
        conn.execute(q)
    print(f"`{table_name}` table created successfully.")


def create_tables_clickhouse(db):
    """Create the connector tables in ClickHouse for the configured EVENT_TYPE."""
    if EVENT_TYPE == 'normal':
        _run_sql_file(db, 'clickhouse_events.sql', 'connector_user_events')
        _run_sql_file(db, 'clickhouse_events_buffer.sql', 'connector_user_events_buffer')
        _run_sql_file(db, 'clickhouse_sessions.sql', 'connector_sessions')
        _run_sql_file(db, 'clickhouse_sessions_buffer.sql', 'connector_sessions_buffer')
    elif EVENT_TYPE == 'detailed':
        _run_sql_file(db, 'clickhouse_events_detailed.sql', 'connector_user_events_detailed')
        _run_sql_file(db, 'clickhouse_events_detailed_buffer.sql', 'connector_user_events_detailed_buffer')

def create_tables_postgres(db):
    """Create the connector tables in Postgres for the configured EVENT_TYPE."""
    if EVENT_TYPE == 'normal':
        _run_sql_file(db, 'postgres_events.sql', 'connector_user_events')
        _run_sql_file(db, 'postgres_sessions.sql', 'connector_sessions')
    elif EVENT_TYPE == 'detailed':
        _run_sql_file(db, 'postgres_events_detailed.sql', 'connector_user_events_detailed')

def create_tables_snowflake(db):
    """Create the connector tables in Snowflake for the configured EVENT_TYPE."""
    if EVENT_TYPE == 'normal':
        _run_sql_file(db, 'snowflake_events.sql', 'connector_user_events')
        _run_sql_file(db, 'snowflake_sessions.sql', 'connector_sessions')
    elif EVENT_TYPE == 'detailed':
        _run_sql_file(db, 'snowflake_events_detailed.sql', 'connector_user_events_detailed')

def create_tables_redshift(db):
    """Create the connector tables in Redshift for the configured EVENT_TYPE."""
    if EVENT_TYPE == 'normal':
        _run_sql_file(db, 'redshift_events.sql', 'connector_user_events')
        _run_sql_file(db, 'redshift_sessions.sql', 'connector_sessions')
    elif EVENT_TYPE == 'detailed':
        _run_sql_file(db, 'redshift_events_detailed.sql', 'connector_user_events_detailed')
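A minimal usage sketch, assuming `db` is any wrapper exposing get_live_session() as a context manager whose value has an execute() method, which is all the functions above rely on. The RedshiftSession class and the REDSHIFT_DSN variable below are hypothetical, for illustration only:

from contextlib import contextmanager

import psycopg2
from decouple import config

class RedshiftSession:  # hypothetical wrapper, not part of this module
    def __init__(self, dsn):
        self._dsn = dsn

    @contextmanager
    def get_live_session(self):
        # Redshift speaks the Postgres wire protocol, so psycopg2 works.
        conn = psycopg2.connect(self._dsn)
        try:
            with conn.cursor() as cur:
                yield cur  # cursor.execute(q) satisfies conn.execute(q) above
            conn.commit()
        finally:
            conn.close()

db = RedshiftSession(config('REDSHIFT_DSN'))  # hypothetical variable name
create_tables_redshift(db)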