Adding method to handle frontend responses in batches

This commit is contained in:
MauricioGarciaS 2022-11-21 16:20:33 +01:00
parent 1945640ca7
commit 9144606b08
6 changed files with 100 additions and 4 deletions

View file

@ -1,13 +1,13 @@
import json
import schemas_ee
from chalicelib.utils import helper, queue
from chalicelib.utils import helper
from chalicelib.utils import pg_client
def handle_frontend_signals(project_id: int, user_id: str, data: schemas_ee.SignalsSchema):
res = {'errors': 'query not executed'}
insights_query = """INSERT INTO public.frontend_signals VALUES ({project_id}, {user_id}, {timestamp}, {action}, {source}, {category}, {data})"""
insights_query = """INSERT INTO public.frontend_signals VALUES (%(project_id)s, %(user_id)s, %(timestamp)s, %(action)s, %(source)s, %(category)s, %(data)s)"""
with pg_client.PostgresClient() as conn:
query = conn.mogrify(insights_query, {'project_id': project_id, 'user_id': user_id, 'timestamp': data['timestamp'], 'source': data['source'],
'category': data['category'], 'data': json.dumps(data['data'])})

View file

@ -0,0 +1,59 @@
import queue
import logging
# import threading
# import schemas
# import schemas_ee
#from utils import helper
from utils import pg_client
global_queue = None
class EventQueue():
def __init__(self, test=False, queue_max_length=5):
self.events = queue.Queue()
self.events.maxsize = queue_max_length
self.test = test
def flush(self, conn):
events = list()
while not self.events.empty():
events.append(self.events.get())
# self.events.task_done()
if self.test:
print(events)
return 1
_query = conn.mogrify("""INSERT INTO %(database)s.%(table)s (project_id, user_id, timestamp, action, source, category, data) VALUES %(events)s""",
{'database': 'public', 'table': 'frontend_signals', 'events': "(0, 'test', 0, 'action', 's', 'c', '{}')"})
logging.info(_query)
res = 'done'
# res = conn.fetchone()
#res = helper.dict_to_camel_case(conn.fetchone())
return res
def force_flush(self):
if not self.events.empty():
with pg_client.PostgreClient() as conn:
self.flush(conn)
def put(self, element):
if self.events.full():
with pg_client.PostgresClient() as conn:
self.flush(conn)
self.events.put(element)
self.events.task_done()
async def init(test=False):
global global_queue
global_queue = EventQueue(test=test)
logging.info("> queue initialized")
async def terminate():
global global_queue
if global_queue is not None:
global_queue.force_flush()
logging.info('> queue fulshed')

View file

@ -1,2 +0,0 @@
import schemas
import schemas_ee

View file

@ -1,7 +1,43 @@
import logging
from fastapi import FastAPI
from fastapi_utils.tasks import repeat_every
from utils import events_queue
from utils import pg_client
app = FastAPI()
first_boot=True
@app.get('/')
def home():
return '<h1>This is a title</h1>'
@app.get('/value/{value}')
@app.put('/value/{value}')
def number(value: int):
logging.info(f'> {value} as input. Testing queue with pg')
events_queue.global_queue.put(value)
@app.on_event("startup")
@repeat_every(seconds=60*1) # every 5 mins
async def startup():
global first_boot
if first_boot:
await pg_client.init()
await events_queue.init(test=False)
first_boot = False
else:
events_queue.global_queue.force_flush()
# @repeat_every(seconds=60*5) # 5 min
# def clean_up():
# events_queue.force_flush()
@app.on_event("shutdown")
async def shutdown():
await events_queue.terminate()
await pg_client.terminate()

View file

@ -12,6 +12,7 @@ scikit-learn==1.1.3
apache-airflow==2.4.3
fastapi==0.85.0
fastapi-utils
uvicorn[standard]==0.18.3
python-decouple==3.6
pydantic[email]==1.10.2

View file

@ -1,6 +1,8 @@
mkdir tmp
cp ../api/chalicelib/utils/ch_client.py tmp
cp ../api/chalicelib/utils/events_queue.py tmp
cp ../../api/chalicelib/utils/pg_client.py tmp
docker build -t my_test .
rm tmp/*.py
rmdir tmp
docker run -d -p 8080:8080 my_test