fix(backend-storage): dir clean in routine; MAX_POLL_INTERVAL_MS env var
This commit is contained in:
parent
c30ceb447d
commit
4fa1751a68
3 changed files with 15 additions and 13 deletions
|
|
@ -27,6 +27,7 @@ ENV TZ=UTC \
|
|||
HTTP_PORT=80 \
|
||||
BEACON_SIZE_LIMIT=7000000 \
|
||||
KAFKA_USE_SSL=true \
|
||||
KAFKA_MAX_POLL_INTERVAL_MS=400000 \
|
||||
REDIS_STREAMS_MAX_LEN=3000 \
|
||||
TOPIC_RAW_WEB=raw \
|
||||
TOPIC_RAW_IOS=raw-ios \
|
||||
|
|
|
|||
|
|
@ -1,23 +1,23 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"os"
|
||||
"log"
|
||||
"time"
|
||||
"strconv"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"openreplay/backend/pkg/flakeid"
|
||||
)
|
||||
|
||||
const DELETE_TIMEOUT = 12 * time.Hour;
|
||||
const DELETE_TIMEOUT = 48 * time.Hour
|
||||
|
||||
func cleanDir(dirname string) {
|
||||
files, err := ioutil.ReadDir(dirname)
|
||||
if err != nil {
|
||||
log.Printf("Cannot read file directory. %v", err)
|
||||
return
|
||||
}
|
||||
files, err := ioutil.ReadDir(dirname)
|
||||
if err != nil {
|
||||
log.Printf("Cannot read file directory. %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, f := range files {
|
||||
name := f.Name()
|
||||
|
|
@ -27,8 +27,9 @@ func cleanDir(dirname string) {
|
|||
continue
|
||||
}
|
||||
ts := int64(flakeid.ExtractTimestamp(id))
|
||||
if time.Unix(ts/1000, 0).Add(DELETE_TIMEOUT).Before(time.Now()) {
|
||||
if time.UnixMilli(ts).Add(DELETE_TIMEOUT).Before(time.Now()) {
|
||||
// returns an error. Don't log it since it can be a race condition between worker instances
|
||||
os.Remove(dirname + "/" + name)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -69,7 +69,7 @@ func main() {
|
|||
consumer.Close()
|
||||
os.Exit(0)
|
||||
case <-cleanTick:
|
||||
cleanDir(FS_DIR)
|
||||
go cleanDir(FS_DIR)
|
||||
default:
|
||||
err := consumer.ConsumeNext()
|
||||
if err != nil {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue