Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#6796
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
guo-shaoge authored and ti-chi-bot committed Feb 13, 2023
1 parent 9f45e26 commit 3834650
Show file tree
Hide file tree
Showing 9 changed files with 642 additions and 1 deletion.
97 changes: 97 additions & 0 deletions dbms/src/DataStreams/GeneratedColumnPlaceholderBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <Flash/Coprocessor/TiDBTableScan.h>

namespace DB
{
class GeneratedColumnPlaceholderBlockInputStream : public IProfilingBlockInputStream
{
public:
GeneratedColumnPlaceholderBlockInputStream(
const BlockInputStreamPtr & input,
const std::vector<std::tuple<UInt64, String, DataTypePtr>> & generated_column_infos_,
const String & req_id_)
: generated_column_infos(generated_column_infos_)
, log(Logger::get(req_id_))
{
children.push_back(input);
}

String getName() const override { return NAME; }
Block getHeader() const override
{
Block block = children.back()->getHeader();
insertColumns(block, /*insert_data=*/false);
return block;
}

static String getColumnName(UInt64 col_index)
{
return "generated_column_" + std::to_string(col_index);
}

protected:
void readPrefix() override
{
RUNTIME_CHECK(!generated_column_infos.empty());
// Validation check.
for (size_t i = 1; i < generated_column_infos.size(); ++i)
{
RUNTIME_CHECK(std::get<0>(generated_column_infos[i]) > std::get<0>(generated_column_infos[i - 1]));
}
}

Block readImpl() override
{
Block block = children.back()->read();
insertColumns(block, /*insert_data=*/true);
return block;
}

private:
void insertColumns(Block & block, bool insert_data) const
{
if (!block)
return;

for (const auto & ele : generated_column_infos)
{
const auto & col_index = std::get<0>(ele);
const auto & col_name = std::get<1>(ele);
const auto & data_type = std::get<2>(ele);
ColumnPtr column = nullptr;
if (insert_data)
column = data_type->createColumnConstWithDefaultValue(block.rows());
else
column = data_type->createColumn();
block.insert(col_index, ColumnWithTypeAndName{column, data_type, col_name});
}
}

static constexpr auto NAME = "GeneratedColumnPlaceholder";
const std::vector<std::tuple<UInt64, String, DataTypePtr>> generated_column_infos;
const LoggerPtr log;
};

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s
auto mock_table_scan_stream = std::make_shared<MockTableScanBlockInputStream>(columns_with_type_and_name, context.getSettingsRef().max_block_size);
pipeline.streams.emplace_back(mock_table_scan_stream);
}

// GeneratedColumnPlaceholderBlockInputStream is not added here for now, because the test framework does not support generated columns.
}

void DAGQueryBlockInterpreter::handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline)
Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Common/TiFlashMetrics.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/GeneratedColumnPlaceholderBlockInputStream.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/TiRemoteBlockInputStream.h>
Expand Down Expand Up @@ -300,6 +301,8 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)

/// handle timezone/duration cast for local and remote table scan.
executeCastAfterTableScan(remote_read_streams_start_index, pipeline);
/// handle generated column if necessary.
executeGeneratedColumnPlaceholder(remote_read_streams_start_index, generated_column_infos, log, pipeline);
recordProfileStreams(pipeline, table_scan.getTableScanExecutorID());

/// handle pushed down filter for local and remote table scan.
Expand Down Expand Up @@ -962,6 +965,15 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageIn
auto const & ci = table_scan.getColumns()[i];
ColumnID cid = ci.column_id();

if (ci.hasGeneratedColumnFlag())
{
LOG_DEBUG(log, "got column({}) with generated column flag", i);
const auto & data_type = getDataTypeByColumnInfoForComputingLayer(ci);
const auto & col_name = GeneratedColumnPlaceholderBlockInputStream::getColumnName(i);
generated_column_infos.push_back(std::make_tuple(i, col_name, data_type));
source_columns_tmp.emplace_back(NameAndTypePair{col_name, data_type});
continue;
}
// Column ID -1 return the handle column
String name;
if (cid == TiDBPkColumnID)
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ class DAGStorageInterpreter
ManageableStoragePtr storage_for_logical_table;
Names required_columns;
NamesAndTypes source_columns;
// Generated columns only need a placeholder here; TiDB will fill in the column.
std::vector<std::tuple<UInt64, String, DataTypePtr>> generated_column_infos;
};

} // namespace DB
173 changes: 173 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

<<<<<<< HEAD
=======
#include <Common/ThresholdUtils.h>
#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/GeneratedColumnPlaceholderBlockInputStream.h>
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/PartialSortingBlockInputStream.h>
>>>>>>> e84ed489e6 (add GeneratedColumnPlaceholderInputStream (#6796))
#include <DataStreams/SharedQueryBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
Expand Down Expand Up @@ -93,4 +103,167 @@ ExpressionActionsPtr generateProjectExpressionActions(
project->add(ExpressionAction::project(project_cols));
return project;
}
<<<<<<< HEAD
=======

/// Wraps each stream visited by `pipeline.transform` in an
/// ExpressionBlockInputStream that runs `expr_actions`, tagging the new stream
/// with `extra_info`. Does nothing when `expr_actions` is null or has no actions.
void executeExpression(
    DAGPipeline & pipeline,
    const ExpressionActionsPtr & expr_actions,
    const LoggerPtr & log,
    const String & extra_info)
{
    // Nothing to evaluate: leave the pipeline as it is.
    if (!expr_actions || expr_actions->getActions().empty())
        return;

    pipeline.transform([&](auto & stream) {
        stream = std::make_shared<ExpressionBlockInputStream>(stream, expr_actions, log->identifier());
        stream->setExtraInfo(extra_info);
    });
}

/// Sorts the pipeline by `order_descr` in two stages:
///   1. a PartialSortingBlockInputStream on top of each stream;
///   2. a MergeSortingBlockInputStream, either on top of each stream
///      (fine grained shuffle enabled) or on top of the single stream left
///      after `executeUnion` (fine grained shuffle disabled).
void orderStreams(
    DAGPipeline & pipeline,
    size_t max_streams,
    const SortDescription & order_descr,
    Int64 limit,
    bool enable_fine_grained_shuffle,
    const Context & context,
    const LoggerPtr & log)
{
    const Settings & settings = context.getSettingsRef();

    // Both merge-sort variants below spill with the same configuration.
    // todo use identifier_executor_id as the spill id
    const auto make_sort_spill_config = [&] {
        return SpillConfig(context.getTemporaryPath(), fmt::format("{}_sort", log->identifier()), settings.max_spilled_size_per_spill, context.getFileProvider());
    };

    // Stage 1: partial sort. The extra info stays empty unless fine grained shuffle is on.
    const String partial_sort_extra_info = enable_fine_grained_shuffle ? String(enableFineGrainedShuffleExtraInfo) : String();
    pipeline.transform([&](auto & stream) {
        stream = std::make_shared<PartialSortingBlockInputStream>(stream, order_descr, log->identifier(), limit);
        stream->setExtraInfo(partial_sort_extra_info);
    });

    // Stage 2 without fine grained shuffle: union first, then one merge sort.
    if (!enable_fine_grained_shuffle)
    {
        /// If there are several streams, we merge them into one
        executeUnion(pipeline, max_streams, log, false, "for partial order");

        /// Merge the sorted blocks.
        pipeline.firstStream() = std::make_shared<MergeSortingBlockInputStream>(
            pipeline.firstStream(),
            order_descr,
            settings.max_block_size,
            limit,
            settings.max_bytes_before_external_sort,
            make_sort_spill_config(),
            log->identifier());
        return;
    }

    // Stage 2 with fine grained shuffle: every stream gets its own merge sort,
    // with the external-sort threshold averaged over the number of streams.
    pipeline.transform([&](auto & stream) {
        stream = std::make_shared<MergeSortingBlockInputStream>(
            stream,
            order_descr,
            settings.max_block_size,
            limit,
            getAverageThreshold(settings.max_bytes_before_external_sort, pipeline.streams.size()),
            make_sort_spill_config(),
            log->identifier());
        stream->setExtraInfo(String(enableFineGrainedShuffleExtraInfo));
    });
}

/// Unions the pipeline's streams and, when the DAG context reports a subquery,
/// puts a CreatingSetsBlockInputStream on top of the first stream.
void executeCreatingSets(
    DAGPipeline & pipeline,
    const Context & context,
    size_t max_streams,
    const LoggerPtr & log)
{
    DAGContext & dag_context = *context.getDAGContext();

    /// add union to run in parallel if needed
    // Pick the union parameters first; the default is the plain non-MPP case.
    bool ignore_block = false;
    const char * union_extra_info = "for non mpp";
    if (unlikely(context.isExecutorTest() || context.isInterpreterTest()))
    {
        union_extra_info = "for test";
    }
    else if (context.isMPPTest())
    {
        ignore_block = true;
        union_extra_info = "for mpp test";
    }
    else if (dag_context.isMPPTask())
    {
        /// MPPTask do not need the returned blocks.
        ignore_block = true;
        union_extra_info = "for mpp";
    }
    executeUnion(pipeline, max_streams, log, ignore_block, union_extra_info);

    if (!dag_context.hasSubquery())
        return;

    // Transfer limits for the sets being created come from the settings.
    const Settings & settings = context.getSettingsRef();
    pipeline.firstStream() = std::make_shared<CreatingSetsBlockInputStream>(
        pipeline.firstStream(),
        std::move(dag_context.moveSubqueries()),
        SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode),
        log->identifier());
}

/// Builds the expression actions needed to apply pushed-down filter conditions.
///
/// Returns a tuple of:
///   - the actions of the chain step that `appendWhere` filled in (`before_where`),
///   - the filter column name returned by `appendWhere`,
///   - the actions of a second chain step that projects the analyzer's current
///     input columns onto themselves (`project_after_where`).
///
/// Asserts that `filter_conditions` has a value.
std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> buildPushDownFilter(
const FilterConditions & filter_conditions,
DAGExpressionAnalyzer & analyzer)
{
assert(filter_conditions.hasValue());

// First step: the where conditions. Capture its actions before adding the next step,
// because getLastActions() refers to whichever step is last at the time of the call.
ExpressionActionsChain chain;
analyzer.initChain(chain);
String filter_column_name = analyzer.appendWhere(chain, filter_conditions.conditions);
ExpressionActionsPtr before_where = chain.getLastActions();
chain.addStep();

// Second step: remove useless tmp columns and keep the schema of local streams and remote streams the same.
// Every current input column is required as output and projected under its own name.
NamesWithAliases project_cols;
for (const auto & col : analyzer.getCurrentInputColumns())
{
chain.getLastStep().required_output.push_back(col.name);
project_cols.emplace_back(col.name, col.name);
}
chain.getLastActions()->add(ExpressionAction::project(project_cols));
ExpressionActionsPtr project_after_where = chain.getLastActions();
// The action pointers captured above are returned after the chain is finalized and cleared.
chain.finalize();
chain.clear();

return {before_where, filter_column_name, project_after_where};
}

/// Applies the pushed-down filter to the streams at indexes
/// [0, remote_read_streams_start_index) of `pipeline.streams`: each one gets a
/// FilterBlockInputStream followed by a projecting ExpressionBlockInputStream.
/// Streams from `remote_read_streams_start_index` onwards are left untouched.
void executePushedDownFilter(
    size_t remote_read_streams_start_index,
    const FilterConditions & filter_conditions,
    DAGExpressionAnalyzer & analyzer,
    LoggerPtr log,
    DAGPipeline & pipeline)
{
    auto [where_actions, where_column_name, projection_actions] = ::DB::buildPushDownFilter(filter_conditions, analyzer);

    assert(remote_read_streams_start_index <= pipeline.streams.size());
    // for remote read, filter had been pushed down, don't need to execute again.
    size_t idx = 0;
    while (idx != remote_read_streams_start_index)
    {
        auto & local_stream = pipeline.streams[idx];
        ++idx;

        local_stream = std::make_shared<FilterBlockInputStream>(local_stream, where_actions, where_column_name, log->identifier());
        local_stream->setExtraInfo("push down filter");

        // after filter, do project action to keep the schema of local streams and remote streams the same.
        local_stream = std::make_shared<ExpressionBlockInputStream>(local_stream, projection_actions, log->identifier());
        local_stream->setExtraInfo("projection after push down filter");
    }
}

/// Puts a GeneratedColumnPlaceholderBlockInputStream on top of the streams at
/// indexes [0, remote_read_streams_start_index) of `pipeline.streams`.
/// Returns immediately when `generated_column_infos` is empty.
void executeGeneratedColumnPlaceholder(
    size_t remote_read_streams_start_index,
    const std::vector<std::tuple<UInt64, String, DataTypePtr>> & generated_column_infos,
    LoggerPtr log,
    DAGPipeline & pipeline)
{
    // No generated columns: the pipeline needs no placeholder streams.
    if (generated_column_infos.empty())
        return;

    assert(remote_read_streams_start_index <= pipeline.streams.size());
    size_t idx = 0;
    while (idx != remote_read_streams_start_index)
    {
        auto & local_stream = pipeline.streams[idx];
        ++idx;

        local_stream = std::make_shared<GeneratedColumnPlaceholderBlockInputStream>(local_stream, generated_column_infos, log->identifier());
        local_stream->setExtraInfo("generated column placeholder above table scan");
    }
}
>>>>>>> e84ed489e6 (add GeneratedColumnPlaceholderInputStream (#6796))
} // namespace DB
41 changes: 41 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,45 @@ ExpressionActionsPtr generateProjectExpressionActions(
const BlockInputStreamPtr & stream,
const Context & context,
const NamesWithAliases & project_cols);
<<<<<<< HEAD
=======

/// Adds an expression stream running `expr_actions` on top of the pipeline's streams.
/// `extra_info` is attached to each added stream and defaults to an empty string.
void executeExpression(
DAGPipeline & pipeline,
const ExpressionActionsPtr & expr_actions,
const LoggerPtr & log,
const String & extra_info = "");

/// Adds the sorting streams for `order_descr` and `limit` to the pipeline.
/// `enable_fine_grained_shuffle` selects how the sorted streams are merged.
void orderStreams(
DAGPipeline & pipeline,
size_t max_streams,
const SortDescription & order_descr,
Int64 limit,
bool enable_fine_grained_shuffle,
const Context & context,
const LoggerPtr & log);

/// Unions the pipeline's streams and adds a creating-sets stream for pending subqueries, if any.
void executeCreatingSets(
DAGPipeline & pipeline,
const Context & context,
size_t max_streams,
const LoggerPtr & log);

/// Builds the actions for pushed-down filter conditions.
/// Returns (actions to run before filtering, filter column name, projection actions to run after filtering).
std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> buildPushDownFilter(
const FilterConditions & filter_conditions,
DAGExpressionAnalyzer & analyzer);

/// Applies the pushed-down filter to the streams of `pipeline` that come before
/// index `remote_read_streams_start_index`; streams from that index on are not changed.
void executePushedDownFilter(
size_t remote_read_streams_start_index,
const FilterConditions & filter_conditions,
DAGExpressionAnalyzer & analyzer,
LoggerPtr log,
DAGPipeline & pipeline);

/// Adds a generated-column placeholder stream on top of the streams of `pipeline`
/// that come before index `remote_read_streams_start_index`.
/// Each entry of `generated_column_infos` is (column position, column name, data type).
void executeGeneratedColumnPlaceholder(
size_t remote_read_streams_start_index,
const std::vector<std::tuple<UInt64, String, DataTypePtr>> & generated_column_infos,
LoggerPtr log,
DAGPipeline & pipeline);
>>>>>>> e84ed489e6 (add GeneratedColumnPlaceholderInputStream (#6796))
} // namespace DB
Loading

0 comments on commit 3834650

Please sign in to comment.