From e67c3ec876209068e410cbfc35b41e8dc9e81098 Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 22 Nov 2022 11:53:21 +0100 Subject: [PATCH] [Sink] Zombie session killer feature (#829) * feat(backend): added zombie session killer feature --- backend/cmd/sink/main.go | 2 +- backend/internal/config/sink/config.go | 3 +- .../internal/sink/sessionwriter/session.go | 20 +++-- backend/internal/sink/sessionwriter/writer.go | 76 ++++++++++++------- 4 files changed, 66 insertions(+), 35 deletions(-) diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go index d6ebc0abc..84520dd33 100644 --- a/backend/cmd/sink/main.go +++ b/backend/cmd/sink/main.go @@ -32,7 +32,7 @@ func main() { log.Fatalf("%v doesn't exist. %v", cfg.FsDir, err) } - writer := sessionwriter.NewWriter(cfg.FsUlimit, cfg.FsDir) + writer := sessionwriter.NewWriter(cfg.FsUlimit, cfg.FsDir, cfg.DeadSessionTimeout) producer := queue.NewProducer(cfg.MessageSizeLimit, true) defer producer.Close(cfg.ProducerCloseTimeout) diff --git a/backend/internal/config/sink/config.go b/backend/internal/config/sink/config.go index a7481f93a..a8703a596 100644 --- a/backend/internal/config/sink/config.go +++ b/backend/internal/config/sink/config.go @@ -9,6 +9,7 @@ type Config struct { common.Config FsDir string `env:"FS_DIR,required"` FsUlimit uint16 `env:"FS_ULIMIT,required"` + DeadSessionTimeout int64 `env:"DEAD_SESSION_TIMEOUT,default=600"` GroupSink string `env:"GROUP_SINK,required"` TopicRawWeb string `env:"TOPIC_RAW_WEB,required"` TopicRawIOS string `env:"TOPIC_RAW_IOS,required"` @@ -17,7 +18,7 @@ type Config struct { CacheAssets bool `env:"CACHE_ASSETS,required"` AssetsOrigin string `env:"ASSETS_ORIGIN,required"` ProducerCloseTimeout int `env:"PRODUCER_CLOSE_TIMEOUT,default=15000"` - CacheThreshold int64 `env:"CACHE_THRESHOLD,default=75"` + CacheThreshold int64 `env:"CACHE_THRESHOLD,default=5"` CacheExpiration int64 `env:"CACHE_EXPIRATION,default=120"` } diff --git a/backend/internal/sink/sessionwriter/session.go b/backend/internal/sink/sessionwriter/session.go index 110da588e..f107c387b 100644 --- a/backend/internal/sink/sessionwriter/session.go +++ b/backend/internal/sink/sessionwriter/session.go @@ -5,12 +5,14 @@ import ( "os" "strconv" "sync" + "time" ) type Session struct { - lock *sync.Mutex - dom *os.File - dev *os.File + lock *sync.Mutex + dom *os.File + dev *os.File + lastUpdate time.Time } func NewSession(dir string, id uint64) (*Session, error) { @@ -31,9 +33,10 @@ func NewSession(dir string, id uint64) (*Session, error) { } return &Session{ - lock: &sync.Mutex{}, - dom: domFile, - dev: devFile, + lock: &sync.Mutex{}, + dom: domFile, + dev: devFile, + lastUpdate: time.Now(), }, nil } @@ -51,9 +54,14 @@ func (s *Session) Write(mode FileType, data []byte) (err error) { } else { _, err = s.dev.Write(data) } + s.lastUpdate = time.Now() return err } +func (s *Session) LastUpdate() time.Time { + return s.lastUpdate +} + func (s *Session) Sync() error { domErr := s.dom.Sync() devErr := s.dev.Sync() diff --git a/backend/internal/sink/sessionwriter/writer.go b/backend/internal/sink/sessionwriter/writer.go index 1883b4c40..94ff5dd66 100644 --- a/backend/internal/sink/sessionwriter/writer.go +++ b/backend/internal/sink/sessionwriter/writer.go @@ -9,25 +9,26 @@ import ( ) type SessionWriter struct { - ulimit int - dir string - lock *sync.Mutex - sessions *sync.Map - meta map[uint64]int64 - count int - done chan struct{} - stopped chan struct{} + ulimit int + dir string + zombieSessionTimeout float64 + lock *sync.Mutex + sessions *sync.Map + meta map[uint64]int64 + done chan struct{} + stopped chan struct{} } -func NewWriter(ulimit uint16, dir string) *SessionWriter { +func NewWriter(ulimit uint16, dir string, zombieSessionTimeout int64) *SessionWriter { w := &SessionWriter{ - ulimit: int(ulimit), - dir: dir + "/", - lock: &sync.Mutex{}, - sessions: &sync.Map{}, - meta: make(map[uint64]int64, ulimit), - done: make(chan struct{}), - stopped: make(chan struct{}), + ulimit: int(ulimit) / 2, // should divide by 2 because each session has 2 files + dir: dir + "/", + zombieSessionTimeout: float64(zombieSessionTimeout), + lock: &sync.Mutex{}, + sessions: &sync.Map{}, + meta: make(map[uint64]int64, ulimit), + done: make(chan struct{}), + stopped: make(chan struct{}), } go w.synchronizer() return w @@ -51,10 +52,25 @@ func (w *SessionWriter) Stop() { } func (w *SessionWriter) Info() string { + return fmt.Sprintf("%d sessions", w.numberOfSessions()) +} + +func (w *SessionWriter) addSession(sid uint64) { w.lock.Lock() - count := w.count + w.meta[sid] = time.Now().Unix() w.lock.Unlock() - return fmt.Sprintf("%d files", count) +} + +func (w *SessionWriter) deleteSession(sid uint64) { + w.lock.Lock() + delete(w.meta, sid) + w.lock.Unlock() +} + +func (w *SessionWriter) numberOfSessions() int { + w.lock.Lock() + defer w.lock.Unlock() + return len(w.meta) } func (w *SessionWriter) write(sid uint64, mode FileType, data []byte) error { @@ -71,10 +87,8 @@ func (w *SessionWriter) write(sid uint64, mode FileType, data []byte) error { } sess.Lock() defer sess.Unlock() - w.sessions.Store(sid, sess) // Check opened files limit - w.meta[sid] = time.Now().Unix() if len(w.meta) >= w.ulimit { var oldSessID uint64 var minTimestamp int64 = math.MaxInt64 @@ -84,22 +98,20 @@ func (w *SessionWriter) write(sid uint64, mode FileType, data []byte) error { minTimestamp = timestamp } } - delete(w.meta, oldSessID) if err := w.close(oldSessID); err != nil { log.Printf("can't close session: %s", err) } } + + // Add new session to manager + w.sessions.Store(sid, sess) + w.addSession(sid) } else { sess = sessObj.(*Session) sess.Lock() defer sess.Unlock() } - // Update info - w.lock.Lock() - w.count = len(w.meta) - w.lock.Unlock() - // Write data to session return sess.Write(mode, data) } @@ -113,7 +125,16 @@ func (w *SessionWriter) sync(sid uint64) error { sess.Lock() defer sess.Unlock() - return sess.Sync() + err := sess.Sync() + if time.Now().Sub(sess.LastUpdate()).Seconds() > w.zombieSessionTimeout { + if err != nil { + log.Printf("can't sync session: %d, err: %s", sid, err) + } + // Close "zombie" session + err = sess.Close() + w.deleteSession(sid) + } + return err } func (w *SessionWriter) close(sid uint64) error { @@ -129,6 +150,7 @@ func (w *SessionWriter) close(sid uint64) error { log.Printf("can't sync session: %d, err: %s", sid, err) } err := sess.Close() + w.deleteSession(sid) return err }