feat(api): optimized live session check

feat(assist): optimized live session check
feat(assist): sort
feat(assist): pagination
This commit is contained in:
Taha Yassine Kraiem 2022-06-15 15:05:41 +02:00
parent 0aa94bbc3c
commit ccf951f8e4
7 changed files with 127 additions and 69 deletions

View file

@ -80,9 +80,8 @@ def is_live(project_id, session_id, project_key=None):
if project_key is None:
project_key = projects.get_project_key(project_id)
try:
connected_peers = requests.post(config("assistList") % config("S3_KEY") + f"/{project_key}",
json={"filter": {"sessionId": session_id}},
timeout=config("assistTimeout", cast=int, default=5))
connected_peers = requests.get(config("assistList") % config("S3_KEY") + f"/{project_key}/{session_id}",
timeout=config("assistTimeout", cast=int, default=5))
if connected_peers.status_code != 200:
print("!! issue with the peer-server")
print(connected_peers.text)

View file

@ -77,11 +77,13 @@ if (process.env.uws !== "true") {
uapp.post(`${PREFIX}/${process.env.S3_KEY}/sockets-list`, uWrapper(socket.handlers.socketsList));
uapp.get(`${PREFIX}/${process.env.S3_KEY}/sockets-list/:projectKey`, uWrapper(socket.handlers.socketsListByProject));
uapp.post(`${PREFIX}/${process.env.S3_KEY}/sockets-list/:projectKey`, uWrapper(socket.handlers.socketsListByProject));
uapp.get(`${PREFIX}/${process.env.S3_KEY}/sockets-list/:projectKey/:sessionId`, uWrapper(socket.handlers.socketsListByProject));
uapp.get(`${PREFIX}/${process.env.S3_KEY}/sockets-live`, uWrapper(socket.handlers.socketsLive));
uapp.post(`${PREFIX}/${process.env.S3_KEY}/sockets-live`, uWrapper(socket.handlers.socketsLive));
uapp.get(`${PREFIX}/${process.env.S3_KEY}/sockets-live/:projectKey`, uWrapper(socket.handlers.socketsLiveByProject));
uapp.post(`${PREFIX}/${process.env.S3_KEY}/sockets-live/:projectKey`, uWrapper(socket.handlers.socketsLiveByProject));
uapp.get(`${PREFIX}/${process.env.S3_KEY}/sockets-live/:projectKey/:sessionId`, uWrapper(socket.handlers.socketsLiveByProject));
socket.start(uapp);

View file

@ -1,8 +1,12 @@
const _io = require('socket.io');
const express = require('express');
const uaParser = require('ua-parser-js');
const {extractPeerId, hasFilters, isValidSession} = require('../utils/helper');
const {extractFiltersFromRequest} = require('../utils/helper-ee');
const {extractPeerId, hasFilters, isValidSession, sortPaginate} = require('../utils/helper');
const {
extractProjectKeyFromRequest,
extractSessionIdFromRequest,
extractFiltersFromRequest
} = require('../utils/helper-ee');
const {geoip} = require('../utils/geoIP');
const {createAdapter} = require("@socket.io/redis-adapter");
const {createClient} = require("redis");
@ -60,20 +64,6 @@ const uniqueSessions = function (data) {
return resArr;
}
const extractProjectKeyFromRequest = function (req) {
if (process.env.uws === "true") {
if (req.getParameter(0)) {
debug && console.log(`[WS]where projectKey=${req.getParameter(0)}`);
return req.getParameter(0);
}
} else if (req.params.projectKey) {
debug && console.log(`[WS]where projectKey=${req.params.projectKey}`);
return req.params.projectKey;
}
return undefined;
}
const getAvailableRooms = async function () {
return io.of('/').adapter.allRooms();
}
@ -120,12 +110,13 @@ wsRouter.post(`/sockets-list`, socketsList);
const socketsListByProject = async function (req, res) {
debug && console.log("[WS]looking for available sessions");
let _projectKey = extractProjectKeyFromRequest(req);
let _sessionId = extractSessionIdFromRequest(req);
let filters = await extractFiltersFromRequest(req, res);
let liveSessions = {};
let rooms = await getAvailableRooms();
for (let peerId of rooms) {
let {projectKey, sessionId} = extractPeerId(peerId);
if (projectKey === _projectKey) {
if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) {
liveSessions[projectKey] = liveSessions[projectKey] || [];
if (hasFilters(filters)) {
const connected_sockets = await io.in(peerId).fetchSockets();
@ -144,6 +135,7 @@ const socketsListByProject = async function (req, res) {
}
wsRouter.get(`/sockets-list/:projectKey`, socketsListByProject);
wsRouter.post(`/sockets-list/:projectKey`, socketsListByProject);
wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject);
const socketsLive = async function (req, res) {
debug && console.log("[WS]looking for all available LIVE sessions");
@ -169,7 +161,7 @@ const socketsLive = async function (req, res) {
liveSessions[projectKey] = uniqueSessions(liveSessions[projectKey]);
}
}
respond(res, liveSessions);
respond(res, sortPaginate(liveSessions, filters));
}
wsRouter.get(`/sockets-live`, socketsLive);
wsRouter.post(`/sockets-live`, socketsLive);
@ -177,12 +169,13 @@ wsRouter.post(`/sockets-live`, socketsLive);
const socketsLiveByProject = async function (req, res) {
debug && console.log("[WS]looking for available LIVE sessions");
let _projectKey = extractProjectKeyFromRequest(req);
let _sessionId = extractSessionIdFromRequest(req);
let filters = await extractFiltersFromRequest(req, res);
let liveSessions = {};
let rooms = await getAvailableRooms();
for (let peerId of rooms) {
let {projectKey} = extractPeerId(peerId);
if (projectKey === _projectKey) {
let {projectKey, sessionId} = extractPeerId(peerId);
if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) {
let connected_sockets = await io.in(peerId).fetchSockets();
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.session) {
@ -199,10 +192,11 @@ const socketsLiveByProject = async function (req, res) {
liveSessions[projectKey] = uniqueSessions(liveSessions[projectKey] || []);
}
}
respond(res, liveSessions[_projectKey] || []);
respond(res, sortPaginate(liveSessions[_projectKey] || [], filters));
}
wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject);
wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject);
wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveByProject);
const findSessionSocketId = async (io, peerId) => {
const connected_sockets = await io.in(peerId).fetchSockets();

View file

@ -1,8 +1,12 @@
const _io = require('socket.io');
const express = require('express');
const uaParser = require('ua-parser-js');
const {extractPeerId, hasFilters, isValidSession} = require('../utils/helper');
const {extractFiltersFromRequest} = require('../utils/helper-ee');
const {extractPeerId, hasFilters, isValidSession, sortPaginate} = require('../utils/helper');
const {
extractProjectKeyFromRequest,
extractSessionIdFromRequest,
extractFiltersFromRequest
} = require('../utils/helper-ee');
const {geoip} = require('../utils/geoIP');
const wsRouter = express.Router();
const UPDATE_EVENT = "UPDATE_SESSION";
@ -43,20 +47,6 @@ const createSocketIOServer = function (server, prefix) {
}
}
const extractProjectKeyFromRequest = function (req) {
if (process.env.uws === "true") {
if (req.getParameter(0)) {
debug && console.log(`[WS]where projectKey=${req.getParameter(0)}`);
return req.getParameter(0);
}
} else if (req.params.projectKey) {
debug && console.log(`[WS]where projectKey=${req.params.projectKey}`);
return req.params.projectKey;
}
return undefined;
}
const getAvailableRooms = async function () {
return io.sockets.adapter.rooms.keys();
}
@ -102,12 +92,13 @@ wsRouter.post(`/sockets-list`, socketsList);
const socketsListByProject = async function (req, res) {
debug && console.log("[WS]looking for available sessions");
let _projectKey = extractProjectKeyFromRequest(req);
let _sessionId = extractSessionIdFromRequest(req);
let filters = await extractFiltersFromRequest(req, res);
let liveSessions = {};
let rooms = await getAvailableRooms();
for (let peerId of rooms) {
let {projectKey, sessionId} = extractPeerId(peerId);
if (projectKey === _projectKey) {
if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) {
liveSessions[projectKey] = liveSessions[projectKey] || [];
if (hasFilters(filters)) {
const connected_sockets = await io.in(peerId).fetchSockets();
@ -122,10 +113,11 @@ const socketsListByProject = async function (req, res) {
}
}
}
respond(res, liveSessions[_projectKey] || []);
respond(res, sortPaginate(liveSessions[_projectKey] || [], filters));
}
wsRouter.get(`/sockets-list/:projectKey`, socketsListByProject);
wsRouter.post(`/sockets-list/:projectKey`, socketsListByProject);
wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject);
const socketsLive = async function (req, res) {
debug && console.log("[WS]looking for all available LIVE sessions");
@ -150,7 +142,7 @@ const socketsLive = async function (req, res) {
}
}
}
respond(res, liveSessions);
respond(res, sortPaginate(liveSessions, filters));
}
wsRouter.get(`/sockets-live`, socketsLive);
wsRouter.post(`/sockets-live`, socketsLive);
@ -158,12 +150,13 @@ wsRouter.post(`/sockets-live`, socketsLive);
const socketsLiveByProject = async function (req, res) {
debug && console.log("[WS]looking for available LIVE sessions");
let _projectKey = extractProjectKeyFromRequest(req);
let _sessionId = extractSessionIdFromRequest(req);
let filters = await extractFiltersFromRequest(req, res);
let liveSessions = {};
let rooms = await getAvailableRooms();
for (let peerId of rooms) {
let {projectKey} = extractPeerId(peerId);
if (projectKey === _projectKey) {
let {projectKey, sessionId} = extractPeerId(peerId);
if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) {
let connected_sockets = await io.in(peerId).fetchSockets();
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.session) {
@ -179,10 +172,11 @@ const socketsLiveByProject = async function (req, res) {
}
}
}
respond(res, liveSessions[_projectKey] || []);
respond(res, sortPaginate(liveSessions[_projectKey] || [], filters));
}
wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject);
wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject);
wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveByProject);
const findSessionSocketId = async (io, peerId) => {
const connected_sockets = await io.in(peerId).fetchSockets();

View file

@ -28,6 +28,28 @@ const getBodyFromUWSResponse = async function (res) {
});
}));
}
const extractProjectKeyFromRequest = function (req) {
if (process.env.uws === "true") {
if (req.getParameter(0)) {
debug && console.log(`[WS]where projectKey=${req.getParameter(0)}`);
return req.getParameter(0);
}
} else {
return helper.extractProjectKeyFromRequest(req);
}
return undefined;
}
const extractSessionIdFromRequest = function (req) {
if (process.env.uws === "true") {
if (req.getParameter(1)) {
debug && console.log(`[WS]where projectKey=${req.getParameter(1)}`);
return req.getParameter(1);
}
} else {
return helper.extractSessionIdFromRequest(req);
}
return undefined;
}
const extractFiltersFromRequest = async function (req, res) {
let filters = {};
if (process.env.uws === "true") {
@ -45,5 +67,7 @@ const extractFiltersFromRequest = async function (req, res) {
return Object.keys(filters).length > 0 ? filters : undefined;
}
module.exports = {
extractProjectKeyFromRequest,
extractSessionIdFromRequest,
extractFiltersFromRequest
};

View file

@ -1,7 +1,15 @@
const _io = require('socket.io');
const express = require('express');
const uaParser = require('ua-parser-js');
const {extractPeerId, hasFilters, isValidSession, extractFiltersFromRequest} = require('../utils/helper');
const {
extractPeerId,
extractProjectKeyFromRequest,
extractSessionIdFromRequest,
hasFilters,
isValidSession,
extractPayloadFromRequest,
sortPaginate
} = require('../utils/helper');
const {geoip} = require('../utils/geoIP');
const wsRouter = express.Router();
const UPDATE_EVENT = "UPDATE_SESSION";
@ -28,14 +36,6 @@ const createSocketIOServer = function (server, prefix) {
});
}
const extractProjectKeyFromRequest = function (req) {
if (req.params.projectKey) {
debug && console.log(`[WS]where projectKey=${req.params.projectKey}`);
return req.params.projectKey;
}
return undefined;
}
const getAvailableRooms = async function () {
return io.sockets.adapter.rooms.keys();
@ -49,7 +49,7 @@ const respond = function (res, data) {
const socketsList = async function (req, res) {
debug && console.log("[WS]looking for all available sessions");
let filters = extractFiltersFromRequest(req);
let filters = extractPayloadFromRequest(req);
let liveSessions = {};
let rooms = await getAvailableRooms();
for (let peerId of rooms) {
@ -60,7 +60,7 @@ const socketsList = async function (req, res) {
const connected_sockets = await io.in(peerId).fetchSockets();
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo
&& isValidSession(item.handshake.query.sessionInfo, filters)) {
&& isValidSession(item.handshake.query.sessionInfo, filters.filter)) {
liveSessions[projectKey].push(sessionId);
}
}
@ -77,12 +77,13 @@ wsRouter.post(`/sockets-list`, socketsList);
const socketsListByProject = async function (req, res) {
debug && console.log("[WS]looking for available sessions");
let _projectKey = extractProjectKeyFromRequest(req);
let filters = extractFiltersFromRequest(req);
let _sessionId = extractSessionIdFromRequest(req);
let filters = extractPayloadFromRequest(req);
let liveSessions = {};
let rooms = await getAvailableRooms();
for (let peerId of rooms) {
let {projectKey, sessionId} = extractPeerId(peerId);
if (projectKey === _projectKey) {
if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) {
liveSessions[projectKey] = liveSessions[projectKey] || [];
if (hasFilters(filters)) {
const connected_sockets = await io.in(peerId).fetchSockets();
@ -100,11 +101,12 @@ const socketsListByProject = async function (req, res) {
respond(res, liveSessions[_projectKey] || []);
}
wsRouter.get(`/sockets-list/:projectKey`, socketsListByProject);
wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject);
wsRouter.post(`/sockets-list/:projectKey`, socketsListByProject);
const socketsLive = async function (req, res) {
debug && console.log("[WS]looking for all available LIVE sessions");
let filters = extractFiltersFromRequest(req);
let filters = extractPayloadFromRequest(req);
let liveSessions = {};
let rooms = await getAvailableRooms();
for (let peerId of rooms) {
@ -125,7 +127,7 @@ const socketsLive = async function (req, res) {
}
}
}
respond(res, liveSessions);
respond(res, sortPaginate(liveSessions, filters));
}
wsRouter.get(`/sockets-live`, socketsLive);
wsRouter.post(`/sockets-live`, socketsLive);
@ -133,7 +135,7 @@ wsRouter.post(`/sockets-live`, socketsLive);
const socketsLiveByProject = async function (req, res) {
debug && console.log("[WS]looking for available LIVE sessions");
let _projectKey = extractProjectKeyFromRequest(req);
let filters = extractFiltersFromRequest(req);
let filters = extractPayloadFromRequest(req);
let liveSessions = {};
let rooms = await getAvailableRooms();
for (let peerId of rooms) {
@ -154,7 +156,7 @@ const socketsLiveByProject = async function (req, res) {
}
}
}
respond(res, liveSessions[_projectKey] || []);
respond(res, sortPaginate(liveSessions[_projectKey] || [], filters));
}
wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject);
wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject);

View file

@ -24,6 +24,20 @@ const request_logger = (identity) => {
next();
}
};
const extractProjectKeyFromRequest = function (req) {
if (req.params.projectKey) {
debug && console.log(`[WS]where projectKey=${req.params.projectKey}`);
return req.params.projectKey;
}
return undefined;
}
const extractSessionIdFromRequest = function (req) {
if (req.params.sessionId) {
debug && console.log(`[WS]where sessionId=${req.params.sessionId}`);
return req.params.sessionId;
}
return undefined;
}
const isValidSession = function (sessionInfo, filters) {
let foundAll = true;
for (const [key, values] of Object.entries(filters)) {
@ -49,7 +63,7 @@ const isValidSession = function (sessionInfo, filters) {
return foundAll;
}
const hasFilters = function (filters) {
return filters !== undefined && Object.keys(filters).length > 0;
return filters && filters.filter && Object.keys(filters.filter).length > 0;
}
const objectToObjectOfArrays = function (obj) {
let _obj = {}
@ -66,15 +80,44 @@ const objectToObjectOfArrays = function (obj) {
}
return _obj;
}
const extractFiltersFromRequest = function (req) {
let filters = {};
const extractPayloadFromRequest = function (req) {
let filters = {
"filter": {},
"sort": {"key": undefined, "order": false},
"pagination": {"limit": undefined, "page": undefined}
};
if (req.query.userId) {
debug && console.log(`[WS]where userId=${req.query.userId}`);
filters.userID = [req.query.userId];
filters.filter.userID = [req.query.userId];
}
filters = objectToObjectOfArrays({...filters, ...(req.body.filter || {})});
return Object.keys(filters).length > 0 ? filters : undefined;
return filters;
}
const sortPaginate = function (list, filters) {
list.sort((a, b) => {
let aV = (a[filters.sort.key] || a["timestamp"]);
let bV = (b[filters.sort.key] || b["timestamp"]);
return aV > bV ? 1 : aV < bV ? -1 : 0;
})
if (filters.sort.order) {
list.reverse();
}
if (filters.pagination.page && filters.pagination.limit) {
return list.slice((filters.pagination.page - 1) * filters.pagination.limit,
filters.pagination.page * filters.pagination.limit);
}
return list;
}
module.exports = {
extractPeerId, request_logger, isValidSession, hasFilters, objectToObjectOfArrays, extractFiltersFromRequest
extractPeerId,
request_logger,
extractProjectKeyFromRequest,
extractSessionIdFromRequest,
isValidSession,
hasFilters,
objectToObjectOfArrays,
extractPayloadFromRequest,
sortPaginate
};