feat(assist): extra cache for each session
This commit is contained in:
parent
9e070e4981
commit
eb2968435a
4 changed files with 516 additions and 2 deletions
4
ee/assist/.gitignore
vendored
4
ee/assist/.gitignore
vendored
|
|
@ -15,7 +15,7 @@ servers/sourcemaps-server.js
|
|||
/utils/HeapSnapshot.js
|
||||
/utils/helper.js
|
||||
/utils/assistHelper.js
|
||||
/utils/httpHandlers.js
|
||||
/utils/socketHandlers.js
|
||||
#/utils/httpHandlers.js
|
||||
#/utils/socketHandlers.js
|
||||
.local
|
||||
*.mmdb
|
||||
223
ee/assist/utils/httpHandlers.js
Normal file
223
ee/assist/utils/httpHandlers.js
Normal file
|
|
@ -0,0 +1,223 @@
|
|||
const {
|
||||
hasFilters,
|
||||
hasQuery,
|
||||
isValidSession,
|
||||
sortPaginate,
|
||||
getValidAttributes,
|
||||
uniqueAutocomplete
|
||||
} = require("./helper");
|
||||
const {
|
||||
extractProjectKeyFromRequest,
|
||||
extractSessionIdFromRequest,
|
||||
extractPayloadFromRequest,
|
||||
} = require("./extractors");
|
||||
const {
|
||||
RecordRequestDuration,
|
||||
IncreaseTotalRequests
|
||||
} = require('../utils/metrics');
|
||||
const {fetchSockets, getSessionFromCache} = require("./wsServer");
|
||||
const {IDENTITIES} = require("./assistHelper");
|
||||
const {logger} = require('./logger');
|
||||
|
||||
const respond = function (req, res, data) {
|
||||
logger.debug("responding with data: ", JSON.stringify(data))
|
||||
let result = {data}
|
||||
if (process.env.uws !== "true") {
|
||||
res.statusCode = 200;
|
||||
res.setHeader('Content-Type', 'application/json');
|
||||
res.end(JSON.stringify(result));
|
||||
} else {
|
||||
if (!res.aborted) {
|
||||
res.cork(() => {
|
||||
res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result));
|
||||
});
|
||||
} else {
|
||||
logger.debug("response aborted");
|
||||
return;
|
||||
}
|
||||
}
|
||||
const duration = performance.now() - req.startTs;
|
||||
IncreaseTotalRequests();
|
||||
RecordRequestDuration(req.method.toLowerCase(), res.handlerName, 200, duration/1000.0);
|
||||
}
|
||||
|
||||
const getParticularSession = async function (roomId, filters, all=false) {
|
||||
// let connected_sockets = await fetchSockets(roomId, all);
|
||||
// if (connected_sockets.length === 0) {
|
||||
// return null;
|
||||
// }
|
||||
// let sessInfo;
|
||||
// for (let item of connected_sockets) {
|
||||
// if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) {
|
||||
// sessInfo = item.handshake.query.sessionInfo;
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
let sessInfo = await getSessionFromCache();
|
||||
if (!sessInfo) {
|
||||
return null;
|
||||
}
|
||||
if (!hasFilters(filters)) {
|
||||
return sessInfo;
|
||||
}
|
||||
const result = isValidSession(sessInfo, filters.filter)
|
||||
if (result.matched) {
|
||||
return sessInfo;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
const getAllSessions = async function (projectKey, filters, counters, onlineOnly= false) {
|
||||
const sessions = [];
|
||||
const connected_sockets = await fetchSockets();
|
||||
if (connected_sockets.length === 0) {
|
||||
return sessions;
|
||||
}
|
||||
|
||||
const rooms = new Map();
|
||||
for (let item of connected_sockets) {
|
||||
// Prefilter checks
|
||||
if (rooms.has(item.handshake.query.roomId)) {
|
||||
continue;
|
||||
}
|
||||
if (item.handshake.query.projectKey !== projectKey || !item.handshake.query.sessionInfo) {
|
||||
continue;
|
||||
}
|
||||
if (onlineOnly && item.handshake.query.identity !== IDENTITIES.session) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Mark this room as visited
|
||||
rooms.set(item.handshake.query.roomId, true);
|
||||
|
||||
// Add session to the list without filtering
|
||||
if (!hasFilters(filters)) {
|
||||
sessions.push(item.handshake.query.sessionInfo);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Add session to the list if it passes the filter
|
||||
const result = isValidSession(item.handshake.query.sessionInfo, filters.filter)
|
||||
if (result.matched) {
|
||||
sessions.push(item.handshake.query.sessionInfo);
|
||||
// Add filter name/value to counter
|
||||
for (const [filterName, filterValue] of Object.entries(result.filters)) {
|
||||
if (counters[filterName] === undefined) {
|
||||
counters[filterName] = {};
|
||||
}
|
||||
if (counters[filterName][filterValue] === undefined) {
|
||||
counters[filterName][filterValue] = 0;
|
||||
}
|
||||
counters[filterName][filterValue] += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return sessions
|
||||
}
|
||||
|
||||
// Sort by projectKey
|
||||
const socketsListByProject = async function (req, res) {
|
||||
logger.debug("[WS]looking for available sessions");
|
||||
res.handlerName = 'socketsListByProject';
|
||||
|
||||
const _projectKey = extractProjectKeyFromRequest(req);
|
||||
const _sessionId = extractSessionIdFromRequest(req);
|
||||
const filters = await extractPayloadFromRequest(req, res);
|
||||
|
||||
// find a particular session
|
||||
if (_sessionId) {
|
||||
const sessInfo = await getParticularSession(_sessionId, filters);//(`${_projectKey}-${_sessionId}`, filters);
|
||||
return respond(req, res, sessInfo);
|
||||
}
|
||||
|
||||
// find all sessions for a project
|
||||
const counters = {};
|
||||
const sessions = await getAllSessions(_projectKey, filters, counters);
|
||||
|
||||
// send response
|
||||
respond(req, res, sortPaginate(sessions, filters, counters));
|
||||
}
|
||||
|
||||
// Sort by projectKey
|
||||
const socketsLiveByProject = async function (req, res) {
|
||||
logger.debug("[WS]looking for available LIVE sessions");
|
||||
res.handlerName = 'socketsLiveByProject';
|
||||
|
||||
const _projectKey = extractProjectKeyFromRequest(req);
|
||||
const _sessionId = extractSessionIdFromRequest(req);
|
||||
const filters = await extractPayloadFromRequest(req, res);
|
||||
|
||||
// find a particular session
|
||||
if (_sessionId) {
|
||||
let sessInfo = await getParticularSession(_sessionId, filters);//(`${_projectKey}-${_sessionId}`, filters);
|
||||
return respond(req, res, sessInfo);
|
||||
}
|
||||
|
||||
// find all sessions for a project
|
||||
const counters = {};
|
||||
const sessions = await getAllSessions(_projectKey, filters, counters, true);
|
||||
|
||||
// send response
|
||||
respond(req, res, sortPaginate(sessions, filters, counters));
|
||||
}
|
||||
|
||||
// Sort by roomID (projectKey+sessionId)
|
||||
const socketsLiveBySession = async function (req, res) {
|
||||
logger.debug("[WS]looking for LIVE session");
|
||||
res.handlerName = 'socketsLiveBySession';
|
||||
|
||||
const _projectKey = extractProjectKeyFromRequest(req);
|
||||
const _sessionId = extractSessionIdFromRequest(req);
|
||||
const filters = await extractPayloadFromRequest(req, res);
|
||||
|
||||
// find a particular session
|
||||
if (_sessionId) {
|
||||
let sessInfo = await getParticularSession(_sessionId, filters);//(`${_projectKey}-${_sessionId}`, filters, true);
|
||||
return respond(req, res, sessInfo);
|
||||
}
|
||||
return respond(req, res, null);
|
||||
}
|
||||
|
||||
// Sort by projectKey
|
||||
const autocomplete = async function (req, res) {
|
||||
logger.debug("[WS]autocomplete");
|
||||
res.handlerName = 'autocomplete';
|
||||
|
||||
const _projectKey = extractProjectKeyFromRequest(req);
|
||||
const filters = await extractPayloadFromRequest(req);
|
||||
let results = [];
|
||||
if (!hasQuery(filters)) {
|
||||
return respond(req, res, results);
|
||||
}
|
||||
|
||||
let connected_sockets = await fetchSockets();
|
||||
if (connected_sockets.length === 0) {
|
||||
return results;
|
||||
}
|
||||
|
||||
const rooms = new Map();
|
||||
for (let item of connected_sockets) {
|
||||
if (rooms.has(item.handshake.query.roomId)) {
|
||||
continue;
|
||||
}
|
||||
if (item.handshake.query.sessionInfo) {
|
||||
if ((item.handshake.query.projectKey !== _projectKey) || (item.handshake.query.identity !== IDENTITIES.session)) {
|
||||
continue;
|
||||
}
|
||||
// Mark this room as visited
|
||||
rooms.set(item.handshake.query.roomId, true);
|
||||
results.push(...getValidAttributes(item.handshake.query.sessionInfo, filters.query))
|
||||
}
|
||||
}
|
||||
|
||||
respond(req, res, uniqueAutocomplete(results));
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
respond,
|
||||
socketsListByProject,
|
||||
socketsLiveByProject,
|
||||
socketsLiveBySession,
|
||||
autocomplete
|
||||
}
|
||||
247
ee/assist/utils/socketHandlers.js
Normal file
247
ee/assist/utils/socketHandlers.js
Normal file
|
|
@ -0,0 +1,247 @@
|
|||
const {
|
||||
extractPeerId,
|
||||
} = require("./helper");
|
||||
const {
|
||||
IDENTITIES,
|
||||
EVENTS_DEFINITION,
|
||||
extractSessionInfo,
|
||||
errorHandler
|
||||
} = require("./assistHelper");
|
||||
const {
|
||||
startAssist,
|
||||
endAssist,
|
||||
handleEvent
|
||||
} = require("./stats");
|
||||
const {
|
||||
sendTo,
|
||||
sendFrom,
|
||||
fetchSockets,
|
||||
addSessionToCache,
|
||||
getSessionFromCache,
|
||||
removeSessionFromCache
|
||||
} = require('../utils/wsServer');
|
||||
const {
|
||||
IncreaseTotalWSConnections,
|
||||
IncreaseOnlineConnections,
|
||||
DecreaseOnlineConnections,
|
||||
IncreaseTotalRooms,
|
||||
IncreaseOnlineRooms,
|
||||
DecreaseOnlineRooms,
|
||||
} = require('../utils/metrics');
|
||||
const {logger} = require('./logger');
|
||||
const deepMerge = require('@fastify/deepmerge')({all: true});
|
||||
|
||||
const findSessionSocketId = async (roomId, tabId) => {
|
||||
let pickFirstSession = tabId === undefined;
|
||||
const connected_sockets = await fetchSockets(roomId);
|
||||
for (let socket of connected_sockets) {
|
||||
if (socket.handshake.query.identity === IDENTITIES.session) {
|
||||
if (pickFirstSession) {
|
||||
return socket.id;
|
||||
} else if (socket.handshake.query.tabId === tabId) {
|
||||
return socket.id;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
};
|
||||
|
||||
async function getRoomData(roomID) {
|
||||
let tabsCount = 0, agentsCount = 0, tabIDs = [], agentIDs = [];
|
||||
const connected_sockets = await fetchSockets(roomID);
|
||||
if (connected_sockets.length > 0) {
|
||||
for (let socket of connected_sockets) {
|
||||
if (socket.handshake.query.identity === IDENTITIES.session) {
|
||||
tabsCount++;
|
||||
tabIDs.push(socket.handshake.query.tabId);
|
||||
} else {
|
||||
agentsCount++;
|
||||
agentIDs.push(socket.id);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tabsCount = -1;
|
||||
agentsCount = -1;
|
||||
}
|
||||
return {tabsCount, agentsCount, tabIDs, agentIDs};
|
||||
}
|
||||
|
||||
function processNewSocket(socket) {
|
||||
socket._connectedAt = new Date();
|
||||
let {projectKey: connProjectKey, sessionId: connSessionId, tabId: connTabId} = extractPeerId(socket.handshake.query.peerId);
|
||||
socket.handshake.query.roomId = `${connProjectKey}-${connSessionId}`;
|
||||
socket.handshake.query.projectKey = connProjectKey;
|
||||
socket.handshake.query.sessId = connSessionId;
|
||||
socket.handshake.query.tabId = connTabId;
|
||||
logger.debug(`connProjectKey:${connProjectKey}, connSessionId:${connSessionId}, connTabId:${connTabId}, roomId:${socket.handshake.query.roomId}`);
|
||||
}
|
||||
|
||||
async function onConnect(socket) {
|
||||
logger.debug(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`);
|
||||
processNewSocket(socket);
|
||||
IncreaseTotalWSConnections(socket.handshake.query.identity);
|
||||
IncreaseOnlineConnections(socket.handshake.query.identity);
|
||||
|
||||
const {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(socket.handshake.query.roomId);
|
||||
|
||||
if (socket.handshake.query.identity === IDENTITIES.session) {
|
||||
// Check if session with the same tabID already connected, if so, refuse new connexion
|
||||
if (tabsCount > 0) {
|
||||
for (let tab of tabIDs) {
|
||||
if (tab === socket.handshake.query.tabId) {
|
||||
logger.debug(`session already connected, refusing new connexion, peerId: ${socket.handshake.query.peerId}`);
|
||||
sendTo(socket.id, EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED);
|
||||
return socket.disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
extractSessionInfo(socket);
|
||||
if (tabsCount < 0) {
|
||||
// New session creates new room
|
||||
IncreaseTotalRooms();
|
||||
IncreaseOnlineRooms();
|
||||
}
|
||||
// Inform all connected agents about reconnected session
|
||||
if (agentsCount > 0) {
|
||||
logger.debug(`notifying new session about agent-existence`);
|
||||
sendTo(socket.id, EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agentIDs);
|
||||
sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id);
|
||||
}
|
||||
} else if (tabsCount <= 0) {
|
||||
logger.debug(`notifying new agent about no SESSIONS with peerId:${socket.handshake.query.peerId}`);
|
||||
sendTo(socket.id, EVENTS_DEFINITION.emit.NO_SESSIONS);
|
||||
}
|
||||
await socket.join(socket.handshake.query.roomId);
|
||||
|
||||
// Add session to cache
|
||||
if (socket.handshake.query.identity === IDENTITIES.session) {
|
||||
await addSessionToCache(socket.handshake.query.sessId, socket.handshake.query.sessionInfo);
|
||||
}
|
||||
|
||||
logger.debug(`${socket.id} joined room:${socket.handshake.query.roomId}, as:${socket.handshake.query.identity}, connections:${agentsCount + tabsCount + 1}`)
|
||||
|
||||
if (socket.handshake.query.identity === IDENTITIES.agent) {
|
||||
if (socket.handshake.query.agentInfo !== undefined) {
|
||||
socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo);
|
||||
socket.handshake.query.agentID = socket.handshake.query.agentInfo.id;
|
||||
// Stats
|
||||
startAssist(socket, socket.handshake.query.agentID);
|
||||
}
|
||||
sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo);
|
||||
}
|
||||
|
||||
// Set disconnect handler
|
||||
socket.on('disconnect', () => onDisconnect(socket));
|
||||
|
||||
// Handle update event
|
||||
socket.on(EVENTS_DEFINITION.listen.UPDATE_EVENT, (...args) => onUpdateEvent(socket, ...args));
|
||||
|
||||
// Handle webrtc events
|
||||
socket.on(EVENTS_DEFINITION.listen.WEBRTC_AGENT_CALL, (...args) => onWebrtcAgentHandler(socket, ...args));
|
||||
|
||||
// Handle errors
|
||||
socket.on(EVENTS_DEFINITION.listen.ERROR, err => errorHandler(EVENTS_DEFINITION.listen.ERROR, err));
|
||||
socket.on(EVENTS_DEFINITION.listen.CONNECT_ERROR, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_ERROR, err));
|
||||
socket.on(EVENTS_DEFINITION.listen.CONNECT_FAILED, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_FAILED, err));
|
||||
|
||||
// Handle all other events
|
||||
socket.onAny((eventName, ...args) => onAny(socket, eventName, ...args));
|
||||
}
|
||||
|
||||
async function onDisconnect(socket) {
|
||||
DecreaseOnlineConnections(socket.handshake.query.identity);
|
||||
logger.debug(`${socket.id} disconnected from ${socket.handshake.query.roomId}`);
|
||||
|
||||
if (socket.handshake.query.identity === IDENTITIES.agent) {
|
||||
sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id);
|
||||
// Stats
|
||||
endAssist(socket, socket.handshake.query.agentID);
|
||||
}
|
||||
logger.debug("checking for number of connected agents and sessions");
|
||||
let {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(socket.handshake.query.roomId);
|
||||
|
||||
if (tabsCount <= 0) {
|
||||
await removeSessionFromCache(socket.handshake.query.sessId);
|
||||
}
|
||||
|
||||
if (tabsCount === -1 && agentsCount === -1) {
|
||||
DecreaseOnlineRooms();
|
||||
logger.debug(`room not found: ${socket.handshake.query.roomId}`);
|
||||
return;
|
||||
}
|
||||
if (tabsCount === 0) {
|
||||
logger.debug(`notifying everyone in ${socket.handshake.query.roomId} about no SESSIONS`);
|
||||
sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.NO_SESSIONS);
|
||||
}
|
||||
if (agentsCount === 0) {
|
||||
logger.debug(`notifying everyone in ${socket.handshake.query.roomId} about no AGENTS`);
|
||||
sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.NO_AGENTS);
|
||||
}
|
||||
}
|
||||
|
||||
async function onUpdateEvent(socket, ...args) {
|
||||
logger.debug(`${socket.id} sent update event.`);
|
||||
if (socket.handshake.query.identity !== IDENTITIES.session) {
|
||||
logger.debug('Ignoring update event.');
|
||||
return
|
||||
}
|
||||
|
||||
args[0] = updateSessionData(socket, args[0])
|
||||
socket.handshake.query.sessionInfo = deepMerge(socket.handshake.query.sessionInfo, args[0]?.data, {tabId: args[0]?.meta?.tabId});
|
||||
|
||||
// Update sessionInfo for all agents in the room
|
||||
const connected_sockets = await fetchSockets(socket.handshake.query.roomId);
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) {
|
||||
item.handshake.query.sessionInfo = deepMerge(item.handshake.query.sessionInfo, args[0]?.data, {tabId: args[0]?.meta?.tabId});
|
||||
} else if (item.handshake.query.identity === IDENTITIES.agent) {
|
||||
sendFrom(socket, item.id, EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function onWebrtcAgentHandler(socket, ...args) {
|
||||
if (socket.handshake.query.identity === IDENTITIES.agent) {
|
||||
const agentIdToConnect = args[0]?.data?.toAgentId;
|
||||
logger.debug(`${socket.id} sent webrtc event to agent:${agentIdToConnect}`);
|
||||
if (agentIdToConnect && socket.handshake.sessionData.AGENTS_CONNECTED.includes(agentIdToConnect)) {
|
||||
sendFrom(socket, agentIdToConnect, EVENTS_DEFINITION.listen.WEBRTC_AGENT_CALL, args[0]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function onAny(socket, eventName, ...args) {
|
||||
if (Object.values(EVENTS_DEFINITION.listen).indexOf(eventName) >= 0) {
|
||||
logger.debug(`received event:${eventName}, should be handled by another listener, stopping onAny.`);
|
||||
return
|
||||
}
|
||||
args[0] = updateSessionData(socket, args[0])
|
||||
if (socket.handshake.query.identity === IDENTITIES.session) {
|
||||
logger.debug(`received event:${eventName}, from:${socket.handshake.query.identity}, sending message to room:${socket.handshake.query.roomId}`);
|
||||
sendFrom(socket, socket.handshake.query.roomId, eventName, args[0]);
|
||||
} else {
|
||||
// Stats
|
||||
handleEvent(eventName, socket, args[0]);
|
||||
logger.debug(`received event:${eventName}, from:${socket.handshake.query.identity}, sending message to session of room:${socket.handshake.query.roomId}`);
|
||||
let socketId = await findSessionSocketId(socket.handshake.query.roomId, args[0]?.meta?.tabId);
|
||||
if (socketId === null) {
|
||||
logger.debug(`session not found for:${socket.handshake.query.roomId}`);
|
||||
sendTo(socket.id, EVENTS_DEFINITION.emit.NO_SESSIONS);
|
||||
} else {
|
||||
logger.debug("message sent");
|
||||
sendTo(socket.id, eventName, socket.id, args[0]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Back compatibility (add top layer with meta information)
|
||||
function updateSessionData(socket, sessionData) {
|
||||
if (sessionData?.meta === undefined && socket.handshake.query.identity === IDENTITIES.session) {
|
||||
sessionData = {meta: {tabId: socket.handshake.query.tabId, version: 1}, data: sessionData};
|
||||
}
|
||||
return sessionData
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
onConnect,
|
||||
}
|
||||
|
|
@ -10,6 +10,47 @@ let inMemorySocketsCache = [];
|
|||
let lastCacheUpdateTime = 0;
|
||||
const CACHE_REFRESH_INTERVAL = parseInt(process.env.cacheRefreshInterval) || 5000;
|
||||
|
||||
let redisClient;
|
||||
if (useRedis) {
|
||||
const {createClient} = require("redis");
|
||||
const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://');
|
||||
redisClient = createClient({url: REDIS_URL});
|
||||
redisClient.on("error", (error) => logger.error(`Redis error : ${error}`));
|
||||
void redisClient.connect();
|
||||
}
|
||||
|
||||
const addSessionToCache = async function (sessionID, sessionData) {
|
||||
try {
|
||||
await redisClient.set(`active_sessions:${sessionID}`, JSON.stringify(sessionData), 'EX', 3600); // 60 minutes
|
||||
console.log(`Session ${sessionID} stored in Redis`);
|
||||
} catch (error) {
|
||||
console.log(error);
|
||||
}
|
||||
}
|
||||
|
||||
const getSessionFromCache = async function (sessionID) {
|
||||
try {
|
||||
const sessionData = await redisClient.get(`active_sessions:${sessionID}`);
|
||||
if (sessionData) {
|
||||
console.log(`Session ${sessionID} retrieved from Redis`);
|
||||
return JSON.parse(sessionData);
|
||||
}
|
||||
return null;
|
||||
} catch (error) {
|
||||
console.log(error);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
const removeSessionFromCache = async function (sessionID) {
|
||||
try {
|
||||
await redisClient.del(`active_sessions:${sessionID}`);
|
||||
console.log(`Session ${sessionID} removed from Redis`);
|
||||
} catch (error) {
|
||||
console.log(error);
|
||||
}
|
||||
}
|
||||
|
||||
const doFetchAllSockets = async function () {
|
||||
if (useRedis) {
|
||||
const now = Date.now();
|
||||
|
|
@ -119,4 +160,7 @@ module.exports = {
|
|||
sendTo,
|
||||
sendFrom,
|
||||
fetchSockets,
|
||||
addSessionToCache,
|
||||
getSessionFromCache,
|
||||
removeSessionFromCache,
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue