121 lines
2.7 KiB
Go
121 lines
2.7 KiB
Go
package sessionwriter
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type SessionWriter struct {
|
|
filesLimit int
|
|
workingDir string
|
|
fileBuffer int
|
|
syncTimeout time.Duration
|
|
meta *Meta
|
|
sessions *sync.Map
|
|
done chan struct{}
|
|
stopped chan struct{}
|
|
}
|
|
|
|
func NewWriter(filesLimit uint16, workingDir string, fileBuffer int, syncTimeout int) *SessionWriter {
|
|
w := &SessionWriter{
|
|
filesLimit: int(filesLimit) / 2, // should divide by 2 because each session has 2 files
|
|
workingDir: workingDir + "/",
|
|
fileBuffer: fileBuffer,
|
|
syncTimeout: time.Duration(syncTimeout) * time.Second,
|
|
meta: NewMeta(int(filesLimit)),
|
|
sessions: &sync.Map{},
|
|
done: make(chan struct{}),
|
|
stopped: make(chan struct{}),
|
|
}
|
|
go w.synchronizer()
|
|
return w
|
|
}
|
|
|
|
func (w *SessionWriter) Write(sid uint64, domBuffer, devBuffer []byte) (err error) {
|
|
var sess *Session
|
|
|
|
// Load session
|
|
sessObj, ok := w.sessions.Load(sid)
|
|
if !ok {
|
|
// Create new session
|
|
sess, err = NewSession(sid, w.workingDir, w.fileBuffer)
|
|
if err != nil {
|
|
return fmt.Errorf("can't create session: %d, err: %s", sid, err)
|
|
}
|
|
|
|
// Check opened sessions limit and close extra session if you need to
|
|
if extraSessID := w.meta.GetExtra(); extraSessID != 0 {
|
|
if err := w.Close(extraSessID); err != nil {
|
|
log.Printf("can't close session: %s", err)
|
|
}
|
|
}
|
|
|
|
// Add created session
|
|
w.sessions.Store(sid, sess)
|
|
w.meta.Add(sid)
|
|
} else {
|
|
sess = sessObj.(*Session)
|
|
}
|
|
|
|
// Write data to session
|
|
return sess.Write(domBuffer, devBuffer)
|
|
}
|
|
|
|
func (w *SessionWriter) sync(sid uint64) error {
|
|
sessObj, ok := w.sessions.Load(sid)
|
|
if !ok {
|
|
return fmt.Errorf("session: %d not found", sid)
|
|
}
|
|
sess := sessObj.(*Session)
|
|
return sess.Sync()
|
|
}
|
|
|
|
func (w *SessionWriter) Close(sid uint64) error {
|
|
sessObj, ok := w.sessions.LoadAndDelete(sid)
|
|
if !ok {
|
|
return fmt.Errorf("session: %d not found", sid)
|
|
}
|
|
sess := sessObj.(*Session)
|
|
err := sess.Close()
|
|
w.meta.Delete(sid)
|
|
return err
|
|
}
|
|
|
|
func (w *SessionWriter) Stop() {
|
|
w.done <- struct{}{}
|
|
<-w.stopped
|
|
}
|
|
|
|
func (w *SessionWriter) Info() string {
|
|
return fmt.Sprintf("%d sessions", w.meta.Count())
|
|
}
|
|
|
|
func (w *SessionWriter) Sync() {
|
|
w.sessions.Range(func(sid, lockObj any) bool {
|
|
if err := w.sync(sid.(uint64)); err != nil {
|
|
log.Printf("can't sync file descriptor: %s", err)
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
|
|
func (w *SessionWriter) synchronizer() {
|
|
tick := time.Tick(w.syncTimeout)
|
|
for {
|
|
select {
|
|
case <-tick:
|
|
w.Sync()
|
|
case <-w.done:
|
|
w.sessions.Range(func(sid, lockObj any) bool {
|
|
if err := w.Close(sid.(uint64)); err != nil {
|
|
log.Printf("can't close file descriptor: %s", err)
|
|
}
|
|
return true
|
|
})
|
|
w.stopped <- struct{}{}
|
|
return
|
|
}
|
|
}
|
|
}
|