From 70d7210e9a8944479e1eb7a52f38ee151c9f2415 Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Mon, 28 Nov 2022 14:58:18 +0100 Subject: [PATCH] feat(backend/sink): write big messages directly to file --- backend/internal/config/sink/config.go | 2 +- backend/internal/sink/sessionwriter/file.go | 23 +++++++++++++++++++-- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/backend/internal/config/sink/config.go b/backend/internal/config/sink/config.go index 1a2df142e..53e3517a4 100644 --- a/backend/internal/config/sink/config.go +++ b/backend/internal/config/sink/config.go @@ -9,7 +9,7 @@ type Config struct { common.Config FsDir string `env:"FS_DIR,required"` FsUlimit uint16 `env:"FS_ULIMIT,required"` - FileBuffer int `env:"FILE_BUFFER,default=32768"` + FileBuffer int `env:"FILE_BUFFER,default=16384"` SyncTimeout int `env:"SYNC_TIMEOUT,default=5"` GroupSink string `env:"GROUP_SINK,required"` TopicRawWeb string `env:"TOPIC_RAW_WEB,required"` diff --git a/backend/internal/sink/sessionwriter/file.go b/backend/internal/sink/sessionwriter/file.go index 1ad076d72..37b1664a9 100644 --- a/backend/internal/sink/sessionwriter/file.go +++ b/backend/internal/sink/sessionwriter/file.go @@ -2,6 +2,8 @@ package sessionwriter import ( "bufio" + "io" + "log" "os" ) @@ -24,15 +26,32 @@ func NewFile(path string, bufSize int) (*File, error) { } func (f *File) Write(data []byte) error { + f.updated = true + if len(data) > f.buffer.Available()+f.buffer.Size() { + // Flush buffer to file + for i := 0; i < 3; i++ { + err := f.buffer.Flush() + if err == nil { + break + } + log.Printf("can't flush buffer: %s", err) + } + // Write big message directly to file + return f.write(f.file, data) + } + return f.write(f.buffer, data) +} + +func (f *File) write(w io.Writer, data []byte) error { leftToWrite := len(data) for leftToWrite > 0 { - writtenDown, err := f.buffer.Write(data) + from := len(data) - leftToWrite + writtenDown, err := w.Write(data[from:]) if err != nil { return err } leftToWrite -= writtenDown } - f.updated = true return nil }