Compare commits

...
Sign in to create a new pull request.

1 commit

Author SHA1 Message Date
nick-delirium
1fb10c3e72
tracker: add buffer batching for offline recordings 2025-01-31 11:42:37 +01:00
2 changed files with 43 additions and 19 deletions

View file

@ -1295,7 +1295,10 @@ export default class App {
* @resolve - if messages were loaded in service worker successfully * @resolve - if messages were loaded in service worker successfully
* @reject {string} - error message * @reject {string} - error message
* */ * */
public async uploadOfflineRecording() { public async uploadOfflineRecording({
throttleBatchSize = 10000,
throttleDelay = 10,
}: { throttleBatchSize?: number; throttleDelay?: number } = {}): Promise<void> {
this.stop(false) this.stop(false)
const timestamp = now() const timestamp = now()
this.worker?.postMessage({ this.worker?.postMessage({
@ -1351,7 +1354,11 @@ export default class App {
userState, userState,
}) })
while (this.bufferedMessages1.length > 0) { while (this.bufferedMessages1.length > 0) {
await this.flushBuffer(this.bufferedMessages1) await this.flushBuffer({
buffer: this.bufferedMessages1,
throttleBatchSize,
throttleDelay,
})
} }
this.postToWorker([['q_end']] as unknown as Message[]) this.postToWorker([['q_end']] as unknown as Message[])
this.clearBuffers() this.clearBuffers()
@ -1564,7 +1571,7 @@ export default class App {
? this.bufferedMessages1 ? this.bufferedMessages1
: this.bufferedMessages2 : this.bufferedMessages2
while (biggestBuffer.length > 0) { while (biggestBuffer.length > 0) {
await this.flushBuffer(biggestBuffer) await this.flushBuffer({ buffer: biggestBuffer })
} }
this.clearBuffers() this.clearBuffers()
this.commit() this.commit()
@ -1636,8 +1643,15 @@ export default class App {
this.canvasRecorder?.restartTracking() this.canvasRecorder?.restartTracking()
} }
flushBuffer = async (buffer: Message[]) => { flushBuffer = async ({
return new Promise((res) => { buffer,
throttleBatchSize = 10000,
throttleDelay = 10,
}: {
buffer: Message[]
throttleBatchSize?: number
throttleDelay?: number
}) => {
let ended = false let ended = false
const messagesBatch: Message[] = [buffer.shift() as unknown as Message] const messagesBatch: Message[] = [buffer.shift() as unknown as Message]
while (!ended) { while (!ended) {
@ -1647,10 +1661,15 @@ export default class App {
} else { } else {
messagesBatch.push(buffer.shift() as unknown as Message) messagesBatch.push(buffer.shift() as unknown as Message)
} }
} if (messagesBatch.length > throttleBatchSize) {
this.postToWorker(messagesBatch) this.postToWorker(messagesBatch)
res(null) messagesBatch.length = 0
}) await delay(throttleDelay)
}
}
if (messagesBatch.length) {
this.postToWorker(messagesBatch)
}
} }
onUxtCb = [] onUxtCb = []

View file

@ -371,17 +371,22 @@ export default class API {
} }
/** /**
* @param {Object} options - to control batching:
* - throttleBatchSize - number of messages to send in one batch; default 10_000
* - throttleDelay - delay between batches; default 10ms
* Uploads the stored session buffer to backend * Uploads the stored session buffer to backend
* @returns promise that resolves once messages are loaded, it has to be awaited * @returns promise that resolves once messages are loaded, it has to be awaited
* so the session can be uploaded properly * so the session can be uploaded properly
* @resolve - if messages were loaded into service worker successfully * @resolve - if messages were loaded into service worker successfully
* @reject {string} - error message * @reject {string} - error message
* @param options
**/ **/
uploadOfflineRecording() { uploadOfflineRecording(options: { throttleBatchSize?: number; throttleDelay?: number }) {
const { throttleBatchSize = 10000, throttleDelay = 10 } = options
if (this.app === null) { if (this.app === null) {
return return
} }
return this.app.uploadOfflineRecording() return this.app.uploadOfflineRecording({ throttleBatchSize, throttleDelay })
} }
stop(): string | undefined { stop(): string | undefined {