From d085b3583d38fc858971483a122509680b86fe59 Mon Sep 17 00:00:00 2001
From: MauricioGarciaS <47052044+MauricioGarciaS@users.noreply.github.com>
Date: Thu, 17 Nov 2022 16:34:45 +0100
Subject: [PATCH] API for signals in chalice ee, added folder for
 recommendation service

---
 ee/api/chalicelib/core/signals.py   |  19 ++++
 ee/api/routers/core_dynamic.py      |  12 ++-
 ee/api/schemas_ee.py                |   8 ++
 ee/recommendation/Dockerfile        |  24 ++++
 ee/recommendation/api.py            |   7 ++
 ee/recommendation/recommendation.py | 107 ++++++++++++++++++++++++++++
 ee/recommendation/requirements.txt  |  22 +++++
 ee/recommendation/run.sh            |   7 ++
 8 files changed, 205 insertions(+), 1 deletion(-)
 create mode 100644 ee/api/chalicelib/core/signals.py
 create mode 100644 ee/recommendation/Dockerfile
 create mode 100644 ee/recommendation/api.py
 create mode 100644 ee/recommendation/recommendation.py
 create mode 100644 ee/recommendation/requirements.txt
 create mode 100644 ee/recommendation/run.sh

diff --git a/ee/api/chalicelib/core/signals.py b/ee/api/chalicelib/core/signals.py
new file mode 100644
index 000000000..1c8f76a62
--- /dev/null
+++ b/ee/api/chalicelib/core/signals.py
@@ -0,0 +1,19 @@
+import json
+
+import schemas_ee
+from chalicelib.utils import helper
+from chalicelib.utils import pg_client
+
+
+def handle_frontend_signals(project_id: int, user_id: str, data: schemas_ee.SignalsSchema):
+    res = {'errors': 'query not executed'}
+    insights_query = """INSERT INTO public.frontend_signals(project_id, user_id, timestamp, action, source, category, data)
+                        VALUES (%(project_id)s, %(user_id)s, %(timestamp)s, %(action)s, %(source)s, %(category)s, %(data)s)
+                        RETURNING *"""
+    with pg_client.PostgresClient() as conn:
+        query = conn.mogrify(insights_query, {"project_id": project_id, "user_id": user_id, "timestamp": data.timestamp,
+                                              "action": data.action, "source": data.source, "category": data.category,
+                                              "data": json.dumps(data.data)})
+        conn.execute(query)
+        res = helper.dict_to_camel_case(conn.fetchone())
+    return res
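A minimal sketch of how the new helper could be exercised from a Python shell, assuming a configured chalice environment; the IDs and field values below are placeholders for illustration, not part of the patch:

    import schemas_ee
    from chalicelib.core import signals

    payload = schemas_ee.SignalsSchema(timestamp=1668696885000, action='click',
                                       source='search', category='filters',
                                       data={'query': 'errors'})
    # project_id=1 and user_id='u-42' are hypothetical values
    print(signals.handle_frontend_signals(project_id=1, user_id='u-42', data=payload))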
diff --git a/ee/api/routers/core_dynamic.py b/ee/api/routers/core_dynamic.py
index fb24aec96..a1e9734c4 100644
--- a/ee/api/routers/core_dynamic.py
+++ b/ee/api/routers/core_dynamic.py
@@ -8,7 +8,7 @@ import schemas
 import schemas_ee
 from chalicelib.core import sessions, assist, heatmaps, sessions_favorite, sessions_assignments, errors, errors_viewed, \
     errors_favorite, sessions_notes
-from chalicelib.core import sessions_viewed
+from chalicelib.core import sessions_viewed, signals
 from chalicelib.core import tenants, users, projects, license
 from chalicelib.core import webhook
 from chalicelib.core.collaboration_slack import Slack
@@ -435,3 +435,13 @@ def get_all_notes(projectId: int, data: schemas.SearchNoteSchema = Body(...),
     if "errors" in data:
         return data
     return {'data': data}
+
+
+@app.post('/{projectId}/signals', tags=['signals'])
+def send_interactions(projectId: int, data: schemas_ee.SignalsSchema = Body(...),
+                      context: schemas.CurrentContext = Depends(OR_context)):
+    data = signals.handle_frontend_signals(project_id=projectId, user_id=context.user_id, data=data)
+
+    if "errors" in data:
+        return data
+    return {'data': data}
diff --git a/ee/api/schemas_ee.py b/ee/api/schemas_ee.py
index 9690eb334..1fcc14352 100644
--- a/ee/api/schemas_ee.py
+++ b/ee/api/schemas_ee.py
@@ -31,6 +31,14 @@ class RolePayloadSchema(BaseModel):
         alias_generator = schemas.attribute_to_camel_case
 
 
+class SignalsSchema(BaseModel):
+    timestamp: int = Field(...)
+    action: str = Field(...)
+    source: str = Field(...)
+    category: str = Field(...)
+    data: dict = Field(default={})
+
+
 class CreateMemberSchema(schemas.CreateMemberSchema):
     roleId: Optional[int] = Field(None)
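With the route and schema in place, the endpoint can be smoke-tested over HTTP. A sketch using requests, assuming the API serves on localhost:8000 and that a valid JWT is at hand (both assumptions):

    import requests

    resp = requests.post('http://localhost:8000/1/signals',
                         headers={'Authorization': 'Bearer <token>'},
                         json={'timestamp': 1668696885000, 'action': 'click',
                               'source': 'search', 'category': 'filters', 'data': {}})
    print(resp.json())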
diff --git a/ee/recommendation/Dockerfile b/ee/recommendation/Dockerfile
new file mode 100644
index 000000000..72407160b
--- /dev/null
+++ b/ee/recommendation/Dockerfile
@@ -0,0 +1,24 @@
+FROM python:3.10
+WORKDIR service
+COPY tmp/ch_client.py utils/
+COPY tmp/pg_client.py utils/
+COPY recommendation.py .
+COPY requirements.txt .
+RUN apt update
+RUN apt-get install libxml2-dev libxmlsec1-dev libxmlsec1-openssl -y
+RUN pip install --upgrade pip
+RUN pip install -r requirements.txt
+RUN touch utils/__init__.py
+ENV pg_host=postgresql.db.svc.cluster.local \
+    pg_port=5432 \
+    pg_user='postgres' \
+    pg_password='8ea158c722fab5976023' \
+    pg_dbname='postgres' \
+    PG_TIMEOUT=30 \
+    ch_host=clickhouse-openreplay-clickhouse.db.svc.cluster.local \
+    ch_port=9000 \
+    ch_timeout=30 \
+    ch_receive_timeout=40
+EXPOSE 8080
+RUN airflow db init
+CMD airflow webserver --hostname 0.0.0.0 --port 8080
diff --git a/ee/recommendation/api.py b/ee/recommendation/api.py
new file mode 100644
index 000000000..596e8d0b0
--- /dev/null
+++ b/ee/recommendation/api.py
@@ -0,0 +1,7 @@
+from fastapi import FastAPI
+
+app = FastAPI()
+
+@app.get('/')
+def home():
+    return '<h1>This is a title</h1>'
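The stub above only returns a landing string; assuming uvicorn as pinned in requirements.txt below, it can be served locally on the port the Dockerfile exposes:

    uvicorn api:app --host 0.0.0.0 --port 8080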
diff --git a/ee/recommendation/recommendation.py b/ee/recommendation/recommendation.py
new file mode 100644
index 000000000..f525bdc98
--- /dev/null
+++ b/ee/recommendation/recommendation.py
@@ -0,0 +1,107 @@
+from utils.ch_client import ClickHouseClient
+from utils.pg_client import PostgresClient
+
+
+def get_features_clickhouse(**kwargs):
+    # Fetch per-session features joined with per-session issue counts from ClickHouse
+    limit = kwargs.get('limit', 500)
+    query = f"""SELECT session_id, project_id, user_id, events_count, errors_count, duration, country, issue_score, device_type, rage, jsexception, badrequest FROM (
+    SELECT session_id, project_id, user_id, events_count, errors_count, duration, toInt8(user_country) as country, issue_score, toInt8(user_device_type) as device_type FROM experimental.sessions WHERE user_id IS NOT NULL) as T1
+INNER JOIN (SELECT session_id, project_id, sum(issue_type = 'click_rage') as rage, sum(issue_type = 'js_exception') as jsexception, sum(issue_type = 'bad_request') as badrequest FROM experimental.events WHERE event_type = 'ISSUE' AND session_id > 0 GROUP BY session_id, project_id LIMIT {limit}) as T2
+ON T1.session_id = T2.session_id AND T1.project_id = T2.project_id;"""
+    with ClickHouseClient() as conn:
+        res = conn.execute(query)
+    return res
+
+
+def query_funnels(**kwargs):
+    # If public.funnels is empty
+    funnels_query = """SELECT project_id, user_id, filter FROM (SELECT project_id, user_id, metric_id FROM public.metrics WHERE metric_type='funnel'
+        ) as T1 LEFT JOIN (SELECT filter, metric_id FROM public.metric_series) as T2 ON T1.metric_id = T2.metric_id"""
+    # Else
+    # funnels_query = "SELECT project_id, user_id, filter FROM public.funnels"
+    with PostgresClient() as conn:
+        conn.execute(funnels_query)
+        res = conn.fetchall()
+    return res
+
+
+def query_metrics(**kwargs):
+    metrics_query = """SELECT metric_type, metric_of, metric_value, metric_format FROM public.metrics"""
+    with PostgresClient() as conn:
+        conn.execute(metrics_query)
+        res = conn.fetchall()
+    return res
+
+
+def query_with_filters(**kwargs):
+    filters_query = """SELECT T1.metric_id as metric_id, project_id, name, metric_type, metric_of, filter FROM (
+    SELECT metric_id, project_id, name, metric_type, metric_of FROM metrics) as T1 INNER JOIN
+    (SELECT metric_id, filter FROM metric_series WHERE filter != '{}') as T2 ON T1.metric_id = T2.metric_id"""
+    with PostgresClient() as conn:
+        conn.execute(filters_query)
+        res = conn.fetchall()
+    return res
+
+
+def transform_funnel(project_id, user_id, data):
+    res = list()
+    for k in range(len(data)):
+        _tmp = data[k]
+        if _tmp['project_id'] != project_id or _tmp['user_id'] != user_id:
+            continue
+        res.append(_tmp['filter']['events'])
+    return res
+
+
+def transform_with_filter(data, **kwargs):
+    res = list()
+    for k in range(len(data)):
+        _tmp = data[k]
+        jump = False
+        for _key in kwargs.keys():
+            if _tmp[_key] != kwargs[_key]:
+                jump = True
+                break
+        if jump:
+            continue
+        _type = _tmp['metric_type']
+        if _type == 'funnel':
+            res.append(['funnel', _tmp['filter']['events']])
+        elif _type == 'timeseries':
+            res.append(['timeseries', _tmp['filter']['filters'], _tmp['filter']['events']])
+        elif _type == 'table':
+            res.append(['table', _tmp['metric_of'], _tmp['filter']['events']])
+    return res
+
+
+def transform_data():
+    pass
+
+
+def transform(element):
+    # Re-key a flat feature row as {(user_id, project_id): {session_id: [feature values]}}
+    key_ = element.pop('user_id')
+    secondary_key_ = element.pop('session_id')
+    context_ = element.pop('project_id')
+    features_ = element
+    return {(key_, context_): {secondary_key_: list(features_.values())}}
+
+
+def get_by_project(data, project_id):
+    # Keep only the transformed rows whose key matches the given project_id
+    head_ = [list(d.keys())[0][1] for d in data]
+    index_ = [k for k in range(len(head_)) if head_[k] == project_id]
+    return [data[k] for k in index_]
+
+
+def get_by_user(data, user_id):
+    # Keep only the transformed rows whose key matches the given user_id
+    head_ = [list(d.keys())[0][0] for d in data]
+    index_ = [k for k in range(len(head_)) if head_[k] == user_id]
+    return [data[k] for k in index_]
+
+
+if __name__ == '__main__':
+    data = get_features_clickhouse()
+    print('Data length:', len(data))
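For reference, a small sketch of what transform produces for a single feature row; the values are invented for illustration:

    row = {'session_id': 7, 'project_id': 1, 'user_id': 'u-42',
           'events_count': 12, 'errors_count': 3}
    print(transform(dict(row)))
    # -> {('u-42', 1): {7: [12, 3]}}
    # get_by_project and get_by_user filter lists of such mappings by key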
diff --git a/ee/recommendation/requirements.txt b/ee/recommendation/requirements.txt
new file mode 100644
index 000000000..9d2a7872d
--- /dev/null
+++ b/ee/recommendation/requirements.txt
@@ -0,0 +1,22 @@
+requests==2.28.1
+urllib3==1.26.12
+pyjwt==2.5.0
+psycopg2-binary==2.9.3
+
+numpy==1.23.4
+threadpoolctl==3.1.0
+joblib==1.2.0
+scipy==1.9.3
+scikit-learn==1.1.3
+
+apache-airflow==2.4.3
+
+fastapi==0.85.0
+uvicorn[standard]==0.18.3
+python-decouple==3.6
+pydantic[email]==1.10.2
+apscheduler==3.9.1
+
+clickhouse-driver==0.2.4
+python3-saml==1.14.0
+python-multipart==0.0.5
diff --git a/ee/recommendation/run.sh b/ee/recommendation/run.sh
new file mode 100644
index 000000000..1754c4e99
--- /dev/null
+++ b/ee/recommendation/run.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+mkdir tmp
+cp ../api/chalicelib/utils/ch_client.py tmp
+cp ../../api/chalicelib/utils/pg_client.py tmp
+docker build -t my_test .
+rm tmp/*.py
+rmdir tmp
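run.sh stages the shared ClickHouse/Postgres clients into tmp/ so the Dockerfile can copy them, builds the image, then cleans up. Assuming Docker is available, a typical invocation:

    cd ee/recommendation
    sh run.sh
    docker run -p 8080:8080 my_test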