Msg buffering offline rec (#1823)

* feat(tracker) start message buffering support

* feat(tracker): buffered recordings

* feat(tracker): buffered recordings timedelay adjust

* fix(tracker): condition manager

* fix(tracker): conditions handlers

* fix(tracker): conditions

* fix(tracker): pre-fetch feature flags and conditions, fix naming and dnt check repeating

* fix(tracker): fix conditions fetch

* feat(tracker): test coverage for conditionsManager

* feat(tracker): some api connections

* feat(tracker): fix projid in session info

* feat(tracker): added fetch req status condition, partially added offline recording, type fixes

* fix(tracker): fix tests

* fix(tracker): fix network req c

* fix(tracker): fix conditions test

* feat(ui): conditional recording ui

* fix(tracker): fix prestart callbacks

* feat(ui): conditions ui and api stuff

* feat(ui): fix ?

* fix(tracker): map raw db response in tracker

* fix(tracker): fix condition processing, add cond name to trigger event, change unit tests

* fix(tracker): simplify mapping, rename functions

* fix(tracker): change toggler design, change network request condition

* fix(tracker): some formatting

* fix(tracker): reformat logging

* feat(tracker): buffered recordings timedelay adjust

* feat(tracker): fix projid in session info

* feat(tracker): some api connections

* feat(tracker): offline recording methods

* feat(tracker): api to send offline buffer

* fix(tracker) fix backend delay, fix network proxy check

* fix(tracker): fix test

* fix(tracker): 12.0.0 changelogs
This commit is contained in:
Delirium 2024-01-09 13:32:23 +01:00 committed by GitHub
parent 5e21d88e8c
commit c2ce67b893
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 196 additions and 36 deletions

View file

@ -1,3 +1,9 @@
# 12.0.0
- offline session recording and manual sending
- conditional recording with 30s buffer
- websockets tracking hook
# 11.0.3
- move all logs under internal debugger

View file

@ -1,7 +1,7 @@
{
"name": "@openreplay/tracker",
"description": "The OpenReplay tracker main package",
"version": "11.0.2-39",
"version": "12.0.0",
"keywords": [
"logging",
"replay"

View file

@ -29,14 +29,20 @@ export type ToWorkerData =
| { type: 'compressed'; batch: Uint8Array }
| { type: 'uncompressed'; batch: Uint8Array }
| 'forceFlushBatch'
| 'check_queue'
type Failure = {
type: 'failure'
reason: string
}
type QEmpty = {
type: 'queue_empty'
}
export type FromWorkerData =
| 'restart'
| Failure
| 'not_init'
| { type: 'compress'; batch: Uint8Array }
| QEmpty

View file

@ -152,7 +152,7 @@ export default class App {
readonly sessionStorage: Storage
private readonly messages: Array<Message> = []
/**
* we need 2 buffers so we don't lose anything
* we need 2 buffers, so we don't lose anything
* @read coldStart implementation
* */
private bufferedMessages1: Array<Message> = []
@ -279,6 +279,8 @@ export default class App {
} else {
this.worker?.postMessage({ type: 'uncompressed', batch: batch })
}
} else if (data.type === 'queue_empty') {
this.onSessionSent()
}
}
const alertWorker = () => {
@ -388,7 +390,7 @@ export default class App {
// Clarify where urgent can be used for;
// Clarify workflow for each type of message in case it was sent before start
// (like Fetch before start; maybe add an option "preCapture: boolean" or sth alike)
// Careful: `this.delay` is equal to zero before start hense all Timestamp-s will have to be updated on start
// Careful: `this.delay` is equal to zero before start so all Timestamp-s will have to be updated on start
if (this.activityState === ActivityState.Active && urgent) {
this.commit()
}
@ -712,6 +714,7 @@ export default class App {
metadata: startOpts.metadata,
})
if (!isNewSession) {
this.debug.log('continuing session on new tab', this.session.getTabId())
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
this.send(TabChange(this.session.getTabId()))
}
@ -724,16 +727,25 @@ export default class App {
cycle()
}
public offlineRecording(startOpts: StartOptions = {}) {
onSessionSent = () => {
return
}
/**
* Starts offline session recording
* @param {Object} startOpts - options for session start, same as .start()
* @param {Function} onSessionSent - callback that will be called once session is fully sent
* */
public offlineRecording(startOpts: StartOptions = {}, onSessionSent: () => void) {
this.onSessionSent = onSessionSent
this.singleBuffer = true
const isNewSession = this.checkSessionToken(startOpts.forceNew)
adjustTimeOrigin()
this.coldStartTs = now()
this.bufferedMessages1.length = 0
const saverBuffer = this.localStorage.getItem(bufferStorageKey)
if (saverBuffer) {
const data = JSON.parse(saverBuffer)
this.bufferedMessages1 = Array.isArray(data) ? data : []
this.bufferedMessages1 = Array.isArray(data) ? data : this.bufferedMessages1
this.localStorage.removeItem(bufferStorageKey)
}
this.bufferedMessages1.push(Timestamp(this.timestamp()))
@ -751,12 +763,20 @@ export default class App {
userID: startOpts.userID,
metadata: startOpts.metadata,
})
const onStartInfo = { sessionToken: '', userUUID: '', sessionID: '' }
this.startCallbacks.forEach((cb) => cb(onStartInfo))
if (!isNewSession) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
this.send(TabChange(this.session.getTabId()))
}
this.observer.observe()
this.ticker.start()
return {
saveBuffer: this.saveBuffer,
getBuffer: this.getBuffer,
setBuffer: this.setBuffer,
}
}
/**
@ -771,11 +791,85 @@ export default class App {
}
/**
* Uploads the stored buffer to create session
* @returns buffer with stored messages for offline recording
* */
public uploadOfflineRecording() {
public getBuffer() {
return this.bufferedMessages1
}
/**
* Used to set a buffer with messages array
* */
public setBuffer(buffer: Message[]) {
this.bufferedMessages1 = buffer
}
/**
* Uploads the stored session buffer to backend
* @returns promise that resolves once messages are loaded, it has to be awaited
* so the session can be uploaded properly
* @resolve {boolean} - if messages were loaded successfully
* @reject {string} - error message
* */
public async uploadOfflineRecording() {
this.stop(false)
// then fetch it
const timestamp = now()
this.worker?.postMessage({
type: 'start',
pageNo: this.session.incPageNo(),
ingestPoint: this.options.ingestPoint,
timestamp: this.coldStartTs,
url: document.URL,
connAttemptCount: this.options.connAttemptCount,
connAttemptGap: this.options.connAttemptGap,
tabId: this.session.getTabId(),
})
const r = await fetch(this.options.ingestPoint + '/v1/web/start', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
...this.getTrackerInfo(),
timestamp: timestamp,
doNotRecord: false,
bufferDiff: timestamp - this.coldStartTs,
userID: this.session.getInfo().userID,
token: undefined,
deviceMemory,
jsHeapSizeLimit,
timezone: getTimezone(),
}),
})
const {
token,
userBrowser,
userCity,
userCountry,
userDevice,
userOS,
userState,
beaconSizeLimit,
projectID,
} = await r.json()
this.worker?.postMessage({
type: 'auth',
token,
beaconSizeLimit,
})
this.session.assign({ projectID })
this.session.setUserInfo({
userBrowser,
userCity,
userCountry,
userDevice,
userOS,
userState,
})
while (this.bufferedMessages1.length > 0) {
await this.flushBuffer(this.bufferedMessages1)
}
this.postToWorker([['q_end']] as unknown as Message[])
this.clearBuffers()
}
@ -825,7 +919,7 @@ export default class App {
type: 'start',
pageNo: this.session.incPageNo(),
ingestPoint: this.options.ingestPoint,
timestamp,
timestamp: isColdStart ? this.coldStartTs : timestamp,
url: document.URL,
connAttemptCount: this.options.connAttemptCount,
connAttemptGap: this.options.connAttemptGap,
@ -874,7 +968,7 @@ export default class App {
)
}
})
.then((r) => {
.then(async (r) => {
if (!this.worker) {
const reason = 'no worker found after start request (this might not happen)'
this.signalError(reason, [])
@ -951,32 +1045,19 @@ export default class App {
this.compressionThreshold = compressionThreshold
const onStartInfo = { sessionToken: token, userUUID, sessionID }
const flushBuffer = (buffer: Message[]) => {
let ended = false
const messagesBatch: Message[] = [buffer.shift() as unknown as Message]
while (!ended) {
const nextMsg = buffer[0]
if (!nextMsg || nextMsg[0] === MType.Timestamp) {
ended = true
} else {
messagesBatch.push(buffer.shift() as unknown as Message)
}
}
this.postToWorker(messagesBatch)
}
// TODO: start as early as possible (before receiving the token)
this.startCallbacks.forEach((cb) => cb(onStartInfo)) // MBTODO: callbacks after DOM "mounted" (observed)
void this.featureFlags.reloadFlags()
/** --------------- COLD START BUFFER ------------------*/
this.activityState = ActivityState.Active
/** --------------- COLD START BUFFER ------------------*/
if (isColdStart) {
const biggestBurger =
const biggestBuffer =
this.bufferedMessages1.length > this.bufferedMessages2.length
? this.bufferedMessages1
: this.bufferedMessages2
while (biggestBurger.length > 0) {
flushBuffer(biggestBurger)
while (biggestBuffer.length > 0) {
await this.flushBuffer(biggestBuffer)
}
this.clearBuffers()
this.commit()
@ -1045,6 +1126,23 @@ export default class App {
})
}
flushBuffer = async (buffer: Message[]) => {
return new Promise((res) => {
let ended = false
const messagesBatch: Message[] = [buffer.shift() as unknown as Message]
while (!ended) {
const nextMsg = buffer[0]
if (!nextMsg || nextMsg[0] === MType.Timestamp) {
ended = true
} else {
messagesBatch.push(buffer.shift() as unknown as Message)
}
}
this.postToWorker(messagesBatch)
res(null)
})
}
onUxtCb = []
addOnUxtCb(cb: (id: number) => void) {
@ -1090,7 +1188,12 @@ export default class App {
return this.session.getTabId()
}
/**
clearBuffers() {
this.bufferedMessages1.length = 0
this.bufferedMessages2.length = 0
}
/**
* Creates a named hook that expects event name, data string and msg direction (up/down),
* it will skip any message bigger than 5 mb or event name bigger than 255 symbols
* @returns {(msgType: string, data: string, dir: 'up' | 'down') => void}
@ -1109,11 +1212,6 @@ export default class App {
this.send(WSChannel('websocket', channel, data, this.timestamp(), dir, msgType))
}
}
clearBuffers() {
this.bufferedMessages1.length = 0
this.bufferedMessages2.length = 0
}
stop(stopWorker = true): void {
if (this.activityState !== ActivityState.NotActive) {

View file

@ -315,6 +315,45 @@ export default class API {
}
}
/**
* Starts offline session recording. Keep in mind that only user device time will be used for timestamps.
* (no backend delay sync)
*
* @param {Object} startOpts - options for session start, same as .start()
* @param {Function} onSessionSent - callback that will be called once session is fully sent
* @returns methods to manipulate buffer:
*
* saveBuffer - to save it in localStorage
*
* getBuffer - returns current buffer
*
* setBuffer - replaces current buffer with given
* */
startOfflineRecording(startOpts: Partial<StartOptions>, onSessionSent: () => void) {
if (this.browserEnvCheck()) {
if (this.app === null) {
return Promise.reject('Tracker not initialized')
}
return this.app.offlineRecording(startOpts, onSessionSent)
} else {
return Promise.reject('Trying to start not in browser.')
}
}
/**
* Uploads the stored session buffer to backend
* @returns promise that resolves once messages are loaded, it has to be awaited
* so the session can be uploaded properly
* @resolve {boolean} - if messages were loaded successfully
* @reject {string} - error message
* */
uploadOfflineRecording() {
if (this.app === null) {
return
}
return this.app.uploadOfflineRecording()
}
stop(): string | undefined {
if (this.app === null) {
return

View file

@ -18,6 +18,7 @@ export default class BatchWriter {
private url: string,
private readonly onBatch: (batch: Uint8Array) => void,
private tabId: string,
private readonly onOfflineEnd: () => void,
) {
this.prepare()
}
@ -90,6 +91,11 @@ export default class BatchWriter {
}
writeMessage(message: Message) {
// @ts-ignore
if (message[0] === 'q_end') {
this.finaliseBatch()
return this.onOfflineEnd()
}
if (message[0] === Messages.Type.Timestamp) {
this.timestamp = message[1] // .timestamp
}

View file

@ -9,7 +9,7 @@ describe('BatchWriter', () => {
beforeEach(() => {
onBatchMock = jest.fn()
batchWriter = new BatchWriter(1, 123456789, 'example.com', onBatchMock, '123')
batchWriter = new BatchWriter(1, 123456789, 'example.com', onBatchMock, '123', () => null)
})
afterEach(() => {

View file

@ -27,6 +27,10 @@ export default class QueueSender {
}
}
public getQueueStatus() {
return this.queue.length === 0 && !this.busy
}
authorise(token: string): void {
this.token = token
if (!this.busy) {

View file

@ -156,6 +156,7 @@ self.onmessage = ({ data }: { data: ToWorkerData }): any => {
sender && sender.push(batch)
},
data.tabId,
() => postMessage({ type: 'queue_empty' }),
)
if (sendIntervalID === null) {
sendIntervalID = setInterval(finalize, AUTO_SEND_INTERVAL)