Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: Skip filtering for filter column (#9361) #9481

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ Block LateMaterializationBlockInputStream::readImpl()
}
for (auto & col : filter_column_block)
{
if (col.name == filter_column_name)
continue;
col.column = col.column->filter(col_filter, passed_count);
}
}
Expand Down
248 changes: 248 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3885,6 +3885,254 @@ try
}
CATCH

<<<<<<< HEAD
=======

TEST_F(DeltaMergeStoreTest, RSResult)
try
{
auto log = Logger::get(GET_GTEST_FULL_NAME);
auto table_column_defines = DMTestEnv::getDefaultColumns();
ColumnDefine cd_time(1, "col_time", std::make_shared<DataTypeInt64>());
table_column_defines->push_back(cd_time);

store = reload(table_column_defines);

auto create_data = [&](Int64 start, Int64 limit) {
std::vector<Int64> v(limit, 0);
std::iota(v.begin(), v.end(), start); // start ... start + limit - 1
return v;
};

auto create_block = [&](UInt64 beg, UInt64 end, UInt64 ts) {
auto block = DMTestEnv::prepareSimpleWriteBlock(beg, end, false, ts);
auto time_data = create_data(0, end - beg);
auto col_time = createColumn<Int64>(time_data, cd_time.name, cd_time.id);
block.insert(col_time);
block.checkNumberOfRows();
return block;
};

auto check = [&](PushDownFilterPtr filter, RSResult expected_res, const std::vector<Int64> & expected_data) {
auto in = store->read(
*db_context,
db_context->getSettingsRef(),
store->getTableColumns(),
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* start_ts= */ std::numeric_limits<UInt64>::max(),
filter,
std::vector<RuntimeFilterPtr>{},
0,
"",
/* keep_order= */ false,
/* is_fast_scan= */ false,
/* expected_block_size= */ 1024)[0];

Int64 rows = 0;
in->readPrefix();
while (true)
{
auto b = in->read();
if (!b)
break;
rows += b.rows();
ASSERT_EQ(b.getRSResult(), expected_res) << fmt::format("{} vs {}", b.getRSResult(), expected_res);
const auto * v = toColumnVectorDataPtr<Int64>(b.getByName("col_time").column);
ASSERT_NE(v, nullptr);
ASSERT_EQ(v->size(), expected_data.size());
ASSERT_TRUE(std::equal(v->begin(), v->end(), expected_data.begin()))
<< fmt::format("{} vs {}", *v, expected_data);
}
in->readSuffix();
ASSERT_EQ(rows, expected_data.size());
};

const String table_info_json = R"json({
"cols":[
{"comment":"","default":null,"default_bit":null,"id":1,"name":{"L":"col_time","O":"col_time"},"offset":-1,"origin_default":null,"state":0,"type":{"Charset":null,"Collate":null,"Decimal":5,"Elems":null,"Flag":1,"Flen":0,"Tp":11}}
],
"pk_is_handle":false,"index_info":[],"is_common_handle":false,
"name":{"L":"t_111","O":"t_111"},"partition":null,
"comment":"Mocked.","id":30,"schema_version":-1,"state":0,"tiflash_replica":{"Count":0},"update_timestamp":1636471547239654
})json";

auto create_filter = [&](Int64 value) {
auto filter = generatePushDownFilter(
*db_context,
table_info_json,
fmt::format("select * from default.t_111 where col_time >= {}", value));
RUNTIME_CHECK(filter->extra_cast != nullptr);
RUNTIME_CHECK(filter->rs_operator != nullptr);
auto rs_unsupported = typeid_cast<const Unsupported *>(filter->rs_operator.get());
RUNTIME_CHECK(rs_unsupported == nullptr, filter->rs_operator->toDebugString());
RUNTIME_CHECK(filter->before_where != nullptr);
LOG_DEBUG(
log,
"value={} extra_cast={} rs_operator={} before_where={}",
value,
filter->extra_cast->dumpActions(),
filter->rs_operator->toDebugString(),
filter->before_where->dumpActions());
return filter;
};

DB::registerFunctions();

constexpr Int64 num_rows = 128;
auto filter_all = create_filter(0);
auto filter_all_data = create_data(0, num_rows);
auto filter_some = create_filter(64);
auto filter_some_data = create_data(64, num_rows - 64);
auto filter_none = create_filter(128);
auto filter_none_data = std::vector<Int64>{};

// Disable delta merge to ensure read data from delta
FailPointHelper::enableFailPoint(FailPoints::pause_before_dt_background_delta_merge);

auto block = create_block(0, num_rows, 1);
store->write(*db_context, db_context->getSettingsRef(), block);

LOG_DEBUG(log, "Check delta");
// Delta always return Some
check(filter_all, RSResult::Some, filter_all_data);
check(filter_some, RSResult::Some, filter_some_data);
check(filter_none, RSResult::Some, filter_none_data);

LOG_DEBUG(log, "Check stable");
FailPointHelper::disableFailPoint(FailPoints::pause_before_dt_background_delta_merge);
store->mergeDeltaAll(*db_context);
check(filter_all, RSResult::All, filter_all_data);
check(filter_some, RSResult::Some, filter_some_data);
check(filter_none, RSResult::Some, filter_none_data);
}
CATCH

TEST_F(DeltaMergeStoreTest, LMAllWithMultiVersionRecords)
try
{
auto log = Logger::get(GET_GTEST_FULL_NAME);
auto table_column_defines = DMTestEnv::getDefaultColumns();
ColumnDefine cd_time(1, "col_time", std::make_shared<DataTypeInt64>());
ColumnDefine cd_int(2, "col_int", std::make_shared<DataTypeInt64>());
table_column_defines->push_back(cd_time);
table_column_defines->push_back(cd_int);

store = reload(table_column_defines);

auto create_data = [&](Int64 start, Int64 limit) {
std::vector<Int64> v(limit, 0);
std::iota(v.begin(), v.end(), start); // start ... start + limit - 1
return v;
};

auto create_block = [&](UInt64 beg, UInt64 end, UInt64 ts) {
auto block = DMTestEnv::prepareSimpleWriteBlock(beg, end, false, ts);
auto data = create_data(0, end - beg);
block.insert(createColumn<Int64>(data, cd_time.name, cd_time.id));
block.insert(createColumn<Int64>(data, cd_int.name, cd_int.id));
block.checkNumberOfRows();
return block;
};

auto check = [&](PushDownFilterPtr filter, RSResult expected_res, const std::vector<Int64> & expected_data) {
auto in = store->read(
*db_context,
db_context->getSettingsRef(),
store->getTableColumns(),
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* start_ts= */ std::numeric_limits<UInt64>::max(),
filter,
std::vector<RuntimeFilterPtr>{},
0,
"",
/* keep_order= */ false,
/* is_fast_scan= */ false,
/* expected_block_size= */ 1024)[0];

Int64 rows = 0;
in->readPrefix();
while (true)
{
auto b = in->read();
if (!b)
break;
rows += b.rows();
ASSERT_EQ(b.getRSResult(), expected_res) << fmt::format("{} vs {}", b.getRSResult(), expected_res);
const auto * v = toColumnVectorDataPtr<Int64>(b.getByName("col_time").column);
ASSERT_NE(v, nullptr);
ASSERT_EQ(v->size(), expected_data.size());
ASSERT_TRUE(std::equal(v->begin(), v->end(), expected_data.begin()))
<< fmt::format("{} vs {}", *v, expected_data);
}
in->readSuffix();
ASSERT_EQ(rows, expected_data.size());
};

const String table_info_json = R"json({
"cols":[
{"comment":"","default":null,"default_bit":null,"id":1,"name":{"L":"col_time","O":"col_time"},"offset":-1,"origin_default":null,"state":0,"type":{"Charset":null,"Collate":null,"Decimal":5,"Elems":null,"Flag":1,"Flen":0,"Tp":11}},
{"comment":"","default":null,"default_bit":null,"id":2,"name":{"L":"col_int","O":"col_int"},"offset":-1,"origin_default":null,"state":0,"type":{"Charset":null,"Collate":null,"Decimal":5,"Elems":null,"Flag":1,"Flen":0,"Tp":8}}
],
"pk_is_handle":false,"index_info":[],"is_common_handle":false,
"name":{"L":"t_111","O":"t_111"},"partition":null,
"comment":"Mocked.","id":30,"schema_version":-1,"state":0,"tiflash_replica":{"Count":0},"update_timestamp":1636471547239654
})json";

auto create_filter = [&](Int64 value) {
auto filter = generatePushDownFilter(
*db_context,
table_info_json,
fmt::format("select * from default.t_111 where col_time >= {}", value));
RUNTIME_CHECK(filter->extra_cast != nullptr);
RUNTIME_CHECK(filter->rs_operator != nullptr);
auto rs_unsupported = typeid_cast<const Unsupported *>(filter->rs_operator.get());
RUNTIME_CHECK(rs_unsupported == nullptr, filter->rs_operator->toDebugString());
RUNTIME_CHECK(filter->before_where != nullptr);
LOG_DEBUG(
log,
"value={} extra_cast={} rs_operator={} before_where={}",
value,
filter->extra_cast->dumpActions(),
filter->rs_operator->toDebugString(),
filter->before_where->dumpActions());
return filter;
};

DB::registerFunctions();

constexpr Int64 num_rows = 128;
auto filter_all = create_filter(0);
auto filter_all_data = create_data(0, num_rows);

// Write multi-version records.
{
auto block = create_block(0, num_rows, 1);
store->write(*db_context, db_context->getSettingsRef(), block);
}
{
auto block = create_block(0, num_rows, 2);
store->write(*db_context, db_context->getSettingsRef(), block);
}
store->mergeDeltaAll(*db_context);

// Ensure multi-version records.
ASSERT_EQ(store->id_to_segment.size(), 1);
auto seg = store->id_to_segment.begin()->second;
seg->stable->calculateStableProperty(
*store->newDMContext(*db_context, db_context->getSettingsRef()),
RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()),
store->isCommonHandle());
const auto & property = seg->stable->getStableProperty();
ASSERT_EQ(property.num_versions, num_rows * 2);
ASSERT_EQ(property.num_puts, num_rows);

check(filter_all, RSResult::All, filter_all_data);
}
CATCH

>>>>>>> 55cb9b9af1 (Storages: Skip filtering for filter column (#9361))
} // namespace tests
} // namespace DM
} // namespace DB