Skip to content

Commit

Permalink
Do background compaction when the ratio of stable data covered by del…
Browse files Browse the repository at this point in the history
…ete range is too large (#3695)

close #3659
  • Loading branch information
lidezhu authored Apr 12, 2022
1 parent 00f52be commit 1af8ecb
Show file tree
Hide file tree
Showing 13 changed files with 741 additions and 451 deletions.
707 changes: 418 additions & 289 deletions dbms/src/Interpreters/Settings.h

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ class DeltaValueSnapshot : public std::enable_shared_from_this<DeltaValueSnapsho
size_t getBytes() const { return bytes; }
size_t getDeletes() const { return deletes; }

RowKeyRange getSquashDeleteRange() const;

const auto & getStorageSnapshot() { return storage_snap; }
const auto & getSharedDeltaIndex() { return shared_delta_index; }
};
Expand Down
13 changes: 12 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,22 @@ DeltaSnapshotPtr DeltaValueSpace::createSnapshot(const DMContext & context, bool
return snap;
}

RowKeyRange DeltaValueSnapshot::getSquashDeleteRange() const
{
RowKeyRange squashed_delete_range = RowKeyRange::newNone(is_common_handle, rowkey_column_size);
for (auto iter = packs.cbegin(); iter != packs.cend(); ++iter)
{
const auto & pack = *iter;
if (auto dp_delete = pack->tryToDeleteRange(); dp_delete)
squashed_delete_range = squashed_delete_range.merge(dp_delete->getDeleteRange());
}
return squashed_delete_range;
}

// ================================================
// DeltaValueReader
// ================================================


DeltaValueReader::DeltaValueReader(const DMContext & context,
const DeltaSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
Expand Down
169 changes: 106 additions & 63 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp

Large diffs are not rendered by default.

48 changes: 27 additions & 21 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ class DeltaMergeStore : private boost::noncopyable

enum TaskRunThread
{
Thread_BG_Thread_Pool,
Thread_FG,
Thread_BG_GC,
BackgroundThreadPool,
Foreground,
BackgroundGCThread,
};

static std::string toString(ThreadType type)
Expand Down Expand Up @@ -204,21 +204,6 @@ class DeltaMergeStore : private boost::noncopyable
}
}

static std::string toString(TaskRunThread type)
{
switch (type)
{
case Thread_BG_Thread_Pool:
return "BackgroundThreadPool";
case Thread_FG:
return "Foreground";
case Thread_BG_GC:
return "BackgroundGCThread";
default:
return "Unknown";
}
}

static std::string toString(TaskType type)
{
switch (type)
Expand All @@ -240,6 +225,21 @@ class DeltaMergeStore : private boost::noncopyable
}
}

static std::string toString(TaskRunThread type)
{
switch (type)
{
case BackgroundThreadPool:
return "BackgroundThreadPool";
case Foreground:
return "Foreground";
case BackgroundGCThread:
return "BackgroundGCThread";
default:
return "Unknown";
}
}

struct BackgroundTask
{
TaskType type;
Expand Down Expand Up @@ -403,7 +403,7 @@ class DeltaMergeStore : private boost::noncopyable
private:
#endif

DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & query_id="");
DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & query_id = "");

static bool pkIsHandle(const ColumnDefine & handle_define) { return handle_define.id != EXTRA_HANDLE_COLUMN_ID; }

Expand All @@ -414,13 +414,19 @@ class DeltaMergeStore : private boost::noncopyable

SegmentPair segmentSplit(DMContext & dm_context, const SegmentPtr & segment, bool is_foreground);
void segmentMerge(DMContext & dm_context, const SegmentPtr & left, const SegmentPtr & right, bool is_foreground);
SegmentPtr segmentMergeDelta(DMContext & dm_context, const SegmentPtr & segment, const TaskRunThread thread);
SegmentPtr segmentMergeDelta(DMContext & dm_context,
const SegmentPtr & segment,
const TaskRunThread thread,
SegmentSnapshotPtr segment_snap = nullptr);

bool updateGCSafePoint();

bool handleBackgroundTask(bool heavy);

bool isSegmentValid(const SegmentPtr & segment);
// isSegmentValid should be protected by lock on `read_write_mutex`
inline bool isSegmentValid(std::shared_lock<std::shared_mutex> &, const SegmentPtr & segment) { return doIsSegmentValid(segment); }
inline bool isSegmentValid(std::unique_lock<std::shared_mutex> &, const SegmentPtr & segment) { return doIsSegmentValid(segment); }
bool doIsSegmentValid(const SegmentPtr & segment);

void restoreStableFiles();

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,13 @@ class Segment : private boost::noncopyable
size_t expected_block_size = DEFAULT_BLOCK_SIZE);

/// Return a stream which is suitable for exporting data.
/// reorgize_block: put those rows with the same pk rows into the same block or not.
/// reorganize_block: put those rows with the same pk rows into the same block or not.
BlockInputStreamPtr getInputStreamForDataExport(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRange & data_range,
size_t expected_block_size = DEFAULT_BLOCK_SIZE,
bool reorgnize_block = true) const;
bool reorganize_block = true) const;

BlockInputStreamPtr getInputStreamRaw(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/convertColumnTypeHelpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NOT_IMPLEMENTED;
}

namespace DM
Expand Down
Loading

0 comments on commit 1af8ecb

Please sign in to comment.