From d358747cafd85d00d9c1793a803451f1314dfba1 Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Fri, 3 Jun 2022 16:01:14 +0200 Subject: [PATCH] fix(backend): several fixes in backend services --- backend/cmd/sink/main.go | 20 ++++++++-------- backend/cmd/storage/main.go | 5 +--- backend/internal/storage/clean.go | 36 ----------------------------- backend/internal/storage/counter.go | 18 +++++++++------ backend/internal/storage/storage.go | 22 ++++++++++++++++-- 5 files changed, 43 insertions(+), 58 deletions(-) delete mode 100644 backend/internal/storage/clean.go diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go index 14bc485ce..f97cc802b 100644 --- a/backend/cmd/sink/main.go +++ b/backend/cmd/sink/main.go @@ -4,7 +4,6 @@ import ( "encoding/binary" "log" "openreplay/backend/internal/storage" - "openreplay/backend/pkg/flakeid" "time" "os" @@ -45,20 +44,23 @@ func main() { cfg.TopicRawWeb, }, func(sessionID uint64, message Message, _ *types.Meta) { - // If message timestamp is empty, use at least ts of session start - ts := message.Meta().Timestamp - if ts == 0 { - ts = int64(flakeid.ExtractTimestamp(sessionID)) - } - // Log ts of last processed message - counter.Update(time.UnixMilli(ts)) + // Process assets + message = assetMessageHandler.ParseAssets(sessionID, message) + // Filter message typeID := message.TypeID() if !IsReplayerType(typeID) { return } - message = assetMessageHandler.ParseAssets(sessionID, message) + // If message timestamp is empty, use at least ts of session start + ts := message.Meta().Timestamp + if ts == 0 { + log.Printf("zero ts; sessID: %d, msg: %+v", sessionID, message) + } else { + // Log ts of last processed message + counter.Update(sessionID, time.UnixMilli(ts)) + } value := message.Encode() var data []byte diff --git a/backend/cmd/storage/main.go b/backend/cmd/storage/main.go index 933bd1f2b..7916cd2ad 100644 --- a/backend/cmd/storage/main.go +++ b/backend/cmd/storage/main.go @@ -40,7 +40,7 @@ func main() { case *messages.SessionEnd: srv.UploadKey(strconv.FormatUint(sessionID, 10), 5) // Log timestamp of last processed session - counter.Update(time.UnixMilli(meta.Timestamp)) + counter.Update(sessionID, time.UnixMilli(meta.Timestamp)) } }, true, @@ -51,7 +51,6 @@ func main() { sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - cleanTick := time.Tick(time.Duration(cfg.FSCleanHRS) * time.Hour) counterTick := time.Tick(time.Second * 30) for { select { @@ -59,8 +58,6 @@ func main() { log.Printf("Caught signal %v: terminating\n", sig) consumer.Close() os.Exit(0) - case <-cleanTick: - go srv.CleanDir(cfg.FSDir) case <-counterTick: go counter.Print() default: diff --git a/backend/internal/storage/clean.go b/backend/internal/storage/clean.go deleted file mode 100644 index 7635b0489..000000000 --- a/backend/internal/storage/clean.go +++ /dev/null @@ -1,36 +0,0 @@ -package storage - -import ( - "io/ioutil" - "log" - "os" - "strconv" - "time" - - "openreplay/backend/pkg/flakeid" -) - -func (s *Storage) CleanDir(dirname string) { - files, err := ioutil.ReadDir(dirname) - if err != nil { - log.Printf("Cannot read file directory. %v", err) - return - } - removedFiles := 0 - for _, f := range files { - name := f.Name() - id, err := strconv.ParseUint(name, 10, 64) - if err != nil { - log.Printf("Cannot parse session filename. %v", err) - continue - } - ts := int64(flakeid.ExtractTimestamp(id)) - if time.UnixMilli(ts).Add(s.cfg.DeleteTimeout).Before(time.Now()) { - // returns an error. Don't log it since it can be race condition between worker instances - if err := os.Remove(dirname + "/" + name); err == nil { - removedFiles++ - } - } - } - log.Printf("Removed %d of %d session files", removedFiles, len(files)) -} diff --git a/backend/internal/storage/counter.go b/backend/internal/storage/counter.go index b1f0e2647..91204caf1 100644 --- a/backend/internal/storage/counter.go +++ b/backend/internal/storage/counter.go @@ -7,10 +7,11 @@ import ( ) type logCounter struct { - mu sync.Mutex - counter int - timestamp time.Time - lastTS time.Time + mu sync.Mutex + counter int + timestamp time.Time + lastTS time.Time + lastSessID uint64 } func NewLogCounter() *logCounter { @@ -26,19 +27,22 @@ func (c *logCounter) init() { c.mu.Unlock() } -func (c *logCounter) Update(ts time.Time) { +func (c *logCounter) Update(sessID uint64, ts time.Time) { c.mu.Lock() c.counter++ c.lastTS = ts + c.lastSessID = sessID c.mu.Unlock() } func (c *logCounter) Print() { c.mu.Lock() - log.Printf("counter: %d, duration: %s, lastSessionTS: %s", + log.Printf("count: %d, dur: %ds, msgTS: %s, sessID: %d, part: %d", c.counter, - time.Now().Sub(c.timestamp).String(), + int(time.Now().Sub(c.timestamp).Seconds()), c.lastTS.String(), + c.lastSessID, + c.lastSessID%16, ) c.mu.Unlock() c.init() diff --git a/backend/internal/storage/storage.go b/backend/internal/storage/storage.go index c4791ac7e..b83ff1577 100644 --- a/backend/internal/storage/storage.go +++ b/backend/internal/storage/storage.go @@ -5,8 +5,10 @@ import ( "fmt" "log" config "openreplay/backend/internal/config/storage" + "openreplay/backend/pkg/flakeid" "openreplay/backend/pkg/storage" "os" + "strconv" "time" ) @@ -37,7 +39,14 @@ func (s *Storage) UploadKey(key string, retryCount int) { file, err := os.Open(s.cfg.FSDir + "/" + key) if err != nil { - log.Printf("File error: %v; Will retry %v more time(s); sessID: %s\n", err, retryCount, key) + sessID, _ := strconv.ParseUint(key, 10, 64) + log.Printf("File error: %v; Will retry %v more time(s); sessID: %s, part: %d, sessStart: %s\n", + err, + retryCount, + key, + sessID%16, + time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))), + ) time.AfterFunc(s.cfg.RetryTimeout, func() { s.UploadKey(key, retryCount-1) }) @@ -47,7 +56,16 @@ func (s *Storage) UploadKey(key string, retryCount int) { nRead, err := file.Read(s.startBytes) if err != nil { - log.Printf("File read error: %s; sessID: %s", err, key) + sessID, _ := strconv.ParseUint(key, 10, 64) + log.Printf("File read error: %s; sessID: %s, part: %d, sessStart: %s", + err, + key, + sessID%16, + time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))), + ) + time.AfterFunc(s.cfg.RetryTimeout, func() { + s.UploadKey(key, retryCount-1) + }) return } startReader := bytes.NewBuffer(s.startBytes[:nRead])