Merge remote-tracking branch 'origin/api-v1.8.2' into dev

Taha Yassine Kraiem 2022-11-24 18:42:46 +01:00
commit 971ea6ed3b
5 changed files with 195 additions and 180 deletions

View file

@@ -177,7 +177,7 @@ def _isUndefined_operator(op: schemas.SearchEventOperator):
# This function executes the query and returns the result
def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_id, errors_only=False,
error_status=schemas.ErrorStatus.all, count_only=False, issue=None):
error_status=schemas.ErrorStatus.all, count_only=False, issue=None, ids_only=False):
if data.bookmarked:
data.startDate, data.endDate = sessions_favorite.get_start_end_timestamp(project_id, user_id)
@@ -185,9 +185,11 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_
favorite_only=data.bookmarked, issue=issue, project_id=project_id,
user_id=user_id)
if data.limit is not None and data.page is not None:
full_args["sessions_limit"] = data.limit
full_args["sessions_limit_s"] = (data.page - 1) * data.limit
full_args["sessions_limit_e"] = data.page * data.limit
else:
full_args["sessions_limit"] = 200
full_args["sessions_limit_s"] = 1
full_args["sessions_limit_e"] = 200
@@ -243,17 +245,24 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_
# sort += " " + data.order + "," + helper.key_to_snake_case(data.sort)
sort = helper.key_to_snake_case(data.sort)
meta_keys = metadata.get(project_id=project_id)
main_query = cur.mogrify(f"""SELECT COUNT(full_sessions) AS count,
COALESCE(JSONB_AGG(full_sessions)
FILTER (WHERE rn>%(sessions_limit_s)s AND rn<=%(sessions_limit_e)s), '[]'::JSONB) AS sessions
FROM (SELECT *, ROW_NUMBER() OVER (ORDER BY {sort} {data.order}, issue_score DESC) AS rn
FROM (SELECT DISTINCT ON(s.session_id) {SESSION_PROJECTION_COLS}
{"," if len(meta_keys) > 0 else ""}{",".join([f'metadata_{m["index"]}' for m in meta_keys])}
{query_part}
ORDER BY s.session_id desc) AS filtred_sessions
ORDER BY {sort} {data.order}, issue_score DESC) AS full_sessions;""",
full_args)
if ids_only:
main_query = cur.mogrify(f"""SELECT DISTINCT ON(s.session_id) s.session_id
{query_part}
ORDER BY s.session_id desc
LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s;""",
full_args)
else:
meta_keys = metadata.get(project_id=project_id)
main_query = cur.mogrify(f"""SELECT COUNT(full_sessions) AS count,
COALESCE(JSONB_AGG(full_sessions)
FILTER (WHERE rn>%(sessions_limit_s)s AND rn<=%(sessions_limit_e)s), '[]'::JSONB) AS sessions
FROM (SELECT *, ROW_NUMBER() OVER (ORDER BY {sort} {data.order}, issue_score DESC) AS rn
FROM (SELECT DISTINCT ON(s.session_id) {SESSION_PROJECTION_COLS}
{"," if len(meta_keys) > 0 else ""}{",".join([f'metadata_{m["index"]}' for m in meta_keys])}
{query_part}
ORDER BY s.session_id desc) AS filtred_sessions
ORDER BY {sort} {data.order}, issue_score DESC) AS full_sessions;""",
full_args)
# print("--------------------")
# print(main_query)
# print("--------------------")
@@ -266,7 +275,7 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_
print(data.json())
print("--------------------")
raise err
if errors_only:
if errors_only or ids_only:
return helper.list_to_camel_case(cur.fetchall())
sessions = cur.fetchone()
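With the new flag the function short-circuits after fetchall(), so callers get a flat list of IDs instead of the paginated envelope; a hedged usage sketch (payload is assumed to be a valid SessionsSearchPayloadSchema and the values are illustrative):

# ids_only=True -> camel-cased rows straight from fetchall()
ids = search_sessions(data=payload, project_id=1, user_id=7, ids_only=True)
# e.g. [{'sessionId': 123}, {'sessionId': 456}, ...]

# default path -> one aggregated row with the total and the requested page of sessions
page = search_sessions(data=payload, project_id=1, user_id=7)
# e.g. {'count': 812, 'sessions': [...]}  (post-processed further down in the function)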

View file

@@ -181,9 +181,7 @@ def get_stages_and_events(filter_d, project_id) -> List[RealDictRow]:
values=s["value"], value_key=f"value{i + 1}")
n_stages_query.append(f"""
(SELECT main.session_id,
{"MIN(main.timestamp)" if i + 1 < len(stages) else "MAX(main.timestamp)"} AS stage{i + 1}_timestamp,
'{event_type}' AS type,
'{s["operator"]}' AS operator
{"MIN(main.timestamp)" if i + 1 < len(stages) else "MAX(main.timestamp)"} AS stage{i + 1}_timestamp
FROM {next_table} AS main {" ".join(extra_from)}
WHERE main.timestamp >= {f"T{i}.stage{i}_timestamp" if i > 0 else "%(startTimestamp)s"}
{f"AND main.session_id=T1.session_id" if i > 0 else ""}
@@ -191,30 +189,33 @@ def get_stages_and_events(filter_d, project_id) -> List[RealDictRow]:
{(" AND " + " AND ".join(stage_constraints)) if len(stage_constraints) > 0 else ""}
{(" AND " + " AND ".join(first_stage_extra_constraints)) if len(first_stage_extra_constraints) > 0 and i == 0 else ""}
GROUP BY main.session_id)
AS T{i + 1} {"USING (session_id)" if i > 0 else ""}
AS T{i + 1} {"ON (TRUE)" if i > 0 else ""}
""")
if len(n_stages_query) == 0:
n_stages=len(n_stages_query)
if n_stages == 0:
return []
n_stages_query = " LEFT JOIN LATERAL ".join(n_stages_query)
n_stages_query += ") AS stages_t"
n_stages_query = f"""
SELECT stages_and_issues_t.*, sessions.user_uuid FROM (
SELECT stages_and_issues_t.*, sessions.user_uuid
FROM (
SELECT * FROM (
SELECT * FROM
{n_stages_query}
SELECT T1.session_id, {",".join([f"stage{i + 1}_timestamp" for i in range(n_stages)])}
FROM {n_stages_query}
LEFT JOIN LATERAL
( SELECT ISE.session_id,
ISS.type as issue_type,
( SELECT ISS.type as issue_type,
ISE.timestamp AS issue_timestamp,
ISS.context_string as issue_context,
COALESCE(ISS.context_string,'') as issue_context,
ISS.issue_id as issue_id
FROM events_common.issues AS ISE INNER JOIN issues AS ISS USING (issue_id)
WHERE ISE.timestamp >= stages_t.stage1_timestamp
AND ISE.timestamp <= stages_t.stage{i + 1}_timestamp
AND ISS.project_id=%(project_id)s
AND ISE.session_id = stages_t.session_id
{"AND ISS.type IN %(issueTypes)s" if len(filter_issues) > 0 else ""}
) AS issues_t USING (session_id)
LIMIT 20 -- remove the limit to get exact stats
) AS issues_t ON (TRUE)
) AS stages_and_issues_t INNER JOIN sessions USING(session_id);
"""
@@ -297,7 +298,21 @@ def pearson_corr(x: list, y: list):
return r, confidence, False
def get_transitions_and_issues_of_each_type(rows: List[RealDictRow], all_issues_with_context, first_stage, last_stage):
# def tuple_or(t: tuple):
# x = 0
# for el in t:
# x |= el # | is for bitwise OR
# return x
#
# The following function is a correct optimization of the previous one because t only contains 0/1 values
def tuple_or(t: tuple):
for el in t:
if el > 0:
return 1
return 0
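Since every column fed to it holds only 0s and 1s, the early-exit loop gives the same result as the bitwise-OR version it replaces:

assert tuple_or((0, 0, 1)) == 1   # at least one row in the column saw the issue
assert tuple_or((0, 0, 0)) == 0   # no row saw the issue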
def get_transitions_and_issues_of_each_type(rows: List[RealDictRow], all_issues, first_stage, last_stage):
"""
Returns two lists with binary values 0/1:
@@ -316,12 +331,6 @@ def get_transitions_and_issues_of_each_type(rows: List[RealDictRow], all_issues_
transitions = []
n_sess_affected = 0
errors = {}
for issue in all_issues_with_context:
split = issue.split('__^__')
errors[issue] = {
"errors": [],
"issue_type": split[0],
"context": split[1]}
for row in rows:
t = 0
@@ -329,38 +338,26 @@ def get_transitions_and_issues_of_each_type(rows: List[RealDictRow], all_issues_
last_ts = row[f'stage{last_stage}_timestamp']
if first_ts is None:
continue
elif first_ts is not None and last_ts is not None:
elif last_ts is not None:
t = 1
transitions.append(t)
ic_present = False
for issue_type_with_context in errors:
for error_id in all_issues:
if error_id not in errors:
errors[error_id] = []
ic = 0
issue_type = errors[issue_type_with_context]["issue_type"]
context = errors[issue_type_with_context]["context"]
if row['issue_type'] is not None:
row_issue_id=row['issue_id']
if row_issue_id is not None:
if last_ts is None or (first_ts < row['issue_timestamp'] < last_ts):
context_in_row = row['issue_context'] if row['issue_context'] is not None else ''
if issue_type == row['issue_type'] and context == context_in_row:
if error_id == row_issue_id:
ic = 1
ic_present = True
errors[issue_type_with_context]["errors"].append(ic)
errors[error_id].append(ic)
if ic_present and t:
n_sess_affected += 1
# def tuple_or(t: tuple):
# x = 0
# for el in t:
# x |= el
# return x
def tuple_or(t: tuple):
for el in t:
if el > 0:
return 1
return 0
errors = {key: errors[key]["errors"] for key in errors}
all_errors = [tuple_or(t) for t in zip(*errors.values())]
return transitions, errors, all_errors, n_sess_affected
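The three outputs are position-aligned per session row, which is what lets pearson_corr compare them later; a small hedged illustration (the issue ids are made up):

transitions = [1, 0, 1]                    # 1 = the row reached stage{last_stage}
errors = {"issue-a": [1, 0, 0],            # per-issue 0/1 vectors, aligned with transitions
          "issue-b": [0, 0, 1]}
all_errors = [tuple_or(t) for t in zip(*errors.values())]   # -> [1, 0, 1]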
@@ -376,10 +373,9 @@ def get_affected_users_for_all_issues(rows, first_stage, last_stage):
"""
affected_users = defaultdict(lambda: set())
affected_sessions = defaultdict(lambda: set())
contexts = defaultdict(lambda: None)
all_issues = {}
n_affected_users_dict = defaultdict(lambda: None)
n_affected_sessions_dict = defaultdict(lambda: None)
all_issues_with_context = set()
n_issues_dict = defaultdict(lambda: 0)
issues_by_session = defaultdict(lambda: 0)
@@ -395,15 +391,13 @@ def get_affected_users_for_all_issues(rows, first_stage, last_stage):
# check that the issue exists and belongs to subfunnel:
if iss is not None and (row[f'stage{last_stage}_timestamp'] is None or
(row[f'stage{first_stage}_timestamp'] < iss_ts < row[f'stage{last_stage}_timestamp'])):
context_string = row['issue_context'] if row['issue_context'] is not None else ''
issue_with_context = iss + '__^__' + context_string
contexts[issue_with_context] = {"context": context_string, "id": row["issue_id"]}
all_issues_with_context.add(issue_with_context)
n_issues_dict[issue_with_context] += 1
if row["issue_id"] not in all_issues:
all_issues[row["issue_id"]] = {"context": row['issue_context'], "issue_type": row["issue_type"]}
n_issues_dict[row["issue_id"]] += 1
if row['user_uuid'] is not None:
affected_users[issue_with_context].add(row['user_uuid'])
affected_users[row["issue_id"]].add(row['user_uuid'])
affected_sessions[issue_with_context].add(row['session_id'])
affected_sessions[row["issue_id"]].add(row['session_id'])
issues_by_session[row[f'session_id']] += 1
if len(affected_users) > 0:
@@ -414,29 +408,28 @@ def get_affected_users_for_all_issues(rows, first_stage, last_stage):
n_affected_sessions_dict.update({
iss: len(affected_sessions[iss]) for iss in affected_sessions
})
return all_issues_with_context, n_issues_dict, n_affected_users_dict, n_affected_sessions_dict, contexts
return all_issues, n_issues_dict, n_affected_users_dict, n_affected_sessions_dict
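Keying by issue_id (instead of the old type__^__context composite) makes the returned structures line up directly with the rows' issue_id column; a hedged sketch of the new return shape (id, type and counts are invented for illustration):

all_issues = {"abc123": {"context": "/checkout", "issue_type": "click_rage"}}
n_issues_dict = {"abc123": 4}              # occurrences inside the sub-funnel
n_affected_users_dict = {"abc123": 3}      # distinct user_uuid values
n_affected_sessions_dict = {"abc123": 4}   # distinct session_id values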
def count_sessions(rows, n_stages):
session_counts = {i: set() for i in range(1, n_stages + 1)}
for ind, row in enumerate(rows):
for row in rows:
for i in range(1, n_stages + 1):
if row[f"stage{i}_timestamp"] is not None:
session_counts[i].add(row[f"session_id"])
session_counts = {i: len(session_counts[i]) for i in session_counts}
return session_counts
def count_users(rows, n_stages):
users_in_stages = defaultdict(lambda: set())
for ind, row in enumerate(rows):
users_in_stages = {i: set() for i in range(1, n_stages + 1)}
for row in rows:
for i in range(1, n_stages + 1):
if row[f"stage{i}_timestamp"] is not None:
users_in_stages[i].add(row["user_uuid"])
users_count = {i: len(users_in_stages[i]) for i in range(1, n_stages + 1)}
return users_count
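Both counters now iterate plain rows and bucket distinct ids per stage; a tiny hedged example:

rows = [{"session_id": 1, "user_uuid": "u1", "stage1_timestamp": 10, "stage2_timestamp": 20},
        {"session_id": 2, "user_uuid": "u2", "stage1_timestamp": 11, "stage2_timestamp": None}]
count_sessions(rows, n_stages=2)   # -> {1: 2, 2: 1}
count_users(rows, n_stages=2)      # -> {1: 2, 2: 1}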
@@ -489,18 +482,18 @@ def get_issues(stages, rows, first_stage=None, last_stage=None, drop_only=False)
last_stage = n_stages
n_critical_issues = 0
issues_dict = dict({"significant": [],
"insignificant": []})
issues_dict = {"significant": [],
"insignificant": []}
session_counts = count_sessions(rows, n_stages)
drop = session_counts[first_stage] - session_counts[last_stage]
all_issues_with_context, n_issues_dict, affected_users_dict, affected_sessions, contexts = get_affected_users_for_all_issues(
all_issues, n_issues_dict, affected_users_dict, affected_sessions = get_affected_users_for_all_issues(
rows, first_stage, last_stage)
transitions, errors, all_errors, n_sess_affected = get_transitions_and_issues_of_each_type(rows,
all_issues_with_context,
all_issues,
first_stage, last_stage)
# print("len(transitions) =", len(transitions))
del rows
if any(all_errors):
total_drop_corr, conf, is_sign = pearson_corr(transitions, all_errors)
@@ -513,33 +506,32 @@ def get_issues(stages, rows, first_stage=None, last_stage=None, drop_only=False)
if drop_only:
return total_drop_due_to_issues
for issue in all_issues_with_context:
for issue_id in all_issues:
if not any(errors[issue]):
if not any(errors[issue_id]):
continue
r, confidence, is_sign = pearson_corr(transitions, errors[issue])
r, confidence, is_sign = pearson_corr(transitions, errors[issue_id])
if r is not None and drop is not None and is_sign:
lost_conversions = int(r * affected_sessions[issue])
lost_conversions = int(r * affected_sessions[issue_id])
else:
lost_conversions = None
if r is None:
r = 0
split = issue.split('__^__')
issues_dict['significant' if is_sign else 'insignificant'].append({
"type": split[0],
"title": helper.get_issue_title(split[0]),
"affected_sessions": affected_sessions[issue],
"unaffected_sessions": session_counts[1] - affected_sessions[issue],
"type": all_issues[issue_id]["issue_type"],
"title": helper.get_issue_title(all_issues[issue_id]["issue_type"]),
"affected_sessions": affected_sessions[issue_id],
"unaffected_sessions": session_counts[1] - affected_sessions[issue_id],
"lost_conversions": lost_conversions,
"affected_users": affected_users_dict[issue],
"affected_users": affected_users_dict[issue_id],
"conversion_impact": round(r * 100),
"context_string": contexts[issue]["context"],
"issue_id": contexts[issue]["id"]
"context_string": all_issues[issue_id]["context"],
"issue_id": issue_id
})
if is_sign:
n_critical_issues += n_issues_dict[issue]
n_critical_issues += n_issues_dict[issue_id]
return n_critical_issues, issues_dict, total_drop_due_to_issues
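Each issue's 0/1 vector is correlated against transitions; a significant r is scaled to conversion_impact and, together with the affected-session count, to lost_conversions. A hedged numeric sketch (tiny vectors, so the function's own significance test may well reject them):

transitions = [1, 0, 1, 0, 1, 0]
issue_vec   = [1, 0, 1, 0, 1, 0]           # issue present exactly in the rows that transitioned
r, confidence, is_sign = pearson_corr(transitions, issue_vec)
# r -> 1.0 when the sample is accepted; then conversion_impact = round(r * 100) -> 100
# and lost_conversions = int(r * affected_sessions[issue_id])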

View file

@@ -56,6 +56,14 @@ def sessions_search(projectId: int, data: schemas.FlatSessionsSearchPayloadSchem
return {'data': data}
@app.post('/{projectId}/sessions/search/ids', tags=["sessions"])
@app.post('/{projectId}/sessions/search2/ids', tags=["sessions"])
def session_ids_search(projectId: int, data: schemas.FlatSessionsSearchPayloadSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
data = sessions.search_sessions(data=data, project_id=projectId, user_id=context.user_id, ids_only=True)
return {'data': data}
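A hedged request sketch for the new route (host, path prefix, auth header and payload fields are assumptions, not part of this diff):

import requests

resp = requests.post("https://openreplay.example.com/api/1/sessions/search/ids",
                     json={"events": [], "filters": []},          # minimal flat search payload, fields assumed
                     headers={"Authorization": "Bearer <token>"})
resp.json()   # -> {"data": [{"sessionId": 123}, {"sessionId": 456}]}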
@app.get('/{projectId}/events/search', tags=["events"])
def events_search(projectId: int, q: str,
type: Union[schemas.FilterType, schemas.EventType,

View file

@@ -107,8 +107,7 @@ def get_by_id2_pg(project_id, session_id, context: schemas_ee.CurrentContext, fu
session_id=session_id, user_id=context.user_id)
data['metadata'] = __group_metadata(project_metadata=data.pop("projectMetadata"), session=data)
data['issues'] = issues.get_by_session_id(session_id=session_id, project_id=project_id)
data['live'] = live and assist.is_live(project_id=project_id,
session_id=session_id,
data['live'] = live and assist.is_live(project_id=project_id, session_id=session_id,
project_key=data["projectKey"])
data["inDB"] = True
return data
@@ -181,7 +180,7 @@ def _isUndefined_operator(op: schemas.SearchEventOperator):
# This function executes the query and returns the result
def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_id, errors_only=False,
error_status=schemas.ErrorStatus.all, count_only=False, issue=None):
error_status=schemas.ErrorStatus.all, count_only=False, issue=None, ids_only=False):
if data.bookmarked:
data.startDate, data.endDate = sessions_favorite.get_start_end_timestamp(project_id, user_id)
@@ -189,9 +188,11 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_
favorite_only=data.bookmarked, issue=issue, project_id=project_id,
user_id=user_id)
if data.limit is not None and data.page is not None:
full_args["sessions_limit"] = data.limit
full_args["sessions_limit_s"] = (data.page - 1) * data.limit
full_args["sessions_limit_e"] = data.page * data.limit
else:
full_args["sessions_limit"] = 200
full_args["sessions_limit_s"] = 1
full_args["sessions_limit_e"] = 200
@@ -247,17 +248,24 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_
# sort += " " + data.order + "," + helper.key_to_snake_case(data.sort)
sort = helper.key_to_snake_case(data.sort)
meta_keys = metadata.get(project_id=project_id)
main_query = cur.mogrify(f"""SELECT COUNT(full_sessions) AS count,
COALESCE(JSONB_AGG(full_sessions)
FILTER (WHERE rn>%(sessions_limit_s)s AND rn<=%(sessions_limit_e)s), '[]'::JSONB) AS sessions
FROM (SELECT *, ROW_NUMBER() OVER (ORDER BY {sort} {data.order}, issue_score DESC) AS rn
FROM (SELECT DISTINCT ON(s.session_id) {SESSION_PROJECTION_COLS}
{"," if len(meta_keys) > 0 else ""}{",".join([f'metadata_{m["index"]}' for m in meta_keys])}
{query_part}
ORDER BY s.session_id desc) AS filtred_sessions
ORDER BY {sort} {data.order}, issue_score DESC) AS full_sessions;""",
full_args)
if ids_only:
main_query = cur.mogrify(f"""SELECT DISTINCT ON(s.session_id) s.session_id
{query_part}
ORDER BY s.session_id desc
LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s;""",
full_args)
else:
meta_keys = metadata.get(project_id=project_id)
main_query = cur.mogrify(f"""SELECT COUNT(full_sessions) AS count,
COALESCE(JSONB_AGG(full_sessions)
FILTER (WHERE rn>%(sessions_limit_s)s AND rn<=%(sessions_limit_e)s), '[]'::JSONB) AS sessions
FROM (SELECT *, ROW_NUMBER() OVER (ORDER BY {sort} {data.order}, issue_score DESC) AS rn
FROM (SELECT DISTINCT ON(s.session_id) {SESSION_PROJECTION_COLS}
{"," if len(meta_keys) > 0 else ""}{",".join([f'metadata_{m["index"]}' for m in meta_keys])}
{query_part}
ORDER BY s.session_id desc) AS filtred_sessions
ORDER BY {sort} {data.order}, issue_score DESC) AS full_sessions;""",
full_args)
# print("--------------------")
# print(main_query)
# print("--------------------")
@@ -270,7 +278,7 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_
print(data.json())
print("--------------------")
raise err
if errors_only:
if errors_only or ids_only:
return helper.list_to_camel_case(cur.fetchall())
sessions = cur.fetchone()

View file

@@ -188,9 +188,7 @@ def get_stages_and_events(filter_d, project_id) -> List[RealDictRow]:
values=s["value"], value_key=f"value{i + 1}")
n_stages_query.append(f"""
(SELECT main.session_id,
{"MIN(main.timestamp)" if i + 1 < len(stages) else "MAX(main.timestamp)"} AS stage{i + 1}_timestamp,
'{event_type}' AS type,
'{s["operator"]}' AS operator
{"MIN(main.timestamp)" if i + 1 < len(stages) else "MAX(main.timestamp)"} AS stage{i + 1}_timestamp
FROM {next_table} AS main {" ".join(extra_from)}
WHERE main.timestamp >= {f"T{i}.stage{i}_timestamp" if i > 0 else "%(startTimestamp)s"}
{f"AND main.session_id=T1.session_id" if i > 0 else ""}
@@ -198,45 +196,54 @@ def get_stages_and_events(filter_d, project_id) -> List[RealDictRow]:
{(" AND " + " AND ".join(stage_constraints)) if len(stage_constraints) > 0 else ""}
{(" AND " + " AND ".join(first_stage_extra_constraints)) if len(first_stage_extra_constraints) > 0 and i == 0 else ""}
GROUP BY main.session_id)
AS T{i + 1} {"USING (session_id)" if i > 0 else ""}
AS T{i + 1} {"ON (TRUE)" if i > 0 else ""}
""")
if len(n_stages_query) == 0:
n_stages=len(n_stages_query)
if n_stages == 0:
return []
n_stages_query = " LEFT JOIN LATERAL ".join(n_stages_query)
n_stages_query += ") AS stages_t"
n_stages_query = f"""
SELECT stages_and_issues_t.*,sessions.session_id, sessions.user_uuid FROM (
SELECT stages_and_issues_t.*, sessions.user_uuid
FROM (
SELECT * FROM (
SELECT * FROM
{n_stages_query}
SELECT T1.session_id, {",".join([f"stage{i + 1}_timestamp" for i in range(n_stages)])}
FROM {n_stages_query}
LEFT JOIN LATERAL
(
SELECT * FROM
(SELECT ISE.session_id,
ISS.type as issue_type,
( SELECT ISS.type as issue_type,
ISE.timestamp AS issue_timestamp,
ISS.context_string as issue_context,
COALESCE(ISS.context_string,'') as issue_context,
ISS.issue_id as issue_id
FROM events_common.issues AS ISE INNER JOIN issues AS ISS USING (issue_id)
WHERE ISE.timestamp >= stages_t.stage1_timestamp
AND ISE.timestamp <= stages_t.stage{i + 1}_timestamp
AND ISS.project_id=%(project_id)s
{"AND ISS.type IN %(issueTypes)s" if len(filter_issues) > 0 else ""}) AS base_t
) AS issues_t
USING (session_id)) AS stages_and_issues_t
inner join sessions USING(session_id);
AND ISE.session_id = stages_t.session_id
{"AND ISS.type IN %(issueTypes)s" if len(filter_issues) > 0 else ""}
LIMIT 20 -- remove the limit to get exact stats
) AS issues_t ON (TRUE)
) AS stages_and_issues_t INNER JOIN sessions USING(session_id);
"""
# LIMIT 10000
params = {"project_id": project_id, "startTimestamp": filter_d["startDate"], "endTimestamp": filter_d["endDate"],
"issueTypes": tuple(filter_issues), **values}
with pg_client.PostgresClient() as cur:
query = cur.mogrify(n_stages_query, params)
# print("---------------------------------------------------")
# print(cur.mogrify(n_stages_query, params))
# print(query)
# print("---------------------------------------------------")
cur.execute(cur.mogrify(n_stages_query, params))
rows = cur.fetchall()
try:
cur.execute(query)
rows = cur.fetchall()
except Exception as err:
print("--------- FUNNEL SEARCH QUERY EXCEPTION -----------")
print(query.decode('UTF-8'))
print("--------- PAYLOAD -----------")
print(filter_d)
print("--------------------")
raise err
return rows
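The query is now mogrify'd once so the exact interpolated SQL can be executed and, on failure, printed next to the payload before re-raising; a minimal sketch of the same pattern (the trivial statement is just a placeholder):

with pg_client.PostgresClient() as cur:
    query = cur.mogrify("SELECT %(project_id)s AS project_id;", {"project_id": project_id})
    try:
        cur.execute(query)
        rows = cur.fetchall()
    except Exception as err:
        print(query.decode('UTF-8'))   # the fully interpolated statement that failed
        raise err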
@@ -298,7 +305,21 @@ def pearson_corr(x: list, y: list):
return r, confidence, False
def get_transitions_and_issues_of_each_type(rows: List[RealDictRow], all_issues_with_context, first_stage, last_stage):
# def tuple_or(t: tuple):
# x = 0
# for el in t:
# x |= el # | is for bitwise OR
# return x
#
# The following function is a correct optimization of the previous one because t only contains 0/1 values
def tuple_or(t: tuple):
for el in t:
if el > 0:
return 1
return 0
def get_transitions_and_issues_of_each_type(rows: List[RealDictRow], all_issues, first_stage, last_stage):
"""
Returns two lists with binary values 0/1:
@@ -317,12 +338,6 @@ def get_transitions_and_issues_of_each_type(rows: List[RealDictRow], all_issues_
transitions = []
n_sess_affected = 0
errors = {}
for issue in all_issues_with_context:
split = issue.split('__^__')
errors[issue] = {
"errors": [],
"issue_type": split[0],
"context": split[1]}
for row in rows:
t = 0
@@ -330,38 +345,26 @@ def get_transitions_and_issues_of_each_type(rows: List[RealDictRow], all_issues_
last_ts = row[f'stage{last_stage}_timestamp']
if first_ts is None:
continue
elif first_ts is not None and last_ts is not None:
elif last_ts is not None:
t = 1
transitions.append(t)
ic_present = False
for issue_type_with_context in errors:
for error_id in all_issues:
if error_id not in errors:
errors[error_id] = []
ic = 0
issue_type = errors[issue_type_with_context]["issue_type"]
context = errors[issue_type_with_context]["context"]
if row['issue_type'] is not None:
row_issue_id=row['issue_id']
if row_issue_id is not None:
if last_ts is None or (first_ts < row['issue_timestamp'] < last_ts):
context_in_row = row['issue_context'] if row['issue_context'] is not None else ''
if issue_type == row['issue_type'] and context == context_in_row:
if error_id == row_issue_id:
ic = 1
ic_present = True
errors[issue_type_with_context]["errors"].append(ic)
errors[error_id].append(ic)
if ic_present and t:
n_sess_affected += 1
# def tuple_or(t: tuple):
# x = 0
# for el in t:
# x |= el
# return x
def tuple_or(t: tuple):
for el in t:
if el > 0:
return 1
return 0
errors = {key: errors[key]["errors"] for key in errors}
all_errors = [tuple_or(t) for t in zip(*errors.values())]
return transitions, errors, all_errors, n_sess_affected
@@ -377,10 +380,9 @@ def get_affected_users_for_all_issues(rows, first_stage, last_stage):
"""
affected_users = defaultdict(lambda: set())
affected_sessions = defaultdict(lambda: set())
contexts = defaultdict(lambda: None)
all_issues = {}
n_affected_users_dict = defaultdict(lambda: None)
n_affected_sessions_dict = defaultdict(lambda: None)
all_issues_with_context = set()
n_issues_dict = defaultdict(lambda: 0)
issues_by_session = defaultdict(lambda: 0)
@@ -396,15 +398,13 @@ def get_affected_users_for_all_issues(rows, first_stage, last_stage):
# check that the issue exists and belongs to subfunnel:
if iss is not None and (row[f'stage{last_stage}_timestamp'] is None or
(row[f'stage{first_stage}_timestamp'] < iss_ts < row[f'stage{last_stage}_timestamp'])):
context_string = row['issue_context'] if row['issue_context'] is not None else ''
issue_with_context = iss + '__^__' + context_string
contexts[issue_with_context] = {"context": context_string, "id": row["issue_id"]}
all_issues_with_context.add(issue_with_context)
n_issues_dict[issue_with_context] += 1
if row["issue_id"] not in all_issues:
all_issues[row["issue_id"]] = {"context": row['issue_context'], "issue_type": row["issue_type"]}
n_issues_dict[row["issue_id"]] += 1
if row['user_uuid'] is not None:
affected_users[issue_with_context].add(row['user_uuid'])
affected_users[row["issue_id"]].add(row['user_uuid'])
affected_sessions[issue_with_context].add(row['session_id'])
affected_sessions[row["issue_id"]].add(row['session_id'])
issues_by_session[row[f'session_id']] += 1
if len(affected_users) > 0:
@@ -415,29 +415,28 @@ def get_affected_users_for_all_issues(rows, first_stage, last_stage):
n_affected_sessions_dict.update({
iss: len(affected_sessions[iss]) for iss in affected_sessions
})
return all_issues_with_context, n_issues_dict, n_affected_users_dict, n_affected_sessions_dict, contexts
return all_issues, n_issues_dict, n_affected_users_dict, n_affected_sessions_dict
def count_sessions(rows, n_stages):
session_counts = {i: set() for i in range(1, n_stages + 1)}
for ind, row in enumerate(rows):
for row in rows:
for i in range(1, n_stages + 1):
if row[f"stage{i}_timestamp"] is not None:
session_counts[i].add(row[f"session_id"])
session_counts = {i: len(session_counts[i]) for i in session_counts}
return session_counts
def count_users(rows, n_stages):
users_in_stages = defaultdict(lambda: set())
for ind, row in enumerate(rows):
users_in_stages = {i: set() for i in range(1, n_stages + 1)}
for row in rows:
for i in range(1, n_stages + 1):
if row[f"stage{i}_timestamp"] is not None:
users_in_stages[i].add(row["user_uuid"])
users_count = {i: len(users_in_stages[i]) for i in range(1, n_stages + 1)}
return users_count
@@ -490,18 +489,18 @@ def get_issues(stages, rows, first_stage=None, last_stage=None, drop_only=False)
last_stage = n_stages
n_critical_issues = 0
issues_dict = dict({"significant": [],
"insignificant": []})
issues_dict = {"significant": [],
"insignificant": []}
session_counts = count_sessions(rows, n_stages)
drop = session_counts[first_stage] - session_counts[last_stage]
all_issues_with_context, n_issues_dict, affected_users_dict, affected_sessions, contexts = get_affected_users_for_all_issues(
all_issues, n_issues_dict, affected_users_dict, affected_sessions = get_affected_users_for_all_issues(
rows, first_stage, last_stage)
transitions, errors, all_errors, n_sess_affected = get_transitions_and_issues_of_each_type(rows,
all_issues_with_context,
all_issues,
first_stage, last_stage)
# print("len(transitions) =", len(transitions))
del rows
if any(all_errors):
total_drop_corr, conf, is_sign = pearson_corr(transitions, all_errors)
@@ -514,33 +513,32 @@ def get_issues(stages, rows, first_stage=None, last_stage=None, drop_only=False)
if drop_only:
return total_drop_due_to_issues
for issue in all_issues_with_context:
for issue_id in all_issues:
if not any(errors[issue]):
if not any(errors[issue_id]):
continue
r, confidence, is_sign = pearson_corr(transitions, errors[issue])
r, confidence, is_sign = pearson_corr(transitions, errors[issue_id])
if r is not None and drop is not None and is_sign:
lost_conversions = int(r * affected_sessions[issue])
lost_conversions = int(r * affected_sessions[issue_id])
else:
lost_conversions = None
if r is None:
r = 0
split = issue.split('__^__')
issues_dict['significant' if is_sign else 'insignificant'].append({
"type": split[0],
"title": helper.get_issue_title(split[0]),
"affected_sessions": affected_sessions[issue],
"unaffected_sessions": session_counts[1] - affected_sessions[issue],
"type": all_issues[issue_id]["issue_type"],
"title": helper.get_issue_title(all_issues[issue_id]["issue_type"]),
"affected_sessions": affected_sessions[issue_id],
"unaffected_sessions": session_counts[1] - affected_sessions[issue_id],
"lost_conversions": lost_conversions,
"affected_users": affected_users_dict[issue],
"affected_users": affected_users_dict[issue_id],
"conversion_impact": round(r * 100),
"context_string": contexts[issue]["context"],
"issue_id": contexts[issue]["id"]
"context_string": all_issues[issue_id]["context"],
"issue_id": issue_id
})
if is_sign:
n_critical_issues += n_issues_dict[issue]
n_critical_issues += n_issues_dict[issue_id]
return n_critical_issues, issues_dict, total_drop_due_to_issues