Memory control manager (#1067)

* feat(backend): added a mechanism to control memory consumption

* feat(backend): extra log for system allocation

* feat(backend): implemented new memory manager in db and heuristics service
This commit is contained in:
Alexander 2023-04-11 16:01:37 +02:00 committed by GitHub
parent 08c5b11e30
commit f561593b1a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 191 additions and 4 deletions

View file

@ -2,6 +2,7 @@ package main
import (
"log"
"openreplay/backend/pkg/memory"
config "openreplay/backend/internal/config/db"
"openreplay/backend/internal/db"
@ -53,8 +54,15 @@ func main() {
cfg.MessageSizeLimit,
)
// Init memory manager
memoryManager, err := memory.NewManager(cfg.MemoryLimitMB, cfg.MaxMemoryUsage)
if err != nil {
log.Printf("can't init memory manager: %s", err)
return
}
// Run service and wait for TERM signal
service := db.New(cfg, consumer, saver)
service := db.New(cfg, consumer, saver, memoryManager)
log.Printf("Db service started\n")
terminator.Wait(service)
}

View file

@ -2,6 +2,7 @@ package main
import (
"log"
"openreplay/backend/pkg/memory"
"os"
"os/signal"
"strings"
@ -51,6 +52,12 @@ func main() {
cfg.MessageSizeLimit,
)
memoryManager, err := memory.NewManager(cfg.MemoryLimitMB, cfg.MaxMemoryUsage)
if err != nil {
log.Printf("can't init memory manager: %s", err)
return
}
log.Printf("Ender service started\n")
sigchan := make(chan os.Signal, 1)
@ -121,6 +128,9 @@ func main() {
case msg := <-consumer.Rebalanced():
log.Println(msg)
default:
if !memoryManager.HasFreeMemory() {
continue
}
if err := consumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consuming: %v", err)
}

View file

@ -7,6 +7,7 @@ import (
"openreplay/backend/pkg/handlers"
"openreplay/backend/pkg/handlers/custom"
"openreplay/backend/pkg/handlers/web"
"openreplay/backend/pkg/memory"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/metrics"
heuristicsMetrics "openreplay/backend/pkg/metrics/heuristics"
@ -48,8 +49,15 @@ func main() {
cfg.MessageSizeLimit,
)
// Init memory manager
memoryManager, err := memory.NewManager(cfg.MemoryLimitMB, cfg.MaxMemoryUsage)
if err != nil {
log.Printf("can't init memory manager: %s", err)
return
}
// Run service and wait for TERM signal
service := heuristics.New(cfg, producer, consumer, eventBuilder)
service := heuristics.New(cfg, producer, consumer, eventBuilder, memoryManager)
log.Printf("Heuristics service started\n")
terminator.Wait(service)
}

View file

@ -5,6 +5,8 @@ import "strings"
type Config struct {
ConfigFilePath string `env:"CONFIG_FILE_PATH"`
MessageSizeLimit int `env:"QUEUE_MESSAGE_SIZE_LIMIT,default=1048576"`
MaxMemoryUsage uint64 `env:"MAX_MEMORY_USAGE,default=80"`
MemoryLimitMB uint64 `env:"MEMORY_LIMIT_MB,default=0"` // 0 means take limit from OS (cgroup)
}
type Configer interface {

View file

@ -2,6 +2,7 @@ package db
import (
"log"
"openreplay/backend/pkg/memory"
"time"
"openreplay/backend/internal/config/db"
@ -14,13 +15,15 @@ type dbImpl struct {
cfg *db.Config
consumer types.Consumer
saver datasaver.Saver
mm memory.Manager
}
func New(cfg *db.Config, consumer types.Consumer, saver datasaver.Saver) service.Interface {
func New(cfg *db.Config, consumer types.Consumer, saver datasaver.Saver, mm memory.Manager) service.Interface {
s := &dbImpl{
cfg: cfg,
consumer: consumer,
saver: saver,
mm: mm,
}
go s.run()
return s
@ -35,6 +38,9 @@ func (d *dbImpl) run() {
case msg := <-d.consumer.Rebalanced():
log.Println(msg)
default:
if !d.mm.HasFreeMemory() {
continue
}
if err := d.consumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consumption: %v", err)
}

View file

@ -3,6 +3,7 @@ package heuristics
import (
"fmt"
"log"
"openreplay/backend/pkg/memory"
"openreplay/backend/pkg/messages"
metrics "openreplay/backend/pkg/metrics/heuristics"
"time"
@ -18,14 +19,16 @@ type heuristicsImpl struct {
producer types.Producer
consumer types.Consumer
events sessions.EventBuilder
mm memory.Manager
}
func New(cfg *heuristics.Config, p types.Producer, c types.Consumer, e sessions.EventBuilder) service.Interface {
func New(cfg *heuristics.Config, p types.Producer, c types.Consumer, e sessions.EventBuilder, mm memory.Manager) service.Interface {
s := &heuristicsImpl{
cfg: cfg,
producer: p,
consumer: c,
events: e,
mm: mm,
}
go s.run()
return s
@ -47,6 +50,9 @@ func (h *heuristicsImpl) run() {
case msg := <-h.consumer.Rebalanced():
log.Println(msg)
default:
if !h.mm.HasFreeMemory() {
continue
}
if err := h.consumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consuming: %v", err)
}

View file

@ -0,0 +1,44 @@
package memory
import (
"errors"
"fmt"
"log"
"os"
"strconv"
"strings"
)
const (
Limit = "hierarchical_memory_limit"
Maximum = 9223372036854771712
)
func parseMemoryLimit() (int, error) {
data, err := os.ReadFile("/sys/fs/cgroup/memory/memory.stat")
if err != nil {
return 0, err
}
lines := strings.Split(string(data), "\n")
for _, line := range lines {
if strings.Contains(line, Limit) {
lineParts := strings.Split(line, " ")
if len(lineParts) != 2 {
log.Println("can't parse memory limit")
return 0, fmt.Errorf("can't split string with memory limit, str: %s", line)
}
value, err := strconv.Atoi(lineParts[1])
if err != nil {
return 0, fmt.Errorf("can't convert memory limit to int, str: %s", lineParts[1])
}
if value == Maximum {
return 0, errors.New("memory limit is not defined")
}
// DEBUG_LOG
value /= 1024 * 1024
log.Printf("memory limit is defined: %d MiB", value)
return value, nil
}
}
return 0, errors.New("memory limit is not defined")
}

View file

@ -0,0 +1,103 @@
package memory
import (
"errors"
"log"
"runtime"
"sync"
"time"
)
type Manager interface {
HasFreeMemory() bool
}
type managerImpl struct {
mutex *sync.RWMutex
current uint64
maximum uint64
threshold uint64
allocated uint64
total uint64
}
func NewManager(maximumMemory, thresholdValue uint64) (Manager, error) {
if maximumMemory < 1 {
log.Println("maximumMemory is not defined, try to parse memory limit from system")
memLimit, err := parseMemoryLimit()
if err != nil {
log.Println("can't parse system memory limit, err: ", err)
}
if memLimit > 0 {
maximumMemory = uint64(memLimit)
}
}
if thresholdValue > 100 {
return nil, errors.New("threshold must be less than 100")
}
m := &managerImpl{
mutex: &sync.RWMutex{},
threshold: thresholdValue,
maximum: maximumMemory,
current: 0,
}
go m.worker()
return m, nil
}
func (m *managerImpl) currentUsage() uint64 {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.current
}
func (m *managerImpl) calcMemoryUsage() {
var rtm runtime.MemStats
runtime.ReadMemStats(&rtm)
allocated := rtm.Alloc / 1024 / 1024
total := rtm.Sys / 1024 / 1024
if allocated > m.maximum && m.HasFreeMemory() {
log.Println("memory consumption is greater than maximum memory, current: ", allocated, "maximum: ", m.maximum)
}
current := uint64(float64(allocated*100) / float64(m.maximum))
m.mutex.Lock()
m.current = current
m.allocated = allocated
m.total = total
m.mutex.Unlock()
}
func (m *managerImpl) printStat() {
log.Printf("current memory consumption: %d, allocated: %d, maximum: %d, current usage: %d, threshold: %d",
m.total, m.allocated, m.maximum, m.current, m.threshold)
}
func (m *managerImpl) worker() {
// If maximum memory is not defined, then we don't need to check memory usage
if m.maximum == 0 {
m.current = m.threshold
return
}
// First memory usage calculation
m.calcMemoryUsage()
m.printStat()
// Logs and update ticks
updateTick := time.Tick(time.Second)
logsTick := time.Tick(time.Minute)
// Start worker
for {
select {
case <-updateTick:
m.calcMemoryUsage()
case <-logsTick:
m.printStat()
}
}
}
func (m *managerImpl) HasFreeMemory() bool {
return m.currentUsage() <= m.threshold
}