openreplay/ee/recommendation/core/user_features.py
MauricioGarciaS cea5eda985
feat(recommendations): Added services recommendation (ml_service) and trainer (ml_trainer) (#1275)
* Created two services: recommendation training and recommendation serving

* Deleted Docker temporary

* Added features based on signals information

* Added method to get sessions features using PG

* Added the same utils and core elements to ml_trainer

* Added checks before training models, added handler for model serving

* Updated serving API and recommendation functions to use frontend signals features

* reorganized modules to use a base image for both serving and training

* Added Dockerfiles and base Dockerfile

* Solved issue while ordering sessions by relevance

* Added method to save user feedback on recommendations

* Added security authorization

* Updated Dockerfile

* fixed issues with secret insertion into the API

* Updated feedback structure

* Added git for DAGs

* Solved insertion issue for recommendation feedback

* Changed update method from def to async def; it is now called during startup

* Solved issues with Airflow running MLflow in a DAG

* Changed sanity checks and added middleware params

* base path renaming

* Changed update method to an interval method which loads one model every 10s if there are models to download

* Added sql files for recommendation service and trainer

* Cleaned files and added documentation for methods and classes

* Added README file

* Renamed endpoints, changed None to an empty array and updated README

* refactor(recommendation): optimized query

* style(recommendation): moved imports to top of file, renamed endpoint parameters, function optimization

* refactor(recommendation): .gitignore

* refactor(recommendation): .gitignore

* refactor(recommendation): Optimized Dockerfiles

* refactor(recommendation): changed imports

* refactor(recommendation): optimized requests

* refactor(recommendation): optimized requests

* Fixed boot for FastAPI, updated some queries

* Fixed issues while downloading models and while returning json response from API

* limited number of recommendations and set a minimum score to present recommendations

* fix(recommendation): fixed some queries and updated prediction method

* Added env value to control number of predictions to make

* docs(recommendation): Added third party libraries used in recommendation service

* frozen requirements

* Update base_crons.py

added `misfire_grace_time` to recommendation crons
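
A minimal sketch of how such a grace period is wired up, assuming the crons are registered with APScheduler; the job name, schedule and grace value below are illustrative, not taken from base_crons.py:

```python
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

def train_recommendation_models():
    ...  # placeholder for the actual recommendation cron body

# misfire_grace_time lets a run that missed its slot (e.g. because the previous
# run overran) still fire if it is at most this many seconds late, instead of
# being skipped entirely.
scheduler.add_job(train_recommendation_models, trigger="cron", hour="*/4",
                  misfire_grace_time=60 * 5)
scheduler.start()
```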

---------

Co-authored-by: Taha Yassine Kraiem <tahayk2@gmail.com>
2023-06-07 15:58:33 +02:00


from utils.pg_client import PostgresClient
from decouple import config
from utils.df_utils import _process_pg_response
import numpy as np


def get_training_database(projectId, max_timestamp=None, favorites=False):
"""
Gets training database using projectId, max_timestamp [optional] and favorites (if true adds favorites)
Params:
projectId: project id of all sessions to be selected.
max_timestamp: max timestamp that a not seen session can have in order to be considered not interesting.
favorites: True to use favorite sessions as interesting sessions reference.
Output: Tuple (Set of features, set of labels, dict of indexes of each project_id, session_id, user_id in the set)
"""
args = {"projectId": projectId, "max_timestamp": max_timestamp, "limit": 20}
with PostgresClient() as conn:
x1 = signals_features(conn, **args)
if favorites:
x2 = user_favorite_sessions(args['projectId'], conn)
if max_timestamp is not None:
x3 = user_not_seen_sessions(args['projectId'], args['limit'], conn)
X_project_ids = dict()
X_users_ids = dict()
X_sessions_ids = dict()
_X = list()
_Y = list()
_process_pg_response(x1, _X, _Y, X_project_ids, X_users_ids, X_sessions_ids, label=None)
if favorites:
_process_pg_response(x2, _X, _Y, X_project_ids, X_users_ids, X_sessions_ids, label=1)
    if max_timestamp is not None:
_process_pg_response(x3, _X, _Y, X_project_ids, X_users_ids, X_sessions_ids, label=0)
return np.array(_X), np.array(_Y), \
{'project_id': X_project_ids,
'user_id': X_users_ids,
'session_id': X_sessions_ids}
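
# Illustrative usage (an assumption, not part of the original module): the arrays returned by
# get_training_database() are meant to feed the ml_trainer, along the lines of
#   X, Y, idx = get_training_database(projectId=1, max_timestamp=1685000000000, favorites=True)
#   model.fit(X, Y)  # `model` being a hypothetical estimator owned by the trainer service
# where the idx dicts relate each row of X back to its project_id, user_id and session_id.

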
def signals_features(conn, **kwargs):
"""
    Selects features from the frontend_signals table and marks a session as interesting if either condition holds:
        * the number of signal events is greater than events_threshold (env value, default=10);
        * the session has been replayed more than once.
"""
    assert 'projectId' in kwargs, 'projectId should be provided in kwargs'
projectId = kwargs['projectId']
events_threshold = config('events_threshold', default=10, cast=int)
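    # In plain terms, the subquery T2 below labels each (project_id, viewer_id, session_id) group of
    # frontend_signals rows as interesting when:
    #   (count of signals with source = 'replay') > 1  OR  (total signal count) > events_threshold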
query = conn.mogrify("""SELECT T.project_id,
T.session_id,
T.user_id,
T2.viewer_id,
T.events_count,
T.errors_count,
T.duration,
T.country,
T.issue_score,
T.device_type,
T2.interesting as train_label
FROM (SELECT project_id,
user_id as viewer_id,
session_id,
count(CASE WHEN source = 'replay' THEN 1 END) > 1 OR COUNT(1) > %(events_threshold)s as interesting
FROM frontend_signals
WHERE project_id = %(projectId)s
AND session_id is not null
GROUP BY project_id, viewer_id, session_id) as T2
INNER JOIN (SELECT project_id,
session_id,
user_id as viewer_id,
user_id,
events_count,
errors_count,
duration,
user_country as country,
issue_score,
user_device_type as device_type
FROM sessions
WHERE project_id = %(projectId)s
AND duration IS NOT NULL) as T
USING (session_id);""",
{"projectId": projectId, "events_threshold": events_threshold})
conn.execute(query)
res = conn.fetchall()
    return res


def user_favorite_sessions(projectId, conn):
"""
    Selects session features for sessions that users marked as favorites (sessions joined with user_favorite_sessions).
"""
query = """SELECT project_id,
session_id,
T1.user_id,
events_count,
errors_count,
duration,
user_country as country,
issue_score,
user_device_type as device_type,
T2.user_id AS viewer_id
FROM sessions AS T1
INNER JOIN user_favorite_sessions as T2
USING (session_id)
WHERE project_id = %(projectId)s;"""
conn.execute(
conn.mogrify(query, {"projectId": projectId})
)
res = conn.fetchall()
    return res


def user_not_seen_sessions(projectId, limit, conn):
"""
    Selects session features for sessions not viewed by any user (anti-join with user_viewed_sessions), paired with every user of the project's tenant.
"""
    # TODO: fetch un-viewed sessions alone, and the users list alone, then cross join them in python
    # and ignore deleted users (WHERE users.deleted_at ISNULL); see the sketch at the end of this file
query = """SELECT project_id, session_id, user_id, viewer_id, events_count, errors_count, duration, user_country as country, issue_score, user_device_type as device_type
FROM (
(SELECT sessions.*
FROM sessions LEFT JOIN user_viewed_sessions USING(session_id)
WHERE project_id = %(projectId)s
AND duration IS NOT NULL
AND user_viewed_sessions.session_id ISNULL
LIMIT %(limit)s) AS T1
LEFT JOIN
(SELECT user_id as viewer_id
FROM users
WHERE tenant_id = (SELECT tenant_id FROM projects WHERE project_id = %(projectId)s)) AS T2 ON true
)"""
conn.execute(
conn.mogrify(query, {"projectId": projectId, "limit": limit})
)
res = conn.fetchall()
return res
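

# --- Hypothetical sketch, not part of the original module --------------------
# A possible implementation of the TODO in user_not_seen_sessions(): fetch the
# un-viewed sessions and the tenant's users in two separate queries, skip
# deleted users, and build the cross product in Python instead of the
# LEFT JOIN ... ON true. It assumes the cursor returns dict-like rows, as the
# queries above do; the function name is illustrative.
def user_not_seen_sessions_split(projectId, limit, conn):
    sessions_query = """SELECT sessions.project_id,
                               sessions.session_id,
                               sessions.user_id,
                               sessions.events_count,
                               sessions.errors_count,
                               sessions.duration,
                               sessions.user_country as country,
                               sessions.issue_score,
                               sessions.user_device_type as device_type
                        FROM sessions
                                 LEFT JOIN user_viewed_sessions USING (session_id)
                        WHERE sessions.project_id = %(projectId)s
                          AND sessions.duration IS NOT NULL
                          AND user_viewed_sessions.session_id ISNULL
                        LIMIT %(limit)s;"""
    users_query = """SELECT user_id as viewer_id
                     FROM users
                     WHERE tenant_id = (SELECT tenant_id FROM projects WHERE project_id = %(projectId)s)
                       AND users.deleted_at ISNULL;"""
    conn.execute(conn.mogrify(sessions_query, {"projectId": projectId, "limit": limit}))
    sessions = conn.fetchall()
    conn.execute(conn.mogrify(users_query, {"projectId": projectId}))
    viewers = conn.fetchall()
    # Cross join in Python: one candidate row per (un-viewed session, viewer) pair.
    return [{**session, "viewer_id": viewer["viewer_id"]} for session in sessions for viewer in viewers]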