diff --git a/c-deps/libroach/batch.cc b/c-deps/libroach/batch.cc index f6f070ec4254..e7aa3fe76038 100644 --- a/c-deps/libroach/batch.cc +++ b/c-deps/libroach/batch.cc @@ -515,6 +515,18 @@ DBString DBBatch::GetCompactionStats() { return ToDBString("unsupported"); } DBStatus DBBatch::EnvWriteFile(DBSlice path, DBSlice contents) { return FmtStatus("unsupported"); } +DBStatus DBBatch::EnvOpenFile(DBSlice path, rocksdb::WritableFile** file) { + return FmtStatus("unsupported"); +} + +DBStatus DBBatch::EnvCloseFile(rocksdb::WritableFile* file) { return FmtStatus("unsupported"); } + +DBStatus DBBatch::EnvSyncFile(rocksdb::WritableFile* file) { return FmtStatus("unsupported"); } + +DBStatus DBBatch::EnvAppendFile(rocksdb::WritableFile* file, DBSlice contents) { + return FmtStatus("unsupported"); +} + DBWriteOnlyBatch::DBWriteOnlyBatch(DBEngine* db) : DBEngine(db->rep, db->iters), updates(0) {} DBWriteOnlyBatch::~DBWriteOnlyBatch() {} @@ -582,6 +594,22 @@ DBStatus DBWriteOnlyBatch::EnvWriteFile(DBSlice path, DBSlice contents) { return FmtStatus("unsupported"); } +DBStatus DBWriteOnlyBatch::EnvOpenFile(DBSlice path, rocksdb::WritableFile** file) { + return FmtStatus("unsupported"); +} + +DBStatus DBWriteOnlyBatch::EnvCloseFile(rocksdb::WritableFile* file) { + return FmtStatus("unsupported"); +} + +DBStatus DBWriteOnlyBatch::EnvSyncFile(rocksdb::WritableFile* file) { + return FmtStatus("unsupported"); +} + +DBStatus DBWriteOnlyBatch::EnvAppendFile(rocksdb::WritableFile* file, DBSlice contents) { + return FmtStatus("unsupported"); +} + rocksdb::WriteBatch::Handler* GetDBBatchInserter(::rocksdb::WriteBatchBase* batch) { return new DBBatchInserter(batch); } diff --git a/c-deps/libroach/batch.h b/c-deps/libroach/batch.h index 81df6cedcb23..9700e8107fa0 100644 --- a/c-deps/libroach/batch.h +++ b/c-deps/libroach/batch.h @@ -41,6 +41,10 @@ struct DBBatch : public DBEngine { virtual DBStatus GetStats(DBStatsResult* stats); virtual DBString GetCompactionStats(); virtual DBStatus 
EnvWriteFile(DBSlice path, DBSlice contents); + virtual DBStatus EnvOpenFile(DBSlice path, rocksdb::WritableFile** file); + virtual DBStatus EnvAppendFile(rocksdb::WritableFile* file, DBSlice contents); + virtual DBStatus EnvSyncFile(rocksdb::WritableFile* file); + virtual DBStatus EnvCloseFile(rocksdb::WritableFile* file); }; struct DBWriteOnlyBatch : public DBEngine { @@ -62,6 +66,10 @@ struct DBWriteOnlyBatch : public DBEngine { virtual DBStatus GetStats(DBStatsResult* stats); virtual DBString GetCompactionStats(); virtual DBStatus EnvWriteFile(DBSlice path, DBSlice contents); + virtual DBStatus EnvOpenFile(DBSlice path, rocksdb::WritableFile** file); + virtual DBStatus EnvAppendFile(rocksdb::WritableFile* file, DBSlice contents); + virtual DBStatus EnvSyncFile(rocksdb::WritableFile* file); + virtual DBStatus EnvCloseFile(rocksdb::WritableFile* file); }; // GetDBBatchInserter returns a WriteBatch::Handler that operates on a diff --git a/c-deps/libroach/db.cc b/c-deps/libroach/db.cc index 29a4b046db60..28a63fa61823 100644 --- a/c-deps/libroach/db.cc +++ b/c-deps/libroach/db.cc @@ -420,6 +420,22 @@ DBStatus DBEnvWriteFile(DBEngine* db, DBSlice path, DBSlice contents) { return db->EnvWriteFile(path, contents); } +DBStatus DBEnvOpenFile(DBEngine* db, DBSlice path, DBWritableFile* file) { + return db->EnvOpenFile(path, (rocksdb::WritableFile**)file); +} + +DBStatus DBEnvCloseFile(DBEngine* db, DBWritableFile file) { + return db->EnvCloseFile((rocksdb::WritableFile*)file); +} + +DBStatus DBEnvSyncFile(DBEngine* db, DBWritableFile file) { + return db->EnvSyncFile((rocksdb::WritableFile*)file); +} + +DBStatus DBEnvAppendFile(DBEngine* db, DBWritableFile file, DBSlice contents) { + return db->EnvAppendFile((rocksdb::WritableFile*)file, contents); +} + DBIterator* DBNewIter(DBEngine* db, bool prefix, bool stats) { rocksdb::ReadOptions opts; opts.prefix_same_as_start = prefix; diff --git a/c-deps/libroach/engine.cc b/c-deps/libroach/engine.cc index 
d07ca2698858..bbd373d0044d 100644 --- a/c-deps/libroach/engine.cc +++ b/c-deps/libroach/engine.cc @@ -223,4 +223,40 @@ DBStatus DBImpl::EnvWriteFile(DBSlice path, DBSlice contents) { return kSuccess; } +// EnvOpenFile opens a new file in the given engine. On success, ownership of +// the file is passed to the caller via *file; release it with EnvCloseFile. +DBStatus DBImpl::EnvOpenFile(DBSlice path, rocksdb::WritableFile** file) { + rocksdb::Status status; + const rocksdb::EnvOptions soptions; + rocksdb::unique_ptr<rocksdb::WritableFile> rocksdb_file; + + // Create the file. + status = this->rep->GetEnv()->NewWritableFile(ToString(path), &rocksdb_file, soptions); + if (!status.ok()) { + return ToDBStatus(status); + } + *file = rocksdb_file.release(); + return kSuccess; +} + +// EnvCloseFile closes the given file in the given engine. The file is deleted +// even if closing it fails, so it must not be used after this call. +DBStatus DBImpl::EnvCloseFile(rocksdb::WritableFile* file) { + rocksdb::Status status = file->Close(); + delete file; + return ToDBStatus(status); +} + +// EnvAppendFile appends the given data to the file in the given engine. +DBStatus DBImpl::EnvAppendFile(rocksdb::WritableFile* file, DBSlice contents) { + rocksdb::Status status = file->Append(ToSlice(contents)); + return ToDBStatus(status); +} + +// EnvSyncFile synchronously writes the data of the file to the disk. 
+DBStatus DBImpl::EnvSyncFile(rocksdb::WritableFile* file) { + rocksdb::Status status = file->Sync(); + return ToDBStatus(status); +} + } // namespace cockroach diff --git a/c-deps/libroach/engine.h b/c-deps/libroach/engine.h index 90371193814a..63c2d28ce4d0 100644 --- a/c-deps/libroach/engine.h +++ b/c-deps/libroach/engine.h @@ -42,6 +42,10 @@ struct DBEngine { virtual DBStatus GetStats(DBStatsResult* stats) = 0; virtual DBString GetCompactionStats() = 0; virtual DBStatus EnvWriteFile(DBSlice path, DBSlice contents) = 0; + virtual DBStatus EnvOpenFile(DBSlice path, rocksdb::WritableFile** file) = 0; + virtual DBStatus EnvAppendFile(rocksdb::WritableFile* file, DBSlice contents) = 0; + virtual DBStatus EnvSyncFile(rocksdb::WritableFile* file) = 0; + virtual DBStatus EnvCloseFile(rocksdb::WritableFile* file) = 0; DBSSTable* GetSSTables(int* n); DBString GetUserProperties(); @@ -78,6 +82,10 @@ struct DBImpl : public DBEngine { virtual DBStatus GetStats(DBStatsResult* stats); virtual DBString GetCompactionStats(); virtual DBStatus EnvWriteFile(DBSlice path, DBSlice contents); + virtual DBStatus EnvOpenFile(DBSlice path, rocksdb::WritableFile** file); + virtual DBStatus EnvAppendFile(rocksdb::WritableFile* file, DBSlice contents); + virtual DBStatus EnvSyncFile(rocksdb::WritableFile* file); + virtual DBStatus EnvCloseFile(rocksdb::WritableFile* file); }; } // namespace cockroach diff --git a/c-deps/libroach/include/libroach.h b/c-deps/libroach/include/libroach.h index dacc23e688f8..fc94f5e1706d 100644 --- a/c-deps/libroach/include/libroach.h +++ b/c-deps/libroach/include/libroach.h @@ -61,6 +61,7 @@ typedef struct { typedef struct DBCache DBCache; typedef struct DBEngine DBEngine; typedef struct DBIterator DBIterator; +typedef void* DBWritableFile; // DBOptions contains local database options. typedef struct { @@ -359,6 +360,19 @@ void DBRunLDB(int argc, char** argv); // DBEnvWriteFile writes the given data as a new "file" in the given engine. 
DBStatus DBEnvWriteFile(DBEngine* db, DBSlice path, DBSlice contents); +// DBEnvOpenFile opens a DBWritableFile as a new "file" in the given engine. +DBStatus DBEnvOpenFile(DBEngine* db, DBSlice path, DBWritableFile* file); + +// DBEnvAppendFile appends the given data to the given DBWritableFile in the +// given engine. +DBStatus DBEnvAppendFile(DBEngine* db, DBWritableFile file, DBSlice contents); + +// DBEnvSyncFile synchronously writes the data of the file to the disk. +DBStatus DBEnvSyncFile(DBEngine* db, DBWritableFile file); + +// DBEnvCloseFile closes the given DBWritableFile in the given engine. +DBStatus DBEnvCloseFile(DBEngine* db, DBWritableFile file); + // DBFileLock contains various parameters set during DBLockFile and required for DBUnlockFile. typedef void* DBFileLock; diff --git a/c-deps/libroach/snapshot.cc b/c-deps/libroach/snapshot.cc index 7d3d9ee971a0..916f048ecd8b 100644 --- a/c-deps/libroach/snapshot.cc +++ b/c-deps/libroach/snapshot.cc @@ -57,4 +57,16 @@ DBStatus DBSnapshot::EnvWriteFile(DBSlice path, DBSlice contents) { return FmtStatus("unsupported"); } +DBStatus DBSnapshot::EnvOpenFile(DBSlice path, rocksdb::WritableFile** file) { + return FmtStatus("unsupported"); +} + +DBStatus DBSnapshot::EnvCloseFile(rocksdb::WritableFile* file) { return FmtStatus("unsupported"); } + +DBStatus DBSnapshot::EnvSyncFile(rocksdb::WritableFile* file) { return FmtStatus("unsupported"); } + +DBStatus DBSnapshot::EnvAppendFile(rocksdb::WritableFile* file, DBSlice contents) { + return FmtStatus("unsupported"); +} + } // namespace cockroach diff --git a/c-deps/libroach/snapshot.h b/c-deps/libroach/snapshot.h index 033e446bc323..1cc5600e6493 100644 --- a/c-deps/libroach/snapshot.h +++ b/c-deps/libroach/snapshot.h @@ -38,6 +38,10 @@ struct DBSnapshot : public DBEngine { virtual DBStatus GetStats(DBStatsResult* stats); virtual DBString GetCompactionStats(); virtual DBStatus EnvWriteFile(DBSlice path, DBSlice contents); + virtual DBStatus EnvOpenFile(DBSlice path, 
rocksdb::WritableFile** file); + virtual DBStatus EnvAppendFile(rocksdb::WritableFile* file, DBSlice contents); + virtual DBStatus EnvSyncFile(rocksdb::WritableFile* file); + virtual DBStatus EnvCloseFile(rocksdb::WritableFile* file); }; } // namespace cockroach diff --git a/pkg/storage/client_test.go b/pkg/storage/client_test.go index 4fe9f9ea597a..baf285aede0e 100644 --- a/pkg/storage/client_test.go +++ b/pkg/storage/client_test.go @@ -202,6 +202,8 @@ type multiTestContext struct { dbs []*client.DB gossips []*gossip.Gossip storePools []*storage.StorePool + dirCleanups []func() + caches []engine.RocksDBCache // We use multiple stoppers so we can restart different parts of the // test individually. transportStopper is for 'transport', and the // 'stoppers' slice corresponds to the 'stores'. @@ -339,6 +341,14 @@ func (m *multiTestContext) Stop() { } m.transportStopper.Stop(context.TODO()) for _, s := range m.engineStoppers { s.Stop(context.TODO()) } + + // The engines are closed now; only then release their caches and temp dirs. + for _, cache := range m.caches { + cache.Release() + } + for _, cleanup := range m.dirCleanups { + cleanup() + } @@ -693,7 +703,20 @@ func (m *multiTestContext) addStore(idx int) { } else { engineStopper := stop.NewStopper() m.engineStoppers = append(m.engineStoppers, engineStopper) - eng = engine.NewInMem(roachpb.Attributes{}, 1<<20) + + dir, cleanup := testutils.TempDir(m.t) + cache := engine.NewRocksDBCache(1 << 20) + var err error + eng, err = engine.NewRocksDB(engine.RocksDBConfig{ + Dir: dir, + MustExist: false, + }, cache) + if err != nil { + m.t.Fatal(err) + } + + m.dirCleanups = append(m.dirCleanups, cleanup) + m.caches = append(m.caches, cache) engineStopper.AddCloser(eng) m.engines = append(m.engines, eng) needBootstrap = true diff --git a/pkg/storage/engine/engine.go b/pkg/storage/engine/engine.go index 0d5e9756aad6..cd4af68039c4 100644 --- a/pkg/storage/engine/engine.go +++ b/pkg/storage/engine/engine.go @@ -283,6 +283,8 @@ type Engine interface { // that the key range is compacted all the way to the
bottommost level of // SSTables, which is necessary to pick up changes to bloom filters. CompactRange(start, end roachpb.Key, forceBottommost bool) error + // OpenFile opens a DBFile with the given filename. + OpenFile(filename string) (DBFile, error) } // WithSSTables extends the Engine interface with a method to get info diff --git a/pkg/storage/engine/rocksdb.go b/pkg/storage/engine/rocksdb.go index 567900c13825..8b8d8242f513 100644 --- a/pkg/storage/engine/rocksdb.go +++ b/pkg/storage/engine/rocksdb.go @@ -2616,6 +2616,16 @@ func (r *RocksDB) WriteFile(filename string, data []byte) error { return statusToError(C.DBEnvWriteFile(r.rdb, goToCSlice([]byte(filename)), goToCSlice(data))) } +// OpenFile opens a DBFile, which is essentially a rocksdb WritableFile +// with the given filename, in this RocksDB's env. +func (r *RocksDB) OpenFile(filename string) (DBFile, error) { + var file C.DBWritableFile + if err := statusToError(C.DBEnvOpenFile(r.rdb, goToCSlice([]byte(filename)), &file)); err != nil { + return nil, err + } + return &rocksdbFile{file: file, rdb: r.rdb}, nil +} + // IsValidSplitKey returns whether the key is a valid split key. Certain key // ranges cannot be split (the meta1 span and the system DB span); split keys // chosen within any of these ranges are considered invalid. And a split key @@ -2661,3 +2671,35 @@ func mvccScanDecodeKeyValue(repr []byte) (key MVCCKey, value []byte, orepr []byt key, err = DecodeKey(rawKey) return key, value, repr, err } + +// DBFile is an interface for interacting with DBWritableFile in RocksDB. +type DBFile interface { + // Append appends data to this DBFile. + Append(data []byte) error + // Close closes this DBFile. + Close() error + // Sync synchronously flushes this DBFile's data to disk. + Sync() error +} + +// rocksdbFile implements DBFile interface. It is used to interact with the +// DBWritableFile in the corresponding RocksDB env. 
+type rocksdbFile struct { + file C.DBWritableFile + rdb *C.DBEngine +} + +// Append implements the DBFile interface. +func (f *rocksdbFile) Append(data []byte) error { + return statusToError(C.DBEnvAppendFile(f.rdb, f.file, goToCSlice(data))) +} + +// Close implements the DBFile interface. +func (f *rocksdbFile) Close() error { + return statusToError(C.DBEnvCloseFile(f.rdb, f.file)) +} + +// Sync implements the DBFile interface. +func (f *rocksdbFile) Sync() error { + return statusToError(C.DBEnvSyncFile(f.rdb, f.file)) +} diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 68fc053a315f..8b04fcf54611 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -937,6 +937,7 @@ func (r *Replica) setReplicaIDRaftMuLockedMuLocked(replicaID roachpb.ReplicaID) replicaID, r.store.Engine().GetAuxiliaryDir(), r.store.limiters.BulkIOWriteRate, + r.store.engine, ); err != nil { return errors.Wrap(err, "while initializing sideloaded storage") } diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index 003dfc298c24..3f81649484fb 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -417,7 +417,7 @@ func addSSTablePreApply( } } - if err := writeFileSyncing(ctx, path, sst.Data, 0600, st, limiter); err != nil { + if err := writeFileSyncing(ctx, path, sst.Data, eng, 0600, st, limiter); err != nil { log.Fatalf(ctx, "while ingesting %s: %s", path, err) } copied = true diff --git a/pkg/storage/replica_sideload_disk.go b/pkg/storage/replica_sideload_disk.go index 4a8357213341..5893e1eacb33 100644 --- a/pkg/storage/replica_sideload_disk.go +++ b/pkg/storage/replica_sideload_disk.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/pkg/errors" "golang.org/x/time/rate" ) @@ -36,6 +37,7 @@ type diskSideloadStorage struct { limiter *rate.Limiter dir string 
dirCreated bool + eng engine.Engine } func newDiskSideloadStorage( @@ -44,6 +46,7 @@ func newDiskSideloadStorage( replicaID roachpb.ReplicaID, baseDir string, limiter *rate.Limiter, + eng engine.Engine, ) (sideloadStorage, error) { ss := &diskSideloadStorage{ dir: filepath.Join( @@ -52,14 +55,15 @@ func newDiskSideloadStorage( fmt.Sprintf("%d", rangeID%1000), // sharding fmt.Sprintf("%d.%d", rangeID, replicaID), ), + eng: eng, st: st, limiter: limiter, } return ss, nil } -func (ss *diskSideloadStorage) createDir() error { - err := os.MkdirAll(ss.dir, 0755) +func (ss *diskSideloadStorage) createDir(dir string) error { + err := os.MkdirAll(dir, 0755) ss.dirCreated = ss.dirCreated || err == nil return err } @@ -75,12 +79,14 @@ func (ss *diskSideloadStorage) Put(ctx context.Context, index, term uint64, cont for { // Use 0644 since that's what RocksDB uses: // https://github.com/facebook/rocksdb/blob/56656e12d67d8a63f1e4c4214da9feeec2bd442b/env/env_posix.cc#L171 - if err := writeFileSyncing(ctx, filename, contents, 0644, ss.st, ss.limiter); err == nil { + if err := writeFileSyncing(ctx, filename, contents, ss.eng, 0644, ss.st, ss.limiter); err == nil { return nil } else if !os.IsNotExist(err) { return err } - if err := ss.createDir(); err != nil { + // Create the file's parent directory; filepath.Dir handles the OS path separator. + dir := filepath.Dir(filename) + if err := ss.createDir(dir); err != nil { return err } continue diff --git a/pkg/storage/replica_sideload_inmem.go b/pkg/storage/replica_sideload_inmem.go index f977006f6b1c..e6627786adeb 100644 --- a/pkg/storage/replica_sideload_inmem.go +++ b/pkg/storage/replica_sideload_inmem.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage/engine" ) type slKey struct { @@ -35,7 +36,7 @@ type inMemSideloadStorage struct { func mustNewInMemSideloadStorage( rangeID roachpb.RangeID, replicaID roachpb.ReplicaID, baseDir
string, ) sideloadStorage { - ss, err := newInMemSideloadStorage(cluster.MakeTestingClusterSettings(), rangeID, replicaID, baseDir) + ss, err := newInMemSideloadStorage(cluster.MakeTestingClusterSettings(), rangeID, replicaID, baseDir, nil) if err != nil { panic(err) } @@ -43,7 +44,11 @@ func mustNewInMemSideloadStorage( } func newInMemSideloadStorage( - _ *cluster.Settings, rangeID roachpb.RangeID, replicaID roachpb.ReplicaID, baseDir string, + _ *cluster.Settings, + rangeID roachpb.RangeID, + replicaID roachpb.ReplicaID, + baseDir string, + _ engine.Engine, // unused by the in-memory implementation ) (sideloadStorage, error) { return &inMemSideloadStorage{ prefix: filepath.Join(baseDir, fmt.Sprintf("%d.%d", rangeID, replicaID)), diff --git a/pkg/storage/replica_sideload_test.go b/pkg/storage/replica_sideload_test.go index 4599a689f64e..8f96cad26168 100644 --- a/pkg/storage/replica_sideload_test.go +++ b/pkg/storage/replica_sideload_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/stateloader" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -89,9 +90,9 @@ func TestSideloadingSideloadedStorage(t *testing.T) { }) t.Run("Disk", func(t *testing.T) { maker := func( - s *cluster.Settings, rangeID roachpb.RangeID, rep roachpb.ReplicaID, name string, + s *cluster.Settings, rangeID roachpb.RangeID, rep roachpb.ReplicaID, name string, eng engine.Engine, ) (sideloadStorage, error) { - return newDiskSideloadStorage(s, rangeID, rep, name, rate.NewLimiter(rate.Inf, math.MaxInt64)) + return newDiskSideloadStorage(s, rangeID, rep, name, rate.NewLimiter(rate.Inf, math.MaxInt64), eng) } testSideloadingSideloadedStorage(t, maker) }) @@ -99,7 +100,7 @@ func TestSideloadingSideloadedStorage(t *testing.T) { func
testSideloadingSideloadedStorage( t *testing.T, - maker func(*cluster.Settings, roachpb.RangeID, roachpb.ReplicaID, string) (sideloadStorage, error), + maker func(*cluster.Settings, roachpb.RangeID, roachpb.ReplicaID, string, engine.Engine) (sideloadStorage, error), ) { dir, cleanup := testutils.TempDir(t) defer cleanup() @@ -107,7 +108,12 @@ func testSideloadingSideloadedStorage( ctx := context.Background() st := cluster.MakeTestingClusterSettings() - ss, err := maker(st, 1, 2, dir) + cleanup, cache, eng := newRocksDB(t) + defer cleanup() + defer cache.Release() + defer eng.Close() + + ss, err := maker(st, 1, 2, dir, eng) if err != nil { t.Fatal(err) } @@ -224,7 +230,7 @@ func testSideloadingSideloadedStorage( } // Verify a sideloaded storage for another ReplicaID doesn't see the files. - if otherSS, err := maker(st, 1, 999 /* ReplicaID */, dir); err != nil { + if otherSS, err := maker(st, 1, 999 /* ReplicaID */, dir, eng); err != nil { t.Fatal(err) } else if _, err = otherSS.Get(ctx, payloads[0], highTerm); err != errSideloadedFileNotFound { t.Fatal("expected not found") @@ -234,7 +240,7 @@ func testSideloadingSideloadedStorage( // one), which shouldn't change anything about its state. if !isInMem { var err error - ss, err = maker(st, 1, 2, dir) + ss, err = maker(st, 1, 2, dir, eng) if err != nil { t.Fatal(err) } @@ -708,6 +714,19 @@ func (mr *mockSender) Recv() (*SnapshotResponse, error) { return &SnapshotResponse{Status: status}, nil } +func newRocksDB(t *testing.T) (func(), engine.RocksDBCache, *engine.RocksDB) { + dir, cleanup := testutils.TempDir(t) + cache := engine.NewRocksDBCache(1 << 20) + eng, err := engine.NewRocksDB(engine.RocksDBConfig{ + Dir: dir, + MustExist: false, + }, cache) + if err != nil { + t.Fatal(err) + } + return cleanup, cache, eng +} + // This test verifies that when a snapshot is sent, sideloaded proposals are // inlined. 
func TestRaftSSTableSideloadingSnapshot(t *testing.T) { @@ -718,6 +737,13 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) { tc := testContext{} stopper := stop.NewStopper() defer stopper.Stop(ctx) + + cleanup, cache, eng := newRocksDB(t) + tc.engine = eng + defer cleanup() + defer cache.Release() + defer eng.Close() + tc.Start(t, stopper) var ba roachpb.BatchRequest @@ -968,6 +994,13 @@ func TestRaftSSTableSideloadingUpdatedReplicaID(t *testing.T) { tc := testContext{} stopper := stop.NewStopper() defer stopper.Stop(context.TODO()) + + cleanup, cache, eng := newRocksDB(t) + tc.engine = eng + defer cleanup() + defer cache.Release() + defer eng.Close() + tc.Start(t, stopper) repl := tc.repl ctx := context.Background() @@ -1004,6 +1037,7 @@ func TestRaftSSTableSideloadingUpdatedReplicaID(t *testing.T) { _, err = repl.raftMu.sideloaded.Get(ctx, index, term) repl.raftMu.Unlock() + log.Infof(ctx, "olddir is %s, newdir is %s", oldDir, newDir) if err != nil { t.Fatal(err) } diff --git a/pkg/storage/syncing_write.go b/pkg/storage/syncing_write.go index d4a7ba97841f..059795ff3652 100644 --- a/pkg/storage/syncing_write.go +++ b/pkg/storage/syncing_write.go @@ -16,13 +16,14 @@ package storage import ( "context" - "io" "os" "runtime/debug" + "strings" "time" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "golang.org/x/time/rate" @@ -75,6 +76,7 @@ func writeFileSyncing( ctx context.Context, filename string, data []byte, + eng engine.Engine, perm os.FileMode, settings *cluster.Settings, limiter *rate.Limiter, @@ -86,8 +88,11 @@ func writeFileSyncing( sync = false } - f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm) + f, err := eng.OpenFile(filename) if err != nil { + if strings.Contains(err.Error(), "No such file or directory") { + 
return os.ErrNotExist + } return err } @@ -100,12 +105,7 @@ func writeFileSyncing( // rate limit limitBulkIOWrite(ctx, limiter, len(chunk)) - - var wrote int - wrote, err = f.Write(chunk) - if err == nil && wrote < len(chunk) { - err = io.ErrShortWrite - } + err = f.Append(chunk) if err == nil && sync { err = f.Sync() }