storage: Support transform SST to multiple DTFile (#5906)
ref #5237
breezewish committed Sep 16, 2022
1 parent 7cd0b02 commit befe835
Showing 5 changed files with 378 additions and 76 deletions.
167 changes: 110 additions & 57 deletions dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp
@@ -15,72 +15,64 @@
#include <Common/ProfileEvents.h>
#include <Common/TiFlashMetrics.h>
#include <Interpreters/Context.h>
#include <Poco/File.h>
#include <RaftStoreProxyFFI/ColumnFamily.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/File/DMFileBlockOutputStream.h>
#include <Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h>
#include <Storages/StorageDeltaMerge.h>
#include <Storages/Transaction/PartitionStreams.h>
#include <Storages/Transaction/ProxyFFI.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/SSTReader.h>
#include <Storages/Transaction/TMTContext.h>
#include <common/logger_useful.h>

#include <magic_enum.hpp>

namespace DB
{
namespace ErrorCodes
{
extern const int ILLFORMAT_RAFT_ROW;
} // namespace ErrorCodes

namespace DM
{
SSTFilesToDTFilesOutputStream::SSTFilesToDTFilesOutputStream( //
BoundedSSTFilesToBlockInputStreamPtr child_,

template <typename ChildStream>
SSTFilesToDTFilesOutputStream<ChildStream>::SSTFilesToDTFilesOutputStream( //
ChildStream child_,
StorageDeltaMergePtr storage_,
DecodingStorageSchemaSnapshotConstPtr schema_snap_,
TiDB::SnapshotApplyMethod method_,
FileConvertJobType job_type_,
TMTContext & tmt_)
UInt64 split_after_rows_,
UInt64 split_after_size_,
Context & context_)
: child(std::move(child_))
, //
storage(std::move(storage_))
, storage(std::move(storage_))
, schema_snap(std::move(schema_snap_))
, method(method_)
, job_type(job_type_)
, tmt(tmt_)
, log(&Poco::Logger::get("SSTFilesToDTFilesOutputStream"))
, split_after_rows(split_after_rows_)
, split_after_size(split_after_size_)
, context(context_)
, log(Logger::get("SSTFilesToDTFilesOutputStream"))
{
}

SSTFilesToDTFilesOutputStream::~SSTFilesToDTFilesOutputStream() = default;
template <typename ChildStream>
SSTFilesToDTFilesOutputStream<ChildStream>::~SSTFilesToDTFilesOutputStream() = default;

void SSTFilesToDTFilesOutputStream::writePrefix()
template <typename ChildStream>
void SSTFilesToDTFilesOutputStream<ChildStream>::writePrefix()
{
child->readPrefix();

commit_rows = 0;
total_committed_rows = 0;
total_committed_bytes = 0;
watch.start();
}

void SSTFilesToDTFilesOutputStream::writeSuffix()
template <typename ChildStream>
void SSTFilesToDTFilesOutputStream<ChildStream>::writeSuffix()
{
child->readSuffix();

if (dt_stream != nullptr)
{
dt_stream->writeSuffix();
auto dt_file = dt_stream->getFile();
assert(!dt_file->canGC()); // The DTFile should not be able to gc until it is ingested.
// Add the DTFile to StoragePathPool so that we can restore it later
const auto bytes_written = dt_file->getBytesOnDisk();
storage->getStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written);

dt_stream.reset();
}
finalizeDTFileStream();

const auto process_keys = child->getProcessKeys();
if (job_type == FileConvertJobType::ApplySnapshot)
@@ -94,22 +86,44 @@ void SSTFilesToDTFilesOutputStream::writeSuffix()
// Note that number of keys in different cf will be aggregated into one metrics
GET_METRIC(tiflash_raft_process_keys, type_ingest_sst).Increment(process_keys.total());
}

LOG_FMT_INFO(
log,
"Pre-handle snapshot {} to {} DTFiles, cost {}ms [rows={}] [write_cf_keys={}] [default_cf_keys={}] [lock_cf_keys={}]",
"Transformed snapshot in SSTFile to DTFiles, region={} job_type={} cost_ms={} rows={} bytes={} write_cf_keys={} default_cf_keys={} lock_cf_keys={} dt_files=[{}]",
child->getRegion()->toString(true),
ingest_files.size(),
magic_enum::enum_name(job_type),
watch.elapsedMilliseconds(),
commit_rows,
total_committed_rows,
total_committed_bytes,
process_keys.write_cf,
process_keys.default_cf,
process_keys.lock_cf);
process_keys.lock_cf,
[&] {
FmtBuffer fmt_buf;
fmt_buf.fmtAppend("files_num={} ", ingest_files.size());
fmt_buf.joinStr(
ingest_files.begin(),
ingest_files.end(),
[](const DMFilePtr & file, FmtBuffer & fb) { fb.fmtAppend("dmf_{}", file->fileId()); },
",");
return fmt_buf.toString();
}());
}

bool SSTFilesToDTFilesOutputStream::newDTFileStream()
template <typename ChildStream>
bool SSTFilesToDTFilesOutputStream<ChildStream>::newDTFileStream()
{
// Generate a DMFilePtr and its DMFileBlockOutputStream
DMFileBlockOutputStream::Flags flags;
RUNTIME_CHECK(dt_stream == nullptr);

// The parent_path and file_id are generated by the storage.
auto [parent_path, file_id] = storage->getStore()->preAllocateIngestFile();
if (parent_path.empty())
{
// Can not allocate path and id for storing DTFiles (the storage may be dropped / shutdown)
return false;
}

DMFileBlockOutputStream::Flags flags{};
switch (method)
{
case TiDB::SnapshotApplyMethod::DTFile_Directory:
@@ -122,28 +136,55 @@ bool SSTFilesToDTFilesOutputStream::newDTFileStream()
break;
}

// The parent_path and file_id are generated by the storage.
auto [parent_path, file_id] = storage->getStore()->preAllocateIngestFile();
if (parent_path.empty())
auto dt_file = DMFile::create(file_id, parent_path, flags.isSingleFile(), storage->createChecksumConfig(flags.isSingleFile()));
dt_stream = std::make_unique<DMFileBlockOutputStream>(context, dt_file, *(schema_snap->column_defines), flags);
dt_stream->writePrefix();
ingest_files.emplace_back(dt_file);
committed_rows_this_dt_file = 0;
committed_bytes_this_dt_file = 0;

LOG_FMT_DEBUG(
log,
"Create new DTFile for snapshot data, region={} file_idx={} file={}",
child->getRegion()->toString(true),
ingest_files.size() - 1,
dt_file->path());

return true;
}

template <typename ChildStream>
bool SSTFilesToDTFilesOutputStream<ChildStream>::finalizeDTFileStream()
{
if (unlikely(dt_stream == nullptr))
{
// Can not allocate path and id for storing DTFiles (the storage may be dropped / shutdown)
// Maybe error happened in `newDTFileStream`, or no data has been written since last finalize.
return false;
}

auto dt_file = DMFile::create(file_id, parent_path, flags.isSingleFile(), storage->createChecksumConfig(flags.isSingleFile()));
dt_stream->writeSuffix();
auto dt_file = dt_stream->getFile();
assert(!dt_file->canGC()); // The DTFile should not be able to gc until it is ingested.
// Add the DTFile to StoragePathPool so that we can restore it later
const auto bytes_written = dt_file->getBytesOnDisk();
storage->getStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written);
dt_stream.reset();

LOG_FMT_INFO(
log,
"Create file for snapshot data {} [file={}] [single_file_mode={}]",
"Finished writing DTFile from snapshot data, region={} file_idx={} file_rows={} file_bytes={} file_bytes_on_disk={} file={}",
child->getRegion()->toString(true),
dt_file->path(),
flags.isSingleFile());
dt_stream = std::make_unique<DMFileBlockOutputStream>(tmt.getContext(), dt_file, *(schema_snap->column_defines), flags);
dt_stream->writePrefix();
ingest_files.emplace_back(dt_file);
ingest_files.size() - 1,
committed_rows_this_dt_file,
committed_bytes_this_dt_file,
bytes_written,
dt_file->path());

return true;
}

void SSTFilesToDTFilesOutputStream::write()
template <typename ChildStream>
void SSTFilesToDTFilesOutputStream<ChildStream>::write()
{
size_t last_effective_num_rows = 0;
size_t last_not_clean_rows = 0;
@@ -164,9 +205,7 @@ void SSTFilesToDTFilesOutputStream::write()
// If can not create DTFile stream (the storage may be dropped / shutdown),
// break the writing loop.
if (bool ok = newDTFileStream(); !ok)
{
break;
}
}

{
@@ -204,16 +243,26 @@ void SSTFilesToDTFilesOutputStream::write()
property.effective_num_rows = cur_effective_num_rows - last_effective_num_rows;
property.not_clean_rows = cur_not_clean_rows - last_not_clean_rows;
property.deleted_rows = cur_deleted_rows - last_deleted_rows;
dt_stream->write(block, property);

commit_rows += block.rows();
last_effective_num_rows = cur_effective_num_rows;
last_not_clean_rows = cur_not_clean_rows;
last_deleted_rows = cur_deleted_rows;
dt_stream->write(block, property);

auto rows = block.rows();
auto bytes = block.bytes();
total_committed_rows += rows;
total_committed_bytes += bytes;
committed_rows_this_dt_file += rows;
committed_bytes_this_dt_file += bytes;
auto should_split_dt_file = ((split_after_rows > 0 && committed_rows_this_dt_file >= split_after_rows) || //
(split_after_size > 0 && committed_bytes_this_dt_file >= split_after_size));
if (should_split_dt_file)
finalizeDTFileStream();
}
}

PageIds SSTFilesToDTFilesOutputStream::ingestIds() const
template <typename ChildStream>
PageIds SSTFilesToDTFilesOutputStream<ChildStream>::ingestIds() const
{
PageIds ids;
for (const auto & file : ingest_files)
@@ -223,7 +272,8 @@ PageIds SSTFilesToDTFilesOutputStream::ingestIds() const
return ids;
}

void SSTFilesToDTFilesOutputStream::cancel()
template <typename ChildStream>
void SSTFilesToDTFilesOutputStream<ChildStream>::cancel()
{
// Try a lightweight cleanup of the files generated by this stream (marking them able to be GC-ed).
for (auto & file : ingest_files)
@@ -239,5 +289,8 @@ void SSTFilesToDTFilesOutputStream::cancel()
}
}

template class SSTFilesToDTFilesOutputStream<BoundedSSTFilesToBlockInputStreamPtr>;
template class SSTFilesToDTFilesOutputStream<MockSSTFilesToDTFilesOutputStreamChildPtr>;

} // namespace DM
} // namespace DB
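
Note on the core of this change: write() now counts each decoded block's rows and bytes against the current DTFile, and once either split_after_rows or split_after_size is crossed it finalizes that file and lazily opens the next one on the following block. The sketch below is a minimal, self-contained model of that rolling-split pattern, not the TiFlash implementation: SplittingWriter, FileSink, and Block are hypothetical stand-ins for SSTFilesToDTFilesOutputStream, DMFileBlockOutputStream, and a decoded block.

#include <cstdint>
#include <cstdio>
#include <memory>
#include <vector>

// Hypothetical stand-in for a decoded block of snapshot data.
struct Block
{
    uint64_t rows = 0;
    uint64_t bytes = 0;
};

// Hypothetical stand-in for DMFileBlockOutputStream: accumulates blocks into one "file".
struct FileSink
{
    explicit FileSink(size_t id_)
        : id(id_)
    {}
    void write(const Block & b)
    {
        rows += b.rows;
        bytes += b.bytes;
    }
    size_t id;
    uint64_t rows = 0;
    uint64_t bytes = 0;
};

class SplittingWriter
{
public:
    // A threshold of 0 disables that check, mirroring the `> 0` guards in the diff.
    SplittingWriter(uint64_t split_after_rows_, uint64_t split_after_size_)
        : split_after_rows(split_after_rows_)
        , split_after_size(split_after_size_)
    {}

    void write(const Block & block)
    {
        if (sink == nullptr)
            newSink(); // lazily roll a new file, as newDTFileStream() does
        sink->write(block);
        committed_rows += block.rows;
        committed_bytes += block.bytes;
        // The split condition from SSTFilesToDTFilesOutputStream::write():
        const bool should_split = (split_after_rows > 0 && committed_rows >= split_after_rows)
            || (split_after_size > 0 && committed_bytes >= split_after_size);
        if (should_split)
            finalizeSink();
    }

    // Like writeSuffix(): flush whatever file is still open.
    void finish() { finalizeSink(); }

private:
    void newSink()
    {
        sink = std::make_unique<FileSink>(finished.size());
        committed_rows = 0;
        committed_bytes = 0;
    }

    void finalizeSink()
    {
        if (sink == nullptr)
            return; // nothing written since the last split, as in finalizeDTFileStream()
        std::printf("file %zu: rows=%llu bytes=%llu\n",
                    sink->id,
                    static_cast<unsigned long long>(sink->rows),
                    static_cast<unsigned long long>(sink->bytes));
        finished.emplace_back(std::move(sink));
    }

    const uint64_t split_after_rows;
    const uint64_t split_after_size;
    uint64_t committed_rows = 0;
    uint64_t committed_bytes = 0;
    std::unique_ptr<FileSink> sink;
    std::vector<std::unique_ptr<FileSink>> finished;
};

int main()
{
    SplittingWriter writer(/*split_after_rows=*/100, /*split_after_size=*/0);
    for (int i = 0; i < 10; ++i)
        writer.write(Block{40, 4096});
    writer.finish(); // prints four files of 120, 120, 120 and 40 rows
}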
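
The other structural change visible above is that the stream became a class template over its child stream type, so tests can inject a mock child (hence the MockSSTFilesToDTFilesOutputStreamChildPtr instantiation at the bottom of the file). The member definitions can stay in the .cpp because the file ends with explicit instantiations for every supported child type. A small sketch of that pattern, using illustrative names only:

#include <iostream>
#include <memory>

struct RealChild
{
    void read() { std::cout << "reading real SST data\n"; }
};
struct MockChild
{
    void read() { std::cout << "reading mock data\n"; }
};

// Definitions may live in a .cpp as long as every ChildPtr that callers use
// is explicitly instantiated below.
template <typename ChildPtr>
class Stream
{
public:
    explicit Stream(ChildPtr child_)
        : child(std::move(child_))
    {}
    void run() { child->read(); }

private:
    ChildPtr child;
};

// Mirrors the `template class SSTFilesToDTFilesOutputStream<...>;` lines above.
template class Stream<std::shared_ptr<RealChild>>;
template class Stream<std::shared_ptr<MockChild>>;

int main()
{
    Stream<std::shared_ptr<MockChild>> s(std::make_shared<MockChild>());
    s.run();
}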