Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support snapshot for direct write #2

Open
wants to merge 4 commits into
base: direct-write
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions dbms/src/Flash/TiFlashSnapshotHandler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/DAGQueryInfo.h>
#include <Flash/TiFlashSnapshotHandler.h>
#include <Interpreters/SQLQuerySource.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
#include <Storages/DeltaMerge/File/DMFileBlockOutputStream.h>
#include <Storages/IManageableStorage.h>
#include <Storages/StorageDeltaMerge.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/TMTContext.h>

#include <iostream>

namespace DB
{

/// State carried between preHandleTiFlashSnapshot and applyPreHandledTiFlashSnapshot.
/// Field order matters: instances are created with aggregate initialisation
/// as {region, path}.
struct PreHandledTiFlashSnapshot
{
    /// Region the snapshot belongs to.
    RegionPtr region;
    /// Path of the snapshot file to read when the snapshot is applied.
    std::string path;

    ~PreHandledTiFlashSnapshot();
};

/// Emits a trace line on stderr each time a pre-handled snapshot is destroyed.
PreHandledTiFlashSnapshot::~PreHandledTiFlashSnapshot()
{
    std::cerr << "GC PreHandledTiFlashSnapshot success";
    std::cerr << "\n";
}

/// Snapshot of one region's data: the read pipeline that produces the rows plus
/// the column schema used when the rows are written into a snapshot file.
struct TiFlashSnapshot
{
    /// Copies `write_columns_` into the snapshot.
    /// The schema is deliberately stored by value: callers pass temporaries
    /// (e.g. `DM::ColumnDefines{}`), and keeping only a reference to them would
    /// leave `write_columns` dangling as soon as the constructor returns.
    TiFlashSnapshot(const DM::ColumnDefines & write_columns_) : write_columns{write_columns_} {}

    /// Input streams the snapshot rows are read from.
    Pipeline pipeline;
    /// Columns to write; an empty list marks a snapshot with nothing to serialize.
    const DM::ColumnDefines write_columns;

    ~TiFlashSnapshot();
};

/// Emits a trace line on stderr each time a snapshot is destroyed.
TiFlashSnapshot::~TiFlashSnapshot()
{
    std::cerr << "GC TiFlashSnapshot success";
    std::cerr << "\n";
}

/// Bundles `region` and the snapshot file `path` into a heap-allocated
/// PreHandledTiFlashSnapshot. The caller owns the returned pointer and must
/// release it with deletePreHandledTiFlashSnapshot.
PreHandledTiFlashSnapshot * TiFlashSnapshotHandler::preHandleTiFlashSnapshot(RegionPtr region, const String & path)
{
    auto * pre_handled = new PreHandledTiFlashSnapshot{std::move(region), path};
    return pre_handled;
}

/// Applies a pre-handled snapshot: registers the region with the KVStore, then
/// reads every block of the snapshot file at `snap->path` and writes it into the
/// DeltaMerge storage of the region's mapped table.
///
/// If the table cannot be found, or is not a StorageDeltaMerge, the file content
/// is not written and a message is printed instead (the same "ignored" policy
/// genTiFlashSnapshot uses for a missing table). Previously both cases
/// dereferenced a null pointer.
///
/// @param tmt   Context giving access to the KVStore, the storages and the file provider.
/// @param snap  Snapshot created by preHandleTiFlashSnapshot; not freed here.
void TiFlashSnapshotHandler::applyPreHandledTiFlashSnapshot(TMTContext * tmt, PreHandledTiFlashSnapshot * snap)
{
    std::cerr << "applyPreHandledTiFlashSnapshot: " << snap->region->toString() << "\n";
    auto & kvstore = tmt->getKVStore();
    kvstore->handleApplySnapshot(snap->region, *tmt);

    auto table_id = snap->region->getMappedTableID();
    auto storage = tmt->getStorages().get(table_id);
    if (storage == nullptr)
    {
        std::cerr << "applyPreHandledTiFlashSnapshot can not get table for region:" << snap->region->toString()
                  << " with table id: " << table_id << ", ignored"
                  << "\n";
        return;
    }

    auto dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
    if (dm_storage == nullptr)
    {
        // The cast yields nullptr when the table is backed by another storage engine.
        std::cerr << "applyPreHandledTiFlashSnapshot table is not a DeltaMerge storage for region:" << snap->region->toString()
                  << " with table id: " << table_id << ", ignored"
                  << "\n";
        return;
    }

    auto snapshot_file = DM::DMFile::restore(tmt->getContext().getFileProvider(), snap->path);
    auto column_cache = std::make_shared<DM::ColumnCache>();
    DM::DMFileBlockInputStream stream(tmt->getContext(),
        DM::MAX_UINT64,
        false,
        0,
        snapshot_file,
        dm_storage->getStore()->getTableColumns(),
        DM::RowKeyRange::newAll(dm_storage->isCommonHandle(), 1),
        DM::EMPTY_FILTER,
        column_cache,
        DM::IdSetPtr{});

    auto settings = tmt->getContext().getSettingsRef();
    stream.readPrefix();
    // read() returns an empty block once the file is exhausted.
    while (auto block = stream.read())
        dm_storage->write(std::move(block), settings);

    stream.readSuffix();
}

/// Builds a snapshot for region `region_id`: the write schema of the region's
/// table plus a read pipeline over the region's key range.
///
/// When the region or its table cannot be resolved, or the table is not a
/// StorageDeltaMerge, a warning is logged and a snapshot with an empty column
/// list is returned; serializeTiFlashSnapshotInto treats that as "nothing to
/// serialize".
///
/// @param tmt        Context giving access to the KVStore, storages and settings.
/// @param region_id  Id of the region to snapshot.
/// @return Heap-allocated snapshot; the caller must release it with
///         deleteTiFlashSnapshot.
TiFlashSnapshot * TiFlashSnapshotHandler::genTiFlashSnapshot(TMTContext * tmt, uint64_t region_id)
{
    // Schema for the "nothing to serialize" snapshot. It has static storage
    // duration so that it outlives every snapshot constructed from it.
    static const DM::ColumnDefines empty_columns{};

    Logger * log = &Logger::get("TiFlashSnapshotHandler");
    auto & kvstore = tmt->getKVStore();
    const RegionPtr region = kvstore->getRegion(region_id);
    if (region == nullptr)
    {
        // NOTE(review): assumes getRegion signals an unknown region with nullptr — confirm.
        LOG_WARNING(log, "genTiFlashSnapshot can not find region with id: " + DB::toString(region_id) + ", ignored");
        return new TiFlashSnapshot(empty_columns);
    }

    auto region_range = region->getRange();
    auto table_id = region->getMappedTableID();
    auto storage = tmt->getStorages().get(table_id);
    if (storage == nullptr)
    {
        LOG_WARNING(log,
            "genTiFlashSnapshot can not get table for region:" + region->toString() + " with table id: " + DB::toString(table_id)
                + ", ignored");
        return new TiFlashSnapshot(empty_columns);
    }

    auto dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
    if (dm_storage == nullptr)
    {
        // The cast yields nullptr when the table is backed by another storage engine.
        LOG_WARNING(log,
            "genTiFlashSnapshot table is not a DeltaMerge storage for region:" + region->toString()
                + " with table id: " + DB::toString(table_id) + ", ignored");
        return new TiFlashSnapshot(empty_columns);
    }

    // Owned by a unique_ptr until the very end so that an exception thrown while
    // the pipeline is being built does not leak the snapshot.
    auto snapshot = std::make_unique<TiFlashSnapshot>(dm_storage->getStore()->getTableColumns());
    const Settings & settings = tmt->getContext().getSettingsRef();

    SelectQueryInfo query_info;
    // query_info.query is just a placeholder
    String query_str = "SELECT 1";
    SQLQuerySource query_src(query_str.data(), query_str.data() + query_str.size());
    std::tie(std::ignore, query_info.query) = query_src.parse(0);
    const ASTSelectWithUnionQuery & ast = typeid_cast<const ASTSelectWithUnionQuery &>(*query_info.query);
    query_info.query = ast.list_of_selects->children[0];

    // Restrict the read to this region, at the configured read timestamp.
    auto mvcc_query_info = std::make_unique<MvccQueryInfo>();
    mvcc_query_info->resolve_locks = true;
    mvcc_query_info->read_tso = settings.read_tso;
    RegionQueryInfo info;
    {
        info.region_id = region_id;
        info.version = region->version();
        info.conf_version = region->confVer();
        info.range_in_table = region_range->rawKeys();
    }
    mvcc_query_info->regions_query_info.emplace_back(std::move(info));
    query_info.mvcc_query_info = std::move(mvcc_query_info);

    DAGPreparedSets dag_sets{};
    query_info.dag_query = std::make_unique<DAGQueryInfo>(std::vector<const tipb::Expr *>{}, dag_sets, std::vector<NameAndTypePair>{});

    QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
    auto table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__);
    Names required_columns = storage->getColumns().getNamesOfPhysical();
    snapshot->pipeline.streams = storage->read(required_columns, query_info, tmt->getContext(), from_stage, settings.max_block_size, 1);
    // Attach the structure lock to every stream of the pipeline.
    snapshot->pipeline.transform([&](auto & stream) { stream->addTableLock(table_lock); });
    return snapshot.release();
}

/// Drains the first stream of `snapshot`'s pipeline into a single-file DMFile
/// created at `path`.
///
/// @return {0, 0, 0} when the snapshot has no columns to write; otherwise
///         {1, number of rows written, size of the file at `path`}.
SerializeTiFlashSnapshotRes TiFlashSnapshotHandler::serializeTiFlashSnapshotInto(
    TMTContext * tmt, TiFlashSnapshot * snapshot, const String & path)
{
    // An empty schema marks a snapshot that has nothing to serialize.
    if (snapshot->write_columns.empty())
        return {0, 0, 0};

    auto dm_file = DM::DMFile::create(path);
    uint64_t rows_written = 0;
    DM::DMFileBlockOutputStream writer(tmt->getContext(), dm_file, snapshot->write_columns);
    auto & reader = snapshot->pipeline.firstStream();

    reader->readPrefix();
    writer.writePrefix();
    // read() returns an empty block once the source is exhausted.
    while (auto block = reader->read())
    {
        rows_written += block.rows();
        writer.write(block, 0);
    }
    reader->readSuffix();
    writer.writeSuffix();

    Poco::File written_file(path);
    const uint64_t file_size = written_file.getSize();
    // Original author's note: if key_count is 0, the file will be deleted.
    // NOTE(review): nothing in this function deletes it — presumably done by the caller; confirm.
    return {1, rows_written, file_size};
}

/// Tells whether the file at `path` is a valid single-file-mode DMFile,
/// as judged by DM::DMFile::isValidDMFileInSingleFileMode.
bool TiFlashSnapshotHandler::isTiFlashSnapshot(TMTContext * tmt, const String & path)
{
    const auto & file_provider = tmt->getContext().getFileProvider();
    return DM::DMFile::isValidDMFileInSingleFileMode(file_provider, path);
}

/// Destroys a snapshot returned by genTiFlashSnapshot. Passing nullptr is a no-op.
void TiFlashSnapshotHandler::deleteTiFlashSnapshot(TiFlashSnapshot * snap)
{
    delete snap;
}

/// Destroys a snapshot returned by preHandleTiFlashSnapshot. Passing nullptr is a no-op.
void TiFlashSnapshotHandler::deletePreHandledTiFlashSnapshot(PreHandledTiFlashSnapshot * snap)
{
    delete snap;
}

} // namespace DB
39 changes: 39 additions & 0 deletions dbms/src/Flash/TiFlashSnapshotHandler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#include <Storages/Transaction/Region.h>

#include <cstdint>
#include <string>

namespace DB
{
class TMTContext;

struct PreHandledTiFlashSnapshot;
struct TiFlashSnapshot;

/// Result of TiFlashSnapshotHandler::serializeTiFlashSnapshotInto.
/// Built positionally as {ok, key_count, total_size}, so the member order
/// must not change.
struct SerializeTiFlashSnapshotRes
{
// 1 when a snapshot file was written, 0 when there was nothing to serialize.
uint8_t ok;
// Number of rows written into the snapshot file.
uint64_t key_count;
// Size of the snapshot file as reported by the file system after writing.
uint64_t total_size;
};
using String = std::string;

/// Stateless entry points for producing, serializing and applying TiFlash
/// snapshots. Snapshots are handed out as raw owning pointers to opaque structs;
/// each must be released with the matching delete* function.
class TiFlashSnapshotHandler
{
public:
// Wraps `region` and the snapshot file `path` into a new PreHandledTiFlashSnapshot.
// Release the result with deletePreHandledTiFlashSnapshot.
static PreHandledTiFlashSnapshot * preHandleTiFlashSnapshot(RegionPtr region, const String & path);

// Hands the snapshot's region to the KVStore and writes the blocks of the
// snapshot file into the storage of the region's table. Does not free `snap`.
static void applyPreHandledTiFlashSnapshot(TMTContext * tmt, PreHandledTiFlashSnapshot * snap);

// Creates a snapshot (write schema + read pipeline) for the region `region_id`.
// Release the result with deleteTiFlashSnapshot.
static TiFlashSnapshot * genTiFlashSnapshot(TMTContext * tmt, uint64_t region_id);

// Writes the rows of `snapshot` into a snapshot file at `path` and reports
// whether a file was written, how many rows it holds and its size.
static SerializeTiFlashSnapshotRes serializeTiFlashSnapshotInto(TMTContext * tmt, TiFlashSnapshot * snapshot, const String & path);

// Returns true when the file at `path` is a valid single-file-mode DMFile.
static bool isTiFlashSnapshot(TMTContext * tmt, const String & path);

// Destroys a snapshot created by genTiFlashSnapshot.
static void deleteTiFlashSnapshot(TiFlashSnapshot * snap);

// Destroys a snapshot created by preHandleTiFlashSnapshot.
static void deletePreHandledTiFlashSnapshot(PreHandledTiFlashSnapshot * snap);
};

} // namespace DB
3 changes: 0 additions & 3 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,16 @@
#include <Common/TiFlashBuildInfo.h>
#include <Common/TiFlashException.h>
#include <Common/config.h>
#include <Common/escapeForFileName.h>
#include <Common/getFQDNOrHostName.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Encryption/DataKeyManager.h>
#include <Encryption/FileProvider.h>
#include <Encryption/MockKeyManager.h>
#include <Flash/DiagnosticsService.h>
#include <Flash/FlashService.h>
#include <Functions/registerFunctions.h>
#include <IO/HTTPCommon.h>
#include <IO/ReadHelpers.h>
#include <IO/createReadBufferFromFileBase.h>
#include <Interpreters/AsynchronousMetrics.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/IDAsPathUpgrader.h>
Expand Down
57 changes: 51 additions & 6 deletions dbms/src/Storages/DeltaMerge/File/DMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ String getPathByStatus(const String & parent_path, UInt64 file_id, DMFile::Statu

/// Location of this DMFile on disk.
/// A file created from or restored by an explicit path (a snapshot file) always
/// lives at `file_path`; otherwise the path is derived from the parent
/// directory, the file id and the current status.
String DMFile::path() const
{
    // The stale unconditional `return getPathByStatus(...)` that preceded this
    // line made the explicit-path branch unreachable; only the new form is kept.
    return !file_path.empty() ? file_path : getPathByStatus(parent_path, file_id, status);
}

String DMFile::ngcPath() const
Expand Down Expand Up @@ -103,6 +103,29 @@ DMFilePtr DMFile::restore(const FileProviderPtr & file_provider, UInt64 file_id,
return dmfile;
}

/// Creates a writable single-file-mode DMFile at exactly `path`.
/// Anything already present at `path` is removed recursively (with a warning)
/// and an empty file is then touched in its place.
DMFilePtr DMFile::create(const String & path)
{
    Logger * log = &Logger::get("DMFile");

    DMFilePtr dmfile(new DMFile(path, Mode::SINGLE_FILE, Status::WRITABLE, log));

    Poco::File existing(path);
    if (existing.exists())
    {
        existing.remove(true);
        LOG_WARNING(log, "Existing dmfile, removed :" << path);
    }

    PageUtil::touchFile(path);
    return dmfile;
}

/// Opens the single-file-mode DMFile stored at exactly `path` as READABLE and
/// loads its metadata through `file_provider`.
DMFilePtr DMFile::restore(const FileProviderPtr & file_provider, const String & path)
{
    Logger * logger = &Logger::get("DMFile");
    DMFilePtr restored(new DMFile(path, Mode::SINGLE_FILE, Status::READABLE, logger));
    restored->readMeta(file_provider);
    return restored;
}

String DMFile::colIndexCacheKey(const FileNameBase & file_name_base) const
{
if (isSingleFileMode())
Expand Down Expand Up @@ -144,7 +167,7 @@ bool DMFile::isColIndexExist(const ColId & col_id) const

/// Base path used for this file's encryption info.
/// A file with an explicit `file_path` (a snapshot file) uses that path;
/// otherwise the READABLE-status folder name under the parent directory is
/// used, whatever the current status is.
const String DMFile::encryptionBasePath() const
{
    // The stale unconditional return that preceded this line made the
    // explicit-path branch unreachable; only the new form is kept.
    return !file_path.empty() ? file_path : parent_path + "/" + FOLDER_PREFIX_READABLE + DB::toString(file_id);
}


Expand Down Expand Up @@ -339,6 +362,7 @@ void DMFile::finalize(const FileProviderPtr & file_provider)
void DMFile::finalize(WriteBuffer & buffer)
{
Footer footer;
footer.magic_number = DMFile::magic_number;
std::tie(footer.meta_pack_info.meta_offset, footer.meta_pack_info.meta_size) = writeMeta(buffer);
std::tie(footer.meta_pack_info.pack_stat_offset, footer.meta_pack_info.pack_stat_size) = writePack(buffer);
footer.sub_file_stat_offset = buffer.count();
Expand All @@ -357,14 +381,20 @@ void DMFile::finalize(WriteBuffer & buffer)
writeIntBinary(footer.sub_file_stat_offset, buffer);
writeIntBinary(footer.sub_file_num, buffer);
writeIntBinary(static_cast<std::underlying_type_t<DMSingleFileFormatVersion>>(footer.file_format_version), buffer);
writeIntBinary(footer.magic_number, buffer);
buffer.next();
if (status != Status::WRITING)
throw Exception("Expected WRITING status, now " + statusString(status));
Poco::File old_file(path());
Poco::File old_ngc_file(ngcPath());
status = Status::READABLE;

auto new_path = path();
auto old_path = path();
auto old_ngc_path = ngcPath();
status = Status::READABLE;
auto new_path = path();
// If the path is same, then this is a snapshot file, no need to rename the file
if (old_path == new_path)
return;
Poco::File old_file(old_path);
Poco::File old_ngc_file(old_ngc_path);
Poco::File file(new_path);
if (file.exists())
file.remove();
Expand Down Expand Up @@ -479,5 +509,20 @@ void DMFile::remove(const FileProviderPtr & file_provider)
}
}

/// Checks whether `path` is a regular file whose trailing bytes are the DMFile
/// magic number, i.e. whether it looks like a finalized single-file-mode DMFile.
///
/// @param file_provider  Provider used to open the file for reading.
/// @param path           File to inspect.
/// @return false when `path` does not exist, is not a regular file, is too short
///         to contain the magic number, or ends with a different value.
bool DMFile::isValidDMFileInSingleFileMode(const FileProviderPtr & file_provider, const String & path)
{
    Poco::File file(path);
    if (!file.exists() || !file.isFile())
        return false;

    // A file shorter than the magic number (e.g. the empty file left by
    // DMFile::create before anything is written) cannot be valid. Without this
    // guard the unsigned subtraction below wraps around to a huge seek offset.
    const auto file_size = file.getSize();
    if (file_size < sizeof(MagicNumber))
        return false;

    MagicNumber number = 0;
    ReadBufferFromFileProvider buf(file_provider, path, EncryptionPath(path, ""));
    buf.seek(file_size - sizeof(MagicNumber), SEEK_SET);
    DB::readIntBinary(number, buf);
    return number == DMFile::magic_number;
}

// Marker written as the last field of the footer of a single-file-mode DMFile;
// isValidDMFileInSingleFileMode compares the trailing bytes of a file against it.
const DMFile::MagicNumber DMFile::magic_number = 0x23579BDF48799ADE;

} // namespace DM
} // namespace DB
Loading