Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions c-deps/libroach/batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,18 @@ DBString DBBatch::GetCompactionStats() { return ToDBString("unsupported"); }

DBStatus DBBatch::EnvWriteFile(DBSlice path, DBSlice contents) { return FmtStatus("unsupported"); }

// EnvOpenFile is not implemented for batches: it ignores both arguments and
// always reports an "unsupported" status. `*file` is never written.
DBStatus DBBatch::EnvOpenFile(DBSlice /*path*/, rocksdb::WritableFile** /*file*/) {
  return FmtStatus("unsupported");
}

// EnvCloseFile is not implemented for batches: the handle is left untouched
// and an "unsupported" status is returned.
DBStatus DBBatch::EnvCloseFile(rocksdb::WritableFile* /*file*/) {
  return FmtStatus("unsupported");
}

// EnvSyncFile is not implemented for batches: nothing is synced and an
// "unsupported" status is returned.
DBStatus DBBatch::EnvSyncFile(rocksdb::WritableFile* /*file*/) {
  return FmtStatus("unsupported");
}

// EnvAppendFile is not implemented for batches: `contents` is discarded and
// an "unsupported" status is returned.
DBStatus DBBatch::EnvAppendFile(rocksdb::WritableFile* /*file*/, DBSlice /*contents*/) {
  return FmtStatus("unsupported");
}

// Builds a write-only batch on top of `db`: the base DBEngine is initialized
// from the parent's `rep` and `iters`, and the update counter starts at zero.
DBWriteOnlyBatch::DBWriteOnlyBatch(DBEngine* db)
    : DBEngine(db->rep, db->iters),
      updates(0) {
}

// The destructor has no work of its own; members clean themselves up.
DBWriteOnlyBatch::~DBWriteOnlyBatch() = default;
Expand Down Expand Up @@ -582,6 +594,22 @@ DBStatus DBWriteOnlyBatch::EnvWriteFile(DBSlice path, DBSlice contents) {
return FmtStatus("unsupported");
}

// EnvOpenFile is not implemented for write-only batches: both arguments are
// ignored, `*file` is never written, and "unsupported" is returned.
DBStatus DBWriteOnlyBatch::EnvOpenFile(DBSlice /*path*/, rocksdb::WritableFile** /*file*/) {
  return FmtStatus("unsupported");
}

// EnvCloseFile is not implemented for write-only batches: the handle is left
// untouched and "unsupported" is returned.
DBStatus DBWriteOnlyBatch::EnvCloseFile(rocksdb::WritableFile* /*file*/) {
  return FmtStatus("unsupported");
}

// EnvSyncFile is not implemented for write-only batches: nothing is synced
// and "unsupported" is returned.
DBStatus DBWriteOnlyBatch::EnvSyncFile(rocksdb::WritableFile* /*file*/) {
  return FmtStatus("unsupported");
}

// EnvAppendFile is not implemented for write-only batches: `contents` is
// discarded and "unsupported" is returned.
DBStatus DBWriteOnlyBatch::EnvAppendFile(rocksdb::WritableFile* /*file*/,
                                         DBSlice /*contents*/) {
  return FmtStatus("unsupported");
}

// GetDBBatchInserter heap-allocates a DBBatchInserter wrapping `batch` and
// returns it through the WriteBatch::Handler base pointer. The object comes
// from a raw `new`, so the caller is presumably responsible for deleting it.
rocksdb::WriteBatch::Handler* GetDBBatchInserter(::rocksdb::WriteBatchBase* batch) {
  rocksdb::WriteBatch::Handler* const inserter = new DBBatchInserter(batch);
  return inserter;
}
Expand Down
8 changes: 8 additions & 0 deletions c-deps/libroach/batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ struct DBBatch : public DBEngine {
virtual DBStatus GetStats(DBStatsResult* stats);
virtual DBString GetCompactionStats();
virtual DBStatus EnvWriteFile(DBSlice path, DBSlice contents);
virtual DBStatus EnvOpenFile(DBSlice path, rocksdb::WritableFile** file);
virtual DBStatus EnvAppendFile(rocksdb::WritableFile* file, DBSlice contents);
virtual DBStatus EnvSyncFile(rocksdb::WritableFile* file);
virtual DBStatus EnvCloseFile(rocksdb::WritableFile* file);
};

struct DBWriteOnlyBatch : public DBEngine {
Expand All @@ -62,6 +66,10 @@ struct DBWriteOnlyBatch : public DBEngine {
virtual DBStatus GetStats(DBStatsResult* stats);
virtual DBString GetCompactionStats();
virtual DBStatus EnvWriteFile(DBSlice path, DBSlice contents);
virtual DBStatus EnvOpenFile(DBSlice path, rocksdb::WritableFile** file);
virtual DBStatus EnvAppendFile(rocksdb::WritableFile* file, DBSlice contents);
virtual DBStatus EnvSyncFile(rocksdb::WritableFile* file);
virtual DBStatus EnvCloseFile(rocksdb::WritableFile* file);
};

// GetDBBatchInserter returns a WriteBatch::Handler that operates on a
Expand Down
16 changes: 16 additions & 0 deletions c-deps/libroach/db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,22 @@ DBStatus DBEnvWriteFile(DBEngine* db, DBSlice path, DBSlice contents) {
return db->EnvWriteFile(path, contents);
}

// DBEnvOpenFile forwards to the engine's EnvOpenFile. The opaque
// DBWritableFile out-parameter is reinterpreted as the WritableFile** slot
// that the engine fills in.
DBStatus DBEnvOpenFile(DBEngine* db, DBSlice path, DBWritableFile* file) {
  auto** const out = reinterpret_cast<rocksdb::WritableFile**>(file);
  return db->EnvOpenFile(path, out);
}

// DBEnvCloseFile converts the opaque handle back to a WritableFile pointer
// and forwards it to the engine's EnvCloseFile.
DBStatus DBEnvCloseFile(DBEngine* db, DBWritableFile file) {
  auto* const handle = static_cast<rocksdb::WritableFile*>(file);
  return db->EnvCloseFile(handle);
}

// DBEnvSyncFile converts the opaque handle back to a WritableFile pointer
// and forwards it to the engine's EnvSyncFile.
DBStatus DBEnvSyncFile(DBEngine* db, DBWritableFile file) {
  auto* const handle = static_cast<rocksdb::WritableFile*>(file);
  return db->EnvSyncFile(handle);
}

// DBEnvAppendFile converts the opaque handle back to a WritableFile pointer
// and forwards it, together with `contents`, to the engine's EnvAppendFile.
DBStatus DBEnvAppendFile(DBEngine* db, DBWritableFile file, DBSlice contents) {
  auto* const handle = static_cast<rocksdb::WritableFile*>(file);
  return db->EnvAppendFile(handle, contents);
}

DBIterator* DBNewIter(DBEngine* db, bool prefix, bool stats) {
rocksdb::ReadOptions opts;
opts.prefix_same_as_start = prefix;
Expand Down
34 changes: 34 additions & 0 deletions c-deps/libroach/engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,38 @@ DBStatus DBImpl::EnvWriteFile(DBSlice path, DBSlice contents) {
return kSuccess;
}

// EnvOpenFile asks this engine's rocksdb Env for a new writable file at
// `path`, using default EnvOptions. On success ownership of the file object
// is handed to the caller through `*file`; on failure `*file` is left
// untouched and the rocksdb error is returned.
DBStatus DBImpl::EnvOpenFile(DBSlice path, rocksdb::WritableFile** file) {
  const rocksdb::EnvOptions env_options;
  rocksdb::unique_ptr<rocksdb::WritableFile> handle;

  const rocksdb::Status s =
      this->rep->GetEnv()->NewWritableFile(ToString(path), &handle, env_options);
  if (s.ok()) {
    // Release from the smart pointer: the raw pointer now belongs to the caller.
    *file = handle.release();
    return kSuccess;
  }
  return ToDBStatus(s);
}

// EnvCloseFile closes the given file and frees it. The file object is deleted
// whether or not Close() succeeds, so the pointer must not be used after this
// call. A null `file` is rejected with an InvalidArgument status instead of
// being dereferenced.
DBStatus DBImpl::EnvCloseFile(rocksdb::WritableFile* file) {
  if (file == nullptr) {
    return ToDBStatus(rocksdb::Status::InvalidArgument("file is null"));
  }
  const rocksdb::Status status = file->Close();
  delete file;
  return ToDBStatus(status);
}

// EnvAppendFile appends `contents` to the given file and returns the
// resulting rocksdb status. A null `file` is rejected with an InvalidArgument
// status instead of being dereferenced.
DBStatus DBImpl::EnvAppendFile(rocksdb::WritableFile* file, DBSlice contents) {
  if (file == nullptr) {
    return ToDBStatus(rocksdb::Status::InvalidArgument("file is null"));
  }
  const rocksdb::Status status = file->Append(ToSlice(contents));
  return ToDBStatus(status);
}

// EnvSyncFile calls Sync() on the given file and returns the resulting
// rocksdb status. A null `file` is rejected with an InvalidArgument status
// instead of being dereferenced.
DBStatus DBImpl::EnvSyncFile(rocksdb::WritableFile* file) {
  if (file == nullptr) {
    return ToDBStatus(rocksdb::Status::InvalidArgument("file is null"));
  }
  const rocksdb::Status status = file->Sync();
  return ToDBStatus(status);
}

} // namespace cockroach
8 changes: 8 additions & 0 deletions c-deps/libroach/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ struct DBEngine {
virtual DBStatus GetStats(DBStatsResult* stats) = 0;
virtual DBString GetCompactionStats() = 0;
virtual DBStatus EnvWriteFile(DBSlice path, DBSlice contents) = 0;
virtual DBStatus EnvOpenFile(DBSlice path, rocksdb::WritableFile** file) = 0;
virtual DBStatus EnvAppendFile(rocksdb::WritableFile* file, DBSlice contents) = 0;
virtual DBStatus EnvSyncFile(rocksdb::WritableFile* file) = 0;
virtual DBStatus EnvCloseFile(rocksdb::WritableFile* file) = 0;

DBSSTable* GetSSTables(int* n);
DBString GetUserProperties();
Expand Down Expand Up @@ -78,6 +82,10 @@ struct DBImpl : public DBEngine {
virtual DBStatus GetStats(DBStatsResult* stats);
virtual DBString GetCompactionStats();
virtual DBStatus EnvWriteFile(DBSlice path, DBSlice contents);
virtual DBStatus EnvOpenFile(DBSlice path, rocksdb::WritableFile** file);
virtual DBStatus EnvAppendFile(rocksdb::WritableFile* file, DBSlice contents);
virtual DBStatus EnvSyncFile(rocksdb::WritableFile* file);
virtual DBStatus EnvCloseFile(rocksdb::WritableFile* file);
};

} // namespace cockroach
14 changes: 14 additions & 0 deletions c-deps/libroach/include/libroach.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ typedef struct {
typedef struct DBCache DBCache;
typedef struct DBEngine DBEngine;
typedef struct DBIterator DBIterator;
// DBWritableFile is an opaque handle to a writable file. It is produced by
// DBEnvOpenFile and consumed by DBEnvAppendFile, DBEnvSyncFile and
// DBEnvCloseFile.
typedef void* DBWritableFile;

// DBOptions contains local database options.
typedef struct {
Expand Down Expand Up @@ -359,6 +360,19 @@ void DBRunLDB(int argc, char** argv);
// DBEnvWriteFile writes the given data as a new "file" in the given engine.
DBStatus DBEnvWriteFile(DBEngine* db, DBSlice path, DBSlice contents);

// DBEnvOpenFile opens a DBWritableFile as a new "file" in the given engine.
// On success the handle is stored in *file and must eventually be released
// with DBEnvCloseFile. Engines that do not implement env files (batches and
// snapshots) return an "unsupported" status.
DBStatus DBEnvOpenFile(DBEngine* db, DBSlice path, DBWritableFile* file);

// DBEnvAppendFile appends the given data to the given DBWritableFile in the
// given engine.
DBStatus DBEnvAppendFile(DBEngine* db, DBWritableFile file, DBSlice contents);

// DBEnvSyncFile synchronously writes the data of the file to the disk.
DBStatus DBEnvSyncFile(DBEngine* db, DBWritableFile file);

// DBEnvCloseFile closes the given DBWritableFile in the given engine. The
// underlying file object is freed by this call, so the handle must not be
// used again afterwards.
DBStatus DBEnvCloseFile(DBEngine* db, DBWritableFile file);

// DBFileLock contains various parameters set during DBLockFile and required for DBUnlockFile.
typedef void* DBFileLock;

Expand Down
12 changes: 12 additions & 0 deletions c-deps/libroach/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,16 @@ DBStatus DBSnapshot::EnvWriteFile(DBSlice path, DBSlice contents) {
return FmtStatus("unsupported");
}

// EnvOpenFile is not implemented for snapshots: both arguments are ignored,
// `*file` is never written, and "unsupported" is returned.
DBStatus DBSnapshot::EnvOpenFile(DBSlice /*path*/, rocksdb::WritableFile** /*file*/) {
  return FmtStatus("unsupported");
}

// EnvCloseFile is not implemented for snapshots: the handle is left untouched
// and "unsupported" is returned.
DBStatus DBSnapshot::EnvCloseFile(rocksdb::WritableFile* /*file*/) {
  return FmtStatus("unsupported");
}

// EnvSyncFile is not implemented for snapshots: nothing is synced and
// "unsupported" is returned.
DBStatus DBSnapshot::EnvSyncFile(rocksdb::WritableFile* /*file*/) {
  return FmtStatus("unsupported");
}

// EnvAppendFile is not implemented for snapshots: `contents` is discarded and
// "unsupported" is returned.
DBStatus DBSnapshot::EnvAppendFile(rocksdb::WritableFile* /*file*/, DBSlice /*contents*/) {
  return FmtStatus("unsupported");
}

} // namespace cockroach
4 changes: 4 additions & 0 deletions c-deps/libroach/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ struct DBSnapshot : public DBEngine {
virtual DBStatus GetStats(DBStatsResult* stats);
virtual DBString GetCompactionStats();
virtual DBStatus EnvWriteFile(DBSlice path, DBSlice contents);
virtual DBStatus EnvOpenFile(DBSlice path, rocksdb::WritableFile** file);
virtual DBStatus EnvAppendFile(rocksdb::WritableFile* file, DBSlice contents);
virtual DBStatus EnvSyncFile(rocksdb::WritableFile* file);
virtual DBStatus EnvCloseFile(rocksdb::WritableFile* file);
};

} // namespace cockroach
25 changes: 24 additions & 1 deletion pkg/storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ type multiTestContext struct {
dbs []*client.DB
gossips []*gossip.Gossip
storePools []*storage.StorePool
dirCleanups []func()
caches []engine.RocksDBCache
// We use multiple stoppers so we can restart different parts of the
// test individually. transportStopper is for 'transport', and the
// 'stoppers' slice corresponds to the 'stores'.
Expand Down Expand Up @@ -339,6 +341,14 @@ func (m *multiTestContext) Stop() {
}
m.transportStopper.Stop(context.TODO())

for _, cleanup := range m.dirCleanups {
cleanup()
}

for _, cache := range m.caches {
cache.Release()
}

for _, s := range m.engineStoppers {
s.Stop(context.TODO())
}
Expand Down Expand Up @@ -693,7 +703,20 @@ func (m *multiTestContext) addStore(idx int) {
} else {
engineStopper := stop.NewStopper()
m.engineStoppers = append(m.engineStoppers, engineStopper)
eng = engine.NewInMem(roachpb.Attributes{}, 1<<20)

dir, cleanup := testutils.TempDir(m.t)
cache := engine.NewRocksDBCache(1 << 20)
var err error
eng, err = engine.NewRocksDB(engine.RocksDBConfig{
Dir: dir,
MustExist: false,
}, cache)
if err != nil {
m.t.Fatal(err)
}

m.dirCleanups = append(m.dirCleanups, cleanup)
m.caches = append(m.caches, cache)
engineStopper.AddCloser(eng)
m.engines = append(m.engines, eng)
needBootstrap = true
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ type Engine interface {
// that the key range is compacted all the way to the bottommost level of
// SSTables, which is necessary to pick up changes to bloom filters.
CompactRange(start, end roachpb.Key, forceBottommost bool) error
// OpenFile opens a DBFile with the given filename.
OpenFile(filename string) (DBFile, error)
}

// WithSSTables extends the Engine interface with a method to get info
Expand Down
42 changes: 42 additions & 0 deletions pkg/storage/engine/rocksdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2616,6 +2616,16 @@ func (r *RocksDB) WriteFile(filename string, data []byte) error {
return statusToError(C.DBEnvWriteFile(r.rdb, goToCSlice([]byte(filename)), goToCSlice(data)))
}

// OpenFile creates a writable file named filename in this RocksDB's env and
// returns it wrapped as a DBFile. The returned file keeps a reference to the
// engine it was opened in. If the engine reports a failure, the error is
// returned and no DBFile is produced.
func (r *RocksDB) OpenFile(filename string) (DBFile, error) {
	var handle C.DBWritableFile
	status := C.DBEnvOpenFile(r.rdb, goToCSlice([]byte(filename)), &handle)
	if err := statusToError(status); err != nil {
		return nil, err
	}
	opened := &rocksdbFile{file: handle, rdb: r.rdb}
	return opened, nil
}

// IsValidSplitKey returns whether the key is a valid split key. Certain key
// ranges cannot be split (the meta1 span and the system DB span); split keys
// chosen within any of these ranges are considered invalid. And a split key
Expand Down Expand Up @@ -2661,3 +2671,35 @@ func mvccScanDecodeKeyValue(repr []byte) (key MVCCKey, value []byte, orepr []byt
key, err = DecodeKey(rawKey)
return key, value, repr, err
}

// DBFile is the Go-side view of a DBWritableFile opened in a RocksDB env.
type DBFile interface {
	// Append writes data at the end of this DBFile.
	Append(data []byte) error
	// Sync synchronously flushes this DBFile's data to disk.
	Sync() error
	// Close closes this DBFile.
	Close() error
}

// rocksdbFile is the DBFile implementation backed by a DBWritableFile. Every
// operation is forwarded to the engine the file was opened in.
type rocksdbFile struct {
	// rdb is the engine whose env owns the file.
	rdb *C.DBEngine
	// file is the opaque handle returned by DBEnvOpenFile.
	file C.DBWritableFile
}

// Append implements the DBFile interface by handing data to DBEnvAppendFile
// and converting the returned status into a Go error.
func (f *rocksdbFile) Append(data []byte) error {
	status := C.DBEnvAppendFile(f.rdb, f.file, goToCSlice(data))
	return statusToError(status)
}

// Close implements the DBFile interface. The underlying DBWritableFile is
// freed by DBEnvCloseFile, so the handle is cleared before the call and a
// repeated Close is a no-op rather than a second free of the same object.
func (f *rocksdbFile) Close() error {
	if f.file == nil {
		// Already closed; there is nothing left to release.
		return nil
	}
	handle := f.file
	f.file = nil
	return statusToError(C.DBEnvCloseFile(f.rdb, handle))
}

// Sync implements the DBFile interface by calling DBEnvSyncFile and
// converting the returned status into a Go error.
func (f *rocksdbFile) Sync() error {
	status := C.DBEnvSyncFile(f.rdb, f.file)
	return statusToError(status)
}
1 change: 1 addition & 0 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,7 @@ func (r *Replica) setReplicaIDRaftMuLockedMuLocked(replicaID roachpb.ReplicaID)
replicaID,
r.store.Engine().GetAuxiliaryDir(),
r.store.limiters.BulkIOWriteRate,
r.store.engine,
); err != nil {
return errors.Wrap(err, "while initializing sideloaded storage")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func addSSTablePreApply(
}
}

if err := writeFileSyncing(ctx, path, sst.Data, 0600, st, limiter); err != nil {
if err := writeFileSyncing(ctx, path, sst.Data, eng, 0600, st, limiter); err != nil {
log.Fatalf(ctx, "while ingesting %s: %s", path, err)
}
copied = true
Expand Down
14 changes: 10 additions & 4 deletions pkg/storage/replica_sideload_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/pkg/errors"
"golang.org/x/time/rate"
)
Expand All @@ -36,6 +37,7 @@ type diskSideloadStorage struct {
limiter *rate.Limiter
dir string
dirCreated bool
eng engine.Engine
}

func newDiskSideloadStorage(
Expand All @@ -44,6 +46,7 @@ func newDiskSideloadStorage(
replicaID roachpb.ReplicaID,
baseDir string,
limiter *rate.Limiter,
eng engine.Engine,
) (sideloadStorage, error) {
ss := &diskSideloadStorage{
dir: filepath.Join(
Expand All @@ -52,14 +55,15 @@ func newDiskSideloadStorage(
fmt.Sprintf("%d", rangeID%1000), // sharding
fmt.Sprintf("%d.%d", rangeID, replicaID),
),
eng: eng,
st: st,
limiter: limiter,
}
return ss, nil
}

func (ss *diskSideloadStorage) createDir() error {
err := os.MkdirAll(ss.dir, 0755)
func (ss *diskSideloadStorage) createDir(dir string) error {
err := os.MkdirAll(dir, 0755)
ss.dirCreated = ss.dirCreated || err == nil
return err
}
Expand All @@ -75,12 +79,14 @@ func (ss *diskSideloadStorage) Put(ctx context.Context, index, term uint64, cont
for {
// Use 0644 since that's what RocksDB uses:
// https://github.com/facebook/rocksdb/blob/56656e12d67d8a63f1e4c4214da9feeec2bd442b/env/env_posix.cc#L171
if err := writeFileSyncing(ctx, filename, contents, 0644, ss.st, ss.limiter); err == nil {
if err := writeFileSyncing(ctx, filename, contents, ss.eng, 0644, ss.st, ss.limiter); err == nil {
return nil
} else if !os.IsNotExist(err) {
return err
}
if err := ss.createDir(); err != nil {
pieces := strings.Split(filename, "/")
dir := strings.Join(pieces[:len(pieces)-1], "/")
if err := ss.createDir(dir); err != nil {
return err
}
continue
Expand Down
Loading