From 0500f30d14243fa7cab87817851f1b7b51536d16 Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Tue, 25 Mar 2025 17:43:45 +0100 Subject: [PATCH] feat(DB): use incremental materialized views to fill extra tables refactor(chalice): changed product analytics --- api/chalicelib/core/metadata.py | 22 +++++++ .../core/product_analytics/events.py | 10 ++- .../core/product_analytics/properties.py | 29 ++++++++- api/routers/subs/product_analytics.py | 18 +++-- api/schemas/schemas.py | 2 +- .../clickhouse/create/init_schema.sql | 65 ++++++++++++++++++- 6 files changed, 134 insertions(+), 12 deletions(-) diff --git a/api/chalicelib/core/metadata.py b/api/chalicelib/core/metadata.py index e761ab4f4..9469903d0 100644 --- a/api/chalicelib/core/metadata.py +++ b/api/chalicelib/core/metadata.py @@ -241,3 +241,25 @@ def get_colname_by_key(project_id, key): return None return index_to_colname(meta_keys[key]) + + +def get_for_filters(project_id): + with pg_client.PostgresClient() as cur: + query = cur.mogrify(f"""SELECT {",".join(column_names())} + FROM public.projects + WHERE project_id = %(project_id)s + AND deleted_at ISNULL + LIMIT 1;""", {"project_id": project_id}) + cur.execute(query=query) + metas = cur.fetchone() + results = [] + if metas is not None: + for i, k in enumerate(metas.keys()): + if metas[k] is not None: + results.append({"id": f"meta_{i}", + "name": k, + "displayName": metas[k], + "type": "string", + "autoCaptured": False, + "icon": None}) + return {"total": len(results), "list": results} diff --git a/api/chalicelib/core/product_analytics/events.py b/api/chalicelib/core/product_analytics/events.py index 8fde28e59..5b3b66938 100644 --- a/api/chalicelib/core/product_analytics/events.py +++ b/api/chalicelib/core/product_analytics/events.py @@ -12,17 +12,21 @@ def get_events(project_id: int, page: schemas.PaginatedSchema): with ClickHouseClient() as ch_client: r = ch_client.format( """SELECT COUNT(1) OVER () AS total, - event_name, display_name + event_name, 
display_name, description, + auto_captured FROM product_analytics.all_events WHERE project_id=%(project_id)s - ORDER BY display_name + ORDER BY auto_captured,display_name LIMIT %(limit)s OFFSET %(offset)s;""", parameters={"project_id": project_id, "limit": page.limit, "offset": (page.page - 1) * page.limit}) rows = ch_client.execute(r) if len(rows) == 0: return {"total": 0, "list": []} total = rows[0]["total"] - for row in rows: + for i, row in enumerate(rows): + row["id"] = f"event_{i}" + row["icon"] = None + row["type"] = "string" row.pop("total") return {"total": total, "list": rows} diff --git a/api/chalicelib/core/product_analytics/properties.py b/api/chalicelib/core/product_analytics/properties.py index cb9d082ec..7794814be 100644 --- a/api/chalicelib/core/product_analytics/properties.py +++ b/api/chalicelib/core/product_analytics/properties.py @@ -1,8 +1,33 @@ from chalicelib.utils import helper from chalicelib.utils.ch_client import ClickHouseClient +import schemas -def get_properties(project_id: int, event_name): +def get_all_properties(project_id: int, page: schemas.PaginatedSchema): + with ClickHouseClient() as ch_client: + r = ch_client.format( + """SELECT COUNT(1) OVER () AS total, + property_name, + display_name + FROM product_analytics.all_properties + WHERE all_properties.project_id=%(project_id)s + ORDER BY display_name + LIMIT %(limit)s OFFSET %(offset)s;""", + parameters={"project_id": project_id, + "limit": page.limit, + "offset": (page.page - 1) * page.limit}) + properties = ch_client.execute(r) + if len(properties) == 0: + return {"total": 0, "list": []} + total = properties[0]["total"] + for i, p in enumerate(properties): + p["id"] = f"prop_{i}" + p["icon"] = None + p.pop("total") + return {"total": total, "list": helper.list_to_camel_case(properties)} + + +def get_event_properties(project_id: int, event_name): with ClickHouseClient() as ch_client: r = ch_client.format( """SELECT all_properties.property_name, @@ -13,7 +38,7 @@ def 
get_properties(project_id: int, event_name): AND all_properties.project_id=%(project_id)s AND event_properties.event_name=%(event_name)s ORDER BY created_at;""", - parameters={"project_id": project_id,"event_name": event_name}) + parameters={"project_id": project_id, "event_name": event_name}) properties = ch_client.execute(r) return helper.list_to_camel_case(properties) diff --git a/api/routers/subs/product_analytics.py b/api/routers/subs/product_analytics.py index a158c4d92..aaebf788b 100644 --- a/api/routers/subs/product_analytics.py +++ b/api/routers/subs/product_analytics.py @@ -3,6 +3,7 @@ from typing import Annotated from fastapi import Body, Depends, Query import schemas +from chalicelib.core import metadata from chalicelib.core.product_analytics import events, properties from or_dependencies import OR_context from routers.base import get_routers @@ -10,23 +11,30 @@ from routers.base import get_routers public_app, app, app_apikey = get_routers() -@app.get('/{projectId}/events/names', tags=["product_analytics"]) -def get_all_events(projectId: int, filter_query: Annotated[schemas.PaginatedSchema, Query()], - context: schemas.CurrentContext = Depends(OR_context)): +@app.get('/{projectId}/filters', tags=["product_analytics"]) +def get_all_filters(projectId: int, filter_query: Annotated[schemas.PaginatedSchema, Query()], + context: schemas.CurrentContext = Depends(OR_context)): return { "data": { "events": events.get_events(project_id=projectId, page=filter_query), - "filters": {} + "filters": properties.get_all_properties(project_id=projectId, page=filter_query), + "metadata": metadata.get_for_filters(project_id=projectId) } } +@app.get('/{projectId}/events/names', tags=["product_analytics"]) +def get_all_events(projectId: int, filter_query: Annotated[schemas.PaginatedSchema, Query()], + context: schemas.CurrentContext = Depends(OR_context)): + return {"data": events.get_events(project_id=projectId, page=filter_query)} + + 
@app.get('/{projectId}/properties/search', tags=["product_analytics"]) def get_event_properties(projectId: int, event_name: str = None, context: schemas.CurrentContext = Depends(OR_context)): if not event_name or len(event_name) == 0: return {"data": []} - return {"data": properties.get_properties(project_id=projectId, event_name=event_name)} + return {"data": properties.get_event_properties(project_id=projectId, event_name=event_name)} @app.post('/{projectId}/events/search', tags=["product_analytics"]) diff --git a/api/schemas/schemas.py b/api/schemas/schemas.py index 2bef8fab0..f3173f0a2 100644 --- a/api/schemas/schemas.py +++ b/api/schemas/schemas.py @@ -584,7 +584,7 @@ class PropertyFilterSchema(BaseModel): name: Union[EventPredefinedPropertyType, str] = Field(...) operator: Union[SearchEventOperator, MathOperator] = Field(...) value: List[Union[int, str]] = Field(...) - property_type: Optional[Literal["string", "number", "date"]] = Field(default=None) + # property_type: Optional[Literal["string", "number", "date"]] = Field(default=None) @computed_field @property diff --git a/scripts/schema/db/init_dbs/clickhouse/create/init_schema.sql b/scripts/schema/db/init_dbs/clickhouse/create/init_schema.sql index 1816515c3..62bdbae69 100644 --- a/scripts/schema/db/init_dbs/clickhouse/create/init_schema.sql +++ b/scripts/schema/db/init_dbs/clickhouse/create/init_schema.sql @@ -501,6 +501,7 @@ CREATE TABLE IF NOT EXISTS product_analytics.group_properties -- The full list of events +-- Experimental: This table is filled by an incremental materialized view CREATE TABLE IF NOT EXISTS product_analytics.all_events ( project_id UInt16, @@ -516,19 +517,62 @@ CREATE TABLE IF NOT EXISTS product_analytics.all_events ) ENGINE = ReplacingMergeTree(_timestamp) ORDER BY (project_id, auto_captured, event_name); +-- ----------------- This is experimental, if it doesn't work, we need to do it in db worker ------------- +-- Incremental materialized view to fill event_properties using 
$properties
+CREATE MATERIALIZED VIEW product_analytics.events_extractor TO product_analytics.all_events AS
+SELECT DISTINCT ON (project_id,auto_captured,event_name) project_id,
+       `$auto_captured` AS auto_captured,
+       `$event_name` AS event_name,
+       display_name,
+       description
+FROM product_analytics.events
+         LEFT JOIN (SELECT project_id,
+                           auto_captured,
+                           event_name,
+                           display_name,
+                           description
+                    FROM product_analytics.all_events
+                    WHERE all_events.display_name != ''
+                       OR all_events.description != '') AS old_data
+                   ON (events.project_id = old_data.project_id AND events.`$auto_captured` = old_data.auto_captured AND
+                       events.`$event_name` = old_data.event_name);
+-- -------- END ---------
+
 -- The full list of event-properties (used to tell which property belongs to which event)
+-- Experimental: This table is filled by an incremental materialized view
 CREATE TABLE IF NOT EXISTS product_analytics.event_properties
 (
     project_id UInt16,
     event_name String,
     property_name String,
+-- TODO: find a fix for this
+-- value_type String,
     _timestamp DateTime DEFAULT now()
 ) ENGINE = ReplacingMergeTree(_timestamp)
       ORDER BY (project_id, event_name, property_name);
+
+-- ----------------- This is experimental, if it doesn't work, we need to do it in db worker -------------
+-- Incremental materialized view to fill event_properties using $properties
+CREATE MATERIALIZED VIEW product_analytics.event_properties_extractor TO product_analytics.event_properties AS
+SELECT project_id,
+       `$event_name` AS event_name,
+       property_name
+FROM product_analytics.events
+         ARRAY JOIN JSONExtractKeys(toString(`$properties`)) as property_name;
+
+-- Incremental materialized view to fill event_properties using properties
+CREATE MATERIALIZED VIEW product_analytics.event_cproperties_extractor TO product_analytics.event_properties AS
+SELECT project_id,
+       `$event_name` AS event_name,
+       property_name
+FROM product_analytics.events
+         ARRAY 
JOIN JSONExtractKeys(toString(`properties`)) as property_name;
+-- -------- END ---------
 
 -- The full list of properties (events and users)
+-- Experimental: This table is filled by an incremental materialized view
 CREATE TABLE IF NOT EXISTS product_analytics.all_properties
 (
     project_id UInt16,
@@ -556,3 +600,22 @@ CREATE TABLE IF NOT EXISTS experimental.user_viewed_sessions
     PARTITION BY toYYYYMM(_timestamp)
     ORDER BY (project_id, user_id, session_id)
     TTL _timestamp + INTERVAL 3 MONTH;
+
+CREATE TABLE IF NOT EXISTS product_analytics.properties_example_values
+(
+    id UInt64,
+    value String,
+    random_value Float64
+)
+    ENGINE = ReplacingMergeTree()
+    ORDER BY random_value;
+
+-- Incremental materialized view to get random examples of property values
+CREATE MATERIALIZED VIEW product_analytics.properties_values_random_sampler TO product_analytics.properties_example_values AS
+SELECT id,
+       value,
+       rand() AS random_value
+FROM product_analytics.events
+WHERE rand() % 2 = 0 -- rand() is uniform UInt32, not [0,1): modulo randomly skips ~50% of inserts
+ORDER BY random_value
+LIMIT 10;