[Storage] timestamp sorting and filtering (#1218)
* feat(backend): combined sorting by index and timestamp * feat(backend): write the only last timestamp message in a row
This commit is contained in:
parent
4031ac90f5
commit
c98a93cfd4
1 changed files with 45 additions and 14 deletions
|
|
@ -9,11 +9,12 @@ import (
|
|||
)
|
||||
|
||||
type msgInfo struct {
|
||||
index uint64
|
||||
start int64
|
||||
end int64
|
||||
body Message
|
||||
msgType uint64
|
||||
index uint64
|
||||
start int64
|
||||
end int64
|
||||
body Message
|
||||
msgType uint64
|
||||
timestamp uint64
|
||||
}
|
||||
|
||||
func (m *msgInfo) Print() string {
|
||||
|
|
@ -23,6 +24,7 @@ func (m *msgInfo) Print() string {
|
|||
func SplitMessages(data []byte) ([]*msgInfo, error) {
|
||||
messages := make([]*msgInfo, 0)
|
||||
indexes := make(map[uint64]bool)
|
||||
var lastTimestamp uint64
|
||||
reader := NewBytesReader(data)
|
||||
for {
|
||||
// Get message start
|
||||
|
|
@ -59,20 +61,34 @@ func SplitMessages(data []byte) ([]*msgInfo, error) {
|
|||
}
|
||||
indexes[msgIndex] = true
|
||||
|
||||
// Update current timestamp
|
||||
if msgType == MsgTimestamp {
|
||||
msgBody := body.(*Timestamp)
|
||||
lastTimestamp = msgBody.Timestamp
|
||||
}
|
||||
|
||||
// Add new message info to messages slice
|
||||
messages = append(messages, &msgInfo{
|
||||
index: msgIndex,
|
||||
start: msgStart,
|
||||
end: reader.Pointer(),
|
||||
body: body,
|
||||
msgType: msgType,
|
||||
index: msgIndex,
|
||||
start: msgStart + 8, // start pointer without index (that's why we use +8)
|
||||
end: reader.Pointer(),
|
||||
body: body,
|
||||
msgType: msgType,
|
||||
timestamp: lastTimestamp,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func SortMessages(messages []*msgInfo) []*msgInfo {
|
||||
// Sort messages by index
|
||||
sort.SliceStable(messages, func(i, j int) bool {
|
||||
return messages[i].index < messages[j].index
|
||||
if messages[i].timestamp < messages[j].timestamp {
|
||||
return true
|
||||
}
|
||||
if messages[i].timestamp == messages[j].timestamp {
|
||||
return messages[i].index < messages[j].index
|
||||
}
|
||||
return false
|
||||
})
|
||||
return messages
|
||||
}
|
||||
|
|
@ -81,9 +97,24 @@ func MergeMessages(data []byte, messages []*msgInfo) []byte {
|
|||
sortedSession := bytes.NewBuffer(make([]byte, 0, len(data)))
|
||||
// Add maximum possible index value to the start of the session to inform player about new version of mob file
|
||||
sortedSession.Write([]byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff})
|
||||
for _, info := range messages {
|
||||
// Write message without index (that's why we use +8)
|
||||
sortedSession.Write(data[info.start+8 : info.end])
|
||||
|
||||
var lastTsIndex int = -1 // not set
|
||||
for i, info := range messages {
|
||||
if info.msgType == MsgTimestamp {
|
||||
// Save index of last timestamp message and continue to read next message
|
||||
lastTsIndex = i
|
||||
continue
|
||||
}
|
||||
|
||||
// Write last timestamp message if it exists
|
||||
if lastTsIndex != -1 {
|
||||
tsInfo := messages[lastTsIndex]
|
||||
sortedSession.Write(data[tsInfo.start:tsInfo.end])
|
||||
lastTsIndex = -1
|
||||
}
|
||||
|
||||
// Write current message
|
||||
sortedSession.Write(data[info.start:info.end])
|
||||
}
|
||||
return sortedSession.Bytes()
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue