Compare commits

...
Sign in to create a new pull request.

2 commits

Author SHA1 Message Date
nick-delirium
a35d0d06f2
tracker: rm idlecb 2025-03-03 17:38:50 +01:00
nick-delirium
13ae208462
tracker: testing workerless model 2025-03-03 17:38:49 +01:00
12 changed files with 1287 additions and 61 deletions

View file

@ -1,7 +1,7 @@
{ {
"name": "@openreplay/tracker", "name": "@openreplay/tracker",
"description": "The OpenReplay tracker main package", "description": "The OpenReplay tracker main package",
"version": "16.0.1", "version": "16.0.1-2",
"keywords": [ "keywords": [
"logging", "logging",
"replay" "replay"

View file

@ -9,7 +9,7 @@ const require = createRequire(import.meta.url)
const packageConfig = require('./package.json') const packageConfig = require('./package.json')
export default async () => { export default async () => {
const webworkerContent = await buildWebWorker() // const webworkerContent = await buildWebWorker()
const commonPlugins = [ const commonPlugins = [
resolve(), resolve(),
@ -17,7 +17,7 @@ export default async () => {
preventAssignment: true, preventAssignment: true,
values: { values: {
TRACKER_VERSION: packageConfig.version, TRACKER_VERSION: packageConfig.version,
'global.WEBWORKER_BODY': JSON.stringify(webworkerContent), // 'global.WEBWORKER_BODY': JSON.stringify(webworkerContent),
}, },
}), }),
] ]

View file

@ -0,0 +1,139 @@
import type Message from '../common/messages.gen.js'
import * as Messages from '../common/messages.gen.js'
import MessageEncoder from './MessageEncoder.gen.js'
import { requestIdleCb } from '../main/utils.js';
const SIZE_BYTES = 3
const MAX_M_SIZE = (1 << (SIZE_BYTES * 8)) - 1
export default class BatchWriter {
private nextIndex = 0
private beaconSize = 2 * 1e5 // Default 200kB
private encoder = new MessageEncoder(this.beaconSize)
private readonly sizeBuffer = new Uint8Array(SIZE_BYTES)
private isEmpty = true
constructor(
private readonly pageNo: number,
private timestamp: number,
private url: string,
private readonly onBatch: (batch: Uint8Array) => void,
private tabId: string,
private readonly onOfflineEnd: () => void,
) {
this.prepare()
}
private writeType(m: Message): boolean {
return this.encoder.uint(m[0])
}
private writeFields(m: Message): boolean {
return this.encoder.encode(m)
}
private writeSizeAt(size: number, offset: number): void {
//boolean?
for (let i = 0; i < SIZE_BYTES; i++) {
this.sizeBuffer[i] = size >> (i * 8) // BigEndian
}
this.encoder.set(this.sizeBuffer, offset)
}
private prepare(): void {
if (!this.encoder.isEmpty) {
return
}
// MBTODO: move service-messages creation methods to webworker
const batchMetadata: Messages.BatchMetadata = [
Messages.Type.BatchMetadata,
1,
this.pageNo,
this.nextIndex,
this.timestamp,
this.url,
]
const tabData: Messages.TabData = [Messages.Type.TabData, this.tabId]
this.writeType(batchMetadata)
this.writeFields(batchMetadata)
this.writeWithSize(tabData as Message)
this.isEmpty = true
}
private writeWithSize(message: Message): boolean {
const e = this.encoder
if (!this.writeType(message) || !e.skip(SIZE_BYTES)) {
// app.debug.log
return false
}
const startOffset = e.getCurrentOffset()
const wasWritten = this.writeFields(message)
if (wasWritten) {
const endOffset = e.getCurrentOffset()
const size = endOffset - startOffset
if (size > MAX_M_SIZE) {
console.warn('OpenReplay: max message size overflow.')
return false
}
this.writeSizeAt(size, startOffset - SIZE_BYTES)
e.checkpoint()
this.isEmpty = this.isEmpty && message[0] === Messages.Type.Timestamp
this.nextIndex++
}
// app.debug.log
return wasWritten
}
private beaconSizeLimit = 1e6
setBeaconSizeLimit(limit: number) {
this.beaconSizeLimit = limit
}
writeMessage(message: Message) {
// @ts-ignore
if (message[0] === 'q_end') {
this.finaliseBatch()
return this.onOfflineEnd()
}
if (message[0] === Messages.Type.Timestamp) {
this.timestamp = message[1] // .timestamp
}
if (message[0] === Messages.Type.SetPageLocation) {
this.url = message[1] // .url
}
if (this.writeWithSize(message)) {
return
}
// buffer overflow, send already written data first then try again
this.finaliseBatch()
if (this.writeWithSize(message)) {
return
}
// buffer is too small. Creating one with maximal capacity for this message only
this.encoder = new MessageEncoder(this.beaconSizeLimit)
this.prepare()
if (!this.writeWithSize(message)) {
console.warn('OpenReplay: beacon size overflow. Skipping large message.', message, this)
} else {
this.finaliseBatch()
}
// reset encoder to normal size
this.encoder = new MessageEncoder(this.beaconSize)
this.prepare()
}
finaliseBatch() {
if (this.isEmpty) {
return
}
const batch = this.encoder.flush()
this.onBatch(batch)
this.prepare()
}
clean() {
this.encoder.reset()
}
}

View file

@ -0,0 +1,90 @@
import BatchWriter from './BatchWriter'
import * as Messages from '../common/messages.gen.js'
import { describe, expect, test, jest, beforeEach, afterEach } from '@jest/globals'
import Message from '../common/messages.gen.js'
describe('BatchWriter', () => {
let onBatchMock: (b: Uint8Array) => void
let batchWriter: BatchWriter
beforeEach(() => {
onBatchMock = jest.fn()
batchWriter = new BatchWriter(1, 123456789, 'example.com', onBatchMock, '123', () => null)
})
afterEach(() => {
jest.clearAllMocks()
})
test('constructor initializes BatchWriter instance', () => {
expect(batchWriter['pageNo']).toBe(1)
expect(batchWriter['timestamp']).toBe(123456789)
expect(batchWriter['url']).toBe('example.com')
expect(batchWriter['onBatch']).toBe(onBatchMock)
// we add tab id as first in the batch
expect(batchWriter['nextIndex']).toBe(1)
expect(batchWriter['beaconSize']).toBe(200000)
expect(batchWriter['encoder']).toBeDefined()
expect(batchWriter['sizeBuffer']).toHaveLength(3)
expect(batchWriter['isEmpty']).toBe(true)
})
test('writeType writes the type of the message', () => {
// @ts-ignore
const message = [Messages.Type.BatchMetadata, 1, 2, 3, 4, 'example.com']
const result = batchWriter['writeType'](message as Message)
expect(result).toBe(true)
})
test('writeFields encodes the message fields', () => {
// @ts-ignore
const message = [Messages.Type.BatchMetadata, 1, 2, 3, 4, 'example.com']
const result = batchWriter['writeFields'](message as Message)
expect(result).toBe(true)
})
test('writeSizeAt writes the size at the given offset', () => {
batchWriter['writeSizeAt'](100, 0)
expect(batchWriter['sizeBuffer']).toEqual(new Uint8Array([100, 0, 0]))
expect(batchWriter['encoder']['data'].slice(0, 3)).toEqual(new Uint8Array([100, 0, 0]))
})
test('prepare prepares the BatchWriter for writing', () => {
// TODO
})
test('writeWithSize writes the message with its size', () => {
// @ts-ignore
const message = [Messages.Type.BatchMetadata, 1, 2, 3, 4, 'example.com']
const result = batchWriter['writeWithSize'](message as Message)
expect(result).toBe(true)
})
test('setBeaconSizeLimit sets the beacon size limit', () => {
batchWriter['setBeaconSizeLimit'](500000)
expect(batchWriter['beaconSizeLimit']).toBe(500000)
})
test('writeMessage writes the given message', () => {
// @ts-ignore
const message = [Messages.Type.Timestamp, 987654321]
// @ts-ignore
batchWriter['writeWithSize'] = jest.fn().mockReturnValue(true)
batchWriter['writeMessage'](message as Message)
expect(batchWriter['writeWithSize']).toHaveBeenCalledWith(message)
})
test('finaliseBatch flushes the encoder and calls onBatch', () => {
const flushSpy = jest.spyOn(batchWriter['encoder'], 'flush')
batchWriter['isEmpty'] = false
batchWriter['finaliseBatch']()
expect(flushSpy).toHaveBeenCalled()
expect(onBatchMock).toHaveBeenCalled()
})
test('clean resets the encoder', () => {
const cleanSpy = jest.spyOn(batchWriter['encoder'], 'reset')
batchWriter['clean']()
expect(cleanSpy).toHaveBeenCalled()
})
})

View file

@ -0,0 +1,340 @@
// Auto-generated, do not edit
/* eslint-disable */
import * as Messages from '../common/messages.gen.js'
import Message from '../common/messages.gen.js'
import PrimitiveEncoder from './PrimitiveEncoder.js'
export default class MessageEncoder extends PrimitiveEncoder {
encode(msg: Message): boolean {
switch(msg[0]) {
case Messages.Type.Timestamp:
return this.uint(msg[1])
break
case Messages.Type.SetPageLocationDeprecated:
return this.string(msg[1]) && this.string(msg[2]) && this.uint(msg[3])
break
case Messages.Type.SetViewportSize:
return this.uint(msg[1]) && this.uint(msg[2])
break
case Messages.Type.SetViewportScroll:
return this.int(msg[1]) && this.int(msg[2])
break
case Messages.Type.CreateDocument:
return true
break
case Messages.Type.CreateElementNode:
return this.uint(msg[1]) && this.uint(msg[2]) && this.uint(msg[3]) && this.string(msg[4]) && this.boolean(msg[5])
break
case Messages.Type.CreateTextNode:
return this.uint(msg[1]) && this.uint(msg[2]) && this.uint(msg[3])
break
case Messages.Type.MoveNode:
return this.uint(msg[1]) && this.uint(msg[2]) && this.uint(msg[3])
break
case Messages.Type.RemoveNode:
return this.uint(msg[1])
break
case Messages.Type.SetNodeAttribute:
return this.uint(msg[1]) && this.string(msg[2]) && this.string(msg[3])
break
case Messages.Type.RemoveNodeAttribute:
return this.uint(msg[1]) && this.string(msg[2])
break
case Messages.Type.SetNodeData:
return this.uint(msg[1]) && this.string(msg[2])
break
case Messages.Type.SetNodeScroll:
return this.uint(msg[1]) && this.int(msg[2]) && this.int(msg[3])
break
case Messages.Type.SetInputTarget:
return this.uint(msg[1]) && this.string(msg[2])
break
case Messages.Type.SetInputValue:
return this.uint(msg[1]) && this.string(msg[2]) && this.int(msg[3])
break
case Messages.Type.SetInputChecked:
return this.uint(msg[1]) && this.boolean(msg[2])
break
case Messages.Type.MouseMove:
return this.uint(msg[1]) && this.uint(msg[2])
break
case Messages.Type.NetworkRequestDeprecated:
return this.string(msg[1]) && this.string(msg[2]) && this.string(msg[3]) && this.string(msg[4]) && this.string(msg[5]) && this.uint(msg[6]) && this.uint(msg[7]) && this.uint(msg[8])
break
case Messages.Type.ConsoleLog:
return this.string(msg[1]) && this.string(msg[2])
break
case Messages.Type.PageLoadTiming:
return this.uint(msg[1]) && this.uint(msg[2]) && this.uint(msg[3]) && this.uint(msg[4]) && this.uint(msg[5]) && this.uint(msg[6]) && this.uint(msg[7]) && this.uint(msg[8]) && this.uint(msg[9])
break
case Messages.Type.PageRenderTiming:
return this.uint(msg[1]) && this.uint(msg[2]) && this.uint(msg[3])
break
case Messages.Type.CustomEvent:
return this.string(msg[1]) && this.string(msg[2])
break
case Messages.Type.UserID:
return this.string(msg[1])
break
case Messages.Type.UserAnonymousID:
return this.string(msg[1])
break
case Messages.Type.Metadata:
return this.string(msg[1]) && this.string(msg[2])
break
case Messages.Type.StringDictGlobal:
return this.uint(msg[1]) && this.string(msg[2])
break
case Messages.Type.SetNodeAttributeDictGlobal:
return this.uint(msg[1]) && this.uint(msg[2]) && this.uint(msg[3])
break
case Messages.Type.CSSInsertRule:
return this.uint(msg[1]) && this.string(msg[2]) && this.uint(msg[3])
break
case Messages.Type.CSSDeleteRule:
return this.uint(msg[1]) && this.uint(msg[2])
break
case Messages.Type.Fetch:
return this.string(msg[1]) && this.string(msg[2]) && this.string(msg[3]) && this.string(msg[4]) && this.uint(msg[5]) && this.uint(msg[6]) && this.uint(msg[7])
break
case Messages.Type.Profiler:
return this.string(msg[1]) && this.uint(msg[2]) && this.string(msg[3]) && this.string(msg[4])
break
case Messages.Type.OTable:
return this.string(msg[1]) && this.string(msg[2])
break
case Messages.Type.StateAction:
return this.string(msg[1])
break
case Messages.Type.ReduxDeprecated:
return this.string(msg[1]) && this.string(msg[2]) && this.uint(msg[3])
break
case Messages.Type.Vuex:
return this.string(msg[1]) && this.string(msg[2])
break
case Messages.Type.MobX:
return this.string(msg[1]) && this.string(msg[2])
break
case Messages.Type.NgRx:
return this.string(msg[1]) && this.string(msg[2]) && this.uint(msg[3])
break
case Messages.Type.GraphQLDeprecated:
return this.string(msg[1]) && this.string(msg[2]) && this.string(msg[3]) && this.string(msg[4]) && this.int(msg[5])
break
case Messages.Type.PerformanceTrack:
return this.int(msg[1]) && this.int(msg[2]) && this.uint(msg[3]) && this.uint(msg[4])
break
case Messages.Type.StringDictDeprecated:
return this.uint(msg[1]) && this.string(msg[2])
break
case Messages.Type.SetNodeAttributeDictDeprecated:
return this.uint(msg[1]) && this.uint(msg[2]) && this.uint(msg[3])
break
case Messages.Type.StringDict:
return this.string(msg[1]) && this.string(msg[2])
break
case Messages.Type.SetNodeAttributeDict:
return this.uint(msg[1]) && this.string(msg[2]) && this.string(msg[3])
break
case Messages.Type.ResourceTimingDeprecated:
return this.uint(msg[1]) && this.uint(msg[2]) && this.uint(msg[3]) && this.uint(msg[4]) && this.uint(msg[5]) && this.uint(msg[6]) && this.string(msg[7]) && this.string(msg[8])
break
case Messages.Type.ConnectionInformation:
return this.uint(msg[1]) && this.string(msg[2])
break
case Messages.Type.SetPageVisibility:
return this.boolean(msg[1])
break
case Messages.Type.LoadFontFace:
return this.uint(msg[1]) && this.string(msg[2]) && this.string(msg[3]) && this.string(msg[4])
break
case Messages.Type.SetNodeFocus:
return this.int(msg[1])
break
case Messages.Type.LongTask:
return this.uint(msg[1]) && this.uint(msg[2]) && this.uint(msg[3]) && this.uint(msg[4]) && this.string(msg[5]) && this.string(msg[6]) && this.string(msg[7])
break
case Messages.Type.SetNodeAttributeURLBased:
return this.uint(msg[1]) && this.string(msg[2]) && this.string(msg[3]) && this.string(msg[4])
break
case Messages.Type.SetCSSDataURLBased:
return this.uint(msg[1]) && this.string(msg[2]) && this.string(msg[3])
break
case Messages.Type.TechnicalInfo:
return this.string(msg[1]) && this.string(msg[2])
break
case Messages.Type.CustomIssue:
return this.string(msg[1]) && this.string(msg[2])
break
case Messages.Type.CSSInsertRuleURLBased:
return this.uint(msg[1]) && this.string(msg[2]) && this.uint(msg[3]) && this.string(msg[4])
break
case Messages.Type.MouseClick:
return this.uint(msg[1]) && this.uint(msg[2]) && this.string(msg[3]) && this.string(msg[4]) && this.uint(msg[5]) && this.uint(msg[6])
break
case Messages.Type.MouseClickDeprecated:
return this.uint(msg[1]) && this.uint(msg[2]) && this.string(msg[3]) && this.string(msg[4])
break
case Messages.Type.CreateIFrameDocument:
return this.uint(msg[1]) && this.uint(msg[2])
break
case Messages.Type.AdoptedSSReplaceURLBased:
return this.uint(msg[1]) && this.string(msg[2]) && this.string(msg[3])
break
case Messages.Type.AdoptedSSInsertRuleURLBased:
return this.uint(msg[1]) && this.string(msg[2]) && this.uint(msg[3]) && this.string(msg[4])
break
case Messages.Type.AdoptedSSDeleteRule:
return this.uint(msg[1]) && this.uint(msg[2])
break
case Messages.Type.AdoptedSSAddOwner:
return this.uint(msg[1]) && this.uint(msg[2])
break
case Messages.Type.AdoptedSSRemoveOwner:
return this.uint(msg[1]) && this.uint(msg[2])
break
case Messages.Type.JSException:
return this.string(msg[1]) && this.string(msg[2]) && this.string(msg[3]) && this.string(msg[4])
break
case Messages.Type.Zustand:
return this.string(msg[1]) && this.string(msg[2])
break
case Messages.Type.BatchMetadata:
return this.uint(msg[1]) && this.uint(msg[2]) && this.uint(msg[3]) && this.int(msg[4]) && this.string(msg[5])
break
case Messages.Type.PartitionedMessage:
return this.uint(msg[1]) && this.uint(msg[2])
break
case Messages.Type.NetworkRequest:
return this.string(msg[1]) && this.string(msg[2]) && this.string(msg[3]) && this.string(msg[4]) && this.string(msg[5]) && this.uint(msg[6]) && this.uint(msg[7]) && this.uint(msg[8]) && this.uint(msg[9])
break
case Messages.Type.WSChannel:
return this.string(msg[1]) && this.string(msg[2]) && this.string(msg[3]) && this.uint(msg[4]) && this.string(msg[5]) && this.string(msg[6])
break
case Messages.Type.InputChange:
return this.uint(msg[1]) && this.string(msg[2]) && this.boolean(msg[3]) && this.string(msg[4]) && this.int(msg[5]) && this.int(msg[6])
break
case Messages.Type.SelectionChange:
return this.uint(msg[1]) && this.uint(msg[2]) && this.string(msg[3])
break
case Messages.Type.MouseThrashing:
return this.uint(msg[1])
break
case Messages.Type.UnbindNodes:
return this.uint(msg[1])
break
case Messages.Type.ResourceTiming:
return this.uint(msg[1]) && this.uint(msg[2]) && this.uint(msg[3]) && this.uint(msg[4]) && this.uint(msg[5]) && this.uint(msg[6]) && this.string(msg[7]) && this.string(msg[8]) && this.uint(msg[9]) && this.boolean(msg[10])
break
case Messages.Type.TabChange:
return this.string(msg[1])
break
case Messages.Type.TabData:
return this.string(msg[1])
break
case Messages.Type.CanvasNode:
return this.string(msg[1]) && this.uint(msg[2])
break
case Messages.Type.TagTrigger:
return this.int(msg[1])
break
case Messages.Type.Redux:
return this.string(msg[1]) && this.string(msg[2]) && this.uint(msg[3]) && this.uint(msg[4])
break
case Messages.Type.SetPageLocation:
return this.string(msg[1]) && this.string(msg[2]) && this.uint(msg[3]) && this.string(msg[4])
break
case Messages.Type.GraphQL:
return this.string(msg[1]) && this.string(msg[2]) && this.string(msg[3]) && this.string(msg[4]) && this.uint(msg[5])
break
case Messages.Type.WebVitals:
return this.string(msg[1]) && this.string(msg[2])
break
}
}
}

View file

@ -0,0 +1,117 @@
declare const TextEncoder: any
const textEncoder: { encode(str: string): Uint8Array } =
typeof TextEncoder === 'function'
? new TextEncoder()
: {
// Based on https://developer.mozilla.org/en-US/docs/Web/API/TextEncoder
encode(str): Uint8Array {
const Len = str.length,
resArr = new Uint8Array(Len * 3)
let resPos = -1
for (let point = 0, nextcode = 0, i = 0; i !== Len; ) {
;(point = str.charCodeAt(i)), (i += 1)
if (point >= 0xd800 && point <= 0xdbff) {
if (i === Len) {
resArr[(resPos += 1)] = 0xef /*0b11101111*/
resArr[(resPos += 1)] = 0xbf /*0b10111111*/
resArr[(resPos += 1)] = 0xbd /*0b10111101*/
break
}
// https://mathiasbynens.be/notes/javascript-encoding#surrogate-formulae
nextcode = str.charCodeAt(i)
if (nextcode >= 0xdc00 && nextcode <= 0xdfff) {
point = (point - 0xd800) * 0x400 + nextcode - 0xdc00 + 0x10000
i += 1
if (point > 0xffff) {
resArr[(resPos += 1)] = (0x1e /*0b11110*/ << 3) | (point >>> 18)
resArr[(resPos += 1)] =
(0x2 /*0b10*/ << 6) | ((point >>> 12) & 0x3f) /*0b00111111*/
resArr[(resPos += 1)] =
(0x2 /*0b10*/ << 6) | ((point >>> 6) & 0x3f) /*0b00111111*/
resArr[(resPos += 1)] = (0x2 /*0b10*/ << 6) | (point & 0x3f) /*0b00111111*/
continue
}
} else {
resArr[(resPos += 1)] = 0xef /*0b11101111*/
resArr[(resPos += 1)] = 0xbf /*0b10111111*/
resArr[(resPos += 1)] = 0xbd /*0b10111101*/
continue
}
}
if (point <= 0x007f) {
resArr[(resPos += 1)] = (0x0 /*0b0*/ << 7) | point
} else if (point <= 0x07ff) {
resArr[(resPos += 1)] = (0x6 /*0b110*/ << 5) | (point >>> 6)
resArr[(resPos += 1)] = (0x2 /*0b10*/ << 6) | (point & 0x3f) /*0b00111111*/
} else {
resArr[(resPos += 1)] = (0xe /*0b1110*/ << 4) | (point >>> 12)
resArr[(resPos += 1)] = (0x2 /*0b10*/ << 6) | ((point >>> 6) & 0x3f) /*0b00111111*/
resArr[(resPos += 1)] = (0x2 /*0b10*/ << 6) | (point & 0x3f) /*0b00111111*/
}
}
return resArr.subarray(0, resPos + 1)
},
}
export default class PrimitiveEncoder {
private offset = 0
private checkpointOffset = 0
private readonly data: Uint8Array
constructor(private readonly size: number) {
this.data = new Uint8Array(size)
}
getCurrentOffset(): number {
return this.offset
}
checkpoint() {
this.checkpointOffset = this.offset
}
get isEmpty(): boolean {
return this.offset === 0
}
skip(n: number): boolean {
this.offset += n
return this.offset <= this.size
}
set(bytes: Uint8Array, offset: number) {
this.data.set(bytes, offset)
}
boolean(value: boolean): boolean {
this.data[this.offset++] = +value
return this.offset <= this.size
}
uint(value: number): boolean {
if (value < 0 || value > Number.MAX_SAFE_INTEGER) {
value = 0
}
while (value >= 0x80) {
this.data[this.offset++] = value % 0x100 | 0x80
value = Math.floor(value / 128)
}
this.data[this.offset++] = value
return this.offset <= this.size
}
int(value: number): boolean {
value = Math.round(value)
return this.uint(value >= 0 ? value * 2 : value * -2 - 1)
}
string(value: string): boolean {
const encoded = textEncoder.encode(value)
const length = encoded.byteLength
if (!this.uint(length) || this.offset + length > this.size) {
return false
}
this.data.set(encoded, this.offset)
this.offset += length
return true
}
reset(): void {
this.offset = 0
this.checkpointOffset = 0
}
flush(): Uint8Array {
const data = this.data.slice(0, this.checkpointOffset)
this.reset()
return data
}
}

View file

@ -0,0 +1,63 @@
import { describe, expect, test, jest, beforeEach, afterEach } from '@jest/globals'
import PrimitiveEncoder from './PrimitiveEncoder.js'
describe('PrimitiveEncoder', () => {
test('initial state', () => {
const enc = new PrimitiveEncoder(10)
expect(enc.getCurrentOffset()).toBe(0)
expect(enc.isEmpty).toBe(true)
expect(enc.flush().length).toBe(0)
})
test('skip()', () => {
const enc = new PrimitiveEncoder(10)
enc.skip(5)
expect(enc.getCurrentOffset()).toBe(5)
expect(enc.isEmpty).toBe(false)
})
test('checkpoint()', () => {
const enc = new PrimitiveEncoder(10)
enc.skip(5)
enc.checkpoint()
expect(enc.flush().length).toBe(5)
})
test('boolean(true)', () => {
const enc = new PrimitiveEncoder(10)
enc.boolean(true)
enc.checkpoint()
const bytes = enc.flush()
expect(bytes.length).toBe(1)
expect(bytes[0]).toBe(1)
})
test('boolean(false)', () => {
const enc = new PrimitiveEncoder(10)
enc.boolean(false)
enc.checkpoint()
const bytes = enc.flush()
expect(bytes.length).toBe(1)
expect(bytes[0]).toBe(0)
})
// TODO: test correct enc/dec on a top level(?) with player(PrimitiveReader.ts)/tracker(PrimitiveEncoder.ts)
test('buffer oveflow with string()', () => {
const N = 10
const enc = new PrimitiveEncoder(N)
const wasWritten = enc.string('long string'.repeat(N))
expect(wasWritten).toBe(false)
})
test('buffer oveflow with uint()', () => {
const enc = new PrimitiveEncoder(1)
const wasWritten = enc.uint(Number.MAX_SAFE_INTEGER)
expect(wasWritten).toBe(false)
})
test('buffer oveflow with boolean()', () => {
const enc = new PrimitiveEncoder(1)
let wasWritten = enc.boolean(true)
expect(wasWritten).toBe(true)
wasWritten = enc.boolean(true)
expect(wasWritten).toBe(false)
})
})

View file

@ -0,0 +1,154 @@
const INGEST_PATH = '/v1/web/i'
const KEEPALIVE_SIZE_LIMIT = 64 << 10 // 64 kB
export default class QueueSender {
private attemptsCount = 0
private busy = false
private readonly queue: Array<Uint8Array> = []
private readonly ingestURL
private token: string | null = null
// its actually on #24
// eslint-disable-next-line
private isCompressing
private lastBatchNum = 0
constructor(
ingestBaseURL: string,
private readonly onUnauthorised: () => any,
private readonly onFailure: (reason: string) => any,
private readonly MAX_ATTEMPTS_COUNT = 10,
private readonly ATTEMPT_TIMEOUT = 250,
private readonly onCompress?: (batch: Uint8Array) => any,
private readonly pageNo?: number,
) {
this.ingestURL = ingestBaseURL + INGEST_PATH
if (onCompress !== undefined) {
this.isCompressing = true
} else {
this.isCompressing = false
}
}
public getQueueStatus() {
return this.queue.length === 0 && !this.busy
}
authorise(token: string): void {
this.token = token
if (!this.busy) {
// TODO: transparent busy/send logic
this.sendNext()
}
}
push(batch: Uint8Array): void {
if (this.busy || !this.token) {
this.queue.push(batch)
} else {
this.busy = true
if (this.isCompressing && this.onCompress) {
this.onCompress(batch)
} else {
const batchNum = ++this.lastBatchNum
this.sendBatch(batch, false, batchNum)
}
}
}
private sendNext() {
const nextBatch = this.queue.shift()
if (nextBatch) {
this.busy = true
if (this.isCompressing && this.onCompress) {
this.onCompress(nextBatch)
} else {
const batchNum = ++this.lastBatchNum
this.sendBatch(nextBatch, false, batchNum)
}
} else {
this.busy = false
}
}
private retry(batch: Uint8Array, isCompressed?: boolean, batchNum?: string | number): void {
if (this.attemptsCount >= this.MAX_ATTEMPTS_COUNT) {
this.onFailure(`Failed to send batch after ${this.attemptsCount} attempts.`)
// remains this.busy === true
return
}
this.attemptsCount++
setTimeout(
() => this.sendBatch(batch, isCompressed, batchNum),
this.ATTEMPT_TIMEOUT * this.attemptsCount,
)
}
// would be nice to use Beacon API, but it is not available in WebWorker
private sendBatch(batch: Uint8Array, isCompressed?: boolean, batchNum?: string | number): void {
const batchNumStr = batchNum?.toString().replace(/^([^_]+)_([^_]+).*/, '$1_$2_$3')
this.busy = true
const headers = {
Authorization: `Bearer ${this.token as string}`,
} as Record<string, string>
if (isCompressed) {
headers['Content-Encoding'] = 'gzip'
}
/**
* sometimes happen during assist connects for some reason
* */
if (this.token === null) {
setTimeout(() => {
this.sendBatch(batch, isCompressed, `${batchNum ?? 'noBatchNum'}_newToken`)
}, 500)
return
}
fetch(`${this.ingestURL}?batch=${this.pageNo ?? 'noPageNum'}_${batchNumStr ?? 'noBatchNum'}`, {
body: batch,
method: 'POST',
headers,
keepalive: batch.length < KEEPALIVE_SIZE_LIMIT,
})
.then((r: Record<string, any>) => {
if (r.status === 401) {
// TODO: continuous session ?
this.busy = false
this.onUnauthorised()
return
} else if (r.status >= 400) {
this.retry(batch, isCompressed, `${batchNum ?? 'noBatchNum'}_network:${r.status}`)
return
}
// Success
this.attemptsCount = 0
this.sendNext()
})
.catch((e: Error) => {
console.warn('OpenReplay:', e)
this.retry(batch, isCompressed, `${batchNum ?? 'noBatchNum'}_reject:${e.message}`)
})
}
sendCompressed(batch: Uint8Array) {
const batchNum = ++this.lastBatchNum
this.sendBatch(batch, true, batchNum)
}
sendUncompressed(batch: Uint8Array) {
const batchNum = ++this.lastBatchNum
this.sendBatch(batch, false, batchNum)
}
clean() {
// sending last batch and closing the shop
this.sendNext()
setTimeout(() => {
this.token = null
this.queue.length = 0
}, 10)
}
}

View file

@ -0,0 +1,132 @@
import { describe, expect, test, jest, beforeEach, afterEach } from '@jest/globals'
import QueueSender from './QueueSender.js'
global.fetch = () => Promise.resolve(new Response()) // jsdom does not have it
function mockFetch(status: number, headers?: Record<string, string>) {
return jest.spyOn(global, 'fetch').mockImplementation((request) =>
Promise.resolve({ status, headers, request } as unknown as Response & {
request: RequestInfo
}),
)
}
const baseURL = 'MYBASEURL'
const sampleArray = new Uint8Array(1)
const randomToken = 'abc'
const requestMock = {
body: sampleArray,
headers: { Authorization: 'Bearer abc' },
keepalive: true,
method: 'POST',
}
const gzipRequestMock = {
...requestMock,
headers: { ...requestMock.headers, 'Content-Encoding': 'gzip' },
}
function defaultQueueSender({
url = baseURL,
onUnauthorised = () => {},
onFailed = () => {},
onCompress = undefined,
}: Record<string, any> = {}) {
return new QueueSender(baseURL, onUnauthorised, onFailed, 10, 1000, onCompress)
}
describe('QueueSender', () => {
afterEach(() => {
jest.restoreAllMocks()
jest.useRealTimers()
})
// Test fetch first parameter + authorization header to be present
// authorise() / push()
test('Does not call fetch if not authorised', () => {
const queueSender = defaultQueueSender()
const fetchMock = mockFetch(200)
queueSender.push(sampleArray)
expect(fetchMock).not.toBeCalled()
})
test('Calls fetch on push() if authorised', () => {
const queueSender = defaultQueueSender()
const fetchMock = mockFetch(200)
queueSender.authorise(randomToken)
expect(fetchMock).toBeCalledTimes(0)
queueSender.push(sampleArray)
expect(fetchMock).toBeCalledTimes(1)
expect(fetchMock.mock.calls[0][1]).toMatchObject(requestMock)
})
test('Sends compressed request if onCompress is provided and compressed batch is included', () => {
const queueSender = defaultQueueSender({ onCompress: () => true })
const fetchMock = mockFetch(200)
// @ts-ignore
const spyOnCompress = jest.spyOn(queueSender, 'onCompress')
// @ts-ignore
const spyOnSendNext = jest.spyOn(queueSender, 'sendNext')
queueSender.authorise(randomToken)
queueSender.push(sampleArray)
expect(spyOnCompress).toBeCalledTimes(1)
queueSender.sendCompressed(sampleArray)
expect(fetchMock).toBeCalledTimes(1)
expect(spyOnSendNext).toBeCalledTimes(1)
expect(spyOnCompress).toBeCalledTimes(1)
expect(fetchMock.mock.calls[0][1]).toMatchObject(gzipRequestMock)
})
test('Calls fetch on authorisation if there was a push() call before', () => {
const queueSender = defaultQueueSender()
const fetchMock = mockFetch(200)
queueSender.push(sampleArray)
queueSender.authorise(randomToken)
expect(fetchMock).toBeCalledTimes(1)
})
// .clean()
test("Doesn't call fetch on push() after clean()", () => {
const queueSender = defaultQueueSender()
const fetchMock = mockFetch(200)
jest.useFakeTimers()
queueSender.authorise(randomToken)
queueSender.clean()
jest.runAllTimers()
queueSender.push(sampleArray)
expect(fetchMock).not.toBeCalled()
})
test("Doesn't call fetch on authorisation if there was push() & clean() calls before", () => {
const queueSender = defaultQueueSender()
const fetchMock = mockFetch(200)
queueSender.push(sampleArray)
queueSender.clean()
queueSender.authorise(randomToken)
expect(fetchMock).not.toBeCalled()
})
//Test N sequential ToBeCalledTimes(N)
//Test N sequential pushes with different timeouts to be sequential
// onUnauthorised
test('Calls onUnauthorized callback on 401', (done) => {
const onUnauthorised = jest.fn()
const queueSender = defaultQueueSender({
onUnauthorised,
})
const fetchMock = mockFetch(401)
queueSender.authorise(randomToken)
queueSender.push(sampleArray)
setTimeout(() => {
// how to make test simpler and more explicit?
expect(onUnauthorised).toBeCalled()
done()
}, 100)
})
//Test onFailure
//Test attempts timeout/ attempts count (toBeCalledTimes on one batch)
})

View file

@ -0,0 +1,190 @@
import { Type as MType } from '../common/messages.gen.js'
import { FromWorkerData } from '../common/interaction.js'
import QueueSender from './QueueSender.js'
import BatchWriter from './BatchWriter.js'
enum WorkerStatus {
NotActive,
Starting,
Stopping,
Active,
Stopped,
}
const AUTO_SEND_INTERVAL = 10 * 1000
export default class NetworkWorker {
sender: QueueSender | null = null
writer: BatchWriter | null = null
status: WorkerStatus = WorkerStatus.NotActive
sendIntervalID: ReturnType<typeof setInterval> | null = null
restartTimeoutID: ReturnType<typeof setTimeout> | null = null
constructor(private readonly app: any) {}
processEvent(data: any): void {
if (data === null) {
this.finalize()
return
}
if (data.type === 'start') {
this.status = WorkerStatus.Starting
this.sender = new QueueSender(
data.ingestPoint,
() => {
// onUnauthorised
this.initiateRestart()
},
(reason) => {
// onFailure
this.initiateFailure(reason)
},
data.connAttemptCount,
data.connAttemptGap,
(batch) => {
this.sendEvent({ type: 'compress', batch })
},
data.pageNo,
)
this.writer = new BatchWriter(
data.pageNo,
data.timestamp,
data.url,
(batch) => {
this.sender && this.sender.push(batch)
},
data.tabId,
() => this.sendEvent({ type: 'queue_empty' }),
)
if (this.sendIntervalID === null) {
this.sendIntervalID = setInterval(this.finalize, AUTO_SEND_INTERVAL)
}
this.status = WorkerStatus.Active
return
}
if (data === 'stop') {
this.finalize()
// eslint-disable-next-line
this.reset().then(() => {
this.status = WorkerStatus.Stopped
})
return
}
if (data === 'forceFlushBatch') {
this.finalize()
return
}
if (Array.isArray(data)) {
if (this.writer) {
const w = this.writer
data.forEach((message) => {
if (message[0] === MType.SetPageVisibility) {
if (message[1]) {
// .hidden
this.restartTimeoutID = setTimeout(() => this.initiateRestart(), 30 * 60 * 1000)
} else if (this.restartTimeoutID) {
clearTimeout(this.restartTimeoutID)
}
}
w.writeMessage(message)
})
} else {
this.sendEvent('not_init')
this.initiateRestart()
}
return
}
if (data.type === 'compressed') {
if (!this.sender) {
console.debug('OR WebWorker: sender not initialised. Compressed batch.')
this.initiateRestart()
return
}
data.batch && this.sender.sendCompressed(data.batch)
}
if (data.type === 'uncompressed') {
if (!this.sender) {
console.debug('OR WebWorker: sender not initialised. Uncompressed batch.')
this.initiateRestart()
return
}
data.batch && this.sender.sendUncompressed(data.batch)
}
if (data.type === 'auth') {
if (!this.sender) {
console.debug('OR WebWorker: sender not initialised. Received auth.')
this.initiateRestart()
return
}
if (!this.writer) {
console.debug('OR WebWorker: writer not initialised. Received auth.')
this.initiateRestart()
return
}
this.sender.authorise(data.token)
data.beaconSizeLimit && this.writer.setBeaconSizeLimit(data.beaconSizeLimit)
return
}
}
finalize(): void {
if (!this.writer) {
return
}
this.writer.finaliseBatch()
}
resetWriter(): void {
if (this.writer) {
this.writer.clean()
// we don't need to wait for anything here since its sync
this.writer = null
}
}
resetSender(): void {
if (this.sender) {
this.sender.clean()
// allowing some time to send last batch
setTimeout(() => {
this.sender = null
}, 20)
}
}
reset(): Promise<any> {
return new Promise((res) => {
this.status = WorkerStatus.Stopping
if (this.sendIntervalID !== null) {
clearInterval(this.sendIntervalID)
this.sendIntervalID = null
}
this.resetWriter()
this.resetSender()
setTimeout(() => {
this.status = WorkerStatus.NotActive
res(null)
}, 100)
})
}
initiateRestart(): void {
if ([WorkerStatus.Stopped, WorkerStatus.Stopping].includes(this.status)) return
this.sendEvent('a_stop')
// eslint-disable-next-line
this.reset().then(() => {
this.sendEvent('a_start')
})
}
initiateFailure(reason: string): void {
this.sendEvent({ type: 'failure', reason })
void this.reset()
}
sendEvent(data: any): void {
this.app.processEvent(data)
}
}

View file

@ -41,6 +41,7 @@ import type { Options as SessOptions } from './session.js'
import Session from './session.js' import Session from './session.js'
import Ticker from './ticker.js' import Ticker from './ticker.js'
import { MaintainerOptions } from './nodes/maintainer.js' import { MaintainerOptions } from './nodes/maintainer.js'
import NetworkWorker from '../../NetworkWorker/index.js'
interface TypedWorker extends Omit<Worker, 'postMessage'> { interface TypedWorker extends Omit<Worker, 'postMessage'> {
postMessage(data: ToWorkerData): void postMessage(data: ToWorkerData): void
@ -71,7 +72,7 @@ interface OnStartInfo {
* this value is injected during build time via rollup * this value is injected during build time via rollup
* */ * */
// @ts-ignore // @ts-ignore
const workerBodyFn = global.WEBWORKER_BODY // const workerBodyFn = global.WEBWORKER_BODY
const CANCELED = 'canceled' as const const CANCELED = 'canceled' as const
const uxtStorageKey = 'or_uxt_active' const uxtStorageKey = 'or_uxt_active'
const bufferStorageKey = 'or_buffer_1' const bufferStorageKey = 'or_buffer_1'
@ -251,7 +252,7 @@ export default class App {
private readonly revID: string private readonly revID: string
private activityState: ActivityState = ActivityState.NotActive private activityState: ActivityState = ActivityState.NotActive
private readonly version = 'TRACKER_VERSION' // TODO: version compatability check inside each plugin. private readonly version = 'TRACKER_VERSION' // TODO: version compatability check inside each plugin.
private worker?: TypedWorker private worker?: NetworkWorker
public attributeSender: AttributeSender public attributeSender: AttributeSender
public featureFlags: FeatureFlags public featureFlags: FeatureFlags
@ -736,19 +737,11 @@ export default class App {
private initWorker() { private initWorker() {
try { try {
this.worker = new Worker( this.worker = new NetworkWorker(this)
URL.createObjectURL(new Blob([workerBodyFn], { type: 'text/javascript' })),
)
this.worker.onerror = (e) => {
this._debug('webworker_error', e)
}
this.worker.onmessage = ({ data }: MessageEvent<FromWorkerData>) => {
this.handleWorkerMsg(data)
}
const alertWorker = () => { const alertWorker = () => {
if (this.worker) { if (this.worker) {
this.worker.postMessage(null) this.worker.processEvent(null)
} }
} }
// keep better tactics, discard others? // keep better tactics, discard others?
@ -761,6 +754,10 @@ export default class App {
} }
} }
public processEvent(data: FromWorkerData) {
this.handleWorkerMsg(data)
}
private handleWorkerMsg(data: FromWorkerData) { private handleWorkerMsg(data: FromWorkerData) {
// handling 401 auth restart (new token assignment) // handling 401 auth restart (new token assignment)
if (data === 'a_stop') { if (data === 'a_stop') {
@ -789,13 +786,13 @@ export default class App {
gzip(data.batch, { mtime: 0 }, (err, result) => { gzip(data.batch, { mtime: 0 }, (err, result) => {
if (err) { if (err) {
this.debug.error('Openreplay compression error:', err) this.debug.error('Openreplay compression error:', err)
this.worker?.postMessage({ type: 'uncompressed', batch: batch }) this.worker?.processEvent({ type: 'uncompressed', batch: batch })
} else { } else {
this.worker?.postMessage({ type: 'compressed', batch: result }) this.worker?.processEvent({ type: 'compressed', batch: result })
} }
}) })
} else { } else {
this.worker?.postMessage({ type: 'uncompressed', batch: batch }) this.worker?.processEvent({ type: 'uncompressed', batch: batch })
} }
} else if (data.type === 'queue_empty') { } else if (data.type === 'queue_empty') {
this.onSessionSent() this.onSessionSent()
@ -875,7 +872,7 @@ export default class App {
requestIdleCb(() => { requestIdleCb(() => {
this.messages.unshift(TabData(this.session.getTabId())) this.messages.unshift(TabData(this.session.getTabId()))
this.messages.unshift(Timestamp(this.timestamp())) this.messages.unshift(Timestamp(this.timestamp()))
this.worker?.postMessage(this.messages) this.worker?.processEvent(this.messages)
this.commitCallbacks.forEach((cb) => cb(this.messages)) this.commitCallbacks.forEach((cb) => cb(this.messages))
this.messages.length = 0 this.messages.length = 0
}) })
@ -916,7 +913,7 @@ export default class App {
} }
private postToWorker(messages: Array<Message>) { private postToWorker(messages: Array<Message>) {
this.worker?.postMessage(messages) this.worker?.processEvent(messages)
this.commitCallbacks.forEach((cb) => cb(messages)) this.commitCallbacks.forEach((cb) => cb(messages))
} }
@ -1297,7 +1294,7 @@ export default class App {
public async uploadOfflineRecording() { public async uploadOfflineRecording() {
this.stop(false) this.stop(false)
const timestamp = now() const timestamp = now()
this.worker?.postMessage({ this.worker?.processEvent({
type: 'start', type: 'start',
pageNo: this.session.incPageNo(), pageNo: this.session.incPageNo(),
ingestPoint: this.options.ingestPoint, ingestPoint: this.options.ingestPoint,
@ -1335,7 +1332,7 @@ export default class App {
beaconSizeLimit, beaconSizeLimit,
projectID, projectID,
} = await r.json() } = await r.json()
this.worker?.postMessage({ this.worker?.processEvent({
type: 'auth', type: 'auth',
token, token,
beaconSizeLimit, beaconSizeLimit,
@ -1401,7 +1398,7 @@ export default class App {
}) })
const timestamp = now() const timestamp = now()
this.worker?.postMessage({ this.worker?.processEvent({
type: 'start', type: 'start',
pageNo: this.session.incPageNo(), pageNo: this.session.incPageNo(),
ingestPoint: this.options.ingestPoint, ingestPoint: this.options.ingestPoint,
@ -1507,9 +1504,9 @@ export default class App {
if (socketOnly) { if (socketOnly) {
this.socketMode = true this.socketMode = true
this.worker?.postMessage('stop') this.worker?.processEvent('stop')
} else { } else {
this.worker?.postMessage({ this.worker?.processEvent({
type: 'auth', type: 'auth',
token, token,
beaconSizeLimit, beaconSizeLimit,
@ -1741,7 +1738,7 @@ export default class App {
} }
forceFlushBatch() { forceFlushBatch() {
this.worker?.postMessage('forceFlushBatch') this.worker?.processEvent('forceFlushBatch')
} }
getTabId() { getTabId() {
@ -1787,7 +1784,7 @@ export default class App {
this.stopCallbacks.forEach((cb) => cb()) this.stopCallbacks.forEach((cb) => cb())
this.tagWatcher.clear() this.tagWatcher.clear()
if (this.worker && stopWorker) { if (this.worker && stopWorker) {
this.worker.postMessage('stop') this.worker.processEvent('stop')
} }
this.canvasRecorder?.clear() this.canvasRecorder?.clear()
this.messages.length = 0 this.messages.length = 0

View file

@ -230,71 +230,75 @@ export function deleteEventListener(
} }
} }
type Task = () => void | any
class FIFOTaskScheduler { class FIFOTaskScheduler {
taskQueue: any[] private taskQueue: Task[]
isRunning: boolean private isRunning: boolean
constructor() { constructor() {
this.taskQueue = [] this.taskQueue = []
this.isRunning = false this.isRunning = false
} }
// Adds a task to the queue addTask(task: Task): void {
addTask(task: () => any) {
this.taskQueue.push(task) this.taskQueue.push(task)
this.runTasks() this.runTasks()
} }
// Runs tasks from the queue private runTasks(): void {
runTasks() {
if (this.isRunning || this.taskQueue.length === 0) { if (this.isRunning || this.taskQueue.length === 0) {
return return
} }
this.isRunning = true this.isRunning = true
const executeNextTask = () => { const executeNextTask = (): void => {
if (this.taskQueue.length === 0) { if (this.taskQueue.length === 0) {
this.isRunning = false this.isRunning = false
return return
} }
// Get the next task and execute it
const nextTask = this.taskQueue.shift() const nextTask = this.taskQueue.shift()
Promise.resolve(nextTask()).then(() => {
if (!nextTask) {
requestAnimationFrame(() => executeNextTask()) requestAnimationFrame(() => executeNextTask())
}) return
}
try {
const result = nextTask()
// Use Promise.resolve() which works with both regular values and promises
Promise.resolve(result)
.catch((error) => {
console.error('Task execution failed:', error)
})
.finally(() => {
requestAnimationFrame(() => executeNextTask())
})
} catch (error) {
console.error('Task execution failed:', error)
// Continue with next task even if this one failed
requestAnimationFrame(() => executeNextTask())
}
} }
executeNextTask() executeNextTask()
} }
clearTasks(): void {
this.taskQueue = []
}
get pendingTasksCount(): number {
return this.taskQueue.length
}
} }
const scheduler = new FIFOTaskScheduler() const scheduler = new FIFOTaskScheduler()
export function requestIdleCb(callback: () => void) {
// performance improvement experiment; export function requestIdleCb(callback: Task): void {
scheduler.addTask(callback) scheduler.addTask(callback)
/**
* This is a brief polyfill that suits our needs
* I took inspiration from Microsoft Clarity polyfill on this one
* then adapted it a little bit
*
* I'm very grateful for their bright idea
* */
// const taskTimeout = 3000
// if (window.requestIdleCallback) {
// return window.requestIdleCallback(callback, { timeout: taskTimeout })
// } else {
// const channel = new MessageChannel()
// const incoming = channel.port1
// const outgoing = channel.port2
//
// incoming.onmessage = (): void => {
// callback()
// }
// requestAnimationFrame((): void => {
// outgoing.postMessage(1)
// })
// }
} }
export function simpleMerge<T>(defaultObj: T, givenObj: Partial<T>): T { export function simpleMerge<T>(defaultObj: T, givenObj: Partial<T>): T {