implementing conference call

This commit is contained in:
Андрей Бабушкин 2025-02-24 13:46:28 +01:00
parent 12741d6377
commit 3ea3c092b5
7 changed files with 171 additions and 55 deletions

View file

@ -9,7 +9,7 @@ import type { LocalStream } from 'Player';
import { PlayerContext } from 'App/components/Session/playerContext';
export interface Props {
incomeStream: MediaStream[] | null;
incomeStream: { stream: MediaStream, isAgent: boolean }[] | null;
localStream: LocalStream | null;
userId: string;
isPrestart?: boolean;
@ -50,8 +50,8 @@ function ChatWindow({ userId, incomeStream, localStream, endCall, isPrestart }:
>
{incomeStream ? (
incomeStream.map((stream) => (
<React.Fragment key={stream.id}>
<VideoContainer stream={stream} setRemoteEnabled={setRemoteEnabled} />
<React.Fragment key={stream.stream.id}>
<VideoContainer stream={stream.stream} setRemoteEnabled={setRemoteEnabled} isAgent={stream.isAgent} />
</React.Fragment>
))
) : (

View file

@ -80,7 +80,7 @@ function AssistActions({
} = store.get();
const [isPrestart, setPrestart] = useState(false);
const [incomeStream, setIncomeStream] = useState<MediaStream[] | null>([]);
const [incomeStream, setIncomeStream] = useState<{ stream: MediaStream; isAgent: boolean }[] | null>([]);
const [localStream, setLocalStream] = useState<LocalStream | null>(null);
const [callObject, setCallObject] = useState<{ end: () => void } | null>(null);
@ -131,12 +131,12 @@ function AssistActions({
}
}, [peerConnectionStatus]);
const addIncomeStream = (stream: MediaStream) => {
const addIncomeStream = (stream: MediaStream, isAgent: boolean) => {
setIncomeStream((oldState) => {
if (oldState === null) return [stream];
if (!oldState.find((existingStream) => existingStream.id === stream.id)) {
if (oldState === null) return [{ stream, isAgent }];
if (!oldState.find((existingStream) => existingStream.stream.id === stream.id)) {
audioContextManager.mergeAudioStreams(stream);
return [...oldState, stream];
return [...oldState, { stream, isAgent }];
}
return oldState;
});
@ -156,16 +156,16 @@ function AssistActions({
addIncomeStream,
() => {
player.assistManager.ping(AssistActionsPing.call.end, agentId)
lStream.stop.bind(lStream);
lStream.stop.apply(lStream);
removeIncomeStream(lStream.stream);
},
onReject,
onError
);
setCallObject(callPeer());
if (additionalAgentIds) {
callPeer(additionalAgentIds);
}
// if (additionalAgentIds) {
// callPeer(additionalAgentIds);
// }
})
.catch(onError);
}

View file

@ -6,9 +6,17 @@ interface Props {
height?: number | string;
setRemoteEnabled?: (isEnabled: boolean) => void;
local?: boolean;
isAgent?: boolean;
}
function VideoContainer({ stream, muted = false, height = 280, setRemoteEnabled, local }: Props) {
function VideoContainer({
stream,
muted = false,
height = 280,
setRemoteEnabled,
local,
isAgent,
}: Props) {
const ref = useRef<HTMLVideoElement>(null);
const [isEnabled, setEnabled] = React.useState(false);
@ -16,14 +24,14 @@ function VideoContainer({ stream, muted = false, height = 280, setRemoteEnabled,
if (ref.current) {
ref.current.srcObject = stream;
}
}, [ref.current, stream, stream.getVideoTracks()[0]?.getSettings().width]);
}, [ref.current, stream, stream?.getVideoTracks()[0]?.getSettings().width]);
useEffect(() => {
if (!stream) {
return;
}
const iid = setInterval(() => {
const track = stream.getVideoTracks()[0]
const track = stream.getVideoTracks()[0];
const settings = track?.getSettings();
const isDummyVideoTrack = settings
? settings.width === 2 ||
@ -52,6 +60,15 @@ function VideoContainer({ stream, muted = false, height = 280, setRemoteEnabled,
}}
>
<video autoPlay ref={ref} muted={muted} style={{ height: height }} />
{isAgent ? (
<div
style={{
position: 'absolute',
}}
>
Agent
</div>
) : null}
</div>
);
}

View file

@ -10,6 +10,7 @@ import RemoteControl, { RemoteControlStatus } from './RemoteControl';
import ScreenRecording, { SessionRecordingStatus } from './ScreenRecording';
import CanvasReceiver from 'Player/web/assist/CanvasReceiver';
import { gunzipSync } from 'fflate';
import logger from '@/logger';
export { RemoteControlStatus, SessionRecordingStatus, CallingState };
@ -192,6 +193,12 @@ export default class AssistManager {
}),
},
}));
socket.onAny((event, ...args) => {
logger.log(`📩 Socket: ${event}`, args);
});
socket.on('connect', () => {
waitingForMessages = true;
// TODO: reconnect happens frequently on bad network
@ -288,7 +295,11 @@ export default class AssistManager {
socket,
this.config,
this.peerID,
this.getAssistVersion
this.getAssistVersion,
{
...this.session.agentInfo,
id: agentId,
}
);
this.remoteControl = new RemoteControl(
this.store,

View file

@ -32,8 +32,33 @@ export default class Call {
private socket: Socket,
private config: RTCIceServer[] | null,
private peerID: string,
private getAssistVersion: () => number
private getAssistVersion: () => number,
private agent: Record<string, any>,
private agentInCallIds: string[] = [],
private callId: string,
) {
socket.on('UPDATE_SESSION', (data: { data: { agentIds: string[] }}) => {
const { agentIds } = data.data;
if (agentIds) {
const filteredAgentIds = agentIds.filter((id: string) => id.split('-')[3] !== this.agent.id.toString());
console.log("!!! FILTERED IDS", filteredAgentIds);
const newIds = filteredAgentIds.filter((id: string) => !this.agentInCallIds.includes(id));
console.log("!!! NEW IDS", newIds);
const removedIds = this.agentInCallIds.filter((id: string) => !filteredAgentIds.includes(id));
console.log("!!! REMOVED IDS", removedIds);
removedIds.forEach((id: string) => this.agentDisconnected(id));
if (store.get().calling !== CallingState.OnCall) {
newIds.forEach((id: string) => {
console.log("CALL3 for", id);
this._peerConnection(id)
});
}
this.agentInCallIds = filteredAgentIds;
}
});
socket.on('call_end', () => {
this.onRemoteCallEnd()
});
@ -70,6 +95,11 @@ export default class Call {
this.store.update({ calling: CallingState.NoCall });
});
socket.on('webrtc_call_offer', (data: { data: { from: string, offer: RTCSessionDescriptionInit } }) => {
console.log("RECEIVED OFFER", data);
this.handleOffer(data.data);
});
socket.on('webrtc_call_answer', (data: { data: { from: string, answer: RTCSessionDescriptionInit } }) => {
this.handleAnswer(data.data);
});
@ -81,7 +111,7 @@ export default class Call {
}
// CREATE A LOCAL PEER
private async createPeerConnection(remotePeerId: string): Promise<RTCPeerConnection> {
private async createPeerConnection(callId: string): Promise<RTCPeerConnection> {
// create pc with ice config
const pc = new RTCPeerConnection({
iceServers: [{ urls: "stun:stun.l.google.com:19302" }],
@ -97,7 +127,7 @@ export default class Call {
// when ice is ready we send it
pc.onicecandidate = (event) => {
if (event.candidate) {
this.socket.emit('webrtc_call_ice_candidate', { from: remotePeerId, candidate: event.candidate });
this.socket.emit('webrtc_call_ice_candidate', { from: callId, candidate: event.candidate });
} else {
logger.log("ICE candidate gathering complete");
}
@ -107,12 +137,12 @@ export default class Call {
pc.ontrack = (event) => {
const stream = event.streams[0];
if (stream) {
this.videoStreams[remotePeerId] = stream.getVideoTracks()[0];
this.videoStreams[callId] = stream.getVideoTracks()[0];
if (this.store.get().calling !== CallingState.OnCall) {
this.store.update({ calling: CallingState.OnCall });
}
if (this.callArgs) {
this.callArgs.onStream(stream);
this.callArgs.onStream(stream, isAgentId(callId));
}
}
};
@ -141,6 +171,7 @@ export default class Call {
// ESTABLISHING A CONNECTION
private async _peerConnection(remotePeerId: string) {
console.log("_ PEER CONNECTION", remotePeerId);
try {
// Create RTCPeerConnection
const pc = await this.createPeerConnection(remotePeerId);
@ -161,6 +192,7 @@ export default class Call {
this.connectAttempts++;
logger.log('reconnecting', this.connectAttempts);
await new Promise((resolve) => setTimeout(resolve, 250));
console.log("CALL2")
await this._peerConnection(remotePeerId);
} else {
logger.log('error', this.connectAttempts);
@ -171,13 +203,41 @@ export default class Call {
}
}
// Process the received offer to answer
private async handleOffer(data: { from: string, offer: RTCSessionDescriptionInit }) {
// set to remotePeerId data.from
const callId = data.from;
const pc = this.connections[callId];
if (!pc) {
logger.error("No connection found for remote peer", callId);
return;
}
try {
// if the connection is not established yet, then set remoteDescription to peer
if (pc.signalingState !== "stable") {
await pc.setRemoteDescription(new RTCSessionDescription(data.offer));
const answer = await pc.createAnswer();
await pc.setLocalDescription(answer);
this.socket.emit('webrtc_call_answer', { from: callId, answer: pc.localDescription });
} else {
logger.warn("Skipping setRemoteDescription: Already in stable state");
}
} catch (e) {
logger.error("Error setting remote description from answer", e);
this.callArgs?.onError?.(e);
}
}
// Process the received answer to offer
private async handleAnswer(data: { from: string, answer: RTCSessionDescriptionInit }) {
// set to remotePeerId data.from
const remotePeerId = data.from;
const pc = this.connections[remotePeerId];
if (this.agentInCallIds.includes(data.from)) {
return;
}
const callId = data.from;
const pc = this.connections[callId];
if (!pc) {
logger.error("No connection found for remote peer", remotePeerId);
logger.error("No connection found for remote peer", callId);
return;
}
try {
@ -195,8 +255,8 @@ export default class Call {
// process the received iceCandidate
private async handleIceCandidate(data: { from: string, candidate: RTCIceCandidateInit }) {
const remotePeerId = data.from;
const pc = this.connections[remotePeerId];
const callId = data.from;
const pc = this.connections[callId];
if (!pc) return;
// if there are ice candidates then add candidate to peer
if (data.candidate && (data.candidate.sdpMid || data.candidate.sdpMLineIndex !== null)) {
@ -245,8 +305,7 @@ export default class Call {
// Ends the call and sends the call_end signal
initiateCallEnd = async () => {
const userName = userStore.account.name;
this.emitData('call_end', userName);
this.emitData('call_end', this.callId);
this.handleCallEnd();
};
@ -260,7 +319,7 @@ export default class Call {
private callArgs: {
localStream: LocalStream;
onStream: (s: MediaStream) => void;
onStream: (s: MediaStream, isAgent: boolean) => void;
onCallEnd: () => void;
onReject: () => void;
onError?: (arg?: any) => void;
@ -300,12 +359,14 @@ export default class Call {
}
// Connect with other agents
addPeerCall(thirdPartyPeers: string[]) {
thirdPartyPeers.forEach((peerId) => this._peerConnection(peerId));
}
// addPeerCall(thirdPartyPeers: string[]) {
// console.log("ADD THRID PARTY CALL", thirdPartyPeers);
// thirdPartyPeers.forEach((peerId) => this._peerConnection(peerId));
// }
// Calls the method to create a connection with a peer
private _callSessionPeer() {
console.log("CALL1")
if (![CallingState.NoCall, CallingState.Reconnecting].includes(this.store.get().calling)) {
return;
}
@ -319,13 +380,19 @@ export default class Call {
const peerId =
this.getAssistVersion() === 1
? this.peerID
: `${this.peerID}-${tab || Array.from(this.store.get().tabs)[0]}`;
: `${this.peerID}-${tab || Array.from(this.store.get().tabs)[0]}-${this.agent.id}-agent`;
this.callId = peerId;
const userName = userStore.account.name;
this.emitData('_agent_name', userName);
void this._peerConnection(peerId);
}
agentDisconnected(agentId: string) {
this.connections[agentId]?.close();
delete this.connections[agentId];
}
// Method for clearing resources
clean() {
void this.initiateCallEnd();
@ -334,3 +401,7 @@ export default class Call {
this.callArgs?.onCallEnd();
}
}
function isAgentId(id: string): boolean {
return id.endsWith('-agent');
}

View file

@ -77,7 +77,7 @@ export default class Assist {
readonly version = pkgVersion
private socket: Socket | null = null
private calls: Record<string, RTCPeerConnection> = {};
private calls: Map<string, RTCPeerConnection> = new Map();
private canvasPeers: { [id: number]: RTCPeerConnection | null } = {}
private canvasNodeCheckers: Map<number, any> = new Map()
private assistDemandedRestart = false
@ -250,7 +250,9 @@ export default class Assist {
if (args[0] === 'messages' || args[0] === 'UPDATE_SESSION') {
return
}
app.debug.log('Socket:', ...args)
if (args[0] !== 'webrtc_call_ice_candidate') {
app.debug.log('Socket:', ...args)
};
socket.on('close', (e) => {
app.debug.warn('Socket closed:', e);
})
@ -398,10 +400,10 @@ export default class Assist {
delete this.agents[id]
Object.values(this.calls).forEach(pc => pc.close())
this.calls = {}
this.calls.clear();
recordingState.stopAgentRecording(id)
endAgentCall(id)
endAgentCall({ socketId: id })
})
socket.on('NO_AGENT', () => {
@ -411,19 +413,20 @@ export default class Assist {
if (recordingState.isActive) recordingState.stopRecording()
})
socket.on('call_end', (id) => {
if (!callingAgents.has(id)) {
app.debug.warn('Received call_end from unknown agent', id)
socket.on('call_end', (socketId, { data: callId }) => {
if (!callingAgents.has(socketId)) {
app.debug.warn('Received call_end from unknown agent', socketId)
return
}
endAgentCall(id)
endAgentCall({ socketId, callId })
})
socket.on('_agent_name', (id, info) => {
if (app.getTabId() !== info.meta.tabId) return
const name = info.data
callingAgents.set(id, name)
console.log('CALLING AGENTS', callingAgents)
updateCallerNames()
})
@ -474,7 +477,10 @@ export default class Assist {
})
socket.on('webrtc_call_offer', async (_, data: { from: string, offer: RTCSessionDescriptionInit }) => {
await handleIncomingCallOffer(data.from, data.offer);
console.log("OFFER FROM", data.from)
if (!this.calls.has(data.from)) {
await handleIncomingCallOffer(data.from, data.offer);
}
});
socket.on('webrtc_call_ice_candidate', async (data: { from: string, candidate: RTCIceCandidateInit }) => {
@ -495,20 +501,31 @@ export default class Assist {
function updateCallerNames() {
callUI?.setAssistentName(callingAgents)
}
function endAgentCall(id: string) {
callingAgents.delete(id)
function endAgentCall({ socketId, callId }: { socketId: string, callId?: string }) {
callingAgents.delete(socketId)
console.log("CALLING AGENTS", callingAgents)
if (callingAgents.size === 0) {
handleCallEnd()
} else {
updateCallerNames()
if (callId) {
handleCallEndWithAgent(callId)
}
}
}
const handleCallEndWithAgent = (id: string) => {
console.log("!!!!", this.calls.get(id))
this.calls.get(id)?.close()
this.calls.delete(id)
}
// call end handling
const handleCallEnd = () => {
Object.values(this.calls).forEach(pc => pc.close())
this.calls = {}
this.calls.clear();
Object.values(lStreams).forEach((stream) => { stream.stop() })
Object.keys(lStreams).forEach((peerId: string) => { delete lStreams[peerId] })
// UI
@ -629,7 +646,7 @@ export default class Assist {
};
// Keep connection with the caller
this.calls[from] = pc;
this.calls.set(from, pc);
// set remote description on incoming request
await pc.setRemoteDescription(new RTCSessionDescription(offer));
@ -641,11 +658,11 @@ export default class Assist {
socket.emit('webrtc_call_answer', { from, answer });
// If the state changes to an error, we terminate the call
pc.onconnectionstatechange = () => {
if (pc.connectionState === 'disconnected' || pc.connectionState === 'failed') {
initiateCallEnd();
}
};
// pc.onconnectionstatechange = () => {
// if (pc.connectionState === 'disconnected' || pc.connectionState === 'failed') {
// initiateCallEnd();
// }
// };
// Update track when local video changes
lStreams[from].onVideoTrack(vTrack => {
@ -665,7 +682,7 @@ export default class Assist {
// when everything is set, we change the state to true
this.setCallingState(CallingState.True)
if (!callEndCallback) { callEndCallback = this.options.onCallStart?.() }
const callingPeerIdsNow = Object.keys(this.calls)
const callingPeerIdsNow = Array.from(this.calls.keys())
// in session storage we write down everyone with whom the call is established
sessionStorage.setItem(this.options.session_calling_peer_key, JSON.stringify(callingPeerIdsNow))
this.emit('UPDATE_SESSION', { agentIds: callingPeerIdsNow, isCallActive: true })
@ -723,7 +740,7 @@ export default class Assist {
app.nodes.attachNodeCallback((node) => {
const id = app.nodes.getID(node)
if (id && hasTag(node, 'canvas')) {
if (id && hasTag(node, 'canvas') && !app.sanitizer.isHidden(id)) {
// app.debug.log(`Creating stream for canvas ${id}`)
const canvasHandler = new Canvas(
node as unknown as HTMLCanvasElement,
@ -789,7 +806,7 @@ export default class Assist {
}
this.cleanCanvasConnections();
Object.values(this.calls).forEach(pc => pc.close())
this.calls = {}
this.calls.clear();
if (this.socket) {
this.socket.disconnect()
this.app.debug.log('Socket disconnected')