Skip to content

Commit

Permalink
Switch O_SYNC to periodic fsync
Browse files Browse the repository at this point in the history
O_SYNC was added when writing TSM files to fix an issue where the
final fsync at the end caused the process to stall.  This ends up
increasing disk utilization too much, so this change switches to using
multiple fsyncs while writing the TSM file instead of O_SYNC or one
large fsync at the end.
  • Loading branch information
jwilder committed Dec 7, 2017
1 parent 6b1cf18 commit 4fd5ca3
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 4 deletions.
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
}

func (c *Compactor) write(path string, iter KeyIterator) (err error) {
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
if err != nil {
return errCompactionInProgress{err: err}
}
Expand Down
37 changes: 34 additions & 3 deletions tsdb/engine/tsm1/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ const (

// max length of a key in an index entry (measurement + tags)
maxKeyLength = (1 << (2 * 8)) - 1

// The threshold amount of data written before we periodically fsync a TSM file. This helps avoid
// long pauses due to very large fsyncs at the end of writing a TSM file.
fsyncEvery = 512 * 1024 * 1024
)

var (
Expand Down Expand Up @@ -233,7 +237,7 @@ func (e *IndexEntry) String() string {

// NewIndexWriter returns a new IndexWriter.
func NewIndexWriter() IndexWriter {
buf := bytes.NewBuffer(make([]byte, 0, 4096))
buf := bytes.NewBuffer(make([]byte, 0, 1024*1024))
return &directIndex{buf: buf, w: bufio.NewWriter(buf)}
}

Expand All @@ -253,6 +257,9 @@ type indexBlock struct {
type directIndex struct {
keyCount int
size uint32

// The count of bytes written as of the last fsync.
lastSync uint32
fd *os.File
buf *bytes.Buffer

Expand Down Expand Up @@ -377,7 +384,7 @@ func (d *directIndex) WriteTo(w io.Writer) (int64, error) {
return 0, err
}

return io.Copy(w, bufio.NewReader(d.fd))
return io.Copy(w, bufio.NewReaderSize(d.fd, 1024*1024))
}

func (d *directIndex) flush(w io.Writer) (int64, error) {
Expand Down Expand Up @@ -435,6 +442,15 @@ func (d *directIndex) flush(w io.Writer) (int64, error) {
d.indexEntries.Type = 0
d.indexEntries.entries = d.indexEntries.entries[:0]

// If this is a disk based index and we've written more than the fsync threshold,
// fsync the data to avoid long pauses later on.
if d.fd != nil && d.size-d.lastSync > fsyncEvery {
if err := d.fd.Sync(); err != nil {
return N, err
}
d.lastSync = d.size
}

return N, nil

}
Expand Down Expand Up @@ -486,13 +502,16 @@ type tsmWriter struct {
w *bufio.Writer
index IndexWriter
n int64

// The count of bytes written as of the last fsync.
lastSync int64
}

// NewTSMWriter returns a new TSMWriter writing to w.
func NewTSMWriter(w io.Writer) (TSMWriter, error) {
var index IndexWriter
if fw, ok := w.(*os.File); ok && !strings.HasSuffix(fw.Name(), "01.tsm.tmp") {
f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -612,6 +631,14 @@ func (t *tsmWriter) WriteBlock(key []byte, minTime, maxTime int64, block []byte)
// Increment file position pointer (checksum + block len)
t.n += int64(n)

// fsync the file periodically to avoid long pauses with very big files.
if t.n-t.lastSync > fsyncEvery {
if err := t.sync(); err != nil {
return err
}
t.lastSync = t.n
}

if len(t.index.Entries(key)) >= maxIndexEntries {
return ErrMaxBlocksExceeded
}
Expand Down Expand Up @@ -646,6 +673,10 @@ func (t *tsmWriter) Flush() error {
return err
}

return t.sync()
}

func (t *tsmWriter) sync() error {
if f, ok := t.wrapped.(*os.File); ok {
if err := f.Sync(); err != nil {
return err
Expand Down

0 comments on commit 4fd5ca3

Please sign in to comment.