Skip to content

Commit

Permalink
Always use readable pathname in DMFileReader (#5958) (#5983)
Browse files Browse the repository at this point in the history
close #5956
  • Loading branch information
ti-chi-bot authored Sep 22, 2022
1 parent f01f4b1 commit 6a305d3
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 11 deletions.
7 changes: 1 addition & 6 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,7 @@ DMFileBlockInputStreamBuilder::DMFileBlockInputStreamBuilder(const Context & con

DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr & dmfile, const ColumnDefines & read_columns, const RowKeyRanges & rowkey_ranges)
{
if (dmfile->getStatus() != DMFile::Status::READABLE)
throw Exception(fmt::format(
"DMFile [{}] is expected to be in READABLE status, but: {}",
dmfile->fileId(),
DMFile::statusString(dmfile->getStatus())),
ErrorCodes::LOGICAL_ERROR);
RUNTIME_CHECK(dmfile->getStatus() == DMFile::Status::READABLE, dmfile->fileId(), DMFile::statusString(dmfile->getStatus()));

// if `rowkey_ranges` is empty, we unconditionally read all packs
// `rowkey_ranges` and `is_common_handle` will only be useful in clean read mode.
Expand Down
8 changes: 3 additions & 5 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,11 @@ class DMFileReader
/// Return false if it is the end of stream.
bool getSkippedRows(size_t & skip_rows);
Block read();
/// Id of the DMFile this reader is bound to, as reported by the DMFile itself.
UInt64 fileId() const
{
    const UInt64 file_id = dmfile->fileId();
    return file_id;
}
/// Returns the pathname this reader uses to refer to its DMFile.
std::string path() const
{
    // The status of the DMFile can be updated while this DMFileReader is in use, and
    // `dmfile->path()` changes along with it. A DMFileReader therefore never returns
    // `dmfile->path()`; it always builds the READABLE path from the parent path and file id.
    return DMFile::getPathByStatus(dmfile->parentPath(), dmfile->fileId(), DMFile::Status::READABLE);
}
void addCachedPacks(ColId col_id, size_t start_pack_id, size_t pack_count, ColumnPtr & col);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,11 @@ void DMFileReaderPool::set(DMFileReader & from_reader, int64_t col_id, size_t st
r->addCachedPacks(col_id, start, count, col);
}
}

/// Returns the first reader stored under `name`, or nullptr when no entry exists
/// for `name` or the entry holds no readers. The lookup runs while holding `mtx`.
DMFileReader * DMFileReaderPool::get(const std::string & name)
{
    std::lock_guard guard(mtx);
    auto pos = readers.find(name);
    if (pos == readers.end() || pos->second.empty())
        return nullptr;
    return *(pos->second.begin());
}
} // namespace DB::DM
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ class DMFileReaderPool
void add(DMFileReader & reader);
void del(DMFileReader & reader);
void set(DMFileReader & from_reader, int64_t col_id, size_t start, size_t count, ColumnPtr & col);
// `get` is only for tests: it returns one reader registered under `name`, or nullptr if there is none.
DMFileReader * get(const std::string & name);

private:
DMFileReaderPool() = default;
Expand Down
94 changes: 94 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_segment_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/File/DMFileReader.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>
#include <Storages/DeltaMerge/PKSquashingBlockInputStream.h>
#include <Storages/DeltaMerge/ReadThread/ColumnSharingCache.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/tests/DMTestEnv.h>
#include <Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h>
Expand Down Expand Up @@ -127,6 +129,98 @@ try
}
CATCH


// Verifies how DMFileReaderPool and DMFileReader::path() behave when the pathname of the
// underlying DMFile changes while a reader is alive:
//   * before any read, no reader is registered under the readable path;
//   * after reading one block, a reader is registered under the readable path and
//     reports that same path from `path()`;
//   * after `dmfile->remove(...)`, `dmfile->path()` differs from the readable path;
//   * after the input stream is released, the pool has no reader under the readable path.
TEST_P(DeltaMergeStoreRWTest, DMFileNameChangedInDMFileReadPool)
try
{
// Two extra columns (a String and an Int8) appended to the default schema.
const ColumnDefine col_str_define(2, "col2", std::make_shared<DataTypeString>());
const ColumnDefine col_i8_define(3, "i8", std::make_shared<DataTypeInt8>());
{
// Reload the store with the extended column set.
auto table_column_defines = DMTestEnv::getDefaultColumns();
table_column_defines->emplace_back(col_str_define);
table_column_defines->emplace_back(col_i8_define);
store = reload(table_column_defines);
}

constexpr size_t num_rows_write = 128;
// Ensure stable is not empty: write the first batch, flush the cache and call
// mergeDeltaAll, then check that the stable layer holds `num_rows_write` rows.
{
auto block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false);
block.insert(DB::tests::createColumn<String>(
createNumberStrings(0, num_rows_write),
col_str_define.name,
col_str_define.id));
block.insert(DB::tests::createColumn<Int8>(
createSignedNumbers(0, num_rows_write),
col_i8_define.name,
col_i8_define.id));
store->write(*db_context, db_context->getSettingsRef(), block);
ASSERT_TRUE(store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())));
store->mergeDeltaAll(*db_context);
auto stable = store->id_to_segment.begin()->second->getStable();
ASSERT_EQ(stable->getRows(), num_rows_write);
}

// Ensure delta is not empty: write a second batch starting at `num_rows_write`
// without flushing, then check that the delta holds `num_rows_write` rows.
{
auto beg = num_rows_write;
auto end = num_rows_write + num_rows_write;
auto block = DMTestEnv::prepareSimpleWriteBlock(beg, end, false);
block.insert(DB::tests::createColumn<String>(
createNumberStrings(beg, end),
col_str_define.name,
col_str_define.id));
block.insert(DB::tests::createColumn<Int8>(
createSignedNumbers(beg, end),
col_i8_define.name,
col_i8_define.id));
store->write(*db_context, db_context->getSettingsRef(), block);
auto delta = store->id_to_segment.begin()->second->getDelta();
ASSERT_EQ(delta->getRows(), num_rows_write);
}

// Check DMFile: the first segment's stable has exactly one DMFile, its current path
// equals the READABLE path, and no reader is registered under that path yet.
const auto & dmfiles = store->id_to_segment.begin()->second->getStable()->getDMFiles();
ASSERT_EQ(dmfiles.size(), 1);
auto dmfile = dmfiles.front();
auto readable_path = DMFile::getPathByStatus(dmfile->parentPath(), dmfile->fileId(), DMFile::Status::READABLE);
ASSERT_EQ(dmfile->path(), readable_path);
ASSERT_EQ(DMFileReaderPool::instance().get(readable_path), nullptr);

{
// Open a single read stream over the whole key range and pull one block from it.
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*db_context,
db_context->getSettingsRef(),
columns,
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
EMPTY_FILTER,
TRACING_NAME,
/* keep_order= */ false,
/* is_fast_scan= */ false,
/* expected_block_size= */ 128)[0];
auto blk = in->read();
// DMFileReader is created and added to DMFileReaderPool.
auto * reader = DMFileReaderPool::instance().get(readable_path);
ASSERT_NE(reader, nullptr);
ASSERT_EQ(reader->path(), readable_path);

// Update DMFile: flush and call mergeDeltaAll again while the stream above is still
// held; the stable layer must now hold both batches.
ASSERT_TRUE(store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())));
store->mergeDeltaAll(*db_context);
auto stable = store->id_to_segment.begin()->second->getStable();
ASSERT_EQ(stable->getRows(), 2 * num_rows_write);

// After `remove`, the DMFile no longer reports the readable path.
dmfile->remove(db_context->getFileProvider());
ASSERT_NE(dmfile->path(), readable_path);

// Release the stream; afterwards the pool must have no reader under the readable path.
// NOTE(review): presumably releasing the stream destroys the reader, which unregisters
// itself using `path()` - confirm against DMFileReader / DMFileReaderPool::del.
in = nullptr;
ASSERT_EQ(DMFileReaderPool::instance().get(readable_path), nullptr);
}
}
CATCH

} // namespace tests
} // namespace DM
} // namespace DB

0 comments on commit 6a305d3

Please sign in to comment.