diff --git a/ee/api/app.py b/ee/api/app.py index 055706792..f3edcefe3 100644 --- a/ee/api/app.py +++ b/ee/api/app.py @@ -12,6 +12,7 @@ from starlette.responses import StreamingResponse, JSONResponse from chalicelib.core import traces from chalicelib.utils import helper from chalicelib.utils import pg_client +from chalicelib.utils import events_queue from routers import core, core_dynamic, ee, saml from routers.crons import core_crons from routers.crons import core_dynamic_crons @@ -81,6 +82,7 @@ app.queue_system = queue.Queue() async def startup(): logging.info(">>>>> starting up <<<<<") await pg_client.init() + await events_queue.init() app.schedule.start() for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs + traces.cron_jobs: @@ -96,6 +98,7 @@ async def shutdown(): logging.info(">>>>> shutting down <<<<<") app.schedule.shutdown(wait=True) await traces.process_traces_queue() + await events_queue.terminate() await pg_client.terminate() diff --git a/ee/api/chalicelib/core/signals.py b/ee/api/chalicelib/core/signals.py index ed0664bde..cb4d714c4 100644 --- a/ee/api/chalicelib/core/signals.py +++ b/ee/api/chalicelib/core/signals.py @@ -4,10 +4,10 @@ import schemas_ee import logging from chalicelib.utils import helper from chalicelib.utils import pg_client +from chalicelib.utils import events_queue def handle_frontend_signals(project_id: int, user_id: str, data: schemas_ee.SignalsSchema): - res = {'errors': 'query not executed'} insights_query = """INSERT INTO public.frontend_signals (project_id, user_id, timestamp, action, source, category, data) VALUES (%(project_id)s, %(user_id)s, %(timestamp)s, %(action)s, %(source)s, %(category)s, %(data)s)""" try: with pg_client.PostgresClient() as conn: @@ -19,3 +19,11 @@ def handle_frontend_signals(project_id: int, user_id: str, data: schemas_ee.Sign except Exception as e: logging.info(f'Error while inserting: {e}') return {'errors': [e]} + +def handle_frontend_signals_queued(project_id: int, user_id: str, data: 
schemas_ee.SignalsSchema): + try: + events_queue.global_queue.put((project_id, user_id, data)) + return {'data': 'insertion succeeded'} + except Exception as e: + logging.info(f'Error while inserting: {e}') + return {'errors': [e]} diff --git a/ee/api/chalicelib/utils/events_queue.py b/ee/api/chalicelib/utils/events_queue.py index a37cec121..c6208bc79 100644 --- a/ee/api/chalicelib/utils/events_queue.py +++ b/ee/api/chalicelib/utils/events_queue.py @@ -1,12 +1,7 @@ import queue import logging -# import threading -# import schemas -# import schemas_ee - -#from utils import helper -from utils import pg_client +from chalicelib.utils import pg_client global_queue = None @@ -20,15 +15,16 @@ class EventQueue(): def flush(self, conn): events = list() while not self.events.empty(): - events.append(self.events.get()) - # self.events.task_done() + project_id, user_id, element = self.events.get() + events.append("({project_id}, '{user_id}', {timestamp}, '{action}', '{source}', '{category}', '{data}')".format( + project_id=project_id, user_id=user_id, timestamp=element.timestamp, action=element.action, source=element.source, category=element.category, data=element.data)) + if len(events)==0: + return 0 if self.test: print(events) return 1 - _query = """INSERT INTO {database}.{table} (project_id, user_id, timestamp, action, source, category, data) VALUES %(events)s""".format( - database='public', table='frontend_signals') - _query = conn.mogrify(_query, {'events': (0, 'test', 0, 'action', 's', 'c', '{}')}) - conn.execute(_query) + _base_query = 'INSERT INTO {database}.{table} (project_id, user_id, timestamp, action, source, category, data) VALUES {values_list}' + conn.execute(_base_query.format(database='public', table='frontend_signals', values_list=', '.join(events))) # logging.info(_query) # res = 'done' # res = conn.fetchone() diff --git a/ee/api/routers/core_dynamic.py b/ee/api/routers/core_dynamic.py index 3250df394..addbe749b 100644 --- a/ee/api/routers/core_dynamic.py
+++ b/ee/api/routers/core_dynamic.py @@ -440,7 +440,7 @@ def get_all_notes(projectId: int, data: schemas.SearchNoteSchema = Body(...), @app.post('/{projectId}/signals', tags=['signals']) def send_interactions(projectId:int, data: schemas_ee.SignalsSchema = Body(...), context: schemas.CurrentContext = Depends(OR_context)): - data = signals.handle_frontend_signals(project_id=projectId, user_id=context.user_id, data=data) + data = signals.handle_frontend_signals_queued(project_id=projectId, user_id=context.user_id, data=data) if "errors" in data: return data diff --git a/ee/api/routers/crons/core_dynamic_crons.py b/ee/api/routers/crons/core_dynamic_crons.py index 1d8320eb7..f57b341bb 100644 --- a/ee/api/routers/crons/core_dynamic_crons.py +++ b/ee/api/routers/crons/core_dynamic_crons.py @@ -1,6 +1,7 @@ from chalicelib.core import telemetry, unlock from chalicelib.core import jobs from chalicelib.core import weekly_report as weekly_report_script +from chalicelib.utils import events_queue from decouple import config @@ -22,6 +23,9 @@ def unlock_cron() -> None: print(f"valid: {unlock.is_valid()}") +def pg_events_queue() -> None: + events_queue.global_queue.force_flush() + cron_jobs = [ {"func": unlock_cron, "trigger": "cron", "hour": "*"} ] @@ -29,7 +33,8 @@ cron_jobs = [ SINGLE_CRONS = [{"func": telemetry_cron, "trigger": "cron", "day_of_week": "*"}, {"func": run_scheduled_jobs, "trigger": "interval", "seconds": 60, "misfire_grace_time": 20}, {"func": weekly_report, "trigger": "cron", "day_of_week": "mon", "hour": 5, - "misfire_grace_time": 60 * 60}] + "misfire_grace_time": 60 * 60}, + {"func": pg_events_queue, "trigger": "interval", "seconds": 60*5, "misfire_grace_time": 20}] if config("LOCAL_CRONS", default=False, cast=bool): cron_jobs += SINGLE_CRONS diff --git a/ee/recommendation/api.py b/ee/recommendation/api.py index 213d797fa..f9d3e05bf 100644 --- a/ee/recommendation/api.py +++ b/ee/recommendation/api.py @@ -4,6 +4,7 @@ from fastapi import FastAPI # from
fastapi_utils.tasks import repeat_every from utils import events_queue from utils import pg_client +from utils import schemas_ee app = FastAPI() app.schedule = AsyncIOScheduler() @@ -17,7 +18,10 @@ def home(): @app.put('/value/{value}') def number(value: int): logging.info(f'> {value} as input. Testing queue with pg') - events_queue.global_queue.put(value) + d = {'timestamp': 23786, 'action': 'action', 'source': 'source', 'category': 'cat', 'data': {}} + events = schemas_ee.SignalsSchema + event = events.parse_obj(d) + events_queue.global_queue.put((value, 0, event)) @app.on_event("startup")