feat(api): sessions-search support null results
feat(api): pg_client support timeout feat(api): pg_client support pool config feat(api): pg_client support long-queries
This commit is contained in:
parent
1ea681269b
commit
54677af4fd
4 changed files with 24 additions and 12 deletions
|
|
@ -35,6 +35,8 @@ pg_host=postgresql.db.svc.cluster.local
|
|||
pg_password=asayerPostgres
|
||||
pg_port=5432
|
||||
pg_user=postgres
|
||||
pg_timeout=30
|
||||
pg_minconn=50
|
||||
put_S3_TTL=20
|
||||
sentryURL=
|
||||
sessions_bucket=mobs
|
||||
|
|
|
|||
|
|
@ -562,7 +562,7 @@ def search2_pg(data: schemas.SessionsSearchPayloadSchema, project_id, user_id, f
|
|||
{query_part};""",
|
||||
full_args)
|
||||
else:
|
||||
main_query = cur.mogrify(f"""SELECT COUNT(full_sessions) AS count, JSONB_AGG(full_sessions) FILTER (WHERE rn <= 200) AS sessions
|
||||
main_query = cur.mogrify(f"""SELECT COUNT(full_sessions) AS count, COALESCE(JSONB_AGG(full_sessions) FILTER (WHERE rn <= 200), '[]'::JSONB) AS sessions
|
||||
FROM (SELECT *, ROW_NUMBER() OVER (ORDER BY favorite DESC, issue_score DESC, session_id desc, start_ts desc) AS rn FROM
|
||||
(SELECT DISTINCT ON(s.session_id) {SESSION_PROJECTION_COLS}
|
||||
{query_part}
|
||||
|
|
@ -825,7 +825,7 @@ def get_session_ids_by_user_ids(project_id, user_ids):
|
|||
|
||||
|
||||
def delete_sessions_by_session_ids(session_ids):
|
||||
with pg_client.PostgresClient() as cur:
|
||||
with pg_client.PostgresClient(long_query=True) as cur:
|
||||
query = cur.mogrify(
|
||||
"""\
|
||||
DELETE FROM public.sessions
|
||||
|
|
@ -839,7 +839,7 @@ def delete_sessions_by_session_ids(session_ids):
|
|||
|
||||
|
||||
def delete_sessions_by_user_ids(project_id, user_ids):
|
||||
with pg_client.PostgresClient() as cur:
|
||||
with pg_client.PostgresClient(long_query=True) as cur:
|
||||
query = cur.mogrify(
|
||||
"""\
|
||||
DELETE FROM public.sessions
|
||||
|
|
@ -853,6 +853,6 @@ def delete_sessions_by_user_ids(project_id, user_ids):
|
|||
|
||||
|
||||
def count_all():
|
||||
with pg_client.PostgresClient() as cur:
|
||||
with pg_client.PostgresClient(long_query=True) as cur:
|
||||
row = cur.execute(query="SELECT COUNT(session_id) AS count FROM public.sessions")
|
||||
return row.get("count", 0)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
from decouple import config
|
||||
|
||||
from chalicelib.utils import pg_client, helper
|
||||
from chalicelib.utils.TimeUTC import TimeUTC
|
||||
from decouple import config
|
||||
from chalicelib.utils.helper import get_issue_title
|
||||
|
||||
LOWEST_BAR_VALUE = 3
|
||||
|
|
@ -30,7 +31,7 @@ def edit_config(user_id, weekly_report):
|
|||
|
||||
|
||||
def cron():
|
||||
with pg_client.PostgresClient() as cur:
|
||||
with pg_client.PostgresClient(long_query=True) as cur:
|
||||
params = {"3_days_ago": TimeUTC.midnight(delta_days=-3),
|
||||
"1_week_ago": TimeUTC.midnight(delta_days=-7),
|
||||
"2_week_ago": TimeUTC.midnight(delta_days=-14),
|
||||
|
|
|
|||
|
|
@ -1,15 +1,17 @@
|
|||
from threading import Semaphore
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.extras
|
||||
from decouple import config
|
||||
from psycopg2 import pool
|
||||
|
||||
PG_CONFIG = {"host": config("pg_host"),
|
||||
"database": config("pg_dbname"),
|
||||
"user": config("pg_user"),
|
||||
"password": config("pg_password"),
|
||||
"port": config("pg_port", cast=int)}
|
||||
|
||||
from psycopg2 import pool
|
||||
from threading import Semaphore
|
||||
if config("pg_timeout", cast=int, default=0) > 0:
|
||||
PG_CONFIG["options"] = f"-c statement_timeout={config('pg_timeout', cast=int) * 1000}"
|
||||
|
||||
|
||||
class ORThreadedConnectionPool(psycopg2.pool.ThreadedConnectionPool):
|
||||
|
|
@ -27,7 +29,7 @@ class ORThreadedConnectionPool(psycopg2.pool.ThreadedConnectionPool):
|
|||
|
||||
|
||||
try:
|
||||
postgreSQL_pool = ORThreadedConnectionPool(50, 100, **PG_CONFIG)
|
||||
postgreSQL_pool = ORThreadedConnectionPool(config("pg_minconn", cast=int, default=20), 100, **PG_CONFIG)
|
||||
if (postgreSQL_pool):
|
||||
print("Connection pool created successfully")
|
||||
except (Exception, psycopg2.DatabaseError) as error:
|
||||
|
|
@ -38,9 +40,14 @@ except (Exception, psycopg2.DatabaseError) as error:
|
|||
class PostgresClient:
|
||||
connection = None
|
||||
cursor = None
|
||||
long_query = False
|
||||
|
||||
def __init__(self):
|
||||
self.connection = postgreSQL_pool.getconn()
|
||||
def __init__(self, long_query=False):
|
||||
self.long_query = long_query
|
||||
if long_query:
|
||||
self.connection = psycopg2.connect(**PG_CONFIG)
|
||||
else:
|
||||
self.connection = postgreSQL_pool.getconn()
|
||||
|
||||
def __enter__(self):
|
||||
if self.cursor is None:
|
||||
|
|
@ -51,6 +58,8 @@ class PostgresClient:
|
|||
try:
|
||||
self.connection.commit()
|
||||
self.cursor.close()
|
||||
if self.long_query:
|
||||
self.connection.close()
|
||||
except Exception as error:
|
||||
print("Error while committing/closing PG-connection", error)
|
||||
raise error
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue