Skip to content

Commit

Permalink
fix: avoid rewriting fields.idx unnecessarily (#21692)
Browse files Browse the repository at this point in the history
* fix: avoid rewriting fields.idx unnecessarily (#21592) [Port to 1.9] (#21609)

Under heavy write load creating new fields and measurements
the rewrite of the fields.idx file is a bottleneck. This
enhancement combines multiple writes into a single one and
shares any error return value with all of the combined
invocations. MeasurementFieldSet and the new
MeasurementFieldSetWriter must both now be explicitly
closed.

Closes #21577

(cherry picked from commit f64be28)

Closes #21597

* chore: minor refactor suggested by go lint (#21614) (#21616)

(cherry picked from commit 7d10228)
(cherry picked from commit f820287)

Co-authored-by: davidby-influx <72418212+davidby-influx@users.noreply.github.com>
  • Loading branch information
lesam and davidby-influx authored Jun 15, 2021
1 parent 260876c commit 1661b6f
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 94 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
[unreleased]

### Bugfixes

- [#21609](https://github.com/influxdata/influxdb/pull/21609): fix: avoid rewriting fields.idx unnecessarily

v1.9.2 [unreleased]

### Bugfixes

- [#21634](https://github.com/influxdata/influxdb/pull/21634): fix: group by returns multiple results per group in some circumstances

v1.9.1 [unreleased]
Expand Down
1 change: 1 addition & 0 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,7 @@ func (e *Engine) Close() error {
e.mu.Lock()
defer e.mu.Unlock()
e.done = nil // Ensures that the channel will not be closed again.
e.fieldset.Close()

if err := e.FileStore.Close(); err != nil {
return err
Expand Down
200 changes: 136 additions & 64 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"regexp"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1639,26 +1638,29 @@ type MeasurementFieldSet struct {
mu sync.RWMutex
fields map[string]*MeasurementFields
// path is the location to persist field sets
path string
// ephemeral counters for updating the file on disk
memoryVersion uint64
writtenVersion uint64
path string
writer *MeasurementFieldSetWriter
}

// NewMeasurementFieldSet returns a new instance of MeasurementFieldSet.
func NewMeasurementFieldSet(path string) (*MeasurementFieldSet, error) {
const MaxCombinedWrites = 100
fs := &MeasurementFieldSet{
fields: make(map[string]*MeasurementFields),
path: path,
memoryVersion: 0,
writtenVersion: 0,
fields: make(map[string]*MeasurementFields),
path: path,
}

fs.SetMeasurementFieldSetWriter(MaxCombinedWrites)
// If there is a load error, return the error and an empty set so
// it can be rebuild manually.
return fs, fs.load()
}

func (fs *MeasurementFieldSet) Close() {
if fs != nil && fs.writer != nil {
fs.writer.Close()
}
}

// Bytes estimates the memory footprint of this MeasurementFieldSet, in bytes.
func (fs *MeasurementFieldSet) Bytes() int {
var b int
Expand Down Expand Up @@ -1736,83 +1738,153 @@ func (fs *MeasurementFieldSet) IsEmpty() bool {
return len(fs.fields) == 0
}

func (fs *MeasurementFieldSet) Save() (err error) {
// current version
var v uint64
// Is the MeasurementFieldSet empty?
isEmpty := false
// marshaled MeasurementFieldSet
type errorChannel chan<- error

b, err := func() ([]byte, error) {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.memoryVersion += 1
v = fs.memoryVersion
// If no fields left, remove the fields index file
if len(fs.fields) == 0 {
isEmpty = true
if err := os.RemoveAll(fs.path); err != nil {
return nil, err
} else {
fs.writtenVersion = fs.memoryVersion
return nil, nil
}
}
return fs.marshalMeasurementFieldSetNoLock()
}()
type writeRequest struct {
done errorChannel
}

if err != nil {
return err
} else if isEmpty {
return nil
}
type MeasurementFieldSetWriter struct {
wg sync.WaitGroup
writeRequests chan writeRequest
}

// Write the new index to a temp file and rename when it's sync'd
// if it is still the most recent memoryVersion of the MeasurementFields
path := fs.path + "." + strconv.FormatUint(v, 10) + ".tmp"
// SetMeasurementFieldSetWriter - initialize the queue for write requests
// and start the background write process
func (fs *MeasurementFieldSet) SetMeasurementFieldSetWriter(queueLength int) {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.writer = &MeasurementFieldSetWriter{writeRequests: make(chan writeRequest, queueLength)}
fs.writer.wg.Add(1)
go fs.saveWriter()
}

fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
if err != nil {
return err
func (w *MeasurementFieldSetWriter) Close() {
if w != nil {
close(w.writeRequests)
}
defer os.RemoveAll(path)
w.wg.Wait()
}

if _, err := fd.Write(fieldsIndexMagicNumber); err != nil {
return err
}
func (fs *MeasurementFieldSet) Save() error {
return fs.writer.RequestSave()
}

if _, err := fd.Write(b); err != nil {
return err
func (w *MeasurementFieldSetWriter) RequestSave() error {
done := make(chan error)
wr := writeRequest{done: done}
w.writeRequests <- wr
return <-done
}

func (fs *MeasurementFieldSet) saveWriter() {
defer fs.writer.wg.Done()
// Block until someone modifies the MeasurementFieldSet and
// it needs to be written to disk.
for req, ok := <-fs.writer.writeRequests; ok; req, ok = <-fs.writer.writeRequests {
fs.writeToFile(req)
}
}

if err = fd.Sync(); err != nil {
return err
// writeToFile: Write the new index to a temp file and rename when it's sync'd
func (fs *MeasurementFieldSet) writeToFile(first writeRequest) {
var err error
// Put the errorChannel on which we blocked into a slice to allow more invocations
// to share the return code from the file write
errorChannels := []errorChannel{first.done}
defer func() {
for _, c := range errorChannels {
c <- err
close(c)
}
}()
// Do some blocking IO operations before marshalling the in-memory index
// to allow other changes to it to be queued up and be captured in one
// write operation, in case we are under heavy field creation load
path := fs.path + ".tmp"

// Open the temp file
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
if err != nil {
return
}
// Ensure temp file is cleaned up
defer func() {
if e := os.RemoveAll(path); err == nil {
err = e
}
}()
isEmpty, err := func() (isEmpty bool, err error) {
// ensure temp file closed before rename (for Windows)
defer func() {
if e := fd.Close(); err == nil {
err = e
}
}()
if _, err = fd.Write(fieldsIndexMagicNumber); err != nil {
return true, err
}

//close file handle before renaming to support Windows
if err = fd.Close(); err != nil {
return err
// Read all the pending new field and measurement write requests
// that will be captured in the marshaling of the in-memory copy
for {
select {
case ec := <-fs.writer.writeRequests:
errorChannels = append(errorChannels, ec.done)
continue
default:
}
break
}
// Lock, copy, and marshal the in-memory index
b, err := fs.marshalMeasurementFieldSet()
if err != nil {
return true, err
}
if b == nil {
// No fields, file removed, all done
return true, nil
}
if _, err := fd.Write(b); err != nil {
return true, err
}
return false, fd.Sync()
}()
if err != nil || isEmpty {
return
}
err = fs.renameFile(path)
}

// marshalMeasurementFieldSet: remove the fields.idx file if no fields
// otherwise, copy the in-memory version into a protobuf to write to
// disk
func (fs *MeasurementFieldSet) marshalMeasurementFieldSet() ([]byte, error) {
fs.mu.Lock()
defer fs.mu.Unlock()

// Check if a later modification and save of fields has superseded ours
// If so, we are successfully done! We were beaten by a later call
// to this function
if fs.writtenVersion > v {
return nil
if len(fs.fields) == 0 {
// If no fields left, remove the fields index file
if err := os.RemoveAll(fs.path); err != nil {
return nil, err
} else {
return nil, nil
}
}
return fs.marshalMeasurementFieldSetNoLock()
}

func (fs *MeasurementFieldSet) renameFile(path string) error {
fs.mu.Lock()
defer fs.mu.Unlock()

if err := file.RenameFile(path, fs.path); err != nil {
return err
}

if err = file.SyncDir(filepath.Dir(fs.path)); err != nil {
if err := file.SyncDir(filepath.Dir(fs.path)); err != nil {
return err
}
// Update the written version to the current version
fs.writtenVersion = v

return nil
}

Expand Down
Loading

0 comments on commit 1661b6f

Please sign in to comment.