feat(api): conditional capture - new table for conditions
This commit is contained in:
parent
c1181287f5
commit
e7285e6eed
2 changed files with 119 additions and 19 deletions
|
|
@ -71,7 +71,7 @@ def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False):
|
|||
query = cur.mogrify(f"""{"SELECT *, first_recorded IS NOT NULL AS recorded FROM (" if recorded else ""}
|
||||
SELECT s.project_id, s.name, s.project_key, s.save_request_payloads, s.first_recorded_session_at,
|
||||
s.created_at, s.sessions_last_check_at, s.sample_rate, s.platform,
|
||||
(SELECT count(*) FROM jsonb_array_elements_text(s.conditions)) AS conditions_count
|
||||
(SELECT count(*) FROM projects_conditions WHERE project_id = s.project_id) AS conditions_count
|
||||
{extra_projection}
|
||||
FROM public.projects AS s
|
||||
WHERE s.deleted_at IS NULL
|
||||
|
|
@ -254,18 +254,31 @@ def update_capture_status(project_id, changes: schemas.SampleRateSchema):
|
|||
|
||||
def get_conditions(project_id):
|
||||
with pg_client.PostgresClient() as cur:
|
||||
query = cur.mogrify("""SELECT sample_rate AS rate, sample_rate=100 AS capture_all, conditions
|
||||
FROM public.projects
|
||||
WHERE project_id =%(project_id)s
|
||||
AND deleted_at ISNULL;""",
|
||||
query = cur.mogrify("""SELECT p.sample_rate AS rate, p.conditional_capture,
|
||||
COALESCE(
|
||||
array_agg(
|
||||
json_build_object(
|
||||
'condition_id', pc.condition_id,
|
||||
'capture_rate', pc.capture_rate,
|
||||
'name', pc.name,
|
||||
'filters', pc.filters
|
||||
)
|
||||
) FILTER (WHERE pc.condition_id IS NOT NULL),
|
||||
ARRAY[]::json[]
|
||||
) AS conditions
|
||||
FROM public.projects AS p
|
||||
LEFT JOIN (
|
||||
SELECT * FROM public.projects_conditions
|
||||
WHERE project_id = %(project_id)s ORDER BY condition_id
|
||||
) AS pc ON p.project_id = pc.project_id
|
||||
WHERE p.project_id = %(project_id)s
|
||||
AND p.deleted_at IS NULL
|
||||
GROUP BY p.sample_rate, p.conditional_capture;""",
|
||||
{"project_id": project_id})
|
||||
cur.execute(query=query)
|
||||
row = cur.fetchone()
|
||||
row = helper.dict_to_camel_case(row)
|
||||
if row["conditions"] is None:
|
||||
row["conditions"] = []
|
||||
else:
|
||||
row["conditions"] = [schemas.ProjectConditions(**c) for c in row["conditions"]]
|
||||
row["conditions"] = [schemas.ProjectConditions(**c) for c in row["conditions"]]
|
||||
|
||||
return row
|
||||
|
||||
|
|
@ -288,10 +301,6 @@ def validate_conditions(conditions: List[schemas.ProjectConditions]) -> List[str
|
|||
|
||||
|
||||
def update_conditions(project_id, changes: schemas.ProjectSettings):
|
||||
sample_rate = changes.rate
|
||||
if changes.capture_all:
|
||||
sample_rate = 100
|
||||
|
||||
validation_errors = validate_conditions(changes.conditions)
|
||||
if validation_errors:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=validation_errors)
|
||||
|
|
@ -304,17 +313,107 @@ def update_conditions(project_id, changes: schemas.ProjectSettings):
|
|||
query = cur.mogrify("""UPDATE public.projects
|
||||
SET
|
||||
sample_rate= %(sample_rate)s,
|
||||
conditions = %(conditions)s::jsonb
|
||||
conditional_capture = %(conditional_capture)s
|
||||
WHERE project_id =%(project_id)s
|
||||
AND deleted_at ISNULL;""",
|
||||
AND deleted_at IS NULL;""",
|
||||
{
|
||||
"project_id": project_id,
|
||||
"sample_rate": sample_rate,
|
||||
"conditions": json.dumps(conditions)
|
||||
"sample_rate": changes.rate,
|
||||
"conditional_capture": changes.conditional_capture
|
||||
})
|
||||
cur.execute(query=query)
|
||||
|
||||
return changes
|
||||
return update_project_conditions(project_id, changes.conditions)
|
||||
|
||||
|
||||
def create_project_conditions(project_id, conditions):
|
||||
rows = []
|
||||
|
||||
# insert all conditions rows with single sql query
|
||||
if len(conditions) > 0:
|
||||
columns = (
|
||||
"project_id",
|
||||
"name",
|
||||
"capture_rate",
|
||||
"filters",
|
||||
)
|
||||
|
||||
sql = f"""
|
||||
INSERT INTO projects_conditions
|
||||
(project_id, name, capture_rate, filters)
|
||||
VALUES {", ".join(["%s"] * len(conditions))}
|
||||
RETURNING condition_id, {", ".join(columns)}
|
||||
"""
|
||||
|
||||
with pg_client.PostgresClient() as cur:
|
||||
params = [
|
||||
(project_id, c.name, c.capture_rate, json.dumps([filter_.model_dump() for filter_ in c.filters]))
|
||||
for c in conditions]
|
||||
query = cur.mogrify(sql, params)
|
||||
cur.execute(query)
|
||||
rows = cur.fetchall()
|
||||
|
||||
return rows
|
||||
|
||||
|
||||
def update_project_condition(project_id, conditions):
|
||||
values = []
|
||||
params = {
|
||||
"project_id": project_id,
|
||||
}
|
||||
for i in range(len(conditions)):
|
||||
values.append(f"(%(condition_id_{i})s, %(name_{i})s, %(capture_rate_{i})s, %(filters_{i})s::jsonb)")
|
||||
params[f"condition_id_{i}"] = conditions[i].condition_id
|
||||
params[f"name_{i}"] = conditions[i].name
|
||||
params[f"capture_rate_{i}"] = conditions[i].capture_rate
|
||||
params[f"filters_{i}"] = json.dumps(conditions[i].filters)
|
||||
|
||||
sql = f"""
|
||||
UPDATE projects_conditions
|
||||
SET name = c.name, capture_rate = c.capture_rate, filters = c.filters
|
||||
FROM (VALUES {','.join(values)}) AS c(condition_id, name, capture_rate, filters)
|
||||
WHERE c.condition_id = projects_conditions.condition_id AND project_id = %(project_id)s;
|
||||
"""
|
||||
|
||||
with pg_client.PostgresClient() as cur:
|
||||
query = cur.mogrify(sql, params)
|
||||
cur.execute(query)
|
||||
|
||||
|
||||
def delete_project_condition(project_id, ids):
|
||||
sql = """
|
||||
DELETE FROM projects_conditions
|
||||
WHERE condition_id IN %(ids)s
|
||||
AND project_id= %(project_id)s;
|
||||
"""
|
||||
|
||||
with pg_client.PostgresClient() as cur:
|
||||
query = cur.mogrify(sql, {"project_id": project_id, "ids": tuple(ids)})
|
||||
cur.execute(query)
|
||||
|
||||
|
||||
def update_project_conditions(project_id, conditions):
|
||||
if conditions is None:
|
||||
return
|
||||
|
||||
existing = get_conditions(project_id)["conditions"]
|
||||
existing_ids = {c.condition_id for c in existing}
|
||||
|
||||
to_be_updated = [c for c in conditions if c.condition_id in existing_ids]
|
||||
to_be_created = [c for c in conditions if c.condition_id not in existing_ids]
|
||||
to_be_deleted = existing_ids - {c.condition_id for c in conditions}
|
||||
|
||||
if to_be_deleted:
|
||||
delete_project_condition(project_id, to_be_deleted)
|
||||
|
||||
if to_be_created:
|
||||
create_project_conditions(project_id, to_be_created)
|
||||
|
||||
if to_be_updated:
|
||||
print(to_be_updated)
|
||||
update_project_condition(project_id, to_be_updated)
|
||||
|
||||
return get_conditions(project_id)
|
||||
|
||||
|
||||
def get_projects_ids(tenant_id):
|
||||
|
|
|
|||
|
|
@ -1324,6 +1324,7 @@ class SavedSearchSchema(BaseModel):
|
|||
|
||||
|
||||
class ProjectConditions(BaseModel):
|
||||
condition_id: Optional[int] = Field(default=None)
|
||||
name: str = Field(...)
|
||||
capture_rate: int = Field(..., ge=0, le=100)
|
||||
filters: List[GroupedFilterType] = Field(default=[])
|
||||
|
|
@ -1331,7 +1332,7 @@ class ProjectConditions(BaseModel):
|
|||
|
||||
class ProjectSettings(BaseModel):
|
||||
rate: int = Field(..., ge=0, le=100)
|
||||
capture_all: bool = Field(default=False)
|
||||
conditional_capture: bool = Field(default=False)
|
||||
conditions: List[ProjectConditions] = Field(default=[])
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue