diff --git a/ee/api/chalicelib/core/db_request_handler.py b/ee/api/chalicelib/core/db_request_handler.py new file mode 100644 index 000000000..6e31ee450 --- /dev/null +++ b/ee/api/chalicelib/core/db_request_handler.py @@ -0,0 +1,202 @@ +import logging +from chalicelib.utils import helper, pg_client + + +class DatabaseRequestHandler: + def __init__(self, table_name): + self.table_name = table_name + self.constraints = [] + self.params = {} + self.order_clause = "" + self.sort_clause = "" + self.select_columns = [] + self.sub_queries = [] + self.joins = [] + self.group_by_clause = "" + self.client = pg_client + self.logger = logging.getLogger(__name__) + self.pagination = {} + + def add_constraint(self, constraint, param=None): + self.constraints.append(constraint) + if param: + self.params.update(param) + + def add_subquery(self, subquery, alias, param=None): + self.sub_queries.append((subquery, alias)) + if param: + self.params.update(param) + + def add_join(self, join_clause): + self.joins.append(join_clause) + + def add_param(self, key, value): + self.params[key] = value + + def set_order_by(self, order_by): + self.order_clause = order_by + + def set_sort_by(self, sort_by): + self.sort_clause = sort_by + + def set_select_columns(self, columns): + self.select_columns = columns + + def set_group_by(self, group_by_clause): + self.group_by_clause = group_by_clause + + def set_pagination(self, page, page_size): + """ + Set pagination parameters for the query. + :param page: The page number (1-indexed) + :param page_size: Number of items per page + """ + self.pagination = { + 'offset': (page - 1) * page_size, + 'limit': page_size + } + + def build_query(self, action="select", additional_clauses=None, data=None): + + if action == "select": + query = f"SELECT {', '.join(self.select_columns)} FROM {self.table_name}" + elif action == "insert": + columns = ', '.join(data.keys()) + placeholders = ', '.join(f'%({k})s' for k in data.keys()) + query = f"INSERT INTO {self.table_name} ({columns}) VALUES ({placeholders})" + elif action == "update": + set_clause = ', '.join(f"{k} = %({k})s" for k in data.keys()) + query = f"UPDATE {self.table_name} SET {set_clause}" + elif action == "delete": + query = f"DELETE FROM {self.table_name}" + + for join in self.joins: + query += f" {join}" + for subquery, alias in self.sub_queries: + query += f", ({subquery}) AS {alias}" + if self.constraints: + query += " WHERE " + " AND ".join(self.constraints) + if action == "select": + if self.group_by_clause: + query += " GROUP BY " + self.group_by_clause + if self.sort_clause: + query += " ORDER BY " + self.sort_clause + if self.order_clause: + query += " " + self.order_clause + if hasattr(self, 'pagination') and self.pagination: + query += " LIMIT %(limit)s OFFSET %(offset)s" + self.params.update(self.pagination) + + if additional_clauses: + query += " " + additional_clauses + + logging.info(f"Query: {query}") + return query + + def execute_query(self, query, data=None): + try: + with self.client.PostgresClient() as cur: + mogrified_query = cur.mogrify(query, {**data, **self.params} if data else self.params) + cur.execute(mogrified_query) + return cur.fetchall() if cur.description else None + except Exception as e: + self.logger.error(f"Database operation failed: {e}") + raise + + def fetchall(self): + query = self.build_query() + return self.execute_query(query) + + def fetchone(self): + query = self.build_query() + result = self.execute_query(query) + return result[0] if result else None + + def insert(self, data): + query = self.build_query(action="insert", data=data) + query += " RETURNING *;" + + result = self.execute_query(query, data) + return result[0] if result else None + + def update(self, data): + query = self.build_query(action="update", data=data) + query += " RETURNING *;" + + result = self.execute_query(query, data) + return result[0] if result else None + + def delete(self): + query = self.build_query(action="delete") + return self.execute_query(query) + + def batch_insert(self, items): + if not items: + return None + + columns = ', '.join(items[0].keys()) + + # Building a values string with unique parameter names for each item + all_values_query = ', '.join( + '(' + ', '.join([f"%({key}_{i})s" for key in item]) + ')' + for i, item in enumerate(items) + ) + + query = f"INSERT INTO {self.table_name} ({columns}) VALUES {all_values_query} RETURNING *;" + + try: + with self.client.PostgresClient() as cur: + # Flatten items into a single dictionary with unique keys + combined_params = {f"{k}_{i}": v for i, item in enumerate(items) for k, v in item.items()} + mogrified_query = cur.mogrify(query, combined_params) + cur.execute(mogrified_query) + return cur.fetchall() + except Exception as e: + self.logger.error(f"Database batch insert operation failed: {e}") + raise + + def raw_query(self, query, params=None): + try: + with self.client.PostgresClient() as cur: + mogrified_query = cur.mogrify(query, params) + cur.execute(mogrified_query) + return cur.fetchall() if cur.description else None + except Exception as e: + self.logger.error(f"Database operation failed: {e}") + raise + + def batch_update(self, items): + if not items: + return None + + id_column = list(items[0])[0] + + # Building the set clause for the update statement + update_columns = list(items[0].keys()) + update_columns.remove(id_column) + set_clause = ', '.join([f"{col} = v.{col}" for col in update_columns]) + + # Building the values part for the 'VALUES' section + values_rows = [] + for item in items: + values = ', '.join([f"%({key})s" for key in item.keys()]) + values_rows.append(f"({values})") + values_query = ', '.join(values_rows) + + # Constructing the full update query + query = f""" + UPDATE {self.table_name} AS t + SET {set_clause} + FROM (VALUES {values_query}) AS v ({', '.join(items[0].keys())}) + WHERE t.{id_column} = v.{id_column}; + """ + + try: + with self.client.PostgresClient() as cur: + # Flatten items into a single dictionary for mogrify + combined_params = {k: v for item in items for k, v in item.items()} + mogrified_query = cur.mogrify(query, combined_params) + cur.execute(mogrified_query) + except Exception as e: + self.logger.error(f"Database batch update operation failed: {e}") + raise diff --git a/ee/api/chalicelib/core/projects.py b/ee/api/chalicelib/core/projects.py index 3d33160bd..aa5bf4d75 100644 --- a/ee/api/chalicelib/core/projects.py +++ b/ee/api/chalicelib/core/projects.py @@ -290,6 +290,170 @@ def update_capture_status(project_id, changes: schemas.SampleRateSchema): return changes +def get_conditions(project_id): + with pg_client.PostgresClient() as cur: + query = cur.mogrify("""SELECT p.sample_rate AS rate, p.conditional_capture, + COALESCE( + array_agg( + json_build_object( + 'condition_id', pc.condition_id, + 'capture_rate', pc.capture_rate, + 'name', pc.name, + 'filters', pc.filters + ) + ) FILTER (WHERE pc.condition_id IS NOT NULL), + ARRAY[]::json[] + ) AS conditions + FROM public.projects AS p + LEFT JOIN ( + SELECT * FROM public.projects_conditions + WHERE project_id = %(project_id)s ORDER BY condition_id + ) AS pc ON p.project_id = pc.project_id + WHERE p.project_id = %(project_id)s + AND p.deleted_at IS NULL + GROUP BY p.sample_rate, p.conditional_capture;""", + {"project_id": project_id}) + cur.execute(query=query) + row = cur.fetchone() + row = helper.dict_to_camel_case(row) + row["conditions"] = [schemas.ProjectConditions(**c) for c in row["conditions"]] + + return row + + +def validate_conditions(conditions: List[schemas.ProjectConditions]) -> List[str]: + errors = [] + names = [condition.name for condition in conditions] + + # Check for empty strings + if any(name.strip() == "" for name in names): + errors.append("Condition names cannot be empty strings") + + # Check for duplicates + name_counts = Counter(names) + duplicates = [name for name, count in name_counts.items() if count > 1] + if duplicates: + errors.append(f"Duplicate condition names found: {duplicates}") + + return errors + + +def update_conditions(project_id, changes: schemas.ProjectSettings): + validation_errors = validate_conditions(changes.conditions) + if validation_errors: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=validation_errors) + + conditions = [] + for condition in changes.conditions: + conditions.append(condition.model_dump()) + + with pg_client.PostgresClient() as cur: + query = cur.mogrify("""UPDATE public.projects + SET + sample_rate= %(sample_rate)s, + conditional_capture = %(conditional_capture)s + WHERE project_id =%(project_id)s + AND deleted_at IS NULL;""", + { + "project_id": project_id, + "sample_rate": changes.rate, + "conditional_capture": changes.conditional_capture + }) + cur.execute(query=query) + + return update_project_conditions(project_id, changes.conditions) + + +def create_project_conditions(project_id, conditions): + rows = [] + + # insert all conditions rows with single sql query + if len(conditions) > 0: + columns = ( + "project_id", + "name", + "capture_rate", + "filters", + ) + + sql = f""" + INSERT INTO projects_conditions + (project_id, name, capture_rate, filters) + VALUES {", ".join(["%s"] * len(conditions))} + RETURNING condition_id, {", ".join(columns)} + """ + + with pg_client.PostgresClient() as cur: + params = [ + (project_id, c.name, c.capture_rate, json.dumps([filter_.model_dump() for filter_ in c.filters])) + for c in conditions] + query = cur.mogrify(sql, params) + cur.execute(query) + rows = cur.fetchall() + + return rows + + +def update_project_condition(project_id, conditions): + values = [] + params = { + "project_id": project_id, + } + for i in range(len(conditions)): + values.append(f"(%(condition_id_{i})s, %(name_{i})s, %(capture_rate_{i})s, %(filters_{i})s::jsonb)") + params[f"condition_id_{i}"] = conditions[i].condition_id + params[f"name_{i}"] = conditions[i].name + params[f"capture_rate_{i}"] = conditions[i].capture_rate + params[f"filters_{i}"] = json.dumps(conditions[i].filters) + + sql = f""" + UPDATE projects_conditions + SET name = c.name, capture_rate = c.capture_rate, filters = c.filters + FROM (VALUES {','.join(values)}) AS c(condition_id, name, capture_rate, filters) + WHERE c.condition_id = projects_conditions.condition_id AND project_id = %(project_id)s; + """ + + with pg_client.PostgresClient() as cur: + query = cur.mogrify(sql, params) + cur.execute(query) + + +def delete_project_condition(project_id, ids): + sql = """ + DELETE FROM projects_conditions + WHERE condition_id IN %(ids)s + AND project_id= %(project_id)s; + """ + + with pg_client.PostgresClient() as cur: + query = cur.mogrify(sql, {"project_id": project_id, "ids": tuple(ids)}) + cur.execute(query) + + +def update_project_conditions(project_id, conditions): + if conditions is None: + return + + existing = get_conditions(project_id)["conditions"] + existing_ids = {c.condition_id for c in existing} + + to_be_updated = [c for c in conditions if c.condition_id in existing_ids] + to_be_created = [c for c in conditions if c.condition_id not in existing_ids] + to_be_deleted = existing_ids - {c.condition_id for c in conditions} + + if to_be_deleted: + delete_project_condition(project_id, to_be_deleted) + + if to_be_created: + create_project_conditions(project_id, to_be_created) + + if to_be_updated: + print(to_be_updated) + update_project_condition(project_id, to_be_updated) + + return get_conditions(project_id) + + def get_projects_ids(tenant_id): with pg_client.PostgresClient() as cur: query = cur.mogrify("""SELECT s.project_id