Skip to content

Commit

Permalink
Fix bug for unexpected deleted ingest file (#2047) (#2048)
Browse files Browse the repository at this point in the history
* Only enable GC for DTFiles after they get applied to all segments.
* Refine some loggings

Signed-off-by: JaySon-Huang <jayson.hjs@gmail.com>
  • Loading branch information
JaySon-Huang authored Jun 1, 2021
1 parent 8c3bfc4 commit b329618
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 40 deletions.
4 changes: 3 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(exception_during_write_to_storage) \
M(force_set_sst_to_dtfile_block_size) \
M(force_set_sst_decode_rand) \
M(exception_before_page_file_write_sync)
M(exception_before_page_file_write_sync) \
M(force_set_segment_ingest_packs_fail) \
M(segment_merge_after_ingest_packs)

#define APPLY_FOR_FAILPOINTS(M) M(force_set_page_file_write_errno)

Expand Down
71 changes: 53 additions & 18 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ extern const char pause_when_ingesting_to_dt_store[];
extern const char pause_when_altering_dt_store[];
extern const char force_triggle_background_merge_delta[];
extern const char force_triggle_foreground_flush[];
extern const char force_set_segment_ingest_packs_fail[];
extern const char segment_merge_after_ingest_packs[];
} // namespace FailPoints

namespace DM
Expand Down Expand Up @@ -544,10 +546,10 @@ void DeltaMergeStore::preIngestFile(const String & parent_path, const PageId fil
delegator.addDTFile(file_id, file_size, parent_path);
}

void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context,
const RowKeyRange & range,
std::vector<PageId> file_ids,
bool clear_data_in_range)
void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context,
const RowKeyRange & range,
const std::vector<PageId> & file_ids,
bool clear_data_in_range)
{
if (unlikely(shutdown_called.load(std::memory_order_relaxed)))
{
Expand All @@ -558,8 +560,6 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context,
throw Exception(msg);
}

LOG_INFO(log, __FUNCTION__ << " table: " << db_name << "." << table_name << ", region range:" << range.toDebugString());

EventRecorder write_block_recorder(ProfileEvents::DMWriteFile, ProfileEvents::DMWriteFileNS);

auto delegate = dm_context->path_pool.getStableDiskDelegator();
Expand All @@ -582,9 +582,9 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context,
bytes_on_disk += file->getBytesOnDisk();
}

LOG_DEBUG(log,
__FUNCTION__ << " table: " << db_name << "." << table_name << ", rows: " << rows << ", bytes: " << bytes << ", bytes on disk"
<< bytes_on_disk);
LOG_INFO(log,
__FUNCTION__ << " table: " << db_name << "." << table_name << ", rows: " << rows << ", bytes: " << bytes
<< ", bytes on disk: " << bytes_on_disk << ", region range: " << range.toDebugString());

Segments updated_segments;
RowKeyRange cur_range = range;
Expand Down Expand Up @@ -661,17 +661,18 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context,
// they are visible for readers who require file_ids to be found in PageStorage.
wbs.writeLogAndData();

if (segment->ingestPacks(*dm_context, range.shrink(segment_range), packs, clear_data_in_range))
bool ingest_success = segment->ingestPacks(*dm_context, range.shrink(segment_range), packs, clear_data_in_range);
fiu_do_on(FailPoints::force_set_segment_ingest_packs_fail, { ingest_success = false; });
if (ingest_success)
{
updated_segments.push_back(segment);
// Enable gc for DTFile once it has been committed.
for (size_t index = 0; index < my_file_used.size(); ++index)
{
auto & file = files[index];
if (my_file_used[index])
file->enableGC();
}
// only update `file_used` after ingest_success
file_used.swap(my_file_used);
fiu_do_on(FailPoints::segment_merge_after_ingest_packs, {
segment->flushCache(*dm_context);
segmentMergeDelta(*dm_context, segment, false);
storage_pool.gc(global_context.getSettingsRef(), StoragePool::Seconds(0));
});
break;
}
else
Expand All @@ -684,6 +685,40 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context,
cur_range.setEnd(range.end);
}

// Enable gc for DTFiles only after they have been applied to all segments.
// Note that we can not enable gc for them as soon as they have been applied to any one segment.
// Otherwise, assume that one segment (call it `s0`) gets compacted after the files are ingested:
// `gc_handle` could then gc the DTFiles before they get applied to all segments, and we would
// apply some deleted DTFiles to the other segments.
for (size_t index = 0; index < file_used.size(); ++index)
{
auto & file = files[index];
if (file_used[index])
file->enableGC();
}

{
// Add some logging about the ingested file ids and updated segments
std::stringstream ss;
// "ingest dmf_1001,1002,1003 into segment [1,3,5]"
ss << "ingest dmf_";
for (size_t i = 0; i < file_ids.size(); ++i)
{
if (i != 0)
ss << ",";
ss << file_ids[i];
}
ss << " into segment [";
for (size_t i = 0; i < updated_segments.size(); ++i)
{
if (i != 0)
ss << ",";
ss << updated_segments[i]->segmentId();
}
ss << "]";
LOG_INFO(log, __FUNCTION__ << " table: " << db_name << "." << table_name << ", " << ss.str());
}

GET_METRIC(dm_context->metrics, tiflash_storage_throughput_bytes, type_ingest).Increment(bytes);
GET_METRIC(dm_context->metrics, tiflash_storage_throughput_rows, type_ingest).Increment(rows);

Expand Down Expand Up @@ -1268,7 +1303,7 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy)
/* ignore_cache= */ false,
global_context.getSettingsRef().safe_point_update_interval_seconds);

LOG_DEBUG(log, "Task" << toString(task.type) << " GC safe point: " << safe_point);
LOG_DEBUG(log, "Task " << toString(task.type) << " GC safe point: " << safe_point);

// Foreground tasks don't get the GC safe point from remote, but we had better keep it as up to date as possible.
latest_gc_safe_point = safe_point;
Expand Down
20 changes: 10 additions & 10 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,16 +270,16 @@ class DeltaMergeStore : private boost::noncopyable

void preIngestFile(const String & parent_path, const PageId file_id, size_t file_size);

void ingestFiles(const DMContextPtr & dm_context, //
const RowKeyRange & range,
std::vector<PageId> file_ids,
bool clear_data_in_range);

void ingestFiles(const Context & db_context, //
const DB::Settings & db_settings,
const RowKeyRange & range,
std::vector<PageId> file_ids,
bool clear_data_in_range)
void ingestFiles(const DMContextPtr & dm_context, //
const RowKeyRange & range,
const std::vector<PageId> & file_ids,
bool clear_data_in_range);

void ingestFiles(const Context & db_context, //
const DB::Settings & db_settings,
const RowKeyRange & range,
const std::vector<PageId> & file_ids,
bool clear_data_in_range)
{
auto dm_context = newDMContext(db_context, db_settings);
return ingestFiles(dm_context, range, file_ids, clear_data_in_range);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ bool SSTFilesToDTFilesOutputStream::newDTFileStream()
return false;
}
auto dt_file = DMFile::create(file_id, parent_path, flags.isSingleFile());
LOG_INFO(log, "Create file for snapshot data [file=" << dt_file->path() << "] [single_file_mode=" << flags.isSingleFile() << "]");
LOG_INFO(log,
"Create file for snapshot data " << child->getRegion()->toString(true) << " [file=" << dt_file->path()
<< "] [single_file_mode=" << flags.isSingleFile() << "]");
dt_stream = std::make_unique<DMFileBlockOutputStream>(tmt.getContext(), dt_file, *schema_snap, flags);
dt_stream->writePrefix();
ingest_files.emplace_back(dt_file);
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

#include <Common/TiFlashMetrics.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/EmptyBlockInputStream.h>
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,22 +105,22 @@ bool StoragePool::gc(const Settings & /*settings*/, const Seconds & try_gc_perio
last_try_gc_time = now;
}

bool ok = false;
bool done_anything = false;

// FIXME: The global_context.settings is mutable, we need a way to reload these settings.
// auto config = extractConfig(settings, StorageType::Meta);
// meta_storage.reloadSettings(config);
ok |= meta_storage.gc();
done_anything |= meta_storage.gc();

// config = extractConfig(settings, StorageType::Data);
// data_storage.reloadSettings(config);
ok |= data_storage.gc();
done_anything |= data_storage.gc();

// config = extractConfig(settings, StorageType::Log);
// log_storage.reloadSettings(config);
ok |= log_storage.gc();
done_anything |= log_storage.gc();

return ok;
return done_anything;
}

} // namespace DM
Expand Down
144 changes: 140 additions & 4 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ extern const char pause_before_dt_background_delta_merge[];
extern const char pause_until_dt_background_delta_merge[];
extern const char force_triggle_background_merge_delta[];
extern const char force_triggle_foreground_flush[];
extern const char force_set_segment_ingest_packs_fail[];
extern const char segment_merge_after_ingest_packs[];
extern const char force_set_segment_physical_split[];
} // namespace FailPoints

namespace DM
Expand Down Expand Up @@ -873,7 +876,7 @@ try

{
// Prepare DTFiles for ingesting
auto dm_context = store->newDMContext(*context, context->getSettingsRef());
auto dm_context = store->newDMContext(*context, context->getSettingsRef());

auto [range1, file_ids1] = genDMFile(*dm_context, DMTestEnv::prepareSimpleWriteBlock(32, 48, false, tso2));
auto [range2, file_ids2] = genDMFile(*dm_context, DMTestEnv::prepareSimpleWriteBlock(80, 256, false, tso3));
Expand Down Expand Up @@ -1035,6 +1038,139 @@ try
}
CATCH

// Checks that ingesting DTFiles still yields the expected data when the
// `force_set_segment_ingest_packs_fail` and `segment_merge_after_ingest_packs`
// failpoints are enabled for the `ingestFiles` call.
//
// Steps:
//   1. Write rows [0, 128) at `tso1`, flush the cache and split the first segment.
//   2. Generate a DTFile holding rows [32, 128) at `tso2` and ingest it with
//      `clear_data_in_range` set, with both failpoints enabled.
//   3. Read the store back at three different `max_version`s and check which rows are visible.
TEST_P(DeltaMergeStore_test, IngestWithFail)
try
{
// Nothing is tested in `TestMode::V1_BlockOnly`.
if (mode == TestMode::V1_BlockOnly)
return;

const UInt64 tso1 = 4;
const size_t num_rows_before_ingest = 128;
// Write to store [0, 128)
{
Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_before_ingest, false, tso1);
store->write(*context, context->getSettingsRef(), std::move(block));

auto dm_context = store->newDMContext(*context, context->getSettingsRef());
store->flushCache(dm_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));

// Split the first segment of the store, so that the store holds the result of a split
// before the files are ingested.
SegmentPtr seg;
std::tie(std::ignore, seg) = *store->segments.begin();
store->segmentSplit(*dm_context, seg, /*is_foreground*/ true);
}

const UInt64 tso2 = 10;

{
// Prepare DTFiles for ingesting
auto dm_context = store->newDMContext(*context, context->getSettingsRef());
auto [ingest_range, file_ids] = genDMFile(*dm_context, DMTestEnv::prepareSimpleWriteBlock(32, 128, false, tso2));
// Enable failpoints for testing. They are enabled right before `ingestFiles` and are
// not explicitly disabled by this test.
FailPointHelper::enableFailPoint(FailPoints::force_set_segment_ingest_packs_fail);
FailPointHelper::enableFailPoint(FailPoints::segment_merge_after_ingest_packs);
store->ingestFiles(dm_context, ingest_range, file_ids, /*clear_data_in_range*/ true);
}


// After ingesting, the data in [32, 128) should be overwritten by the data in the ingested files.
{
// Read all data with version <= tso1.
// We can only get [0, 32) with tso1
const auto & columns = store->getTableColumns();
BlockInputStreams ins = store->read(*context,
context->getSettingsRef(),
columns,
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* max_version= */ tso1,
EMPTY_FILTER,
/* expected_block_size= */ 1024);
ASSERT_EQ(ins.size(), 1);
BlockInputStreamPtr in = ins[0];

size_t num_rows_read = 0;
in->readPrefix();
// Rows are expected to come back with consecutive pks starting from 0, all at version tso1.
Int64 expect_pk = 0;
UInt64 expect_tso = tso1;
while (Block block = in->read())
{
ASSERT_TRUE(block.has(DMTestEnv::pk_name));
ASSERT_TRUE(block.has(VERSION_COLUMN_NAME));
auto pk_c = block.getByName(DMTestEnv::pk_name);
auto v_c = block.getByName(VERSION_COLUMN_NAME);
for (size_t i = 0; i < block.rows(); ++i)
{
// std::cerr << "pk:" << pk_c.column->getInt(i) << ", ver:" << v_c.column->getInt(i) << std::endl;
ASSERT_EQ(pk_c.column->getInt(i), expect_pk++);
ASSERT_EQ(v_c.column->getUInt(i), expect_tso);
}
num_rows_read += block.rows();
}
in->readSuffix();
EXPECT_EQ(num_rows_read, 32) << "Data [32, 128) before ingest should be erased, should only get [0, 32)";
}

{
// Read all data with version < tso2 (max_version = tso2 - 1).
// The ingested rows are written at tso2, so only [0, 32) at tso1 is expected here.
const auto & columns = store->getTableColumns();
BlockInputStreams ins = store->read(*context,
context->getSettingsRef(),
columns,
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* max_version= */ tso2 - 1,
EMPTY_FILTER,
/* expected_block_size= */ 1024);
ASSERT_EQ(ins.size(), 1);
BlockInputStreamPtr in = ins[0];

size_t num_rows_read = 0;
in->readPrefix();
// Same expectation as the previous read: consecutive pks starting from 0, all at version tso1.
Int64 expect_pk = 0;
UInt64 expect_tso = tso1;
while (Block block = in->read())
{
ASSERT_TRUE(block.has(DMTestEnv::pk_name));
ASSERT_TRUE(block.has(VERSION_COLUMN_NAME));
auto pk_c = block.getByName(DMTestEnv::pk_name);
auto v_c = block.getByName(VERSION_COLUMN_NAME);
for (size_t i = 0; i < block.rows(); ++i)
{
// std::cerr << "pk:" << pk_c.column->getInt(i) << ", ver:" << v_c.column->getInt(i) << std::endl;
ASSERT_EQ(pk_c.column->getInt(i), expect_pk++);
ASSERT_EQ(v_c.column->getUInt(i), expect_tso);
}
num_rows_read += block.rows();
}
in->readSuffix();
EXPECT_EQ(num_rows_read, 32) << "Data [32, 128) after ingest with tso less than: " << tso2
<< " are erased, should only get [0, 32)";
}

{
// Read all data with the largest possible max_version, i.e. without any version limit.
// Only the total number of rows is checked here.
const auto & columns = store->getTableColumns();
BlockInputStreams ins = store->read(*context,
context->getSettingsRef(),
columns,
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
EMPTY_FILTER,
/* expected_block_size= */ 1024);
ASSERT_EQ(ins.size(), 1);
BlockInputStreamPtr in = ins[0];

size_t num_rows_read = 0;
in->readPrefix();
while (Block block = in->read())
num_rows_read += block.rows();
in->readSuffix();
// 32 rows remaining from the original write ([0, 32)) plus 128 - 32 rows from the ingested file ([32, 128)).
EXPECT_EQ(num_rows_read, 32 + 128 - 32) << "The rows number after ingest is not match";
}
}
CATCH

TEST_P(DeltaMergeStore_test, IngestEmptyFileLists)
try
{
Expand All @@ -1052,10 +1188,10 @@ try
// Test that if we ingest an empty file list, the data in range will be removed.
// The ingest range is [32, 256)
{
auto dm_context = store->newDMContext(*context, context->getSettingsRef());
auto dm_context = store->newDMContext(*context, context->getSettingsRef());

std::vector<PageId> file_ids ;
auto ingest_range = RowKeyRange::fromHandleRange(HandleRange{32, 256});
std::vector<PageId> file_ids;
auto ingest_range = RowKeyRange::fromHandleRange(HandleRange{32, 256});
store->ingestFiles(dm_context, ingest_range, file_ids, /*clear_data_in_range*/ true);
}

Expand Down

0 comments on commit b329618

Please sign in to comment.