* feat(DB): rearranged queries feat(DB): ready for v1.15.0 * refactor(chalice): upgraded dependencies refactor(crons): upgraded dependencies refactor(alerts): upgraded dependencies * fix(chalice): return error when updating inexistant webhook * feat(chalice): fixed delete webhook response * feat(chalice): limit webhooks name length * feat(chalice): upgraded dependencies feat(alerts): upgraded dependencies feat(crons): upgraded dependencies * fix(chalice): remove urllib3 dependency * feat(chalice): remove FOSS to pydantic v2 * fix(chalice): freeze urllib3 to not have conflicts between boto3 and requests * feat(chalice): refactoring schema in progress * feat(chalice): refactoring schema in progress * feat(chalice): refactoring schema in progress * feat(chalice): refactoring schema in progress feat(chalice): upgraded dependencies * feat(chalice): refactored schema * fix(chalice): pull rebase dev * feat(DB): transfer size support * feat(chalice): support service account * feat(chalice): support service account * fix(chalice): fixed refactored PayloadSchema-name * feat(chalice): path analysis * feat(chalice): support service account 1/2 * feat(DB): timezone support * feat(chalice): upgraded dependencies feat(alerts): upgraded dependencies feat(crons): upgraded dependencies feat(assist): upgraded dependencies feat(sourcemaps): upgraded dependencies * feat(chalice): path analysis schema changes * feat(chalice): path analysis query change * feat(chalice): path analysis query change * feat(chalice): ios replay support * feat(chalice): ios replay support * feat(chalice): path analysis changes * feat(chalice): upgraded dependencies * feat(chalice): simple hide minor paths * feat(chalice): path analysis density * feat(chalice): session's replay ios events * feat(chalice): fixed typo * feat(chalice): support project's platform * feat(DB): support project's platform * feat(chalice): path analysis EE in progress * feat(chalice): project's platform API * feat(chalice): fixed 
create project * feat(chalice): EE path analysis in progress * feat(chalice): EE path analysis refactor(chalice): support specific database name for clickhouse-client * feat(chalice): upgraded dependencies feat(chalice): path analysis specific event type for startPoint feat(chalice): path analysis specific event type for endPoint feat(chalice): path analysis specific event type for exclude * refactoring(chalice): changed IOS click event type
84 lines
3.1 KiB
Python
84 lines
3.1 KiB
Python
import json
|
|
import queue
|
|
import logging
|
|
|
|
from chalicelib.utils import pg_client
|
|
|
|
# Module-level singleton EventQueue; created by init() and drained by terminate().
global_queue = None
|
|
|
|
class EventQueue:
    """In-memory buffer of frontend signal events, bulk-inserted into
    public.frontend_signals on flush.

    Queued items are (project_id, user_id, element) tuples; element is
    expected to expose model_dump() (a pydantic v2 model) whose keys include
    timestamp, action, source, category and data.
    """

    def __init__(self, test=False, queue_max_length=100):
        # test=True makes flush() print the generated VALUES fragments
        # instead of touching the database.
        # Fix: pass maxsize through the constructor instead of assigning
        # the attribute after construction.
        self.events = queue.Queue(maxsize=queue_max_length)
        self.test = test

    def flush(self, conn):
        """Drain all queued events and insert them in a single statement.

        :param conn: open Postgres client/cursor (unused when self.test).
        :return: 0 if the queue was empty, 1 otherwise.
        """
        events = []
        params = {}
        i = 0
        while not self.events.empty():
            project_id, user_id, element = self.events.get()
            params[f'project_id_{i}'] = project_id
            params[f'user_id_{i}'] = user_id
            for _key, _val in element.model_dump().items():
                if _key == 'data':
                    # data is stored as jsonb; sessionId (when present in
                    # the payload) is also lifted into its own column.
                    params[f'{_key}_{i}'] = json.dumps(_val)
                    if 'sessionId' in _val:
                        params[f'session_id_{i}'] = int(_val['sessionId'])
                    else:
                        params[f'session_id_{i}'] = None
                else:
                    params[f'{_key}_{i}'] = _val
            events.append(
                f"(%(project_id_{i})s, %(user_id_{i})s, %(timestamp_{i})s, %(action_{i})s, %(source_{i})s, %(category_{i})s, %(data_{i})s::jsonb, %(session_id_{i})s)")
            i += 1
        if i == 0:
            return 0
        if self.test:
            print(events)
            return 1
        conn.execute(
            conn.mogrify(f"""INSERT INTO public.frontend_signals (project_id, user_id, timestamp, action, source, category, data, session_id)
                             VALUES {' , '.join(events)}""", params)
        )
        return 1

    def _safe_flush(self):
        # Best-effort flush shared by force_flush() and put(): signals are
        # not critical data, so DB errors are logged (at error level, not
        # info as before) and otherwise swallowed.
        try:
            with pg_client.PostgresClient() as conn:
                self.flush(conn)
        except Exception as e:
            logging.error(f'Error: {e}')

    def force_flush(self):
        """Flush pending events immediately (called at shutdown)."""
        if not self.events.empty():
            self._safe_flush()

    def put(self, element):
        """Buffer one event, flushing first when the queue is full so the
        blocking put() below does not stall the caller."""
        if self.events.full():
            self._safe_flush()
        self.events.put(element)
        # Keeps the unfinished-task counter balanced: flush() drains via
        # get() without task_done(), and nothing here calls Queue.join().
        self.events.task_done()
|
|
|
|
async def init(test=False):
    """Create the shared module-level EventQueue singleton.

    :param test: forwarded to EventQueue; test mode prints instead of
        writing to the database.
    """
    global global_queue
    global_queue = EventQueue(test=test)
    logging.info("> queue initialized")
|
|
|
|
async def terminate():
    """Flush any buffered events before shutdown.

    Safe to call even if init() was never run (global_queue stays None).
    """
    global global_queue
    if global_queue is not None:
        global_queue.force_flush()
        # Fix: log message typo ("fulshed" -> "flushed").
        logging.info('> queue flushed')
|
|
|
|
# def __process_schema(trace):
|
|
# data = trace.model_dump()
|
|
# data["parameters"] = json.dumps(trace.parameters) if trace.parameters is not None and len(
|
|
# trace.parameters.keys()) > 0 else None
|
|
# data["payload"] = json.dumps(trace.payload) if trace.payload is not None and len(trace.payload.keys()) > 0 else None
|
|
# return data
|