Fix bug for unexpected deleted ingest file (#2047) #2048

Merged

Changes from all commits
4 changes: 3 additions & 1 deletion dbms/src/Common/FailPoint.cpp
@@ -40,7 +40,9 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(exception_during_write_to_storage) \
M(force_set_sst_to_dtfile_block_size) \
M(force_set_sst_decode_rand) \
M(exception_before_page_file_write_sync)
M(exception_before_page_file_write_sync) \
M(force_set_segment_ingest_packs_fail) \
M(segment_merge_after_ingest_packs)

#define APPLY_FOR_FAILPOINTS(M) M(force_set_page_file_write_errno)

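As context for the two new failpoints, here is a minimal sketch of how they are armed and consumed; it simply mirrors the wiring this PR adds in DeltaMergeStore.cpp and in the new test, and is illustrative rather than additional code in this change:

// Sketch only: arming the new failpoints in a test before calling ingestFiles.
FailPointHelper::enableFailPoint(FailPoints::force_set_segment_ingest_packs_fail);
FailPointHelper::enableFailPoint(FailPoints::segment_merge_after_ingest_packs);

// The production code then reacts to them at the corresponding points:
// the first flips the ingest result to "failed" so the retry path is exercised,
// the second forces a flush + delta-merge + GC right after a successful ingest,
// which is the scenario that used to delete not-yet-applied DTFiles (#2047).
fiu_do_on(FailPoints::force_set_segment_ingest_packs_fail, { ingest_success = false; });
fiu_do_on(FailPoints::segment_merge_after_ingest_packs, {
    segment->flushCache(*dm_context);
    segmentMergeDelta(*dm_context, segment, false);
    storage_pool.gc(global_context.getSettingsRef(), StoragePool::Seconds(0));
});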
71 changes: 53 additions & 18 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -66,6 +66,8 @@ extern const char pause_when_ingesting_to_dt_store[];
extern const char pause_when_altering_dt_store[];
extern const char force_triggle_background_merge_delta[];
extern const char force_triggle_foreground_flush[];
extern const char force_set_segment_ingest_packs_fail[];
extern const char segment_merge_after_ingest_packs[];
} // namespace FailPoints

namespace DM
@@ -544,10 +546,10 @@ void DeltaMergeStore::preIngestFile(const String & parent_path, const PageId fil
delegator.addDTFile(file_id, file_size, parent_path);
}

void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context,
const RowKeyRange & range,
std::vector<PageId> file_ids,
bool clear_data_in_range)
void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context,
const RowKeyRange & range,
const std::vector<PageId> & file_ids,
bool clear_data_in_range)
{
if (unlikely(shutdown_called.load(std::memory_order_relaxed)))
{
@@ -558,8 +560,6 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context,
throw Exception(msg);
}

LOG_INFO(log, __FUNCTION__ << " table: " << db_name << "." << table_name << ", region range:" << range.toDebugString());

EventRecorder write_block_recorder(ProfileEvents::DMWriteFile, ProfileEvents::DMWriteFileNS);

auto delegate = dm_context->path_pool.getStableDiskDelegator();
@@ -582,9 +582,9 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context,
bytes_on_disk += file->getBytesOnDisk();
}

LOG_DEBUG(log,
__FUNCTION__ << " table: " << db_name << "." << table_name << ", rows: " << rows << ", bytes: " << bytes << ", bytes on disk"
<< bytes_on_disk);
LOG_INFO(log,
__FUNCTION__ << " table: " << db_name << "." << table_name << ", rows: " << rows << ", bytes: " << bytes
<< ", bytes on disk: " << bytes_on_disk << ", region range: " << range.toDebugString());

Segments updated_segments;
RowKeyRange cur_range = range;
@@ -661,17 +661,18 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context,
// they are visible for readers who require file_ids to be found in PageStorage.
wbs.writeLogAndData();

if (segment->ingestPacks(*dm_context, range.shrink(segment_range), packs, clear_data_in_range))
bool ingest_success = segment->ingestPacks(*dm_context, range.shrink(segment_range), packs, clear_data_in_range);
fiu_do_on(FailPoints::force_set_segment_ingest_packs_fail, { ingest_success = false; });
if (ingest_success)
{
updated_segments.push_back(segment);
// Enable gc for DTFile once it has been committed.
for (size_t index = 0; index < my_file_used.size(); ++index)
{
auto & file = files[index];
if (my_file_used[index])
file->enableGC();
}
// only update `file_used` after ingest_success
file_used.swap(my_file_used);
fiu_do_on(FailPoints::segment_merge_after_ingest_packs, {
segment->flushCache(*dm_context);
segmentMergeDelta(*dm_context, segment, false);
storage_pool.gc(global_context.getSettingsRef(), StoragePool::Seconds(0));
});
break;
}
else
@@ -684,6 +685,40 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context,
cur_range.setEnd(range.end);
}

// Enable GC for the DTFiles only after they have been applied to all segments.
// Note that we can not enable GC for them as soon as they are applied to any single segment:
// if one segment (call it `s0`) gets compacted right after the file is ingested into it,
// `gc_handle` may GC the DTFiles before they are applied to the remaining segments,
// and we would then try to apply already-deleted DTFiles to those segments.
for (size_t index = 0; index < file_used.size(); ++index)
{
auto & file = files[index];
if (file_used[index])
file->enableGC();
}

{
// Add some logging about the ingested file ids and updated segments
std::stringstream ss;
// "ingest dmf_1001,1002,1003 into segment [1,3,5]"
ss << "ingest dmf_";
for (size_t i = 0; i < file_ids.size(); ++i)
{
if (i != 0)
ss << ",";
ss << file_ids[i];
}
ss << " into segment [";
for (size_t i = 0; i < updated_segments.size(); ++i)
{
if (i != 0)
ss << ",";
ss << updated_segments[i]->segmentId();
}
ss << "]";
LOG_INFO(log, __FUNCTION__ << " table: " << db_name << "." << table_name << ", " << ss.str());
}

GET_METRIC(dm_context->metrics, tiflash_storage_throughput_bytes, type_ingest).Increment(bytes);
GET_METRIC(dm_context->metrics, tiflash_storage_throughput_rows, type_ingest).Increment(rows);

@@ -1268,7 +1303,7 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy)
/* ignore_cache= */ false,
global_context.getSettingsRef().safe_point_update_interval_seconds);

LOG_DEBUG(log, "Task" << toString(task.type) << " GC safe point: " << safe_point);
LOG_DEBUG(log, "Task " << toString(task.type) << " GC safe point: " << safe_point);

// Foreground tasks don't get the GC safe point from remote, but we'd better keep it as up to date as possible.
latest_gc_safe_point = safe_point;
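To make the fix easier to follow, here is a simplified sketch of the new control flow in ingestFiles, condensed from the diff above (snapshot handling, locking, and the retry loop are elided):

// Simplified sketch of the corrected ordering in ingestFiles (condensed, not verbatim).
while (!cur_range.none())
{
    // ... locate the segment covering cur_range, build `packs`, fill `my_file_used` ...
    wbs.writeLogAndData(); // make the file refs visible to readers first

    bool ingest_success = segment->ingestPacks(*dm_context, range.shrink(segment_range), packs, clear_data_in_range);
    fiu_do_on(FailPoints::force_set_segment_ingest_packs_fail, { ingest_success = false; });
    if (ingest_success)
    {
        updated_segments.push_back(segment);
        file_used.swap(my_file_used); // remember the files as used only on success
    }
    // else: the segment snapshot is stale, retry with a fresh one

    cur_range.setStart(segment_range.end);
    cur_range.setEnd(range.end);
}

// enableGC() now runs only after *all* overlapping segments have applied the DTFiles.
// Previously it ran per segment, so a delta-merge + GC triggered on one segment could
// delete files that the remaining segments had not ingested yet (the bug in #2047).
for (size_t index = 0; index < file_used.size(); ++index)
    if (file_used[index])
        files[index]->enableGC();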
20 changes: 10 additions & 10 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
@@ -270,16 +270,16 @@ class DeltaMergeStore : private boost::noncopyable

void preIngestFile(const String & parent_path, const PageId file_id, size_t file_size);

void ingestFiles(const DMContextPtr & dm_context, //
const RowKeyRange & range,
std::vector<PageId> file_ids,
bool clear_data_in_range);

void ingestFiles(const Context & db_context, //
const DB::Settings & db_settings,
const RowKeyRange & range,
std::vector<PageId> file_ids,
bool clear_data_in_range)
void ingestFiles(const DMContextPtr & dm_context, //
const RowKeyRange & range,
const std::vector<PageId> & file_ids,
bool clear_data_in_range);

void ingestFiles(const Context & db_context, //
const DB::Settings & db_settings,
const RowKeyRange & range,
const std::vector<PageId> & file_ids,
bool clear_data_in_range)
{
auto dm_context = newDMContext(db_context, db_settings);
return ingestFiles(dm_context, range, file_ids, clear_data_in_range);
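Since `file_ids` is now taken by const reference, callers keep ownership of the vector and no copy or move is required; a typical call, mirroring the test code later in this PR (the surrounding setup here is only an illustration), looks like:

// Illustrative caller of the const-reference overload (setup is hypothetical).
auto dm_context = store->newDMContext(db_context, db_context.getSettingsRef());
std::vector<PageId> file_ids; // filled via preIngestFile / genDMFile in real code
auto ingest_range = RowKeyRange::fromHandleRange(HandleRange{32, 256});
store->ingestFiles(dm_context, ingest_range, file_ids, /*clear_data_in_range*/ true);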
@@ -120,7 +120,9 @@ bool SSTFilesToDTFilesOutputStream::newDTFileStream()
return false;
}
auto dt_file = DMFile::create(file_id, parent_path, flags.isSingleFile());
LOG_INFO(log, "Create file for snapshot data [file=" << dt_file->path() << "] [single_file_mode=" << flags.isSingleFile() << "]");
LOG_INFO(log,
"Create file for snapshot data " << child->getRegion()->toString(true) << " [file=" << dt_file->path()
<< "] [single_file_mode=" << flags.isSingleFile() << "]");
dt_stream = std::make_unique<DMFileBlockOutputStream>(tmt.getContext(), dt_file, *schema_snap, flags);
dt_stream->writePrefix();
ingest_files.emplace_back(dt_file);
1 change: 0 additions & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
@@ -1,4 +1,3 @@

#include <Common/TiFlashMetrics.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/EmptyBlockInputStream.h>
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
@@ -105,22 +105,22 @@ bool StoragePool::gc(const Settings & /*settings*/, const Seconds & try_gc_perio
last_try_gc_time = now;
}

bool ok = false;
bool done_anything = false;

// FIXME: The global_context.settings is mutable; we need a way to reload these settings.
// auto config = extractConfig(settings, StorageType::Meta);
// meta_storage.reloadSettings(config);
ok |= meta_storage.gc();
done_anything |= meta_storage.gc();

// config = extractConfig(settings, StorageType::Data);
// data_storage.reloadSettings(config);
ok |= data_storage.gc();
done_anything |= data_storage.gc();

// config = extractConfig(settings, StorageType::Log);
// log_storage.reloadSettings(config);
ok |= log_storage.gc();
done_anything |= log_storage.gc();

return ok;
return done_anything;
}

} // namespace DM
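Renaming `ok` to `done_anything` also makes the intent of `|=` clearer: every one of the three `gc()` calls must run, even if an earlier one already reclaimed something. A small illustration (not part of this change) of why a short-circuiting `||` would be wrong here:

// Illustrative only: `|=` always evaluates the right-hand side ...
bool done_anything = false;
done_anything |= meta_storage.gc(); // runs
done_anything |= data_storage.gc(); // runs
done_anything |= log_storage.gc();  // runs

// ... while `||` would skip later gc() calls once one of them returned true:
// done_anything = done_anything || data_storage.gc(); // data_storage.gc() may never run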
144 changes: 140 additions & 4 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
@@ -27,6 +27,9 @@ extern const char pause_before_dt_background_delta_merge[];
extern const char pause_until_dt_background_delta_merge[];
extern const char force_triggle_background_merge_delta[];
extern const char force_triggle_foreground_flush[];
extern const char force_set_segment_ingest_packs_fail[];
extern const char segment_merge_after_ingest_packs[];
extern const char force_set_segment_physical_split[];
} // namespace FailPoints

namespace DM
@@ -873,7 +876,7 @@ try

{
// Prepare DTFiles for ingesting
auto dm_context = store->newDMContext(*context, context->getSettingsRef());
auto dm_context = store->newDMContext(*context, context->getSettingsRef());

auto [range1, file_ids1] = genDMFile(*dm_context, DMTestEnv::prepareSimpleWriteBlock(32, 48, false, tso2));
auto [range2, file_ids2] = genDMFile(*dm_context, DMTestEnv::prepareSimpleWriteBlock(80, 256, false, tso3));
@@ -1035,6 +1038,139 @@
}
CATCH

TEST_P(DeltaMergeStore_test, IngestWithFail)
try
{
if (mode == TestMode::V1_BlockOnly)
return;

const UInt64 tso1 = 4;
const size_t num_rows_before_ingest = 128;
// Write to store [0, 128)
{
Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_before_ingest, false, tso1);
store->write(*context, context->getSettingsRef(), std::move(block));

auto dm_context = store->newDMContext(*context, context->getSettingsRef());
store->flushCache(dm_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));

SegmentPtr seg;
std::tie(std::ignore, seg) = *store->segments.begin();
store->segmentSplit(*dm_context, seg, /*is_foreground*/ true);
}

const UInt64 tso2 = 10;

{
// Prepare DTFiles for ingesting
auto dm_context = store->newDMContext(*context, context->getSettingsRef());
auto [ingest_range, file_ids] = genDMFile(*dm_context, DMTestEnv::prepareSimpleWriteBlock(32, 128, false, tso2));
// Enable failpoint for testing
FailPointHelper::enableFailPoint(FailPoints::force_set_segment_ingest_packs_fail);
FailPointHelper::enableFailPoint(FailPoints::segment_merge_after_ingest_packs);
store->ingestFiles(dm_context, ingest_range, file_ids, /*clear_data_in_range*/ true);
}


// After ingesting, the data in [32, 128) should be overwritten by the data in the ingested files.
{
// Read all data <= tso1
// We can only get [0, 32) with tso1
const auto & columns = store->getTableColumns();
BlockInputStreams ins = store->read(*context,
context->getSettingsRef(),
columns,
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* max_version= */ tso1,
EMPTY_FILTER,
/* expected_block_size= */ 1024);
ASSERT_EQ(ins.size(), 1);
BlockInputStreamPtr in = ins[0];

size_t num_rows_read = 0;
in->readPrefix();
Int64 expect_pk = 0;
UInt64 expect_tso = tso1;
while (Block block = in->read())
{
ASSERT_TRUE(block.has(DMTestEnv::pk_name));
ASSERT_TRUE(block.has(VERSION_COLUMN_NAME));
auto pk_c = block.getByName(DMTestEnv::pk_name);
auto v_c = block.getByName(VERSION_COLUMN_NAME);
for (size_t i = 0; i < block.rows(); ++i)
{
// std::cerr << "pk:" << pk_c.column->getInt(i) << ", ver:" << v_c.column->getInt(i) << std::endl;
ASSERT_EQ(pk_c.column->getInt(i), expect_pk++);
ASSERT_EQ(v_c.column->getUInt(i), expect_tso);
}
num_rows_read += block.rows();
}
in->readSuffix();
EXPECT_EQ(num_rows_read, 32) << "Data [32, 128) before ingest should be erased, should only get [0, 32)";
}

{
// Read all data in [tso1, tso2)
const auto & columns = store->getTableColumns();
BlockInputStreams ins = store->read(*context,
context->getSettingsRef(),
columns,
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* max_version= */ tso2 - 1,
EMPTY_FILTER,
/* expected_block_size= */ 1024);
ASSERT_EQ(ins.size(), 1);
BlockInputStreamPtr in = ins[0];

size_t num_rows_read = 0;
in->readPrefix();
Int64 expect_pk = 0;
UInt64 expect_tso = tso1;
while (Block block = in->read())
{
ASSERT_TRUE(block.has(DMTestEnv::pk_name));
ASSERT_TRUE(block.has(VERSION_COLUMN_NAME));
auto pk_c = block.getByName(DMTestEnv::pk_name);
auto v_c = block.getByName(VERSION_COLUMN_NAME);
for (size_t i = 0; i < block.rows(); ++i)
{
// std::cerr << "pk:" << pk_c.column->getInt(i) << ", ver:" << v_c.column->getInt(i) << std::endl;
ASSERT_EQ(pk_c.column->getInt(i), expect_pk++);
ASSERT_EQ(v_c.column->getUInt(i), expect_tso);
}
num_rows_read += block.rows();
}
in->readSuffix();
EXPECT_EQ(num_rows_read, 32) << "Data [32, 128) after ingest with tso less than: " << tso2
<< " are erased, should only get [0, 32)";
}

{
// Read all data (no version limit)
const auto & columns = store->getTableColumns();
BlockInputStreams ins = store->read(*context,
context->getSettingsRef(),
columns,
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
EMPTY_FILTER,
/* expected_block_size= */ 1024);
ASSERT_EQ(ins.size(), 1);
BlockInputStreamPtr in = ins[0];

size_t num_rows_read = 0;
in->readPrefix();
while (Block block = in->read())
num_rows_read += block.rows();
in->readSuffix();
EXPECT_EQ(num_rows_read, 32 + 128 - 32) << "The number of rows after ingest does not match";
}
}
CATCH

TEST_P(DeltaMergeStore_test, IngestEmptyFileLists)
try
{
@@ -1052,10 +1188,10 @@
// Test that if we ingest an empty file list, the data in the range will still be removed.
// The ingest range is [32, 256)
{
auto dm_context = store->newDMContext(*context, context->getSettingsRef());
auto dm_context = store->newDMContext(*context, context->getSettingsRef());

std::vector<PageId> file_ids ;
auto ingest_range = RowKeyRange::fromHandleRange(HandleRange{32, 256});
std::vector<PageId> file_ids;
auto ingest_range = RowKeyRange::fromHandleRange(HandleRange{32, 256});
store->ingestFiles(dm_context, ingest_range, file_ids, /*clear_data_in_range*/ true);
}
