openreplay/ee/assist/utils/wsServer.js

119 lines
No EOL
3.7 KiB
JavaScript

const _io = require("socket.io");
const {getCompressionConfig} = require("./helper");
const {logger} = require('./logger');
const {Mutex} = require('async-mutex');
let io;
const getServer = function () {return io;}
const useRedis = process.env.redis === "true";
const cacheExpiration = parseInt(process.env.cacheExpiration) || 10; // in seconds
const mutexTimeout = parseInt(process.env.mutexTimeout) || 10000; // in milliseconds
const fetchMutex = new Mutex();
const fetchAllSocketsResultKey = 'fetchSocketsResult';
let lastKnownResult = [];
// Cache layer
let redisClient;
if (useRedis) {
const {createClient} = require("redis");
const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://');
redisClient = createClient({url: REDIS_URL});
redisClient.on("error", (error) => logger.error(`Redis error : ${error}`));
void redisClient.connect();
logger.info(`Using Redis for cache: ${REDIS_URL}`);
}
const processSocketsList = function (sockets) {
let res = []
for (let socket of sockets) {
let {handshake} = socket;
res.push({handshake});
}
return res
}
const doFetchAllSockets = async function () {
if (useRedis) {
try {
let cachedResult = await redisClient.get(fetchAllSocketsResultKey);
if (cachedResult) {
return JSON.parse(cachedResult);
}
return await fetchMutex.runExclusive(async () => {
try {
cachedResult = await redisClient.get(fetchAllSocketsResultKey);
if (cachedResult) {
return JSON.parse(cachedResult);
}
let result = await io.fetchSockets();
let cachedString = JSON.stringify(processSocketsList(result));
lastKnownResult = result;
await redisClient.set(fetchAllSocketsResultKey, cachedString, {EX: cacheExpiration});
return result;
} catch (err) {
logger.error('Error fetching new sockets:', err);
return lastKnownResult;
}
}, mutexTimeout);
} catch (error) {
logger.error('Error fetching cached sockets:', error);
return lastKnownResult;
}
}
try {
let result = await io.fetchSockets();
lastKnownResult = result;
return result;
} catch (error) {
logger.error('Error fetching sockets:', error);
return lastKnownResult;
}
}
const fetchSockets = async function (roomID) {
if (!io) {
return [];
}
if (!roomID) {
return await doFetchAllSockets();
}
return await io.in(roomID).fetchSockets();
}
const createSocketIOServer = function (server, prefix) {
if (io) {
return io;
}
if (process.env.uws !== "true") {
io = _io(server, {
maxHttpBufferSize: (parseFloat(process.env.maxHttpBufferSize) || 5) * 1e6,
cors: {
origin: "*",
methods: ["GET", "POST", "PUT"],
credentials: true
},
path: (prefix ? prefix : '') + '/socket',
...getCompressionConfig()
});
} else {
io = new _io.Server({
maxHttpBufferSize: (parseFloat(process.env.maxHttpBufferSize) || 5) * 1e6,
cors: {
origin: "*",
methods: ["GET", "POST", "PUT"],
credentials: true
},
path: (prefix ? prefix : '') + '/socket',
...getCompressionConfig()
});
io.attachApp(server);
}
return io;
}
module.exports = {
createSocketIOServer,
getServer,
fetchSockets,
}