[Sink] Improved files sync algo (#831)
* feat(backend): use channel of changed sessions instead of sync.Map * feat(backend): avoid memory alloc for message body in message iterator * feat(backend): removed unnecessary locks in file syncer * feat(backend): sync.Map with prev updates * feat(backend): improved write algorith (added bufio.Writer) * feat(backend): session writer refactoring * feat(backend): removed unnecessary type definition * feat(backend): added write retrier to avoid data losing * feat(backend): refactoring * feat(backend): added session file implementation
This commit is contained in:
parent
d97409b60b
commit
9e319ed27c
10 changed files with 281 additions and 206 deletions
|
|
@ -32,7 +32,7 @@ func main() {
|
|||
log.Fatalf("%v doesn't exist. %v", cfg.FsDir, err)
|
||||
}
|
||||
|
||||
writer := sessionwriter.NewWriter(cfg.FsUlimit, cfg.FsDir, cfg.DeadSessionTimeout)
|
||||
writer := sessionwriter.NewWriter(cfg.FsUlimit, cfg.FsDir, cfg.FileBuffer, cfg.SyncTimeout)
|
||||
|
||||
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
|
||||
defer producer.Close(cfg.ProducerCloseTimeout)
|
||||
|
|
@ -95,26 +95,20 @@ func main() {
|
|||
counter.Update(msg.SessionID(), time.UnixMilli(ts))
|
||||
}
|
||||
|
||||
// Write encoded message with index to session file
|
||||
data := msg.EncodeWithIndex()
|
||||
// Try to encode message to avoid null data inserts
|
||||
data := msg.Encode()
|
||||
if data == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Write message to file
|
||||
if messages.IsDOMType(msg.TypeID()) {
|
||||
if err := writer.WriteDOM(msg.SessionID(), data); err != nil {
|
||||
log.Printf("Writer error: %v\n", err)
|
||||
}
|
||||
}
|
||||
if !messages.IsDOMType(msg.TypeID()) || msg.TypeID() == messages.MsgTimestamp {
|
||||
if err := writer.WriteDEV(msg.SessionID(), data); err != nil {
|
||||
log.Printf("Writer error: %v\n", err)
|
||||
}
|
||||
if err := writer.Write(msg); err != nil {
|
||||
log.Printf("writer error: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
// [METRICS] Increase the number of written to the files messages and the message size
|
||||
messageSize.Record(context.Background(), float64(len(data)))
|
||||
messageSize.Record(context.Background(), float64(len(msg.Encode())))
|
||||
savedMessages.Add(context.Background(), 1)
|
||||
}
|
||||
|
||||
|
|
@ -132,7 +126,8 @@ func main() {
|
|||
sigchan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
tick := time.Tick(30 * time.Second)
|
||||
tick := time.Tick(10 * time.Second)
|
||||
tickInfo := time.Tick(30 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case sig := <-sigchan:
|
||||
|
|
@ -146,10 +141,11 @@ func main() {
|
|||
consumer.Close()
|
||||
os.Exit(0)
|
||||
case <-tick:
|
||||
counter.Print()
|
||||
if err := consumer.Commit(); err != nil {
|
||||
log.Printf("can't commit messages: %s", err)
|
||||
}
|
||||
case <-tickInfo:
|
||||
counter.Print()
|
||||
log.Printf("writer: %s", writer.Info())
|
||||
default:
|
||||
err := consumer.ConsumeNext()
|
||||
|
|
|
|||
|
|
@ -9,7 +9,8 @@ type Config struct {
|
|||
common.Config
|
||||
FsDir string `env:"FS_DIR,required"`
|
||||
FsUlimit uint16 `env:"FS_ULIMIT,required"`
|
||||
DeadSessionTimeout int64 `env:"DEAD_SESSION_TIMEOUT,default=600"`
|
||||
FileBuffer int `env:"FILE_BUFFER,default=32768"`
|
||||
SyncTimeout int `env:"SYNC_TIMEOUT,default=5"`
|
||||
GroupSink string `env:"GROUP_SINK,required"`
|
||||
TopicRawWeb string `env:"TOPIC_RAW_WEB,required"`
|
||||
TopicRawIOS string `env:"TOPIC_RAW_IOS,required"`
|
||||
|
|
|
|||
57
backend/internal/sink/sessionwriter/file.go
Normal file
57
backend/internal/sink/sessionwriter/file.go
Normal file
|
|
@ -0,0 +1,57 @@
|
|||
package sessionwriter
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"os"
|
||||
)
|
||||
|
||||
type File struct {
|
||||
file *os.File
|
||||
buffer *bufio.Writer
|
||||
updated bool
|
||||
}
|
||||
|
||||
func NewFile(path string, bufSize int) (*File, error) {
|
||||
file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &File{
|
||||
file: file,
|
||||
buffer: bufio.NewWriterSize(file, bufSize),
|
||||
updated: false,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (f *File) Write(data []byte) error {
|
||||
leftToWrite := len(data)
|
||||
for leftToWrite > 0 {
|
||||
writtenDown, err := f.buffer.Write(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
leftToWrite -= writtenDown
|
||||
}
|
||||
f.updated = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *File) Sync() error {
|
||||
if !f.updated {
|
||||
return nil
|
||||
}
|
||||
if err := f.buffer.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := f.file.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
f.updated = false
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *File) Close() error {
|
||||
_ = f.buffer.Flush()
|
||||
_ = f.file.Sync()
|
||||
return f.file.Close()
|
||||
}
|
||||
56
backend/internal/sink/sessionwriter/meta.go
Normal file
56
backend/internal/sink/sessionwriter/meta.go
Normal file
|
|
@ -0,0 +1,56 @@
|
|||
package sessionwriter
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Meta struct {
|
||||
limit int
|
||||
lock *sync.Mutex
|
||||
meta map[uint64]int64
|
||||
}
|
||||
|
||||
func NewMeta(limit int) *Meta {
|
||||
return &Meta{
|
||||
limit: limit,
|
||||
lock: &sync.Mutex{},
|
||||
meta: make(map[uint64]int64, limit),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Meta) Add(sid uint64) {
|
||||
m.lock.Lock()
|
||||
m.meta[sid] = time.Now().Unix()
|
||||
m.lock.Unlock()
|
||||
}
|
||||
|
||||
func (m *Meta) Count() int {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
return len(m.meta)
|
||||
}
|
||||
|
||||
func (m *Meta) Delete(sid uint64) {
|
||||
m.lock.Lock()
|
||||
delete(m.meta, sid)
|
||||
m.lock.Unlock()
|
||||
}
|
||||
|
||||
func (m *Meta) GetExtra() uint64 {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
if len(m.meta) >= m.limit {
|
||||
var extraSessID uint64
|
||||
var minTimestamp int64 = math.MaxInt64
|
||||
for sessID, timestamp := range m.meta {
|
||||
if timestamp < minTimestamp {
|
||||
extraSessID = sessID
|
||||
minTimestamp = timestamp
|
||||
}
|
||||
}
|
||||
return extraSessID
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
|
@ -1,81 +1,96 @@
|
|||
package sessionwriter
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"openreplay/backend/pkg/messages"
|
||||
)
|
||||
|
||||
type Session struct {
|
||||
lock *sync.Mutex
|
||||
dom *os.File
|
||||
dev *os.File
|
||||
lastUpdate time.Time
|
||||
lock *sync.Mutex
|
||||
dom *File
|
||||
dev *File
|
||||
index []byte
|
||||
updated bool
|
||||
}
|
||||
|
||||
func NewSession(dir string, id uint64) (*Session, error) {
|
||||
if id == 0 {
|
||||
func NewSession(sessID uint64, workDir string, bufSize int) (*Session, error) {
|
||||
if sessID == 0 {
|
||||
return nil, fmt.Errorf("wrong session id")
|
||||
}
|
||||
filePath := workDir + strconv.FormatUint(sessID, 10)
|
||||
|
||||
filePath := dir + strconv.FormatUint(id, 10)
|
||||
domFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
|
||||
dom, err := NewFile(filePath, bufSize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
filePath += "devtools"
|
||||
devFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
|
||||
dev, err := NewFile(filePath+"devtools", bufSize)
|
||||
if err != nil {
|
||||
domFile.Close() // should close first file descriptor
|
||||
dom.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Session{
|
||||
lock: &sync.Mutex{},
|
||||
dom: domFile,
|
||||
dev: devFile,
|
||||
lastUpdate: time.Now(),
|
||||
lock: &sync.Mutex{},
|
||||
dom: dom,
|
||||
dev: dev,
|
||||
index: make([]byte, 8),
|
||||
updated: false,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Session) Lock() {
|
||||
func (s *Session) Write(msg messages.Message) error {
|
||||
s.lock.Lock()
|
||||
}
|
||||
defer s.lock.Unlock()
|
||||
|
||||
func (s *Session) Unlock() {
|
||||
s.lock.Unlock()
|
||||
}
|
||||
// Encode message index
|
||||
binary.LittleEndian.PutUint64(s.index, msg.Meta().Index)
|
||||
|
||||
func (s *Session) Write(mode FileType, data []byte) (err error) {
|
||||
if mode == DOM {
|
||||
_, err = s.dom.Write(data)
|
||||
} else {
|
||||
_, err = s.dev.Write(data)
|
||||
// Write message to dom.mob file
|
||||
if messages.IsDOMType(msg.TypeID()) {
|
||||
// Write message index
|
||||
if err := s.dom.Write(s.index); err != nil {
|
||||
return err
|
||||
}
|
||||
// Write message body
|
||||
if err := s.dom.Write(msg.Encode()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
s.lastUpdate = time.Now()
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Session) LastUpdate() time.Time {
|
||||
return s.lastUpdate
|
||||
s.updated = true
|
||||
// Write message to dev.mob file
|
||||
if !messages.IsDOMType(msg.TypeID()) || msg.TypeID() == messages.MsgTimestamp {
|
||||
// Write message index
|
||||
if err := s.dev.Write(s.index); err != nil {
|
||||
return err
|
||||
}
|
||||
// Write message body
|
||||
if err := s.dev.Write(msg.Encode()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Session) Sync() error {
|
||||
domErr := s.dom.Sync()
|
||||
devErr := s.dev.Sync()
|
||||
if domErr == nil && devErr == nil {
|
||||
return nil
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
if err := s.dom.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("dom: %s, dev: %s", domErr, devErr)
|
||||
return s.dev.Sync()
|
||||
}
|
||||
|
||||
func (s *Session) Close() error {
|
||||
domErr := s.dom.Close()
|
||||
devErr := s.dev.Close()
|
||||
if domErr == nil && devErr == nil {
|
||||
return nil
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
if err := s.dom.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("dom: %s, dev: %s", domErr, devErr)
|
||||
return s.dev.Close()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,8 +0,0 @@
|
|||
package sessionwriter
|
||||
|
||||
type FileType int
|
||||
|
||||
const (
|
||||
DOM FileType = 1
|
||||
DEV FileType = 2
|
||||
)
|
||||
|
|
@ -3,47 +3,89 @@ package sessionwriter
|
|||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"openreplay/backend/pkg/messages"
|
||||
)
|
||||
|
||||
type SessionWriter struct {
|
||||
ulimit int
|
||||
dir string
|
||||
zombieSessionTimeout float64
|
||||
lock *sync.Mutex
|
||||
sessions *sync.Map
|
||||
meta map[uint64]int64
|
||||
done chan struct{}
|
||||
stopped chan struct{}
|
||||
filesLimit int
|
||||
workingDir string
|
||||
fileBuffer int
|
||||
syncTimeout time.Duration
|
||||
meta *Meta
|
||||
sessions *sync.Map
|
||||
done chan struct{}
|
||||
stopped chan struct{}
|
||||
}
|
||||
|
||||
func NewWriter(ulimit uint16, dir string, zombieSessionTimeout int64) *SessionWriter {
|
||||
func NewWriter(filesLimit uint16, workingDir string, fileBuffer int, syncTimeout int) *SessionWriter {
|
||||
w := &SessionWriter{
|
||||
ulimit: int(ulimit) / 2, // should divide by 2 because each session has 2 files
|
||||
dir: dir + "/",
|
||||
zombieSessionTimeout: float64(zombieSessionTimeout),
|
||||
lock: &sync.Mutex{},
|
||||
sessions: &sync.Map{},
|
||||
meta: make(map[uint64]int64, ulimit),
|
||||
done: make(chan struct{}),
|
||||
stopped: make(chan struct{}),
|
||||
filesLimit: int(filesLimit) / 2, // should divide by 2 because each session has 2 files
|
||||
workingDir: workingDir + "/",
|
||||
fileBuffer: fileBuffer,
|
||||
syncTimeout: time.Duration(syncTimeout) * time.Second,
|
||||
meta: NewMeta(int(filesLimit)),
|
||||
sessions: &sync.Map{},
|
||||
done: make(chan struct{}),
|
||||
stopped: make(chan struct{}),
|
||||
}
|
||||
go w.synchronizer()
|
||||
return w
|
||||
}
|
||||
|
||||
func (w *SessionWriter) WriteDOM(sid uint64, data []byte) error {
|
||||
return w.write(sid, DOM, data)
|
||||
func (w *SessionWriter) Write(msg messages.Message) (err error) {
|
||||
var (
|
||||
sess *Session
|
||||
sid = msg.SessionID()
|
||||
)
|
||||
|
||||
// Load session
|
||||
sessObj, ok := w.sessions.Load(sid)
|
||||
if !ok {
|
||||
// Create new session
|
||||
sess, err = NewSession(sid, w.workingDir, w.fileBuffer)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create session: %d, err: %s", sid, err)
|
||||
}
|
||||
|
||||
// Check opened sessions limit and close extra session if you need to
|
||||
if extraSessID := w.meta.GetExtra(); extraSessID != 0 {
|
||||
if err := w.Close(extraSessID); err != nil {
|
||||
log.Printf("can't close session: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Add created session
|
||||
w.sessions.Store(sid, sess)
|
||||
w.meta.Add(sid)
|
||||
} else {
|
||||
sess = sessObj.(*Session)
|
||||
}
|
||||
|
||||
// Write data to session
|
||||
return sess.Write(msg)
|
||||
}
|
||||
|
||||
func (w *SessionWriter) WriteDEV(sid uint64, data []byte) error {
|
||||
return w.write(sid, DEV, data)
|
||||
func (w *SessionWriter) sync(sid uint64) error {
|
||||
sessObj, ok := w.sessions.Load(sid)
|
||||
if !ok {
|
||||
return fmt.Errorf("session: %d not found", sid)
|
||||
}
|
||||
sess := sessObj.(*Session)
|
||||
return sess.Sync()
|
||||
}
|
||||
|
||||
func (w *SessionWriter) Close(sid uint64) {
|
||||
w.close(sid)
|
||||
func (w *SessionWriter) Close(sid uint64) error {
|
||||
sessObj, ok := w.sessions.LoadAndDelete(sid)
|
||||
if !ok {
|
||||
return fmt.Errorf("session: %d not found", sid)
|
||||
}
|
||||
sess := sessObj.(*Session)
|
||||
err := sess.Close()
|
||||
w.meta.Delete(sid)
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *SessionWriter) Stop() {
|
||||
|
|
@ -52,110 +94,11 @@ func (w *SessionWriter) Stop() {
|
|||
}
|
||||
|
||||
func (w *SessionWriter) Info() string {
|
||||
return fmt.Sprintf("%d sessions", w.numberOfSessions())
|
||||
}
|
||||
|
||||
func (w *SessionWriter) addSession(sid uint64) {
|
||||
w.lock.Lock()
|
||||
w.meta[sid] = time.Now().Unix()
|
||||
w.lock.Unlock()
|
||||
}
|
||||
|
||||
func (w *SessionWriter) deleteSession(sid uint64) {
|
||||
w.lock.Lock()
|
||||
delete(w.meta, sid)
|
||||
w.lock.Unlock()
|
||||
}
|
||||
|
||||
func (w *SessionWriter) numberOfSessions() int {
|
||||
w.lock.Lock()
|
||||
defer w.lock.Unlock()
|
||||
return len(w.meta)
|
||||
}
|
||||
|
||||
func (w *SessionWriter) write(sid uint64, mode FileType, data []byte) error {
|
||||
var (
|
||||
sess *Session
|
||||
err error
|
||||
)
|
||||
|
||||
sessObj, ok := w.sessions.Load(sid)
|
||||
if !ok {
|
||||
sess, err = NewSession(w.dir, sid)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't write to session: %d, err: %s", sid, err)
|
||||
}
|
||||
sess.Lock()
|
||||
defer sess.Unlock()
|
||||
|
||||
// Check opened files limit
|
||||
if len(w.meta) >= w.ulimit {
|
||||
var oldSessID uint64
|
||||
var minTimestamp int64 = math.MaxInt64
|
||||
for sessID, timestamp := range w.meta {
|
||||
if timestamp < minTimestamp {
|
||||
oldSessID = sessID
|
||||
minTimestamp = timestamp
|
||||
}
|
||||
}
|
||||
if err := w.close(oldSessID); err != nil {
|
||||
log.Printf("can't close session: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Add new session to manager
|
||||
w.sessions.Store(sid, sess)
|
||||
w.addSession(sid)
|
||||
} else {
|
||||
sess = sessObj.(*Session)
|
||||
sess.Lock()
|
||||
defer sess.Unlock()
|
||||
}
|
||||
|
||||
// Write data to session
|
||||
return sess.Write(mode, data)
|
||||
}
|
||||
|
||||
func (w *SessionWriter) sync(sid uint64) error {
|
||||
sessObj, ok := w.sessions.Load(sid)
|
||||
if !ok {
|
||||
return fmt.Errorf("can't sync, session: %d not found", sid)
|
||||
}
|
||||
sess := sessObj.(*Session)
|
||||
sess.Lock()
|
||||
defer sess.Unlock()
|
||||
|
||||
err := sess.Sync()
|
||||
if time.Now().Sub(sess.LastUpdate()).Seconds() > w.zombieSessionTimeout {
|
||||
if err != nil {
|
||||
log.Printf("can't sync session: %d, err: %s", sid, err)
|
||||
}
|
||||
// Close "zombie" session
|
||||
err = sess.Close()
|
||||
w.deleteSession(sid)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *SessionWriter) close(sid uint64) error {
|
||||
sessObj, ok := w.sessions.LoadAndDelete(sid)
|
||||
if !ok {
|
||||
return fmt.Errorf("can't close, session: %d not found", sid)
|
||||
}
|
||||
sess := sessObj.(*Session)
|
||||
sess.Lock()
|
||||
defer sess.Unlock()
|
||||
|
||||
if err := sess.Sync(); err != nil {
|
||||
log.Printf("can't sync session: %d, err: %s", sid, err)
|
||||
}
|
||||
err := sess.Close()
|
||||
w.deleteSession(sid)
|
||||
return err
|
||||
return fmt.Sprintf("%d sessions", w.meta.Count())
|
||||
}
|
||||
|
||||
func (w *SessionWriter) synchronizer() {
|
||||
tick := time.Tick(2 * time.Second)
|
||||
tick := time.Tick(w.syncTimeout)
|
||||
for {
|
||||
select {
|
||||
case <-tick:
|
||||
|
|
@ -167,7 +110,7 @@ func (w *SessionWriter) synchronizer() {
|
|||
})
|
||||
case <-w.done:
|
||||
w.sessions.Range(func(sid, lockObj any) bool {
|
||||
if err := w.close(sid.(uint64)); err != nil {
|
||||
if err := w.Close(sid.(uint64)); err != nil {
|
||||
log.Printf("can't close file descriptor: %s", err)
|
||||
}
|
||||
return true
|
||||
|
|
|
|||
|
|
@ -100,6 +100,7 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
|
|||
tp: msgType,
|
||||
size: i.size,
|
||||
reader: reader,
|
||||
raw: batchData,
|
||||
skipped: &i.canSkip,
|
||||
broken: &i.broken,
|
||||
meta: i.messageInfo,
|
||||
|
|
|
|||
|
|
@ -8,13 +8,17 @@ import (
|
|||
"log"
|
||||
)
|
||||
|
||||
var (
|
||||
one = []byte{0}
|
||||
three = []byte{0, 0, 0}
|
||||
)
|
||||
|
||||
func ReadByte(reader io.Reader) (byte, error) {
|
||||
p := make([]byte, 1)
|
||||
_, err := io.ReadFull(reader, p)
|
||||
_, err := io.ReadFull(reader, one)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return p[0], nil
|
||||
return one[0], nil
|
||||
}
|
||||
|
||||
func ReadData(reader io.Reader) ([]byte, error) {
|
||||
|
|
@ -156,8 +160,7 @@ func WriteSize(size uint64, buf []byte, p int) {
|
|||
}
|
||||
|
||||
func ReadSize(reader io.Reader) (uint64, error) {
|
||||
buf := make([]byte, 3)
|
||||
n, err := io.ReadFull(reader, buf)
|
||||
n, err := io.ReadFull(reader, three)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
|
@ -165,7 +168,7 @@ func ReadSize(reader io.Reader) (uint64, error) {
|
|||
return 0, fmt.Errorf("read only %d of 3 size bytes", n)
|
||||
}
|
||||
var size uint64
|
||||
for i, b := range buf {
|
||||
for i, b := range three {
|
||||
size += uint64(b) << (8 * i)
|
||||
}
|
||||
return size, nil
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ type RawMessage struct {
|
|||
size uint64
|
||||
data []byte
|
||||
reader *bytes.Reader
|
||||
raw []byte
|
||||
meta *message
|
||||
encoded bool
|
||||
skipped *bool
|
||||
|
|
@ -23,15 +24,25 @@ func (m *RawMessage) Encode() []byte {
|
|||
if m.encoded {
|
||||
return m.data
|
||||
}
|
||||
m.data = make([]byte, m.size+1)
|
||||
m.data[0] = uint8(m.tp)
|
||||
m.encoded = true
|
||||
*m.skipped = false
|
||||
_, err := io.ReadFull(m.reader, m.data[1:])
|
||||
if err != nil {
|
||||
log.Printf("message encode err: %s, type: %d, sess: %d", err, m.tp, m.SessionID())
|
||||
// Try to avoid EOF error
|
||||
if m.reader.Len() < int(m.size) {
|
||||
return nil
|
||||
}
|
||||
// Get current batch position
|
||||
currPos, err := m.reader.Seek(0, io.SeekCurrent)
|
||||
if err != nil {
|
||||
log.Printf("can't get current batch position: %s", err)
|
||||
return nil
|
||||
}
|
||||
// "Move" message type
|
||||
if currPos == 0 {
|
||||
log.Printf("can't move message type, curr position = %d", currPos)
|
||||
return nil
|
||||
}
|
||||
// Dirty hack to avoid extra memory allocation
|
||||
m.raw[currPos-1] = uint8(m.tp)
|
||||
m.data = m.raw[currPos-1 : currPos+int64(m.size)]
|
||||
m.encoded = true
|
||||
return m.data
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue