diff --git a/CHANGELOG.md b/CHANGELOG.md index 987e3471b2f..ce6d595e040 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,13 @@ +[unreleased] + +### Bugfixes + +- [#21609](https://github.com/influxdata/influxdb/pull/21609): fix: avoid rewriting fields.idx unnecessarily + v1.9.2 [unreleased] + +### Bugfixes + - [#21634](https://github.com/influxdata/influxdb/pull/21634): fix: group by returns multiple results per group in some circumstances v1.9.1 [unreleased] diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 401beea7e75..cf7f369f857 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -778,6 +778,7 @@ func (e *Engine) Close() error { e.mu.Lock() defer e.mu.Unlock() e.done = nil // Ensures that the channel will not be closed again. + e.fieldset.Close() if err := e.FileStore.Close(); err != nil { return err diff --git a/tsdb/shard.go b/tsdb/shard.go index 16a0aa382bf..200346bad4f 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -12,7 +12,6 @@ import ( "regexp" "runtime" "sort" - "strconv" "strings" "sync" "sync/atomic" @@ -1639,26 +1638,29 @@ type MeasurementFieldSet struct { mu sync.RWMutex fields map[string]*MeasurementFields // path is the location to persist field sets - path string - // ephemeral counters for updating the file on disk - memoryVersion uint64 - writtenVersion uint64 + path string + writer *MeasurementFieldSetWriter } // NewMeasurementFieldSet returns a new instance of MeasurementFieldSet. func NewMeasurementFieldSet(path string) (*MeasurementFieldSet, error) { + const MaxCombinedWrites = 100 fs := &MeasurementFieldSet{ - fields: make(map[string]*MeasurementFields), - path: path, - memoryVersion: 0, - writtenVersion: 0, + fields: make(map[string]*MeasurementFields), + path: path, } - + fs.SetMeasurementFieldSetWriter(MaxCombinedWrites) // If there is a load error, return the error and an empty set so // it can be rebuild manually. return fs, fs.load() } +func (fs *MeasurementFieldSet) Close() { + if fs != nil && fs.writer != nil { + fs.writer.Close() + } +} + // Bytes estimates the memory footprint of this MeasurementFieldSet, in bytes. func (fs *MeasurementFieldSet) Bytes() int { var b int @@ -1736,83 +1738,153 @@ func (fs *MeasurementFieldSet) IsEmpty() bool { return len(fs.fields) == 0 } -func (fs *MeasurementFieldSet) Save() (err error) { - // current version - var v uint64 - // Is the MeasurementFieldSet empty? - isEmpty := false - // marshaled MeasurementFieldSet +type errorChannel chan<- error - b, err := func() ([]byte, error) { - fs.mu.Lock() - defer fs.mu.Unlock() - fs.memoryVersion += 1 - v = fs.memoryVersion - // If no fields left, remove the fields index file - if len(fs.fields) == 0 { - isEmpty = true - if err := os.RemoveAll(fs.path); err != nil { - return nil, err - } else { - fs.writtenVersion = fs.memoryVersion - return nil, nil - } - } - return fs.marshalMeasurementFieldSetNoLock() - }() +type writeRequest struct { + done errorChannel +} - if err != nil { - return err - } else if isEmpty { - return nil - } +type MeasurementFieldSetWriter struct { + wg sync.WaitGroup + writeRequests chan writeRequest +} - // Write the new index to a temp file and rename when it's sync'd - // if it is still the most recent memoryVersion of the MeasurementFields - path := fs.path + "." + strconv.FormatUint(v, 10) + ".tmp" +// SetMeasurementFieldSetWriter - initialize the queue for write requests +// and start the background write process +func (fs *MeasurementFieldSet) SetMeasurementFieldSetWriter(queueLength int) { + fs.mu.Lock() + defer fs.mu.Unlock() + fs.writer = &MeasurementFieldSetWriter{writeRequests: make(chan writeRequest, queueLength)} + fs.writer.wg.Add(1) + go fs.saveWriter() +} - fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666) - if err != nil { - return err +func (w *MeasurementFieldSetWriter) Close() { + if w != nil { + close(w.writeRequests) } - defer os.RemoveAll(path) + w.wg.Wait() +} - if _, err := fd.Write(fieldsIndexMagicNumber); err != nil { - return err - } +func (fs *MeasurementFieldSet) Save() error { + return fs.writer.RequestSave() +} - if _, err := fd.Write(b); err != nil { - return err +func (w *MeasurementFieldSetWriter) RequestSave() error { + done := make(chan error) + wr := writeRequest{done: done} + w.writeRequests <- wr + return <-done +} + +func (fs *MeasurementFieldSet) saveWriter() { + defer fs.writer.wg.Done() + // Block until someone modifies the MeasurementFieldSet and + // it needs to be written to disk. + for req, ok := <-fs.writer.writeRequests; ok; req, ok = <-fs.writer.writeRequests { + fs.writeToFile(req) } +} - if err = fd.Sync(); err != nil { - return err +// writeToFile: Write the new index to a temp file and rename when it's sync'd +func (fs *MeasurementFieldSet) writeToFile(first writeRequest) { + var err error + // Put the errorChannel on which we blocked into a slice to allow more invocations + // to share the return code from the file write + errorChannels := []errorChannel{first.done} + defer func() { + for _, c := range errorChannels { + c <- err + close(c) + } + }() + // Do some blocking IO operations before marshalling the in-memory index + // to allow other changes to it to be queued up and be captured in one + // write operation, in case we are under heavy field creation load + path := fs.path + ".tmp" + + // Open the temp file + fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666) + if err != nil { + return } + // Ensure temp file is cleaned up + defer func() { + if e := os.RemoveAll(path); err == nil { + err = e + } + }() + isEmpty, err := func() (isEmpty bool, err error) { + // ensure temp file closed before rename (for Windows) + defer func() { + if e := fd.Close(); err == nil { + err = e + } + }() + if _, err = fd.Write(fieldsIndexMagicNumber); err != nil { + return true, err + } - //close file handle before renaming to support Windows - if err = fd.Close(); err != nil { - return err + // Read all the pending new field and measurement write requests + // that will be captured in the marshaling of the in-memory copy + for { + select { + case ec := <-fs.writer.writeRequests: + errorChannels = append(errorChannels, ec.done) + continue + default: + } + break + } + // Lock, copy, and marshal the in-memory index + b, err := fs.marshalMeasurementFieldSet() + if err != nil { + return true, err + } + if b == nil { + // No fields, file removed, all done + return true, nil + } + if _, err := fd.Write(b); err != nil { + return true, err + } + return false, fd.Sync() + }() + if err != nil || isEmpty { + return } + err = fs.renameFile(path) +} +// marshalMeasurementFieldSet: remove the fields.idx file if no fields +// otherwise, copy the in-memory version into a protobuf to write to +// disk +func (fs *MeasurementFieldSet) marshalMeasurementFieldSet() ([]byte, error) { fs.mu.Lock() defer fs.mu.Unlock() - - // Check if a later modification and save of fields has superseded ours - // If so, we are successfully done! We were beaten by a later call - // to this function - if fs.writtenVersion > v { - return nil + if len(fs.fields) == 0 { + // If no fields left, remove the fields index file + if err := os.RemoveAll(fs.path); err != nil { + return nil, err + } else { + return nil, nil + } } + return fs.marshalMeasurementFieldSetNoLock() +} + +func (fs *MeasurementFieldSet) renameFile(path string) error { + fs.mu.Lock() + defer fs.mu.Unlock() if err := file.RenameFile(path, fs.path); err != nil { return err } - if err = file.SyncDir(filepath.Dir(fs.path)); err != nil { + if err := file.SyncDir(filepath.Dir(fs.path)); err != nil { return err } - // Update the written version to the current version - fs.writtenVersion = v + return nil } diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index c476074b553..b44e7ab7610 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -1570,7 +1570,7 @@ func TestMeasurementFieldSet_SaveLoad(t *testing.T) { if err != nil { t.Fatalf("NewMeasurementFieldSet error: %v", err) } - + defer mf.Close() fields := mf.CreateFieldsIfNotExists([]byte("cpu")) if err := fields.CreateFieldIfNotExists([]byte("value"), influxql.Float); err != nil { t.Fatalf("create field error: %v", err) @@ -1580,12 +1580,12 @@ func TestMeasurementFieldSet_SaveLoad(t *testing.T) { t.Fatalf("save error: %v", err) } - mf, err = tsdb.NewMeasurementFieldSet(path) + mf2, err := tsdb.NewMeasurementFieldSet(path) if err != nil { t.Fatalf("NewMeasurementFieldSet error: %v", err) } - - fields = mf.FieldsByString("cpu") + defer mf2.Close() + fields = mf2.FieldsByString("cpu") field := fields.Field("value") if field == nil { t.Fatalf("field is null") @@ -1601,36 +1601,36 @@ func TestMeasurementFieldSet_Corrupt(t *testing.T) { defer cleanup() path := filepath.Join(dir, "fields.idx") - mf, err := tsdb.NewMeasurementFieldSet(path) - if err != nil { - t.Fatalf("NewMeasurementFieldSet error: %v", err) - } - - fields := mf.CreateFieldsIfNotExists([]byte("cpu")) - if err := fields.CreateFieldIfNotExists([]byte("value"), influxql.Float); err != nil { - t.Fatalf("create field error: %v", err) - } - - if err := mf.Save(); err != nil { - t.Fatalf("save error: %v", err) - } + func() { + mf, err := tsdb.NewMeasurementFieldSet(path) + if err != nil { + t.Fatalf("NewMeasurementFieldSet error: %v", err) + } + defer mf.Close() + fields := mf.CreateFieldsIfNotExists([]byte("cpu")) + if err := fields.CreateFieldIfNotExists([]byte("value"), influxql.Float); err != nil { + t.Fatalf("create field error: %v", err) + } + if err := mf.Save(); err != nil { + t.Fatalf("save error: %v", err) + } + }() stat, err := os.Stat(path) if err != nil { t.Fatalf("stat error: %v", err) } - // Truncate the file to simulate a a corrupted file if err := os.Truncate(path, stat.Size()-3); err != nil { t.Fatalf("truncate error: %v", err) } - - mf, err = tsdb.NewMeasurementFieldSet(path) + mf, err := tsdb.NewMeasurementFieldSet(path) if err == nil { t.Fatal("NewMeasurementFieldSet expected error") } + defer mf.Close() - fields = mf.FieldsByString("cpu") + fields := mf.FieldsByString("cpu") if fields != nil { t.Fatal("expecte fields to be nil") } @@ -1644,7 +1644,7 @@ func TestMeasurementFieldSet_DeleteEmpty(t *testing.T) { if err != nil { t.Fatalf("NewMeasurementFieldSet error: %v", err) } - + defer mf.Close() fields := mf.CreateFieldsIfNotExists([]byte("cpu")) if err := fields.CreateFieldIfNotExists([]byte("value"), influxql.Float); err != nil { t.Fatalf("create field error: %v", err) @@ -1653,13 +1653,12 @@ func TestMeasurementFieldSet_DeleteEmpty(t *testing.T) { if err := mf.Save(); err != nil { t.Fatalf("save error: %v", err) } - - mf, err = tsdb.NewMeasurementFieldSet(path) + mf2, err := tsdb.NewMeasurementFieldSet(path) if err != nil { t.Fatalf("NewMeasurementFieldSet error: %v", err) } - - fields = mf.FieldsByString("cpu") + defer mf2.Close() + fields = mf2.FieldsByString("cpu") field := fields.Field("value") if field == nil { t.Fatalf("field is null") @@ -1669,9 +1668,9 @@ func TestMeasurementFieldSet_DeleteEmpty(t *testing.T) { t.Fatalf("field type mismatch: got %v, exp %v", got, exp) } - mf.Delete("cpu") + mf2.Delete("cpu") - if err := mf.Save(); err != nil { + if err := mf2.Save(); err != nil { t.Fatalf("save after delete error: %v", err) } @@ -1690,10 +1689,11 @@ func TestMeasurementFieldSet_InvalidFormat(t *testing.T) { t.Fatalf("error writing fields.index: %v", err) } - _, err := tsdb.NewMeasurementFieldSet(path) + mf, err := tsdb.NewMeasurementFieldSet(path) if err != tsdb.ErrUnknownFieldsFormat { t.Fatalf("unexpected error: got %v, exp %v", err, tsdb.ErrUnknownFieldsFormat) } + defer mf.Close() } func TestMeasurementFieldSet_ConcurrentSave(t *testing.T) { @@ -1721,6 +1721,7 @@ func TestMeasurementFieldSet_ConcurrentSave(t *testing.T) { if err != nil { t.Fatalf("NewMeasurementFieldSet error: %v", err) } + defer mfs.Close() var wg sync.WaitGroup wg.Add(len(ft)) @@ -1733,6 +1734,7 @@ func TestMeasurementFieldSet_ConcurrentSave(t *testing.T) { if err != nil { t.Fatalf("NewMeasurementFieldSet error: %v", err) } + defer mfs2.Close() for i, fs := range ft { mf := mfs.Fields([]byte(mt[i])) mf2 := mfs2.Fields([]byte(mt[i])) @@ -1745,7 +1747,6 @@ func TestMeasurementFieldSet_ConcurrentSave(t *testing.T) { } } } - } func testFieldMaker(t *testing.T, wg *sync.WaitGroup, mf *tsdb.MeasurementFieldSet, measurement string, fieldNames []string) {