feat(backend/storage): service refactoring

This commit is contained in:
Alexander Zavorotynskiy 2022-05-16 10:52:32 +02:00
parent 4175d98be8
commit 24f64af95a
8 changed files with 163 additions and 109 deletions

View file

@ -23,7 +23,7 @@ function build_service() {
image="$1"
echo "BUILDING $image"
case "$image" in
http | db | sink | ender | heuristics)
http | db | sink | ender | heuristics | storage)
echo build http
docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --platform linux/amd64 --build-arg SERVICE_NAME=$image -f ./cmd/Dockerfile .
[[ $PUSH_IMAGE -eq 1 ]] && {

View file

@ -0,0 +1,66 @@
package main
import (
"log"
"os"
"os/signal"
"strconv"
"syscall"
"time"
config "openreplay/backend/internal/config/storage"
"openreplay/backend/internal/storage"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
s3storage "openreplay/backend/pkg/storage"
)
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := config.New()
s3 := s3storage.NewS3(cfg.S3Region, cfg.S3Bucket)
srv, err := storage.New(cfg, s3)
if err != nil {
log.Printf("can't init storage service: %s", err)
return
}
consumer := queue.NewMessageConsumer(
cfg.GroupStorage,
[]string{
cfg.TopicTrigger,
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
switch msg.(type) {
case *messages.SessionEnd:
srv.UploadKey(strconv.FormatUint(sessionID, 10), 5)
}
},
true,
)
log.Printf("Storage service started\n")
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
cleanTick := time.Tick(time.Duration(cfg.FSCleanHRS) * time.Hour)
for {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
consumer.Close()
os.Exit(0)
case <-cleanTick:
go srv.CleanDir(cfg.FSDir)
default:
err := consumer.ConsumeNext()
if err != nil {
log.Fatalf("Error on consumption: %v", err)
}
}
}
}

View file

@ -0,0 +1,32 @@
package storage
import (
"openreplay/backend/pkg/env"
"time"
)
type Config struct {
S3Region string
S3Bucket string
FSDir string
FSCleanHRS int
SessionFileSplitSize int
RetryTimeout time.Duration
GroupStorage string
TopicTrigger string
DeleteTimeout time.Duration
}
func New() *Config {
return &Config{
S3Region: env.String("AWS_REGION_WEB"),
S3Bucket: env.String("S3_BUCKET_WEB"),
FSDir: env.String("FS_DIR"),
FSCleanHRS: env.Int("FS_CLEAN_HRS"),
SessionFileSplitSize: 200000, // ~200 kB
RetryTimeout: 2 * time.Minute,
GroupStorage: env.String("GROUP_STORAGE"),
TopicTrigger: env.String("TOPIC_TRIGGER"),
DeleteTimeout: 48 * time.Hour,
}
}

View file

@ -1,4 +1,4 @@
package main
package storage
import (
"io/ioutil"
@ -10,9 +10,7 @@ import (
"openreplay/backend/pkg/flakeid"
)
const DELETE_TIMEOUT = 48 * time.Hour
func cleanDir(dirname string) {
func (s *Storage) CleanDir(dirname string) {
files, err := ioutil.ReadDir(dirname)
if err != nil {
log.Printf("Cannot read file directory. %v", err)
@ -27,8 +25,8 @@ func cleanDir(dirname string) {
continue
}
ts := int64(flakeid.ExtractTimestamp(id))
if time.UnixMilli(ts).Add(DELETE_TIMEOUT).Before(time.Now()) {
// returns a error. Don't log it sinse it can be race condition between worker instances
if time.UnixMilli(ts).Add(s.cfg.DeleteTimeout).Before(time.Now()) {
// returns an error. Don't log it since it can be race condition between worker instances
os.Remove(dirname + "/" + name)
}
}

View file

@ -1,11 +1,11 @@
package main
package storage
import (
gzip "github.com/klauspost/pgzip"
"io"
)
func gzipFile(file io.Reader) io.Reader {
func (s *Storage) gzipFile(file io.Reader) io.Reader {
reader, writer := io.Pipe()
go func() {
gw, _ := gzip.NewWriterLevel(writer, gzip.BestSpeed)

View file

@ -0,0 +1,58 @@
package storage
import (
"bytes"
"fmt"
"log"
config "openreplay/backend/internal/config/storage"
"openreplay/backend/pkg/storage"
"os"
"time"
)
type Storage struct {
cfg *config.Config
s3 *storage.S3
}
func New(cfg *config.Config, s3 *storage.S3) (*Storage, error) {
switch {
case cfg == nil:
return nil, fmt.Errorf("config is empty")
case s3 == nil:
return nil, fmt.Errorf("s3 storage is empty")
}
return &Storage{s3: s3}, nil
}
func (s *Storage) UploadKey(key string, retryCount int) {
if retryCount <= 0 {
return
}
file, err := os.Open(s.cfg.FSDir + "/" + key)
if err != nil {
log.Printf("File error: %v; Will retry %v more time(s)\n", err, retryCount)
time.AfterFunc(s.cfg.RetryTimeout, func() {
s.UploadKey(key, retryCount-1)
})
return
}
defer file.Close()
startBytes := make([]byte, s.cfg.SessionFileSplitSize)
nRead, err := file.Read(startBytes)
if err != nil {
log.Printf("File read error: %f", err)
return
}
startReader := bytes.NewBuffer(startBytes)
if err := s.s3.Upload(s.gzipFile(startReader), key, "application/octet-stream", true); err != nil {
log.Fatalf("Storage: start upload failed. %v\n", err)
}
if nRead == s.cfg.SessionFileSplitSize {
if err := s.s3.Upload(s.gzipFile(file), key+"e", "application/octet-stream", true); err != nil {
log.Fatalf("Storage: end upload failed. %v\n", err)
}
}
}

View file

View file

@ -1,100 +0,0 @@
package main
import (
"log"
"os"
"strconv"
"time"
"bytes"
"os/signal"
"syscall"
"openreplay/backend/pkg/env"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/storage"
)
const RetryTimeout = 2 * time.Minute
const SESSION_FILE_SPLIT_SIZE = 200000 // ~200 kB
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
storage := storage.NewS3(env.String("AWS_REGION_WEB"), env.String("S3_BUCKET_WEB"))
FS_DIR := env.String("FS_DIR")
FS_CLEAN_HRS := env.Int("FS_CLEAN_HRS")
var uploadKey func(string, int)
uploadKey = func(key string, retryCount int) {
if retryCount <= 0 {
return
}
file, err := os.Open(FS_DIR + "/" + key)
if err != nil {
log.Printf("File error: %v; Will retry %v more time(s)\n", err, retryCount)
time.AfterFunc(RetryTimeout, func() {
uploadKey(key, retryCount-1)
})
return
}
defer file.Close()
startBytes := make([]byte, SESSION_FILE_SPLIT_SIZE)
nRead, err := file.Read(startBytes)
if err != nil {
log.Printf("File read error: %f", err)
return
}
startReader := bytes.NewBuffer(startBytes)
if err := storage.Upload(gzipFile(startReader), key, "application/octet-stream", true); err != nil {
log.Fatalf("Storage: start upload failed. %v\n", err)
}
if nRead == SESSION_FILE_SPLIT_SIZE {
if err := storage.Upload(gzipFile(file), key+"e", "application/octet-stream", true); err != nil {
log.Fatalf("Storage: end upload failed. %v\n", err)
}
}
}
consumer := queue.NewMessageConsumer(
env.String("GROUP_STORAGE"),
[]string{
env.String("TOPIC_TRIGGER"),
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
switch msg.(type) {
case *messages.SessionEnd:
uploadKey(strconv.FormatUint(sessionID, 10), 5)
}
},
true,
)
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
cleanTick := time.Tick(time.Duration(FS_CLEAN_HRS) * time.Hour)
log.Printf("Storage service started\n")
for {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
consumer.Close()
os.Exit(0)
case <-cleanTick:
go cleanDir(FS_DIR)
default:
err := consumer.ConsumeNext()
if err != nil {
log.Fatalf("Error on consumption: %v", err)
}
}
}
}