feat ui/tracker: add compression to assist plugin (#2186)

* feat ui/tracker: add compression to assist plugin

* fix: changelog, package.json changes etc

* feat tracker: split big batches in addition to compression
This commit is contained in:
Delirium 2024-05-24 16:49:48 +02:00 committed by GitHub
parent 62a3078c22
commit 297be2bc9c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 56 additions and 9 deletions

View file

@ -258,7 +258,6 @@ export default class MessageManager {
);
}
});
if (
this.waitingForFiles ||
(this.lastMessageTime <= t &&

View file

@ -1,5 +1,6 @@
import MessageManager from 'Player/web/MessageManager';
import type { Socket } from 'socket.io-client';
import { Message } from "../messages";
import type Screen from '../Screen/Screen';
import type { PlayerMsg, Store } from 'App/player';
import MStreamReader from '../messages/MStreamReader';
@ -8,6 +9,7 @@ import Call, { CallingState } from './Call';
import RemoteControl, { RemoteControlStatus } from './RemoteControl';
import ScreenRecording, { SessionRecordingStatus } from './ScreenRecording';
import CanvasReceiver from 'Player/web/assist/CanvasReceiver';
import { gunzipSync } from 'fflate';
export { RemoteControlStatus, SessionRecordingStatus, CallingState };
@ -196,7 +198,7 @@ export default class AssistManager {
this.setStatus(ConnectionStatus.WaitingMessages);
});
socket.on('messages', (messages) => {
const processMessages = (messages: { meta: { version: number, tabId: string }, data: Message[] }) => {
const isOldVersion = messages.meta.version === 1;
this.assistVersion = isOldVersion ? 1 : 2;
@ -217,7 +219,17 @@ export default class AssistManager {
for (let msg = reader.readNext(); msg !== null; msg = reader.readNext()) {
this.handleMessage(msg, msg._index);
}
}
socket.on('messages', (messages) => {
processMessages(messages)
});
socket.on('messages_gz', (gzBuf) => {
const unpackData = gunzipSync(new Uint8Array(gzBuf.data));
const str = new TextDecoder().decode(unpackData);
const messages = JSON.parse(str);
processMessages({ ...gzBuf, data: messages })
})
socket.on('SESSION_RECONNECTED', () => {
this.clearDisconnectTimeout();

View file

@ -55,6 +55,13 @@ export default class Call {
this.store.update({ calling: CallingState.NoCall });
}
});
socket.on('messages_gz', () => {
if (reconnecting) {
// 'messages' come frequently, so it is better to have Reconnecting
this._callSessionPeer();
reconnecting = false;
}
})
socket.on('messages', () => {
if (reconnecting) {
// 'messages' come frequently, so it is better to have Reconnecting

View file

@ -1,3 +1,7 @@
## 9.0.0
- support for message compression inside plugin
## 8.0.4
- fix for multiple canvas streaming

Binary file not shown.

View file

@ -1,7 +1,7 @@
{
"name": "@openreplay/tracker-assist",
"description": "Tracker plugin for screen assistance through the WebRTC",
"version": "8.0.4",
"version": "9.0.0",
"keywords": [
"WebRTC",
"assistance",
@ -30,6 +30,7 @@
},
"dependencies": {
"csstype": "^3.0.10",
"fflate": "^0.8.2",
"peerjs": "1.5.1",
"socket.io-client": "^4.7.2"
},

View file

@ -5,7 +5,7 @@ import Peer, { MediaConnection, } from 'peerjs'
import type { Properties, } from 'csstype'
import { App, } from '@openreplay/tracker'
import RequestLocalStream, { LocalStream } from './LocalStream.js';
import RequestLocalStream, { LocalStream, } from './LocalStream.js'
import {hasTag,} from './guards.js'
import RemoteControl, { RCStatus, } from './RemoteControl.js'
import CallWindow from './CallWindow.js'
@ -16,7 +16,7 @@ import type { Options as ConfirmOptions, } from './ConfirmWindow/defaults.js'
import ScreenRecordingState from './ScreenRecordingState.js'
import { pkgVersion, } from './version.js'
import Canvas from './Canvas.js'
import { gzip, } from 'fflate'
// TODO: fully specified strict check with no-any (everywhere)
// @ts-ignore
const safeCastedPeer = Peer.default || Peer
@ -54,6 +54,12 @@ export interface Options {
config: RTCConfiguration;
serverURL: string
callUITemplate?: string;
compressionEnabled: boolean;
/**
* Minimum amount of messages in a batch to trigger compression run
* @default 5000
* */
compressionMinBatchSize: number
}
@ -107,6 +113,8 @@ export default class Assist {
controlConfirm: {}, // TODO: clear options passing/merging/overwriting
recordingConfirm: {},
socketHost: '',
compressionEnabled: false,
compressionMinBatchSize: 5000,
},
options,
)
@ -144,9 +152,25 @@ export default class Assist {
})
app.attachCommitCallback((messages) => {
if (this.agentsConnected) {
const batchSize = messages.length
// @ts-ignore No need in statistics messages. TODO proper filter
if (messages.length === 2 && messages[0]._id === 0 && messages[1]._id === 49) { return }
this.emit('messages', messages)
if (batchSize === 2 && messages[0]._id === 0 && messages[1]._id === 49) { return }
if (batchSize > this.options.compressionMinBatchSize && this.options.compressionEnabled) {
while (messages.length > 0) {
const batch = messages.splice(0, this.options.compressionMinBatchSize)
const str = JSON.stringify(batch)
const byteArr = new TextEncoder().encode(str)
gzip(byteArr, { mtime: 0, }, (err, result) => {
if (err) {
this.emit('messages', batch)
} else {
this.emit('messages_gz', result)
}
})
}
} else {
this.emit('messages', messages)
}
}
})
app.session.attachUpdateCallback(sessInfo => this.emit('UPDATE_SESSION', sessInfo))

View file

@ -1 +1 @@
export const pkgVersion = "8.0.4";
export const pkgVersion = '8.0.5-4'

View file

@ -1,7 +1,7 @@
{
"name": "@openreplay/tracker",
"description": "The OpenReplay tracker main package",
"version": "12.0.12",
"version": "12.0.13-0",
"keywords": [
"logging",
"replay"