diff --git a/contrib/tiflash-proxy b/contrib/tiflash-proxy index a03434d3758..e46e88799f3 160000 --- a/contrib/tiflash-proxy +++ b/contrib/tiflash-proxy @@ -1 +1 @@ -Subproject commit a03434d3758bee4fa335ce87da5f772eebe8f9cc +Subproject commit e46e88799f383bbd10d6508da74a625053ab78e5 diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 7ebbb813ef8..1c74b5f402e 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -1650,10 +1650,8 @@ bool Context::initializeGlobalStoragePoolIfNeed(const PathPool & path_pool) auto lock = getLock(); if (shared->global_storage_pool) { - // Can't init GlobalStoragePool twice. - // Because we won't remove the gc task in BackGroundPool - // Also won't remove it from ~GlobalStoragePool() - throw Exception("GlobalStoragePool has already been initialized.", ErrorCodes::LOGICAL_ERROR); + // GlobalStoragePool may be initialized many times in some test cases for restore. + LOG_WARNING(shared->log, "GlobalStoragePool has already been initialized."); } CurrentMetrics::set(CurrentMetrics::GlobalStorageRunMode, static_cast(shared->storage_run_mode)); if (shared->storage_run_mode == PageStorageRunMode::MIX_MODE || shared->storage_run_mode == PageStorageRunMode::ONLY_V3) diff --git a/dbms/src/Interpreters/InterpreterFactory.cpp b/dbms/src/Interpreters/InterpreterFactory.cpp index 0e6e6fbdae3..8240926f1a3 100644 --- a/dbms/src/Interpreters/InterpreterFactory.cpp +++ b/dbms/src/Interpreters/InterpreterFactory.cpp @@ -28,7 +28,6 @@ #include #include #include -#include #include #include #include @@ -42,7 +41,6 @@ #include #include #include -#include #include #include @@ -136,11 +134,7 @@ std::unique_ptr InterpreterFactory::get(ASTPtr & query, Context & { return std::make_unique(query, context); } - else if (typeid_cast(query.get())) - { - throwIfReadOnly(context); - return std::make_unique(query, context); - } + else if (typeid_cast(query.get())) { throwIfReadOnly(context); diff --git a/dbms/src/Interpreters/InterpreterSystemQuery.cpp b/dbms/src/Interpreters/InterpreterSystemQuery.cpp deleted file mode 100644 index 32437fcc3fd..00000000000 --- a/dbms/src/Interpreters/InterpreterSystemQuery.cpp +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include -#include -#include - -#include - - -namespace DB -{ -namespace ErrorCodes -{ -extern const int BAD_ARGUMENTS; -extern const int CANNOT_KILL; -extern const int NOT_IMPLEMENTED; -} // namespace ErrorCodes - - -namespace -{ -ExecutionStatus getOverallExecutionStatusOfCommands() -{ - return ExecutionStatus(0); -} - -/// Consequently execute all commands and genreates final exception message for failed commands -template -ExecutionStatus getOverallExecutionStatusOfCommands(Callable && command, Callables &&... commands) -{ - ExecutionStatus status_head(0); - try - { - command(); - } - catch (...) - { - status_head = ExecutionStatus::fromCurrentException(); - } - - ExecutionStatus status_tail = getOverallExecutionStatusOfCommands(std::forward(commands)...); - - auto res_status = status_head.code != 0 ? status_head.code : status_tail.code; - auto res_message = status_head.message + (status_tail.message.empty() ? "" : ("\n" + status_tail.message)); - - return ExecutionStatus(res_status, res_message); -} - -} // namespace - - -InterpreterSystemQuery::InterpreterSystemQuery(const ASTPtr & query_ptr_, Context & context_) - : query_ptr(query_ptr_) - , context(context_) -{} - - -BlockIO InterpreterSystemQuery::execute() -{ - auto & query = typeid_cast(*query_ptr); - - using Type = ASTSystemQuery::Type; - - switch (query.type) - { - case Type::SHUTDOWN: - if (kill(0, SIGTERM)) - throwFromErrno("System call kill(0, SIGTERM) failed", ErrorCodes::CANNOT_KILL); - break; - case Type::KILL: - if (kill(0, SIGKILL)) - throwFromErrno("System call kill(0, SIGKILL) failed", ErrorCodes::CANNOT_KILL); - break; - case Type::DROP_DNS_CACHE: - DNSCache::instance().drop(); - break; - case Type::DROP_MARK_CACHE: - context.dropMarkCache(); - break; - case Type::DROP_UNCOMPRESSED_CACHE: - context.dropUncompressedCache(); - break; - case Type::RELOAD_DICTIONARY: - context.getExternalDictionaries().reloadDictionary(query.target_dictionary); - break; - case Type::RELOAD_DICTIONARIES: - { - auto status = getOverallExecutionStatusOfCommands( - [&] { context.getExternalDictionaries().reload(); }, - [&] { context.getEmbeddedDictionaries().reload(); }); - if (status.code != 0) - throw Exception(status.message, status.code); - break; - } - case Type::RELOAD_CONFIG: - context.reloadConfig(); - break; - case Type::STOP_LISTEN_QUERIES: - case Type::START_LISTEN_QUERIES: - case Type::RESTART_REPLICAS: - case Type::SYNC_REPLICA: - case Type::STOP_MERGES: - case Type::START_MERGES: - case Type::STOP_REPLICATION_QUEUES: - case Type::START_REPLICATION_QUEUES: - throw Exception(String(ASTSystemQuery::typeToString(query.type)) + " is not supported yet", ErrorCodes::NOT_IMPLEMENTED); - default: - throw Exception("Unknown type of SYSTEM query", ErrorCodes::BAD_ARGUMENTS); - } - - return BlockIO(); -} - - -} // namespace DB diff --git a/dbms/src/Interpreters/InterpreterSystemQuery.h b/dbms/src/Interpreters/InterpreterSystemQuery.h deleted file mode 100644 index 2f5c30fc480..00000000000 --- a/dbms/src/Interpreters/InterpreterSystemQuery.h +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once -#include - - -namespace DB -{ -class Context; -class IAST; -using ASTPtr = std::shared_ptr; - - -class InterpreterSystemQuery : public IInterpreter -{ -public: - InterpreterSystemQuery(const ASTPtr & query_ptr_, Context & context_); - - BlockIO execute() override; - -private: - ASTPtr query_ptr; - Context & context; -}; - - -} // namespace DB diff --git a/dbms/src/Parsers/ASTSystemQuery.cpp b/dbms/src/Parsers/ASTSystemQuery.cpp deleted file mode 100644 index b1a2dbbc752..00000000000 --- a/dbms/src/Parsers/ASTSystemQuery.cpp +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include - - -namespace DB -{ - - -namespace ErrorCodes -{ - extern const int BAD_TYPE_OF_FIELD; - extern const int NOT_IMPLEMENTED; -} - - -const char * ASTSystemQuery::typeToString(Type type) -{ - switch (type) - { - case Type::SHUTDOWN: - return "SHUTDOWN"; - case Type::KILL: - return "KILL"; - case Type::DROP_DNS_CACHE: - return "DROP DNS CACHE"; - case Type::DROP_MARK_CACHE: - return "DROP MARK CACHE"; - case Type::DROP_UNCOMPRESSED_CACHE: - return "DROP UNCOMPRESSED CACHE"; - case Type::STOP_LISTEN_QUERIES: - return "STOP LISTEN QUERIES"; - case Type::START_LISTEN_QUERIES: - return "START LISTEN QUERIES"; - case Type::RESTART_REPLICAS: - return "RESTART REPLICAS"; - case Type::SYNC_REPLICA: - return "SYNC REPLICA"; - case Type::RELOAD_DICTIONARY: - return "RELOAD DICTIONARY"; - case Type::RELOAD_DICTIONARIES: - return "RELOAD DICTIONARIES"; - case Type::RELOAD_CONFIG: - return "RELOAD CONFIG"; - case Type::STOP_MERGES: - return "STOP MERGES"; - case Type::START_MERGES: - return "START MERGES"; - case Type::STOP_REPLICATION_QUEUES: - return "STOP REPLICATION QUEUES"; - case Type::START_REPLICATION_QUEUES: - return "START REPLICATION QUEUES"; - default: - throw Exception("Unknown SYSTEM query command", ErrorCodes::BAD_TYPE_OF_FIELD); - } -} - - -void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &, FormatStateStacked) const -{ - settings.ostr << (settings.hilite ? hilite_keyword : "") << "SYSTEM " << (settings.hilite ? hilite_none : ""); - settings.ostr << typeToString(type); - - if (type == Type::RELOAD_DICTIONARY) - settings.ostr << " " << backQuoteIfNeed(target_dictionary); - else if (type == Type::SYNC_REPLICA) - throw Exception("SYNC_REPLICA isn't supported yet", ErrorCodes::NOT_IMPLEMENTED); -} - - -} diff --git a/dbms/src/Parsers/ASTSystemQuery.h b/dbms/src/Parsers/ASTSystemQuery.h deleted file mode 100644 index 06841542114..00000000000 --- a/dbms/src/Parsers/ASTSystemQuery.h +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include - - -namespace DB -{ - -class ASTSystemQuery : public IAST -{ -public: - - enum class Type - { - UNKNOWN, - SHUTDOWN, - KILL, - DROP_DNS_CACHE, - DROP_MARK_CACHE, - DROP_UNCOMPRESSED_CACHE, - STOP_LISTEN_QUERIES, - START_LISTEN_QUERIES, - RESTART_REPLICAS, - SYNC_REPLICA, - RELOAD_DICTIONARY, - RELOAD_DICTIONARIES, - RELOAD_CONFIG, - STOP_MERGES, - START_MERGES, - STOP_REPLICATION_QUEUES, - START_REPLICATION_QUEUES, - END - }; - - static const char * typeToString(Type type); - - Type type = Type::UNKNOWN; - - String target_dictionary; - //String target_replica_database; - //String target_replica_table; - - String getID() const override { return "SYSTEM query"; }; - - ASTPtr clone() const override { return std::make_shared(*this); } - -protected: - - void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override; -}; - - -} diff --git a/dbms/src/Parsers/ParserQuery.cpp b/dbms/src/Parsers/ParserQuery.cpp index 8394cf0c722..8c506763315 100644 --- a/dbms/src/Parsers/ParserQuery.cpp +++ b/dbms/src/Parsers/ParserQuery.cpp @@ -22,7 +22,6 @@ #include #include #include -#include #include namespace DB @@ -34,7 +33,6 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ParserUseQuery use_p; ParserSetQuery set_p; ParserDBGInvokeQuery dbginvoke_p; - ParserSystemQuery system_p; ParserManageQuery manage_p; bool res = query_with_output_p.parse(pos, node, expected) @@ -42,7 +40,6 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) || use_p.parse(pos, node, expected) || set_p.parse(pos, node, expected) || dbginvoke_p.parse(pos, node, expected) - || system_p.parse(pos, node, expected) || manage_p.parse(pos, node, expected); return res; diff --git a/dbms/src/Parsers/ParserSystemQuery.cpp b/dbms/src/Parsers/ParserSystemQuery.cpp deleted file mode 100644 index eca26bd8122..00000000000 --- a/dbms/src/Parsers/ParserSystemQuery.cpp +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include -#include - - -namespace ErrorCodes -{ - extern const int NOT_IMPLEMENTED; -} - - -namespace DB -{ - - -bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected) -{ - if (!ParserKeyword{"SYSTEM"}.ignore(pos)) - return false; - - using Type = ASTSystemQuery::Type; - - auto res = std::make_shared(); - - bool found = false; - for (int i = static_cast(Type::UNKNOWN) + 1; i < static_cast(Type::END); ++i) - { - Type t = static_cast(i); - if (ParserKeyword{ASTSystemQuery::typeToString(t)}.ignore(pos)) - { - res->type = t; - found = true; - } - } - - if (!found) - return false; - - if (res->type == Type::RELOAD_DICTIONARY) - { - if (!parseIdentifierOrStringLiteral(pos, expected, res->target_dictionary)) - return false; - } - else if (res->type == Type::SYNC_REPLICA) - { - throw Exception("SYNC REPLICA is not supported yet", ErrorCodes::NOT_IMPLEMENTED); - } - - node = std::move(res); - return true; -} - -} diff --git a/dbms/src/Parsers/ParserSystemQuery.h b/dbms/src/Parsers/ParserSystemQuery.h deleted file mode 100644 index 3e4539b8600..00000000000 --- a/dbms/src/Parsers/ParserSystemQuery.h +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once -#include - - -namespace DB -{ - - -class ParserSystemQuery : public IParserBase -{ -protected: - const char * getName() const override { return "SYSTEM query"; } - bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; -}; - -} diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h index 00731068858..21b424dffce 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h @@ -114,7 +114,8 @@ class ColumnFile virtual ColumnFileReaderPtr getReader(const DMContext & context, const StorageSnapshotPtr & storage_snap, const ColumnDefinesPtr & col_defs) const = 0; - /// only ColumnInMemoryFile can be appendable + /// Note: Only ColumnFileInMemory can be appendable. Other ColumnFiles (i.e. ColumnFilePersisted) have + /// been persisted in the disk and their data will be immutable. virtual bool isAppendable() const { return false; } virtual void disableAppend() {} virtual bool append(DMContext & /*dm_context*/, const Block & /*data*/, size_t /*offset*/, size_t /*limit*/, size_t /*data_bytes*/) diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h index 5c43bed8c28..efd4705c0f5 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h @@ -27,7 +27,6 @@ using ColumnTinyFilePtr = std::shared_ptr; /// It may be created in two ways: /// 1. created directly when writing to storage if the data is large enough /// 2. created when flushed `ColumnFileInMemory` to disk -/// And it may have cache data if the column file is small enough(The details are in the flush process). class ColumnFileTiny : public ColumnFilePersisted { friend class ColumnFileTinyReader; @@ -38,13 +37,15 @@ class ColumnFileTiny : public ColumnFilePersisted UInt64 rows = 0; UInt64 bytes = 0; - // The id of data page which stores the data of this pack. + /// The id of data page which stores the data of this pack. PageId data_page_id; /// The members below are not serialized. - // The cache data in memory. + + /// The cache data in memory. + /// Currently this field is unused. CachePtr cache; - // Used to map column id to column instance in a Block. + /// Used to map column id to column instance in a Block. ColIdToOffset colid_to_offset; private: diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h index 4cde8b3e121..8f14682caa8 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h @@ -234,7 +234,9 @@ class DeltaValueSpace /// Flush the data of column files which haven't write to disk yet, and also save the metadata of column files. bool flush(DMContext & context); - /// Compacts fragment column files into bigger one, to save some IOPS during reading. + /// Compact fragment column files in the delta layer into bigger column files, to save some IOPS during reading. + /// It does not merge the delta into stable layer. + /// a.k.a. minor compaction. bool compact(DMContext & context); /// Create a constant snapshot for read. diff --git a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp index 042f1bacf6c..bab8f352cad 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp @@ -33,7 +33,7 @@ namespace DM { void MemTableSet::appendColumnFileInner(const ColumnFilePtr & column_file) { - // If this column file's schema is identical to last_schema, then use the last_schema instance, + // If this column file's schema is identical to last_schema, then use the last_schema instance (instead of the one in `column_file`), // so that we don't have to serialize my_schema instance. if (auto * m_file = column_file->tryToInMemoryFile(); m_file) { @@ -54,6 +54,8 @@ void MemTableSet::appendColumnFileInner(const ColumnFilePtr & column_file) if (!column_files.empty()) { + // As we are now appending a new column file (which can be used for new appends), + // let's simply mark the last column file as not appendable. auto & last_column_file = column_files.back(); if (last_column_file->isAppendable()) last_column_file->disableAppend(); @@ -212,7 +214,7 @@ ColumnFileFlushTaskPtr MemTableSet::buildFlushTask(DMContext & context, size_t r if (column_files.empty()) return nullptr; - // make the last column file not appendable + // Mark the last ColumnFile not appendable, so that `appendToCache` will not reuse it and we will be safe to flush it to disk. if (column_files.back()->isAppendable()) column_files.back()->disableAppend(); @@ -224,6 +226,8 @@ ColumnFileFlushTaskPtr MemTableSet::buildFlushTask(DMContext & context, size_t r auto & task = flush_task->addColumnFile(column_file); if (auto * m_file = column_file->tryToInMemoryFile(); m_file) { + // If the ColumnFile is not yet persisted in the disk, it will contain block data. + // In this case, let's write the block data in the flush process as well. task.rows_offset = cur_rows_offset; task.deletes_offset = cur_deletes_offset; task.block_data = m_file->readDataForFlush(); diff --git a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h index 295b358090e..4f0bc4f857e 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h +++ b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h @@ -88,8 +88,14 @@ class MemTableSet : public std::enable_shared_from_this /// The following methods returning false means this operation failed, caused by other threads could have done /// some updates on this instance. E.g. this instance have been abandoned. /// Caller should try again from the beginning. + + /// Append a ColumnFile into this MemTableSet. The ColumnFile may be flushed later. + /// Note that some ColumnFiles may not contain block data, but only a reference to the block data stored in disk. + /// See different ColumnFile implementations for details. void appendColumnFile(const ColumnFilePtr & column_file); + /// Append the block data into a ColumnFileInMemory (may be reused). + /// The ColumnFileInMemory will be stored in this MemTableSet and flushed later. void appendToCache(DMContext & dm_context, const Block & block, size_t offset, size_t limit); void appendDeleteRange(const RowKeyRange & delete_range); @@ -99,7 +105,7 @@ class MemTableSet : public std::enable_shared_from_this /// Create a constant snapshot for read. ColumnFileSetSnapshotPtr createSnapshot(const StorageSnapshotPtr & storage_snap); - /// Build a flush task which will try to flush all column files in MemTableSet now + /// Build a flush task which will try to flush all column files in this MemTableSet at this moment. ColumnFileFlushTaskPtr buildFlushTask(DMContext & context, size_t rows_offset, size_t deletes_offset, size_t flush_version); void removeColumnFilesInFlushTask(const ColumnFileFlushTask & flush_task); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h b/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h index dd9fa6e7d1d..77335e6d9f0 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h @@ -223,6 +223,7 @@ inline bool hasColumn(const ColumnDefines & columns, const ColId & col_id) return false; } +/// Checks whether two blocks have the same schema. template inline bool isSameSchema(const Block & a, const Block & b) { diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 6b4339938af..2d53f85f516 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -578,7 +578,7 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ const auto bytes = block.bytes(); { - // Sort by handle & version in ascending order. + // Sort the block by handle & version in ascending order. SortDescription sort; sort.emplace_back(EXTRA_HANDLE_COLUMN_NAME, 1, 0); sort.emplace_back(VERSION_COLUMN_NAME, 1, 0); @@ -594,6 +594,7 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ const auto handle_column = block.getByName(EXTRA_HANDLE_COLUMN_NAME).column; auto rowkey_column = RowKeyColumnContainer(handle_column, is_common_handle); + // Write block by segments while (offset != rows) { RowKeyValueRef start_key = rowkey_column.getRowKeyValue(offset); @@ -604,6 +605,7 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ // Keep trying until succeeded. while (true) { + // Find the segment according to current start_key SegmentPtr segment; { std::shared_lock lock(read_write_mutex); @@ -618,12 +620,16 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ } FAIL_POINT_PAUSE(FailPoints::pause_when_writing_to_dt_store); + + // Do force merge or stop write if necessary. waitForWrite(dm_context, segment); if (segment->hasAbandoned()) continue; const auto & rowkey_range = segment->getRowKeyRange(); + // The [offset, rows - offset] can be exceeding the Segment's rowkey_range. Cut the range + // to fit the segment. auto [cur_offset, cur_limit] = rowkey_range.getPosRange(handle_column, offset, rows - offset); if (unlikely(cur_offset != offset)) throw Exception("cur_offset does not equal to offset", ErrorCodes::LOGICAL_ERROR); @@ -632,8 +638,8 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ auto alloc_bytes = block.bytes(offset, limit); bool is_small = limit < dm_context->delta_cache_limit_rows / 4 && alloc_bytes < dm_context->delta_cache_limit_bytes / 4; - // Small column files are appended to Delta Cache, then flushed later. - // While large column files are directly written to PageStorage. + // For small column files, data is appended to MemTableSet, then flushed later. + // For large column files, data is directly written to PageStorage, while the ColumnFile entry is appended to MemTableSet. if (is_small) { if (segment->writeToCache(*dm_context, block, offset, limit)) @@ -651,6 +657,8 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ wbs.rollbackWrittenLogAndData(); wbs.clear(); + // In this case we will construct a ColumnFile that does not contain block data in the memory. + // The block data has been written to PageStorage in wbs. write_column_file = ColumnFileTiny::writeColumnFile(*dm_context, block, offset, limit, wbs); wbs.writeLogAndData(); write_range = rowkey_range; @@ -1200,6 +1208,7 @@ void DeltaMergeStore::waitForWrite(const DMContextPtr & dm_context, const Segmen size_t delta_rows = segment->getDelta()->getRows(); size_t delta_bytes = segment->getDelta()->getBytes(); + // No need to stall the write stall if not exceeding the threshold of force merge. if (delta_rows < forceMergeDeltaRows(dm_context) && delta_bytes < forceMergeDeltaBytes(dm_context)) return; @@ -1216,16 +1225,23 @@ void DeltaMergeStore::waitForWrite(const DMContextPtr & dm_context, const Segmen size_t sleep_ms; if (delta_rows >= stop_write_delta_rows || delta_bytes >= stop_write_delta_bytes) + { + // For stop write (hard limit), wait until segment is updated (e.g. delta is merged). sleep_ms = std::numeric_limits::max(); + } else + { + // For force merge (soft limit), wait for a reasonable amount of time. + // It is possible that the segment is still not updated after the wait. sleep_ms = static_cast(segment_bytes) / k10mb * 1000 * wait_duration_factor; + } // checkSegmentUpdate could do foreground merge delta, so call it before sleep. checkSegmentUpdate(dm_context, segment, ThreadType::Write); size_t sleep_step = 50; - // The delta will be merged, only after this segment got abandoned. - // Because merge delta will replace the segment instance. + // Wait at most `sleep_ms` until the delta is merged. + // Merge delta will replace the segment instance, causing `segment->hasAbandoned() == true`. while (!segment->hasAbandoned() && sleep_ms > 0) { size_t ms = std::min(sleep_ms, sleep_step); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index c600ac9dbf8..a8c34073f50 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -374,10 +374,14 @@ class DeltaMergeStore : private boost::noncopyable void flushCache(const DMContextPtr & dm_context, const RowKeyRange & range); - /// Do merge delta for all segments. Only used for debug. + /// Merge delta into the stable layer for all segments. + /// + /// This function is called when using `MANAGE TABLE [TABLE] MERGE DELTA` from TiFlash Client. void mergeDeltaAll(const Context & context); - /// Compact fragment column files into bigger one. + + /// Compact the delta layer, merging multiple fragmented delta files into larger ones. + /// This is a minor compaction as it does not merge the delta into stable layer. void compact(const Context & context, const RowKeyRange & range); /// Iterator over all segments and apply gc jobs. @@ -424,13 +428,33 @@ class DeltaMergeStore : private boost::noncopyable return handle_define.id != EXTRA_HANDLE_COLUMN_ID; } + /// Try to stall the writing. It will suspend the current thread if flow control is necessary. + /// There are roughly two flow control mechanisms: + /// - Force Merge (1 GB by default, see force_merge_delta_rows|size): Wait for a small amount of time at most. + /// - Stop Write (2 GB by default, see stop_write_delta_rows|size): Wait until delta is merged. void waitForWrite(const DMContextPtr & context, const SegmentPtr & segment); + void waitForDeleteRange(const DMContextPtr & context, const SegmentPtr & segment); + /// Try to update the segment. "Update" means splitting the segment into two, merging two segments, merging the delta, etc. + /// If an update is really performed, the segment will be abandoned (with `segment->hasAbandoned() == true`). + /// See `segmentSplit`, `segmentMerge`, `segmentMergeDelta` for details. + /// + /// This may be called from multiple threads, e.g. at the foreground write moment, or in background threads. + /// A `thread_type` should be specified indicating the type of the thread calling this function. + /// Depend on the thread type, the "update" to do may be varied. void checkSegmentUpdate(const DMContextPtr & context, const SegmentPtr & segment, ThreadType thread_type); + /// Split the segment into two. + /// After splitting, the segment will be abandoned (with `segment->hasAbandoned() == true`) and the new two segments will be returned. SegmentPair segmentSplit(DMContext & dm_context, const SegmentPtr & segment, bool is_foreground); + + /// Merge two segments into one. + /// After merging, both segments will be abandoned (with `segment->hasAbandoned() == true`). void segmentMerge(DMContext & dm_context, const SegmentPtr & left, const SegmentPtr & right, bool is_foreground); + + /// Merge the delta (major compaction) in the segment. + /// After delta-merging, the segment will be abandoned (with `segment->hasAbandoned() == true`) and a new segment will be returned. SegmentPtr segmentMergeDelta( DMContext & dm_context, const SegmentPtr & segment, diff --git a/dbms/src/Storages/DeltaMerge/RowKeyRange.h b/dbms/src/Storages/DeltaMerge/RowKeyRange.h index 43b56b2a1c0..ef2afbfdb32 100644 --- a/dbms/src/Storages/DeltaMerge/RowKeyRange.h +++ b/dbms/src/Storages/DeltaMerge/RowKeyRange.h @@ -63,6 +63,7 @@ struct RowKeyValueRef RowKeyValue toRowKeyValue() const; }; +/// Stores the raw bytes of the RowKey. Normally the RowKey will be stored in a column. struct RowKeyValue { RowKeyValue() = default; @@ -112,7 +113,11 @@ struct RowKeyValue // Format as a hex string for debugging. The value will be converted to '?' if redact-log is on String toDebugString() const; - RowKeyValueRef toRowKeyValueRef() const { return RowKeyValueRef{is_common_handle, value->data(), value->size(), int_value}; } + inline RowKeyValueRef toRowKeyValueRef() const + { + return RowKeyValueRef{is_common_handle, value->data(), value->size(), int_value}; + } + DecodedTiKVKeyPtr toRegionKey(TableID table_id) const { // FIXME: move this to TiKVRecordFormat.h @@ -177,6 +182,8 @@ struct RowKeyValue using RowKeyValues = std::vector; +/// An optimized implementation that will try to compare IntHandle via comparing Int values directly. +/// For common handles, per-byte comparison will be still used. inline int compare(const RowKeyValueRef & a, const RowKeyValueRef & b) { if (unlikely(a.is_common_handle != b.is_common_handle)) @@ -214,6 +221,9 @@ inline int compare(const RowKeyValueRef & a, const RowKeyValueRef & b) } } +// TODO (wenxuan): The following compare operators can be simplified using +// boost::operator, or operator<=> when we upgrade to C++20. + inline int compare(const StringRef & a, const RowKeyValueRef & b) { RowKeyValueRef r_a{true, a.data, a.size, 0}; @@ -353,12 +363,13 @@ size_t lowerBound(const RowKeyColumnContainer & rowkey_column, size_t first, siz } } // namespace +/// A range denoted as [StartRowKey, EndRowKey). struct RowKeyRange { - // todo use template to refine is_common_handle + // TODO: use template to refine is_common_handle bool is_common_handle; - /// start and end in RowKeyRange are always meaningful - /// it is assumed that start value is included and end value is excluded. + + // start and end in RowKeyRange are always meaningful. RowKeyValue start; RowKeyValue end; size_t rowkey_column_size; @@ -440,6 +451,7 @@ struct RowKeyRange } } + /// Create a RowKeyRange that covers all key space. static RowKeyRange newAll(bool is_common_handle, size_t rowkey_column_size) { if (is_common_handle) @@ -456,6 +468,7 @@ struct RowKeyRange } } + /// Create a RowKeyRange that covers no data at all. static RowKeyRange newNone(bool is_common_handle, size_t rowkey_column_size) { if (is_common_handle) @@ -594,10 +607,13 @@ struct RowKeyRange return check(first) && check(last_include); } + /// Check whether thisRange.Start <= key inline bool checkStart(const RowKeyValueRef & value) const { return compare(getStart(), value) <= 0; } + /// Check whether key < thisRange.End inline bool checkEnd(const RowKeyValueRef & value) const { return compare(value, getEnd()) < 0; } + /// Check whether the key is included in this range. inline bool check(const RowKeyValueRef & value) const { return checkStart(value) && checkEnd(value); } inline RowKeyValueRef getStart() const { return start.toRowKeyValueRef(); } @@ -625,7 +641,7 @@ struct RowKeyRange return {start_key, end_key}; } - /// return + /// Clip the according to this range, and return the clipped . std::pair getPosRange(const ColumnPtr & column, const size_t offset, const size_t limit) const { RowKeyColumnContainer rowkey_column(column, is_common_handle); diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index 0d048011e18..3ad29ee14a5 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -135,9 +135,16 @@ class Segment : private boost::noncopyable void serialize(WriteBatch & wb); + /// Attach a new ColumnFile into the Segment. The ColumnFile will be added to MemFileSet and flushed to disk later. + /// The block data of the passed in ColumnFile should be placed on disk before calling this function. + /// To write new block data, you can use `writeToCache`. bool writeToDisk(DMContext & dm_context, const ColumnFilePtr & column_file); + + /// Write a block of data into the MemTableSet part of the Segment. The data will be flushed to disk later. bool writeToCache(DMContext & dm_context, const Block & block, size_t offset, size_t limit); - bool write(DMContext & dm_context, const Block & block); // For test only + + /// For test only. + bool write(DMContext & dm_context, const Block & block); bool write(DMContext & dm_context, const RowKeyRange & delete_range); bool ingestColumnFiles(DMContext & dm_context, const RowKeyRange & range, const ColumnFiles & column_files, bool clear_data_in_range); @@ -219,7 +226,12 @@ class Segment : private boost::noncopyable WriteBatches & wbs, const StableValueSpacePtr & merged_stable); + /// Merge the delta (major compaction) and return the new segment. + /// + /// Note: This is only a shortcut function used in tests. + /// Normally you should call `prepareMergeDelta`, `applyMergeDelta` instead. SegmentPtr mergeDelta(DMContext & dm_context, const ColumnDefinesPtr & schema_snap) const; + StableValueSpacePtr prepareMergeDelta( DMContext & dm_context, const ColumnDefinesPtr & schema_snap, @@ -237,6 +249,8 @@ class Segment : private boost::noncopyable bool flushCache(DMContext & dm_context); void placeDeltaIndex(DMContext & dm_context); + /// Compact the delta layer, merging fragment column files into bigger column files. + /// It does not merge the delta into stable layer. bool compactDelta(DMContext & dm_context); size_t getEstimatedRows() const { return delta->getRows() + stable->getRows(); } @@ -266,11 +280,18 @@ class Segment : private boost::noncopyable return lock; } + /// Marks this segment as abandoned. + /// Note: Segment member functions never abandon the segment itself. + /// The abandon state is usually triggered by the DeltaMergeStore. void abandon(DMContext & context) { LOG_FMT_DEBUG(log, "Abandon segment [{}]", segment_id); delta->abandon(context); } + + /// Returns whether this segment has been marked as abandoned. + /// Note: Segment member functions never abandon the segment itself. + /// The abandon state is usually triggered by the DeltaMergeStore. bool hasAbandoned() { return delta->hasAbandoned(); } bool isSplitForbidden() { return split_forbidden; } @@ -372,7 +393,9 @@ class Segment : private boost::noncopyable bool relevant_place) const; private: - const UInt64 epoch; // After split / merge / merge delta, epoch got increased by 1. + /// The version of this segment. After split / merge / merge delta, epoch got increased by 1. + const UInt64 epoch; + RowKeyRange rowkey_range; bool is_common_handle; size_t rowkey_column_size; diff --git a/dbms/src/Storages/IManageableStorage.h b/dbms/src/Storages/IManageableStorage.h index ff2428f4533..23dc4cc447a 100644 --- a/dbms/src/Storages/IManageableStorage.h +++ b/dbms/src/Storages/IManageableStorage.h @@ -76,10 +76,10 @@ class IManageableStorage : public IStorage virtual void deleteRows(const Context &, size_t /*rows*/) { throw Exception("Unsupported"); } - // `limit` is the max number of segments to gc, return value is the number of segments gced + /// `limit` is the max number of segments to gc, return value is the number of segments gced virtual UInt64 onSyncGc(Int64 /*limit*/) { throw Exception("Unsupported"); } - // Return true is data dir exist + /// Return true is data dir exist virtual bool initStoreIfDataDirExist() { throw Exception("Unsupported"); } virtual void mergeDelta(const Context &) { throw Exception("Unsupported"); } @@ -90,7 +90,7 @@ class IManageableStorage : public IStorage virtual String getDatabaseName() const = 0; - // Update tidb table info in memory. + /// Update tidb table info in memory. virtual void setTableInfo(const TiDB::TableInfo & table_info_) = 0; virtual const TiDB::TableInfo & getTableInfo() const = 0; @@ -99,8 +99,8 @@ class IManageableStorage : public IStorage Timestamp getTombstone() const { return tombstone; } void setTombstone(Timestamp tombstone_) { IManageableStorage::tombstone = tombstone_; } - // Apply AlterCommands synced from TiDB should use `alterFromTiDB` instead of `alter(...)` - // Once called, table_info is guaranteed to be persisted, regardless commands being empty or not. + /// Apply AlterCommands synced from TiDB should use `alterFromTiDB` instead of `alter(...)` + /// Once called, table_info is guaranteed to be persisted, regardless commands being empty or not. virtual void alterFromTiDB( const TableLockHolder &, const AlterCommands & commands, @@ -110,16 +110,14 @@ class IManageableStorage : public IStorage const Context & context) = 0; - /** Rename the table. - * - * Renaming a name in a file with metadata, the name in the list of tables in the RAM, is done separately. - * Different from `IStorage::rename`, storage's data path do not contain database name, nothing to do with data path, `new_path_to_db` is ignored. - * But `getDatabaseName` and `getTableInfo` means we usally store database name / TiDB table info as member in storage, - * we need to update database name with `new_database_name`, and table name in tidb table info with `new_display_table_name`. - * - * Called when the table structure is locked for write. - * TODO: For TiFlash, we can rename without any lock on data? - */ + /// Rename the table. + /// + /// Renaming a name in a file with metadata, the name in the list of tables in the RAM, is done separately. + /// Different from `IStorage::rename`, storage's data path do not contain database name, nothing to do with data path, `new_path_to_db` is ignored. + /// But `getDatabaseName` and `getTableInfo` means we usually store database name / TiDB table info as member in storage, + /// we need to update database name with `new_database_name`, and table name in tidb table info with `new_display_table_name`. + /// + /// Called when the table structure is locked for write. virtual void rename( const String & new_path_to_db, const String & new_database_name, @@ -160,9 +158,9 @@ class IManageableStorage : public IStorage virtual size_t getRowKeyColumnSize() const { return 1; } - // when `need_block` is true, it will try return a cached block corresponding to DecodingStorageSchemaSnapshotConstPtr, - // and `releaseDecodingBlock` need to be called when the block is free - // when `need_block` is false, it will just return an nullptr + /// when `need_block` is true, it will try return a cached block corresponding to DecodingStorageSchemaSnapshotConstPtr, + /// and `releaseDecodingBlock` need to be called when the block is free + /// when `need_block` is false, it will just return an nullptr virtual std::pair getSchemaSnapshotAndBlockForDecoding(bool /* need_block */) { throw Exception("Method getDecodingSchemaSnapshot is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp index d3fdafe57e8..2a5714de027 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp @@ -43,17 +43,16 @@ class PageStorageMixedTest : public DB::base::TiFlashStorageTestBasic storage_path_pool_v3 = std::make_unique(Strings{path}, Strings{path}, Strings{}, std::make_shared(0, paths, caps, Strings{}, caps), global_context.getFileProvider(), true); global_context.setPageStorageRunMode(PageStorageRunMode::MIX_MODE); - if (!global_context.getGlobalStoragePool()) - global_context.initializeGlobalStoragePoolIfNeed(*storage_path_pool_v3); } void SetUp() override { + auto & global_context = DB::tests::TiFlashTestEnv::getGlobalContext(); + global_context.setPageStorageRunMode(PageStorageRunMode::MIX_MODE); TiFlashStorageTestBasic::SetUp(); const auto & path = getTemporaryPath(); createIfNotExist(path); - auto & global_context = DB::tests::TiFlashTestEnv::getGlobalContext(); std::vector caps = {}; Strings paths = {path}; @@ -75,7 +74,7 @@ class PageStorageMixedTest : public DB::base::TiFlashStorageTestBasic PageStorageRunMode reloadMixedStoragePool() { - DB::tests::TiFlashTestEnv::getContext().setPageStorageRunMode(PageStorageRunMode::MIX_MODE); + db_context->setPageStorageRunMode(PageStorageRunMode::MIX_MODE); PageStorageRunMode run_mode = storage_pool_mix->restore(); page_writer_mix = storage_pool_mix->logWriter(); page_reader_mix = storage_pool_mix->logReader(); @@ -84,7 +83,7 @@ class PageStorageMixedTest : public DB::base::TiFlashStorageTestBasic void reloadV2StoragePool() { - DB::tests::TiFlashTestEnv::getContext().setPageStorageRunMode(PageStorageRunMode::ONLY_V2); + db_context->setPageStorageRunMode(PageStorageRunMode::ONLY_V2); storage_pool_v2->restore(); page_writer_v2 = storage_pool_v2->logWriter(); page_reader_v2 = storage_pool_v2->logReader(); diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index f2aa227d29c..c61842e4353 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -414,6 +414,10 @@ class DMBlockOutputStream : public IBlockOutputStream void write(const Block & block) override try { + // When dt_insert_max_rows (Max rows of insert blocks when write into DeltaTree Engine, default = 0) is specified, + // the insert block will be splited into multiples. + // Currently dt_insert_max_rows is only used for performance tests. + if (db_settings.dt_insert_max_rows == 0) { Block to_write = decorator(block); @@ -467,6 +471,7 @@ void StorageDeltaMerge::write(Block & block, const Settings & settings) #ifndef NDEBUG { // Do some check under DEBUG mode to ensure all block are written with column id properly set. + // In this way we can catch the case that upstream raft log contains problematic data written from TiDB. auto header = store->getHeader(); bool ok = true; String name; diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index 560f365e747..b9598b77a86 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -73,6 +74,9 @@ class StorageDeltaMerge void flushCache(const Context & context, const DM::RowKeyRange & range_to_flush) override; + /// Merge delta into the stable layer for all segments. + /// + /// This function is called when using `MANAGE TABLE [TABLE] MERGE DELTA` from TiFlash Client. void mergeDelta(const Context & context) override; void deleteRange(const DM::RowKeyRange & range_to_delete, const Settings & settings); diff --git a/dbms/src/TestUtils/TiFlashTestEnv.cpp b/dbms/src/TestUtils/TiFlashTestEnv.cpp index 8ec391c03d4..7e8bd0da319 100644 --- a/dbms/src/TestUtils/TiFlashTestEnv.cpp +++ b/dbms/src/TestUtils/TiFlashTestEnv.cpp @@ -95,6 +95,7 @@ Context TiFlashTestEnv::getContext(const DB::Settings & settings, Strings testda context.setPath(root_path); auto paths = getPathPool(testdata_path); context.setPathPool(paths.first, paths.second, Strings{}, true, context.getPathCapacity(), context.getFileProvider()); + global_context->initializeGlobalStoragePoolIfNeed(context.getPathPool()); context.getSettingsRef() = settings; return context; }