Skip to content

Commit

Permalink
set request schema version when do remote read (#4021) (#4027)
Browse files Browse the repository at this point in the history
close #3967
  • Loading branch information
ti-chi-bot authored Jun 14, 2022
1 parent 997f753 commit bfad86c
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 20 deletions.
2 changes: 1 addition & 1 deletion contrib/client-c
33 changes: 17 additions & 16 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ DAGQueryBlockInterpreter::DAGQueryBlockInterpreter(
{
if (query_block.selection != nullptr)
{
for (auto & condition : query_block.selection->selection().conditions())
for (const auto & condition : query_block.selection->selection().conditions())
conditions.push_back(&condition);
}
const Settings & settings = context.getSettingsRef();
Expand Down Expand Up @@ -222,7 +222,7 @@ AnalysisResult analyzeExpressions(
if (query_block.having != nullptr)
{
std::vector<const tipb::Expr *> having_conditions;
for (auto & c : query_block.having->selection().conditions())
for (const auto & c : query_block.having->selection().conditions())
having_conditions.push_back(&c);
analyzer.appendWhere(chain, having_conditions, res.having_column_name);
res.has_having = true;
Expand Down Expand Up @@ -295,7 +295,7 @@ ExpressionActionsPtr generateProjectExpressionActions(
{
auto columns = stream->getHeader();
NamesAndTypesList input_column;
for (auto & column : columns.getColumnsWithTypeAndName())
for (const auto & column : columns.getColumnsWithTypeAndName())
{
input_column.emplace_back(column.name, column.type);
}
Expand Down Expand Up @@ -454,7 +454,7 @@ void DAGQueryBlockInterpreter::executeJoin(const tipb::Join & join, DAGPipeline
throw TiFlashException("Join query block must have 2 input streams", Errors::BroadcastJoin::Internal);
}

auto & join_type_map = join.left_join_keys_size() == 0 ? cartesian_join_type_map : equal_join_type_map;
const auto & join_type_map = join.left_join_keys_size() == 0 ? cartesian_join_type_map : equal_join_type_map;
auto join_type_it = join_type_map.find(join.join_type());
if (join_type_it == join_type_map.end())
throw TiFlashException("Unknown join type in dag request", Errors::Coprocessor::BadRequest);
Expand Down Expand Up @@ -561,7 +561,7 @@ void DAGQueryBlockInterpreter::executeJoin(const tipb::Join & join, DAGPipeline
}

Names left_key_names, right_key_names;
String left_filter_column_name = "", right_filter_column_name = "";
String left_filter_column_name, right_filter_column_name;

/// add necessary transformation if the join key is an expression

Expand All @@ -585,7 +585,7 @@ void DAGQueryBlockInterpreter::executeJoin(const tipb::Join & join, DAGPipeline
swap_join_side ? join.left_conditions() : join.right_conditions(),
right_filter_column_name);

String other_filter_column_name = "", other_eq_filter_from_in_column_name = "";
String other_filter_column_name, other_eq_filter_from_in_column_name;
for (auto const & p : left_pipeline.streams[0]->getHeader().getNamesAndTypesList())
{
if (column_set_for_other_join_filter.find(p.name) == column_set_for_other_join_filter.end())
Expand All @@ -605,7 +605,7 @@ void DAGQueryBlockInterpreter::executeJoin(const tipb::Join & join, DAGPipeline
size_t max_block_size_for_cross_join = settings.max_block_size;
fiu_do_on(FailPoints::minimum_block_size_for_cross_join, { max_block_size_for_cross_join = 1; });

JoinPtr joinPtr = std::make_shared<Join>(
JoinPtr join_ptr = std::make_shared<Join>(
left_key_names,
right_key_names,
true,
Expand All @@ -624,11 +624,11 @@ void DAGQueryBlockInterpreter::executeJoin(const tipb::Join & join, DAGPipeline
// add a HashJoinBuildBlockInputStream to build a shared hash table
size_t stream_index = 0;
right_pipeline.transform(
[&](auto & stream) { stream = std::make_shared<HashJoinBuildBlockInputStream>(stream, joinPtr, stream_index++, log); });
[&](auto & stream) { stream = std::make_shared<HashJoinBuildBlockInputStream>(stream, join_ptr, stream_index++, log); });
executeUnion(right_pipeline, max_streams, log);

right_query.source = right_pipeline.firstStream();
right_query.join = joinPtr;
right_query.join = join_ptr;
right_query.join->setSampleBlock(right_query.source->getHeader());
dag.getDAGContext().getProfileStreamsMapForJoinBuildSide()[query_block.qb_join_subquery_alias].push_back(right_query.source);

Expand Down Expand Up @@ -765,10 +765,10 @@ void DAGQueryBlockInterpreter::executeExpression(DAGPipeline & pipeline, const E

void DAGQueryBlockInterpreter::executeUnion(DAGPipeline & pipeline, size_t max_streams, const LogWithPrefixPtr & log)
{
if (pipeline.streams.size() == 1 && pipeline.streams_with_non_joined_data.size() == 0)
if (pipeline.streams.size() == 1 && pipeline.streams_with_non_joined_data.empty())
return;
auto non_joined_data_stream = combinedNonJoinedDataStream(pipeline, max_streams, log);
if (pipeline.streams.size() > 0)
if (!pipeline.streams.empty())
{
pipeline.firstStream() = std::make_shared<UnionBlockInputStream<>>(pipeline.streams, non_joined_data_stream, max_streams, log);
pipeline.streams.resize(1);
Expand Down Expand Up @@ -912,7 +912,7 @@ void DAGQueryBlockInterpreter::executeRemoteQuery(DAGPipeline & pipeline)
throw TiFlashException("Remote query containing agg or limit or topN is not supported", Errors::Coprocessor::BadRequest);
const auto & ts = query_block.source->tbl_scan();
std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> key_ranges;
for (auto & range : ts.ranges())
for (const auto & range : ts.ranges())
{
std::string start_key(range.low());
DecodedTiKVKey start(std::move(start_key));
Expand All @@ -935,7 +935,7 @@ void DAGQueryBlockInterpreter::executeRemoteQuery(DAGPipeline & pipeline)
ColumnsWithTypeAndName columns;
BoolVec is_ts_column;
std::vector<NameAndTypePair> source_columns;
for (int i = 0; i < (int)query_block.output_field_types.size(); i++)
for (int i = 0; i < static_cast<int>(query_block.output_field_types.size()); i++)
{
dag_req.add_output_offsets(i);
ColumnInfo info = TiDB::fieldTypeToColumnInfo(query_block.output_field_types[i]);
Expand All @@ -962,6 +962,7 @@ void DAGQueryBlockInterpreter::executeRemoteQueryImpl(
dag_req.SerializeToString(&(req->data));
req->tp = pingcap::coprocessor::ReqType::DAG;
req->start_ts = context.getSettingsRef().read_tso;
req->schema_version = context.getSettingsRef().schema_version;
bool has_enforce_encode_type = dag_req.has_force_encode_type() && dag_req.force_encode_type();

pingcap::kv::Cluster * cluster = context.getTMTContext().getKVCluster();
Expand Down Expand Up @@ -1047,11 +1048,11 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
std::vector<NameAndTypePair> output_columns;
NamesWithAliases project_cols;
UniqueNameGenerator unique_name_generator;
for (auto & expr : query_block.source->projection().exprs())
for (const auto & expr : query_block.source->projection().exprs())
{
auto expr_name = dag_analyzer.getActions(expr, last_step.actions);
last_step.required_output.emplace_back(expr_name);
auto & col = last_step.actions->getSampleBlock().getByName(expr_name);
const auto & col = last_step.actions->getSampleBlock().getByName(expr_name);
String alias = unique_name_generator.toUniqueName(col.name);
output_columns.emplace_back(alias, col.type);
project_cols.emplace_back(col.name, alias);
Expand Down Expand Up @@ -1199,7 +1200,7 @@ BlockInputStreams DAGQueryBlockInterpreter::execute()
{
DAGPipeline pipeline;
executeImpl(pipeline);
if (pipeline.streams_with_non_joined_data.size() > 0)
if (!pipeline.streams_with_non_joined_data.empty())
{
size_t concurrency = pipeline.streams.size();
executeUnion(pipeline, max_streams, log);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ class DAGQueryBlockInterpreter
void executeLimit(DAGPipeline & pipeline);
void executeAggregation(
DAGPipeline & pipeline,
const ExpressionActionsPtr & expressionActionsPtr,
Names & aggregation_keys,
const ExpressionActionsPtr & expr,
Names & key_names,
TiDB::TiDBCollators & collators,
AggregateDescriptions & aggregate_descriptions,
AggregateDescriptions & aggregates,
bool is_final_agg);
void executeProject(DAGPipeline & pipeline, NamesWithAliases & project_cols);

Expand Down

0 comments on commit bfad86c

Please sign in to comment.