openreplay/ee/connectors/db/writer.py

from decouple import config

DATABASE = config('CLOUD_SERVICE')

from db.api import DBConnection
from db.utils import get_df_from_batch, dtypes_sessions
from db.tables import *
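
# Import only the loader for the warehouse selected by CLOUD_SERVICE;
# any other value fails fast at import time.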
if DATABASE == 'redshift':
    from db.loaders.redshift_loader import transit_insert_to_redshift
    import pandas as pd
elif DATABASE == 'clickhouse':
    from db.loaders.clickhouse_loader import insert_to_clickhouse
elif DATABASE == 'pg':
    from db.loaders.postgres_loader import insert_to_postgres
elif DATABASE == 'bigquery':
    from db.loaders.bigquery_loader import insert_to_bigquery
    from bigquery_utils.create_table import create_tables_bigquery
elif DATABASE == 'snowflake':
    from db.loaders.snowflake_loader import insert_to_snowflake
else:
    raise Exception(f"{DATABASE}-database not supported")

# Create the tables at startup if they don't exist (opt-in via build_tables);
# on failure, point the operator at the bundled SQL scripts instead.
_build_tables = config('build_tables', default=False, cast=bool)
if _build_tables:
    try:
        db = DBConnection(DATABASE)
        if DATABASE == 'pg':
            create_tables_postgres(db)
        if DATABASE == 'clickhouse':
            create_tables_clickhouse(db)
        if DATABASE == 'snowflake':
            create_tables_snowflake(db)
        if DATABASE == 'bigquery':
            create_tables_bigquery()
        if DATABASE == 'redshift':
            create_tables_redshift(db)
        db.engine.dispose()
        db = None
    except Exception as e:
        print(repr(e))
        print("Please create the tables with scripts provided in " +
              f"'/sql/{DATABASE}_sessions.sql' and '/sql/{DATABASE}_events.sql'")
def insert_batch(db: DBConnection, batch, table, level='normal'):
    if len(batch) == 0:
        return
    df = get_df_from_batch(batch, level=level)
    if db.config == 'redshift':
        transit_insert_to_redshift(db=db, df=df, table=table)
        return
    if db.config == 'clickhouse':
        insert_to_clickhouse(db=db, df=df, table=table)
    if db.config == 'pg':
        insert_to_postgres(db=db, df=df, table=table)
    if db.config == 'bigquery':
        insert_to_bigquery(df=df, table=table)
    if db.config == 'snowflake':
        insert_to_snowflake(db=db, df=df, table=table)
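

# update_batch: rewrite previously inserted session rows (e.g. when a session
# ends or restarts) by building one UPDATE statement per row. dtypes_sessions
# drives the templating: string columns are quoted, numeric columns are not.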
def update_batch(db: DBConnection, batch, table):
    if len(batch) == 0:
        return
    df = get_df_from_batch(batch, level='sessions')
    base_query = f"UPDATE {table} SET"
    for column_name, column_type in dtypes_sessions.items():
        if column_name == 'sessionid':
            continue
        elif column_type == 'string':
            df[column_name] = df[column_name].fillna('NULL')
            base_query += f" {column_name} = " + "'{" + f"{column_name}" + "}',"
        else:
            df[column_name] = df[column_name].fillna(0)
            base_query += f" {column_name} = " + "{" + f"{column_name}" + "},"
    base_query = base_query[:-1] + " WHERE sessionid = {sessionid};"
    for i in range(len(df)):
        if db.config == 'redshift':
            params = dict(df.iloc[i])
            query = base_query.format(**params)
            try:
                db.pdredshift.exec_commit(query)
            except Exception as e:
                print('[ERROR] Error while executing query')
                print(repr(e))
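

# A minimal usage sketch (hypothetical: the table names and batch variables
# below are illustrative, not part of this module):
#
#     db = DBConnection(DATABASE)
#     insert_batch(db, session_batch, table='sessions', level='sessions')
#     insert_batch(db, event_batch, table='events')
#     update_batch(db, ended_session_batch, table='sessions')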