From 27fe0cd1e2ab2d9467a0acd8eb687bc00a30447e Mon Sep 17 00:00:00 2001 From: ShiKhu Date: Tue, 20 Jul 2021 23:57:37 +0800 Subject: [PATCH] feat (backend): FS_CLEAN timer --- backend/pkg/flakeid/flaker.go | 2 +- backend/services/storage/clean.go | 34 +++++++++++++++++++++++++++++++ backend/services/storage/main.go | 13 +++++++++--- 3 files changed, 45 insertions(+), 4 deletions(-) create mode 100644 backend/services/storage/clean.go diff --git a/backend/pkg/flakeid/flaker.go b/backend/pkg/flakeid/flaker.go index 5daba4feb..5c44d1042 100644 --- a/backend/pkg/flakeid/flaker.go +++ b/backend/pkg/flakeid/flaker.go @@ -42,6 +42,6 @@ func (flaker *Flaker) Compose(timestamp uint64) (uint64, error) { return compose(timestamp, flaker.shardID, flaker.nextSeqID()), nil } -func (flaker *Flaker) ExtractTimestamp(id uint64) uint64 { +func ExtractTimestamp(id uint64) uint64 { return extractTimestamp(id) + EPOCH } diff --git a/backend/services/storage/clean.go b/backend/services/storage/clean.go new file mode 100644 index 000000000..0cb956fd3 --- /dev/null +++ b/backend/services/storage/clean.go @@ -0,0 +1,34 @@ +package main + +import ( + "os" + "log" + "time" + "strconv" + "io/ioutil" + + "openreplay/backend/pkg/flakeid" +) + +const DELETE_TIMEOUT = 12 * time.Hour; + +func cleanDir(dirname string) { + files, err := ioutil.ReadDir(dirname) + if err != nil { + log.Printf("Cannot read file directory. %v", err) + return + } + + for f := range files { + name := f.Name() + id, err := strconv.ParseUint(name, 10, 64) + if err != nil { + log.Printf("Cannot parse session filename. %v", err) + continue + } + ts := int64(flakeid.ExtractTimestamp(id)) + if time.Unix(ts/1000, 0).Add(DELETE_TIMEOUT).Before(time.Now()) { + os.Remove(dirname + "/" + name) + } + } +} \ No newline at end of file diff --git a/backend/services/storage/main.go b/backend/services/storage/main.go index 2d008d868..3d3ba829f 100644 --- a/backend/services/storage/main.go +++ b/backend/services/storage/main.go @@ -24,14 +24,15 @@ func main() { storageWeb := storage.NewS3(env.String("AWS_REGION_WEB"), env.String("S3_BUCKET_WEB")) //storageIos := storage.NewS3(env.String("AWS_REGION_IOS"), env.String("S3_BUCKET_IOS")) - FS_DIR := env.String("FS_DIR") + "/" + FS_DIR := env.String("FS_DIR") + FS_CLEAN_HRS := env.Int("FS_CLEAN_HRS") var uploadKey func(string, int, *storage.S3) uploadKey = func(key string, retryCount int, s *storage.S3) { if retryCount <= 0 { return; } - file, err := os.Open(FS_DIR + key) + file, err := os.Open(FS_DIR + "/" + key) defer file.Close() if err != nil { log.Printf("File error: %v; Will retry %v more time(s)\n", err, retryCount) @@ -63,13 +64,19 @@ func main() { sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - log.Println("Start consuming") + + cleanTick := time.Tick(time.Duration(FS_CLEAN_HRS) * time.Hour) + + + log.Println("Storage: start consuming") for { select { case sig := <-sigchan: log.Printf("Caught signal %v: terminating\n", sig) consumer.Close() os.Exit(0) + case <-cleanTick: + cleanDir(FS_DIR) default: err := consumer.ConsumeNext() if err != nil {