Skip to content

Commit

Permalink
read code
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyuecai committed May 26, 2024
1 parent 596718b commit eb1a67e
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 2 deletions.
1 change: 1 addition & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1681,6 +1681,7 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
ts_sz_for_record_.insert({id, ts_sz});
}
max_column_family_ = std::max(max_column_family_, id);
// 加入到双向链表中
// add to linked list
new_cfd->next_ = dummy_cfd_;
auto prev = dummy_cfd_->prev_;
Expand Down
5 changes: 5 additions & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3356,6 +3356,7 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
s = ColumnFamilyData::ValidateOptions(db_options, cf_options);
if (s.ok()) {
for (auto& cf_path : cf_options.cf_paths) {
// 给 cf 创建目录
s = env_->CreateDirIfMissing(cf_path.path);
if (!s.ok()) {
break;
Expand All @@ -3370,16 +3371,20 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
{
InstrumentedMutexLock l(&mutex_);

// 所有的 ColumnFamilyData 都由 ColumnFamilySet 来管理
if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) !=
nullptr) {
// 如果 cf 已经存在了,返回用户错误
return Status::InvalidArgument("Column family already exists");
}
VersionEdit edit;
edit.AddColumnFamily(column_family_name);
uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
// 在 VersionEdit 中记录新 cf 的 ID 等元信息(该 edit 随后会写入 MANIFEST 文件)
edit.SetColumnFamily(new_id);
edit.SetLogNumber(logfile_number_);
edit.SetComparatorName(cf_options.comparator->Name());
// TODO persist_user_defined_timestamps 是啥用处??
edit.SetPersistUserDefinedTimestamps(
cf_options.persist_user_defined_timestamps);

Expand Down
40 changes: 39 additions & 1 deletion db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Options SanitizeOptions(const std::string& dbname, const Options& src,
return Options(db_options, cf_options);
}

// 以用户传入的 src 为基础,生成一份经过规整(sanitize)的 DBOptions(并非单纯的默认参数)
DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
bool read_only, Status* logger_creation_s) {
DBOptions result(src);
Expand Down Expand Up @@ -421,15 +422,22 @@ Status DBImpl::Recover(
return s;
}

// 检查CURRENT文件是否存在,或者找一个可用的MANIFEST文件
std::string current_fname = CurrentFileName(dbname_);
// Path to any MANIFEST file in the db dir. It does not matter which one.
// Since best-efforts recovery ignores CURRENT file, existence of a
// MANIFEST indicates the recovery to recover existing db. If no MANIFEST
// can be found, a new db will be created.
// db 目录中任意一个 MANIFEST 文件的路径,具体是哪一个并不重要。
// 由于尽力恢复(best-efforts recovery)会忽略 CURRENT 文件,
// 只要存在 MANIFEST 就表明本次恢复是要恢复一个已有的数据库;
// 如果找不到任何 MANIFEST,则会创建一个新的数据库。
std::string manifest_path;
if (!immutable_db_options_.best_efforts_recovery) {
// 如果不是"尽最大努力恢复",则会要求 CURRENT 文件一定要存在
s = env_->FileExists(current_fname);
} else {
}
else {
s = Status::NotFound();
IOOptions io_opts;
io_opts.do_not_recurse = true;
Expand All @@ -455,13 +463,15 @@ Status DBImpl::Recover(
}
}
if (s.IsNotFound()) {
// 不存在则创建
if (immutable_db_options_.create_if_missing) {
s = NewDB(&files_in_dbname);
is_new_db = true;
if (!s.ok()) {
return s;
}
} else {
// 直接报错
return Status::InvalidArgument(
current_fname, "does not exist (create_if_missing is false)");
}
Expand Down Expand Up @@ -515,6 +525,7 @@ Status DBImpl::Recover(
assert(db_id_.empty());
Status s;
bool missing_table_file = false;
// 执行 Version 的恢复动作
if (!immutable_db_options_.best_efforts_recovery) {
s = versions_->Recover(column_families, read_only, &db_id_);
} else {
Expand Down Expand Up @@ -1760,11 +1771,15 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
}

Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
// DB 的配置和 CF 的配置是独立的,Options 同时继承了这俩类
DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);

std::vector<ColumnFamilyDescriptor> column_families;
// 会自动将默认的 CF 进行创建
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
// 如果开启了 persist_stats_to_disk(持久化统计信息),会额外打开一个内部 CF(kPersistentStatsColumnFamilyName,即 ___rocksdb_stats_history___)来存储
if (db_options.persist_stats_to_disk) {
column_families.push_back(
ColumnFamilyDescriptor(kPersistentStatsColumnFamilyName, cf_options));
Expand All @@ -1787,13 +1802,21 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
return s;
}


// 源码学习资料:https://blog.csdn.net/xuhaitao23/article/details/121747616
// db_options: 配置/选项
// dbname: 数据库名称
// column_families: 要打开的column family
// handles: 打开的column family
// dbptr: 打开的数据库
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
const bool kSeqPerBatch = true;
const bool kBatchPerTxn = true;
ThreadStatusUtil::SetEnableTracking(db_options.enable_thread_tracking);
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType::OP_DBOPEN);
// 实际上创建 RocksDB 的逻辑
Status s = DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
!kSeqPerBatch, kBatchPerTxn);
ThreadStatusUtil::ResetThreadStatus();
Expand Down Expand Up @@ -1912,10 +1935,13 @@ IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
return io_s;
}

// 源码学习资料:https://blog.csdn.net/xuhaitao23/article/details/121747616
Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
const bool seq_per_batch, const bool batch_per_txn) {
// 检查配置参数是否合理
// TODO 待整理
Status s = ValidateOptionsByTable(db_options, column_families);
if (!s.ok()) {
return s;
Expand All @@ -1930,12 +1956,14 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
assert(handles);
handles->clear();

// 取所有 CF 的 write_buffer_size 中的最大值,记为 max_write_buffer_size
size_t max_write_buffer_size = 0;
for (auto cf : column_families) {
max_write_buffer_size =
std::max(max_write_buffer_size, cf.options.write_buffer_size);
}

// 创建 DBImpl 实例
DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn);
if (!impl->immutable_db_options_.info_log) {
s = impl->init_logger_creation_s_;
Expand All @@ -1944,6 +1972,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
} else {
assert(impl->init_logger_creation_s_.ok());
}

// 创建各个目录
s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.GetWalDir());
if (s.ok()) {
std::vector<std::string> paths;
Expand Down Expand Up @@ -1982,11 +2012,14 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,

// Handles create_if_missing, error_if_exists
uint64_t recovered_seq(kMaxSequenceNumber);

// 恢复历史状态:先根据 CURRENT/MANIFEST 恢复版本信息,再按需从 WAL 中恢复数据
s = impl->Recover(column_families, false /* read_only */,
false /* error_if_wal_file_exists */,
false /* error_if_data_exists_in_wals */, &recovered_seq,
&recovery_ctx);
if (s.ok()) {
// 创建一个新的wal(CreateWAL)
uint64_t new_log_number = impl->versions_->NewFileNumber();
log::Writer* new_log = nullptr;
const size_t preallocate_block_size =
Expand Down Expand Up @@ -2045,6 +2078,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
s = impl->InitPersistStatsColumnFamily();
}

// 初始化 CF
if (s.ok()) {
// set column family handles
for (auto cf : column_families) {
Expand Down Expand Up @@ -2074,6 +2108,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}
}

// 为每个 CF 安装 SuperVersion(参见 InstallSuperVersionAndScheduleWork)
if (s.ok()) {
SuperVersionContext sv_context(/* create_superversion */ true);
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
Expand All @@ -2088,6 +2123,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
s = impl->PersistentStatsProcessFormatVersion();
}

// 检查是否能够使用FIFO(compaction策略是FIFO时),是否支持merge
if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
if (!cfd->mem()->IsSnapshotSupported()) {
Expand All @@ -2110,6 +2146,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
if (s.ok()) {
// Persist RocksDB Options before scheduling the compaction.
// The WriteOptionsFile() will release and lock the mutex internally.
// 持久化存储 RocksDB 的参数配置
persist_options_status = impl->WriteOptionsFile(
false /*need_mutex_lock*/, false /*need_enter_write_thread*/);

Expand All @@ -2123,6 +2160,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}
impl->mutex_.Unlock();

// 初始化 SSTFileManager
auto sfm = static_cast<SstFileManagerImpl*>(
impl->immutable_db_options_.sst_file_manager.get());
if (s.ok() && sfm) {
Expand Down
1 change: 1 addition & 0 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7101,6 +7101,7 @@ uint64_t VersionSet::GetObsoleteSstFilesSize() const {
return ret;
}

// 创建 CF 的核心逻辑
ColumnFamilyData* VersionSet::CreateColumnFamily(
const ColumnFamilyOptions& cf_options, const ReadOptions& read_options,
const VersionEdit* edit) {
Expand Down
25 changes: 24 additions & 1 deletion include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,9 @@ struct DBOptions {
// future, support for doing storage operations such as read/write files
// through env will be deprecated in favor of file_system (see below)
// Default: Env::Default()
// 使用指定的对象与环境交互,例如读取/写入文件、安排后台工作等。(即,所有和文件系统的交互,都是由 Env 对象来完成的)
// 在不久的将来,通过 env 执行读取/写入文件等存储操作的支持将被弃用,转而使用 file_system(见下文)
// Default: Env::Default()
Env* env = Env::Default();

// Limits internal file read/write bandwidth:
Expand Down Expand Up @@ -672,6 +675,7 @@ struct DBOptions {
bool use_fsync = false;

// A list of paths where SST files can be put into, with its target size.
// 位置越靠前的存储设备,使用的优先级越高
// Newer data is placed into paths specified earlier in the vector while
// older data gradually moves to paths specified later in the vector.
//
Expand All @@ -686,12 +690,15 @@ struct DBOptions {
// is slightly more than target size under some workloads. User should give
// some buffer room for those cases.
//
// 如果没有一个路径有足够的空间来放置文件,则无论目标大小如何,文件都会被放置到最后一个路径。
// If none of the paths has sufficient room to place a file, the file will
// be placed to the last path anyway, despite to the target size.
//
// 将较新的数据放置到较早的路径也是尽力而为。在某些极端情况下,用户应该期望用户文件被放置在更高的级别。
// Placing newer data to earlier paths is also best-efforts. User should
// expect user files to be placed in higher levels in some extreme cases.
//

// 如果留空,则仅使用一个路径,即打开数据库时传递的db_name。
// If left empty, only one path will be used, which is db_name passed when
// opening the DB.
// Default: empty
Expand Down Expand Up @@ -1315,13 +1322,19 @@ struct DBOptions {
// in DB files and return an error to the user, either at DB::Open time or
// later during DB operation. The exception to this policy is the WAL file,
// whose recovery is controlled by the wal_recovery_mode option.
// 默认情况下,RocksDB 会尝试检测数据库文件中的任何数据丢失或损坏,
// 并在 DB::Open 时或之后的数据库操作期间向用户返回错误。
// 此策略的例外是 WAL 文件,其恢复行为由 wal_recovery_mode 选项控制。
//
// Best-efforts recovery (this option set to true) signals a preference for
// opening the DB to any point-in-time valid state for each column family,
// including the empty/new state, versus the default of returning non-WAL
// data losses to the user as errors. In terms of RocksDB user data, this
// is like applying WALRecoveryMode::kPointInTimeRecovery to each column
// family rather than just the WAL.
// 尽力恢复(此选项设置为 true)表示:优先将数据库打开到每个列族的任意一个时间点有效状态
// (包括空/新状态),而不是像默认行为那样把非 WAL 的数据丢失作为错误返回给用户。
// 就 RocksDB 用户数据而言,这相当于将 WALRecoveryMode::kPointInTimeRecovery
// 应用于每个列族,而不仅仅是 WAL。
//
// Best-efforts recovery (BER) is specifically designed to recover a DB with
// files that are missing or truncated to some smaller size, such as the
Expand All @@ -1331,6 +1344,9 @@ struct DBOptions {
// BER is not yet designed to produce a usable DB from other corruptions to
// DB files (which should generally be detectable by DB::VerifyChecksum()),
// and BER does not yet attempt to recover any WAL files.
// 尽力恢复(BER)专门用于恢复文件缺失或被截断为较小尺寸的数据库,例如不完整的数据库“物理”(文件系统)副本。
// BER 还可以检测到 SST 文件是否已被替换为大小相同的其他文件(假设在数据库清单中跟踪了 SST 唯一 ID)。
// BER 尚未设计用于从其他损坏的数据库文件(通常应可通过 DB::VerifyChecksum() 检测到)中生成可用的数据库,并且 BER 尚未尝试恢复任何 WAL 文件。
//
// For example, if an SST or blob file referenced by the MANIFEST is missing,
// BER might be able to find a set of files corresponding to an old "point in
Expand All @@ -1339,8 +1355,15 @@ struct DBOptions {
// either ignored or replaced with BER, or quietly fixed regardless of BER
// setting. BER does require at least one valid MANIFEST to recover to a
// non-trivial DB state, unlike `ldb repair`.
// 例如,如果 MANIFEST 引用的 SST 或 blob 文件丢失,
// BER 可能能够找到与列族的某个较旧“时间点(point in time)”版本
// 相对应的一组文件,这些文件可能来自较旧的 MANIFEST 文件。
// 其他一些类型的 DB 文件(例如 CURRENT、LOCK、IDENTITY)
// 要么被 BER 忽略或替换,要么无论 BER 如何设置都会被悄悄修复。
// 与 `ldb repair` 不同,BER 确实需要至少一个有效的 MANIFEST 才能恢复到非平凡的数据库状态。
//
// Currently, best_efforts_recovery=true is not compatible with atomic flush.
// 目前,best_efforts_recovery=true 与原子刷新不兼容。
//
// Default: false
bool best_efforts_recovery = false;
Expand Down

0 comments on commit eb1a67e

Please sign in to comment.