feat(DB): use incremental materialized views to fill extra tables

refactor(chalice): changed product analytics
This commit is contained in:
Taha Yassine Kraiem 2025-03-25 17:43:45 +01:00 committed by Kraiem Taha Yassine
parent ec2c42c688
commit 0500f30d14
6 changed files with 134 additions and 12 deletions

View file

@@ -241,3 +241,25 @@ def get_colname_by_key(project_id, key):
return None
return index_to_colname(meta_keys[key])
def get_for_filters(project_id):
    """Expose a project's metadata columns as filter descriptors.

    Fetches the single (non-deleted) projects row and turns every
    metadata column that holds a non-null display value into a
    filter-definition dict. Returns {"total": N, "list": [...]}.
    """
    with pg_client.PostgresClient() as cur:
        query = cur.mogrify(f"""SELECT {",".join(column_names())}
                                FROM public.projects
                                WHERE project_id = %(project_id)s
                                  AND deleted_at ISNULL
                                LIMIT 1;""", {"project_id": project_id})
        cur.execute(query=query)
        metas = cur.fetchone()
    if metas is None:
        return {"total": 0, "list": []}
    # The id keeps the column's positional index (meta_<i>) even when
    # null columns are skipped, so ids stay stable across projects.
    filters = [{"id": f"meta_{position}",
                "name": column,
                "displayName": metas[column],
                "type": "string",
                "autoCaptured": False,
                "icon": None}
               for position, column in enumerate(metas.keys())
               if metas[column] is not None]
    return {"total": len(filters), "list": filters}

View file

@@ -12,17 +12,21 @@ def get_events(project_id: int, page: schemas.PaginatedSchema):
with ClickHouseClient() as ch_client:
r = ch_client.format(
"""SELECT COUNT(1) OVER () AS total,
event_name, display_name
event_name, display_name, description,
auto_captured
FROM product_analytics.all_events
WHERE project_id=%(project_id)s
ORDER BY display_name
ORDER BY auto_captured,display_name
LIMIT %(limit)s OFFSET %(offset)s;""",
parameters={"project_id": project_id, "limit": page.limit, "offset": (page.page - 1) * page.limit})
rows = ch_client.execute(r)
if len(rows) == 0:
return {"total": 0, "list": []}
total = rows[0]["total"]
for row in rows:
for i, row in enumerate(rows):
row["id"] = f"event_{i}"
row["icon"] = None
row["type"] = "string"
row.pop("total")
return {"total": total, "list": rows}

View file

@@ -1,8 +1,33 @@
from chalicelib.utils import helper
from chalicelib.utils.ch_client import ClickHouseClient
import schemas
def get_properties(project_id: int, event_name):
def get_all_properties(project_id: int, page: schemas.PaginatedSchema):
    """Return a paginated listing of every known property for a project.

    Reads product_analytics.all_properties ordered by display name and
    returns {"total": <overall count>, "list": [...]} with camelCased keys.
    """
    offset = (page.page - 1) * page.limit
    with ClickHouseClient() as ch_client:
        query = ch_client.format(
            """SELECT COUNT(1) OVER () AS total,
                      property_name,
                      display_name
               FROM product_analytics.all_properties
               WHERE all_properties.project_id=%(project_id)s
               ORDER BY display_name
               LIMIT %(limit)s OFFSET %(offset)s;""",
            parameters={"project_id": project_id,
                        "limit": page.limit,
                        "offset": offset})
        rows = ch_client.execute(query)
    if not rows:
        return {"total": 0, "list": []}
    # The window COUNT is duplicated on every row; read it once, then strip it.
    total = rows[0]["total"]
    for index, row in enumerate(rows):
        row["id"] = f"prop_{index}"
        row["icon"] = None
        del row["total"]
    return {"total": total, "list": helper.list_to_camel_case(rows)}
def get_event_properties(project_id: int, event_name):
with ClickHouseClient() as ch_client:
r = ch_client.format(
"""SELECT all_properties.property_name,

View file

@@ -3,6 +3,7 @@ from typing import Annotated
from fastapi import Body, Depends, Query
import schemas
from chalicelib.core import metadata
from chalicelib.core.product_analytics import events, properties
from or_dependencies import OR_context
from routers.base import get_routers
@@ -10,23 +11,30 @@ from routers.base import get_routers
public_app, app, app_apikey = get_routers()
@app.get('/{projectId}/events/names', tags=["product_analytics"])
def get_all_events(projectId: int, filter_query: Annotated[schemas.PaginatedSchema, Query()],
@app.get('/{projectId}/filters', tags=["product_analytics"])
def get_all_filters(projectId: int, filter_query: Annotated[schemas.PaginatedSchema, Query()],
context: schemas.CurrentContext = Depends(OR_context)):
return {
"data": {
"events": events.get_events(project_id=projectId, page=filter_query),
"filters": {}
"filters": properties.get_all_properties(project_id=projectId, page=filter_query),
"metadata": metadata.get_for_filters(project_id=projectId)
}
}
@app.get('/{projectId}/events/names', tags=["product_analytics"])
def get_all_events(projectId: int, filter_query: Annotated[schemas.PaginatedSchema, Query()],
                   context: schemas.CurrentContext = Depends(OR_context)):
    """Return the paginated list of event names for the given project."""
    payload = events.get_events(project_id=projectId, page=filter_query)
    return {"data": payload}
@app.get('/{projectId}/properties/search', tags=["product_analytics"])
def get_event_properties(projectId: int, event_name: str = None,
context: schemas.CurrentContext = Depends(OR_context)):
if not event_name or len(event_name) == 0:
return {"data": []}
return {"data": properties.get_properties(project_id=projectId, event_name=event_name)}
return {"data": properties.get_event_properties(project_id=projectId, event_name=event_name)}
@app.post('/{projectId}/events/search', tags=["product_analytics"])

View file

@@ -584,7 +584,7 @@ class PropertyFilterSchema(BaseModel):
name: Union[EventPredefinedPropertyType, str] = Field(...)
operator: Union[SearchEventOperator, MathOperator] = Field(...)
value: List[Union[int, str]] = Field(...)
property_type: Optional[Literal["string", "number", "date"]] = Field(default=None)
# property_type: Optional[Literal["string", "number", "date"]] = Field(default=None)
@computed_field
@property

View file

@@ -501,6 +501,7 @@ CREATE TABLE IF NOT EXISTS product_analytics.group_properties
-- The full list of events
-- Experimental: This table is filled by an incremental materialized view
CREATE TABLE IF NOT EXISTS product_analytics.all_events
(
project_id UInt16,
@@ -516,19 +517,62 @@ CREATE TABLE IF NOT EXISTS product_analytics.all_events
) ENGINE = ReplacingMergeTree(_timestamp)
ORDER BY (project_id, auto_captured, event_name);
-- ----------------- This is experimental, if it doesn't work, we need to do it in db worker -------------
-- Incremental materialized view to fill all_events from incoming rows of
-- product_analytics.events (one registry row per distinct
-- project/auto_captured/event_name).
CREATE MATERIALIZED VIEW product_analytics.events_extractor TO product_analytics.all_events AS
SELECT DISTINCT ON (project_id,auto_captured,event_name) project_id,
`$auto_captured` AS auto_captured,
`$event_name` AS event_name,
-- NOTE(review): display_name/description are presumably resolved from the
-- old_data subquery below (carrying forward manually-set values on
-- re-insert); confirm they are unambiguous against the events columns.
display_name,
description
FROM product_analytics.events
-- Self-join against existing all_events rows that already carry a
-- display_name or description, so ReplacingMergeTree's newest row per key
-- does not wipe previously-set values.
LEFT JOIN (SELECT project_id,
auto_captured,
event_name,
display_name,
description
FROM product_analytics.all_events
WHERE all_events.display_name != ''
OR all_events.description != '') AS old_data
ON (events.project_id = old_data.project_id AND events.`$auto_captured` = old_data.auto_captured AND
events.`$event_name` = old_data.event_name);
-- -------- END ---------
-- The full list of event-properties (used to tell which property belongs to which event)
-- Experimental: This table is filled by an incremental materialized view
-- Registry of (event, property) pairs; ReplacingMergeTree(_timestamp)
-- deduplicates on the sorting key, keeping the newest row.
CREATE TABLE IF NOT EXISTS product_analytics.event_properties
(
project_id UInt16,
event_name String,
property_name String,
-- TODO: find a fix for this
-- value_type String,
_timestamp DateTime DEFAULT now()
) ENGINE = ReplacingMergeTree(_timestamp)
ORDER BY (project_id, event_name, property_name);
-- NOTE(review): the replacement sorting key below references value_type,
-- but that column is commented out above — this DDL cannot be valid as-is;
-- either restore the value_type column or drop it from ORDER BY. Confirm intent.
ORDER BY (project_id, event_name, property_name, value_type);
-- ----------------- This is experimental, if it doesn't work, we need to do it in db worker -------------
-- Incremental materialized view to fill event_properties using $properties
-- ARRAY JOIN fans each inserted event out to one row per top-level key of
-- its $properties JSON object (auto-captured properties).
CREATE MATERIALIZED VIEW product_analytics.event_properties_extractor TO product_analytics.event_properties AS
SELECT project_id,
`$event_name` AS event_name,
property_name
FROM product_analytics.events
ARRAY JOIN JSONExtractKeys(toString(`$properties`)) as property_name;
-- Incremental materialized view to fill event_properties using properties
-- Same fan-out as event_properties_extractor, but over the custom
-- (non-auto-captured) `properties` JSON column.
CREATE MATERIALIZED VIEW product_analytics.event_cproperties_extractor TO product_analytics.event_properties AS
SELECT project_id,
`$event_name` AS event_name,
property_name
FROM product_analytics.events
ARRAY JOIN JSONExtractKeys(toString(`properties`)) as property_name;
-- -------- END ---------
-- The full list of properties (events and users)
-- Experimental: This table is filled by an incremental materialized view
CREATE TABLE IF NOT EXISTS product_analytics.all_properties
(
project_id UInt16,
@@ -556,3 +600,22 @@ CREATE TABLE IF NOT EXISTS experimental.user_viewed_sessions
PARTITION BY toYYYYMM(_timestamp)
ORDER BY (project_id, user_id, session_id)
TTL _timestamp + INTERVAL 3 MONTH;
-- Pool of randomly sampled property values (fed by the
-- properties_values_random_sampler materialized view below).
-- IF NOT EXISTS added for consistency with every other CREATE TABLE in this
-- schema file, so the script stays safely re-runnable.
CREATE TABLE IF NOT EXISTS product_analytics.properties_example_values
(
    id           UInt64,
    value        String,
    random_value Float64
) ENGINE = ReplacingMergeTree()
-- NOTE(review): deduplication key is random_value alone — two different
-- values that draw the same random number would collapse; confirm intent.
ORDER BY random_value;
-- Incremental materialized view to get random examples of property values.
-- BUG FIX: ClickHouse rand() returns a UInt32 in [0, 2^32), so the original
-- predicate `rand() < 0.5` passed only when rand() = 0 (probability ~2^-32)
-- and effectively sampled nothing. Comparing against half the UInt32 range
-- skips ~50% of inserted rows as the comment intended.
CREATE MATERIALIZED VIEW product_analytics.properties_values_random_sampler TO product_analytics.properties_example_values AS
SELECT id,
       value,
       rand() AS random_value
FROM product_analytics.events
WHERE rand() < 2147483648 -- half of the UInt32 range: randomly skips ~50% of inserts
-- NOTE(review): ORDER BY + LIMIT in a materialized view apply per inserted
-- block, not globally — this keeps at most 10 sampled rows per insert batch.
ORDER BY random_value
LIMIT 10;