openreplay/api/chalicelib/utils/helper.py
Taha Yassine Kraiem ed39bbf1d4 fix(chalice): fixed missing timestamp in sessions replay
fix(chalice): fixed nested custom events in session replay
fix(chalice): fixed issues events in session replay
2025-05-27 12:22:36 +02:00

338 lines
11 KiB
Python

import math
import random
import re
import string
from typing import Union
from urllib.parse import urlparse
from decouple import config
import schemas
from chalicelib.utils.TimeUTC import TimeUTC
def random_string(length=36):
    """Return a random string of ``length`` hexadecimal characters.

    Characters are drawn (with replacement) from ``string.hexdigits``, which
    contains the digits plus both lower- and upper-case ``a``-``f``.

    The ``random`` module is not a cryptographically secure source, so the
    result should not be relied on as a secret.

    :param length: number of characters to generate (default 36).
    :return: the generated string.
    """
    alphabet = string.hexdigits
    picked = random.choices(alphabet, k=length)
    return "".join(picked)
def list_to_camel_case(items: list[dict], flatten: bool = False, ignore_keys=None) -> list[dict]:
    """Convert the keys of every dict in ``items`` to camelCase, in place.

    :param items: list of dicts; each slot is replaced by its converted copy.
    :param flatten: when true, every item is first passed through
        ``flatten_nested_dicts`` so nested dicts are merged into the top level.
    :param ignore_keys: keys that must be kept verbatim; forwarded to
        ``dict_to_camel_case``. ``None`` (the default) means "no exceptions".
    :return: the same ``items`` list object, with converted entries.
    """
    # The previous implementation always forwarded an empty list here, which
    # silently discarded the caller's ``ignore_keys``.
    keys_to_keep = [] if ignore_keys is None else ignore_keys
    for index, item in enumerate(items):
        if flatten:
            item = flatten_nested_dicts(item)
        items[index] = dict_to_camel_case(item, ignore_keys=keys_to_keep)
    return items
def dict_to_camel_case(variable, delimiter='_', ignore_keys=[]):
    """Return a copy of ``variable`` whose dict keys are converted to camelCase.

    Anything that is not a dict (``None``, strings, numbers, lists, ...) is
    returned unchanged.

    Note that nested values are converted with the *default* settings:
    nested dicts are passed to ``dict_to_camel_case`` and nested lists to
    ``list_to_camel_case`` without ``delimiter`` or ``ignore_keys``.

    :param variable: value to convert.
    :param delimiter: separator used in the source keys.
    :param ignore_keys: top-level keys copied verbatim together with their
        (unconverted) values.
    :return: a new dict for dict input, otherwise ``variable`` itself.
    """
    if not isinstance(variable, dict):
        return variable

    converted = {}
    for key, value in variable.items():
        if key in ignore_keys:
            converted[key] = value
            continue
        if isinstance(value, dict):
            value = dict_to_camel_case(value)
        elif isinstance(value, list):
            value = list_to_camel_case(value)
        converted[key_to_camel_case(key, delimiter)] = value
    return converted
def dict_to_CAPITAL_keys(variable):
    """Upper-case the keys of a (possibly nested) dict.

    * a string is returned upper-cased;
    * a dict yields a new dict with upper-cased keys, recursing into values
      that are themselves dicts (other values, including lists and strings,
      are kept as they are);
    * any other value, including ``None``, is returned unchanged.
    """
    if isinstance(variable, str):
        return variable.upper()
    if not isinstance(variable, dict):
        return variable
    return {
        key.upper(): dict_to_CAPITAL_keys(value) if isinstance(value, dict) else value
        for key, value in variable.items()
    }
def variable_to_snake_case(variable, delimiter='_', split_number=False):
    """Convert a string, or the keys of a (nested) dict, to snake_case.

    * a string is converted with ``key_to_snake_case``;
    * a dict yields a new dict with converted keys; values that are dicts
      are converted recursively with the same ``delimiter`` and
      ``split_number``, every other value is kept as-is;
    * anything else is returned unchanged.
    """
    if isinstance(variable, str):
        return key_to_snake_case(variable, delimiter, split_number)
    if not isinstance(variable, dict):
        return variable

    converted = {}
    for key, value in variable.items():
        if isinstance(value, dict):
            value = variable_to_snake_case(value, delimiter, split_number)
        converted[key_to_snake_case(key, delimiter, split_number)] = value
    return converted
def key_to_camel_case(snake_str, delimiter='_'):
    """Convert a delimiter-separated key to camelCase.

    A single leading delimiter is dropped, the first component is kept
    verbatim and each following component is passed through ``str.title``.

    :param snake_str: key to convert, e.g. ``"user_id"``.
    :param delimiter: separator between components (may be longer than one
        character).
    :return: the camelCase key, e.g. ``"userId"``.
    """
    if snake_str.startswith(delimiter):
        # Strip the whole delimiter, not just one character, so that
        # multi-character delimiters do not leave residue in the first part.
        snake_str = snake_str[len(delimiter):]
    head, *tail = snake_str.split(delimiter)
    return head + ''.join(part.title() for part in tail)
def key_to_snake_case(name, delimiter='_', split_number=False):
    """Convert a camelCase / PascalCase name to lower-case snake_case.

    Two regex passes insert ``delimiter`` at word boundaries, then the whole
    result is lower-cased.

    :param name: identifier to convert.
    :param delimiter: text inserted between words.
    :param split_number: when true, a digit following a lower-case letter
        also starts a new word (``page2`` -> ``page_2``); otherwise digits
        stay attached to the preceding word.
    """
    replacement = fr'\1{delimiter}\2'
    if split_number:
        boundary = '([a-z])([A-Z0-9])'
    else:
        boundary = '([a-z0-9])([A-Z])'
    # First pass: separate a capitalised word from whatever precedes it.
    partially_split = re.sub('(.)([A-Z][a-z]+)', replacement, name)
    # Second pass: handle the remaining lower-to-upper (or lower-to-digit) boundaries.
    return re.sub(boundary, replacement, partially_split).lower()
# Module-level boolean switch. Nothing in the visible part of this file reads it.
# NOTE(review): presumably consulted by timing/profiling code elsewhere — confirm.
TRACK_TIME = True
def allow_captcha():
    """Tell whether captcha settings are present.

    :return: ``True`` only when both the ``captcha_server`` and
        ``captcha_key`` settings are defined and non-empty.
    """
    server = config("captcha_server", default=None)
    if server is None:
        return False
    key = config("captcha_key", default=None)
    if key is None:
        return False
    return len(server) > 0 and len(key) > 0
def string_to_sql_like(value):
    """Translate a user search string into a SQL ``LIKE`` pattern.

    Rules applied, in order:

    * runs of spaces are collapsed into a single space;
    * ``*`` becomes the ``%`` wildcard;
    * a leading ``^`` anchors the start (it is removed); otherwise a leading
      ``%`` is added unless one is already there;
    * a trailing ``$`` anchors the end (it is removed); otherwise a trailing
      ``%`` is added unless one is already there.

    :param value: search string, or ``None``.
    :return: the pattern, or ``None`` when ``value`` is ``None``.
    """
    if value is None:
        return None

    pattern = re.sub(' +', ' ', value).replace("*", "%")

    if pattern.startswith("^"):
        pattern = pattern[1:]
    elif not pattern.startswith("%"):
        pattern = "%" + pattern

    if pattern.endswith("$"):
        pattern = pattern[:-1]
    elif not pattern.endswith("%"):
        pattern += "%"

    return pattern
def string_to_sql_like_with_op(value, op):
    """Prepare ``value`` for a SQL comparison using operator ``op``.

    Every literal ``%`` in the result is doubled (``%%``).
    NOTE(review): presumably because the value is later embedded in a
    %-formatted query string — confirm against callers.

    * a list is processed element by element and a new list is returned;
    * ``None`` is returned unchanged;
    * for any operator other than ``ILIKE`` (case-insensitive comparison of
      ``op``) only the ``%`` doubling is applied;
    * for ``ILIKE``, ``*`` becomes ``%``, ``^`` / ``$`` anchors are removed,
      and a ``%`` wildcard is added on each non-anchored side.

    :param value: string, list of strings, or ``None``.
    :param op: SQL operator name.
    """
    if isinstance(value, list):
        return [string_to_sql_like_with_op(item, op) for item in value]
    if value is None:
        return value

    pattern = value
    if op.upper() == 'ILIKE':
        pattern = pattern.replace("*", "%")

        if pattern.startswith("^"):
            pattern = pattern[1:]
        elif not pattern.startswith("%"):
            pattern = "%" + pattern

        if pattern.endswith("$"):
            pattern = pattern[:-1]
        elif not pattern.endswith("%"):
            pattern += "%"

    return pattern.replace("%", "%%")
# Operators for which values_for_operator() wraps the value with `%` wildcards;
# is_likable() tests membership in this list.
likable_operators = [schemas.SearchEventOperator.STARTS_WITH, schemas.SearchEventOperator.ENDS_WITH,
                     schemas.SearchEventOperator.CONTAINS, schemas.SearchEventOperator.NOT_CONTAINS,
                     schemas.ClickEventExtraOperator.STARTS_WITH, schemas.ClickEventExtraOperator.ENDS_WITH,
                     schemas.ClickEventExtraOperator.CONTAINS, schemas.ClickEventExtraOperator.NOT_CONTAINS]
def is_likable(op: Union[schemas.SearchEventOperator, schemas.ClickEventExtraOperator]):
    """Return ``True`` when ``op`` is one of the operators listed in ``likable_operators``."""
    uses_like_pattern = op in likable_operators
    return uses_like_pattern
def values_for_operator(value: Union[str, list],
                        op: Union[schemas.SearchEventOperator, schemas.ClickEventExtraOperator]):
    """Wrap ``value`` with ``%`` wildcards according to the operator ``op``.

    * operators that are not "likable" (see ``is_likable``) leave ``value``
      untouched, whatever its type;
    * a list is processed element by element and a new list is returned;
    * ``None`` is returned unchanged;
    * STARTS_WITH -> ``value%``, ENDS_WITH -> ``%value``,
      CONTAINS / NOT_CONTAINS -> ``%value%``.

    The value itself is not escaped: any ``%`` or ``_`` it already contains
    is passed through as-is.
    """
    if not is_likable(op):
        return value
    if isinstance(value, list):
        return [values_for_operator(item, op) for item in value]
    if value is None:
        return value

    if op in (schemas.SearchEventOperator.STARTS_WITH, schemas.ClickEventExtraOperator.STARTS_WITH):
        return f"{value}%"
    if op in (schemas.SearchEventOperator.ENDS_WITH, schemas.ClickEventExtraOperator.ENDS_WITH):
        return f"%{value}"
    if op in (schemas.SearchEventOperator.CONTAINS, schemas.SearchEventOperator.NOT_CONTAINS,
              schemas.ClickEventExtraOperator.CONTAINS, schemas.ClickEventExtraOperator.NOT_CONTAINS):
        return f"%{value}%"
    # Fallback for a likable operator that has no wildcard rule above.
    return value
def is_alphabet_space_dash(word):
    """Return ``True`` when ``word`` consists only of ASCII letters, spaces and dashes.

    The empty string is accepted. The whole string must match:
    ``re.fullmatch`` is used instead of ``^...$`` with ``re.match`` because
    ``$`` also matches just before a trailing newline, which let values such
    as ``"abc\\n"`` through.

    :param word: string to validate.
    """
    return re.fullmatch("[a-zA-Z -]*", word) is not None
def merge_lists_by_key(l1, l2, key):
    """Merge the dicts of two lists that share the same value under ``key``.

    Items are visited in order (``l1`` then ``l2``). The first dict seen for
    a given key value is kept and later dicts with the same key value are
    merged into it with ``dict.update`` — so that first dict is modified in
    place and later values win on conflicting fields.

    :param l1: first list of dicts.
    :param l2: second list of dicts.
    :param key: field whose value identifies matching items.
    :return: list of merged dicts, ordered by first appearance.
    """
    by_key = {}
    for entry in l1 + l2:
        identifier = entry[key]
        if identifier in by_key:
            by_key[identifier].update(entry)
        else:
            by_key[identifier] = entry
    return list(by_key.values())
def flatten_nested_dicts(obj):
    """Flatten a nested dict into a single-level dict.

    Keys whose value is a dict are dropped and replaced by the (recursively
    flattened) entries of that dict. Key names are not prefixed, so when the
    same name occurs at several levels the one visited last wins.

    :param obj: dict to flatten, or ``None``.
    :return: a new flat dict, or ``None`` when ``obj`` is ``None``.
    """
    if obj is None:
        return None

    flat = {}
    for name, value in obj.items():
        if isinstance(value, dict):
            flat.update(flatten_nested_dicts(value))
        else:
            flat[name] = value
    return flat
def delete_keys_from_dict(d, to_delete):
    """Remove the given keys from ``d`` at every nesting level, in place.

    Dicts have the keys removed and their remaining values visited; lists
    have each element visited; any other value is left alone.

    :param d: dict, list, or any other value.
    :param to_delete: a single key name or an iterable of key names.
    :return: ``d`` itself (mutated when it is a dict or list).
    """
    if isinstance(to_delete, str):
        to_delete = [to_delete]

    if isinstance(d, dict):
        for unwanted in set(to_delete):
            d.pop(unwanted, None)
        for child in d.values():
            delete_keys_from_dict(child, to_delete)
    elif isinstance(d, list):
        for element in d:
            delete_keys_from_dict(element, to_delete)
    return d
def explode_widget(data, key=None):
    """Split a combined widget payload into one entry per metric.

    Every key of ``data`` except ``"chart"`` and those ending in
    ``"Progress"`` is treated as a metric and produces
    ``{"key": ..., "data": {"value": ...}}``. When present,
    ``data[<metric> + "Progress"]`` is attached as ``"progress"`` and
    ``data["chart"]`` is projected to a list of
    ``{"timestamp", "value"}`` points for that metric.

    :param data: dict holding metric values, optional ``*Progress`` values
        and an optional ``"chart"`` list of dicts.
    :param key: entry key to use for every metric; when ``None`` the
        snake_case form of the metric name is used.
    :return: list of per-metric entries.
    """
    widgets = []
    for name in data.keys():
        if name.endswith("Progress") or name == "chart":
            continue

        widget_key = key_to_snake_case(name) if key is None else key
        payload = {"value": data[name]}

        progress_name = name + "Progress"
        if progress_name in data:
            payload["progress"] = data[progress_name]

        if "chart" in data:
            payload["chart"] = [{"timestamp": point["timestamp"], "value": point[name]}
                                for point in data["chart"]]

        widgets.append({"key": widget_key, "data": payload})
    return widgets
# Display titles for the known issue type identifiers.
_ISSUE_TYPE_TITLES = {
    'click_rage': "Click Rage",
    'dead_click': "Dead Click",
    'excessive_scrolling': "Excessive Scrolling",
    'bad_request': "Bad Request",
    'missing_resource': "Missing Image",
    'memory': "High Memory Usage",
    'cpu': "High CPU",
    'slow_resource': "Slow Resource",
    'slow_page_load': "Slow Page",
    'crash': "Crash",
    'ml_cpu': "High CPU",
    'ml_memory': "High Memory Usage",
    'ml_dead_click': "Dead Click",
    'ml_click_rage': "Click Rage",
    'ml_mouse_thrashing': "Mouse Thrashing",
    'ml_excessive_scrolling': "Excessive Scrolling",
    'ml_slow_resources': "Slow Resource",
    'custom': "Custom Event",
    'js_exception': "Error",
    'custom_event_error': "Custom Error",
    'js_error': "Error",
    "mouse_thrashing": "Mouse Thrashing",
}


def get_issue_title(issue_type):
    """Return the display title for ``issue_type``.

    Unknown identifiers are returned unchanged.
    """
    return _ISSUE_TYPE_TITLES.get(issue_type, issue_type)
def __progress(old_val, new_val):
    """Return the change of ``old_val`` relative to ``new_val``, as a percentage.

    * ``new_val > 0``: ``(old_val - new_val) / new_val * 100``;
    * otherwise ``0`` when ``old_val`` is ``0``, else ``100``.
    """
    if new_val > 0:
        return ((old_val - new_val) / new_val) * 100
    if old_val == 0:
        return 0
    return 100
def __decimal_limit(value, limit):
    """Truncate ``value`` (rounding down) to at most ``limit`` decimal places.

    When nothing is left after the decimal point the result is returned as
    a whole number obtained by floor division, otherwise as a float.

    :param value: number to truncate.
    :param limit: number of decimal places to keep.
    """
    scale = 10 ** limit
    truncated = math.floor(value * scale)
    whole, remainder = divmod(truncated, scale)
    if remainder == 0:
        return whole
    return truncated / scale
def old_search_payload_to_flat(values):
    """Convert a legacy search payload to the flat ``filters``-only form.

    The legacy body carries ``events`` and ``filters`` separately. When
    ``events`` is present (not ``None``), every event is tagged with
    ``isEvent = True`` and every filter with ``isEvent = False``; the
    ``events`` key is then removed and ``filters`` becomes
    ``events + filters``. Payloads without ``events`` are returned untouched.

    A missing or explicit-``None`` ``filters`` entry is treated as an empty
    list (an explicit ``None`` used to raise ``TypeError``).

    :param values: request body as a dict; modified in place.
    :return: the same ``values`` dict.
    """
    events = values.get("events")
    if events is None:
        return values

    filters = values.get("filters") or []
    for event in events:
        event["isEvent"] = True
    for search_filter in filters:
        search_filter["isEvent"] = False

    values.pop("events")
    values["filters"] = events + filters
    return values
def custom_alert_to_front(values):
    """Adapt an alert payload to the format expected by the frontend.

    When ``seriesId`` is set and ``query.left`` equals
    ``schemas.AlertColumn.CUSTOM``, the series id is moved into
    ``query.left`` and ``seriesId`` is reset to ``None``.

    :param values: alert payload as a dict; modified in place.
    :return: the same ``values`` dict.
    """
    if values.get("seriesId") is None:
        return values

    query = values["query"]
    if query["left"] == schemas.AlertColumn.CUSTOM:
        query["left"] = values["seriesId"]
        values["seriesId"] = None
    return values
def __time_value(row):
    """Rescale ``row["value"]`` to a larger time unit and record it in ``row["unit"]``.

    The value is taken to be in milliseconds:

    * above ``TimeUTC.MS_MINUTE`` it is divided by that constant and the
      unit becomes MINUTE;
    * else above 1000 it is divided by 1000 and the unit becomes SECOND;
    * otherwise it is left alone and the unit is MILLISECOND.

    When a rescale happened and ``row`` has a ``"chart"`` list, each chart
    point's ``"value"`` is divided by the same factor.

    :param row: dict with a numeric ``"value"``; modified in place.
    :return: ``None``.
    """
    units = schemas.TemplatePredefinedUnits
    row["unit"] = units.MILLISECOND
    divisor = 1

    if row["value"] > TimeUTC.MS_MINUTE:
        divisor = TimeUTC.MS_MINUTE
        row["value"] = row["value"] / TimeUTC.MS_MINUTE
        row["unit"] = units.MINUTE
    elif row["value"] > 1 * 1000:
        divisor = 1000
        row["value"] = row["value"] / 1000
        row["unit"] = units.SECOND

    # Keep the chart series in the same unit as the headline value.
    if "chart" in row and divisor > 1:
        for point in row["chart"]:
            point["value"] /= divisor
def is_saml2_available():
    """Return the ``hastSAML2`` setting cast to ``bool`` (``False`` when it is not set)."""
    saml2_enabled = config("hastSAML2", default=False, cast=bool)
    return saml2_enabled
def get_domain():
    """Return the last two dot-separated labels of the ``SITE_URL`` host.

    ``http://`` is prepended when the setting does not start with ``http``
    so that ``urlparse`` recognises the host part. The result is derived
    from ``netloc`` as-is, so a port (if any) stays attached, and hosts with
    fewer than two labels are returned whole.
    """
    site_url = config("SITE_URL")
    if not site_url.startswith("http"):
        site_url = "http://" + site_url
    labels = urlparse(site_url).netloc.split(".")
    return '.'.join(labels[-2:])
def obfuscate(text, keep_last: int = 4):
    """Mask ``text`` with ``*``, leaving only its last ``keep_last`` characters readable.

    * non-string input (including ``None``) is returned unchanged;
    * a string no longer than ``keep_last`` is masked entirely;
    * ``keep_last`` values of zero or below mask the whole string.

    :param text: value to mask.
    :param keep_last: number of trailing characters to leave visible.
    :return: masked string of the same length as ``text``.
    """
    if not isinstance(text, str):
        return text

    visible = max(keep_last, 0)
    if len(text) <= visible:
        return "*" * len(text)

    # Slice from an explicit start index: ``text[-keep_last:]`` with
    # ``keep_last == 0`` is ``text[0:]`` and would append the full secret.
    hidden = len(text) - visible
    return "*" * hidden + text[hidden:]
def cast_session_id_to_string(data):
    """Convert ``sessionId`` values to ``str`` throughout a nested structure, in place.

    * lists: every element is processed and written back;
    * dicts containing ``"sessionId"``: that value is replaced by its
      ``str()`` form and the dict's other values are NOT visited;
    * dicts without ``"sessionId"``: every value is processed recursively;
    * anything else is returned unchanged.

    :param data: list, dict, or any other value.
    :return: ``data`` itself.
    """
    if isinstance(data, list):
        for index, element in enumerate(data):
            data[index] = cast_session_id_to_string(element)
    elif isinstance(data, dict):
        if "sessionId" in data:
            data["sessionId"] = str(data["sessionId"])
        else:
            for name in data:
                data[name] = cast_session_id_to_string(data[name])
    return data