* Updated dependencies for the Redshift connector; replaced the os module with python-decouple
* Updated service and images
* Updated message protocol; added an exception for BatchMetadata when version is 0 (the old read method is applied)
* Fixed load error from S3 to Redshift: null values for string columns are now empty strings ("")
* Added test file consumer_async.py: reads raw Kafka every 3 minutes and sends upload-to-cloud tasks in the background
* Added method to skip messages that were not inserted into the cloud
* Added logs to consumer_async. Changed urls and issues in the sessions table from list to string
* Split messages between the sessions table and the events table
* Updated redshift tables
* Fixed small issue in query redshift_sessions.sql
* Updated Dockerfiles. Cleaned up consumer_async logs. Updated/fixed tables. NaN values are now transformed to NULL for VARCHAR columns (see the NaN-handling sketch after these notes)
* Added error handler for sql dropped connection
* chore(docker): Optimize docker builds
Signed-off-by: rjshrjndrn <rjshrjndrn@gmail.com>
* Variables renamed
* Adding compression libraries
* Set default event counts to 0 (instead of NULL) when the event did not occur
* Added support for tracking specific projects. Added a PG handler to connect to the sessions table
* Added method to update values through the DB connection for sessions that ended and restarted
* Removing intelligent file copying
* chore(connector): Build file
Signed-off-by: rjshrjndrn <rjshrjndrn@gmail.com>
* Adding a connection pool for PG (see the pool sketch after these notes)
* Renaming and optimizing
* Fixed missing session information
---------
Signed-off-by: rjshrjndrn <rjshrjndrn@gmail.com>
Co-authored-by: rjshrjndrn <rjshrjndrn@gmail.com>
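The NaN-to-NULL transformation referenced above happens in the connector's writer, not in the file below; a minimal sketch of the idea, with a hypothetical helper name, could be:

import math

# Hypothetical helper illustrating the NaN -> NULL handling for VARCHAR
# columns; the real transformation lives in the connector's writer code.
def normalize_varchar(value):
    """Map None and float NaN to None so the value loads as NULL in Redshift."""
    if value is None:
        return None
    if isinstance(value, float) and math.isnan(value):
        return None
    return str(value)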
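Likewise, the PG connection pool is not part of this file; a minimal sketch using psycopg2's SimpleConnectionPool, where the pool bounds, the PG_* settings keys, and the helper name are assumptions, might look like:

from decouple import config
from psycopg2 import pool

# Pool size and credential keys below are placeholders, not the project's
# actual configuration.
pg_pool = pool.SimpleConnectionPool(
    minconn=1,
    maxconn=config('PG_POOL_MAX', default=5, cast=int),
    host=config('PG_HOST'),
    port=config('PG_PORT', default=5432, cast=int),
    dbname=config('PG_DBNAME'),
    user=config('PG_USER'),
    password=config('PG_PASSWORD'),
)

def with_pg_connection(fn):
    """Borrow a connection from the pool, run fn(conn), and always return it."""
    conn = pg_pool.getconn()
    try:
        result = fn(conn)
        conn.commit()
        return result
    finally:
        pg_pool.putconn(conn)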
Python · 161 lines · 5.4 KiB
from decouple import config
from confluent_kafka import Consumer
from datetime import datetime
from collections import defaultdict
from time import time

from msgcodec.msgcodec import MessageCodec
from msgcodec.messages import SessionEnd

print('[INFO] Importing DBConnection...')
from db.api import DBConnection
print('[INFO] Importing from models...')
from db.models import events_detailed_table_name, events_table_name, sessions_table_name
print('[INFO] Importing from writer...')
from db.writer import insert_batch
print('[INFO] Importing from handler...')
from handler import handle_message, handle_normal_message, handle_session

DATABASE = config('DATABASE_NAME')
LEVEL = config('LEVEL')

print(f'[INFO] Connecting to database {DATABASE}')
db = DBConnection(DATABASE)
print('[INFO] Connected successfully')

if LEVEL == 'detailed':
    table_name = events_detailed_table_name
elif LEVEL == 'normal':
    table_name = events_table_name
else:
    # Fail fast on a misconfigured LEVEL instead of hitting a NameError later.
    raise ValueError(f"Unsupported LEVEL '{LEVEL}'; expected 'detailed' or 'normal'")


def main():
    batch_size = config('events_batch_size', default=4000, cast=int)
    sessions_batch_size = config('sessions_batch_size', default=400, cast=int)
    batch = []
    sessions = defaultdict(lambda: None)
    sessions_batch = []

    codec = MessageCodec()
    ssl_protocol = config('SSL_ENABLED', default=True, cast=bool)
    consumer_settings = {
        "bootstrap.servers": config('KAFKA_SERVER'),
        "group.id": f"connector_{DATABASE}",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False
    }
    if ssl_protocol:
        consumer_settings['security.protocol'] = 'SSL'
    consumer = Consumer(consumer_settings)

    consumer.subscribe([config("topic", default="saas-raw")])
    print("Kafka consumer subscribed")
    t_ = time()
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        messages = codec.decode_detailed(msg.value())
        session_id = codec.decode_key(msg.key())
        if messages is None:
            print('-')
            continue

        for message in messages:
            if LEVEL == 'detailed':
                n = handle_message(message)
            elif LEVEL == 'normal':
                n = handle_normal_message(message)

            sessions[session_id] = handle_session(sessions[session_id], message)
            if sessions[session_id]:
                sessions[session_id].sessionid = session_id

            # put in a batch for insertion if received a SessionEnd
            if isinstance(message, SessionEnd):
                if sessions[session_id]:
                    sessions_batch.append(sessions[session_id])

            # try to insert sessions
            if len(sessions_batch) >= sessions_batch_size:
                t2 = time()
                attempt_session_insert(sessions_batch)
                t2_ = time()
                print(f'[INFO] Inserted sessions into Redshift - time spent: {t2_ - t2}')
                t_ += t2_ - t2
                # drop the inserted sessions from the in-memory cache
                for s in sessions_batch:
                    try:
                        del sessions[s.sessionid]
                    except KeyError as e:
                        print(repr(e))
                sessions_batch = []

            if n:
                n.sessionid = session_id
                n.received_at = int(datetime.now().timestamp() * 1000)
                n.batch_order_number = len(batch)
                batch.append(n)
            else:
                continue

            # insert a batch of events
            if len(batch) >= batch_size:
                t1 = time()
                print(f'[INFO] Spent time filling ({batch_size})-batch: {t1 - t_}')
                attempt_batch_insert(batch)
                t1_ = time()
                t_ = t1_
                print(f'[INFO] Inserted events into Redshift - time spent: {t1_ - t1}')
                batch = []
                # offsets are committed only after a successful insert (at-least-once)
                consumer.commit()
                print("[INFO] sessions in cache:", len(sessions))


def attempt_session_insert(sess_batch):
    if sess_batch:
        try:
            print("inserting sessions...")
            insert_batch(db, sess_batch, table=sessions_table_name, level='sessions')
            print("inserted sessions successfully")
        except TypeError as e:
            print("Type conversion error")
            print(repr(e))
        except ValueError as e:
            print("Message value could not be processed or inserted correctly")
            print(repr(e))
        except Exception as e:
            print(repr(e))


def attempt_batch_insert(batch):
    # insert a batch of events
    try:
        print("inserting...")
        insert_batch(db=db, batch=batch, table=table_name, level=LEVEL)
        print("inserted successfully")
    except TypeError as e:
        print("Type conversion error")
        print(repr(e))
    except ValueError as e:
        print("Message value could not be processed or inserted correctly")
        print(repr(e))
    except Exception as e:
        print(repr(e))


def decode_key(b) -> int:
    """
    Decode the message key (encoded little-endian).
    """
    try:
        decoded = int.from_bytes(b, "little", signed=False)
    except Exception as e:
        # UnicodeDecodeError takes five positional arguments, so raising it
        # with a single message string would itself fail; raise ValueError.
        raise ValueError(f"Error while decoding message key (SessionID) from {b}") from e
    return decoded

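As a quick illustration of the little-endian key decoding, a hypothetical round-trip (not part of the file) would be:

session_id = 123456789
key_bytes = session_id.to_bytes(8, "little")  # how a producer might encode the key
assert decode_key(key_bytes) == session_id
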
if __name__ == '__main__':
    print('[INFO] Setup complete')
    print('[INFO] Starting script')
    main()
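All settings are read with python-decouple, so they can come from environment variables or a .env file. A sample .env covering the keys used above (all values are placeholders, not the real deployment settings) might look like:

DATABASE_NAME=redshift
LEVEL=normal                # 'detailed' or 'normal'
KAFKA_SERVER=localhost:9092
SSL_ENABLED=False
topic=saas-raw
events_batch_size=4000
sessions_batch_size=400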