feat(api): project conditions foss to ee
This commit is contained in:
parent
650df7090d
commit
2fbd250e2a
2 changed files with 366 additions and 0 deletions
202
ee/api/chalicelib/core/db_request_handler.py
Normal file
202
ee/api/chalicelib/core/db_request_handler.py
Normal file
|
|
@ -0,0 +1,202 @@
|
|||
import logging
|
||||
from chalicelib.utils import helper, pg_client
|
||||
|
||||
|
||||
class DatabaseRequestHandler:
    """Small SQL builder/executor bound to a single table.

    Accumulates SELECT columns, WHERE constraints, JOINs, sub-queries,
    grouping, ordering and pagination, then builds parameterized queries
    executed through ``pg_client.PostgresClient``. Parameter VALUES are
    always bound via ``cur.mogrify`` named placeholders; only trusted
    identifiers/fragments (table, columns, joins) are interpolated into
    the SQL text.
    """

    def __init__(self, table_name):
        self.table_name = table_name
        self.constraints = []       # WHERE fragments, joined with AND
        self.params = {}            # named parameters for mogrify
        self.order_clause = ""      # raw clause appended after ORDER BY expr (e.g. "DESC")
        self.sort_clause = ""       # ORDER BY expression
        self.select_columns = []
        self.sub_queries = []       # (subquery_sql, alias) pairs
        self.joins = []             # raw JOIN fragments
        self.group_by_clause = ""
        self.client = pg_client
        self.logger = logging.getLogger(__name__)
        self.pagination = {}        # {'offset': ..., 'limit': ...} once set_pagination is called

    def add_constraint(self, constraint, param=None):
        """Append a WHERE fragment and (optionally) its named parameters."""
        self.constraints.append(constraint)
        if param:
            self.params.update(param)

    def add_subquery(self, subquery, alias, param=None):
        """Register a sub-query exposed under *alias* in the FROM list."""
        self.sub_queries.append((subquery, alias))
        if param:
            self.params.update(param)

    def add_join(self, join_clause):
        """Append a raw JOIN fragment (e.g. ``"LEFT JOIN t ON ..."``)."""
        self.joins.append(join_clause)

    def add_param(self, key, value):
        """Set a single named query parameter."""
        self.params[key] = value

    def set_order_by(self, order_by):
        """Set the raw clause appended after the ORDER BY expression (e.g. ``"DESC"``)."""
        self.order_clause = order_by

    def set_sort_by(self, sort_by):
        """Set the ORDER BY expression."""
        self.sort_clause = sort_by

    def set_select_columns(self, columns):
        """Set the list of columns used by SELECT queries."""
        self.select_columns = columns

    def set_group_by(self, group_by_clause):
        """Set the GROUP BY expression."""
        self.group_by_clause = group_by_clause

    def set_pagination(self, page, page_size):
        """
        Set pagination parameters for the query.

        :param page: The page number (1-indexed)
        :param page_size: Number of items per page
        """
        self.pagination = {
            'offset': (page - 1) * page_size,
            'limit': page_size
        }

    def build_query(self, action="select", additional_clauses=None, data=None):
        """Build the SQL text for *action*.

        :param action: one of "select", "insert", "update", "delete"
        :param additional_clauses: raw SQL appended at the very end
        :param data: column/value mapping for insert/update placeholders
        :raises ValueError: for an unknown action (previously this fell
            through to an UnboundLocalError on ``query``)
        """
        if action == "select":
            query = f"SELECT {', '.join(self.select_columns)} FROM {self.table_name}"
        elif action == "insert":
            columns = ', '.join(data.keys())
            placeholders = ', '.join(f'%({k})s' for k in data.keys())
            query = f"INSERT INTO {self.table_name} ({columns}) VALUES ({placeholders})"
        elif action == "update":
            set_clause = ', '.join(f"{k} = %({k})s" for k in data.keys())
            query = f"UPDATE {self.table_name} SET {set_clause}"
        elif action == "delete":
            query = f"DELETE FROM {self.table_name}"
        else:
            raise ValueError(f"Unsupported query action: {action}")

        for join in self.joins:
            query += f" {join}"
        for subquery, alias in self.sub_queries:
            query += f", ({subquery}) AS {alias}"
        if self.constraints:
            query += " WHERE " + " AND ".join(self.constraints)
        if action == "select":
            if self.group_by_clause:
                query += " GROUP BY " + self.group_by_clause
            if self.sort_clause:
                query += " ORDER BY " + self.sort_clause
            if self.order_clause:
                query += " " + self.order_clause
            # pagination is always initialized in __init__, so no hasattr check needed
            if self.pagination:
                query += " LIMIT %(limit)s OFFSET %(offset)s"
                self.params.update(self.pagination)

        if additional_clauses:
            query += " " + additional_clauses

        # use the instance logger with lazy %-args instead of the root logger
        self.logger.info("Query: %s", query)
        return query

    def execute_query(self, query, data=None):
        """Bind parameters, execute *query*, and return fetched rows.

        *data* (if given) is merged with ``self.params``; ``self.params``
        wins on key collisions. Returns None for statements that produce
        no result set.
        """
        try:
            with self.client.PostgresClient() as cur:
                mogrified_query = cur.mogrify(query, {**data, **self.params} if data else self.params)
                cur.execute(mogrified_query)
                return cur.fetchall() if cur.description else None
        except Exception as e:
            self.logger.error(f"Database operation failed: {e}")
            raise

    def fetchall(self):
        """Build and run the SELECT query; return all rows."""
        query = self.build_query()
        return self.execute_query(query)

    def fetchone(self):
        """Build and run the SELECT query; return the first row or None."""
        query = self.build_query()
        result = self.execute_query(query)
        return result[0] if result else None

    def insert(self, data):
        """Insert one row from *data*; return the created row or None."""
        query = self.build_query(action="insert", data=data)
        query += " RETURNING *;"

        result = self.execute_query(query, data)
        return result[0] if result else None

    def update(self, data):
        """Update rows matching the constraints with *data*; return the
        first updated row or None.

        NOTE: with no constraints registered this updates every row.
        """
        query = self.build_query(action="update", data=data)
        query += " RETURNING *;"

        result = self.execute_query(query, data)
        return result[0] if result else None

    def delete(self):
        """Delete rows matching the constraints.

        NOTE: with no constraints registered this deletes every row.
        """
        query = self.build_query(action="delete")
        return self.execute_query(query)

    def batch_insert(self, items):
        """Insert many rows in a single statement; return the created rows.

        All items must share the same key set/order as ``items[0]``.
        Returns None when *items* is empty.
        """
        if not items:
            return None

        columns = ', '.join(items[0].keys())

        # Building a values string with unique parameter names for each item
        all_values_query = ', '.join(
            '(' + ', '.join([f"%({key}_{i})s" for key in item]) + ')'
            for i, item in enumerate(items)
        )

        query = f"INSERT INTO {self.table_name} ({columns}) VALUES {all_values_query} RETURNING *;"

        try:
            with self.client.PostgresClient() as cur:
                # Flatten items into a single dictionary with unique keys
                combined_params = {f"{k}_{i}": v for i, item in enumerate(items) for k, v in item.items()}
                mogrified_query = cur.mogrify(query, combined_params)
                cur.execute(mogrified_query)
                return cur.fetchall()
        except Exception as e:
            self.logger.error(f"Database batch insert operation failed: {e}")
            raise

    def raw_query(self, query, params=None):
        """Run an arbitrary parameterized query; return rows or None."""
        try:
            with self.client.PostgresClient() as cur:
                mogrified_query = cur.mogrify(query, params)
                cur.execute(mogrified_query)
                return cur.fetchall() if cur.description else None
        except Exception as e:
            self.logger.error(f"Database operation failed: {e}")
            raise

    def batch_update(self, items):
        """Update many rows at once via ``UPDATE ... FROM (VALUES ...)``.

        The FIRST key of each item is treated as the id column used to
        match rows; the remaining keys are the columns to update. All
        items must share the same key set/order. Returns None when
        *items* is empty.
        """
        if not items:
            return None

        id_column = list(items[0])[0]

        # Building the set clause for the update statement
        update_columns = list(items[0].keys())
        update_columns.remove(id_column)
        set_clause = ', '.join([f"{col} = v.{col}" for col in update_columns])

        # BUGFIX: placeholders must be unique per row. The previous
        # version reused plain %(key)s for every row and flattened all
        # items into one dict, so every row silently received the LAST
        # item's values.
        values_rows = []
        for i, item in enumerate(items):
            values = ', '.join([f"%({key}_{i})s" for key in item.keys()])
            values_rows.append(f"({values})")
        values_query = ', '.join(values_rows)

        # Constructing the full update query
        query = f"""
            UPDATE {self.table_name} AS t
            SET {set_clause}
            FROM (VALUES {values_query}) AS v ({', '.join(items[0].keys())})
            WHERE t.{id_column} = v.{id_column};
        """

        try:
            with self.client.PostgresClient() as cur:
                # Flatten items into a single dictionary with per-row unique keys
                combined_params = {f"{k}_{i}": v for i, item in enumerate(items) for k, v in item.items()}
                mogrified_query = cur.mogrify(query, combined_params)
                cur.execute(mogrified_query)
        except Exception as e:
            self.logger.error(f"Database batch update operation failed: {e}")
            raise
|
||||
|
|
@ -290,6 +290,170 @@ def update_capture_status(project_id, changes: schemas.SampleRateSchema):
|
|||
return changes
|
||||
|
||||
|
||||
def get_conditions(project_id):
    """Fetch a project's capture settings plus its capture conditions.

    Returns a camelCased dict with the project's sample rate, the
    conditional-capture flag and a ``conditions`` list of
    ``schemas.ProjectConditions`` (ordered by condition_id), or None when
    the project does not exist or is soft-deleted.
    """
    with pg_client.PostgresClient() as cur:
        query = cur.mogrify("""SELECT p.sample_rate AS rate, p.conditional_capture,
                                      COALESCE(
                                          array_agg(
                                              json_build_object(
                                                  'condition_id', pc.condition_id,
                                                  'capture_rate', pc.capture_rate,
                                                  'name', pc.name,
                                                  'filters', pc.filters
                                              )
                                          ) FILTER (WHERE pc.condition_id IS NOT NULL),
                                          ARRAY[]::json[]
                                      ) AS conditions
                               FROM public.projects AS p
                               LEFT JOIN (
                                   SELECT * FROM public.projects_conditions
                                   WHERE project_id = %(project_id)s ORDER BY condition_id
                               ) AS pc ON p.project_id = pc.project_id
                               WHERE p.project_id = %(project_id)s
                                 AND p.deleted_at IS NULL
                               GROUP BY p.sample_rate, p.conditional_capture;""",
                            {"project_id": project_id})
        cur.execute(query=query)
        row = cur.fetchone()
        # BUGFIX: fetchone() yields None for an unknown/deleted project;
        # the previous code subscripted it and raised a TypeError.
        if row is None:
            return None
        row = helper.dict_to_camel_case(row)
        row["conditions"] = [schemas.ProjectConditions(**c) for c in row["conditions"]]

        return row
|
||||
|
||||
|
||||
def validate_conditions(conditions: List[schemas.ProjectConditions]) -> List[str]:
    """Check condition names for blanks and duplicates.

    Returns a list of human-readable error messages; an empty list means
    the conditions are valid.
    """
    errors = []
    names = [condition.name for condition in conditions]

    # A name consisting only of whitespace counts as empty.
    if any(not name.strip() for name in names):
        errors.append("Condition names cannot be empty strings")

    # Tally occurrences (insertion order preserved) and report repeats.
    tally = {}
    for name in names:
        tally[name] = tally.get(name, 0) + 1
    duplicates = [name for name, seen in tally.items() if seen > 1]
    if duplicates:
        errors.append(f"Duplicate condition names found: {duplicates}")

    return errors
|
||||
|
||||
|
||||
def update_conditions(project_id, changes: schemas.ProjectSettings):
    """Update a project's sample rate / conditional-capture flag and
    reconcile its capture conditions.

    :raises HTTPException: 400 when condition names are empty or duplicated.
    :return: the project's conditions as stored after the update.
    """
    validation_errors = validate_conditions(changes.conditions)
    if validation_errors:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=validation_errors)

    # NOTE: a list of condition.model_dump() results was previously built
    # here but never used anywhere; that dead code has been removed.

    with pg_client.PostgresClient() as cur:
        query = cur.mogrify("""UPDATE public.projects
                               SET
                                   sample_rate= %(sample_rate)s,
                                   conditional_capture = %(conditional_capture)s
                               WHERE project_id =%(project_id)s
                                 AND deleted_at IS NULL;""",
                            {
                                "project_id": project_id,
                                "sample_rate": changes.rate,
                                "conditional_capture": changes.conditional_capture
                            })
        cur.execute(query=query)

    return update_project_conditions(project_id, changes.conditions)
|
||||
|
||||
|
||||
def create_project_conditions(project_id, conditions):
    """Insert all given conditions for a project with a single INSERT.

    Returns the created rows (including generated condition_id), or an
    empty list when there is nothing to insert.
    """
    # Nothing to insert — avoid touching the database at all.
    if len(conditions) == 0:
        return []

    columns = (
        "project_id",
        "name",
        "capture_rate",
        "filters",
    )

    # One "%s" per row; psycopg expands each tuple param into a VALUES row.
    row_placeholders = ", ".join(["%s"] * len(conditions))
    sql = f"""
                INSERT INTO projects_conditions
                (project_id, name, capture_rate, filters)
                VALUES {row_placeholders}
                RETURNING condition_id, {", ".join(columns)}
            """

    with pg_client.PostgresClient() as cur:
        params = [
            (project_id,
             cond.name,
             cond.capture_rate,
             json.dumps([filter_.model_dump() for filter_ in cond.filters]))
            for cond in conditions
        ]
        cur.execute(cur.mogrify(sql, params))
        return cur.fetchall()
|
||||
|
||||
|
||||
def update_project_condition(project_id, conditions):
    """Bulk-update existing projects_conditions rows in one statement.

    Each condition must carry a condition_id that already exists for this
    project; name, capture_rate and filters are overwritten from it.
    """
    values = []
    params = {
        "project_id": project_id,
    }
    for i, condition in enumerate(conditions):
        values.append(f"(%(condition_id_{i})s, %(name_{i})s, %(capture_rate_{i})s, %(filters_{i})s::jsonb)")
        params[f"condition_id_{i}"] = condition.condition_id
        params[f"name_{i}"] = condition.name
        params[f"capture_rate_{i}"] = condition.capture_rate
        # BUGFIX: serialize filter models the same way create_project_conditions
        # does; json.dumps on the raw model objects raises a TypeError.
        params[f"filters_{i}"] = json.dumps([filter_.model_dump() for filter_ in condition.filters])

    sql = f"""
        UPDATE projects_conditions
        SET name = c.name, capture_rate = c.capture_rate, filters = c.filters
        FROM (VALUES {','.join(values)}) AS c(condition_id, name, capture_rate, filters)
        WHERE c.condition_id = projects_conditions.condition_id AND project_id = %(project_id)s;
    """

    with pg_client.PostgresClient() as cur:
        query = cur.mogrify(sql, params)
        cur.execute(query)
|
||||
|
||||
|
||||
def delete_project_condition(project_id, ids):
    """Delete the given condition ids belonging to *project_id*."""
    sql = """
        DELETE FROM projects_conditions
        WHERE condition_id IN %(ids)s
        AND project_id= %(project_id)s;
    """
    # psycopg renders a tuple as a parenthesized value list for IN (...).
    params = {"project_id": project_id, "ids": tuple(ids)}

    with pg_client.PostgresClient() as cur:
        cur.execute(cur.mogrify(sql, params))
|
||||
|
||||
|
||||
def update_project_conditions(project_id, conditions):
    """Reconcile stored conditions with the desired *conditions* list.

    Conditions whose id is no longer present are deleted, ones without a
    known id are created, and the rest are updated in place. Returns the
    project's conditions after reconciliation, or None when *conditions*
    is None (nothing to do).
    """
    if conditions is None:
        return

    existing = get_conditions(project_id)["conditions"]
    existing_ids = {c.condition_id for c in existing}

    to_be_updated = [c for c in conditions if c.condition_id in existing_ids]
    to_be_created = [c for c in conditions if c.condition_id not in existing_ids]
    to_be_deleted = existing_ids - {c.condition_id for c in conditions}

    if to_be_deleted:
        delete_project_condition(project_id, to_be_deleted)

    if to_be_created:
        create_project_conditions(project_id, to_be_created)

    if to_be_updated:
        # removed leftover debug print of to_be_updated
        update_project_condition(project_id, to_be_updated)

    return get_conditions(project_id)
|
||||
|
||||
|
||||
def get_projects_ids(tenant_id):
|
||||
with pg_client.PostgresClient() as cur:
|
||||
query = cur.mogrify("""SELECT s.project_id
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue