[Sink] Async session writer (#826)
* feat(backend): implemented async session writer module
This commit is contained in:
parent
bd383fb256
commit
5a1cd27ebc
5 changed files with 245 additions and 179 deletions
|
|
@ -10,7 +10,7 @@ import (
|
|||
|
||||
"openreplay/backend/internal/config/sink"
|
||||
"openreplay/backend/internal/sink/assetscache"
|
||||
"openreplay/backend/internal/sink/oswriter"
|
||||
"openreplay/backend/internal/sink/sessionwriter"
|
||||
"openreplay/backend/internal/storage"
|
||||
"openreplay/backend/pkg/messages"
|
||||
"openreplay/backend/pkg/monitoring"
|
||||
|
|
@ -32,7 +32,7 @@ func main() {
|
|||
log.Fatalf("%v doesn't exist. %v", cfg.FsDir, err)
|
||||
}
|
||||
|
||||
writer := oswriter.NewWriter(cfg.FsUlimit, cfg.FsDir)
|
||||
writer := sessionwriter.NewWriter(cfg.FsUlimit, cfg.FsDir)
|
||||
|
||||
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
|
||||
defer producer.Close(cfg.ProducerCloseTimeout)
|
||||
|
|
@ -63,9 +63,7 @@ func main() {
|
|||
if err := producer.Produce(cfg.TopicTrigger, msg.SessionID(), msg.Encode()); err != nil {
|
||||
log.Printf("can't send SessionEnd to trigger topic: %s; sessID: %d", err, msg.SessionID())
|
||||
}
|
||||
if err := writer.Close(msg.SessionID()); err != nil {
|
||||
log.Printf("can't close session file: %s", err)
|
||||
}
|
||||
writer.Close(msg.SessionID())
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -139,9 +137,9 @@ func main() {
|
|||
select {
|
||||
case sig := <-sigchan:
|
||||
log.Printf("Caught signal %v: terminating\n", sig)
|
||||
if err := writer.CloseAll(); err != nil {
|
||||
log.Printf("closeAll error: %v\n", err)
|
||||
}
|
||||
// Sync and stop writer
|
||||
writer.Stop()
|
||||
// Commit and stop consumer
|
||||
if err := consumer.Commit(); err != nil {
|
||||
log.Printf("can't commit messages: %s", err)
|
||||
}
|
||||
|
|
@ -149,16 +147,10 @@ func main() {
|
|||
os.Exit(0)
|
||||
case <-tick:
|
||||
counter.Print()
|
||||
s := time.Now()
|
||||
if err := writer.SyncAll(); err != nil {
|
||||
log.Fatalf("sync error: %v\n", err)
|
||||
}
|
||||
dur := time.Now().Sub(s).Milliseconds()
|
||||
s = time.Now()
|
||||
if err := consumer.Commit(); err != nil {
|
||||
log.Printf("can't commit messages: %s", err)
|
||||
}
|
||||
log.Printf("sync: %d, commit: %d, writer: %s", dur, time.Now().Sub(s).Milliseconds(), writer.Info())
|
||||
log.Printf("writer: %s", writer.Info())
|
||||
default:
|
||||
err := consumer.ConsumeNext()
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -1,164 +0,0 @@
|
|||
package oswriter
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
type FileType int
|
||||
|
||||
const (
|
||||
DOM FileType = 1
|
||||
DEV FileType = 2
|
||||
)
|
||||
|
||||
type Writer struct {
|
||||
ulimit int
|
||||
dir string
|
||||
files map[uint64]*os.File
|
||||
devtools map[uint64]*os.File
|
||||
atimes map[uint64]int64
|
||||
}
|
||||
|
||||
func NewWriter(ulimit uint16, dir string) *Writer {
|
||||
return &Writer{
|
||||
ulimit: int(ulimit),
|
||||
dir: dir + "/",
|
||||
files: make(map[uint64]*os.File, 1024),
|
||||
devtools: make(map[uint64]*os.File, 1024),
|
||||
atimes: make(map[uint64]int64, 1024),
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Writer) open(key uint64, mode FileType) (*os.File, error) {
|
||||
if mode == DOM {
|
||||
file, ok := w.files[key]
|
||||
if ok {
|
||||
return file, nil
|
||||
}
|
||||
} else {
|
||||
file, ok := w.devtools[key]
|
||||
if ok {
|
||||
return file, nil
|
||||
}
|
||||
}
|
||||
|
||||
if len(w.atimes) >= w.ulimit {
|
||||
var m_k uint64
|
||||
var m_t int64 = math.MaxInt64
|
||||
for k, t := range w.atimes {
|
||||
if t < m_t {
|
||||
m_k = k
|
||||
m_t = t
|
||||
}
|
||||
}
|
||||
if err := w.Close(m_k); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
filePath := w.dir + strconv.FormatUint(key, 10)
|
||||
if mode == DEV {
|
||||
filePath += "devtools"
|
||||
}
|
||||
file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if mode == DOM {
|
||||
w.files[key] = file
|
||||
} else {
|
||||
w.devtools[key] = file
|
||||
}
|
||||
w.atimes[key] = time.Now().Unix()
|
||||
return file, nil
|
||||
}
|
||||
|
||||
func (w *Writer) Close(key uint64) error {
|
||||
// Close dom file
|
||||
file := w.files[key]
|
||||
if file == nil {
|
||||
return nil
|
||||
}
|
||||
if err := file.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := file.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
delete(w.files, key)
|
||||
delete(w.atimes, key)
|
||||
// Close dev file
|
||||
file = w.devtools[key]
|
||||
if file == nil {
|
||||
return nil
|
||||
}
|
||||
if err := file.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := file.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
delete(w.devtools, key)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Writer) WriteDOM(key uint64, data []byte) error {
|
||||
return w.Write(key, DOM, data)
|
||||
}
|
||||
|
||||
func (w *Writer) WriteDEV(key uint64, data []byte) error {
|
||||
return w.Write(key, DEV, data)
|
||||
}
|
||||
|
||||
func (w *Writer) Write(key uint64, mode FileType, data []byte) error {
|
||||
file, err := w.open(key, mode)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = file.Write(data)
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *Writer) SyncAll() error {
|
||||
for _, file := range w.files {
|
||||
if err := file.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, file := range w.devtools {
|
||||
if err := file.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Writer) CloseAll() error {
|
||||
for _, file := range w.files {
|
||||
if err := file.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := file.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
w.files = nil
|
||||
for _, file := range w.devtools {
|
||||
if err := file.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := file.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
w.devtools = nil
|
||||
w.atimes = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Writer) Info() string {
|
||||
return fmt.Sprintf("dom: %d, dev: %d", len(w.files), len(w.devtools))
|
||||
}
|
||||
73
backend/internal/sink/sessionwriter/session.go
Normal file
73
backend/internal/sink/sessionwriter/session.go
Normal file
|
|
@ -0,0 +1,73 @@
|
|||
package sessionwriter
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Session struct {
|
||||
lock *sync.Mutex
|
||||
dom *os.File
|
||||
dev *os.File
|
||||
}
|
||||
|
||||
func NewSession(dir string, id uint64) (*Session, error) {
|
||||
if id == 0 {
|
||||
return nil, fmt.Errorf("wrong session id")
|
||||
}
|
||||
|
||||
filePath := dir + strconv.FormatUint(id, 10)
|
||||
domFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
filePath += "devtools"
|
||||
devFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
|
||||
if err != nil {
|
||||
domFile.Close() // should close first file descriptor
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Session{
|
||||
lock: &sync.Mutex{},
|
||||
dom: domFile,
|
||||
dev: devFile,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Session) Lock() {
|
||||
s.lock.Lock()
|
||||
}
|
||||
|
||||
func (s *Session) Unlock() {
|
||||
s.lock.Unlock()
|
||||
}
|
||||
|
||||
func (s *Session) Write(mode FileType, data []byte) (err error) {
|
||||
if mode == DOM {
|
||||
_, err = s.dom.Write(data)
|
||||
} else {
|
||||
_, err = s.dev.Write(data)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Session) Sync() error {
|
||||
domErr := s.dom.Sync()
|
||||
devErr := s.dev.Sync()
|
||||
if domErr == nil && devErr == nil {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("dom: %s, dev: %s", domErr, devErr)
|
||||
}
|
||||
|
||||
func (s *Session) Close() error {
|
||||
domErr := s.dom.Close()
|
||||
devErr := s.dev.Close()
|
||||
if domErr == nil && devErr == nil {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("dom: %s, dev: %s", domErr, devErr)
|
||||
}
|
||||
8
backend/internal/sink/sessionwriter/types.go
Normal file
8
backend/internal/sink/sessionwriter/types.go
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
package sessionwriter
|
||||
|
||||
type FileType int
|
||||
|
||||
const (
|
||||
DOM FileType = 1
|
||||
DEV FileType = 2
|
||||
)
|
||||
157
backend/internal/sink/sessionwriter/writer.go
Normal file
157
backend/internal/sink/sessionwriter/writer.go
Normal file
|
|
@ -0,0 +1,157 @@
|
|||
package sessionwriter
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type SessionWriter struct {
|
||||
ulimit int
|
||||
dir string
|
||||
lock *sync.Mutex
|
||||
sessions *sync.Map
|
||||
meta map[uint64]int64
|
||||
count int
|
||||
done chan struct{}
|
||||
stopped chan struct{}
|
||||
}
|
||||
|
||||
func NewWriter(ulimit uint16, dir string) *SessionWriter {
|
||||
w := &SessionWriter{
|
||||
ulimit: int(ulimit),
|
||||
dir: dir + "/",
|
||||
lock: &sync.Mutex{},
|
||||
sessions: &sync.Map{},
|
||||
meta: make(map[uint64]int64, ulimit),
|
||||
done: make(chan struct{}),
|
||||
stopped: make(chan struct{}),
|
||||
}
|
||||
go w.synchronizer()
|
||||
return w
|
||||
}
|
||||
|
||||
func (w *SessionWriter) WriteDOM(sid uint64, data []byte) error {
|
||||
return w.write(sid, DOM, data)
|
||||
}
|
||||
|
||||
func (w *SessionWriter) WriteDEV(sid uint64, data []byte) error {
|
||||
return w.write(sid, DEV, data)
|
||||
}
|
||||
|
||||
func (w *SessionWriter) Close(sid uint64) {
|
||||
w.close(sid)
|
||||
}
|
||||
|
||||
func (w *SessionWriter) Stop() {
|
||||
w.done <- struct{}{}
|
||||
<-w.stopped
|
||||
}
|
||||
|
||||
func (w *SessionWriter) Info() string {
|
||||
w.lock.Lock()
|
||||
count := w.count
|
||||
w.lock.Unlock()
|
||||
return fmt.Sprintf("%d files", count)
|
||||
}
|
||||
|
||||
func (w *SessionWriter) write(sid uint64, mode FileType, data []byte) error {
|
||||
var (
|
||||
sess *Session
|
||||
err error
|
||||
)
|
||||
|
||||
sessObj, ok := w.sessions.Load(sid)
|
||||
if !ok {
|
||||
sess, err = NewSession(w.dir, sid)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't write to session: %d, err: %s", sid, err)
|
||||
}
|
||||
sess.Lock()
|
||||
defer sess.Unlock()
|
||||
w.sessions.Store(sid, sess)
|
||||
|
||||
// Check opened files limit
|
||||
w.meta[sid] = time.Now().Unix()
|
||||
if len(w.meta) >= w.ulimit {
|
||||
var oldSessID uint64
|
||||
var minTimestamp int64 = math.MaxInt64
|
||||
for sessID, timestamp := range w.meta {
|
||||
if timestamp < minTimestamp {
|
||||
oldSessID = sessID
|
||||
minTimestamp = timestamp
|
||||
}
|
||||
}
|
||||
delete(w.meta, oldSessID)
|
||||
if err := w.close(oldSessID); err != nil {
|
||||
log.Printf("can't close session: %s", err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
sess = sessObj.(*Session)
|
||||
sess.Lock()
|
||||
defer sess.Unlock()
|
||||
}
|
||||
|
||||
// Update info
|
||||
w.lock.Lock()
|
||||
w.count = len(w.meta)
|
||||
w.lock.Unlock()
|
||||
|
||||
// Write data to session
|
||||
return sess.Write(mode, data)
|
||||
}
|
||||
|
||||
func (w *SessionWriter) sync(sid uint64) error {
|
||||
sessObj, ok := w.sessions.Load(sid)
|
||||
if !ok {
|
||||
return fmt.Errorf("can't sync, session: %d not found", sid)
|
||||
}
|
||||
sess := sessObj.(*Session)
|
||||
sess.Lock()
|
||||
defer sess.Unlock()
|
||||
|
||||
return sess.Sync()
|
||||
}
|
||||
|
||||
func (w *SessionWriter) close(sid uint64) error {
|
||||
sessObj, ok := w.sessions.LoadAndDelete(sid)
|
||||
if !ok {
|
||||
return fmt.Errorf("can't close, session: %d not found", sid)
|
||||
}
|
||||
sess := sessObj.(*Session)
|
||||
sess.Lock()
|
||||
defer sess.Unlock()
|
||||
|
||||
if err := sess.Sync(); err != nil {
|
||||
log.Printf("can't sync session: %d, err: %s", sid, err)
|
||||
}
|
||||
err := sess.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *SessionWriter) synchronizer() {
|
||||
tick := time.Tick(2 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-tick:
|
||||
w.sessions.Range(func(sid, lockObj any) bool {
|
||||
if err := w.sync(sid.(uint64)); err != nil {
|
||||
log.Printf("can't sync file descriptor: %s", err)
|
||||
}
|
||||
return true
|
||||
})
|
||||
case <-w.done:
|
||||
w.sessions.Range(func(sid, lockObj any) bool {
|
||||
if err := w.close(sid.(uint64)); err != nil {
|
||||
log.Printf("can't close file descriptor: %s", err)
|
||||
}
|
||||
return true
|
||||
})
|
||||
w.stopped <- struct{}{}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue