fix(frontend): live time travel - reuse messages received through socket

This commit is contained in:
Alex Kaminskii 2022-10-12 13:13:15 +02:00
parent 9972eead27
commit a73828fb43
2 changed files with 21 additions and 28 deletions

View file

@ -206,6 +206,10 @@ export default class MessageDistributor extends StatedScreen {
toast.error('Error requesting a session file')
}
private onFileReadFinally = () => {
this.incomingMessages
.filter(msg => msg.time >= this.lastMessageInFileTime)
.forEach(msg => this.distributeMessage(msg, 0))
this.waitingForFiles = false
this.setMessagesLoading(false)
}
@ -258,7 +262,6 @@ export default class MessageDistributor extends StatedScreen {
});
// assist will pause and skip messages to prevent timestamp related errors
this.assistManager.toggleTimeTravelJump()
this.reloadMessageManagers()
this.windowNodeCounter.reset()
@ -271,9 +274,6 @@ export default class MessageDistributor extends StatedScreen {
.then(this.onFileReadSuccess)
.catch(this.onFileReadFailed)
.finally(this.onFileReadFinally)
.then(() => {
this.assistManager.toggleTimeTravelJump()
})
}
private reloadMessageManagers() {
@ -384,14 +384,23 @@ export default class MessageDistributor extends StatedScreen {
return { ...msg, ...decoded };
}
/* Binded */
distributeMessage(msg: Message, index: number): void {
private readonly incomingMessages: Message[] = []
appendMessage(msg: Message, index: number) {
// @ts-ignore
// msg.time = this.md.getLastRecordedMessageTime() + msg.time\
//TODO: put index in message type
this.incomingMessages.push(msg)
if (!this.waitingForFiles) {
this.distributeMessage(msg, index)
}
}
private distributeMessage(msg: Message, index: number): void {
const lastMessageTime = Math.max(msg.time, this.lastMessageTime)
this.lastMessageTime = lastMessageTime
if (visualChanges.includes(msg.tp)) {
this.activityManager?.updateAcctivity(msg.time);
}
//const index = i + index; //?
let decoded;
const time = msg.time;
switch (msg.tp) {
@ -529,9 +538,7 @@ export default class MessageDistributor extends StatedScreen {
super.clean();
update(INITIAL_STATE);
this.assistManager.clear();
this.incomingMessages.length = 0
}
getLastRecordedMessageTime(): number {
return this.lastMessageInFileTime;
}
}

View file

@ -74,9 +74,8 @@ const MAX_RECONNECTION_COUNT = 4;
export default class AssistManager {
private timeTravelJump = false;
private jumped = false;
// TODO: Session type
constructor(private session: any, private md: MessageDistributor, private config: any) {}
private setStatus(status: ConnectionStatus) {
@ -121,7 +120,7 @@ export default class AssistManager {
private socket: Socket | null = null
connect(agentToken: string) {
const jmr = new JSONRawMessageReader()
const reader = new MStreamReader(jmr)
const reader = new MStreamReader(jmr, this.session.startedAt)
let waitingForMessages = true
let disconnectTimeout: ReturnType<typeof setTimeout> | undefined
let inactiveTimeout: ReturnType<typeof setTimeout> | undefined
@ -164,7 +163,7 @@ export default class AssistManager {
update({ calling: CallingState.NoCall })
})
socket.on('messages', messages => {
!this.timeTravelJump && jmr.append(messages) // as RawMessage[]
jmr.append(messages) // as RawMessage[]
if (waitingForMessages) {
waitingForMessages = false // TODO: more explicit
@ -176,17 +175,9 @@ export default class AssistManager {
}
}
if (this.timeTravelJump) {
return;
}
for (let msg = reader.readNext();msg !== null;msg = reader.readNext()) {
if (this.jumped) {
// @ts-ignore
msg.time = this.md.getLastRecordedMessageTime() + msg.time
}
// @ts-ignore TODO: fix msg types in generator
this.md.distributeMessage(msg, msg._index)
this.md.appendMessage(msg, msg._index)
}
})
socket.on("control_granted", id => {
@ -561,11 +552,6 @@ export default class AssistManager {
private annot: AnnotationCanvas | null = null
toggleTimeTravelJump() {
this.jumped = true;
this.timeTravelJump = !this.timeTravelJump;
}
/* ==== Cleaning ==== */
private cleaned: boolean = false
clear() {