Skip to content

Commit

Permalink
add GeneratedColumnPlaceholderInputStream (#6796)
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge committed Feb 13, 2023
1 parent 01a5ddd commit e84ed48
Show file tree
Hide file tree
Showing 9 changed files with 288 additions and 1 deletion.
97 changes: 97 additions & 0 deletions dbms/src/DataStreams/GeneratedColumnPlaceholderBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <Flash/Coprocessor/TiDBTableScan.h>

namespace DB
{
class GeneratedColumnPlaceholderBlockInputStream : public IProfilingBlockInputStream
{
public:
GeneratedColumnPlaceholderBlockInputStream(
const BlockInputStreamPtr & input,
const std::vector<std::tuple<UInt64, String, DataTypePtr>> & generated_column_infos_,
const String & req_id_)
: generated_column_infos(generated_column_infos_)
, log(Logger::get(req_id_))
{
children.push_back(input);
}

String getName() const override { return NAME; }
Block getHeader() const override
{
Block block = children.back()->getHeader();
insertColumns(block, /*insert_data=*/false);
return block;
}

static String getColumnName(UInt64 col_index)
{
return "generated_column_" + std::to_string(col_index);
}

protected:
void readPrefix() override
{
RUNTIME_CHECK(!generated_column_infos.empty());
// Validation check.
for (size_t i = 1; i < generated_column_infos.size(); ++i)
{
RUNTIME_CHECK(std::get<0>(generated_column_infos[i]) > std::get<0>(generated_column_infos[i - 1]));
}
}

Block readImpl() override
{
Block block = children.back()->read();
insertColumns(block, /*insert_data=*/true);
return block;
}

private:
void insertColumns(Block & block, bool insert_data) const
{
if (!block)
return;

for (const auto & ele : generated_column_infos)
{
const auto & col_index = std::get<0>(ele);
const auto & col_name = std::get<1>(ele);
const auto & data_type = std::get<2>(ele);
ColumnPtr column = nullptr;
if (insert_data)
column = data_type->createColumnConstWithDefaultValue(block.rows());
else
column = data_type->createColumn();
block.insert(col_index, ColumnWithTypeAndName{column, data_type, col_name});
}
}

static constexpr auto NAME = "GeneratedColumnPlaceholder";
const std::vector<std::tuple<UInt64, String, DataTypePtr>> generated_column_infos;
const LoggerPtr log;
};

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
pipeline.streams.insert(pipeline.streams.end(), mock_table_scan_streams.begin(), mock_table_scan_streams.end());
}

// Ignore handling GeneratedColumnPlaceholderBlockInputStream for now, because we don't support generated column in test framework.
}


Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Common/TiFlashMetrics.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/GeneratedColumnPlaceholderBlockInputStream.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/MultiplexInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
Expand Down Expand Up @@ -335,6 +336,8 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)

/// handle timezone/duration cast for local and remote table scan.
executeCastAfterTableScan(remote_read_streams_start_index, pipeline);
/// handle generated column if necessary.
executeGeneratedColumnPlaceholder(remote_read_streams_start_index, generated_column_infos, log, pipeline);
recordProfileStreams(pipeline, table_scan.getTableScanExecutorID());

/// handle filter conditions for local and remote table scan.
Expand Down Expand Up @@ -940,6 +943,15 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageIn
auto const & ci = table_scan.getColumns()[i];
const ColumnID cid = ci.id;

if (ci.hasGeneratedColumnFlag())
{
LOG_DEBUG(log, "got column({}) with generated column flag", i);
const auto & data_type = getDataTypeByColumnInfoForComputingLayer(ci);
const auto & col_name = GeneratedColumnPlaceholderBlockInputStream::getColumnName(i);
generated_column_infos.push_back(std::make_tuple(i, col_name, data_type));
source_columns_tmp.emplace_back(NameAndTypePair{col_name, data_type});
continue;
}
// Column ID -1 return the handle column
String name;
if (cid == TiDBPkColumnID)
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ class DAGStorageInterpreter
ManageableStoragePtr storage_for_logical_table;
Names required_columns;
NamesAndTypes source_columns;
// For generated column, just need a placeholder, and TiDB will fill this column.
std::vector<std::tuple<UInt64, String, DataTypePtr>> generated_column_infos;
};

} // namespace DB
18 changes: 18 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/GeneratedColumnPlaceholderBlockInputStream.h>
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/PartialSortingBlockInputStream.h>
#include <DataStreams/SharedQueryBlockInputStream.h>
Expand Down Expand Up @@ -221,4 +222,21 @@ void executePushedDownFilter(
stream->setExtraInfo("projection after push down filter");
}
}

void executeGeneratedColumnPlaceholder(
size_t remote_read_streams_start_index,
const std::vector<std::tuple<UInt64, String, DataTypePtr>> & generated_column_infos,
LoggerPtr log,
DAGPipeline & pipeline)
{
if (generated_column_infos.empty())
return;
assert(remote_read_streams_start_index <= pipeline.streams.size());
for (size_t i = 0; i < remote_read_streams_start_index; ++i)
{
auto & stream = pipeline.streams[i];
stream = std::make_shared<GeneratedColumnPlaceholderBlockInputStream>(stream, generated_column_infos, log->identifier());
stream->setExtraInfo("generated column placeholder above table scan");
}
}
} // namespace DB
6 changes: 6 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,10 @@ void executePushedDownFilter(
DAGExpressionAnalyzer & analyzer,
LoggerPtr log,
DAGPipeline & pipeline);

void executeGeneratedColumnPlaceholder(
size_t remote_read_streams_start_index,
const std::vector<std::tuple<UInt64, String, DataTypePtr>> & generated_column_infos,
LoggerPtr log,
DAGPipeline & pipeline);
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Coprocessor/MockSourceStream.h>
#include <Flash/Pipeline/Exec/PipelineExecBuilder.h>
#include <Flash/Planner/FinalizeHelper.h>
Expand Down Expand Up @@ -67,6 +68,7 @@ std::pair<NamesAndTypes, BlockInputStreams> mockSchemaAndStreams(
assert(!schema.empty());
assert(!mock_streams.empty());

// Ignore handling GeneratedColumnPlaceholderBlockInputStream for now, because we don't support generated column in test framework.
return {std::move(schema), std::move(mock_streams)};
}
} // namespace
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/Transaction/TiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ enum TP
M(NoDefaultValue, (1 << 12)) \
M(OnUpdateNow, (1 << 13)) \
M(PartKey, (1 << 14)) \
M(Num, (1 << 15))
M(Num, (1 << 15)) \
M(GeneratedColumn, (1 << 23))

enum ColumnFlag
{
Expand Down
147 changes: 147 additions & 0 deletions tests/fullstack-test/expr/generated_index.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# Copyright 2023 PingCAP, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

mysql> drop table if exists test.t;
mysql> create table test.t(c1 varchar(100), c2 varchar(100));
mysql> insert into test.t values('ABC', 'DEF');
mysql> alter table test.t set tiflash replica 1;
func> wait_table test t
mysql> alter table test.t add index idx2((lower(c2)));

mysql> select /*+ nth_plan(1) */ * from test.t where lower(test.t.c2) = 'def';
+------+------+
| c1 | c2 |
+------+------+
| ABC | DEF |
+------+------+
mysql> select /*+ nth_plan(2) */ * from test.t where lower(test.t.c2) = 'def';
+------+------+
| c1 | c2 |
+------+------+
| ABC | DEF |
+------+------+
mysql> select /*+ nth_plan(3) */ * from test.t where lower(test.t.c2) = 'def';
+------+------+
| c1 | c2 |
+------+------+
| ABC | DEF |
+------+------+
mysql> select /*+ nth_plan(4) */ * from test.t where lower(test.t.c2) = 'def';
+------+------+
| c1 | c2 |
+------+------+
| ABC | DEF |
+------+------+
mysql> select /*+ nth_plan(5) */ * from test.t where lower(test.t.c2) = 'def';
+------+------+
| c1 | c2 |
+------+------+
| ABC | DEF |
+------+------+
mysql> select /*+ nth_plan(6) */ * from test.t where lower(test.t.c2) = 'def';
+------+------+
| c1 | c2 |
+------+------+
| ABC | DEF |
+------+------+
mysql> select /*+ nth_plan(7) */ * from test.t where lower(test.t.c2) = 'def';
+------+------+
| c1 | c2 |
+------+------+
| ABC | DEF |
+------+------+
mysql> select /*+ nth_plan(8) */ * from test.t where lower(test.t.c2) = 'def';
+------+------+
| c1 | c2 |
+------+------+
| ABC | DEF |
+------+------+
mysql> select /*+ nth_plan(9) */ * from test.t where lower(test.t.c2) = 'def';
+------+------+
| c1 | c2 |
+------+------+
| ABC | DEF |
+------+------+
mysql> select /*+ nth_plan(10) */ * from test.t where lower(test.t.c2) = 'def';
+------+------+
| c1 | c2 |
+------+------+
| ABC | DEF |
+------+------+

mysql> drop table if exists test.t;
mysql> create table test.t(id int, value int);
mysql> alter table test.t set tiflash replica 1;
func> wait_table test t
mysql> create unique index uk on test.t((tidb_shard(id)), id);
mysql> select /*+ nth_paln(1) */ max(value) from test.t;
+------------+
| max(value) |
+------------+
| NULL |
+------------+
mysql> select /*+ nth_paln(2) */ max(value) from test.t;
+------------+
| max(value) |
+------------+
| NULL |
+------------+
mysql> select /*+ nth_paln(3) */ max(value) from test.t;
+------------+
| max(value) |
+------------+
| NULL |
+------------+
mysql> select /*+ nth_paln(4) */ max(value) from test.t;
+------------+
| max(value) |
+------------+
| NULL |
+------------+
mysql> select /*+ nth_paln(5) */ max(value) from test.t;
+------------+
| max(value) |
+------------+
| NULL |
+------------+
mysql> select /*+ nth_paln(6) */ max(value) from test.t;
+------------+
| max(value) |
+------------+
| NULL |
+------------+
mysql> select /*+ nth_paln(7) */ max(value) from test.t;
+------------+
| max(value) |
+------------+
| NULL |
+------------+
mysql> select /*+ nth_paln(8) */ max(value) from test.t;
+------------+
| max(value) |
+------------+
| NULL |
+------------+
mysql> select /*+ nth_paln(9) */ max(value) from test.t;
+------------+
| max(value) |
+------------+
| NULL |
+------------+
mysql> select /*+ nth_paln(10) */ max(value) from test.t;
+------------+
| max(value) |
+------------+
| NULL |
+------------+

0 comments on commit e84ed48

Please sign in to comment.