Add namespace id to PageStorage interface #4175

Merged
merged 21 commits on Mar 11, 2022
Changes from 12 commits
2 changes: 1 addition & 1 deletion dbms/src/Server/DTTool/DTToolBench.cpp
@@ -322,7 +322,7 @@ int benchEntry(const std::vector<std::string> & opts)
auto settings = DB::Settings();
auto db_context = env.getContext();
auto path_pool = std::make_unique<DB::StoragePathPool>(db_context->getPathPool().withTable("test", "t1", false));
auto storage_pool = std::make_unique<DB::DM::StoragePool>("test.t1", *path_pool, *db_context, db_context->getSettingsRef());
auto storage_pool = std::make_unique<DB::DM::StoragePool>("test.t1", /*table_id*/ 1, *path_pool, *db_context, db_context->getSettingsRef());
auto page_id_generator = std::make_unique<DB::DM::PageIdGenerator>();
auto dm_settings = DB::DM::DeltaMergeStore::Settings{};
auto dm_context = std::make_unique<DB::DM::DMContext>( //
2 changes: 1 addition & 1 deletion dbms/src/Server/tests/gtest_dttool.cpp
@@ -57,7 +57,7 @@ struct DTToolTest : public DB::base::TiFlashStorageTestBasic
properties.push_back(property);
}
auto path_pool = std::make_unique<DB::StoragePathPool>(db_context->getPathPool().withTable("test", "t1", false));
auto storage_pool = std::make_unique<DB::DM::StoragePool>("test.t1", *path_pool, *db_context, db_context->getSettingsRef());
auto storage_pool = std::make_unique<DB::DM::StoragePool>("test.t1", /*table_id*/ 1, *path_pool, *db_context, db_context->getSettingsRef());
auto page_id_generator = std::make_unique<DB::DM::PageIdGenerator>();
auto dm_settings = DB::DM::DeltaMergeStore::Settings{};
auto dm_context = std::make_unique<DB::DM::DMContext>( //
2 changes: 1 addition & 1 deletion dbms/src/Server/tests/gtest_server_config.cpp
@@ -342,7 +342,7 @@ dt_page_gc_low_write_prob = 0.2

auto & global_ctx = TiFlashTestEnv::getGlobalContext();
std::unique_ptr<StoragePathPool> path_pool = std::make_unique<StoragePathPool>(global_ctx.getPathPool().withTable("test", "t1", false));
std::unique_ptr<DM::StoragePool> storage_pool = std::make_unique<DM::StoragePool>("test.t1", *path_pool, global_ctx, global_ctx.getSettingsRef());
std::unique_ptr<DM::StoragePool> storage_pool = std::make_unique<DM::StoragePool>("test.t1", /*table_id*/ 100, *path_pool, global_ctx, global_ctx.getSettingsRef());

auto verify_storage_pool_reload_config = [&global_ctx](std::unique_ptr<DM::StoragePool> & storage_pool) {
DB::Settings & settings = global_ctx.getSettingsRef();
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
@@ -51,7 +51,7 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(DMContext & context, /
readIntBinary(valid_rows, buf);
readIntBinary(valid_bytes, buf);

auto file_id = context.storage_pool.data()->getNormalPageId(file_ref_id);
auto file_id = context.storage_pool.dataReader().getNormalPageId(file_ref_id);
auto file_parent_path = context.path_pool.getStableDiskDelegator().getDTFilePath(file_id);

auto dmfile = DMFile::restore(context.db_context.getFileProvider(), file_id, file_ref_id, file_parent_path, DMFile::ReadMetaMode::all());
@@ -92,7 +92,7 @@ ColumnFilePersistedSet::ColumnFilePersistedSet(PageId metadata_id_, const Column

ColumnFilePersistedSetPtr ColumnFilePersistedSet::restore(DMContext & context, const RowKeyRange & segment_range, PageId id)
{
Page page = context.storage_pool.meta()->read(id, nullptr);
Page page = context.storage_pool.metaReader().read(id);
ReadBufferFromMemory buf(page.data.begin(), page.data.size());
auto column_files = deserializeSavedColumnFiles(context, segment_range, buf);
return std::make_shared<ColumnFilePersistedSet>(id, column_files);
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
@@ -227,7 +227,7 @@ bool DeltaValueSpace::compact(DMContext & context)

// do compaction task
WriteBatches wbs(context.storage_pool, context.getWriteLimiter());
PageReader reader(context.storage_pool.log(), std::move(log_storage_snap), context.getReadLimiter());
PageReader reader(context.storage_pool.getNamespaceId(), context.storage_pool.log(), std::move(log_storage_snap), context.getReadLimiter());
compaction_task->prepare(context, wbs, reader);

{
4 changes: 3 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -174,6 +174,7 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context,
bool data_path_contains_database_name,
const String & db_name_,
const String & table_name_,
TableID table_id_,
Contributor

Will this cause any trouble for running the tests under tests/delta-merge-test?

Contributor Author
For the mock tests, table_id_ is -1, which is still a valid value when converted to NamespaceId. I have also added a check to pass TEST_NAMESPACE_ID when table_id_ is -1.
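
A minimal sketch of how such a fallback could look, assuming an illustrative TEST_NAMESPACE_ID value and helper name (the exact check added by this PR is not shown in this hunk):

```cpp
// Illustrative sketch only: the constant value and helper name are assumptions,
// not the exact code added by this PR.
#include <cstdint>

using TableID = int64_t;
using NamespaceId = uint64_t;

// Hypothetical namespace id used when running mock tests.
static constexpr NamespaceId TEST_NAMESPACE_ID = 1000;

inline NamespaceId toNamespaceId(TableID table_id)
{
    // Mock tests construct the store with table_id == -1. Converting -1 to the
    // unsigned NamespaceId would still yield a technically valid id, but it maps
    // everything to a huge value, so fall back to a dedicated test namespace.
    return table_id < 0 ? TEST_NAMESPACE_ID : static_cast<NamespaceId>(table_id);
}
```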

const ColumnDefines & columns,
const ColumnDefine & handle,
bool is_common_handle_,
@@ -182,7 +183,7 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context,
: global_context(db_context.getGlobalContext())
, path_pool(global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name))
, settings(settings_)
, storage_pool(db_name_ + "." + table_name_, path_pool, global_context, db_context.getSettingsRef())
, storage_pool(db_name_ + "." + table_name_, table_id_, path_pool, global_context, db_context.getSettingsRef())
, db_name(db_name_)
, table_name(table_name_)
, is_common_handle(is_common_handle_)
@@ -302,6 +303,7 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
}
}
};
callbacks.ns_id = storage_pool.getNamespaceId();
storage_pool.data()->registerExternalPagesCallbacks(callbacks);

gc_handle = background_pool.addTask([this] { return storage_pool.gc(global_context.getSettingsRef()); });
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
@@ -276,6 +276,7 @@ class DeltaMergeStore : private boost::noncopyable
bool data_path_contains_database_name,
const String & db_name,
const String & table_name_,
TableID table_id_,
const ColumnDefines & columns,
const ColumnDefine & handle,
bool is_common_handle_,
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
@@ -221,7 +221,7 @@ SegmentPtr Segment::newSegment(

SegmentPtr Segment::restoreSegment(DMContext & context, PageId segment_id)
{
Page page = context.storage_pool.meta()->read(segment_id, nullptr); // not limit restore
Page page = context.storage_pool.metaReader().read(segment_id); // not limit restore

ReadBufferFromMemory buf(page.data.begin(), page.data.size());
SegmentFormat::Version version;
37 changes: 18 additions & 19 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
@@ -24,7 +24,7 @@ void StableValueSpace::setFiles(const DMFiles & files_, const RowKeyRange & rang

if (range.all())
{
for (auto & file : files_)
for (const auto & file : files_)
{
rows += file->getRows();
bytes += file->getBytes();
@@ -34,7 +34,7 @@
{
auto index_cache = dm_context->db_context.getGlobalContext().getMinMaxIndexCache();
auto hash_salt = dm_context->hash_salt;
for (auto & file : files_)
for (const auto & file : files_)
{
auto pack_filter = DMFilePackFilter::loadFrom(file,
index_cache,
@@ -61,7 +61,7 @@ void StableValueSpace::saveMeta(WriteBatch & meta_wb)
writeIntBinary(STORAGE_FORMAT_CURRENT.stable, buf);
writeIntBinary(valid_rows, buf);
writeIntBinary(valid_bytes, buf);
writeIntBinary((UInt64)files.size(), buf);
writeIntBinary(static_cast<UInt64>(files.size()), buf);
for (auto & f : files)
writeIntBinary(f->refId(), buf);

@@ -73,7 +73,7 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & context, PageId id)
{
auto stable = std::make_shared<StableValueSpace>(id);

Page page = context.storage_pool.meta()->read(id, nullptr); // not limit restore
Page page = context.storage_pool.metaReader().read(id); // not limit restore
ReadBufferFromMemory buf(page.data.begin(), page.data.size());
UInt64 version, valid_rows, valid_bytes, size;
readIntBinary(version, buf);
@@ -88,7 +88,7 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & context, PageId id)
{
readIntBinary(ref_id, buf);

auto file_id = context.storage_pool.data()->getNormalPageId(ref_id);
auto file_id = context.storage_pool.dataReader().getNormalPageId(ref_id);
auto file_parent_path = context.path_pool.getStableDiskDelegator().getDTFilePath(file_id);

auto dmfile = DMFile::restore(context.db_context.getFileProvider(), file_id, ref_id, file_parent_path, DMFile::ReadMetaMode::all());
@@ -124,7 +124,7 @@ size_t StableValueSpace::getBytesOnDisk() const
size_t StableValueSpace::getPacks() const
{
size_t packs = 0;
for (auto & file : files)
for (const auto & file : files)
packs += file->getPacks();
return packs;
}
@@ -147,7 +147,7 @@ void StableValueSpace::enableDMFilesGC()

void StableValueSpace::recordRemovePacksPages(WriteBatches & wbs) const
{
for (auto & file : files)
for (const auto & file : files)
{
// Here we should remove the ref id instead of file_id.
// Because a dmfile could be used by several segments, and only after all ref_ids are removed, then the file_id removed.
@@ -161,11 +161,10 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const
property.num_versions = 0;
property.num_puts = 0;
property.num_rows = 0;
for (size_t i = 0; i < files.size(); i++)
for (auto & file : files)
{
auto & file = files[i];
auto & pack_stats = file->getPackStats();
auto & pack_properties = file->getPackProperties();
const auto & pack_stats = file->getPackStats();
const auto & pack_properties = file->getPackProperties();
if (pack_stats.empty())
continue;
// if PackPropertys of this DMFile is empty, this must be an old format file generated by previous version.
@@ -233,7 +232,7 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const
{},
context.db_context.getFileProvider(),
context.getReadLimiter());
auto & use_packs = pack_filter.getUsePacks();
const auto & use_packs = pack_filter.getUsePacks();
size_t new_pack_properties_index = 0;
bool use_new_pack_properties = pack_properties.property_size() == 0;
if (use_new_pack_properties)
@@ -259,14 +258,14 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const
property.num_puts += pack_stats[pack_id].rows - pack_stats[pack_id].not_clean;
if (use_new_pack_properties)
{
auto & pack_property = new_pack_properties.property(new_pack_properties_index);
const auto & pack_property = new_pack_properties.property(new_pack_properties_index);
property.num_rows += pack_property.num_rows();
property.gc_hint_version = std::min(property.gc_hint_version, pack_property.gc_hint_version());
new_pack_properties_index += 1;
}
else
{
auto & pack_property = pack_properties.property(pack_id);
const auto & pack_property = pack_properties.property(pack_id);
property.num_rows += pack_property.num_rows();
property.gc_hint_version = std::min(property.gc_hint_version, pack_property.gc_hint_version());
}
@@ -337,7 +336,7 @@ SkippableBlockInputStreamPtr StableValueSpace::Snapshot::getInputStream(const DM
return std::make_shared<ConcatSkippableBlockInputStream>(streams);
}

RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext & context, const RowKeyRange & range)
RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext & context, const RowKeyRange & range) const
{
// Avoid unnessary reading IO
if (valid_rows == 0 || range.none())
@@ -361,8 +360,8 @@ RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext &
IdSetPtr{},
context.db_context.getFileProvider(),
context.getReadLimiter());
auto & pack_stats = f->getPackStats();
auto & use_packs = filter.getUsePacks();
const auto & pack_stats = f->getPackStats();
const auto & use_packs = filter.getUsePacks();
for (size_t i = 0; i < pack_stats.size(); ++i)
{
if (use_packs[i])
@@ -375,8 +374,8 @@ RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext &
}
if (!total_match_rows || !match_packs)
return {0, 0};
Float64 avg_pack_rows = total_match_rows / match_packs;
Float64 avg_pack_bytes = total_match_bytes / match_packs;
Float64 avg_pack_rows = static_cast<Float64>(total_match_rows) / match_packs;
Float64 avg_pack_bytes = static_cast<Float64>(total_match_bytes) / match_packs;
// By average, the first and last pack are only half covered by the range.
// And if this range only covers one pack, then return the pack's stat.
size_t approx_rows = std::max(avg_pack_rows, total_match_rows - avg_pack_rows / 2);
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/StableValueSpace.h
@@ -138,7 +138,7 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>
size_t expected_block_size,
bool enable_clean_read);

RowsAndBytes getApproxRowsAndBytes(const DMContext & context, const RowKeyRange & range);
RowsAndBytes getApproxRowsAndBytes(const DMContext & context, const RowKeyRange & range) const;

private:
Poco::Logger * log;
15 changes: 10 additions & 5 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
@@ -53,8 +53,10 @@ PageStorage::Config extractConfig(const Settings & settings, StorageType subtype
return config;
}

StoragePool::StoragePool(const String & name, StoragePathPool & path_pool, const Context & global_ctx, const Settings & settings)
: // The iops and bandwidth in log_storage are relatively high, use multi-disks if possible
StoragePool::StoragePool(const String & name, NamespaceId ns_id_, StoragePathPool & path_pool, const Context & global_ctx, const Settings & settings)
: ns_id(ns_id_)
,
// The iops and bandwidth in log_storage are relatively high, use multi-disks if possible
log_storage(PageStorage::create(name + ".log",
path_pool.getPSDiskDelegatorMulti("log"),
extractConfig(settings, StorageType::Log),
@@ -71,6 +73,9 @@ StoragePool::StoragePool(const String & name, StoragePathPool & path_pool, const
path_pool.getPSDiskDelegatorMulti("meta"),
extractConfig(settings, StorageType::Meta),
global_ctx.getFileProvider()))
, log_storage_reader(ns_id, log_storage, nullptr)
, data_storage_reader(ns_id, data_storage, nullptr)
, meta_storage_reader(ns_id, meta_storage, nullptr)
, global_context(global_ctx)
{}

@@ -120,9 +125,9 @@ bool StoragePool::gc(const Settings & settings, const Seconds & try_gc_period)

void PageIdGenerator::restore(const StoragePool & storage_pool)
{
max_log_page_id = storage_pool.log_storage->getMaxId();
max_data_page_id = storage_pool.data_storage->getMaxId();
max_meta_page_id = storage_pool.meta_storage->getMaxId();
max_log_page_id = storage_pool.log_storage_reader.getMaxId();
max_data_page_id = storage_pool.data_storage_reader.getMaxId();
max_meta_page_id = storage_pool.meta_storage_reader.getMaxId();
}

PageId PageIdGenerator::newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who)
33 changes: 29 additions & 4 deletions dbms/src/Storages/DeltaMerge/StoragePool.h
@@ -24,24 +24,49 @@ class StoragePool : private boost::noncopyable
using Duration = Clock::duration;
using Seconds = std::chrono::seconds;

StoragePool(const String & name, StoragePathPool & path_pool, const Context & global_ctx, const Settings & settings);
StoragePool(const String & name, NamespaceId ns_id_, StoragePathPool & path_pool, const Context & global_ctx, const Settings & settings);

void restore();

NamespaceId getNamespaceId() const { return ns_id; }

PageStoragePtr log() { return log_storage; }
PageStoragePtr data() { return data_storage; }
PageStoragePtr meta() { return meta_storage; }

PageReader & logReader() { return log_storage_reader; }
PageReader & dataReader() { return data_storage_reader; }
PageReader & metaReader() { return meta_storage_reader; }

PageReader newLogReader(ReadLimiterPtr read_limiter, bool snapshot_read)
{
return PageReader(ns_id, log_storage, snapshot_read ? log_storage->getSnapshot() : nullptr, read_limiter);
}
PageReader newDataReader(ReadLimiterPtr read_limiter, bool snapshot_read)
{
return PageReader(ns_id, data_storage, snapshot_read ? data_storage->getSnapshot() : nullptr, read_limiter);
}
PageReader newMetaReader(ReadLimiterPtr read_limiter, bool snapshot_read)
{
return PageReader(ns_id, meta_storage, snapshot_read ? meta_storage->getSnapshot() : nullptr, read_limiter);
}

// Caller must cancel gc tasks before drop
void drop();

bool gc(const Settings & settings, const Seconds & try_gc_period = DELTA_MERGE_GC_PERIOD);

private:
NamespaceId ns_id;

PageStoragePtr log_storage;
PageStoragePtr data_storage;
PageStoragePtr meta_storage;

PageReader log_storage_reader;
PageReader data_storage_reader;
PageReader meta_storage_reader;

std::atomic<Timepoint> last_try_gc_time = Clock::now();

std::mutex mutex;
@@ -74,9 +99,9 @@ class PageIdGenerator : private boost::noncopyable
struct StorageSnapshot : private boost::noncopyable
{
StorageSnapshot(StoragePool & storage, ReadLimiterPtr read_limiter, bool snapshot_read = true)
: log_reader(storage.log(), snapshot_read ? storage.log()->getSnapshot() : nullptr, read_limiter)
, data_reader(storage.data(), snapshot_read ? storage.data()->getSnapshot() : nullptr, read_limiter)
, meta_reader(storage.meta(), snapshot_read ? storage.meta()->getSnapshot() : nullptr, read_limiter)
: log_reader(storage.newLogReader(read_limiter, snapshot_read))
, data_reader(storage.newDataReader(read_limiter, snapshot_read))
, meta_reader(storage.newMetaReader(read_limiter, snapshot_read))
{}

PageReader log_reader;
13 changes: 10 additions & 3 deletions dbms/src/Storages/DeltaMerge/WriteBatches.h
@@ -9,6 +9,7 @@ namespace DM
{
struct WriteBatches : private boost::noncopyable
{
NamespaceId ns_id;
WriteBatch log;
WriteBatch data;
WriteBatch meta;
@@ -26,7 +27,13 @@ struct WriteBatches : private boost::noncopyable
WriteLimiterPtr write_limiter;

WriteBatches(StoragePool & storage_pool_, const WriteLimiterPtr & write_limiter_ = nullptr)
: storage_pool(storage_pool_)
: log(storage_pool_.getNamespaceId())
, data(storage_pool_.getNamespaceId())
, meta(storage_pool_.getNamespaceId())
, removed_log(storage_pool_.getNamespaceId())
, removed_data(storage_pool_.getNamespaceId())
, removed_meta(storage_pool_.getNamespaceId())
, storage_pool(storage_pool_)
, write_limiter(write_limiter_)
{
}
@@ -103,10 +110,10 @@

void rollbackWrittenLogAndData()
{
WriteBatch log_wb;
WriteBatch log_wb(ns_id);
for (auto p : writtenLog)
log_wb.delPage(p);
WriteBatch data_wb;
WriteBatch data_wb(ns_id);
for (auto p : writtenData)
data_wb.delPage(p);
