Added signals API for chalice ee; added folder for the recommendation service

This commit is contained in:
MauricioGarciaS 2022-11-17 16:34:45 +01:00
parent 994ee425f0
commit d085b3583d
8 changed files with 203 additions and 1 deletion

View file

@@ -0,0 +1,16 @@
import json

import schemas_ee
from chalicelib.utils import helper
from chalicelib.utils import pg_client


def handle_frontend_signals(project_id: int, user_id: str, data: schemas_ee.SignalsSchema):
    res = {'errors': 'query not executed'}
    # psycopg2's mogrify expects %(name)s placeholders; RETURNING * gives
    # fetchone() a row to read back after the insert
    insights_query = """INSERT INTO public.frontend_signals
                        VALUES (%(project_id)s, %(user_id)s, %(timestamp)s, %(action)s,
                                %(source)s, %(category)s, %(data)s)
                        RETURNING *;"""
    with pg_client.PostgresClient() as conn:
        query = conn.mogrify(insights_query,
                             {"project_id": project_id, "user_id": user_id,
                              "timestamp": data.timestamp, "action": data.action,
                              "source": data.source, "category": data.category,
                              "data": json.dumps(data.data)})
        conn.execute(query)
        res = helper.dict_to_camel_case(conn.fetchone())
    return res
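The INSERT above targets a public.frontend_signals table whose migration is not part of this diff. A hypothetical sketch of the implied columns, kept as a Python string to match how SQL appears in this codebase (column order follows the VALUES list; the real DDL may differ):

FRONTEND_SIGNALS_DDL = """
CREATE TABLE IF NOT EXISTS public.frontend_signals (
    project_id  integer NOT NULL,
    user_id     text    NOT NULL,
    "timestamp" bigint  NOT NULL,
    action      text    NOT NULL,
    source      text    NOT NULL,
    category    text    NOT NULL,
    data        jsonb   DEFAULT '{}'::jsonb
);
"""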

View file

@@ -8,7 +8,7 @@ import schemas
import schemas_ee
from chalicelib.core import sessions, assist, heatmaps, sessions_favorite, sessions_assignments, errors, errors_viewed, \
errors_favorite, sessions_notes
-from chalicelib.core import sessions_viewed
+from chalicelib.core import sessions_viewed, signals
from chalicelib.core import tenants, users, projects, license
from chalicelib.core import webhook
from chalicelib.core.collaboration_slack import Slack
@@ -435,3 +435,13 @@ def get_all_notes(projectId: int, data: schemas.SearchNoteSchema = Body(...),
if "errors" in data:
return data
return {'data': data}
@app.post('/{projectId}/signals', tags=['signals'])
def send_interactions(projectId: int, data: schemas_ee.SignalsSchema = Body(...),
                      context: schemas.CurrentContext = Depends(OR_context)):
    data = signals.handle_frontend_signals(project_id=projectId, user_id=context.user_id, data=data)
    if "errors" in data:
        return data
    return {'data': data}
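A hedged sketch of exercising the new endpoint from a client script; the host, project id, and bearer token are placeholders, not part of this commit, and requests is just an example HTTP client:

import requests

payload = {"timestamp": 1668699285000, "action": "click",
           "source": "button", "category": "navigation",
           "data": {"label": "save"}}
r = requests.post("https://openreplay.example.com/1/signals",  # hypothetical host, projectId=1
                  json=payload,
                  headers={"Authorization": "Bearer <JWT>"})  # placeholder token
print(r.json())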

View file

@@ -31,6 +31,14 @@ class RolePayloadSchema(BaseModel):
alias_generator = schemas.attribute_to_camel_case


class SignalsSchema(BaseModel):
timestamp: int = Field(...)
action: str = Field(...)
source: str = Field(...)
category: str = Field(...)
data: dict = Field(default={})


class CreateMemberSchema(schemas.CreateMemberSchema):
roleId: Optional[int] = Field(None)
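For illustration, a minimal sketch of validating a raw signal with the new schema; pydantic raises a ValidationError if any required field is missing:

signal = SignalsSchema(timestamp=1668699285000, action="click",
                       source="button", category="navigation",
                       data={"label": "save"})
print(signal.dict())
# {'timestamp': 1668699285000, 'action': 'click', 'source': 'button',
#  'category': 'navigation', 'data': {'label': 'save'}}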

View file

@@ -0,0 +1,24 @@
FROM python:3.10
WORKDIR /service
COPY tmp/ch_client.py utils/
COPY tmp/pg_client.py utils/
COPY recommendation.py .
COPY requirements.txt .
# Update and install in one layer so the package index is never stale
RUN apt-get update && apt-get install -y libxml2-dev libxmlsec1-dev libxmlsec1-openssl
RUN pip install --upgrade pip
RUN pip install -r requirements.txt
RUN touch utils/__init__.py
# NOTE: development defaults; override these (especially pg_password) at deploy
# time rather than baking credentials into the image
ENV pg_host=postgresql.db.svc.cluster.local \
    pg_port=5432 \
    pg_user='postgres' \
    pg_password='8ea158c722fab5976023' \
    pg_dbname='postgres' \
    PG_TIMEOUT=30 \
    ch_host=clickhouse-openreplay-clickhouse.db.svc.cluster.local \
    ch_port=9000 \
    ch_timeout=30 \
    ch_receive_timeout=40
EXPOSE 8080
# Initializes Airflow's metadata DB at build time (defaults to SQLite inside the image)
RUN airflow db init
# The webserver flag is --hostname, not --host
CMD airflow webserver --hostname 0.0.0.0 --port 8080
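The ENV entries above reach the service as ordinary environment variables; a minimal sketch of consuming them with python-decouple (pinned in requirements.txt), where the fallback defaults are assumptions for local runs:

from decouple import config

PG_HOST = config('pg_host', default='localhost')
PG_PORT = config('pg_port', default=5432, cast=int)
CH_HOST = config('ch_host', default='localhost')
CH_PORT = config('ch_port', default=9000, cast=int)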

7 ee/recommendation/api.py Normal file
View file

@@ -0,0 +1,7 @@
from fastapi import FastAPI
from fastapi.responses import HTMLResponse

app = FastAPI()


@app.get('/', response_class=HTMLResponse)  # serve the string as HTML, not JSON
def home():
    return '<h1>This is a title</h1>'
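For standalone testing outside the Airflow webserver baked into the Dockerfile, the app can be served directly with uvicorn (also pinned in requirements.txt); the module name api matches the file above:

import uvicorn
from api import app

if __name__ == '__main__':
    uvicorn.run(app, host='0.0.0.0', port=8080)  # port mirrors the Dockerfile's EXPOSE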

View file

@@ -0,0 +1,109 @@
from utils.ch_client import ClickHouseClient
from utils.pg_client import PostgresClient


def get_features_clickhouse(**kwargs):
    limit = kwargs.get('limit', 500)
    query = f"""SELECT session_id, project_id, user_id, events_count, errors_count, duration, country, issue_score, device_type, rage, jsexception, badrequest FROM (
    SELECT session_id, project_id, user_id, events_count, errors_count, duration, toInt8(user_country) as country, issue_score, toInt8(user_device_type) as device_type FROM experimental.sessions WHERE user_id IS NOT NULL) as T1
    INNER JOIN (SELECT session_id, project_id, sum(issue_type = 'click_rage') as rage, sum(issue_type = 'js_exception') as jsexception, sum(issue_type = 'bad_request') as badrequest FROM experimental.events WHERE event_type = 'ISSUE' AND session_id > 0 GROUP BY session_id, project_id LIMIT {limit}) as T2
    ON T1.session_id = T2.session_id AND T1.project_id = T2.project_id;"""
    with ClickHouseClient() as conn:
        res = conn.execute(query)
    return res


def query_funnels(**kwargs):
    # Used while public.funnels is empty:
    funnels_query = """SELECT project_id, user_id, filter FROM (SELECT project_id, user_id, metric_id FROM public.metrics WHERE metric_type='funnel'
    ) as T1 LEFT JOIN (SELECT filter, metric_id FROM public.metric_series) as T2 ON T1.metric_id = T2.metric_id"""
    # Otherwise:
    # funnels_query = "SELECT project_id, user_id, filter FROM public.funnels"
    with PostgresClient() as conn:
        conn.execute(funnels_query)
        res = conn.fetchall()
    return res


def query_metrics(**kwargs):
    metrics_query = """SELECT metric_type, metric_of, metric_value, metric_format FROM public.metrics"""
    with PostgresClient() as conn:
        conn.execute(metrics_query)
        res = conn.fetchall()
    return res


def query_with_filters(**kwargs):
    filters_query = """SELECT T1.metric_id as metric_id, project_id, name, metric_type, metric_of, filter FROM (
    SELECT metric_id, project_id, name, metric_type, metric_of FROM metric_series WHERE filter != '{}') as T1 INNER JOIN
    (SELECT metric_id, filter FROM metrics) as T2 ON T1.metric_id = T2.metric_id"""
    with PostgresClient() as conn:
        conn.execute(filters_query)
        res = conn.fetchall()
    return res


def transform_funnel(project_id, user_id, data):
    res = list()
    for k in range(len(data)):
        _tmp = data[k]
        # Keep only funnels belonging to this project/user pair
        if _tmp['project_id'] != project_id or _tmp['user_id'] != user_id:
            continue
        res.append(_tmp['filter']['events'])
    return res


def transform_with_filter(data, **kwargs):
    res = list()
    for k in range(len(data)):
        _tmp = data[k]
        # Skip rows that fail any of the requested key/value filters
        jump = False
        for _key in kwargs.keys():
            if _tmp[_key] != kwargs[_key]:
                jump = True
                break
        if jump:
            continue
        _type = _tmp['metric_type']
        if _type == 'funnel':
            res.append(['funnel', _tmp['filter']['events']])
        elif _type == 'timeseries':
            res.append(['timeseries', _tmp['filter']['filters'], _tmp['filter']['events']])
        elif _type == 'table':
            res.append(['table', _tmp['metric_of'], _tmp['filter']['events']])
    return res


def transform_data():
    pass


def transform(element):
    # Pop the identifying columns; whatever remains is the feature vector
    key_ = element.pop('user_id')
    secondary_key_ = element.pop('session_id')
    context_ = element.pop('project_id')
    features_ = element
    return {(key_, context_): {secondary_key_: list(features_.values())}}


def get_by_project(data, project_id):
    # Each element is {(user_id, project_id): {...}} as produced by transform()
    head_ = [list(d.keys())[0][1] for d in data]
    index_ = [k for k in range(len(head_)) if head_[k] == project_id]
    return [data[k] for k in index_]


def get_by_user(data, user_id):
    head_ = [list(d.keys())[0][0] for d in data]
    index_ = [k for k in range(len(head_)) if head_[k] == user_id]
    return [data[k] for k in index_]


if __name__ == '__main__':
    data = get_features_clickhouse()
    print('Data length:', len(data))
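A hypothetical walk-through of the transform helpers above with one fabricated row; the field names mirror the SELECT list in get_features_clickhouse:

row = {'session_id': 42, 'project_id': 1, 'user_id': 'u1',
       'events_count': 10, 'errors_count': 2, 'duration': 5000,
       'country': 3, 'issue_score': 7, 'device_type': 1,
       'rage': 0, 'jsexception': 1, 'badrequest': 0}
features = transform(row)
# -> {('u1', 1): {42: [10, 2, 5000, 3, 7, 1, 0, 1, 0]}}
print(get_by_project([features], 1))  # [{('u1', 1): {42: [...]}}]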

View file

@@ -0,0 +1,22 @@
requests==2.28.1
urllib3==1.26.12
pyjwt==2.5.0
psycopg2-binary==2.9.3
numpy==1.23.4
threadpoolctl==3.1.0
joblib==1.2.0
scipy==1.9.3
scikit-learn==1.1.3
apache-airflow==2.4.3
fastapi==0.85.0
uvicorn[standard]==0.18.3
python-decouple==3.6
pydantic[email]==1.10.2
apscheduler==3.9.1
clickhouse-driver==0.2.4
python3-saml==1.14.0
python-multipart==0.0.5

6 ee/recommendation/run.sh Normal file
View file

@@ -0,0 +1,6 @@
#!/bin/bash
set -e
# Stage the shared DB clients into a temporary build context
mkdir -p tmp
cp ../api/chalicelib/utils/ch_client.py tmp
cp ../../api/chalicelib/utils/pg_client.py tmp
docker build -t my_test .
# Clean up the staged copies
rm tmp/*.py
rmdir tmp