[Sink] Zombie session killer feature (#829)

* feat(backend): added zombie session killer feature
This commit is contained in:
Alexander 2022-11-22 11:53:21 +01:00 committed by GitHub
parent 5a1cd27ebc
commit e67c3ec876
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 66 additions and 35 deletions

View file

@ -32,7 +32,7 @@ func main() {
log.Fatalf("%v doesn't exist. %v", cfg.FsDir, err)
}
writer := sessionwriter.NewWriter(cfg.FsUlimit, cfg.FsDir)
writer := sessionwriter.NewWriter(cfg.FsUlimit, cfg.FsDir, cfg.DeadSessionTimeout)
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
defer producer.Close(cfg.ProducerCloseTimeout)

View file

@ -9,6 +9,7 @@ type Config struct {
common.Config
FsDir string `env:"FS_DIR,required"`
FsUlimit uint16 `env:"FS_ULIMIT,required"`
DeadSessionTimeout int64 `env:"DEAD_SESSION_TIMEOUT,default=600"`
GroupSink string `env:"GROUP_SINK,required"`
TopicRawWeb string `env:"TOPIC_RAW_WEB,required"`
TopicRawIOS string `env:"TOPIC_RAW_IOS,required"`
@ -17,7 +18,7 @@ type Config struct {
CacheAssets bool `env:"CACHE_ASSETS,required"`
AssetsOrigin string `env:"ASSETS_ORIGIN,required"`
ProducerCloseTimeout int `env:"PRODUCER_CLOSE_TIMEOUT,default=15000"`
CacheThreshold int64 `env:"CACHE_THRESHOLD,default=75"`
CacheThreshold int64 `env:"CACHE_THRESHOLD,default=5"`
CacheExpiration int64 `env:"CACHE_EXPIRATION,default=120"`
}

View file

@ -5,12 +5,14 @@ import (
"os"
"strconv"
"sync"
"time"
)
type Session struct {
lock *sync.Mutex
dom *os.File
dev *os.File
lock *sync.Mutex
dom *os.File
dev *os.File
lastUpdate time.Time
}
func NewSession(dir string, id uint64) (*Session, error) {
@ -31,9 +33,10 @@ func NewSession(dir string, id uint64) (*Session, error) {
}
return &Session{
lock: &sync.Mutex{},
dom: domFile,
dev: devFile,
lock: &sync.Mutex{},
dom: domFile,
dev: devFile,
lastUpdate: time.Now(),
}, nil
}
@ -51,9 +54,14 @@ func (s *Session) Write(mode FileType, data []byte) (err error) {
} else {
_, err = s.dev.Write(data)
}
s.lastUpdate = time.Now()
return err
}
func (s *Session) LastUpdate() time.Time {
return s.lastUpdate
}
func (s *Session) Sync() error {
domErr := s.dom.Sync()
devErr := s.dev.Sync()

View file

@ -9,25 +9,26 @@ import (
)
type SessionWriter struct {
ulimit int
dir string
lock *sync.Mutex
sessions *sync.Map
meta map[uint64]int64
count int
done chan struct{}
stopped chan struct{}
ulimit int
dir string
zombieSessionTimeout float64
lock *sync.Mutex
sessions *sync.Map
meta map[uint64]int64
done chan struct{}
stopped chan struct{}
}
func NewWriter(ulimit uint16, dir string) *SessionWriter {
func NewWriter(ulimit uint16, dir string, zombieSessionTimeout int64) *SessionWriter {
w := &SessionWriter{
ulimit: int(ulimit),
dir: dir + "/",
lock: &sync.Mutex{},
sessions: &sync.Map{},
meta: make(map[uint64]int64, ulimit),
done: make(chan struct{}),
stopped: make(chan struct{}),
ulimit: int(ulimit) / 2, // should divide by 2 because each session has 2 files
dir: dir + "/",
zombieSessionTimeout: float64(zombieSessionTimeout),
lock: &sync.Mutex{},
sessions: &sync.Map{},
meta: make(map[uint64]int64, ulimit),
done: make(chan struct{}),
stopped: make(chan struct{}),
}
go w.synchronizer()
return w
@ -51,10 +52,25 @@ func (w *SessionWriter) Stop() {
}
func (w *SessionWriter) Info() string {
return fmt.Sprintf("%d sessions", w.numberOfSessions())
}
func (w *SessionWriter) addSession(sid uint64) {
w.lock.Lock()
count := w.count
w.meta[sid] = time.Now().Unix()
w.lock.Unlock()
return fmt.Sprintf("%d files", count)
}
func (w *SessionWriter) deleteSession(sid uint64) {
w.lock.Lock()
delete(w.meta, sid)
w.lock.Unlock()
}
func (w *SessionWriter) numberOfSessions() int {
w.lock.Lock()
defer w.lock.Unlock()
return len(w.meta)
}
func (w *SessionWriter) write(sid uint64, mode FileType, data []byte) error {
@ -71,10 +87,8 @@ func (w *SessionWriter) write(sid uint64, mode FileType, data []byte) error {
}
sess.Lock()
defer sess.Unlock()
w.sessions.Store(sid, sess)
// Check opened files limit
w.meta[sid] = time.Now().Unix()
if len(w.meta) >= w.ulimit {
var oldSessID uint64
var minTimestamp int64 = math.MaxInt64
@ -84,22 +98,20 @@ func (w *SessionWriter) write(sid uint64, mode FileType, data []byte) error {
minTimestamp = timestamp
}
}
delete(w.meta, oldSessID)
if err := w.close(oldSessID); err != nil {
log.Printf("can't close session: %s", err)
}
}
// Add new session to manager
w.sessions.Store(sid, sess)
w.addSession(sid)
} else {
sess = sessObj.(*Session)
sess.Lock()
defer sess.Unlock()
}
// Update info
w.lock.Lock()
w.count = len(w.meta)
w.lock.Unlock()
// Write data to session
return sess.Write(mode, data)
}
@ -113,7 +125,16 @@ func (w *SessionWriter) sync(sid uint64) error {
sess.Lock()
defer sess.Unlock()
return sess.Sync()
err := sess.Sync()
if time.Now().Sub(sess.LastUpdate()).Seconds() > w.zombieSessionTimeout {
if err != nil {
log.Printf("can't sync session: %d, err: %s", sid, err)
}
// Close "zombie" session
err = sess.Close()
w.deleteSession(sid)
}
return err
}
func (w *SessionWriter) close(sid uint64) error {
@ -129,6 +150,7 @@ func (w *SessionWriter) close(sid uint64) error {
log.Printf("can't sync session: %d, err: %s", sid, err)
}
err := sess.Close()
w.deleteSession(sid)
return err
}