diff --git a/frontend/app/player/web/MessageManager.ts b/frontend/app/player/web/MessageManager.ts index 1eea15095..104a9365a 100644 --- a/frontend/app/player/web/MessageManager.ts +++ b/frontend/app/player/web/MessageManager.ts @@ -228,7 +228,7 @@ export default class MessageManager { } const sorted = msgs.sort((m1, m2) => { // @ts-ignore - if (m1.time === m2.time) return m1._index - m2._index + if (!m1.time || !m2.time || m1.time === m2.time) return m1._index - m2._index return m1.time - m2.time }) @@ -256,9 +256,8 @@ export default class MessageManager { loadFiles(loadMethod.url, loadMethod.parser()) // EFS fallback - .catch((e) => - requestEFSDom(this.session.sessionId) - .then(createNewParser(false, 'domEFS')) + .catch((e) => requestEFSDom(this.session.sessionId) + .then(createNewParser(false, 'domEFS')) ) .then(this.onFileReadSuccess) .catch(this.onFileReadFailed) diff --git a/frontend/app/player/web/network/loadFiles.ts b/frontend/app/player/web/network/loadFiles.ts index 8d8142716..888fd257f 100644 --- a/frontend/app/player/web/network/loadFiles.ts +++ b/frontend/app/player/web/network/loadFiles.ts @@ -15,7 +15,7 @@ export async function loadFiles( try { for (let url of urls) { const response = await window.fetch(url) - const data = await processAPIStreamResponse(response, url !== url[0]) + const data = await processAPIStreamResponse(response, url !== urls[0]) onData(data) } } catch(e) { diff --git a/tracker/tracker/src/webworker/PrimitiveEncoder.ts b/tracker/tracker/src/webworker/PrimitiveEncoder.ts index caf0b2db7..ac3e98a03 100644 --- a/tracker/tracker/src/webworker/PrimitiveEncoder.ts +++ b/tracker/tracker/src/webworker/PrimitiveEncoder.ts @@ -114,14 +114,4 @@ export default class PrimitiveEncoder { this.reset() return data } - - encodeCompressed(message: Uint8Array) { - const length = message.byteLength - if (!this.uint(length) || this.offset + length > this.size) { - return false - } - this.data.set(message, this.offset) - this.offset += length - return true - } } diff --git a/tracker/tracker/src/webworker/QueueSender.ts b/tracker/tracker/src/webworker/QueueSender.ts index 7eff9c662..f46040b3e 100644 --- a/tracker/tracker/src/webworker/QueueSender.ts +++ b/tracker/tracker/src/webworker/QueueSender.ts @@ -1,5 +1,4 @@ const INGEST_PATH = '/v1/web/i' - const KEEPALIVE_SIZE_LIMIT = 64 << 10 // 64 kB export default class QueueSender { @@ -32,6 +31,7 @@ export default class QueueSender { if (this.busy || !this.token) { this.queue.push(batch) } else { + this.busy = true this.onCompress(batch) } } @@ -39,20 +39,21 @@ export default class QueueSender { private sendNext() { const nextBatch = this.queue.shift() if (nextBatch) { + this.busy = true this.onCompress(nextBatch) } else { this.busy = false } } - private retry(batch: Uint8Array): void { + private retry(batch: Uint8Array, isCompressed?: boolean): void { if (this.attemptsCount >= this.MAX_ATTEMPTS_COUNT) { this.onFailure(`Failed to send batch after ${this.attemptsCount} attempts.`) // remains this.busy === true return } this.attemptsCount++ - setTimeout(() => this.onCompress(batch), this.ATTEMPT_TIMEOUT * this.attemptsCount) + setTimeout(() => this.sendBatch(batch, isCompressed), this.ATTEMPT_TIMEOUT * this.attemptsCount) } // would be nice to use Beacon API, but it is not available in WebWorker @@ -80,7 +81,7 @@ export default class QueueSender { this.onUnauthorised() return } else if (r.status >= 400) { - this.retry(batch) + this.retry(batch, isCompressed) return } @@ -90,7 +91,7 @@ export default class QueueSender { }) .catch((e: any) => { console.warn('OpenReplay:', e) - this.retry(batch) + this.retry(batch, isCompressed) }) } @@ -99,7 +100,7 @@ export default class QueueSender { } sendUncompressed(batch: Uint8Array) { - this.sendBatch(batch) + this.sendBatch(batch, false) } clean() { diff --git a/tracker/tracker/src/webworker/index.ts b/tracker/tracker/src/webworker/index.ts index 45418d54d..cd32ec5cf 100644 --- a/tracker/tracker/src/webworker/index.ts +++ b/tracker/tracker/src/webworker/index.ts @@ -1,9 +1,7 @@ // Do strong type WebWorker as soon as it is possible: // https://github.com/microsoft/TypeScript/issues/14877 // At the moment "webworker" lib conflicts with jest-environment-jsdom that uses "dom" lib -// -import type Message from '../common/messages.gen.js' import { Type as MType } from '../common/messages.gen.js' import { ToWorkerData, FromWorkerData } from '../common/interaction.js' @@ -103,6 +101,23 @@ self.onmessage = ({ data }: any): any => { return } + if (data.type === 'compressed') { + if (!sender) { + console.debug('WebWorker: sender not initialised. Received auth.') + initiateRestart() + return + } + sender.sendCompressed(data.batch) + } + if (data.type === 'uncompressed') { + if (!sender) { + console.debug('WebWorker: sender not initialised. Received auth.') + initiateRestart() + return + } + sender.sendUncompressed(data.batch) + } + if (data.type === 'start') { workerStatus = WorkerStatus.Starting sender = new QueueSender( @@ -133,12 +148,6 @@ self.onmessage = ({ data }: any): any => { } return (workerStatus = WorkerStatus.Active) } - if (data.type === 'compressed') { - sender?.sendCompressed(data.batch) - } - if (data.type === 'uncompressed') { - sender?.sendUncompressed(data.batch) - } if (data.type === 'auth') { if (!sender) {