* Created two services: recommendation training and recommendation serving * Deleted Docker temporary * Added features based on signals information * Added method to get sessions features using PG * Added same utils and core elements into ml_trainer * Added checks before training models, added handler for model serving * Updated serving API and recommendation functions to use frontend signals features * reorganized modules to have a base image for both serving and training * Added Dockerfiles and base Dockerfile * Solved issue while ordering sessions by relevance * Added method to save user feedback of recommendations * Added security authorization * Updated Dockerfile * fixed issues with secret insertion to API * Updated feedback structure * Added git for dags * Solved issue of insertion on recommendation feedback * Changed update method from def to async def and it is called during startup * Solved issues of airflow running mlflow in dag * Changed sanity checks and added middleware params * base path renaming * Changed update method to an interval method which loads one model every 10s if there are models to download * Added sql files for recommendation service and trainer * Cleaned files and added documentation for methods and classes * Added README file * Renamed endpoints, changed None into empty array and updated readme * refactor(recommendation): optimized query * style(recommendation): moved imports to top of file, renamed endpoints parameters, function optimization * refactor(recommendation): .gitignore * refactor(recommendation): .gitignore * refactor(recommendation): Optimized Dockerfiles * refactor(recommendation): changed imports * refactor(recommendation): optimized requests * refactor(recommendation): optimized requests * Fixed boot for fastapi, updated some queries * Fixed issues while downloading models and while returning json response from API * limited number of recommendations and set a minimum score to present recommendations * 
fix(recommendation): fixed some queries and updated prediction method * Added env value to control number of predictions to make * docs(recommendation): Added third party libraries used in recommendation service * frozen requirements * Update base_crons.py added `misfire_grace_time` to recommendation crons --------- Co-authored-by: Taha Yassine Kraiem <tahayk2@gmail.com>
128 lines
4.1 KiB
Python
128 lines
4.1 KiB
Python
import mlflow.pyfunc
|
|
import random
|
|
import numpy as np
|
|
from sklearn import metrics
|
|
from sklearn.svm import SVC
|
|
from sklearn.feature_selection import SequentialFeatureSelector as sfs
|
|
from sklearn.preprocessing import normalize
|
|
from sklearn.decomposition import PCA
|
|
from sklearn.neighbors import KNeighborsClassifier as knc
|
|
|
|
|
|
def select_features(X, y, n_features=3, n_neighbors=3):
    """
    Dimensional reduction of X using k-nearest neighbors and sequential feature selector.
    Params:
        X: Array which will be reduced in dimension (batch_size, n_features).
        y: Array of labels (batch_size,).
        n_features: Number of features kept by the selector (defaults to three).
        n_neighbors: Number of neighbors used by the k-nearest neighbors
            estimator that drives the selection (defaults to three).
    Output: Tuple (transform, X_transformed) where transform is a function that
        applies the fitted selection to a new array and X_transformed is X
        restricted to the selected features.
    """
    knn = knc(n_neighbors=n_neighbors)
    selector = sfs(knn, n_features_to_select=n_features)
    # Fit once here; the closure below reuses the fitted selector.
    X_transformed = selector.fit_transform(X, y)

    def transform(input):
        return selector.transform(input)

    return transform, X_transformed
|
|
|
|
|
|
def sort_database(X, y):
    """
    Shuffle features and labels together using one shared random permutation.
    Params:
        X: Array of features.
        y: Array of labels.
    Output: Tuple (X_rand_sorted, y_rand_sorted).
    """
    permutation = [*range(len(y))]
    # Shuffle the indices rather than the arrays so every row of X stays
    # aligned with its label in y.
    random.shuffle(permutation)
    shuffled_X = X[permutation]
    shuffled_y = y[permutation]
    return shuffled_X, shuffled_y
|
|
|
|
|
|
def preprocess(X):
    """
    Preprocessing of features: normalization followed by a principal component
    analysis that keeps as many components as X has columns (no dimensional reduction).
    Params:
        X: Array of features, two-dimensional.
    Output: Tuple (processed array of features, function that applies the same
        fitted preprocessing to a new array).
    """
    _rows, n_columns = X.shape
    pca = PCA(n_components=n_columns)
    normalized = normalize(X)
    projected = pca.fit_transform(normalized)

    def transform(input):
        # New data goes through the same normalize -> fitted PCA chain.
        return pca.transform(normalize(input))

    return projected, transform
|
|
|
|
|
|
class SVM_recommendation(mlflow.pyfunc.PythonModel):
    """
    Support Vector Machine recommender packaged as an mlflow python model.

    Training chains `preprocess`, `select_features` and an SVC with probability
    estimates enabled; the fitted transforms are replayed on every input that
    is later scored.
    """

    def __init__(self, test=False, **params):
        """
        Build the underlying SVC.
        Params:
            test: When True, additionally stores a list
                [PCA, sequential feature selector] in the `transform` attribute.
            **params: Keyword arguments forwarded to sklearn.svm.SVC
                (see its documentation); `probability` is always forced to True.
        """
        # predict/recommendation_order call predict_proba, which needs probability=True.
        params['probability'] = True
        self.svm = SVC(**params)
        self.transforms = []
        self.score = 0
        self.confusion_matrix = None
        if test:
            knn = knc(n_neighbors=3)
            # NOTE(review): this sets `transform` (singular); no method of this
            # class reads it (they use `transforms`). Confirm the intent.
            self.transform = [PCA(n_components=3), sfs(knn, n_features_to_select=2)]

    def _apply_transforms(self, x):
        """Run x through the fitted transforms, in the order they were trained."""
        for t in self.transforms:
            x = t(x)
        return x

    def fit(self, X, y):
        """
        Train preprocess function, feature selection and Support Vector Machine model.
        Also stores the normalized confusion matrix and the accuracy (`score`),
        both measured on the training data itself.
        Params:
            X: Array of features (two-dimensional).
            y: Array of labels; the confusion matrix assumes 0/1 (False/True) labels.
        """
        assert X.shape[0] == y.shape[0], 'X and y must have same length'
        assert len(X.shape) == 2, 'X must be a two dimension vector'
        X, t1 = preprocess(X)
        t2, X = select_features(X, y)
        self.transforms = [t1, t2]
        self.svm.fit(X, y)
        pred = self.svm.predict(X)
        # Encode each (label, prediction) pair as one code:
        #   0 -> true negative             1 -> false negative (y=1, pred=0)
        #   2 -> false positive (y=0, pred=1)   3 -> true positive
        z = y + 2 * pred
        n = len(z)
        false_neg = np.count_nonzero(z == 1) / n
        false_pos = np.count_nonzero(z == 2) / n
        true_pos = np.count_nonzero(z == 3) / n
        true_neg = 1 - false_neg - false_pos - true_pos
        # Rows are true labels, columns are predicted labels, as expected by
        # sklearn's ConfusionMatrixDisplay used in `plots`.
        self.confusion_matrix = np.array([[true_neg, false_pos], [false_neg, true_pos]])
        self.score = true_pos + true_neg

    def predict(self, x):
        """
        Transform input features and predict their probability.
        Params:
            x: Array of features.
        Output: prediction probability for True (1).
        """
        return self.svm.predict_proba(self._apply_transforms(x))[:, 1]

    def recommendation_order(self, x):
        """
        Transform and prediction of input features and ranking of each row by probability.
        Params:
            x: Array of features.
        Output: Tuple (sorted_indices, predictions) where sorted_indices lists the
            row indices of x from highest to lowest probability of True (1) and
            predictions is the full predict_proba output, in the original row order.
        """
        pred = self.svm.predict_proba(self._apply_transforms(x))
        ranking = sorted(range(len(pred)), key=lambda k: pred[k][1], reverse=True)
        return ranking, pred

    def plots(self):
        """
        Returns the plots in a dict format.
        {
            'confusion_matrix': confusion matrix figure,
        }
        Uses the confusion matrix stored by `fit`.
        """
        display = metrics.ConfusionMatrixDisplay(confusion_matrix=self.confusion_matrix, display_labels=[False, True])
        return {'confusion_matrix': display.plot().figure_}
|