openreplay/ee/connectors/db/tables.py
MauricioGarciaS 28182b951e
Redshift connector (#1170)
* Updated dependencies for the Redshift connector; replaced the os module with python-decouple

* Updated service and images

* Updated the message protocol; added an exception for BatchMetadata when version is 0 (the old read method is applied)

* Fixed a load error from S3 to Redshift; null values for string columns are now empty strings ("")

* Added test file consumer_async.py: reads raw Kafka data every 3 minutes and sends a background task to upload it to the cloud

* Added a method to skip messages that are not inserted into the cloud

* Added logging to consumer_async; changed urls and issues in the sessions table from list to string

* Split messages between the sessions table and the events table

* Updated redshift tables

* Fixed a small issue in the redshift_sessions.sql query

* Updated Dockerfiles; cleaned consumer_async logs; updated/fixed tables; NaN values are now transformed to NULL for VARCHAR columns

* Added an error handler for dropped SQL connections

* chore(docker): Optimize docker builds

Signed-off-by: rjshrjndrn <rjshrjndrn@gmail.com>

* Variables renamed

* Adding compression libraries

* Set the default event count to 0 (instead of NULL) when the event did not occur

* Added support for tracking specific projects; added a PG handler to connect to the sessions table

* Added a method to update values through the DB connection for ended and restarted sessions

* Removing intelligent file copying

* chore(connector): Build file

Signed-off-by: rjshrjndrn <rjshrjndrn@gmail.com>

* Adding a connection pool for PG

* Renaming and optimizing

* Fixed an issue with missing session information

---------

Signed-off-by: rjshrjndrn <rjshrjndrn@gmail.com>
Co-authored-by: rjshrjndrn <rjshrjndrn@gmail.com>
2023-05-02 14:02:57 +02:00

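"""Create connector tables in ClickHouse, Postgres, Snowflake, or Redshift.

Each helper reads the matching DDL file from the connector's `sql` directory
and executes it over a live session obtained from the given database handler.
"""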

from pathlib import Path

from decouple import config

# The DDL files live in the connector's top-level `sql` directory (../sql from here).
base_path = Path(__file__).parent.parent

# EVENT_TYPE selects which schema variant to create: 'normal' or 'detailed'.
EVENT_TYPE = config('EVENT_TYPE', default='normal')
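
# The `db` argument of the helpers below is the connector's database handler,
# defined elsewhere in the connector. The only interface assumed here is that
# db.get_live_session() returns a context manager yielding a connection-like
# object that exposes execute(query).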

def create_tables_clickhouse(db):
    """Create the ClickHouse connector tables (and their buffer tables)."""
    if EVENT_TYPE == 'normal':
        with open(base_path / 'sql' / 'clickhouse_events.sql', 'r') as f:
            q = f.read()
        with db.get_live_session() as conn:
            conn.execute(q)
        print("`connector_user_events` table created successfully.")
        with open(base_path / 'sql' / 'clickhouse_events_buffer.sql', 'r') as f:
            q = f.read()
        with db.get_live_session() as conn:
            conn.execute(q)
        print("`connector_user_events_buffer` table created successfully.")
        with open(base_path / 'sql' / 'clickhouse_sessions.sql', 'r') as f:
            q = f.read()
        with db.get_live_session() as conn:
            conn.execute(q)
        print("`connector_sessions` table created successfully.")
        with open(base_path / 'sql' / 'clickhouse_sessions_buffer.sql', 'r') as f:
            q = f.read()
        with db.get_live_session() as conn:
            conn.execute(q)
        print("`connector_sessions_buffer` table created successfully.")
    elif EVENT_TYPE == 'detailed':
        with open(base_path / 'sql' / 'clickhouse_events_detailed.sql', 'r') as f:
            q = f.read()
        with db.get_live_session() as conn:
            conn.execute(q)
        print("`connector_user_events_detailed` table created successfully.")
        with open(base_path / 'sql' / 'clickhouse_events_detailed_buffer.sql', 'r') as f:
            q = f.read()
        with db.get_live_session() as conn:
            conn.execute(q)
        print("`connector_user_events_detailed_buffer` table created successfully.")

def create_tables_postgres(db):
    """Create the Postgres connector tables for the configured EVENT_TYPE."""
    if EVENT_TYPE == 'normal':
        with open(base_path / 'sql' / 'postgres_events.sql', 'r') as f:
            q = f.read()
        with db.get_live_session() as conn:
            conn.execute(q)
        print("`connector_user_events` table created successfully.")
        with open(base_path / 'sql' / 'postgres_sessions.sql', 'r') as f:
            q = f.read()
        with db.get_live_session() as conn:
            conn.execute(q)
        print("`connector_sessions` table created successfully.")
    elif EVENT_TYPE == 'detailed':
        with open(base_path / 'sql' / 'postgres_events_detailed.sql', 'r') as f:
            q = f.read()
        with db.get_live_session() as conn:
            conn.execute(q)
        print("`connector_user_events_detailed` table created successfully.")

def create_tables_snowflake(db):
    """Create the Snowflake connector tables for the configured EVENT_TYPE."""
    if EVENT_TYPE == 'normal':
        with open(base_path / 'sql' / 'snowflake_events.sql', 'r') as f:
            q = f.read()
        with db.get_live_session() as conn:
            conn.execute(q)
        print("`connector_user_events` table created successfully.")
        with open(base_path / 'sql' / 'snowflake_sessions.sql', 'r') as f:
            q = f.read()
        with db.get_live_session() as conn:
            conn.execute(q)
        print("`connector_sessions` table created successfully.")
    elif EVENT_TYPE == 'detailed':
        with open(base_path / 'sql' / 'snowflake_events_detailed.sql', 'r') as f:
            q = f.read()
        with db.get_live_session() as conn:
            conn.execute(q)
        print("`connector_user_events_detailed` table created successfully.")

def create_tables_redshift(db):
    """Create the Redshift connector tables for the configured EVENT_TYPE."""
    if EVENT_TYPE == 'normal':
        with open(base_path / 'sql' / 'redshift_events.sql', 'r') as f:
            q = f.read()
        with db.get_live_session() as conn:
            conn.execute(q)
        print("`connector_user_events` table created successfully.")
        with open(base_path / 'sql' / 'redshift_sessions.sql', 'r') as f:
            q = f.read()
        with db.get_live_session() as conn:
            conn.execute(q)
        print("`connector_sessions` table created successfully.")
    elif EVENT_TYPE == 'detailed':
        with open(base_path / 'sql' / 'redshift_events_detailed.sql', 'r') as f:
            q = f.read()
        with db.get_live_session() as conn:
            conn.execute(q)
        print("`connector_user_events_detailed` table created successfully.")