fix(backend): several fixes in backend services

This commit is contained in:
Alexander Zavorotynskiy 2022-06-03 16:01:14 +02:00
parent d0e651bc29
commit d358747caf
5 changed files with 43 additions and 58 deletions

View file

@ -4,7 +4,6 @@ import (
"encoding/binary"
"log"
"openreplay/backend/internal/storage"
"openreplay/backend/pkg/flakeid"
"time"
"os"
@ -45,20 +44,23 @@ func main() {
cfg.TopicRawWeb,
},
func(sessionID uint64, message Message, _ *types.Meta) {
// If message timestamp is empty, use at least ts of session start
ts := message.Meta().Timestamp
if ts == 0 {
ts = int64(flakeid.ExtractTimestamp(sessionID))
}
// Log ts of last processed message
counter.Update(time.UnixMilli(ts))
// Process assets
message = assetMessageHandler.ParseAssets(sessionID, message)
// Filter message
typeID := message.TypeID()
if !IsReplayerType(typeID) {
return
}
message = assetMessageHandler.ParseAssets(sessionID, message)
// If message timestamp is empty, use at least ts of session start
ts := message.Meta().Timestamp
if ts == 0 {
log.Printf("zero ts; sessID: %d, msg: %+v", sessionID, message)
} else {
// Log ts of last processed message
counter.Update(sessionID, time.UnixMilli(ts))
}
value := message.Encode()
var data []byte

View file

@ -40,7 +40,7 @@ func main() {
case *messages.SessionEnd:
srv.UploadKey(strconv.FormatUint(sessionID, 10), 5)
// Log timestamp of last processed session
counter.Update(time.UnixMilli(meta.Timestamp))
counter.Update(sessionID, time.UnixMilli(meta.Timestamp))
}
},
true,
@ -51,7 +51,6 @@ func main() {
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
cleanTick := time.Tick(time.Duration(cfg.FSCleanHRS) * time.Hour)
counterTick := time.Tick(time.Second * 30)
for {
select {
@ -59,8 +58,6 @@ func main() {
log.Printf("Caught signal %v: terminating\n", sig)
consumer.Close()
os.Exit(0)
case <-cleanTick:
go srv.CleanDir(cfg.FSDir)
case <-counterTick:
go counter.Print()
default:

View file

@ -1,36 +0,0 @@
package storage
import (
"io/ioutil"
"log"
"os"
"strconv"
"time"
"openreplay/backend/pkg/flakeid"
)
func (s *Storage) CleanDir(dirname string) {
files, err := ioutil.ReadDir(dirname)
if err != nil {
log.Printf("Cannot read file directory. %v", err)
return
}
removedFiles := 0
for _, f := range files {
name := f.Name()
id, err := strconv.ParseUint(name, 10, 64)
if err != nil {
log.Printf("Cannot parse session filename. %v", err)
continue
}
ts := int64(flakeid.ExtractTimestamp(id))
if time.UnixMilli(ts).Add(s.cfg.DeleteTimeout).Before(time.Now()) {
// returns an error. Don't log it since it can be race condition between worker instances
if err := os.Remove(dirname + "/" + name); err == nil {
removedFiles++
}
}
}
log.Printf("Removed %d of %d session files", removedFiles, len(files))
}

View file

@ -7,10 +7,11 @@ import (
)
type logCounter struct {
mu sync.Mutex
counter int
timestamp time.Time
lastTS time.Time
mu sync.Mutex
counter int
timestamp time.Time
lastTS time.Time
lastSessID uint64
}
func NewLogCounter() *logCounter {
@ -26,19 +27,22 @@ func (c *logCounter) init() {
c.mu.Unlock()
}
func (c *logCounter) Update(ts time.Time) {
func (c *logCounter) Update(sessID uint64, ts time.Time) {
c.mu.Lock()
c.counter++
c.lastTS = ts
c.lastSessID = sessID
c.mu.Unlock()
}
func (c *logCounter) Print() {
c.mu.Lock()
log.Printf("counter: %d, duration: %s, lastSessionTS: %s",
log.Printf("count: %d, dur: %ds, msgTS: %s, sessID: %d, part: %d",
c.counter,
time.Now().Sub(c.timestamp).String(),
int(time.Now().Sub(c.timestamp).Seconds()),
c.lastTS.String(),
c.lastSessID,
c.lastSessID%16,
)
c.mu.Unlock()
c.init()

View file

@ -5,8 +5,10 @@ import (
"fmt"
"log"
config "openreplay/backend/internal/config/storage"
"openreplay/backend/pkg/flakeid"
"openreplay/backend/pkg/storage"
"os"
"strconv"
"time"
)
@ -37,7 +39,14 @@ func (s *Storage) UploadKey(key string, retryCount int) {
file, err := os.Open(s.cfg.FSDir + "/" + key)
if err != nil {
log.Printf("File error: %v; Will retry %v more time(s); sessID: %s\n", err, retryCount, key)
sessID, _ := strconv.ParseUint(key, 10, 64)
log.Printf("File error: %v; Will retry %v more time(s); sessID: %s, part: %d, sessStart: %s\n",
err,
retryCount,
key,
sessID%16,
time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))),
)
time.AfterFunc(s.cfg.RetryTimeout, func() {
s.UploadKey(key, retryCount-1)
})
@ -47,7 +56,16 @@ func (s *Storage) UploadKey(key string, retryCount int) {
nRead, err := file.Read(s.startBytes)
if err != nil {
log.Printf("File read error: %s; sessID: %s", err, key)
sessID, _ := strconv.ParseUint(key, 10, 64)
log.Printf("File read error: %s; sessID: %s, part: %d, sessStart: %s",
err,
key,
sessID%16,
time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))),
)
time.AfterFunc(s.cfg.RetryTimeout, func() {
s.UploadKey(key, retryCount-1)
})
return
}
startReader := bytes.NewBuffer(s.startBytes[:nRead])