Skip to content

Commit

Permalink
Do background compaction when the ratio of stable data covered by del…
Browse files Browse the repository at this point in the history
…ete range is too large (#3703)

close #3659
  • Loading branch information
lidezhu authored Jun 3, 2022
1 parent 9856019 commit 2f8f481
Show file tree
Hide file tree
Showing 17 changed files with 336 additions and 21 deletions.
3 changes: 3 additions & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ struct Settings
M(SettingUInt64, dt_segment_delta_small_pack_size, 524288, "Determine whether a pack in delta is small or not. 512 KB by default.")\
M(SettingUInt64, dt_segment_stable_pack_rows, DEFAULT_MERGE_BLOCK_SIZE, "Expected stable pack rows in DeltaTree Engine.")\
M(SettingFloat, dt_segment_wait_duration_factor, 1, "The factor of wait duration in a write stall.")\
M(SettingUInt64, dt_bg_gc_check_interval, 5, "Background gc thread check interval, the unit is second.")\
M(SettingInt64, dt_bg_gc_max_segments_to_check_every_round, 100, "Max segments to check in every gc round, value less than or equal to 0 means gc no segments.")\
M(SettingFloat, dt_bg_gc_delta_delete_ratio_to_trigger_gc, 0.3, "Trigger segment's gc when the ratio of delta delete range to stable exceeds this ratio.")\
M(SettingUInt64, dt_insert_max_rows, 0, "Max rows of insert blocks when write into DeltaTree Engine. By default 0 means no limit.")\
M(SettingBool, dt_enable_rough_set_filter, true, "Whether to parse where expression as Rough Set Index filter or not.") \
M(SettingBool, dt_raw_filter_range, true, "Do range filter or not when read data in raw mode in DeltaTree Engine.")\
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ class DeltaValueSnapshot : public std::enable_shared_from_this<DeltaValueSnapsho
size_t getBytes() const { return bytes; }
size_t getDeletes() const { return deletes; }

RowKeyRange getSquashDeleteRange() const;

const auto & getStorageSnapshot() { return storage_snap; }
const auto & getSharedDeltaIndex() { return shared_delta_index; }
};
Expand Down
13 changes: 13 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,19 @@ DeltaSnapshotPtr DeltaValueSpace::createSnapshot(const DMContext & context, bool
return snap;
}

/// Fold the delete ranges of every delete-range pack in this snapshot into one range.
/// The accumulator starts from `RowKeyRange::newNone(...)` and each delete range found in
/// `packs` is combined into it through `RowKeyRange::merge`; packs for which
/// `tryToDeleteRange()` yields nothing are skipped.
RowKeyRange DeltaValueSnapshot::getSquashDeleteRange() const
{
    auto merged_range = RowKeyRange::newNone(is_common_handle, rowkey_column_size);
    for (const auto & pack : packs)
    {
        auto delete_pack = pack->tryToDeleteRange();
        if (!delete_pack)
            continue; // not a delete-range pack

        merged_range = merged_range.merge(delete_pack->getDeleteRange());
    }
    return merged_range;
}


// ================================================
// DeltaValueReader
// ================================================
Expand Down
187 changes: 172 additions & 15 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context,
original_table_handle_define(handle),
background_pool(db_context.getBackgroundPool()),
blockable_background_pool(db_context.getBlockableBackgroundPool()),
next_gc_check_key(is_common_handle ? RowKeyValue::COMMON_HANDLE_MIN_KEY : RowKeyValue::INT_HANDLE_MIN_KEY),
log(&Logger::get("DeltaMergeStore[" + db_name + "." + table_name + "]"))
{
LOG_INFO(log, "Restore DeltaMerge Store start [" << db_name << "." << table_name << "]");
Expand Down Expand Up @@ -1248,25 +1249,32 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
try_place_delta_index();
}

bool DeltaMergeStore::handleBackgroundTask(bool heavy)
bool DeltaMergeStore::updateGCSafePoint()
{
auto task = background_tasks.nextTask(heavy, log);
if (!task)
return false;

// Update GC safe point before background task
/// Note that `task.dm_context->db_context` will be free after query is finish. We should not use that in background task.
if (auto pd_client = global_context.getTMTContext().getPDClient(); !pd_client->isMock())
{
auto safe_point = PDClientHelper::getGCSafePointWithRetry(pd_client,
/* ignore_cache= */ false,
global_context.getSettingsRef().safe_point_update_interval_seconds);
latest_gc_safe_point.store(safe_point, std::memory_order_release);
return true;
}
return false;
}

LOG_DEBUG(log, "Task" << toString(task.type) << " GC safe point: " << safe_point);
bool DeltaMergeStore::handleBackgroundTask(bool heavy)
{
auto task = background_tasks.nextTask(heavy, log);
if (!task)
return false;

// Foreground task don't get GC safe point from remote, but we better make it as up to date as possible.
latest_gc_safe_point = safe_point;
task.dm_context->min_version = safe_point;
// Update GC safe point before background task
// Foreground task don't get GC safe point from remote, but we better make it as up to date as possible.
if (updateGCSafePoint())
{
/// Note that `task.dm_context->db_context` will be freed after the query finishes. We should not use it in a background task.
task.dm_context->min_version = latest_gc_safe_point.load(std::memory_order_relaxed);
LOG_DEBUG(log, "Task " << toString(task.type) << " GC safe point: " << task.dm_context->min_version);
}

SegmentPtr left, right;
Expand Down Expand Up @@ -1332,6 +1340,152 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy)
return true;
}

namespace GC
{
/// Decide whether the delete ranges recorded in the delta cover enough of the stable data
/// to make a delta-merge worthwhile.
///
/// The squashed delete range of `snap->delta` is first shrunk to `segment_range`. If nothing
/// is left, the answer is false. Otherwise the approximate rows/bytes of stable data inside
/// that range are compared with the total stable rows/bytes scaled by `ratio_threshold`;
/// reaching the threshold on either rows or bytes returns true.
bool shouldCompactDeltaWithStable(
    const DMContext & context, const SegmentSnapshotPtr & snap, const RowKeyRange & segment_range, double ratio_threshold, Logger * log)
{
    auto effective_delete_range = snap->delta->getSquashDeleteRange().shrink(segment_range);
    if (effective_delete_range.none())
        return false;

    auto [covered_rows, covered_bytes] = snap->stable->getApproxRowsAndBytes(context, effective_delete_range);

    auto stable_rows = snap->stable->getRows();
    auto stable_bytes = snap->stable->getBytes();

    LOG_TRACE(log,
              __PRETTY_FUNCTION__ << " delete range rows [" << covered_rows << "], delete_bytes [" << covered_bytes << "] stable_rows ["
                                  << stable_rows << "] stable_bytes [" << stable_bytes << "]");

    // Why `>=` and not `>`:
    //  - A small table may keep all of its data in the delta, so the stable part can be empty.
    //    When the tiflash replica of such a table is set to 0, the delete range is not none
    //    while both the covered rows and the stable rows are 0; `>=` still triggers gc then.
    //  - The price is a possible extra gc when a snapshot file is applied to an empty segment:
    //    a delete range is written before the snapshot file and satisfies the criteria below.
    //    Merging the delta of an empty segment should be very fast, so the cost is minor, and
    //    this kind of delete range could be ignored in the future to avoid the extra gc.
    const auto rows_threshold = stable_rows * ratio_threshold;
    const auto bytes_threshold = stable_bytes * ratio_threshold;
    const bool rows_reached = covered_rows >= rows_threshold;
    const bool bytes_reached = covered_bytes >= bytes_threshold;
    return rows_reached || bytes_reached;
}
} // namespace GC

/// Run one round of synchronous GC over the segments of this store.
///
/// The round refreshes the GC safe point, then walks the segments in key order, resuming from
/// `next_gc_check_key` and wrapping around to the first segment. For each visited segment it
/// asks `GC::shouldCompactDeltaWithStable` whether the delete ranges in its delta cover enough
/// stable data, and if so runs `segmentMergeDelta` on it.
///
/// The walk stops when `limit` segments have been merged, when every segment has been visited
/// once in this round, or when the store is shut down.
///
/// @param limit  Max number of segments to delta-merge in this round. The loop body never runs
///               when `limit <= 0`.
/// @return       Number of segments whose delta-merge succeeded in this round. 0 is also
///               returned when the store is shut down, when `updateGCSafePoint()` returns
///               false, or when the only segment has no estimated rows.
/// An `Exception` thrown while handling a segment is annotated with the segment id, range and
/// table name and then rethrown.
UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
{
if (shutdown_called.load(std::memory_order_relaxed))
return 0;

// Nothing to do if the safe point could not be refreshed.
if (!updateGCSafePoint())
return 0;

{
std::shared_lock lock(read_write_mutex);
// avoid gc on empty tables
if (segments.size() == 1)
{
const auto & seg = segments.begin()->second;
if (seg->getEstimatedRows() == 0)
return 0;
}
}

DB::Timestamp gc_safe_point = latest_gc_safe_point.load(std::memory_order_acquire);
LOG_DEBUG(log,
"GC on table " << table_name << " start with key: " << next_gc_check_key.toDebugString()
<< ", gc_safe_point: " << gc_safe_point << ", max gc limit: " << limit);

// `check_segments_num` counts visited segments; `gc_segments_num` counts successful merges.
UInt64 check_segments_num = 0;
Int64 gc_segments_num = 0;
while (gc_segments_num < limit)
{
// If the store is shut down, give up running GC on it.
if (shutdown_called.load(std::memory_order_relaxed))
break;

auto dm_context = newDMContext(global_context, global_context.getSettingsRef());
SegmentPtr segment;
SegmentSnapshotPtr segment_snap;
{
// The segment lookup, cursor update and snapshot creation happen under the shared lock.
std::shared_lock lock(read_write_mutex);

// Pick the first segment whose key is greater than the cursor; wrap around at the end.
auto segment_it = segments.upper_bound(next_gc_check_key.toRowKeyValueRef());
if (segment_it == segments.end())
segment_it = segments.begin();

// we have checked all segments, stop here
if (check_segments_num >= segments.size())
break;
check_segments_num++;

segment = segment_it->second;
// Advance the cursor so that the next iteration (or the next round) continues after this segment.
next_gc_check_key = segment_it->first.toRowKeyValue();
segment_snap = segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge);
}

assert(segment != nullptr);
// Skip segments that are abandoned, were already checked with this (or a newer) safe point,
// or for which no snapshot could be created.
if (segment->hasAbandoned() || segment->getLastCheckGCSafePoint() >= gc_safe_point || segment_snap == nullptr)
continue;

const auto segment_id = segment->segmentId();
RowKeyRange segment_range = segment->getRowKeyRange();

// meet empty segment, try merge it
if (segment_snap->getRows() == 0)
{
// NOTE(review): `segment_snap` (created with for_update = true) is still alive while
// `checkSegmentUpdate` runs here. Presumably that keeps the segment marked as updating and
// could prevent the follow-up task from being applied -- confirm, and consider releasing
// the snapshot first.
checkSegmentUpdate(dm_context, segment, ThreadType::BG_GC);
continue;
}

// Avoid rechecking this segment while gc_safe_point doesn't change, regardless of whether we trigger this segment's DeltaMerge or not.
// Because after we calculate StableProperty and compare it with this gc_safe_point,
// there is no need to recheck it again using the same gc_safe_point.
// On the other hand, if it should do DeltaMerge using this gc_safe_point, and the DeltaMerge is interrupted by another process,
// it's still worth waiting for another gc_safe_point to check this segment again.
segment->setLastCheckGCSafePoint(gc_safe_point);
dm_context->min_version = gc_safe_point;

try
{
// Check whether we should apply gc on this segment
const bool should_compact = GC::shouldCompactDeltaWithStable(
*dm_context, segment_snap, segment_range, global_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc, log);
bool finish_gc_on_segment = false;
if (should_compact)
{
// Reuse the snapshot taken above; `segment` is rebound to the result of the merge,
// which is empty when the merge was given up.
if (segment = segmentMergeDelta(*dm_context, segment, false, segment_snap); segment)
{
// Continue to check whether we need to apply more tasks on this segment
checkSegmentUpdate(dm_context, segment, ThreadType::BG_GC);
gc_segments_num++;
finish_gc_on_segment = true;
LOG_INFO(log,
"GC-merge-delta done Segment [" << segment_id << "] [range=" << segment_range.toDebugString()
<< "] [table=" << table_name << "]");
}
else
{
LOG_INFO(log,
"GC aborted on Segment [" << segment_id << "] [range=" << segment_range.toDebugString()
<< "] [table=" << table_name << "]");
}
}
// Logged both when compaction was not needed and when the merge was aborted.
if (!finish_gc_on_segment)
LOG_DEBUG(log,
"GC is skipped Segment [" << segment_id << "] [range=" << segment_range.toDebugString()
<< "] [table=" << table_name << "]");
}
catch (Exception & e)
{
// Attach the segment identity to the error before propagating it to the caller.
e.addMessage("while apply gc Segment [" + DB::toString(segment_id) + "] [range=" + segment_range.toDebugString()
+ "] [table=" + table_name + "]");
e.rethrow();
}
}

LOG_DEBUG(log, "Finish GC on " << gc_segments_num << " segments [table=" + table_name + "]");
return gc_segments_num;
}

SegmentPair DeltaMergeStore::segmentSplit(DMContext & dm_context, const SegmentPtr & segment, bool is_foreground)
{
LOG_DEBUG(log,
Expand Down Expand Up @@ -1577,14 +1731,14 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le
check(dm_context.db_context);
}

SegmentPtr DeltaMergeStore::segmentMergeDelta(DMContext & dm_context, const SegmentPtr & segment, bool is_foreground)
SegmentPtr
DeltaMergeStore::segmentMergeDelta(DMContext & dm_context, const SegmentPtr & segment, bool is_foreground, SegmentSnapshotPtr segment_snap)
{
LOG_DEBUG(log,
(is_foreground ? "Foreground" : "Background")
<< " merge delta, segment [" << segment->segmentId() << "], safe point:" << dm_context.min_version);

SegmentSnapshotPtr segment_snap;
ColumnDefinesPtr schema_snap;
ColumnDefinesPtr schema_snap;
{
std::shared_lock lock(read_write_mutex);

Expand All @@ -1594,7 +1748,10 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(DMContext & dm_context, const Segm
return {};
}

segment_snap = segment->createSnapshot(dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge);
// Try to generate a new snapshot if there is no pre-allocated one
if (!segment_snap)
segment_snap = segment->createSnapshot(dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge);

if (!segment_snap)
{
LOG_DEBUG(log, "Give up merge delta, segment [" << segment->segmentId() << "]");
Expand Down
12 changes: 10 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class DeltaMergeStore : private boost::noncopyable
BG_MergeDelta,
BG_Compact,
BG_Flush,
BG_GC,
};

enum TaskType
Expand Down Expand Up @@ -323,6 +324,9 @@ class DeltaMergeStore : private boost::noncopyable
/// Compact fragment packs into bigger ones.
void compact(const Context & context, const RowKeyRange & range);

/// Iterate over all segments and apply gc jobs.
UInt64 onSyncGc(Int64 limit);

/// Apply `commands` on `table_columns`
void applyAlters(const AlterCommands & commands, //
const OptionTableInfoConstRef table_info,
Expand Down Expand Up @@ -367,7 +371,9 @@ class DeltaMergeStore : private boost::noncopyable

SegmentPair segmentSplit(DMContext & dm_context, const SegmentPtr & segment, bool is_foreground);
void segmentMerge(DMContext & dm_context, const SegmentPtr & left, const SegmentPtr & right, bool is_foreground);
SegmentPtr segmentMergeDelta(DMContext & dm_context, const SegmentPtr & segment, bool is_foreground);
SegmentPtr segmentMergeDelta(DMContext & dm_context, const SegmentPtr & segment, bool is_foreground, SegmentSnapshotPtr segment_snap = nullptr);

bool updateGCSafePoint();

bool handleBackgroundTask(bool heavy);

Expand Down Expand Up @@ -417,7 +423,9 @@ class DeltaMergeStore : private boost::noncopyable

MergeDeltaTaskPool background_tasks;

DB::Timestamp latest_gc_safe_point = 0;
std::atomic<DB::Timestamp> latest_gc_safe_point = 0;

RowKeyValue next_gc_check_key;

// Synchronize between write threads and read threads.
mutable std::shared_mutex read_write_mutex;
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ class Segment : private boost::noncopyable
RowsAndBytes
getRowsAndBytesInRange(DMContext & dm_context, const SegmentSnapshotPtr & segment_snap, const RowKeyRange & check_range, bool is_exact);

/// Return the GC safe point most recently stored through `setLastCheckGCSafePoint`.
/// Reads the atomic `last_check_gc_safe_point` with relaxed ordering.
DB::Timestamp getLastCheckGCSafePoint() { return last_check_gc_safe_point.load(std::memory_order_relaxed); }

/// Record `gc_safe_point` as the safe point this segment was last checked against.
/// Writes the atomic `last_check_gc_safe_point` with relaxed ordering.
void setLastCheckGCSafePoint(DB::Timestamp gc_safe_point) { last_check_gc_safe_point.store(gc_safe_point, std::memory_order_relaxed); }

private:
ReadInfo getReadInfo(const DMContext & dm_context,
const ColumnDefines & read_columns,
Expand Down Expand Up @@ -323,6 +327,8 @@ class Segment : private boost::noncopyable
const PageId segment_id;
const PageId next_segment_id;

std::atomic<DB::Timestamp> last_check_gc_safe_point = 0;

const DeltaValueSpacePtr delta;
const StableValueSpacePtr stable;

Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ StoragePool::StoragePool(const String & name, StoragePathPool & path_pool, const
global_ctx.getTiFlashMetrics()),
max_log_page_id(0),
max_data_page_id(0),
max_meta_page_id(0),
global_context(global_ctx)
max_meta_page_id(0)
{
}

Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Storages/DeltaMerge/StoragePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ class StoragePool : private boost::noncopyable
std::atomic<Timepoint> last_try_gc_time = Clock::now();

std::mutex mutex;

const Context & global_context;
};

struct StorageSnapshot
Expand Down
Loading

0 comments on commit 2f8f481

Please sign in to comment.