From 4fa1751a68b518368319c80410669e81da41b547 Mon Sep 17 00:00:00 2001 From: ShiKhu Date: Wed, 6 Apr 2022 16:53:49 +0200 Subject: [PATCH] fix(backend-storage): dir clean in routine; MAX_POLL_INTERVAL_MS env var --- backend/Dockerfile | 1 + backend/services/storage/clean.go | 25 +++++++++++++------------ backend/services/storage/main.go | 2 +- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/backend/Dockerfile b/backend/Dockerfile index 4c31518ef..b7a494f86 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -27,6 +27,7 @@ ENV TZ=UTC \ HTTP_PORT=80 \ BEACON_SIZE_LIMIT=7000000 \ KAFKA_USE_SSL=true \ + KAFKA_MAX_POLL_INTERVAL_MS=400000 \ REDIS_STREAMS_MAX_LEN=3000 \ TOPIC_RAW_WEB=raw \ TOPIC_RAW_IOS=raw-ios \ diff --git a/backend/services/storage/clean.go b/backend/services/storage/clean.go index 829bc8705..72f5f359c 100644 --- a/backend/services/storage/clean.go +++ b/backend/services/storage/clean.go @@ -1,23 +1,23 @@ package main import ( - "os" - "log" - "time" - "strconv" "io/ioutil" + "log" + "os" + "strconv" + "time" "openreplay/backend/pkg/flakeid" ) -const DELETE_TIMEOUT = 12 * time.Hour; +const DELETE_TIMEOUT = 48 * time.Hour func cleanDir(dirname string) { - files, err := ioutil.ReadDir(dirname) - if err != nil { - log.Printf("Cannot read file directory. %v", err) - return - } + files, err := ioutil.ReadDir(dirname) + if err != nil { + log.Printf("Cannot read file directory. %v", err) + return + } for _, f := range files { name := f.Name() @@ -27,8 +27,9 @@ func cleanDir(dirname string) { continue } ts := int64(flakeid.ExtractTimestamp(id)) - if time.Unix(ts/1000, 0).Add(DELETE_TIMEOUT).Before(time.Now()) { + if time.UnixMilli(ts).Add(DELETE_TIMEOUT).Before(time.Now()) { + // returns a error. Don't log it sinse it can be race condition between worker instances os.Remove(dirname + "/" + name) } } -} \ No newline at end of file +} diff --git a/backend/services/storage/main.go b/backend/services/storage/main.go index cd585f10e..9579fbe4f 100644 --- a/backend/services/storage/main.go +++ b/backend/services/storage/main.go @@ -69,7 +69,7 @@ func main() { consumer.Close() os.Exit(0) case <-cleanTick: - cleanDir(FS_DIR) + go cleanDir(FS_DIR) default: err := consumer.ConsumeNext() if err != nil {