feat (backend): FS_CLEAN timer

This commit is contained in:
ShiKhu 2021-07-20 23:57:37 +08:00
parent 45614e1aa5
commit 27fe0cd1e2
3 changed files with 45 additions and 4 deletions

View file

@ -42,6 +42,6 @@ func (flaker *Flaker) Compose(timestamp uint64) (uint64, error) {
return compose(timestamp, flaker.shardID, flaker.nextSeqID()), nil
}
func (flaker *Flaker) ExtractTimestamp(id uint64) uint64 {
func ExtractTimestamp(id uint64) uint64 {
return extractTimestamp(id) + EPOCH
}

View file

@ -0,0 +1,34 @@
package main
import (
"os"
"log"
"time"
"strconv"
"io/ioutil"
"openreplay/backend/pkg/flakeid"
)
const DELETE_TIMEOUT = 12 * time.Hour;
func cleanDir(dirname string) {
files, err := ioutil.ReadDir(dirname)
if err != nil {
log.Printf("Cannot read file directory. %v", err)
return
}
for f := range files {
name := f.Name()
id, err := strconv.ParseUint(name, 10, 64)
if err != nil {
log.Printf("Cannot parse session filename. %v", err)
continue
}
ts := int64(flakeid.ExtractTimestamp(id))
if time.Unix(ts/1000, 0).Add(DELETE_TIMEOUT).Before(time.Now()) {
os.Remove(dirname + "/" + name)
}
}
}

View file

@ -24,14 +24,15 @@ func main() {
storageWeb := storage.NewS3(env.String("AWS_REGION_WEB"), env.String("S3_BUCKET_WEB"))
//storageIos := storage.NewS3(env.String("AWS_REGION_IOS"), env.String("S3_BUCKET_IOS"))
FS_DIR := env.String("FS_DIR") + "/"
FS_DIR := env.String("FS_DIR")
FS_CLEAN_HRS := env.Int("FS_CLEAN_HRS")
var uploadKey func(string, int, *storage.S3)
uploadKey = func(key string, retryCount int, s *storage.S3) {
if retryCount <= 0 {
return;
}
file, err := os.Open(FS_DIR + key)
file, err := os.Open(FS_DIR + "/" + key)
defer file.Close()
if err != nil {
log.Printf("File error: %v; Will retry %v more time(s)\n", err, retryCount)
@ -63,13 +64,19 @@ func main() {
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
log.Println("Start consuming")
cleanTick := time.Tick(time.Duration(FS_CLEAN_HRS) * time.Hour)
log.Println("Storage: start consuming")
for {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
consumer.Close()
os.Exit(0)
case <-cleanTick:
cleanDir(FS_DIR)
default:
err := consumer.ConsumeNext()
if err != nil {