diff --git a/api/schemas/schemas.py b/api/schemas/schemas.py index f873d32a8..973dce8f1 100644 --- a/api/schemas/schemas.py +++ b/api/schemas/schemas.py @@ -3,12 +3,13 @@ from typing import Optional, List, Union, Literal from pydantic import Field, EmailStr, HttpUrl, SecretStr, AnyHttpUrl from pydantic import field_validator, model_validator, computed_field +from pydantic import AfterValidator from pydantic.functional_validators import BeforeValidator from chalicelib.utils.TimeUTC import TimeUTC from .overrides import BaseModel, Enum, ORUnion from .transformers_validators import transform_email, remove_whitespace, remove_duplicate_values, single_to_list, \ - force_is_event, NAME_PATTERN, int_to_string, check_alphanumeric + force_is_event, NAME_PATTERN, int_to_string, check_alphanumeric, check_regex class _GRecaptcha(BaseModel): @@ -537,7 +538,7 @@ class GraphqlFilterType(str, Enum): class RequestGraphqlFilterSchema(BaseModel): type: Union[FetchFilterType, GraphqlFilterType] = Field(...) value: List[Union[int, str]] = Field(...) - operator: Union[SearchEventOperator, MathOperator] = Field(...) + operator: Annotated[Union[SearchEventOperator, MathOperator], AfterValidator(check_regex)] = Field(...) @model_validator(mode="before") @classmethod @@ -612,6 +613,13 @@ class PropertyFilterSchema(BaseModel): self.name = self.name.value return self + @model_validator(mode='after') + def _check_regex_value(self): + if self.operator == SearchEventOperator.PATTERN: + for v in self.value: + check_regex(v) + return self + class EventPropertiesSchema(BaseModel): operator: Literal["and", "or"] = Field(...) @@ -657,6 +665,13 @@ class SessionSearchEventSchema(BaseModel): f"operator:{self.operator} is only available for event-type: {EventType.CLICK}" return self + @model_validator(mode='after') + def _check_regex_value(self): + if self.operator == SearchEventOperator.PATTERN: + for v in self.value: + check_regex(v) + return self + class SessionSearchFilterSchema(BaseModel): is_event: Literal[False] = False @@ -714,6 +729,13 @@ class SessionSearchFilterSchema(BaseModel): return self + @model_validator(mode='after') + def _check_regex_value(self): + if self.operator == SearchEventOperator.PATTERN: + for v in self.value: + check_regex(v) + return self + class _PaginatedSchema(BaseModel): limit: int = Field(default=200, gt=0, le=200) @@ -880,6 +902,13 @@ class PathAnalysisSubFilterSchema(BaseModel): values["isEvent"] = True return values + @model_validator(mode='after') + def _check_regex_value(self): + if self.operator == SearchEventOperator.PATTERN: + for v in self.value: + check_regex(v) + return self + class _ProductAnalyticsFilter(BaseModel): is_event: Literal[False] = False @@ -890,6 +919,13 @@ class _ProductAnalyticsFilter(BaseModel): _remove_duplicate_values = field_validator('value', mode='before')(remove_duplicate_values) + @model_validator(mode='after') + def _check_regex_value(self): + if self.operator == SearchEventOperator.PATTERN: + for v in self.value: + check_regex(v) + return self + class _ProductAnalyticsEventFilter(BaseModel): is_event: Literal[True] = True @@ -900,6 +936,13 @@ class _ProductAnalyticsEventFilter(BaseModel): _remove_duplicate_values = field_validator('value', mode='before')(remove_duplicate_values) + @model_validator(mode='after') + def _check_regex_value(self): + if self.operator == SearchEventOperator.PATTERN: + for v in self.value: + check_regex(v) + return self + # this type is created to allow mixing events&filters and specifying a discriminator for PathAnalysis series filter ProductAnalyticsFilter = Annotated[Union[_ProductAnalyticsFilter, _ProductAnalyticsEventFilter], @@ -1344,6 +1387,13 @@ class LiveSessionSearchFilterSchema(BaseModel): assert len(self.source) > 0, "source should not be empty for METADATA type" return self + @model_validator(mode='after') + def _check_regex_value(self): + if self.operator == SearchEventOperator.PATTERN: + for v in self.value: + check_regex(v) + return self + class LiveSessionsSearchPayloadSchema(_PaginatedSchema): filters: List[LiveSessionSearchFilterSchema] = Field([]) diff --git a/api/schemas/transformers_validators.py b/api/schemas/transformers_validators.py index 106f8747c..90d88aead 100644 --- a/api/schemas/transformers_validators.py +++ b/api/schemas/transformers_validators.py @@ -1,3 +1,4 @@ +import re from typing import Union, Any, Type from pydantic import ValidationInfo @@ -57,3 +58,17 @@ def check_alphanumeric(v: str, info: ValidationInfo) -> str: is_alphanumeric = v.replace(' ', '').isalnum() assert is_alphanumeric, f'{info.field_name} must be alphanumeric' return v + + +def check_regex(v: str) -> str: + assert v is not None, "Regex is null" + assert isinstance(v, str), "Regex value must be a string" + assert len(v) > 0, "Regex is empty" + is_valid = None + try: + re.compile(v) + except re.error as exc: + is_valid = f"Invalid regex: {exc} (at position {exc.pos})" + + assert is_valid is None, is_valid + return v