diff --git a/tracker/tracker/CHANGELOG.md b/tracker/tracker/CHANGELOG.md index 942005ce5..28a3c3149 100644 --- a/tracker/tracker/CHANGELOG.md +++ b/tracker/tracker/CHANGELOG.md @@ -1,3 +1,9 @@ +# 12.0.0 + +- offline session recording and manual sending +- conditional recording with 30s buffer +- websockets tracking hook + # 11.0.3 - move all logs under internal debugger diff --git a/tracker/tracker/package.json b/tracker/tracker/package.json index 01205688e..b99cc9dfb 100644 --- a/tracker/tracker/package.json +++ b/tracker/tracker/package.json @@ -1,7 +1,7 @@ { "name": "@openreplay/tracker", "description": "The OpenReplay tracker main package", - "version": "11.0.2-39", + "version": "12.0.0", "keywords": [ "logging", "replay" diff --git a/tracker/tracker/src/common/interaction.ts b/tracker/tracker/src/common/interaction.ts index 22d70bca3..a36c46b43 100644 --- a/tracker/tracker/src/common/interaction.ts +++ b/tracker/tracker/src/common/interaction.ts @@ -29,14 +29,20 @@ export type ToWorkerData = | { type: 'compressed'; batch: Uint8Array } | { type: 'uncompressed'; batch: Uint8Array } | 'forceFlushBatch' + | 'check_queue' type Failure = { type: 'failure' reason: string } +type QEmpty = { + type: 'queue_empty' +} + export type FromWorkerData = | 'restart' | Failure | 'not_init' | { type: 'compress'; batch: Uint8Array } + | QEmpty diff --git a/tracker/tracker/src/main/app/index.ts b/tracker/tracker/src/main/app/index.ts index 11d9249d1..fcf399837 100644 --- a/tracker/tracker/src/main/app/index.ts +++ b/tracker/tracker/src/main/app/index.ts @@ -152,7 +152,7 @@ export default class App { readonly sessionStorage: Storage private readonly messages: Array = [] /** - * we need 2 buffers so we don't lose anything + * we need 2 buffers, so we don't lose anything * @read coldStart implementation * */ private bufferedMessages1: Array = [] @@ -279,6 +279,8 @@ export default class App { } else { this.worker?.postMessage({ type: 'uncompressed', batch: batch }) } + } else if (data.type === 'queue_empty') { + this.onSessionSent() } } const alertWorker = () => { @@ -388,7 +390,7 @@ export default class App { // Clarify where urgent can be used for; // Clarify workflow for each type of message in case it was sent before start // (like Fetch before start; maybe add an option "preCapture: boolean" or sth alike) - // Careful: `this.delay` is equal to zero before start hense all Timestamp-s will have to be updated on start + // Careful: `this.delay` is equal to zero before start so all Timestamp-s will have to be updated on start if (this.activityState === ActivityState.Active && urgent) { this.commit() } @@ -712,6 +714,7 @@ export default class App { metadata: startOpts.metadata, }) if (!isNewSession) { + this.debug.log('continuing session on new tab', this.session.getTabId()) // eslint-disable-next-line @typescript-eslint/no-unsafe-argument this.send(TabChange(this.session.getTabId())) } @@ -724,16 +727,25 @@ export default class App { cycle() } - public offlineRecording(startOpts: StartOptions = {}) { + onSessionSent = () => { + return + } + + /** + * Starts offline session recording + * @param {Object} startOpts - options for session start, same as .start() + * @param {Function} onSessionSent - callback that will be called once session is fully sent + * */ + public offlineRecording(startOpts: StartOptions = {}, onSessionSent: () => void) { + this.onSessionSent = onSessionSent this.singleBuffer = true const isNewSession = this.checkSessionToken(startOpts.forceNew) adjustTimeOrigin() this.coldStartTs = now() - this.bufferedMessages1.length = 0 const saverBuffer = this.localStorage.getItem(bufferStorageKey) if (saverBuffer) { const data = JSON.parse(saverBuffer) - this.bufferedMessages1 = Array.isArray(data) ? data : [] + this.bufferedMessages1 = Array.isArray(data) ? data : this.bufferedMessages1 this.localStorage.removeItem(bufferStorageKey) } this.bufferedMessages1.push(Timestamp(this.timestamp())) @@ -751,12 +763,20 @@ export default class App { userID: startOpts.userID, metadata: startOpts.metadata, }) + const onStartInfo = { sessionToken: '', userUUID: '', sessionID: '' } + this.startCallbacks.forEach((cb) => cb(onStartInfo)) if (!isNewSession) { // eslint-disable-next-line @typescript-eslint/no-unsafe-argument this.send(TabChange(this.session.getTabId())) } this.observer.observe() this.ticker.start() + + return { + saveBuffer: this.saveBuffer, + getBuffer: this.getBuffer, + setBuffer: this.setBuffer, + } } /** @@ -771,11 +791,85 @@ export default class App { } /** - * Uploads the stored buffer to create session + * @returns buffer with stored messages for offline recording * */ - public uploadOfflineRecording() { + public getBuffer() { + return this.bufferedMessages1 + } + + /** + * Used to set a buffer with messages array + * */ + public setBuffer(buffer: Message[]) { + this.bufferedMessages1 = buffer + } + + /** + * Uploads the stored session buffer to backend + * @returns promise that resolves once messages are loaded, it has to be awaited + * so the session can be uploaded properly + * @resolve {boolean} - if messages were loaded successfully + * @reject {string} - error message + * */ + public async uploadOfflineRecording() { this.stop(false) - // then fetch it + const timestamp = now() + this.worker?.postMessage({ + type: 'start', + pageNo: this.session.incPageNo(), + ingestPoint: this.options.ingestPoint, + timestamp: this.coldStartTs, + url: document.URL, + connAttemptCount: this.options.connAttemptCount, + connAttemptGap: this.options.connAttemptGap, + tabId: this.session.getTabId(), + }) + const r = await fetch(this.options.ingestPoint + '/v1/web/start', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + ...this.getTrackerInfo(), + timestamp: timestamp, + doNotRecord: false, + bufferDiff: timestamp - this.coldStartTs, + userID: this.session.getInfo().userID, + token: undefined, + deviceMemory, + jsHeapSizeLimit, + timezone: getTimezone(), + }), + }) + const { + token, + userBrowser, + userCity, + userCountry, + userDevice, + userOS, + userState, + beaconSizeLimit, + projectID, + } = await r.json() + this.worker?.postMessage({ + type: 'auth', + token, + beaconSizeLimit, + }) + this.session.assign({ projectID }) + this.session.setUserInfo({ + userBrowser, + userCity, + userCountry, + userDevice, + userOS, + userState, + }) + while (this.bufferedMessages1.length > 0) { + await this.flushBuffer(this.bufferedMessages1) + } + this.postToWorker([['q_end']] as unknown as Message[]) this.clearBuffers() } @@ -825,7 +919,7 @@ export default class App { type: 'start', pageNo: this.session.incPageNo(), ingestPoint: this.options.ingestPoint, - timestamp, + timestamp: isColdStart ? this.coldStartTs : timestamp, url: document.URL, connAttemptCount: this.options.connAttemptCount, connAttemptGap: this.options.connAttemptGap, @@ -874,7 +968,7 @@ export default class App { ) } }) - .then((r) => { + .then(async (r) => { if (!this.worker) { const reason = 'no worker found after start request (this might not happen)' this.signalError(reason, []) @@ -951,32 +1045,19 @@ export default class App { this.compressionThreshold = compressionThreshold const onStartInfo = { sessionToken: token, userUUID, sessionID } - - const flushBuffer = (buffer: Message[]) => { - let ended = false - const messagesBatch: Message[] = [buffer.shift() as unknown as Message] - while (!ended) { - const nextMsg = buffer[0] - if (!nextMsg || nextMsg[0] === MType.Timestamp) { - ended = true - } else { - messagesBatch.push(buffer.shift() as unknown as Message) - } - } - this.postToWorker(messagesBatch) - } // TODO: start as early as possible (before receiving the token) this.startCallbacks.forEach((cb) => cb(onStartInfo)) // MBTODO: callbacks after DOM "mounted" (observed) void this.featureFlags.reloadFlags() - /** --------------- COLD START BUFFER ------------------*/ this.activityState = ActivityState.Active + + /** --------------- COLD START BUFFER ------------------*/ if (isColdStart) { - const biggestBurger = + const biggestBuffer = this.bufferedMessages1.length > this.bufferedMessages2.length ? this.bufferedMessages1 : this.bufferedMessages2 - while (biggestBurger.length > 0) { - flushBuffer(biggestBurger) + while (biggestBuffer.length > 0) { + await this.flushBuffer(biggestBuffer) } this.clearBuffers() this.commit() @@ -1045,6 +1126,23 @@ export default class App { }) } + flushBuffer = async (buffer: Message[]) => { + return new Promise((res) => { + let ended = false + const messagesBatch: Message[] = [buffer.shift() as unknown as Message] + while (!ended) { + const nextMsg = buffer[0] + if (!nextMsg || nextMsg[0] === MType.Timestamp) { + ended = true + } else { + messagesBatch.push(buffer.shift() as unknown as Message) + } + } + this.postToWorker(messagesBatch) + res(null) + }) + } + onUxtCb = [] addOnUxtCb(cb: (id: number) => void) { @@ -1090,7 +1188,12 @@ export default class App { return this.session.getTabId() } - /** + clearBuffers() { + this.bufferedMessages1.length = 0 + this.bufferedMessages2.length = 0 + } + + /** * Creates a named hook that expects event name, data string and msg direction (up/down), * it will skip any message bigger than 5 mb or event name bigger than 255 symbols * @returns {(msgType: string, data: string, dir: 'up' | 'down') => void} @@ -1109,11 +1212,6 @@ export default class App { this.send(WSChannel('websocket', channel, data, this.timestamp(), dir, msgType)) } } - - clearBuffers() { - this.bufferedMessages1.length = 0 - this.bufferedMessages2.length = 0 - } stop(stopWorker = true): void { if (this.activityState !== ActivityState.NotActive) { diff --git a/tracker/tracker/src/main/index.ts b/tracker/tracker/src/main/index.ts index 08871dcc1..4263525e3 100644 --- a/tracker/tracker/src/main/index.ts +++ b/tracker/tracker/src/main/index.ts @@ -315,6 +315,45 @@ export default class API { } } + /** + * Starts offline session recording. Keep in mind that only user device time will be used for timestamps. + * (no backend delay sync) + * + * @param {Object} startOpts - options for session start, same as .start() + * @param {Function} onSessionSent - callback that will be called once session is fully sent + * @returns methods to manipulate buffer: + * + * saveBuffer - to save it in localStorage + * + * getBuffer - returns current buffer + * + * setBuffer - replaces current buffer with given + * */ + startOfflineRecording(startOpts: Partial, onSessionSent: () => void) { + if (this.browserEnvCheck()) { + if (this.app === null) { + return Promise.reject('Tracker not initialized') + } + return this.app.offlineRecording(startOpts, onSessionSent) + } else { + return Promise.reject('Trying to start not in browser.') + } + } + + /** + * Uploads the stored session buffer to backend + * @returns promise that resolves once messages are loaded, it has to be awaited + * so the session can be uploaded properly + * @resolve {boolean} - if messages were loaded successfully + * @reject {string} - error message + * */ + uploadOfflineRecording() { + if (this.app === null) { + return + } + return this.app.uploadOfflineRecording() + } + stop(): string | undefined { if (this.app === null) { return diff --git a/tracker/tracker/src/webworker/BatchWriter.ts b/tracker/tracker/src/webworker/BatchWriter.ts index cf704f505..77fb352ba 100644 --- a/tracker/tracker/src/webworker/BatchWriter.ts +++ b/tracker/tracker/src/webworker/BatchWriter.ts @@ -18,6 +18,7 @@ export default class BatchWriter { private url: string, private readonly onBatch: (batch: Uint8Array) => void, private tabId: string, + private readonly onOfflineEnd: () => void, ) { this.prepare() } @@ -90,6 +91,11 @@ export default class BatchWriter { } writeMessage(message: Message) { + // @ts-ignore + if (message[0] === 'q_end') { + this.finaliseBatch() + return this.onOfflineEnd() + } if (message[0] === Messages.Type.Timestamp) { this.timestamp = message[1] // .timestamp } diff --git a/tracker/tracker/src/webworker/BatchWriter.unit.test.ts b/tracker/tracker/src/webworker/BatchWriter.unit.test.ts index cd0d8330e..b4c1030ec 100644 --- a/tracker/tracker/src/webworker/BatchWriter.unit.test.ts +++ b/tracker/tracker/src/webworker/BatchWriter.unit.test.ts @@ -9,7 +9,7 @@ describe('BatchWriter', () => { beforeEach(() => { onBatchMock = jest.fn() - batchWriter = new BatchWriter(1, 123456789, 'example.com', onBatchMock, '123') + batchWriter = new BatchWriter(1, 123456789, 'example.com', onBatchMock, '123', () => null) }) afterEach(() => { diff --git a/tracker/tracker/src/webworker/QueueSender.ts b/tracker/tracker/src/webworker/QueueSender.ts index 83b7e8f7b..3a41821ba 100644 --- a/tracker/tracker/src/webworker/QueueSender.ts +++ b/tracker/tracker/src/webworker/QueueSender.ts @@ -27,6 +27,10 @@ export default class QueueSender { } } + public getQueueStatus() { + return this.queue.length === 0 && !this.busy + } + authorise(token: string): void { this.token = token if (!this.busy) { diff --git a/tracker/tracker/src/webworker/index.ts b/tracker/tracker/src/webworker/index.ts index 3409b737d..d3874a825 100644 --- a/tracker/tracker/src/webworker/index.ts +++ b/tracker/tracker/src/webworker/index.ts @@ -156,6 +156,7 @@ self.onmessage = ({ data }: { data: ToWorkerData }): any => { sender && sender.push(batch) }, data.tabId, + () => postMessage({ type: 'queue_empty' }), ) if (sendIntervalID === null) { sendIntervalID = setInterval(finalize, AUTO_SEND_INTERVAL)