diff --git a/.github/workflows/workers-ee.yaml b/.github/workflows/workers-ee.yaml
index 35580b5a9..e434d2716 100644
--- a/.github/workflows/workers-ee.yaml
+++ b/.github/workflows/workers-ee.yaml
@@ -86,7 +86,11 @@ jobs:
           ;;
         esac

-        [[ $(cat /tmp/images_to_build.txt) != "" ]] || (echo "Nothing to build here"; exit 1)
+        if [[ $(cat /tmp/images_to_build.txt) == "" ]]; then
+          echo "Nothing to build here"
+          touch /tmp/nothing-to-build-here
+          exit 0
+        fi
         #
         # Pushing image to registry
         #
@@ -94,7 +98,7 @@ jobs:
         for image in $(cat /tmp/images_to_build.txt);
         do
           echo "Bulding $image"
-          PUSH_IMAGE=0 bash -x ./build.sh skip $image
+          PUSH_IMAGE=0 bash -x ./build.sh ee $image
          [[ "x$skip_security_checks" == "xtrue" ]] || {
            curl -L https://github.com/aquasecurity/trivy/releases/download/v0.34.0/trivy_0.34.0_Linux-64bit.tar.gz | tar -xzf - -C ./
            ./trivy image --exit-code 1 --vuln-type os,library --severity "HIGH,CRITICAL" --ignore-unfixed $DOCKER_REPO/$image:$IMAGE_TAG
@@ -105,7 +109,7 @@ jobs:
          } && {
            echo "Skipping Security Checks"
          }
-          PUSH_IMAGE=1 bash -x ./build.sh skip $image
+          PUSH_IMAGE=1 bash -x ./build.sh ee $image
          echo "::set-output name=image::$DOCKER_REPO/$image:$IMAGE_TAG"
        done
@@ -118,6 +122,7 @@ jobs:
         #
         # Deploying image to environment.
         #
         set -x
+        [[ -f /tmp/nothing-to-build-here ]] && exit 0
         cd scripts/helmcharts/
         ## Update secerts
diff --git a/.github/workflows/workers.yaml b/.github/workflows/workers.yaml
index 341a196ad..e222e00fb 100644
--- a/.github/workflows/workers.yaml
+++ b/.github/workflows/workers.yaml
@@ -86,7 +86,11 @@ jobs:
           ;;
         esac

-        [[ $(cat /tmp/images_to_build.txt) != "" ]] || (echo "Nothing to build here"; exit 1)
+        if [[ $(cat /tmp/images_to_build.txt) == "" ]]; then
+          echo "Nothing to build here"
+          touch /tmp/nothing-to-build-here
+          exit 0
+        fi
         #
         # Pushing image to registry
         #
@@ -116,6 +120,8 @@ jobs:
         #
         # Deploying image to environment.
         #
+        set -x
+        [[ -f /tmp/nothing-to-build-here ]] && exit 0
         cd scripts/helmcharts/

         ## Update secerts
diff --git a/api/chalicelib/core/alerts_processor.py b/api/chalicelib/core/alerts_processor.py
index 2ed9105b2..76ae5c615 100644
--- a/api/chalicelib/core/alerts_processor.py
+++ b/api/chalicelib/core/alerts_processor.py
@@ -199,7 +199,8 @@ def process():
                     logging.info(f"Valid alert, notifying users, alertId:{alert['alertId']} name: {alert['name']}")
                     notifications.append(generate_notification(alert, result))
             except Exception as e:
-                logging.error(f"!!!Error while running alert query for alertId:{alert['alertId']} name: {alert['name']}")
+                logging.error(
+                    f"!!!Error while running alert query for alertId:{alert['alertId']} name: {alert['name']}")
                 logging.error(query)
                 logging.error(e)
                 cur = cur.recreate(rollback=True)
@@ -212,12 +213,22 @@ def process():
         alerts.process_notifications(notifications)


+def __format_value(x):
+    if x % 1 == 0:
+        x = int(x)
+    else:
+        x = round(x, 2)
+    return f"{x:,}"
+
+
 def generate_notification(alert, result):
+    left = __format_value(result['value'])
+    right = __format_value(alert['query']['right'])
     return {
         "alertId": alert["alertId"],
         "tenantId": alert["tenantId"],
         "title": alert["name"],
-        "description": f"has been triggered, {alert['query']['left']} = {round(result['value'], 2)} ({alert['query']['operator']} {alert['query']['right']}).",
+        "description": f"has been triggered, {alert['query']['left']} = {left} ({alert['query']['operator']} {right}).",
         "buttonText": "Check metrics for more details",
         "buttonUrl": f"/{alert['projectId']}/metrics",
         "imageUrl": None,
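The new __format_value helper keeps alert descriptions readable: whole numbers drop the trailing ".0" and get thousands separators, anything else is rounded to two decimals. A minimal standalone sketch of that behaviour (the body mirrors the helper in the diff above; the sample values are made up):

    def format_value(x):
        if x % 1 == 0:
            x = int(x)
        else:
            x = round(x, 2)
        return f"{x:,}"

    print(format_value(1234.0))   # -> "1,234"
    print(format_value(0.12345))  # -> "0.12"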
diff --git a/api/chalicelib/core/metrics.py b/api/chalicelib/core/metrics.py
index bf388c093..b25b441ed 100644
--- a/api/chalicelib/core/metrics.py
+++ b/api/chalicelib/core/metrics.py
@@ -419,7 +419,7 @@ def get_slowest_images(project_id, startTimestamp=TimeUTC.now(delta_days=-1),
     pg_sub_query_chart = __get_constraints(project_id=project_id, time_constraint=True, chart=True, data=args)
     pg_sub_query_chart.append("resources.type = 'img'")
-    pg_sub_query_chart.append("resources.url = top_img.url")
+    pg_sub_query_chart.append("resources.url_hostpath = top_img.url_hostpath")

     pg_sub_query_subset = __get_constraints(project_id=project_id, time_constraint=True, chart=False, data=args)
@@ -431,13 +431,13 @@ def get_slowest_images(project_id, startTimestamp=TimeUTC.now(delta_days=-1),
     with pg_client.PostgresClient() as cur:
         pg_query = f"""SELECT *
-                       FROM (SELECT resources.url,
+                       FROM (SELECT resources.url_hostpath,
                                     COALESCE(AVG(resources.duration), 0) AS avg_duration,
                                     COUNT(resources.session_id) AS sessions_count
                              FROM events.resources INNER JOIN sessions USING (session_id)
                              WHERE {" AND ".join(pg_sub_query_subset)}
-                             GROUP BY resources.url
+                             GROUP BY resources.url_hostpath
                              ORDER BY avg_duration DESC
                              LIMIT 10) AS top_img
                                 LEFT JOIN LATERAL (
@@ -485,13 +485,13 @@ def get_performance(project_id, startTimestamp=TimeUTC.now(delta_days=-1), endTi
     if resources and len(resources) > 0:
         for r in resources:
             if r["type"] == "IMG":
-                img_constraints.append(f"resources.url = %(val_{len(img_constraints)})s")
+                img_constraints.append(f"resources.url_hostpath = %(val_{len(img_constraints)})s")
                 img_constraints_vals["val_" + str(len(img_constraints) - 1)] = r['value']
             elif r["type"] == "LOCATION":
                 location_constraints.append(f"pages.path = %(val_{len(location_constraints)})s")
                 location_constraints_vals["val_" + str(len(location_constraints) - 1)] = r['value']
             else:
-                request_constraints.append(f"resources.url = %(val_{len(request_constraints)})s")
+                request_constraints.append(f"resources.url_hostpath = %(val_{len(request_constraints)})s")
                 request_constraints_vals["val_" + str(len(request_constraints) - 1)] = r['value']
     params = {"step_size": step_size, "project_id": project_id, "startTimestamp": startTimestamp,
               "endTimestamp": endTimestamp}
@@ -627,12 +627,12 @@ def search(text, resource_type, project_id, performance=False, pages_only=False,
         pg_sub_query.append("url_hostpath ILIKE %(value)s")
         with pg_client.PostgresClient() as cur:
             pg_query = f"""SELECT key, value
-                           FROM ( SELECT DISTINCT ON (url) ROW_NUMBER() OVER (PARTITION BY type ORDER BY url) AS r,
-                                  url AS value,
+                           FROM ( SELECT DISTINCT ON (url_hostpath) ROW_NUMBER() OVER (PARTITION BY type ORDER BY url_hostpath) AS r,
+                                  url_hostpath AS value,
                                   type AS key
                                  FROM events.resources INNER JOIN public.sessions USING (session_id)
                                  WHERE {" AND ".join(pg_sub_query)}
-                                 ORDER BY url, type ASC) AS ranked_values
+                                 ORDER BY url_hostpath, type ASC) AS ranked_values
                            WHERE ranked_values.r<=5;"""
             cur.execute(cur.mogrify(pg_query, {"project_id": project_id, "value": helper.string_to_sql_like(text)}))
             rows = cur.fetchall()
@@ -893,7 +893,7 @@ def get_resources_loading_time(project_id, startTimestamp=TimeUTC.now(delta_days
     if type is not None:
         pg_sub_query_subset.append(f"resources.type = '{__get_resource_db_type_from_type(type)}'")
     if url is not None:
-        pg_sub_query_subset.append(f"resources.url = %(value)s")
+        pg_sub_query_subset.append(f"resources.url_hostpath = %(value)s")

     with pg_client.PostgresClient() as cur:
         pg_query = f"""WITH resources AS (SELECT resources.duration, timestamp
@@ -1009,7 +1009,7 @@ def get_slowest_resources(project_id, startTimestamp=TimeUTC.now(delta_days=-1),
                              ORDER BY avg DESC
                              LIMIT 10) AS main_list
                                 INNER JOIN LATERAL (
-                           SELECT url, type
+                           SELECT url_hostpath AS url, type
                            FROM events.resources INNER JOIN public.sessions USING (session_id)
                            WHERE {" AND ".join(pg_sub_query)}
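The metrics queries above now group and filter resources by url_hostpath instead of the full url. Assuming url_hostpath holds only the host and path of the resource URL (an assumption based on the column name, not confirmed by this diff), the practical effect is that the same asset fetched with different query strings collapses into a single row:

    from urllib.parse import urlparse

    def hostpath(url):
        # Hypothetical equivalent of the url_hostpath column: host + path, no query string.
        p = urlparse(url)
        return p.netloc + p.path

    print(hostpath("https://cdn.example.com/img/logo.png?v=1"))  # cdn.example.com/img/logo.png
    print(hostpath("https://cdn.example.com/img/logo.png?v=2"))  # same key as above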
diff --git a/api/chalicelib/core/sessions.py b/api/chalicelib/core/sessions.py
index fcea8621d..91efb967f 100644
--- a/api/chalicelib/core/sessions.py
+++ b/api/chalicelib/core/sessions.py
@@ -177,7 +177,7 @@ def _isUndefined_operator(op: schemas.SearchEventOperator):

 # This function executes the query and return result
 def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_id, errors_only=False,
-                    error_status=schemas.ErrorStatus.all, count_only=False, issue=None):
+                    error_status=schemas.ErrorStatus.all, count_only=False, issue=None, ids_only=False):
     if data.bookmarked:
         data.startDate, data.endDate = sessions_favorite.get_start_end_timestamp(project_id, user_id)

@@ -185,9 +185,11 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_
                                                  favorite_only=data.bookmarked, issue=issue, project_id=project_id, user_id=user_id)
     if data.limit is not None and data.page is not None:
+        full_args["sessions_limit"] = data.limit
         full_args["sessions_limit_s"] = (data.page - 1) * data.limit
         full_args["sessions_limit_e"] = data.page * data.limit
     else:
+        full_args["sessions_limit"] = 200
         full_args["sessions_limit_s"] = 1
         full_args["sessions_limit_e"] = 200

@@ -235,6 +237,12 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_
                                             GROUP BY user_id
                                         ) AS users_sessions;""",
                                      full_args)
+        elif ids_only:
+            main_query = cur.mogrify(f"""SELECT DISTINCT ON(s.session_id) s.session_id
+                                         {query_part}
+                                         ORDER BY s.session_id desc
+                                         LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s;""",
+                                     full_args)
         else:
             if
data.order is None: data.order = schemas.SortOrderType.desc @@ -242,7 +250,6 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_ if data.sort is not None and data.sort != "session_id": # sort += " " + data.order + "," + helper.key_to_snake_case(data.sort) sort = helper.key_to_snake_case(data.sort) - meta_keys = metadata.get(project_id=project_id) main_query = cur.mogrify(f"""SELECT COUNT(full_sessions) AS count, COALESCE(JSONB_AGG(full_sessions) @@ -266,7 +273,7 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_ print(data.json()) print("--------------------") raise err - if errors_only: + if errors_only or ids_only: return helper.list_to_camel_case(cur.fetchall()) sessions = cur.fetchone() diff --git a/api/chalicelib/core/sessions_devtool.py b/api/chalicelib/core/sessions_devtool.py index eef7b8e6b..2afc3c366 100644 --- a/api/chalicelib/core/sessions_devtool.py +++ b/api/chalicelib/core/sessions_devtool.py @@ -1,6 +1,6 @@ from decouple import config -from chalicelib.utils.s3 import client +from chalicelib.utils import s3 def __get_devtools_keys(project_id, session_id): @@ -16,7 +16,7 @@ def __get_devtools_keys(project_id, session_id): def get_urls(session_id, project_id): results = [] for k in __get_devtools_keys(project_id=project_id, session_id=session_id): - results.append(client.generate_presigned_url( + results.append(s3.client.generate_presigned_url( 'get_object', Params={'Bucket': config("sessions_bucket"), 'Key': k}, ExpiresIn=config("PRESIGNED_URL_EXPIRATION", cast=int, default=900) diff --git a/api/chalicelib/core/sessions_mobs.py b/api/chalicelib/core/sessions_mobs.py index 3d966a47c..9a9237be8 100644 --- a/api/chalicelib/core/sessions_mobs.py +++ b/api/chalicelib/core/sessions_mobs.py @@ -1,7 +1,6 @@ from decouple import config from chalicelib.utils import s3 -from chalicelib.utils.s3 import client def __get_mob_keys(project_id, session_id): @@ -15,10 +14,14 @@ def __get_mob_keys(project_id, session_id): ] +def __get_mob_keys_deprecated(session_id): + return [str(session_id), str(session_id) + "e"] + + def get_urls(project_id, session_id): results = [] for k in __get_mob_keys(project_id=project_id, session_id=session_id): - results.append(client.generate_presigned_url( + results.append(s3.client.generate_presigned_url( 'get_object', Params={'Bucket': config("sessions_bucket"), 'Key': k}, ExpiresIn=config("PRESIGNED_URL_EXPIRATION", cast=int, default=900) @@ -27,27 +30,18 @@ def get_urls(project_id, session_id): def get_urls_depercated(session_id): - return [ - client.generate_presigned_url( + results = [] + for k in __get_mob_keys_deprecated(session_id=session_id): + results.append(s3.client.generate_presigned_url( 'get_object', - Params={ - 'Bucket': config("sessions_bucket"), - 'Key': str(session_id) - }, + Params={'Bucket': config("sessions_bucket"), 'Key': k}, ExpiresIn=100000 - ), - client.generate_presigned_url( - 'get_object', - Params={ - 'Bucket': config("sessions_bucket"), - 'Key': str(session_id) + "e" - }, - ExpiresIn=100000 - )] + )) + return results def get_ios(session_id): - return client.generate_presigned_url( + return s3.client.generate_presigned_url( 'get_object', Params={ 'Bucket': config("ios_bucket"), diff --git a/api/chalicelib/core/significance.py b/api/chalicelib/core/significance.py index 2abd87cf7..a38dc82d1 100644 --- a/api/chalicelib/core/significance.py +++ b/api/chalicelib/core/significance.py @@ -181,9 +181,7 @@ def get_stages_and_events(filter_d, project_id) -> List[RealDictRow]: 
values=s["value"], value_key=f"value{i + 1}") n_stages_query.append(f""" (SELECT main.session_id, - {"MIN(main.timestamp)" if i + 1 < len(stages) else "MAX(main.timestamp)"} AS stage{i + 1}_timestamp, - '{event_type}' AS type, - '{s["operator"]}' AS operator + {"MIN(main.timestamp)" if i + 1 < len(stages) else "MAX(main.timestamp)"} AS stage{i + 1}_timestamp FROM {next_table} AS main {" ".join(extra_from)} WHERE main.timestamp >= {f"T{i}.stage{i}_timestamp" if i > 0 else "%(startTimestamp)s"} {f"AND main.session_id=T1.session_id" if i > 0 else ""} @@ -191,30 +189,33 @@ def get_stages_and_events(filter_d, project_id) -> List[RealDictRow]: {(" AND " + " AND ".join(stage_constraints)) if len(stage_constraints) > 0 else ""} {(" AND " + " AND ".join(first_stage_extra_constraints)) if len(first_stage_extra_constraints) > 0 and i == 0 else ""} GROUP BY main.session_id) - AS T{i + 1} {"USING (session_id)" if i > 0 else ""} + AS T{i + 1} {"ON (TRUE)" if i > 0 else ""} """) - if len(n_stages_query) == 0: + n_stages=len(n_stages_query) + if n_stages == 0: return [] n_stages_query = " LEFT JOIN LATERAL ".join(n_stages_query) n_stages_query += ") AS stages_t" n_stages_query = f""" - SELECT stages_and_issues_t.*, sessions.user_uuid FROM ( + SELECT stages_and_issues_t.*, sessions.user_uuid + FROM ( SELECT * FROM ( - SELECT * FROM - {n_stages_query} + SELECT T1.session_id, {",".join([f"stage{i + 1}_timestamp" for i in range(n_stages)])} + FROM {n_stages_query} LEFT JOIN LATERAL - ( SELECT ISE.session_id, - ISS.type as issue_type, + ( SELECT ISS.type as issue_type, ISE.timestamp AS issue_timestamp, - ISS.context_string as issue_context, + COALESCE(ISS.context_string,'') as issue_context, ISS.issue_id as issue_id FROM events_common.issues AS ISE INNER JOIN issues AS ISS USING (issue_id) WHERE ISE.timestamp >= stages_t.stage1_timestamp AND ISE.timestamp <= stages_t.stage{i + 1}_timestamp AND ISS.project_id=%(project_id)s + AND ISE.session_id = stages_t.session_id {"AND ISS.type IN %(issueTypes)s" if len(filter_issues) > 0 else ""} - ) AS issues_t USING (session_id) + LIMIT 20 -- remove the limit to get exact stats + ) AS issues_t ON (TRUE) ) AS stages_and_issues_t INNER JOIN sessions USING(session_id); """ @@ -297,7 +298,21 @@ def pearson_corr(x: list, y: list): return r, confidence, False -def get_transitions_and_issues_of_each_type(rows: List[RealDictRow], all_issues_with_context, first_stage, last_stage): +# def tuple_or(t: tuple): +# x = 0 +# for el in t: +# x |= el # | is for bitwise OR +# return x +# +# The following function is correct optimization of the previous function because t is a list of 0,1 +def tuple_or(t: tuple): + for el in t: + if el > 0: + return 1 + return 0 + + +def get_transitions_and_issues_of_each_type(rows: List[RealDictRow], all_issues, first_stage, last_stage): """ Returns two lists with binary values 0/1: @@ -316,12 +331,6 @@ def get_transitions_and_issues_of_each_type(rows: List[RealDictRow], all_issues_ transitions = [] n_sess_affected = 0 errors = {} - for issue in all_issues_with_context: - split = issue.split('__^__') - errors[issue] = { - "errors": [], - "issue_type": split[0], - "context": split[1]} for row in rows: t = 0 @@ -329,38 +338,26 @@ def get_transitions_and_issues_of_each_type(rows: List[RealDictRow], all_issues_ last_ts = row[f'stage{last_stage}_timestamp'] if first_ts is None: continue - elif first_ts is not None and last_ts is not None: + elif last_ts is not None: t = 1 transitions.append(t) ic_present = False - for issue_type_with_context in errors: + for 
error_id in all_issues: + if error_id not in errors: + errors[error_id] = [] ic = 0 - issue_type = errors[issue_type_with_context]["issue_type"] - context = errors[issue_type_with_context]["context"] - if row['issue_type'] is not None: + row_issue_id=row['issue_id'] + if row_issue_id is not None: if last_ts is None or (first_ts < row['issue_timestamp'] < last_ts): - context_in_row = row['issue_context'] if row['issue_context'] is not None else '' - if issue_type == row['issue_type'] and context == context_in_row: + if error_id == row_issue_id: ic = 1 ic_present = True - errors[issue_type_with_context]["errors"].append(ic) + errors[error_id].append(ic) if ic_present and t: n_sess_affected += 1 - # def tuple_or(t: tuple): - # x = 0 - # for el in t: - # x |= el - # return x - def tuple_or(t: tuple): - for el in t: - if el > 0: - return 1 - return 0 - - errors = {key: errors[key]["errors"] for key in errors} all_errors = [tuple_or(t) for t in zip(*errors.values())] return transitions, errors, all_errors, n_sess_affected @@ -376,10 +373,9 @@ def get_affected_users_for_all_issues(rows, first_stage, last_stage): """ affected_users = defaultdict(lambda: set()) affected_sessions = defaultdict(lambda: set()) - contexts = defaultdict(lambda: None) + all_issues = {} n_affected_users_dict = defaultdict(lambda: None) n_affected_sessions_dict = defaultdict(lambda: None) - all_issues_with_context = set() n_issues_dict = defaultdict(lambda: 0) issues_by_session = defaultdict(lambda: 0) @@ -395,15 +391,13 @@ def get_affected_users_for_all_issues(rows, first_stage, last_stage): # check that the issue exists and belongs to subfunnel: if iss is not None and (row[f'stage{last_stage}_timestamp'] is None or (row[f'stage{first_stage}_timestamp'] < iss_ts < row[f'stage{last_stage}_timestamp'])): - context_string = row['issue_context'] if row['issue_context'] is not None else '' - issue_with_context = iss + '__^__' + context_string - contexts[issue_with_context] = {"context": context_string, "id": row["issue_id"]} - all_issues_with_context.add(issue_with_context) - n_issues_dict[issue_with_context] += 1 + if row["issue_id"] not in all_issues: + all_issues[row["issue_id"]] = {"context": row['issue_context'], "issue_type": row["issue_type"]} + n_issues_dict[row["issue_id"]] += 1 if row['user_uuid'] is not None: - affected_users[issue_with_context].add(row['user_uuid']) + affected_users[row["issue_id"]].add(row['user_uuid']) - affected_sessions[issue_with_context].add(row['session_id']) + affected_sessions[row["issue_id"]].add(row['session_id']) issues_by_session[row[f'session_id']] += 1 if len(affected_users) > 0: @@ -414,29 +408,28 @@ def get_affected_users_for_all_issues(rows, first_stage, last_stage): n_affected_sessions_dict.update({ iss: len(affected_sessions[iss]) for iss in affected_sessions }) - return all_issues_with_context, n_issues_dict, n_affected_users_dict, n_affected_sessions_dict, contexts + return all_issues, n_issues_dict, n_affected_users_dict, n_affected_sessions_dict def count_sessions(rows, n_stages): session_counts = {i: set() for i in range(1, n_stages + 1)} - for ind, row in enumerate(rows): + for row in rows: for i in range(1, n_stages + 1): if row[f"stage{i}_timestamp"] is not None: session_counts[i].add(row[f"session_id"]) + session_counts = {i: len(session_counts[i]) for i in session_counts} return session_counts def count_users(rows, n_stages): - users_in_stages = defaultdict(lambda: set()) - - for ind, row in enumerate(rows): + users_in_stages = {i: set() for i in range(1, n_stages + 
1)}
+    for row in rows:
         for i in range(1, n_stages + 1):
             if row[f"stage{i}_timestamp"] is not None:
                 users_in_stages[i].add(row["user_uuid"])

     users_count = {i: len(users_in_stages[i]) for i in range(1, n_stages + 1)}
-
     return users_count

@@ -489,18 +482,18 @@ def get_issues(stages, rows, first_stage=None, last_stage=None, drop_only=False)
         last_stage = n_stages

     n_critical_issues = 0
-    issues_dict = dict({"significant": [],
-                        "insignificant": []})
+    issues_dict = {"significant": [],
+                   "insignificant": []}
     session_counts = count_sessions(rows, n_stages)
     drop = session_counts[first_stage] - session_counts[last_stage]
-    all_issues_with_context, n_issues_dict, affected_users_dict, affected_sessions, contexts = get_affected_users_for_all_issues(
+    all_issues, n_issues_dict, affected_users_dict, affected_sessions = get_affected_users_for_all_issues(
         rows, first_stage, last_stage)
     transitions, errors, all_errors, n_sess_affected = get_transitions_and_issues_of_each_type(rows,
-                                                                                               all_issues_with_context,
+                                                                                               all_issues,
                                                                                                first_stage, last_stage)
-    # print("len(transitions) =", len(transitions))
+    del rows

     if any(all_errors):
         total_drop_corr, conf, is_sign = pearson_corr(transitions, all_errors)
@@ -513,33 +506,32 @@ def get_issues(stages, rows, first_stage=None, last_stage=None, drop_only=False)
     if drop_only:
         return total_drop_due_to_issues

-    for issue in all_issues_with_context:
+    for issue_id in all_issues:

-        if not any(errors[issue]):
+        if not any(errors[issue_id]):
             continue

-        r, confidence, is_sign = pearson_corr(transitions, errors[issue])
+        r, confidence, is_sign = pearson_corr(transitions, errors[issue_id])

         if r is not None and drop is not None and is_sign:
-            lost_conversions = int(r * affected_sessions[issue])
+            lost_conversions = int(r * affected_sessions[issue_id])
         else:
             lost_conversions = None
         if r is None:
             r = 0
-        split = issue.split('__^__')
         issues_dict['significant' if is_sign else 'insignificant'].append({
-            "type": split[0],
-            "title": helper.get_issue_title(split[0]),
-            "affected_sessions": affected_sessions[issue],
-            "unaffected_sessions": session_counts[1] - affected_sessions[issue],
+            "type": all_issues[issue_id]["issue_type"],
+            "title": helper.get_issue_title(all_issues[issue_id]["issue_type"]),
+            "affected_sessions": affected_sessions[issue_id],
+            "unaffected_sessions": session_counts[1] - affected_sessions[issue_id],
             "lost_conversions": lost_conversions,
-            "affected_users": affected_users_dict[issue],
+            "affected_users": affected_users_dict[issue_id],
             "conversion_impact": round(r * 100),
-            "context_string": contexts[issue]["context"],
-            "issue_id": contexts[issue]["id"]
+            "context_string": all_issues[issue_id]["context"],
+            "issue_id": issue_id
         })
         if is_sign:
-            n_critical_issues += n_issues_dict[issue]
+            n_critical_issues += n_issues_dict[issue_id]

     return n_critical_issues, issues_dict, total_drop_due_to_issues
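The funnel-significance rework above keys issues by issue_id instead of the old "type__^__context" composite string, and tuple_or collapses the per-issue 0/1 vectors into a single "any issue in this session" vector before the Pearson correlation. A small sketch of that collapse with toy data (issue ids and flags are invented):

    def tuple_or(t):
        # Same shortcut as in the diff: the inputs are only 0/1 flags.
        for el in t:
            if el > 0:
                return 1
        return 0

    errors = {
        "issue-a": [1, 0, 0],  # issue A seen in session 1 only
        "issue-b": [0, 0, 1],  # issue B seen in session 3 only
    }
    all_errors = [tuple_or(t) for t in zip(*errors.values())]
    print(all_errors)  # [1, 0, 1]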
diff --git a/api/chalicelib/core/sourcemaps.py b/api/chalicelib/core/sourcemaps.py
index 921649d97..89df77926 100644
--- a/api/chalicelib/core/sourcemaps.py
+++ b/api/chalicelib/core/sourcemaps.py
@@ -77,7 +77,7 @@ def format_payload(p, truncate_to_first=False):
 def url_exists(url):
     try:
         r = requests.head(url, allow_redirects=False)
-        return r.status_code == 200 and r.headers.get("Content-Type") != "text/html"
+        return r.status_code == 200 and "text/html" not in r.headers.get("Content-Type", "")
     except Exception as e:
         print(f"!! Issue checking if URL exists: {url}")
         print(e)
@@ -100,7 +100,6 @@ def get_traces_group(project_id, payload):
                 and not (file_url[:params_idx] if params_idx > -1 else file_url).endswith(".js"):
             print(f"{u['absPath']} sourcemap is not a JS file")
             payloads[key] = None
-            continue

         if key not in payloads:
             file_exists_in_bucket = len(file_url) > 0 and s3.exists(config('sourcemaps_bucket'), key)
diff --git a/api/env.default b/api/env.default
index 82419328c..3ee65e89c 100644
--- a/api/env.default
+++ b/api/env.default
@@ -42,11 +42,11 @@ sourcemaps_reader=http://sourcemaps-reader-openreplay.app.svc.cluster.local:9000
 STAGE=default-foss
 version_number=1.4.0
 FS_DIR=/mnt/efs
-EFS_SESSION_MOB_PATTERN=%(sessionId)s/dom.mob
-EFS_DEVTOOLS_MOB_PATTERN=%(sessionId)s/devtools.mob
+EFS_SESSION_MOB_PATTERN=%(sessionId)s
+EFS_DEVTOOLS_MOB_PATTERN=%(sessionId)sdevtools
 SESSION_MOB_PATTERN_S=%(sessionId)s/dom.mobs
 SESSION_MOB_PATTERN_E=%(sessionId)s/dom.mobe
-DEVTOOLS_MOB_PATTERN=%(sessionId)s/devtools.mobs
+DEVTOOLS_MOB_PATTERN=%(sessionId)s/devtools.mob
 PRESIGNED_URL_EXPIRATION=3600
 ASSIST_JWT_EXPIRATION=144000
 ASSIST_JWT_SECRET=
diff --git a/api/requirements-alerts.txt b/api/requirements-alerts.txt
index b30e65988..ff36f3099 100644
--- a/api/requirements-alerts.txt
+++ b/api/requirements-alerts.txt
@@ -1,15 +1,15 @@
 requests==2.28.1
 urllib3==1.26.12
-boto3==1.26.4
+boto3==1.26.14
 pyjwt==2.6.0
 psycopg2-binary==2.9.5
-elasticsearch==8.5.0
+elasticsearch==8.5.1
 jira==3.4.1
-fastapi==0.86.0
-uvicorn[standard]==0.19.0
+fastapi==0.87.0
+uvicorn[standard]==0.20.0
 python-decouple==3.6
 pydantic[email]==1.10.2
-apscheduler==3.9.1
\ No newline at end of file
+apscheduler==3.9.1.post1
\ No newline at end of file
diff --git a/api/requirements.txt b/api/requirements.txt
index b30e65988..ff36f3099 100644
--- a/api/requirements.txt
+++ b/api/requirements.txt
@@ -1,15 +1,15 @@
 requests==2.28.1
 urllib3==1.26.12
-boto3==1.26.4
+boto3==1.26.14
 pyjwt==2.6.0
 psycopg2-binary==2.9.5
-elasticsearch==8.5.0
+elasticsearch==8.5.1
 jira==3.4.1
-fastapi==0.86.0
-uvicorn[standard]==0.19.0
+fastapi==0.87.0
+uvicorn[standard]==0.20.0
 python-decouple==3.6
 pydantic[email]==1.10.2
-apscheduler==3.9.1
\ No newline at end of file
+apscheduler==3.9.1.post1
\ No newline at end of file
diff --git a/api/routers/core.py b/api/routers/core.py
index 80f2b6296..7ee8364e7 100644
--- a/api/routers/core.py
+++ b/api/routers/core.py
@@ -56,6 +56,14 @@ def sessions_search(projectId: int, data: schemas.FlatSessionsSearchPayloadSchem
     return {'data': data}


+@app.post('/{projectId}/sessions/search/ids', tags=["sessions"])
+@app.post('/{projectId}/sessions/search2/ids', tags=["sessions"])
+def session_ids_search(projectId: int, data: schemas.FlatSessionsSearchPayloadSchema = Body(...),
+                       context: schemas.CurrentContext = Depends(OR_context)):
+    data = sessions.search_sessions(data=data, project_id=projectId, user_id=context.user_id, ids_only=True)
+    return {'data': data}
+
+
 @app.get('/{projectId}/events/search', tags=["events"])
 def events_search(projectId: int, q: str,
                   type: Union[schemas.FilterType, schemas.EventType,
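The new /sessions/search/ids routes reuse the flat session-search payload but pass ids_only=True, so search_sessions returns bare session ids instead of full session rows. A hedged usage sketch; the host, token, and payload values are placeholders, and the exact field names come from FlatSessionsSearchPayloadSchema rather than from this diff:

    import requests

    resp = requests.post(
        "https://openreplay.example.com/api/1/sessions/search/ids",  # placeholder base URL and project id
        headers={"Authorization": "Bearer <token>"},
        json={
            "startDate": 1668380400000,  # ms timestamps, as elsewhere in the API
            "endDate": 1668466800000,
            "filters": [],
            "limit": 200,
            "page": 1,
        },
    )
    print(resp.json())  # likely {"data": [{"sessionId": ...}, ...]}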
diff --git a/backend/Dockerfile b/backend/Dockerfile
index 4e0064e9d..0d7cad075 100644
--- a/backend/Dockerfile
+++ b/backend/Dockerfile
@@ -1,6 +1,6 @@
 FROM golang:1.18-alpine3.15 AS prepare

-RUN apk add --no-cache git openssh openssl-dev pkgconf gcc g++ make libc-dev bash
+RUN apk add --no-cache git openssh openssl-dev pkgconf gcc g++ make libc-dev bash librdkafka-dev cyrus-sasl cyrus-sasl-gssapiv2 krb5

 WORKDIR /root

@@ -15,11 +15,11 @@ COPY pkg pkg
 COPY internal internal

 ARG SERVICE_NAME
-RUN CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o service -tags musl openreplay/backend/cmd/$SERVICE_NAME
+RUN CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o service -tags dynamic openreplay/backend/cmd/$SERVICE_NAME

 FROM alpine AS entrypoint

-RUN apk add --no-cache ca-certificates
+RUN apk add --no-cache ca-certificates librdkafka-dev cyrus-sasl cyrus-sasl-gssapiv2 krb5

 RUN adduser -u 1001 openreplay -D

 ENV TZ=UTC \
@@ -29,6 +29,18 @@ ENV TZ=UTC \
     UAPARSER_FILE=/home/openreplay/regexes.yaml \
     HTTP_PORT=8080 \
     KAFKA_USE_SSL=true \
+    # KAFKA_USE_KERBEROS should be set true if you wish to use Kerberos auth for Kafka
+    KAFKA_USE_KERBEROS=false \
+    # KERBEROS_SERVICE_NAME is the primary name of the Brokers configured in the Broker JAAS file
+    KERBEROS_SERVICE_NAME="" \
+    # KERBEROS_PRINCIPAL is this client's principal name
+    KERBEROS_PRINCIPAL="" \
+    # KERBEROS_PRINCIPAL is the absolute path to the keytab to be used for authentication
+    KERBEROS_KEYTAB_LOCATION="" \
+    # KAFKA_SSL_KEY is the absolute path to the CA cert for verifying the broker's key
+    KAFKA_SSL_KEY="" \
+    # KAFKA_SSL_CERT is a CA cert string (PEM format) for verifying the broker's key
+    KAFKA_SSL_CERT="" \
     KAFKA_MAX_POLL_INTERVAL_MS=400000 \
     REDIS_STREAMS_MAX_LEN=10000 \
     TOPIC_RAW_WEB=raw \
diff --git a/backend/Dockerfile.bundle b/backend/Dockerfile.bundle
index 407a7b9d8..19c3b325c 100644
--- a/backend/Dockerfile.bundle
+++ b/backend/Dockerfile.bundle
@@ -1,6 +1,6 @@
 FROM golang:1.18-alpine3.15 AS prepare

-RUN apk add --no-cache git openssh openssl-dev pkgconf gcc g++ make libc-dev bash
+RUN apk add --no-cache git openssh openssl-dev pkgconf gcc g++ make libc-dev bash librdkafka-dev cyrus-sasl-gssapi cyrus-sasl-devel

 WORKDIR /root

@@ -14,11 +14,11 @@ COPY cmd cmd
 COPY pkg pkg
 COPY internal internal

-RUN for name in assets db ender http integrations sink storage;do CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o bin/$name -tags musl openreplay/backend/cmd/$name; done
+RUN for name in assets db ender http integrations sink storage;do CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o bin/$name -tags dynamic openreplay/backend/cmd/$name; done

 FROM alpine AS entrypoint
 #FROM pygmy/alpine-tini:latest
-RUN apk add --no-cache ca-certificates
+RUN apk add --no-cache ca-certificates librdkafka-dev cyrus-sasl-gssapi cyrus-sasl-devel pkgconf

 ENV TZ=UTC \
     FS_ULIMIT=1000 \
@@ -28,6 +28,18 @@ ENV TZ=UTC \
     HTTP_PORT=80 \
     BEACON_SIZE_LIMIT=7000000 \
     KAFKA_USE_SSL=true \
+    # KAFKA_USE_KERBEROS should be set true if you wish to use Kerberos auth for Kafka
+    KAFKA_USE_KERBEROS=false \
+    # KERBEROS_SERVICE_NAME is the primary name of the Brokers configured in the Broker JAAS file
+    KERBEROS_SERVICE_NAME="" \
+    # KERBEROS_PRINCIPAL is this client's principal name
+    KERBEROS_PRINCIPAL="" \
+    # KERBEROS_PRINCIPAL is the absolute path to the keytab to be used for authentication
+    KERBEROS_KEYTAB_LOCATION="" \
+    # KAFKA_SSL_KEY is the absolute path to the CA cert for verifying the broker's key
+    KAFKA_SSL_KEY="" \
+    # KAFKA_SSL_CERT is a CA cert string (PEM format) for verifying the broker's key
+    KAFKA_SSL_CERT="" \
     KAFKA_MAX_POLL_INTERVAL_MS=400000 \
     REDIS_STREAMS_MAX_LEN=3000 \
     TOPIC_RAW_WEB=raw \
diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go
index d3cc99e40..84520dd33 100644
--- a/backend/cmd/sink/main.go
+++ b/backend/cmd/sink/main.go
@@ -3,19 +3,18 @@ package main
 import (
 	"context"
 	"log"
-	"openreplay/backend/pkg/pprof"
 	"os"
 	"os/signal"
-	"strings"
 	"syscall"
 	"time"
"openreplay/backend/internal/config/sink" "openreplay/backend/internal/sink/assetscache" - "openreplay/backend/internal/sink/oswriter" + "openreplay/backend/internal/sink/sessionwriter" "openreplay/backend/internal/storage" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/monitoring" + "openreplay/backend/pkg/pprof" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/url/assets" ) @@ -33,7 +32,7 @@ func main() { log.Fatalf("%v doesn't exist. %v", cfg.FsDir, err) } - writer := oswriter.NewWriter(cfg.FsUlimit, cfg.FsDir) + writer := sessionwriter.NewWriter(cfg.FsUlimit, cfg.FsDir, cfg.DeadSessionTimeout) producer := queue.NewProducer(cfg.MessageSizeLimit, true) defer producer.Close(cfg.ProducerCloseTimeout) @@ -64,6 +63,7 @@ func main() { if err := producer.Produce(cfg.TopicTrigger, msg.SessionID(), msg.Encode()); err != nil { log.Printf("can't send SessionEnd to trigger topic: %s; sessID: %d", err, msg.SessionID()) } + writer.Close(msg.SessionID()) return } @@ -98,39 +98,18 @@ func main() { // Write encoded message with index to session file data := msg.EncodeWithIndex() if data == nil { - log.Printf("can't encode with index, err: %s", err) return } - wasWritten := false // To avoid timestamp duplicates in original mob file + + // Write message to file if messages.IsDOMType(msg.TypeID()) { if err := writer.WriteDOM(msg.SessionID(), data); err != nil { - if strings.Contains(err.Error(), "not a directory") { - // Trying to write data to mob file by original path - oldErr := writer.WriteMOB(msg.SessionID(), data) - if oldErr != nil { - log.Printf("MOB Writeer error: %s, prev DOM error: %s, info: %s", oldErr, err, msg.Meta().Batch().Info()) - } else { - wasWritten = true - } - } else { - log.Printf("DOM Writer error: %s, info: %s", err, msg.Meta().Batch().Info()) - } + log.Printf("Writer error: %v\n", err) } } if !messages.IsDOMType(msg.TypeID()) || msg.TypeID() == messages.MsgTimestamp { - // TODO: write only necessary timestamps if err := writer.WriteDEV(msg.SessionID(), data); err != nil { - if strings.Contains(err.Error(), "not a directory") { - if !wasWritten { - // Trying to write data to mob file by original path - oldErr := writer.WriteMOB(msg.SessionID(), data) - if oldErr != nil { - log.Printf("MOB Writeer error: %s, prev DEV error: %s, info: %s", oldErr, err, msg.Meta().Batch().Info()) - } - } - } else { - log.Printf("Devtools Writer error: %s, info: %s", err, msg.Meta().Batch().Info()) - } + log.Printf("Writer error: %v\n", err) } } @@ -158,22 +137,20 @@ func main() { select { case sig := <-sigchan: log.Printf("Caught signal %v: terminating\n", sig) - if err := writer.CloseAll(); err != nil { - log.Printf("closeAll error: %v\n", err) - } + // Sync and stop writer + writer.Stop() + // Commit and stop consumer if err := consumer.Commit(); err != nil { log.Printf("can't commit messages: %s", err) } consumer.Close() os.Exit(0) case <-tick: - if err := writer.SyncAll(); err != nil { - log.Fatalf("sync error: %v\n", err) - } counter.Print() if err := consumer.Commit(); err != nil { log.Printf("can't commit messages: %s", err) } + log.Printf("writer: %s", writer.Info()) default: err := consumer.ConsumeNext() if err != nil { diff --git a/backend/go.mod b/backend/go.mod index 0eead389c..b1046b08e 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -69,8 +69,8 @@ require ( golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 // indirect - 
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect - golang.org/x/text v0.3.7 // indirect + golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect + golang.org/x/text v0.4.0 // indirect golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd // indirect diff --git a/backend/go.sum b/backend/go.sum index dbaee7216..fea2aa1a3 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -678,8 +678,9 @@ golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -690,8 +691,9 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg= +golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/backend/internal/config/sink/config.go b/backend/internal/config/sink/config.go index a7481f93a..a8703a596 100644 --- a/backend/internal/config/sink/config.go +++ b/backend/internal/config/sink/config.go @@ -9,6 +9,7 @@ type Config struct { common.Config FsDir string `env:"FS_DIR,required"` FsUlimit uint16 `env:"FS_ULIMIT,required"` + DeadSessionTimeout int64 `env:"DEAD_SESSION_TIMEOUT,default=600"` GroupSink string `env:"GROUP_SINK,required"` TopicRawWeb string `env:"TOPIC_RAW_WEB,required"` TopicRawIOS string `env:"TOPIC_RAW_IOS,required"` @@ -17,7 +18,7 @@ type Config struct { CacheAssets bool `env:"CACHE_ASSETS,required"` AssetsOrigin string `env:"ASSETS_ORIGIN,required"` ProducerCloseTimeout int `env:"PRODUCER_CLOSE_TIMEOUT,default=15000"` - CacheThreshold int64 `env:"CACHE_THRESHOLD,default=75"` + CacheThreshold int64 `env:"CACHE_THRESHOLD,default=5"` CacheExpiration 
int64 `env:"CACHE_EXPIRATION,default=120"` } diff --git a/backend/internal/config/storage/config.go b/backend/internal/config/storage/config.go index fdf29b7db..6083f0249 100644 --- a/backend/internal/config/storage/config.go +++ b/backend/internal/config/storage/config.go @@ -11,7 +11,6 @@ type Config struct { S3Region string `env:"AWS_REGION_WEB,required"` S3Bucket string `env:"S3_BUCKET_WEB,required"` FSDir string `env:"FS_DIR,required"` - FSCleanHRS int `env:"FS_CLEAN_HRS,required"` FileSplitSize int `env:"FILE_SPLIT_SIZE,required"` RetryTimeout time.Duration `env:"RETRY_TIMEOUT,default=2m"` GroupStorage string `env:"GROUP_STORAGE,required"` @@ -21,6 +20,7 @@ type Config struct { DeleteTimeout time.Duration `env:"DELETE_TIMEOUT,default=48h"` ProducerCloseTimeout int `env:"PRODUCER_CLOSE_TIMEOUT,default=15000"` UseFailover bool `env:"USE_FAILOVER,default=false"` + MaxFileSize int64 `env:"MAX_FILE_SIZE,default=524288000"` } func New() *Config { diff --git a/backend/internal/sink/oswriter/oswriter.go b/backend/internal/sink/oswriter/oswriter.go deleted file mode 100644 index 070540b1d..000000000 --- a/backend/internal/sink/oswriter/oswriter.go +++ /dev/null @@ -1,166 +0,0 @@ -package oswriter - -import ( - "errors" - "log" - "math" - "os" - "path/filepath" - "strconv" - "time" -) - -type Writer struct { - ulimit int - dir string - files map[string]*os.File - atimes map[string]int64 -} - -func NewWriter(ulimit uint16, dir string) *Writer { - return &Writer{ - ulimit: int(ulimit), - dir: dir + "/", - files: make(map[string]*os.File), - atimes: make(map[string]int64), - } -} - -func (w *Writer) open(fname string) (*os.File, error) { - file, ok := w.files[fname] - if ok { - return file, nil - } - if len(w.atimes) == w.ulimit { - var m_k string - var m_t int64 = math.MaxInt64 - for k, t := range w.atimes { - if t < m_t { - m_k = k - m_t = t - } - } - if err := w.close(m_k); err != nil { - return nil, err - } - } - - // mkdir if not exist - pathTo := w.dir + filepath.Dir(fname) - if info, err := os.Stat(pathTo); os.IsNotExist(err) { - if err := os.MkdirAll(pathTo, 0755); err != nil { - log.Printf("os.MkdirAll error: %s", err) - } - } else { - if err != nil { - return nil, err - } - if !info.IsDir() { - return nil, errors.New("not a directory") - } - } - - file, err := os.OpenFile(w.dir+fname, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) - if err != nil { - log.Printf("os.OpenFile error: %s", err) - return nil, err - } - w.files[fname] = file - w.atimes[fname] = time.Now().Unix() - return file, nil -} - -func (w *Writer) close(fname string) error { - file := w.files[fname] - if file == nil { - return nil - } - if err := file.Sync(); err != nil { - return err - } - if err := file.Close(); err != nil { - return err - } - delete(w.files, fname) - delete(w.atimes, fname) - return nil -} - -func (w *Writer) WriteDOM(sid uint64, data []byte) error { - return w.write(strconv.FormatUint(sid, 10)+"/dom.mob", data) -} - -func (w *Writer) WriteDEV(sid uint64, data []byte) error { - return w.write(strconv.FormatUint(sid, 10)+"/devtools.mob", data) -} - -func (w *Writer) WriteMOB(sid uint64, data []byte) error { - // Use session id as a file name without directory - fname := strconv.FormatUint(sid, 10) - file, err := w.openWithoutDir(fname) - if err != nil { - return err - } - _, err = file.Write(data) - return err -} - -func (w *Writer) write(fname string, data []byte) error { - file, err := w.open(fname) - if err != nil { - return err - } - _, err = file.Write(data) - return err -} - -func (w *Writer) 
openWithoutDir(fname string) (*os.File, error) { - file, ok := w.files[fname] - if ok { - return file, nil - } - if len(w.atimes) == w.ulimit { - var m_k string - var m_t int64 = math.MaxInt64 - for k, t := range w.atimes { - if t < m_t { - m_k = k - m_t = t - } - } - if err := w.close(m_k); err != nil { - return nil, err - } - } - - file, err := os.OpenFile(w.dir+fname, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) - if err != nil { - return nil, err - } - w.files[fname] = file - w.atimes[fname] = time.Now().Unix() - return file, nil -} - -func (w *Writer) SyncAll() error { - for _, file := range w.files { - if err := file.Sync(); err != nil { - return err - } - } - return nil -} - -func (w *Writer) CloseAll() error { - for _, file := range w.files { - if err := file.Sync(); err != nil { - return err - } - if err := file.Close(); err != nil { - return err - } - } - w.files = nil - w.atimes = nil - return nil -} diff --git a/backend/internal/sink/sessionwriter/session.go b/backend/internal/sink/sessionwriter/session.go new file mode 100644 index 000000000..f107c387b --- /dev/null +++ b/backend/internal/sink/sessionwriter/session.go @@ -0,0 +1,81 @@ +package sessionwriter + +import ( + "fmt" + "os" + "strconv" + "sync" + "time" +) + +type Session struct { + lock *sync.Mutex + dom *os.File + dev *os.File + lastUpdate time.Time +} + +func NewSession(dir string, id uint64) (*Session, error) { + if id == 0 { + return nil, fmt.Errorf("wrong session id") + } + + filePath := dir + strconv.FormatUint(id, 10) + domFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + return nil, err + } + filePath += "devtools" + devFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + domFile.Close() // should close first file descriptor + return nil, err + } + + return &Session{ + lock: &sync.Mutex{}, + dom: domFile, + dev: devFile, + lastUpdate: time.Now(), + }, nil +} + +func (s *Session) Lock() { + s.lock.Lock() +} + +func (s *Session) Unlock() { + s.lock.Unlock() +} + +func (s *Session) Write(mode FileType, data []byte) (err error) { + if mode == DOM { + _, err = s.dom.Write(data) + } else { + _, err = s.dev.Write(data) + } + s.lastUpdate = time.Now() + return err +} + +func (s *Session) LastUpdate() time.Time { + return s.lastUpdate +} + +func (s *Session) Sync() error { + domErr := s.dom.Sync() + devErr := s.dev.Sync() + if domErr == nil && devErr == nil { + return nil + } + return fmt.Errorf("dom: %s, dev: %s", domErr, devErr) +} + +func (s *Session) Close() error { + domErr := s.dom.Close() + devErr := s.dev.Close() + if domErr == nil && devErr == nil { + return nil + } + return fmt.Errorf("dom: %s, dev: %s", domErr, devErr) +} diff --git a/backend/internal/sink/sessionwriter/types.go b/backend/internal/sink/sessionwriter/types.go new file mode 100644 index 000000000..a20f61375 --- /dev/null +++ b/backend/internal/sink/sessionwriter/types.go @@ -0,0 +1,8 @@ +package sessionwriter + +type FileType int + +const ( + DOM FileType = 1 + DEV FileType = 2 +) diff --git a/backend/internal/sink/sessionwriter/writer.go b/backend/internal/sink/sessionwriter/writer.go new file mode 100644 index 000000000..94ff5dd66 --- /dev/null +++ b/backend/internal/sink/sessionwriter/writer.go @@ -0,0 +1,179 @@ +package sessionwriter + +import ( + "fmt" + "log" + "math" + "sync" + "time" +) + +type SessionWriter struct { + ulimit int + dir string + zombieSessionTimeout float64 + lock *sync.Mutex + sessions *sync.Map + meta map[uint64]int64 + done 
chan struct{} + stopped chan struct{} +} + +func NewWriter(ulimit uint16, dir string, zombieSessionTimeout int64) *SessionWriter { + w := &SessionWriter{ + ulimit: int(ulimit) / 2, // should divide by 2 because each session has 2 files + dir: dir + "/", + zombieSessionTimeout: float64(zombieSessionTimeout), + lock: &sync.Mutex{}, + sessions: &sync.Map{}, + meta: make(map[uint64]int64, ulimit), + done: make(chan struct{}), + stopped: make(chan struct{}), + } + go w.synchronizer() + return w +} + +func (w *SessionWriter) WriteDOM(sid uint64, data []byte) error { + return w.write(sid, DOM, data) +} + +func (w *SessionWriter) WriteDEV(sid uint64, data []byte) error { + return w.write(sid, DEV, data) +} + +func (w *SessionWriter) Close(sid uint64) { + w.close(sid) +} + +func (w *SessionWriter) Stop() { + w.done <- struct{}{} + <-w.stopped +} + +func (w *SessionWriter) Info() string { + return fmt.Sprintf("%d sessions", w.numberOfSessions()) +} + +func (w *SessionWriter) addSession(sid uint64) { + w.lock.Lock() + w.meta[sid] = time.Now().Unix() + w.lock.Unlock() +} + +func (w *SessionWriter) deleteSession(sid uint64) { + w.lock.Lock() + delete(w.meta, sid) + w.lock.Unlock() +} + +func (w *SessionWriter) numberOfSessions() int { + w.lock.Lock() + defer w.lock.Unlock() + return len(w.meta) +} + +func (w *SessionWriter) write(sid uint64, mode FileType, data []byte) error { + var ( + sess *Session + err error + ) + + sessObj, ok := w.sessions.Load(sid) + if !ok { + sess, err = NewSession(w.dir, sid) + if err != nil { + return fmt.Errorf("can't write to session: %d, err: %s", sid, err) + } + sess.Lock() + defer sess.Unlock() + + // Check opened files limit + if len(w.meta) >= w.ulimit { + var oldSessID uint64 + var minTimestamp int64 = math.MaxInt64 + for sessID, timestamp := range w.meta { + if timestamp < minTimestamp { + oldSessID = sessID + minTimestamp = timestamp + } + } + if err := w.close(oldSessID); err != nil { + log.Printf("can't close session: %s", err) + } + } + + // Add new session to manager + w.sessions.Store(sid, sess) + w.addSession(sid) + } else { + sess = sessObj.(*Session) + sess.Lock() + defer sess.Unlock() + } + + // Write data to session + return sess.Write(mode, data) +} + +func (w *SessionWriter) sync(sid uint64) error { + sessObj, ok := w.sessions.Load(sid) + if !ok { + return fmt.Errorf("can't sync, session: %d not found", sid) + } + sess := sessObj.(*Session) + sess.Lock() + defer sess.Unlock() + + err := sess.Sync() + if time.Now().Sub(sess.LastUpdate()).Seconds() > w.zombieSessionTimeout { + if err != nil { + log.Printf("can't sync session: %d, err: %s", sid, err) + } + // Close "zombie" session + err = sess.Close() + w.deleteSession(sid) + } + return err +} + +func (w *SessionWriter) close(sid uint64) error { + sessObj, ok := w.sessions.LoadAndDelete(sid) + if !ok { + return fmt.Errorf("can't close, session: %d not found", sid) + } + sess := sessObj.(*Session) + sess.Lock() + defer sess.Unlock() + + if err := sess.Sync(); err != nil { + log.Printf("can't sync session: %d, err: %s", sid, err) + } + err := sess.Close() + w.deleteSession(sid) + return err +} + +func (w *SessionWriter) synchronizer() { + tick := time.Tick(2 * time.Second) + for { + select { + case <-tick: + w.sessions.Range(func(sid, lockObj any) bool { + if err := w.sync(sid.(uint64)); err != nil { + log.Printf("can't sync file descriptor: %s", err) + } + return true + }) + case <-w.done: + w.sessions.Range(func(sid, lockObj any) bool { + if err := w.close(sid.(uint64)); err != nil { + log.Printf("can't 
close file descriptor: %s", err) + } + return true + }) + w.stopped <- struct{}{} + return + } + } +} diff --git a/backend/internal/storage/storage.go b/backend/internal/storage/storage.go index 7fdc06c4f..12a37183f 100644 --- a/backend/internal/storage/storage.go +++ b/backend/internal/storage/storage.go @@ -13,7 +13,6 @@ import ( "openreplay/backend/pkg/storage" "os" "strconv" - "strings" "time" ) @@ -71,43 +70,46 @@ func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Stor } func (s *Storage) UploadSessionFiles(msg *messages.SessionEnd) error { - sessionDir := strconv.FormatUint(msg.SessionID(), 10) - if err := s.uploadKey(msg.SessionID(), sessionDir+"/dom.mob", true, 5, msg.EncryptionKey); err != nil { - oldErr := s.uploadKey(msg.SessionID(), sessionDir, true, 5, msg.EncryptionKey) - if oldErr != nil { - return fmt.Errorf("upload file error: %s. failed checking mob file using old path: %s", err, oldErr) - } - // Exit method anyway because we don't have dev tools separation in prev version - return nil - } - if err := s.uploadKey(msg.SessionID(), sessionDir+"/devtools.mob", false, 4, msg.EncryptionKey); err != nil { + if err := s.uploadKey(msg.SessionID(), "/dom.mob", true, 5, msg.EncryptionKey); err != nil { return err } + if err := s.uploadKey(msg.SessionID(), "/devtools.mob", false, 4, msg.EncryptionKey); err != nil { + log.Printf("can't find devtools for session: %d, err: %s", msg.SessionID(), err) + } return nil } -// TODO: make a bit cleaner -func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCount int, encryptionKey string) error { +// TODO: make a bit cleaner. +// TODO: Of course, I'll do! +func (s *Storage) uploadKey(sessID uint64, suffix string, shouldSplit bool, retryCount int, encryptionKey string) error { if retryCount <= 0 { return nil } - start := time.Now() - file, err := os.Open(s.cfg.FSDir + "/" + key) + fileName := strconv.FormatUint(sessID, 10) + mobFileName := fileName + if suffix == "/devtools.mob" { + mobFileName += "devtools" + } + filePath := s.cfg.FSDir + "/" + mobFileName + + // Check file size before download into memory + info, err := os.Stat(filePath) + if err == nil { + if info.Size() > s.cfg.MaxFileSize { + log.Printf("big file, size: %d, session: %d", info.Size(), sessID) + return nil + } + } + file, err := os.Open(filePath) if err != nil { return fmt.Errorf("File open error: %v; sessID: %s, part: %d, sessStart: %s\n", - err, key, sessID%16, + err, fileName, sessID%16, time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))), ) } defer file.Close() - // Ignore "s" at the end of mob file name for "old" sessions - newVers := false - if strings.Contains(key, "/") { - newVers = true - } - var fileSize int64 = 0 fileInfo, err := file.Stat() if err != nil { @@ -117,17 +119,18 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo } var encryptedData []byte + fileName += suffix if shouldSplit { nRead, err := file.Read(s.startBytes) if err != nil { log.Printf("File read error: %s; sessID: %s, part: %d, sessStart: %s", err, - key, + fileName, sessID%16, time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))), ) time.AfterFunc(s.cfg.RetryTimeout, func() { - s.uploadKey(sessID, key, shouldSplit, retryCount-1, encryptionKey) + s.uploadKey(sessID, suffix, shouldSplit, retryCount-1, encryptionKey) }) return nil } @@ -146,11 +149,7 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo } // Compress and save to s3 startReader := bytes.NewBuffer(encryptedData) - 
startKey := key - if newVers { - startKey += "s" - } - if err := s.s3.Upload(s.gzipFile(startReader), startKey, "application/octet-stream", true); err != nil { + if err := s.s3.Upload(s.gzipFile(startReader), fileName+"s", "application/octet-stream", true); err != nil { log.Fatalf("Storage: start upload failed. %v\n", err) } // TODO: fix possible error (if we read less then FileSplitSize) @@ -161,7 +160,7 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo if err != nil { log.Printf("File read error: %s; sessID: %s, part: %d, sessStart: %s", err, - key, + fileName, sessID%16, time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))), ) @@ -183,7 +182,7 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo } // Compress and save to s3 endReader := bytes.NewBuffer(encryptedData) - if err := s.s3.Upload(s.gzipFile(endReader), key+"e", "application/octet-stream", true); err != nil { + if err := s.s3.Upload(s.gzipFile(endReader), fileName+"e", "application/octet-stream", true); err != nil { log.Fatalf("Storage: end upload failed. %v\n", err) } } @@ -195,7 +194,7 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo if err != nil { log.Printf("File read error: %s; sessID: %s, part: %d, sessStart: %s", err, - key, + fileName, sessID%16, time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))), ) @@ -216,7 +215,7 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo encryptedData = fileData } endReader := bytes.NewBuffer(encryptedData) - if err := s.s3.Upload(s.gzipFile(endReader), key+"s", "application/octet-stream", true); err != nil { + if err := s.s3.Upload(s.gzipFile(endReader), fileName, "application/octet-stream", true); err != nil { log.Fatalf("Storage: end upload failed. %v\n", err) } s.archivingTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds())) diff --git a/backend/pkg/db/types/error-event.go b/backend/pkg/db/types/error-event.go index 826cbba9e..bef9abd99 100644 --- a/backend/pkg/db/types/error-event.go +++ b/backend/pkg/db/types/error-event.go @@ -11,6 +11,8 @@ import ( . 
"openreplay/backend/pkg/messages" ) +const SOURCE_JS = "js_exception" + type ErrorEvent struct { MessageID uint64 Timestamp uint64 @@ -64,7 +66,7 @@ func WrapJSException(m *JSException) *ErrorEvent { return &ErrorEvent{ MessageID: m.Meta().Index, Timestamp: uint64(m.Meta().Timestamp), - Source: "js_exception", + Source: SOURCE_JS, Name: m.Name, Message: m.Message, Payload: m.Payload, @@ -105,14 +107,16 @@ func (e *ErrorEvent) ID(projectID uint32) string { hash.Write([]byte(e.Source)) hash.Write([]byte(e.Name)) hash.Write([]byte(e.Message)) - frame, err := parseFirstFrame(e.Payload) - if err != nil { - log.Printf("Can't parse stackframe ((( %v ))): %v", e.Payload, err) - } - if frame != nil { - hash.Write([]byte(frame.FileName)) - hash.Write([]byte(strconv.Itoa(frame.LineNo))) - hash.Write([]byte(strconv.Itoa(frame.ColNo))) + if e.Source == SOURCE_JS { + frame, err := parseFirstFrame(e.Payload) + if err != nil { + log.Printf("Can't parse stackframe ((( %v ))): %v", e.Payload, err) + } + if frame != nil { + hash.Write([]byte(frame.FileName)) + hash.Write([]byte(strconv.Itoa(frame.LineNo))) + hash.Write([]byte(strconv.Itoa(frame.ColNo))) + } } return strconv.FormatUint(uint64(projectID), 16) + hex.EncodeToString(hash.Sum(nil)) } diff --git a/ee/api/chalicelib/core/alerts_processor.py b/ee/api/chalicelib/core/alerts_processor.py index 087f23a05..326d17ffc 100644 --- a/ee/api/chalicelib/core/alerts_processor.py +++ b/ee/api/chalicelib/core/alerts_processor.py @@ -204,7 +204,8 @@ def process(): logging.info(f"Valid alert, notifying users, alertId:{alert['alertId']} name: {alert['name']}") notifications.append(generate_notification(alert, result)) except Exception as e: - logging.error(f"!!!Error while running alert query for alertId:{alert['alertId']} name: {alert['name']}") + logging.error( + f"!!!Error while running alert query for alertId:{alert['alertId']} name: {alert['name']}") logging.error(query) logging.error(e) cur = cur.recreate(rollback=True) @@ -217,12 +218,22 @@ def process(): alerts.process_notifications(notifications) +def __format_value(x): + if x % 1 == 0: + x = int(x) + else: + x = round(x, 2) + return f"{x:,}" + + def generate_notification(alert, result): + left = __format_value(result['value']) + right = __format_value(alert['query']['right']) return { "alertId": alert["alertId"], "tenantId": alert["tenantId"], "title": alert["name"], - "description": f"has been triggered, {alert['query']['left']} = {round(result['value'], 2)} ({alert['query']['operator']} {alert['query']['right']}).", + "description": f"has been triggered, {alert['query']['left']} = {left} ({alert['query']['operator']} {right}).", "buttonText": "Check metrics for more details", "buttonUrl": f"/{alert['projectId']}/metrics", "imageUrl": None, diff --git a/ee/api/chalicelib/core/metrics.py b/ee/api/chalicelib/core/metrics.py index 452566194..2a2f6ee20 100644 --- a/ee/api/chalicelib/core/metrics.py +++ b/ee/api/chalicelib/core/metrics.py @@ -452,18 +452,18 @@ def get_slowest_images(project_id, startTimestamp=TimeUTC.now(delta_days=-1), ch_sub_query.append("resources.type = 'img'") ch_sub_query_chart = __get_basic_constraints(table_name="resources", round_start=True, data=args) ch_sub_query_chart.append("resources.type = 'img'") - ch_sub_query_chart.append("resources.url IN %(url)s") + ch_sub_query_chart.append("resources.url_hostpath IN %(url)s") meta_condition = __get_meta_constraint(args) ch_sub_query += meta_condition ch_sub_query_chart += meta_condition with ch_client.ClickHouseClient() as ch: - ch_query = 
f"""SELECT resources.url, + ch_query = f"""SELECT resources.url_hostpath AS url, COALESCE(avgOrNull(resources.duration),0) AS avg, COUNT(1) AS count FROM resources {"INNER JOIN sessions_metadata USING(session_id)" if len(meta_condition) > 0 else ""} WHERE {" AND ".join(ch_sub_query)} AND resources.duration>0 - GROUP BY resources.url ORDER BY avg DESC LIMIT 10;""" + GROUP BY resources.url_hostpath ORDER BY avg DESC LIMIT 10;""" params = {"step_size": step_size, "project_id": project_id, "startTimestamp": startTimestamp, "endTimestamp": endTimestamp, **__get_constraint_values(args)} rows = ch.execute(query=ch_query, params=params) @@ -474,13 +474,13 @@ def get_slowest_images(project_id, startTimestamp=TimeUTC.now(delta_days=-1), urls = [row["url"] for row in rows] charts = {} - ch_query = f"""SELECT url, + ch_query = f"""SELECT url_hostpath AS url, toUnixTimestamp(toStartOfInterval(resources.datetime, INTERVAL %(step_size)s second ))*1000 AS timestamp, COALESCE(avgOrNull(resources.duration),0) AS avg FROM resources {"INNER JOIN sessions_metadata USING(session_id)" if len(meta_condition) > 0 else ""} WHERE {" AND ".join(ch_sub_query_chart)} AND resources.duration>0 - GROUP BY url, timestamp - ORDER BY url, timestamp;""" + GROUP BY url_hostpath, timestamp + ORDER BY url_hostpath, timestamp;""" params["url"] = urls u_rows = ch.execute(query=ch_query, params=params) for url in urls: @@ -526,13 +526,13 @@ def get_performance(project_id, startTimestamp=TimeUTC.now(delta_days=-1), endTi if resources and len(resources) > 0: for r in resources: if r["type"] == "IMG": - img_constraints.append(f"resources.url = %(val_{len(img_constraints)})s") + img_constraints.append(f"resources.url_hostpath = %(val_{len(img_constraints)})s") img_constraints_vals["val_" + str(len(img_constraints) - 1)] = r['value'] elif r["type"] == "LOCATION": location_constraints.append(f"pages.url_path = %(val_{len(location_constraints)})s") location_constraints_vals["val_" + str(len(location_constraints) - 1)] = r['value'] else: - request_constraints.append(f"resources.url = %(val_{len(request_constraints)})s") + request_constraints.append(f"resources.url_hostpath = %(val_{len(request_constraints)})s") request_constraints_vals["val_" + str(len(request_constraints) - 1)] = r['value'] params = {"step_size": step_size, "project_id": project_id, "startTimestamp": startTimestamp, "endTimestamp": endTimestamp} @@ -638,7 +638,7 @@ def search(text, resource_type, project_id, performance=False, pages_only=False, if resource_type == "ALL" and not pages_only and not events_only: ch_sub_query.append("positionUTF8(url_hostpath,%(value)s)!=0") with ch_client.ClickHouseClient() as ch: - ch_query = f"""SELECT arrayJoin(arraySlice(arrayReverseSort(arrayDistinct(groupArray(url))), 1, 5)) AS value, + ch_query = f"""SELECT arrayJoin(arraySlice(arrayReverseSort(arrayDistinct(groupArray(url_hostpath))), 1, 5)) AS value, type AS key FROM resources WHERE {" AND ".join(ch_sub_query)} @@ -884,7 +884,7 @@ def get_resources_loading_time(project_id, startTimestamp=TimeUTC.now(delta_days if type is not None: ch_sub_query_chart.append(f"resources.type = '{__get_resource_db_type_from_type(type)}'") if url is not None: - ch_sub_query_chart.append(f"resources.url = %(value)s") + ch_sub_query_chart.append(f"resources.url_hostpath = %(value)s") meta_condition = __get_meta_constraint(args) ch_sub_query_chart += meta_condition ch_sub_query_chart.append("resources.duration>0") @@ -966,7 +966,7 @@ def get_slowest_resources(project_id, 
startTimestamp=TimeUTC.now(delta_days=-1), ch_sub_query_chart.append("isNotNull(resources.duration)") ch_sub_query_chart.append("resources.duration>0") with ch_client.ClickHouseClient() as ch: - ch_query = f"""SELECT any(url) AS url, any(type) AS type, + ch_query = f"""SELECT any(url_hostpath) AS url, any(type) AS type, splitByChar('/', resources.url_hostpath)[-1] AS name, COALESCE(avgOrNull(NULLIF(resources.duration,0)),0) AS avg FROM resources {"INNER JOIN sessions_metadata USING(session_id)" if len(meta_condition) > 0 else ""} @@ -2179,7 +2179,7 @@ def get_performance_avg_image_load_time(ch, project_id, startTimestamp=TimeUTC.n if resources and len(resources) > 0: for r in resources: if r["type"] == "IMG": - img_constraints.append(f"resources.url = %(val_{len(img_constraints)})s") + img_constraints.append(f"resources.url_hostpath = %(val_{len(img_constraints)})s") img_constraints_vals["val_" + str(len(img_constraints) - 1)] = r['value'] params = {"step_size": step_size, "project_id": project_id, "startTimestamp": startTimestamp, @@ -2254,7 +2254,7 @@ def get_performance_avg_request_load_time(ch, project_id, startTimestamp=TimeUTC if resources and len(resources) > 0: for r in resources: if r["type"] != "IMG" and r["type"] == "LOCATION": - request_constraints.append(f"resources.url = %(val_{len(request_constraints)})s") + request_constraints.append(f"resources.url_hostpath = %(val_{len(request_constraints)})s") request_constraints_vals["val_" + str(len(request_constraints) - 1)] = r['value'] params = {"step_size": step_size, "project_id": project_id, "startTimestamp": startTimestamp, "endTimestamp": endTimestamp} diff --git a/ee/api/chalicelib/core/metrics_exp.py b/ee/api/chalicelib/core/metrics_exp.py index 9a8af012b..c41676d4a 100644 --- a/ee/api/chalicelib/core/metrics_exp.py +++ b/ee/api/chalicelib/core/metrics_exp.py @@ -462,18 +462,18 @@ def get_slowest_images(project_id, startTimestamp=TimeUTC.now(delta_days=-1), ch_sub_query_chart = __get_basic_constraints(table_name="resources", round_start=True, data=args) # ch_sub_query_chart.append("events.event_type='RESOURCE'") ch_sub_query_chart.append("resources.type = 'img'") - ch_sub_query_chart.append("resources.url IN %(url)s") + ch_sub_query_chart.append("resources.url_hostpath IN %(url)s") meta_condition = __get_meta_constraint(args) ch_sub_query += meta_condition ch_sub_query_chart += meta_condition with ch_client.ClickHouseClient() as ch: - ch_query = f"""SELECT resources.url, + ch_query = f"""SELECT resources.url_hostpath AS url, COALESCE(avgOrNull(resources.duration),0) AS avg, COUNT(1) AS count FROM {exp_ch_helper.get_main_resources_table(startTimestamp)} AS resources WHERE {" AND ".join(ch_sub_query)} AND resources.duration>0 - GROUP BY resources.url ORDER BY avg DESC LIMIT 10;""" + GROUP BY resources.url_hostpath ORDER BY avg DESC LIMIT 10;""" params = {"step_size": step_size, "project_id": project_id, "startTimestamp": startTimestamp, "endTimestamp": endTimestamp, **__get_constraint_values(args)} rows = ch.execute(query=ch_query, params=params) @@ -484,13 +484,13 @@ def get_slowest_images(project_id, startTimestamp=TimeUTC.now(delta_days=-1), urls = [row["url"] for row in rows] charts = {} - ch_query = f"""SELECT url, + ch_query = f"""SELECT url_hostpath AS url, toUnixTimestamp(toStartOfInterval(resources.datetime, INTERVAL %(step_size)s second ))*1000 AS timestamp, COALESCE(avgOrNull(resources.duration),0) AS avg FROM {exp_ch_helper.get_main_resources_table(startTimestamp)} AS resources WHERE {" AND 
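get_slowest_resources above derives a display name with splitByChar('/', resources.url_hostpath)[-1]. For reference, a plain-Python equivalent of that name extraction; the sample values are hypothetical:

```python
def resource_name(url_hostpath: str) -> str:
    # Same idea as ClickHouse splitByChar('/', url_hostpath)[-1]:
    # keep only the last path segment of the host+path string.
    return url_hostpath.rsplit("/", 1)[-1]


assert resource_name("cdn.example.com/static/img/logo.png") == "logo.png"
assert resource_name("api.example.com/v1/users") == "users"
```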
".join(ch_sub_query_chart)} AND resources.duration>0 - GROUP BY url, timestamp - ORDER BY url, timestamp;""" + GROUP BY url_hostpath, timestamp + ORDER BY url_hostpath, timestamp;""" params["url"] = urls # print(ch.format(query=ch_query, params=params)) u_rows = ch.execute(query=ch_query, params=params) @@ -538,13 +538,13 @@ def get_performance(project_id, startTimestamp=TimeUTC.now(delta_days=-1), endTi if resources and len(resources) > 0: for r in resources: if r["type"] == "IMG": - img_constraints.append(f"resources.url = %(val_{len(img_constraints)})s") + img_constraints.append(f"resources.url_hostpath = %(val_{len(img_constraints)})s") img_constraints_vals["val_" + str(len(img_constraints) - 1)] = r['value'] elif r["type"] == "LOCATION": location_constraints.append(f"pages.url_path = %(val_{len(location_constraints)})s") location_constraints_vals["val_" + str(len(location_constraints) - 1)] = r['value'] else: - request_constraints.append(f"resources.url = %(val_{len(request_constraints)})s") + request_constraints.append(f"resources.url_hostpath = %(val_{len(request_constraints)})s") request_constraints_vals["val_" + str(len(request_constraints) - 1)] = r['value'] params = {"step_size": step_size, "project_id": project_id, "startTimestamp": startTimestamp, "endTimestamp": endTimestamp} @@ -891,7 +891,7 @@ def get_resources_loading_time(project_id, startTimestamp=TimeUTC.now(delta_days if type is not None: ch_sub_query_chart.append(f"resources.type = '{__get_resource_db_type_from_type(type)}'") if url is not None: - ch_sub_query_chart.append(f"resources.url = %(value)s") + ch_sub_query_chart.append(f"resources.url_hostpath = %(value)s") meta_condition = __get_meta_constraint(args) ch_sub_query_chart += meta_condition ch_sub_query_chart.append("resources.duration>0") @@ -974,7 +974,7 @@ def get_slowest_resources(project_id, startTimestamp=TimeUTC.now(delta_days=-1), ch_sub_query_chart.append("isNotNull(resources.duration)") ch_sub_query_chart.append("resources.duration>0") with ch_client.ClickHouseClient() as ch: - ch_query = f"""SELECT any(url) AS url, any(type) AS type, name, + ch_query = f"""SELECT any(url_hostpath) AS url, any(type) AS type, name, COALESCE(avgOrNull(NULLIF(resources.duration,0)),0) AS avg FROM {exp_ch_helper.get_main_resources_table(startTimestamp)} AS resources WHERE {" AND ".join(ch_sub_query)} @@ -2185,7 +2185,7 @@ def get_performance_avg_image_load_time(ch, project_id, startTimestamp=TimeUTC.n if resources and len(resources) > 0: for r in resources: if r["type"] == "IMG": - img_constraints.append(f"resources.url = %(val_{len(img_constraints)})s") + img_constraints.append(f"resources.url_hostpath = %(val_{len(img_constraints)})s") img_constraints_vals["val_" + str(len(img_constraints) - 1)] = r['value'] params = {"step_size": step_size, "project_id": project_id, "startTimestamp": startTimestamp, @@ -2260,7 +2260,7 @@ def get_performance_avg_request_load_time(ch, project_id, startTimestamp=TimeUTC if resources and len(resources) > 0: for r in resources: if r["type"] != "IMG" and r["type"] == "LOCATION": - request_constraints.append(f"resources.url = %(val_{len(request_constraints)})s") + request_constraints.append(f"resources.url_hostpath = %(val_{len(request_constraints)})s") request_constraints_vals["val_" + str(len(request_constraints) - 1)] = r['value'] params = {"step_size": step_size, "project_id": project_id, "startTimestamp": startTimestamp, "endTimestamp": endTimestamp} diff --git a/ee/api/chalicelib/core/sessions.py b/ee/api/chalicelib/core/sessions.py 
index 8c9eaf006..7d999fe6c 100644 --- a/ee/api/chalicelib/core/sessions.py +++ b/ee/api/chalicelib/core/sessions.py @@ -107,8 +107,7 @@ def get_by_id2_pg(project_id, session_id, context: schemas_ee.CurrentContext, fu session_id=session_id, user_id=context.user_id) data['metadata'] = __group_metadata(project_metadata=data.pop("projectMetadata"), session=data) data['issues'] = issues.get_by_session_id(session_id=session_id, project_id=project_id) - data['live'] = live and assist.is_live(project_id=project_id, - session_id=session_id, + data['live'] = live and assist.is_live(project_id=project_id, session_id=session_id, project_key=data["projectKey"]) data["inDB"] = True return data @@ -181,7 +180,7 @@ def _isUndefined_operator(op: schemas.SearchEventOperator): # This function executes the query and return result def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_id, errors_only=False, - error_status=schemas.ErrorStatus.all, count_only=False, issue=None): + error_status=schemas.ErrorStatus.all, count_only=False, issue=None, ids_only=False): if data.bookmarked: data.startDate, data.endDate = sessions_favorite.get_start_end_timestamp(project_id, user_id) @@ -189,9 +188,11 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_ favorite_only=data.bookmarked, issue=issue, project_id=project_id, user_id=user_id) if data.limit is not None and data.page is not None: + full_args["sessions_limit"] = data.limit full_args["sessions_limit_s"] = (data.page - 1) * data.limit full_args["sessions_limit_e"] = data.page * data.limit else: + full_args["sessions_limit"] = 200 full_args["sessions_limit_s"] = 1 full_args["sessions_limit_e"] = 200 @@ -239,6 +240,12 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_ GROUP BY user_id ) AS users_sessions;""", full_args) + elif ids_only: + main_query = cur.mogrify(f"""SELECT DISTINCT ON(s.session_id) s.session_id + {query_part} + ORDER BY s.session_id desc + LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s;""", + full_args) else: if data.order is None: data.order = schemas.SortOrderType.desc @@ -246,7 +253,6 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_ if data.sort is not None and data.sort != "session_id": # sort += " " + data.order + "," + helper.key_to_snake_case(data.sort) sort = helper.key_to_snake_case(data.sort) - meta_keys = metadata.get(project_id=project_id) main_query = cur.mogrify(f"""SELECT COUNT(full_sessions) AS count, COALESCE(JSONB_AGG(full_sessions) @@ -270,7 +276,7 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_ print(data.json()) print("--------------------") raise err - if errors_only: + if errors_only or ids_only: return helper.list_to_camel_case(cur.fetchall()) sessions = cur.fetchone() diff --git a/ee/api/chalicelib/core/sessions_devtool.py b/ee/api/chalicelib/core/sessions_devtool.py index ed6ecf694..9435c2e24 100644 --- a/ee/api/chalicelib/core/sessions_devtool.py +++ b/ee/api/chalicelib/core/sessions_devtool.py @@ -3,7 +3,7 @@ from fastapi.security import SecurityScopes import schemas_ee from chalicelib.core import permissions -from chalicelib.utils.s3 import client +from chalicelib.utils import s3 SCOPES = SecurityScopes([schemas_ee.Permissions.dev_tools]) @@ -23,7 +23,7 @@ def get_urls(session_id, project_id, context: schemas_ee.CurrentContext): return [] results = [] for k in __get_devtools_keys(project_id=project_id, session_id=session_id): - 
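The sessions.py hunk above adds an ids_only mode to search_sessions: it returns only distinct session ids, paginated through the new sessions_limit/sessions_limit_s parameters. A rough standalone sketch of that query shape with psycopg2; the query_part body, table, and connection are placeholders, not the repo's real dynamic query builder:

```python
import psycopg2
import psycopg2.extras

conn = psycopg2.connect("dbname=openreplay")  # hypothetical DSN
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

page, limit = 2, 50
full_args = {
    "project_id": 1,                          # hypothetical project
    "sessions_limit": limit,
    "sessions_limit_s": (page - 1) * limit,   # used as OFFSET, as in the diff
}

# Placeholder for the FROM/WHERE block that search_sessions builds dynamically.
query_part = "FROM public.sessions AS s WHERE s.project_id = %(project_id)s"

main_query = cur.mogrify(
    f"""SELECT DISTINCT ON (s.session_id) s.session_id
        {query_part}
        ORDER BY s.session_id DESC
        LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s;""",
    full_args,
)
cur.execute(main_query)
session_ids = [r["session_id"] for r in cur.fetchall()]
```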
results.append(client.generate_presigned_url( + results.append(s3.client.generate_presigned_url( 'get_object', Params={'Bucket': config("sessions_bucket"), 'Key': k}, ExpiresIn=config("PRESIGNED_URL_EXPIRATION", cast=int, default=900) diff --git a/ee/api/chalicelib/core/sessions_favorite.py b/ee/api/chalicelib/core/sessions_favorite.py index 3d6496424..7af995bad 100644 --- a/ee/api/chalicelib/core/sessions_favorite.py +++ b/ee/api/chalicelib/core/sessions_favorite.py @@ -1,7 +1,7 @@ from decouple import config import schemas_ee -from chalicelib.core import sessions, sessions_favorite_exp +from chalicelib.core import sessions, sessions_favorite_exp, sessions_mobs, sessions_devtool from chalicelib.utils import pg_client, s3_extra @@ -34,32 +34,31 @@ def remove_favorite_session(context: schemas_ee.CurrentContext, project_id, sess def favorite_session(context: schemas_ee.CurrentContext, project_id, session_id): + keys = sessions_mobs.__get_mob_keys(project_id=project_id, session_id=session_id) + keys += sessions_mobs.__get_mob_keys_deprecated(session_id=session_id) # To support old sessions + keys += sessions_devtool.__get_devtools_keys(project_id=project_id, session_id=session_id) + if favorite_session_exists(user_id=context.user_id, session_id=session_id): - key = str(session_id) - try: - s3_extra.tag_file(session_id=key, tag_value=config('RETENTION_D_VALUE', default='default')) - except Exception as e: - print(f"!!!Error while tagging: {key} to default") - print(str(e)) - key = str(session_id) + "e" - try: - s3_extra.tag_file(session_id=key, tag_value=config('RETENTION_D_VALUE', default='default')) - except Exception as e: - print(f"!!!Error while tagging: {key} to default") - print(str(e)) + tag = config('RETENTION_D_VALUE', default='default') + + for k in keys: + try: + s3_extra.tag_session(file_key=k, tag_value=tag) + except Exception as e: + print(f"!!!Error while tagging: {k} to {tag} for removal") + print(str(e)) + return remove_favorite_session(context=context, project_id=project_id, session_id=session_id) - key = str(session_id) - try: - s3_extra.tag_file(session_id=key, tag_value=config('RETENTION_L_VALUE', default='vault')) - except Exception as e: - print(f"!!!Error while tagging: {key} to vault") - print(str(e)) - key = str(session_id) + "e" - try: - s3_extra.tag_file(session_id=key, tag_value=config('RETENTION_L_VALUE', default='vault')) - except Exception as e: - print(f"!!!Error while tagging: {key} to vault") - print(str(e)) + + tag = config('RETENTION_L_VALUE', default='vault') + + for k in keys: + try: + s3_extra.tag_session(file_key=k, tag_value=tag) + except Exception as e: + print(f"!!!Error while tagging: {k} to {tag} for vault") + print(str(e)) + return add_favorite_session(context=context, project_id=project_id, session_id=session_id) diff --git a/ee/api/chalicelib/core/significance.py b/ee/api/chalicelib/core/significance.py index 59f773c9e..75df1cd94 100644 --- a/ee/api/chalicelib/core/significance.py +++ b/ee/api/chalicelib/core/significance.py @@ -188,9 +188,7 @@ def get_stages_and_events(filter_d, project_id) -> List[RealDictRow]: values=s["value"], value_key=f"value{i + 1}") n_stages_query.append(f""" (SELECT main.session_id, - {"MIN(main.timestamp)" if i + 1 < len(stages) else "MAX(main.timestamp)"} AS stage{i + 1}_timestamp, - '{event_type}' AS type, - '{s["operator"]}' AS operator + {"MIN(main.timestamp)" if i + 1 < len(stages) else "MAX(main.timestamp)"} AS stage{i + 1}_timestamp FROM {next_table} AS main {" ".join(extra_from)} WHERE main.timestamp >= 
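sessions_devtool.py above now goes through the shared s3 module but still issues a presigned GET URL per devtools key. A minimal boto3 sketch of that call; the bucket, key, and expiration values are placeholders:

```python
import boto3

s3_client = boto3.client("s3")  # the repo wires this client up in chalicelib.utils.s3

url = s3_client.generate_presigned_url(
    "get_object",
    Params={"Bucket": "sessions-assets", "Key": "1/123456789/devtools"},  # hypothetical bucket/key
    ExpiresIn=900,  # seconds; the diff reads this from PRESIGNED_URL_EXPIRATION
)
print(url)
```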
{f"T{i}.stage{i}_timestamp" if i > 0 else "%(startTimestamp)s"} {f"AND main.session_id=T1.session_id" if i > 0 else ""} @@ -198,45 +196,54 @@ def get_stages_and_events(filter_d, project_id) -> List[RealDictRow]: {(" AND " + " AND ".join(stage_constraints)) if len(stage_constraints) > 0 else ""} {(" AND " + " AND ".join(first_stage_extra_constraints)) if len(first_stage_extra_constraints) > 0 and i == 0 else ""} GROUP BY main.session_id) - AS T{i + 1} {"USING (session_id)" if i > 0 else ""} + AS T{i + 1} {"ON (TRUE)" if i > 0 else ""} """) - if len(n_stages_query) == 0: + n_stages=len(n_stages_query) + if n_stages == 0: return [] n_stages_query = " LEFT JOIN LATERAL ".join(n_stages_query) n_stages_query += ") AS stages_t" n_stages_query = f""" - SELECT stages_and_issues_t.*,sessions.session_id, sessions.user_uuid FROM ( + SELECT stages_and_issues_t.*, sessions.user_uuid + FROM ( SELECT * FROM ( - SELECT * FROM - {n_stages_query} + SELECT T1.session_id, {",".join([f"stage{i + 1}_timestamp" for i in range(n_stages)])} + FROM {n_stages_query} LEFT JOIN LATERAL - ( - SELECT * FROM - (SELECT ISE.session_id, - ISS.type as issue_type, + ( SELECT ISS.type as issue_type, ISE.timestamp AS issue_timestamp, - ISS.context_string as issue_context, + COALESCE(ISS.context_string,'') as issue_context, ISS.issue_id as issue_id FROM events_common.issues AS ISE INNER JOIN issues AS ISS USING (issue_id) WHERE ISE.timestamp >= stages_t.stage1_timestamp AND ISE.timestamp <= stages_t.stage{i + 1}_timestamp AND ISS.project_id=%(project_id)s - {"AND ISS.type IN %(issueTypes)s" if len(filter_issues) > 0 else ""}) AS base_t - ) AS issues_t - USING (session_id)) AS stages_and_issues_t - inner join sessions USING(session_id); + AND ISE.session_id = stages_t.session_id + {"AND ISS.type IN %(issueTypes)s" if len(filter_issues) > 0 else ""} + LIMIT 20 -- remove the limit to get exact stats + ) AS issues_t ON (TRUE) + ) AS stages_and_issues_t INNER JOIN sessions USING(session_id); """ # LIMIT 10000 params = {"project_id": project_id, "startTimestamp": filter_d["startDate"], "endTimestamp": filter_d["endDate"], "issueTypes": tuple(filter_issues), **values} with pg_client.PostgresClient() as cur: + query = cur.mogrify(n_stages_query, params) # print("---------------------------------------------------") - # print(cur.mogrify(n_stages_query, params)) + # print(query) # print("---------------------------------------------------") - cur.execute(cur.mogrify(n_stages_query, params)) - rows = cur.fetchall() + try: + cur.execute(query) + rows = cur.fetchall() + except Exception as err: + print("--------- FUNNEL SEARCH QUERY EXCEPTION -----------") + print(query.decode('UTF-8')) + print("--------- PAYLOAD -----------") + print(filter_d) + print("--------------------") + raise err return rows @@ -298,7 +305,21 @@ def pearson_corr(x: list, y: list): return r, confidence, False -def get_transitions_and_issues_of_each_type(rows: List[RealDictRow], all_issues_with_context, first_stage, last_stage): +# def tuple_or(t: tuple): +# x = 0 +# for el in t: +# x |= el # | is for bitwise OR +# return x +# +# The following function is correct optimization of the previous function because t is a list of 0,1 +def tuple_or(t: tuple): + for el in t: + if el > 0: + return 1 + return 0 + + +def get_transitions_and_issues_of_each_type(rows: List[RealDictRow], all_issues, first_stage, last_stage): """ Returns two lists with binary values 0/1: @@ -317,12 +338,6 @@ def get_transitions_and_issues_of_each_type(rows: List[RealDictRow], all_issues_ transitions = 
[] n_sess_affected = 0 errors = {} - for issue in all_issues_with_context: - split = issue.split('__^__') - errors[issue] = { - "errors": [], - "issue_type": split[0], - "context": split[1]} for row in rows: t = 0 @@ -330,38 +345,26 @@ def get_transitions_and_issues_of_each_type(rows: List[RealDictRow], all_issues_ last_ts = row[f'stage{last_stage}_timestamp'] if first_ts is None: continue - elif first_ts is not None and last_ts is not None: + elif last_ts is not None: t = 1 transitions.append(t) ic_present = False - for issue_type_with_context in errors: + for error_id in all_issues: + if error_id not in errors: + errors[error_id] = [] ic = 0 - issue_type = errors[issue_type_with_context]["issue_type"] - context = errors[issue_type_with_context]["context"] - if row['issue_type'] is not None: + row_issue_id=row['issue_id'] + if row_issue_id is not None: if last_ts is None or (first_ts < row['issue_timestamp'] < last_ts): - context_in_row = row['issue_context'] if row['issue_context'] is not None else '' - if issue_type == row['issue_type'] and context == context_in_row: + if error_id == row_issue_id: ic = 1 ic_present = True - errors[issue_type_with_context]["errors"].append(ic) + errors[error_id].append(ic) if ic_present and t: n_sess_affected += 1 - # def tuple_or(t: tuple): - # x = 0 - # for el in t: - # x |= el - # return x - def tuple_or(t: tuple): - for el in t: - if el > 0: - return 1 - return 0 - - errors = {key: errors[key]["errors"] for key in errors} all_errors = [tuple_or(t) for t in zip(*errors.values())] return transitions, errors, all_errors, n_sess_affected @@ -377,10 +380,9 @@ def get_affected_users_for_all_issues(rows, first_stage, last_stage): """ affected_users = defaultdict(lambda: set()) affected_sessions = defaultdict(lambda: set()) - contexts = defaultdict(lambda: None) + all_issues = {} n_affected_users_dict = defaultdict(lambda: None) n_affected_sessions_dict = defaultdict(lambda: None) - all_issues_with_context = set() n_issues_dict = defaultdict(lambda: 0) issues_by_session = defaultdict(lambda: 0) @@ -396,15 +398,13 @@ def get_affected_users_for_all_issues(rows, first_stage, last_stage): # check that the issue exists and belongs to subfunnel: if iss is not None and (row[f'stage{last_stage}_timestamp'] is None or (row[f'stage{first_stage}_timestamp'] < iss_ts < row[f'stage{last_stage}_timestamp'])): - context_string = row['issue_context'] if row['issue_context'] is not None else '' - issue_with_context = iss + '__^__' + context_string - contexts[issue_with_context] = {"context": context_string, "id": row["issue_id"]} - all_issues_with_context.add(issue_with_context) - n_issues_dict[issue_with_context] += 1 + if row["issue_id"] not in all_issues: + all_issues[row["issue_id"]] = {"context": row['issue_context'], "issue_type": row["issue_type"]} + n_issues_dict[row["issue_id"]] += 1 if row['user_uuid'] is not None: - affected_users[issue_with_context].add(row['user_uuid']) + affected_users[row["issue_id"]].add(row['user_uuid']) - affected_sessions[issue_with_context].add(row['session_id']) + affected_sessions[row["issue_id"]].add(row['session_id']) issues_by_session[row[f'session_id']] += 1 if len(affected_users) > 0: @@ -415,29 +415,28 @@ def get_affected_users_for_all_issues(rows, first_stage, last_stage): n_affected_sessions_dict.update({ iss: len(affected_sessions[iss]) for iss in affected_sessions }) - return all_issues_with_context, n_issues_dict, n_affected_users_dict, n_affected_sessions_dict, contexts + return all_issues, n_issues_dict, 
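After this refactor, get_transitions_and_issues_of_each_type keys the per-issue binary vectors by issue_id instead of the old "type__^__context" string. A toy illustration of the data shape it builds, simplified and with made-up rows and issue ids:

```python
# Hypothetical funnel rows: first/last stage timestamps plus the issue seen in between (if any).
rows = [
    {"stage1_timestamp": 100, "stage2_timestamp": 200, "issue_id": "iss_a", "issue_timestamp": 150},
    {"stage1_timestamp": 100, "stage2_timestamp": None, "issue_id": "iss_a", "issue_timestamp": 120},
    {"stage1_timestamp": 100, "stage2_timestamp": 250, "issue_id": None, "issue_timestamp": None},
]
all_issues = {"iss_a": {"issue_type": "js_exception", "context": ""}}

transitions, errors = [], {iss: [] for iss in all_issues}
for row in rows:
    first_ts, last_ts = row["stage1_timestamp"], row["stage2_timestamp"]
    transitions.append(1 if last_ts is not None else 0)  # did the session reach the last stage?
    for issue_id in all_issues:
        hit = 0
        if row["issue_id"] == issue_id and (last_ts is None or first_ts < row["issue_timestamp"] < last_ts):
            hit = 1
        errors[issue_id].append(hit)

assert transitions == [1, 0, 1]
assert errors == {"iss_a": [1, 1, 0]}
```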
n_affected_users_dict, n_affected_sessions_dict def count_sessions(rows, n_stages): session_counts = {i: set() for i in range(1, n_stages + 1)} - for ind, row in enumerate(rows): + for row in rows: for i in range(1, n_stages + 1): if row[f"stage{i}_timestamp"] is not None: session_counts[i].add(row[f"session_id"]) + session_counts = {i: len(session_counts[i]) for i in session_counts} return session_counts def count_users(rows, n_stages): - users_in_stages = defaultdict(lambda: set()) - - for ind, row in enumerate(rows): + users_in_stages = {i: set() for i in range(1, n_stages + 1)} + for row in rows: for i in range(1, n_stages + 1): if row[f"stage{i}_timestamp"] is not None: users_in_stages[i].add(row["user_uuid"]) users_count = {i: len(users_in_stages[i]) for i in range(1, n_stages + 1)} - return users_count @@ -490,18 +489,18 @@ def get_issues(stages, rows, first_stage=None, last_stage=None, drop_only=False) last_stage = n_stages n_critical_issues = 0 - issues_dict = dict({"significant": [], - "insignificant": []}) + issues_dict = {"significant": [], + "insignificant": []} session_counts = count_sessions(rows, n_stages) drop = session_counts[first_stage] - session_counts[last_stage] - all_issues_with_context, n_issues_dict, affected_users_dict, affected_sessions, contexts = get_affected_users_for_all_issues( + all_issues, n_issues_dict, affected_users_dict, affected_sessions = get_affected_users_for_all_issues( rows, first_stage, last_stage) transitions, errors, all_errors, n_sess_affected = get_transitions_and_issues_of_each_type(rows, - all_issues_with_context, + all_issues, first_stage, last_stage) - # print("len(transitions) =", len(transitions)) + del rows if any(all_errors): total_drop_corr, conf, is_sign = pearson_corr(transitions, all_errors) @@ -514,33 +513,32 @@ def get_issues(stages, rows, first_stage=None, last_stage=None, drop_only=False) if drop_only: return total_drop_due_to_issues - for issue in all_issues_with_context: + for issue_id in all_issues: - if not any(errors[issue]): + if not any(errors[issue_id]): continue - r, confidence, is_sign = pearson_corr(transitions, errors[issue]) + r, confidence, is_sign = pearson_corr(transitions, errors[issue_id]) if r is not None and drop is not None and is_sign: - lost_conversions = int(r * affected_sessions[issue]) + lost_conversions = int(r * affected_sessions[issue_id]) else: lost_conversions = None if r is None: r = 0 - split = issue.split('__^__') issues_dict['significant' if is_sign else 'insignificant'].append({ - "type": split[0], - "title": helper.get_issue_title(split[0]), - "affected_sessions": affected_sessions[issue], - "unaffected_sessions": session_counts[1] - affected_sessions[issue], + "type": all_issues[issue_id]["issue_type"], + "title": helper.get_issue_title(all_issues[issue_id]["issue_type"]), + "affected_sessions": affected_sessions[issue_id], + "unaffected_sessions": session_counts[1] - affected_sessions[issue_id], "lost_conversions": lost_conversions, - "affected_users": affected_users_dict[issue], + "affected_users": affected_users_dict[issue_id], "conversion_impact": round(r * 100), - "context_string": contexts[issue]["context"], - "issue_id": contexts[issue]["id"] + "context_string": all_issues[issue_id]["context"], + "issue_id": issue_id }) if is_sign: - n_critical_issues += n_issues_dict[issue] + n_critical_issues += n_issues_dict[issue_id] return n_critical_issues, issues_dict, total_drop_due_to_issues diff --git a/ee/api/chalicelib/utils/s3_extra.py b/ee/api/chalicelib/utils/s3_extra.py index 
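count_sessions above now iterates rows directly and returns distinct session counts per stage (the final dict comprehension converts the per-stage sets into lengths). A small runnable sketch of that behavior with fabricated rows:

```python
def count_sessions(rows, n_stages):
    # Collect distinct session_ids that reached each stage, then keep only the counts.
    session_counts = {i: set() for i in range(1, n_stages + 1)}
    for row in rows:
        for i in range(1, n_stages + 1):
            if row[f"stage{i}_timestamp"] is not None:
                session_counts[i].add(row["session_id"])
    return {i: len(session_counts[i]) for i in session_counts}


# Hypothetical rows: two sessions reach stage 1, only one reaches stage 2;
# the duplicate row for s1 is deduplicated by the set.
rows = [
    {"session_id": "s1", "stage1_timestamp": 100, "stage2_timestamp": 200},
    {"session_id": "s1", "stage1_timestamp": 100, "stage2_timestamp": 200},
    {"session_id": "s2", "stage1_timestamp": 110, "stage2_timestamp": None},
]
assert count_sessions(rows, n_stages=2) == {1: 2, 2: 1}
```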
f2a538dcc..0e594c890 100644 --- a/ee/api/chalicelib/utils/s3_extra.py +++ b/ee/api/chalicelib/utils/s3_extra.py @@ -1,12 +1,16 @@ from decouple import config -from chalicelib.utils.s3 import client +from chalicelib.utils import s3 -def tag_file(session_id, tag_key='retention', tag_value='vault'): - return client.put_object_tagging( - Bucket=config("sessions_bucket"), - Key=session_id, +def tag_session(file_key, tag_key='retention', tag_value='vault'): + return tag_file(file_key=file_key, bucket=config("sessions_bucket"), tag_key=tag_key, tag_value=tag_value) + + +def tag_file(file_key, bucket, tag_key, tag_value): + return s3.client.put_object_tagging( + Bucket=bucket, + Key=file_key, Tagging={ 'TagSet': [ { diff --git a/ee/api/env.default b/ee/api/env.default index e707bec57..f5574a8a1 100644 --- a/ee/api/env.default +++ b/ee/api/env.default @@ -61,11 +61,11 @@ EXP_ALERTS=false EXP_FUNNELS=false EXP_RESOURCES=true TRACE_PERIOD=300 -EFS_SESSION_MOB_PATTERN=%(sessionId)s/dom.mob -EFS_DEVTOOLS_MOB_PATTERN=%(sessionId)s/devtools.mob +EFS_SESSION_MOB_PATTERN=%(sessionId)s +EFS_DEVTOOLS_MOB_PATTERN=%(sessionId)sdevtools SESSION_MOB_PATTERN_S=%(sessionId)s/dom.mobs SESSION_MOB_PATTERN_E=%(sessionId)s/dom.mobe -DEVTOOLS_MOB_PATTERN=%(sessionId)s/devtools.mobs +DEVTOOLS_MOB_PATTERN=%(sessionId)s/devtools.mob PRESIGNED_URL_EXPIRATION=3600 ASSIST_JWT_EXPIRATION=144000 ASSIST_JWT_SECRET= \ No newline at end of file diff --git a/ee/api/requirements-alerts.txt b/ee/api/requirements-alerts.txt index fce0ba6cc..02042a778 100644 --- a/ee/api/requirements-alerts.txt +++ b/ee/api/requirements-alerts.txt @@ -1,18 +1,18 @@ requests==2.28.1 urllib3==1.26.12 -boto3==1.26.4 +boto3==1.26.14 pyjwt==2.6.0 psycopg2-binary==2.9.5 -elasticsearch==8.5.0 +elasticsearch==8.5.1 jira==3.4.1 -fastapi==0.86.0 -uvicorn[standard]==0.19.0 +fastapi==0.87.0 +uvicorn[standard]==0.20.0 python-decouple==3.6 pydantic[email]==1.10.2 -apscheduler==3.9.1 +apscheduler==3.9.1.post1 clickhouse-driver==0.2.4 python-multipart==0.0.5 \ No newline at end of file diff --git a/ee/api/requirements-crons.txt b/ee/api/requirements-crons.txt index fce0ba6cc..02042a778 100644 --- a/ee/api/requirements-crons.txt +++ b/ee/api/requirements-crons.txt @@ -1,18 +1,18 @@ requests==2.28.1 urllib3==1.26.12 -boto3==1.26.4 +boto3==1.26.14 pyjwt==2.6.0 psycopg2-binary==2.9.5 -elasticsearch==8.5.0 +elasticsearch==8.5.1 jira==3.4.1 -fastapi==0.86.0 -uvicorn[standard]==0.19.0 +fastapi==0.87.0 +uvicorn[standard]==0.20.0 python-decouple==3.6 pydantic[email]==1.10.2 -apscheduler==3.9.1 +apscheduler==3.9.1.post1 clickhouse-driver==0.2.4 python-multipart==0.0.5 \ No newline at end of file diff --git a/ee/api/requirements.txt b/ee/api/requirements.txt index 23fc32fe7..ac4f27a9d 100644 --- a/ee/api/requirements.txt +++ b/ee/api/requirements.txt @@ -1,18 +1,18 @@ requests==2.28.1 urllib3==1.26.12 -boto3==1.26.4 +boto3==1.26.14 pyjwt==2.6.0 psycopg2-binary==2.9.5 -elasticsearch==8.5.0 +elasticsearch==8.5.1 jira==3.4.1 -fastapi==0.86.0 -uvicorn[standard]==0.19.0 +fastapi==0.87.0 +uvicorn[standard]==0.20.0 python-decouple==3.6 pydantic[email]==1.10.2 -apscheduler==3.9.1 +apscheduler==3.9.1.post1 clickhouse-driver==0.2.4 python3-saml==1.14.0 diff --git a/ee/backend/pkg/kafka/consumer.go b/ee/backend/pkg/kafka/consumer.go index b951fcd9c..14f8d5a68 100644 --- a/ee/backend/pkg/kafka/consumer.go +++ b/ee/backend/pkg/kafka/consumer.go @@ -47,6 +47,16 @@ func NewConsumer( kafkaConfig.SetKey("ssl.key.location", os.Getenv("KAFKA_SSL_KEY")) 
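s3_extra.py above is generalized from a session-id-keyed tag_file into tag_session/tag_file helpers that tag an arbitrary key in an arbitrary bucket with a retention tag. A minimal boto3 sketch of the underlying call; the bucket, key, and tag values are placeholders, and the TagSet layout shown is the standard S3 API shape, not a claim about the exact body the truncated hunk contains:

```python
import boto3

s3_client = boto3.client("s3")


def tag_file(file_key: str, bucket: str, tag_key: str, tag_value: str):
    # Overwrites the object's tag set with a single retention tag.
    return s3_client.put_object_tagging(
        Bucket=bucket,
        Key=file_key,
        Tagging={"TagSet": [{"Key": tag_key, "Value": tag_value}]},
    )


# Hypothetical usage mirroring tag_session(file_key, tag_value='vault'):
tag_file("1/123456789/dom.mobs", bucket="sessions-assets", tag_key="retention", tag_value="vault")
```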
kafkaConfig.SetKey("ssl.certificate.location", os.Getenv("KAFKA_SSL_CERT")) } + + // Apply Kerberos configuration + if env.Bool("KAFKA_USE_KERBEROS") { + kafkaConfig.SetKey("security.protocol", "sasl_plaintext") + kafkaConfig.SetKey("sasl.mechanisms", "GSSAPI") + kafkaConfig.SetKey("sasl.kerberos.service.name", os.Getenv("KERBEROS_SERVICE_NAME")) + kafkaConfig.SetKey("sasl.kerberos.principal", os.Getenv("KERBEROS_PRINCIPAL")) + kafkaConfig.SetKey("sasl.kerberos.keytab", os.Getenv("KERBEROS_KEYTAB_LOCATION")) + } + c, err := kafka.NewConsumer(kafkaConfig) if err != nil { log.Fatalln(err) diff --git a/ee/backend/pkg/kafka/producer.go b/ee/backend/pkg/kafka/producer.go index 6fb893b7a..f895241a7 100644 --- a/ee/backend/pkg/kafka/producer.go +++ b/ee/backend/pkg/kafka/producer.go @@ -30,6 +30,15 @@ func NewProducer(messageSizeLimit int, useBatch bool) *Producer { kafkaConfig.SetKey("ssl.key.location", os.Getenv("KAFKA_SSL_KEY")) kafkaConfig.SetKey("ssl.certificate.location", os.Getenv("KAFKA_SSL_CERT")) } + // Apply Kerberos configuration + if env.Bool("KAFKA_USE_KERBEROS") { + kafkaConfig.SetKey("security.protocol", "sasl_plaintext") + kafkaConfig.SetKey("sasl.mechanisms", "GSSAPI") + kafkaConfig.SetKey("sasl.kerberos.service.name", os.Getenv("KERBEROS_SERVICE_NAME")) + kafkaConfig.SetKey("sasl.kerberos.principal", os.Getenv("KERBEROS_PRINCIPAL")) + kafkaConfig.SetKey("sasl.kerberos.keytab", os.Getenv("KERBEROS_KEYTAB_LOCATION")) + } + producer, err := kafka.NewProducer(kafkaConfig) if err != nil { log.Fatalln(err) diff --git a/ee/connectors/deploy/requirements_snowflake.txt b/ee/connectors/deploy/requirements_snowflake.txt index 983a313d6..895326b32 100644 --- a/ee/connectors/deploy/requirements_snowflake.txt +++ b/ee/connectors/deploy/requirements_snowflake.txt @@ -1,8 +1,8 @@ pandas==1.5.1 kafka-python==2.0.2 SQLAlchemy==1.4.43 -snowflake-connector-python==2.8.1 -snowflake-sqlalchemy==1.4.3 +snowflake-connector-python==2.8.2 +snowflake-sqlalchemy==1.4.4 PyYAML asn1crypto==1.5.1 azure-common==1.1.28 diff --git a/ee/scripts/schema/db/init_dbs/postgresql/1.9.0/1.9.0.sql b/ee/scripts/schema/db/init_dbs/postgresql/1.9.0/1.9.0.sql index 65db23b07..6da0eebed 100644 --- a/ee/scripts/schema/db/init_dbs/postgresql/1.9.0/1.9.0.sql +++ b/ee/scripts/schema/db/init_dbs/postgresql/1.9.0/1.9.0.sql @@ -70,4 +70,11 @@ WHERE deleted_at IS NOT NULL; UPDATE roles SET permissions=array_remove(permissions, 'ERRORS'); +DROP INDEX IF EXISTS events_common.requests_url_idx; +DROP INDEX IF EXISTS events_common.requests_url_gin_idx; +DROP INDEX IF EXISTS events_common.requests_url_gin_idx2; + +DROP INDEX IF EXISTS events.resources_url_gin_idx; +DROP INDEX IF EXISTS events.resources_url_idx; + COMMIT; \ No newline at end of file diff --git a/ee/scripts/schema/db/init_dbs/postgresql/init_schema.sql b/ee/scripts/schema/db/init_dbs/postgresql/init_schema.sql index f486c731e..78026e245 100644 --- a/ee/scripts/schema/db/init_dbs/postgresql/init_schema.sql +++ b/ee/scripts/schema/db/init_dbs/postgresql/init_schema.sql @@ -1221,19 +1221,9 @@ $$ query text NULL, PRIMARY KEY (session_id, timestamp, seq_index) ); - CREATE INDEX IF NOT EXISTS requests_url_idx ON events_common.requests (url); + CREATE INDEX IF NOT EXISTS requests_duration_idx ON events_common.requests (duration); - CREATE INDEX IF NOT EXISTS requests_url_gin_idx ON events_common.requests USING GIN (url gin_trgm_ops); CREATE INDEX IF NOT EXISTS requests_timestamp_idx ON events_common.requests (timestamp); - CREATE INDEX IF NOT EXISTS requests_url_gin_idx2 ON 
events_common.requests USING GIN (RIGHT(url, - length(url) - - (CASE - WHEN url LIKE 'http://%' - THEN 7 - WHEN url LIKE 'https://%' - THEN 8 - ELSE 0 END)) - gin_trgm_ops); CREATE INDEX IF NOT EXISTS requests_timestamp_session_id_failed_idx ON events_common.requests (timestamp, session_id) WHERE success = FALSE; CREATE INDEX IF NOT EXISTS requests_request_body_nn_gin_idx ON events_common.requests USING GIN (request_body gin_trgm_ops) WHERE request_body IS NOT NULL; CREATE INDEX IF NOT EXISTS requests_response_body_nn_gin_idx ON events_common.requests USING GIN (response_body gin_trgm_ops) WHERE response_body IS NOT NULL; diff --git a/frontend/app/components/Dashboard/components/Alerts/AlertListItem.tsx b/frontend/app/components/Dashboard/components/Alerts/AlertListItem.tsx index aaecc0b14..3e8a68f11 100644 --- a/frontend/app/components/Dashboard/components/Alerts/AlertListItem.tsx +++ b/frontend/app/components/Dashboard/components/Alerts/AlertListItem.tsx @@ -2,6 +2,7 @@ import React from 'react'; import { Icon } from 'UI'; import { checkForRecent } from 'App/date'; import { withSiteId, alertEdit } from 'App/routes'; +import { numberWithCommas } from 'App/utils'; // @ts-ignore import { DateTime } from 'luxon'; import { withRouter, RouteComponentProps } from 'react-router-dom'; @@ -108,7 +109,7 @@ function AlertListItem(props: Props) { {' is '} {alert.query.operator} - {alert.query.right} {alert.metric.unit} + {numberWithCommas(alert.query.right)} {alert.metric.unit} {' over the past '} {getThreshold(alert.currentPeriod)} diff --git a/frontend/app/components/Dashboard/components/Alerts/NewAlert.tsx b/frontend/app/components/Dashboard/components/Alerts/NewAlert.tsx index 0ea012e71..6027646f7 100644 --- a/frontend/app/components/Dashboard/components/Alerts/NewAlert.tsx +++ b/frontend/app/components/Dashboard/components/Alerts/NewAlert.tsx @@ -122,6 +122,9 @@ const NewAlert = (props: IProps) => { ) { remove(instance.alertId).then(() => { props.history.push(withSiteId(alerts(), siteId)); + toast.success('Alert deleted'); + }).catch(() => { + toast.error('Failed to delete an alert'); }); } }; @@ -135,6 +138,8 @@ const NewAlert = (props: IProps) => { } else { toast.success('Alert updated'); } + }).catch(() => { + toast.error('Failed to create an alert'); }); }; diff --git a/frontend/app/components/Dashboard/components/DashboardList/DashboardsView.tsx b/frontend/app/components/Dashboard/components/DashboardList/DashboardsView.tsx index 5341c3487..7378e88f8 100644 --- a/frontend/app/components/Dashboard/components/DashboardList/DashboardsView.tsx +++ b/frontend/app/components/Dashboard/components/DashboardList/DashboardsView.tsx @@ -35,7 +35,7 @@ function DashboardsView({ history, siteId }: { history: any, siteId: string }) {