diff --git a/backend/build.sh b/backend/build.sh
index e67178c8c..97604dd0c 100755
--- a/backend/build.sh
+++ b/backend/build.sh
@@ -23,7 +23,7 @@ function build_service() {
     image="$1"
     echo "BUILDING $image"
     case "$image" in
-    http | db | sink | ender | heuristics)
+    http | db | sink | ender | heuristics | storage)
         echo build http
         docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --platform linux/amd64 --build-arg SERVICE_NAME=$image -f ./cmd/Dockerfile .
         [[ $PUSH_IMAGE -eq 1 ]] && {
diff --git a/backend/cmd/storage/main.go b/backend/cmd/storage/main.go
new file mode 100644
index 000000000..57fbd1776
--- /dev/null
+++ b/backend/cmd/storage/main.go
@@ -0,0 +1,66 @@
+package main
+
+import (
+	"log"
+	"os"
+	"os/signal"
+	"strconv"
+	"syscall"
+	"time"
+
+	config "openreplay/backend/internal/config/storage"
+	"openreplay/backend/internal/storage"
+	"openreplay/backend/pkg/messages"
+	"openreplay/backend/pkg/queue"
+	"openreplay/backend/pkg/queue/types"
+	s3storage "openreplay/backend/pkg/storage"
+)
+
+func main() {
+	log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
+
+	cfg := config.New()
+
+	s3 := s3storage.NewS3(cfg.S3Region, cfg.S3Bucket)
+	srv, err := storage.New(cfg, s3)
+	if err != nil {
+		log.Printf("can't init storage service: %s", err)
+		return
+	}
+
+	consumer := queue.NewMessageConsumer(
+		cfg.GroupStorage,
+		[]string{
+			cfg.TopicTrigger,
+		},
+		func(sessionID uint64, msg messages.Message, meta *types.Meta) {
+			switch msg.(type) {
+			case *messages.SessionEnd:
+				srv.UploadKey(strconv.FormatUint(sessionID, 10), 5)
+			}
+		},
+		true,
+	)
+
+	log.Printf("Storage service started\n")
+
+	sigchan := make(chan os.Signal, 1)
+	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
+
+	cleanTick := time.Tick(time.Duration(cfg.FSCleanHRS) * time.Hour)
+	for {
+		select {
+		case sig := <-sigchan:
+			log.Printf("Caught signal %v: terminating\n", sig)
+			consumer.Close()
+			os.Exit(0)
+		case <-cleanTick:
+			go srv.CleanDir(cfg.FSDir)
+		default:
+			err := consumer.ConsumeNext()
+			if err != nil {
+				log.Fatalf("Error on consumption: %v", err)
+			}
+		}
+	}
+}
diff --git a/backend/internal/config/storage/config.go b/backend/internal/config/storage/config.go
new file mode 100644
index 000000000..305c293c9
--- /dev/null
+++ b/backend/internal/config/storage/config.go
@@ -0,0 +1,32 @@
+package storage
+
+import (
+	"openreplay/backend/pkg/env"
+	"time"
+)
+
+type Config struct {
+	S3Region             string
+	S3Bucket             string
+	FSDir                string
+	FSCleanHRS           int
+	SessionFileSplitSize int
+	RetryTimeout         time.Duration
+	GroupStorage         string
+	TopicTrigger         string
+	DeleteTimeout        time.Duration
+}
+
+func New() *Config {
+	return &Config{
+		S3Region:             env.String("AWS_REGION_WEB"),
+		S3Bucket:             env.String("S3_BUCKET_WEB"),
+		FSDir:                env.String("FS_DIR"),
+		FSCleanHRS:           env.Int("FS_CLEAN_HRS"),
+		SessionFileSplitSize: 200000, // ~200 kB
+		RetryTimeout:         2 * time.Minute,
+		GroupStorage:         env.String("GROUP_STORAGE"),
+		TopicTrigger:         env.String("TOPIC_TRIGGER"),
+		DeleteTimeout:        48 * time.Hour,
+	}
+}
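The tunables that were package-level constants in the old service (`RetryTimeout`, `SESSION_FILE_SPLIT_SIZE`, `DELETE_TIMEOUT`) are now ordinary `Config` fields, so they can be set directly without touching the environment. A minimal sketch of a hypothetical test helper (the helper and its field values are illustrative, not part of this diff):

```go
package storage_test

import (
	"testing"
	"time"

	config "openreplay/backend/internal/config/storage"
)

// testConfig is hypothetical: it builds a Config literal directly,
// bypassing the env.String/env.Int lookups performed by config.New().
func testConfig(t *testing.T) *config.Config {
	t.Helper()
	return &config.Config{
		FSDir:                t.TempDir(), // throwaway session-file dir
		FSCleanHRS:           1,
		SessionFileSplitSize: 1024, // tiny split size keeps fixtures small
		RetryTimeout:         10 * time.Millisecond,
		DeleteTimeout:        time.Hour,
	}
}
```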
diff --git a/backend/services/storage/clean.go b/backend/internal/storage/clean.go
similarity index 66%
rename from backend/services/storage/clean.go
rename to backend/internal/storage/clean.go
index 72f5f359c..3b6e4d6eb 100644
--- a/backend/services/storage/clean.go
+++ b/backend/internal/storage/clean.go
@@ -1,4 +1,4 @@
-package main
+package storage
 
 import (
 	"io/ioutil"
@@ -10,9 +10,7 @@ import (
 	"openreplay/backend/pkg/flakeid"
 )
 
-const DELETE_TIMEOUT = 48 * time.Hour
-
-func cleanDir(dirname string) {
+func (s *Storage) CleanDir(dirname string) {
 	files, err := ioutil.ReadDir(dirname)
 	if err != nil {
 		log.Printf("Cannot read file directory. %v", err)
@@ -27,8 +25,8 @@ func cleanDir(dirname string) {
 			continue
 		}
 		ts := int64(flakeid.ExtractTimestamp(id))
-		if time.UnixMilli(ts).Add(DELETE_TIMEOUT).Before(time.Now()) {
-			// returns a error. Don't log it sinse it can be race condition between worker instances
+		if time.UnixMilli(ts).Add(s.cfg.DeleteTimeout).Before(time.Now()) {
+			// returns an error. Don't log it since it can be a race condition between worker instances
 			os.Remove(dirname + "/" + name)
 		}
 	}
diff --git a/backend/services/storage/gzip.go b/backend/internal/storage/gzip.go
similarity index 75%
rename from backend/services/storage/gzip.go
rename to backend/internal/storage/gzip.go
index 0e662efaa..ee47e5079 100644
--- a/backend/services/storage/gzip.go
+++ b/backend/internal/storage/gzip.go
@@ -1,11 +1,11 @@
-package main
+package storage
 
 import (
 	gzip "github.com/klauspost/pgzip"
 	"io"
 )
 
-func gzipFile(file io.Reader) io.Reader {
+func (s *Storage) gzipFile(file io.Reader) io.Reader {
 	reader, writer := io.Pipe()
 	go func() {
 		gw, _ := gzip.NewWriterLevel(writer, gzip.BestSpeed)
diff --git a/backend/internal/storage/storage.go b/backend/internal/storage/storage.go
new file mode 100644
index 000000000..0051fd5ea
--- /dev/null
+++ b/backend/internal/storage/storage.go
@@ -0,0 +1,58 @@
+package storage
+
+import (
+	"bytes"
+	"fmt"
+	"log"
+	config "openreplay/backend/internal/config/storage"
+	"openreplay/backend/pkg/storage"
+	"os"
+	"time"
+)
+
+type Storage struct {
+	cfg *config.Config
+	s3  *storage.S3
+}
+
+func New(cfg *config.Config, s3 *storage.S3) (*Storage, error) {
+	switch {
+	case cfg == nil:
+		return nil, fmt.Errorf("config is empty")
+	case s3 == nil:
+		return nil, fmt.Errorf("s3 storage is empty")
+	}
+	return &Storage{cfg: cfg, s3: s3}, nil
+}
+
+func (s *Storage) UploadKey(key string, retryCount int) {
+	if retryCount <= 0 {
+		return
+	}
+
+	file, err := os.Open(s.cfg.FSDir + "/" + key)
+	if err != nil {
+		log.Printf("File error: %v; Will retry %v more time(s)\n", err, retryCount)
+		time.AfterFunc(s.cfg.RetryTimeout, func() {
+			s.UploadKey(key, retryCount-1)
+		})
+		return
+	}
+	defer file.Close()
+
+	startBytes := make([]byte, s.cfg.SessionFileSplitSize)
+	nRead, err := file.Read(startBytes)
+	if err != nil {
+		log.Printf("File read error: %v", err)
+		return
+	}
+	startReader := bytes.NewBuffer(startBytes[:nRead])
+	if err := s.s3.Upload(s.gzipFile(startReader), key, "application/octet-stream", true); err != nil {
+		log.Fatalf("Storage: start upload failed. %v\n", err)
+	}
+	if nRead == s.cfg.SessionFileSplitSize {
+		if err := s.s3.Upload(s.gzipFile(file), key+"e", "application/octet-stream", true); err != nil {
+			log.Fatalf("Storage: end upload failed. %v\n", err)
+		}
+	}
+}
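`UploadKey` writes at most two objects per session: the first `SessionFileSplitSize` bytes under the session key, and, only when the file reaches that size, the remainder under `key + "e"`. Each part is gzipped independently by `gzipFile` (`pgzip` emits standard gzip streams), so a reader has to decompress the parts separately and concatenate the results. A minimal reassembly sketch, assuming both objects have already been fetched as `io.Reader`s; the helper is hypothetical and the download path is outside this diff:

```go
package sessions

import (
	"compress/gzip"
	"io"
)

// reassembleSession is a hypothetical helper. parts holds the "start"
// object (stored under the session key) followed by the optional "end"
// object (key+"e"). Each part is its own gzip stream, so each needs its
// own reader; the decompressed streams concatenate into the original file.
func reassembleSession(dst io.Writer, parts ...io.Reader) error {
	for _, part := range parts {
		gz, err := gzip.NewReader(part)
		if err != nil {
			return err
		}
		if _, err := io.Copy(dst, gz); err != nil {
			gz.Close()
			return err
		}
		if err := gz.Close(); err != nil {
			return err
		}
	}
	return nil
}
```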
diff --git a/backend/services/storage/build_hack b/backend/services/storage/build_hack
new file mode 100644
index 000000000..e69de29bb
diff --git a/backend/services/storage/main.go b/backend/services/storage/main.go
deleted file mode 100644
index 95c5c6d17..000000000
--- a/backend/services/storage/main.go
+++ /dev/null
@@ -1,100 +0,0 @@
-package main
-
-import (
-	"log"
-	"os"
-	"strconv"
-	"time"
-
-	"bytes"
-
-	"os/signal"
-	"syscall"
-
-	"openreplay/backend/pkg/env"
-	"openreplay/backend/pkg/messages"
-	"openreplay/backend/pkg/queue"
-	"openreplay/backend/pkg/queue/types"
-	"openreplay/backend/pkg/storage"
-)
-
-const RetryTimeout = 2 * time.Minute
-
-const SESSION_FILE_SPLIT_SIZE = 200000 // ~200 kB
-
-func main() {
-	log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
-
-	storage := storage.NewS3(env.String("AWS_REGION_WEB"), env.String("S3_BUCKET_WEB"))
-	FS_DIR := env.String("FS_DIR")
-	FS_CLEAN_HRS := env.Int("FS_CLEAN_HRS")
-
-	var uploadKey func(string, int)
-	uploadKey = func(key string, retryCount int) {
-		if retryCount <= 0 {
-			return
-		}
-
-		file, err := os.Open(FS_DIR + "/" + key)
-		if err != nil {
-			log.Printf("File error: %v; Will retry %v more time(s)\n", err, retryCount)
-			time.AfterFunc(RetryTimeout, func() {
-				uploadKey(key, retryCount-1)
-			})
-			return
-		}
-		defer file.Close()
-
-		startBytes := make([]byte, SESSION_FILE_SPLIT_SIZE)
-		nRead, err := file.Read(startBytes)
-		if err != nil {
-			log.Printf("File read error: %f", err)
-			return
-		}
-		startReader := bytes.NewBuffer(startBytes)
-		if err := storage.Upload(gzipFile(startReader), key, "application/octet-stream", true); err != nil {
-			log.Fatalf("Storage: start upload failed. %v\n", err)
-		}
-		if nRead == SESSION_FILE_SPLIT_SIZE {
-			if err := storage.Upload(gzipFile(file), key+"e", "application/octet-stream", true); err != nil {
-				log.Fatalf("Storage: end upload failed. %v\n", err)
-			}
-		}
-	}
-
-	consumer := queue.NewMessageConsumer(
-		env.String("GROUP_STORAGE"),
-		[]string{
-			env.String("TOPIC_TRIGGER"),
-		},
-		func(sessionID uint64, msg messages.Message, meta *types.Meta) {
-			switch msg.(type) {
-			case *messages.SessionEnd:
-				uploadKey(strconv.FormatUint(sessionID, 10), 5)
-			}
-		},
-		true,
-	)
-
-	sigchan := make(chan os.Signal, 1)
-	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
-
-	cleanTick := time.Tick(time.Duration(FS_CLEAN_HRS) * time.Hour)
-
-	log.Printf("Storage service started\n")
-	for {
-		select {
-		case sig := <-sigchan:
-			log.Printf("Caught signal %v: terminating\n", sig)
-			consumer.Close()
-			os.Exit(0)
-		case <-cleanTick:
-			go cleanDir(FS_DIR)
-		default:
-			err := consumer.ConsumeNext()
-			if err != nil {
-				log.Fatalf("Error on consumption: %v", err)
-			}
-		}
-	}
-}
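A design note the diff leaves implicit: `UploadKey` never blocks the consumer callback on a missing session file. A failed `os.Open` schedules the next attempt through `time.AfterFunc`, so each session gets up to five tries, spaced `RetryTimeout` (two minutes) apart, while message consumption continues; the retries are fire-and-forget timers and are lost if the process exits first. A hypothetical one-off caller outside the queue path, sketched under those assumptions:

```go
package main

import (
	"log"
	"strconv"
	"time"

	config "openreplay/backend/internal/config/storage"
	"openreplay/backend/internal/storage"
	s3storage "openreplay/backend/pkg/storage"
)

// Hypothetical backfill tool: re-upload known session IDs through the
// same Storage service object the queue consumer uses.
func main() {
	cfg := config.New()
	s3 := s3storage.NewS3(cfg.S3Region, cfg.S3Bucket)
	srv, err := storage.New(cfg, s3)
	if err != nil {
		log.Fatalf("can't init storage service: %s", err)
	}
	for _, id := range []uint64{42, 43} { // placeholder IDs
		// Retries, if any, are scheduled via time.AfterFunc and do not
		// block this loop.
		srv.UploadKey(strconv.FormatUint(id, 10), 5)
	}
	// Crude drain: give pending AfterFunc retries a chance to fire
	// before exiting; a real tool would synchronize instead.
	time.Sleep(5 * cfg.RetryTimeout)
}
```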