From 1fb10c3e7289cb4e4bc5424eb1c68692cf6a5ab9 Mon Sep 17 00:00:00 2001 From: nick-delirium Date: Fri, 31 Jan 2025 11:42:37 +0100 Subject: [PATCH] tracker: add buffer batching for offline recordings --- tracker/tracker/src/main/app/index.ts | 51 ++++++++++++++++++--------- tracker/tracker/src/main/index.ts | 11 ++++-- 2 files changed, 43 insertions(+), 19 deletions(-) diff --git a/tracker/tracker/src/main/app/index.ts b/tracker/tracker/src/main/app/index.ts index 24c40b5c5..e10237c86 100644 --- a/tracker/tracker/src/main/app/index.ts +++ b/tracker/tracker/src/main/app/index.ts @@ -1295,7 +1295,10 @@ export default class App { * @resolve - if messages were loaded in service worker successfully * @reject {string} - error message * */ - public async uploadOfflineRecording() { + public async uploadOfflineRecording({ + throttleBatchSize = 10000, + throttleDelay = 10, + }: { throttleBatchSize?: number; throttleDelay?: number } = {}): Promise { this.stop(false) const timestamp = now() this.worker?.postMessage({ @@ -1351,7 +1354,11 @@ export default class App { userState, }) while (this.bufferedMessages1.length > 0) { - await this.flushBuffer(this.bufferedMessages1) + await this.flushBuffer({ + buffer: this.bufferedMessages1, + throttleBatchSize, + throttleDelay, + }) } this.postToWorker([['q_end']] as unknown as Message[]) this.clearBuffers() @@ -1564,7 +1571,7 @@ export default class App { ? this.bufferedMessages1 : this.bufferedMessages2 while (biggestBuffer.length > 0) { - await this.flushBuffer(biggestBuffer) + await this.flushBuffer({ buffer: biggestBuffer }) } this.clearBuffers() this.commit() @@ -1636,21 +1643,33 @@ export default class App { this.canvasRecorder?.restartTracking() } - flushBuffer = async (buffer: Message[]) => { - return new Promise((res) => { - let ended = false - const messagesBatch: Message[] = [buffer.shift() as unknown as Message] - while (!ended) { - const nextMsg = buffer[0] - if (!nextMsg || nextMsg[0] === MType.Timestamp) { - ended = true - } else { - messagesBatch.push(buffer.shift() as unknown as Message) - } + flushBuffer = async ({ + buffer, + throttleBatchSize = 10000, + throttleDelay = 10, + }: { + buffer: Message[] + throttleBatchSize?: number + throttleDelay?: number + }) => { + let ended = false + const messagesBatch: Message[] = [buffer.shift() as unknown as Message] + while (!ended) { + const nextMsg = buffer[0] + if (!nextMsg || nextMsg[0] === MType.Timestamp) { + ended = true + } else { + messagesBatch.push(buffer.shift() as unknown as Message) } + if (messagesBatch.length > throttleBatchSize) { + this.postToWorker(messagesBatch) + messagesBatch.length = 0 + await delay(throttleDelay) + } + } + if (messagesBatch.length) { this.postToWorker(messagesBatch) - res(null) - }) + } } onUxtCb = [] diff --git a/tracker/tracker/src/main/index.ts b/tracker/tracker/src/main/index.ts index 8af32da55..10bee7094 100644 --- a/tracker/tracker/src/main/index.ts +++ b/tracker/tracker/src/main/index.ts @@ -371,17 +371,22 @@ export default class API { } /** + * @param {Object} options - to control batching: + * - throttleBatchSize - number of messages to send in one batch; default 10_000 + * - throttleDelay - delay between batches; default 10ms * Uploads the stored session buffer to backend * @returns promise that resolves once messages are loaded, it has to be awaited * so the session can be uploaded properly * @resolve - if messages were loaded into service worker successfully * @reject {string} - error message - * */ - uploadOfflineRecording() { + * @param options + **/ + uploadOfflineRecording(options: { throttleBatchSize?: number; throttleDelay?: number }) { + const { throttleBatchSize = 10000, throttleDelay = 10 } = options if (this.app === null) { return } - return this.app.uploadOfflineRecording() + return this.app.uploadOfflineRecording({ throttleBatchSize, throttleDelay }) } stop(): string | undefined {