diff --git a/ee/api/chalicelib/core/signals.py b/ee/api/chalicelib/core/signals.py index 20f3350aa..702e4f2fa 100644 --- a/ee/api/chalicelib/core/signals.py +++ b/ee/api/chalicelib/core/signals.py @@ -1,13 +1,13 @@ import json import schemas_ee -from chalicelib.utils import helper, queue +from chalicelib.utils import helper from chalicelib.utils import pg_client def handle_frontend_signals(project_id: int, user_id: str, data: schemas_ee.SignalsSchema): res = {'errors': 'query not executed'} - insights_query = """INSERT INTO public.frontend_signals VALUES ({project_id}, {user_id}, {timestamp}, {action}, {source}, {category}, {data})""" + insights_query = """INSERT INTO public.frontend_signals VALUES (%(project_id)s, %(user_id)s, %(timestamp)s, %(action)s, %(source)s, %(category)s, %(data)s)""" with pg_client.PostgresClient() as conn: query = conn.mogrify(insights_query, {'project_id': project_id, 'user_id': user_id, 'timestamp': data['timestamp'], 'source': data['source'], 'category': data['category'], 'data': json.dumps(data['data'])}) diff --git a/ee/api/chalicelib/utils/events_queue.py b/ee/api/chalicelib/utils/events_queue.py new file mode 100644 index 000000000..8a002a8cf --- /dev/null +++ b/ee/api/chalicelib/utils/events_queue.py @@ -0,0 +1,59 @@ +import queue +import logging +# import threading + +# import schemas +# import schemas_ee + +#from utils import helper +from utils import pg_client + +global_queue = None + +class EventQueue(): + + def __init__(self, test=False, queue_max_length=5): + self.events = queue.Queue() + self.events.maxsize = queue_max_length + self.test = test + + def flush(self, conn): + events = list() + while not self.events.empty(): + events.append(self.events.get()) + # self.events.task_done() + if self.test: + print(events) + return 1 + _query = conn.mogrify("""INSERT INTO %(database)s.%(table)s (project_id, user_id, timestamp, action, source, category, data) VALUES %(events)s""", + {'database': 'public', 'table': 'frontend_signals', 'events': "(0, 'test', 0, 'action', 's', 'c', '{}')"}) + logging.info(_query) + res = 'done' + # res = conn.fetchone() + #res = helper.dict_to_camel_case(conn.fetchone()) + return res + + def force_flush(self): + if not self.events.empty(): + with pg_client.PostgreClient() as conn: + self.flush(conn) + + def put(self, element): + if self.events.full(): + with pg_client.PostgresClient() as conn: + self.flush(conn) + self.events.put(element) + self.events.task_done() + +async def init(test=False): + global global_queue + global_queue = EventQueue(test=test) + logging.info("> queue initialized") + +async def terminate(): + global global_queue + if global_queue is not None: + global_queue.force_flush() + logging.info('> queue fulshed') + + diff --git a/ee/api/chalicelib/utils/queue.py b/ee/api/chalicelib/utils/queue.py deleted file mode 100644 index ac5f9c5a2..000000000 --- a/ee/api/chalicelib/utils/queue.py +++ /dev/null @@ -1,2 +0,0 @@ -import schemas -import schemas_ee diff --git a/ee/recommendation/api.py b/ee/recommendation/api.py index 596e8d0b0..b2d47b26d 100644 --- a/ee/recommendation/api.py +++ b/ee/recommendation/api.py @@ -1,7 +1,43 @@ +import logging from fastapi import FastAPI +from fastapi_utils.tasks import repeat_every +from utils import events_queue +from utils import pg_client app = FastAPI() +first_boot=True + @app.get('/') def home(): return '

This is a title

' + + +@app.get('/value/{value}') +@app.put('/value/{value}') +def number(value: int): + logging.info(f'> {value} as input. Testing queue with pg') + events_queue.global_queue.put(value) + + +@app.on_event("startup") +@repeat_every(seconds=60*1) # every 5 mins +async def startup(): + global first_boot + if first_boot: + await pg_client.init() + await events_queue.init(test=False) + first_boot = False + else: + events_queue.global_queue.force_flush() + + +# @repeat_every(seconds=60*5) # 5 min +# def clean_up(): +# events_queue.force_flush() + + +@app.on_event("shutdown") +async def shutdown(): + await events_queue.terminate() + await pg_client.terminate() diff --git a/ee/recommendation/requirements.txt b/ee/recommendation/requirements.txt index 9d2a7872d..8f3f9e958 100644 --- a/ee/recommendation/requirements.txt +++ b/ee/recommendation/requirements.txt @@ -12,6 +12,7 @@ scikit-learn==1.1.3 apache-airflow==2.4.3 fastapi==0.85.0 +fastapi-utils uvicorn[standard]==0.18.3 python-decouple==3.6 pydantic[email]==1.10.2 diff --git a/ee/recommendation/run.sh b/ee/recommendation/run.sh index 1754c4e99..e5f70a5a4 100644 --- a/ee/recommendation/run.sh +++ b/ee/recommendation/run.sh @@ -1,6 +1,8 @@ mkdir tmp cp ../api/chalicelib/utils/ch_client.py tmp +cp ../api/chalicelib/utils/events_queue.py tmp cp ../../api/chalicelib/utils/pg_client.py tmp docker build -t my_test . rm tmp/*.py rmdir tmp +docker run -d -p 8080:8080 my_test