openreplay/ee/api/chalicelib/core/significance_exp.py
Kraiem Taha Yassine 40b7aa1c78
Dev (#2740)
* fix(chalice): fixed Math-operators validation
refactor(chalice): search for sessions that have events for heatmaps

* refactor(chalice): search for sessions that have at least 1 location event for heatmaps

* fix(chalice): fixed Math-operators validation
refactor(chalice): search for sessions that have events for heatmaps

* refactor(chalice): search for sessions that have at least 1 location event for heatmaps

* feat(chalice): autocomplete return top 10 with stats

* fix(chalice): fixed autocomplete top 10 meta-filters

* fix(chalice): fixed exp funnels group by users
2024-11-13 17:43:13 +01:00

262 lines
12 KiB
Python

from chalicelib.utils import ch_client
from chalicelib.utils import exp_ch_helper
from .significance import *
logger = logging.getLogger(__name__)
def get_simple_funnel(filter_d: schemas.CardSeriesFilterSchema, project: schemas.ProjectContext,
metric_format: schemas.MetricExtendedFormatType) -> List[RealDictRow]:
stages: List[schemas.SessionSearchEventSchema2] = filter_d.events
filters: List[schemas.SessionSearchFilterSchema] = filter_d.filters
platform = project.platform
constraints = ["e.project_id = %(project_id)s",
"e.datetime >= toDateTime(%(startTimestamp)s/1000)",
"e.datetime <= toDateTime(%(endTimestamp)s/1000)",
"e.event_type IN %(eventTypes)s"]
full_args = {"project_id": project.project_id, "startTimestamp": filter_d.startTimestamp,
"endTimestamp": filter_d.endTimestamp}
MAIN_EVENTS_TABLE = exp_ch_helper.get_main_events_table(timestamp=filter_d.startTimestamp,
platform=platform)
MAIN_SESSIONS_TABLE = exp_ch_helper.get_main_sessions_table(filter_d.startTimestamp)
full_args["MAIN_EVENTS_TABLE"] = MAIN_EVENTS_TABLE
full_args["MAIN_SESSIONS_TABLE"] = MAIN_SESSIONS_TABLE
n_stages_query = []
n_stages_query_not = []
event_types = []
values = {}
has_filters = False
if len(filters) > 0:
meta_keys = None
for i, f in enumerate(filters):
if len(f.value) == 0:
continue
has_filters = True
f.value = helper.values_for_operator(value=f.value, op=f.operator)
op = sh.get_sql_operator(f.operator)
filter_type = f.type
f_k = f"f_value{i}"
values = {**values,
**sh.multi_values(f.value, value_key=f_k)}
is_not = False
if sh.is_negation_operator(f.operator):
is_not = True
if filter_type == schemas.FilterType.USER_BROWSER:
constraints.append(
sh.multi_conditions(f's.user_browser {op} %({f_k})s', f.value, is_not=is_not, value_key=f_k))
elif filter_type in [schemas.FilterType.USER_OS, schemas.FilterType.USER_OS_MOBILE]:
constraints.append(
sh.multi_conditions(f's.user_os {op} %({f_k})s', f.value, is_not=is_not, value_key=f_k))
elif filter_type in [schemas.FilterType.USER_DEVICE, schemas.FilterType.USER_DEVICE_MOBILE]:
constraints.append(
sh.multi_conditions(f's.user_device {op} %({f_k})s', f.value, is_not=is_not, value_key=f_k))
elif filter_type in [schemas.FilterType.USER_COUNTRY, schemas.FilterType.USER_COUNTRY_MOBILE]:
constraints.append(
sh.multi_conditions(f's.user_country {op} %({f_k})s', f.value, is_not=is_not, value_key=f_k))
elif filter_type == schemas.FilterType.DURATION:
if len(f.value) > 0 and f.value[0] is not None:
constraints.append(f's.duration >= %(minDuration)s')
values["minDuration"] = f.value[0]
if len(f["value"]) > 1 and f.value[1] is not None and int(f.value[1]) > 0:
constraints.append('s.duration <= %(maxDuration)s')
values["maxDuration"] = f.value[1]
elif filter_type == schemas.FilterType.REFERRER:
constraints.append(
sh.multi_conditions(f"s.base_referrer {op} %({f_k})s", f.value, is_not=is_not, value_key=f_k))
elif filter_type == events.EventType.METADATA.ui_type:
if meta_keys is None:
meta_keys = metadata.get(project_id=project.project_id)
meta_keys = {m["key"]: m["index"] for m in meta_keys}
if f.source in meta_keys.keys():
constraints.append(
sh.multi_conditions(
f's.{metadata.index_to_colname(meta_keys[f.source])} {op} %({f_k})s', f.value,
is_not=is_not, value_key=f_k))
elif filter_type in [schemas.FilterType.USER_ID, schemas.FilterType.USER_ID_MOBILE]:
constraints.append(
sh.multi_conditions(f's.user_id {op} %({f_k})s', f.value, is_not=is_not, value_key=f_k))
elif filter_type in [schemas.FilterType.USER_ANONYMOUS_ID,
schemas.FilterType.USER_ANONYMOUS_ID_MOBILE]:
constraints.append(
sh.multi_conditions(f's.user_anonymous_id {op} %({f_k})s', f.value, is_not=is_not, value_key=f_k))
elif filter_type in [schemas.FilterType.REV_ID, schemas.FilterType.REV_ID_MOBILE]:
constraints.append(
sh.multi_conditions(f's.rev_id {op} %({f_k})s', f.value, is_not=is_not, value_key=f_k))
i = -1
for s in stages:
if s.operator is None:
s.operator = schemas.SearchEventOperator.IS
if not isinstance(s.value, list):
s.value = [s.value]
is_any = sh.isAny_opreator(s.operator)
if not is_any and isinstance(s.value, list) and len(s.value) == 0:
continue
i += 1
op = sh.get_sql_operator(s.operator)
is_not = False
if sh.is_negation_operator(s.operator):
is_not = True
op = sh.reverse_sql_operator(op)
specific_condition = None
e_k = f"e_value{i}"
event_type = s.type
next_event_type = exp_ch_helper.get_event_type(event_type, platform=platform)
if event_type == events.EventType.CLICK.ui_type:
if platform == "web":
next_col_name = events.EventType.CLICK.column
if not is_any:
if schemas.ClickEventExtraOperator.has_value(s.operator):
specific_condition = sh.multi_conditions(f"selector {op} %({e_k})s", s.value, value_key=e_k)
else:
next_col_name = events.EventType.CLICK_MOBILE.column
elif event_type == events.EventType.INPUT.ui_type:
next_col_name = events.EventType.INPUT.column
elif event_type == events.EventType.LOCATION.ui_type:
next_col_name = 'url_path'
elif event_type == events.EventType.CUSTOM.ui_type:
next_col_name = events.EventType.CUSTOM.column
# IOS --------------
elif event_type == events.EventType.CLICK_MOBILE.ui_type:
next_col_name = events.EventType.CLICK_MOBILE.column
elif event_type == events.EventType.INPUT_MOBILE.ui_type:
next_col_name = events.EventType.INPUT_MOBILE.column
elif event_type == events.EventType.VIEW_MOBILE.ui_type:
next_col_name = events.EventType.VIEW_MOBILE.column
elif event_type == events.EventType.CUSTOM_MOBILE.ui_type:
next_col_name = events.EventType.CUSTOM_MOBILE.column
else:
logger.warning(f"=================UNDEFINED:{event_type}")
continue
values = {**values, **sh.multi_values(helper.values_for_operator(value=s.value, op=s.operator), value_key=e_k)}
if next_event_type not in event_types:
event_types.append(next_event_type)
full_args[f"event_type_{i}"] = next_event_type
n_stages_query.append(f"event_type=%(event_type_{i})s")
if is_not:
n_stages_query_not.append(n_stages_query[-1] + " AND " +
(sh.multi_conditions(f' {next_col_name} {op} %({e_k})s', s.value,
is_not=is_not, value_key=e_k)
if not specific_condition else specific_condition))
elif not is_any:
n_stages_query[-1] += " AND " + (sh.multi_conditions(f' {next_col_name} {op} %({e_k})s', s.value,
is_not=is_not, value_key=e_k)
if not specific_condition else specific_condition)
full_args = {"eventTypes": tuple(event_types), **full_args, **values}
n_stages = len(n_stages_query)
if n_stages == 0:
return []
extra_from = ""
if has_filters or metric_format == schemas.MetricExtendedFormatType.USER_COUNT:
extra_from = f"INNER JOIN {MAIN_SESSIONS_TABLE} AS s ON (e.session_id=s.session_id)"
constraints += ["s.project_id = %(project_id)s",
"s.datetime >= toDateTime(%(startTimestamp)s/1000)",
"s.datetime <= toDateTime(%(endTimestamp)s/1000)"]
if metric_format == schemas.MetricExtendedFormatType.SESSION_COUNT:
group_by = 'e.session_id'
else:
constraints.append("isNotNull(s.user_id)")
group_by = 's.user_id'
if len(n_stages_query_not) > 0:
value_conditions_not_base = ["project_id = %(project_id)s",
"datetime >= toDateTime(%(startTimestamp)s/1000)",
"datetime <= toDateTime(%(endTimestamp)s/1000)"]
_value_conditions_not = []
value_conditions_not = []
for c in n_stages_query_not:
_p = c % full_args
if _p not in _value_conditions_not:
_value_conditions_not.append(_p)
value_conditions_not.append(c)
extra_from += f"""LEFT ANTI JOIN (SELECT DISTINCT sub.session_id
FROM {MAIN_EVENTS_TABLE} AS sub
WHERE {' AND '.join(value_conditions_not_base)}
AND ({' OR '.join([c for c in value_conditions_not])})
) AS sub ON(e.session_id=sub.session_id)"""
del _value_conditions_not
del value_conditions_not
del value_conditions_not_base
sequences = []
projections = []
for i, s in enumerate(n_stages_query):
projections.append(f"SUM(T{i + 1}) AS stage{i + 1}")
if i == 0:
sequences.append(f"anyIf(1,{s}) AS T1")
else:
pattern = ""
conditions = []
j = 0
while j <= i:
pattern += f"(?{j + 1})"
conditions.append(n_stages_query[j])
j += 1
sequences.append(f"sequenceMatch('{pattern}')(e.datetime, {','.join(conditions)}) AS T{i + 1}")
n_stages_query = f"""
SELECT {",".join(projections)}
FROM (SELECT {",".join(sequences)}
FROM {MAIN_EVENTS_TABLE} AS e {extra_from}
WHERE {" AND ".join(constraints)}
GROUP BY {group_by}) AS raw;
"""
with ch_client.ClickHouseClient() as cur:
query = cur.format(n_stages_query, full_args)
logger.debug("---------------------------------------------------")
logger.debug(query)
logger.debug("---------------------------------------------------")
try:
row = cur.execute(query)
except Exception as err:
logger.warning("--------- SIMPLE FUNNEL SEARCH QUERY EXCEPTION CH-----------")
logger.warning(query)
logger.warning("--------- PAYLOAD -----------")
logger.warning(filter_d.model_dump_json())
logger.warning("--------------------")
raise err
stages_list = []
row = row[0]
for i, stage in enumerate(stages):
count = row[f"stage{i + 1}"]
drop = None
if i != 0:
base_count = row[f"stage{i}"]
if base_count == 0:
drop = 0
elif base_count > 0:
drop = int(100 * (base_count - count) / base_count)
stages_list.append(
{"value": stage.value,
"type": stage.type,
"operator": stage.operator,
"dropPct": drop,
"count": count
}
)
return stages_list