diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go index 84520dd33..a7e2804c4 100644 --- a/backend/cmd/sink/main.go +++ b/backend/cmd/sink/main.go @@ -32,7 +32,7 @@ func main() { log.Fatalf("%v doesn't exist. %v", cfg.FsDir, err) } - writer := sessionwriter.NewWriter(cfg.FsUlimit, cfg.FsDir, cfg.DeadSessionTimeout) + writer := sessionwriter.NewWriter(cfg.FsUlimit, cfg.FsDir, cfg.FileBuffer, cfg.SyncTimeout) producer := queue.NewProducer(cfg.MessageSizeLimit, true) defer producer.Close(cfg.ProducerCloseTimeout) @@ -95,26 +95,20 @@ func main() { counter.Update(msg.SessionID(), time.UnixMilli(ts)) } - // Write encoded message with index to session file - data := msg.EncodeWithIndex() + // Try to encode message to avoid null data inserts + data := msg.Encode() if data == nil { return } // Write message to file - if messages.IsDOMType(msg.TypeID()) { - if err := writer.WriteDOM(msg.SessionID(), data); err != nil { - log.Printf("Writer error: %v\n", err) - } - } - if !messages.IsDOMType(msg.TypeID()) || msg.TypeID() == messages.MsgTimestamp { - if err := writer.WriteDEV(msg.SessionID(), data); err != nil { - log.Printf("Writer error: %v\n", err) - } + if err := writer.Write(msg); err != nil { + log.Printf("writer error: %s", err) + return } // [METRICS] Increase the number of written to the files messages and the message size - messageSize.Record(context.Background(), float64(len(data))) + messageSize.Record(context.Background(), float64(len(msg.Encode()))) savedMessages.Add(context.Background(), 1) } @@ -132,7 +126,8 @@ func main() { sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - tick := time.Tick(30 * time.Second) + tick := time.Tick(10 * time.Second) + tickInfo := time.Tick(30 * time.Second) for { select { case sig := <-sigchan: @@ -146,10 +141,11 @@ func main() { consumer.Close() os.Exit(0) case <-tick: - counter.Print() if err := consumer.Commit(); err != nil { log.Printf("can't commit messages: %s", err) } + case <-tickInfo: + counter.Print() log.Printf("writer: %s", writer.Info()) default: err := consumer.ConsumeNext() diff --git a/backend/internal/config/sink/config.go b/backend/internal/config/sink/config.go index a8703a596..1a2df142e 100644 --- a/backend/internal/config/sink/config.go +++ b/backend/internal/config/sink/config.go @@ -9,7 +9,8 @@ type Config struct { common.Config FsDir string `env:"FS_DIR,required"` FsUlimit uint16 `env:"FS_ULIMIT,required"` - DeadSessionTimeout int64 `env:"DEAD_SESSION_TIMEOUT,default=600"` + FileBuffer int `env:"FILE_BUFFER,default=32768"` + SyncTimeout int `env:"SYNC_TIMEOUT,default=5"` GroupSink string `env:"GROUP_SINK,required"` TopicRawWeb string `env:"TOPIC_RAW_WEB,required"` TopicRawIOS string `env:"TOPIC_RAW_IOS,required"` diff --git a/backend/internal/sink/sessionwriter/file.go b/backend/internal/sink/sessionwriter/file.go new file mode 100644 index 000000000..1ad076d72 --- /dev/null +++ b/backend/internal/sink/sessionwriter/file.go @@ -0,0 +1,57 @@ +package sessionwriter + +import ( + "bufio" + "os" +) + +type File struct { + file *os.File + buffer *bufio.Writer + updated bool +} + +func NewFile(path string, bufSize int) (*File, error) { + file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + return nil, err + } + return &File{ + file: file, + buffer: bufio.NewWriterSize(file, bufSize), + updated: false, + }, nil +} + +func (f *File) Write(data []byte) error { + leftToWrite := len(data) + for leftToWrite > 0 { + writtenDown, err := f.buffer.Write(data) + if err != nil { + return err + } + leftToWrite -= writtenDown + } + f.updated = true + return nil +} + +func (f *File) Sync() error { + if !f.updated { + return nil + } + if err := f.buffer.Flush(); err != nil { + return err + } + if err := f.file.Sync(); err != nil { + return err + } + f.updated = false + return nil +} + +func (f *File) Close() error { + _ = f.buffer.Flush() + _ = f.file.Sync() + return f.file.Close() +} diff --git a/backend/internal/sink/sessionwriter/meta.go b/backend/internal/sink/sessionwriter/meta.go new file mode 100644 index 000000000..4fac56e50 --- /dev/null +++ b/backend/internal/sink/sessionwriter/meta.go @@ -0,0 +1,56 @@ +package sessionwriter + +import ( + "math" + "sync" + "time" +) + +type Meta struct { + limit int + lock *sync.Mutex + meta map[uint64]int64 +} + +func NewMeta(limit int) *Meta { + return &Meta{ + limit: limit, + lock: &sync.Mutex{}, + meta: make(map[uint64]int64, limit), + } +} + +func (m *Meta) Add(sid uint64) { + m.lock.Lock() + m.meta[sid] = time.Now().Unix() + m.lock.Unlock() +} + +func (m *Meta) Count() int { + m.lock.Lock() + defer m.lock.Unlock() + return len(m.meta) +} + +func (m *Meta) Delete(sid uint64) { + m.lock.Lock() + delete(m.meta, sid) + m.lock.Unlock() +} + +func (m *Meta) GetExtra() uint64 { + m.lock.Lock() + defer m.lock.Unlock() + if len(m.meta) >= m.limit { + var extraSessID uint64 + var minTimestamp int64 = math.MaxInt64 + for sessID, timestamp := range m.meta { + if timestamp < minTimestamp { + extraSessID = sessID + minTimestamp = timestamp + } + } + return extraSessID + } + return 0 +} diff --git a/backend/internal/sink/sessionwriter/session.go b/backend/internal/sink/sessionwriter/session.go index f107c387b..8cf8881de 100644 --- a/backend/internal/sink/sessionwriter/session.go +++ b/backend/internal/sink/sessionwriter/session.go @@ -1,81 +1,96 @@ package sessionwriter import ( + "encoding/binary" "fmt" - "os" "strconv" "sync" - "time" + + "openreplay/backend/pkg/messages" ) type Session struct { - lock *sync.Mutex - dom *os.File - dev *os.File - lastUpdate time.Time + lock *sync.Mutex + dom *File + dev *File + index []byte + updated bool } -func NewSession(dir string, id uint64) (*Session, error) { - if id == 0 { +func NewSession(sessID uint64, workDir string, bufSize int) (*Session, error) { + if sessID == 0 { return nil, fmt.Errorf("wrong session id") } + filePath := workDir + strconv.FormatUint(sessID, 10) - filePath := dir + strconv.FormatUint(id, 10) - domFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + dom, err := NewFile(filePath, bufSize) if err != nil { return nil, err } - filePath += "devtools" - devFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + dev, err := NewFile(filePath+"devtools", bufSize) if err != nil { - domFile.Close() // should close first file descriptor + dom.Close() return nil, err } return &Session{ - lock: &sync.Mutex{}, - dom: domFile, - dev: devFile, - lastUpdate: time.Now(), + lock: &sync.Mutex{}, + dom: dom, + dev: dev, + index: make([]byte, 8), + updated: false, }, nil } -func (s *Session) Lock() { +func (s *Session) Write(msg messages.Message) error { s.lock.Lock() -} + defer s.lock.Unlock() -func (s *Session) Unlock() { - s.lock.Unlock() -} + // Encode message index + binary.LittleEndian.PutUint64(s.index, msg.Meta().Index) -func (s *Session) Write(mode FileType, data []byte) (err error) { - if mode == DOM { - _, err = s.dom.Write(data) - } else { - _, err = s.dev.Write(data) + // Write message to dom.mob file + if messages.IsDOMType(msg.TypeID()) { + // Write message index + if err := s.dom.Write(s.index); err != nil { + return err + } + // Write message body + if err := s.dom.Write(msg.Encode()); err != nil { + return err + } } - s.lastUpdate = time.Now() - return err -} - -func (s *Session) LastUpdate() time.Time { - return s.lastUpdate + s.updated = true + // Write message to dev.mob file + if !messages.IsDOMType(msg.TypeID()) || msg.TypeID() == messages.MsgTimestamp { + // Write message index + if err := s.dev.Write(s.index); err != nil { + return err + } + // Write message body + if err := s.dev.Write(msg.Encode()); err != nil { + return err + } + } + return nil } func (s *Session) Sync() error { - domErr := s.dom.Sync() - devErr := s.dev.Sync() - if domErr == nil && devErr == nil { - return nil + s.lock.Lock() + defer s.lock.Unlock() + + if err := s.dom.Sync(); err != nil { + return err } - return fmt.Errorf("dom: %s, dev: %s", domErr, devErr) + return s.dev.Sync() } func (s *Session) Close() error { - domErr := s.dom.Close() - devErr := s.dev.Close() - if domErr == nil && devErr == nil { - return nil + s.lock.Lock() + defer s.lock.Unlock() + + if err := s.dom.Close(); err != nil { + return err } - return fmt.Errorf("dom: %s, dev: %s", domErr, devErr) + return s.dev.Close() } diff --git a/backend/internal/sink/sessionwriter/types.go b/backend/internal/sink/sessionwriter/types.go deleted file mode 100644 index a20f61375..000000000 --- a/backend/internal/sink/sessionwriter/types.go +++ /dev/null @@ -1,8 +0,0 @@ -package sessionwriter - -type FileType int - -const ( - DOM FileType = 1 - DEV FileType = 2 -) diff --git a/backend/internal/sink/sessionwriter/writer.go b/backend/internal/sink/sessionwriter/writer.go index 94ff5dd66..f2eb052c7 100644 --- a/backend/internal/sink/sessionwriter/writer.go +++ b/backend/internal/sink/sessionwriter/writer.go @@ -3,47 +3,89 @@ package sessionwriter import ( "fmt" "log" - "math" "sync" "time" + + "openreplay/backend/pkg/messages" ) type SessionWriter struct { - ulimit int - dir string - zombieSessionTimeout float64 - lock *sync.Mutex - sessions *sync.Map - meta map[uint64]int64 - done chan struct{} - stopped chan struct{} + filesLimit int + workingDir string + fileBuffer int + syncTimeout time.Duration + meta *Meta + sessions *sync.Map + done chan struct{} + stopped chan struct{} } -func NewWriter(ulimit uint16, dir string, zombieSessionTimeout int64) *SessionWriter { +func NewWriter(filesLimit uint16, workingDir string, fileBuffer int, syncTimeout int) *SessionWriter { w := &SessionWriter{ - ulimit: int(ulimit) / 2, // should divide by 2 because each session has 2 files - dir: dir + "/", - zombieSessionTimeout: float64(zombieSessionTimeout), - lock: &sync.Mutex{}, - sessions: &sync.Map{}, - meta: make(map[uint64]int64, ulimit), - done: make(chan struct{}), - stopped: make(chan struct{}), + filesLimit: int(filesLimit) / 2, // should divide by 2 because each session has 2 files + workingDir: workingDir + "/", + fileBuffer: fileBuffer, + syncTimeout: time.Duration(syncTimeout) * time.Second, + meta: NewMeta(int(filesLimit)), + sessions: &sync.Map{}, + done: make(chan struct{}), + stopped: make(chan struct{}), } go w.synchronizer() return w } -func (w *SessionWriter) WriteDOM(sid uint64, data []byte) error { - return w.write(sid, DOM, data) +func (w *SessionWriter) Write(msg messages.Message) (err error) { + var ( + sess *Session + sid = msg.SessionID() + ) + + // Load session + sessObj, ok := w.sessions.Load(sid) + if !ok { + // Create new session + sess, err = NewSession(sid, w.workingDir, w.fileBuffer) + if err != nil { + return fmt.Errorf("can't create session: %d, err: %s", sid, err) + } + + // Check opened sessions limit and close extra session if you need to + if extraSessID := w.meta.GetExtra(); extraSessID != 0 { + if err := w.Close(extraSessID); err != nil { + log.Printf("can't close session: %s", err) + } + } + + // Add created session + w.sessions.Store(sid, sess) + w.meta.Add(sid) + } else { + sess = sessObj.(*Session) + } + + // Write data to session + return sess.Write(msg) } -func (w *SessionWriter) WriteDEV(sid uint64, data []byte) error { - return w.write(sid, DEV, data) +func (w *SessionWriter) sync(sid uint64) error { + sessObj, ok := w.sessions.Load(sid) + if !ok { + return fmt.Errorf("session: %d not found", sid) + } + sess := sessObj.(*Session) + return sess.Sync() } -func (w *SessionWriter) Close(sid uint64) { - w.close(sid) +func (w *SessionWriter) Close(sid uint64) error { + sessObj, ok := w.sessions.LoadAndDelete(sid) + if !ok { + return fmt.Errorf("session: %d not found", sid) + } + sess := sessObj.(*Session) + err := sess.Close() + w.meta.Delete(sid) + return err } func (w *SessionWriter) Stop() { @@ -52,110 +94,11 @@ func (w *SessionWriter) Stop() { } func (w *SessionWriter) Info() string { - return fmt.Sprintf("%d sessions", w.numberOfSessions()) -} - -func (w *SessionWriter) addSession(sid uint64) { - w.lock.Lock() - w.meta[sid] = time.Now().Unix() - w.lock.Unlock() -} - -func (w *SessionWriter) deleteSession(sid uint64) { - w.lock.Lock() - delete(w.meta, sid) - w.lock.Unlock() -} - -func (w *SessionWriter) numberOfSessions() int { - w.lock.Lock() - defer w.lock.Unlock() - return len(w.meta) -} - -func (w *SessionWriter) write(sid uint64, mode FileType, data []byte) error { - var ( - sess *Session - err error - ) - - sessObj, ok := w.sessions.Load(sid) - if !ok { - sess, err = NewSession(w.dir, sid) - if err != nil { - return fmt.Errorf("can't write to session: %d, err: %s", sid, err) - } - sess.Lock() - defer sess.Unlock() - - // Check opened files limit - if len(w.meta) >= w.ulimit { - var oldSessID uint64 - var minTimestamp int64 = math.MaxInt64 - for sessID, timestamp := range w.meta { - if timestamp < minTimestamp { - oldSessID = sessID - minTimestamp = timestamp - } - } - if err := w.close(oldSessID); err != nil { - log.Printf("can't close session: %s", err) - } - } - - // Add new session to manager - w.sessions.Store(sid, sess) - w.addSession(sid) - } else { - sess = sessObj.(*Session) - sess.Lock() - defer sess.Unlock() - } - - // Write data to session - return sess.Write(mode, data) -} - -func (w *SessionWriter) sync(sid uint64) error { - sessObj, ok := w.sessions.Load(sid) - if !ok { - return fmt.Errorf("can't sync, session: %d not found", sid) - } - sess := sessObj.(*Session) - sess.Lock() - defer sess.Unlock() - - err := sess.Sync() - if time.Now().Sub(sess.LastUpdate()).Seconds() > w.zombieSessionTimeout { - if err != nil { - log.Printf("can't sync session: %d, err: %s", sid, err) - } - // Close "zombie" session - err = sess.Close() - w.deleteSession(sid) - } - return err -} - -func (w *SessionWriter) close(sid uint64) error { - sessObj, ok := w.sessions.LoadAndDelete(sid) - if !ok { - return fmt.Errorf("can't close, session: %d not found", sid) - } - sess := sessObj.(*Session) - sess.Lock() - defer sess.Unlock() - - if err := sess.Sync(); err != nil { - log.Printf("can't sync session: %d, err: %s", sid, err) - } - err := sess.Close() - w.deleteSession(sid) - return err + return fmt.Sprintf("%d sessions", w.meta.Count()) } func (w *SessionWriter) synchronizer() { - tick := time.Tick(2 * time.Second) + tick := time.Tick(w.syncTimeout) for { select { case <-tick: @@ -167,7 +110,7 @@ func (w *SessionWriter) synchronizer() { }) case <-w.done: w.sessions.Range(func(sid, lockObj any) bool { - if err := w.close(sid.(uint64)); err != nil { + if err := w.Close(sid.(uint64)); err != nil { log.Printf("can't close file descriptor: %s", err) } return true diff --git a/backend/pkg/messages/iterator.go b/backend/pkg/messages/iterator.go index 8b23cb97e..7b7991b19 100644 --- a/backend/pkg/messages/iterator.go +++ b/backend/pkg/messages/iterator.go @@ -100,6 +100,7 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) { tp: msgType, size: i.size, reader: reader, + raw: batchData, skipped: &i.canSkip, broken: &i.broken, meta: i.messageInfo, diff --git a/backend/pkg/messages/primitives.go b/backend/pkg/messages/primitives.go index eb65ae7b1..1d3d2410b 100644 --- a/backend/pkg/messages/primitives.go +++ b/backend/pkg/messages/primitives.go @@ -8,13 +8,17 @@ import ( "log" ) +var ( + one = []byte{0} + three = []byte{0, 0, 0} +) + func ReadByte(reader io.Reader) (byte, error) { - p := make([]byte, 1) - _, err := io.ReadFull(reader, p) + _, err := io.ReadFull(reader, one) if err != nil { return 0, err } - return p[0], nil + return one[0], nil } func ReadData(reader io.Reader) ([]byte, error) { @@ -156,8 +160,7 @@ func WriteSize(size uint64, buf []byte, p int) { } func ReadSize(reader io.Reader) (uint64, error) { - buf := make([]byte, 3) - n, err := io.ReadFull(reader, buf) + n, err := io.ReadFull(reader, three) if err != nil { return 0, err } @@ -165,7 +168,7 @@ func ReadSize(reader io.Reader) (uint64, error) { return 0, fmt.Errorf("read only %d of 3 size bytes", n) } var size uint64 - for i, b := range buf { + for i, b := range three { size += uint64(b) << (8 * i) } return size, nil diff --git a/backend/pkg/messages/raw.go b/backend/pkg/messages/raw.go index 33419d115..dbc71f4e6 100644 --- a/backend/pkg/messages/raw.go +++ b/backend/pkg/messages/raw.go @@ -13,6 +13,7 @@ type RawMessage struct { size uint64 data []byte reader *bytes.Reader + raw []byte meta *message encoded bool skipped *bool @@ -23,15 +24,25 @@ func (m *RawMessage) Encode() []byte { if m.encoded { return m.data } - m.data = make([]byte, m.size+1) - m.data[0] = uint8(m.tp) - m.encoded = true - *m.skipped = false - _, err := io.ReadFull(m.reader, m.data[1:]) - if err != nil { - log.Printf("message encode err: %s, type: %d, sess: %d", err, m.tp, m.SessionID()) + // Try to avoid EOF error + if m.reader.Len() < int(m.size) { return nil } + // Get current batch position + currPos, err := m.reader.Seek(0, io.SeekCurrent) + if err != nil { + log.Printf("can't get current batch position: %s", err) + return nil + } + // "Move" message type + if currPos == 0 { + log.Printf("can't move message type, curr position = %d", currPos) + return nil + } + // Dirty hack to avoid extra memory allocation + m.raw[currPos-1] = uint8(m.tp) + m.data = m.raw[currPos-1 : currPos+int64(m.size)] + m.encoded = true return m.data }