Added queue method to Signals

parent 1cadd08774
commit 2e9ff89976

6 changed files with 32 additions and 16 deletions
@@ -12,6 +12,7 @@ from starlette.responses import StreamingResponse, JSONResponse
 from chalicelib.core import traces
 from chalicelib.utils import helper
 from chalicelib.utils import pg_client
+from chalicelib.utils import events_queue
 from routers import core, core_dynamic, ee, saml
 from routers.crons import core_crons
 from routers.crons import core_dynamic_crons
@@ -81,6 +82,7 @@ app.queue_system = queue.Queue()
 async def startup():
     logging.info(">>>>> starting up <<<<<")
     await pg_client.init()
+    await events_queue.init()
     app.schedule.start()

     for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs + traces.cron_jobs:
@@ -96,6 +98,7 @@ async def shutdown():
     logging.info(">>>>> shutting down <<<<<")
     app.schedule.shutdown(wait=True)
     await traces.process_traces_queue()
+    await events_queue.terminate()
     await pg_client.terminate()

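Note on lifecycle ordering: the queue is created after pg_client.init() and drained before pg_client.terminate(), so the final flush still has a live connection pool. The events_queue.init() and events_queue.terminate() coroutines themselves are not part of this diff; a minimal sketch of what they plausibly do, reusing the global_queue and EventQueue names from the hunks below (the bodies here are assumptions, not the project's actual code):

    # Hypothetical sketch only; init()/terminate() are not shown in this commit.
    global_queue = None

    async def init():
        # Create the module-level queue that request handlers put() into.
        global global_queue
        global_queue = EventQueue()

    async def terminate():
        # Drain whatever is still queued while the pg connection pool is alive.
        if global_queue is not None:
            global_queue.force_flush()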
@@ -4,10 +4,10 @@ import schemas_ee
 import logging
 from chalicelib.utils import helper
 from chalicelib.utils import pg_client
+from chalicelib.utils import events_queue


 def handle_frontend_signals(project_id: int, user_id: str, data: schemas_ee.SignalsSchema):
     res = {'errors': 'query not executed'}
     insights_query = """INSERT INTO public.frontend_signals (project_id, user_id, timestamp, action, source, category, data) VALUES (%(project_id)s, %(user_id)s, %(timestamp)s, %(action)s, %(source)s, %(category)s, %(data)s)"""
     try:
         with pg_client.PostgresClient() as conn:
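Note: this hunk cuts off inside the with block, so the synchronous insert path is only partly visible. For orientation, the body presumably binds the %(...)s placeholders from insights_query and executes the statement; a hedged sketch of that continuation (everything below is assumed, not shown in the commit):

    # Assumed continuation of handle_frontend_signals; the diff truncates here.
    import json

    with pg_client.PostgresClient() as conn:
        conn.execute(conn.mogrify(insights_query, {
            'project_id': project_id, 'user_id': user_id,
            'timestamp': data.timestamp, 'action': data.action,
            'source': data.source, 'category': data.category,
            'data': json.dumps(data.data)}))
        res = {'data': 'insertion succeeded'}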
@@ -19,3 +19,11 @@ def handle_frontend_signals(project_id: int, user_id: str, data: schemas_ee.SignalsSchema):
     except Exception as e:
         logging.info(f'Error while inserting: {e}')
         return {'errors': [e]}
+
+def handle_frontend_signals_queued(project_id: int, user_id: str, data: schemas_ee.SignalsSchema):
+    try:
+        events_queue.global_queue.put((project_id, user_id, data))
+        return {'data': 'insertion succeeded'}
+    except Exception as e:
+        logging.info(f'Error while inserting: {e}')
+        return {'errors': [e]}
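Note: both handlers return {'errors': [e]} with the raw Exception instance in the list, which a JSON response encoder will typically refuse to serialize. A hedged fix is to stringify the exception before returning it:

    # Variant of the queued handler that returns a JSON-serializable error.
    def handle_frontend_signals_queued(project_id, user_id, data):
        try:
            events_queue.global_queue.put((project_id, user_id, data))
            return {'data': 'insertion succeeded'}
        except Exception as e:
            logging.info(f'Error while inserting: {e}')
            return {'errors': [str(e)]}  # str(e), not the Exception object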
@@ -1,12 +1,7 @@
 import queue
 import logging
-# import threading
-# import schemas
-# import schemas_ee
-
-#from utils import helper
-from utils import pg_client
+from chalicelib.utils import pg_client


 global_queue = None

@@ -20,15 +15,16 @@ class EventQueue():
     def flush(self, conn):
         events = list()
         while not self.events.empty():
-            events.append(self.events.get())
-            # self.events.task_done()
+            project_id, user_id, element = self.events.get()
+            events.append("({project_id}, '{user_id}', {timestamp}, '{action}', '{source}', '{category}', '{data}')".format(
+                project_id=project_id, user_id=user_id, timestamp=element.timestamp, action=element.action, source=element.source, category=element.category, data=element.data))
+        if len(events)==0:
+            return 0
         if self.test:
             print(events)
             return 1
-        _query = """INSERT INTO {database}.{table} (project_id, user_id, timestamp, action, source, category, data) VALUES %(events)s""".format(
-            database='public', table='frontend_signals')
-        _query = conn.mogrify(_query, {'events': (0, 'test', 0, 'action', 's', 'c', '{}')})
-        conn.execute(_query)
+        _base_query = 'INSERT INTO {database}.{table} (project_id, user_id, timestamp, action, source, category, data) VALUES {values_list}'
+        conn.execute(_base_query.format(database='public', table='frontend_signals', values_list=', '.join(events)))
         # logging.info(_query)
         # res = 'done'
         # res = conn.fetchone()
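Note: the rewritten flush() assembles the VALUES list with str.format(), so a single quote in user_id, action, or data breaks the statement, and nothing is escaped; this is an SQL injection surface. Since conn already exposes mogrify/execute (psycopg2 cursor semantics), a parameterized variant is possible. The sketch below uses psycopg2's execute_values helper; it is an alternative shape, not what this commit does:

    # Parameterized variant of EventQueue.flush(); assumes conn behaves like a
    # psycopg2 cursor, as the mogrify/execute calls above suggest.
    from psycopg2.extras import execute_values

    def flush(self, conn):
        # Drain the in-memory queue into plain tuples; no string interpolation.
        rows = []
        while not self.events.empty():
            project_id, user_id, element = self.events.get()
            rows.append((project_id, user_id, element.timestamp, element.action,
                         element.source, element.category, element.data))
        if len(rows) == 0:
            return 0
        if self.test:
            print(rows)
            return 1
        # execute_values expands %s into a properly escaped multi-row VALUES list.
        execute_values(conn,
                       "INSERT INTO public.frontend_signals "
                       "(project_id, user_id, timestamp, action, source, category, data) "
                       "VALUES %s",
                       rows)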
@@ -440,7 +440,7 @@ def get_all_notes(projectId: int, data: schemas.SearchNoteSchema = Body(...),
 @app.post('/{projectId}/signals', tags=['signals'])
 def send_interactions(projectId:int, data: schemas_ee.SignalsSchema = Body(...),
                       context: schemas.CurrentContext = Depends(OR_context)):
-    data = signals.handle_frontend_signals(project_id=projectId, user_id=context.user_id, data=data)
+    data = signals.handle_frontend_signals_queued(project_id=projectId, user_id=context.user_id, data=data)

     if "errors" in data:
         return data
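Note: after this change a 200 response means the event was accepted into the in-process queue, not that it reached Postgres; insert failures now surface only when the queue is flushed. An illustrative client call (host, port, project id, and auth header are assumptions):

    import requests

    resp = requests.post(
        "http://localhost:8000/123/signals",  # hypothetical host and project id
        json={"timestamp": 1672531200, "action": "click",
              "source": "button", "category": "ui", "data": {}},
        headers={"Authorization": "Bearer <token>"})
    print(resp.json())  # {'data': 'insertion succeeded'} once the event is queued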
@@ -1,6 +1,7 @@
 from chalicelib.core import telemetry, unlock
 from chalicelib.core import jobs
 from chalicelib.core import weekly_report as weekly_report_script
+from chalicelib.utils import events_queue
 from decouple import config

@@ -22,6 +23,9 @@ def unlock_cron() -> None:
     print(f"valid: {unlock.is_valid()}")


+def pg_events_queue() -> None:
+    events_queue.global_queue.force_flush()
+
 cron_jobs = [
     {"func": unlock_cron, "trigger": "cron", "hour": "*"}
 ]
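Note: pg_events_queue() relies on EventQueue.force_flush(), which this commit does not show. Given flush(conn) in the hunk above, a plausible shape is to open a connection and delegate (an assumption, not the actual implementation):

    # Hypothetical force_flush(); the real method is outside this diff.
    def force_flush(self):
        with pg_client.PostgresClient() as conn:
            return self.flush(conn)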
@@ -29,7 +33,8 @@ cron_jobs = [
 SINGLE_CRONS = [{"func": telemetry_cron, "trigger": "cron", "day_of_week": "*"},
                 {"func": run_scheduled_jobs, "trigger": "interval", "seconds": 60, "misfire_grace_time": 20},
                 {"func": weekly_report, "trigger": "cron", "day_of_week": "mon", "hour": 5,
-                 "misfire_grace_time": 60 * 60}]
+                 "misfire_grace_time": 60 * 60},
+                {"func": pg_events_queue, "trigger": "cron", "interval": 60*5, "misfire_grace_time": 20}]

 if config("LOCAL_CRONS", default=False, cast=bool):
     cron_jobs += SINGLE_CRONS
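Note: the new entry combines "trigger": "cron" with "interval": 60*5. APScheduler's cron trigger accepts fields like minute and hour but has no interval argument, so this job would be rejected (or misconfigured) when it is registered. If the intent is a flush every five minutes, a hedged correction uses the interval trigger instead:

    # Corrected entry (assumed intent: run every 5 minutes); would replace the
    # {"trigger": "cron", "interval": 60*5} element above.
    pg_events_queue_job = {"func": pg_events_queue, "trigger": "interval",
                           "minutes": 5, "misfire_grace_time": 20}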
@@ -4,6 +4,7 @@ from fastapi import FastAPI
 # from fastapi_utils.tasks import repeat_every
 from utils import events_queue
 from utils import pg_client
+from utils import schemas_ee

 app = FastAPI()
 app.schedule = AsyncIOScheduler()
@@ -17,7 +18,10 @@ def home():
 @app.put('/value/{value}')
 def number(value: int):
     logging.info(f'> {value} as input. Testing queue with pg')
-    events_queue.global_queue.put(value)
+    d = {'timestamp': 23786, 'action': 'action', 'source': 'source', 'category': 'cat', 'data': {}}
+    events = schemas_ee.SignalsSchema
+    event = events.parse_obj(d)
+    events_queue.global_queue.put((value, 0, event))


 @app.on_event("startup")
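Note on the test endpoint: events = schemas_ee.SignalsSchema binds the class itself, and parse_obj is a pydantic v1 classmethod, so the intermediate name adds nothing. The same behavior, written directly:

    # Equivalent to the three lines added in the hunk above.
    d = {'timestamp': 23786, 'action': 'action', 'source': 'source', 'category': 'cat', 'data': {}}
    event = schemas_ee.SignalsSchema.parse_obj(d)
    events_queue.global_queue.put((value, 0, event))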