TiFlash support TiDB push down ExtraPhysTblID column (#4203)
close #4180
hehechen authored Mar 11, 2022
1 parent b887653 commit e8fae73
Showing 15 changed files with 347 additions and 93 deletions.
68 changes: 41 additions & 27 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
@@ -54,7 +54,7 @@ MakeRegionQueryInfos(
mvcc_info.regions_query_info.clear();
RegionRetryList region_need_retry;
RegionException::RegionReadStatus status_res = RegionException::RegionReadStatus::OK;
for (auto & [id, r] : dag_region_infos)
for (const auto & [id, r] : dag_region_infos)
{
if (r.key_ranges.empty())
{
@@ -155,7 +155,7 @@ void DAGStorageInterpreter::execute(DAGPipeline & pipeline)
if (!mvcc_query_info->regions_query_info.empty())
doLocalRead(pipeline, settings.max_block_size);

for (auto & region_info : dag_context.getRegionsForRemoteRead())
for (const auto & region_info : dag_context.getRegionsForRemoteRead())
region_retry.emplace_back(region_info);

null_stream_if_empty = std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns));
@@ -267,7 +267,7 @@ void DAGStorageInterpreter::doLocalRead(DAGPipeline & pipeline, size_t max_block
RegionException::UnavailableRegions region_ids;
for (const auto & info : regions_info)
{
if (rand() % 100 > 50)
if (random() % 100 > 50)
region_ids.insert(info.region_id);
}
throw RegionException(std::move(region_ids), RegionException::RegionReadStatus::NOT_FOUND);
@@ -364,12 +364,12 @@ std::tuple<ManageableStoragePtr, TableStructureLockHolder> DAGStorageInterpreter
/// Get current schema version in schema syncer for a chance to shortcut.
if (unlikely(query_schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION))
{
auto storage_ = tmt.getStorages().get(table_id);
if (!storage_)
auto managed_storage = tmt.getStorages().get(table_id);
if (!managed_storage)
{
throw TiFlashException("Table " + std::to_string(table_id) + " doesn't exist.", Errors::Table::NotExists);
}
return {storage_, storage_->lockStructureForShare(context.getCurrentQueryId())};
return {managed_storage, managed_storage->lockStructureForShare(context.getCurrentQueryId())};
}

auto global_schema_version = tmt.getSchemaSyncer()->getCurrentVersion();
@@ -379,31 +379,31 @@ std::tuple<ManageableStoragePtr, TableStructureLockHolder> DAGStorageInterpreter
auto get_and_lock_storage = [&](bool schema_synced) -> std::tuple<ManageableStoragePtr, TableStructureLockHolder, Int64, bool> {
/// Get storage in case it's dropped then re-created.
// If schema synced, call getTable without try, leading to exception on table not existing.
auto storage_ = tmt.getStorages().get(table_id);
if (!storage_)
auto managed_storage = tmt.getStorages().get(table_id);
if (!managed_storage)
{
if (schema_synced)
throw TiFlashException("Table " + std::to_string(table_id) + " doesn't exist.", Errors::Table::NotExists);
else
return {nullptr, TableStructureLockHolder{}, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, false};
}

if (storage_->engineType() != ::TiDB::StorageEngine::TMT && storage_->engineType() != ::TiDB::StorageEngine::DT)
if (managed_storage->engineType() != ::TiDB::StorageEngine::TMT && managed_storage->engineType() != ::TiDB::StorageEngine::DT)
{
throw TiFlashException("Specifying schema_version for non-managed storage: " + storage_->getName()
+ ", table: " + storage_->getTableName() + ", id: " + DB::toString(table_id) + " is not allowed",
throw TiFlashException("Specifying schema_version for non-managed storage: " + managed_storage->getName()
+ ", table: " + managed_storage->getTableName() + ", id: " + DB::toString(table_id) + " is not allowed",
Errors::Coprocessor::Internal);
}

auto lock = storage_->lockStructureForShare(context.getCurrentQueryId());
auto lock = managed_storage->lockStructureForShare(context.getCurrentQueryId());

/// Check schema version, requiring TiDB/TiSpark and TiFlash both use exactly the same schema.
// We have three schema versions, two in TiFlash:
// 1. Storage: the version that this TiFlash table (storage) was last altered.
// 2. Global: the version that TiFlash global schema is at.
// And one from TiDB/TiSpark:
// 3. Query: the version that TiDB/TiSpark used for this query.
auto storage_schema_version = storage_->getTableInfo().schema_version;
auto storage_schema_version = managed_storage->getTableInfo().schema_version;
// Not allow storage > query in any case, one example is time travel queries.
if (storage_schema_version > query_schema_version)
throw TiFlashException("Table " + std::to_string(table_id) + " schema version " + std::to_string(storage_schema_version)
Expand All @@ -412,13 +412,13 @@ std::tuple<ManageableStoragePtr, TableStructureLockHolder> DAGStorageInterpreter
// From now on we have storage <= query.
// If schema was synced, it implies that global >= query, as mentioned above we have storage <= query, we are OK to serve.
if (schema_synced)
return {storage_, lock, storage_schema_version, true};
return {managed_storage, lock, storage_schema_version, true};
// From now on the schema was not synced.
// 1. storage == query, TiDB/TiSpark is using exactly the same schema that altered this table, we are just OK to serve.
// 2. global >= query, TiDB/TiSpark is using a schema older than TiFlash global, but as mentioned above we have storage <= query,
// meaning that the query schema is still newer than the time when this table was last altered, so we still OK to serve.
if (storage_schema_version == query_schema_version || global_schema_version >= query_schema_version)
return {storage_, lock, storage_schema_version, true};
return {managed_storage, lock, storage_schema_version, true};
// From now on we have global < query.
// Return false for outer to sync and retry.
return {nullptr, TableStructureLockHolder{}, storage_schema_version, false};
@@ -480,24 +480,38 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>, String> DAGS
Errors::BroadcastJoin::TooManyColumns);
}

Names required_columns_;
NamesAndTypes source_columns_;
Names table_scan_required_columns;
NamesAndTypes table_scan_source_columns;
std::vector<ExtraCastAfterTSMode> need_cast_column;
need_cast_column.reserve(table_scan.columns_size());
String handle_column_name_ = MutableSupport::tidb_pk_column_name;
String table_scan_handle_column_name = MutableSupport::tidb_pk_column_name;
if (auto pk_handle_col = storage->getTableInfo().getPKHandleColumn())
handle_column_name_ = pk_handle_col->get().name;
table_scan_handle_column_name = pk_handle_col->get().name;

for (Int32 i = 0; i < table_scan.columns_size(); i++)
{
auto const & ci = table_scan.columns(i);
ColumnID cid = ci.column_id();

// Column ID -1 return the handle column
String name = cid == -1 ? handle_column_name_ : storage->getTableInfo().getColumnName(cid);
auto pair = storage->getColumns().getPhysical(name);
required_columns_.emplace_back(std::move(name));
source_columns_.emplace_back(std::move(pair));
// Column ID -1 return the handle column, -3 return the extra physical table id column
String name;
if (cid == TiDBPkColumnID)
name = table_scan_handle_column_name;
else if (cid == ExtraTableIDColumnID)
name = MutableSupport::extra_table_id_column_name;
else
name = storage->getTableInfo().getColumnName(cid);
if (cid == ExtraTableIDColumnID)
{
NameAndTypePair extra_table_id_column_pair = {name, MutableSupport::extra_table_id_column_type};
table_scan_source_columns.emplace_back(std::move(extra_table_id_column_pair));
}
else
{
auto pair = storage->getColumns().getPhysical(name);
table_scan_source_columns.emplace_back(std::move(pair));
}
table_scan_required_columns.emplace_back(std::move(name));
if (cid != -1 && ci.tp() == TiDB::TypeTimestamp)
need_cast_column.push_back(ExtraCastAfterTSMode::AppendTimeZoneCast);
else if (cid != -1 && ci.tp() == TiDB::TypeTime)
Expand All @@ -506,7 +520,7 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>, String> DAGS
need_cast_column.push_back(ExtraCastAfterTSMode::None);
}

return {required_columns_, source_columns_, need_cast_column, handle_column_name_};
return {table_scan_required_columns, table_scan_source_columns, need_cast_column, table_scan_handle_column_name};
}

std::tuple<std::optional<tipb::DAGRequest>, std::optional<DAGSchema>> DAGStorageInterpreter::buildRemoteTS()
@@ -541,7 +555,7 @@ std::tuple<std::optional<tipb::DAGRequest>, std::optional<DAGSchema>> DAGStorage
executor->set_tp(tipb::ExecType::TypeSelection);
executor->set_executor_id(query_block.selection->executor_id());
auto * selection = executor->mutable_selection();
for (auto & condition : query_block.selection->selection().conditions())
for (const auto & condition : query_block.selection->selection().conditions())
*selection->add_conditions() = condition;
executor = selection->mutable_child();
}
@@ -568,7 +582,7 @@ std::tuple<std::optional<tipb::DAGRequest>, std::optional<DAGSchema>> DAGStorage
}
else
{
auto & col_info = table_info.getColumnInfo(col_id);
const auto & col_info = table_info.getColumnInfo(col_id);
schema.emplace_back(std::make_pair(col_info.name, col_info));
}
dag_req.add_output_offsets(i);
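For readers following the first file's change: the new branch in getColumnsForTableScan dispatches on the column ID that TiDB pushes down, where -1 (TiDBPkColumnID) resolves to the primary-key handle column and -3 (ExtraTableIDColumnID) resolves to the extra physical table ID column. A minimal, self-contained sketch of that dispatch is below; the -1/-3 mapping mirrors the diff, while the helper names and the schema map are assumptions made purely for illustration.

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>
#include <unordered_map>

// Illustrative constants; the real ones are TiDBPkColumnID and ExtraTableIDColumnID.
constexpr int64_t kPkHandleColumnID = -1;      // handle column pushed down by TiDB
constexpr int64_t kExtraTableIDColumnID = -3;  // ExtraPhysTblID pushed down by TiDB

// Hypothetical stand-in for storage->getTableInfo().getColumnName(cid).
std::string lookupColumnName(const std::unordered_map<int64_t, std::string> & schema, int64_t cid)
{
    auto it = schema.find(cid);
    if (it == schema.end())
        throw std::runtime_error("unknown column id " + std::to_string(cid));
    return it->second;
}

// Mirrors the dispatch added in getColumnsForTableScan: special IDs first,
// then a regular schema lookup for ordinary columns.
std::string resolveScanColumnName(
    const std::unordered_map<int64_t, std::string> & schema,
    int64_t cid,
    const std::string & handle_column_name,
    const std::string & extra_table_id_column_name)
{
    if (cid == kPkHandleColumnID)
        return handle_column_name;
    if (cid == kExtraTableIDColumnID)
        return extra_table_id_column_name;
    return lookupColumnName(schema, cid);
}
```

Because the ExtraPhysTblID column is not part of the persisted table schema, the diff pairs it with a fixed name and type (MutableSupport::extra_table_id_column_name and extra_table_id_column_type) instead of resolving it through getColumns().getPhysical().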
22 changes: 22 additions & 0 deletions dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
@@ -36,6 +36,8 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
size_t expected_block_size_,
bool is_raw_,
bool do_range_filter_for_raw_,
const int extra_table_id_index,
const TableID physical_table_id,
const LogWithPrefixPtr & log_)
: dm_context(dm_context_)
, task_pool(task_pool_)
@@ -47,8 +49,16 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
, expected_block_size(expected_block_size_)
, is_raw(is_raw_)
, do_range_filter_for_raw(do_range_filter_for_raw_)
, extra_table_id_index(extra_table_id_index)
, physical_table_id(physical_table_id)
, log(getMPPTaskLog(log_, NAME))
{
if (extra_table_id_index != InvalidColumnID)
{
ColumnDefine extra_table_id_col_define = getExtraTableIDColumnDefine();
ColumnWithTypeAndName col{extra_table_id_col_define.type->createColumn(), extra_table_id_col_define.type, extra_table_id_col_define.name, extra_table_id_col_define.id, extra_table_id_col_define.default_value};
header.insert(extra_table_id_index, col);
}
}

String getName() const override { return NAME; }
@@ -102,6 +112,15 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream

if (res)
{
if (extra_table_id_index != InvalidColumnID)
{
ColumnDefine extra_table_id_col_define = getExtraTableIDColumnDefine();
ColumnWithTypeAndName col{{}, extra_table_id_col_define.type, extra_table_id_col_define.name, extra_table_id_col_define.id};
size_t row_number = res.rows();
auto col_data = col.type->createColumnConst(row_number, Field(physical_table_id));
col.column = std::move(col_data);
res.insert(extra_table_id_index, std::move(col));
}
if (!res.rows())
continue;
else
@@ -128,12 +147,15 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
const size_t expected_block_size;
const bool is_raw;
const bool do_range_filter_for_raw;
// position of the ExtraPhysTblID column in column_names parameter in the StorageDeltaMerge::read function.
const int extra_table_id_index;

bool done = false;

BlockInputStreamPtr cur_stream;

SegmentPtr cur_segment;
TableID physical_table_id;

LogWithPrefixPtr log;
};
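The readImpl change in this file materializes the ExtraPhysTblID column lazily: the constructor adds an empty placeholder column to the header at extra_table_id_index, and every block returned by the segment stream then gets a column whose rows all carry the store's physical_table_id. A rough sketch of that per-block splice is below; the Block and Column types are small stand-ins written for this example rather than the TiFlash classes, and the column name is an assumption.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Minimal stand-ins for the real block/column machinery, just to show the shape
// of the transformation.
struct Column
{
    std::string name;
    std::vector<int64_t> values; // the real column is typed and constant-encoded
};

struct Block
{
    std::vector<Column> columns;
    size_t rows() const { return columns.empty() ? 0 : columns.front().values.size(); }
    void insertAt(size_t position, Column col) { columns.insert(columns.begin() + position, std::move(col)); }
};

constexpr int kInvalidColumnIndex = -1; // stands in for InvalidColumnID

// Mirrors DMSegmentThreadInputStream::readImpl: if the query asked for the extra
// physical table ID column, splice in a column where every row holds the
// physical table id of the table the block came from.
void appendPhysicalTableID(Block & block, int extra_table_id_index, int64_t physical_table_id)
{
    if (extra_table_id_index == kInvalidColumnIndex)
        return; // the scan did not request ExtraPhysTblID
    Column col;
    col.name = "_tidb_tid"; // assumed name; the real one is MutableSupport::extra_table_id_column_name
    col.values.assign(block.rows(), physical_table_id);
    block.insertAt(static_cast<size_t>(extra_table_id_index), std::move(col));
}
```

The value is constant for a given DeltaMergeStore (one store per physical table), but the column still has to be rebuilt for each block so that its length matches the block's row count.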
8 changes: 8 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h
@@ -98,15 +98,18 @@ inline static const UInt64 INITIAL_EPOCH = 0;
#define EXTRA_HANDLE_COLUMN_NAME ::DB::MutableSupport::tidb_pk_column_name
#define VERSION_COLUMN_NAME ::DB::MutableSupport::version_column_name
#define TAG_COLUMN_NAME ::DB::MutableSupport::delmark_column_name
#define EXTRA_TABLE_ID_COLUMN_NAME ::DB::MutableSupport::extra_table_id_column_name

#define EXTRA_HANDLE_COLUMN_ID ::DB::TiDBPkColumnID
#define VERSION_COLUMN_ID ::DB::VersionColumnID
#define TAG_COLUMN_ID ::DB::DelMarkColumnID
#define EXTRA_TABLE_ID_COLUMN_ID ::DB::ExtraTableIDColumnID

#define EXTRA_HANDLE_COLUMN_INT_TYPE ::DB::MutableSupport::tidb_pk_column_int_type
#define EXTRA_HANDLE_COLUMN_STRING_TYPE ::DB::MutableSupport::tidb_pk_column_string_type
#define VERSION_COLUMN_TYPE ::DB::MutableSupport::version_column_type
#define TAG_COLUMN_TYPE ::DB::MutableSupport::delmark_column_type
#define EXTRA_TABLE_ID_COLUMN_TYPE ::DB::MutableSupport::extra_table_id_column_type

inline const ColumnDefine & getExtraIntHandleColumnDefine()
{
@@ -134,6 +137,11 @@ inline const ColumnDefine & getTagColumnDefine()
static ColumnDefine TAG_COLUMN_DEFINE_{TAG_COLUMN_ID, TAG_COLUMN_NAME, TAG_COLUMN_TYPE};
return TAG_COLUMN_DEFINE_;
}
inline const ColumnDefine & getExtraTableIDColumnDefine()
{
static ColumnDefine EXTRA_TABLE_ID_COLUMN_DEFINE_{EXTRA_TABLE_ID_COLUMN_ID, EXTRA_TABLE_ID_COLUMN_NAME, EXTRA_TABLE_ID_COLUMN_TYPE};
return EXTRA_TABLE_ID_COLUMN_DEFINE_;
}

static constexpr UInt64 MIN_UINT64 = std::numeric_limits<UInt64>::min();
static constexpr UInt64 MAX_UINT64 = std::numeric_limits<UInt64>::max();
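getExtraTableIDColumnDefine follows the same pattern as the neighbouring getExtraIntHandleColumnDefine and getTagColumnDefine helpers: a function-local static builds the (id, name, type) triple once and hands out a shared reference. A toy illustration of that pattern is below; the struct and the literal values are assumptions used only to show the shape, the real define comes from the macros added above.

```cpp
#include <cstdint>
#include <string>

// Toy version of the ColumnDefine triple used in DeltaMergeDefines.h:
// (column id, column name, column type). The real type member is a DataTypePtr.
struct ColumnDefineSketch
{
    int64_t id;
    std::string name;
    std::string type_name;
};

// Same shape as getExtraTableIDColumnDefine(): a function-local static is built
// once, is thread-safe since C++11, and is shared by every reader instead of
// being re-created per query.
inline const ColumnDefineSketch & getExtraTableIDColumnDefineSketch()
{
    static ColumnDefineSketch define{-3 /* assumed ExtraTableIDColumnID value */,
                                     "_tidb_tid" /* assumed column name */,
                                     "Int64" /* assumed column type */};
    return define;
}
```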
14 changes: 11 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -178,13 +178,15 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context,
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
const Settings & settings_)
const Settings & settings_,
const TableID physical_table_id)
: global_context(db_context.getGlobalContext())
, path_pool(global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name))
, settings(settings_)
, storage_pool(db_name_ + "." + table_name_, path_pool, global_context, db_context.getSettingsRef())
, db_name(db_name_)
, table_name(table_name_)
, physical_table_id(physical_table_id)
, is_common_handle(is_common_handle_)
, rowkey_column_size(rowkey_column_size_)
, original_table_handle_define(handle)
@@ -954,7 +956,8 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
const DB::Settings & db_settings,
const ColumnDefines & columns_to_read,
size_t num_streams,
const SegmentIdSet & read_segments)
const SegmentIdSet & read_segments,
size_t extra_table_id_index)
{
SegmentReadTasks tasks;

@@ -995,6 +998,8 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
DEFAULT_BLOCK_SIZE,
true,
db_settings.dt_raw_filter_range,
extra_table_id_index,
physical_table_id,
nullptr);
res.push_back(stream);
}
@@ -1009,7 +1014,8 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
UInt64 max_version,
const RSOperatorPtr & filter,
size_t expected_block_size,
const SegmentIdSet & read_segments)
const SegmentIdSet & read_segments,
size_t extra_table_id_index)
{
auto dm_context = newDMContext(db_context, db_settings, db_context.getCurrentQueryId());

@@ -1038,6 +1044,8 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
expected_block_size,
false,
db_settings.dt_raw_filter_range,
extra_table_id_index,
physical_table_id,
nullptr);
res.push_back(stream);
}
10 changes: 7 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
@@ -280,7 +280,8 @@ class DeltaMergeStore : private boost::noncopyable
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
const Settings & settings_ = EMPTY_SETTINGS);
const Settings & settings_ = EMPTY_SETTINGS,
const TableID physical_table_id = 0);
~DeltaMergeStore();

void setUpBackgroundTask(const DMContextPtr & dm_context);
@@ -325,7 +326,8 @@ class DeltaMergeStore : private boost::noncopyable
const DB::Settings & db_settings,
const ColumnDefines & columns_to_read,
size_t num_streams,
const SegmentIdSet & read_segments = {});
const SegmentIdSet & read_segments = {},
size_t extra_table_id_index = InvalidColumnID);

/// Read rows with MVCC filtering
/// `sorted_ranges` should be already sorted and merged
@@ -337,7 +339,8 @@ class DeltaMergeStore : private boost::noncopyable
UInt64 max_version,
const RSOperatorPtr & filter,
size_t expected_block_size = DEFAULT_BLOCK_SIZE,
const SegmentIdSet & read_segments = {});
const SegmentIdSet & read_segments = {},
size_t extra_table_id_index = InvalidColumnID);

/// Force flush all data to disk.
void flushCache(const Context & context, const RowKeyRange & range)
@@ -436,6 +439,7 @@ class DeltaMergeStore : private boost::noncopyable

String db_name;
String table_name;
TableID physical_table_id;

bool is_common_handle;
size_t rowkey_column_size;
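The widened read/readRaw signatures keep ExtraPhysTblID handling opt-in: extra_table_id_index defaults to InvalidColumnID, so existing callers are unchanged, and a caller that wants the column passes the position it occupies among the requested columns. A hedged sketch of how a caller such as StorageDeltaMerge::read might compute that index is below; the function and variable names are assumptions, only the parameter contract comes from this header.

```cpp
#include <string>
#include <vector>

constexpr int kInvalidColumnIndex = -1; // stands in for DB::InvalidColumnID

// Given the column names requested by the query, find where the extra physical
// table ID column sits, so DeltaMergeStore::read knows at which position to
// splice the constant column into every block it returns.
int findExtraTableIDIndex(const std::vector<std::string> & column_names,
                          const std::string & extra_table_id_column_name)
{
    for (size_t i = 0; i < column_names.size(); ++i)
    {
        if (column_names[i] == extra_table_id_column_name)
            return static_cast<int>(i);
    }
    return kInvalidColumnIndex; // column not requested: read() behaves as before
}
```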
(Diffs for the remaining 10 changed files are not shown here.)
