Revert "background gc thread" (#2015)
* Revert "Fix calculate stable property (#1839) (#1878)"

This reverts commit 6a9eb58.

* Revert "Add background gc check thread for DeltaTree storage (#1742) (#1828)"

This reverts commit cbbbd09.

* remove code

* format code

* format code

* fix test compile

* fix comment
lidezhu authored May 28, 2021
1 parent b09e4cc commit c5e51a8
Showing 44 changed files with 126 additions and 2,489 deletions.
1 change: 0 additions & 1 deletion .gitignore
@@ -225,7 +225,6 @@ nbproject/*
config-preprocessed.xml

# Protobuf
- *.pb.cc
*.pb.cpp
*.pb.h

1 change: 0 additions & 1 deletion dbms/CMakeLists.txt
@@ -178,7 +178,6 @@ target_link_libraries (dbms
kvproto
kv_client
tipb
- dtpb
${Protobuf_LIBRARIES}
gRPC::grpc++_unsecure
${CURL_LIBRARIES}
6 changes: 2 additions & 4 deletions dbms/src/Common/TiFlashMetrics.h
@@ -86,14 +86,12 @@ namespace DB
M(tiflash_storage_command_count, "Total number of storage's command, such as delete range / shutdown /startup", Counter, \
F(type_delete_range, {"type", "delete_range"}), F(type_ingest, {"type", "ingest"})) \
M(tiflash_storage_subtask_count, "Total number of storage's sub task", Counter, F(type_delta_merge, {"type", "delta_merge"}), \
- F(type_delta_merge_fg, {"type", "delta_merge_fg"}), F(type_delta_merge_bg_gc, {"type", "delta_merge_bg_gc"}), \
- F(type_delta_compact, {"type", "delta_compact"}), F(type_delta_flush, {"type", "delta_flush"}), \
- F(type_seg_split, {"type", "seg_split"}), F(type_seg_merge, {"type", "seg_merge"}), \
+ F(type_delta_merge_fg, {"type", "delta_merge_fg"}), F(type_delta_compact, {"type", "delta_compact"}), \
+ F(type_delta_flush, {"type", "delta_flush"}),F(type_seg_split, {"type", "seg_split"}), F(type_seg_merge, {"type", "seg_merge"}), \
F(type_place_index_update, {"type", "place_index_update"})) \
M(tiflash_storage_subtask_duration_seconds, "Bucketed histogram of storage's sub task duration", Histogram, \
F(type_delta_merge, {{"type", "delta_merge"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_delta_merge_fg, {{"type", "delta_merge_fg"}}, ExpBuckets{0.0005, 2, 20}), \
- F(type_delta_merge_bg_gc, {{"type", "delta_merge_bg_gc"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_delta_compact, {{"type", "delta_compact"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_delta_flush, {{"type", "delta_flush"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_seg_split, {{"type", "seg_split"}}, ExpBuckets{0.0005, 2, 20}), \
2 changes: 1 addition & 1 deletion dbms/src/Databases/test/gtest_database.cpp
@@ -796,7 +796,7 @@ DatabasePtr detachThenAttach(Context & ctx, const String & db_name, DatabasePtr
}

db = ctx.getDatabase(db_name);
- return db;
+ return std::move(db);
}

TEST_F(DatabaseTiFlash_test, Tombstone)
3 changes: 0 additions & 3 deletions dbms/src/Interpreters/Settings.h
@@ -259,9 +259,6 @@ struct Settings
M(SettingUInt64, dt_segment_delta_small_pack_size, 524288, "Determine whether a pack in delta is small or not. 512 KB by default.")\
M(SettingUInt64, dt_segment_stable_pack_rows, DEFAULT_MERGE_BLOCK_SIZE, "Expected stable pack rows in DeltaTree Engine.")\
M(SettingFloat, dt_segment_wait_duration_factor, 1, "The factor of wait duration in a write stall.")\
- M(SettingUInt64, dt_bg_gc_check_interval, 600, "Background gc thread check interval, the unit is second.")\
- M(SettingInt64, dt_bg_gc_max_segments_to_check_every_round, 15, "Max segments to check in every gc round, value less than or equal to 0 means gc no segments.")\
- M(SettingFloat, dt_bg_gc_ratio_threhold_to_trigger_gc, 1.2, "Trigger segment's gc when the ratio of invalid version exceed this threhold. Values smaller than or equal to 1.0 means gc all segments")\
M(SettingUInt64, dt_insert_max_rows, 0, "Max rows of insert blocks when write into DeltaTree Engine. By default 0 means no limit.")\
M(SettingBool, dt_enable_rough_set_filter, true, "Whether to parse where expression as Rough Set Index filter or not.") \
M(SettingBool, dt_raw_filter_range, true, "Do range filter or not when read data in raw mode in DeltaTree Engine.")\
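
For context, the three removed dt_bg_gc_* settings configured the reverted background GC check thread: how often it wakes up, how many segments it inspects per round, and the invalid-version ratio that triggers a segment GC. Below is a minimal sketch of how such a loop could consume these settings; every name in it (GcSettings, Segment, backgroundGcLoop, invalid_version_ratio) is hypothetical and this is not the reverted TiFlash implementation.

```cpp
// Hypothetical sketch only: shows how the three removed settings could drive a
// periodic background GC check loop. Not the reverted TiFlash code.
#include <atomic>
#include <chrono>
#include <cstdint>
#include <thread>
#include <vector>

struct GcSettings
{
    uint64_t check_interval_sec = 600;   // dt_bg_gc_check_interval
    int64_t max_segments_per_round = 15; // dt_bg_gc_max_segments_to_check_every_round
    double ratio_threshold = 1.2;        // dt_bg_gc_ratio_threhold_to_trigger_gc
};

struct Segment
{
    double invalid_version_ratio = 0.0;
    void gc() { /* compact the segment, dropping obsolete versions */ }
};

void backgroundGcLoop(std::vector<Segment> & segments, const GcSettings & settings, std::atomic<bool> & stop)
{
    size_t cursor = 0; // round-robin over segments across rounds
    while (!stop.load())
    {
        std::this_thread::sleep_for(std::chrono::seconds(settings.check_interval_sec));
        // A value <= 0 means "check no segments", per the removed setting's description.
        for (int64_t i = 0; i < settings.max_segments_per_round && !segments.empty(); ++i)
        {
            Segment & seg = segments[cursor++ % segments.size()];
            // A threshold <= 1.0 means "GC every segment", per the removed setting's description.
            if (settings.ratio_threshold <= 1.0 || seg.invalid_version_ratio > settings.ratio_threshold)
                seg.gc();
        }
    }
}
```
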
8 changes: 0 additions & 8 deletions dbms/src/Storages/CMakeLists.txt
@@ -1,13 +1,5 @@
add_subdirectory (System)
add_subdirectory (Page)
- set (save_CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
- set (save_CMAKE_C_FLAGS ${CMAKE_C_FLAGS})
- set (_save ${ENABLE_TESTS})
- set (ENABLE_TESTS 0)
- add_subdirectory (DeltaMerge/File/dtpb/cpp)
- set (ENABLE_TESTS ${_save})
- set (CMAKE_CXX_FLAGS ${save_CMAKE_CXX_FLAGS})
- set (CMAKE_C_FLAGS ${save_CMAKE_C_FLAGS})

if (ENABLE_TESTS)
add_subdirectory (tests EXCLUDE_FROM_ALL)
65 changes: 0 additions & 65 deletions dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp
@@ -176,35 +176,6 @@ Block DMVersionFilterBlockInputStream<MODE>::read(FilterPtr & res_filter, bool r
}
}

- // Let's set effective.
- effective.resize(rows);
-
- {
- UInt8 * effective_pos = effective.data();
- size_t handle_pos = 0;
- size_t next_handle_pos = handle_pos + 1;
- for (size_t i = 0; i < batch_rows; ++i)
- {
- (*effective_pos)
- = compare(rowkey_column->getRowKeyValue(handle_pos), rowkey_column->getRowKeyValue(next_handle_pos)) != 0;
- ++effective_pos;
- ++handle_pos;
- ++next_handle_pos;
- }
- }
-
- {
- UInt8 * effective_pos = effective.data();
- UInt8 * filter_pos = filter.data();
- for (size_t i = 0; i < batch_rows; ++i)
- {
- (*effective_pos) &= (*filter_pos);
-
- ++effective_pos;
- ++filter_pos;
- }
- }

// Let's set not_clean.
not_clean.resize(rows);

@@ -245,32 +216,6 @@ Block DMVersionFilterBlockInputStream<MODE>::read(FilterPtr & res_filter, bool r
++filter_pos;
}
}

- // Let's calculate gc_hint_version
- gc_hint_version = UINT64_MAX;
- {
- UInt8 * filter_pos = filter.data();
- size_t handle_pos = 0;
- size_t next_handle_pos = handle_pos + 1;
- auto * version_pos = const_cast<UInt64 *>(version_col_data->data());
- auto * delete_pos = const_cast<UInt8 *>(delete_col_data->data());
- for (size_t i = 0; i < batch_rows; ++i)
- {
- if (*filter_pos)
- gc_hint_version = std::min(gc_hint_version,
- calculateRowGcHintVersion(rowkey_column->getRowKeyValue(handle_pos),
- *version_pos,
- rowkey_column->getRowKeyValue(next_handle_pos),
- true,
- *delete_pos));
-
- ++filter_pos;
- ++handle_pos;
- ++next_handle_pos;
- ++version_pos;
- ++delete_pos;
- }
- }
}
else
{
@@ -296,11 +241,6 @@ Block DMVersionFilterBlockInputStream<MODE>::read(FilterPtr & res_filter, bool r
{
filter[rows - 1] = cur_version >= version_limit || !deleted;
not_clean[rows - 1] = filter[rows - 1] && deleted;
- effective[rows - 1] = filter[rows - 1];
- if (filter[rows - 1])
- gc_hint_version = std::min(
- gc_hint_version,
- calculateRowGcHintVersion(cur_handle, cur_version, /* just a placeholder */ cur_handle, false, deleted));
}
else
{
@@ -321,10 +261,6 @@ Block DMVersionFilterBlockInputStream<MODE>::read(FilterPtr & res_filter, bool r
filter[rows - 1] = cur_version >= version_limit
|| ((compare(cur_handle, next_handle) != 0 || next_version > version_limit) && !deleted);
not_clean[rows - 1] = filter[rows - 1] && (compare(cur_handle, next_handle) == 0 || deleted);
- effective[rows - 1] = filter[rows - 1] && (compare(cur_handle, next_handle) != 0);
- if (filter[rows - 1])
- gc_hint_version
- = std::min(gc_hint_version, calculateRowGcHintVersion(cur_handle, cur_version, next_handle, true, deleted));
}
else
{
@@ -338,7 +274,6 @@ Block DMVersionFilterBlockInputStream<MODE>::read(FilterPtr & res_filter, bool r
if constexpr (MODE == DM_VERSION_FILTER_MODE_COMPACT)
{
not_clean_rows += countBytesInFilter(not_clean);
- effective_num_rows += countBytesInFilter(effective);
}

++total_blocks;
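
For context, the code removed above maintained an "effective" filter: a row that passes the MVCC version filter counts as effective only when its primary key differs from the next row's key, i.e. it is the newest surviving version of that key, and effective_num_rows accumulated that count per block. The sketch below is a simplified, hypothetical illustration of that rule; countEffective is not a TiFlash function, and the block-boundary handling of the original code is omitted.

```cpp
// Hypothetical, simplified illustration of the removed "effective" bookkeeping.
// Input rows are assumed sorted by (pk, version); filter[i] == 1 means row i
// survives the MVCC version filter.
#include <cstddef>
#include <cstdint>
#include <vector>

size_t countEffective(const std::vector<int64_t> & pk, const std::vector<uint8_t> & filter)
{
    size_t effective = 0;
    for (size_t i = 0; i < pk.size(); ++i)
    {
        // A row is the last surviving version of its key when the next row has a different key.
        const bool last_version_of_key = (i + 1 == pk.size()) || (pk[i] != pk[i + 1]);
        if (filter[i] && last_version_of_key)
            ++effective;
    }
    return effective;
}
```
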
67 changes: 1 addition & 66 deletions dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h
@@ -48,8 +48,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
"Total rows: " << total_rows << ", pass: " << DB::toString((Float64)passed_rows * 100 / total_rows, 2)
<< "%, complete pass: " << DB::toString((Float64)complete_passed * 100 / total_blocks, 2)
<< "%, complete not pass: " << DB::toString((Float64)complete_not_passed * 100 / total_blocks, 2)
<< "%, not clean: " << DB::toString((Float64)not_clean_rows * 100 / passed_rows, 2)
<< "%, effective: " << DB::toString((Float64)effective_num_rows * 100 / passed_rows, 2) << "%");
<< "%, not clean: " << DB::toString((Float64)not_clean_rows * 100 / passed_rows, 2) << "%");
}

void readPrefix() override;
@@ -66,9 +65,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream

Block read(FilterPtr & res_filter, bool return_filter) override;

- size_t getEffectiveNumRows() const { return effective_num_rows; }
size_t getNotCleanRows() const { return not_clean_rows; }
- UInt64 getGCHintVersion() const { return gc_hint_version; }

private:
inline void checkWithNextIndex(size_t i)
@@ -87,10 +84,6 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
filter[i]
= cur_version >= version_limit || ((compare(cur_handle, next_handle) != 0 || next_version > version_limit) && !deleted);
not_clean[i] = filter[i] && (compare(cur_handle, next_handle) == 0 || deleted);
- effective[i] = filter[i] && (compare(cur_handle, next_handle) != 0);
- if (filter[i])
- gc_hint_version
- = std::min(gc_hint_version, calculateRowGcHintVersion(cur_handle, cur_version, next_handle, true, deleted));
}
else
{
@@ -122,50 +115,6 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
}
}

- private:
- inline UInt64 calculateRowGcHintVersion(
- const RowKeyValueRef & cur_handle, UInt64 cur_version, const RowKeyValueRef & next_handle, bool next_handle_valid, bool deleted)
- {
- // The rules to calculate gc_hint_version of every pk,
- // 1. If the oldest version is delete, then the result is the oldest version.
- // 2. Otherwise, if the pk has just a single version, the result is UInt64_MAX(means just ignore this kind of pk).
- // 3. Otherwise, the result is the second oldest version.
- bool matched = false;
- if (is_first_oldest_version && deleted)
- {
- // rule 1
- matched = true;
- }
- else if (is_second_oldest_version && gc_hint_version_pending)
- {
- // rule 3
- matched = true;
- }
- gc_hint_version_pending = !matched;
-
- // update status variable for next row if need
- if (next_handle_valid)
- {
- if (compare(cur_handle, next_handle) != 0)
- {
- is_first_oldest_version = true;
- is_second_oldest_version = false;
- }
- else if (is_first_oldest_version && (compare(cur_handle, next_handle) == 0))
- {
- is_first_oldest_version = false;
- is_second_oldest_version = true;
- }
- else
- {
- is_first_oldest_version = false;
- is_second_oldest_version = false;
- }
- }
-
- return matched ? cur_version : UINT64_MAX;
- }

private:
UInt64 version_limit;
bool is_common_handle;
@@ -176,22 +125,9 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
size_t delete_col_pos;

IColumn::Filter filter{};
- // effective = selected & handle not equals with next
- IColumn::Filter effective{};
// not_clean = selected & (handle equals with next || deleted)
IColumn::Filter not_clean{};

- // Calculate per block, when gc_safe_point exceed this version, there must be some data obsolete in this block
- // First calculate the gc_hint_version of every pk according to the following rules,
- // see the comments in `calculateRowGcHintVersion` to see how to calculate it for every pk
- // Then the block's gc_hint_version is the minimum value of all pk's gc_hint_version
- UInt64 gc_hint_version;
-
- // auxiliary variable for the calculation of gc_hint_version
- bool is_first_oldest_version = true;
- bool is_second_oldest_version = false;
- bool gc_hint_version_pending = true;

Block raw_block;

//PaddedPODArray<Handle> const * handle_col_data = nullptr;
@@ -205,7 +141,6 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
size_t complete_passed = 0;
size_t complete_not_passed = 0;
size_t not_clean_rows = 0;
- size_t effective_num_rows = 0;

Logger * log;
};
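
For context, the removed calculateRowGcHintVersion implemented the per-pk rules spelled out in its own comment: if the oldest version of a pk is a delete, the hint is that oldest version; if the pk has only a single version, it is ignored (UINT64_MAX); otherwise the hint is the second oldest version, and the block's gc_hint_version is the minimum over all pks. The sketch below restates those rules over rows sorted ascending by (pk, version), ignoring the interaction with the MVCC filter; Row and blockGcHintVersion are hypothetical names, not the reverted TiFlash code.

```cpp
// Hypothetical illustration of the removed gc_hint_version rules.
//   rule 1: oldest version of a pk is a delete -> hint is that oldest version
//   rule 2: pk has only a single (non-deleted) version -> ignore (UINT64_MAX)
//   rule 3: otherwise -> hint is the second oldest version
// The block-level hint is the minimum over all pk groups.
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

struct Row
{
    int64_t pk;
    uint64_t version;
    bool deleted;
};

uint64_t blockGcHintVersion(const std::vector<Row> & rows) // sorted ascending by (pk, version)
{
    uint64_t hint = UINT64_MAX;
    for (size_t i = 0; i < rows.size();)
    {
        size_t j = i;
        while (j + 1 < rows.size() && rows[j + 1].pk == rows[i].pk)
            ++j; // rows[i..j] now form one pk group, oldest version first
        if (rows[i].deleted)
            hint = std::min(hint, rows[i].version);     // rule 1
        else if (j > i)
            hint = std::min(hint, rows[i + 1].version); // rule 3
        // rule 2: a single non-deleted version contributes nothing
        i = j + 1;
    }
    return hint;
}
```
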