Skip to content

Commit

Permalink
Storages: Fix returned column types may not match in late-materialization (#9176)
Browse files Browse the repository at this point in the history

close #9175

Co-authored-by: JaySon <tshent@qq.com>
  • Loading branch information
JinheLin and JaySon-Huang authored Jul 3, 2024
1 parent 20d4616 commit 567bcb1
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 14 deletions.
3 changes: 1 addition & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ struct fmt::formatter<DB::DM::ColumnDefine>
template <typename FormatContext>
auto format(const DB::DM::ColumnDefine & cd, FormatContext & ctx) const -> decltype(ctx.out())
{
// Use '/' as separators because column names often have '_'.
return fmt::format_to(ctx.out(), "{}/{}/{}", cd.id, cd.name, cd.type->getName());
return fmt::format_to(ctx.out(), "{}/{}", cd.id, cd.type->getName());
}
};
31 changes: 19 additions & 12 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1222,9 +1222,10 @@ BlockInputStreams DeltaMergeStore::read(
GET_METRIC(tiflash_storage_read_tasks_count).Increment(tasks.size());
size_t final_num_stream = std::max(1, std::min(num_streams, tasks.size()));
auto read_mode = getReadMode(db_context, is_fast_scan, keep_order, filter);
const auto & final_columns_to_read = filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read;
auto read_task_pool = std::make_shared<SegmentReadTaskPool>(
extra_table_id_index,
columns_to_read,
final_columns_to_read,
filter,
start_ts,
expected_block_size,
Expand All @@ -1245,7 +1246,7 @@ BlockInputStreams DeltaMergeStore::read(
{
stream = std::make_shared<UnorderedInputStream>(
read_task_pool,
filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read,
final_columns_to_read,
extra_table_id_index,
log_tracing_id,
runtime_filter_list,
Expand All @@ -1257,7 +1258,7 @@ BlockInputStreams DeltaMergeStore::read(
dm_context,
read_task_pool,
after_segment_read,
filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read,
final_columns_to_read,
filter,
start_ts,
expected_block_size,
Expand All @@ -1271,14 +1272,17 @@ BlockInputStreams DeltaMergeStore::read(
LOG_INFO(
tracing_logger,
"Read create stream done, keep_order={} dt_enable_read_thread={} enable_read_thread={} "
"is_fast_scan={} is_push_down_filter_empty={} pool_id={} num_streams={}",
"is_fast_scan={} is_push_down_filter_empty={} pool_id={} num_streams={} columns_to_read={} "
"final_columns_to_read={}",
keep_order,
db_context.getSettingsRef().dt_enable_read_thread,
enable_read_thread,
is_fast_scan,
filter == nullptr || filter->before_where == nullptr,
read_task_pool->pool_id,
final_num_stream);
final_num_stream,
columns_to_read,
final_columns_to_read);

return res;
}
Expand Down Expand Up @@ -1329,9 +1333,10 @@ void DeltaMergeStore::read(
size_t final_num_stream
= enable_read_thread ? std::max(1, num_streams) : std::max(1, std::min(num_streams, tasks.size()));
auto read_mode = getReadMode(db_context, is_fast_scan, keep_order, filter);
const auto & final_columns_to_read = filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read;
auto read_task_pool = std::make_shared<SegmentReadTaskPool>(
extra_table_id_index,
columns_to_read,
final_columns_to_read,
filter,
start_ts,
expected_block_size,
Expand All @@ -1344,15 +1349,14 @@ void DeltaMergeStore::read(
dm_context->scan_context->resource_group_name);
dm_context->scan_context->read_mode = read_mode;

const auto & columns_after_cast = filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read;
if (enable_read_thread)
{
for (size_t i = 0; i < final_num_stream; ++i)
{
group_builder.addConcurrency(std::make_unique<UnorderedSourceOp>(
exec_context,
read_task_pool,
columns_after_cast,
final_columns_to_read,
extra_table_id_index,
log_tracing_id,
runtime_filter_list,
Expand All @@ -1368,7 +1372,7 @@ void DeltaMergeStore::read(
dm_context,
read_task_pool,
after_segment_read,
columns_after_cast,
final_columns_to_read,
filter,
start_ts,
expected_block_size,
Expand All @@ -1379,7 +1383,7 @@ void DeltaMergeStore::read(
builder.appendTransformOp(std::make_unique<AddExtraTableIDColumnTransformOp>(
exec_context,
log_tracing_id,
columns_after_cast,
final_columns_to_read,
extra_table_id_index,
physical_table_id));
});
Expand All @@ -1388,14 +1392,17 @@ void DeltaMergeStore::read(
LOG_INFO(
tracing_logger,
"Read create PipelineExec done, keep_order={} dt_enable_read_thread={} enable_read_thread={} "
"is_fast_scan={} is_push_down_filter_empty={} pool_id={} num_streams={}",
"is_fast_scan={} is_push_down_filter_empty={} pool_id={} num_streams={} columns_to_read={} "
"final_columns_to_read={}",
keep_order,
db_context.getSettingsRef().dt_enable_read_thread,
enable_read_thread,
is_fast_scan,
filter == nullptr || filter->before_where == nullptr,
read_task_pool->pool_id,
final_num_stream);
final_num_stream,
columns_to_read,
final_columns_to_read);
}

Remote::DisaggPhysicalTableReadSnapshotPtr DeltaMergeStore::writeNodeBuildRemoteReadSnapshot(
Expand Down
190 changes: 190 additions & 0 deletions tests/fullstack-test/expr/duration_filter_late_materialization2.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
# Copyright 2024 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

mysql> drop table if exists test.t;
mysql> create table if not exists test.t(a time(4), i int);

# insert more than 8192 rows to make sure filter conditions can be pushed down.
mysql> insert into test.t values('-000:10:10.123456', 1), ('000:11:11.123500', 2), ('000:12:12.123500', 3), ('000:13:13.123500', 4);
mysql> insert into test.t values('-001:10:10.123456', 1), ('001:11:11.123500', 2), ('001:12:12.123500', 3), ('001:13:13.123500', 4);
mysql> insert into test.t values('-002:10:10.123456', 1), ('002:11:11.123500', 2), ('002:12:12.123500', 3), ('002:13:13.123500', 4);
mysql> insert into test.t values('-003:10:10.123456', 1), ('003:11:11.123500', 2), ('003:12:12.123500', 3), ('003:13:13.123500', 4);
mysql> insert into test.t values('-004:10:10.123456', 1), ('004:11:11.123500', 2), ('004:12:12.123500', 3), ('004:13:13.123500', 4);
mysql> insert into test.t values('-005:10:10.123456', 1), ('005:11:11.123500', 2), ('005:12:12.123500', 3), ('005:13:13.123500', 4);
mysql> insert into test.t values('-006:10:10.123456', 1), ('006:11:11.123500', 2), ('006:12:12.123500', 3), ('006:13:13.123500', 4);
mysql> insert into test.t values('-007:10:10.123456', 1), ('007:11:11.123500', 2), ('007:12:12.123500', 3), ('007:13:13.123500', 4);
mysql> insert into test.t values('-008:10:10.123456', 1), ('008:11:11.123500', 2), ('008:12:12.123500', 3), ('008:13:13.123500', 4);
mysql> insert into test.t values('-009:10:10.123456', 1), ('009:11:11.123500', 2), ('009:12:12.123500', 3), ('009:13:13.123500', 4);
mysql> insert into test.t values('-010:10:10.123456', 1), ('010:11:11.123500', 2), ('010:12:12.123500', 3), ('010:13:13.123500', 4);
mysql> insert into test.t values('-011:10:10.123456', 1), ('011:11:11.123500', 2), ('011:12:12.123500', 3), ('011:13:13.123500', 4);
mysql> insert into test.t values('-012:10:10.123456', 1), ('012:11:11.123500', 2), ('012:12:12.123500', 3), ('012:13:13.123500', 4);
mysql> insert into test.t values('-013:10:10.123456', 1), ('013:11:11.123500', 2), ('013:12:12.123500', 3), ('013:13:13.123500', 4);
mysql> insert into test.t values('-014:10:10.123456', 1), ('014:11:11.123500', 2), ('014:12:12.123500', 3), ('014:13:13.123500', 4);
mysql> insert into test.t values('-015:10:10.123456', 1), ('015:11:11.123500', 2), ('015:12:12.123500', 3), ('015:13:13.123500', 4);
mysql> insert into test.t values('-016:10:10.123456', 1), ('016:11:11.123500', 2), ('016:12:12.123500', 3), ('016:13:13.123500', 4);
mysql> insert into test.t values('-017:10:10.123456', 1), ('017:11:11.123500', 2), ('017:12:12.123500', 3), ('017:13:13.123500', 4);
mysql> insert into test.t values('-018:10:10.123456', 1), ('018:11:11.123500', 2), ('018:12:12.123500', 3), ('018:13:13.123500', 4);
mysql> insert into test.t values('-019:10:10.123456', 1), ('019:11:11.123500', 2), ('019:12:12.123500', 3), ('019:13:13.123500', 4);
mysql> insert into test.t values('-020:10:10.123456', 1), ('020:11:11.123500', 2), ('020:12:12.123500', 3), ('020:13:13.123500', 4);
mysql> insert into test.t values('-021:10:10.123456', 1), ('021:11:11.123500', 2), ('021:12:12.123500', 3), ('021:13:13.123500', 4);
mysql> insert into test.t values('-022:10:10.123456', 1), ('022:11:11.123500', 2), ('022:12:12.123500', 3), ('022:13:13.123500', 4);
mysql> insert into test.t values('-023:10:10.123456', 1), ('023:11:11.123500', 2), ('023:12:12.123500', 3), ('023:13:13.123500', 4);
mysql> insert into test.t values('-024:10:10.123456', 1), ('024:11:11.123500', 2), ('024:12:12.123500', 3), ('024:13:13.123500', 4);
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;

mysql> alter table test.t set tiflash replica 1;

func> wait_table test t

mysql> set tidb_isolation_read_engines='tiflash'; select hour(a), i from test.t where a = '024:11:11.123500';
+---------+------+
| hour(a) | i |
+---------+------+
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
| 24 | 2 |
+---------+------+

mysql> drop table test.t;

0 comments on commit 567bcb1

Please sign in to comment.