change(tracker): compress inside tracker, then post batch to sender

This commit is contained in:
nick-delirium 2023-04-06 16:04:37 +02:00 committed by Delirium
parent ab279697a4
commit 312600ee56
4 changed files with 35 additions and 13 deletions

View file

@ -19,11 +19,21 @@ type Auth = {
beaconSizeLimit?: number
}
export type ToWorkerData = null | 'stop' | Start | Auth | Array<Message>
export type ToWorkerData =
| null
| 'stop'
| Start
| Auth
| Array<Message>
| { type: 'compressed'; batch: Uint8Array }
type Failure = {
type: 'failure'
reason: string
}
export type FromWorkerData = 'restart' | Failure | 'not_init'
export type FromWorkerData =
| 'restart'
| Failure
| 'not_init'
| { type: 'compress'; batch: Uint8Array }

View file

@ -7,7 +7,7 @@ import Sanitizer from './sanitizer.js'
import Ticker from './ticker.js'
import Logger, { LogLevel } from './logger.js'
import Session from './session.js'
import { gzip, strToU8 } from 'fflate'
import { gzip } from 'fflate'
import { deviceMemory, jsHeapSizeLimit } from '../modules/performance.js'
import type { Options as ObserverOptions } from './observer/top_observer.js'
@ -22,6 +22,9 @@ import type {
FromWorkerData,
} from '../../common/interaction.js'
// @ts-ignore
window.gzip = gzip
interface TypedWorker extends Omit<Worker, 'postMessage'> {
postMessage(data: ToWorkerData): void
}
@ -175,6 +178,11 @@ export default class App {
} else if (data.type === 'failure') {
this.stop(false)
this._debug('worker_failed', data.reason)
} else if (data.type === 'compress') {
gzip(data.batch, (err, result) => {
if (err) console.error(err)
this.worker?.postMessage({ type: 'compressed', batch: result })
})
}
}
const alertWorker = () => {
@ -208,13 +216,6 @@ export default class App {
private _usingOldFetchPlugin = false
send(message: Message, urgent = false): void {
// @ts-ignore
const compressedMessage = message.reduce((acc, curr, index) => {
if (index === 0) return curr
// @ts-ignore
return gzip(strToU8(curr.toString()), (err, data) => data)
}, [])
console.log(message, compressedMessage)
if (this.activityState === ActivityState.NotActive) {
return
}

View file

@ -8,12 +8,14 @@ export default class QueueSender {
private readonly queue: Array<Uint8Array> = []
private readonly ingestURL
private token: string | null = null
constructor(
ingestBaseURL: string,
private readonly onUnauthorised: () => any,
private readonly onFailure: (reason: string) => any,
private readonly MAX_ATTEMPTS_COUNT = 10,
private readonly ATTEMPT_TIMEOUT = 1000,
private readonly onCompress: (batch: Uint8Array) => any,
) {
this.ingestURL = ingestBaseURL + INGEST_PATH
}
@ -30,14 +32,14 @@ export default class QueueSender {
if (this.busy || !this.token) {
this.queue.push(batch)
} else {
this.sendBatch(batch)
this.onCompress(batch)
}
}
private sendNext() {
const nextBatch = this.queue.shift()
if (nextBatch) {
this.sendBatch(nextBatch)
this.onCompress(nextBatch)
} else {
this.busy = false
}
@ -50,7 +52,7 @@ export default class QueueSender {
return
}
this.attemptsCount++
setTimeout(() => this.sendBatch(batch), this.ATTEMPT_TIMEOUT * this.attemptsCount)
setTimeout(() => this.onCompress(batch), this.ATTEMPT_TIMEOUT * this.attemptsCount)
}
// would be nice to use Beacon API, but it is not available in WebWorker
@ -64,6 +66,7 @@ export default class QueueSender {
headers: {
Authorization: `Bearer ${this.token as string}`,
//"Content-Type": "",
'Content-Encoding': 'gzip',
},
keepalive: batch.length < KEEPALIVE_SIZE_LIMIT,
})
@ -88,6 +91,10 @@ export default class QueueSender {
})
}
sendCompressed(batch: Uint8Array) {
this.sendBatch(batch)
}
clean() {
this.queue.length = 0
this.token = null

View file

@ -117,6 +117,7 @@ self.onmessage = ({ data }: any): any => {
},
data.connAttemptCount,
data.connAttemptGap,
(batch) => postMessage({ type: 'compress', batch }),
)
writer = new BatchWriter(
data.pageNo,
@ -130,6 +131,9 @@ self.onmessage = ({ data }: any): any => {
}
return (workerStatus = WorkerStatus.Active)
}
if (data.type === 'compressed') {
sender?.sendCompressed(data.batch)
}
if (data.type === 'auth') {
if (!sender) {