feat(backend/sink): write big messages directly to file
This commit is contained in:
parent
9e9452a62d
commit
70d7210e9a
2 changed files with 22 additions and 3 deletions
|
|
@ -9,7 +9,7 @@ type Config struct {
|
|||
common.Config
|
||||
FsDir string `env:"FS_DIR,required"`
|
||||
FsUlimit uint16 `env:"FS_ULIMIT,required"`
|
||||
FileBuffer int `env:"FILE_BUFFER,default=32768"`
|
||||
FileBuffer int `env:"FILE_BUFFER,default=16384"`
|
||||
SyncTimeout int `env:"SYNC_TIMEOUT,default=5"`
|
||||
GroupSink string `env:"GROUP_SINK,required"`
|
||||
TopicRawWeb string `env:"TOPIC_RAW_WEB,required"`
|
||||
|
|
|
|||
|
|
@ -2,6 +2,8 @@ package sessionwriter
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
)
|
||||
|
||||
|
|
@ -24,15 +26,32 @@ func NewFile(path string, bufSize int) (*File, error) {
|
|||
}
|
||||
|
||||
func (f *File) Write(data []byte) error {
|
||||
f.updated = true
|
||||
if len(data) > f.buffer.Available()+f.buffer.Size() {
|
||||
// Flush buffer to file
|
||||
for i := 0; i < 3; i++ {
|
||||
err := f.buffer.Flush()
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
log.Printf("can't flush buffer: %s", err)
|
||||
}
|
||||
// Write big message directly to file
|
||||
return f.write(f.file, data)
|
||||
}
|
||||
return f.write(f.buffer, data)
|
||||
}
|
||||
|
||||
func (f *File) write(w io.Writer, data []byte) error {
|
||||
leftToWrite := len(data)
|
||||
for leftToWrite > 0 {
|
||||
writtenDown, err := f.buffer.Write(data)
|
||||
from := len(data) - leftToWrite
|
||||
writtenDown, err := w.Write(data[from:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
leftToWrite -= writtenDown
|
||||
}
|
||||
f.updated = true
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue