* fix(chalice): fixed Math-operators validation
* refactor(chalice): search for sessions that have events for heatmaps
* refactor(chalice): search for sessions that have at least 1 location event for heatmaps
* feat(chalice): autocomplete return top 10 with stats
* fix(chalice): fixed autocomplete top 10 meta-filters
* fix(chalice): fixed usability test logger
205 lines · 7.1 KiB · Python
import logging

from chalicelib.utils import pg_client

logger = logging.getLogger(__name__)


class DatabaseRequestHandler:
    """Small helper that builds and executes parameterized SQL for one table."""

    def __init__(self, table_name):
        self.table_name = table_name
        self.constraints = []  # WHERE conditions, joined with AND
        self.params = {}  # named parameters passed to mogrify
        self.order_clause = ""  # sort direction appended after the ORDER BY column(s)
        self.sort_clause = ""  # column(s) used in ORDER BY
        self.select_columns = []
        self.sub_queries = []  # (subquery, alias) pairs appended to FROM
        self.joins = []
        self.group_by_clause = ""
        self.client = pg_client
        self.logger = logging.getLogger(__name__)
        self.pagination = {}

    def add_constraint(self, constraint, param=None):
        self.constraints.append(constraint)
        if param:
            self.params.update(param)

    def add_subquery(self, subquery, alias, param=None):
        self.sub_queries.append((subquery, alias))
        if param:
            self.params.update(param)

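    # Example (illustrative):
    #   handler.add_constraint("project_id = %(project_id)s", {"project_id": 42})
    # registers the SQL fragment and its named parameter in one call.
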
    def add_join(self, join_clause):
        self.joins.append(join_clause)

    def add_param(self, key, value):
        self.params[key] = value

    def set_order_by(self, order_by):
        self.order_clause = order_by

    def set_sort_by(self, sort_by):
        self.sort_clause = sort_by

    def set_select_columns(self, columns):
        self.select_columns = columns

    def set_group_by(self, group_by_clause):
        self.group_by_clause = group_by_clause

    def set_pagination(self, page, page_size):
        """
        Set pagination parameters for the query.

        :param page: The page number (1-indexed)
        :param page_size: Number of items per page
        """
        self.pagination = {
            'offset': (page - 1) * page_size,
            'limit': page_size
        }

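    # Example (illustrative): set_pagination(3, 20) stores
    # {'offset': 40, 'limit': 20}, which build_query() renders as
    # "LIMIT %(limit)s OFFSET %(offset)s" and merges into params.
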
    def build_query(self, action="select", additional_clauses=None, data=None):
        if action == "select":
            query = f"SELECT {', '.join(self.select_columns)} FROM {self.table_name}"
        elif action == "insert":
            columns = ', '.join(data.keys())
            placeholders = ', '.join(f'%({k})s' for k in data.keys())
            query = f"INSERT INTO {self.table_name} ({columns}) VALUES ({placeholders})"
        elif action == "update":
            set_clause = ', '.join(f"{k} = %({k})s" for k in data.keys())
            query = f"UPDATE {self.table_name} SET {set_clause}"
        elif action == "delete":
            query = f"DELETE FROM {self.table_name}"
        else:
            raise ValueError(f"Unsupported action: {action}")

        for join in self.joins:
            query += f" {join}"
        for subquery, alias in self.sub_queries:
            query += f", ({subquery}) AS {alias}"
        if self.constraints:
            query += " WHERE " + " AND ".join(self.constraints)
        if action == "select":
            if self.group_by_clause:
                query += " GROUP BY " + self.group_by_clause
            if self.sort_clause:
                query += " ORDER BY " + self.sort_clause
                # The direction suffix is only valid after an ORDER BY clause
                if self.order_clause:
                    query += " " + self.order_clause
            if self.pagination:
                query += " LIMIT %(limit)s OFFSET %(offset)s"
                self.params.update(self.pagination)

        if additional_clauses:
            query += " " + additional_clauses

        logger.debug(f"Query: {query}")
        return query

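    # Example (illustrative): with select_columns=['user_id', 'count(*) AS n'],
    # a constraint "project_id = %(project_id)s", group_by='user_id',
    # sort='n', order='DESC' and pagination set, build_query() yields:
    #   SELECT user_id, count(*) AS n FROM <table> WHERE project_id = %(project_id)s
    #   GROUP BY user_id ORDER BY n DESC LIMIT %(limit)s OFFSET %(offset)s
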
    def execute_query(self, query, data=None):
        try:
            with self.client.PostgresClient() as cur:
                # self.params wins over data on key collisions
                mogrified_query = cur.mogrify(query, {**data, **self.params} if data else self.params)
                cur.execute(mogrified_query)
                # Only fetch when the statement produced a result set
                return cur.fetchall() if cur.description else None
        except Exception as e:
            self.logger.error(f"Database operation failed: {e}")
            raise

    def fetchall(self):
        query = self.build_query()
        return self.execute_query(query)

    def fetchone(self):
        query = self.build_query()
        result = self.execute_query(query)
        return result[0] if result else None

    def insert(self, data):
        query = self.build_query(action="insert", data=data)
        query += " RETURNING *;"

        result = self.execute_query(query, data)
        return result[0] if result else None

    def update(self, data):
        query = self.build_query(action="update", data=data)
        query += " RETURNING *;"

        result = self.execute_query(query, data)
        return result[0] if result else None

    def delete(self):
        # Note: with no registered constraints this deletes every row in the table
        query = self.build_query(action="delete")
        return self.execute_query(query)

    def batch_insert(self, items):
        if not items:
            return None

        columns = ', '.join(items[0].keys())

        # Building a values string with unique parameter names for each item
        all_values_query = ', '.join(
            '(' + ', '.join([f"%({key}_{i})s" for key in item]) + ')'
            for i, item in enumerate(items)
        )

        query = f"INSERT INTO {self.table_name} ({columns}) VALUES {all_values_query} RETURNING *;"

        try:
            with self.client.PostgresClient() as cur:
                # Flatten items into a single dictionary with unique keys
                combined_params = {f"{k}_{i}": v for i, item in enumerate(items) for k, v in item.items()}
                mogrified_query = cur.mogrify(query, combined_params)
                cur.execute(mogrified_query)
                return cur.fetchall()
        except Exception as e:
            self.logger.error(f"Database batch insert operation failed: {e}")
            raise

    def raw_query(self, query, params=None):
        try:
            with self.client.PostgresClient() as cur:
                mogrified_query = cur.mogrify(query, params)
                cur.execute(mogrified_query)
                return cur.fetchall() if cur.description else None
        except Exception as e:
            self.logger.error(f"Database operation failed: {e}")
            raise

    def batch_update(self, items):
        if not items:
            return None

        # The first key of each item is treated as the identifier column
        id_column = list(items[0])[0]

        # Building the set clause for the update statement
        update_columns = list(items[0].keys())
        update_columns.remove(id_column)
        set_clause = ', '.join([f"{col} = v.{col}" for col in update_columns])

        # Building the values part for the 'VALUES' section; parameter names are
        # suffixed with the row index so rows don't overwrite each other's values
        values_rows = []
        for i, item in enumerate(items):
            values = ', '.join([f"%({key}_{i})s" for key in item.keys()])
            values_rows.append(f"({values})")
        values_query = ', '.join(values_rows)

        # Constructing the full update query
        query = f"""
            UPDATE {self.table_name} AS t
            SET {set_clause}
            FROM (VALUES {values_query}) AS v ({', '.join(items[0].keys())})
            WHERE t.{id_column} = v.{id_column};
        """

        try:
            with self.client.PostgresClient() as cur:
                # Flatten items into one dictionary using the same indexed keys
                combined_params = {f"{k}_{i}": v for i, item in enumerate(items) for k, v in item.items()}
                mogrified_query = cur.mogrify(query, combined_params)
                cur.execute(mogrified_query)
        except Exception as e:
            self.logger.error(f"Database batch update operation failed: {e}")
            raise
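

# Usage sketch (illustrative only; table and column names are hypothetical,
# and it assumes pg_client is configured):
#
#   handler = DatabaseRequestHandler("projects")
#   handler.set_select_columns(["project_id", "name"])
#   handler.add_constraint("deleted_at IS NULL")
#   handler.set_sort_by("name")
#   handler.set_order_by("ASC")
#   handler.set_pagination(page=1, page_size=25)
#   rows = handler.fetchall()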