Skip to content

Commit

Permalink
Do background compaction when the ratio of stable data covered by del…
Browse files Browse the repository at this point in the history
…ete range is too large
  • Loading branch information
lidezhu committed Jan 28, 2022
1 parent 8242cf2 commit 40d5efc
Show file tree
Hide file tree
Showing 13 changed files with 324 additions and 162 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ struct Settings
M(SettingUInt64, dt_bg_gc_check_interval, 600, "Background gc thread check interval, the unit is second.")\
M(SettingInt64, dt_bg_gc_max_segments_to_check_every_round, 15, "Max segments to check in every gc round, value less than or equal to 0 means gc no segments.")\
M(SettingFloat, dt_bg_gc_ratio_threhold_to_trigger_gc, 1.2, "Trigger segment's gc when the ratio of invalid version exceed this threhold. Values smaller than or equal to 1.0 means gc all segments")\
M(SettingFloat, dt_bg_gc_delta_delete_ratio_to_trigger_gc, 0.3, "Trigger segment's gc when the ratio of delta delete range to stable exceeds this ratio.")\
M(SettingUInt64, dt_insert_max_rows, 0, "Max rows of insert blocks when write into DeltaTree Engine. By default 0 means no limit.")\
M(SettingBool, dt_enable_rough_set_filter, true, "Whether to parse where expression as Rough Set Index filter or not.") \
M(SettingBool, dt_raw_filter_range, true, "Do range filter or not when read data in raw mode in DeltaTree Engine.")\
Expand Down Expand Up @@ -362,4 +363,4 @@ struct Settings
};


} // namespace DB
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ class DeltaValueSnapshot : public std::enable_shared_from_this<DeltaValueSnapsho
size_t getBytes() const { return bytes; }
size_t getDeletes() const { return deletes; }

RowKeyRange getSquashDeleteRange() const;

const auto & getStorageSnapshot() { return storage_snap; }
const auto & getSharedDeltaIndex() { return shared_delta_index; }
};
Expand Down
13 changes: 12 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,22 @@ DeltaSnapshotPtr DeltaValueSpace::createSnapshot(const DMContext & context, bool
return snap;
}

RowKeyRange DeltaValueSnapshot::getSquashDeleteRange() const
{
RowKeyRange squashed_delete_range = RowKeyRange::newNone(is_common_handle, rowkey_column_size);
for (auto iter = packs.cbegin(); iter != packs.cend(); ++iter)
{
const auto & pack = *iter;
if (auto dp_delete = pack->tryToDeleteRange(); dp_delete)
squashed_delete_range = squashed_delete_range.merge(dp_delete->getDeleteRange());
}
return squashed_delete_range;
}

// ================================================
// DeltaValueReader
// ================================================


DeltaValueReader::DeltaValueReader(const DMContext & context,
const DeltaSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
Expand Down
169 changes: 106 additions & 63 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp

Large diffs are not rendered by default.

48 changes: 27 additions & 21 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ class DeltaMergeStore : private boost::noncopyable

enum TaskRunThread
{
Thread_BG_Thread_Pool,
Thread_FG,
Thread_BG_GC,
BackgroundThreadPool,
Foreground,
BackgroundGCThread,
};

static std::string toString(ThreadType type)
Expand Down Expand Up @@ -204,21 +204,6 @@ class DeltaMergeStore : private boost::noncopyable
}
}

static std::string toString(TaskRunThread type)
{
switch (type)
{
case Thread_BG_Thread_Pool:
return "BackgroundThreadPool";
case Thread_FG:
return "Foreground";
case Thread_BG_GC:
return "BackgroundGCThread";
default:
return "Unknown";
}
}

static std::string toString(TaskType type)
{
switch (type)
Expand All @@ -240,6 +225,21 @@ class DeltaMergeStore : private boost::noncopyable
}
}

static std::string toString(TaskRunThread type)
{
switch (type)
{
case BackgroundThreadPool:
return "BackgroundThreadPool";
case Foreground:
return "Foreground";
case BackgroundGCThread:
return "BackgroundGCThread";
default:
return "Unknown";
}
}

struct BackgroundTask
{
TaskType type;
Expand Down Expand Up @@ -403,7 +403,7 @@ class DeltaMergeStore : private boost::noncopyable
private:
#endif

DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & query_id="");
DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & query_id = "");

static bool pkIsHandle(const ColumnDefine & handle_define) { return handle_define.id != EXTRA_HANDLE_COLUMN_ID; }

Expand All @@ -414,13 +414,19 @@ class DeltaMergeStore : private boost::noncopyable

SegmentPair segmentSplit(DMContext & dm_context, const SegmentPtr & segment, bool is_foreground);
void segmentMerge(DMContext & dm_context, const SegmentPtr & left, const SegmentPtr & right, bool is_foreground);
SegmentPtr segmentMergeDelta(DMContext & dm_context, const SegmentPtr & segment, const TaskRunThread thread);
SegmentPtr segmentMergeDelta(DMContext & dm_context,
const SegmentPtr & segment,
const TaskRunThread thread,
SegmentSnapshotPtr segment_snap = nullptr);

bool updateGCSafePoint();

bool handleBackgroundTask(bool heavy);

bool isSegmentValid(const SegmentPtr & segment);
// isSegmentValid should be protected by lock on `read_write_mutex`
inline bool isSegmentValid(std::shared_lock<std::shared_mutex> &, const SegmentPtr & segment) { return doIsSegmentValid(segment); }
inline bool isSegmentValid(std::unique_lock<std::shared_mutex> &, const SegmentPtr & segment) { return doIsSegmentValid(segment); }
bool doIsSegmentValid(const SegmentPtr & segment);

void restoreStableFiles();

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,13 @@ class Segment : private boost::noncopyable
size_t expected_block_size = DEFAULT_BLOCK_SIZE);

/// Return a stream which is suitable for exporting data.
/// reorgize_block: put those rows with the same pk rows into the same block or not.
/// reorganize_block: put those rows with the same pk rows into the same block or not.
BlockInputStreamPtr getInputStreamForDataExport(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRange & data_range,
size_t expected_block_size = DEFAULT_BLOCK_SIZE,
bool reorgnize_block = true) const;
bool reorganize_block = true) const;

BlockInputStreamPtr getInputStreamRaw(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/convertColumnTypeHelpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NOT_IMPLEMENTED;
}

namespace DM
Expand Down
Loading

0 comments on commit 40d5efc

Please sign in to comment.