Skip to content

Commit

Permalink
refine code
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu committed Feb 8, 2023
1 parent f041f4f commit b0705d9
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 140 deletions.
214 changes: 82 additions & 132 deletions dbms/src/Storages/DeltaMerge/WriteBatches.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/Page/V3/PageDirectory/PageIdTrait.h>
#include <Storages/Page/WriteBatchWrapper.h>

namespace DB
Expand Down Expand Up @@ -88,52 +87,46 @@ struct WriteBatches : private boost::noncopyable

void writeLogAndData()
{
PageIdU64s log_write_pages, data_write_pages;
switch (run_mode)
{
case PageStorageRunMode::UNI_PS:
if constexpr (DM_RUN_CHECK)
{
if constexpr (DM_RUN_CHECK)
auto check = [](const auto & wb, const String & what) {
if (wb.empty())
return;
for (const auto & w : wb.getWrites())
{
if (unlikely(w.type == WriteBatchWriteType::DEL))
throw Exception("Unexpected deletes in " + what);
}
LOG_TRACE(Logger::get(), "Write into {} : {}", what, wb.toString());
};
switch (run_mode)
{
case PageStorageRunMode::UNI_PS:
{
auto check = [](const UniversalWriteBatch & wb, const String & what) {
if (wb.empty())
return;
for (const auto & w : wb.getWrites())
{
if (unlikely(w.type == WriteBatchWriteType::DEL))
throw Exception("Unexpected deletes in " + what);
}
LOG_TRACE(Logger::get(), "Write into {} : {}", what, wb.toString());
};

check(log.getUniversalWriteBatch(), "log");
check(data.getUniversalWriteBatch(), "data");
}
default:
{
check(log.getWriteBatch(), "log");
check(data.getWriteBatch(), "data");
}
}
}

PageIdU64s log_write_pages, data_write_pages;
switch (run_mode)
{
case PageStorageRunMode::UNI_PS:
{
for (const auto & w : log.getUniversalWriteBatch().getWrites())
log_write_pages.push_back(PS::V3::universal::PageIdTrait::getU64ID(w.page_id));
log_write_pages.push_back(UniversalPageIdFormat::getU64ID(w.page_id));
for (const auto & w : data.getUniversalWriteBatch().getWrites())
data_write_pages.push_back(PS::V3::universal::PageIdTrait::getU64ID(w.page_id));
data_write_pages.push_back(UniversalPageIdFormat::getU64ID(w.page_id));
break;
}
default:
{
if constexpr (DM_RUN_CHECK)
{
auto check = [](const WriteBatch & wb, const String & what) {
if (wb.empty())
return;
for (const auto & w : wb.getWrites())
{
if (unlikely(w.type == WriteBatchWriteType::DEL))
throw Exception("Unexpected deletes in " + what);
}
LOG_TRACE(Logger::get(), "Write into {} : {}", what, wb.toString());
};

check(log.getWriteBatch(), "log");
check(data.getWriteBatch(), "data");
}

for (const auto & w : log.getWriteBatch().getWrites())
log_write_pages.push_back(w.page_id);
for (const auto & w : data.getWriteBatch().getWrites())
Expand Down Expand Up @@ -163,48 +156,34 @@ struct WriteBatches : private boost::noncopyable
for (auto p : written_data)
data_wb.delPage(p);

switch (run_mode)
{
case PageStorageRunMode::UNI_PS:
if constexpr (DM_RUN_CHECK)
{
if constexpr (DM_RUN_CHECK)
{
auto check = [](const UniversalWriteBatch & wb, const String & what) {
if (wb.empty())
return;
for (const auto & w : wb.getWrites())
{
if (unlikely(w.type != WriteBatchWriteType::DEL))
throw Exception("Expected deletes in " + what);
}
LOG_TRACE(Logger::get(), "Rollback remove from {} : {}", what, wb.toString());
};
auto check = [](const auto & wb, const String & what) {
if (wb.empty())
return;
for (const auto & w : wb.getWrites())
{
if (unlikely(w.type != WriteBatchWriteType::DEL))
throw Exception("Expected deletes in " + what);
}
LOG_TRACE(Logger::get(), "Rollback remove from {} : {}", what, wb.toString());
};

switch (run_mode)
{
case PageStorageRunMode::UNI_PS:
{
check(log_wb.getUniversalWriteBatch(), "log_wb");
check(data_wb.getUniversalWriteBatch(), "data_wb");
break;
}
break;
}
default:
{
if constexpr (DM_RUN_CHECK)
default:
{
auto check = [](const WriteBatch & wb, const String & what) {
if (wb.empty())
return;
for (const auto & w : wb.getWrites())
{
if (unlikely(w.type != WriteBatchWriteType::DEL))
throw Exception("Expected deletes in " + what);
}
LOG_TRACE(Logger::get(), "Rollback remove from {} : {}", what, wb.toString());
};

check(log_wb.getWriteBatch(), "log_wb");
check(data_wb.getWriteBatch(), "data_wb");
break;
}
}
break;
}
}

storage_pool.logWriter()->write(std::move(log_wb), write_limiter);
Expand All @@ -216,45 +195,30 @@ struct WriteBatches : private boost::noncopyable

void writeMeta()
{
switch (run_mode)
{
case PageStorageRunMode::UNI_PS:
if constexpr (DM_RUN_CHECK)
{
if constexpr (DM_RUN_CHECK)
auto check = [](const auto & wb, const String & what) {
if (wb.empty())
return;
for (const auto & w : wb.getWrites())
{
if (unlikely(w.type != WriteBatchWriteType::PUT))
throw Exception("Expected puts in " + what);
}
LOG_TRACE(Logger::get(), "Write into {} : {}", what, wb.toString());
};
switch (run_mode)
{
case PageStorageRunMode::UNI_PS:
{
auto check = [](const UniversalWriteBatch & wb, const String & what) {
if (wb.empty())
return;
for (const auto & w : wb.getWrites())
{
if (unlikely(w.type != WriteBatchWriteType::PUT))
throw Exception("Expected puts in " + what);
}
LOG_TRACE(Logger::get(), "Write into {} : {}", what, wb.toString());
};

check(meta.getUniversalWriteBatch(), "meta");
break;
}
break;
}
default:
{
if constexpr (DM_RUN_CHECK)
default:
{
auto check = [](const WriteBatch & wb, const String & what) {
if (wb.empty())
return;
for (const auto & w : wb.getWrites())
{
if (unlikely(w.type != WriteBatchWriteType::PUT))
throw Exception("Expected puts in " + what);
}
LOG_TRACE(Logger::get(), "Write into {} : {}", what, wb.toString());
};

check(meta.getWriteBatch(), "meta");
}
}
}
}

storage_pool.metaWriter()->write(std::move(meta), write_limiter);
Expand All @@ -263,48 +227,34 @@ struct WriteBatches : private boost::noncopyable

void writeRemoves()
{
switch (run_mode)
{
case PageStorageRunMode::UNI_PS:
if constexpr (DM_RUN_CHECK)
{
if constexpr (DM_RUN_CHECK)
{
auto check = [](const UniversalWriteBatch & wb, const String & what) {
if (wb.empty())
return;
for (const auto & w : wb.getWrites())
{
if (unlikely(w.type != WriteBatchWriteType::DEL))
throw Exception("Expected deletes in " + what);
}
LOG_TRACE(Logger::get(), "Write into {} : {}", what, wb.toString());
};
auto check = [](const auto & wb, const String & what) {
if (wb.empty())
return;
for (const auto & w : wb.getWrites())
{
if (unlikely(w.type != WriteBatchWriteType::DEL))
throw Exception("Expected deletes in " + what);
}
LOG_TRACE(Logger::get(), "Write into {} : {}", what, wb.toString());
};

switch (run_mode)
{
case PageStorageRunMode::UNI_PS:
{
check(removed_log.getUniversalWriteBatch(), "removed_log");
check(removed_data.getUniversalWriteBatch(), "removed_data");
check(removed_meta.getUniversalWriteBatch(), "removed_meta");
}
}
default:
{
if constexpr (DM_RUN_CHECK)
default:
{
auto check = [](const WriteBatch & wb, const String & what) {
if (wb.empty())
return;
for (const auto & w : wb.getWrites())
{
if (unlikely(w.type != WriteBatchWriteType::DEL))
throw Exception("Expected deletes in " + what);
}
LOG_TRACE(Logger::get(), "Write into {} : {}", what, wb.toString());
};

check(removed_log.getWriteBatch(), "removed_log");
check(removed_data.getWriteBatch(), "removed_data");
check(removed_meta.getWriteBatch(), "removed_meta");
}
}
}
}

storage_pool.logWriter()->write(std::move(removed_log), write_limiter);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/FormatVersion.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ inline static constexpr Version V2 = 2;
// - If we already have V2 data in disk. It will turn PageStorage into MIX_MODE
// - If we don't have any v2 data in disk. It will turn PageStorage into ONLY_V3
inline static constexpr Version V3 = 3;
// Store all data in one instance.
// Store all data in one ps instance.
inline static constexpr Version V4 = 4;
} // namespace PageFormat

Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/Page/PageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ class PageReaderImplUniversal : public PageReaderImpl
PageMapU64 read(const std::vector<PageReadFields> & page_fields) const override
{
std::vector<UniversalPageStorage::PageReadFields> us_page_fields;
us_page_fields.reserve(page_fields.size());
for (const auto & f : page_fields)
{
us_page_fields.emplace_back(UniversalPageIdFormat::toFullPageId(prefix, f.first), f.second);
Expand All @@ -402,7 +403,7 @@ class PageReaderImplUniversal : public PageReaderImpl

PageIdU64 getNormalPageId(PageIdU64 page_id) const override
{
return PS::V3::universal::PageIdTrait::getU64ID(storage->getNormalPageId(UniversalPageIdFormat::toFullPageId(prefix, page_id), snap));
return UniversalPageIdFormat::getU64ID(storage->getNormalPageId(UniversalPageIdFormat::toFullPageId(prefix, page_id), snap));
}

PageEntry getPageEntry(PageIdU64 page_id) const override
Expand Down Expand Up @@ -808,7 +809,7 @@ bool PageWriter::gc(bool not_skip, const WriteLimiterPtr & write_limiter, const
}
case PageStorageRunMode::UNI_PS:
{
return uni_ps->gc(not_skip, write_limiter, read_limiter);
return false;
}
default:
throw Exception(fmt::format("Unknown PageStorageRunMode {}", static_cast<UInt8>(run_mode)), ErrorCodes::LOGICAL_ERROR);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1365,7 +1365,7 @@ void PageDirectory<Trait>::apply(PageEntriesEdit && edit, const WriteLimiterPtr
if constexpr (std::is_same_v<Trait, universal::PageDirectoryTrait>)
{
// TODO: the value is just used at restart, if it slows down the performance, we can avoid calculating it at run time.
for (auto iter = max_page_id_by_prefix.begin(); iter != max_page_id_by_prefix.end(); ++iter)
for (const auto & iter = max_page_id_by_prefix.begin(); iter != max_page_id_by_prefix.end(); ++iter)
{
if (r.page_id.hasPrefix(iter->first))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
} // namespace ErrorCodes

// Adapt UniversalWriteBatch to WriteBatch interface
class UniversalWriteBatchAdaptor : private boost::noncopyable
{
public:
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Page/WriteBatchWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ enum class PageStorageRunMode : UInt8
UNI_PS = 4,
};

// It contains either an UniversalWriteBatchAdaptor or a WriteBatch.
class WriteBatchWrapper : private boost::noncopyable
{
public:
Expand Down
6 changes: 2 additions & 4 deletions dbms/src/Storages/Transaction/RegionPersister.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ void RegionPersister::drop(RegionID region_id, const RegionTaskLock &)
{
if (page_writer)
{
std::variant<String, NamespaceId> prefix = ns_id;
DB::WriteBatchWrapper wb_v2{run_mode, std::move(prefix)};
DB::WriteBatchWrapper wb_v2{run_mode, getWriteBatchPrefix()};
wb_v2.delPage(region_id);
page_writer->write(std::move(wb_v2), global_context.getWriteLimiter());
}
Expand Down Expand Up @@ -131,8 +130,7 @@ void RegionPersister::doPersist(RegionCacheWriteElement & region_write_buffer, c
auto read_buf = buffer.tryGetReadBuffer();
if (page_writer)
{
std::variant<String, NamespaceId> prefix = ns_id;
DB::WriteBatchWrapper wb{run_mode, std::move(prefix)};
DB::WriteBatchWrapper wb{run_mode, getWriteBatchPrefix()};
wb.putPage(region_id, applied_index, read_buf, region_size);
page_writer->write(std::move(wb), global_context.getWriteLimiter());
}
Expand Down
14 changes: 14 additions & 0 deletions dbms/src/Storages/Transaction/RegionPersister.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,22 @@ class RegionPersister final : private boost::noncopyable
void doPersist(RegionCacheWriteElement & region_write_buffer, const RegionTaskLock & lock, const Region & region);
void doPersist(const Region & region, const RegionTaskLock * lock);

private:
inline std::variant<String, NamespaceId> getWriteBatchPrefix() const
{
switch (run_mode)
{
case PageStorageRunMode::UNI_PS:
return UniversalPageIdFormat::toStorageSubPrefix(StorageType::KVStore);
default:
return ns_id;
}
}

#ifndef DBMS_PUBLIC_GTEST
private:
#else
public:
#endif

Context & global_context;
Expand Down

0 comments on commit b0705d9

Please sign in to comment.