From befe83518adca50b75ea27e833fa7fd6523e655d Mon Sep 17 00:00:00 2001 From: Wenxuan Date: Fri, 16 Sep 2022 19:26:59 +0800 Subject: [PATCH] storage: Support transform SST to multiple DTFile (#5906) ref pingcap/tiflash#5237 --- .../SSTFilesToDTFilesOutputStream.cpp | 167 +++++++++++------ .../SSTFilesToDTFilesOutputStream.h | 106 +++++++++-- .../tests/gtest_sst_files_stream.cpp | 172 ++++++++++++++++++ .../Storages/Transaction/ApplySnapshot.cpp | 8 +- .../Storages/Transaction/PartitionStreams.h | 1 + 5 files changed, 378 insertions(+), 76 deletions(-) create mode 100644 dbms/src/Storages/DeltaMerge/tests/gtest_sst_files_stream.cpp diff --git a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp index 216bf56b8f5..d1047431141 100644 --- a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp @@ -15,8 +15,6 @@ #include #include #include -#include -#include #include #include #include @@ -24,63 +22,57 @@ #include #include #include -#include #include #include #include +#include + namespace DB { -namespace ErrorCodes -{ -extern const int ILLFORMAT_RAFT_ROW; -} // namespace ErrorCodes - namespace DM { -SSTFilesToDTFilesOutputStream::SSTFilesToDTFilesOutputStream( // - BoundedSSTFilesToBlockInputStreamPtr child_, + +template +SSTFilesToDTFilesOutputStream::SSTFilesToDTFilesOutputStream( // + ChildStream child_, StorageDeltaMergePtr storage_, DecodingStorageSchemaSnapshotConstPtr schema_snap_, TiDB::SnapshotApplyMethod method_, FileConvertJobType job_type_, - TMTContext & tmt_) + UInt64 split_after_rows_, + UInt64 split_after_size_, + Context & context_) : child(std::move(child_)) - , // - storage(std::move(storage_)) + , storage(std::move(storage_)) , schema_snap(std::move(schema_snap_)) , method(method_) , job_type(job_type_) - , tmt(tmt_) - , log(&Poco::Logger::get("SSTFilesToDTFilesOutputStream")) + , split_after_rows(split_after_rows_) + , split_after_size(split_after_size_) + , context(context_) + , log(Logger::get("SSTFilesToDTFilesOutputStream")) { } -SSTFilesToDTFilesOutputStream::~SSTFilesToDTFilesOutputStream() = default; +template +SSTFilesToDTFilesOutputStream::~SSTFilesToDTFilesOutputStream() = default; -void SSTFilesToDTFilesOutputStream::writePrefix() +template +void SSTFilesToDTFilesOutputStream::writePrefix() { child->readPrefix(); - - commit_rows = 0; + total_committed_rows = 0; + total_committed_bytes = 0; watch.start(); } -void SSTFilesToDTFilesOutputStream::writeSuffix() +template +void SSTFilesToDTFilesOutputStream::writeSuffix() { child->readSuffix(); - if (dt_stream != nullptr) - { - dt_stream->writeSuffix(); - auto dt_file = dt_stream->getFile(); - assert(!dt_file->canGC()); // The DTFile should not be able to gc until it is ingested. - // Add the DTFile to StoragePathPool so that we can restore it later - const auto bytes_written = dt_file->getBytesOnDisk(); - storage->getStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written); - - dt_stream.reset(); - } + finalizeDTFileStream(); const auto process_keys = child->getProcessKeys(); if (job_type == FileConvertJobType::ApplySnapshot) @@ -94,22 +86,44 @@ void SSTFilesToDTFilesOutputStream::writeSuffix() // Note that number of keys in different cf will be aggregated into one metrics GET_METRIC(tiflash_raft_process_keys, type_ingest_sst).Increment(process_keys.total()); } + LOG_FMT_INFO( log, - "Pre-handle snapshot {} to {} DTFiles, cost {}ms [rows={}] [write_cf_keys={}] [default_cf_keys={}] [lock_cf_keys={}]", + "Transformed snapshot in SSTFile to DTFiles, region={} job_type={} cost_ms={} rows={} bytes={} write_cf_keys={} default_cf_keys={} lock_cf_keys={} dt_files=[{}]", child->getRegion()->toString(true), - ingest_files.size(), + magic_enum::enum_name(job_type), watch.elapsedMilliseconds(), - commit_rows, + total_committed_rows, + total_committed_bytes, process_keys.write_cf, process_keys.default_cf, - process_keys.lock_cf); + process_keys.lock_cf, + [&] { + FmtBuffer fmt_buf; + fmt_buf.fmtAppend("files_num={} ", ingest_files.size()); + fmt_buf.joinStr( + ingest_files.begin(), + ingest_files.end(), + [](const DMFilePtr & file, FmtBuffer & fb) { fb.fmtAppend("dmf_{}", file->fileId()); }, + ","); + return fmt_buf.toString(); + }()); } -bool SSTFilesToDTFilesOutputStream::newDTFileStream() +template +bool SSTFilesToDTFilesOutputStream::newDTFileStream() { - // Generate a DMFilePtr and its DMFileBlockOutputStream - DMFileBlockOutputStream::Flags flags; + RUNTIME_CHECK(dt_stream == nullptr); + + // The parent_path and file_id are generated by the storage. + auto [parent_path, file_id] = storage->getStore()->preAllocateIngestFile(); + if (parent_path.empty()) + { + // Can not allocate path and id for storing DTFiles (the storage may be dropped / shutdown) + return false; + } + + DMFileBlockOutputStream::Flags flags{}; switch (method) { case TiDB::SnapshotApplyMethod::DTFile_Directory: @@ -122,28 +136,55 @@ bool SSTFilesToDTFilesOutputStream::newDTFileStream() break; } - // The parent_path and file_id are generated by the storage. - auto [parent_path, file_id] = storage->getStore()->preAllocateIngestFile(); - if (parent_path.empty()) + auto dt_file = DMFile::create(file_id, parent_path, flags.isSingleFile(), storage->createChecksumConfig(flags.isSingleFile())); + dt_stream = std::make_unique(context, dt_file, *(schema_snap->column_defines), flags); + dt_stream->writePrefix(); + ingest_files.emplace_back(dt_file); + committed_rows_this_dt_file = 0; + committed_bytes_this_dt_file = 0; + + LOG_FMT_DEBUG( + log, + "Create new DTFile for snapshot data, region={} file_idx={} file={}", + child->getRegion()->toString(true), + ingest_files.size() - 1, + dt_file->path()); + + return true; +} + +template +bool SSTFilesToDTFilesOutputStream::finalizeDTFileStream() +{ + if (unlikely(dt_stream == nullptr)) { - // Can no allocate path and id for storing DTFiles (the storage may be dropped / shutdown) + // Maybe error happened in `newDTFileStream`, or no data has been written since last finalize. return false; } - auto dt_file = DMFile::create(file_id, parent_path, flags.isSingleFile(), storage->createChecksumConfig(flags.isSingleFile())); + dt_stream->writeSuffix(); + auto dt_file = dt_stream->getFile(); + assert(!dt_file->canGC()); // The DTFile should not be able to gc until it is ingested. + // Add the DTFile to StoragePathPool so that we can restore it later + const auto bytes_written = dt_file->getBytesOnDisk(); + storage->getStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written); + dt_stream.reset(); + LOG_FMT_INFO( log, - "Create file for snapshot data {} [file={}] [single_file_mode={}]", + "Finished writing DTFile from snapshot data, region={} file_idx={} file_rows={} file_bytes={} file_bytes_on_disk={} file={}", child->getRegion()->toString(true), - dt_file->path(), - flags.isSingleFile()); - dt_stream = std::make_unique(tmt.getContext(), dt_file, *(schema_snap->column_defines), flags); - dt_stream->writePrefix(); - ingest_files.emplace_back(dt_file); + ingest_files.size() - 1, + committed_rows_this_dt_file, + committed_bytes_this_dt_file, + bytes_written, + dt_file->path()); + return true; } -void SSTFilesToDTFilesOutputStream::write() +template +void SSTFilesToDTFilesOutputStream::write() { size_t last_effective_num_rows = 0; size_t last_not_clean_rows = 0; @@ -164,9 +205,7 @@ void SSTFilesToDTFilesOutputStream::write() // If can not create DTFile stream (the storage may be dropped / shutdown), // break the writing loop. if (bool ok = newDTFileStream(); !ok) - { break; - } } { @@ -204,16 +243,26 @@ void SSTFilesToDTFilesOutputStream::write() property.effective_num_rows = cur_effective_num_rows - last_effective_num_rows; property.not_clean_rows = cur_not_clean_rows - last_not_clean_rows; property.deleted_rows = cur_deleted_rows - last_deleted_rows; - dt_stream->write(block, property); - - commit_rows += block.rows(); last_effective_num_rows = cur_effective_num_rows; last_not_clean_rows = cur_not_clean_rows; last_deleted_rows = cur_deleted_rows; + dt_stream->write(block, property); + + auto rows = block.rows(); + auto bytes = block.bytes(); + total_committed_rows += rows; + total_committed_bytes += bytes; + committed_rows_this_dt_file += rows; + committed_bytes_this_dt_file += bytes; + auto should_split_dt_file = ((split_after_rows > 0 && committed_rows_this_dt_file >= split_after_rows) || // + (split_after_size > 0 && committed_bytes_this_dt_file >= split_after_size)); + if (should_split_dt_file) + finalizeDTFileStream(); } } -PageIds SSTFilesToDTFilesOutputStream::ingestIds() const +template +PageIds SSTFilesToDTFilesOutputStream::ingestIds() const { PageIds ids; for (const auto & file : ingest_files) @@ -223,7 +272,8 @@ PageIds SSTFilesToDTFilesOutputStream::ingestIds() const return ids; } -void SSTFilesToDTFilesOutputStream::cancel() +template +void SSTFilesToDTFilesOutputStream::cancel() { // Try a lightweight cleanup the file generated by this stream (marking them able to be GC-ed). for (auto & file : ingest_files) @@ -239,5 +289,8 @@ void SSTFilesToDTFilesOutputStream::cancel() } } +template class SSTFilesToDTFilesOutputStream; +template class SSTFilesToDTFilesOutputStream; + } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h index d89d2421593..7d805aa583d 100644 --- a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h +++ b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h @@ -22,11 +22,6 @@ #include #include -namespace Poco -{ -class Logger; -} - namespace DB { class TMTContext; @@ -56,52 +51,131 @@ enum class FileConvertJobType IngestSST, }; - // This class is tightly coupling with BoundedSSTFilesToBlockInputStream // to get some info of the decoding process. +template class SSTFilesToDTFilesOutputStream : private boost::noncopyable { public: - SSTFilesToDTFilesOutputStream(BoundedSSTFilesToBlockInputStreamPtr child_, + /** + * When `split_after_rows` or `split_after_size` are > 0, multiple DTFiles will be produced. + * When `0` is specified for both parameters, only one DTFile will be produced. + * + * As the stream is processed by blocks, each DTFile is not ensured truncated at the specified + * rows or size: it is possible that one DTFile is significantly large, if a large Block + * is produced by the `child`. + * + * @param split_after_rows_ Split for a new DTFile when reaching specified rows. + * @param split_after_size_ Split for a new DTFile when reaching specified bytes. + */ + SSTFilesToDTFilesOutputStream(ChildStream child_, StorageDeltaMergePtr storage_, DecodingStorageSchemaSnapshotConstPtr schema_snap_, TiDB::SnapshotApplyMethod method_, FileConvertJobType job_type_, - TMTContext & tmt_); + UInt64 split_after_rows_, + UInt64 split_after_size_, + Context & context); ~SSTFilesToDTFilesOutputStream(); void writePrefix(); void writeSuffix(); void write(); + /** + * The DTFile page ids that can be ingested. + * The returned vector is ensured to be ordered in ascending order. + */ PageIds ingestIds() const; // Try to cleanup the files in `ingest_files` quickly. void cancel(); private: + /** + * Generate a DMFilePtr and its DMFileBlockOutputStream. + */ bool newDTFileStream(); - - // Stop the process for decoding committed data into DTFiles - void stop(); + /** + * Close the current DMFile stream. + */ + bool finalizeDTFileStream(); private: - BoundedSSTFilesToBlockInputStreamPtr child; + ChildStream child; StorageDeltaMergePtr storage; DecodingStorageSchemaSnapshotConstPtr schema_snap; const TiDB::SnapshotApplyMethod method; const FileConvertJobType job_type; - TMTContext & tmt; - Poco::Logger * log; + const UInt64 split_after_rows; + const UInt64 split_after_size; + Context & context; + LoggerPtr log; std::unique_ptr dt_stream; std::vector ingest_files; - size_t schema_sync_trigger_count = 0; - size_t commit_rows = 0; + /** + * How many rows has been committed to the current DTFile. + */ + size_t committed_rows_this_dt_file = 0; + size_t committed_bytes_this_dt_file = 0; + + /** + * How many rows has been committed so far. + */ + size_t total_committed_rows = 0; + size_t total_committed_bytes = 0; + Stopwatch watch; }; +class MockSSTFilesToDTFilesOutputStreamChild : private boost::noncopyable +{ +public: + MockSSTFilesToDTFilesOutputStreamChild(BlockInputStreamPtr mock_data_, RegionPtr mock_region_) // + : mock_data(mock_data_) + , mock_region(mock_region_) + {} + + void readPrefix() + { + mock_data->readPrefix(); + } + + void readSuffix() + { + mock_data->readSuffix(); + } + + RegionPtr getRegion() const + { + return mock_region; + } + + Block read() + { + return mock_data->read(); + } + + std::tuple getMvccStatistics() const + { + return {}; + } + + SSTFilesToBlockInputStream::ProcessKeys getProcessKeys() const + { + return {}; + } + +protected: + BlockInputStreamPtr mock_data; + RegionPtr mock_region; +}; + +using MockSSTFilesToDTFilesOutputStreamChildPtr = std::shared_ptr; + + } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_sst_files_stream.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_sst_files_stream.cpp new file mode 100644 index 00000000000..328ac443aa3 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_sst_files_stream.cpp @@ -0,0 +1,172 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ +namespace DM +{ +namespace tests +{ + +class SSTFilesToDTFilesOutputStreamTest + : public DB::base::TiFlashStorageTestBasic +{ +public: + void SetUp() override + { + pk_type = DMTestEnv::PkType::HiddenTiDBRowID; + mock_region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 1000)); + + TiFlashStorageTestBasic::SetUp(); + setupStorage(); + } + + void TearDown() override + { + storage->drop(); + db_context->getTMTContext().getStorages().remove(/* table id */ 100); + } + + void setupStorage() + { + auto columns = DM::tests::DMTestEnv::getDefaultTableColumns(pk_type); + auto table_info = DM::tests::DMTestEnv::getMinimalTableInfo(/* table id */ 100, pk_type); + auto astptr = DM::tests::DMTestEnv::getPrimaryKeyExpr("test_table", pk_type); + + storage = StorageDeltaMerge::create("TiFlash", + "default" /* db_name */, + "test_table" /* table_name */, + table_info, + ColumnsDescription{columns}, + astptr, + 0, + db_context->getGlobalContext()); + storage->startup(); + } + + BlockInputStreamPtr prepareBlocks(size_t begin, size_t end, size_t block_size) + { + RUNTIME_CHECK(end > begin); + + BlocksList list{}; + while (true) + { + if (begin >= end) + break; + auto this_block_size = std::min(end - begin, block_size); + auto block = DMTestEnv::prepareSimpleWriteBlock(begin, begin + this_block_size, false, pk_type, 2); + list.push_back(block); + begin += this_block_size; + } + + BlockInputStreamPtr stream = std::make_shared(std::move(list)); + return stream; + } + +protected: + StorageDeltaMergePtr storage; + RegionPtr mock_region; + DMTestEnv::PkType pk_type; +}; + + +TEST_F(SSTFilesToDTFilesOutputStreamTest, OutputSingleDTFile) +{ + auto table_lock = storage->lockStructureForShare("foo_query_id"); + auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false); + + auto mock_stream = std::make_shared(prepareBlocks(50, 100, /*block_size=*/5), mock_region); + auto stream = std::make_shared>( + mock_stream, + storage, + schema_snapshot, + TiDB::SnapshotApplyMethod::DTFile_Directory, + FileConvertJobType::ApplySnapshot, + /* split_after_rows */ 0, + /* split_after_size */ 0, + *db_context); + + stream->writePrefix(); + stream->write(); + stream->writeSuffix(); + auto ids = stream->ingestIds(); + ASSERT_EQ(1, ids.size()); +} + + +TEST_F(SSTFilesToDTFilesOutputStreamTest, OutputSingleDTFileWithOneBlock) +{ + auto table_lock = storage->lockStructureForShare("foo_query_id"); + auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false); + + auto mock_stream = std::make_shared(prepareBlocks(50, 100, /*block_size=*/1000), mock_region); + auto stream = std::make_shared>( + mock_stream, + storage, + schema_snapshot, + TiDB::SnapshotApplyMethod::DTFile_Directory, + FileConvertJobType::ApplySnapshot, + /* split_after_rows */ 1, + /* split_after_size */ 1, + *db_context); + + stream->writePrefix(); + stream->write(); + stream->writeSuffix(); + auto ids = stream->ingestIds(); + + // We expect to have only 1 DTFile, as there is only one block. + ASSERT_EQ(1, ids.size()); +} + + +TEST_F(SSTFilesToDTFilesOutputStreamTest, OutputMultipleDTFile) +{ + auto table_lock = storage->lockStructureForShare("foo_query_id"); + auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false); + + auto mock_stream = std::make_shared(prepareBlocks(50, 100, /*block_size=*/1), mock_region); + auto stream = std::make_shared>( + mock_stream, + storage, + schema_snapshot, + TiDB::SnapshotApplyMethod::DTFile_Directory, + FileConvertJobType::ApplySnapshot, + /* split_after_rows */ 10, + /* split_after_size */ 0, + *db_context); + + stream->writePrefix(); + stream->write(); + stream->writeSuffix(); + auto ids = stream->ingestIds(); + ASSERT_EQ(5, ids.size()); +} + + +} // namespace tests +} // namespace DM +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/Transaction/ApplySnapshot.cpp b/dbms/src/Storages/Transaction/ApplySnapshot.cpp index 0a64de37f94..e68c31613c5 100644 --- a/dbms/src/Storages/Transaction/ApplySnapshot.cpp +++ b/dbms/src/Storages/Transaction/ApplySnapshot.cpp @@ -296,7 +296,7 @@ std::vector KVStore::preHandleSSTsToDTFiles( { // If any schema changes is detected during decoding SSTs to DTFiles, we need to cancel and recreate DTFiles with // the latest schema. Or we will get trouble in `BoundedSSTFilesToBlockInputStream`. - std::shared_ptr stream; + std::shared_ptr> stream; try { // Get storage schema atomically, will do schema sync if the storage does not exists. @@ -328,13 +328,15 @@ std::vector KVStore::preHandleSSTsToDTFiles( tmt, expected_block_size); auto bounded_stream = std::make_shared(sst_stream, ::DB::TiDBPkColumnID, schema_snap); - stream = std::make_shared( + stream = std::make_shared>( bounded_stream, storage, schema_snap, snapshot_apply_method, job_type, - tmt); + /* split_after_rows */ 0, + /* split_after_size */ 0, + tmt.getContext()); stream->writePrefix(); stream->write(); diff --git a/dbms/src/Storages/Transaction/PartitionStreams.h b/dbms/src/Storages/Transaction/PartitionStreams.h index aa78942803f..1723d82a29f 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.h +++ b/dbms/src/Storages/Transaction/PartitionStreams.h @@ -24,6 +24,7 @@ namespace DB class Region; using RegionPtr = std::shared_ptr; class StorageDeltaMerge; +class TMTContext; std::tuple, DecodingStorageSchemaSnapshotConstPtr> // AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt);