* ci(deployment): injecting secrets Signed-off-by: Rajesh Rajendran <rjshrjndrn@gmail.com> * fix: typo * feat(installation): Enterprise license check * fix(install): reset ee cli args Signed-off-by: Rajesh Rajendran <rjshrjndrn@gmail.com> * Fix typo * Update README.md * feat (tracker-axios): init plugin * fix (tracker-axios): version patch * Fixed alert's unknown metrics handler * fix (tracker-mobx): dev-dependencies and updated package-lock * feat: APIs for user session data deleteion - wip * fix: alert metric value of performance.speed_index * Build and deploy scripts for enterprise edition (#13) * feat(installation): enterprise installation * chore(install): enabling ansible gather_facts Signed-off-by: Rajesh Rajendran <rjshrjndrn@gmail.com> * chore(install): quotes for enterprise key Signed-off-by: Rajesh Rajendran <rjshrjndrn@gmail.com> * chore(installation): enterprise install dbs Signed-off-by: Rajesh Rajendran <rjshrjndrn@gmail.com> * chore(install): rename yaml * chore(install): change image tag Signed-off-by: Rajesh Rajendran <rjshrjndrn@gmail.com> * chore(install): License key variable added * chore(deployment): Injecting enterprise license key in workers. * chore(install): remove deprecated files * chore(install): make domain_name mandatory in vars.yaml Signed-off-by: Rajesh Rajendran <rjshrjndrn@gmail.com> * chore(actions): ee workers Signed-off-by: Rajesh Rajendran <rjshrjndrn@gmail.com> * feat(install): use local docker instead of crictl You can use the images built in the local machine, in installation, without putting that in any external registry. 
Signed-off-by: Rajesh Rajendran <rjshrjndrn@gmail.com> * feat: APIs for user session data deleteion * feat: prefix deleted mobs with DEL_ * feat: schedules to delete mobs * chore(ci): fix ee build Signed-off-by: Rajesh Rajendran <rjshrjndrn@gmail.com> * feat(build): passing build args to internal scripts Signed-off-by: Rajesh Rajendran <rjshrjndrn@gmail.com> * chore(install): moving kafka topic creation at the end Kafka pods usually takes time to be active. Signed-off-by: Rajesh Rajendran <rjshrjndrn@gmail.com> * chore(install): removing auth service. * chore(install): Adding rancher for cluster management * chore(install): proper name for alerts template * separate requirements and clean up * feat (frontend): typescript support * feat (tracker): 3.0.4: maintain baseURL & connAttempt options * feat(api): changed license validation * feat(api): ee-license fix for unprovided value * feat(api): fixed ee-signup cursor * feat(api): FOS fix replay-mob issue * feat(api): ee log ch-resources query * chore(ci): change openreplay-cli with kube-install.sh Signed-off-by: Rajesh Rajendran <rjshrjndrn@gmail.com> * ci(actions): change ee naming * feat(api): removed ch-logs * feat(install): injecting ee variables only on ee installation. Signed-off-by: Rajesh Rajendran <rjshrjndrn@gmail.com> * chore(install): remove licence key from ee Signed-off-by: Rajesh Rajendran <rjshrjndrn@gmail.com> * fix(install): ch values for chalice * feat(clickhouse): moved creation scripts to EE folder * fix (backend-ee): disable ios tables so far * chore(install): remove deprecated mandatory variables. 
Signed-off-by: Rajesh Rajendran <rjshrjndrn@gmail.com> * feat(api): remove duplicate files & changed signup * fix(backend-ee): ch prepare after commit * fix(backend-ee): syntax * feat(api): added missing EE tenant column * fix(scripts-ee): correct default clickhouse host * feat(api): changed version_number location * feat(api): ee log ch-errors query * feat(api): ee fix ch-errors query * feat: skip to issue button (#23) * feat(api): 🐛 ee fix ambiguous ch-error query & accounts endpoint * Feature: Autoplay Sessions (#22) * feat: autoplay sessions * change: removed unused import * auto play filter by tab * feat(api): changed JWT authorizer & API_KEY authorizer & fix undefined project_key * feat (backend-devops): Dockerfile for all services in one image * feat(sourcemap-uploader): --verbose argument use instead of --log * feat(api): log middleware * Feature - dom inspector (#28) * feat (frontend): typescript support * feat(frontend): DOM Inspector init * fix(frontend): use tailwind bg * feat(frontend dom-inspector): add element selection & deletion * fix(frontend): todo comment * di - styling wip * feature(di) - editor theme * feat(frontend): parse attributes with RE (+ability to add) * feature(di) - input width * fix(ui): di - review changes Co-authored-by: ShiKhu <alex.kaminsky.11@gmail.com> * chore(install): remove depricated init_dbs * feat(api): ee override multi-tenant-core * fix(frontend-build): gen css types before build * fix(ui) - checking for the license (#30) Co-authored-by: Rajesh Rajendran <rjshrjndrn@gmail.com> Co-authored-by: Mehdi Osman <estradino@users.noreply.github.com> Co-authored-by: ShiKhu <alex.kaminsky.11@gmail.com> Co-authored-by: KRAIEM Taha Yassine <tahayk2@gmail.com> Co-authored-by: Rajesh Rajendran <rjshrjndrn@users.noreply.github.com> Co-authored-by: ourvakan <hi-psi@yandex.com> Co-authored-by: tahayk2@gmail.com <enissay4ever4github>
121 lines
3.8 KiB
Python
121 lines
3.8 KiB
Python
import os
|
|
from kafka import KafkaConsumer
|
|
from datetime import datetime
|
|
from collections import defaultdict
|
|
|
|
from msgcodec.codec import MessageCodec
|
|
from msgcodec.messages import SessionEnd
|
|
from db.api import DBConnection
|
|
from db.models import events_detailed_table_name, events_table_name, sessions_table_name
|
|
from db.writer import insert_batch
|
|
from handler import handle_message, handle_normal_message, handle_session
|
|
|
|
# Required environment configuration: the target database name and the event
# granularity ('detailed' or 'normal'). A missing variable raises KeyError.
DATABASE = os.environ['DATABASE_NAME']
LEVEL = os.environ['level']

# Shared database handle passed to insert_batch by the insert helpers.
db = DBConnection(DATABASE)

# Select the events table that matches the configured level. Fail fast on an
# unsupported value: otherwise `table_name` would stay undefined and the
# problem would only surface later, far from its cause.
if LEVEL == 'detailed':
    table_name = events_detailed_table_name
elif LEVEL == 'normal':
    table_name = events_table_name
else:
    raise ValueError(
        f"Unsupported level {LEVEL!r}: expected 'detailed' or 'normal'"
    )
|
|
|
|
|
|
def main():
    """Consume Kafka messages and bulk-insert events and sessions.

    Subscribes to the ``events`` and ``messages`` topics and, for every
    record that ``MessageCodec`` can decode:

    * converts it to an event row with the handler selected by ``LEVEL`` and
      buffers it; once ``batch_size`` rows are buffered they are handed to
      ``attempt_batch_insert`` and the consumer offsets are committed;
    * folds it into the cached per-session state via ``handle_session``;
      when a ``SessionEnd`` arrives the session is queued, and once
      ``sessions_batch_size`` sessions are queued they are handed to
      ``attempt_session_insert`` and evicted from the cache.

    Raises:
        ValueError: if ``LEVEL`` is neither ``'detailed'`` nor ``'normal'``.
        KeyError: if ``KAFKA_SERVER_1`` or ``KAFKA_SERVER_2`` is not set.
    """
    # Resolve the event handler once instead of per message. An unsupported
    # level previously left the event variable unbound and crashed with an
    # UnboundLocalError on the first message; report it explicitly instead.
    if LEVEL == 'detailed':
        handle_event = handle_message
    elif LEVEL == 'normal':
        handle_event = handle_normal_message
    else:
        raise ValueError(
            f"Unsupported level {LEVEL!r}: expected 'detailed' or 'normal'"
        )

    batch_size = 4000
    sessions_batch_size = 400
    batch = []
    # session_id -> session state. Only real session objects are cached, so
    # ids that never produce a session do not accumulate as None entries.
    sessions = {}
    sessions_batch = []

    codec = MessageCodec()
    consumer = KafkaConsumer(security_protocol="SSL",
                             bootstrap_servers=[os.environ['KAFKA_SERVER_1'],
                                                os.environ['KAFKA_SERVER_2']],
                             group_id=f"connector_{DATABASE}",
                             auto_offset_reset="earliest",
                             enable_auto_commit=False)

    consumer.subscribe(topics=["events", "messages"])
    print("Kafka consumer subscribed")
    for msg in consumer:
        message = codec.decode(msg.value)
        if message is None:
            print('-')
            continue

        n = handle_event(message)

        session_id = codec.decode_key(msg.key)
        session = handle_session(sessions.get(session_id), message)
        if session is None:
            # Nothing to remember for this id; drop any stale entry.
            sessions.pop(session_id, None)
        else:
            sessions[session_id] = session

        if session:
            session.sessionid = session_id

            # put in a batch for insertion if received a SessionEnd
            if isinstance(message, SessionEnd):
                sessions_batch.append(session)

                # try to insert sessions (the queue only grows on append, so
                # this is the only point where the threshold can be reached)
                if len(sessions_batch) >= sessions_batch_size:
                    attempt_session_insert(sessions_batch)
                    for s in sessions_batch:
                        try:
                            del sessions[s.sessionid]
                        except KeyError as e:
                            # e.g. the same session was queued more than once
                            print(repr(e))
                    sessions_batch = []

        if not n:
            continue

        n.sessionid = session_id
        n.received_at = int(datetime.now().timestamp() * 1000)
        n.batch_order_number = len(batch)
        batch.append(n)

        # insert a batch of events
        if len(batch) >= batch_size:
            attempt_batch_insert(batch)
            batch = []
            # NOTE(review): attempt_batch_insert reports failures only by
            # printing, so offsets are committed even when the batch was not
            # written - confirm whether dropping such a batch is intended.
            consumer.commit()
            print("sessions in cache:", len(sessions))
|
|
|
|
|
|
def attempt_session_insert(sess_batch):
    """Best-effort insert of a batch of sessions into the sessions table.

    Does nothing when ``sess_batch`` is empty or None. Any exception raised
    while inserting is printed (with an extra hint for TypeError and
    ValueError) and never propagated to the caller.
    """
    if not sess_batch:
        return

    try:
        print("inserting sessions...")
        insert_batch(db, sess_batch, table=sessions_table_name, level='sessions')
        print("inserted sessions succesfully")
    except Exception as err:
        # One handler for all failures; only the hint line depends on the type.
        if isinstance(err, TypeError):
            print("Type conversion error")
        elif isinstance(err, ValueError):
            print("Message value could not be processed or inserted correctly")
        print(repr(err))
|
|
|
|
|
|
def attempt_batch_insert(batch):
    """Best-effort insert of a batch of events into the configured table.

    Passes ``batch`` to ``insert_batch`` together with the module-level
    ``db``, ``table_name`` and ``LEVEL``. Any exception raised while
    inserting is printed (with an extra hint for TypeError and ValueError)
    and never propagated to the caller.
    """
    try:
        print("inserting...")
        insert_batch(db=db, batch=batch, table=table_name, level=LEVEL)
        print("inserted succesfully")
    except Exception as err:
        # One handler for all failures; only the hint line depends on the type.
        if isinstance(err, TypeError):
            print("Type conversion error")
        elif isinstance(err, ValueError):
            print("Message value could not be processed or inserted correctly")
        print(repr(err))
|
|
|
|
|
|
# Run the consumer loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
|