diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 83be0f1f534..7900cd41b26 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -54,7 +54,7 @@ MakeRegionQueryInfos( mvcc_info.regions_query_info.clear(); RegionRetryList region_need_retry; RegionException::RegionReadStatus status_res = RegionException::RegionReadStatus::OK; - for (auto & [id, r] : dag_region_infos) + for (const auto & [id, r] : dag_region_infos) { if (r.key_ranges.empty()) { @@ -155,7 +155,7 @@ void DAGStorageInterpreter::execute(DAGPipeline & pipeline) if (!mvcc_query_info->regions_query_info.empty()) doLocalRead(pipeline, settings.max_block_size); - for (auto & region_info : dag_context.getRegionsForRemoteRead()) + for (const auto & region_info : dag_context.getRegionsForRemoteRead()) region_retry.emplace_back(region_info); null_stream_if_empty = std::make_shared(storage->getSampleBlockForColumns(required_columns)); @@ -267,7 +267,7 @@ void DAGStorageInterpreter::doLocalRead(DAGPipeline & pipeline, size_t max_block RegionException::UnavailableRegions region_ids; for (const auto & info : regions_info) { - if (rand() % 100 > 50) + if (random() % 100 > 50) region_ids.insert(info.region_id); } throw RegionException(std::move(region_ids), RegionException::RegionReadStatus::NOT_FOUND); @@ -364,12 +364,12 @@ std::tuple DAGStorageInterpreter /// Get current schema version in schema syncer for a chance to shortcut. if (unlikely(query_schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION)) { - auto storage_ = tmt.getStorages().get(table_id); - if (!storage_) + auto managed_storage = tmt.getStorages().get(table_id); + if (!managed_storage) { throw TiFlashException("Table " + std::to_string(table_id) + " doesn't exist.", Errors::Table::NotExists); } - return {storage_, storage_->lockStructureForShare(context.getCurrentQueryId())}; + return {managed_storage, managed_storage->lockStructureForShare(context.getCurrentQueryId())}; } auto global_schema_version = tmt.getSchemaSyncer()->getCurrentVersion(); @@ -379,8 +379,8 @@ std::tuple DAGStorageInterpreter auto get_and_lock_storage = [&](bool schema_synced) -> std::tuple { /// Get storage in case it's dropped then re-created. // If schema synced, call getTable without try, leading to exception on table not existing. - auto storage_ = tmt.getStorages().get(table_id); - if (!storage_) + auto managed_storage = tmt.getStorages().get(table_id); + if (!managed_storage) { if (schema_synced) throw TiFlashException("Table " + std::to_string(table_id) + " doesn't exist.", Errors::Table::NotExists); @@ -388,14 +388,14 @@ std::tuple DAGStorageInterpreter return {nullptr, TableStructureLockHolder{}, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, false}; } - if (storage_->engineType() != ::TiDB::StorageEngine::TMT && storage_->engineType() != ::TiDB::StorageEngine::DT) + if (managed_storage->engineType() != ::TiDB::StorageEngine::TMT && managed_storage->engineType() != ::TiDB::StorageEngine::DT) { - throw TiFlashException("Specifying schema_version for non-managed storage: " + storage_->getName() - + ", table: " + storage_->getTableName() + ", id: " + DB::toString(table_id) + " is not allowed", + throw TiFlashException("Specifying schema_version for non-managed storage: " + managed_storage->getName() + + ", table: " + managed_storage->getTableName() + ", id: " + DB::toString(table_id) + " is not allowed", Errors::Coprocessor::Internal); } - auto lock = storage_->lockStructureForShare(context.getCurrentQueryId()); + auto lock = managed_storage->lockStructureForShare(context.getCurrentQueryId()); /// Check schema version, requiring TiDB/TiSpark and TiFlash both use exactly the same schema. // We have three schema versions, two in TiFlash: @@ -403,7 +403,7 @@ std::tuple DAGStorageInterpreter // 2. Global: the version that TiFlash global schema is at. // And one from TiDB/TiSpark: // 3. Query: the version that TiDB/TiSpark used for this query. - auto storage_schema_version = storage_->getTableInfo().schema_version; + auto storage_schema_version = managed_storage->getTableInfo().schema_version; // Not allow storage > query in any case, one example is time travel queries. if (storage_schema_version > query_schema_version) throw TiFlashException("Table " + std::to_string(table_id) + " schema version " + std::to_string(storage_schema_version) @@ -412,13 +412,13 @@ std::tuple DAGStorageInterpreter // From now on we have storage <= query. // If schema was synced, it implies that global >= query, as mentioned above we have storage <= query, we are OK to serve. if (schema_synced) - return {storage_, lock, storage_schema_version, true}; + return {managed_storage, lock, storage_schema_version, true}; // From now on the schema was not synced. // 1. storage == query, TiDB/TiSpark is using exactly the same schema that altered this table, we are just OK to serve. // 2. global >= query, TiDB/TiSpark is using a schema older than TiFlash global, but as mentioned above we have storage <= query, // meaning that the query schema is still newer than the time when this table was last altered, so we still OK to serve. if (storage_schema_version == query_schema_version || global_schema_version >= query_schema_version) - return {storage_, lock, storage_schema_version, true}; + return {managed_storage, lock, storage_schema_version, true}; // From now on we have global < query. // Return false for outer to sync and retry. return {nullptr, TableStructureLockHolder{}, storage_schema_version, false}; @@ -480,24 +480,38 @@ std::tuple, String> DAGS Errors::BroadcastJoin::TooManyColumns); } - Names required_columns_; - NamesAndTypes source_columns_; + Names table_scan_required_columns; + NamesAndTypes table_scan_source_columns; std::vector need_cast_column; need_cast_column.reserve(table_scan.columns_size()); - String handle_column_name_ = MutableSupport::tidb_pk_column_name; + String table_scan_handle_column_name = MutableSupport::tidb_pk_column_name; if (auto pk_handle_col = storage->getTableInfo().getPKHandleColumn()) - handle_column_name_ = pk_handle_col->get().name; + table_scan_handle_column_name = pk_handle_col->get().name; for (Int32 i = 0; i < table_scan.columns_size(); i++) { auto const & ci = table_scan.columns(i); ColumnID cid = ci.column_id(); - // Column ID -1 return the handle column - String name = cid == -1 ? handle_column_name_ : storage->getTableInfo().getColumnName(cid); - auto pair = storage->getColumns().getPhysical(name); - required_columns_.emplace_back(std::move(name)); - source_columns_.emplace_back(std::move(pair)); + // Column ID -1 return the handle column, -3 return the extra physical table id column + String name; + if (cid == TiDBPkColumnID) + name = table_scan_handle_column_name; + else if (cid == ExtraTableIDColumnID) + name = MutableSupport::extra_table_id_column_name; + else + name = storage->getTableInfo().getColumnName(cid); + if (cid == ExtraTableIDColumnID) + { + NameAndTypePair extra_table_id_column_pair = {name, MutableSupport::extra_table_id_column_type}; + table_scan_source_columns.emplace_back(std::move(extra_table_id_column_pair)); + } + else + { + auto pair = storage->getColumns().getPhysical(name); + table_scan_source_columns.emplace_back(std::move(pair)); + } + table_scan_required_columns.emplace_back(std::move(name)); if (cid != -1 && ci.tp() == TiDB::TypeTimestamp) need_cast_column.push_back(ExtraCastAfterTSMode::AppendTimeZoneCast); else if (cid != -1 && ci.tp() == TiDB::TypeTime) @@ -506,7 +520,7 @@ std::tuple, String> DAGS need_cast_column.push_back(ExtraCastAfterTSMode::None); } - return {required_columns_, source_columns_, need_cast_column, handle_column_name_}; + return {table_scan_required_columns, table_scan_source_columns, need_cast_column, table_scan_handle_column_name}; } std::tuple, std::optional> DAGStorageInterpreter::buildRemoteTS() @@ -541,7 +555,7 @@ std::tuple, std::optional> DAGStorage executor->set_tp(tipb::ExecType::TypeSelection); executor->set_executor_id(query_block.selection->executor_id()); auto * selection = executor->mutable_selection(); - for (auto & condition : query_block.selection->selection().conditions()) + for (const auto & condition : query_block.selection->selection().conditions()) *selection->add_conditions() = condition; executor = selection->mutable_child(); } @@ -568,7 +582,7 @@ std::tuple, std::optional> DAGStorage } else { - auto & col_info = table_info.getColumnInfo(col_id); + const auto & col_info = table_info.getColumnInfo(col_id); schema.emplace_back(std::make_pair(col_info.name, col_info)); } dag_req.add_output_offsets(i); diff --git a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h index 70fc50a0815..ed86b1b432e 100644 --- a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h @@ -36,6 +36,8 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream size_t expected_block_size_, bool is_raw_, bool do_range_filter_for_raw_, + const int extra_table_id_index, + const TableID physical_table_id, const LogWithPrefixPtr & log_) : dm_context(dm_context_) , task_pool(task_pool_) @@ -47,8 +49,16 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream , expected_block_size(expected_block_size_) , is_raw(is_raw_) , do_range_filter_for_raw(do_range_filter_for_raw_) + , extra_table_id_index(extra_table_id_index) + , physical_table_id(physical_table_id) , log(getMPPTaskLog(log_, NAME)) { + if (extra_table_id_index != InvalidColumnID) + { + ColumnDefine extra_table_id_col_define = getExtraTableIDColumnDefine(); + ColumnWithTypeAndName col{extra_table_id_col_define.type->createColumn(), extra_table_id_col_define.type, extra_table_id_col_define.name, extra_table_id_col_define.id, extra_table_id_col_define.default_value}; + header.insert(extra_table_id_index, col); + } } String getName() const override { return NAME; } @@ -102,6 +112,15 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream if (res) { + if (extra_table_id_index != InvalidColumnID) + { + ColumnDefine extra_table_id_col_define = getExtraTableIDColumnDefine(); + ColumnWithTypeAndName col{{}, extra_table_id_col_define.type, extra_table_id_col_define.name, extra_table_id_col_define.id}; + size_t row_number = res.rows(); + auto col_data = col.type->createColumnConst(row_number, Field(physical_table_id)); + col.column = std::move(col_data); + res.insert(extra_table_id_index, std::move(col)); + } if (!res.rows()) continue; else @@ -128,12 +147,15 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream const size_t expected_block_size; const bool is_raw; const bool do_range_filter_for_raw; + // position of the ExtraPhysTblID column in column_names parameter in the StorageDeltaMerge::read function. + const int extra_table_id_index; bool done = false; BlockInputStreamPtr cur_stream; SegmentPtr cur_segment; + TableID physical_table_id; LogWithPrefixPtr log; }; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h b/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h index aa067ecbd6d..23db40428d5 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h @@ -98,15 +98,18 @@ inline static const UInt64 INITIAL_EPOCH = 0; #define EXTRA_HANDLE_COLUMN_NAME ::DB::MutableSupport::tidb_pk_column_name #define VERSION_COLUMN_NAME ::DB::MutableSupport::version_column_name #define TAG_COLUMN_NAME ::DB::MutableSupport::delmark_column_name +#define EXTRA_TABLE_ID_COLUMN_NAME ::DB::MutableSupport::extra_table_id_column_name #define EXTRA_HANDLE_COLUMN_ID ::DB::TiDBPkColumnID #define VERSION_COLUMN_ID ::DB::VersionColumnID #define TAG_COLUMN_ID ::DB::DelMarkColumnID +#define EXTRA_TABLE_ID_COLUMN_ID ::DB::ExtraTableIDColumnID #define EXTRA_HANDLE_COLUMN_INT_TYPE ::DB::MutableSupport::tidb_pk_column_int_type #define EXTRA_HANDLE_COLUMN_STRING_TYPE ::DB::MutableSupport::tidb_pk_column_string_type #define VERSION_COLUMN_TYPE ::DB::MutableSupport::version_column_type #define TAG_COLUMN_TYPE ::DB::MutableSupport::delmark_column_type +#define EXTRA_TABLE_ID_COLUMN_TYPE ::DB::MutableSupport::extra_table_id_column_type inline const ColumnDefine & getExtraIntHandleColumnDefine() { @@ -134,6 +137,11 @@ inline const ColumnDefine & getTagColumnDefine() static ColumnDefine TAG_COLUMN_DEFINE_{TAG_COLUMN_ID, TAG_COLUMN_NAME, TAG_COLUMN_TYPE}; return TAG_COLUMN_DEFINE_; } +inline const ColumnDefine & getExtraTableIDColumnDefine() +{ + static ColumnDefine EXTRA_TABLE_ID_COLUMN_DEFINE_{EXTRA_TABLE_ID_COLUMN_ID, EXTRA_TABLE_ID_COLUMN_NAME, EXTRA_TABLE_ID_COLUMN_TYPE}; + return EXTRA_TABLE_ID_COLUMN_DEFINE_; +} static constexpr UInt64 MIN_UINT64 = std::numeric_limits::min(); static constexpr UInt64 MAX_UINT64 = std::numeric_limits::max(); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 7845f9c3932..01554d6846e 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -178,13 +178,15 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context, const ColumnDefine & handle, bool is_common_handle_, size_t rowkey_column_size_, - const Settings & settings_) + const Settings & settings_, + const TableID physical_table_id) : global_context(db_context.getGlobalContext()) , path_pool(global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name)) , settings(settings_) , storage_pool(db_name_ + "." + table_name_, path_pool, global_context, db_context.getSettingsRef()) , db_name(db_name_) , table_name(table_name_) + , physical_table_id(physical_table_id) , is_common_handle(is_common_handle_) , rowkey_column_size(rowkey_column_size_) , original_table_handle_define(handle) @@ -954,7 +956,8 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, const DB::Settings & db_settings, const ColumnDefines & columns_to_read, size_t num_streams, - const SegmentIdSet & read_segments) + const SegmentIdSet & read_segments, + size_t extra_table_id_index) { SegmentReadTasks tasks; @@ -995,6 +998,8 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, DEFAULT_BLOCK_SIZE, true, db_settings.dt_raw_filter_range, + extra_table_id_index, + physical_table_id, nullptr); res.push_back(stream); } @@ -1009,7 +1014,8 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, UInt64 max_version, const RSOperatorPtr & filter, size_t expected_block_size, - const SegmentIdSet & read_segments) + const SegmentIdSet & read_segments, + size_t extra_table_id_index) { auto dm_context = newDMContext(db_context, db_settings, db_context.getCurrentQueryId()); @@ -1038,6 +1044,8 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, expected_block_size, false, db_settings.dt_raw_filter_range, + extra_table_id_index, + physical_table_id, nullptr); res.push_back(stream); } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 71ed4c87cbb..186d623e168 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -280,7 +280,8 @@ class DeltaMergeStore : private boost::noncopyable const ColumnDefine & handle, bool is_common_handle_, size_t rowkey_column_size_, - const Settings & settings_ = EMPTY_SETTINGS); + const Settings & settings_ = EMPTY_SETTINGS, + const TableID physical_table_id = 0); ~DeltaMergeStore(); void setUpBackgroundTask(const DMContextPtr & dm_context); @@ -325,7 +326,8 @@ class DeltaMergeStore : private boost::noncopyable const DB::Settings & db_settings, const ColumnDefines & columns_to_read, size_t num_streams, - const SegmentIdSet & read_segments = {}); + const SegmentIdSet & read_segments = {}, + size_t extra_table_id_index = InvalidColumnID); /// Read rows with MVCC filtering /// `sorted_ranges` should be already sorted and merged @@ -337,7 +339,8 @@ class DeltaMergeStore : private boost::noncopyable UInt64 max_version, const RSOperatorPtr & filter, size_t expected_block_size = DEFAULT_BLOCK_SIZE, - const SegmentIdSet & read_segments = {}); + const SegmentIdSet & read_segments = {}, + size_t extra_table_id_index = InvalidColumnID); /// Force flush all data to disk. void flushCache(const Context & context, const RowKeyRange & range) @@ -436,6 +439,7 @@ class DeltaMergeStore : private boost::noncopyable String db_name; String table_name; + TableID physical_table_id; bool is_common_handle; size_t rowkey_column_size; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp index 464c7beed52..f44dd5d3163 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp @@ -579,6 +579,108 @@ TEST(StorageDeltaMergeInternalTest, RangeSplit) } } +TEST(StorageDeltaMergeTest, ReadExtraPhysicalTableID) +try +{ + // prepare block data + Block sample; + sample.insert(DB::tests::createColumn( + createNumbers(0, 100, /*reversed*/ true), + "col1")); + sample.insert(DB::tests::createColumn( + Strings(100, "a"), + "col2")); + + Context ctx = DMTestEnv::getContext(); + std::shared_ptr storage; + DataTypes data_types; + Names column_names; + // create table + { + NamesAndTypesList names_and_types_list{ + {"col1", std::make_shared()}, + {"col2", std::make_shared()}, + }; + for (const auto & name_type : names_and_types_list) + { + data_types.push_back(name_type.type); + column_names.push_back(name_type.name); + } + + const String path_name = DB::tests::TiFlashTestEnv::getTemporaryPath("StorageDeltaMerge_ReadWriteCase1"); + if (Poco::File path(path_name); path.exists()) + path.remove(true); + + // primary_expr_ast + const String table_name = "t_1233"; + ASTPtr astptr(new ASTIdentifier(table_name, ASTIdentifier::Kind::Table)); + astptr->children.emplace_back(new ASTIdentifier("col1")); + + TiDB::TableInfo tidb_table_info; + tidb_table_info.id = 1; + + storage = StorageDeltaMerge::create("TiFlash", + /* db_name= */ "default", + table_name, + tidb_table_info, + ColumnsDescription{names_and_types_list}, + astptr, + 0, + ctx); + storage->startup(); + } + + // test writing to DeltaMergeStorage + { + ASTPtr insertptr(new ASTInsertQuery()); + BlockOutputStreamPtr output = storage->write(insertptr, ctx.getSettingsRef()); + + output->writePrefix(); + output->write(sample); + output->writeSuffix(); + } + + // get read stream from DeltaMergeStorage + QueryProcessingStage::Enum stage2; + SelectQueryInfo query_info; + query_info.query = std::make_shared(); + query_info.mvcc_query_info = std::make_unique(ctx.getSettingsRef().resolve_locks, std::numeric_limits::max()); + Names read_columns = {"col1", EXTRA_TABLE_ID_COLUMN_NAME, "col2"}; + BlockInputStreams ins = storage->read(read_columns, query_info, ctx, stage2, 8192, 1); + ASSERT_EQ(ins.size(), 1); + BlockInputStreamPtr in = ins[0]; + in->readPrefix(); + + size_t num_rows_read = 0; + while (Block block = in->read()) + { + ASSERT_EQ(block.getByPosition(1).name, EXTRA_TABLE_ID_COLUMN_NAME); + num_rows_read += block.rows(); + for (auto & iter : block) + { + auto c = iter.column; + for (unsigned int i = 0; i < c->size(); i++) + { + if (iter.name == "col1") + { + ASSERT_EQ(c->getInt(i), i); + } + else if (iter.name == "col2") + { + ASSERT_EQ(c->getDataAt(i), "a"); + } + else if (iter.name == EXTRA_TABLE_ID_COLUMN_NAME) + { + ASSERT_EQ(c->getInt(i), 1); + } + } + } + } + in->readSuffix(); + ASSERT_EQ(num_rows_read, sample.rows()); + storage->drop(); +} +CATCH } // namespace tests } // namespace DM diff --git a/dbms/src/Storages/ITableDeclaration.cpp b/dbms/src/Storages/ITableDeclaration.cpp index b34ad7cfc87..87b76250589 100644 --- a/dbms/src/Storages/ITableDeclaration.cpp +++ b/dbms/src/Storages/ITableDeclaration.cpp @@ -1,34 +1,31 @@ -#include #include +#include +#include #include #include #include - -#include #include +#include namespace DB { - namespace ErrorCodes { - extern const int NO_SUCH_COLUMN_IN_TABLE; - extern const int EMPTY_LIST_OF_COLUMNS_QUERIED; - extern const int COLUMN_QUERIED_MORE_THAN_ONCE; - extern const int TYPE_MISMATCH; - extern const int DUPLICATE_COLUMN; - extern const int NOT_FOUND_COLUMN_IN_BLOCK; - extern const int EMPTY_LIST_OF_COLUMNS_PASSED; -} +extern const int NO_SUCH_COLUMN_IN_TABLE; +extern const int EMPTY_LIST_OF_COLUMNS_QUERIED; +extern const int COLUMN_QUERIED_MORE_THAN_ONCE; +extern const int TYPE_MISMATCH; +extern const int DUPLICATE_COLUMN; +extern const int NOT_FOUND_COLUMN_IN_BLOCK; +extern const int EMPTY_LIST_OF_COLUMNS_PASSED; +} // namespace ErrorCodes void ITableDeclaration::setColumns(ColumnsDescription columns_) { - if (columns_.ordinary.empty()) - throw Exception("Empty list of columns passed", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED); - columns = std::move(columns_); + setColumnsImpl(columns_); } @@ -48,7 +45,7 @@ Block ITableDeclaration::getSampleBlock() const Block res; for (const auto & col : boost::join(getColumns().ordinary, getColumns().materialized)) - res.insert({ col.type->createColumn(), col.type, col.name }); + res.insert({col.type->createColumn(), col.type, col.name}); return res; } @@ -59,7 +56,7 @@ Block ITableDeclaration::getSampleBlockNonMaterialized() const Block res; for (const auto & col : getColumns().ordinary) - res.insert({ col.type->createColumn(), col.type, col.name }); + res.insert({col.type->createColumn(), col.type, col.name}); return res; } @@ -71,8 +68,8 @@ Block ITableDeclaration::getSampleBlockForColumns(const Names & column_names) co for (const auto & name : column_names) { - auto col = getColumn(name); - res.insert({ col.type->createColumn(), col.type, name }); + auto col = name == MutableSupport::extra_table_id_column_name ? NameAndTypePair(name, MutableSupport::extra_table_id_column_type) : getColumn(name); + res.insert({col.type->createColumn(), col.type, name}); } return res; @@ -86,7 +83,7 @@ Block ITableDeclaration::getSampleBlockNoHidden() const for (const auto & col : boost::join(getColumns().ordinary, getColumns().materialized)) if (!hidden.has(col.name)) - res.insert({ col.type->createColumn(), col.type, col.name }); + res.insert({col.type->createColumn(), col.type, col.name}); return res; } @@ -99,7 +96,7 @@ Block ITableDeclaration::getSampleBlockNonMaterializedNoHidden() const for (const auto & col : getColumns().ordinary) if (!hidden.has(col.name)) - res.insert({ col.type->createColumn(), col.type, col.name }); + res.insert({col.type->createColumn(), col.type, col.name}); return res; } @@ -120,7 +117,10 @@ static std::string listOfColumns(const NamesAndTypesList & available_columns) using NamesAndTypesMap = google::dense_hash_map; -static NamesAndTypesMap & getColumnsMapImpl(NamesAndTypesMap & res) { return res; } +static NamesAndTypesMap & getColumnsMapImpl(NamesAndTypesMap & res) +{ + return res; +} template static NamesAndTypesMap & getColumnsMapImpl(NamesAndTypesMap & res, const Arg & arg, const Args &... args) @@ -149,7 +149,7 @@ void ITableDeclaration::check(const Names & column_names) const if (column_names.empty()) throw Exception("Empty list of columns queried. There are columns: " + listOfColumns(available_columns), - ErrorCodes::EMPTY_LIST_OF_COLUMNS_QUERIED); + ErrorCodes::EMPTY_LIST_OF_COLUMNS_QUERIED); const auto columns_map = getColumnsMap(available_columns); @@ -161,11 +161,11 @@ void ITableDeclaration::check(const Names & column_names) const { if (columns_map.end() == columns_map.find(name)) throw Exception("There is no column with name " + name + " in table. There are columns: " + listOfColumns(available_columns), - ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); + ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); if (unique_names.end() != unique_names.find(name)) throw Exception("Column " + name + " queried more than once", - ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE); + ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE); unique_names.insert(name); } } @@ -185,15 +185,17 @@ void ITableDeclaration::check(const NamesAndTypesList & provided_columns) const NamesAndTypesMap::const_iterator it = columns_map.find(column.name); if (columns_map.end() == it) throw Exception("There is no column with name " + column.name + ". There are columns: " - + listOfColumns(available_columns), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); + + listOfColumns(available_columns), + ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); if (!column.type->equals(*it->second)) throw Exception("Type mismatch for column " + column.name + ". Column has type " - + it->second->getName() + ", got type " + column.type->getName(), ErrorCodes::TYPE_MISMATCH); + + it->second->getName() + ", got type " + column.type->getName(), + ErrorCodes::TYPE_MISMATCH); if (unique_names.end() != unique_names.find(column.name)) throw Exception("Column " + column.name + " queried more than once", - ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE); + ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE); unique_names.insert(column.name); } } @@ -207,7 +209,7 @@ void ITableDeclaration::check(const NamesAndTypesList & provided_columns, const if (column_names.empty()) throw Exception("Empty list of columns queried. There are columns: " + listOfColumns(available_columns), - ErrorCodes::EMPTY_LIST_OF_COLUMNS_QUERIED); + ErrorCodes::EMPTY_LIST_OF_COLUMNS_QUERIED); using UniqueStrings = google::dense_hash_set; UniqueStrings unique_names; @@ -222,15 +224,17 @@ void ITableDeclaration::check(const NamesAndTypesList & provided_columns, const NamesAndTypesMap::const_iterator jt = available_columns_map.find(name); if (available_columns_map.end() == jt) throw Exception("There is no column with name " + name + ". There are columns: " - + listOfColumns(available_columns), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); + + listOfColumns(available_columns), + ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); if (it->second->getName() != jt->second->getName()) throw Exception("Type mismatch for column " + name + ". Column has type " - + jt->second->getName() + ", got type " + it->second->getName(), ErrorCodes::TYPE_MISMATCH); + + jt->second->getName() + ", got type " + it->second->getName(), + ErrorCodes::TYPE_MISMATCH); if (unique_names.end() != unique_names.find(name)) throw Exception("Column " + name + " queried more than once", - ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE); + ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE); unique_names.insert(name); } } @@ -257,19 +261,21 @@ void ITableDeclaration::check(const Block & block, bool need_all) const NamesAndTypesMap::const_iterator it = columns_map.find(column.name); if (columns_map.end() == it) throw Exception("There is no column with name " + column.name + ". There are columns: " - + listOfColumns(available_columns), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); + + listOfColumns(available_columns), + ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); if (!column.type->equals(*it->second)) throw Exception("Type mismatch for column " + column.name + ". Column has type " - + it->second->getName() + ", got type " + column.type->getName(), ErrorCodes::TYPE_MISMATCH); + + it->second->getName() + ", got type " + column.type->getName(), + ErrorCodes::TYPE_MISMATCH); } if (need_all && names_in_block.size() < columns_map.size()) { - for (NamesAndTypesList::const_iterator it = available_columns.begin(); it != available_columns.end(); ++it) + for (const auto & available_column : available_columns) { - if (!names_in_block.count(it->name)) - throw Exception("Expected column " + it->name, ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK); + if (!names_in_block.count(available_column.name)) + throw Exception("Expected column " + available_column.name, ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK); } } } @@ -277,7 +283,13 @@ void ITableDeclaration::check(const Block & block, bool need_all) const ITableDeclaration::ITableDeclaration(ColumnsDescription columns_) { - setColumns(std::move(columns_)); + setColumnsImpl(columns_); } +void ITableDeclaration::setColumnsImpl(ColumnsDescription columns_) +{ + if (columns_.ordinary.empty()) + throw Exception("Empty list of columns passed", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED); + columns = std::move(columns_); } +} // namespace DB diff --git a/dbms/src/Storages/ITableDeclaration.h b/dbms/src/Storages/ITableDeclaration.h index c62a77cc1db..9dcc46cbab9 100644 --- a/dbms/src/Storages/ITableDeclaration.h +++ b/dbms/src/Storages/ITableDeclaration.h @@ -5,7 +5,6 @@ namespace DB { - /** Description of the table. * Is not thread safe. See IStorage::lockStructure (). */ @@ -62,6 +61,7 @@ class ITableDeclaration { return empty_names; } + void setColumnsImpl(ColumnsDescription columns_); }; -} +} // namespace DB diff --git a/dbms/src/Storages/MutableSupport.cpp b/dbms/src/Storages/MutableSupport.cpp index e1d9fdc380a..0f10d8014a3 100644 --- a/dbms/src/Storages/MutableSupport.cpp +++ b/dbms/src/Storages/MutableSupport.cpp @@ -3,7 +3,6 @@ namespace DB { - const String MutableSupport::mmt_storage_name = "MutableMergeTree"; const String MutableSupport::txn_storage_name = "TxnMergeTree"; const String MutableSupport::delta_tree_storage_name = "DeltaMerge"; @@ -11,10 +10,13 @@ const String MutableSupport::delta_tree_storage_name = "DeltaMerge"; const String MutableSupport::tidb_pk_column_name = "_tidb_rowid"; const String MutableSupport::version_column_name = "_INTERNAL_VERSION"; const String MutableSupport::delmark_column_name = "_INTERNAL_DELMARK"; +const String MutableSupport::extra_table_id_column_name = "_tidb_tid"; const DataTypePtr MutableSupport::tidb_pk_column_int_type = DataTypeFactory::instance().get("Int64"); const DataTypePtr MutableSupport::tidb_pk_column_string_type = DataTypeFactory::instance().get("String"); const DataTypePtr MutableSupport::version_column_type = DataTypeFactory::instance().get("UInt64"); const DataTypePtr MutableSupport::delmark_column_type = DataTypeFactory::instance().get("UInt8"); +const DataTypePtr MutableSupport::extra_table_id_column_type = DataTypeFactory::instance().get("Int64"); +; } // namespace DB diff --git a/dbms/src/Storages/MutableSupport.h b/dbms/src/Storages/MutableSupport.h index 72e0eb534ee..048250280c0 100644 --- a/dbms/src/Storages/MutableSupport.h +++ b/dbms/src/Storages/MutableSupport.h @@ -54,11 +54,13 @@ class MutableSupport : public ext::Singleton static const String tidb_pk_column_name; static const String version_column_name; static const String delmark_column_name; + static const String extra_table_id_column_name; static const DataTypePtr tidb_pk_column_int_type; static const DataTypePtr tidb_pk_column_string_type; static const DataTypePtr version_column_type; static const DataTypePtr delmark_column_type; + static const DataTypePtr extra_table_id_column_type; /// mark that ColumnId of those columns are defined in dbms/src/Storages/Transaction/Types.h diff --git a/dbms/src/Storages/PrimaryKeyNotMatchException.h b/dbms/src/Storages/PrimaryKeyNotMatchException.h index f16cad063f6..51fcd230aab 100644 --- a/dbms/src/Storages/PrimaryKeyNotMatchException.h +++ b/dbms/src/Storages/PrimaryKeyNotMatchException.h @@ -9,22 +9,21 @@ class Logger; namespace DB { - class Context; -struct PrimaryKeyNotMatchException +struct PrimaryKeyNotMatchException : public std::exception { // The primary key name in definition const String pri_key; // The actual primary key name in TiDB::TableInfo const String actual_pri_key; PrimaryKeyNotMatchException(const String & pri_key_, const String & actual_pri_key_) - : pri_key(pri_key_), actual_pri_key(actual_pri_key_) + : pri_key(pri_key_) + , actual_pri_key(actual_pri_key_) {} }; // This function will replace the primary key and update statement in `table_metadata_path`. The correct statement will be return. -String fixCreateStatementWithPriKeyNotMatchException(Context & context, const String old_definition, const String & table_metadata_path, - const PrimaryKeyNotMatchException & ex, Poco::Logger * log); +String fixCreateStatementWithPriKeyNotMatchException(Context & context, const String old_definition, const String & table_metadata_path, const PrimaryKeyNotMatchException & ex, Poco::Logger * log); } // namespace DB diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index bce84934358..7e07fccde39 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -563,18 +563,24 @@ BlockInputStreams StorageDeltaMerge::read( // failed to parsed. ColumnDefines columns_to_read; auto header = store->getHeader(); - for (const auto & n : column_names) + size_t extra_table_id_index = InvalidColumnID; + for (size_t i = 0; i < column_names.size(); i++) { ColumnDefine col_define; - if (n == EXTRA_HANDLE_COLUMN_NAME) + if (column_names[i] == EXTRA_HANDLE_COLUMN_NAME) col_define = store->getHandle(); - else if (n == VERSION_COLUMN_NAME) + else if (column_names[i] == VERSION_COLUMN_NAME) col_define = getVersionColumnDefine(); - else if (n == TAG_COLUMN_NAME) + else if (column_names[i] == TAG_COLUMN_NAME) col_define = getTagColumnDefine(); + else if (column_names[i] == EXTRA_TABLE_ID_COLUMN_NAME) + { + extra_table_id_index = i; + continue; + } else { - auto & column = header->getByName(n); + auto & column = header->getByName(column_names[i]); col_define.name = column.name; col_define.id = column.column_id; col_define.type = column.type; @@ -592,7 +598,8 @@ BlockInputStreams StorageDeltaMerge::read( context.getSettingsRef(), columns_to_read, num_streams, - parseSegmentSet(select_query.segment_expression_list)); + parseSegmentSet(select_query.segment_expression_list), + extra_table_id_index); } // Read with MVCC filtering @@ -714,7 +721,8 @@ BlockInputStreams StorageDeltaMerge::read( /*max_version=*/mvcc_query_info.read_tso, rs_operator, max_block_size, - parseSegmentSet(select_query.segment_expression_list)); + parseSegmentSet(select_query.segment_expression_list), + extra_table_id_index); /// Ensure read_tso info after read. check_read_tso(mvcc_query_info.read_tso); @@ -798,7 +806,7 @@ size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM DM::RowKeyRange getRange(DM::DeltaMergeStorePtr & store, const Context & context, size_t total_rows, size_t delete_rows) { - auto start_index = rand() % (total_rows - delete_rows + 1); + auto start_index = random() % (total_rows - delete_rows + 1); DM::RowKeyRange range = DM::RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()); { @@ -931,7 +939,7 @@ static void updateDeltaMergeTableCreateStatement( const SortDescription & pk_names, const ColumnsDescription & columns, const OrderedNameSet & hidden_columns, - const OptionTableInfoConstRef table_info, + OptionTableInfoConstRef table_info, Timestamp tombstone, const Context & context); @@ -1179,7 +1187,8 @@ void StorageDeltaMerge::rename( std::move(handle_column_define), is_common_handle, rowkey_column_size, - settings); + settings, + tidb_table_info.id); } String StorageDeltaMerge::getTableName() const @@ -1418,7 +1427,7 @@ void StorageDeltaMerge::startup() tmt.getStorages().put(std::static_pointer_cast(shared_from_this())); } -void StorageDeltaMerge::shutdown() +void StorageDeltaMerge::shutdownImpl() { bool v = false; if (!shutdown_called.compare_exchange_strong(v, true)) @@ -1429,6 +1438,11 @@ void StorageDeltaMerge::shutdown() } } +void StorageDeltaMerge::shutdown() +{ + shutdownImpl(); +} + void StorageDeltaMerge::removeFromTMTContext() { // remove this table from TMTContext @@ -1439,7 +1453,7 @@ void StorageDeltaMerge::removeFromTMTContext() StorageDeltaMerge::~StorageDeltaMerge() { - shutdown(); + shutdownImpl(); } DataTypePtr StorageDeltaMerge::getPKTypeImpl() const @@ -1490,7 +1504,8 @@ DeltaMergeStorePtr & StorageDeltaMerge::getAndMaybeInitStore() std::move(table_column_info->handle_column_define), is_common_handle, rowkey_column_size, - DeltaMergeStore::Settings()); + DeltaMergeStore::Settings(), + tidb_table_info.id); table_column_info.reset(nullptr); store_inited.store(true, std::memory_order_release); } diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index d38feced258..b52bd568b70 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -168,6 +168,7 @@ class StorageDeltaMerge void updateTableColumnInfo(); DM::ColumnDefines getStoreColumnDefines() const; bool dataDirExist(); + void shutdownImpl(); #ifndef DBMS_PUBLIC_GTEST private: diff --git a/dbms/src/Storages/Transaction/Types.h b/dbms/src/Storages/Transaction/Types.h index 3ecb4bc6ba4..d14b2f4d0ba 100644 --- a/dbms/src/Storages/Transaction/Types.h +++ b/dbms/src/Storages/Transaction/Types.h @@ -7,7 +7,6 @@ namespace DB { - using TableID = Int64; using TableIDSet = std::unordered_set; @@ -24,6 +23,7 @@ enum : ColumnID { // Prevent conflict with TiDB. TiDBPkColumnID = -1, + ExtraTableIDColumnID = -3, VersionColumnID = -1024, DelMarkColumnID = -1025, InvalidColumnID = -10000, diff --git a/tests/fullstack-test/mpp/extra_physical_table_column.test b/tests/fullstack-test/mpp/extra_physical_table_column.test new file mode 100644 index 00000000000..0eac2281c70 --- /dev/null +++ b/tests/fullstack-test/mpp/extra_physical_table_column.test @@ -0,0 +1,65 @@ +# Preparation. +=> DBGInvoke __init_fail_point() + +mysql> drop table if exists test.employees; +mysql> CREATE TABLE test.employees ( id int(11) NOT NULL, fname varchar(30) DEFAULT NULL, lname varchar(30) DEFAULT NULL, hired date NOT NULL DEFAULT '1970-01-01', separated date DEFAULT '9999-12-31', job_code int(11) DEFAULT NULL, store_id int(11) NOT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin PARTITION BY RANGE (store_id) (PARTITION p0 VALUES LESS THAN (6), PARTITION p1 VALUES LESS THAN (11), PARTITION p2 VALUES LESS THAN (16), PARTITION p3 VALUES LESS THAN (21), PARTITION p4 VALUES LESS THAN (MAXVALUE)); +mysql> alter table test.employees set tiflash replica 1; + +func> wait_table test employees + +mysql> set tidb_partition_prune_mode=dynamic; begin; insert into test.employees values(100,'aa','aa','2020-01-01',null,10,10); select count(*) from test.employees; insert into test.employees values(100,'aa','aa','2020-01-01',null,10,23); select * from test.employees where store_id > 10; set session tidb_allow_batch_cop=2; select count(*) from test.employees; select * from test.employees where store_id > 10; commit; ++----------+ +| count(*) | ++----------+ +| 1 | ++----------+ ++-----+-------+-------+------------+-----------+----------+----------+ +| id | fname | lname | hired | separated | job_code | store_id | ++-----+-------+-------+------------+-----------+----------+----------+ +| 100 | aa | aa | 2020-01-01 | NULL | 10 | 23 | ++-----+-------+-------+------------+-----------+----------+----------+ ++----------+ +| count(*) | ++----------+ +| 2 | ++----------+ ++-----+-------+-------+------------+-----------+----------+----------+ +| id | fname | lname | hired | separated | job_code | store_id | ++-----+-------+-------+------------+-----------+----------+----------+ +| 100 | aa | aa | 2020-01-01 | NULL | 10 | 23 | ++-----+-------+-------+------------+-----------+----------+----------+ + +=> DBGInvoke __enable_fail_point(force_remote_read_for_batch_cop) + +mysql> drop table if exists test.employees; +mysql> CREATE TABLE test.employees ( id int(11) NOT NULL, fname varchar(30) DEFAULT NULL, lname varchar(30) DEFAULT NULL, hired date NOT NULL DEFAULT '1970-01-01', separated date DEFAULT '9999-12-31', job_code int(11) DEFAULT NULL, store_id int(11) NOT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin PARTITION BY RANGE (store_id) (PARTITION p0 VALUES LESS THAN (6), PARTITION p1 VALUES LESS THAN (11), PARTITION p2 VALUES LESS THAN (16), PARTITION p3 VALUES LESS THAN (21), PARTITION p4 VALUES LESS THAN (MAXVALUE)); +mysql> alter table test.employees set tiflash replica 1; + +func> wait_table test employees + +mysql> set tidb_partition_prune_mode=dynamic; begin; insert into test.employees values(100,'aa','aa','2020-01-01',null,10,10); select count(*) from test.employees; insert into test.employees values(100,'aa','aa','2020-01-01',null,10,23);select * from test.employees where store_id > 10; set session tidb_allow_batch_cop=2; select count(*) from test.employees; select * from test.employees where store_id > 10; commit; ++----------+ +| count(*) | ++----------+ +| 1 | ++----------+ ++-----+-------+-------+------------+-----------+----------+----------+ +| id | fname | lname | hired | separated | job_code | store_id | ++-----+-------+-------+------------+-----------+----------+----------+ +| 100 | aa | aa | 2020-01-01 | NULL | 10 | 23 | ++-----+-------+-------+------------+-----------+----------+----------+ ++----------+ +| count(*) | ++----------+ +| 2 | ++----------+ ++-----+-------+-------+------------+-----------+----------+----------+ +| id | fname | lname | hired | separated | job_code | store_id | ++-----+-------+-------+------------+-----------+----------+----------+ +| 100 | aa | aa | 2020-01-01 | NULL | 10 | 23 | ++-----+-------+-------+------------+-----------+----------+----------+ + +=> DBGInvoke __disable_fail_point(force_remote_read_for_batch_cop) + +# Clean up. +mysql> drop table if exists test.employees;