Dev (#2867)
* fix(chalice): fixed CH funnels query for new driver * fix(chalice): fixed CH funnels support for nonexistent sequence
This commit is contained in:
parent
a654e30df2
commit
77d4c890cf
2 changed files with 5 additions and 5 deletions
|
|
@ -7,6 +7,8 @@ import schemas
|
||||||
from chalicelib.utils import ch_client
|
from chalicelib.utils import ch_client
|
||||||
from chalicelib.utils import exp_ch_helper
|
from chalicelib.utils import exp_ch_helper
|
||||||
from chalicelib.utils import helper
|
from chalicelib.utils import helper
|
||||||
|
from chalicelib.utils import sql_helper as sh
|
||||||
|
from chalicelib.core import events
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
@ -208,7 +210,7 @@ def get_simple_funnel(filter_d: schemas.CardSeriesFilterSchema, project: schemas
|
||||||
sequences = []
|
sequences = []
|
||||||
projections = []
|
projections = []
|
||||||
for i, s in enumerate(n_stages_query):
|
for i, s in enumerate(n_stages_query):
|
||||||
projections.append(f"SUM(T{i + 1}) AS stage{i + 1}")
|
projections.append(f"coalesce(SUM(T{i + 1}),0) AS stage{i + 1}")
|
||||||
if i == 0:
|
if i == 0:
|
||||||
sequences.append(f"anyIf(1,{s}) AS T1")
|
sequences.append(f"anyIf(1,{s}) AS T1")
|
||||||
else:
|
else:
|
||||||
|
|
@ -226,11 +228,10 @@ def get_simple_funnel(filter_d: schemas.CardSeriesFilterSchema, project: schemas
|
||||||
FROM (SELECT {",".join(sequences)}
|
FROM (SELECT {",".join(sequences)}
|
||||||
FROM {MAIN_EVENTS_TABLE} AS e {extra_from}
|
FROM {MAIN_EVENTS_TABLE} AS e {extra_from}
|
||||||
WHERE {" AND ".join(constraints)}
|
WHERE {" AND ".join(constraints)}
|
||||||
GROUP BY {group_by}) AS raw;
|
GROUP BY {group_by}) AS raw;"""
|
||||||
"""
|
|
||||||
|
|
||||||
with ch_client.ClickHouseClient() as cur:
|
with ch_client.ClickHouseClient() as cur:
|
||||||
query = cur.format(n_stages_query, full_args)
|
query = cur.format(query=n_stages_query, parameters=full_args)
|
||||||
logger.debug("---------------------------------------------------")
|
logger.debug("---------------------------------------------------")
|
||||||
logger.debug(query)
|
logger.debug(query)
|
||||||
logger.debug("---------------------------------------------------")
|
logger.debug("---------------------------------------------------")
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,6 @@ if config("CH_COMPRESSION", cast=bool, default=True):
|
||||||
def transform_result(original_function):
|
def transform_result(original_function):
|
||||||
@wraps(original_function)
|
@wraps(original_function)
|
||||||
def wrapper(*args, **kwargs):
|
def wrapper(*args, **kwargs):
|
||||||
logger.info("Executing query on CH")
|
|
||||||
result = original_function(*args, **kwargs)
|
result = original_function(*args, **kwargs)
|
||||||
if isinstance(result, clickhouse_connect.driver.query.QueryResult):
|
if isinstance(result, clickhouse_connect.driver.query.QueryResult):
|
||||||
column_names = result.column_names
|
column_names = result.column_names
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue