openreplay/ee/recommendation/ml_service/api.py
MauricioGarciaS cea5eda985
feat(recommendations): Added services recommendation (ml_service) and trainer (ml_trainer) (#1275)
* Created two services: recommendation training and recommendation serving

* Deleted Docker temporary

* Added features based in signals information

* Added method to get sessions features using PG

* Added same utils and core elements into ml_trainer

* Added checks before training models, added handler for model serving

* Updated serving API and recommendation functions to use frontend signals features

* reorganized modules to have base image and for both serving and training

* Added Dockerfiles and base Dockerfile

* Solved issue while ordering sessions by relevance

* Added method to save user feedback of recommendations

* Added security authorization

* Updated Dockerfile

* fixed issues with secret insertion to API

* Updated feedback structure

* Added git for dags

* Solved issue of insertion on recommendation feedback

* Changed update method from def to async def and it is called during startup

* Solved issues of airflow running mlflow in dag

* Changes sanity checks and added middleware params

* base path renaming

* Changed update method to a interval method which loads one model each 10s if there are models to download

* Added sql files for recommendation service and trainer

* Cleaned files and added documentation for methods and classes

* Added README file

* Renamed endpoints, changed None into empty array and updated readme

* refactor(recommendation): optimized query

* style(recommendation): changed import to top file, renamed endpoints parameters, function optimization

* refactor(recommendation): .gitignore

* refactor(recommendation): .gitignore

* refactor(recommendation): Optimized Dockerfiles

* refactor(recommendation): changed imports

* refactor(recommendation): optimized requests

* refactor(recommendation): optimized requests

* Fixed boot for fastapi, updated some queries

* Fixed issues while downloading models and while returning json response from API

* limited number of recommendations and set a minimum score to present recommendations

* fix(recommendation): fixed some queries and updated prediction method

* Added env value to control number of predictions to make

* docs(recommendation): Added third party libraries used in recommendation service

* frozen requirements

* Update base_crons.py

added `misfire_grace_time` to recommendation crons

---------

Co-authored-by: Taha Yassine Kraiem <tahayk2@gmail.com>
2023-06-07 15:58:33 +02:00

83 lines
2.4 KiB
Python

from fastapi import FastAPI, Depends
from contextlib import asynccontextmanager
from utils import pg_client
from core.model_handler import recommendation_model
from utils.declarations import FeedbackRecommendation
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from crons.base_crons import cron_jobs
from auth.auth_key import api_key_auth
from core import feedback
from fastapi.middleware.cors import CORSMiddleware
@asynccontextmanager
async def lifespan(app: FastAPI):
await pg_client.init()
await feedback.init()
await recommendation_model.update()
app.schedule.start()
for job in cron_jobs:
app.schedule.add_job(id=job['func'].__name__, **job)
yield
app.schedule.shutdown(wait=False)
await feedback.terminate()
await pg_client.terminate()
app = FastAPI(lifespan=lifespan)
app.schedule = AsyncIOScheduler()
origins = [
"*"
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# @app.on_event('startup')
# async def startup():
# await pg_client.init()
# await feedback.init()
# await recommendation_model.update()
# app.schedule.start()
# for job in cron_jobs:
# app.schedule.add_job(id=job['func'].__name__, **job)
#
#
# @app.on_event('shutdown')
# async def shutdown():
# app.schedule.shutdown(wait=False)
# await feedback.terminate()
# await pg_client.terminate()
@app.get('/recommendations/{user_id}/{project_id}', dependencies=[Depends(api_key_auth)])
async def get_recommended_sessions(user_id: int, project_id: int):
recommendations = recommendation_model.get_recommendations(user_id, project_id)
return {'userId': user_id,
'projectId': project_id,
'recommendations': recommendations
}
@app.get('/recommendations/{projectId}/{viewerId}/{sessionId}', dependencies=[Depends(api_key_auth)])
async def already_gave_feedback(projectId: int, viewerId: int, sessionId: int):
return feedback.has_feedback((viewerId, sessionId, projectId))
@app.post('/recommendations/feedback', dependencies=[Depends(api_key_auth)])
async def get_feedback(data: FeedbackRecommendation):
try:
feedback.global_queue.put(tuple(data.dict().values()))
except Exception as e:
return {'error': e}
return {'success': 1}
@app.get('/')
async def health():
return {'status': 200}