diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go index 675d965c9..d6ebc0abc 100644 --- a/backend/cmd/sink/main.go +++ b/backend/cmd/sink/main.go @@ -10,7 +10,7 @@ import ( "openreplay/backend/internal/config/sink" "openreplay/backend/internal/sink/assetscache" - "openreplay/backend/internal/sink/oswriter" + "openreplay/backend/internal/sink/sessionwriter" "openreplay/backend/internal/storage" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/monitoring" @@ -32,7 +32,7 @@ func main() { log.Fatalf("%v doesn't exist. %v", cfg.FsDir, err) } - writer := oswriter.NewWriter(cfg.FsUlimit, cfg.FsDir) + writer := sessionwriter.NewWriter(cfg.FsUlimit, cfg.FsDir) producer := queue.NewProducer(cfg.MessageSizeLimit, true) defer producer.Close(cfg.ProducerCloseTimeout) @@ -63,9 +63,7 @@ func main() { if err := producer.Produce(cfg.TopicTrigger, msg.SessionID(), msg.Encode()); err != nil { log.Printf("can't send SessionEnd to trigger topic: %s; sessID: %d", err, msg.SessionID()) } - if err := writer.Close(msg.SessionID()); err != nil { - log.Printf("can't close session file: %s", err) - } + writer.Close(msg.SessionID()) return } @@ -139,9 +137,9 @@ func main() { select { case sig := <-sigchan: log.Printf("Caught signal %v: terminating\n", sig) - if err := writer.CloseAll(); err != nil { - log.Printf("closeAll error: %v\n", err) - } + // Sync and stop writer + writer.Stop() + // Commit and stop consumer if err := consumer.Commit(); err != nil { log.Printf("can't commit messages: %s", err) } @@ -149,16 +147,10 @@ func main() { os.Exit(0) case <-tick: counter.Print() - s := time.Now() - if err := writer.SyncAll(); err != nil { - log.Fatalf("sync error: %v\n", err) - } - dur := time.Now().Sub(s).Milliseconds() - s = time.Now() if err := consumer.Commit(); err != nil { log.Printf("can't commit messages: %s", err) } - log.Printf("sync: %d, commit: %d, writer: %s", dur, time.Now().Sub(s).Milliseconds(), writer.Info()) + log.Printf("writer: %s", writer.Info()) 
default: err := consumer.ConsumeNext() if err != nil { diff --git a/backend/internal/sink/oswriter/oswriter.go b/backend/internal/sink/oswriter/oswriter.go deleted file mode 100644 index ec42d7668..000000000 --- a/backend/internal/sink/oswriter/oswriter.go +++ /dev/null @@ -1,164 +0,0 @@ -package oswriter - -import ( - "fmt" - "math" - "os" - "strconv" - "time" -) - -type FileType int - -const ( - DOM FileType = 1 - DEV FileType = 2 -) - -type Writer struct { - ulimit int - dir string - files map[uint64]*os.File - devtools map[uint64]*os.File - atimes map[uint64]int64 -} - -func NewWriter(ulimit uint16, dir string) *Writer { - return &Writer{ - ulimit: int(ulimit), - dir: dir + "/", - files: make(map[uint64]*os.File, 1024), - devtools: make(map[uint64]*os.File, 1024), - atimes: make(map[uint64]int64, 1024), - } -} - -func (w *Writer) open(key uint64, mode FileType) (*os.File, error) { - if mode == DOM { - file, ok := w.files[key] - if ok { - return file, nil - } - } else { - file, ok := w.devtools[key] - if ok { - return file, nil - } - } - - if len(w.atimes) >= w.ulimit { - var m_k uint64 - var m_t int64 = math.MaxInt64 - for k, t := range w.atimes { - if t < m_t { - m_k = k - m_t = t - } - } - if err := w.Close(m_k); err != nil { - return nil, err - } - } - filePath := w.dir + strconv.FormatUint(key, 10) - if mode == DEV { - filePath += "devtools" - } - file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) - if err != nil { - return nil, err - } - if mode == DOM { - w.files[key] = file - } else { - w.devtools[key] = file - } - w.atimes[key] = time.Now().Unix() - return file, nil -} - -func (w *Writer) Close(key uint64) error { - // Close dom file - file := w.files[key] - if file == nil { - return nil - } - if err := file.Sync(); err != nil { - return err - } - if err := file.Close(); err != nil { - return err - } - delete(w.files, key) - delete(w.atimes, key) - // Close dev file - file = w.devtools[key] - if file == nil { - return nil - } 
// FileType selects which of a session's two files a write is appended to.
type FileType int

const (
	// DOM targets the session's main file.
	DOM FileType = 1
	// DEV targets the session's "devtools" file.
	DEV FileType = 2
)

// sessionFileFlags opens files write-only, creating them when missing and
// appending otherwise, so a session that was closed and reopened keeps the
// data written earlier.
const sessionFileFlags = os.O_WRONLY | os.O_CREATE | os.O_APPEND

// Session is the pair of open files (dom and devtools) of one session plus
// the mutex callers use to serialize access to them.
type Session struct {
	lock sync.Mutex
	dom  *os.File
	dev  *os.File
}

// NewSession opens (creating if necessary) both files of session id.
//
// dir is used as a raw prefix: the dom file is dir+<id> and the devtools file
// is dir+<id>+"devtools", so dir must already end with a path separator.
// A zero id is rejected. When the second file can't be opened the first one
// is closed again, so no descriptor is leaked on failure.
func NewSession(dir string, id uint64) (*Session, error) {
	if id == 0 {
		return nil, fmt.Errorf("wrong session id")
	}

	domPath := dir + strconv.FormatUint(id, 10)
	domFile, err := os.OpenFile(domPath, sessionFileFlags, 0644)
	if err != nil {
		return nil, err
	}
	devFile, err := os.OpenFile(domPath+"devtools", sessionFileFlags, 0644)
	if err != nil {
		// Best effort: the open error is the one worth reporting.
		_ = domFile.Close()
		return nil, err
	}

	return &Session{dom: domFile, dev: devFile}, nil
}

// Lock acquires the session mutex.
func (s *Session) Lock() {
	s.lock.Lock()
}

// Unlock releases the session mutex.
func (s *Session) Unlock() {
	s.lock.Unlock()
}

// Write appends data to the dom file when mode is DOM and to the devtools
// file for any other mode. It does not take the session lock itself.
func (s *Session) Write(mode FileType, data []byte) (err error) {
	if mode == DOM {
		_, err = s.dom.Write(data)
	} else {
		_, err = s.dev.Write(data)
	}
	return err
}

// Sync calls Sync on both files. Both are always attempted; a non-nil error
// reports the outcome of each.
func (s *Session) Sync() error {
	return pairError(s.dom.Sync(), s.dev.Sync())
}

// Close closes both files. Both are always attempted; a non-nil error reports
// the outcome of each.
func (s *Session) Close() error {
	return pairError(s.dom.Close(), s.dev.Close())
}

// pairError merges the results of the same operation on the dom and devtools
// files. %v is used on purpose: one of the two may be nil and must render as
// "<nil>" rather than as a formatting error.
func pairError(domErr, devErr error) error {
	if domErr == nil && devErr == nil {
		return nil
	}
	return fmt.Errorf("dom: %v, dev: %v", domErr, devErr)
}

// SessionWriter appends session data to per-session files, keeps the number
// of simultaneously open sessions bounded by ulimit, and runs a background
// goroutine that periodically syncs every open session.
type SessionWriter struct {
	ulimit   int
	dir      string
	lock     sync.Mutex       // guards meta
	sessions sync.Map         // session id (uint64) -> *Session
	meta     map[uint64]int64 // session id -> time of last write (UnixNano)
	stopOnce sync.Once
	done     chan struct{} // closed by Stop to ask the synchronizer to finish
	stopped  chan struct{} // closed by the synchronizer when it has finished
}

// NewWriter creates a writer that stores files under dir, evicts the least
// recently written session once ulimit sessions are open, and starts the
// background synchronizer. Call Stop to close all files and end the goroutine.
func NewWriter(ulimit uint16, dir string) *SessionWriter {
	w := &SessionWriter{
		ulimit:  int(ulimit),
		dir:     dir + "/",
		meta:    make(map[uint64]int64, ulimit),
		done:    make(chan struct{}),
		stopped: make(chan struct{}),
	}
	go w.synchronizer()
	return w
}

// WriteDOM appends data to the dom file of session sid.
func (w *SessionWriter) WriteDOM(sid uint64, data []byte) error {
	return w.write(sid, DOM, data)
}

// WriteDEV appends data to the devtools file of session sid.
func (w *SessionWriter) WriteDEV(sid uint64, data []byte) error {
	return w.write(sid, DEV, data)
}

// Close syncs and closes the files of session sid and stops tracking it.
// Closing a session that is not open is a no-op; a failure to close the
// files is logged.
func (w *SessionWriter) Close(sid uint64) {
	sess, ok := w.detach(sid)
	if !ok {
		return
	}
	if err := finish(sid, sess); err != nil {
		log.Printf("can't close session: %d, err: %s", sid, err)
	}
}

// Stop asks the synchronizer to close every open session and waits until it
// has done so. It is safe to call more than once.
func (w *SessionWriter) Stop() {
	w.stopOnce.Do(func() { close(w.done) })
	<-w.stopped
}

// Info reports how many sessions are currently tracked as open.
func (w *SessionWriter) Info() string {
	w.lock.Lock()
	count := len(w.meta)
	w.lock.Unlock()
	return fmt.Sprintf("%d files", count)
}

// write appends data to the mode file of session sid, opening the session
// first when it is not open yet.
func (w *SessionWriter) write(sid uint64, mode FileType, data []byte) error {
	var sess *Session

	if sessObj, ok := w.sessions.Load(sid); ok {
		sess = sessObj.(*Session)
		sess.Lock()
		defer sess.Unlock()
	} else {
		// Free a slot before opening new descriptors. The victim is chosen
		// while sid is not tracked yet, so a session can never evict itself
		// (which would re-lock its own, already held, mutex).
		w.evictOldest(sid)

		created, err := NewSession(w.dir, sid)
		if err != nil {
			return fmt.Errorf("can't write to session: %d, err: %w", sid, err)
		}
		sess = created
		// Lock before publishing so nobody can sync or close the session
		// ahead of its first write.
		sess.Lock()
		defer sess.Unlock()
		w.sessions.Store(sid, sess)
	}

	// Refresh the last-write time used to pick eviction victims.
	w.lock.Lock()
	w.meta[sid] = time.Now().UnixNano()
	w.lock.Unlock()

	return sess.Write(mode, data)
}

// evictOldest closes the least recently written session, other than keep,
// when the number of tracked sessions has reached ulimit.
func (w *SessionWriter) evictOldest(keep uint64) {
	var (
		victim uint64
		found  bool
	)
	oldest := int64(math.MaxInt64)

	w.lock.Lock()
	if len(w.meta) >= w.ulimit {
		for sessID, timestamp := range w.meta {
			if sessID != keep && timestamp < oldest {
				victim, oldest, found = sessID, timestamp, true
			}
		}
	}
	w.lock.Unlock()

	if !found {
		return
	}
	// w.lock is released above: close takes the victim's own lock and must
	// never be called while holding w.lock.
	if err := w.close(victim); err != nil {
		log.Printf("can't close session: %s", err)
	}
}

// sync flushes the files of session sid to disk.
func (w *SessionWriter) sync(sid uint64) error {
	sessObj, ok := w.sessions.Load(sid)
	if !ok {
		return fmt.Errorf("can't sync, session: %d not found", sid)
	}
	sess := sessObj.(*Session)
	sess.Lock()
	defer sess.Unlock()

	return sess.Sync()
}

// close stops tracking session sid, then syncs and closes its files. It
// returns an error when the session is not open or its files fail to close.
func (w *SessionWriter) close(sid uint64) error {
	sess, ok := w.detach(sid)
	if !ok {
		return fmt.Errorf("can't close, session: %d not found", sid)
	}
	return finish(sid, sess)
}

// detach removes session sid from both the eviction bookkeeping and the
// session table and returns it. The second result is false when the session
// was not open.
func (w *SessionWriter) detach(sid uint64) (*Session, bool) {
	w.lock.Lock()
	delete(w.meta, sid)
	w.lock.Unlock()

	sessObj, ok := w.sessions.LoadAndDelete(sid)
	if !ok {
		return nil, false
	}
	return sessObj.(*Session), true
}

// finish syncs and closes an already detached session. A sync failure is
// only logged so that the files are closed regardless.
func finish(sid uint64, sess *Session) error {
	sess.Lock()
	defer sess.Unlock()

	if err := sess.Sync(); err != nil {
		log.Printf("can't sync session: %d, err: %s", sid, err)
	}
	return sess.Close()
}

// synchronizer syncs every open session every two seconds until Stop is
// called, then closes all sessions and signals completion via w.stopped.
func (w *SessionWriter) synchronizer() {
	defer close(w.stopped)

	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			w.sessions.Range(func(sid, _ any) bool {
				if err := w.sync(sid.(uint64)); err != nil {
					log.Printf("can't sync file descriptor: %s", err)
				}
				return true
			})
		case <-w.done:
			w.sessions.Range(func(sid, _ any) bool {
				if err := w.close(sid.(uint64)); err != nil {
					log.Printf("can't close file descriptor: %s", err)
				}
				return true
			})
			return
		}
	}
}