Changed function placements and mogrify for pg INSERT

MauricioGarciaS 2022-12-13 18:00:24 +01:00
parent fd7a4b80f1
commit d83f731eb2
8 changed files with 55 additions and 52 deletions

View file

@@ -16,6 +16,7 @@ from chalicelib.utils import events_queue
 from routers import core, core_dynamic, ee, saml
 from routers.crons import core_crons
 from routers.crons import core_dynamic_crons
+from routers.crons import ee_crons
 from routers.subs import dashboard, insights, metrics, v1_api_ee
 from routers.subs import v1_api
@@ -85,7 +86,7 @@ async def startup():
     await events_queue.init()
     app.schedule.start()
-    for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs + traces.cron_jobs:
+    for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs + traces.cron_jobs + ee_crons.ee_cron_jobs:
         app.schedule.add_job(id=job["func"].__name__, **job)
     ap_logger.info(">Scheduled jobs:")
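Note: each entry in these cron lists is just a dict of APScheduler add_job() keyword arguments, which the startup loop above unpacks with **job. A minimal self-contained sketch of the same registration, assuming APScheduler is installed; a BackgroundScheduler stands in here for the app's own scheduler so the snippet runs on its own:

from apscheduler.schedulers.background import BackgroundScheduler

def pg_events_queue() -> None:
    print("flushing queued events")  # placeholder body for the sketch

# same shape as the entries in core_crons.cron_jobs / ee_crons.ee_cron_jobs;
# 'seconds' feeds the interval trigger, and misfire_grace_time lets a late
# run still fire up to 20 seconds after its scheduled time
job = {"func": pg_events_queue, "trigger": "interval", "seconds": 60 * 5, "misfire_grace_time": 20}

scheduler = BackgroundScheduler()
scheduler.add_job(id=job["func"].__name__, **job)  # mirrors app.schedule.add_job above
scheduler.start()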

View file

@@ -1,26 +1,9 @@
-import json
 import schemas_ee
 import logging
-from chalicelib.utils import helper
-from chalicelib.utils import pg_client
 from chalicelib.utils import events_queue
-def handle_frontend_signals(project_id: int, user_id: str, data: schemas_ee.SignalsSchema):
-    insights_query = """INSERT INTO public.frontend_signals (project_id, user_id, timestamp, action, source, category, data) VALUES (%(project_id)s, %(user_id)s, %(timestamp)s, %(action)s, %(source)s, %(category)s, %(data)s)"""
-    try:
-        with pg_client.PostgresClient() as conn:
-            query = conn.mogrify(insights_query, {'project_id': project_id, 'user_id': user_id, 'timestamp': data.timestamp, 'action': data.action, 'source': data.source,
-                                                  'category': data.category, 'data': json.dumps(data.data)})
-            conn.execute(query)
-            # res = helper.dict_to_camel_case(conn.fetchone())
-        return {'data': 'insertion succeded'}
-    except Exception as e:
-        logging.info(f'Error while inserting: {e}')
-        return {'errors': [e]}
-def handle_frontend_signals_queued(project_id: int, user_id: str, data: schemas_ee.SignalsSchema):
+def handle_frontend_signals_queued(project_id: int, user_id: int, data: schemas_ee.SignalsSchema):
     try:
         events_queue.global_queue.put((project_id, user_id, data))
         return {'data': 'insertion succeded'}
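The handler kept above no longer touches Postgres in the request path: it only enqueues the signal, and a scheduled flush performs the actual INSERT later. A minimal sketch of that pattern, with queue.Queue standing in for events_queue.global_queue (an assumption about its backing type):

import queue

global_queue = queue.Queue()  # stand-in for events_queue.global_queue

def handle_frontend_signals_queued(project_id: int, user_id: int, data) -> dict:
    try:
        # enqueue only; no database round-trip inside the request handler
        global_queue.put((project_id, user_id, data))
        return {'data': 'insertion succeded'}  # spelling kept as in the source
    except Exception as e:
        return {'errors': [str(e)]}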

View file

@@ -15,21 +15,29 @@ class EventQueue():
     def flush(self, conn):
         events = list()
+        params = dict()
         # while not self.events.empty():
         #     project_id, user_id, element = self.events.get()
         #     events.append("({project_id}, {user_id}, {timestamp}, '{action}', '{source}', '{category}', '{data}')".format(
         #         project_id=project_id, user_id=user_id, timestamp=element.timestamp, action=element.action, source=element.source, category=element.category, data=json.dumps(element.data)))
+        i = 0
         while not self.events.empty():
             project_id, user_id, element = self.events.get()
-            events.append("({project_id}, '{user_id}', {timestamp}, '{action}', '{source}', '{category}', '{data}')".format(
-                project_id=project_id, user_id=user_id, timestamp=element.timestamp, action=element.action, source=element.source, category=element.category, data=json.dumps(element.data)))
-        if len(events)==0:
+            params[f'project_id_{i}'] = project_id
+            params[f'user_id_{i}'] = user_id
+            for _key, _val in element.dict().items():
+                params[f'{_key}_{i}'] = _val
+            events.append(f"(%(project_id_{i})s, %(user_id_{i})s, %(timestamp_{i})s, %(action_{i})s, %(source_{i})s, %(category_{i})s, %(data_{i})s::jsonb)")
+            i += 1
+        if i == 0:
             return 0
         if self.test:
             print(events)
             return 1
-        _base_query = 'INSERT INTO {database}.{table} (project_id, user_id, timestamp, action, source, category, data) VALUES {values_list}'
-        conn.execute(_base_query.format(database='public', table='frontend_signals', values_list=', '.join(events)))
         # logging.info(_query)
         # res = 'done'
         # res = conn.fetchone()
         # res = helper.dict_to_camel_case(conn.fetchone())
+        conn.execute(
+            conn.mogrify(f"""INSERT INTO public.frontend_signals (project_id, user_id, timestamp, action, source, category, data)
+                             VALUES {' , '.join(events)}""", params)
+        )
         return 1
 
     def force_flush(self):
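This hunk is the heart of the commit: flush() now builds one uniquely numbered placeholder tuple per queued event, lets mogrify() render the values with proper quoting, and issues a single multi-row INSERT instead of interpolating raw strings. A self-contained sketch of the technique, assuming the connection behind pg_client is psycopg2 (whose cursors provide mogrify):

def flush_signals(conn, pending):
    # conn: an open psycopg2 connection
    # pending: list of (project_id, user_id, event) tuples, where event is a
    # dict with keys timestamp, action, source, category, data; 'data' is
    # assumed to be a JSON string (e.g. json.dumps output) so ::jsonb applies
    values, params = [], {}
    for i, (project_id, user_id, event) in enumerate(pending):
        params[f'project_id_{i}'] = project_id
        params[f'user_id_{i}'] = user_id
        for key, val in event.items():
            params[f'{key}_{i}'] = val
        values.append(f"(%(project_id_{i})s, %(user_id_{i})s, %(timestamp_{i})s, "
                      f"%(action_{i})s, %(source_{i})s, %(category_{i})s, %(data_{i})s::jsonb)")
    if not values:
        return 0
    with conn.cursor() as cur:
        # mogrify returns the statement with every value safely escaped,
        # so a single execute() writes the whole batch
        cur.execute(cur.mogrify(
            'INSERT INTO public.frontend_signals '
            '(project_id, user_id, timestamp, action, source, category, data) '
            'VALUES ' + ', '.join(values), params))
    conn.commit()
    return 1

Batching this way keeps the transaction short and closes the injection hole left by the old str.format() version, at the price of a parameter list that grows with the batch size.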
@@ -61,4 +69,9 @@ async def terminate():
         global_queue.force_flush()
         logging.info('> queue fulshed')
+# def __process_schema(trace):
+#     data = trace.dict()
+#     data["parameters"] = json.dumps(trace.parameters) if trace.parameters is not None and len(
+#         trace.parameters.keys()) > 0 else None
+#     data["payload"] = json.dumps(trace.payload) if trace.payload is not None and len(trace.payload.keys()) > 0 else None
+#     return data

View file

@@ -8,7 +8,7 @@ import schemas
 import schemas_ee
 from chalicelib.core import sessions, assist, heatmaps, sessions_favorite, sessions_assignments, errors, errors_viewed, \
     errors_favorite, sessions_notes
-from chalicelib.core import sessions_viewed, signals
+from chalicelib.core import sessions_viewed
 from chalicelib.core import tenants, users, projects, license
 from chalicelib.core import webhook
 from chalicelib.core.collaboration_slack import Slack
@@ -436,12 +436,3 @@ def get_all_notes(projectId: int, data: schemas.SearchNoteSchema = Body(...),
         return data
     return {'data': data}
-
-
-@app.post('/{projectId}/signals', tags=['signals'])
-def send_interactions(projectId:int, data: schemas_ee.SignalsSchema = Body(...),
-                      context: schemas.CurrentContext = Depends(OR_context)):
-    data = signals.handle_frontend_signals_queued(project_id=projectId, user_id=context.user_id, data=data)
-    if "errors" in data:
-        return data
-    return {'data': data}

View file

@@ -1,7 +1,6 @@
 from chalicelib.core import telemetry, unlock
 from chalicelib.core import jobs
 from chalicelib.core import weekly_report as weekly_report_script
-from chalicelib.utils import events_queue
 from decouple import config
@@ -23,12 +22,8 @@ def unlock_cron() -> None:
     print(f"valid: {unlock.is_valid()}")
 
-def pg_events_queue() -> None:
-    events_queue.global_queue.force_flush()
 
 cron_jobs = [
     {"func": unlock_cron, "trigger": "cron", "hour": "*"},
-    {"func": pg_events_queue, "trigger": "interval", "seconds": 60*5, "misfire_grace_time": 20}
 ]
 SINGLE_CRONS = [{"func": telemetry_cron, "trigger": "cron", "day_of_week": "*"},

View file

@@ -0,0 +1,10 @@
+from chalicelib.utils import events_queue
+
+
+def pg_events_queue() -> None:
+    events_queue.global_queue.force_flush()
+
+
+ee_cron_jobs = [
+    {"func": pg_events_queue, "trigger": "interval", "seconds": 60*5, "misfire_grace_time": 20},
+]
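Design note: isolating this job in its own routers.crons.ee_crons module (imported in the app entrypoint above) keeps the queue flush EE-only, which is why the same function and its events_queue import could be dropped from core_dynamic_crons in the previous file.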

View file

@@ -1,5 +1,5 @@
 from chalicelib.core import roles, traces
-from chalicelib.core import unlock
+from chalicelib.core import unlock, signals
 from chalicelib.utils import assist_helper
 
 unlock.check()
@@ -71,3 +71,13 @@ def get_trails(data: schemas_ee.TrailSearchPayloadSchema = Body(...),
 @app.post('/trails/actions', tags=["traces", "trails"])
 def get_available_trail_actions(context: schemas.CurrentContext = Depends(OR_context)):
     return {'data': traces.get_available_actions(tenant_id=context.tenant_id)}
+
+
+@app.post('/{projectId}/signals', tags=['signals'])
+def send_interactions(projectId: int, data: schemas_ee.SignalsSchema = Body(...),
+                      context: schemas.CurrentContext = Depends(OR_context)):
+    data = signals.handle_frontend_signals_queued(project_id=projectId, user_id=context.user_id, data=data)
+    if "errors" in data:
+        return data
+    return {'data': data}
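With the endpoint relocated to this EE router, clients call it exactly as before. A hypothetical usage sketch; host, project id, token, and field values are placeholders, with the payload keys mirroring the SignalsSchema columns inserted above:

import requests

payload = {
    'timestamp': 1670950824000,  # bigint in the schema; epoch milliseconds assumed
    'action': 'click',
    'source': 'button',
    'category': 'ui',
    'data': {'label': 'save'},   # free-form payload, stored in the jsonb column
}
resp = requests.post(
    'https://openreplay.example.com/1/signals',  # '1' stands in for {projectId}
    json=payload,
    headers={'Authorization': 'Bearer <token>'},  # auth scheme is an assumption
)
print(resp.json())  # {'data': {...}} on success, {'errors': [...]} otherwise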

View file

@@ -7,13 +7,13 @@ $$ LANGUAGE sql IMMUTABLE;
 CREATE TABLE IF NOT EXISTS frontend_signals
 (
-    project_id bigint NOT NULL,
-    user_id    text   NOT NULL,
-    timestamp  bigint NOT NULL,
-    action     text   NOT NULL,
-    source     text   NOT NULL,
-    category   text   NOT NULL,
-    data       json
+    project_id bigint  NOT NULL,
+    user_id    integer NOT NULL references users (user_id) ON DELETE CASCADE,
+    timestamp  bigint  NOT NULL,
+    action     text    NOT NULL,
+    source     text    NOT NULL,
+    category   text    NOT NULL,
+    data       jsonb
 );
 
 CREATE INDEX IF NOT EXISTS frontend_signals_user_id_idx ON frontend_signals (user_id);
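Two schema changes ride along here: user_id becomes an integer foreign key, so a user's signals are removed with the user via ON DELETE CASCADE, and data moves from json to jsonb, which stores a parsed binary representation and enables indexing and containment operators. A hypothetical query sketch, assuming cur is an open psycopg2 cursor:

from psycopg2.extras import Json

# '@>' (containment) works on jsonb but not on plain json, which is one
# practical payoff of the column type change
cur.execute(
    'SELECT action, source, data FROM frontend_signals '
    'WHERE user_id = %s AND data @> %s',
    (42, Json({'label': 'save'})),
)
for action, source, data in cur.fetchall():
    print(action, source, data)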