Commit

This is an automated cherry-pick of pingcap#4225
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
hehechen committed Apr 25, 2022
1 parent f7643a6 commit c8258fc
Showing 2 changed files with 188 additions and 0 deletions.
100 changes: 100 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFileFlushTask.cpp
@@ -0,0 +1,100 @@
#include <Common/TiFlashMetrics.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/Delta/ColumnFileFlushTask.h>
#include <Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h>
#include <Storages/DeltaMerge/Delta/MemTableSet.h>

namespace ProfileEvents
{
extern const Event DMWriteBytes;
extern const Event PSMWriteBytes;
extern const Event WriteBufferFromFileDescriptorWriteBytes;
extern const Event WriteBufferAIOWriteBytes;
} // namespace ProfileEvents

namespace DB
{
namespace DM
{
ColumnFileFlushTask::ColumnFileFlushTask(DMContext & context_, const MemTableSetPtr & mem_table_set_, size_t flush_version_)
: context{context_}
, mem_table_set{mem_table_set_}
, flush_version{flush_version_}
{}

DeltaIndex::Updates ColumnFileFlushTask::prepare(WriteBatches & wbs)
{
DeltaIndex::Updates delta_index_updates;
/// Write prepared data to disk.
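/// For each flush task: sort the in-memory block by the handle (PK) column when necessary,
/// record the permutation as a delta index update, and persist the block as a ColumnFileTiny data page.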
for (auto & task : tasks)
{
if (!task.block_data)
continue;

IColumn::Permutation perm;
task.sorted = sortBlockByPk(getExtraHandleColumnDefine(context.is_common_handle), task.block_data, perm);
if (task.sorted)
delta_index_updates.emplace_back(task.deletes_offset, task.rows_offset, perm);

task.data_page = ColumnFileTiny::writeColumnFileData(context, task.block_data, 0, task.block_data.rows(), wbs);
}

wbs.writeLogAndData();
return delta_index_updates;
}

bool ColumnFileFlushTask::commit(ColumnFilePersistedSetPtr & persisted_file_set, WriteBatches & wbs)
{
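// If another flush has already advanced the flush version, this task is outdated and is abandoned.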
if (!persisted_file_set->checkAndIncreaseFlushVersion(flush_version))
return false;

/// Create new column file instances for ColumnFilePersistedSet
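/// In-memory column files become ColumnFileTiny entries pointing at the data pages written in prepare();
/// already-persisted files (tiny / big / delete-range) are copied over unchanged.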
ColumnFilePersisteds new_column_files;
for (auto & task : tasks)
{
ColumnFilePersistedPtr new_column_file;
if (auto * m_file = task.column_file->tryToInMemoryFile(); m_file)
{
new_column_file = std::make_shared<ColumnFileTiny>(m_file->getSchema(),
m_file->getRows(),
m_file->getBytes(),
task.data_page);
}
else if (auto * t_file = task.column_file->tryToTinyFile(); t_file)
{
new_column_file = std::make_shared<ColumnFileTiny>(*t_file);
}
else if (auto * b_file = task.column_file->tryToBigFile(); b_file)
{
new_column_file = std::make_shared<ColumnFileBig>(*b_file);
}
else if (auto * d_file = task.column_file->tryToDeleteRange(); d_file)
{
new_column_file = std::make_shared<ColumnFileDeleteRange>(*d_file);
}
else
{
throw Exception("Unexpected column file type", ErrorCodes::LOGICAL_ERROR);
}
new_column_files.push_back(new_column_file);
}

// serialize metadata and update persisted_file_set
if (!persisted_file_set->appendPersistedColumnFilesToLevel0(new_column_files, wbs))
return false;

mem_table_set->removeColumnFilesInFlushTask(*this);

// Also update the write amplification
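// Write amplification = bytes actually written to the underlying storage (PageStorage, file descriptor and AIO writes)
// divided by the logical bytes written into the DeltaMerge store.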
auto total_write = ProfileEvents::counters[ProfileEvents::DMWriteBytes].load(std::memory_order_relaxed);
auto actual_write = ProfileEvents::counters[ProfileEvents::PSMWriteBytes].load(std::memory_order_relaxed)
+ ProfileEvents::counters[ProfileEvents::WriteBufferFromFileDescriptorWriteBytes].load(std::memory_order_relaxed)
+ ProfileEvents::counters[ProfileEvents::WriteBufferAIOWriteBytes].load(std::memory_order_relaxed);
GET_METRIC(tiflash_storage_write_amplification)
.Set((static_cast<double>(actual_write) / 1024 / 1024) / (static_cast<double>(total_write) / 1024 / 1024));
return true;
}
} // namespace DM
} // namespace DB
88 changes: 88 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
@@ -3201,6 +3201,94 @@ try
}
CATCH

TEST_P(DeltaMergeStoreRWTest, DisableSmallColumnCache)
try
{
auto settings = db_context->getSettings();

size_t num_rows_write_in_total = 0;
const size_t num_rows_per_write = 5;
while (true)
{
{
// write to store
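// Each round writes a small block, flushes the delta cache, and expects the reported
// delta cache size to stay 0, i.e. flushed data is not kept in the small column cache.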
Block block = DMTestEnv::prepareSimpleWriteBlock(
num_rows_write_in_total + 1,
num_rows_write_in_total + 1 + num_rows_per_write,
false);

store->write(*db_context, settings, block);

store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));
num_rows_write_in_total += num_rows_per_write;
auto segment_stats = store->getSegmentStats();
size_t delta_cache_size = 0;
for (auto & stat : segment_stats)
{
delta_cache_size += stat.delta_cache_size;
}
EXPECT_EQ(delta_cache_size, 0);
}

{
// Let's reload the store to check the persistence system.
// Note: the store must be released before loading another one, because some background tasks could still be running.
store.reset();
store = reload();

// read all columns from store
const auto & columns = store->getTableColumns();
BlockInputStreams ins = store->read(*db_context,
db_context->getSettingsRef(),
// settings,
columns,
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
EMPTY_FILTER,
/* expected_block_size= */ 1024);
ASSERT_EQ(ins.size(), 1UL);
BlockInputStreamPtr in = ins[0];

LOG_FMT_TRACE(&Poco::Logger::get(GET_GTEST_FULL_NAME), "start to check data of [1,{}]", num_rows_write_in_total);

size_t num_rows_read = 0;
in->readPrefix();
Int64 expected_row_pk = 1;
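// The pk column must contain exactly 1..num_rows_write_in_total in ascending order.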
while (Block block = in->read())
{
num_rows_read += block.rows();
for (auto && iter : block)
{
auto c = iter.column;
if (iter.name == DMTestEnv::pk_name)
{
for (size_t i = 0; i < c->size(); ++i)
{
auto expected = expected_row_pk++;
auto value = c->getInt(i);
if (value != expected)
{
// Convenient for debugging.
EXPECT_EQ(expected, value);
}
}
}
}
}
in->readSuffix();
ASSERT_EQ(num_rows_read, num_rows_write_in_total);

LOG_FMT_TRACE(&Poco::Logger::get(GET_GTEST_FULL_NAME), "done checking data of [1,{}]", num_rows_write_in_total);
}

// Reading with a large number of small ingested DTFiles will greatly slow down the test
if (num_rows_write_in_total >= 200)
break;
}
}
CATCH

INSTANTIATE_TEST_CASE_P(TestMode, //
DeltaMergeStore_RWTest,
testing::Values(TestMode::V1_BlockOnly, TestMode::V2_BlockOnly, TestMode::V2_FileOnly, TestMode::V2_Mix),