diff --git a/frontend/app/player/MessageDistributor/MessageDistributor.ts b/frontend/app/player/MessageDistributor/MessageDistributor.ts index 582cd7211..1abd26979 100644 --- a/frontend/app/player/MessageDistributor/MessageDistributor.ts +++ b/frontend/app/player/MessageDistributor/MessageDistributor.ts @@ -248,7 +248,7 @@ export default class MessageDistributor extends StatedScreen { } }); - update(stateToUpdate); + Object.keys(stateToUpdate).length > 0 && update(stateToUpdate); /* Sequence of the managers is important here */ // Preparing the size of "screen" diff --git a/tracker/tracker/src/common/webworker.ts b/tracker/tracker/src/common/webworker.ts index 34ac7f582..3f6de9c35 100644 --- a/tracker/tracker/src/common/webworker.ts +++ b/tracker/tracker/src/common/webworker.ts @@ -16,4 +16,4 @@ type Auth = { beaconSizeLimit?: number } -export type WorkerMessageData = null | "stop" | Start | Auth | Array<{ _id: number }> \ No newline at end of file +export type WorkerMessageData = null | "stop" | Start | Auth | Array<{ _id: number }> diff --git a/tracker/tracker/src/main/app/index.ts b/tracker/tracker/src/main/app/index.ts index 4f5487bfa..e7dab811e 100644 --- a/tracker/tracker/src/main/app/index.ts +++ b/tracker/tracker/src/main/app/index.ts @@ -15,7 +15,6 @@ import type { Options as SanitizerOptions } from "./sanitizer.js"; import type { Options as LoggerOptions } from "./logger.js" import type { Options as WebworkerOptions, WorkerMessageData } from "../../common/webworker.js"; - // TODO: Unify and clearly describe options logic export interface StartOptions { userID?: string, @@ -398,7 +397,6 @@ export default class App { ...startOpts }); - this.activityState = ActivityState.Active const startWorkerMsg: WorkerMessageData = { type: "auth", token, @@ -406,6 +404,9 @@ export default class App { } this.worker.postMessage(startWorkerMsg) + + this.activityState = ActivityState.Active + const onStartInfo = { sessionToken: token, userUUID, sessionID }; this.startCallbacks.forEach((cb) => cb(onStartInfo)); @@ -448,15 +449,15 @@ export default class App { stop(): void { if (this.activityState !== ActivityState.NotActive) { try { - if (this.worker) { - this.worker.postMessage("stop") - } this.sanitizer.clear() this.observer.disconnect() this.nodes.clear() this.ticker.stop() this.stopCallbacks.forEach((cb) => cb()) this.notify.log("OpenReplay tracking stopped.") + if (this.worker) { + this.worker.postMessage("stop") + } } finally { this.activityState = ActivityState.NotActive } diff --git a/tracker/tracker/src/webworker/BatchWriter.ts b/tracker/tracker/src/webworker/BatchWriter.ts index 7ddf97dbe..181663300 100644 --- a/tracker/tracker/src/webworker/BatchWriter.ts +++ b/tracker/tracker/src/webworker/BatchWriter.ts @@ -16,11 +16,24 @@ export default class BatchWriter { private timestamp: number, private onBatch: (batch: Uint8Array) => void ) { - this.prepareBatchMeta() + this.prepare() } - private prepareBatchMeta(): boolean { - return new BatchMeta(this.pageNo, this.nextIndex, this.timestamp).encode(this.writer) + private prepare(): void { + if (!this.writer.isEmpty()) { + return + } + new BatchMeta(this.pageNo, this.nextIndex, this.timestamp).encode(this.writer) + } + + private write(message: Message): boolean { + const wasWritten = message.encode(this.writer) + if (wasWritten) { + this.isEmpty = false + this.writer.checkpoint() + this.nextIndex++ + } + return wasWritten } private beaconSizeLimit = 1e6 @@ -28,41 +41,31 @@ export default class BatchWriter { this.beaconSizeLimit = limit } - // TODO: clear workflow writeMessage(message: Message) { if (message instanceof Timestamp) { this.timestamp = (message).timestamp } - - if (!message.encode(this.writer)) { - if (!this.isEmpty) { - this.onBatch(this.writer.flush()) - this.prepareBatchMeta() - } - - while (!message.encode(this.writer)) { - if (this.beaconSize === this.beaconSizeLimit) { - console.warn("OpenReplay: beacon size overflow. Skipping large message."); - this.writer.reset() - this.prepareBatchMeta() - this.isEmpty = true - return - } - // MBTODO: tempWriter for one message? - this.beaconSize = Math.min(this.beaconSize*2, this.beaconSizeLimit) - this.writer = new PrimitiveWriter(this.beaconSize) - this.prepareBatchMeta() + while (!this.write(message)) { + this.finaliseBatch() + if (this.beaconSize === this.beaconSizeLimit) { + console.warn("OpenReplay: beacon size overflow. Skipping large message."); + this.writer.reset() + this.prepare() + this.isEmpty = true + return } + // MBTODO: tempWriter for one message? + this.beaconSize = Math.min(this.beaconSize*2, this.beaconSizeLimit) + this.writer = new PrimitiveWriter(this.beaconSize) + this.prepare() + this.isEmpty = true } - this.writer.checkpoint() - this.nextIndex++ - this.isEmpty = false } finaliseBatch() { if (this.isEmpty) { return } this.onBatch(this.writer.flush()) - this.prepareBatchMeta() + this.prepare() this.isEmpty = true } @@ -70,4 +73,4 @@ export default class BatchWriter { this.writer.reset() } -} \ No newline at end of file +} diff --git a/tracker/tracker/src/webworker/index.ts b/tracker/tracker/src/webworker/index.ts index 3e9341a2d..7d1cb0393 100644 --- a/tracker/tracker/src/webworker/index.ts +++ b/tracker/tracker/src/webworker/index.ts @@ -1,5 +1,5 @@ import type Message from "../common/messages.js"; -import type { WorkerMessageData } from "../common/webworker.js"; +import { WorkerMessageData } from "../common/webworker.js"; import { classes, @@ -9,12 +9,18 @@ import { import QueueSender from "./QueueSender.js"; import BatchWriter from "./BatchWriter.js"; - +enum WorkerStatus { + NotActive, + Starting, + Stopping, + Active +} const AUTO_SEND_INTERVAL = 10 * 1000 let sender: QueueSender | null = null let writer: BatchWriter | null = null +let workerStatus: WorkerStatus = WorkerStatus.NotActive; function send(): void { if (!writer) { @@ -25,6 +31,7 @@ function send(): void { function reset() { + workerStatus = WorkerStatus.Stopping if (sendIntervalID !== null) { clearInterval(sendIntervalID); sendIntervalID = null; @@ -33,6 +40,7 @@ function reset() { writer.clean() writer = null } + workerStatus = WorkerStatus.NotActive } function resetCleanQueue() { @@ -51,7 +59,6 @@ self.onmessage = ({ data }: MessageEvent) => { send() // TODO: sendAll? return } - if (data === "stop") { send() reset() @@ -60,7 +67,7 @@ self.onmessage = ({ data }: MessageEvent) => { if (Array.isArray(data)) { if (!writer) { - throw new Error("WebWorker: writer not initialised.") + throw new Error("WebWorker: writer not initialised. Service Should be Started.") } const w = writer // Message[] @@ -80,6 +87,7 @@ self.onmessage = ({ data }: MessageEvent) => { } if (data.type === 'start') { + workerStatus = WorkerStatus.Starting sender = new QueueSender( data.ingestPoint, () => { // onUnauthorised @@ -101,15 +109,15 @@ self.onmessage = ({ data }: MessageEvent) => { if (sendIntervalID === null) { sendIntervalID = setInterval(send, AUTO_SEND_INTERVAL) } - return + return workerStatus = WorkerStatus.Active } if (data.type === "auth") { if (!sender) { - throw new Error("WebWorker: sender not initialised. Recieved auth.") + throw new Error("WebWorker: sender not initialised. Received auth.") } if (!writer) { - throw new Error("WebWorker: writer not initialised. Recieved auth.") + throw new Error("WebWorker: writer not initialised. Received auth.") } sender.authorise(data.token) data.beaconSizeLimit && writer.setBeaconSizeLimit(data.beaconSizeLimit)