connectors

This commit is contained in:
ourvakan 2021-05-18 17:10:08 +03:00
parent f2a035a793
commit d3db0c0734
33 changed files with 4243 additions and 0 deletions

View file

@ -0,0 +1,357 @@
import os
from google.cloud import bigquery
from db.loaders.bigquery_loader import creds_file
def create_tables_bigquery():
    """Create the sessions table and the events table in BigQuery.

    The fully-qualified table ids are assembled from the ``project_id``,
    ``dataset``, ``sessions_table`` and ``events_table_name`` environment
    variables. A confirmation line is printed after each table is created.
    """
    env = os.environ
    dataset_path = f"{env['project_id']}.{env['dataset']}"

    sessions_name = env['sessions_table']
    create_sessions_table(creds_file=creds_file,
                          table_id=f"{dataset_path}.{sessions_name}")
    print(f"`{sessions_name}` table created succesfully.")

    # Read only after the sessions table exists, as in the original flow.
    events_name = env['events_table_name']
    create_events_table(creds_file=creds_file,
                        table_id=f"{dataset_path}.{events_name}")
    print(f"`{events_name}` table created succesfully.")
def create_table(creds_file, table_id, schema):
    """Create a BigQuery table and print its fully-qualified name.

    :param creds_file: Service-account JSON key file used to build the client.
    :param table_id: Table identifier handed to ``bigquery.Table``.
    :param schema: List of ``bigquery.SchemaField`` describing the columns.
    """
    bq_client = bigquery.Client.from_service_account_json(creds_file)
    table_definition = bigquery.Table(table_id, schema=schema)
    # create_table() performs the API request and returns the created table.
    created = bq_client.create_table(table_definition)
    print(f"Created table {created.project}.{created.dataset_id}.{created.table_id}")
def create_sessions_table(creds_file, table_id):
    """Create the sessions table in BigQuery.

    :param creds_file: Service-account JSON key file, forwarded to ``create_table``.
    :param table_id: Identifier of the table to create.
    """
    # Every column except ``sessionid``, as (name, BigQuery type), in table order.
    nullable_columns = (
        ("user_agent", "STRING"),
        ("user_browser", "STRING"),
        ("user_browser_version", "STRING"),
        ("user_country", "STRING"),
        ("user_device", "STRING"),
        ("user_device_heap_size", "INT64"),
        ("user_device_memory_size", "INT64"),
        ("user_device_type", "STRING"),
        ("user_os", "STRING"),
        ("user_os_version", "STRING"),
        ("user_uuid", "STRING"),
        ("connection_effective_bandwidth", "INT64"),
        ("connection_type", "STRING"),
        ("metadata_key", "STRING"),
        ("metadata_value", "STRING"),
        ("referrer", "STRING"),
        ("user_anonymous_id", "STRING"),
        ("user_id", "STRING"),
        ("session_start_timestamp", "INT64"),
        ("session_end_timestamp", "INT64"),
        ("session_duration", "INT64"),
        ("first_contentful_paint", "INT64"),
        ("speed_index", "INT64"),
        ("visually_complete", "INT64"),
        ("timing_time_to_interactive", "INT64"),
        ("avg_cpu", "INT64"),
        ("avg_fps", "INT64"),
        ("max_cpu", "INT64"),
        ("max_fps", "INT64"),
        ("max_total_js_heap_size", "INT64"),
        ("max_used_js_heap_size", "INT64"),
        ("js_exceptions_count", "INT64"),
        ("long_tasks_total_duration", "INT64"),
        ("long_tasks_max_duration", "INT64"),
        ("long_tasks_count", "INT64"),
        ("inputs_count", "INT64"),
        ("clicks_count", "INT64"),
        ("issues_count", "INT64"),
        ("issues", "STRING"),
        ("urls_count", "INT64"),
        ("urls", "STRING"),
    )
    # The session id comes first and is the only column declared REQUIRED.
    schema = [bigquery.SchemaField("sessionid", "INT64", mode="REQUIRED")]
    for column_name, column_type in nullable_columns:
        schema.append(bigquery.SchemaField(column_name, column_type))
    create_table(creds_file, table_id, schema)
def create_events_table(creds_file, table_id):
    """Create the events table in BigQuery.

    No column is declared with an explicit mode.

    :param creds_file: Service-account JSON key file, forwarded to ``create_table``.
    :param table_id: Identifier of the table to create.
    """
    # (name, BigQuery type) pairs in table order.
    columns = (
        ("sessionid", "INT64"),
        ("connectioninformation_downlink", "INT64"),
        ("connectioninformation_type", "STRING"),
        ("consolelog_level", "STRING"),
        ("consolelog_value", "STRING"),
        ("customevent_messageid", "INT64"),
        ("customevent_name", "STRING"),
        ("customevent_payload", "STRING"),
        ("customevent_timestamp", "INT64"),
        ("errorevent_message", "STRING"),
        ("errorevent_messageid", "INT64"),
        ("errorevent_name", "STRING"),
        ("errorevent_payload", "STRING"),
        ("errorevent_source", "STRING"),
        ("errorevent_timestamp", "INT64"),
        ("jsexception_message", "STRING"),
        ("jsexception_name", "STRING"),
        ("jsexception_payload", "STRING"),
        ("metadata_key", "STRING"),
        ("metadata_value", "STRING"),
        ("mouseclick_id", "INT64"),
        ("mouseclick_hesitationtime", "INT64"),
        ("mouseclick_label", "STRING"),
        ("pageevent_firstcontentfulpaint", "INT64"),
        ("pageevent_firstpaint", "INT64"),
        ("pageevent_messageid", "INT64"),
        ("pageevent_referrer", "STRING"),
        ("pageevent_speedindex", "INT64"),
        ("pageevent_timestamp", "INT64"),
        ("pageevent_url", "STRING"),
        ("pagerendertiming_timetointeractive", "INT64"),
        ("pagerendertiming_visuallycomplete", "INT64"),
        ("rawcustomevent_name", "STRING"),
        ("rawcustomevent_payload", "STRING"),
        ("setviewportsize_height", "INT64"),
        ("setviewportsize_width", "INT64"),
        ("timestamp_timestamp", "INT64"),
        ("user_anonymous_id", "STRING"),
        ("user_id", "STRING"),
        ("issueevent_messageid", "INT64"),
        ("issueevent_timestamp", "INT64"),
        ("issueevent_type", "STRING"),
        ("issueevent_contextstring", "STRING"),
        ("issueevent_context", "STRING"),
        ("issueevent_payload", "STRING"),
        ("customissue_name", "STRING"),
        ("customissue_payload", "STRING"),
        ("received_at", "INT64"),
        ("batch_order_number", "INT64"),
    )
    schema = [
        bigquery.SchemaField(column_name, column_type)
        for column_name, column_type in columns
    ]
    create_table(creds_file, table_id, schema)
def create_table_negatives(creds_file, table_id):
    """Create the wide, per-message events table in BigQuery.

    ``sessionid``, ``received_at`` and ``batch_order_number`` are declared
    REQUIRED; every other column uses the default mode. Client creation, the
    API request and the confirmation print are delegated to ``create_table``
    so this function no longer duplicates that logic.

    :param creds_file: Service-account JSON key file, forwarded to ``create_table``.
    :param table_id: Identifier of the table to create.
    """
    # NOTE(review): a few declared types look inconsistent with the column
    # names -- e.g. setviewportscroll_x/_y are BOOL while setnodescroll_x/_y
    # are INT64, and setcssdata_data, setinputtarget_label,
    # setinputvalue_value, setnodeattribute_name/_value and setnodedata_data
    # are INT64. They are kept exactly as they were; confirm against the
    # data that is actually loaded before changing them.
    required_columns = {"sessionid", "received_at", "batch_order_number"}
    # (name, BigQuery type) pairs in table order.
    columns = (
        ("sessionid", "INT64"),
        ("clickevent_hesitationtime", "INT64"),
        ("clickevent_label", "STRING"),
        ("clickevent_messageid", "INT64"),
        ("clickevent_timestamp", "INT64"),
        ("connectioninformation_downlink", "INT64"),
        ("connectioninformation_type", "STRING"),
        ("consolelog_level", "STRING"),
        ("consolelog_value", "STRING"),
        ("cpuissue_duration", "INT64"),
        ("cpuissue_rate", "INT64"),
        ("cpuissue_timestamp", "INT64"),
        ("createdocument", "BOOL"),
        ("createelementnode_id", "INT64"),
        ("createelementnode_parentid", "INT64"),
        ("cssdeleterule_index", "INT64"),
        ("cssdeleterule_stylesheetid", "INT64"),
        ("cssinsertrule_index", "INT64"),
        ("cssinsertrule_rule", "STRING"),
        ("cssinsertrule_stylesheetid", "INT64"),
        ("customevent_messageid", "INT64"),
        ("customevent_name", "STRING"),
        ("customevent_payload", "STRING"),
        ("customevent_timestamp", "INT64"),
        ("domdrop_timestamp", "INT64"),
        ("errorevent_message", "STRING"),
        ("errorevent_messageid", "INT64"),
        ("errorevent_name", "STRING"),
        ("errorevent_payload", "STRING"),
        ("errorevent_source", "STRING"),
        ("errorevent_timestamp", "INT64"),
        ("fetch_duration", "INT64"),
        ("fetch_method", "STRING"),
        ("fetch_request", "STRING"),
        ("fetch_response", "STRING"),
        ("fetch_status", "INT64"),
        ("fetch_timestamp", "INT64"),
        ("fetch_url", "STRING"),
        ("graphql_operationkind", "STRING"),
        ("graphql_operationname", "STRING"),
        ("graphql_response", "STRING"),
        ("graphql_variables", "STRING"),
        ("graphqlevent_messageid", "INT64"),
        ("graphqlevent_name", "STRING"),
        ("graphqlevent_timestamp", "INT64"),
        ("inputevent_label", "STRING"),
        ("inputevent_messageid", "INT64"),
        ("inputevent_timestamp", "INT64"),
        ("inputevent_value", "STRING"),
        ("inputevent_valuemasked", "BOOL"),
        ("is_asayer_event", "BOOL"),
        ("jsexception_message", "STRING"),
        ("jsexception_name", "STRING"),
        ("jsexception_payload", "STRING"),
        ("longtasks_timestamp", "INT64"),
        ("longtasks_duration", "INT64"),
        ("longtasks_containerid", "STRING"),
        ("longtasks_containersrc", "STRING"),
        ("memoryissue_duration", "INT64"),
        ("memoryissue_rate", "INT64"),
        ("memoryissue_timestamp", "INT64"),
        ("metadata_key", "STRING"),
        ("metadata_value", "STRING"),
        ("mobx_payload", "STRING"),
        ("mobx_type", "STRING"),
        ("mouseclick_id", "INT64"),
        ("mouseclick_hesitationtime", "INT64"),
        ("mouseclick_label", "STRING"),
        ("mousemove_x", "INT64"),
        ("mousemove_y", "INT64"),
        ("movenode_id", "INT64"),
        ("movenode_index", "INT64"),
        ("movenode_parentid", "INT64"),
        ("ngrx_action", "STRING"),
        ("ngrx_duration", "INT64"),
        ("ngrx_state", "STRING"),
        ("otable_key", "STRING"),
        ("otable_value", "STRING"),
        ("pageevent_domcontentloadedeventend", "INT64"),
        ("pageevent_domcontentloadedeventstart", "INT64"),
        ("pageevent_firstcontentfulpaint", "INT64"),
        ("pageevent_firstpaint", "INT64"),
        ("pageevent_loaded", "BOOL"),
        ("pageevent_loadeventend", "INT64"),
        ("pageevent_loadeventstart", "INT64"),
        ("pageevent_messageid", "INT64"),
        ("pageevent_referrer", "STRING"),
        ("pageevent_requeststart", "INT64"),
        ("pageevent_responseend", "INT64"),
        ("pageevent_responsestart", "INT64"),
        ("pageevent_speedindex", "INT64"),
        ("pageevent_timestamp", "INT64"),
        ("pageevent_url", "STRING"),
        ("pageloadtiming_domcontentloadedeventend", "INT64"),
        ("pageloadtiming_domcontentloadedeventstart", "INT64"),
        ("pageloadtiming_firstcontentfulpaint", "INT64"),
        ("pageloadtiming_firstpaint", "INT64"),
        ("pageloadtiming_loadeventend", "INT64"),
        ("pageloadtiming_loadeventstart", "INT64"),
        ("pageloadtiming_requeststart", "INT64"),
        ("pageloadtiming_responseend", "INT64"),
        ("pageloadtiming_responsestart", "INT64"),
        ("pagerendertiming_speedindex", "INT64"),
        ("pagerendertiming_timetointeractive", "INT64"),
        ("pagerendertiming_visuallycomplete", "INT64"),
        ("performancetrack_frames", "INT64"),
        ("performancetrack_ticks", "INT64"),
        ("performancetrack_totaljsheapsize", "INT64"),
        ("performancetrack_usedjsheapsize", "INT64"),
        ("performancetrackaggr_avgcpu", "INT64"),
        ("performancetrackaggr_avgfps", "INT64"),
        ("performancetrackaggr_avgtotaljsheapsize", "INT64"),
        ("performancetrackaggr_avgusedjsheapsize", "INT64"),
        ("performancetrackaggr_maxcpu", "INT64"),
        ("performancetrackaggr_maxfps", "INT64"),
        ("performancetrackaggr_maxtotaljsheapsize", "INT64"),
        ("performancetrackaggr_maxusedjsheapsize", "INT64"),
        ("performancetrackaggr_mincpu", "INT64"),
        ("performancetrackaggr_minfps", "INT64"),
        ("performancetrackaggr_mintotaljsheapsize", "INT64"),
        ("performancetrackaggr_minusedjsheapsize", "INT64"),
        ("performancetrackaggr_timestampend", "INT64"),
        ("performancetrackaggr_timestampstart", "INT64"),
        ("profiler_args", "STRING"),
        ("profiler_duration", "INT64"),
        ("profiler_name", "STRING"),
        ("profiler_result", "STRING"),
        ("rawcustomevent_name", "STRING"),
        ("rawcustomevent_payload", "STRING"),
        ("rawerrorevent_message", "STRING"),
        ("rawerrorevent_name", "STRING"),
        ("rawerrorevent_payload", "STRING"),
        ("rawerrorevent_source", "STRING"),
        ("rawerrorevent_timestamp", "INT64"),
        ("redux_action", "STRING"),
        ("redux_duration", "INT64"),
        ("redux_state", "STRING"),
        ("removenode_id", "INT64"),
        ("removenodeattribute_id", "INT64"),
        ("removenodeattribute_name", "STRING"),
        ("resourceevent_decodedbodysize", "INT64"),
        ("resourceevent_duration", "INT64"),
        ("resourceevent_encodedbodysize", "INT64"),
        ("resourceevent_headersize", "INT64"),
        ("resourceevent_messageid", "INT64"),
        ("resourceevent_method", "STRING"),
        ("resourceevent_status", "INT64"),
        ("resourceevent_success", "BOOL"),
        ("resourceevent_timestamp", "INT64"),
        ("resourceevent_ttfb", "INT64"),
        ("resourceevent_type", "STRING"),
        ("resourceevent_url", "STRING"),
        ("resourcetiming_decodedbodysize", "INT64"),
        ("resourcetiming_duration", "INT64"),
        ("resourcetiming_encodedbodysize", "INT64"),
        ("resourcetiming_headersize", "INT64"),
        ("resourcetiming_initiator", "STRING"),
        ("resourcetiming_timestamp", "INT64"),
        ("resourcetiming_ttfb", "INT64"),
        ("resourcetiming_url", "STRING"),
        ("sessiondisconnect", "BOOL"),
        ("sessiondisconnect_timestamp", "INT64"),
        ("sessionend", "BOOL"),
        ("sessionend_timestamp", "INT64"),
        ("sessionstart_projectid", "INT64"),
        ("sessionstart_revid", "STRING"),
        ("sessionstart_timestamp", "INT64"),
        ("sessionstart_trackerversion", "STRING"),
        ("sessionstart_useragent", "STRING"),
        ("sessionstart_userbrowser", "STRING"),
        ("sessionstart_userbrowserversion", "STRING"),
        ("sessionstart_usercountry", "STRING"),
        ("sessionstart_userdevice", "STRING"),
        ("sessionstart_userdeviceheapsize", "INT64"),
        ("sessionstart_userdevicememorysize", "INT64"),
        ("sessionstart_userdevicetype", "STRING"),
        ("sessionstart_useros", "STRING"),
        ("sessionstart_userosversion", "STRING"),
        ("sessionstart_useruuid", "STRING"),
        ("setcssdata_data", "INT64"),
        ("setcssdata_id", "INT64"),
        ("setinputchecked_checked", "INT64"),
        ("setinputchecked_id", "INT64"),
        ("setinputtarget_id", "INT64"),
        ("setinputtarget_label", "INT64"),
        ("setinputvalue_id", "INT64"),
        ("setinputvalue_mask", "INT64"),
        ("setinputvalue_value", "INT64"),
        ("setnodeattribute_id", "INT64"),
        ("setnodeattribute_name", "INT64"),
        ("setnodeattribute_value", "INT64"),
        ("setnodedata_data", "INT64"),
        ("setnodedata_id", "INT64"),
        ("setnodescroll_id", "INT64"),
        ("setnodescroll_x", "INT64"),
        ("setnodescroll_y", "INT64"),
        ("setpagelocation_navigationstart", "INT64"),
        ("setpagelocation_referrer", "STRING"),
        ("setpagelocation_url", "STRING"),
        ("setpagevisibility_hidden", "BOOL"),
        ("setviewportscroll_x", "BOOL"),
        ("setviewportscroll_y", "BOOL"),
        ("setviewportsize_height", "INT64"),
        ("setviewportsize_width", "INT64"),
        ("stateaction_type", "STRING"),
        ("stateactionevent_messageid", "INT64"),
        ("stateactionevent_timestamp", "INT64"),
        ("stateactionevent_type", "STRING"),
        ("timestamp_timestamp", "INT64"),
        ("useranonymousid_id", "STRING"),
        ("userid_id", "STRING"),
        ("vuex_mutation", "STRING"),
        ("vuex_state", "STRING"),
        ("received_at", "INT64"),
        ("batch_order_number", "INT64"),
    )
    schema = []
    for column_name, column_type in columns:
        if column_name in required_columns:
            field = bigquery.SchemaField(column_name, column_type, mode="REQUIRED")
        else:
            field = bigquery.SchemaField(column_name, column_type)
        schema.append(field)
    # Same client creation, API request and "Created table ..." print as before.
    create_table(creds_file, table_id, schema)

129
ee/connectors/db/api.py Normal file
View file

@ -0,0 +1,129 @@
from sqlalchemy import create_engine
from sqlalchemy import MetaData
from sqlalchemy.orm import sessionmaker, session
from contextlib import contextmanager
import logging
import os
from pathlib import Path
# Name of the selected database backend, read once at import time
# (a missing DATABASE_NAME variable raises KeyError here).
DATABASE = os.environ['DATABASE_NAME']
if DATABASE == 'redshift':
    # pandas_redshift is imported only when the redshift backend is selected,
    # so the name `pr` does not exist for any other DATABASE_NAME value.
    import pandas_redshift as pr
# Directory two levels above this file.
base_path = Path(__file__).parent.parent
from db.models import Base
# NOTE(review): the logger is named after the file path (__file__), not __name__.
logger = logging.getLogger(__file__)
def get_class_by_tablename(tablename):
    """Return class reference mapped to table.

    Raise an exception if class not found.

    :param tablename: String with name of table.
    :return: Class reference.
    :raises AttributeError: If no model declares ``tablename``.
    """
    # ``_decl_class_registry`` is a private attribute that older SQLAlchemy
    # releases attach to the declarative base. Use it when it exists so the
    # lookup is unchanged there; otherwise fall back to the public
    # ``registry.mappers`` collection offered by SQLAlchemy 1.4+.
    legacy_registry = getattr(Base, '_decl_class_registry', None)
    if legacy_registry is not None:
        candidates = legacy_registry.values()
    else:
        candidates = [mapper.class_ for mapper in Base.registry.mappers]

    for c in candidates:
        # The legacy registry also holds non-model entries, hence the hasattr guard.
        if hasattr(c, '__tablename__') and c.__tablename__ == tablename:
            return c
    raise AttributeError(f'No model with tablename "{tablename}"')
class DBConnection:
    """
    Initializes connection to a database
    To update models file use:
    sqlacodegen --outfile models_universal.py mysql+pymysql://{user}:{pwd}@{address}

    All connection settings are read from environment variables; which
    ones are needed depends on the backend passed to ``__init__``.
    """
    # Unbound session factory; each session is bound to a connection on creation.
    _sessions = sessionmaker()

    def __init__(self, config) -> None:
        """
        Create the engine for the selected backend.

        :param config: One of 'redshift', 'clickhouse', 'pg', 'bigquery'
            or 'snowflake'.
        :raises ValueError: If ``config`` is not one of the names above.
        :raises KeyError: If an environment variable required by the
            selected backend is not set.
        """
        self.metadata = MetaData()
        self.config = config

        if config == 'redshift':
            # `pr` is the pandas_redshift module, which this file imports
            # only when DATABASE_NAME is 'redshift'.
            self.pdredshift = pr
            self.pdredshift.connect_to_redshift(dbname=os.environ['schema'],
                                                host=os.environ['address'],
                                                port=os.environ['port'],
                                                user=os.environ['user'],
                                                password=os.environ['password'])

            self.pdredshift.connect_to_s3(aws_access_key_id=os.environ['aws_access_key_id'],
                                          aws_secret_access_key=os.environ['aws_secret_access_key'],
                                          bucket=os.environ['bucket'],
                                          subdirectory=os.environ['subdirectory'])

            self.connect_str = os.environ['connect_str'].format(
                user=os.environ['user'],
                password=os.environ['password'],
                address=os.environ['address'],
                port=os.environ['port'],
                schema=os.environ['schema']
            )
            self.engine = create_engine(self.connect_str)

        elif config == 'clickhouse':
            self.connect_str = os.environ['connect_str'].format(
                address=os.environ['address'],
                database=os.environ['database']
            )
            self.engine = create_engine(self.connect_str)

        elif config == 'pg':
            self.connect_str = os.environ['connect_str'].format(
                user=os.environ['user'],
                password=os.environ['password'],
                address=os.environ['address'],
                port=os.environ['port'],
                database=os.environ['database']
            )
            self.engine = create_engine(self.connect_str)

        elif config == 'bigquery':
            # No engine is created for BigQuery, so the session helpers
            # below cannot be used with this backend.
            pass

        elif config == 'snowflake':
            self.connect_str = os.environ['connect_str'].format(
                user=os.environ['user'],
                password=os.environ['password'],
                account=os.environ['account'],
                database=os.environ['database'],
                schema=os.environ['schema'],
                warehouse=os.environ['warehouse']
            )
            self.engine = create_engine(self.connect_str)

        else:
            raise ValueError("This db configuration doesn't exist. Add into keys file.")

    @contextmanager
    def get_test_session(self, **kwargs) -> session:
        """
        Test session context, even commits won't be persisted into db.

        The session is closed, the enclosing transaction rolled back and the
        connection released even when the ``with`` body raises.

        :Keyword Arguments:
            * autoflush (``bool``) -- default: True
            * autocommit (``bool``) -- default: False
            * expire_on_commit (``bool``) -- default: True
        """
        connection = self.engine.connect()
        try:
            transaction = connection.begin()
            my_session = type(self)._sessions(bind=connection, **kwargs)
            try:
                yield my_session
            finally:
                # Do cleanup, rollback and closing, whatever happens
                my_session.close()
                transaction.rollback()
        finally:
            connection.close()

    @contextmanager
    def get_live_session(self) -> session:
        """
        This is a session that can be committed.
        Changes will be reflected in the database.

        The session and its connection are closed even when the ``with``
        body raises.
        """
        # Automatic transaction and connection handling in session
        connection = self.engine.connect()
        try:
            my_session = type(self)._sessions(bind=connection)
            try:
                yield my_session
            finally:
                my_session.close()
        finally:
            connection.close()

View file

View file

@ -0,0 +1,34 @@
import os
from pathlib import Path
from google.oauth2.service_account import Credentials
# obtain the JSON file:
# In the Cloud Console, go to the Create service account key page.
#
# Go to the Create Service Account Key page
# From the Service account list, select New service account.
# In the Service account name field, enter a name.
# From the Role list, select Project > Owner.
#
# Note: The Role field affects which resources your service account can access in your project. You can revoke these roles or grant additional roles later. In production environments, do not grant the Owner, Editor, or Viewer roles. For more information, see Granting, changing, and revoking access to resources.
# Click Create. A JSON file that contains your key downloads to your computer.
#
# Save it in the utils directory as bigquery_service_account.json
# Directory three levels above this file; the key file is expected in its utils/ folder.
base_path = Path(__file__).parent.parent.parent
creds_file = base_path.joinpath('utils', 'bigquery_service_account.json')
# Loaded once, when this module is imported.
credentials = Credentials.from_service_account_file(creds_file)
def insert_to_bigquery(df, table):
    """Append ``df`` to a BigQuery table.

    The destination is ``<dataset>.<table>``, with the dataset and the
    project id taken from the ``dataset`` and ``project_id`` environment
    variables. The module-level ``credentials`` are used for the upload.

    :param df: Object exposing ``to_gbq`` (presumably a pandas DataFrame).
    :param table: Table name inside the configured dataset.
    """
    destination = f"{os.environ['dataset']}.{table}"
    df.to_gbq(destination_table=destination,
              project_id=os.environ['project_id'],
              if_exists='append',
              credentials=credentials)
def transit_insert_to_bigquery(db, batch):
    """Placeholder with no implementation yet.

    Both arguments are accepted and ignored; the function returns ``None``.
    """
    return None

View file

@ -0,0 +1,4 @@
def insert_to_clickhouse(db, df, table: str):
    """Append the rows of ``df`` to ``table`` through ``db.engine``.

    The DataFrame index is not written as a column.
    """
    write_options = {'if_exists': 'append', 'index': False}
    df.to_sql(table, db.engine, **write_options)

View file

@ -0,0 +1,3 @@
def insert_to_postgres(db, df, table: str):
    """Append the rows of ``df`` to ``table`` through ``db.engine``.

    The DataFrame index is not written as a column.
    """
    write_options = {'if_exists': 'append', 'index': False}
    df.to_sql(table, db.engine, **write_options)

View file

@ -0,0 +1,19 @@
from db.models import DetailedEvent
from psycopg2.errors import InternalError_
def transit_insert_to_redshift(db, df, table):
    """Load ``df`` into the Redshift ``table`` using ``db.pdredshift``.

    A psycopg2 ``InternalError_`` raised by the load is printed together
    with a hint and then swallowed; any other exception propagates.
    """
    try:
        insert_df(db.pdredshift, df, table)
    except InternalError_ as load_error:
        for message in (repr(load_error), "loading failed. check stl_load_errors"):
            print(message)
def insert_df(pr, df, table):
    """Append ``df`` to a Redshift table via ``pr.pandas_to_redshift``.

    :param pr: Object exposing ``pandas_to_redshift``.
    :param df: DataFrame to upload.
    :param table: Name of the destination Redshift table.
    """
    # Write the DataFrame to S3 and then to redshift
    upload_options = {
        'data_frame': df,
        'redshift_table_name': table,
        'append': True,
        'delimiter': '|',
    }
    pr.pandas_to_redshift(**upload_options)

View file

@ -0,0 +1,5 @@
def insert_to_snowflake(db, df, table):
    """Append the rows of ``df`` to ``table`` through ``db.engine``.

    The DataFrame index is not written as a column.
    """
    write_options = {'if_exists': 'append', 'index': False}
    df.to_sql(table, db.engine, **write_options)

389
ee/connectors/db/models.py Normal file
View file

@ -0,0 +1,389 @@
# coding: utf-8
import yaml
from sqlalchemy import BigInteger, Boolean, Column, Integer, ARRAY, VARCHAR, text, VARCHAR
from sqlalchemy.ext.declarative import declarative_base
from pathlib import Path
import os
# Name of the selected database backend; a missing DATABASE_NAME raises KeyError.
DATABASE = os.environ['DATABASE_NAME']

Base = declarative_base()
metadata = Base.metadata

# Directory two levels above this file; config.yml is read from its utils/ folder.
base_path = Path(__file__).parent.parent

# Load configuration file
# NOTE(review): yaml.FullLoader is kept as-is; this assumes config.yml is a
# trusted, project-owned file. Prefer yaml.safe_load if that ever changes.
with open(f'{base_path}/utils/config.yml') as config_file:
    # The context manager closes the file handle once parsing is done.
    conf = yaml.load(config_file, Loader=yaml.FullLoader)

try:
    db_conf = conf[DATABASE]
except KeyError:
    raise KeyError("Please provide a configuration in a YAML file with a key like\n"
                   "'snowflake', 'pg', 'bigquery', 'clickhouse' or 'redshift'.")

# Get a table name from a configuration file
# The two events table names are optional: a missing key is reported on
# stdout and the name falls back to None.
try:
    events_table_name = db_conf['events_table_name']
except KeyError as e:
    events_table_name = None
    print(repr(e))

try:
    events_detailed_table_name = db_conf['events_detailed_table_name']
except KeyError as e:
    print(repr(e))
    events_detailed_table_name = None

# The sessions table name is mandatory.
try:
    sessions_table_name = db_conf['sessions_table']
except KeyError as e:
    print(repr(e))
    # The message names the key that is actually looked up above.
    raise KeyError("Please provide a table name under a key 'sessions_table' in a YAML configuration file")
class Session(Base):
    """Declarative model with one row per session.

    The table name comes from ``sessions_table_name``, which is read from
    the YAML configuration at import time. ``sessionid`` is the primary key.
    """
    __tablename__ = sessions_table_name

    sessionid = Column(BigInteger, primary_key=True)
    user_agent = Column(VARCHAR(5000))
    user_browser = Column(VARCHAR(5000))
    user_browser_version = Column(VARCHAR(5000))
    user_country = Column(VARCHAR(5000))
    user_device = Column(VARCHAR(5000))
    user_device_heap_size = Column(BigInteger)
    user_device_memory_size = Column(BigInteger)
    user_device_type = Column(VARCHAR(5000))
    user_os = Column(VARCHAR(5000))
    user_os_version = Column(VARCHAR(5000))
    user_uuid = Column(VARCHAR(5000))
    connection_effective_bandwidth = Column(BigInteger)  # Downlink
    connection_type = Column(VARCHAR(5000))  # "bluetooth", "cellular", "ethernet", "none", "wifi", "wimax", "other", "unknown"
    metadata_key = Column(VARCHAR(5000))
    metadata_value = Column(VARCHAR(5000))
    referrer = Column(VARCHAR(5000))
    user_anonymous_id = Column(VARCHAR(5000))
    user_id = Column(VARCHAR(5000))

    # TIME
    session_start_timestamp = Column(BigInteger)
    session_end_timestamp = Column(BigInteger)
    session_duration = Column(BigInteger)

    # SPEED INDEX RELATED
    first_contentful_paint = Column(BigInteger)
    speed_index = Column(BigInteger)
    visually_complete = Column(BigInteger)
    timing_time_to_interactive = Column(BigInteger)

    # PERFORMANCE
    avg_cpu = Column(Integer)
    avg_fps = Column(BigInteger)
    max_cpu = Column(Integer)
    max_fps = Column(BigInteger)
    max_total_js_heap_size = Column(BigInteger)
    max_used_js_heap_size = Column(BigInteger)

    # ISSUES AND EVENTS
    js_exceptions_count = Column(BigInteger)
    long_tasks_total_duration = Column(BigInteger)
    long_tasks_max_duration = Column(BigInteger)
    long_tasks_count = Column(BigInteger)
    inputs_count = Column(BigInteger)
    clicks_count = Column(BigInteger)
    issues_count = Column(BigInteger)
    # NOTE(review): `issues` (and `urls` below) is assigned a bare ARRAY type
    # rather than Column(ARRAY(...)), unlike every other attribute here, so as
    # written it is not declared as a Column. Confirm whether that is intended.
    issues = ARRAY(VARCHAR(5000))
    urls_count = Column(BigInteger)
    urls = ARRAY(VARCHAR(5000))
class Event(Base):
    """Declarative model for the events table.

    The table name comes from ``events_table_name``, which is read from the
    YAML configuration at import time and is ``None`` when the key is absent.
    """
    __tablename__ = events_table_name

    # NOTE(review): sessionid is the only column flagged primary_key --
    # confirm this is meant purely to satisfy the mapper, since several
    # event rows presumably share one session id.
    sessionid = Column(BigInteger, primary_key=True)
    connectioninformation_downlink = Column(BigInteger)
    connectioninformation_type = Column(VARCHAR(5000))
    consolelog_level = Column(VARCHAR(5000))
    consolelog_value = Column(VARCHAR(5000))
    customevent_messageid = Column(BigInteger)
    customevent_name = Column(VARCHAR(5000))
    customevent_payload = Column(VARCHAR(5000))
    customevent_timestamp = Column(BigInteger)
    errorevent_message = Column(VARCHAR(5000))
    errorevent_messageid = Column(BigInteger)
    errorevent_name = Column(VARCHAR(5000))
    errorevent_payload = Column(VARCHAR(5000))
    errorevent_source = Column(VARCHAR(5000))
    errorevent_timestamp = Column(BigInteger)
    jsexception_message = Column(VARCHAR(5000))
    jsexception_name = Column(VARCHAR(5000))
    jsexception_payload = Column(VARCHAR(5000))
    metadata_key = Column(VARCHAR(5000))
    metadata_value = Column(VARCHAR(5000))
    mouseclick_id = Column(BigInteger)
    mouseclick_hesitationtime = Column(BigInteger)
    mouseclick_label = Column(VARCHAR(5000))
    pageevent_firstcontentfulpaint = Column(BigInteger)
    pageevent_firstpaint = Column(BigInteger)
    pageevent_messageid = Column(BigInteger)
    pageevent_referrer = Column(VARCHAR(5000))
    pageevent_speedindex = Column(BigInteger)
    pageevent_timestamp = Column(BigInteger)
    pageevent_url = Column(VARCHAR(5000))
    pagerendertiming_timetointeractive = Column(BigInteger)
    pagerendertiming_visuallycomplete = Column(BigInteger)
    rawcustomevent_name = Column(VARCHAR(5000))
    rawcustomevent_payload = Column(VARCHAR(5000))
    setviewportsize_height = Column(BigInteger)
    setviewportsize_width = Column(BigInteger)
    timestamp_timestamp = Column(BigInteger)
    user_anonymous_id = Column(VARCHAR(5000))
    user_id = Column(VARCHAR(5000))
    issueevent_messageid = Column(BigInteger)
    issueevent_timestamp = Column(BigInteger)
    issueevent_type = Column(VARCHAR(5000))
    issueevent_contextstring = Column(VARCHAR(5000))
    issueevent_context = Column(VARCHAR(5000))
    issueevent_payload = Column(VARCHAR(5000))
    customissue_name = Column(VARCHAR(5000))
    customissue_payload = Column(VARCHAR(5000))
    # Batch bookkeeping columns.
    received_at = Column(BigInteger)
    batch_order_number = Column(BigInteger)
class DetailedEvent(Base):
    """ORM mapping for the detailed events table.

    Each tracker message type contributes a group of nullable columns named
    ``<messagetype>_<field>``. ``sessionid`` is the only column declared as
    primary key. The table name comes from ``events_detailed_table_name``,
    which is defined elsewhere in this module.
    """
    __tablename__ = events_detailed_table_name
    # id = Column(Integer, primary_key=True, server_default=text("\"identity\"(119029, 0, '0,1'::text)"))
    sessionid = Column(BigInteger, primary_key=True)
    clickevent_hesitationtime = Column(BigInteger)
    clickevent_label = Column(VARCHAR(5000))
    clickevent_messageid = Column(BigInteger)
    clickevent_timestamp = Column(BigInteger)
    connectioninformation_downlink = Column(BigInteger)
    connectioninformation_type = Column(VARCHAR(5000))
    consolelog_level = Column(VARCHAR(5000))
    consolelog_value = Column(VARCHAR(5000))
    cpuissue_duration = Column(BigInteger)
    cpuissue_rate = Column(BigInteger)
    cpuissue_timestamp = Column(BigInteger)
    createdocument = Column(Boolean)
    createelementnode_id = Column(BigInteger)
    createelementnode_parentid = Column(BigInteger)
    cssdeleterule_index = Column(BigInteger)
    cssdeleterule_stylesheetid = Column(BigInteger)
    cssinsertrule_index = Column(BigInteger)
    cssinsertrule_rule = Column(VARCHAR(5000))
    cssinsertrule_stylesheetid = Column(BigInteger)
    customevent_messageid = Column(BigInteger)
    customevent_name = Column(VARCHAR(5000))
    customevent_payload = Column(VARCHAR(5000))
    customevent_timestamp = Column(BigInteger)
    domdrop_timestamp = Column(BigInteger)
    errorevent_message = Column(VARCHAR(5000))
    errorevent_messageid = Column(BigInteger)
    errorevent_name = Column(VARCHAR(5000))
    errorevent_payload = Column(VARCHAR(5000))
    errorevent_source = Column(VARCHAR(5000))
    errorevent_timestamp = Column(BigInteger)
    fetch_duration = Column(BigInteger)
    fetch_method = Column(VARCHAR(5000))
    fetch_request = Column(VARCHAR(5000))
    fetch_response = Column(VARCHAR(5000))
    fetch_status = Column(BigInteger)
    fetch_timestamp = Column(BigInteger)
    fetch_url = Column(VARCHAR(5000))
    graphql_operationkind = Column(VARCHAR(5000))
    graphql_operationname = Column(VARCHAR(5000))
    graphql_response = Column(VARCHAR(5000))
    graphql_variables = Column(VARCHAR(5000))
    graphqlevent_messageid = Column(BigInteger)
    graphqlevent_name = Column(VARCHAR(5000))
    graphqlevent_timestamp = Column(BigInteger)
    inputevent_label = Column(VARCHAR(5000))
    inputevent_messageid = Column(BigInteger)
    inputevent_timestamp = Column(BigInteger)
    inputevent_value = Column(VARCHAR(5000))
    inputevent_valuemasked = Column(Boolean)
    jsexception_message = Column(VARCHAR(5000))
    jsexception_name = Column(VARCHAR(5000))
    jsexception_payload = Column(VARCHAR(5000))
    memoryissue_duration = Column(BigInteger)
    memoryissue_rate = Column(BigInteger)
    memoryissue_timestamp = Column(BigInteger)
    metadata_key = Column(VARCHAR(5000))
    metadata_value = Column(VARCHAR(5000))
    mobx_payload = Column(VARCHAR(5000))
    mobx_type = Column(VARCHAR(5000))
    mouseclick_id = Column(BigInteger)
    mouseclick_hesitationtime = Column(BigInteger)
    mouseclick_label = Column(VARCHAR(5000))
    mousemove_x = Column(BigInteger)
    mousemove_y = Column(BigInteger)
    movenode_id = Column(BigInteger)
    movenode_index = Column(BigInteger)
    movenode_parentid = Column(BigInteger)
    ngrx_action = Column(VARCHAR(5000))
    ngrx_duration = Column(BigInteger)
    ngrx_state = Column(VARCHAR(5000))
    otable_key = Column(VARCHAR(5000))
    otable_value = Column(VARCHAR(5000))
    pageevent_domcontentloadedeventend = Column(BigInteger)
    pageevent_domcontentloadedeventstart = Column(BigInteger)
    pageevent_firstcontentfulpaint = Column(BigInteger)
    pageevent_firstpaint = Column(BigInteger)
    pageevent_loaded = Column(Boolean)
    pageevent_loadeventend = Column(BigInteger)
    pageevent_loadeventstart = Column(BigInteger)
    pageevent_messageid = Column(BigInteger)
    pageevent_referrer = Column(VARCHAR(5000))
    pageevent_requeststart = Column(BigInteger)
    pageevent_responseend = Column(BigInteger)
    pageevent_responsestart = Column(BigInteger)
    pageevent_speedindex = Column(BigInteger)
    pageevent_timestamp = Column(BigInteger)
    pageevent_url = Column(VARCHAR(5000))
    pageloadtiming_domcontentloadedeventend = Column(BigInteger)
    pageloadtiming_domcontentloadedeventstart = Column(BigInteger)
    pageloadtiming_firstcontentfulpaint = Column(BigInteger)
    pageloadtiming_firstpaint = Column(BigInteger)
    pageloadtiming_loadeventend = Column(BigInteger)
    pageloadtiming_loadeventstart = Column(BigInteger)
    pageloadtiming_requeststart = Column(BigInteger)
    pageloadtiming_responseend = Column(BigInteger)
    pageloadtiming_responsestart = Column(BigInteger)
    pagerendertiming_speedindex = Column(BigInteger)
    pagerendertiming_timetointeractive = Column(BigInteger)
    pagerendertiming_visuallycomplete = Column(BigInteger)
    performancetrack_frames = Column(BigInteger)
    performancetrack_ticks = Column(BigInteger)
    performancetrack_totaljsheapsize = Column(BigInteger)
    performancetrack_usedjsheapsize = Column(BigInteger)
    performancetrackaggr_avgcpu = Column(BigInteger)
    performancetrackaggr_avgfps = Column(BigInteger)
    performancetrackaggr_avgtotaljsheapsize = Column(BigInteger)
    performancetrackaggr_avgusedjsheapsize = Column(BigInteger)
    performancetrackaggr_maxcpu = Column(BigInteger)
    performancetrackaggr_maxfps = Column(BigInteger)
    performancetrackaggr_maxtotaljsheapsize = Column(BigInteger)
    performancetrackaggr_maxusedjsheapsize = Column(BigInteger)
    performancetrackaggr_mincpu = Column(BigInteger)
    performancetrackaggr_minfps = Column(BigInteger)
    performancetrackaggr_mintotaljsheapsize = Column(BigInteger)
    performancetrackaggr_minusedjsheapsize = Column(BigInteger)
    performancetrackaggr_timestampend = Column(BigInteger)
    performancetrackaggr_timestampstart = Column(BigInteger)
    profiler_args = Column(VARCHAR(5000))
    profiler_duration = Column(BigInteger)
    profiler_name = Column(VARCHAR(5000))
    profiler_result = Column(VARCHAR(5000))
    rawcustomevent_name = Column(VARCHAR(5000))
    rawcustomevent_payload = Column(VARCHAR(5000))
    rawerrorevent_message = Column(VARCHAR(5000))
    rawerrorevent_name = Column(VARCHAR(5000))
    rawerrorevent_payload = Column(VARCHAR(5000))
    rawerrorevent_source = Column(VARCHAR(5000))
    rawerrorevent_timestamp = Column(BigInteger)
    redux_action = Column(VARCHAR(5000))
    redux_duration = Column(BigInteger)
    redux_state = Column(VARCHAR(5000))
    removenode_id = Column(BigInteger)
    removenodeattribute_id = Column(BigInteger)
    removenodeattribute_name = Column(VARCHAR(5000))
    resourceevent_decodedbodysize = Column(BigInteger)
    resourceevent_duration = Column(BigInteger)
    resourceevent_encodedbodysize = Column(BigInteger)
    resourceevent_headersize = Column(BigInteger)
    resourceevent_messageid = Column(BigInteger)
    resourceevent_method = Column(VARCHAR(5000))
    resourceevent_status = Column(BigInteger)
    resourceevent_success = Column(Boolean)
    resourceevent_timestamp = Column(BigInteger)
    resourceevent_ttfb = Column(BigInteger)
    resourceevent_type = Column(VARCHAR(5000))
    resourceevent_url = Column(VARCHAR(5000))
    resourcetiming_decodedbodysize = Column(BigInteger)
    resourcetiming_duration = Column(BigInteger)
    resourcetiming_encodedbodysize = Column(BigInteger)
    resourcetiming_headersize = Column(BigInteger)
    resourcetiming_initiator = Column(VARCHAR(5000))
    resourcetiming_timestamp = Column(BigInteger)
    resourcetiming_ttfb = Column(BigInteger)
    resourcetiming_url = Column(VARCHAR(5000))
    sessiondisconnect = Column(Boolean)
    sessiondisconnect_timestamp = Column(BigInteger)
    sessionend = Column(Boolean)
    sessionend_timestamp = Column(BigInteger)
    sessionstart_projectid = Column(BigInteger)
    sessionstart_revid = Column(VARCHAR(5000))
    sessionstart_timestamp = Column(BigInteger)
    sessionstart_trackerversion = Column(VARCHAR(5000))
    sessionstart_useragent = Column(VARCHAR(5000))
    sessionstart_userbrowser = Column(VARCHAR(5000))
    sessionstart_userbrowserversion = Column(VARCHAR(5000))
    sessionstart_usercountry = Column(VARCHAR(5000))
    sessionstart_userdevice = Column(VARCHAR(5000))
    sessionstart_userdeviceheapsize = Column(BigInteger)
    sessionstart_userdevicememorysize = Column(BigInteger)
    sessionstart_userdevicetype = Column(VARCHAR(5000))
    sessionstart_useros = Column(VARCHAR(5000))
    sessionstart_userosversion = Column(VARCHAR(5000))
    sessionstart_useruuid = Column(VARCHAR(5000))
    # NOTE(review): several columns below are declared BigInteger although
    # their names (*_data, *_label, *_name, *_value, *_mask) suggest text or
    # flags - confirm against the SQL scripts and the message definitions.
    setcssdata_data = Column(BigInteger)
    setcssdata_id = Column(BigInteger)
    setinputchecked_checked = Column(BigInteger)
    setinputchecked_id = Column(BigInteger)
    setinputtarget_id = Column(BigInteger)
    setinputtarget_label = Column(BigInteger)
    setinputvalue_id = Column(BigInteger)
    setinputvalue_mask = Column(BigInteger)
    setinputvalue_value = Column(BigInteger)
    setnodeattribute_id = Column(BigInteger)
    setnodeattribute_name = Column(BigInteger)
    setnodeattribute_value = Column(BigInteger)
    setnodedata_data = Column(BigInteger)
    setnodedata_id = Column(BigInteger)
    setnodescroll_id = Column(BigInteger)
    setnodescroll_x = Column(BigInteger)
    setnodescroll_y = Column(BigInteger)
    setpagelocation_navigationstart = Column(BigInteger)
    setpagelocation_referrer = Column(VARCHAR(5000))
    setpagelocation_url = Column(VARCHAR(5000))
    setpagevisibility_hidden = Column(Boolean)
    setviewportscroll_x = Column(BigInteger)
    setviewportscroll_y = Column(BigInteger)
    setviewportsize_height = Column(BigInteger)
    setviewportsize_width = Column(BigInteger)
    stateaction_type = Column(VARCHAR(5000))
    stateactionevent_messageid = Column(BigInteger)
    stateactionevent_timestamp = Column(BigInteger)
    stateactionevent_type = Column(VARCHAR(5000))
    timestamp_timestamp = Column(BigInteger)
    useranonymousid_id = Column(VARCHAR(5000))
    userid_id = Column(VARCHAR(5000))
    vuex_mutation = Column(VARCHAR(5000))
    vuex_state = Column(VARCHAR(5000))
    # The long-task columns use the singular prefix ``longtask_``.
    longtask_timestamp = Column(BigInteger)
    longtask_duration = Column(BigInteger)
    # NOTE(review): context/containertype are BigInteger here - presumably
    # enum codes; verify against the LongTask message definition.
    longtask_context = Column(BigInteger)
    longtask_containertype = Column(BigInteger)
    longtask_containersrc = Column(VARCHAR(5000))
    longtask_containerid = Column(VARCHAR(5000))
    longtask_containername = Column(VARCHAR(5000))
    setnodeurlbasedattribute_id = Column(BigInteger)
    setnodeurlbasedattribute_name = Column(VARCHAR(5000))
    setnodeurlbasedattribute_value = Column(VARCHAR(5000))
    setnodeurlbasedattribute_baseurl = Column(VARCHAR(5000))
    setstyledata_id = Column(BigInteger)
    setstyledata_data = Column(VARCHAR(5000))
    setstyledata_baseurl = Column(VARCHAR(5000))
    issueevent_messageid = Column(BigInteger)
    issueevent_timestamp = Column(BigInteger)
    issueevent_type = Column(VARCHAR(5000))
    issueevent_contextstring = Column(VARCHAR(5000))
    issueevent_context = Column(VARCHAR(5000))
    issueevent_payload = Column(VARCHAR(5000))
    technicalinfo_type = Column(VARCHAR(5000))
    technicalinfo_value = Column(VARCHAR(5000))
    customissue_name = Column(VARCHAR(5000))
    customissue_payload = Column(VARCHAR(5000))
    pageclose = Column(Boolean)
    # Bookkeeping columns shared with the other event table.
    received_at = Column(BigInteger)
    batch_order_number = Column(BigInteger)

View file

@ -0,0 +1,61 @@
from pathlib import Path
base_path = Path(__file__).parent.parent
def create_tables_clickhouse(db):
    """Execute the ClickHouse table-creation scripts from the ``sql`` folder.

    Each script is read in full and passed to ``db.engine.execute``; a
    confirmation line is printed after every script. The base tables are
    created before their ``*_buffer`` counterparts.

    Args:
        db: connection wrapper exposing ``engine.execute``.
    """
    steps = (
        ('clickhouse_events.sql',
         "`connector_user_events` table created succesfully."),
        ('clickhouse_events_buffer.sql',
         "`connector_user_events_buffer` table created succesfully."),
        ('clickhouse_sessions.sql',
         "`connector_sessions` table created succesfully."),
        ('clickhouse_sessions_buffer.sql',
         "`connector_sessions_buffer` table created succesfully."),
    )
    for script_name, done_message in steps:
        statement = (base_path / 'sql' / script_name).read_text()
        db.engine.execute(statement)
        print(done_message)
def create_tables_postgres(db):
    """Execute the PostgreSQL table-creation scripts from the ``sql`` folder.

    The events script runs first, then the sessions script; each is read in
    full, passed to ``db.engine.execute`` and followed by a confirmation line.

    Args:
        db: connection wrapper exposing ``engine.execute``.
    """
    steps = (
        ('postgres_events.sql',
         "`connector_user_events` table created succesfully."),
        ('postgres_sessions.sql',
         "`connector_sessions` table created succesfully."),
    )
    for script_name, done_message in steps:
        statement = (base_path / 'sql' / script_name).read_text()
        db.engine.execute(statement)
        print(done_message)
def create_tables_snowflake(db):
    """Execute the Snowflake table-creation scripts from the ``sql`` folder.

    The events script runs first, then the sessions script; each is read in
    full, passed to ``db.engine.execute`` and followed by a confirmation line.

    Args:
        db: connection wrapper exposing ``engine.execute``.
    """
    steps = (
        ('snowflake_events.sql',
         "`connector_user_events` table created succesfully."),
        ('snowflake_sessions.sql',
         "`connector_sessions` table created succesfully."),
    )
    for script_name, done_message in steps:
        statement = (base_path / 'sql' / script_name).read_text()
        db.engine.execute(statement)
        print(done_message)
def create_tables_redshift(db):
    """Execute the Redshift table-creation scripts from the ``sql`` folder.

    The events script runs first, then the sessions script; each is read in
    full, passed to ``db.engine.execute`` and followed by a confirmation line.

    Args:
        db: connection wrapper exposing ``engine.execute``.
    """
    steps = (
        ('redshift_events.sql',
         "`connector_user_events` table created succesfully."),
        ('redshift_sessions.sql',
         "`connector_sessions` table created succesfully."),
    )
    for script_name, done_message in steps:
        statement = (base_path / 'sql' / script_name).read_text()
        db.engine.execute(statement)
        print(done_message)

368
ee/connectors/db/utils.py Normal file
View file

@ -0,0 +1,368 @@
import pandas as pd
from db.models import DetailedEvent, Event, Session, DATABASE
# pandas dtype for every column of the 'normal' events frame:
# nullable "Int64" for numeric fields, "string" for textual ones.
dtypes_events = dict(
    sessionid="Int64",
    connectioninformation_downlink="Int64",
    connectioninformation_type="string",
    consolelog_level="string",
    consolelog_value="string",
    customevent_messageid="Int64",
    customevent_name="string",
    customevent_payload="string",
    customevent_timestamp="Int64",
    errorevent_message="string",
    errorevent_messageid="Int64",
    errorevent_name="string",
    errorevent_payload="string",
    errorevent_source="string",
    errorevent_timestamp="Int64",
    jsexception_message="string",
    jsexception_name="string",
    jsexception_payload="string",
    metadata_key="string",
    metadata_value="string",
    mouseclick_id="Int64",
    mouseclick_hesitationtime="Int64",
    mouseclick_label="string",
    pageevent_firstcontentfulpaint="Int64",
    pageevent_firstpaint="Int64",
    pageevent_messageid="Int64",
    pageevent_referrer="string",
    pageevent_speedindex="Int64",
    pageevent_timestamp="Int64",
    pageevent_url="string",
    pagerendertiming_timetointeractive="Int64",
    pagerendertiming_visuallycomplete="Int64",
    rawcustomevent_name="string",
    rawcustomevent_payload="string",
    setviewportsize_height="Int64",
    setviewportsize_width="Int64",
    timestamp_timestamp="Int64",
    user_anonymous_id="string",
    user_id="string",
    issueevent_messageid="Int64",
    issueevent_timestamp="Int64",
    issueevent_type="string",
    issueevent_contextstring="string",
    issueevent_context="string",
    issueevent_payload="string",
    customissue_name="string",
    customissue_payload="string",
    received_at="Int64",
    batch_order_number="Int64",
)
# pandas dtype for columns of the 'detailed' events frame: nullable "Int64"
# for numeric fields, nullable "boolean" for flags, "object" (plus one
# "string") for text. Every key must be an attribute of DetailedEvent,
# because DataFrame.astype raises KeyError for a mapping key that is not a
# column of the frame.
dtypes_detailed_events = {
    "sessionid": "Int64",
    "clickevent_hesitationtime": "Int64",
    "clickevent_label": "object",
    "clickevent_messageid": "Int64",
    "clickevent_timestamp": "Int64",
    "connectioninformation_downlink": "Int64",
    "connectioninformation_type": "object",
    "consolelog_level": "object",
    "consolelog_value": "object",
    "cpuissue_duration": "Int64",
    "cpuissue_rate": "Int64",
    "cpuissue_timestamp": "Int64",
    "createdocument": "boolean",
    "createelementnode_id": "Int64",
    "createelementnode_parentid": "Int64",
    "cssdeleterule_index": "Int64",
    "cssdeleterule_stylesheetid": "Int64",
    "cssinsertrule_index": "Int64",
    "cssinsertrule_rule": "object",
    "cssinsertrule_stylesheetid": "Int64",
    "customevent_messageid": "Int64",
    "customevent_name": "object",
    "customevent_payload": "object",
    "customevent_timestamp": "Int64",
    "domdrop_timestamp": "Int64",
    "errorevent_message": "object",
    "errorevent_messageid": "Int64",
    "errorevent_name": "object",
    "errorevent_payload": "object",
    "errorevent_source": "object",
    "errorevent_timestamp": "Int64",
    "fetch_duration": "Int64",
    "fetch_method": "object",
    "fetch_request": "object",
    "fetch_response": "object",
    "fetch_status": "Int64",
    "fetch_timestamp": "Int64",
    "fetch_url": "object",
    "graphql_operationkind": "object",
    "graphql_operationname": "object",
    "graphql_response": "object",
    "graphql_variables": "object",
    "graphqlevent_messageid": "Int64",
    "graphqlevent_name": "object",
    "graphqlevent_timestamp": "Int64",
    "inputevent_label": "object",
    "inputevent_messageid": "Int64",
    "inputevent_timestamp": "Int64",
    "inputevent_value": "object",
    "inputevent_valuemasked": "boolean",
    "jsexception_message": "object",
    "jsexception_name": "object",
    "jsexception_payload": "object",
    # Singular "longtask_" to match the DetailedEvent column names; the
    # previous "longtasks_" spelling named columns that do not exist.
    "longtask_timestamp": "Int64",
    "longtask_duration": "Int64",
    "longtask_containerid": "object",
    "longtask_containersrc": "object",
    "memoryissue_duration": "Int64",
    "memoryissue_rate": "Int64",
    "memoryissue_timestamp": "Int64",
    "metadata_key": "object",
    "metadata_value": "object",
    "mobx_payload": "object",
    "mobx_type": "object",
    "mouseclick_id": "Int64",
    "mouseclick_hesitationtime": "Int64",
    "mouseclick_label": "object",
    "mousemove_x": "Int64",
    "mousemove_y": "Int64",
    "movenode_id": "Int64",
    "movenode_index": "Int64",
    "movenode_parentid": "Int64",
    "ngrx_action": "object",
    "ngrx_duration": "Int64",
    "ngrx_state": "object",
    "otable_key": "object",
    "otable_value": "object",
    "pageevent_domcontentloadedeventend": "Int64",
    "pageevent_domcontentloadedeventstart": "Int64",
    "pageevent_firstcontentfulpaint": "Int64",
    "pageevent_firstpaint": "Int64",
    "pageevent_loaded": "boolean",
    "pageevent_loadeventend": "Int64",
    "pageevent_loadeventstart": "Int64",
    "pageevent_messageid": "Int64",
    "pageevent_referrer": "object",
    "pageevent_requeststart": "Int64",
    "pageevent_responseend": "Int64",
    "pageevent_responsestart": "Int64",
    "pageevent_speedindex": "Int64",
    "pageevent_timestamp": "Int64",
    "pageevent_url": "object",
    "pageloadtiming_domcontentloadedeventend": "Int64",
    "pageloadtiming_domcontentloadedeventstart": "Int64",
    "pageloadtiming_firstcontentfulpaint": "Int64",
    "pageloadtiming_firstpaint": "Int64",
    "pageloadtiming_loadeventend": "Int64",
    "pageloadtiming_loadeventstart": "Int64",
    "pageloadtiming_requeststart": "Int64",
    "pageloadtiming_responseend": "Int64",
    "pageloadtiming_responsestart": "Int64",
    "pagerendertiming_speedindex": "Int64",
    "pagerendertiming_timetointeractive": "Int64",
    "pagerendertiming_visuallycomplete": "Int64",
    "performancetrack_frames": "Int64",
    "performancetrack_ticks": "Int64",
    "performancetrack_totaljsheapsize": "Int64",
    "performancetrack_usedjsheapsize": "Int64",
    "performancetrackaggr_avgcpu": "Int64",
    "performancetrackaggr_avgfps": "Int64",
    "performancetrackaggr_avgtotaljsheapsize": "Int64",
    "performancetrackaggr_avgusedjsheapsize": "Int64",
    "performancetrackaggr_maxcpu": "Int64",
    "performancetrackaggr_maxfps": "Int64",
    "performancetrackaggr_maxtotaljsheapsize": "Int64",
    "performancetrackaggr_maxusedjsheapsize": "Int64",
    "performancetrackaggr_mincpu": "Int64",
    "performancetrackaggr_minfps": "Int64",
    "performancetrackaggr_mintotaljsheapsize": "Int64",
    "performancetrackaggr_minusedjsheapsize": "Int64",
    "performancetrackaggr_timestampend": "Int64",
    "performancetrackaggr_timestampstart": "Int64",
    "profiler_args": "object",
    "profiler_duration": "Int64",
    "profiler_name": "object",
    "profiler_result": "object",
    "rawcustomevent_name": "object",
    "rawcustomevent_payload": "object",
    "rawerrorevent_message": "object",
    "rawerrorevent_name": "object",
    "rawerrorevent_payload": "object",
    "rawerrorevent_source": "object",
    "rawerrorevent_timestamp": "Int64",
    "redux_action": "object",
    "redux_duration": "Int64",
    "redux_state": "object",
    "removenode_id": "Int64",
    "removenodeattribute_id": "Int64",
    "removenodeattribute_name": "object",
    "resourceevent_decodedbodysize": "Int64",
    "resourceevent_duration": "Int64",
    "resourceevent_encodedbodysize": "Int64",
    "resourceevent_headersize": "Int64",
    "resourceevent_messageid": "Int64",
    "resourceevent_method": "object",
    "resourceevent_status": "Int64",
    "resourceevent_success": "boolean",
    "resourceevent_timestamp": "Int64",
    "resourceevent_ttfb": "Int64",
    "resourceevent_type": "object",
    "resourceevent_url": "object",
    "resourcetiming_decodedbodysize": "Int64",
    "resourcetiming_duration": "Int64",
    "resourcetiming_encodedbodysize": "Int64",
    "resourcetiming_headersize": "Int64",
    "resourcetiming_initiator": "object",
    "resourcetiming_timestamp": "Int64",
    "resourcetiming_ttfb": "Int64",
    "resourcetiming_url": "object",
    "sessiondisconnect": "boolean",
    "sessiondisconnect_timestamp": "Int64",
    "sessionend": "boolean",
    "sessionend_timestamp": "Int64",
    "sessionstart_projectid": "Int64",
    "sessionstart_revid": "object",
    "sessionstart_timestamp": "Int64",
    "sessionstart_trackerversion": "object",
    "sessionstart_useragent": "object",
    "sessionstart_userbrowser": "object",
    "sessionstart_userbrowserversion": "object",
    "sessionstart_usercountry": "object",
    "sessionstart_userdevice": "object",
    "sessionstart_userdeviceheapsize": "Int64",
    "sessionstart_userdevicememorysize": "Int64",
    "sessionstart_userdevicetype": "object",
    "sessionstart_useros": "object",
    "sessionstart_userosversion": "object",
    "sessionstart_useruuid": "object",
    "setcssdata_data": "Int64",
    "setcssdata_id": "Int64",
    "setinputchecked_checked": "Int64",
    "setinputchecked_id": "Int64",
    "setinputtarget_id": "Int64",
    "setinputtarget_label": "Int64",
    "setinputvalue_id": "Int64",
    "setinputvalue_mask": "Int64",
    "setinputvalue_value": "Int64",
    "setnodeattribute_id": "Int64",
    "setnodeattribute_name": "Int64",
    "setnodeattribute_value": "Int64",
    "setnodedata_data": "Int64",
    "setnodedata_id": "Int64",
    "setnodescroll_id": "Int64",
    "setnodescroll_x": "Int64",
    "setnodescroll_y": "Int64",
    "setpagelocation_navigationstart": "Int64",
    "setpagelocation_referrer": "object",
    "setpagelocation_url": "object",
    "setpagevisibility_hidden": "boolean",
    "setviewportscroll_x": "Int64",
    "setviewportscroll_y": "Int64",
    "setviewportsize_height": "Int64",
    "setviewportsize_width": "Int64",
    "stateaction_type": "object",
    "stateactionevent_messageid": "Int64",
    "stateactionevent_timestamp": "Int64",
    "stateactionevent_type": "object",
    "timestamp_timestamp": "Int64",
    "useranonymousid_id": "object",
    "userid_id": "object",
    "vuex_mutation": "object",
    "vuex_state": "string",
    "received_at": "Int64",
    "batch_order_number": "Int64",
}
# pandas dtype for every column of the sessions frame. 'issues' and 'urls'
# are kept as generic objects here; all other columns are nullable integers
# or strings.
dtypes_sessions = dict(
    sessionid='Int64',
    user_agent='string',
    user_browser='string',
    user_browser_version='string',
    user_country='string',
    user_device='string',
    user_device_heap_size='Int64',
    user_device_memory_size='Int64',
    user_device_type='string',
    user_os='string',
    user_os_version='string',
    user_uuid='string',
    connection_effective_bandwidth='Int64',
    connection_type='string',
    metadata_key='string',
    metadata_value='string',
    referrer='string',
    user_anonymous_id='string',
    user_id='string',
    session_start_timestamp='Int64',
    session_end_timestamp='Int64',
    session_duration='Int64',
    first_contentful_paint='Int64',
    speed_index='Int64',
    visually_complete='Int64',
    timing_time_to_interactive='Int64',
    avg_cpu='Int64',
    avg_fps='Int64',
    max_cpu='Int64',
    max_fps='Int64',
    max_total_js_heap_size='Int64',
    max_used_js_heap_size='Int64',
    js_exceptions_count='Int64',
    long_tasks_total_duration='Int64',
    long_tasks_max_duration='Int64',
    long_tasks_count='Int64',
    inputs_count='Int64',
    clicks_count='Int64',
    issues_count='Int64',
    issues='object',
    urls_count='Int64',
    urls='object',
)
# When the target is BigQuery, 'urls' and 'issues' are cast to the pandas
# 'string' dtype instead of being left as generic objects.
if DATABASE == 'bigquery':
    dtypes_sessions.update(urls='string', issues='string')

# Column names of each model, in declaration order: every class attribute
# whose name does not start with an underscore.
detailed_events_col = [
    name for name in DetailedEvent.__dict__ if not name.startswith('_')
]
events_col = [name for name in Event.__dict__ if not name.startswith('_')]
sessions_col = [name for name in Session.__dict__ if not name.startswith('_')]
def get_df_from_batch(batch, level):
    """Build a typed DataFrame from a batch of model instances.

    Args:
        batch: iterable of objects whose ``__dict__`` holds the column values.
        level: ``'normal'``, ``'detailed'`` or ``'sessions'``; selects the
            column list and the dtype mapping.

    Returns:
        A ``pandas.DataFrame`` with the level's columns cast to the configured
        dtypes. Columns of pandas ``string`` dtype are truncated to 255
        characters and stripped of ``|`` characters.

    Raises:
        ValueError: if ``level`` is not one of the supported values.
    """
    layouts = {
        'normal': (events_col, dtypes_events),
        'detailed': (detailed_events_col, dtypes_detailed_events),
        'sessions': (sessions_col, dtypes_sessions),
    }
    if level not in layouts:
        # Previously an unknown level surfaced as an UnboundLocalError on `df`.
        raise ValueError(
            f"Unknown level {level!r}; expected one of {sorted(layouts)}")
    columns, dtypes = layouts[level]

    df = pd.DataFrame([b.__dict__ for b in batch], columns=columns)
    # SQLAlchemy bookkeeping attribute; drop it if it ever ends up as a column.
    df = df.drop(columns='_sa_instance_state', errors='ignore')

    if level == 'detailed':
        # These two free-text columns are always blanked for detailed events
        # (presumably to avoid exporting user-entered data - confirm).
        df['inputevent_value'] = None
        df['customevent_payload'] = None

    # astype raises KeyError for mapping keys that are not columns, so only
    # cast the columns that are actually present in the frame.
    df = df.astype(
        {name: dtype for name, dtype in dtypes.items() if name in df.columns})

    if DATABASE == 'clickhouse' and level == 'sessions':
        df['issues'] = df['issues'].fillna('')
        df['urls'] = df['urls'].fillna('')

    for name in df.columns:
        # Only pandas "string" extension columns are sanitised; object columns
        # (which may hold lists such as 'urls'/'issues') are left untouched.
        if isinstance(df[name].dtype, pd.StringDtype):
            shortened = df[name].str.slice(0, 255)
            # regex=False: "|" must be removed literally, not treated as a
            # regular-expression alternation.
            df[name] = shortened.str.replace("|", "", regex=False)
    return df

View file

@ -0,0 +1,63 @@
import os
DATABASE = os.environ['DATABASE_NAME']
from db.api import DBConnection
from db.utils import get_df_from_batch
from db.tables import *
if DATABASE == 'redshift':
from db.loaders.redshift_loader import transit_insert_to_redshift
if DATABASE == 'clickhouse':
from db.loaders.clickhouse_loader import insert_to_clickhouse
if DATABASE == 'pg':
from db.loaders.postgres_loader import insert_to_postgres
if DATABASE == 'bigquery':
from db.loaders.bigquery_loader import insert_to_bigquery
from bigquery_utils.create_table import create_tables_bigquery
if DATABASE == 'snowflake':
from db.loaders.snowflake_loader import insert_to_snowflake
# Create the destination tables at import time if they do not exist yet.
# Any failure is reported on stdout instead of being raised, so the module
# still imports and the tables can be created manually.
try:
    db = DBConnection(DATABASE)
    if DATABASE == 'pg':
        create_tables_postgres(db)
    elif DATABASE == 'clickhouse':
        create_tables_clickhouse(db)
    elif DATABASE == 'snowflake':
        create_tables_snowflake(db)
    elif DATABASE == 'bigquery':
        create_tables_bigquery()
    elif DATABASE == 'redshift':
        create_tables_redshift(db)
    db.engine.dispose()
    db = None
except Exception as e:
    print(repr(e))
    # The SQL scripts for PostgreSQL are named 'postgres_*.sql' while the
    # DATABASE setting is 'pg'; other engines use their name unchanged.
    sql_prefix = {'pg': 'postgres'}.get(DATABASE, DATABASE)
    print("Please create the tables with scripts provided in "
          f"'/sql/{sql_prefix}_sessions.sql' and '/sql/{sql_prefix}_events.sql'")
def insert_batch(db: DBConnection, batch, table, level='normal'):
    """Convert a batch to a DataFrame and hand it to the configured loader.

    Args:
        db: connection whose ``config`` attribute names the target database.
        batch: sized collection of model instances; an empty batch is a no-op.
        table: destination table, passed through to the loader.
        level: forwarded to ``get_df_from_batch`` to pick columns and dtypes.

    Returns:
        None. Nothing is loaded when ``db.config`` matches no known database.
    """
    if not len(batch):
        return

    frame = get_df_from_batch(batch, level=level)
    target = db.config

    if target == 'redshift':
        transit_insert_to_redshift(db=db, df=frame, table=table)
    elif target == 'clickhouse':
        insert_to_clickhouse(db=db, df=frame, table=table)
    elif target == 'pg':
        insert_to_postgres(db=db, df=frame, table=table)
    elif target == 'bigquery':
        # The BigQuery loader does not take the connection object.
        insert_to_bigquery(df=frame, table=table)
    elif target == 'snowflake':
        insert_to_snowflake(db=db, df=frame, table=table)

647
ee/connectors/handler.py Normal file
View file

@ -0,0 +1,647 @@
from typing import Optional, Union
from db.models import Event, DetailedEvent, Session
from msgcodec.messages import *
def handle_normal_message(message: Message) -> Optional[Event]:
    """Map a decoded message onto a sparse ``Event`` row.

    Only the columns belonging to the message's type are filled in; the first
    matching type in the chain below wins.

    Args:
        message: decoded tracker message.

    Returns:
        A new ``Event`` for handled message types, otherwise ``None``.
    """
    event = Event()

    if isinstance(message, ConnectionInformation):
        event.connectioninformation_downlink = message.downlink
        event.connectioninformation_type = message.type
    elif isinstance(message, ConsoleLog):
        event.consolelog_level = message.level
        event.consolelog_value = message.value
    elif isinstance(message, CustomEvent):
        event.customevent_messageid = message.message_id
        event.customevent_name = message.name
        event.customevent_timestamp = message.timestamp
        event.customevent_payload = message.payload
    elif isinstance(message, ErrorEvent):
        event.errorevent_message = message.message
        event.errorevent_messageid = message.message_id
        event.errorevent_name = message.name
        event.errorevent_payload = message.payload
        event.errorevent_source = message.source
        event.errorevent_timestamp = message.timestamp
    elif isinstance(message, JSException):
        event.jsexception_name = message.name
        event.jsexception_payload = message.payload
        event.jsexception_message = message.message
    elif isinstance(message, Metadata):
        event.metadata_key = message.key
        event.metadata_value = message.value
    elif isinstance(message, MouseClick):
        event.mouseclick_hesitationtime = message.hesitation_time
        event.mouseclick_id = message.id
        event.mouseclick_label = message.label
    elif isinstance(message, PageEvent):
        event.pageevent_firstcontentfulpaint = message.first_contentful_paint
        event.pageevent_firstpaint = message.first_paint
        event.pageevent_messageid = message.message_id
        event.pageevent_referrer = message.referrer
        event.pageevent_speedindex = message.speed_index
        event.pageevent_timestamp = message.timestamp
        event.pageevent_url = message.url
    elif isinstance(message, PageRenderTiming):
        event.pagerendertiming_timetointeractive = message.time_to_interactive
        event.pagerendertiming_visuallycomplete = message.visually_complete
    elif isinstance(message, RawCustomEvent):
        event.rawcustomevent_name = message.name
        event.rawcustomevent_payload = message.payload
    elif isinstance(message, SetViewportSize):
        event.setviewportsize_height = message.height
        event.setviewportsize_width = message.width
    elif isinstance(message, Timestamp):
        event.timestamp_timestamp = message.timestamp
    elif isinstance(message, UserAnonymousID):
        event.user_anonymous_id = message.id
    elif isinstance(message, UserID):
        event.user_id = message.id
    elif isinstance(message, IssueEvent):
        event.issueevent_messageid = message.message_id
        event.issueevent_timestamp = message.timestamp
        event.issueevent_type = message.type
        event.issueevent_contextstring = message.context_string
        event.issueevent_context = message.context
        event.issueevent_payload = message.payload
    elif isinstance(message, CustomIssue):
        event.customissue_name = message.name
        event.customissue_payload = message.payload
    else:
        # Message types outside this list produce no row.
        return None

    return event
def handle_session(n: Session, message: Message) -> Optional[Session]:
    """Fold one decoded message into the running per-session aggregate.

    Args:
        n: the session accumulated so far; a falsy value starts a new
            ``Session``.
        message: decoded tracker message.

    Returns:
        The updated session for handled message types, otherwise ``None``.
    """
    if not n:
        n = Session()

    if isinstance(message, SessionStart):
        n.session_start_timestamp = message.timestamp
        n.user_uuid = message.user_uuid
        n.user_agent = message.user_agent
        n.user_os = message.user_os
        n.user_os_version = message.user_os_version
        n.user_browser = message.user_browser
        n.user_browser_version = message.user_browser_version
        n.user_device = message.user_device
        n.user_device_type = message.user_device_type
        n.user_device_memory_size = message.user_device_memory_size
        n.user_device_heap_size = message.user_device_heap_size
        n.user_country = message.user_country
        return n

    if isinstance(message, SessionEnd):
        n.session_end_timestamp = message.timestamp
        try:
            n.session_duration = n.session_end_timestamp - n.session_start_timestamp
        except TypeError:
            # Start timestamp not seen yet: leave the duration unset.
            pass
        return n

    if isinstance(message, ConnectionInformation):
        n.connection_effective_bandwidth = message.downlink
        n.connection_type = message.type
        return n

    if isinstance(message, Metadata):
        n.metadata_key = message.key
        n.metadata_value = message.value
        return n

    if isinstance(message, PageEvent):
        n.referrer = message.referrer
        n.first_contentful_paint = message.first_contentful_paint
        n.speed_index = message.speed_index
        n.timing_time_to_interactive = message.time_to_interactive
        n.visually_complete = message.visually_complete
        _increment_counter(n, 'urls_count')
        _append_item(n, 'urls', message.url)
        return n

    if isinstance(message, PerformanceTrackAggr):
        n.avg_cpu = message.avg_cpu
        n.avg_fps = message.avg_fps
        n.max_cpu = message.max_cpu
        n.max_fps = message.max_fps
        n.max_total_js_heap_size = message.max_total_js_heap_size
        n.max_used_js_heap_size = message.max_used_js_heap_size
        return n

    if isinstance(message, UserID):
        n.user_id = message.id
        return n

    if isinstance(message, UserAnonymousID):
        n.user_anonymous_id = message.id
        return n

    if isinstance(message, JSException):
        _increment_counter(n, 'js_exceptions_count')
        return n

    if isinstance(message, LongTask):
        _increment_counter(n, 'long_tasks_total_duration', message.duration)
        # Keep the LARGEST duration seen so far; the comparison used to be
        # inverted, which stored the minimum instead.
        longest = n.long_tasks_max_duration
        if longest is None or message.duration > longest:
            n.long_tasks_max_duration = message.duration
        _increment_counter(n, 'long_tasks_count')
        return n

    if isinstance(message, InputEvent):
        _increment_counter(n, 'inputs_count')
        return n

    if isinstance(message, MouseClick):
        # Clicks have their own counter; they were previously added to
        # inputs_count, leaving clicks_count permanently empty.
        _increment_counter(n, 'clicks_count')
        return n

    if isinstance(message, IssueEvent):
        # Single branch that both counts the issue and records its type. The
        # earlier duplicate branch returned before the type was stored and
        # also reset inputs_count to 1.
        _increment_counter(n, 'issues_count')
        _append_item(n, 'issues', message.type)
        return n


def _increment_counter(session, attr, amount=1):
    """Add ``amount`` to ``session.<attr>``, starting from ``amount`` when unset."""
    try:
        setattr(session, attr, getattr(session, attr) + amount)
    except TypeError:
        # The attribute is still None (no value accumulated yet).
        setattr(session, attr, amount)


def _append_item(session, attr, item):
    """Append ``item`` to the list in ``session.<attr>``, creating it when unset."""
    try:
        getattr(session, attr).append(item)
    except AttributeError:
        # The attribute is still None (or not list-like): start a new list.
        setattr(session, attr, [item])
def handle_message(message: Message) -> Optional[DetailedEvent]:
    """Flatten one decoded message into a ``DetailedEvent``.

    A fresh ``DetailedEvent`` is created and the fields of ``message`` are
    copied onto the attributes dedicated to that message type (attribute
    names are prefixed with the lower-cased message name).

    Returns the populated event, or ``None`` when the message type is not
    one of the types handled below.
    """
    event = DetailedEvent()

    if isinstance(message, SessionEnd):
        event.sessionend = True
        event.sessionend_timestamp = message.timestamp
    elif isinstance(message, Timestamp):
        event.timestamp_timestamp = message.timestamp
    elif isinstance(message, SessionDisconnect):
        event.sessiondisconnect = True
        event.sessiondisconnect_timestamp = message.timestamp
    elif isinstance(message, SessionStart):
        event.sessionstart_trackerversion = message.tracker_version
        event.sessionstart_revid = message.rev_id
        event.sessionstart_timestamp = message.timestamp
        event.sessionstart_useruuid = message.user_uuid
        event.sessionstart_useragent = message.user_agent
        event.sessionstart_useros = message.user_os
        event.sessionstart_userosversion = message.user_os_version
        event.sessionstart_userbrowser = message.user_browser
        event.sessionstart_userbrowserversion = message.user_browser_version
        event.sessionstart_userdevice = message.user_device
        event.sessionstart_userdevicetype = message.user_device_type
        event.sessionstart_userdevicememorysize = message.user_device_memory_size
        event.sessionstart_userdeviceheapsize = message.user_device_heap_size
        event.sessionstart_usercountry = message.user_country
    elif isinstance(message, SetViewportSize):
        event.setviewportsize_width = message.width
        event.setviewportsize_height = message.height
    elif isinstance(message, SetViewportScroll):
        event.setviewportscroll_x = message.x
        event.setviewportscroll_y = message.y
    elif isinstance(message, SetNodeScroll):
        event.setnodescroll_id = message.id
        event.setnodescroll_x = message.x
        event.setnodescroll_y = message.y
    elif isinstance(message, ConsoleLog):
        event.consolelog_level = message.level
        event.consolelog_value = message.value
    elif isinstance(message, PageLoadTiming):
        event.pageloadtiming_requeststart = message.request_start
        event.pageloadtiming_responsestart = message.response_start
        event.pageloadtiming_responseend = message.response_end
        event.pageloadtiming_domcontentloadedeventstart = message.dom_content_loaded_event_start
        event.pageloadtiming_domcontentloadedeventend = message.dom_content_loaded_event_end
        event.pageloadtiming_loadeventstart = message.load_event_start
        event.pageloadtiming_loadeventend = message.load_event_end
        event.pageloadtiming_firstpaint = message.first_paint
        event.pageloadtiming_firstcontentfulpaint = message.first_contentful_paint
    elif isinstance(message, PageRenderTiming):
        event.pagerendertiming_speedindex = message.speed_index
        event.pagerendertiming_visuallycomplete = message.visually_complete
        event.pagerendertiming_timetointeractive = message.time_to_interactive
    elif isinstance(message, ResourceTiming):
        event.resourcetiming_timestamp = message.timestamp
        event.resourcetiming_duration = message.duration
        event.resourcetiming_ttfb = message.ttfb
        event.resourcetiming_headersize = message.header_size
        event.resourcetiming_encodedbodysize = message.encoded_body_size
        event.resourcetiming_decodedbodysize = message.decoded_body_size
        event.resourcetiming_url = message.url
        event.resourcetiming_initiator = message.initiator
    elif isinstance(message, JSException):
        event.jsexception_name = message.name
        event.jsexception_message = message.message
        event.jsexception_payload = message.payload
    elif isinstance(message, RawErrorEvent):
        event.rawerrorevent_timestamp = message.timestamp
        event.rawerrorevent_source = message.source
        event.rawerrorevent_name = message.name
        event.rawerrorevent_message = message.message
        event.rawerrorevent_payload = message.payload
    elif isinstance(message, RawCustomEvent):
        event.rawcustomevent_name = message.name
        event.rawcustomevent_payload = message.payload
    elif isinstance(message, UserID):
        event.userid_id = message.id
    elif isinstance(message, UserAnonymousID):
        event.useranonymousid_id = message.id
    elif isinstance(message, Metadata):
        event.metadata_key = message.key
        event.metadata_value = message.value
    elif isinstance(message, PerformanceTrack):
        event.performancetrack_frames = message.frames
        event.performancetrack_ticks = message.ticks
        event.performancetrack_totaljsheapsize = message.total_js_heap_size
        event.performancetrack_usedjsheapsize = message.used_js_heap_size
    elif isinstance(message, PerformanceTrackAggr):
        event.performancetrackaggr_timestampstart = message.timestamp_start
        event.performancetrackaggr_timestampend = message.timestamp_end
        event.performancetrackaggr_minfps = message.min_fps
        event.performancetrackaggr_avgfps = message.avg_fps
        event.performancetrackaggr_maxfps = message.max_fps
        event.performancetrackaggr_mincpu = message.min_cpu
        event.performancetrackaggr_avgcpu = message.avg_cpu
        event.performancetrackaggr_maxcpu = message.max_cpu
        event.performancetrackaggr_mintotaljsheapsize = message.min_total_js_heap_size
        event.performancetrackaggr_avgtotaljsheapsize = message.avg_total_js_heap_size
        event.performancetrackaggr_maxtotaljsheapsize = message.max_total_js_heap_size
        event.performancetrackaggr_minusedjsheapsize = message.min_used_js_heap_size
        event.performancetrackaggr_avgusedjsheapsize = message.avg_used_js_heap_size
        event.performancetrackaggr_maxusedjsheapsize = message.max_used_js_heap_size
    elif isinstance(message, ConnectionInformation):
        event.connectioninformation_downlink = message.downlink
        event.connectioninformation_type = message.type
    elif isinstance(message, PageEvent):
        event.pageevent_messageid = message.message_id
        event.pageevent_timestamp = message.timestamp
        event.pageevent_url = message.url
        event.pageevent_referrer = message.referrer
        event.pageevent_loaded = message.loaded
        event.pageevent_requeststart = message.request_start
        event.pageevent_responsestart = message.response_start
        event.pageevent_responseend = message.response_end
        event.pageevent_domcontentloadedeventstart = message.dom_content_loaded_event_start
        event.pageevent_domcontentloadedeventend = message.dom_content_loaded_event_end
        event.pageevent_loadeventstart = message.load_event_start
        event.pageevent_loadeventend = message.load_event_end
        event.pageevent_firstpaint = message.first_paint
        event.pageevent_firstcontentfulpaint = message.first_contentful_paint
        event.pageevent_speedindex = message.speed_index
    elif isinstance(message, InputEvent):
        event.inputevent_messageid = message.message_id
        event.inputevent_timestamp = message.timestamp
        event.inputevent_value = message.value
        event.inputevent_valuemasked = message.value_masked
        event.inputevent_label = message.label
    elif isinstance(message, ClickEvent):
        event.clickevent_messageid = message.message_id
        event.clickevent_timestamp = message.timestamp
        event.clickevent_hesitationtime = message.hesitation_time
        event.clickevent_label = message.label
    elif isinstance(message, ErrorEvent):
        event.errorevent_messageid = message.message_id
        event.errorevent_timestamp = message.timestamp
        event.errorevent_source = message.source
        event.errorevent_name = message.name
        event.errorevent_message = message.message
        event.errorevent_payload = message.payload
    elif isinstance(message, ResourceEvent):
        event.resourceevent_messageid = message.message_id
        event.resourceevent_timestamp = message.timestamp
        event.resourceevent_duration = message.duration
        event.resourceevent_ttfb = message.ttfb
        event.resourceevent_headersize = message.header_size
        event.resourceevent_encodedbodysize = message.encoded_body_size
        event.resourceevent_decodedbodysize = message.decoded_body_size
        event.resourceevent_url = message.url
        event.resourceevent_type = message.type
        event.resourceevent_success = message.success
        event.resourceevent_method = message.method
        event.resourceevent_status = message.status
    elif isinstance(message, CustomEvent):
        event.customevent_messageid = message.message_id
        event.customevent_timestamp = message.timestamp
        event.customevent_name = message.name
        event.customevent_payload = message.payload
    # NOTE: CreateDocument, CreateElementNode, CSSInsertRule and CSSDeleteRule
    # have no active branch here (their handlers were disabled), so they fall
    # through to the final ``return None``.
    elif isinstance(message, Fetch):
        event.fetch_method = message.method
        event.fetch_url = message.url
        event.fetch_request = message.request
        event.fetch_status = message.status
        event.fetch_timestamp = message.timestamp
        event.fetch_duration = message.duration
    elif isinstance(message, Profiler):
        event.profiler_name = message.name
        event.profiler_duration = message.duration
        event.profiler_args = message.args
        event.profiler_result = message.result
    elif isinstance(message, GraphQL):
        event.graphql_operationkind = message.operation_kind
        event.graphql_operationname = message.operation_name
        event.graphql_variables = message.variables
        event.graphql_response = message.response
    elif isinstance(message, GraphQLEvent):
        event.graphqlevent_messageid = message.message_id
        event.graphqlevent_timestamp = message.timestamp
        event.graphqlevent_name = message.name
    elif isinstance(message, DomDrop):
        event.domdrop_timestamp = message.timestamp
    elif isinstance(message, MouseClick):
        event.mouseclick_id = message.id
        event.mouseclick_hesitationtime = message.hesitation_time
        event.mouseclick_label = message.label
    elif isinstance(message, SetPageLocation):
        event.setpagelocation_url = message.url
        event.setpagelocation_referrer = message.referrer
        event.setpagelocation_navigationstart = message.navigation_start
    elif isinstance(message, MouseMove):
        event.mousemove_x = message.x
        event.mousemove_y = message.y
    elif isinstance(message, LongTask):
        # NOTE(review): the target attributes mix the ``longtask_`` and
        # ``longtasks_`` prefixes; kept verbatim -- confirm against the
        # DetailedEvent model before normalising.
        event.longtasks_timestamp = message.timestamp
        event.longtasks_duration = message.duration
        event.longtask_context = message.context
        event.longtask_containertype = message.container_type
        event.longtasks_containersrc = message.container_src
        event.longtasks_containerid = message.container_id
        event.longtasks_containername = message.container_name
    elif isinstance(message, SetNodeURLBasedAttribute):
        event.setnodeurlbasedattribute_id = message.id
        event.setnodeurlbasedattribute_name = message.name
        event.setnodeurlbasedattribute_value = message.value
        event.setnodeurlbasedattribute_baseurl = message.base_url
    elif isinstance(message, SetStyleData):
        event.setstyledata_id = message.id
        event.setstyledata_data = message.data
        event.setstyledata_baseurl = message.base_url
    elif isinstance(message, IssueEvent):
        event.issueevent_messageid = message.message_id
        event.issueevent_timestamp = message.timestamp
        event.issueevent_type = message.type
        event.issueevent_contextstring = message.context_string
        event.issueevent_context = message.context
        event.issueevent_payload = message.payload
    elif isinstance(message, TechnicalInfo):
        event.technicalinfo_type = message.type
        event.technicalinfo_value = message.value
    elif isinstance(message, CustomIssue):
        event.customissue_name = message.name
        event.customissue_payload = message.payload
    elif isinstance(message, PageClose):
        event.pageclose = True
    elif isinstance(message, IOSSessionStart):
        event.iossessionstart_timestamp = message.timestamp
        event.iossessionstart_projectid = message.project_id
        event.iossessionstart_trackerversion = message.tracker_version
        event.iossessionstart_revid = message.rev_id
        event.iossessionstart_useruuid = message.user_uuid
        event.iossessionstart_useros = message.user_os
        event.iossessionstart_userosversion = message.user_os_version
        event.iossessionstart_userdevice = message.user_device
        event.iossessionstart_userdevicetype = message.user_device_type
        event.iossessionstart_usercountry = message.user_country
    elif isinstance(message, IOSSessionEnd):
        event.iossessionend_timestamp = message.timestamp
    elif isinstance(message, IOSMetadata):
        event.iosmetadata_timestamp = message.timestamp
        event.iosmetadata_length = message.length
        event.iosmetadata_key = message.key
        event.iosmetadata_value = message.value
    elif isinstance(message, IOSUserID):
        event.iosuserid_timestamp = message.timestamp
        event.iosuserid_length = message.length
        event.iosuserid_value = message.value
    elif isinstance(message, IOSUserAnonymousID):
        event.iosuseranonymousid_timestamp = message.timestamp
        event.iosuseranonymousid_length = message.length
        event.iosuseranonymousid_value = message.value
    elif isinstance(message, IOSScreenLeave):
        event.iosscreenleave_timestamp = message.timestamp
        event.iosscreenleave_length = message.length
        event.iosscreenleave_title = message.title
        event.iosscreenleave_viewname = message.view_name
    elif isinstance(message, IOSLog):
        event.ioslog_timestamp = message.timestamp
        event.ioslog_length = message.length
        event.ioslog_severity = message.severity
        event.ioslog_content = message.content
    elif isinstance(message, IOSInternalError):
        event.iosinternalerror_timestamp = message.timestamp
        event.iosinternalerror_length = message.length
        event.iosinternalerror_content = message.content
    elif isinstance(message, IOSPerformanceAggregated):
        event.iosperformanceaggregated_timestampstart = message.timestamp_start
        event.iosperformanceaggregated_timestampend = message.timestamp_end
        event.iosperformanceaggregated_minfps = message.min_fps
        event.iosperformanceaggregated_avgfps = message.avg_fps
        event.iosperformanceaggregated_maxfps = message.max_fps
        event.iosperformanceaggregated_mincpu = message.min_cpu
        event.iosperformanceaggregated_avgcpu = message.avg_cpu
        event.iosperformanceaggregated_maxcpu = message.max_cpu
        event.iosperformanceaggregated_minmemory = message.min_memory
        event.iosperformanceaggregated_avgmemory = message.avg_memory
        event.iosperformanceaggregated_maxmemory = message.max_memory
        event.iosperformanceaggregated_minbattery = message.min_battery
        event.iosperformanceaggregated_avgbattery = message.avg_battery
        event.iosperformanceaggregated_maxbattery = message.max_battery
    else:
        # Message type is not exported at the detailed level.
        return None

    return event

121
ee/connectors/main.py Normal file
View file

@ -0,0 +1,121 @@
import os
from kafka import KafkaConsumer
from datetime import datetime
from collections import defaultdict
from msgcodec.codec import MessageCodec
from msgcodec.messages import SessionEnd
from db.api import DBConnection
from db.models import events_detailed_table_name, events_table_name, sessions_table_name, conf
from db.writer import insert_batch
from handler import handle_message, handle_normal_message, handle_session
# Name of the target database; also the key used to look up its settings in `conf`.
DATABASE = os.environ['DATABASE_NAME']
# Export granularity configured for that database ('detailed' or 'normal' are handled).
LEVEL = conf[DATABASE]['level']
db = DBConnection(DATABASE)

# Pick the events table matching the level. For any other level value
# `table_name` is left undefined.
if LEVEL == 'normal':
    table_name = events_table_name
elif LEVEL == 'detailed':
    table_name = events_detailed_table_name
def main():
    """Consume messages from Kafka and insert them into the database in batches.

    Every decoded message is turned into an event row (according to ``LEVEL``)
    and also folded into the per-session aggregate kept in ``sessions``.
    Sessions are queued for insertion once a ``SessionEnd`` arrives and are
    flushed every 400 sessions; events are flushed every 4000 rows, after
    which the Kafka offsets are committed.
    """
    event_limit = 4000
    session_limit = 400

    pending_events = []
    sessions = defaultdict(lambda: None)
    finished_sessions = []

    codec = MessageCodec()
    servers = [os.environ['KAFKA_SERVER_1'],
               os.environ['KAFKA_SERVER_2']]
    consumer = KafkaConsumer(
        security_protocol="SSL",
        bootstrap_servers=servers,
        group_id=f"connector_{DATABASE}",
        auto_offset_reset="earliest",
        enable_auto_commit=False,
    )

    consumer.subscribe(topics=["events", "messages"])
    print("Kafka consumer subscribed")

    for msg in consumer:
        message = codec.decode(msg.value)
        if message is None:
            # Unknown message id: nothing to export.
            print('-')
            continue

        if LEVEL == 'detailed':
            row = handle_message(message)
        elif LEVEL == 'normal':
            row = handle_normal_message(message)

        # Update the running aggregate for the session this message belongs to.
        session_id = codec.decode_key(msg.key)
        session = handle_session(sessions[session_id], message)
        sessions[session_id] = session
        if session:
            session.sessionid = session_id

        # A SessionEnd marks the aggregate as ready for insertion.
        if isinstance(message, SessionEnd) and session:
            finished_sessions.append(session)

        # Flush finished sessions and evict them from the cache.
        if len(finished_sessions) >= session_limit:
            attempt_session_insert(finished_sessions)
            for done in finished_sessions:
                try:
                    del sessions[done.sessionid]
                except KeyError as e:
                    print(repr(e))
            finished_sessions = []

        if not row:
            continue

        row.sessionid = session_id
        row.received_at = int(datetime.now().timestamp() * 1000)
        row.batch_order_number = len(pending_events)
        pending_events.append(row)

        # Flush events; offsets are committed only after a flush attempt.
        if len(pending_events) >= event_limit:
            attempt_batch_insert(pending_events)
            pending_events = []
            consumer.commit()
            print("sessions in cache:", len(sessions))
def attempt_session_insert(sess_batch):
    """Try to insert a batch of sessions; failures are printed, never raised.

    An empty batch is a no-op.
    """
    if not sess_batch:
        return

    try:
        print("inserting sessions...")
        insert_batch(db, sess_batch, table=sessions_table_name, level='sessions')
        print("inserted sessions succesfully")
    except (TypeError, ValueError) as err:
        if isinstance(err, TypeError):
            print("Type conversion error")
        else:
            print("Message value could not be processed or inserted correctly")
        print(repr(err))
    except Exception as err:
        print(repr(err))
def attempt_batch_insert(batch):
    """Try to insert a batch of events; failures are printed, never raised."""
    try:
        print("inserting...")
        insert_batch(db=db, batch=batch, table=table_name, level=LEVEL)
        print("inserted succesfully")
    except (TypeError, ValueError) as err:
        if isinstance(err, TypeError):
            print("Type conversion error")
        else:
            print("Message value could not be processed or inserted correctly")
        print(repr(err))
    except Exception as err:
        print(repr(err))
# Start the consumer loop only when this file is executed as a script.
if __name__ == '__main__':
    main()

View file

@ -0,0 +1,670 @@
import io
from msgcodec.messages import *
class Codec:
    """
    Implements the decode primitives of the binary message format:
    booleans, unsigned varints, zigzag-encoded signed ints and
    length-prefixed UTF-8 strings.
    """

    @staticmethod
    def read_boolean(reader: io.BytesIO):
        """
        Read one byte and return True iff that byte has the value 1.

        ``reader.read(1)`` yields a ``bytes`` object, so it has to be compared
        with ``b"\\x01"`` (comparing it with the int ``1`` is always False).
        An exhausted reader yields False.
        """
        return reader.read(1) == b"\x01"

    @staticmethod
    def read_uint(reader: io.BytesIO):
        """
        Read an unsigned varint (7 payload bits per byte, least significant
        group first, high bit set on every byte except the last).

        Byte order passed to ``int.from_bytes`` is irrelevant because a
        single byte is converted at a time.

        An exhausted reader behaves like a terminating zero byte, so the
        value accumulated so far is returned.

        :raises OverflowError: when the encoded value does not fit in 64 bits
        """
        result = 0  # value accumulated so far
        shift = 0   # bit position of the next 7-bit group
        index = 0   # index of the current byte (a uint64 needs at most 10)
        while True:
            chunk = reader.read(1)
            num = int.from_bytes(chunk, "big", signed=False)
            if num < 0x80:
                # Last byte. The 10th byte (index 9) may only contribute one bit.
                if index > 9 or (index == 9 and num > 1):
                    raise OverflowError("varint does not fit in 64 bits")
                return int(result | (num << shift))
            result |= (num & 0x7f) << shift
            shift += 7
            index += 1

    @staticmethod
    def read_int(reader: io.BytesIO) -> int:
        """
        Read a zigzag-encoded signed integer: the lowest bit of the unsigned
        varint is the sign flag, the remaining bits are the magnitude.

        Mirrors the Go reference implementation::

            ux, err := ReadUint(reader)
            x := int64(ux >> 1)
            if ux&1 != 0 {
                x = ^x
            }
            return x, err
        """
        ux = Codec.read_uint(reader)
        x = int(ux >> 1)
        if ux & 1 != 0:
            x = ~x
        return x

    @staticmethod
    def read_string(reader: io.BytesIO) -> str:
        """
        Read a varint length followed by that many bytes and decode them as
        UTF-8. Undecodable bytes and NUL characters are replaced with U+FFFD.
        """
        length = Codec.read_uint(reader)
        raw = reader.read(length)
        # errors="replace" never raises, so no UnicodeDecodeError handling is needed.
        return raw.decode("utf-8", errors="replace").replace("\x00", "\uFFFD")
class MessageCodec(Codec):
    """
    Decodes raw Kafka payloads into ``Message`` objects and Kafka keys into
    session ids.
    """

    def encode(self, m: Message) -> bytes:
        """Not implemented."""
        ...

    def decode(self, b: bytes) -> Message:
        """
        Decode one message from its binary form.

        The payload starts with the message id (unsigned varint) followed by
        the fields of that message type, read strictly in the order listed
        below.

        :param b: raw message bytes
        :return: the decoded message, or ``None`` when the message id is not
            one of the ids handled here
        """
        reader = io.BytesIO(b)
        message_id = self.read_message_id(reader)

        if message_id == 0:
            return Timestamp(
                timestamp=self.read_uint(reader)
            )
        if message_id == 1:
            return SessionStart(
                timestamp=self.read_uint(reader),
                project_id=self.read_uint(reader),
                tracker_version=self.read_string(reader),
                rev_id=self.read_string(reader),
                user_uuid=self.read_string(reader),
                user_agent=self.read_string(reader),
                user_os=self.read_string(reader),
                user_os_version=self.read_string(reader),
                user_browser=self.read_string(reader),
                user_browser_version=self.read_string(reader),
                user_device=self.read_string(reader),
                user_device_type=self.read_string(reader),
                user_device_memory_size=self.read_uint(reader),
                user_device_heap_size=self.read_uint(reader),
                user_country=self.read_string(reader)
            )
        if message_id == 2:
            return SessionDisconnect(
                timestamp=self.read_uint(reader)
            )
        if message_id == 3:
            return SessionEnd(
                timestamp=self.read_uint(reader)
            )
        if message_id == 4:
            return SetPageLocation(
                url=self.read_string(reader),
                referrer=self.read_string(reader),
                navigation_start=self.read_uint(reader)
            )
        if message_id == 5:
            return SetViewportSize(
                width=self.read_uint(reader),
                height=self.read_uint(reader)
            )
        if message_id == 6:
            return SetViewportScroll(
                x=self.read_int(reader),
                y=self.read_int(reader)
            )
        if message_id == 7:
            return CreateDocument()
        if message_id == 8:
            return CreateElementNode(
                id=self.read_uint(reader),
                parent_id=self.read_uint(reader),
                index=self.read_uint(reader),
                tag=self.read_string(reader),
                svg=self.read_boolean(reader),
            )
        if message_id == 9:
            return CreateTextNode(
                id=self.read_uint(reader),
                parent_id=self.read_uint(reader),
                index=self.read_uint(reader)
            )
        if message_id == 10:
            return MoveNode(
                id=self.read_uint(reader),
                parent_id=self.read_uint(reader),
                index=self.read_uint(reader)
            )
        if message_id == 11:
            return RemoveNode(
                id=self.read_uint(reader)
            )
        if message_id == 12:
            return SetNodeAttribute(
                id=self.read_uint(reader),
                name=self.read_string(reader),
                value=self.read_string(reader)
            )
        if message_id == 13:
            return RemoveNodeAttribute(
                id=self.read_uint(reader),
                name=self.read_string(reader)
            )
        if message_id == 14:
            return SetNodeData(
                id=self.read_uint(reader),
                data=self.read_string(reader)
            )
        if message_id == 15:
            return SetCSSData(
                id=self.read_uint(reader),
                data=self.read_string(reader)
            )
        if message_id == 16:
            return SetNodeScroll(
                id=self.read_uint(reader),
                x=self.read_int(reader),
                y=self.read_int(reader),
            )
        if message_id == 17:
            return SetInputTarget(
                id=self.read_uint(reader),
                label=self.read_string(reader)
            )
        if message_id == 18:
            return SetInputValue(
                id=self.read_uint(reader),
                value=self.read_string(reader),
                mask=self.read_int(reader),
            )
        if message_id == 19:
            return SetInputChecked(
                id=self.read_uint(reader),
                checked=self.read_boolean(reader)
            )
        if message_id == 20:
            return MouseMove(
                x=self.read_uint(reader),
                y=self.read_uint(reader)
            )
        if message_id == 21:
            return MouseClick(
                id=self.read_uint(reader),
                hesitation_time=self.read_uint(reader),
                label=self.read_string(reader)
            )
        if message_id == 22:
            return ConsoleLog(
                level=self.read_string(reader),
                value=self.read_string(reader)
            )
        if message_id == 23:
            return PageLoadTiming(
                request_start=self.read_uint(reader),
                response_start=self.read_uint(reader),
                response_end=self.read_uint(reader),
                dom_content_loaded_event_start=self.read_uint(reader),
                dom_content_loaded_event_end=self.read_uint(reader),
                load_event_start=self.read_uint(reader),
                load_event_end=self.read_uint(reader),
                first_paint=self.read_uint(reader),
                first_contentful_paint=self.read_uint(reader)
            )
        if message_id == 24:
            return PageRenderTiming(
                speed_index=self.read_uint(reader),
                visually_complete=self.read_uint(reader),
                time_to_interactive=self.read_uint(reader),
            )
        if message_id == 25:
            return JSException(
                name=self.read_string(reader),
                message=self.read_string(reader),
                payload=self.read_string(reader)
            )
        if message_id == 26:
            return RawErrorEvent(
                timestamp=self.read_uint(reader),
                source=self.read_string(reader),
                name=self.read_string(reader),
                message=self.read_string(reader),
                payload=self.read_string(reader)
            )
        if message_id == 27:
            return RawCustomEvent(
                name=self.read_string(reader),
                payload=self.read_string(reader)
            )
        if message_id == 28:
            return UserID(
                id=self.read_string(reader)
            )
        if message_id == 29:
            return UserAnonymousID(
                id=self.read_string(reader)
            )
        if message_id == 30:
            return Metadata(
                key=self.read_string(reader),
                value=self.read_string(reader)
            )
        if message_id == 31:
            return PageEvent(
                message_id=self.read_uint(reader),
                timestamp=self.read_uint(reader),
                url=self.read_string(reader),
                referrer=self.read_string(reader),
                loaded=self.read_boolean(reader),
                request_start=self.read_uint(reader),
                response_start=self.read_uint(reader),
                response_end=self.read_uint(reader),
                dom_content_loaded_event_start=self.read_uint(reader),
                dom_content_loaded_event_end=self.read_uint(reader),
                load_event_start=self.read_uint(reader),
                load_event_end=self.read_uint(reader),
                first_paint=self.read_uint(reader),
                first_contentful_paint=self.read_uint(reader),
                speed_index=self.read_uint(reader),
                visually_complete=self.read_uint(reader),
                time_to_interactive=self.read_uint(reader)
            )
        if message_id == 32:
            return InputEvent(
                message_id=self.read_uint(reader),
                timestamp=self.read_uint(reader),
                value=self.read_string(reader),
                value_masked=self.read_boolean(reader),
                label=self.read_string(reader),
            )
        if message_id == 33:
            return ClickEvent(
                message_id=self.read_uint(reader),
                timestamp=self.read_uint(reader),
                hesitation_time=self.read_uint(reader),
                label=self.read_string(reader)
            )
        if message_id == 34:
            return ErrorEvent(
                message_id=self.read_uint(reader),
                timestamp=self.read_uint(reader),
                source=self.read_string(reader),
                name=self.read_string(reader),
                message=self.read_string(reader),
                payload=self.read_string(reader)
            )
        if message_id == 35:
            # Separate local name so the wire-level message id is not shadowed.
            resource_message_id = self.read_uint(reader)
            ts = self.read_uint(reader)
            # Timestamps above this bound are discarded and stored as None.
            if ts > 9999999999999:
                ts = None
            return ResourceEvent(
                message_id=resource_message_id,
                timestamp=ts,
                duration=self.read_uint(reader),
                ttfb=self.read_uint(reader),
                header_size=self.read_uint(reader),
                encoded_body_size=self.read_uint(reader),
                decoded_body_size=self.read_uint(reader),
                url=self.read_string(reader),
                type=self.read_string(reader),
                success=self.read_boolean(reader),
                method=self.read_string(reader),
                status=self.read_uint(reader)
            )
        if message_id == 36:
            return CustomEvent(
                message_id=self.read_uint(reader),
                timestamp=self.read_uint(reader),
                name=self.read_string(reader),
                payload=self.read_string(reader)
            )
        if message_id == 37:
            return CSSInsertRule(
                id=self.read_uint(reader),
                rule=self.read_string(reader),
                index=self.read_uint(reader)
            )
        if message_id == 38:
            return CSSDeleteRule(
                id=self.read_uint(reader),
                index=self.read_uint(reader)
            )
        if message_id == 39:
            return Fetch(
                method=self.read_string(reader),
                url=self.read_string(reader),
                request=self.read_string(reader),
                response=self.read_string(reader),
                status=self.read_uint(reader),
                timestamp=self.read_uint(reader),
                duration=self.read_uint(reader)
            )
        if message_id == 40:
            return Profiler(
                name=self.read_string(reader),
                duration=self.read_uint(reader),
                args=self.read_string(reader),
                result=self.read_string(reader)
            )
        if message_id == 41:
            return OTable(
                key=self.read_string(reader),
                value=self.read_string(reader)
            )
        if message_id == 42:
            return StateAction(
                type=self.read_string(reader)
            )
        if message_id == 43:
            return StateActionEvent(
                message_id=self.read_uint(reader),
                timestamp=self.read_uint(reader),
                type=self.read_string(reader)
            )
        if message_id == 44:
            return Redux(
                action=self.read_string(reader),
                state=self.read_string(reader),
                duration=self.read_uint(reader)
            )
        if message_id == 45:
            return Vuex(
                mutation=self.read_string(reader),
                state=self.read_string(reader),
            )
        if message_id == 46:
            return MobX(
                type=self.read_string(reader),
                payload=self.read_string(reader),
            )
        if message_id == 47:
            return NgRx(
                action=self.read_string(reader),
                state=self.read_string(reader),
                duration=self.read_uint(reader)
            )
        if message_id == 48:
            return GraphQL(
                operation_kind=self.read_string(reader),
                operation_name=self.read_string(reader),
                variables=self.read_string(reader),
                response=self.read_string(reader)
            )
        if message_id == 49:
            return PerformanceTrack(
                frames=self.read_int(reader),
                ticks=self.read_int(reader),
                total_js_heap_size=self.read_uint(reader),
                used_js_heap_size=self.read_uint(reader)
            )
        if message_id == 50:
            return GraphQLEvent(
                message_id=self.read_uint(reader),
                timestamp=self.read_uint(reader),
                name=self.read_string(reader)
            )
        if message_id == 52:
            return DomDrop(
                timestamp=self.read_uint(reader)
            )
        if message_id == 53:
            return ResourceTiming(
                timestamp=self.read_uint(reader),
                duration=self.read_uint(reader),
                ttfb=self.read_uint(reader),
                header_size=self.read_uint(reader),
                encoded_body_size=self.read_uint(reader),
                decoded_body_size=self.read_uint(reader),
                url=self.read_string(reader),
                initiator=self.read_string(reader)
            )
        if message_id == 54:
            return ConnectionInformation(
                downlink=self.read_uint(reader),
                type=self.read_string(reader)
            )
        if message_id == 55:
            return SetPageVisibility(
                hidden=self.read_boolean(reader)
            )
        if message_id == 56:
            return PerformanceTrackAggr(
                timestamp_start=self.read_uint(reader),
                timestamp_end=self.read_uint(reader),
                min_fps=self.read_uint(reader),
                avg_fps=self.read_uint(reader),
                max_fps=self.read_uint(reader),
                min_cpu=self.read_uint(reader),
                avg_cpu=self.read_uint(reader),
                max_cpu=self.read_uint(reader),
                min_total_js_heap_size=self.read_uint(reader),
                avg_total_js_heap_size=self.read_uint(reader),
                max_total_js_heap_size=self.read_uint(reader),
                min_used_js_heap_size=self.read_uint(reader),
                avg_used_js_heap_size=self.read_uint(reader),
                max_used_js_heap_size=self.read_uint(reader)
            )
        if message_id == 59:
            return LongTask(
                timestamp=self.read_uint(reader),
                duration=self.read_uint(reader),
                context=self.read_uint(reader),
                container_type=self.read_uint(reader),
                container_src=self.read_string(reader),
                container_id=self.read_string(reader),
                container_name=self.read_string(reader)
            )
        if message_id == 60:
            return SetNodeURLBasedAttribute(
                id=self.read_uint(reader),
                name=self.read_string(reader),
                value=self.read_string(reader),
                base_url=self.read_string(reader)
            )
        if message_id == 61:
            return SetStyleData(
                id=self.read_uint(reader),
                data=self.read_string(reader),
                base_url=self.read_string(reader)
            )
        if message_id == 62:
            return IssueEvent(
                message_id=self.read_uint(reader),
                timestamp=self.read_uint(reader),
                type=self.read_string(reader),
                context_string=self.read_string(reader),
                context=self.read_string(reader),
                payload=self.read_string(reader)
            )
        if message_id == 63:
            return TechnicalInfo(
                type=self.read_string(reader),
                value=self.read_string(reader)
            )
        if message_id == 64:
            return CustomIssue(
                name=self.read_string(reader),
                payload=self.read_string(reader)
            )
        if message_id == 65:
            return PageClose()
        if message_id == 90:
            return IOSSessionStart(
                timestamp=self.read_uint(reader),
                project_id=self.read_uint(reader),
                tracker_version=self.read_string(reader),
                rev_id=self.read_string(reader),
                user_uuid=self.read_string(reader),
                user_os=self.read_string(reader),
                user_os_version=self.read_string(reader),
                user_device=self.read_string(reader),
                user_device_type=self.read_string(reader),
                user_country=self.read_string(reader)
            )
        if message_id == 91:
            return IOSSessionEnd(
                timestamp=self.read_uint(reader)
            )
        if message_id == 92:
            return IOSMetadata(
                timestamp=self.read_uint(reader),
                length=self.read_uint(reader),
                key=self.read_string(reader),
                value=self.read_string(reader)
            )
        if message_id == 94:
            return IOSUserID(
                timestamp=self.read_uint(reader),
                length=self.read_uint(reader),
                value=self.read_string(reader)
            )
        if message_id == 95:
            return IOSUserAnonymousID(
                timestamp=self.read_uint(reader),
                length=self.read_uint(reader),
                value=self.read_string(reader)
            )
        if message_id == 99:
            return IOSScreenLeave(
                timestamp=self.read_uint(reader),
                length=self.read_uint(reader),
                title=self.read_string(reader),
                view_name=self.read_string(reader)
            )
        if message_id == 103:
            return IOSLog(
                timestamp=self.read_uint(reader),
                length=self.read_uint(reader),
                severity=self.read_string(reader),
                content=self.read_string(reader)
            )
        if message_id == 104:
            return IOSInternalError(
                timestamp=self.read_uint(reader),
                length=self.read_uint(reader),
                content=self.read_string(reader)
            )
        if message_id == 110:
            return IOSPerformanceAggregated(
                timestamp_start=self.read_uint(reader),
                timestamp_end=self.read_uint(reader),
                min_fps=self.read_uint(reader),
                avg_fps=self.read_uint(reader),
                max_fps=self.read_uint(reader),
                min_cpu=self.read_uint(reader),
                avg_cpu=self.read_uint(reader),
                max_cpu=self.read_uint(reader),
                min_memory=self.read_uint(reader),
                avg_memory=self.read_uint(reader),
                max_memory=self.read_uint(reader),
                min_battery=self.read_uint(reader),
                avg_battery=self.read_uint(reader),
                max_battery=self.read_uint(reader)
            )

        # Unknown / unsupported message id.
        return None

    def read_message_id(self, reader: io.BytesIO) -> int:
        """
        Read and return the message id, encoded as an unsigned varint at the
        current position of ``reader``.
        """
        return self.read_uint(reader)

    @staticmethod
    def check_message_id(b: bytes) -> int:
        """
        Return the message id encoded at the start of the raw message bytes
        ``b`` without decoding the rest of the message.
        """
        return Codec.read_uint(io.BytesIO(b))

    @staticmethod
    def decode_key(b) -> int:
        """
        Decode the message key (an unsigned little-endian integer holding the
        session id).

        :raises UnicodeDecodeError: when ``b`` cannot be converted
        """
        try:
            return int.from_bytes(b, "little", signed=False)
        except Exception as e:
            # UnicodeDecodeError takes exactly five arguments
            # (encoding, object, start, end, reason) and `object` must be
            # bytes-like; building it with a single string raises TypeError
            # and hides the real cause.
            payload = b if isinstance(b, (bytes, bytearray)) else b""
            raise UnicodeDecodeError(
                "little-endian uint",
                bytes(payload),
                0,
                len(payload),
                f"Error while decoding message key (SessionID) from {b}\n{e}",
            ) from e

View file

@ -0,0 +1,752 @@
"""
Representations of Kafka messages
"""
from abc import ABC
class Message(ABC):
    """Common base class of all message representations in this module."""
class Timestamp(Message):
    """Message with ``__id__`` 0; holds a single ``timestamp`` value."""

    __id__ = 0

    def __init__(self, timestamp):
        self.timestamp = timestamp
class SessionStart(Message):
    """Message with ``__id__`` 1; stores every constructor argument under the same attribute name."""

    __id__ = 1

    def __init__(self, timestamp, project_id, tracker_version, rev_id, user_uuid,
                 user_agent, user_os, user_os_version, user_browser, user_browser_version,
                 user_device, user_device_type, user_device_memory_size, user_device_heap_size,
                 user_country):
        # Session / tracker identification.
        self.timestamp, self.project_id = timestamp, project_id
        self.tracker_version, self.rev_id = tracker_version, rev_id
        self.user_uuid, self.user_agent = user_uuid, user_agent
        # OS and browser.
        self.user_os, self.user_os_version = user_os, user_os_version
        self.user_browser, self.user_browser_version = user_browser, user_browser_version
        # Device.
        self.user_device, self.user_device_type = user_device, user_device_type
        self.user_device_memory_size = user_device_memory_size
        self.user_device_heap_size = user_device_heap_size
        self.user_country = user_country
class SessionDisconnect(Message):
    """Message with id 2; stores a single ``timestamp`` value."""

    __id__ = 2

    def __init__(self, timestamp):
        self.timestamp = timestamp
class SessionEnd(Message):
    """Message with id 3; stores a single ``timestamp`` value."""

    __id__ = 3
    # Declared in the class body, so instances also expose ``__name__``.
    __name__ = 'SessionEnd'

    def __init__(self, timestamp):
        self.timestamp = timestamp
class SetPageLocation(Message):
    """Message with id 4; stores ``url``, ``referrer`` and ``navigation_start``."""

    __id__ = 4

    def __init__(self, url, referrer, navigation_start):
        self.url, self.referrer = url, referrer
        self.navigation_start = navigation_start
class SetViewportSize(Message):
    """Message with id 5; stores ``width`` and ``height``."""

    __id__ = 5

    def __init__(self, width, height):
        self.width, self.height = width, height
class SetViewportScroll(Message):
    """Message with id 6; stores the ``x`` and ``y`` values it is given."""

    __id__ = 6

    def __init__(self, x, y):
        self.x, self.y = x, y
class CreateDocument(Message):
    """Message with id 7; carries no fields of its own."""

    __id__ = 7
class CreateElementNode(Message):
    """
    Message with id 8; stores ``id``, ``parent_id``, ``index``, ``tag`` and ``svg``.

    Every argument is stored unchanged under the attribute of the same name.
    """

    __id__ = 8

    def __init__(self, id, parent_id, index, tag, svg):
        self.id = id
        # No trailing comma here: it would wrap the value in a 1-tuple.
        self.parent_id = parent_id
        self.index = index
        self.tag = tag
        self.svg = svg
class CreateTextNode(Message):
    """Message with id 9; stores ``id``, ``parent_id`` and ``index``."""

    __id__ = 9

    def __init__(self, id, parent_id, index):
        self.id, self.parent_id, self.index = id, parent_id, index
class MoveNode(Message):
    """Message with id 10; stores ``id``, ``parent_id`` and ``index``."""

    __id__ = 10

    def __init__(self, id, parent_id, index):
        self.id, self.parent_id, self.index = id, parent_id, index
class RemoveNode(Message):
    """Message with id 11; stores a single ``id`` value."""

    __id__ = 11

    def __init__(self, id):
        self.id = id
class SetNodeAttribute(Message):
    """Message with id 12; stores ``id``, ``name`` and ``value``."""

    __id__ = 12

    def __init__(self, id, name: str, value: str):
        self.id, self.name, self.value = id, name, value
class RemoveNodeAttribute(Message):
    """Message with id 13; stores ``id`` and ``name``."""

    __id__ = 13

    def __init__(self, id, name: str):
        self.id, self.name = id, name
class SetNodeData(Message):
    """Message with id 14; stores ``id`` and ``data``."""

    __id__ = 14

    def __init__(self, id, data: str):
        self.id, self.data = id, data
class SetCSSData(Message):
    """Message with id 15; stores ``id`` and ``data``."""

    __id__ = 15

    def __init__(self, id, data: str):
        self.id, self.data = id, data
class SetNodeScroll(Message):
    """Message with id 16; stores ``id``, ``x`` and ``y``."""

    __id__ = 16

    def __init__(self, id, x: int, y: int):
        self.id, self.x, self.y = id, x, y
class SetInputTarget(Message):
    """Message with id 17; stores ``id`` and ``label``."""

    __id__ = 17

    def __init__(self, id, label: str):
        self.id, self.label = id, label
class SetInputValue(Message):
    """Message with id 18; stores ``id``, ``value`` and ``mask``."""

    __id__ = 18

    def __init__(self, id, value: str, mask: int):
        self.id, self.value, self.mask = id, value, mask
class SetInputChecked(Message):
    """Message with id 19; stores ``id`` and ``checked``."""

    __id__ = 19

    def __init__(self, id, checked: bool):
        self.id, self.checked = id, checked
class MouseMove(Message):
    """Message with id 20; stores the ``x`` and ``y`` values it is given."""

    __id__ = 20

    def __init__(self, x, y):
        self.x, self.y = x, y
class MouseClick(Message):
    """Message with id 21; stores ``id``, ``hesitation_time`` and ``label``."""

    __id__ = 21

    def __init__(self, id, hesitation_time, label: str):
        self.id, self.hesitation_time = id, hesitation_time
        self.label = label
class ConsoleLog(Message):
    """Message with id 22; stores ``level`` and ``value``."""

    __id__ = 22

    def __init__(self, level: str, value: str):
        self.level, self.value = level, value
class PageLoadTiming(Message):
    """Message with id 23; stores the nine page-load timing values it is given."""

    __id__ = 23

    def __init__(self, request_start, response_start, response_end,
                 dom_content_loaded_event_start, dom_content_loaded_event_end,
                 load_event_start, load_event_end,
                 first_paint, first_contentful_paint):
        # Request / response.
        self.request_start = request_start
        self.response_start, self.response_end = response_start, response_end
        # DOMContentLoaded and load events.
        self.dom_content_loaded_event_start = dom_content_loaded_event_start
        self.dom_content_loaded_event_end = dom_content_loaded_event_end
        self.load_event_start, self.load_event_end = load_event_start, load_event_end
        # Paint values.
        self.first_paint, self.first_contentful_paint = first_paint, first_contentful_paint
class PageRenderTiming(Message):
    """Message with id 24; stores ``speed_index``, ``visually_complete`` and ``time_to_interactive``."""

    __id__ = 24

    def __init__(self, speed_index, visually_complete, time_to_interactive):
        self.speed_index, self.visually_complete = speed_index, visually_complete
        self.time_to_interactive = time_to_interactive
class JSException(Message):
    """Message with id 25; stores ``name``, ``message`` and ``payload``."""

    __id__ = 25

    def __init__(self, name: str, message: str, payload: str):
        self.name, self.message, self.payload = name, message, payload
class RawErrorEvent(Message):
    """Message with id 26; stores ``timestamp``, ``source``, ``name``, ``message`` and ``payload``."""

    __id__ = 26

    def __init__(self, timestamp, source: str, name: str, message: str,
                 payload: str):
        self.timestamp, self.source = timestamp, source
        self.name, self.message, self.payload = name, message, payload
class RawCustomEvent(Message):
    """Message with id 27; stores ``name`` and ``payload``."""

    __id__ = 27

    def __init__(self, name: str, payload: str):
        self.name, self.payload = name, payload
class UserID(Message):
    """Message with id 28; stores a single ``id`` value."""

    __id__ = 28

    def __init__(self, id: str):
        self.id = id
class UserAnonymousID(Message):
    """Message with id 29; stores a single ``id`` value."""

    __id__ = 29

    def __init__(self, id: str):
        self.id = id
class Metadata(Message):
    """Message with id 30; stores ``key`` and ``value``."""

    __id__ = 30

    def __init__(self, key: str, value: str):
        self.key, self.value = key, value
# NOTE(review): a second class named PerformanceTrack, also with __id__ 49,
# is defined further down in this module; that later definition is the one
# the name ends up bound to.
class PerformanceTrack(Message):
    """Message with id 49; stores ``frames``, ``ticks`` and two JS heap size values."""

    __id__ = 49

    def __init__(self, frames: int, ticks: int, total_js_heap_size,
                 used_js_heap_size):
        self.frames, self.ticks = frames, ticks
        self.total_js_heap_size = total_js_heap_size
        self.used_js_heap_size = used_js_heap_size
class PageEvent(Message):
    """Message with id 31; stores page identification plus load and render timing values."""

    __id__ = 31

    def __init__(self, message_id, timestamp, url: str, referrer: str,
                 loaded: bool, request_start, response_start, response_end,
                 dom_content_loaded_event_start, dom_content_loaded_event_end,
                 load_event_start, load_event_end, first_paint, first_contentful_paint,
                 speed_index, visually_complete, time_to_interactive):
        # Event identification and page location.
        self.message_id, self.timestamp = message_id, timestamp
        self.url, self.referrer, self.loaded = url, referrer, loaded
        # Load timing values.
        self.request_start = request_start
        self.response_start, self.response_end = response_start, response_end
        self.dom_content_loaded_event_start = dom_content_loaded_event_start
        self.dom_content_loaded_event_end = dom_content_loaded_event_end
        self.load_event_start, self.load_event_end = load_event_start, load_event_end
        # Paint and render values.
        self.first_paint, self.first_contentful_paint = first_paint, first_contentful_paint
        self.speed_index, self.visually_complete = speed_index, visually_complete
        self.time_to_interactive = time_to_interactive
class InputEvent(Message):
    """Message with id 32; stores ``message_id``, ``timestamp``, ``value``, ``value_masked`` and ``label``."""

    __id__ = 32

    def __init__(self, message_id, timestamp, value: str, value_masked: bool, label: str):
        self.message_id, self.timestamp = message_id, timestamp
        self.value, self.value_masked, self.label = value, value_masked, label
class ClickEvent(Message):
    """Message with id 33; stores ``message_id``, ``timestamp``, ``hesitation_time`` and ``label``."""

    __id__ = 33

    def __init__(self, message_id, timestamp, hesitation_time, label: str):
        self.message_id, self.timestamp = message_id, timestamp
        self.hesitation_time, self.label = hesitation_time, label
class ErrorEvent(Message):
    """Message with id 34; stores ``message_id``, ``timestamp``, ``source``, ``name``, ``message`` and ``payload``."""

    __id__ = 34

    def __init__(self, message_id, timestamp, source: str, name: str, message: str,
                 payload: str):
        self.message_id, self.timestamp = message_id, timestamp
        self.source, self.name = source, name
        self.message, self.payload = message, payload
class ResourceEvent(Message):
    """Message with id 35; stores timing, size and request fields for one resource."""

    __id__ = 35

    def __init__(self, message_id, timestamp, duration, ttfb, header_size, encoded_body_size,
                 decoded_body_size, url: str, type: str, success: bool, method: str, status):
        # Event identification and timing.
        self.message_id, self.timestamp = message_id, timestamp
        self.duration, self.ttfb = duration, ttfb
        # Sizes.
        self.header_size = header_size
        self.encoded_body_size, self.decoded_body_size = encoded_body_size, decoded_body_size
        # Request description and outcome.
        self.url, self.type = url, type
        self.success, self.method, self.status = success, method, status
class CustomEvent(Message):
    """Message with id 36; stores ``message_id``, ``timestamp``, ``name`` and ``payload``."""

    __id__ = 36

    def __init__(self, message_id, timestamp, name: str, payload: str):
        self.message_id, self.timestamp = message_id, timestamp
        self.name, self.payload = name, payload
class CSSInsertRule(Message):
    """Message with id 37; stores ``id``, ``rule`` and ``index``."""

    __id__ = 37

    def __init__(self, id, rule: str, index):
        self.id, self.rule, self.index = id, rule, index
class CSSDeleteRule(Message):
    """Message with id 38; stores ``id`` and ``index``."""

    __id__ = 38

    def __init__(self, id, index):
        self.id, self.index = id, index
class Fetch(Message):
    """Message with id 39; stores the request, response and timing fields it is given."""

    __id__ = 39

    def __init__(self, method: str, url: str, request: str, response: str, status,
                 timestamp, duration):
        self.method, self.url = method, url
        self.request, self.response, self.status = request, response, status
        self.timestamp, self.duration = timestamp, duration
class Profiler(Message):
    """Message with id 40; stores ``name``, ``duration``, ``args`` and ``result``."""

    __id__ = 40

    def __init__(self, name: str, duration, args: str, result: str):
        self.name, self.duration = name, duration
        self.args, self.result = args, result
class OTable(Message):
    """Message with id 41; stores ``key`` and ``value``."""

    __id__ = 41

    def __init__(self, key: str, value: str):
        self.key, self.value = key, value
class StateAction(Message):
    """Message with id 42; stores a single ``type`` value."""

    __id__ = 42

    def __init__(self, type: str):
        self.type = type
class StateActionEvent(Message):
    """Message with id 43; stores ``message_id``, ``timestamp`` and ``type``."""

    __id__ = 43

    def __init__(self, message_id, timestamp, type: str):
        self.message_id, self.timestamp, self.type = message_id, timestamp, type
class Redux(Message):
    """Message with id 44; stores ``action``, ``state`` and ``duration``."""

    __id__ = 44

    def __init__(self, action: str, state: str, duration):
        self.action, self.state, self.duration = action, state, duration
class Vuex(Message):
    """Message with id 45; stores ``mutation`` and ``state``."""

    __id__ = 45

    def __init__(self, mutation: str, state: str):
        self.mutation, self.state = mutation, state
class MobX(Message):
    """Message with id 46; stores ``type`` and ``payload``."""

    __id__ = 46

    def __init__(self, type: str, payload: str):
        self.type, self.payload = type, payload
class NgRx(Message):
    """Message with id 47; stores ``action``, ``state`` and ``duration``."""

    __id__ = 47

    def __init__(self, action: str, state: str, duration):
        self.action, self.state, self.duration = action, state, duration
class GraphQL(Message):
    """Message with id 48; stores ``operation_kind``, ``operation_name``, ``variables`` and ``response``."""

    __id__ = 48

    def __init__(self, operation_kind: str, operation_name: str,
                 variables: str, response: str):
        self.operation_kind, self.operation_name = operation_kind, operation_name
        self.variables, self.response = variables, response
# NOTE(review): a class with this name and the same __id__ (49) is already
# defined earlier in this module; this definition rebinds the name.
class PerformanceTrack(Message):
    """Message with id 49; stores ``frames``, ``ticks`` and two JS heap size values."""

    __id__ = 49

    def __init__(self, frames: int, ticks: int,
                 total_js_heap_size, used_js_heap_size):
        self.frames, self.ticks = frames, ticks
        self.total_js_heap_size = total_js_heap_size
        self.used_js_heap_size = used_js_heap_size
class GraphQLEvent(Message):
    """Message with id 50; stores ``message_id``, ``timestamp`` and ``name``."""

    __id__ = 50

    def __init__(self, message_id, timestamp, name: str):
        self.message_id, self.timestamp, self.name = message_id, timestamp, name
class DomDrop(Message):
    """Message with id 52; stores a single ``timestamp`` value."""

    __id__ = 52

    def __init__(self, timestamp):
        self.timestamp = timestamp
class ResourceTiming(Message):
    """Message with id 53; stores timing, size, ``url`` and ``initiator`` values for one resource."""

    __id__ = 53

    def __init__(self, timestamp, duration, ttfb, header_size, encoded_body_size,
                 decoded_body_size, url, initiator):
        # Timing.
        self.timestamp, self.duration, self.ttfb = timestamp, duration, ttfb
        # Sizes.
        self.header_size = header_size
        self.encoded_body_size, self.decoded_body_size = encoded_body_size, decoded_body_size
        # Origin.
        self.url, self.initiator = url, initiator
class ConnectionInformation(Message):
    """Message with id 54; stores ``downlink`` and ``type``."""

    __id__ = 54

    def __init__(self, downlink, type: str):
        self.downlink, self.type = downlink, type
class SetPageVisibility(Message):
    """Message with id 55; stores a single ``hidden`` flag."""

    __id__ = 55

    def __init__(self, hidden: bool):
        self.hidden = hidden
class PerformanceTrackAggr(Message):
    """Message with id 56; stores a time range plus min/avg/max values for fps, cpu and JS heap sizes."""

    __id__ = 56

    def __init__(self, timestamp_start, timestamp_end, min_fps, avg_fps,
                 max_fps, min_cpu, avg_cpu, max_cpu,
                 min_total_js_heap_size, avg_total_js_heap_size,
                 max_total_js_heap_size, min_used_js_heap_size,
                 avg_used_js_heap_size, max_used_js_heap_size):
        # Time range.
        self.timestamp_start, self.timestamp_end = timestamp_start, timestamp_end
        # min / avg / max triples, in that order.
        self.min_fps, self.avg_fps, self.max_fps = min_fps, avg_fps, max_fps
        self.min_cpu, self.avg_cpu, self.max_cpu = min_cpu, avg_cpu, max_cpu
        self.min_total_js_heap_size = min_total_js_heap_size
        self.avg_total_js_heap_size = avg_total_js_heap_size
        self.max_total_js_heap_size = max_total_js_heap_size
        self.min_used_js_heap_size = min_used_js_heap_size
        self.avg_used_js_heap_size = avg_used_js_heap_size
        self.max_used_js_heap_size = max_used_js_heap_size
class LongTask(Message):
    """Message with id 59; stores ``timestamp``, ``duration``, ``context`` and four container fields."""

    __id__ = 59

    def __init__(self, timestamp, duration, context, container_type, container_src: str,
                 container_id: str, container_name: str):
        self.timestamp, self.duration, self.context = timestamp, duration, context
        # Container description.
        self.container_type, self.container_src = container_type, container_src
        self.container_id, self.container_name = container_id, container_name
class SetNodeURLBasedAttribute(Message):
    """Message with id 60; stores ``id``, ``name``, ``value`` and ``base_url``."""

    __id__ = 60

    def __init__(self, id, name: str, value: str, base_url: str):
        self.id, self.name = id, name
        self.value, self.base_url = value, base_url
class SetStyleData(Message):
    """Message with id 61; stores ``id``, ``data`` and ``base_url``."""

    __id__ = 61

    def __init__(self, id, data: str, base_url: str):
        self.id, self.data, self.base_url = id, data, base_url
class IssueEvent(Message):
    """Message with id 62; stores ``message_id``, ``timestamp``, ``type``, ``context_string``, ``context`` and ``payload``."""

    __id__ = 62

    def __init__(self, message_id, timestamp, type: str, context_string: str,
                 context: str, payload: str):
        self.message_id, self.timestamp = message_id, timestamp
        self.type, self.context_string = type, context_string
        self.context, self.payload = context, payload
class TechnicalInfo(Message):
    """Message with id 63; stores ``type`` and ``value``."""

    __id__ = 63

    def __init__(self, type: str, value: str):
        self.type, self.value = type, value
class CustomIssue(Message):
    """Message with id 64; stores ``name`` and ``payload``."""

    __id__ = 64

    def __init__(self, name: str, payload: str):
        self.name, self.payload = name, payload
class PageClose(Message):
    """Message with id 65; carries no fields of its own."""

    __id__ = 65
class IOSSessionStart(Message):
    """Message with id 90; stores the session, tracker and user-environment fields it is given."""

    __id__ = 90

    def __init__(self, timestamp, project_id, tracker_version: str,
                 rev_id: str, user_uuid: str, user_os: str, user_os_version: str,
                 user_device: str, user_device_type: str, user_country: str):
        # Session and tracker identification.
        self.timestamp, self.project_id = timestamp, project_id
        self.tracker_version, self.rev_id = tracker_version, rev_id
        self.user_uuid = user_uuid
        # Operating system, device and country.
        self.user_os, self.user_os_version = user_os, user_os_version
        self.user_device, self.user_device_type = user_device, user_device_type
        self.user_country = user_country
class IOSSessionEnd(Message):
    """Message with id 91; stores a single ``timestamp`` value."""

    __id__ = 91

    def __init__(self, timestamp):
        self.timestamp = timestamp
class IOSMetadata(Message):
    """Message with id 92; stores ``timestamp``, ``length``, ``key`` and ``value``."""

    __id__ = 92

    def __init__(self, timestamp, length, key: str, value: str):
        self.timestamp, self.length = timestamp, length
        self.key, self.value = key, value
class IOSUserID(Message):
    """Message with id 94; stores ``timestamp``, ``length`` and ``value``."""

    __id__ = 94

    def __init__(self, timestamp, length, value: str):
        self.timestamp, self.length, self.value = timestamp, length, value
class IOSUserAnonymousID(Message):
    """Message with id 95; stores ``timestamp``, ``length`` and ``value``."""

    __id__ = 95

    def __init__(self, timestamp, length, value: str):
        self.timestamp, self.length, self.value = timestamp, length, value
class IOSScreenLeave(Message):
    """Message with id 99; stores ``timestamp``, ``length``, ``title`` and ``view_name``."""

    __id__ = 99

    def __init__(self, timestamp, length, title: str, view_name: str):
        self.timestamp, self.length = timestamp, length
        self.title, self.view_name = title, view_name
class IOSLog(Message):
    """Message with id 103; stores ``timestamp``, ``length``, ``severity`` and ``content``."""

    __id__ = 103

    def __init__(self, timestamp, length, severity: str, content: str):
        self.timestamp, self.length = timestamp, length
        self.severity, self.content = severity, content
class IOSInternalError(Message):
    """Message with id 104; stores ``timestamp``, ``length`` and ``content``."""

    __id__ = 104

    def __init__(self, timestamp, length, content: str):
        self.timestamp, self.length, self.content = timestamp, length, content
class IOSPerformanceAggregated(Message):
    """Message with id 110; stores a time range plus min/avg/max values for fps, cpu, memory and battery."""

    __id__ = 110

    def __init__(self, timestamp_start, timestamp_end, min_fps, avg_fps,
                 max_fps, min_cpu, avg_cpu, max_cpu,
                 min_memory, avg_memory, max_memory,
                 min_battery, avg_battery, max_battery):
        # Time range.
        self.timestamp_start, self.timestamp_end = timestamp_start, timestamp_end
        # min / avg / max triples, in that order.
        self.min_fps, self.avg_fps, self.max_fps = min_fps, avg_fps, max_fps
        self.min_cpu, self.avg_cpu, self.max_cpu = min_cpu, avg_cpu, max_cpu
        self.min_memory, self.avg_memory, self.max_memory = min_memory, avg_memory, max_memory
        self.min_battery, self.avg_battery, self.max_battery = min_battery, avg_battery, max_battery

View file

@ -0,0 +1,43 @@
certifi==2020.12.5
chardet==4.0.0
clickhouse-driver==0.2.0
clickhouse-sqlalchemy==0.1.5
idna==2.10
kafka-python==2.0.2
pandas==1.2.3
psycopg2-binary==2.8.6
pytz==2021.1
requests==2.25.1
SQLAlchemy==1.3.23
tzlocal==2.1
urllib3==1.26.3
PyYAML==5.4.1
pandas-redshift
awswrangler
google-auth-httplib2
google-auth-oauthlib
google-cloud-bigquery
pandas-gbq
snowflake-connector-python==2.4.1
snowflake-sqlalchemy==1.2.4
asn1crypto==1.4.0
azure-common==1.1.25
azure-core==1.8.2
azure-storage-blob==12.5.0
boto3==1.15.18
botocore==1.18.18
cffi==1.14.3
cryptography==2.9.2
isodate==0.6.0
jmespath==0.10.0
msrest==0.6.19
oauthlib==3.1.0
oscrypto==1.2.1
pycparser==2.20
pycryptodomex==3.9.8
PyJWT==1.7.1
pyOpenSSL==19.1.0
python-dateutil==2.8.1
requests-oauthlib==1.3.0
s3transfer==0.3.3
six==1.15.0

View file

@ -0,0 +1,56 @@
-- ClickHouse MergeTree table for connector events.
-- Apart from the row keys, columns are named <message>_<field> and are nullable.
CREATE TABLE IF NOT EXISTS connector_events
(
    sessionid UInt64,
    -- connectioninformation
    connectioninformation_downlink Nullable(UInt64),
    connectioninformation_type Nullable(String),
    -- consolelog
    consolelog_level Nullable(String),
    consolelog_value Nullable(String),
    -- customevent
    customevent_messageid Nullable(UInt64),
    customevent_name Nullable(String),
    customevent_payload Nullable(String),
    customevent_timestamp Nullable(UInt64),
    -- errorevent
    errorevent_message Nullable(String),
    errorevent_messageid Nullable(UInt64),
    errorevent_name Nullable(String),
    errorevent_payload Nullable(String),
    errorevent_source Nullable(String),
    errorevent_timestamp Nullable(UInt64),
    -- jsexception
    jsexception_message Nullable(String),
    jsexception_name Nullable(String),
    jsexception_payload Nullable(String),
    -- metadata
    metadata_key Nullable(String),
    metadata_value Nullable(String),
    -- mouseclick
    mouseclick_id Nullable(UInt64),
    mouseclick_hesitationtime Nullable(UInt64),
    mouseclick_label Nullable(String),
    -- pageevent
    pageevent_firstcontentfulpaint Nullable(UInt64),
    pageevent_firstpaint Nullable(UInt64),
    pageevent_messageid Nullable(UInt64),
    pageevent_referrer Nullable(String),
    pageevent_speedindex Nullable(UInt64),
    pageevent_timestamp Nullable(UInt64),
    pageevent_url Nullable(String),
    -- pagerendertiming
    pagerendertiming_timetointeractive Nullable(UInt64),
    pagerendertiming_visuallycomplete Nullable(UInt64),
    -- rawcustomevent
    rawcustomevent_name Nullable(String),
    rawcustomevent_payload Nullable(String),
    -- setviewportsize
    setviewportsize_height Nullable(UInt64),
    setviewportsize_width Nullable(UInt64),
    -- timestamp and user ids
    timestamp_timestamp Nullable(UInt64),
    user_anonymous_id Nullable(String),
    user_id Nullable(String),
    -- issueevent
    issueevent_messageid Nullable(UInt64),
    issueevent_timestamp Nullable(UInt64),
    issueevent_type Nullable(String),
    issueevent_contextstring Nullable(String),
    issueevent_context Nullable(String),
    issueevent_payload Nullable(String),
    -- customissue
    customissue_name Nullable(String),
    customissue_payload Nullable(String),
    -- batch bookkeeping; these two also drive partitioning and ordering below
    received_at UInt64,
    batch_order_number UInt64
) ENGINE = MergeTree()
      PARTITION BY intDiv(received_at, 100000)
      ORDER BY (received_at, batch_order_number, sessionid)
      PRIMARY KEY (received_at)
      SETTINGS use_minimalistic_part_header_in_zookeeper=1, index_granularity=1000;

View file

@ -0,0 +1,52 @@
-- ClickHouse Buffer table in front of default.connector_events.
-- The column list mirrors connector_events.
CREATE TABLE IF NOT EXISTS connector_events_buffer
(
    sessionid UInt64,
    -- connectioninformation
    connectioninformation_downlink Nullable(UInt64),
    connectioninformation_type Nullable(String),
    -- consolelog
    consolelog_level Nullable(String),
    consolelog_value Nullable(String),
    -- customevent
    customevent_messageid Nullable(UInt64),
    customevent_name Nullable(String),
    customevent_payload Nullable(String),
    customevent_timestamp Nullable(UInt64),
    -- errorevent
    errorevent_message Nullable(String),
    errorevent_messageid Nullable(UInt64),
    errorevent_name Nullable(String),
    errorevent_payload Nullable(String),
    errorevent_source Nullable(String),
    errorevent_timestamp Nullable(UInt64),
    -- jsexception
    jsexception_message Nullable(String),
    jsexception_name Nullable(String),
    jsexception_payload Nullable(String),
    -- metadata
    metadata_key Nullable(String),
    metadata_value Nullable(String),
    -- mouseclick
    mouseclick_id Nullable(UInt64),
    mouseclick_hesitationtime Nullable(UInt64),
    mouseclick_label Nullable(String),
    -- pageevent
    pageevent_firstcontentfulpaint Nullable(UInt64),
    pageevent_firstpaint Nullable(UInt64),
    pageevent_messageid Nullable(UInt64),
    pageevent_referrer Nullable(String),
    pageevent_speedindex Nullable(UInt64),
    pageevent_timestamp Nullable(UInt64),
    pageevent_url Nullable(String),
    -- pagerendertiming
    pagerendertiming_timetointeractive Nullable(UInt64),
    pagerendertiming_visuallycomplete Nullable(UInt64),
    -- rawcustomevent
    rawcustomevent_name Nullable(String),
    rawcustomevent_payload Nullable(String),
    -- setviewportsize
    setviewportsize_height Nullable(UInt64),
    setviewportsize_width Nullable(UInt64),
    -- timestamp and user ids
    timestamp_timestamp Nullable(UInt64),
    user_anonymous_id Nullable(String),
    user_id Nullable(String),
    -- issueevent
    issueevent_messageid Nullable(UInt64),
    issueevent_timestamp Nullable(UInt64),
    issueevent_type Nullable(String),
    issueevent_contextstring Nullable(String),
    issueevent_context Nullable(String),
    issueevent_payload Nullable(String),
    -- customissue
    customissue_name Nullable(String),
    customissue_payload Nullable(String),
    -- batch bookkeeping
    received_at UInt64,
    batch_order_number UInt64
)
-- Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes)
    ENGINE = Buffer(default, connector_events, 16, 10, 120, 10000, 1000000, 10000, 100000000);

View file

@ -0,0 +1,52 @@
-- ClickHouse MergeTree table for per-session rows, ordered and keyed by sessionid.
CREATE TABLE IF NOT EXISTS connector_user_sessions
(
    -- SESSION METADATA
    sessionid UInt64,
    user_agent Nullable(String),
    user_browser Nullable(String),
    user_browser_version Nullable(String),
    user_country Nullable(String),
    user_device Nullable(String),
    user_device_heap_size Nullable(UInt64),
    user_device_memory_size Nullable(UInt64),
    user_device_type Nullable(String),
    user_os Nullable(String),
    user_os_version Nullable(String),
    user_uuid Nullable(String),
    connection_effective_bandwidth Nullable(UInt64), -- Downlink
    connection_type Nullable(String), --"bluetooth", "cellular", "ethernet", "none", "wifi", "wimax", "other", "unknown"
    metadata_key Nullable(String),
    metadata_value Nullable(String),
    referrer Nullable(String),
    user_anonymous_id Nullable(String),
    user_id Nullable(String),

    -- TIME
    session_start_timestamp Nullable(UInt64),
    session_end_timestamp Nullable(UInt64),
    session_duration Nullable(UInt64),

    -- SPEED INDEX RELATED
    first_contentful_paint Nullable(UInt64),
    speed_index Nullable(UInt64),
    visually_complete Nullable(UInt64),
    timing_time_to_interactive Nullable(UInt64),

    -- PERFORMANCE
    avg_cpu Nullable(UInt64),
    avg_fps Nullable(UInt64),
    max_cpu Nullable(UInt64),
    max_fps Nullable(UInt64),
    max_total_js_heap_size Nullable(UInt64),
    max_used_js_heap_size Nullable(UInt64),

    -- ISSUES AND EVENTS
    js_exceptions_count Nullable(UInt64),
    long_tasks_total_duration Nullable(UInt64),
    long_tasks_max_duration Nullable(UInt64),
    long_tasks_count Nullable(UInt64),
    inputs_count Nullable(UInt64),
    clicks_count Nullable(UInt64),
    issues_count Nullable(UInt64),
    issues Array(Nullable(String)),
    urls_count Nullable(UInt64),
    urls Array(Nullable(String))
) ENGINE = MergeTree()
      ORDER BY (sessionid)
      PRIMARY KEY (sessionid);

View file

@ -0,0 +1,50 @@
-- ClickHouse Buffer table in front of default.connector_user_sessions.
-- The column list mirrors connector_user_sessions.
CREATE TABLE IF NOT EXISTS connector_user_sessions_buffer
(
    -- SESSION METADATA
    sessionid UInt64,
    user_agent Nullable(String),
    user_browser Nullable(String),
    user_browser_version Nullable(String),
    user_country Nullable(String),
    user_device Nullable(String),
    user_device_heap_size Nullable(UInt64),
    user_device_memory_size Nullable(UInt64),
    user_device_type Nullable(String),
    user_os Nullable(String),
    user_os_version Nullable(String),
    user_uuid Nullable(String),
    connection_effective_bandwidth Nullable(UInt64), -- Downlink
    connection_type Nullable(String), --"bluetooth", "cellular", "ethernet", "none", "wifi", "wimax", "other", "unknown"
    metadata_key Nullable(String),
    metadata_value Nullable(String),
    referrer Nullable(String),
    user_anonymous_id Nullable(String),
    user_id Nullable(String),

    -- TIME
    session_start_timestamp Nullable(UInt64),
    session_end_timestamp Nullable(UInt64),
    session_duration Nullable(UInt64),

    -- SPEED INDEX RELATED
    first_contentful_paint Nullable(UInt64),
    speed_index Nullable(UInt64),
    visually_complete Nullable(UInt64),
    timing_time_to_interactive Nullable(UInt64),

    -- PERFORMANCE
    avg_cpu Nullable(UInt64),
    avg_fps Nullable(UInt64),
    max_cpu Nullable(UInt64),
    max_fps Nullable(UInt64),
    max_total_js_heap_size Nullable(UInt64),
    max_used_js_heap_size Nullable(UInt64),

    -- ISSUES AND EVENTS
    js_exceptions_count Nullable(UInt64),
    long_tasks_total_duration Nullable(UInt64),
    long_tasks_max_duration Nullable(UInt64),
    long_tasks_count Nullable(UInt64),
    inputs_count Nullable(UInt64),
    clicks_count Nullable(UInt64),
    issues_count Nullable(UInt64),
    issues Array(Nullable(String)),
    urls_count Nullable(UInt64),
    urls Array(Nullable(String))
)
-- Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes)
    ENGINE = Buffer(default, connector_user_sessions, 16, 10, 120, 10000, 1000000, 10000, 100000000);

View file

@ -0,0 +1,52 @@
-- Events table using plain bigint / text columns.
-- Apart from the row keys, columns are named <message>_<field>.
CREATE TABLE IF NOT EXISTS connector_events
(
    sessionid bigint,
    -- connectioninformation
    connectioninformation_downlink bigint,
    connectioninformation_type text,
    -- consolelog
    consolelog_level text,
    consolelog_value text,
    -- customevent
    customevent_messageid bigint,
    customevent_name text,
    customevent_payload text,
    customevent_timestamp bigint,
    -- errorevent
    errorevent_message text,
    errorevent_messageid bigint,
    errorevent_name text,
    errorevent_payload text,
    errorevent_source text,
    errorevent_timestamp bigint,
    -- jsexception
    jsexception_message text,
    jsexception_name text,
    jsexception_payload text,
    -- metadata
    metadata_key text,
    metadata_value text,
    -- mouseclick
    mouseclick_id bigint,
    mouseclick_hesitationtime bigint,
    mouseclick_label text,
    -- pageevent
    pageevent_firstcontentfulpaint bigint,
    pageevent_firstpaint bigint,
    pageevent_messageid bigint,
    pageevent_referrer text,
    pageevent_speedindex bigint,
    pageevent_timestamp bigint,
    pageevent_url text,
    -- pagerendertiming
    pagerendertiming_timetointeractive bigint,
    pagerendertiming_visuallycomplete bigint,
    -- rawcustomevent
    rawcustomevent_name text,
    rawcustomevent_payload text,
    -- setviewportsize
    setviewportsize_height bigint,
    setviewportsize_width bigint,
    -- timestamp and user ids
    timestamp_timestamp bigint,
    user_anonymous_id text,
    user_id text,
    -- issueevent
    issueevent_messageid bigint,
    issueevent_timestamp bigint,
    issueevent_type text,
    issueevent_contextstring text,
    issueevent_context text,
    issueevent_payload text,
    -- customissue
    customissue_name text,
    customissue_payload text,
    -- batch bookkeeping
    received_at bigint,
    batch_order_number bigint
);

View file

@ -0,0 +1,50 @@
-- Per-session table using bigint / text columns; issues and urls are text arrays.
CREATE TABLE IF NOT EXISTS connector_user_sessions
(
    -- SESSION METADATA
    sessionid bigint,
    user_agent text,
    user_browser text,
    user_browser_version text,
    user_country text,
    user_device text,
    user_device_heap_size bigint,
    user_device_memory_size bigint,
    user_device_type text,
    user_os text,
    user_os_version text,
    user_uuid text,
    connection_effective_bandwidth bigint, -- Downlink
    connection_type text, --"bluetooth", "cellular", "ethernet", "none", "wifi", "wimax", "other", "unknown"
    metadata_key text,
    metadata_value text,
    referrer text,
    user_anonymous_id text,
    user_id text,

    -- TIME
    session_start_timestamp bigint,
    session_end_timestamp bigint,
    session_duration bigint,

    -- SPEED INDEX RELATED
    first_contentful_paint bigint,
    speed_index bigint,
    visually_complete bigint,
    timing_time_to_interactive bigint,

    -- PERFORMANCE
    avg_cpu bigint,
    avg_fps bigint,
    max_cpu bigint,
    max_fps bigint,
    max_total_js_heap_size bigint,
    max_used_js_heap_size bigint,

    -- ISSUES AND EVENTS
    js_exceptions_count bigint,
    long_tasks_total_duration bigint,
    long_tasks_max_duration bigint,
    long_tasks_count bigint,
    inputs_count bigint,
    clicks_count bigint,
    issues_count bigint,
    issues text[],
    urls_count bigint,
    urls text[]
);

View file

@ -0,0 +1,52 @@
-- Events table using BIGINT / VARCHAR(300) columns
-- (presumably the Redshift variant -- confirm against the loader that runs it).
-- IF NOT EXISTS makes the script re-runnable, like the other table scripts.
-- Apart from the row keys, columns are named <message>_<field>.
CREATE TABLE IF NOT EXISTS connector_events
(
    sessionid BIGINT,
    -- connectioninformation
    connectioninformation_downlink BIGINT,
    connectioninformation_type VARCHAR(300),
    -- consolelog
    consolelog_level VARCHAR(300),
    consolelog_value VARCHAR(300),
    -- customevent
    customevent_messageid BIGINT,
    customevent_name VARCHAR(300),
    customevent_payload VARCHAR(300),
    customevent_timestamp BIGINT,
    -- errorevent
    errorevent_message VARCHAR(300),
    errorevent_messageid BIGINT,
    errorevent_name VARCHAR(300),
    errorevent_payload VARCHAR(300),
    errorevent_source VARCHAR(300),
    errorevent_timestamp BIGINT,
    -- jsexception
    jsexception_message VARCHAR(300),
    jsexception_name VARCHAR(300),
    jsexception_payload VARCHAR(300),
    -- metadata
    metadata_key VARCHAR(300),
    metadata_value VARCHAR(300),
    -- mouseclick
    mouseclick_id BIGINT,
    mouseclick_hesitationtime BIGINT,
    mouseclick_label VARCHAR(300),
    -- pageevent
    pageevent_firstcontentfulpaint BIGINT,
    pageevent_firstpaint BIGINT,
    pageevent_messageid BIGINT,
    pageevent_referrer VARCHAR(300),
    pageevent_speedindex BIGINT,
    pageevent_timestamp BIGINT,
    pageevent_url VARCHAR(300),
    -- pagerendertiming
    pagerendertiming_timetointeractive BIGINT,
    pagerendertiming_visuallycomplete BIGINT,
    -- rawcustomevent
    rawcustomevent_name VARCHAR(300),
    rawcustomevent_payload VARCHAR(300),
    -- setviewportsize
    setviewportsize_height BIGINT,
    setviewportsize_width BIGINT,
    -- timestamp and user ids
    timestamp_timestamp BIGINT,
    user_anonymous_id VARCHAR(300),
    user_id VARCHAR(300),
    -- issueevent
    issueevent_messageid BIGINT,
    issueevent_timestamp BIGINT,
    issueevent_type VARCHAR(300),
    issueevent_contextstring VARCHAR(300),
    issueevent_context VARCHAR(300),
    issueevent_payload VARCHAR(300),
    -- customissue
    customissue_name VARCHAR(300),
    customissue_payload VARCHAR(300),
    -- batch bookkeeping
    received_at BIGINT,
    batch_order_number BIGINT
);

View file

@ -0,0 +1,50 @@
-- Per-session table using bigint / VARCHAR columns
-- (presumably the Redshift variant -- confirm against the loader that runs it).
-- IF NOT EXISTS makes the script re-runnable, like the other table scripts.
-- NOTE(review): if this runs on Redshift, a VARCHAR without a length is
-- created as VARCHAR(256); confirm that is wide enough for user_agent,
-- referrer, issues and urls.
CREATE TABLE IF NOT EXISTS connector_user_sessions
(
    -- SESSION METADATA
    sessionid bigint,
    user_agent VARCHAR,
    user_browser VARCHAR,
    user_browser_version VARCHAR,
    user_country VARCHAR,
    user_device VARCHAR,
    user_device_heap_size bigint,
    user_device_memory_size bigint,
    user_device_type VARCHAR,
    user_os VARCHAR,
    user_os_version VARCHAR,
    user_uuid VARCHAR,
    connection_effective_bandwidth bigint, -- Downlink
    connection_type VARCHAR, --"bluetooth", "cellular", "ethernet", "none", "wifi", "wimax", "other", "unknown"
    metadata_key VARCHAR,
    metadata_value VARCHAR,
    referrer VARCHAR,
    user_anonymous_id VARCHAR,
    user_id VARCHAR,

    -- TIME
    session_start_timestamp bigint,
    session_end_timestamp bigint,
    session_duration bigint,

    -- SPEED INDEX RELATED
    first_contentful_paint bigint,
    speed_index bigint,
    visually_complete bigint,
    timing_time_to_interactive bigint,

    -- PERFORMANCE
    avg_cpu bigint,
    avg_fps bigint,
    max_cpu bigint,
    max_fps bigint,
    max_total_js_heap_size bigint,
    max_used_js_heap_size bigint,

    -- ISSUES AND EVENTS
    js_exceptions_count bigint,
    long_tasks_total_duration bigint,
    long_tasks_max_duration bigint,
    long_tasks_count bigint,
    inputs_count bigint,
    clicks_count bigint,
    issues_count bigint,
    issues VARCHAR,
    urls_count bigint,
    urls VARCHAR
);

View file

@ -0,0 +1,52 @@
-- Events table using plain bigint / text columns.
-- Apart from the row keys, columns are named <message>_<field>.
CREATE TABLE IF NOT EXISTS connector_events
(
    sessionid bigint,
    -- connectioninformation
    connectioninformation_downlink bigint,
    connectioninformation_type text,
    -- consolelog
    consolelog_level text,
    consolelog_value text,
    -- customevent
    customevent_messageid bigint,
    customevent_name text,
    customevent_payload text,
    customevent_timestamp bigint,
    -- errorevent
    errorevent_message text,
    errorevent_messageid bigint,
    errorevent_name text,
    errorevent_payload text,
    errorevent_source text,
    errorevent_timestamp bigint,
    -- jsexception
    jsexception_message text,
    jsexception_name text,
    jsexception_payload text,
    -- metadata
    metadata_key text,
    metadata_value text,
    -- mouseclick
    mouseclick_id bigint,
    mouseclick_hesitationtime bigint,
    mouseclick_label text,
    -- pageevent
    pageevent_firstcontentfulpaint bigint,
    pageevent_firstpaint bigint,
    pageevent_messageid bigint,
    pageevent_referrer text,
    pageevent_speedindex bigint,
    pageevent_timestamp bigint,
    pageevent_url text,
    -- pagerendertiming
    pagerendertiming_timetointeractive bigint,
    pagerendertiming_visuallycomplete bigint,
    -- rawcustomevent
    rawcustomevent_name text,
    rawcustomevent_payload text,
    -- setviewportsize
    setviewportsize_height bigint,
    setviewportsize_width bigint,
    -- timestamp and user ids
    timestamp_timestamp bigint,
    user_anonymous_id text,
    user_id text,
    -- issueevent
    issueevent_messageid bigint,
    issueevent_timestamp bigint,
    issueevent_type text,
    issueevent_contextstring text,
    issueevent_context text,
    issueevent_payload text,
    -- customissue
    customissue_name text,
    customissue_payload text,
    -- batch bookkeeping
    received_at bigint,
    batch_order_number bigint
);

View file

@ -0,0 +1,50 @@
-- Per-session table using bigint / text columns; issues and urls use the
-- untyped `array` type.
CREATE TABLE IF NOT EXISTS connector_user_sessions
(
    -- SESSION METADATA
    sessionid bigint,
    user_agent text,
    user_browser text,
    user_browser_version text,
    user_country text,
    user_device text,
    user_device_heap_size bigint,
    user_device_memory_size bigint,
    user_device_type text,
    user_os text,
    user_os_version text,
    user_uuid text,
    connection_effective_bandwidth bigint, -- Downlink
    connection_type text, --"bluetooth", "cellular", "ethernet", "none", "wifi", "wimax", "other", "unknown"
    metadata_key text,
    metadata_value text,
    referrer text,
    user_anonymous_id text,
    user_id text,

    -- TIME
    session_start_timestamp bigint,
    session_end_timestamp bigint,
    session_duration bigint,

    -- SPEED INDEX RELATED
    first_contentful_paint bigint,
    speed_index bigint,
    visually_complete bigint,
    timing_time_to_interactive bigint,

    -- PERFORMANCE
    avg_cpu bigint,
    avg_fps bigint,
    max_cpu bigint,
    max_fps bigint,
    max_total_js_heap_size bigint,
    max_used_js_heap_size bigint,

    -- ISSUES AND EVENTS
    js_exceptions_count bigint,
    long_tasks_total_duration bigint,
    long_tasks_max_duration bigint,
    long_tasks_count bigint,
    inputs_count bigint,
    clicks_count bigint,
    issues_count bigint,
    issues array,
    urls_count bigint,
    urls array
);

View file

@ -0,0 +1,7 @@
table_id='{project_id}.{dataset}.{table}'
project_id=name-123456
dataset=datasetname
sessions_table=connector_user_sessions
events_table_name=connector_events
events_detailed_table_name=connector_events_detailed
level=normal

View file

@ -0,0 +1,12 @@
{
"type": "service_account",
"project_id": "aaaaaa-123456",
"private_key_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"private_key": "-----BEGIN PRIVATE KEY-----\nsome_letters_and_numbers\n-----END PRIVATE KEY-----\n",
"client_email": "abc-aws@aaaaa-123456.iam.gserviceaccount.com",
"client_id": "12345678910111213",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/abc-aws%40aaaaa-123456.iam.gserviceaccount.com"
}

View file

@ -0,0 +1,7 @@
connect_str='clickhouse+native://{address}/{database}'
address=1.1.1.1:9000
database=default
sessions_table=connector_user_sessions_buffer
events_table_name=connector_events_buffer
events_detailed_table_name=connector_events_detailed_buffer
level=normal

View file

@ -0,0 +1,10 @@
connect_str='postgresql://{user}:{password}@{address}:{port}/{database}'
address=1.1.1.1
port=8080
database=dev
user=qwerty
password=qwertyQWERTY12345
sessions_table=connector_user_sessions
events_table_name=connector_events
events_detailed_table_name=connector_events_detailed
level=normal

View file

@ -0,0 +1,15 @@
aws_access_key_id=QWERTYQWERTYQWERTY
aws_secret_access_key=abcdefgh12345678
region_name=eu-central-3
bucket=name_of_the_bucket
subdirectory=name_of_the_bucket_subdirectory
connect_str='postgresql://{user}:{password}@{address}:{port}/{schema}'
address=redshift-cluster-1.aaaaaaaaa.eu-central-3.redshift.amazonaws.com
port=5439
schema=dev
user=admin
password=admin
sessions_table=connector_user_sessions
events_table_name=connector_events
events_detailed_table_name=connector_events_detailed
level=normal

View file

@ -0,0 +1,11 @@
connect_str='snowflake://{user}:{password}@{account}/{database}/{schema}?warehouse={warehouse}'
user=admin
password=12345678
account=aaaaaaa.eu-central-3
database=dev
schema=public
warehouse=SOME_WH
sessions_table=connector_user_sessions
events_table_name=connector_events
events_detailed_table_name=connector_events_detailed
level=normal