diff --git a/pkg/sql/logictest/testdata/logic_test/show_source b/pkg/sql/logictest/testdata/logic_test/show_source index aeb45bb4b6e8..05abca2c432f 100644 --- a/pkg/sql/logictest/testdata/logic_test/show_source +++ b/pkg/sql/logictest/testdata/logic_test/show_source @@ -67,6 +67,7 @@ kv.allocator.range_rebalance_threshold 5E-02 f minimum kv.allocator.stat_based_rebalancing.enabled false b set to enable rebalancing of range replicas based on write load and disk usage kv.allocator.stat_rebalance_threshold 2E-01 f minimum fraction away from the mean a store's stats (like disk usage or writes per second) can be before it is considered overfull or underfull kv.bulk_io_write.max_rate 8.0 EiB z the rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops +kv.bulk_sst.sync_size 2.0 MiB z threshold after which non-Rocks SST writes must fsync (0 disables) kv.raft.command.max_size 64 MiB z maximum size of a raft command kv.raft_log.synchronize true b set to true to synchronize on Raft log writes to persistent storage kv.range_descriptor_cache.size 1000000 i maximum number of entries in the range descriptor and leaseholder caches diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index 15f5d2d88af7..ff1120b0c84f 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -17,7 +17,6 @@ package storage import ( "context" "fmt" - "io/ioutil" "os" "path/filepath" "time" @@ -35,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/rditer" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/fileutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -364,7 +364,7 @@ func addSSTablePreApply( } } - if err := ioutil.WriteFile(path, sst.Data, 0600); err != nil { + if err := fileutil.WriteFileSyncing(path, sst.Data, 0600, sstWriteSyncRate.Get(&st.SV)); err != nil { log.Fatalf(ctx, "while ingesting %s: %s", path, err) } } diff --git a/pkg/storage/replica_sideload_disk.go b/pkg/storage/replica_sideload_disk.go index e4637dac3fda..8f49457beda0 100644 --- a/pkg/storage/replica_sideload_disk.go +++ b/pkg/storage/replica_sideload_disk.go @@ -24,7 +24,9 @@ import ( "strings" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/fileutil" "github.com/pkg/errors" ) @@ -36,6 +38,13 @@ type diskSideloadStorage struct { dirCreated bool } +// sstWriteSyncRate wraps "kv.bulk_sst.sync_size". +var sstWriteSyncRate = settings.RegisterByteSizeSetting( + "kv.bulk_sst.sync_size", + "threshold after which non-Rocks SST writes must fsync (0 disables)", + 2048<<10, +) + func newDiskSideloadStorage( st *cluster.Settings, rangeID roachpb.RangeID, replicaID roachpb.ReplicaID, baseDir string, ) (sideloadStorage, error) { @@ -78,7 +87,7 @@ func (ss *diskSideloadStorage) PutIfNotExists( for { // Use 0644 since that's what RocksDB uses: // https://github.com/facebook/rocksdb/blob/56656e12d67d8a63f1e4c4214da9feeec2bd442b/env/env_posix.cc#L171 - if err := ioutil.WriteFile(filename, contents, 0644); err == nil { + if err := fileutil.WriteFileSyncing(filename, contents, 0644, sstWriteSyncRate.Get(&ss.st.SV)); err == nil { return nil } else if !os.IsNotExist(err) { return err diff --git a/pkg/util/fileutil/syncing_write.go b/pkg/util/fileutil/syncing_write.go new file mode 100644 index 000000000000..214c1125c789 --- /dev/null +++ b/pkg/util/fileutil/syncing_write.go @@ -0,0 +1,64 @@ +// Copyright 2017 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package fileutil + +import ( + "io" + "io/ioutil" + "os" +) + +// WriteFileSyncing is essentially ioutil.WriteFile -- writes data to a file +// named by filename -- but with an fsync every `syncBytes` to provide +// back-pressure smooth out disk IO, as mentioned in #20352 and #20279. If the +// file does not exist, WriteFile creates it with permissions perm; otherwise +// WriteFile truncates it before writing. Passing syncBytes=0 disables syncing +// (since syncBytes may likely be the result of reading a cluster setting). +func WriteFileSyncing(filename string, data []byte, perm os.FileMode, syncBytes int64) error { + if syncBytes == 0 { + return ioutil.WriteFile(filename, data, perm) + } + + f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm) + if err != nil { + return err + } + + for i := int64(0); i < int64(len(data)); i += syncBytes { + end := i + syncBytes + if l := int64(len(data)); end > l { + end = l + } + chunk := data[i:end] + + var wrote int + wrote, err = f.Write(chunk) + if err == nil && wrote < len(chunk) { + err = io.ErrShortWrite + } + if err == nil { + err = f.Sync() + } + if err != nil { + break + } + } + + closeErr := f.Close() + if err == nil { + err = closeErr + } + return err +}