Updated evets_queue.py and signals.py
This commit is contained in:
parent
0dde0d04cd
commit
f4b1ad1186
2 changed files with 28 additions and 15 deletions
|
|
@ -1,6 +1,7 @@
|
|||
import json
|
||||
|
||||
import schemas_ee
|
||||
import logging
|
||||
from chalicelib.utils import helper
|
||||
from chalicelib.utils import pg_client
|
||||
|
||||
|
|
@ -8,9 +9,13 @@ from chalicelib.utils import pg_client
|
|||
def handle_frontend_signals(project_id: int, user_id: str, data: schemas_ee.SignalsSchema):
|
||||
res = {'errors': 'query not executed'}
|
||||
insights_query = """INSERT INTO public.frontend_signals VALUES (%(project_id)s, %(user_id)s, %(timestamp)s, %(action)s, %(source)s, %(category)s, %(data)s)"""
|
||||
with pg_client.PostgresClient() as conn:
|
||||
query = conn.mogrify(insights_query, {'project_id': project_id, 'user_id': user_id, 'timestamp': data.timestamp, 'action': data.action, 'source': data.source,
|
||||
try:
|
||||
with pg_client.PostgresClient() as conn:
|
||||
query = conn.mogrify(insights_query, {'project_id': project_id, 'user_id': user_id, 'timestamp': data.timestamp, 'action': data.action, 'source': data.source,
|
||||
'category': data.category, 'data': json.dumps(data.data)})
|
||||
conn.execute(query)
|
||||
res = helper.dict_to_camel_case(conn.fetchone())
|
||||
return res
|
||||
conn.execute(query)
|
||||
# res = helper.dict_to_camel_case(conn.fetchone())
|
||||
return 1
|
||||
except Exception as e:
|
||||
logging.info(f'Error while inserting\n{e}')
|
||||
return 0
|
||||
|
|
|
|||
|
|
@ -25,23 +25,31 @@ class EventQueue():
|
|||
if self.test:
|
||||
print(events)
|
||||
return 1
|
||||
_query = conn.mogrify("""INSERT INTO %(database)s.%(table)s (project_id, user_id, timestamp, action, source, category, data) VALUES %(events)s""",
|
||||
{'database': 'public', 'table': 'frontend_signals', 'events': "(0, 'test', 0, 'action', 's', 'c', '{}')"})
|
||||
logging.info(_query)
|
||||
res = 'done'
|
||||
_query = """INSERT INTO {database}.{table} (project_id, user_id, timestamp, action, source, category, data) VALUES %(events)s""".format(
|
||||
database='public', table='frontend_signals')
|
||||
_query = conn.mogrify(_query, {'events': (0, 'test', 0, 'action', 's', 'c', '{}')})
|
||||
conn.execute(_query)
|
||||
# logging.info(_query)
|
||||
# res = 'done'
|
||||
# res = conn.fetchone()
|
||||
#res = helper.dict_to_camel_case(conn.fetchone())
|
||||
return res
|
||||
# res = helper.dict_to_camel_case(conn.fetchone())
|
||||
return 1
|
||||
|
||||
def force_flush(self):
|
||||
if not self.events.empty():
|
||||
with pg_client.PostgresClient() as conn:
|
||||
self.flush(conn)
|
||||
try:
|
||||
with pg_client.PostgresClient() as conn:
|
||||
self.flush(conn)
|
||||
except Exception as e:
|
||||
logging.info(f'Error: {e}')
|
||||
|
||||
def put(self, element):
|
||||
if self.events.full():
|
||||
with pg_client.PostgresClient() as conn:
|
||||
self.flush(conn)
|
||||
try:
|
||||
with pg_client.PostgresClient() as conn:
|
||||
self.flush(conn)
|
||||
except Exception as e:
|
||||
logging.info(f'Error: {e}')
|
||||
self.events.put(element)
|
||||
self.events.task_done()
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue