feat(assist): support compression
feat(assist): centralized getRooms logic feat(DB): new issue type
This commit is contained in:
parent
665518c83f
commit
c2d23e8537
7 changed files with 158 additions and 182 deletions
|
|
@ -10,7 +10,8 @@ const {
|
|||
sortPaginate,
|
||||
getValidAttributes,
|
||||
uniqueAutocomplete,
|
||||
getAvailableRooms
|
||||
getAvailableRooms,
|
||||
getCompressionConfig
|
||||
} = require('../utils/helper');
|
||||
const {
|
||||
IDENTITIES,
|
||||
|
|
@ -32,7 +33,8 @@ const createSocketIOServer = function (server, prefix) {
|
|||
origin: "*",
|
||||
methods: ["GET", "POST", "PUT"]
|
||||
},
|
||||
path: (prefix ? prefix : '') + '/socket'
|
||||
path: (prefix ? prefix : '') + '/socket',
|
||||
...getCompressionConfig()
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -44,10 +46,10 @@ const respond = function (res, data) {
|
|||
|
||||
const socketsList = async function (req, res) {
|
||||
debug && console.log("[WS]looking for all available sessions");
|
||||
let filters = extractPayloadFromRequest(req);
|
||||
let filters = await extractPayloadFromRequest(req);
|
||||
let liveSessions = {};
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms) {
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey, sessionId} = extractPeerId(peerId);
|
||||
if (projectKey !== undefined) {
|
||||
liveSessions[projectKey] = liveSessions[projectKey] || [];
|
||||
|
|
@ -71,10 +73,10 @@ const socketsListByProject = async function (req, res) {
|
|||
debug && console.log("[WS]looking for available sessions");
|
||||
let _projectKey = extractProjectKeyFromRequest(req);
|
||||
let _sessionId = extractSessionIdFromRequest(req);
|
||||
let filters = extractPayloadFromRequest(req);
|
||||
let filters = await extractPayloadFromRequest(req);
|
||||
let liveSessions = {};
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms) {
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey, sessionId} = extractPeerId(peerId);
|
||||
if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) {
|
||||
liveSessions[projectKey] = liveSessions[projectKey] || [];
|
||||
|
|
@ -92,17 +94,17 @@ const socketsListByProject = async function (req, res) {
|
|||
}
|
||||
}
|
||||
liveSessions[_projectKey] = liveSessions[_projectKey] || [];
|
||||
respond(res, _sessionId === undefined ? liveSessions[_projectKey]
|
||||
respond(res, _sessionId === undefined ? sortPaginate(liveSessions[_projectKey], filters)
|
||||
: liveSessions[_projectKey].length > 0 ? liveSessions[_projectKey][0]
|
||||
: null);
|
||||
}
|
||||
|
||||
const socketsLive = async function (req, res) {
|
||||
debug && console.log("[WS]looking for all available LIVE sessions");
|
||||
let filters = extractPayloadFromRequest(req);
|
||||
let filters = await extractPayloadFromRequest(req);
|
||||
let liveSessions = {};
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms) {
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey} = extractPeerId(peerId);
|
||||
if (projectKey !== undefined) {
|
||||
let connected_sockets = await io.in(peerId).fetchSockets();
|
||||
|
|
@ -127,10 +129,10 @@ const socketsLiveByProject = async function (req, res) {
|
|||
debug && console.log("[WS]looking for available LIVE sessions");
|
||||
let _projectKey = extractProjectKeyFromRequest(req);
|
||||
let _sessionId = extractSessionIdFromRequest(req);
|
||||
let filters = extractPayloadFromRequest(req);
|
||||
let filters = await extractPayloadFromRequest(req);
|
||||
let liveSessions = {};
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms) {
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey, sessionId} = extractPeerId(peerId);
|
||||
if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) {
|
||||
let connected_sockets = await io.in(peerId).fetchSockets();
|
||||
|
|
@ -157,11 +159,11 @@ const socketsLiveByProject = async function (req, res) {
|
|||
const autocomplete = async function (req, res) {
|
||||
debug && console.log("[WS]autocomplete");
|
||||
let _projectKey = extractProjectKeyFromRequest(req);
|
||||
let filters = extractPayloadFromRequest(req);
|
||||
let filters = await extractPayloadFromRequest(req);
|
||||
let results = [];
|
||||
if (filters.query && Object.keys(filters.query).length > 0) {
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms) {
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey} = extractPeerId(peerId);
|
||||
if (projectKey === _projectKey) {
|
||||
let connected_sockets = await io.in(peerId).fetchSockets();
|
||||
|
|
@ -176,7 +178,6 @@ const autocomplete = async function (req, res) {
|
|||
respond(res, uniqueAutocomplete(results));
|
||||
}
|
||||
|
||||
|
||||
const findSessionSocketId = async (io, peerId) => {
|
||||
const connected_sockets = await io.in(peerId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
|
|
@ -189,7 +190,8 @@ const findSessionSocketId = async (io, peerId) => {
|
|||
|
||||
async function sessions_agents_count(io, socket) {
|
||||
let c_sessions = 0, c_agents = 0;
|
||||
if (io.sockets.adapter.rooms.get(socket.peerId)) {
|
||||
const rooms = await getAvailableRooms(io);
|
||||
if (rooms.get(socket.peerId)) {
|
||||
const connected_sockets = await io.in(socket.peerId).fetchSockets();
|
||||
|
||||
for (let item of connected_sockets) {
|
||||
|
|
@ -208,7 +210,8 @@ async function sessions_agents_count(io, socket) {
|
|||
|
||||
async function get_all_agents_ids(io, socket) {
|
||||
let agents = [];
|
||||
if (io.sockets.adapter.rooms.get(socket.peerId)) {
|
||||
const rooms = await getAvailableRooms(io);
|
||||
if (rooms.get(socket.peerId)) {
|
||||
const connected_sockets = await io.in(socket.peerId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.agent) {
|
||||
|
|
@ -219,13 +222,12 @@ async function get_all_agents_ids(io, socket) {
|
|||
return agents;
|
||||
}
|
||||
|
||||
|
||||
wsRouter.get(`/sockets-list`, socketsList);
|
||||
wsRouter.post(`/sockets-list`, socketsList);
|
||||
wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete);
|
||||
wsRouter.get(`/sockets-list/:projectKey`, socketsListByProject);
|
||||
wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject);
|
||||
wsRouter.post(`/sockets-list/:projectKey`, socketsListByProject);
|
||||
wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject);
|
||||
|
||||
wsRouter.get(`/sockets-live`, socketsLive);
|
||||
wsRouter.post(`/sockets-live`, socketsLive);
|
||||
|
|
@ -261,12 +263,13 @@ module.exports = {
|
|||
}
|
||||
|
||||
} else if (c_sessions <= 0) {
|
||||
debug && console.log(`notifying new agent about no SESSIONS`);
|
||||
debug && console.log(`notifying new agent about no SESSIONS with peerId:${socket.peerId}`);
|
||||
io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
|
||||
}
|
||||
await socket.join(socket.peerId);
|
||||
if (io.sockets.adapter.rooms.get(socket.peerId)) {
|
||||
debug && console.log(`${socket.id} joined room:${socket.peerId}, as:${socket.identity}, members:${io.sockets.adapter.rooms.get(socket.peerId).size}`);
|
||||
const rooms = await getAvailableRooms(io);
|
||||
if (rooms.get(socket.peerId)) {
|
||||
debug && console.log(`${socket.id} joined room:${socket.peerId}, as:${socket.identity}, members:${rooms.get(socket.peerId).size}`);
|
||||
}
|
||||
if (socket.identity === IDENTITIES.agent) {
|
||||
if (socket.handshake.query.agentInfo !== undefined) {
|
||||
|
|
@ -334,8 +337,9 @@ module.exports = {
|
|||
setInterval(async (io) => {
|
||||
try {
|
||||
let count = 0;
|
||||
console.log(` ====== Rooms: ${io.sockets.adapter.rooms.size} ====== `);
|
||||
const arr = Array.from(io.sockets.adapter.rooms);
|
||||
const rooms = await getAvailableRooms(io);
|
||||
console.log(` ====== Rooms: ${rooms.size} ====== `);
|
||||
const arr = Array.from(rooms);
|
||||
const filtered = arr.filter(room => !room[1].has(room[0]));
|
||||
for (let i of filtered) {
|
||||
let {projectKey, sessionId} = extractPeerId(i[0]);
|
||||
|
|
|
|||
|
|
@ -125,7 +125,7 @@ const transformFilters = function (filter) {
|
|||
}
|
||||
return filter;
|
||||
}
|
||||
const extractPayloadFromRequest = function (req) {
|
||||
const extractPayloadFromRequest = async function (req) {
|
||||
let filters = {
|
||||
"query": {}, // for autocomplete
|
||||
"filter": {}, // for sessions search
|
||||
|
|
@ -219,7 +219,30 @@ const uniqueAutocomplete = function (list) {
|
|||
return _list;
|
||||
}
|
||||
const getAvailableRooms = async function (io) {
|
||||
return io.sockets.adapter.rooms.keys();
|
||||
return io.sockets.adapter.rooms;
|
||||
}
|
||||
const getCompressionConfig = function () {
|
||||
// WS: The theoretical overhead per socket is 19KB (11KB for compressor and 8KB for decompressor)
|
||||
let perMessageDeflate = false;
|
||||
if (process.env.COMPRESSION === "true") {
|
||||
console.log(`WS compression: enabled`);
|
||||
perMessageDeflate = {
|
||||
zlibDeflateOptions: {
|
||||
windowBits: 10,
|
||||
memLevel: 1
|
||||
},
|
||||
zlibInflateOptions: {
|
||||
windowBits: 10
|
||||
}
|
||||
}
|
||||
} else {
|
||||
console.log(`WS compression: disabled`);
|
||||
}
|
||||
return {
|
||||
perMessageDeflate: perMessageDeflate,
|
||||
clientNoContextTakeover: true
|
||||
};
|
||||
|
||||
}
|
||||
module.exports = {
|
||||
transformFilters,
|
||||
|
|
@ -234,5 +257,6 @@ module.exports = {
|
|||
extractPayloadFromRequest,
|
||||
sortPaginate,
|
||||
uniqueAutocomplete,
|
||||
getAvailableRooms
|
||||
getAvailableRooms,
|
||||
getCompressionConfig
|
||||
};
|
||||
|
|
@ -42,7 +42,7 @@ const createSocketIOServer = function (server, prefix) {
|
|||
methods: ["GET", "POST", "PUT"]
|
||||
},
|
||||
path: (prefix ? prefix : '') + '/socket',
|
||||
perMessageDeflate: getCompressionConfig()
|
||||
...getCompressionConfig()
|
||||
});
|
||||
} else {
|
||||
io = new _io.Server({
|
||||
|
|
@ -52,7 +52,7 @@ const createSocketIOServer = function (server, prefix) {
|
|||
methods: ["GET", "POST", "PUT"]
|
||||
},
|
||||
path: (prefix ? prefix : '') + '/socket',
|
||||
perMessageDeflate: getCompressionConfig()
|
||||
...getCompressionConfig()
|
||||
});
|
||||
io.attachApp(server);
|
||||
}
|
||||
|
|
@ -85,10 +85,9 @@ const respond = function (res, data) {
|
|||
const socketsList = async function (req, res) {
|
||||
debug && console.log("[WS]looking for all available sessions");
|
||||
let filters = await extractPayloadFromRequest(req, res);
|
||||
|
||||
let liveSessions = {};
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms) {
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey, sessionId} = extractPeerId(peerId);
|
||||
if (projectKey !== undefined) {
|
||||
liveSessions[projectKey] = liveSessions[projectKey] || [];
|
||||
|
|
@ -115,7 +114,7 @@ const socketsListByProject = async function (req, res) {
|
|||
let filters = await extractPayloadFromRequest(req, res);
|
||||
let liveSessions = {};
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms) {
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey, sessionId} = extractPeerId(peerId);
|
||||
if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) {
|
||||
liveSessions[projectKey] = liveSessions[projectKey] || [];
|
||||
|
|
@ -133,7 +132,7 @@ const socketsListByProject = async function (req, res) {
|
|||
}
|
||||
}
|
||||
liveSessions[_projectKey] = liveSessions[_projectKey] || [];
|
||||
respond(res, _sessionId === undefined ? liveSessions[_projectKey]
|
||||
respond(res, _sessionId === undefined ? sortPaginate(liveSessions[_projectKey], filters)
|
||||
: liveSessions[_projectKey].length > 0 ? liveSessions[_projectKey][0]
|
||||
: null);
|
||||
}
|
||||
|
|
@ -143,7 +142,7 @@ const socketsLive = async function (req, res) {
|
|||
let filters = await extractPayloadFromRequest(req, res);
|
||||
let liveSessions = {};
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms) {
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey} = extractPeerId(peerId);
|
||||
if (projectKey !== undefined) {
|
||||
let connected_sockets = await io.in(peerId).fetchSockets();
|
||||
|
|
@ -172,7 +171,7 @@ const socketsLiveByProject = async function (req, res) {
|
|||
let filters = await extractPayloadFromRequest(req, res);
|
||||
let liveSessions = {};
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms) {
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey, sessionId} = extractPeerId(peerId);
|
||||
if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) {
|
||||
let connected_sockets = await io.in(peerId).fetchSockets();
|
||||
|
|
@ -204,7 +203,7 @@ const autocomplete = async function (req, res) {
|
|||
let results = [];
|
||||
if (filters.query && Object.keys(filters.query).length > 0) {
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms) {
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey} = extractPeerId(peerId);
|
||||
if (projectKey === _projectKey) {
|
||||
let connected_sockets = await io.in(peerId).fetchSockets();
|
||||
|
|
@ -231,7 +230,7 @@ const findSessionSocketId = async (io, peerId) => {
|
|||
|
||||
async function sessions_agents_count(io, socket) {
|
||||
let c_sessions = 0, c_agents = 0;
|
||||
let rooms = await io.of('/').adapter.allRooms();
|
||||
const rooms = await getAvailableRooms(io);
|
||||
if (rooms.has(socket.peerId)) {
|
||||
const connected_sockets = await io.in(socket.peerId).fetchSockets();
|
||||
|
||||
|
|
@ -251,7 +250,7 @@ async function sessions_agents_count(io, socket) {
|
|||
|
||||
async function get_all_agents_ids(io, socket) {
|
||||
let agents = [];
|
||||
let rooms = await io.of('/').adapter.allRooms();
|
||||
const rooms = await getAvailableRooms(io);
|
||||
if (rooms.has(socket.peerId)) {
|
||||
const connected_sockets = await io.in(socket.peerId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
|
|
@ -277,7 +276,6 @@ wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject);
|
|||
wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject);
|
||||
wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveByProject);
|
||||
|
||||
|
||||
module.exports = {
|
||||
wsRouter,
|
||||
start: (server, prefix) => {
|
||||
|
|
@ -305,12 +303,11 @@ module.exports = {
|
|||
}
|
||||
|
||||
} else if (c_sessions <= 0) {
|
||||
debug && console.log(`notifying new agent about no SESSIONS`);
|
||||
debug && console.log(`notifying new agent about no SESSIONS with peerId:${socket.peerId}`);
|
||||
io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
|
||||
}
|
||||
// await io.of('/').adapter.join(socket.id, socket.peerId);
|
||||
await socket.join(socket.peerId);
|
||||
let rooms = await io.of('/').adapter.allRooms();
|
||||
const rooms = await getAvailableRooms(io);
|
||||
if (rooms.has(socket.peerId)) {
|
||||
let connectedSockets = await io.in(socket.peerId).fetchSockets();
|
||||
debug && console.log(`${socket.id} joined room:${socket.peerId}, as:${socket.identity}, members:${connectedSockets.length}`);
|
||||
|
|
@ -380,7 +377,7 @@ module.exports = {
|
|||
console.log("WS server started");
|
||||
setInterval(async (io) => {
|
||||
try {
|
||||
let rooms = await io.of('/').adapter.allRooms();
|
||||
const rooms = await getAvailableRooms(io);
|
||||
let validRooms = [];
|
||||
console.log(` ====== Rooms: ${rooms.size} ====== `);
|
||||
// const arr = Array.from(rooms)
|
||||
|
|
|
|||
|
|
@ -21,8 +21,6 @@ const {
|
|||
extractSessionIdFromRequest,
|
||||
extractPayloadFromRequest,
|
||||
getCompressionConfig,
|
||||
getUWSCompressionConfig,
|
||||
getUWSDecompressionConfig,
|
||||
getAvailableRooms
|
||||
} = require('../utils/helper-ee');
|
||||
const wsRouter = express.Router();
|
||||
|
|
@ -39,8 +37,7 @@ const createSocketIOServer = function (server, prefix) {
|
|||
methods: ["GET", "POST", "PUT"]
|
||||
},
|
||||
path: (prefix ? prefix : '') + '/socket',
|
||||
perMessageDeflate: getCompressionConfig(),
|
||||
clientNoContextTakeover: process.env.CLIENT_CONTEXT_TAKEOVER && process.env.CLIENT_CONTEXT_TAKEOVER === "true"
|
||||
...getCompressionConfig()
|
||||
});
|
||||
} else {
|
||||
io = new _io.Server({
|
||||
|
|
@ -50,9 +47,7 @@ const createSocketIOServer = function (server, prefix) {
|
|||
methods: ["GET", "POST", "PUT"]
|
||||
},
|
||||
path: (prefix ? prefix : '') + '/socket',
|
||||
// perMessageDeflate: getCompressionConfig(),
|
||||
compression: getUWSCompressionConfig(),
|
||||
decompression: getUWSDecompressionConfig()
|
||||
...getCompressionConfig()
|
||||
});
|
||||
io.attachApp(server);
|
||||
}
|
||||
|
|
@ -74,7 +69,7 @@ const socketsList = async function (req, res) {
|
|||
let filters = await extractPayloadFromRequest(req, res);
|
||||
let liveSessions = {};
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms) {
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey, sessionId} = extractPeerId(peerId);
|
||||
if (projectKey !== undefined) {
|
||||
liveSessions[projectKey] = liveSessions[projectKey] || [];
|
||||
|
|
@ -101,7 +96,7 @@ const socketsListByProject = async function (req, res) {
|
|||
let filters = await extractPayloadFromRequest(req, res);
|
||||
let liveSessions = {};
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms) {
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey, sessionId} = extractPeerId(peerId);
|
||||
if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) {
|
||||
liveSessions[projectKey] = liveSessions[projectKey] || [];
|
||||
|
|
@ -129,7 +124,7 @@ const socketsLive = async function (req, res) {
|
|||
let filters = await extractPayloadFromRequest(req, res);
|
||||
let liveSessions = {};
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms) {
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey} = extractPeerId(peerId);
|
||||
if (projectKey !== undefined) {
|
||||
let connected_sockets = await io.in(peerId).fetchSockets();
|
||||
|
|
@ -157,7 +152,7 @@ const socketsLiveByProject = async function (req, res) {
|
|||
let filters = await extractPayloadFromRequest(req, res);
|
||||
let liveSessions = {};
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms) {
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey, sessionId} = extractPeerId(peerId);
|
||||
if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) {
|
||||
let connected_sockets = await io.in(peerId).fetchSockets();
|
||||
|
|
@ -188,7 +183,7 @@ const autocomplete = async function (req, res) {
|
|||
let results = [];
|
||||
if (filters.query && Object.keys(filters.query).length > 0) {
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms) {
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey} = extractPeerId(peerId);
|
||||
if (projectKey === _projectKey) {
|
||||
let connected_sockets = await io.in(peerId).fetchSockets();
|
||||
|
|
@ -214,27 +209,15 @@ const findSessionSocketId = async (io, peerId) => {
|
|||
};
|
||||
|
||||
async function sessions_agents_count(io, socket) {
|
||||
debug && console.log(`----looking for rooms of peerId:${socket.peerId}`);
|
||||
let c_sessions = 0, c_agents = 0;
|
||||
const rooms = io.sockets.adapter.rooms;
|
||||
debug && console.log("----rooms from adapter:");
|
||||
debug && console.log(rooms);
|
||||
const rooms2 = await getAvailableRooms(io);
|
||||
debug && console.log("----rooms from getRooms");
|
||||
debug && console.log(rooms2);
|
||||
const rooms = await getAvailableRooms(io);
|
||||
if (rooms.get(socket.peerId)) {
|
||||
const connected_sockets = await io.in(socket.peerId).fetchSockets();
|
||||
debug && console.log(`----connected sockets to peerId:${socket.peerId}`)
|
||||
debug && console.log(connected_sockets)
|
||||
|
||||
for (let item of connected_sockets) {
|
||||
debug && console.log(`---checking identity of:`)
|
||||
debug && console.log(item)
|
||||
debug && console.log(`---==> ${item.handshake.query.identity}`)
|
||||
if (item.handshake.query.identity === IDENTITIES.session) {
|
||||
debug && console.log(`---session`)
|
||||
c_sessions++;
|
||||
} else {
|
||||
debug && console.log(`---agent`)
|
||||
c_agents++;
|
||||
}
|
||||
}
|
||||
|
|
@ -247,7 +230,8 @@ async function sessions_agents_count(io, socket) {
|
|||
|
||||
async function get_all_agents_ids(io, socket) {
|
||||
let agents = [];
|
||||
if (io.sockets.adapter.rooms.get(socket.peerId)) {
|
||||
const rooms = await getAvailableRooms(io);
|
||||
if (rooms.get(socket.peerId)) {
|
||||
const connected_sockets = await io.in(socket.peerId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.agent) {
|
||||
|
|
@ -272,7 +256,6 @@ wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject);
|
|||
wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject);
|
||||
wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveByProject);
|
||||
|
||||
|
||||
module.exports = {
|
||||
wsRouter,
|
||||
start: (server, prefix) => {
|
||||
|
|
@ -300,12 +283,13 @@ module.exports = {
|
|||
}
|
||||
|
||||
} else if (c_sessions <= 0) {
|
||||
debug && console.log(`notifying new agent about no SESSIONS: ${c_sessions}`);
|
||||
debug && console.log(`notifying new agent about no SESSIONS with peerId:${socket.peerId}`);
|
||||
io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
|
||||
}
|
||||
await socket.join(socket.peerId);
|
||||
if (io.sockets.adapter.rooms.get(socket.peerId)) {
|
||||
debug && console.log(`${socket.id} joined room:${socket.peerId}, as:${socket.identity}, members:${io.sockets.adapter.rooms.get(socket.peerId).size}`);
|
||||
const rooms = await getAvailableRooms(io);
|
||||
if (rooms.get(socket.peerId)) {
|
||||
debug && console.log(`${socket.id} joined room:${socket.peerId}, as:${socket.identity}, members:${rooms.get(socket.peerId).size}`);
|
||||
}
|
||||
if (socket.identity === IDENTITIES.agent) {
|
||||
if (socket.handshake.query.agentInfo !== undefined) {
|
||||
|
|
@ -373,8 +357,9 @@ module.exports = {
|
|||
setInterval(async (io) => {
|
||||
try {
|
||||
let count = 0;
|
||||
console.log(` ====== Rooms: ${io.sockets.adapter.rooms.size} ====== `);
|
||||
const arr = Array.from(io.sockets.adapter.rooms);
|
||||
const rooms = await getAvailableRooms(io);
|
||||
console.log(` ====== Rooms: ${rooms.size} ====== `);
|
||||
const arr = Array.from(rooms);
|
||||
const filtered = arr.filter(room => !room[1].has(room[0]));
|
||||
for (let i of filtered) {
|
||||
let {projectKey, sessionId} = extractPeerId(i[0]);
|
||||
|
|
|
|||
|
|
@ -96,111 +96,6 @@ const extractPayloadFromRequest = async function (req, res) {
|
|||
debug && console.log("payload/filters:" + JSON.stringify(filters))
|
||||
return Object.keys(filters).length > 0 ? filters : undefined;
|
||||
}
|
||||
const getCompressionConfig = function () {
|
||||
let perMessageDeflate = false;
|
||||
if (process.env.COMPRESSION_LEVEL && process.env.COMPRESSION_LEVEL > 0) {
|
||||
if (process.env.COMPRESSION_LEVEL > 3) {
|
||||
console.log(`WS compression level: ${process.env.COMPRESSION_LEVEL} not found, ignoring`);
|
||||
} else {
|
||||
let windowBits;
|
||||
switch (parseInt(process.env.COMPRESSION_LEVEL)) {
|
||||
case 1:
|
||||
windowBits = 8;
|
||||
break;
|
||||
case 2:
|
||||
windowBits = 10;
|
||||
break;
|
||||
case 3:
|
||||
windowBits = 11;
|
||||
break;
|
||||
}
|
||||
console.log(`WS compression level: ${process.env.COMPRESSION_LEVEL} => windowBits: ${windowBits}`);
|
||||
perMessageDeflate = {
|
||||
zlibDeflateOptions: {
|
||||
windowBits: windowBits,
|
||||
memLevel: 1
|
||||
},
|
||||
|
||||
zlibInflateOptions: {
|
||||
windowBits: windowBits
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
console.log(`WS compression level: disabled`);
|
||||
}
|
||||
return perMessageDeflate;
|
||||
}
|
||||
const getUWSCompressionConfig = function () {
|
||||
let compression = uWS.DISABLED;
|
||||
if (process.env.UWS_COMPRESSION_LEVEL) {
|
||||
switch (process.env.UWS_COMPRESSION_LEVEL) {
|
||||
case 'SHARED_C':
|
||||
compression = uWS.SHARED_COMPRESSOR;
|
||||
break;
|
||||
case 'D_C_3':
|
||||
compression = uWS.DEDICATED_COMPRESSOR_3KB;
|
||||
break;
|
||||
case 'D_C_4':
|
||||
compression = uWS.DEDICATED_COMPRESSOR_4KB;
|
||||
break;
|
||||
case 'D_C_8':
|
||||
compression = uWS.DEDICATED_COMPRESSOR_8KB;
|
||||
break;
|
||||
case 'D_C_16':
|
||||
compression = uWS.DEDICATED_COMPRESSOR_16KB;
|
||||
break;
|
||||
case 'D_C_32':
|
||||
compression = uWS.DEDICATED_COMPRESSOR_32KB;
|
||||
break;
|
||||
case 'D_C_64':
|
||||
compression = uWS.DEDICATED_COMPRESSOR_64KB;
|
||||
break;
|
||||
case 'D_C_128':
|
||||
compression = uWS.DEDICATED_COMPRESSOR_128KB;
|
||||
break;
|
||||
case 'D_C_256':
|
||||
compression = uWS.DEDICATED_COMPRESSOR_256KB;
|
||||
break;
|
||||
}
|
||||
console.log(`WS UWS compression level: ${process.env.UWS_COMPRESSION_LEVEL} => ${compression}`);
|
||||
} else {
|
||||
console.log(`WS UWS compression level: disabled`);
|
||||
}
|
||||
return compression;
|
||||
}
|
||||
const getUWSDecompressionConfig = function () {
|
||||
let compression = uWS.DISABLED;
|
||||
if (process.env.UWS_DECOMPRESSION_LEVEL) {
|
||||
switch (process.env.UWS_COMPRESSION_LEVEL) {
|
||||
case 'SHARED_D':
|
||||
compression = uWS.SHARED_DECOMPRESSOR;
|
||||
break;
|
||||
case 'D_D_1':
|
||||
compression = uWS.DEDICATED_DECOMPRESSOR_1KB;
|
||||
break;
|
||||
case 'D_D_2':
|
||||
compression = uWS.DEDICATED_DECOMPRESSOR_2KB;
|
||||
break;
|
||||
case 'D_D_4':
|
||||
compression = uWS.DEDICATED_DECOMPRESSOR_4KB;
|
||||
break;
|
||||
case 'D_D_8':
|
||||
compression = uWS.DEDICATED_DECOMPRESSOR_8KB;
|
||||
break;
|
||||
case 'D_D_16':
|
||||
compression = uWS.DEDICATED_DECOMPRESSOR_16KB;
|
||||
break;
|
||||
case 'D_D_32':
|
||||
compression = uWS.DEDICATED_DECOMPRESSOR_32KB;
|
||||
break;
|
||||
}
|
||||
console.log(`WS UWS compression level: ${process.env.UWS_COMPRESSION_LEVEL} => ${compression}`);
|
||||
} else {
|
||||
console.log(`WS UWS compression level: disabled`);
|
||||
}
|
||||
return compression;
|
||||
}
|
||||
const getAvailableRooms = async function (io) {
|
||||
if (process.env.redis === "true") {
|
||||
return io.of('/').adapter.allRooms();
|
||||
|
|
@ -208,12 +103,29 @@ const getAvailableRooms = async function (io) {
|
|||
return helper.getAvailableRooms(io);
|
||||
}
|
||||
}
|
||||
const getCompressionConfig = function () {
|
||||
if (process.env.uws !== "true") {
|
||||
return helper.getCompressionConfig();
|
||||
} else {
|
||||
// uWS: The theoretical overhead per socket is 32KB (8KB for compressor and for 24KB decompressor)
|
||||
if (process.env.COMPRESSION === "true") {
|
||||
console.log(`uWS compression: enabled`);
|
||||
return {
|
||||
compression: uWS.DEDICATED_COMPRESSOR_8KB,
|
||||
decompression: uWS.DEDICATED_DECOMPRESSOR_1KB
|
||||
};
|
||||
} else {
|
||||
console.log(`uWS compression: disabled`);
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
extractProjectKeyFromRequest,
|
||||
extractSessionIdFromRequest,
|
||||
extractPayloadFromRequest,
|
||||
getCompressionConfig,
|
||||
getUWSCompressionConfig,
|
||||
getUWSDecompressionConfig,
|
||||
getAvailableRooms
|
||||
};
|
||||
27
ee/scripts/schema/db/init_dbs/postgresql/1.12.0/1.12.0.sql
Normal file
27
ee/scripts/schema/db/init_dbs/postgresql/1.12.0/1.12.0.sql
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
DO
|
||||
$$
|
||||
DECLARE
|
||||
previous_version CONSTANT text := 'v1.11.0-ee';
|
||||
next_version CONSTANT text := 'v1.12.0-ee';
|
||||
BEGIN
|
||||
IF (SELECT openreplay_version()) = previous_version THEN
|
||||
raise notice 'valid previous DB version';
|
||||
ELSEIF (SELECT openreplay_version()) = next_version THEN
|
||||
raise notice 'new version detected, nothing to do';
|
||||
ELSE
|
||||
RAISE EXCEPTION 'upgrade to % failed, invalid previous version, expected %, got %', next_version,previous_version,(SELECT openreplay_version());
|
||||
END IF;
|
||||
END ;
|
||||
$$
|
||||
LANGUAGE plpgsql;
|
||||
|
||||
BEGIN;
|
||||
CREATE OR REPLACE FUNCTION openreplay_version()
|
||||
RETURNS text AS
|
||||
$$
|
||||
SELECT 'v1.12.0-ee'
|
||||
$$ LANGUAGE sql IMMUTABLE;
|
||||
|
||||
ALTER TYPE issue_type ADD VALUE IF NOT EXISTS 'app_crash';
|
||||
|
||||
COMMIT;
|
||||
27
scripts/schema/db/init_dbs/postgresql/1.12.0/1.12.0.sql
Normal file
27
scripts/schema/db/init_dbs/postgresql/1.12.0/1.12.0.sql
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
DO
|
||||
$$
|
||||
DECLARE
|
||||
previous_version CONSTANT text := 'v1.11.0';
|
||||
next_version CONSTANT text := 'v1.12.0';
|
||||
BEGIN
|
||||
IF (SELECT openreplay_version()) = previous_version THEN
|
||||
raise notice 'valid previous DB version';
|
||||
ELSEIF (SELECT openreplay_version()) = next_version THEN
|
||||
raise notice 'new version detected, nothing to do';
|
||||
ELSE
|
||||
RAISE EXCEPTION 'upgrade to % failed, invalid previous version, expected %, got %', next_version,previous_version,(SELECT openreplay_version());
|
||||
END IF;
|
||||
END ;
|
||||
$$
|
||||
LANGUAGE plpgsql;
|
||||
|
||||
BEGIN;
|
||||
CREATE OR REPLACE FUNCTION openreplay_version()
|
||||
RETURNS text AS
|
||||
$$
|
||||
SELECT 'v1.12.0'
|
||||
$$ LANGUAGE sql IMMUTABLE;
|
||||
|
||||
ALTER TYPE issue_type ADD VALUE IF NOT EXISTS 'app_crash';
|
||||
|
||||
COMMIT;
|
||||
Loading…
Add table
Reference in a new issue