Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TiFlash support TiDB push down ExtraPhysTblID column #4203

Merged
merged 11 commits into from
Mar 11, 2022
68 changes: 41 additions & 27 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ MakeRegionQueryInfos(
mvcc_info.regions_query_info.clear();
RegionRetryList region_need_retry;
RegionException::RegionReadStatus status_res = RegionException::RegionReadStatus::OK;
for (auto & [id, r] : dag_region_infos)
for (const auto & [id, r] : dag_region_infos)
{
if (r.key_ranges.empty())
{
Expand Down Expand Up @@ -155,7 +155,7 @@ void DAGStorageInterpreter::execute(DAGPipeline & pipeline)
if (!mvcc_query_info->regions_query_info.empty())
doLocalRead(pipeline, settings.max_block_size);

for (auto & region_info : dag_context.getRegionsForRemoteRead())
for (const auto & region_info : dag_context.getRegionsForRemoteRead())
region_retry.emplace_back(region_info);

null_stream_if_empty = std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns));
Expand Down Expand Up @@ -267,7 +267,7 @@ void DAGStorageInterpreter::doLocalRead(DAGPipeline & pipeline, size_t max_block
RegionException::UnavailableRegions region_ids;
for (const auto & info : regions_info)
{
if (rand() % 100 > 50)
if (random() % 100 > 50)
region_ids.insert(info.region_id);
}
throw RegionException(std::move(region_ids), RegionException::RegionReadStatus::NOT_FOUND);
Expand Down Expand Up @@ -364,12 +364,12 @@ std::tuple<ManageableStoragePtr, TableStructureLockHolder> DAGStorageInterpreter
/// Get current schema version in schema syncer for a chance to shortcut.
if (unlikely(query_schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION))
{
auto storage_ = tmt.getStorages().get(table_id);
if (!storage_)
auto managed_storage = tmt.getStorages().get(table_id);
if (!managed_storage)
{
throw TiFlashException("Table " + std::to_string(table_id) + " doesn't exist.", Errors::Table::NotExists);
}
return {storage_, storage_->lockStructureForShare(context.getCurrentQueryId())};
return {managed_storage, managed_storage->lockStructureForShare(context.getCurrentQueryId())};
}

auto global_schema_version = tmt.getSchemaSyncer()->getCurrentVersion();
Expand All @@ -379,31 +379,31 @@ std::tuple<ManageableStoragePtr, TableStructureLockHolder> DAGStorageInterpreter
auto get_and_lock_storage = [&](bool schema_synced) -> std::tuple<ManageableStoragePtr, TableStructureLockHolder, Int64, bool> {
/// Get storage in case it's dropped then re-created.
// If schema synced, call getTable without try, leading to exception on table not existing.
auto storage_ = tmt.getStorages().get(table_id);
if (!storage_)
auto managed_storage = tmt.getStorages().get(table_id);
if (!managed_storage)
{
if (schema_synced)
throw TiFlashException("Table " + std::to_string(table_id) + " doesn't exist.", Errors::Table::NotExists);
else
return {nullptr, TableStructureLockHolder{}, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, false};
}

if (storage_->engineType() != ::TiDB::StorageEngine::TMT && storage_->engineType() != ::TiDB::StorageEngine::DT)
if (managed_storage->engineType() != ::TiDB::StorageEngine::TMT && managed_storage->engineType() != ::TiDB::StorageEngine::DT)
{
throw TiFlashException("Specifying schema_version for non-managed storage: " + storage_->getName()
+ ", table: " + storage_->getTableName() + ", id: " + DB::toString(table_id) + " is not allowed",
throw TiFlashException("Specifying schema_version for non-managed storage: " + managed_storage->getName()
+ ", table: " + managed_storage->getTableName() + ", id: " + DB::toString(table_id) + " is not allowed",
Errors::Coprocessor::Internal);
}

auto lock = storage_->lockStructureForShare(context.getCurrentQueryId());
auto lock = managed_storage->lockStructureForShare(context.getCurrentQueryId());

/// Check schema version, requiring TiDB/TiSpark and TiFlash both use exactly the same schema.
// We have three schema versions, two in TiFlash:
// 1. Storage: the version that this TiFlash table (storage) was last altered.
// 2. Global: the version that TiFlash global schema is at.
// And one from TiDB/TiSpark:
// 3. Query: the version that TiDB/TiSpark used for this query.
auto storage_schema_version = storage_->getTableInfo().schema_version;
auto storage_schema_version = managed_storage->getTableInfo().schema_version;
// Not allow storage > query in any case, one example is time travel queries.
if (storage_schema_version > query_schema_version)
throw TiFlashException("Table " + std::to_string(table_id) + " schema version " + std::to_string(storage_schema_version)
Expand All @@ -412,13 +412,13 @@ std::tuple<ManageableStoragePtr, TableStructureLockHolder> DAGStorageInterpreter
// From now on we have storage <= query.
// If schema was synced, it implies that global >= query, as mentioned above we have storage <= query, we are OK to serve.
if (schema_synced)
return {storage_, lock, storage_schema_version, true};
return {managed_storage, lock, storage_schema_version, true};
// From now on the schema was not synced.
// 1. storage == query, TiDB/TiSpark is using exactly the same schema that altered this table, we are just OK to serve.
// 2. global >= query, TiDB/TiSpark is using a schema older than TiFlash global, but as mentioned above we have storage <= query,
// meaning that the query schema is still newer than the time when this table was last altered, so we still OK to serve.
if (storage_schema_version == query_schema_version || global_schema_version >= query_schema_version)
return {storage_, lock, storage_schema_version, true};
return {managed_storage, lock, storage_schema_version, true};
// From now on we have global < query.
// Return false for outer to sync and retry.
return {nullptr, TableStructureLockHolder{}, storage_schema_version, false};
Expand Down Expand Up @@ -480,24 +480,38 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>, String> DAGS
Errors::BroadcastJoin::TooManyColumns);
}

Names required_columns_;
NamesAndTypes source_columns_;
Names table_scan_required_columns;
NamesAndTypes table_scan_source_columns;
std::vector<ExtraCastAfterTSMode> need_cast_column;
need_cast_column.reserve(table_scan.columns_size());
String handle_column_name_ = MutableSupport::tidb_pk_column_name;
String table_scan_handle_column_name = MutableSupport::tidb_pk_column_name;
if (auto pk_handle_col = storage->getTableInfo().getPKHandleColumn())
handle_column_name_ = pk_handle_col->get().name;
table_scan_handle_column_name = pk_handle_col->get().name;

for (Int32 i = 0; i < table_scan.columns_size(); i++)
{
auto const & ci = table_scan.columns(i);
ColumnID cid = ci.column_id();

// Column ID -1 return the handle column
String name = cid == -1 ? handle_column_name_ : storage->getTableInfo().getColumnName(cid);
auto pair = storage->getColumns().getPhysical(name);
required_columns_.emplace_back(std::move(name));
source_columns_.emplace_back(std::move(pair));
// Column ID -1 return the handle column, -3 return the extra physical table id column
String name;
if (cid == TiDBPkColumnID)
name = table_scan_handle_column_name;
else if (cid == ExtraTblIDColumnID)
name = MutableSupport::extra_tblid_column_name;
else
name = storage->getTableInfo().getColumnName(cid);
if (cid == ExtraTblIDColumnID)
{
NameAndTypePair extra_tbl_column_pair = {name, MutableSupport::extra_tblid_column_type};
table_scan_source_columns.emplace_back(std::move(extra_tbl_column_pair));
}
else
{
auto pair = storage->getColumns().getPhysical(name);
table_scan_source_columns.emplace_back(std::move(pair));
}
table_scan_required_columns.emplace_back(std::move(name));
if (cid != -1 && ci.tp() == TiDB::TypeTimestamp)
need_cast_column.push_back(ExtraCastAfterTSMode::AppendTimeZoneCast);
else if (cid != -1 && ci.tp() == TiDB::TypeTime)
Expand All @@ -506,7 +520,7 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>, String> DAGS
need_cast_column.push_back(ExtraCastAfterTSMode::None);
}

return {required_columns_, source_columns_, need_cast_column, handle_column_name_};
return {table_scan_required_columns, table_scan_source_columns, need_cast_column, table_scan_handle_column_name};
}

std::tuple<std::optional<tipb::DAGRequest>, std::optional<DAGSchema>> DAGStorageInterpreter::buildRemoteTS()
Expand Down Expand Up @@ -541,7 +555,7 @@ std::tuple<std::optional<tipb::DAGRequest>, std::optional<DAGSchema>> DAGStorage
executor->set_tp(tipb::ExecType::TypeSelection);
executor->set_executor_id(query_block.selection->executor_id());
auto * selection = executor->mutable_selection();
for (auto & condition : query_block.selection->selection().conditions())
for (const auto & condition : query_block.selection->selection().conditions())
*selection->add_conditions() = condition;
executor = selection->mutable_child();
}
Expand All @@ -568,7 +582,7 @@ std::tuple<std::optional<tipb::DAGRequest>, std::optional<DAGSchema>> DAGStorage
}
else
{
auto & col_info = table_info.getColumnInfo(col_id);
const auto & col_info = table_info.getColumnInfo(col_id);
schema.emplace_back(std::make_pair(col_info.name, col_info));
}
dag_req.add_output_offsets(i);
Expand Down
21 changes: 21 additions & 0 deletions dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
size_t expected_block_size_,
bool is_raw_,
bool do_range_filter_for_raw_,
const int extra_table_id_index,
const TableID physical_table_id,
const LogWithPrefixPtr & log_)
: dm_context(dm_context_)
, task_pool(task_pool_)
Expand All @@ -47,8 +49,16 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
, expected_block_size(expected_block_size_)
, is_raw(is_raw_)
, do_range_filter_for_raw(do_range_filter_for_raw_)
, extra_table_id_index(extra_table_id_index)
, physical_table_id(physical_table_id)
, log(getMPPTaskLog(log_, NAME))
{
if (extra_table_id_index != InvalidColumnID)
{
ColumnDefine extra_table_id_col_define = getExtraTblIDColumnDefine();
ColumnWithTypeAndName col{extra_table_id_col_define.type->createColumn(), extra_table_id_col_define.type, extra_table_id_col_define.name, extra_table_id_col_define.id, extra_table_id_col_define.default_value};
header.insert(extra_table_id_index, col);
}
}

String getName() const override { return NAME; }
Expand Down Expand Up @@ -102,6 +112,15 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream

if (res)
{
if (extra_table_id_index != InvalidColumnID)
{
ColumnDefine extra_table_id_col_define = getExtraTblIDColumnDefine();
ColumnWithTypeAndName col{{}, extra_table_id_col_define.type, extra_table_id_col_define.name, extra_table_id_col_define.id};
size_t row_number = res.rows();
auto col_data = col.type->createColumnConst(row_number, Field(physical_table_id));
col.column = std::move(col_data);
res.insert(extra_table_id_index, std::move(col));
}
if (!res.rows())
continue;
else
Expand All @@ -128,12 +147,14 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
const size_t expected_block_size;
const bool is_raw;
const bool do_range_filter_for_raw;
const int extra_table_id_index;
hehechen marked this conversation as resolved.
Show resolved Hide resolved

bool done = false;

BlockInputStreamPtr cur_stream;

SegmentPtr cur_segment;
TableID physical_table_id;

LogWithPrefixPtr log;
};
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,18 @@ inline static const UInt64 INITIAL_EPOCH = 0;
#define EXTRA_HANDLE_COLUMN_NAME ::DB::MutableSupport::tidb_pk_column_name
#define VERSION_COLUMN_NAME ::DB::MutableSupport::version_column_name
#define TAG_COLUMN_NAME ::DB::MutableSupport::delmark_column_name
#define EXTRA_TBLID_COLUMN_NAME ::DB::MutableSupport::extra_tblid_column_name
hehechen marked this conversation as resolved.
Show resolved Hide resolved

#define EXTRA_HANDLE_COLUMN_ID ::DB::TiDBPkColumnID
#define VERSION_COLUMN_ID ::DB::VersionColumnID
#define TAG_COLUMN_ID ::DB::DelMarkColumnID
#define EXTRA_TBLID_COLUMN_ID ::DB::ExtraTblIDColumnID

#define EXTRA_HANDLE_COLUMN_INT_TYPE ::DB::MutableSupport::tidb_pk_column_int_type
#define EXTRA_HANDLE_COLUMN_STRING_TYPE ::DB::MutableSupport::tidb_pk_column_string_type
#define VERSION_COLUMN_TYPE ::DB::MutableSupport::version_column_type
#define TAG_COLUMN_TYPE ::DB::MutableSupport::delmark_column_type
#define EXTRA_TBLID_COLUMN_TYPE ::DB::MutableSupport::extra_tblid_column_type

inline const ColumnDefine & getExtraIntHandleColumnDefine()
{
Expand Down Expand Up @@ -134,6 +137,11 @@ inline const ColumnDefine & getTagColumnDefine()
static ColumnDefine TAG_COLUMN_DEFINE_{TAG_COLUMN_ID, TAG_COLUMN_NAME, TAG_COLUMN_TYPE};
return TAG_COLUMN_DEFINE_;
}
inline const ColumnDefine & getExtraTblIDColumnDefine()
hehechen marked this conversation as resolved.
Show resolved Hide resolved
{
static ColumnDefine EXTRA_TABLE_ID_COLUMN_DEFINE_{EXTRA_TBLID_COLUMN_ID, EXTRA_TBLID_COLUMN_NAME, EXTRA_TBLID_COLUMN_TYPE};
return EXTRA_TABLE_ID_COLUMN_DEFINE_;
}

static constexpr UInt64 MIN_UINT64 = std::numeric_limits<UInt64>::min();
static constexpr UInt64 MAX_UINT64 = std::numeric_limits<UInt64>::max();
Expand Down
14 changes: 11 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,15 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context,
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
const Settings & settings_)
const Settings & settings_,
const TableID physical_table_id)
: global_context(db_context.getGlobalContext())
, path_pool(global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name))
, settings(settings_)
, storage_pool(db_name_ + "." + table_name_, path_pool, global_context, db_context.getSettingsRef())
, db_name(db_name_)
, table_name(table_name_)
, physical_table_id(physical_table_id)
, is_common_handle(is_common_handle_)
, rowkey_column_size(rowkey_column_size_)
, original_table_handle_define(handle)
Expand Down Expand Up @@ -956,7 +958,8 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
const DB::Settings & db_settings,
const ColumnDefines & columns_to_read,
size_t num_streams,
const SegmentIdSet & read_segments)
const SegmentIdSet & read_segments,
size_t extra_table_id_index)
{
SegmentReadTasks tasks;

Expand Down Expand Up @@ -997,6 +1000,8 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
DEFAULT_BLOCK_SIZE,
true,
db_settings.dt_raw_filter_range,
extra_table_id_index,
physical_table_id,
nullptr);
res.push_back(stream);
}
Expand All @@ -1011,7 +1016,8 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
UInt64 max_version,
const RSOperatorPtr & filter,
size_t expected_block_size,
const SegmentIdSet & read_segments)
const SegmentIdSet & read_segments,
size_t extra_table_id_index)
{
auto dm_context = newDMContext(db_context, db_settings, db_context.getCurrentQueryId());

Expand Down Expand Up @@ -1040,6 +1046,8 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
expected_block_size,
false,
db_settings.dt_raw_filter_range,
extra_table_id_index,
physical_table_id,
nullptr);
res.push_back(stream);
}
Expand Down
10 changes: 7 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ class DeltaMergeStore : private boost::noncopyable
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
const Settings & settings_ = EMPTY_SETTINGS);
const Settings & settings_ = EMPTY_SETTINGS,
const TableID physical_table_id = 0);
~DeltaMergeStore();

void setUpBackgroundTask(const DMContextPtr & dm_context);
Expand Down Expand Up @@ -325,7 +326,8 @@ class DeltaMergeStore : private boost::noncopyable
const DB::Settings & db_settings,
const ColumnDefines & columns_to_read,
size_t num_streams,
const SegmentIdSet & read_segments = {});
const SegmentIdSet & read_segments = {},
size_t extra_table_id_index = InvalidColumnID);

/// Read rows with MVCC filtering
/// `sorted_ranges` should be already sorted and merged
Expand All @@ -337,7 +339,8 @@ class DeltaMergeStore : private boost::noncopyable
UInt64 max_version,
const RSOperatorPtr & filter,
size_t expected_block_size = DEFAULT_BLOCK_SIZE,
const SegmentIdSet & read_segments = {});
const SegmentIdSet & read_segments = {},
size_t extra_table_id_index = InvalidColumnID);

/// Force flush all data to disk.
void flushCache(const Context & context, const RowKeyRange & range)
Expand Down Expand Up @@ -436,6 +439,7 @@ class DeltaMergeStore : private boost::noncopyable

String db_name;
String table_name;
TableID physical_table_id;

bool is_common_handle;
size_t rowkey_column_size;
Expand Down
Loading