Redshift connector (#1170)

* Updated dependencies for the Redshift connector; replaced the os module with the python-decouple module for configuration (see the sketch after this list)

* Updated service and images

* Updated message protocol; added an exception for BatchMetadata when version is 0 (the old read method is applied)

* Fixed load error from S3 to Redshift: null values for string columns are now empty strings ("")

* Added test file consumer_async.py: reads the Kafka raw topic every 3 minutes and sends a background task to upload to the cloud

* Added a method to skip messages that were not inserted into the cloud

* Added logs to consumer_async. Changed the urls and issues columns in the sessions table from list to string

* Split messages between the sessions table and the events table

* Updated redshift tables

* Fixed small issue in query redshift_sessions.sql

* Updated Dockerfiles. Cleaned consumer_async logs. Updated/fixed tables. Transformed NaN into NULL for VARCHAR columns

* Added an error handler for dropped SQL connections

* chore(docker): Optimize docker builds

Signed-off-by: rjshrjndrn <rjshrjndrn@gmail.com>

* Variables renamed

* Adding compression libraries

* Set the default value of event counts to 0 (instead of NULL) when the event did not occur

* Added support for tracking specific projects. Added a PG handler to connect to the sessions table

* Added a method to update values in the DB connection for sessions that ended and were restarted

* Removing intelligent file copying

* chore(connector): Build file

Signed-off-by: rjshrjndrn <rjshrjndrn@gmail.com>

* Adding connection pool for pg

* Renaming and optimizing

* Fixed an issue with missing session information
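
A minimal sketch of the python-decouple pattern the connector now uses in place of os.environ (the variable names are taken from the diffs below; the defaults shown are illustrative):

    # python-decouple reads settings from the environment or a .env/settings.ini
    # file, with optional defaults and type casting.
    from decouple import config, Csv

    DATABASE = config('CLOUD_SERVICE')                         # required; raises if unset
    upload_rate = config('upload_rate', default=30, cast=int)  # optional, cast to int
    use_ssl = config('KAFKA_USE_SSL', default=True, cast=bool)
    projects = config('PROJECT_IDS', default=None, cast=Csv(int))  # "1,2,3" -> [1, 2, 3]

Unlike os.environ[...], missing optional settings fall back to their defaults instead of raising KeyError.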

---------

Signed-off-by: rjshrjndrn <rjshrjndrn@gmail.com>
Co-authored-by: rjshrjndrn <rjshrjndrn@gmail.com>
MauricioGarciaS 2023-05-02 14:02:57 +02:00 committed by GitHub
parent 28ab64595f
commit 28182b951e
25 changed files with 1363 additions and 1217 deletions

ee/connectors/Readme.md (new file)

@@ -0,0 +1,3 @@
+## Build
+
+docker build -f deploy/Dockerfile_redshift -t {tag} .
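
Once built, the image is typically started with the connector's settings passed as environment variables; the env-file name below is hypothetical, and the exact variable set (CLOUD_SERVICE, EVENT_TYPE, KAFKA_SERVERS, ...) depends on the target warehouse:

docker run --env-file connector.env {tag}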


@@ -1,19 +1,28 @@
-import os
+from decouple import config
 from confluent_kafka import Consumer
 from datetime import datetime
 from collections import defaultdict
+import json
+from time import time

+#from msgcodec.codec import MessageCodec
 from msgcodec.msgcodec import MessageCodec
 from msgcodec.messages import SessionEnd
+print('[INFO] Importing DBConnection...')
 from db.api import DBConnection
+print('[INFO] Importing from models..')
 from db.models import events_detailed_table_name, events_table_name, sessions_table_name
+print('[INFO] Importing from writer..')
 from db.writer import insert_batch
+print('[INFO] Importing from handler..')
 from handler import handle_message, handle_normal_message, handle_session

-DATABASE = os.environ['DATABASE_NAME']
-LEVEL = os.environ['level']
+DATABASE = config('DATABASE_NAME')
+LEVEL = config('LEVEL')

+print(f'[INFO] Connecting to database {DATABASE}')
 db = DBConnection(DATABASE)
+print('Connected successfully')

 if LEVEL == 'detailed':
     table_name = events_detailed_table_name
@@ -22,30 +31,34 @@ elif LEVEL == 'normal':

 def main():
-    batch_size = 4000
-    sessions_batch_size = 400
+    batch_size = config('events_batch_size', default=4000, cast=int)
+    sessions_batch_size = config('sessions_batch_size', default=400, cast=int)
     batch = []
     sessions = defaultdict(lambda: None)
     sessions_batch = []
     codec = MessageCodec()
-    consumer = Consumer({
-        "security.protocol": "SSL",
-        "bootstrap.servers": ",".join([os.environ['KAFKA_SERVER_1'],
-                                       os.environ['KAFKA_SERVER_2']]),
+    ssl_protocol = config('SSL_ENABLED', default=True, cast=bool)
+    consumer_settings = {
+        "bootstrap.servers": config('KAFKA_SERVER'),
         "group.id": f"connector_{DATABASE}",
         "auto.offset.reset": "earliest",
         "enable.auto.commit": False
-    })
+    }
+    if ssl_protocol:
+        consumer_settings['security.protocol'] = 'SSL'
+    consumer = Consumer(consumer_settings)

-    consumer.subscribe(["raw", "raw_ios"])
+    consumer.subscribe([config("topic", default="saas-raw")])
     print("Kafka consumer subscribed")

+    t_ = time()
     while True:
-        msg.consumer.poll(1.0)
+        msg = consumer.poll(1.0)
         if msg is None:
             continue
-        messages = codec.decode_detailed(msg.value)
-        session_id = codec.decode_key(msg.key)
+        #value = json.loads(msg.value().decode('utf-8'))
+        messages = codec.decode_detailed(msg.value())
+        session_id = codec.decode_key(msg.key())
         if messages is None:
             print('-')
             continue
@@ -68,7 +81,11 @@ def main():
         # try to insert sessions
         if len(sessions_batch) >= sessions_batch_size:
+            t2 = time()
             attempt_session_insert(sessions_batch)
+            t2_ = time()
+            print(f'[INFO] Inserted sessions into Redshift - time spent: {t2_-t2}')
+            t_ += t2_-t2
             for s in sessions_batch:
                 try:
                     del sessions[s.sessionid]
@@ -86,10 +103,15 @@ def main():
         # insert a batch of events
         if len(batch) >= batch_size:
+            t1 = time()
+            print(f'[INFO] Spent time filling ({batch_size})-batch: {t1-t_}')
             attempt_batch_insert(batch)
+            t1_ = time()
+            t_ = t1_
+            print(f'[INFO] Inserted events into Redshift - time spent: {t1_-t1}')
             batch = []
             consumer.commit()
-            print("sessions in cache:", len(sessions))
+            print("[INFO] sessions in cache:", len(sessions))

 def attempt_session_insert(sess_batch):
@@ -134,4 +156,6 @@ def decode_key(b) -> int:
     return decoded

 if __name__ == '__main__':
+    print('[INFO] Setup complete')
+    print('[INFO] Starting script')
     main()


@@ -0,0 +1,229 @@
+from numpy._typing import _16Bit
+from decouple import config, Csv
+from confluent_kafka import Consumer
+from datetime import datetime
+from collections import defaultdict
+import json
+import asyncio
+from time import time, sleep
+from copy import deepcopy
+
+from msgcodec.msgcodec import MessageCodec
+from msgcodec.messages import SessionStart, SessionEnd
+from db.api import DBConnection
+from db.models import events_detailed_table_name, events_table_name, sessions_table_name
+from db.writer import insert_batch, update_batch
+from handler import handle_message, handle_normal_message, handle_session
+from utils.cache import ProjectFilter as PF
+from utils import pg_client
+from psycopg2 import InterfaceError
+
+
+def process_message(msg, codec, sessions, batch, sessions_batch, interesting_sessions, interesting_events, EVENT_TYPE, projectFilter):
+    if msg is None:
+        return
+    messages = codec.decode_detailed(msg.value())
+    session_id = codec.decode_key(msg.key())
+    if messages is None:
+        print('-')
+        return
+    elif not projectFilter.is_valid(session_id):
+        # We check using projectFilter if session_id is from the selected projects
+        return
+    for message in messages:
+        if message.__id__ in interesting_events:
+            if EVENT_TYPE == 'detailed':
+                n = handle_message(message)
+            elif EVENT_TYPE == 'normal':
+                n = handle_normal_message(message)
+        if message.__id__ in interesting_sessions:
+            # Here we create the session if not exists or append message event if session exists
+            sessions[session_id] = handle_session(sessions[session_id], message)
+            if sessions[session_id]:
+                sessions[session_id].sessionid = session_id
+                projectFilter.cached_sessions.add(session_id)
+            if isinstance(message, SessionEnd):
+                # Here only if session exists and we get sessionend we start cleanup
+                if sessions[session_id].session_start_timestamp:
+                    projectFilter.handle_clean()
+                    old_status = projectFilter.cached_sessions.close(session_id)
+                    sessions_batch.append((old_status, deepcopy(sessions[session_id])))
+                    sessions_to_delete = projectFilter.cached_sessions.clear_sessions()
+                    for sess_id in sessions_to_delete:
+                        try:
+                            del sessions[sess_id]
+                        except KeyError:
+                            print('[INFO] Session already deleted')
+                else:
+                    print('[WARN] Session not started received SessionEnd message')
+                    del sessions[session_id]
+        if message.__id__ in interesting_events:
+            if n:
+                n.sessionid = session_id
+                n.received_at = int(datetime.now().timestamp() * 1000)
+                n.batch_order_number = len(batch)
+                batch.append(n)
+        else:
+            continue
+
+
+def attempt_session_insert(sess_batch, db, sessions_table_name, try_=0):
+    if sess_batch:
+        try:
+            print("inserting sessions...")
+            insert_batch(db, sess_batch, table=sessions_table_name, level='sessions')
+            print("inserted sessions succesfully")
+        except TypeError as e:
+            print("Type conversion error")
+            print(repr(e))
+        except ValueError as e:
+            print("Message value could not be processed or inserted correctly")
+            print(repr(e))
+        except InterfaceError as e:
+            if try_ < 3:
+                try_ += 1
+                sleep(try_*2)
+                attempt_session_insert(sess_batch, db, sessions_table_name, try_)
+        except Exception as e:
+            print(repr(e))
+
+
+def attempt_session_update(sess_batch, db, sessions_table_name):
+    if sess_batch:
+        try:
+            print('updating sessions')
+            update_batch(db, sess_batch, table=sessions_table_name)
+        except TypeError as e:
+            print('Type conversion error')
+            print(repr(e))
+        except ValueError as e:
+            print('Message value could not be processed or inserted correctly')
+            print(repr(e))
+        except InterfaceError as e:
+            print('Error while trying to update session into datawarehouse')
+            print(repr(e))
+        except Exception as e:
+            print(repr(e))
+
+
+def attempt_batch_insert(batch, db, table_name, EVENT_TYPE, try_=0):
+    # insert a batch
+    try:
+        print("inserting...")
+        insert_batch(db=db, batch=batch, table=table_name, level=EVENT_TYPE)
+        print("inserted succesfully")
+    except TypeError as e:
+        print("Type conversion error")
+        print(repr(e))
+    except ValueError as e:
+        print("Message value could not be processed or inserted correctly")
+        print(repr(e))
+    except InterfaceError as e:
+        if try_ < 3:
+            try_ += 1
+            sleep(try_*2)
+            attempt_batch_insert(batch, db, table_name, EVENT_TYPE, try_)
+        else:
+            # TODO: Restart redshift
+            print(repr(e))
+    except Exception as e:
+        print(repr(e))
+
+
+def decode_key(b) -> int:
+    """
+    Decode the message key (encoded with little endian)
+    """
+    try:
+        decoded = int.from_bytes(b, "little", signed=False)
+    except Exception as e:
+        raise UnicodeDecodeError(f"Error while decoding message key (SessionID) from {b}\n{e}")
+    return decoded
+
+
+async def main():
+    await pg_client.init()
+    DATABASE = config('CLOUD_SERVICE')
+    EVENT_TYPE = config('EVENT_TYPE')
+    db = DBConnection(DATABASE)
+    upload_rate = config('upload_rate', default=30, cast=int)
+    if EVENT_TYPE == 'detailed':
+        table_name = events_detailed_table_name
+    elif EVENT_TYPE == 'normal':
+        table_name = events_table_name
+
+    batch = []
+    sessions = defaultdict(lambda: None)
+    sessions_batch = []
+
+    sessions_events_selection = [1,25,28,29,30,31,32,54,56,62,69,78,125,126]
+    if EVENT_TYPE == 'normal':
+        selected_events = [21,22,25,27,64,78,125]
+    elif EVENT_TYPE == 'detailed':
+        selected_events = [1,4,21,22,25,27,31,32,39,48,59,64,69,78,125,126]
+    filter_events = list(set(sessions_events_selection+selected_events))
+
+    allowed_projects = config('PROJECT_IDS', default=None, cast=Csv(int))
+    project_filter = PF(allowed_projects)
+
+    codec = MessageCodec(filter_events)
+    ssl_protocol = config('KAFKA_USE_SSL', default=True, cast=bool)
+    consumer_settings = {
+        "bootstrap.servers": config('KAFKA_SERVERS'),
+        "group.id": f"connector_{DATABASE}",
+        "auto.offset.reset": "earliest",
+        "enable.auto.commit": False
+    }
+    if ssl_protocol:
+        consumer_settings['security.protocol'] = 'SSL'
+    consumer = Consumer(consumer_settings)
+
+    consumer.subscribe(config("TOPICS", default="saas-raw").split(','))
+    print("[INFO] Kafka consumer subscribed")
+
+    c_time = time()
+    read_msgs = 0
+    while True:
+        msg = consumer.poll(1.0)
+        process_message(msg, codec, sessions, batch, sessions_batch, sessions_events_selection, selected_events, EVENT_TYPE, project_filter)
+        read_msgs += 1
+        if time() - c_time > upload_rate:
+            print(f'[INFO] {read_msgs} kafka messages read in {upload_rate} seconds')
+            await insertBatch(deepcopy(sessions_batch), deepcopy(batch), db, sessions_table_name, table_name, EVENT_TYPE)
+            consumer.commit()
+            sessions_batch = []
+            batch = []
+            read_msgs = 0
+            c_time = time()
+
+
+async def insertBatch(sessions_batch, batch, db, sessions_table_name, table_name, EVENT_TYPE):
+    t1 = time()
+    print(f'[BG-INFO] Number of events to add {len(batch)}, number of sessions to add {len(sessions_batch)}')
+    new_sessions = list()
+    updated_sessions = list()
+    for old_status, session_in_batch in sessions_batch:
+        if old_status == 'UPDATE':
+            updated_sessions.append(session_in_batch)
+        else:
+            new_sessions.append(session_in_batch)
+    print(f'[DEBUG] Number of new sessions {len(new_sessions)}, number of sessions to update {len(updated_sessions)}')
+    if new_sessions != []:
+        attempt_session_insert(new_sessions, db, sessions_table_name)
+    if updated_sessions != []:
+        attempt_session_update(updated_sessions, db, sessions_table_name)
+    # insert a batch of events
+    if batch != []:
+        attempt_batch_insert(batch, db, table_name, EVENT_TYPE)
+    print(f'[BG-INFO] Uploaded into S3 in {time()-t1} seconds')
+
+
+if __name__ == '__main__':
+    asyncio.run(main())
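
The utils.pg_client module used above (the "connection pool for pg" mentioned in the commit message) is not part of this diff. A minimal sketch of what pg_client.init() could look like with psycopg2's ThreadedConnectionPool; every name and setting here is an assumption, not the actual implementation:

    # Hypothetical utils/pg_client.py -- the real module is not shown in this diff.
    from decouple import config
    from psycopg2 import pool

    PG_POOL = None  # module-level pool shared by the consumer

    async def init():
        """Create a small connection pool for the Postgres sessions lookups."""
        global PG_POOL
        PG_POOL = pool.ThreadedConnectionPool(
            minconn=1,
            maxconn=config('PG_POOL_SIZE', default=4, cast=int),  # assumed setting name
            dsn=config('PG_DSN'),                                 # assumed setting name
        )

Callers would borrow connections with PG_POOL.getconn() and return them with PG_POOL.putconn(conn), so concurrent batch uploads do not each open a fresh connection.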


@@ -3,10 +3,10 @@ from sqlalchemy import MetaData
 from sqlalchemy.orm import sessionmaker, session
 from contextlib import contextmanager
 import logging
-import os
+from decouple import config as _config
 from pathlib import Path

-DATABASE = os.environ['DATABASE_NAME']
+DATABASE = _config('CLOUD_SERVICE')

 if DATABASE == 'redshift':
     import pandas_redshift as pr
@@ -34,7 +34,7 @@ class DBConnection:
     """
     Initializes connection to a database
     To update models file use:
-        sqlacodegen --outfile models_universal.py mysql+pymysql://{user}:{pwd}@{address}
+        sqlacodegen --outfile models_universal.py mysql+pymysql://{USER}:{pwd}@{HOST}
     """
     _sessions = sessionmaker()
@@ -44,53 +44,67 @@ class DBConnection:
         if config == 'redshift':
             self.pdredshift = pr
-            self.pdredshift.connect_to_redshift(dbname=os.environ['schema'],
-                                                host=os.environ['address'],
-                                                port=os.environ['port'],
-                                                user=os.environ['user'],
-                                                password=os.environ['password'])
+            ci = _config('cluster_info', default='')
+            cluster_info = dict()
+            if ci == '':
+                cluster_info['USER'] = _config('USER')
+                cluster_info['HOST'] = _config('HOST')
+                cluster_info['PORT'] = _config('PORT')
+                cluster_info['PASSWORD'] = _config('PASSWORD')
+                cluster_info['DBNAME'] = _config('DBNAME')
+            else:
+                ci = ci.split(' ')
+                cluster_info = dict()
+                for _d in ci:
+                    k,v = _d.split('=')
+                    cluster_info[k]=v
+            self.pdredshift.connect_to_redshift(dbname=cluster_info['DBNAME'],
+                                                host=cluster_info['HOST'],
+                                                port=cluster_info['PORT'],
+                                                user=cluster_info['USER'],
+                                                password=cluster_info['PASSWORD'])

-            self.pdredshift.connect_to_s3(aws_access_key_id=os.environ['aws_access_key_id'],
-                                          aws_secret_access_key=os.environ['aws_secret_access_key'],
-                                          bucket=os.environ['bucket'],
-                                          subdirectory=os.environ['subdirectory'])
+            self.pdredshift.connect_to_s3(aws_access_key_id=_config('AWS_ACCESS_KEY_ID'),
+                                          aws_secret_access_key=_config('AWS_SECRET_ACCESS_KEY'),
+                                          bucket=_config('BUCKET'),
+                                          subdirectory=_config('SUBDIRECTORY', default=None))

-            self.connect_str = os.environ['connect_str'].format(
-                user=os.environ['user'],
-                password=os.environ['password'],
-                address=os.environ['address'],
-                port=os.environ['port'],
-                schema=os.environ['schema']
-            )
-            self.engine = create_engine(self.connect_str)
+            self.CONNECTION_STRING = _config('CONNECTION_STRING').format(
+                USER=cluster_info['USER'],
+                PASSWORD=cluster_info['PASSWORD'],
+                HOST=cluster_info['HOST'],
+                PORT=cluster_info['PORT'],
+                DBNAME=cluster_info['DBNAME']
+            )
+            self.engine = create_engine(self.CONNECTION_STRING)
         elif config == 'clickhouse':
-            self.connect_str = os.environ['connect_str'].format(
-                address=os.environ['address'],
-                database=os.environ['database']
-            )
-            self.engine = create_engine(self.connect_str)
+            self.CONNECTION_STRING = _config('CONNECTION_STRING').format(
+                HOST=_config('HOST'),
+                DATABASE=_config('DATABASE')
+            )
+            self.engine = create_engine(self.CONNECTION_STRING)
         elif config == 'pg':
-            self.connect_str = os.environ['connect_str'].format(
-                user=os.environ['user'],
-                password=os.environ['password'],
-                address=os.environ['address'],
-                port=os.environ['port'],
-                database=os.environ['database']
-            )
-            self.engine = create_engine(self.connect_str)
+            self.CONNECTION_STRING = _config('CONNECTION_STRING').format(
+                USER=_config('USER'),
+                PASSWORD=_config('PASSWORD'),
+                HOST=_config('HOST'),
+                PORT=_config('PORT'),
+                DATABASE=_config('DATABASE')
+            )
+            self.engine = create_engine(self.CONNECTION_STRING)
         elif config == 'bigquery':
             pass
         elif config == 'snowflake':
-            self.connect_str = os.environ['connect_str'].format(
-                user=os.environ['user'],
-                password=os.environ['password'],
-                account=os.environ['account'],
-                database=os.environ['database'],
-                schema = os.environ['schema'],
-                warehouse = os.environ['warehouse']
-            )
-            self.engine = create_engine(self.connect_str)
+            self.CONNECTION_STRING = _config('CONNECTION_STRING').format(
+                USER=_config('USER'),
+                PASSWORD=_config('PASSWORD'),
+                ACCOUNT=_config('ACCOUNT'),
+                DATABASE=_config('DATABASE'),
+                DBNAME = _config('DBNAME'),
+                WAREHOUSE = _config('WAREHOUSE')
+            )
+            self.engine = create_engine(self.CONNECTION_STRING)
         else:
             raise ValueError("This db configuration doesn't exist. Add into keys file.")
@@ -128,3 +142,10 @@ class DBConnection:
             my_session.close()
             connection.close()
+
+    def restart(self):
+        self.close()
+        self.__init__(config=self.config)
+
+    def close(self):
+        if self.config == 'redshift':
+            self.pdredshift.close_up_shop()
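
The new cluster_info setting above packs the Redshift credentials into a single space-separated string of KEY=value pairs. A small sketch of the format the parser expects (the values are made up); note that, as written, neither keys nor values may contain spaces or '=' characters:

    ci = "USER=admin HOST=example.redshift.amazonaws.com PORT=5439 PASSWORD=secret DBNAME=dev"

    cluster_info = dict()
    for _d in ci.split(' '):
        k, v = _d.split('=')
        cluster_info[k] = v

    # cluster_info == {'USER': 'admin', 'HOST': 'example.redshift.amazonaws.com',
    #                  'PORT': '5439', 'PASSWORD': 'secret', 'DBNAME': 'dev'}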


@@ -2,9 +2,9 @@
 from sqlalchemy import BigInteger, Boolean, Column, Integer, ARRAY, VARCHAR, text, VARCHAR
 from sqlalchemy.ext.declarative import declarative_base
 from pathlib import Path
-import os
+from decouple import config

-DATABASE = os.environ['DATABASE_NAME']
+DATABASE = config('CLOUD_SERVICE')

 Base = declarative_base()
 metadata = Base.metadata
@@ -13,16 +13,16 @@ base_path = Path(__file__).parent.parent
 # Get a table name from a configuration file
 try:
-    events_table_name = os.environ['events_table_name']
+    events_table_name = config('EVENTS_TABLE_NAME', default='connector_events')
 except KeyError as e:
     events_table_name = None
     print(repr(e))

 try:
-    events_detailed_table_name = os.environ['events_detailed_table_name']
+    events_detailed_table_name = config('EVENTS_DETAILED_TABLE_NAME', default='connector_events_detailed')
 except KeyError as e:
     print(repr(e))
     events_detailed_table_name = None

-sessions_table_name = os.environ['sessions_table']
+sessions_table_name = config('SESSIONS_TABLE', default='connector_user_sessions')


 class Session(Base):
@@ -69,65 +69,42 @@ class Session(Base):
     # ISSUES AND EVENTS
     js_exceptions_count = Column(BigInteger)
-    long_tasks_total_duration = Column(BigInteger)
-    long_tasks_max_duration = Column(BigInteger)
-    long_tasks_count = Column(BigInteger)
+    #long_tasks_total_duration = Column(BigInteger)
+    #long_tasks_max_duration = Column(BigInteger)
+    #long_tasks_count = Column(BigInteger)
     inputs_count = Column(BigInteger)
     clicks_count = Column(BigInteger)
     issues_count = Column(BigInteger)
-    issues = ARRAY(VARCHAR(5000))
     urls_count = Column(BigInteger)
-    urls = ARRAY(VARCHAR(5000))


 class Event(Base):
     __tablename__ = events_table_name
     sessionid = Column(BigInteger, primary_key=True)
-    connectioninformation_downlink = Column(BigInteger)
-    connectioninformation_type = Column(VARCHAR(5000))
     consolelog_level = Column(VARCHAR(5000))
     consolelog_value = Column(VARCHAR(5000))
-    customevent_messageid = Column(BigInteger)
     customevent_name = Column(VARCHAR(5000))
     customevent_payload = Column(VARCHAR(5000))
-    customevent_timestamp = Column(BigInteger)
-    errorevent_message = Column(VARCHAR(5000))
-    errorevent_messageid = Column(BigInteger)
-    errorevent_name = Column(VARCHAR(5000))
-    errorevent_payload = Column(VARCHAR(5000))
-    errorevent_source = Column(VARCHAR(5000))
-    errorevent_timestamp = Column(BigInteger)
     jsexception_message = Column(VARCHAR(5000))
     jsexception_name = Column(VARCHAR(5000))
     jsexception_payload = Column(VARCHAR(5000))
-    metadata_key = Column(VARCHAR(5000))
-    metadata_value = Column(VARCHAR(5000))
-    mouseclick_id = Column(BigInteger)
-    mouseclick_hesitationtime = Column(BigInteger)
-    mouseclick_label = Column(VARCHAR(5000))
-    pageevent_firstcontentfulpaint = Column(BigInteger)
-    pageevent_firstpaint = Column(BigInteger)
-    pageevent_messageid = Column(BigInteger)
-    pageevent_referrer = Column(VARCHAR(5000))
-    pageevent_speedindex = Column(BigInteger)
-    pageevent_timestamp = Column(BigInteger)
-    pageevent_url = Column(VARCHAR(5000))
-    pagerendertiming_timetointeractive = Column(BigInteger)
-    pagerendertiming_visuallycomplete = Column(BigInteger)
-    rawcustomevent_name = Column(VARCHAR(5000))
-    rawcustomevent_payload = Column(VARCHAR(5000))
-    setviewportsize_height = Column(BigInteger)
-    setviewportsize_width = Column(BigInteger)
-    timestamp_timestamp = Column(BigInteger)
-    user_anonymous_id = Column(VARCHAR(5000))
-    user_id = Column(VARCHAR(5000))
-    issueevent_messageid = Column(BigInteger)
+    jsexception_metadata = Column(VARCHAR(5000))
+    networkrequest_type = Column(VARCHAR(5000))
+    networkrequest_method = Column(VARCHAR(5000))
+    networkrequest_url = Column(VARCHAR(5000))
+    networkrequest_request = Column(VARCHAR(5000))
+    networkrequest_response = Column(VARCHAR(5000))
+    networkrequest_status = Column(BigInteger)
+    networkrequest_timestamp = Column(BigInteger)
+    networkrequest_duration = Column(BigInteger)
+    issueevent_message_id = Column(BigInteger)
     issueevent_timestamp = Column(BigInteger)
     issueevent_type = Column(VARCHAR(5000))
-    issueevent_contextstring = Column(VARCHAR(5000))
+    issueevent_context_string = Column(VARCHAR(5000))
     issueevent_context = Column(VARCHAR(5000))
     issueevent_payload = Column(VARCHAR(5000))
+    issueevent_url = Column(VARCHAR(5000))
     customissue_name = Column(VARCHAR(5000))
     customissue_payload = Column(VARCHAR(5000))
     received_at = Column(BigInteger)
@@ -137,7 +114,6 @@ class Event(Base):

 class DetailedEvent(Base):
     __tablename__ = events_detailed_table_name
-    # id = Column(Integer, primary_key=True, server_default=text("\"identity\"(119029, 0, '0,1'::text)"))
     sessionid = Column(BigInteger, primary_key=True)
     clickevent_hesitationtime = Column(BigInteger)
     clickevent_label = Column(VARCHAR(5000))
@@ -147,28 +123,8 @@ class DetailedEvent(Base):
     connectioninformation_type = Column(VARCHAR(5000))
     consolelog_level = Column(VARCHAR(5000))
     consolelog_value = Column(VARCHAR(5000))
-    cpuissue_duration = Column(BigInteger)
-    cpuissue_rate = Column(BigInteger)
-    cpuissue_timestamp = Column(BigInteger)
-    createdocument = Column(Boolean)
-    createelementnode_id = Column(BigInteger)
-    createelementnode_parentid = Column(BigInteger)
-    cssdeleterule_index = Column(BigInteger)
-    cssdeleterule_stylesheetid = Column(BigInteger)
-    cssinsertrule_index = Column(BigInteger)
-    cssinsertrule_rule = Column(VARCHAR(5000))
-    cssinsertrule_stylesheetid = Column(BigInteger)
-    customevent_messageid = Column(BigInteger)
     customevent_name = Column(VARCHAR(5000))
     customevent_payload = Column(VARCHAR(5000))
-    customevent_timestamp = Column(BigInteger)
-    domdrop_timestamp = Column(BigInteger)
-    errorevent_message = Column(VARCHAR(5000))
-    errorevent_messageid = Column(BigInteger)
-    errorevent_name = Column(VARCHAR(5000))
-    errorevent_payload = Column(VARCHAR(5000))
-    errorevent_source = Column(VARCHAR(5000))
-    errorevent_timestamp = Column(BigInteger)
     fetch_duration = Column(BigInteger)
     fetch_method = Column(VARCHAR(5000))
     fetch_request = Column(VARCHAR(5000))
@@ -180,9 +136,6 @@ class DetailedEvent(Base):
     graphql_operationname = Column(VARCHAR(5000))
     graphql_response = Column(VARCHAR(5000))
     graphql_variables = Column(VARCHAR(5000))
-    graphqlevent_messageid = Column(BigInteger)
-    graphqlevent_name = Column(VARCHAR(5000))
-    graphqlevent_timestamp = Column(BigInteger)
     inputevent_label = Column(VARCHAR(5000))
     inputevent_messageid = Column(BigInteger)
     inputevent_timestamp = Column(BigInteger)
@@ -191,26 +144,18 @@ class DetailedEvent(Base):
     jsexception_message = Column(VARCHAR(5000))
     jsexception_name = Column(VARCHAR(5000))
     jsexception_payload = Column(VARCHAR(5000))
-    memoryissue_duration = Column(BigInteger)
-    memoryissue_rate = Column(BigInteger)
-    memoryissue_timestamp = Column(BigInteger)
-    metadata_key = Column(VARCHAR(5000))
-    metadata_value = Column(VARCHAR(5000))
-    mobx_payload = Column(VARCHAR(5000))
-    mobx_type = Column(VARCHAR(5000))
+    jsexception_metadata = Column(VARCHAR(5000))
     mouseclick_id = Column(BigInteger)
     mouseclick_hesitationtime = Column(BigInteger)
     mouseclick_label = Column(VARCHAR(5000))
-    mousemove_x = Column(BigInteger)
-    mousemove_y = Column(BigInteger)
-    movenode_id = Column(BigInteger)
-    movenode_index = Column(BigInteger)
-    movenode_parentid = Column(BigInteger)
-    ngrx_action = Column(VARCHAR(5000))
-    ngrx_duration = Column(BigInteger)
-    ngrx_state = Column(VARCHAR(5000))
-    otable_key = Column(VARCHAR(5000))
-    otable_value = Column(VARCHAR(5000))
+    networkrequest_type = Column(VARCHAR(5000))
+    networkrequest_method = Column(VARCHAR(5000))
+    networkrequest_url = Column(VARCHAR(5000))
+    networkrequest_request = Column(VARCHAR(5000))
+    networkrequest_response = Column(VARCHAR(5000))
+    networkrequest_status = Column(BigInteger)
+    networkrequest_timestamp = Column(BigInteger)
+    networkrequest_duration = Column(BigInteger)
     pageevent_domcontentloadedeventend = Column(BigInteger)
     pageevent_domcontentloadedeventstart = Column(BigInteger)
     pageevent_firstcontentfulpaint = Column(BigInteger)
@@ -226,77 +171,8 @@ class DetailedEvent(Base):
     pageevent_speedindex = Column(BigInteger)
     pageevent_timestamp = Column(BigInteger)
     pageevent_url = Column(VARCHAR(5000))
-    pageloadtiming_domcontentloadedeventend = Column(BigInteger)
-    pageloadtiming_domcontentloadedeventstart = Column(BigInteger)
-    pageloadtiming_firstcontentfulpaint = Column(BigInteger)
-    pageloadtiming_firstpaint = Column(BigInteger)
-    pageloadtiming_loadeventend = Column(BigInteger)
-    pageloadtiming_loadeventstart = Column(BigInteger)
-    pageloadtiming_requeststart = Column(BigInteger)
-    pageloadtiming_responseend = Column(BigInteger)
-    pageloadtiming_responsestart = Column(BigInteger)
-    pagerendertiming_speedindex = Column(BigInteger)
-    pagerendertiming_timetointeractive = Column(BigInteger)
-    pagerendertiming_visuallycomplete = Column(BigInteger)
-    performancetrack_frames = Column(BigInteger)
-    performancetrack_ticks = Column(BigInteger)
-    performancetrack_totaljsheapsize = Column(BigInteger)
-    performancetrack_usedjsheapsize = Column(BigInteger)
-    performancetrackaggr_avgcpu = Column(BigInteger)
-    performancetrackaggr_avgfps = Column(BigInteger)
-    performancetrackaggr_avgtotaljsheapsize = Column(BigInteger)
-    performancetrackaggr_avgusedjsheapsize = Column(BigInteger)
-    performancetrackaggr_maxcpu = Column(BigInteger)
-    performancetrackaggr_maxfps = Column(BigInteger)
-    performancetrackaggr_maxtotaljsheapsize = Column(BigInteger)
-    performancetrackaggr_maxusedjsheapsize = Column(BigInteger)
-    performancetrackaggr_mincpu = Column(BigInteger)
-    performancetrackaggr_minfps = Column(BigInteger)
-    performancetrackaggr_mintotaljsheapsize = Column(BigInteger)
-    performancetrackaggr_minusedjsheapsize = Column(BigInteger)
-    performancetrackaggr_timestampend = Column(BigInteger)
-    performancetrackaggr_timestampstart = Column(BigInteger)
-    profiler_args = Column(VARCHAR(5000))
-    profiler_duration = Column(BigInteger)
-    profiler_name = Column(VARCHAR(5000))
-    profiler_result = Column(VARCHAR(5000))
-    rawcustomevent_name = Column(VARCHAR(5000))
-    rawcustomevent_payload = Column(VARCHAR(5000))
-    rawerrorevent_message = Column(VARCHAR(5000))
-    rawerrorevent_name = Column(VARCHAR(5000))
-    rawerrorevent_payload = Column(VARCHAR(5000))
-    rawerrorevent_source = Column(VARCHAR(5000))
-    rawerrorevent_timestamp = Column(BigInteger)
-    redux_action = Column(VARCHAR(5000))
-    redux_duration = Column(BigInteger)
-    redux_state = Column(VARCHAR(5000))
-    removenode_id = Column(BigInteger)
-    removenodeattribute_id = Column(BigInteger)
-    removenodeattribute_name = Column(VARCHAR(5000))
-    resourceevent_decodedbodysize = Column(BigInteger)
-    resourceevent_duration = Column(BigInteger)
-    resourceevent_encodedbodysize = Column(BigInteger)
-    resourceevent_headersize = Column(BigInteger)
-    resourceevent_messageid = Column(BigInteger)
-    resourceevent_method = Column(VARCHAR(5000))
-    resourceevent_status = Column(BigInteger)
-    resourceevent_success = Column(Boolean)
-    resourceevent_timestamp = Column(BigInteger)
-    resourceevent_ttfb = Column(BigInteger)
-    resourceevent_type = Column(VARCHAR(5000))
-    resourceevent_url = Column(VARCHAR(5000))
-    resourcetiming_decodedbodysize = Column(BigInteger)
-    resourcetiming_duration = Column(BigInteger)
-    resourcetiming_encodedbodysize = Column(BigInteger)
-    resourcetiming_headersize = Column(BigInteger)
-    resourcetiming_initiator = Column(VARCHAR(5000))
-    resourcetiming_timestamp = Column(BigInteger)
-    resourcetiming_ttfb = Column(BigInteger)
-    resourcetiming_url = Column(VARCHAR(5000))
-    sessiondisconnect = Column(Boolean)
-    sessiondisconnect_timestamp = Column(BigInteger)
-    sessionend = Column(Boolean)
     sessionend_timestamp = Column(BigInteger)
+    sessionend_encryption_key = Column(VARCHAR(5000))
     sessionstart_projectid = Column(BigInteger)
     sessionstart_revid = Column(VARCHAR(5000))
     sessionstart_timestamp = Column(BigInteger)
@@ -312,65 +188,18 @@ class DetailedEvent(Base):
     sessionstart_useros = Column(VARCHAR(5000))
     sessionstart_userosversion = Column(VARCHAR(5000))
     sessionstart_useruuid = Column(VARCHAR(5000))
-    setcssdata_data = Column(BigInteger)
-    setcssdata_id = Column(BigInteger)
-    setinputchecked_checked = Column(BigInteger)
-    setinputchecked_id = Column(BigInteger)
-    setinputtarget_id = Column(BigInteger)
-    setinputtarget_label = Column(BigInteger)
-    setinputvalue_id = Column(BigInteger)
-    setinputvalue_mask = Column(BigInteger)
-    setinputvalue_value = Column(BigInteger)
-    setnodeattribute_id = Column(BigInteger)
-    setnodeattribute_name = Column(BigInteger)
-    setnodeattribute_value = Column(BigInteger)
-    setnodedata_data = Column(BigInteger)
-    setnodedata_id = Column(BigInteger)
-    setnodescroll_id = Column(BigInteger)
-    setnodescroll_x = Column(BigInteger)
-    setnodescroll_y = Column(BigInteger)
     setpagelocation_navigationstart = Column(BigInteger)
     setpagelocation_referrer = Column(VARCHAR(5000))
     setpagelocation_url = Column(VARCHAR(5000))
-    setpagevisibility_hidden = Column(Boolean)
-    setviewportscroll_x = Column(BigInteger)
-    setviewportscroll_y = Column(BigInteger)
-    setviewportsize_height = Column(BigInteger)
-    setviewportsize_width = Column(BigInteger)
-    stateaction_type = Column(VARCHAR(5000))
-    stateactionevent_messageid = Column(BigInteger)
-    stateactionevent_timestamp = Column(BigInteger)
-    stateactionevent_type = Column(VARCHAR(5000))
-    timestamp_timestamp = Column(BigInteger)
-    useranonymousid_id = Column(VARCHAR(5000))
-    userid_id = Column(VARCHAR(5000))
-    vuex_mutation = Column(VARCHAR(5000))
-    vuex_state = Column(VARCHAR(5000))
-    longtasks_timestamp = Column(BigInteger)
-    longtasks_duration = Column(BigInteger)
-    longtasks_context = Column(BigInteger)
-    longtasks_containertype = Column(BigInteger)
-    longtasks_containersrc = Column(VARCHAR(5000))
-    longtasks_containerid = Column(VARCHAR(5000))
-    longtasks_containername = Column(VARCHAR(5000))
-    setnodeurlbasedattribute_id = Column(BigInteger)
-    setnodeurlbasedattribute_name = Column(VARCHAR(5000))
-    setnodeurlbasedattribute_value = Column(VARCHAR(5000))
-    setnodeurlbasedattribute_baseurl = Column(VARCHAR(5000))
-    setstyledata_id = Column(BigInteger)
-    setstyledata_data = Column(VARCHAR(5000))
-    setstyledata_baseurl = Column(VARCHAR(5000))
-    issueevent_messageid = Column(BigInteger)
+    issueevent_message_id = Column(BigInteger)
     issueevent_timestamp = Column(BigInteger)
     issueevent_type = Column(VARCHAR(5000))
-    issueevent_contextstring = Column(VARCHAR(5000))
+    issueevent_context_string = Column(VARCHAR(5000))
     issueevent_context = Column(VARCHAR(5000))
     issueevent_payload = Column(VARCHAR(5000))
-    technicalinfo_type = Column(VARCHAR(5000))
-    technicalinfo_value = Column(VARCHAR(5000))
+    issueevent_url = Column(VARCHAR(5000))
     customissue_name = Column(VARCHAR(5000))
     customissue_payload = Column(VARCHAR(5000))
-    pageclose = Column(Boolean)
     received_at = Column(BigInteger)
     batch_order_number = Column(BigInteger)


@@ -1,86 +1,107 @@
 from pathlib import Path
+from decouple import config

 base_path = Path(__file__).parent.parent

+EVENT_TYPE = config('EVENT_TYPE', default='normal')

 def create_tables_clickhouse(db):
-    with open(base_path / 'sql' / 'clickhouse_events.sql') as f:
-        q = f.read()
-    db.engine.execute(q)
-    print(f"`connector_user_events` table created succesfully.")
-
-    with open(base_path / 'sql' / 'clickhouse_events_buffer.sql') as f:
-        q = f.read()
-    db.engine.execute(q)
-    print(f"`connector_user_events_buffer` table created succesfully.")
-
-    with open(base_path / 'sql' / 'clickhouse_sessions.sql') as f:
-        q = f.read()
-    db.engine.execute(q)
-    print(f"`connector_sessions` table created succesfully.")
-
-    with open(base_path / 'sql' / 'clickhouse_sessions_buffer.sql') as f:
-        q = f.read()
-    db.engine.execute(q)
-    print(f"`connector_sessions_buffer` table created succesfully.")
-
-    #with open(base_path / 'sql' / 'clickhouse_events_detailed.sql') as f:
-    #    q = f.read()
-    #db.engine.execute(q)
-    #print(f"`connector_user_events_detailed` table created succesfully.")
-
-    #with open(base_path / 'sql' / 'clickhouse_events_detailed_buffer.sql') as f:
-    #    q = f.read()
-    #db.engine.execute(q)
-    #print(f"`connector_user_events_detailed_buffer` table created succesfully.")
+    if EVENT_TYPE == 'normal':
+        with open(base_path / 'sql' / 'clickhouse_events.sql', 'r') as f:
+            q = f.read()
+        with db.get_live_session() as conn:
+            conn.execute(q)
+        print(f"`connector_user_events` table created succesfully.")
+
+        with open(base_path / 'sql' / 'clickhouse_events_buffer.sql', 'r') as f:
+            q = f.read()
+        with db.get_live_session() as conn:
+            conn.execute(q)
+        print(f"`connector_user_events_buffer` table created succesfully.")
+
+        with open(base_path / 'sql' / 'clickhouse_sessions.sql', 'r') as f:
+            q = f.read()
+        with db.get_live_session() as conn:
+            conn.execute(q)
+        print(f"`connector_sessions` table created succesfully.")
+
+        with open(base_path / 'sql' / 'clickhouse_sessions_buffer.sql', 'r') as f:
+            q = f.read()
+        with db.get_live_session() as conn:
+            conn.execute(q)
+        print(f"`connector_sessions_buffer` table created succesfully.")
+
+    if EVENT_TYPE == 'detailed':
+        with open(base_path / 'sql' / 'clickhouse_events_detailed.sql') as f:
+            q = f.read()
+        with db.get_live_session() as conn: conn.execute(q)
+        print(f"`connector_user_events_detailed` table created succesfully.")
+
+        with open(base_path / 'sql' / 'clickhouse_events_detailed_buffer.sql') as f:
+            q = f.read()
+        with db.get_live_session() as conn: conn.execute(q)
+        print(f"`connector_user_events_detailed_buffer` table created succesfully.")


 def create_tables_postgres(db):
-    with open(base_path / 'sql' / 'postgres_events.sql') as f:
-        q = f.read()
-    db.engine.execute(q)
-    print(f"`connector_user_events` table created succesfully.")
-
-    with open(base_path / 'sql' / 'postgres_sessions.sql') as f:
-        q = f.read()
-    db.engine.execute(q)
-    print(f"`connector_sessions` table created succesfully.")
-
-    #with open(base_path / 'sql' / 'postgres_events_detailed.sql') as f:
-    #    q = f.read()
-    #db.engine.execute(q)
-    #print(f"`connector_user_events_detailed` table created succesfully.")
+    if EVENT_TYPE == 'normal':
+        with open(base_path / 'sql' / 'postgres_events.sql', 'r') as f:
+            q = f.read()
+        with db.get_live_session() as conn:
+            conn.execute(q)
+        print(f"`connector_user_events` table created succesfully.")
+
+        with open(base_path / 'sql' / 'postgres_sessions.sql', 'r') as f:
+            q = f.read()
+        with db.get_live_session() as conn:
+            conn.execute(q)
+        print(f"`connector_sessions` table created succesfully.")
+
+    if EVENT_TYPE == 'detailed':
+        with open(base_path / 'sql' / 'postgres_events_detailed.sql') as f:
+            q = f.read()
+        with db.get_live_session() as conn: conn.execute(q)
+        print(f"`connector_user_events_detailed` table created succesfully.")


 def create_tables_snowflake(db):
-    with open(base_path / 'sql' / 'snowflake_events.sql') as f:
-        q = f.read()
-    db.engine.execute(q)
-    print(f"`connector_user_events` table created succesfully.")
-
-    with open(base_path / 'sql' / 'snowflake_sessions.sql') as f:
-        q = f.read()
-    db.engine.execute(q)
-    print(f"`connector_sessions` table created succesfully.")
-
-    #with open(base_path / 'sql' / 'snowflake_events_detailed.sql') as f:
-    #    q = f.read()
-    #db.engine.execute(q)
-    #print(f"`connector_user_events_detailed` table created succesfully.")
+    if EVENT_TYPE == 'normal':
+        with open(base_path / 'sql' / 'snowflake_events.sql', 'r') as f:
+            q = f.read()
+        with db.get_live_session() as conn:
+            conn.execute(q)
+        print(f"`connector_user_events` table created succesfully.")
+
+        with open(base_path / 'sql' / 'snowflake_sessions.sql', 'r') as f:
+            q = f.read()
+        with db.get_live_session() as conn:
+            conn.execute(q)
+        print(f"`connector_sessions` table created succesfully.")
+
+    if EVENT_TYPE == 'detailed':
+        with open(base_path / 'sql' / 'snowflake_events_detailed.sql') as f:
+            q = f.read()
+        with db.get_live_session() as conn: conn.execute(q)
+        print(f"`connector_user_events_detailed` table created succesfully.")


 def create_tables_redshift(db):
-    with open(base_path / 'sql' / 'redshift_events.sql') as f:
-        q = f.read()
-    db.engine.execute(q)
-    print(f"`connector_user_events` table created succesfully.")
-
-    with open(base_path / 'sql' / 'redshift_sessions.sql') as f:
-        q = f.read()
-    db.engine.execute(q)
-    print(f"`connector_sessions` table created succesfully.")
-
-    #with open(base_path / 'sql' / 'redshift_events_detailed.sql') as f:
-    #    q = f.read()
-    #db.engine.execute(q)
-    #print(f"`connector_user_events_detailed` table created succesfully.")
+    if EVENT_TYPE == 'normal':
+        with open(base_path / 'sql' / 'redshift_events.sql', 'r') as f:
+            q = f.read()
+        with db.get_live_session() as conn:
+            conn.execute(q)
+        print(f"`connector_user_events` table created succesfully.")
+
+        with open(base_path / 'sql' / 'redshift_sessions.sql', 'r') as f:
+            q = f.read()
+        with db.get_live_session() as conn:
+            conn.execute(q)
+        print(f"`connector_sessions` table created succesfully.")
+
+    if EVENT_TYPE == 'detailed':
+        with open(base_path / 'sql' / 'redshift_events_detailed.sql') as f:
+            q = f.read()
+        with db.get_live_session() as conn: conn.execute(q)
+        print(f"`connector_user_events_detailed` table created succesfully.")


@ -1,334 +1,167 @@
import pandas as pd import pandas as pd
from db.models import DetailedEvent, Event, Session, DATABASE from db.models import DetailedEvent, Event, Session, DATABASE
dtypes_events = {'sessionid': "Int64", dtypes_events = {
'connectioninformation_downlink': "Int64", 'sessionid': "Int64",
'connectioninformation_type': "string", 'consolelog_level': "string",
'consolelog_level': "string", 'consolelog_value': "string",
'consolelog_value': "string", 'customevent_name': "string",
'customevent_messageid': "Int64", 'customevent_payload': "string",
'customevent_name': "string", 'jsexception_message': "string",
'customevent_payload': "string", 'jsexception_name': "string",
'customevent_timestamp': "Int64", 'jsexception_payload': "string",
'errorevent_message': "string", 'jsexception_metadata': "string",
'errorevent_messageid': "Int64", 'networkrequest_type': "string",
'errorevent_name': "string", 'networkrequest_method': "string",
'errorevent_payload': "string", 'networkrequest_url': "string",
'errorevent_source': "string", 'networkrequest_request': "string",
'errorevent_timestamp': "Int64", 'networkrequest_response': "string",
'jsexception_message': "string", 'networkrequest_status': "Int64",
'jsexception_name': "string", 'networkrequest_timestamp': "Int64",
'jsexception_payload': "string", 'networkrequest_duration': "Int64",
'metadata_key': "string", 'issueevent_message_id': "Int64",
'metadata_value': "string", 'issueevent_timestamp': "Int64",
'mouseclick_id': "Int64", 'issueevent_type': "string",
'mouseclick_hesitationtime': "Int64", 'issueevent_context_string': "string",
'mouseclick_label': "string", 'issueevent_context': "string",
'pageevent_firstcontentfulpaint': "Int64", 'issueevent_url': "string",
'pageevent_firstpaint': "Int64", 'issueevent_payload': "string",
'pageevent_messageid': "Int64", 'customissue_name': "string",
'pageevent_referrer': "string", 'customissue_payload': "string",
'pageevent_speedindex': "Int64", 'received_at': "Int64",
'pageevent_timestamp': "Int64", 'batch_order_number': "Int64"}
'pageevent_url': "string",
'pagerendertiming_timetointeractive': "Int64",
'pagerendertiming_visuallycomplete': "Int64",
'rawcustomevent_name': "string",
'rawcustomevent_payload': "string",
'setviewportsize_height': "Int64",
'setviewportsize_width': "Int64",
'timestamp_timestamp': "Int64",
'user_anonymous_id': "string",
'user_id': "string",
'issueevent_messageid': "Int64",
'issueevent_timestamp': "Int64",
'issueevent_type': "string",
'issueevent_contextstring': "string",
'issueevent_context': "string",
'issueevent_payload': "string",
'customissue_name': "string",
'customissue_payload': "string",
'received_at': "Int64",
'batch_order_number': "Int64"}
dtypes_detailed_events = { dtypes_detailed_events = {
"sessionid": "Int64", "sessionid": "Int64",
"clickevent_hesitationtime": "Int64", "clickevent_hesitationtime": "Int64",
"clickevent_label": "object", "clickevent_label": "string",
"clickevent_messageid": "Int64", "clickevent_messageid": "Int64",
"clickevent_timestamp": "Int64", "clickevent_timestamp": "Int64",
"connectioninformation_downlink": "Int64", "connectioninformation_downlink": "Int64",
"connectioninformation_type": "object", "connectioninformation_type": "string",
"consolelog_level": "object", "consolelog_level": "string",
"consolelog_value": "object", "consolelog_value": "string",
"cpuissue_duration": "Int64", "customevent_name": "string",
"cpuissue_rate": "Int64", "customevent_payload": "string",
"cpuissue_timestamp": "Int64", "fetch_duration": "Int64",
"createdocument": "boolean", "fetch_method": "string",
"createelementnode_id": "Int64", "fetch_request": "string",
"createelementnode_parentid": "Int64", "fetch_response": "string",
"cssdeleterule_index": "Int64", "fetch_status": "Int64",
"cssdeleterule_stylesheetid": "Int64", "fetch_timestamp": "Int64",
"cssinsertrule_index": "Int64", "fetch_url": "string",
"cssinsertrule_rule": "object", "graphql_operationkind": "string",
"cssinsertrule_stylesheetid": "Int64", "graphql_operationname": "string",
"customevent_messageid": "Int64", "graphql_response": "string",
"customevent_name": "object", "graphql_variables": "string",
"customevent_payload": "object", "inputevent_label": "string",
"customevent_timestamp": "Int64", "inputevent_messageid": "Int64",
"domdrop_timestamp": "Int64", "inputevent_timestamp": "Int64",
"errorevent_message": "object", "inputevent_value": "string",
"errorevent_messageid": "Int64", "inputevent_valuemasked": "boolean",
"errorevent_name": "object", "jsexception_message": "string",
"errorevent_payload": "object", "jsexception_name": "string",
"errorevent_source": "object", "jsexception_payload": "string",
"errorevent_timestamp": "Int64", "jsexception_metadata": "string",
"fetch_duration": "Int64", "mouseclick_id": "Int64",
"fetch_method": "object", "mouseclick_hesitationtime": "Int64",
"fetch_request": "object", "mouseclick_label": "string",
"fetch_response": "object", "networkrequest_type": "string",
"fetch_status": "Int64", "networkrequest_method": "string",
"fetch_timestamp": "Int64", "networkrequest_url": "string",
"fetch_url": "object", "networkrequest_request": "string",
"graphql_operationkind": "object", "networkrequest_response": "string",
"graphql_operationname": "object", "networkrequest_status": "Int64",
"graphql_response": "object", "networkrequest_timestamp": "Int64",
"graphql_variables": "object", "networkrequest_duration": "Int64",
"graphqlevent_messageid": "Int64", "pageevent_domcontentloadedeventend": "Int64",
"graphqlevent_name": "object", "pageevent_domcontentloadedeventstart": "Int64",
"graphqlevent_timestamp": "Int64", "pageevent_firstcontentfulpaint": "Int64",
"inputevent_label": "object", "pageevent_firstpaint": "Int64",
"inputevent_messageid": "Int64", "pageevent_loaded": "boolean",
"inputevent_timestamp": "Int64", "pageevent_loadeventend": "Int64",
"inputevent_value": "object", "pageevent_loadeventstart": "Int64",
"inputevent_valuemasked": "boolean", "pageevent_messageid": "Int64",
"jsexception_message": "object", "pageevent_referrer": "string",
"jsexception_name": "object", "pageevent_requeststart": "Int64",
"jsexception_payload": "object", "pageevent_responseend": "Int64",
"longtasks_timestamp": "Int64", "pageevent_responsestart": "Int64",
"longtasks_duration": "Int64", "pageevent_speedindex": "Int64",
"longtasks_containerid": "object", "pageevent_timestamp": "Int64",
"longtasks_containersrc": "object", "pageevent_url": "string",
"memoryissue_duration": "Int64", "sessionend_timestamp": "Int64",
"memoryissue_rate": "Int64", "sessionend_encryption_key": "string",
"memoryissue_timestamp": "Int64", "sessionstart_projectid": "Int64",
"metadata_key": "object", "sessionstart_revid": "string",
"metadata_value": "object", "sessionstart_timestamp": "Int64",
"mobx_payload": "object", "sessionstart_trackerversion": "string",
"mobx_type": "object", "sessionstart_useragent": "string",
"mouseclick_id": "Int64", "sessionstart_userbrowser": "string",
"mouseclick_hesitationtime": "Int64", "sessionstart_userbrowserversion": "string",
"mouseclick_label": "object", "sessionstart_usercountry": "string",
"mousemove_x": "Int64", "sessionstart_userdevice": "string",
"mousemove_y": "Int64", "sessionstart_userdeviceheapsize": "Int64",
"movenode_id": "Int64", "sessionstart_userdevicememorysize": "Int64",
"movenode_index": "Int64", "sessionstart_userdevicetype": "string",
"movenode_parentid": "Int64", "sessionstart_useros": "string",
"ngrx_action": "object", "sessionstart_userosversion": "string",
"ngrx_duration": "Int64", "sessionstart_useruuid": "string",
"ngrx_state": "object", "setpagelocation_navigationstart": "Int64",
"otable_key": "object", "setpagelocation_referrer": "string",
"otable_value": "object", "setpagelocation_url": "string",
"pageevent_domcontentloadedeventend": "Int64", "issueevent_message_id": "Int64",
"pageevent_domcontentloadedeventstart": "Int64", "issueevent_timestamp": "Int64",
"pageevent_firstcontentfulpaint": "Int64", "issueevent_type": "string",
"pageevent_firstpaint": "Int64", "issueevent_context_string": "string",
"pageevent_loaded": "boolean", "issueevent_context": "string",
"pageevent_loadeventend": "Int64", "issueevent_payload": "string",
"pageevent_loadeventstart": "Int64", "issueevent_url": "string",
"pageevent_messageid": "Int64", "customissue_name": "string",
"pageevent_referrer": "object", "customissue_payload": "string",
"pageevent_requeststart": "Int64", "received_at": "Int64",
"pageevent_responseend": "Int64", "batch_order_number": "Int64",
"pageevent_responsestart": "Int64",
"pageevent_speedindex": "Int64",
"pageevent_timestamp": "Int64",
"pageevent_url": "object",
"pageloadtiming_domcontentloadedeventend": "Int64",
"pageloadtiming_domcontentloadedeventstart": "Int64",
"pageloadtiming_firstcontentfulpaint": "Int64",
"pageloadtiming_firstpaint": "Int64",
"pageloadtiming_loadeventend": "Int64",
"pageloadtiming_loadeventstart": "Int64",
"pageloadtiming_requeststart": "Int64",
"pageloadtiming_responseend": "Int64",
"pageloadtiming_responsestart": "Int64",
"pagerendertiming_speedindex": "Int64",
"pagerendertiming_timetointeractive": "Int64",
"pagerendertiming_visuallycomplete": "Int64",
"performancetrack_frames": "Int64",
"performancetrack_ticks": "Int64",
"performancetrack_totaljsheapsize": "Int64",
"performancetrack_usedjsheapsize": "Int64",
"performancetrackaggr_avgcpu": "Int64",
"performancetrackaggr_avgfps": "Int64",
"performancetrackaggr_avgtotaljsheapsize": "Int64",
"performancetrackaggr_avgusedjsheapsize": "Int64",
"performancetrackaggr_maxcpu": "Int64",
"performancetrackaggr_maxfps": "Int64",
"performancetrackaggr_maxtotaljsheapsize": "Int64",
"performancetrackaggr_maxusedjsheapsize": "Int64",
"performancetrackaggr_mincpu": "Int64",
"performancetrackaggr_minfps": "Int64",
"performancetrackaggr_mintotaljsheapsize": "Int64",
"performancetrackaggr_minusedjsheapsize": "Int64",
"performancetrackaggr_timestampend": "Int64",
"performancetrackaggr_timestampstart": "Int64",
"profiler_args": "object",
"profiler_duration": "Int64",
"profiler_name": "object",
"profiler_result": "object",
"rawcustomevent_name": "object",
"rawcustomevent_payload": "object",
"rawerrorevent_message": "object",
"rawerrorevent_name": "object",
"rawerrorevent_payload": "object",
"rawerrorevent_source": "object",
"rawerrorevent_timestamp": "Int64",
"redux_action": "object",
"redux_duration": "Int64",
"redux_state": "object",
"removenode_id": "Int64",
"removenodeattribute_id": "Int64",
"removenodeattribute_name": "object",
"resourceevent_decodedbodysize": "Int64",
"resourceevent_duration": "Int64",
"resourceevent_encodedbodysize": "Int64",
"resourceevent_headersize": "Int64",
"resourceevent_messageid": "Int64",
"resourceevent_method": "object",
"resourceevent_status": "Int64",
"resourceevent_success": "boolean",
"resourceevent_timestamp": "Int64",
"resourceevent_ttfb": "Int64",
"resourceevent_type": "object",
"resourceevent_url": "object",
"resourcetiming_decodedbodysize": "Int64",
"resourcetiming_duration": "Int64",
"resourcetiming_encodedbodysize": "Int64",
"resourcetiming_headersize": "Int64",
"resourcetiming_initiator": "object",
"resourcetiming_timestamp": "Int64",
"resourcetiming_ttfb": "Int64",
"resourcetiming_url": "object",
"sessiondisconnect": "boolean",
"sessiondisconnect_timestamp": "Int64",
"sessionend": "boolean",
"sessionend_timestamp": "Int64",
"sessionstart_projectid": "Int64",
"sessionstart_revid": "object",
"sessionstart_timestamp": "Int64",
"sessionstart_trackerversion": "object",
"sessionstart_useragent": "object",
"sessionstart_userbrowser": "object",
"sessionstart_userbrowserversion": "object",
"sessionstart_usercountry": "object",
"sessionstart_userdevice": "object",
"sessionstart_userdeviceheapsize": "Int64",
"sessionstart_userdevicememorysize": "Int64",
"sessionstart_userdevicetype": "object",
"sessionstart_useros": "object",
"sessionstart_userosversion": "object",
"sessionstart_useruuid": "object",
"setcssdata_data": "Int64",
"setcssdata_id": "Int64",
"setinputchecked_checked": "Int64",
"setinputchecked_id": "Int64",
"setinputtarget_id": "Int64",
"setinputtarget_label": "Int64",
"setinputvalue_id": "Int64",
"setinputvalue_mask": "Int64",
"setinputvalue_value": "Int64",
"setnodeattribute_id": "Int64",
"setnodeattribute_name": "Int64",
"setnodeattribute_value": "Int64",
"setnodedata_data": "Int64",
"setnodedata_id": "Int64",
"setnodescroll_id": "Int64",
"setnodescroll_x": "Int64",
"setnodescroll_y": "Int64",
"setpagelocation_navigationstart": "Int64",
"setpagelocation_referrer": "object",
"setpagelocation_url": "object",
"setpagevisibility_hidden": "boolean",
"setviewportscroll_x": "Int64",
"setviewportscroll_y": "Int64",
"setviewportsize_height": "Int64",
"setviewportsize_width": "Int64",
"stateaction_type": "object",
"stateactionevent_messageid": "Int64",
"stateactionevent_timestamp": "Int64",
"stateactionevent_type": "object",
"timestamp_timestamp": "Int64",
"useranonymousid_id": "object",
"userid_id": "object",
"vuex_mutation": "object",
"vuex_state": "string",
"received_at": "Int64",
"batch_order_number": "Int64",
#NEW
'setnodeurlbasedattribute_id': 'Int64',
'setnodeurlbasedattribute_name': 'string',
'setnodeurlbasedattribute_value': 'string',
'setnodeurlbasedattribute_baseurl': 'string',
'setstyledata_id': 'Int64',
'setstyledata_data': 'string',
'setstyledata_baseurl': 'string',
'customissue_payload': 'string',
'customissue_name': 'string',
'technicalinfo_value': 'string',
'technicalinfo_type': 'string',
'issueevent_payload': 'string',
'issueevent_context': 'string',
'issueevent_contextstring': 'string',
'issueevent_type': 'string'
    }

dtypes_sessions = {'sessionid': "Int64",
                   'user_agent': "string",
                   'user_browser': "string",
                   'user_browser_version': "string",
                   'user_country': "string",
                   'user_device': "string",
                   'user_device_heap_size': "Int64",
                   'user_device_memory_size': "Int64",
                   'user_device_type': "string",
                   'user_os': "string",
                   'user_os_version': "string",
                   'user_uuid': "string",
                   'connection_effective_bandwidth': "Int64",
                   'connection_type': "string",
                   'metadata_key': "string",
                   'metadata_value': "string",
                   'referrer': "string",
                   'user_anonymous_id': "string",
                   'user_id': "string",
                   'session_start_timestamp': "Int64",
                   'session_end_timestamp': "Int64",
                   'session_duration': "Int64",
                   'first_contentful_paint': "Int64",
                   'speed_index': "Int64",
                   'visually_complete': "Int64",
                   'timing_time_to_interactive': "Int64",
                   'avg_cpu': "Int64",
                   'avg_fps': "Int64",
                   'max_cpu': "Int64",
                   'max_fps': "Int64",
                   'max_total_js_heap_size': "Int64",
                   'max_used_js_heap_size': "Int64",
                   'js_exceptions_count': "Int64",
                   'inputs_count': "Int64",
                   'clicks_count': "Int64",
                   'issues_count': "Int64",
                   'urls_count': "Int64",
                   }

if DATABASE == 'bigquery':
    dtypes_sessions['urls'] = "string"
    dtypes_sessions['issues'] = "string"

detailed_events_col = []
for col in DetailedEvent.__dict__:

@@ -360,13 +193,19 @@ def get_df_from_batch(batch, level):
        pass

    if level == 'normal':
        current_types = dtypes_events
    if level == 'detailed':
        current_types = dtypes_detailed_events
        df['inputevent_value'] = None
        df['customevent_payload'] = None
    if level == 'sessions':
        current_types = dtypes_sessions
        df['js_exceptions_count'] = df['js_exceptions_count'].fillna(0)
        df['inputs_count'] = df['inputs_count'].fillna(0)
        df['clicks_count'] = df['clicks_count'].fillna(0)
        df['issues_count'] = df['issues_count'].fillna(0)
        df['urls_count'] = df['urls_count'].fillna(0)
    df = df.astype(current_types)

    if DATABASE == 'clickhouse' and level == 'sessions':
        df['issues'] = df['issues'].fillna('')

@@ -374,7 +213,8 @@ def get_df_from_batch(batch, level):
    for x in df.columns:
        try:
            if df[x].dtype == "string" or current_types[x] == "string":
                df[x] = df[x].fillna('NULL')
                df[x] = df[x].str.slice(0, 255)
                df[x] = df[x].str.replace("|", "")
        except TypeError as e:
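
Note on the dtypes above: pandas' nullable "Int64" dtype (capital I) tolerates missing values, which is why the count columns are filled with 0 before the astype call and string columns are filled with 'NULL' before truncation. A minimal, self-contained illustration of that behaviour:

    import pandas as pd

    df = pd.DataFrame({'clicks_count': [3, None, 7]})
    # fillna(0) first, so the nullable-int cast yields plain integers
    # instead of <NA> markers, matching the connector's default of 0.
    df['clicks_count'] = df['clicks_count'].fillna(0)
    df = df.astype({'clicks_count': 'Int64'})
    print(df['clicks_count'].tolist())  # [3, 0, 7]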


@@ -1,13 +1,14 @@
from decouple import config

DATABASE = config('CLOUD_SERVICE')

from db.api import DBConnection
from db.utils import get_df_from_batch, dtypes_sessions
from db.tables import *

if DATABASE == 'redshift':
    from db.loaders.redshift_loader import transit_insert_to_redshift
    import pandas as pd
elif DATABASE == 'clickhouse':
    from db.loaders.clickhouse_loader import insert_to_clickhouse
elif DATABASE == 'pg':

@@ -21,23 +22,25 @@ else:
    raise Exception(f"{DATABASE}-database not supported")

# create tables if don't exist
_build_tables = config('build_tables', default=False, cast=bool)
if _build_tables:
    try:
        db = DBConnection(DATABASE)
        if DATABASE == 'pg':
            create_tables_postgres(db)
        if DATABASE == 'clickhouse':
            create_tables_clickhouse(db)
        if DATABASE == 'snowflake':
            create_tables_snowflake(db)
        if DATABASE == 'bigquery':
            create_tables_bigquery()
        if DATABASE == 'redshift':
            create_tables_redshift(db)
        db.engine.dispose()
        db = None
    except Exception as e:
        print(repr(e))
        print("Please create the tables with scripts provided in " +
              f"'/sql/{DATABASE}_sessions.sql' and '/sql/{DATABASE}_events.sql'")

@@ -61,3 +64,29 @@ def insert_batch(db: DBConnection, batch, table, level='normal'):
    if db.config == 'snowflake':
        insert_to_snowflake(db=db, df=df, table=table)


def update_batch(db: DBConnection, batch, table):
    if len(batch) == 0:
        return
    df = get_df_from_batch(batch, level='sessions')
    base_query = f"UPDATE {table} SET"
    for column_name, column_type in dtypes_sessions.items():
        if column_name == 'sessionid':
            continue
        elif column_type == 'string':
            df[column_name] = df[column_name].fillna('NULL')
            base_query += f" {column_name} = " + "'{" + f"{column_name}" + "}',"
        else:
            df[column_name] = df[column_name].fillna(0)
            base_query += f" {column_name} = " + "{" + f"{column_name}" + "},"
    base_query = base_query[:-1] + " WHERE sessionid = {sessionid};"
    for i in range(len(df)):
        if db.config == 'redshift':
            params = dict(df.iloc[i])
            query = base_query.format(**params)
            try:
                db.pdredshift.exec_commit(query)
            except Exception as e:
                print('[ERROR] Error while executing query')
                print(repr(e))
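
update_batch builds one UPDATE template per batch and fills it per row with str.format: string columns get quoted placeholders, numeric ones do not. An illustrative two-column expansion follows (connector_user_sessions is the sessions table from this change; note that values containing quotes or braces would break this formatting, which is why parameter binding is the safer general pattern):

    # Illustrative only: how the generated UPDATE template expands.
    base_query = "UPDATE connector_user_sessions SET"
    base_query += " user_agent = '{user_agent}',"
    base_query += " urls_count = {urls_count},"
    base_query = base_query[:-1] + " WHERE sessionid = {sessionid};"
    print(base_query.format(user_agent='Mozilla/5.0', urls_count=4, sessionid=42))
    # UPDATE connector_user_sessions SET user_agent = 'Mozilla/5.0', urls_count = 4 WHERE sessionid = 42;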


@@ -1,15 +1,10 @@
FROM python:3.11-alpine
WORKDIR /usr/src/app

ENV LIBRD_VER=2.0.2
RUN apk update && apk add postgresql-dev gcc python3-dev musl-dev linux-headers g++ libc-dev libffi-dev make cmake py-pip build-base
RUN apk add --no-cache --virtual .make-deps bash make wget git gcc g++ && apk add --no-cache musl-dev zlib-dev openssl zstd-dev pkgconfig libc-dev && wget https://github.com/edenhill/librdkafka/archive/v${LIBRD_VER}.tar.gz && tar -xvf v${LIBRD_VER}.tar.gz && cd librdkafka-${LIBRD_VER} && ./configure --prefix /usr && make && make install && make clean && rm -rf librdkafka-${LIBRD_VER} && rm -rf v${LIBRD_VER}.tar.gz && apk del .make-deps

COPY . .
RUN pip install -r deploy/requirements_bigquery.txt

ENTRYPOINT ./entrypoint.sh


@@ -1,15 +1,10 @@
FROM python:3.11-alpine
WORKDIR /usr/src/app

ENV LIBRD_VER=2.0.2
RUN apk update && apk add postgresql-dev gcc python3-dev musl-dev linux-headers g++ libc-dev libffi-dev make cmake py-pip build-base
RUN apk add --no-cache --virtual .make-deps bash make wget git gcc g++ && apk add --no-cache musl-dev zlib-dev openssl zstd-dev pkgconfig libc-dev && wget https://github.com/edenhill/librdkafka/archive/v${LIBRD_VER}.tar.gz && tar -xvf v${LIBRD_VER}.tar.gz && cd librdkafka-${LIBRD_VER} && ./configure --prefix /usr && make && make install && make clean && rm -rf librdkafka-${LIBRD_VER} && rm -rf v${LIBRD_VER}.tar.gz && apk del .make-deps

COPY . .
RUN pip install -r deploy/requirements_clickhouse.txt

ENTRYPOINT ./entrypoint.sh


@@ -1,15 +1,10 @@
FROM python:3.11-alpine
WORKDIR /usr/src/app

ENV LIBRD_VER=2.0.2
RUN apk update && apk add postgresql-dev gcc python3-dev musl-dev linux-headers g++ libc-dev libffi-dev make cmake py-pip build-base
RUN apk add --no-cache --virtual .make-deps bash make wget git gcc g++ && apk add --no-cache musl-dev zlib-dev openssl zstd-dev pkgconfig libc-dev && wget https://github.com/edenhill/librdkafka/archive/v${LIBRD_VER}.tar.gz && tar -xvf v${LIBRD_VER}.tar.gz && cd librdkafka-${LIBRD_VER} && ./configure --prefix /usr && make && make install && make clean && rm -rf librdkafka-${LIBRD_VER} && rm -rf v${LIBRD_VER}.tar.gz && apk del .make-deps

COPY . .
RUN pip install -r deploy/requirements_pg.txt

ENTRYPOINT ./entrypoint.sh


@@ -1,15 +1,15 @@
FROM public.ecr.aws/p1t3u8a3/connectors/redshift:base
WORKDIR /usr/src/app

ENV CLOUD_SERVICE=redshift \
    CONNECTION_STRING=postgresql+psycopg2://{USER}:{PASSWORD}@{HOST}:{PORT}/{DBNAME} \
    # Keep postgres connection
    PG_MINCONN=3 \
    PG_MAXCONN=10

RUN apk add --no-cache postgresql-libs lz4-libs zstd-libs
COPY deploy/requirements_redshift.txt .
RUN apk add --no-cache --virtual .build-deps gcc g++ musl-dev postgresql-dev && \
    python3 -m pip install -r requirements_redshift.txt --no-cache-dir && \
    apk --purge del .build-deps

COPY . .

ENTRYPOINT ./entrypoint.sh


@@ -0,0 +1,13 @@
FROM amancevice/pandas:2.0.0-alpine
WORKDIR /usr/src/app
ENV LIBRD_VER=2.0.2
WORKDIR /work
RUN apk add --no-cache --virtual .make-deps postgresql-dev gcc python3-dev \
musl-dev linux-headers g++ libc-dev libffi-dev make cmake py-pip build-base \
bash make wget git gcc g++ musl-dev zlib-dev openssl zstd-dev pkgconfig libc-dev && \
wget https://github.com/edenhill/librdkafka/archive/v${LIBRD_VER}.tar.gz && \
tar -xvf v${LIBRD_VER}.tar.gz && cd librdkafka-${LIBRD_VER} && \
./configure --prefix /usr && make && make install && make clean && \
cd /work && rm -rf librdkafka-${LIBRD_VER} && rm -rf v${LIBRD_VER}.tar.gz \
&& apk del .make-deps


@@ -1,15 +1,10 @@
FROM python:3.11-alpine
WORKDIR /usr/src/app

ENV LIBRD_VER=2.0.2
RUN apk update && apk add postgresql-dev gcc python3-dev musl-dev linux-headers g++ libc-dev libffi-dev make cmake py-pip build-base
RUN apk add --no-cache --virtual .make-deps bash make wget git gcc g++ && apk add --no-cache musl-dev zlib-dev openssl zstd-dev pkgconfig libc-dev && wget https://github.com/edenhill/librdkafka/archive/v${LIBRD_VER}.tar.gz && tar -xvf v${LIBRD_VER}.tar.gz && cd librdkafka-${LIBRD_VER} && ./configure --prefix /usr && make && make install && make clean && rm -rf librdkafka-${LIBRD_VER} && rm -rf v${LIBRD_VER}.tar.gz && apk del .make-deps

COPY . .
RUN pip install -r deploy/requirements_snowflake.txt

ENTRYPOINT ./entrypoint.sh


@@ -1,14 +1,14 @@
chardet==5.1.0
idna==3.4
confluent-kafka==2.0.2
psycopg2-binary==2.9.6
python-decouple==3.8
pytz==2022.6
requests==2.28.1
SQLAlchemy==2.0.8
tzlocal
urllib3==1.26.15
sqlalchemy-redshift
redshift-connector
pandas-redshift
PyYAML

ee/connectors/entrypoint.sh (new executable file)

@@ -0,0 +1,2 @@
echo "[INFO] Starting service"
python -u consumer_async.py


@@ -19,27 +19,10 @@ def handle_normal_message(message: Message) -> Optional[Event]:
        return n

    if isinstance(message, CustomEvent):
        n.customevent_name = message.name
        n.customevent_payload = message.payload
        return n

    if isinstance(message, Metadata):
        n.metadata_key = message.key
        n.metadata_value = message.value

@@ -52,11 +35,15 @@ def handle_normal_message(message: Message) -> Optional[Event]:
        n.mouseclick_selector = message.selector
        return n

    if isinstance(message, NetworkRequest):
        n.networkrequest_type = message.type
        n.networkrequest_method = message.method
        n.networkrequest_url = message.url
        n.networkrequest_request = message.request
        n.networkrequest_response = message.response
        n.networkrequest_status = message.status
        n.networkrequest_timestamp = message.timestamp
        n.networkrequest_duration = message.duration
        return n

    if isinstance(message, PageEvent):

@@ -74,11 +61,6 @@ def handle_normal_message(message: Message) -> Optional[Event]:
        n.pagerendertiming_visuallycomplete = message.visually_complete
        return n

    if isinstance(message, SetViewportSize):
        n.setviewportsize_height = message.height
        n.setviewportsize_width = message.width

@@ -100,9 +82,10 @@ def handle_normal_message(message: Message) -> Optional[Event]:
        n.issueevent_messageid = message.message_id
        n.issueevent_timestamp = message.timestamp
        n.issueevent_type = message.type
        n.issueevent_context_string = message.context_string
        n.issueevent_context = message.context
        n.issueevent_payload = message.payload
        n.issueevent_url = message.url
        return n

    if isinstance(message, CustomIssue):
@@ -147,14 +130,18 @@ def handle_session(n: Session, message: Message) -> Optional[Session]:
        return n

    if isinstance(message, BatchMetadata):
        n.batchmetadata_version = message.version
        n.batchmetadata_page_no = message.page_no
        n.batchmetadata_first_index = message.first_index
        n.batchmetadata_timestamp = message.timestamp
        n.batchmetadata_location = message.location
        return n

    if isinstance(message, PartitionedMessage):
        n.partitionedmessage_part_no = message.part_no
        n.partitionedmessage_part_total = message.part_total
        return n

    # if isinstance(message, IOSBatchMeta):
    #     n.iosbatchmeta_page_no = message.page_no

@@ -182,10 +169,10 @@ def handle_session(n: Session, message: Message) -> Optional[Session]:
            n.urls_count += 1
        except TypeError:
            n.urls_count = 1
        #try:
        #    n.urls.append(message.url)
        #except AttributeError:
        #    n.urls = [message.url]
        return n

    if isinstance(message, PerformanceTrackAggr):

@@ -205,30 +192,30 @@ def handle_session(n: Session, message: Message) -> Optional[Session]:
        n.user_anonymous_id = message.id
        return n

    if isinstance(message, JSException) or isinstance(message, JSExceptionDeprecated):
        try:
            n.js_exceptions_count += 1
        except TypeError:
            n.js_exceptions_count = 1
        return n

    #if isinstance(message, LongTask):
    #    try:
    #        n.long_tasks_total_duration += message.duration
    #    except TypeError:
    #        n.long_tasks_total_duration = message.duration
    #    try:
    #        if n.long_tasks_max_duration > message.duration:
    #            n.long_tasks_max_duration = message.duration
    #    except TypeError:
    #        n.long_tasks_max_duration = message.duration
    #    try:
    #        n.long_tasks_count += 1
    #    except TypeError:
    #        n.long_tasks_count = 1
    #    return n

    if isinstance(message, InputEvent):
        try:

@@ -239,58 +226,36 @@ def handle_session(n: Session, message: Message) -> Optional[Session]:
    if isinstance(message, MouseClick):
        try:
            n.clicks_count += 1
        except TypeError:
            n.clicks_count = 1
        return n

    if isinstance(message, IssueEvent) or isinstance(message, IssueEventDeprecated):
        try:
            n.issues_count += 1
        except TypeError:
            n.issues_count = 1
        #try:
        #    n.issues.append(message.type)
        #except AttributeError:
        #    n.issues = [message.type]
        return n
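
The session counters start out as None on a fresh Session object, so the handlers above use try/except TypeError instead of an explicit None check: the first event initializes the counter, later ones increment it. An isolated illustration of the idiom:

    # Illustrative: the counter idiom used throughout handle_session.
    clicks_count = None
    try:
        clicks_count += 1      # TypeError on the very first event...
    except TypeError:
        clicks_count = 1       # ...which initializes the counter instead
    print(clicks_count)  # 1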
def handle_message(message: Message) -> Optional[DetailedEvent]:
    n = DetailedEvent()

    # if isinstance(message, SessionEnd):
    #     n.sessionend = True
    #     n.sessionend_timestamp = message.timestamp
    #     return n

    if isinstance(message, Timestamp):
        n.timestamp_timestamp = message.timestamp
        return n

    if isinstance(message, SessionStart):
        n.sessionstart_trackerversion = message.tracker_version
        n.sessionstart_revid = message.rev_id

@@ -352,35 +317,27 @@ def handle_message(message: Message) -> Optional[DetailedEvent]:
        n.pagerendertiming_timetointeractive = message.time_to_interactive
        return n

    # if isinstance(message, ResourceTiming):
    #     n.resourcetiming_timestamp = message.timestamp
    #     n.resourcetiming_duration = message.duration
    #     n.resourcetiming_ttfb = message.ttfb
    #     n.resourcetiming_headersize = message.header_size
    #     n.resourcetiming_encodedbodysize = message.encoded_body_size
    #     n.resourcetiming_decodedbodysize = message.decoded_body_size
    #     n.resourcetiming_url = message.url
    #     n.resourcetiming_initiator = message.initiator
    #     return n

    if isinstance(message, IntegrationEvent):
        n.integrationevent_timestamp = message.timestamp
        n.integrationevent_source = message.source
        n.integrationevent_name = message.name
        n.integrationevent_message = message.message
        n.integrationevent_payload = message.payload
        return n

    if isinstance(message, UserID):
        n.userid_id = message.id

@@ -402,14 +359,78 @@ def handle_message(message: Message) -> Optional[DetailedEvent]:
        return n

    if isinstance(message, BatchMetadata):
        n.batchmetadata_version = message.version
        n.batchmetadata_page_no = message.page_no
        n.batchmetadata_first_index = message.first_index
        n.batchmetadata_timestamp = message.timestamp
        n.batchmetadata_location = message.location
        return n

    if isinstance(message, PartitionedMessage):
        n.partitionedmessage_part_no = message.part_no
        n.partitionedmessage_part_total = message.part_total
        return n
    if isinstance(message, InputChange):
        n.inputchange_id = message.id
        n.inputchange_value = message.value
        n.inputchange_value_masked = message.value_masked
        n.inputchange_label = message.label
        n.inputchange_hesitation_time = message.hesitation_time
        n.inputchange_input_duration = message.input_duration
        return n

    if isinstance(message, SelectionChange):
        n.selectionchange_selection_start = message.selection_start
        n.selectionchange_selection_end = message.selection_end
        n.selectionchange_selection = message.selection
        return n

    if isinstance(message, MouseThrashing):
        n.mousethrashing_timestamp = message.timestamp
        return n

    if isinstance(message, UnbindNodes):
        n.unbindnodes_total_removed_percent = message.total_removed_percent
        return n

    if isinstance(message, ResourceTiming):
        n.resourcetiming_timestamp = message.timestamp
        n.resourcetiming_duration = message.duration
        n.resourcetiming_ttfb = message.ttfb
        n.resourcetiming_header_size = message.header_size
        n.resourcetiming_encoded_body_size = message.encoded_body_size
        n.resourcetiming_decoded_body_size = message.decoded_body_size
        n.resourcetiming_url = message.url
        n.resourcetiming_initiator = message.initiator
        n.resourcetiming_transferred_size = message.transferred_size
        n.resourcetiming_cached = message.cached
        return n

    if isinstance(message, IssueEvent):
        n.issueevent_message_id = message.message_id
        n.issueevent_timestamp = message.timestamp
        n.issueevent_type = message.type
        n.issueevent_context_string = message.context_string
        n.issueevent_context = message.context
        n.issueevent_payload = message.payload
        n.issueevent_url = message.url
        return n

    if isinstance(message, SessionEnd):
        n.sessionend_timestamp = message.timestamp
        n.sessionend_encryption_key = message.encryption_key
        return n

    if isinstance(message, SessionSearch):
        n.sessionsearch_timestamp = message.timestamp
        n.sessionsearch_partition = message.partition
        return n
    if isinstance(message, PerformanceTrack):
        n.performancetrack_frames = message.frames

@@ -466,44 +487,73 @@ def handle_message(message: Message) -> Optional[DetailedEvent]:
        n.inputevent_label = message.label
        return n

    if isinstance(message, CustomEvent):
        n.customevent_name = message.name
        n.customevent_payload = message.payload
        return n
    if isinstance(message, LoadFontFace):
        n.loadfontface_parent_id = message.parent_id
        n.loadfontface_family = message.family
        n.loadfontface_source = message.source
        n.loadfontface_descriptors = message.descriptors
        return n

    if isinstance(message, SetNodeFocus):
        n.setnodefocus_id = message.id
        return n

    if isinstance(message, AdoptedSSReplaceURLBased):
        n.adoptedssreplaceurlbased_sheet_id = message.sheet_id
        n.adoptedssreplaceurlbased_text = message.text
        n.adoptedssreplaceurlbased_base_url = message.base_url
        return n

    if isinstance(message, AdoptedSSReplace):
        n.adoptedssreplace_sheet_id = message.sheet_id
        n.adoptedssreplace_text = message.text
        return n

    if isinstance(message, AdoptedSSInsertRuleURLBased):
        n.adoptedssinsertruleurlbased_sheet_id = message.sheet_id
        n.adoptedssinsertruleurlbased_rule = message.rule
        n.adoptedssinsertruleurlbased_index = message.index
        n.adoptedssinsertruleurlbased_base_url = message.base_url
        return n

    if isinstance(message, AdoptedSSInsertRule):
        n.adoptedssinsertrule_sheet_id = message.sheet_id
        n.adoptedssinsertrule_rule = message.rule
        n.adoptedssinsertrule_index = message.index
        return n

    if isinstance(message, AdoptedSSDeleteRule):
        n.adoptedssdeleterule_sheet_id = message.sheet_id
        n.adoptedssdeleterule_index = message.index
        return n

    if isinstance(message, AdoptedSSAddOwner):
        n.adoptedssaddowner_sheet_id = message.sheet_id
        n.adoptedssaddowner_id = message.id
        return n

    if isinstance(message, AdoptedSSRemoveOwner):
        n.adoptedssremoveowner_sheet_id = message.sheet_id
        n.adoptedssremoveowner_id = message.id
        return n

    if isinstance(message, JSException):
        n.jsexception_name = message.name
        n.jsexception_message = message.message
        n.jsexception_payload = message.payload
        n.jsexception_metadata = message.metadata
        return n

    if isinstance(message, Zustand):
        n.zustand_mutation = message.mutation
        n.zustand_state = message.state
        return n

    # if isinstance(message, CreateDocument):
    #     n.createdocument = True
    #     return n
@@ -542,15 +592,10 @@ def handle_message(message: Message) -> Optional[DetailedEvent]:
        n.fetch_duration = message.duration
        return n

    if isinstance(message, SetNodeAttributeDict):
        n.setnodeattributedict_id = message.id
        n.setnodeattributedict_name_key = message.name_key
        n.setnodeattributedict_value_key = message.value_key
        return n

    if isinstance(message, Profiler):

@@ -567,16 +612,6 @@ def handle_message(message: Message) -> Optional[DetailedEvent]:
        n.graphql_response = message.response
        return n

    if isinstance(message, MouseClick):
        n.mouseclick_id = message.id
        n.mouseclick_hesitationtime = message.hesitation_time

@@ -584,13 +619,6 @@ def handle_message(message: Message) -> Optional[DetailedEvent]:
        n.mouseclick_selector = message.selector
        return n

    if isinstance(message, SetPageLocation):
        n.setpagelocation_url = message.url
        n.setpagelocation_referrer = message.referrer

@@ -612,27 +640,15 @@ def handle_message(message: Message) -> Optional[DetailedEvent]:
        n.longtasks_containername = message.container_name
        return n

    #if isinstance(message, IssueEvent):
    #    n.issueevent_message_id = message.message_id
    #    n.issueevent_timestamp = message.timestamp
    #    n.issueevent_type = message.type
    #    n.issueevent_context_string = message.context_string
    #    n.issueevent_context = message.context
    #    n.issueevent_payload = message.payload
    #    n.issueevent_url = message.url
    #    return n

    if isinstance(message, TechnicalInfo):
        n.technicalinfo_type = message.type

@@ -644,10 +660,6 @@ def handle_message(message: Message) -> Optional[DetailedEvent]:
        n.customissue_payload = message.payload
        return n

    if isinstance(message, AssetCache):
        n.asset_cache_url = message.url
        return n

@@ -677,7 +689,7 @@ def handle_message(message: Message) -> Optional[DetailedEvent]:
        return n

    if isinstance(message, IOSBatchMeta):
        n.iosbatchmeta_lenght = message.length
        n.iosbatchmeta_first_index = message.first_index
        n.iosbatchmeta_timestamp = message.timestamp
        return n
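
Taken together, handle_message maps each decoded protocol message onto one flat DetailedEvent row, while handle_session folds messages into a per-session accumulator. A sketch of how a consumer loop might wire the latter (the sessions dict is hypothetical glue, not code from this change):

    sessions = {}  # sessionid -> Session accumulator (hypothetical wiring)

    def feed(session_id, messages):
        n = sessions.setdefault(session_id, Session())
        for m in messages:
            # Handlers return the updated Session, or fall through to None
            # for message types they do not track, hence the `or n` guard.
            n = handle_session(n, m) or n
        sessions[session_id] = n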


@@ -34,6 +34,16 @@ class Codec:
            s += 7
            i += 1

    @staticmethod
    def read_size(reader: io.BytesIO):
        size = 0
        for i in range(3):
            b = reader.read(1)
            num = int.from_bytes(b, "big", signed=False)
            size += num << (8*i)
        return size

    @staticmethod
    def read_int(reader: io.BytesIO) -> int:
        """

@@ -57,7 +67,11 @@ class Codec:
    @staticmethod
    def read_string(reader: io.BytesIO) -> str:
        length = Codec.read_uint(reader)
        try:
            s = reader.read(length)
        except Exception as e:
            print(f'Error while reading string of length {length}')
            raise Exception(e)
        try:
            return s.decode("utf-8", errors="replace").replace("\x00", "\uFFFD")
        except UnicodeDecodeError:
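
read_size decodes the 3-byte little-endian length prefix that the new batch format writes before each message body; a quick round-trip check of the arithmetic:

    import io

    def read_size(reader: io.BytesIO) -> int:
        size = 0
        for i in range(3):
            num = int.from_bytes(reader.read(1), "big", signed=False)
            size += num << (8 * i)
        return size

    # 300 = 0x00012C, stored little-endian as 2C 01 00
    assert read_size(io.BytesIO(bytes([0x2C, 0x01, 0x00]))) == 300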


@@ -71,7 +71,7 @@ class CreateDocument(Message):
    __id__ = 7

    def __init__(self, ):
        pass


class CreateElementNode(Message):


@@ -7,6 +7,9 @@ import io

class MessageCodec(Codec):
    def __init__(self, msg_selector: List[int] = list()):
        self.msg_selector = msg_selector

    def read_message_id(self, reader: io.BytesIO) -> int:
        """
        Read and return the first byte where the message id is encoded

@@ -46,27 +49,41 @@ class MessageCodec(Codec):
    def decode_detailed(self, b: bytes) -> List[Message]:
        reader = io.BytesIO(b)
        messages_list = list()
        try:
            messages_list.append(self.handler(reader, 0))
        except IndexError:
            print('[WARN] Broken batch')
            return list()
        if isinstance(messages_list[0], BatchMeta):
            # Old BatchMeta
            mode = 0
        elif isinstance(messages_list[0], BatchMetadata):
            # New BatchMeta
            if messages_list[0].version == 0:
                mode = 0
            else:
                mode = 1
        else:
            return messages_list
        while True:
            try:
                msg_decoded = self.handler(reader, mode)
                if msg_decoded is not None:
                    messages_list.append(msg_decoded)
            except IndexError:
                break
        return messages_list

    def handler(self, reader: io.BytesIO, mode=0) -> Message:
        message_id = self.read_message_id(reader)
        #print(f'[INFO-context] Current mode {mode}')
        #print(f'[INFO] Currently processing message type {message_id}')
        if mode == 1:
            # We read the three bytes representing the length of the message.
            # It can be used to skip unwanted messages
            r_size = self.read_size(reader)
            if message_id not in self.msg_selector:
                reader.read(r_size)
                return None
            return self.read_head_message(reader, message_id)
        elif mode == 0:
            # Old format with no bytes for message length
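
With msg_selector, a consumer can register only the message ids it actually stores; in mode 1 the codec reads the length prefix and skips the body of everything else. A hedged usage sketch (the ids below are placeholders, not the real protocol ids):

    from msgcodec.msgcodec import MessageCodec

    codec = MessageCodec(msg_selector=[1, 126])  # hypothetical ids

    def consume(raw_batch: bytes):
        # Unselected messages are skipped via their 3-byte length prefix
        # and never reach this loop.
        for message in codec.decode_detailed(raw_batch):
            print(type(message).__name__)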


@@ -1,52 +1,31 @@
CREATE TABLE IF NOT EXISTS connector_events
(
    sessionid BIGINT,
    consolelog_level VARCHAR(5000),
    consolelog_value VARCHAR(5000),
    customevent_name VARCHAR(5000),
    customevent_payload VARCHAR(5000),
    jsexception_message VARCHAR(5000),
    jsexception_name VARCHAR(5000),
    jsexception_payload VARCHAR(5000),
    jsexception_metadata VARCHAR(5000),
    networkrequest_type VARCHAR(5000),
    networkrequest_method VARCHAR(5000),
    networkrequest_url VARCHAR(5000),
    networkrequest_request VARCHAR(5000),
    networkrequest_response VARCHAR(5000),
    networkrequest_status BIGINT,
    networkrequest_timestamp BIGINT,
    networkrequest_duration BIGINT,
    issueevent_message_id BIGINT,
    issueevent_timestamp BIGINT,
    issueevent_type VARCHAR(5000),
    issueevent_context_string VARCHAR(5000),
    issueevent_context VARCHAR(5000),
    issueevent_payload VARCHAR(5000),
    issueevent_url VARCHAR(5000),
    customissue_name VARCHAR(5000),
    customissue_payload VARCHAR(5000),
    received_at BIGINT,
    batch_order_number BIGINT
);


@@ -1,238 +1,91 @@
CREATE TABLE IF NOT EXISTS connector_events_detailed
(
    sessionid BIGINT,
    clickevent_hesitationtime BIGINT,
    clickevent_label VARCHAR(5000),
    clickevent_messageid BIGINT,
    clickevent_timestamp BIGINT,
    connectioninformation_downlink BIGINT,
    connectioninformation_type VARCHAR(5000),
    consolelog_level VARCHAR(5000),
    consolelog_value VARCHAR(5000),
    customevent_name VARCHAR(5000),
    customevent_payload VARCHAR(5000),
    fetch_duration BIGINT,
    fetch_method VARCHAR(5000),
    fetch_request VARCHAR(5000),
    fetch_response VARCHAR(5000),
    fetch_status BIGINT,
    fetch_timestamp BIGINT,
    fetch_url VARCHAR(5000),
    graphql_operationkind VARCHAR(5000),
    graphql_operationname VARCHAR(5000),
    graphql_response VARCHAR(5000),
    graphql_variables VARCHAR(5000),
    inputevent_label VARCHAR(5000),
    inputevent_messageid BIGINT,
    inputevent_timestamp BIGINT,
    inputevent_value VARCHAR(5000),
    inputevent_valuemasked BOOLEAN,
    jsexception_message VARCHAR(5000),
    jsexception_name VARCHAR(5000),
    jsexception_payload VARCHAR(5000),
    jsexception_metadata VARCHAR(5000),
    mouseclick_id BIGINT,
    mouseclick_hesitationtime BIGINT,
    mouseclick_label VARCHAR(5000),
    networkrequest_type VARCHAR(5000),
    networkrequest_method VARCHAR(5000),
    networkrequest_url VARCHAR(5000),
    networkrequest_request VARCHAR(5000),
    networkrequest_response VARCHAR(5000),
    networkrequest_status BIGINT,
    networkrequest_timestamp BIGINT,
    networkrequest_duration BIGINT,
    pageevent_domcontentloadedeventend BIGINT,
    pageevent_domcontentloadedeventstart BIGINT,
    pageevent_firstcontentfulpaint BIGINT,
    pageevent_firstpaint BIGINT,
    pageevent_loaded BOOLEAN,
    pageevent_loadeventend BIGINT,
    pageevent_loadeventstart BIGINT,
    pageevent_messageid BIGINT,
    pageevent_referrer VARCHAR(5000),
    pageevent_requeststart BIGINT,
    pageevent_responseend BIGINT,
    pageevent_responsestart BIGINT,
    pageevent_speedindex BIGINT,
    pageevent_timestamp BIGINT,
    pageevent_url VARCHAR(5000),
    sessionend_timestamp BIGINT,
    sessionend_encryption_key VARCHAR(5000),
    sessionstart_projectid BIGINT,
    sessionstart_revid VARCHAR(5000),
    sessionstart_timestamp BIGINT,
    sessionstart_trackerversion VARCHAR(5000),
    sessionstart_useragent VARCHAR(5000),
    sessionstart_userbrowser VARCHAR(5000),
    sessionstart_userbrowserversion VARCHAR(5000),
    sessionstart_usercountry VARCHAR(5000),
    sessionstart_userdevice VARCHAR(5000),
    sessionstart_userdeviceheapsize BIGINT,
    sessionstart_userdevicememorysize BIGINT,
    sessionstart_userdevicetype VARCHAR(5000),
    sessionstart_useros VARCHAR(5000),
    sessionstart_userosversion VARCHAR(5000),
    sessionstart_useruuid VARCHAR(5000),
    setpagelocation_navigationstart BIGINT,
    setpagelocation_referrer VARCHAR(5000),
    setpagelocation_url VARCHAR(5000),
    issueevent_message_id BIGINT,
    issueevent_timestamp BIGINT,
    issueevent_type VARCHAR(5000),
    issueevent_context_string VARCHAR(5000),
    issueevent_context VARCHAR(5000),
    issueevent_payload VARCHAR(5000),
    issueevent_url VARCHAR(5000),
    customissue_name VARCHAR(5000),
    customissue_payload VARCHAR(5000),
    received_at BIGINT,
    batch_order_number BIGINT
);


@@ -38,13 +38,8 @@ CREATE TABLE IF NOT EXISTS connector_user_sessions
    max_used_js_heap_size bigint,
    -- ISSUES AND EVENTS
    js_exceptions_count bigint,
    inputs_count bigint,
    clicks_count bigint,
    issues_count bigint,
    urls_count bigint
);


@@ -0,0 +1,103 @@
from utils.pg_client import PostgresClient
from queue import Queue
from decouple import config
from time import time


def _project_from_session(sessionId):
    """Search the projectId of the requested sessionId in the PG table sessions"""
    with PostgresClient() as conn:
        conn.execute(
            conn.mogrify("SELECT project_id FROM sessions WHERE session_id=%(sessionId)s LIMIT 1",
                         {'sessionId': sessionId})
        )
        res = conn.fetchone()
    if res is None:
        print(f'[WARN] sessionid {sessionId} not found in sessions table')
        return None
    return res['project_id']


class CachedSessions:
    def __init__(self):
        """Cache of open and recently closed sessions with their current status.
        env:
            MAX_SESSION_LIFE: cache lifespan of a session (default 7200 seconds)"""
        self.session_project = dict()
        self.max_alive_time = config('MAX_SESSION_LIFE', default=7200, cast=int)  # Default 2 hours

    def create(self, sessionid):
        """Saves a new session with status OPEN and sets its insertion time"""
        self.session_project[sessionid] = (time(), 'OPEN')

    def add(self, sessionid):
        """Handles the creation of a cached session, or updates its status if already in cache"""
        if sessionid in self.session_project.keys():
            if self.session_project[sessionid][1] == 'CLOSE':
                tmp = self.session_project[sessionid]
                self.session_project[sessionid] = (tmp[0], 'UPDATE')
        else:
            self.create(sessionid)

    def close(self, sessionid):
        """Sets the status of a session to closed (a sessionend message was received)"""
        tmp = self.session_project[sessionid]
        old_status = tmp[1]
        self.session_project[sessionid] = (tmp[0], 'CLOSE')
        return old_status

    def clear_sessions(self):
        """Deletes all sessions that reached max_alive_time"""
        to_clean_list = list()
        current_time = time()
        for sessionid, values in self.session_project.items():
            if current_time - values[0] > self.max_alive_time:
                to_clean_list.append(sessionid)
        # delete after iterating to avoid mutating the dict mid-loop
        for sessionid in to_clean_list:
            del self.session_project[sessionid]
        return to_clean_list


class ProjectFilter:
    def __init__(self, filter=list()):
        """Keeps only sessions that come from the selected projects. This class reads from PG
        to find the projectId and uses a cache to avoid duplicated requests.
        env:
            max_cache_size: max allowed cache length - starts cleanup when oversized
            cache_lifespan: max lifetime of a cached value - if surpassed, it is deleted in the cleanup phase"""
        self.filter = filter
        self.cache = dict()
        self.cached_sessions = CachedSessions()
        self.to_clean = list()
        self.count_bad = 0
        self.max_cache_size = config('max_cache_size', default=50, cast=int)
        self.cache_lifespan = config('cache_lifespan', default=900, cast=int)

    def is_valid(self, sessionId):
        """Verifies whether sessionId belongs to a selected project"""
        if len(self.filter) == 0:
            return True
        elif sessionId in self.cache.keys():
            return self.cache[sessionId][1]
        else:
            found_project_id = _project_from_session(sessionId)
            if found_project_id is None:
                self.count_bad += 1
                return False
            else:
                project_is_valid = found_project_id in self.filter
                self.cache[sessionId] = [time(), project_is_valid]
                return project_is_valid

    def cleanup(self):
        """Deletes cached values that exceeded cache_lifespan"""
        current_time = time()
        self.cache = {sessionid: values for sessionid, values in self.cache.items()
                      if current_time - values[0] < self.cache_lifespan}

    def handle_clean(self):
        """Verifies and executes cleanup if needed"""
        if len(self.filter) == 0:
            return
        elif len(self.cache) > self.max_cache_size:
            self.cleanup()
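
A sketch of how the consumer side might combine the pieces (the PROJECT_FILTER variable name is an assumption for illustration; an empty filter means every project is tracked):

    from decouple import config

    # Comma-separated project ids to track; illustrative env name.
    selected = [int(p) for p in config('PROJECT_FILTER', default='').split(',') if p]
    project_filter = ProjectFilter(filter=selected)

    def should_store(session_id: int) -> bool:
        project_filter.handle_clean()   # trims the cache when it grows too large
        return project_filter.is_valid(session_id)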


@@ -0,0 +1,182 @@
import logging
import time
from threading import Semaphore

import psycopg2
import psycopg2.extras
from decouple import config
from psycopg2 import pool

logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO))
logging.getLogger('apscheduler').setLevel(config("LOGLEVEL", default=logging.INFO))

_PG_CONFIG = {"host": config("pg_host"),
              "database": config("pg_dbname"),
              "user": config("pg_user"),
              "password": config("pg_password"),
              "port": config("pg_port", cast=int),
              "application_name": config("APP_NAME", default="PY")}
PG_CONFIG = dict(_PG_CONFIG)
if config("PG_TIMEOUT", cast=int, default=0) > 0:
    PG_CONFIG["options"] = f"-c statement_timeout={config('PG_TIMEOUT', cast=int) * 1000}"


class ORThreadedConnectionPool(psycopg2.pool.ThreadedConnectionPool):
    def __init__(self, minconn, maxconn, *args, **kwargs):
        self._semaphore = Semaphore(maxconn)
        super().__init__(minconn, maxconn, *args, **kwargs)

    def getconn(self, *args, **kwargs):
        self._semaphore.acquire()
        try:
            return super().getconn(*args, **kwargs)
        except psycopg2.pool.PoolError as e:
            if str(e) == "connection pool is closed":
                make_pool()
            raise e

    def putconn(self, *args, **kwargs):
        try:
            super().putconn(*args, **kwargs)
            self._semaphore.release()
        except psycopg2.pool.PoolError as e:
            if str(e) == "trying to put unkeyed connection":
                print("!!! trying to put unkeyed connection")
                print(f"env-PG_POOL:{config('PG_POOL', default=None)}")
                return
            raise e


postgreSQL_pool: ORThreadedConnectionPool = None
RETRY_MAX = config("PG_RETRY_MAX", cast=int, default=50)
RETRY_INTERVAL = config("PG_RETRY_INTERVAL", cast=int, default=2)
RETRY = 0


def make_pool():
    if not config('PG_POOL', cast=bool, default=True):
        return
    global postgreSQL_pool
    global RETRY
    if postgreSQL_pool is not None:
        try:
            postgreSQL_pool.closeall()
        except (Exception, psycopg2.DatabaseError) as error:
            logging.error("Error while closing all connections to PostgreSQL", error)
    try:
        postgreSQL_pool = ORThreadedConnectionPool(config("PG_MINCONN", cast=int, default=20),
                                                   config("PG_MAXCONN", cast=int, default=80),
                                                   **PG_CONFIG)
        if postgreSQL_pool:
            logging.info("Connection pool created successfully")
    except (Exception, psycopg2.DatabaseError) as error:
        logging.error("Error while connecting to PostgreSQL", error)
        if RETRY < RETRY_MAX:
            RETRY += 1
            logging.info(f"waiting for {RETRY_INTERVAL}s before retry n°{RETRY}")
            time.sleep(RETRY_INTERVAL)
            make_pool()
        else:
            raise error


class PostgresClient:
    connection = None
    cursor = None
    long_query = False
    unlimited_query = False

    def __init__(self, long_query=False, unlimited_query=False, use_pool=True):
        self.long_query = long_query
        self.unlimited_query = unlimited_query
        self.use_pool = use_pool
        if unlimited_query:
            long_config = dict(_PG_CONFIG)
            long_config["application_name"] += "-UNLIMITED"
            self.connection = psycopg2.connect(**long_config)
        elif long_query:
            long_config = dict(_PG_CONFIG)
            long_config["application_name"] += "-LONG"
            long_config["options"] = f"-c statement_timeout=" \
                                     f"{config('pg_long_timeout', cast=int, default=5 * 60) * 1000}"
            self.connection = psycopg2.connect(**long_config)
        elif not use_pool or not config('PG_POOL', cast=bool, default=True):
            single_config = dict(_PG_CONFIG)
            single_config["application_name"] += "-NOPOOL"
            single_config["options"] = f"-c statement_timeout={config('PG_TIMEOUT', cast=int, default=30) * 1000}"
            self.connection = psycopg2.connect(**single_config)
        else:
            self.connection = postgreSQL_pool.getconn()

    def __enter__(self):
        if self.cursor is None:
            self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
            self.cursor.cursor_execute = self.cursor.execute
            self.cursor.execute = self.__execute
            self.cursor.recreate = self.recreate_cursor
        return self.cursor

    def __exit__(self, *args):
        try:
            self.connection.commit()
            self.cursor.close()
            if not self.use_pool or self.long_query or self.unlimited_query:
                self.connection.close()
        except Exception as error:
            logging.error("Error while committing/closing PG-connection", error)
            if str(error) == "connection already closed" \
                    and self.use_pool \
                    and not self.long_query \
                    and not self.unlimited_query \
                    and config('PG_POOL', cast=bool, default=True):
                logging.info("Recreating the connection pool")
                make_pool()
            else:
                raise error
        finally:
            if config('PG_POOL', cast=bool, default=True) \
                    and self.use_pool \
                    and not self.long_query \
                    and not self.unlimited_query:
                postgreSQL_pool.putconn(self.connection)

    def __execute(self, query, vars=None):
        try:
            result = self.cursor.cursor_execute(query=query, vars=vars)
        except psycopg2.Error as error:
            logging.error(f"!!! Error of type:{type(error)} while executing query:")
            logging.error(query)
            logging.info("starting rollback to allow future execution")
            self.connection.rollback()
            raise error
        return result

    def recreate_cursor(self, rollback=False):
        if rollback:
            try:
                self.connection.rollback()
            except Exception as error:
                logging.error("Error while rolling back connection for recreation", error)
        try:
            self.cursor.close()
        except Exception as error:
            logging.error("Error while closing cursor for recreation", error)
        self.cursor = None
        return self.__enter__()


async def init():
    logging.info(f">PG_POOL:{config('PG_POOL', default=None)}")
    if config('PG_POOL', cast=bool, default=True):
        make_pool()


async def terminate():
    global postgreSQL_pool
    if postgreSQL_pool is not None:
        try:
            postgreSQL_pool.closeall()
            logging.info("Closed all connections to PostgreSQL")
        except (Exception, psycopg2.DatabaseError) as error:
            logging.error("Error while closing all connections to PostgreSQL", error)
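
For reference, PostgresClient is used as a context manager that hands back a RealDictCursor with a guarded execute; a short usage sketch, assuming the pool was already initialized via init()/make_pool():

    from utils.pg_client import PostgresClient

    with PostgresClient() as cur:
        cur.execute("SELECT project_id FROM sessions WHERE session_id=%(sid)s LIMIT 1",
                    {'sid': 42})
        row = cur.fetchone()  # dict-like row or None
        print(row['project_id'] if row else 'session not found')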