Merge pull request #542 from openreplay/tracker-wworker-writer-bug

Worker console fix
*worker activity state introduced 
*late worker stop 
*BatchWriter refactor
This commit is contained in:
Alex K 2022-06-22 14:15:56 +02:00 committed by GitHub
commit 33eca54031
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 54 additions and 42 deletions

View file

@ -248,7 +248,7 @@ export default class MessageDistributor extends StatedScreen {
}
});
update(stateToUpdate);
Object.keys(stateToUpdate).length > 0 && update(stateToUpdate);
/* Sequence of the managers is important here */
// Preparing the size of "screen"

View file

@ -16,4 +16,4 @@ type Auth = {
beaconSizeLimit?: number
}
export type WorkerMessageData = null | "stop" | Start | Auth | Array<{ _id: number }>
export type WorkerMessageData = null | "stop" | Start | Auth | Array<{ _id: number }>

View file

@ -15,7 +15,6 @@ import type { Options as SanitizerOptions } from "./sanitizer.js";
import type { Options as LoggerOptions } from "./logger.js"
import type { Options as WebworkerOptions, WorkerMessageData } from "../../common/webworker.js";
// TODO: Unify and clearly describe options logic
export interface StartOptions {
userID?: string,
@ -398,7 +397,6 @@ export default class App {
...startOpts
});
this.activityState = ActivityState.Active
const startWorkerMsg: WorkerMessageData = {
type: "auth",
token,
@ -406,6 +404,9 @@ export default class App {
}
this.worker.postMessage(startWorkerMsg)
this.activityState = ActivityState.Active
const onStartInfo = { sessionToken: token, userUUID, sessionID };
this.startCallbacks.forEach((cb) => cb(onStartInfo));
@ -448,15 +449,15 @@ export default class App {
stop(): void {
if (this.activityState !== ActivityState.NotActive) {
try {
if (this.worker) {
this.worker.postMessage("stop")
}
this.sanitizer.clear()
this.observer.disconnect()
this.nodes.clear()
this.ticker.stop()
this.stopCallbacks.forEach((cb) => cb())
this.notify.log("OpenReplay tracking stopped.")
if (this.worker) {
this.worker.postMessage("stop")
}
} finally {
this.activityState = ActivityState.NotActive
}

View file

@ -16,11 +16,24 @@ export default class BatchWriter {
private timestamp: number,
private onBatch: (batch: Uint8Array) => void
) {
this.prepareBatchMeta()
this.prepare()
}
private prepareBatchMeta(): boolean {
return new BatchMeta(this.pageNo, this.nextIndex, this.timestamp).encode(this.writer)
private prepare(): void {
if (!this.writer.isEmpty()) {
return
}
new BatchMeta(this.pageNo, this.nextIndex, this.timestamp).encode(this.writer)
}
private write(message: Message): boolean {
const wasWritten = message.encode(this.writer)
if (wasWritten) {
this.isEmpty = false
this.writer.checkpoint()
this.nextIndex++
}
return wasWritten
}
private beaconSizeLimit = 1e6
@ -28,41 +41,31 @@ export default class BatchWriter {
this.beaconSizeLimit = limit
}
// TODO: clear workflow
writeMessage(message: Message) {
if (message instanceof Timestamp) {
this.timestamp = (<any>message).timestamp
}
if (!message.encode(this.writer)) {
if (!this.isEmpty) {
this.onBatch(this.writer.flush())
this.prepareBatchMeta()
}
while (!message.encode(this.writer)) {
if (this.beaconSize === this.beaconSizeLimit) {
console.warn("OpenReplay: beacon size overflow. Skipping large message.");
this.writer.reset()
this.prepareBatchMeta()
this.isEmpty = true
return
}
// MBTODO: tempWriter for one message?
this.beaconSize = Math.min(this.beaconSize*2, this.beaconSizeLimit)
this.writer = new PrimitiveWriter(this.beaconSize)
this.prepareBatchMeta()
while (!this.write(message)) {
this.finaliseBatch()
if (this.beaconSize === this.beaconSizeLimit) {
console.warn("OpenReplay: beacon size overflow. Skipping large message.");
this.writer.reset()
this.prepare()
this.isEmpty = true
return
}
// MBTODO: tempWriter for one message?
this.beaconSize = Math.min(this.beaconSize*2, this.beaconSizeLimit)
this.writer = new PrimitiveWriter(this.beaconSize)
this.prepare()
this.isEmpty = true
}
this.writer.checkpoint()
this.nextIndex++
this.isEmpty = false
}
finaliseBatch() {
if (this.isEmpty) { return }
this.onBatch(this.writer.flush())
this.prepareBatchMeta()
this.prepare()
this.isEmpty = true
}
@ -70,4 +73,4 @@ export default class BatchWriter {
this.writer.reset()
}
}
}

View file

@ -1,5 +1,5 @@
import type Message from "../common/messages.js";
import type { WorkerMessageData } from "../common/webworker.js";
import { WorkerMessageData } from "../common/webworker.js";
import {
classes,
@ -9,12 +9,18 @@ import {
import QueueSender from "./QueueSender.js";
import BatchWriter from "./BatchWriter.js";
enum WorkerStatus {
NotActive,
Starting,
Stopping,
Active
}
const AUTO_SEND_INTERVAL = 10 * 1000
let sender: QueueSender | null = null
let writer: BatchWriter | null = null
let workerStatus: WorkerStatus = WorkerStatus.NotActive;
function send(): void {
if (!writer) {
@ -25,6 +31,7 @@ function send(): void {
function reset() {
workerStatus = WorkerStatus.Stopping
if (sendIntervalID !== null) {
clearInterval(sendIntervalID);
sendIntervalID = null;
@ -33,6 +40,7 @@ function reset() {
writer.clean()
writer = null
}
workerStatus = WorkerStatus.NotActive
}
function resetCleanQueue() {
@ -51,7 +59,6 @@ self.onmessage = ({ data }: MessageEvent<WorkerMessageData>) => {
send() // TODO: sendAll?
return
}
if (data === "stop") {
send()
reset()
@ -60,7 +67,7 @@ self.onmessage = ({ data }: MessageEvent<WorkerMessageData>) => {
if (Array.isArray(data)) {
if (!writer) {
throw new Error("WebWorker: writer not initialised.")
throw new Error("WebWorker: writer not initialised. Service Should be Started.")
}
const w = writer
// Message[]
@ -80,6 +87,7 @@ self.onmessage = ({ data }: MessageEvent<WorkerMessageData>) => {
}
if (data.type === 'start') {
workerStatus = WorkerStatus.Starting
sender = new QueueSender(
data.ingestPoint,
() => { // onUnauthorised
@ -101,15 +109,15 @@ self.onmessage = ({ data }: MessageEvent<WorkerMessageData>) => {
if (sendIntervalID === null) {
sendIntervalID = setInterval(send, AUTO_SEND_INTERVAL)
}
return
return workerStatus = WorkerStatus.Active
}
if (data.type === "auth") {
if (!sender) {
throw new Error("WebWorker: sender not initialised. Recieved auth.")
throw new Error("WebWorker: sender not initialised. Received auth.")
}
if (!writer) {
throw new Error("WebWorker: writer not initialised. Recieved auth.")
throw new Error("WebWorker: writer not initialised. Received auth.")
}
sender.authorise(data.token)
data.beaconSizeLimit && writer.setBeaconSizeLimit(data.beaconSizeLimit)