Skip to content

Commit

Permalink
improve external page callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu committed Mar 16, 2022
1 parent 75f1531 commit f4ee341
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 19 deletions.
21 changes: 5 additions & 16 deletions dbms/src/Storages/Page/V3/PageStorageImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,10 @@ bool PageStorageImpl::gc(bool /*not_skip*/, const WriteLimiterPtr & write_limite
std::scoped_lock lock{callbacks_mutex};
if (!callbacks_container.empty())
{
for (const auto & callbacks : callbacks_container)
for (const auto & [ns_id, callbacks] : callbacks_container)
{
auto pending_external_pages = callbacks.scanner();
auto alive_external_ids = page_directory->getAliveExternalIds(callbacks.ns_id);
auto alive_external_ids = page_directory->getAliveExternalIds(ns_id);
callbacks.remover(pending_external_pages, alive_external_ids);
}
}
Expand Down Expand Up @@ -266,17 +266,14 @@ void PageStorageImpl::registerExternalPagesCallbacks(const ExternalPageCallbacks
assert(callbacks.scanner != nullptr);
assert(callbacks.remover != nullptr);
assert(callbacks.ns_id != MAX_NAMESPACE_ID);
callbacks_container.push_back(callbacks);
assert(callbacks_container.count(callbacks.ns_id) == 0);
callbacks_container.emplace(callbacks.ns_id, callbacks);
}

void PageStorageImpl::unregisterExternalPagesCallbacks(NamespaceId ns_id)
{
std::scoped_lock lock{callbacks_mutex};
callbacks_container.erase(
std::remove_if(callbacks_container.begin(),
callbacks_container.end(),
[&](const ExternalPageCallbacks & callbacks) { return callbacks.ns_id == ns_id; }),
callbacks_container.end());
callbacks_container.erase(ns_id);
}

const String PageStorageImpl::manifests_file_name = "manifests";
Expand All @@ -295,13 +292,5 @@ void PageStorageImpl::createManifestsFileIfNeed(const String & path)
file.createFile();
}

#ifndef NDEBUG
void PageStorageImpl::clearExternalPagesCallbacks()
{
std::scoped_lock lock{callbacks_mutex};
callbacks_container.clear();
}
#endif

} // namespace PS::V3
} // namespace DB
3 changes: 1 addition & 2 deletions dbms/src/Storages/Page/V3/PageStorageImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ class PageStorageImpl : public DB::PageStorage

#ifndef NDEBUG
// Just for tests, refactor them out later
void clearExternalPagesCallbacks();
void write(DB::WriteBatch && wb) { return write(std::move(wb), nullptr); }
DB::PageEntry getEntry(PageId page_id) { return getEntry(TEST_NAMESPACE_ID, page_id, nullptr); }
DB::Page read(PageId page_id) { return read(TEST_NAMESPACE_ID, page_id, nullptr, nullptr); }
Expand Down Expand Up @@ -86,7 +85,7 @@ class PageStorageImpl : public DB::PageStorage
const static String manifests_file_name;

std::mutex callbacks_mutex;
using ExternalPageCallbacksContainer = std::vector<ExternalPageCallbacks>;
using ExternalPageCallbacksContainer = std::unordered_map<NamespaceId, ExternalPageCallbacks>;
ExternalPageCallbacksContainer callbacks_container;
};

Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,11 @@ TEST_F(PageStorageTest, IngestFile)
page_storage->registerExternalPagesCallbacks(callbacks);
page_storage->gc();
ASSERT_EQ(times_remover_called, 1);
page_storage->gc();
ASSERT_EQ(times_remover_called, 2);
page_storage->unregisterExternalPagesCallbacks(callbacks.ns_id);
page_storage->gc();
ASSERT_EQ(times_remover_called, 2);
}

// TBD : enable after wal apply and restore
Expand Down Expand Up @@ -834,7 +839,7 @@ try
EXPECT_EQ(living_page_ids.size(), 1);
EXPECT_GT(living_page_ids.count(0), 0);
};
page_storage->clearExternalPagesCallbacks();
page_storage->unregisterExternalPagesCallbacks(callbacks.ns_id);
page_storage->registerExternalPagesCallbacks(callbacks);
{
SCOPED_TRACE("gc with snapshot released");
Expand Down

0 comments on commit f4ee341

Please sign in to comment.