openreplay/ee/recommendation/core/recommendation_model.py
MauricioGarciaS cea5eda985
feat(recommendations): Added services recommendation (ml_service) and trainer (ml_trainer) (#1275)
* Created two services: recommendation training and recommendation serving

* Deleted Docker temporary

* Added features based on signals information

* Added method to get sessions features using PG

* Added the same utils and core elements into ml_trainer

* Added checks before training models, added handler for model serving

* Updated serving API and recommendation functions to use frontend signals features

* reorganized modules to have base image and for both serving and training

* Added Dockerfiles and base Dockerfile

* Solved issue while ordering sessions by relevance

* Added method to save user feedback of recommendations

* Added security authorization

* Updated Dockerfile

* fixed issues with secret insertion to API

* Updated feedback structure

* Added git for dags

* Solved issue of insertion on recommendation feedback

* Changed update method from def to async def; it is now called during startup

* Solved issues with Airflow running MLflow in a DAG

* Changed sanity checks and added middleware params

* base path renaming

* Changed update method to an interval method which loads one model every 10s if there are models to download

* Added sql files for recommendation service and trainer

* Cleaned files and added documentation for methods and classes

* Added README file

* Renamed endpoints, changed None into empty array and updated readme

* refactor(recommendation): optimized query

* style(recommendation): moved imports to top of file, renamed endpoint parameters, function optimization

* refactor(recommendation): .gitignore

* refactor(recommendation): .gitignore

* refactor(recommendation): Optimized Dockerfiles

* refactor(recommendation): changed imports

* refactor(recommendation): optimized requests

* refactor(recommendation): optimized requests

* Fixed boot for fastapi, updated some queries

* Fixed issues while downloading models and while returning json response from API

* limited number of recommendations and set a minimum score to present recommendations

* fix(recommendation): fixed some queries and updated prediction method

* Added env value to control number of predictions to make

* docs(recommendation): Added third party libraries used in recommendation service

* Froze requirements

* Update base_crons.py

added `misfire_grace_time` to recommendation crons

---------

Co-authored-by: Taha Yassine Kraiem <tahayk2@gmail.com>
2023-06-07 15:58:33 +02:00

import random

import mlflow.pyfunc
import numpy as np
from sklearn import metrics
from sklearn.svm import SVC
from sklearn.feature_selection import SequentialFeatureSelector as sfs
from sklearn.preprocessing import normalize
from sklearn.decomposition import PCA
from sklearn.neighbors import KNeighborsClassifier as knc


def select_features(X, y):
    """
    Dimensional reduction of X using k-nearest neighbors and a sequential feature selector.
    Final dimension is set to three features.
    Params:
        X: Array to be reduced in dimension (batch_size, n_features).
        y: Array of labels (batch_size,).
    Output: Tuple (function that reduces the dimension of an array, X with reduced dimension).
    """
    knn = knc(n_neighbors=3)
    selector = sfs(knn, n_features_to_select=3)
    X_transformed = selector.fit_transform(X, y)

    def transform(input):
        return selector.transform(input)

    return transform, X_transformed
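
# Illustrative sketch (not part of the original module): select_features expects numeric
# numpy arrays and returns (transform, X_reduced); the returned transform can then be
# applied to new batches with the same column layout, e.g.:
#
#   X = np.random.rand(20, 6)               # hypothetical feature matrix
#   y = np.random.randint(0, 2, size=20)    # hypothetical binary labels
#   transform, X_reduced = select_features(X, y)
#   X_new_reduced = transform(np.random.rand(5, 6))  # reduce unseen rows to 3 features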

def sort_database(X, y):
    """
    Random shuffle of training samples together with their respective labels.
    Params:
        X: Array of features.
        y: Array of labels.
    Output: Tuple (X_rand_sorted, y_rand_sorted).
    """
    sort_list = list(range(len(y)))
    random.shuffle(sort_list)
    return X[sort_list], y[sort_list]

def preprocess(X):
    """
    Preprocessing of features (no dimensional reduction) using principal component analysis.
    Params:
        X: Array of features.
    Output: Tuple (processed array of features, function that applies the same preprocessing to an array).
    """
    _, n = X.shape
    pca = PCA(n_components=n)
    x = pca.fit_transform(normalize(X))

    def transform(input):
        return pca.transform(normalize(input))

    return x, transform
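
# Illustrative note (not part of the original module): preprocess and select_features
# return their values in opposite orders, matching how they are unpacked in
# SVM_recommendation.fit below, e.g.:
#
#   X_pre, apply_pre = preprocess(X)               # (data, transform)
#   apply_sel, X_sel = select_features(X_pre, y)   # (transform, data)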

class SVM_recommendation(mlflow.pyfunc.PythonModel):
    def __init__(self, test=False, **params):
        """Support Vector Machine recommender; **params are forwarded to sklearn.svm.SVC."""
        params['probability'] = True
        self.svm = SVC(**params)
        self.transforms = []
        self.score = 0
        self.confusion_matrix = None
        if test:
            knn = knc(n_neighbors=3)
            self.transform = [PCA(n_components=3), sfs(knn, n_features_to_select=2)]

    def fit(self, X, y):
        """
        Train the preprocessing function, feature selection and Support Vector Machine model.
        Params:
            X: Array of features.
            y: Array of labels.
        """
        assert X.shape[0] == y.shape[0], 'X and y must have same length'
        assert len(X.shape) == 2, 'X must be a two dimension vector'
        X, t1 = preprocess(X)
        t2, X = select_features(X, y)
        self.transforms = [t1, t2]
        self.svm.fit(X, y)
        pred = self.svm.predict(X)
        # Encode each (label, prediction) pair: z == 1 -> y=1, pred=0 (false negative),
        # z == 2 -> y=0, pred=1 (false positive), z == 3 -> true positive, z == 0 -> true negative.
        z = y + 2 * pred
        n = len(z)
        false_neg = np.count_nonzero(z == 1) / n
        false_pos = np.count_nonzero(z == 2) / n
        true_pos = np.count_nonzero(z == 3) / n
        true_neg = 1 - false_neg - false_pos - true_pos
        # Confusion matrix stored as rates in sklearn's [[TN, FP], [FN, TP]] layout.
        self.confusion_matrix = np.array([[true_neg, false_pos], [false_neg, true_pos]])
        # Training accuracy.
        self.score = true_pos + true_neg

    def predict(self, x):
        """
        Apply the stored transforms to the input features and predict the probability of the positive class.
        Params:
            x: Array of features.
        Output: prediction probability for True (1) for each row.
        """
        for t in self.transforms:
            x = t(x)
        return self.svm.predict_proba(x)[:, 1]

    def recommendation_order(self, x):
        """
        Apply the stored transforms to the input features, predict class probabilities and sort rows by probability.
        Params:
            x: Array of features.
        Output: Tuple (row indices sorted by descending probability of True, predicted probabilities).
        """
        for t in self.transforms:
            x = t(x)
        pred = self.svm.predict_proba(x)
        return sorted(range(len(pred)), key=lambda k: pred[k][1], reverse=True), pred

    def plots(self):
        """
        Returns the plots in a dict format:
        {
            'confusion_matrix': confusion matrix figure,
        }
        """
        display = metrics.ConfusionMatrixDisplay(confusion_matrix=self.confusion_matrix,
                                                 display_labels=[False, True])
        return {'confusion_matrix': display.plot().figure_}
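

if __name__ == '__main__':
    # Illustrative usage sketch only (not part of the original module), using random
    # synthetic data in place of the real session/signal features.
    rng = np.random.default_rng(0)
    X = rng.random((40, 6))                      # hypothetical feature matrix
    y = rng.integers(0, 2, size=40)              # hypothetical binary relevance labels

    X, y = sort_database(X, y)                   # shuffle samples and labels together
    model = SVM_recommendation(kernel='rbf')
    model.fit(X, y)
    print('training accuracy:', model.score)

    candidates = rng.random((5, 6))              # hypothetical candidate sessions
    order, probs = model.recommendation_order(candidates)
    print('candidates ranked by relevance:', order)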