fix(tracker): do not send stop-message to WebWorker during worker-initiated stop() call

This commit is contained in:
Alex Kaminskii 2022-09-19 17:38:59 +02:00
parent 2fae193d8b
commit 2bd57e2704
2 changed files with 32 additions and 20 deletions

View file

@ -155,10 +155,10 @@ export default class App {
}
this.worker.onmessage = ({ data }: MessageEvent) => {
if (data === 'failed') {
this.stop()
this.stop(false)
this._debug('worker_failed', {}) // add context (from worker)
} else if (data === 'restart') {
this.stop()
this.stop(false)
this.start({ forceNew: true })
}
}

View file

@ -18,11 +18,25 @@ let sender: QueueSender | null = null
let writer: BatchWriter | null = null
let workerStatus: WorkerStatus = WorkerStatus.NotActive
function send(): void {
function finalize(): void {
if (!writer) {
return
}
writer.finaliseBatch()
writer.finaliseBatch() // TODO: force sendAll?
}
function resetWriter(): void {
if (writer) {
writer.clean()
writer = null
}
}
function resetSender(): void {
if (sender) {
sender.clean()
sender = null
}
}
function reset(): void {
@ -31,18 +45,17 @@ function reset(): void {
clearInterval(sendIntervalID)
sendIntervalID = null
}
if (writer) {
writer.clean()
writer = null
}
resetWriter()
resetSender()
workerStatus = WorkerStatus.NotActive
}
function resetCleanQueue(): void {
if (sender) {
sender.clean()
sender = null
}
function initiateRestart(): void {
self.postMessage('restart')
reset()
}
function initiateFailure(): void {
self.postMessage('failed')
reset()
}
@ -51,11 +64,11 @@ let restartTimeoutID: ReturnType<typeof setTimeout>
self.onmessage = ({ data }: MessageEvent<WorkerMessageData>): any => {
if (data == null) {
send() // TODO: sendAll?
finalize()
return
}
if (data === 'stop') {
send()
finalize()
reset()
return
}
@ -70,7 +83,7 @@ self.onmessage = ({ data }: MessageEvent<WorkerMessageData>): any => {
if (message[0] === MType.SetPageVisibility) {
if (message[1]) {
// .hidden
restartTimeoutID = setTimeout(() => self.postMessage('restart'), 30 * 60 * 1000)
restartTimeoutID = setTimeout(() => initiateRestart(), 30 * 60 * 1000)
} else {
clearTimeout(restartTimeoutID)
}
@ -86,12 +99,11 @@ self.onmessage = ({ data }: MessageEvent<WorkerMessageData>): any => {
data.ingestPoint,
() => {
// onUnauthorised
self.postMessage('restart')
initiateRestart()
},
() => {
// onFailure
resetCleanQueue()
self.postMessage('failed')
initiateFailure()
},
data.connAttemptCount,
data.connAttemptGap,
@ -104,7 +116,7 @@ self.onmessage = ({ data }: MessageEvent<WorkerMessageData>): any => {
(batch) => sender && sender.push(batch),
)
if (sendIntervalID === null) {
sendIntervalID = setInterval(send, AUTO_SEND_INTERVAL)
sendIntervalID = setInterval(finalize, AUTO_SEND_INTERVAL)
}
return (workerStatus = WorkerStatus.Active)
}