From e5985f376eec4a591c36554ba7c78de74cd3241d Mon Sep 17 00:00:00 2001 From: ShiKhu Date: Fri, 15 Apr 2022 20:09:31 +0200 Subject: [PATCH] feat(tracker): 3.5.6: use fetch-keepalive; webworker refactor; isInstance fix --- tracker/tracker/package.json | 2 +- tracker/tracker/src/main/app/context.ts | 6 +- tracker/tracker/src/main/app/index.ts | 34 +-- tracker/tracker/src/main/app/session.ts | 1 - tracker/tracker/src/main/index.ts | 4 +- tracker/tracker/src/messages/webworker.ts | 17 -- tracker/tracker/src/webworker/BatchWriter.ts | 71 ++++++ tracker/tracker/src/webworker/QueueSender.ts | 84 +++++++ tracker/tracker/src/webworker/index.ts | 231 +++++++------------ tracker/tracker/src/webworker/types.ts | 19 ++ 10 files changed, 282 insertions(+), 187 deletions(-) delete mode 100644 tracker/tracker/src/messages/webworker.ts create mode 100644 tracker/tracker/src/webworker/BatchWriter.ts create mode 100644 tracker/tracker/src/webworker/QueueSender.ts create mode 100644 tracker/tracker/src/webworker/types.ts diff --git a/tracker/tracker/package.json b/tracker/tracker/package.json index 9c7920ad7..089f95f24 100644 --- a/tracker/tracker/package.json +++ b/tracker/tracker/package.json @@ -1,7 +1,7 @@ { "name": "@openreplay/tracker", "description": "The OpenReplay tracker main package", - "version": "3.5.5", + "version": "3.5.6", "keywords": [ "logging", "replay" diff --git a/tracker/tracker/src/main/app/context.ts b/tracker/tracker/src/main/app/context.ts index fd7ec11dd..87e4df20a 100644 --- a/tracker/tracker/src/main/app/context.ts +++ b/tracker/tracker/src/main/app/context.ts @@ -32,6 +32,8 @@ type Constructor = { new (...args: any[]): T , name: string }; // TODO: we need a type expert here so we won't have to ignore the lines // TODO: use it everywhere (static function; export from which file? <-- global Window typing required) + // TODO: most efficient and common way + // Problem: on YouTube there is context[constr.name] undefined for constr=ShadowDom due to some minimisations export function isInstance(node: Node, constr: Constructor): node is T { const doc = node.ownerDocument; if (!doc) { // null if Document @@ -43,14 +45,14 @@ export function isInstance(node: Node, constr: Cons doc.defaultView; // TODO: smart global typing for Window object while(context !== window) { // @ts-ignore - if (node instanceof context[constr.name]) { + if (context[constr.name] && node instanceof context[constr.name]) { return true } // @ts-ignore context = context.parent || window } // @ts-ignore - return node instanceof context[constr.name] + return context[constr.name] ? node instanceof context[constr.name] : node instanceof constr } // TODO: ensure 1. it works in every cases (iframes/detached nodes) and 2. the most efficient diff --git a/tracker/tracker/src/main/app/index.ts b/tracker/tracker/src/main/app/index.ts index de9518af6..8fd2cb4ac 100644 --- a/tracker/tracker/src/main/app/index.ts +++ b/tracker/tracker/src/main/app/index.ts @@ -15,7 +15,7 @@ import type { Options as SanitizerOptions } from "./sanitizer.js"; import type { Options as LoggerOptions } from "./logger.js" -import type { Options as WebworkerOptions, WorkerMessageData } from "../../messages/webworker.js"; +import type { Options as WebworkerOptions, WorkerMessageData } from "../../webworker/types.js"; export interface OnStartInfo { sessionID: string, @@ -23,10 +23,12 @@ export interface OnStartInfo { userUUID: string, } + +// TODO: Unify and clearly describe options logic export interface StartOptions { userID?: string, metadata?: Record, - forceNew: boolean, + forceNew?: boolean, } type AppOptions = { @@ -46,12 +48,12 @@ type AppOptions = { // @deprecated onStart?: (info: OnStartInfo) => void; -} & WebworkerOptions; +} & WebworkerOptions; export type Options = AppOptions & ObserverOptions & SanitizerOptions -type Callback = () => void; -type CommitCallback = (messages: Array) => void; +type Callback = () => void +type CommitCallback = (messages: Array) => void enum ActivityState { NotActive, Starting, @@ -130,13 +132,13 @@ export default class App { this._debug("webworker_error", e) } this.worker.onmessage = ({ data }: MessageEvent) => { - if (data === null) { + if (data === "failed") { this.stop(); } else if (data === "restart") { this.stop(); this.start({ forceNew: true }); } - }; + } const alertWorker = () => { if (this.worker) { this.worker.postMessage(null); @@ -331,14 +333,15 @@ export default class App { sessionStorage.setItem(this.options.session_pageno_key, pageNo.toString()); const startInfo = this.getStartInfo() - const messageData: WorkerMessageData = { - ingestPoint: this.options.ingestPoint, + const startWorkerMsg: WorkerMessageData = { + type: "start", pageNo, - startTimestamp: startInfo.timestamp, + ingestPoint: this.options.ingestPoint, + timestamp: startInfo.timestamp, connAttemptCount: this.options.connAttemptCount, connAttemptGap: this.options.connAttemptGap, } - this.worker.postMessage(messageData); // brings delay of 10th ms? + this.worker.postMessage(startWorkerMsg) // brings delay of 10th ms? const sReset = sessionStorage.getItem(this.options.session_reset_key); sessionStorage.removeItem(this.options.session_reset_key); @@ -385,7 +388,12 @@ export default class App { }); this.activityState = ActivityState.Active - this.worker.postMessage({ token, beaconSizeLimit }); + const startWorkerMsg: WorkerMessageData = { + type: "auth", + token, + beaconSizeLimit + } + this.worker.postMessage(startWorkerMsg) this.startCallbacks.forEach((cb) => cb()); this.observer.observe(); this.ticker.start(); @@ -411,7 +419,7 @@ export default class App { }) } - start(options: StartOptions = { forceNew: false }): Promise { + start(options: StartOptions = {}): Promise { if (!document.hidden) { return this._start(options); } else { diff --git a/tracker/tracker/src/main/app/session.ts b/tracker/tracker/src/main/app/session.ts index ec5dadf51..169241aa7 100644 --- a/tracker/tracker/src/main/app/session.ts +++ b/tracker/tracker/src/main/app/session.ts @@ -78,7 +78,6 @@ export default class Session { sessionID: this.sessionID, metadata: this.metadata, userID: this.userID, - } } } \ No newline at end of file diff --git a/tracker/tracker/src/main/index.ts b/tracker/tracker/src/main/index.ts index 8961d6e54..a55df1d1c 100644 --- a/tracker/tracker/src/main/index.ts +++ b/tracker/tracker/src/main/index.ts @@ -151,7 +151,7 @@ export default class API { return this.app.active(); } - start(startOpts?: StartOptions) : Promise { + start(startOpts?: Partial) : Promise { if (!IN_BROWSER) { console.error(`OpenReplay: you are trying to start Tracker on a node.js environment. If you want to use OpenReplay with SSR, please, use componentDidMount or useEffect API for placing the \`tracker.start()\` line. Check documentation on ${DOCS_HOST}${DOCS_SETUP}`) return Promise.reject("Trying to start not in browser."); @@ -159,7 +159,7 @@ export default class API { if (this.app === null) { return Promise.reject("Browser doesn't support required api, or doNotTrack is active."); } - // TODO: check argument typing + // TODO: check argument type return this.app.start(startOpts); } stop(): void { diff --git a/tracker/tracker/src/messages/webworker.ts b/tracker/tracker/src/messages/webworker.ts deleted file mode 100644 index e4bc97818..000000000 --- a/tracker/tracker/src/messages/webworker.ts +++ /dev/null @@ -1,17 +0,0 @@ -// TODO: "common" folder instead of "messages". (better file structure) -export interface Options { - connAttemptCount?: number; - connAttemptGap?: number; - beaconSize?: number; -} - -type Settings = { - ingestPoint?: string; - token?: string; - pageNo?: number; - startTimestamp?: number; - timeAdjustment?: number; - beaconSizeLimit?: number; -} & Partial; - -export type WorkerMessageData = null | "stop" | Settings | Array<{ _id: number }>; \ No newline at end of file diff --git a/tracker/tracker/src/webworker/BatchWriter.ts b/tracker/tracker/src/webworker/BatchWriter.ts new file mode 100644 index 000000000..de0e956ef --- /dev/null +++ b/tracker/tracker/src/webworker/BatchWriter.ts @@ -0,0 +1,71 @@ +import Writer from "../messages/writer.js"; +import Message from "../messages/message.js"; +import { + BatchMeta, + Timestamp, +} from "../messages/index.js"; + +export default class BatchWriter { + private nextIndex = 0 + private beaconSize = 2 * 1e5 // Default 200kB + private writer = new Writer(this.beaconSize) + private isEmpty = true + + constructor( + private readonly pageNo: number, + private timestamp: number, + private onBatch: (batch: Uint8Array) => void + ) { + this.prepareBatchMeta() + } + + private prepareBatchMeta(): boolean { + return new BatchMeta(this.pageNo, this.nextIndex, this.timestamp).encode(this.writer) + } + + private beaconSizeLimit = 1e6 + setBeaconSizeLimit(limit: number) { + this.beaconSizeLimit = limit + } + + writeMessage(message: Message) { + if (message instanceof Timestamp) { + this.timestamp = (message).timestamp; + } + + if (!message.encode(this.writer)) { + if (!this.isEmpty) { + this.onBatch(this.writer.flush()) + this.prepareBatchMeta() + } + + while (!message.encode(this.writer)) { + if (this.beaconSize === this.beaconSizeLimit) { + console.warn("OpenReplay: beacon size overflow. Skipping large message."); + this.writer.reset() + this.prepareBatchMeta() + this.isEmpty = true + return + } + // MBTODO: tempWriter for one message? + this.beaconSize = Math.min(this.beaconSize*2, this.beaconSizeLimit) + this.writer = new Writer(this.beaconSize) + this.prepareBatchMeta() + } + } + this.writer.checkpoint() + this.nextIndex++ + this.isEmpty = false + } + + flush(): Uint8Array | null { + if (this.isEmpty) { return null } + this.isEmpty = true + return this.writer.flush() + } + + clean() { + this.writer.reset() + } + +} \ No newline at end of file diff --git a/tracker/tracker/src/webworker/QueueSender.ts b/tracker/tracker/src/webworker/QueueSender.ts new file mode 100644 index 000000000..b2cb4d3eb --- /dev/null +++ b/tracker/tracker/src/webworker/QueueSender.ts @@ -0,0 +1,84 @@ +const INGEST_PATH = "/v1/web/i" + +export default class QueueSender { + private attemptsCount = 0 + private busy = false + private readonly queue: Array = [] + private readonly ingestURL + private token: string | null = null + constructor( + ingestBaseURL: string, + private readonly onUnauthorised: Function, + private readonly onFailure: Function, + private readonly MAX_ATTEMPTS_COUNT = 10, + private readonly ATTEMPT_TIMEOUT = 1000, + ) { + this.ingestURL = ingestBaseURL + INGEST_PATH + } + + authorise(token: string) { + this.token = token + } + + push(batch: Uint8Array) { + if (this.busy || !this.token) { + this.queue.push(batch) + } else { + this.busy = true + this.sendBatch(batch) + } + } + + private retry(batch: Uint8Array) { + if (this.attemptsCount >= this.MAX_ATTEMPTS_COUNT) { + this.onFailure() + return + } + this.attemptsCount++ + setTimeout(() => this.sendBatch(batch), this.ATTEMPT_TIMEOUT * this.attemptsCount) + } + + // would be nice to use Beacon API, but it is not available in WebWorker + private sendBatch(batch: Uint8Array):void { + fetch(this.ingestURL, { + body: batch, + method: 'POST', + headers: { + "Authorization": "Bearer " + this.token, + //"Content-Type": "", + }, + keepalive: true, + }) + .then(r => { + if (r.status === 401) { // TODO: continuous session ? + this.busy = false + this.onUnauthorised() + return + } else if (r.status >= 400) { + this.retry(batch) + return + } + + // Success + this.attemptsCount = 0 + const nextBatch = this.queue.shift() + if (nextBatch) { + this.sendBatch(nextBatch) + } else { + this.busy = false + } + }) + .catch(e => { + this.retry(batch) + }) // Does it handle offline exceptions (?) + + } + + clean() { + this.queue.length = 0 + } + +} + + + diff --git a/tracker/tracker/src/webworker/index.ts b/tracker/tracker/src/webworker/index.ts index f598ac0a4..519deeb0f 100644 --- a/tracker/tracker/src/webworker/index.ts +++ b/tracker/tracker/src/webworker/index.ts @@ -1,178 +1,107 @@ -import { classes, BatchMeta, Timestamp, SetPageVisibility, CreateDocument } from "../messages/index.js"; import Message from "../messages/message.js"; -import Writer from "../messages/writer.js"; +import { + classes, + SetPageVisibility, +} from "../messages/index.js"; +import QueueSender from "./QueueSender.js"; +import BatchWriter from "./BatchWriter.js"; -import type { WorkerMessageData } from "../messages/webworker.js"; +import type { WorkerMessageData } from "./types.js"; -const SEND_INTERVAL = 10 * 1000; -let BEACON_SIZE_LIMIT = 1e6 // Limit is set in the backend/services/http -let beaconSize = 2 * 1e5; // Default 400kB +const AUTO_SEND_INTERVAL = 10 * 1000 - -let writer: Writer = new Writer(beaconSize); - -let ingestPoint: string = ""; -let token: string = ""; -let pageNo: number = 0; -let timestamp: number = 0; -let timeAdjustment: number = 0; -let nextIndex: number = 0; -// TODO: clear logic: isEmpty here means presence of BatchMeta but absence of other messages -// BatchWriter should be abstracted -let isEmpty: boolean = true; - -function writeBatchMeta(): boolean { // TODO: move to encoder - return new BatchMeta(pageNo, nextIndex, timestamp).encode(writer) -} - -let sendIntervalID: ReturnType | null = null; - -const sendQueue: Array = []; -let busy = false; -let attemptsCount = 0; -let ATTEMPT_TIMEOUT = 3000; -let MAX_ATTEMPTS_COUNT = 10; - -// TODO?: exploit https://developer.mozilla.org/en-US/docs/Web/API/Navigator/sendBeacon -function sendBatch(batch: Uint8Array):void { - const xhr = new XMLHttpRequest(); - // TODO: async=false (3d param) instead of sendQueue array ? - xhr.open("POST", ingestPoint + "/v1/web/i", false); - xhr.setRequestHeader("Authorization", "Bearer " + token); - // xhr.setRequestHeader("Content-Type", ""); - - function retry() { - if (attemptsCount >= MAX_ATTEMPTS_COUNT) { - reset(); - self.postMessage(null); - return - } - attemptsCount++; - setTimeout(() => sendBatch(batch), ATTEMPT_TIMEOUT); - } - xhr.onreadystatechange = function() { - if (this.readyState === 4) { - if (this.status == 0) { - return; // happens simultaneously with onerror TODO: clear codeflow - } - if (this.status === 401) { // Unauthorised (Token expired) - busy = false - self.postMessage("restart") - return - } else if (this.status >= 400) { // TODO: test workflow. After 400+ it calls /start for some reason - retry() - return - } - // Success - attemptsCount = 0 - const nextBatch = sendQueue.shift(); - if (nextBatch) { - sendBatch(nextBatch); - } else { - busy = false; - } - } - }; - xhr.onerror = retry // TODO: when in Offline mode it doesn't handle the error - // TODO: handle offline exception (?) - xhr.send(batch.buffer); -} +let sender: QueueSender | null = null +let writer: BatchWriter | null = null function send(): void { - if (isEmpty || token === "" || ingestPoint === "") { - return; + if (!sender || !writer) { + return } - const batch = writer.flush(); - if (busy) { - sendQueue.push(batch); - } else { - busy = true; - sendBatch(batch); - } - isEmpty = true; - writeBatchMeta(); + const batch = writer.flush() + batch && sender.push(batch) } + function reset() { - ingestPoint = "" - token = "" if (sendIntervalID !== null) { clearInterval(sendIntervalID); sendIntervalID = null; } - sendQueue.length = 0; - writer.reset(); + if (writer) { + writer.clean() + writer = null + } } -let restartTimeoutID: ReturnType; - -function hasTimestamp(msg: any): msg is { timestamp: number } { - return typeof msg === 'object' && typeof msg.timestamp === 'number'; +function resetCleanQueue() { + if (sender) { + sender.clean() + sender = null + } + reset() } +let sendIntervalID: ReturnType | null = null +let restartTimeoutID: ReturnType + self.onmessage = ({ data }: MessageEvent) => { - if (data === null) { - send(); - return; + if (data == null) { + send() // TODO: sendAll? + return } + if (data === "stop") { - send(); - reset(); - return; + send() + reset() + return } - if (!Array.isArray(data)) { - ingestPoint = data.ingestPoint || ingestPoint; - token = data.token || token; - pageNo = data.pageNo || pageNo; - timestamp = data.startTimestamp || timestamp; - timeAdjustment = data.timeAdjustment || timeAdjustment; - MAX_ATTEMPTS_COUNT = data.connAttemptCount || MAX_ATTEMPTS_COUNT; - ATTEMPT_TIMEOUT = data.connAttemptGap || ATTEMPT_TIMEOUT; - BEACON_SIZE_LIMIT = data.beaconSizeLimit || BEACON_SIZE_LIMIT; - beaconSize = Math.min(BEACON_SIZE_LIMIT, data.beaconSize || beaconSize); - if (writer.isEmpty()) { - writeBatchMeta(); - } - if (sendIntervalID === null) { - sendIntervalID = setInterval(send, SEND_INTERVAL); - } - return; - } - data.forEach((data) => { - const message: Message = new (classes.get(data._id))(); - Object.assign(message, data); - if (message instanceof Timestamp) { - timestamp = (message).timestamp; - } else if (message instanceof SetPageVisibility) { - if ( (message).hidden) { - restartTimeoutID = setTimeout(() => self.postMessage("restart"), 30*60*1000); - } else { - clearTimeout(restartTimeoutID); - } - } - - writer.checkpoint(); // TODO: encapsulate in writer - if (!message.encode(writer)) { - send(); - // writer.reset(); // TODO: semantically clear code - if (!message.encode(writer)) { // Try to encode within empty state - // MBTODO: tempWriter for one message? - while (!message.encode(writer)) { - if (beaconSize === BEACON_SIZE_LIMIT) { - console.warn("OpenReplay: beacon size overflow."); - writer.reset(); - writeBatchMeta(); - return - } - beaconSize = Math.min(beaconSize*2, BEACON_SIZE_LIMIT); - writer = new Writer(beaconSize); - writeBatchMeta(); + if (Array.isArray(data)) { + // Message[] + data.forEach((data) => { + const message: Message = new (classes.get(data._id))(); + Object.assign(message, data) + if (message instanceof SetPageVisibility) { + if ( (message).hidden) { + restartTimeoutID = setTimeout(() => self.postMessage("restart"), 30*60*1000) + } else { + clearTimeout(restartTimeoutID) } } - }; - nextIndex++; // TODO: encapsulate in writer - isEmpty = false; - }); + writer && writer.writeMessage(message) + }) + return + } + + if (data.type === 'start') { + sender = new QueueSender( + data.ingestPoint, + () => { // onUnauthorised + self.postMessage("restart") + }, + () => { // onFailure + resetCleanQueue() + self.postMessage("failed") + }, + data.connAttemptCount, + data.connAttemptGap, + ) + writer = new BatchWriter( + data.pageNo, + data.timestamp, + // onBatch + batch => sender && sender.push(batch) + ) + if (sendIntervalID === null) { + sendIntervalID = setInterval(send, AUTO_SEND_INTERVAL) + } + return + } + + if (data.type === "auth") { + sender && sender.authorise(data.token) + data.beaconSizeLimit && writer && writer.setBeaconSizeLimit(data.beaconSizeLimit) + return + } }; diff --git a/tracker/tracker/src/webworker/types.ts b/tracker/tracker/src/webworker/types.ts new file mode 100644 index 000000000..34ac7f582 --- /dev/null +++ b/tracker/tracker/src/webworker/types.ts @@ -0,0 +1,19 @@ +export interface Options { + connAttemptCount?: number + connAttemptGap?: number +} + +type Start = { + type: "start", + ingestPoint: string + pageNo: number + timestamp: number +} & Options + +type Auth = { + type: "auth" + token: string + beaconSizeLimit?: number +} + +export type WorkerMessageData = null | "stop" | Start | Auth | Array<{ _id: number }> \ No newline at end of file