tracker: add buffer batching for offline recordings

This commit is contained in:
nick-delirium 2025-01-31 11:42:37 +01:00
parent 12f4d9a10c
commit 1fb10c3e72
No known key found for this signature in database
GPG key ID: 93ABD695DF5FDBA0
2 changed files with 43 additions and 19 deletions

View file

@ -1295,7 +1295,10 @@ export default class App {
* @resolve - if messages were loaded in service worker successfully
* @reject {string} - error message
* */
public async uploadOfflineRecording() {
public async uploadOfflineRecording({
throttleBatchSize = 10000,
throttleDelay = 10,
}: { throttleBatchSize?: number; throttleDelay?: number } = {}): Promise<void> {
this.stop(false)
const timestamp = now()
this.worker?.postMessage({
@ -1351,7 +1354,11 @@ export default class App {
userState,
})
while (this.bufferedMessages1.length > 0) {
await this.flushBuffer(this.bufferedMessages1)
await this.flushBuffer({
buffer: this.bufferedMessages1,
throttleBatchSize,
throttleDelay,
})
}
this.postToWorker([['q_end']] as unknown as Message[])
this.clearBuffers()
@ -1564,7 +1571,7 @@ export default class App {
? this.bufferedMessages1
: this.bufferedMessages2
while (biggestBuffer.length > 0) {
await this.flushBuffer(biggestBuffer)
await this.flushBuffer({ buffer: biggestBuffer })
}
this.clearBuffers()
this.commit()
@ -1636,21 +1643,33 @@ export default class App {
this.canvasRecorder?.restartTracking()
}
flushBuffer = async (buffer: Message[]) => {
return new Promise((res) => {
let ended = false
const messagesBatch: Message[] = [buffer.shift() as unknown as Message]
while (!ended) {
const nextMsg = buffer[0]
if (!nextMsg || nextMsg[0] === MType.Timestamp) {
ended = true
} else {
messagesBatch.push(buffer.shift() as unknown as Message)
}
flushBuffer = async ({
buffer,
throttleBatchSize = 10000,
throttleDelay = 10,
}: {
buffer: Message[]
throttleBatchSize?: number
throttleDelay?: number
}) => {
let ended = false
const messagesBatch: Message[] = [buffer.shift() as unknown as Message]
while (!ended) {
const nextMsg = buffer[0]
if (!nextMsg || nextMsg[0] === MType.Timestamp) {
ended = true
} else {
messagesBatch.push(buffer.shift() as unknown as Message)
}
if (messagesBatch.length > throttleBatchSize) {
this.postToWorker(messagesBatch)
messagesBatch.length = 0
await delay(throttleDelay)
}
}
if (messagesBatch.length) {
this.postToWorker(messagesBatch)
res(null)
})
}
}
onUxtCb = []

View file

@ -371,17 +371,22 @@ export default class API {
}
/**
* @param {Object} options - to control batching:
* - throttleBatchSize - number of messages to send in one batch; default 10_000
* - throttleDelay - delay between batches; default 10ms
* Uploads the stored session buffer to backend
* @returns promise that resolves once messages are loaded, it has to be awaited
* so the session can be uploaded properly
* @resolve - if messages were loaded into service worker successfully
* @reject {string} - error message
* */
uploadOfflineRecording() {
* @param options
**/
uploadOfflineRecording(options: { throttleBatchSize?: number; throttleDelay?: number }) {
const { throttleBatchSize = 10000, throttleDelay = 10 } = options
if (this.app === null) {
return
}
return this.app.uploadOfflineRecording()
return this.app.uploadOfflineRecording({ throttleBatchSize, throttleDelay })
}
stop(): string | undefined {