* Updated pg connector
* fix(player): fix first 8 byte checker
* fix(player): fix commit conflict
* Added pylint
* Removed pylint for incompatible license
* change(ui): check for sessions records state
* Patch/api v1.12.0 (#1299)
* fix(chalice): include metadata in sessions exp search
* fix(chalice): fixed sessions exp search wrong col name
* fix(chalice): removed cookies
* fix(chalice): changed base image to support SSO/xmlsec
* fix(chalice): changed Dockerfile to support SSO/xmlsec
* fix(chalice): changed Dockerfile to support SSO/xmlsec (cherry picked from commit 4b8cf9742c)
* fix(ui): project fallback to recorded variable
* Patch/api v1.12.0 (#1301)
* fix(chalice): changed base image to support SSO/xmlsec
* fix(chalice): fixed exp search null metadata (cherry picked from commit ab000751d2)
* change(ui): assist no content message styles and icons
* change(ui): revert menu disable
* fix(connector): added method to save state in S3 for Redshift if SIGTERM arises
* Rewriting Python code in Cython
* Added pyx module for messages
* Auto-create pyx files
* Updated and fixed msgcodec.pyx
* Added new module to connector code
* Updated kafka lib for base image
* Cleaned Docker and updated base image version for pandas
* Cleaned prints
* Added code to fetch data from db and add it into Redshift
* Updated consumer reading method: async multithreading over sessionId
* Added split for country (Country, State, City)
* Fixed decoding issue for uint
* Created service able to fix data in Redshift by reading from db
* Handle the case where the process ends because of a lost connection to pg; country set to country value only
from decouple import config, Csv
from confluent_kafka import Consumer
from datetime import datetime
from collections import defaultdict
import asyncio
from time import time, sleep
from copy import deepcopy

from msgcodec import MessageCodec
from messages import SessionEnd
from db.api import DBConnection
from db.models import events_detailed_table_name, events_table_name, sessions_table_name
from db.writer import insert_batch, update_batch
from handler import handle_message, handle_normal_message, handle_session
from utils.cache import ProjectFilter as PF
from utils import pg_client

from psycopg2 import InterfaceError
from utils.signal_handler import signal_handler


def process_message(msg, codec, sessions, batch, sessions_batch, interesting_sessions,
                    interesting_events, EVENT_TYPE, projectFilter, broken_batchs=0):
    """Decode a Kafka message and dispatch its events into the event and session batches.

    Returns the (possibly incremented) broken_batchs counter so the caller can keep
    track of messages whose sessionId key could not be decoded.
    """
    if msg is None:
        return broken_batchs
    messages = codec.decode_detailed(msg.value())
    try:
        session_id = codec.decode_key(msg.key())
    except Exception:
        # The message key could not be decoded into a sessionId.
        broken_batchs += 1
        return broken_batchs
    if messages is None:
        return broken_batchs
    elif not projectFilter.is_valid(session_id):
        # Skip sessions that do not belong to the selected projects.
        return broken_batchs

    for message in messages:
        n = None
        if message.__id__ in interesting_events:
            if EVENT_TYPE == 'detailed':
                n = handle_message(message)
            elif EVENT_TYPE == 'normal':
                n = handle_normal_message(message)
        if message.__id__ in interesting_sessions:
            # Create the session if it does not exist, or append the event to the existing session.
            sessions[session_id] = handle_session(sessions[session_id], message)
            if sessions[session_id]:
                sessions[session_id].sessionid = session_id
                projectFilter.cached_sessions.add(session_id)

                if isinstance(message, SessionEnd):
                    # Cleanup starts only when the session exists and a SessionEnd arrives.
                    if sessions[session_id].session_start_timestamp:
                        projectFilter.handle_clean()
                        old_status = projectFilter.cached_sessions.close(session_id)
                        sessions_batch.append((old_status, deepcopy(sessions[session_id])))
                        sessions_to_delete = projectFilter.cached_sessions.clear_sessions()
                        for sess_id in sessions_to_delete:
                            try:
                                del sessions[sess_id]
                            except KeyError:
                                pass
                    else:
                        print('[WARN] Received SessionEnd for a session that never started')
                        del sessions[session_id]

        if message.__id__ in interesting_events and n:
            n.sessionid = session_id
            n.received_at = int(datetime.now().timestamp() * 1000)
            n.batch_order_number = len(batch)
            batch.append(n)
    return broken_batchs


def attempt_session_insert(sess_batch, db, sessions_table_name, try_=0):
    if sess_batch:
        try:
            #print("inserting sessions...")
            insert_batch(db, sess_batch, table=sessions_table_name, level='sessions')
            #print("inserted sessions successfully")
        except TypeError as e:
            print("Type conversion error")
            print(repr(e))
        except ValueError as e:
            print("Message value could not be processed or inserted correctly")
            print(repr(e))
        except InterfaceError as e:
            # Connection-level failure: back off and retry up to three times.
            if try_ < 3:
                try_ += 1
                sleep(try_ * 2)
                attempt_session_insert(sess_batch, db, sessions_table_name, try_)
            else:
                print(repr(e))
        except Exception as e:
            print(repr(e))


def attempt_session_update(sess_batch, db, sessions_table_name):
    if sess_batch:
        try:
            #print('updating sessions')
            update_batch(db, sess_batch, table=sessions_table_name)
        except TypeError as e:
            print('Type conversion error')
            print(repr(e))
        except ValueError as e:
            print('Message value could not be processed or inserted correctly')
            print(repr(e))
        except InterfaceError as e:
            print('Error while trying to update sessions in the data warehouse')
            print(repr(e))
        except Exception as e:
            print(repr(e))


def attempt_batch_insert(batch, db, table_name, EVENT_TYPE, try_=0):
    # Insert a batch of events, retrying on connection errors.
    try:
        #print("inserting...")
        insert_batch(db=db, batch=batch, table=table_name, level=EVENT_TYPE)
        #print("inserted successfully")
    except TypeError as e:
        print("Type conversion error")
        print(repr(e))
    except ValueError as e:
        print("Message value could not be processed or inserted correctly")
        print(repr(e))
    except InterfaceError as e:
        if try_ < 3:
            # Back off with an increasing delay before retrying.
            try_ += 1
            sleep(try_ * 2)
            attempt_batch_insert(batch, db, table_name, EVENT_TYPE, try_)
        elif try_ == 3:
            # After three failed retries, restart the warehouse connection and try once more.
            db.restart()
            sleep(2)
            attempt_batch_insert(batch, db, table_name, EVENT_TYPE, try_ + 1)
        else:
            print(repr(e))
    except Exception as e:
        print(repr(e))


def decode_key(b) -> int:
    """
    Decode the message key (encoded with little endian)
    """
    try:
        decoded = int.from_bytes(b, "little", signed=False)
    except Exception as e:
        print(f'Error while decoding message key (SessionId) from {b}')
        raise e
    return decoded

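# A quick sanity check for the little-endian decoding above (illustrative values,
# assuming an 8-byte key as written by the producer):
#
#   >>> decode_key(b'\x39\x30\x00\x00\x00\x00\x00\x00')
#   12345

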
async def main():
    await pg_client.init()
    DATABASE = config('CLOUD_SERVICE')
    EVENT_TYPE = config('EVENT_TYPE')

    db = DBConnection(DATABASE)
    upload_rate = config('upload_rate', default=30, cast=int)

    if EVENT_TYPE == 'detailed':
        table_name = events_detailed_table_name
    elif EVENT_TYPE == 'normal':
        table_name = events_table_name
    else:
        raise ValueError(f"Unknown EVENT_TYPE '{EVENT_TYPE}', expected 'detailed' or 'normal'")

    batch = []
    sessions = defaultdict(lambda: None)
    sessions_batch = []

    # Message ids used to build sessions, plus the event ids selected for the chosen level.
    sessions_events_selection = [1, 25, 28, 29, 30, 31, 32, 54, 56, 62, 69, 78, 125, 126]
    if EVENT_TYPE == 'normal':
        selected_events = [21, 22, 25, 27, 64, 78, 125]
    elif EVENT_TYPE == 'detailed':
        selected_events = [1, 4, 21, 22, 25, 27, 31, 32, 39, 48, 59, 64, 69, 78, 125, 126]
    filter_events = list(set(sessions_events_selection + selected_events))

    allowed_projects = config('PROJECT_IDS', default=None, cast=Csv(int))
    project_filter = PF(allowed_projects)
    try:
        project_filter.load_checkpoint(db)
    except Exception as e:
        print('[WARN] Checkpoint not found')
        print(repr(e))
    codec = MessageCodec(filter_events)
    ssl_protocol = config('KAFKA_USE_SSL', default=True, cast=bool)
    consumer_settings = {
        "bootstrap.servers": config('KAFKA_SERVERS'),
        "group.id": f"connector_{DATABASE}",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False
    }
    if ssl_protocol:
        consumer_settings['security.protocol'] = 'SSL'
    consumer = Consumer(consumer_settings)

    consumer.subscribe(config("TOPICS", default="saas-raw").split(','))
    print("[INFO] Kafka consumer subscribed")

    c_time = time()
    read_msgs = 0
    broken_batchs = 0
    while signal_handler.KEEP_PROCESSING:
        msg = consumer.poll(1.0)
        broken_batchs = process_message(msg, codec, sessions, batch, sessions_batch,
                                        sessions_events_selection, selected_events,
                                        EVENT_TYPE, project_filter, broken_batchs)
        read_msgs += 1
        if time() - c_time > upload_rate:
            #print(f'[INFO] {read_msgs} kafka messages read in {upload_rate} seconds')
            if broken_batchs > 0:
                print(f'[WARN] {broken_batchs} broken sessionIds')
                broken_batchs = 0
            await insertBatch(deepcopy(sessions_batch), deepcopy(batch), db,
                              sessions_table_name, table_name, EVENT_TYPE)
            consumer.commit()
            try:
                project_filter.save_checkpoint(db)
            except Exception as e:
                print("[ERROR] Error while saving checkpoint")
                print(repr(e))
            sessions_batch = []
            batch = []
            read_msgs = 0
            c_time = time()
    project_filter.terminate(db)

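# Illustrative environment configuration for main(); the values are placeholders, and
# only the defaults shown for upload_rate, KAFKA_USE_SSL and TOPICS come from the
# config() calls above:
#
#   CLOUD_SERVICE=redshift
#   EVENT_TYPE=normal              # or 'detailed'
#   upload_rate=30                 # seconds between batch uploads
#   PROJECT_IDS=1,2,3              # optional comma-separated list of allowed projects
#   KAFKA_SERVERS=kafka-1:9092,kafka-2:9092
#   KAFKA_USE_SSL=true
#   TOPICS=saas-raw

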
async def insertBatch(sessions_batch, batch, db, sessions_table_name, table_name, EVENT_TYPE):
    t1 = time()
    print(f'[BG-INFO] Number of events to add {len(batch)}, number of sessions to add {len(sessions_batch)}')
    new_sessions = list()
    updated_sessions = list()
    # Sessions already in the warehouse are updated; the rest are inserted as new rows.
    for old_status, session_in_batch in sessions_batch:
        if old_status == 'UPDATE':
            updated_sessions.append(session_in_batch)
        else:
            new_sessions.append(session_in_batch)
    #print(f'[DEBUG] Number of new sessions {len(new_sessions)}, number of sessions to update {len(updated_sessions)}')
    if new_sessions:
        attempt_session_insert(new_sessions, db, sessions_table_name)

    if updated_sessions:
        attempt_session_update(updated_sessions, db, sessions_table_name)

    # Insert the batch of events.
    if batch:
        attempt_batch_insert(batch, db, table_name, EVENT_TYPE)
    print(f'[BG-INFO] Uploaded into S3 in {time() - t1} seconds')


if __name__ == '__main__':
    asyncio.run(main())