Skip to content

Commit

Permalink
ignore delmark when add minmax for pk column (#4746) (#4757)
Browse files Browse the repository at this point in the history
close #4747
  • Loading branch information
ti-chi-bot authored Jun 21, 2022
1 parent 02dfb20 commit fe9eb32
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 4 deletions.
10 changes: 8 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ void DMFileWriter::writeColumn(ColId col_id, const IDataType & type, const IColu
auto & minmax_indexs = single_file_stream->minmax_indexs;
if (auto iter = minmax_indexs.find(stream_name); iter != minmax_indexs.end())
{
iter->second->addPack(column, del_mark);
// For EXTRA_HANDLE_COLUMN_ID, we ignore del_mark when add minmax index.
// Because we need all rows which satisfy a certain range when place delta index no matter whether the row is a delete row.
iter->second->addPack(column, col_id == EXTRA_HANDLE_COLUMN_ID ? nullptr : del_mark);
}

auto offset_in_compressed_block = single_file_stream->original_hashing.offset();
Expand Down Expand Up @@ -217,7 +219,11 @@ void DMFileWriter::writeColumn(ColId col_id, const IDataType & type, const IColu
const auto name = DMFile::getFileNameBase(col_id, substream);
auto & stream = column_streams.at(name);
if (stream->minmaxes)
stream->minmaxes->addPack(column, del_mark);
{
// For EXTRA_HANDLE_COLUMN_ID, we ignore del_mark when add minmax index.
// Because we need all rows which satisfy a certain range when place delta index no matter whether the row is a delete row.
stream->minmaxes->addPack(column, col_id == EXTRA_HANDLE_COLUMN_ID ? nullptr : del_mark);
}

/// There could already be enough data to compress into the new block.
if (stream->original_hashing.offset() >= options.min_compress_block_size)
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,10 @@ class DMTestEnv
* @param ts_beg `timestamp`'s value begin
* @param ts_end `timestamp`'s value end (not included)
* @param reversed increasing/decreasing insert `timestamp`'s value
* @param deleted if deleted is false, set `tag` to 0; otherwise set `tag` to 1
* @return
*/
static Block prepareBlockWithTso(Int64 pk, size_t ts_beg, size_t ts_end, bool reversed = false)
static Block prepareBlockWithTso(Int64 pk, size_t ts_beg, size_t ts_end, bool reversed = false, bool deleted = false)
{
Block block;
const size_t num_rows = (ts_end - ts_beg);
Expand Down Expand Up @@ -240,7 +241,7 @@ class DMTestEnv
column_data.resize(num_rows);
for (size_t i = 0; i < num_rows; ++i)
{
column_data[i] = 0;
column_data[i] = deleted ? 1 : 0;
}
tag_col.column = std::move(m_col);
}
Expand Down
36 changes: 36 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,42 @@ try
}
CATCH

TEST_F(Segment_test, WriteRead2)
try
{
const size_t num_rows_write = dmContext().stable_pack_rows;
{
// write a block with rows all deleted
Block block = DMTestEnv::prepareBlockWithTso(2, 100, 100 + num_rows_write, false, true);
segment->write(dmContext(), block);
// write not deleted rows with larger pk
Block block2 = DMTestEnv::prepareBlockWithTso(3, 100, 100 + num_rows_write, false, false);
segment->write(dmContext(), block2);

// flush segment and make sure there is two packs in stable
segment = segment->mergeDelta(dmContext(), tableColumns());
ASSERT_EQ(segment->getStable()->getPacks(), 2);
}

{
Block block = DMTestEnv::prepareBlockWithTso(1, 100, 100 + num_rows_write, false, false);
segment->write(dmContext(), block);
}

{
auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)});
size_t num_rows_read = 0;
in->readPrefix();
while (Block block = in->read())
{
num_rows_read += block.rows();
}
in->readSuffix();
// only write two visible pks
ASSERT_EQ(num_rows_read, 2);
}
}
CATCH

TEST_F(Segment_test, ReadWithMoreAdvacedDeltaIndex)
try
Expand Down

0 comments on commit fe9eb32

Please sign in to comment.