From c5e51a8283eede0603a76ef80e7fdb8ab8d6031a Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Fri, 28 May 2021 15:32:01 +0800 Subject: [PATCH] Revert "background gc thread" (#2015) * Revert "Fix calculate stable property (#1839) (#1878)" This reverts commit 6a9eb585aea4dc53995afbfe2f6ed7765d419a8f. * Revert "Add background gc check thread for DeltaTree storage (#1742) (#1828)" This reverts commit cbbbd09e38bcea87936832a742de79d1f7b1d755. * remove code * format code * format code * fix test compile * fix comment --- .gitignore | 1 - dbms/CMakeLists.txt | 1 - dbms/src/Common/TiFlashMetrics.h | 6 +- dbms/src/Databases/test/gtest_database.cpp | 2 +- dbms/src/Interpreters/Settings.h | 3 - dbms/src/Storages/CMakeLists.txt | 8 - .../DMVersionFilterBlockInputStream.cpp | 65 -- .../DMVersionFilterBlockInputStream.h | 67 +- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 222 +----- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 35 +- dbms/src/Storages/DeltaMerge/File/DMFile.cpp | 131 +-- dbms/src/Storages/DeltaMerge/File/DMFile.h | 38 +- .../DeltaMerge/File/DMFileBlockInputStream.h | 6 +- .../DeltaMerge/File/DMFileBlockOutputStream.h | 3 +- .../Storages/DeltaMerge/File/DMFileReader.cpp | 8 +- .../Storages/DeltaMerge/File/DMFileReader.h | 6 +- .../Storages/DeltaMerge/File/DMFileWriter.cpp | 9 +- .../Storages/DeltaMerge/File/DMFileWriter.h | 10 +- .../DeltaMerge/File/dtpb/cpp/CMakeLists.txt | 26 - .../File/dtpb/cpp/dtpb/dmfile.pb.cc | 750 ------------------ .../DeltaMerge/File/dtpb/cpp/dtpb/dmfile.pb.h | 451 ----------- .../DeltaMerge/File/dtpb/generate_cpp.sh | 6 - .../DeltaMerge/File/dtpb/proto/dmfile.proto | 14 - .../DeltaMerge/SSTFilesToBlockInputStream.cpp | 6 +- .../DeltaMerge/SSTFilesToBlockInputStream.h | 4 +- .../SSTFilesToDTFilesOutputStream.cpp | 16 +- dbms/src/Storages/DeltaMerge/Segment.cpp | 33 +- dbms/src/Storages/DeltaMerge/Segment.h | 6 - .../Storages/DeltaMerge/StableValueSpace.cpp | 124 --- .../Storages/DeltaMerge/StableValueSpace.h | 28 - .../DeltaMerge/tests/gtest_dm_file.cpp | 89 +-- .../DeltaMerge/tests/gtest_dm_segment.cpp | 91 --- .../DeltaMerge/tests/gtest_version_filter.cpp | 202 +---- dbms/src/Storages/GCManager.cpp | 84 -- dbms/src/Storages/GCManager.h | 26 - dbms/src/Storages/IManageableStorage.h | 3 - dbms/src/Storages/StorageDeltaMerge.cpp | 6 - dbms/src/Storages/StorageDeltaMerge.h | 2 - .../Transaction/BackgroundService.cpp | 6 - .../Storages/Transaction/BackgroundService.h | 1 - .../Storages/Transaction/PartitionStreams.cpp | 4 +- dbms/src/Storages/Transaction/TMTContext.cpp | 2 - dbms/src/Storages/Transaction/TMTContext.h | 7 - release-centos7/build/build-tiflash-ci.sh | 7 - 44 files changed, 126 insertions(+), 2489 deletions(-) delete mode 100644 dbms/src/Storages/DeltaMerge/File/dtpb/cpp/CMakeLists.txt delete mode 100644 dbms/src/Storages/DeltaMerge/File/dtpb/cpp/dtpb/dmfile.pb.cc delete mode 100644 dbms/src/Storages/DeltaMerge/File/dtpb/cpp/dtpb/dmfile.pb.h delete mode 100755 dbms/src/Storages/DeltaMerge/File/dtpb/generate_cpp.sh delete mode 100644 dbms/src/Storages/DeltaMerge/File/dtpb/proto/dmfile.proto delete mode 100644 dbms/src/Storages/GCManager.cpp delete mode 100644 dbms/src/Storages/GCManager.h diff --git a/.gitignore b/.gitignore index ebf14c2abc7..a886242b529 100644 --- a/.gitignore +++ b/.gitignore @@ -225,7 +225,6 @@ nbproject/* config-preprocessed.xml # Protobuf -*.pb.cc *.pb.cpp *.pb.h diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index d2e217b0db4..501998feaee 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -178,7 +178,6 @@ target_link_libraries (dbms kvproto kv_client tipb - dtpb ${Protobuf_LIBRARIES} gRPC::grpc++_unsecure ${CURL_LIBRARIES} diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 1a10d8f9ac0..3530fae4009 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -86,14 +86,12 @@ namespace DB M(tiflash_storage_command_count, "Total number of storage's command, such as delete range / shutdown /startup", Counter, \ F(type_delete_range, {"type", "delete_range"}), F(type_ingest, {"type", "ingest"})) \ M(tiflash_storage_subtask_count, "Total number of storage's sub task", Counter, F(type_delta_merge, {"type", "delta_merge"}), \ - F(type_delta_merge_fg, {"type", "delta_merge_fg"}), F(type_delta_merge_bg_gc, {"type", "delta_merge_bg_gc"}), \ - F(type_delta_compact, {"type", "delta_compact"}), F(type_delta_flush, {"type", "delta_flush"}), \ - F(type_seg_split, {"type", "seg_split"}), F(type_seg_merge, {"type", "seg_merge"}), \ + F(type_delta_merge_fg, {"type", "delta_merge_fg"}), F(type_delta_compact, {"type", "delta_compact"}), \ + F(type_delta_flush, {"type", "delta_flush"}),F(type_seg_split, {"type", "seg_split"}), F(type_seg_merge, {"type", "seg_merge"}), \ F(type_place_index_update, {"type", "place_index_update"})) \ M(tiflash_storage_subtask_duration_seconds, "Bucketed histogram of storage's sub task duration", Histogram, \ F(type_delta_merge, {{"type", "delta_merge"}}, ExpBuckets{0.0005, 2, 20}), \ F(type_delta_merge_fg, {{"type", "delta_merge_fg"}}, ExpBuckets{0.0005, 2, 20}), \ - F(type_delta_merge_bg_gc, {{"type", "delta_merge_bg_gc"}}, ExpBuckets{0.0005, 2, 20}), \ F(type_delta_compact, {{"type", "delta_compact"}}, ExpBuckets{0.0005, 2, 20}), \ F(type_delta_flush, {{"type", "delta_flush"}}, ExpBuckets{0.0005, 2, 20}), \ F(type_seg_split, {{"type", "seg_split"}}, ExpBuckets{0.0005, 2, 20}), \ diff --git a/dbms/src/Databases/test/gtest_database.cpp b/dbms/src/Databases/test/gtest_database.cpp index b5f25b49c26..526a18be483 100644 --- a/dbms/src/Databases/test/gtest_database.cpp +++ b/dbms/src/Databases/test/gtest_database.cpp @@ -796,7 +796,7 @@ DatabasePtr detachThenAttach(Context & ctx, const String & db_name, DatabasePtr } db = ctx.getDatabase(db_name); - return db; + return std::move(db); } TEST_F(DatabaseTiFlash_test, Tombstone) diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 9e1aaa40fb2..ab317368eeb 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -259,9 +259,6 @@ struct Settings M(SettingUInt64, dt_segment_delta_small_pack_size, 524288, "Determine whether a pack in delta is small or not. 512 KB by default.")\ M(SettingUInt64, dt_segment_stable_pack_rows, DEFAULT_MERGE_BLOCK_SIZE, "Expected stable pack rows in DeltaTree Engine.")\ M(SettingFloat, dt_segment_wait_duration_factor, 1, "The factor of wait duration in a write stall.")\ - M(SettingUInt64, dt_bg_gc_check_interval, 600, "Background gc thread check interval, the unit is second.")\ - M(SettingInt64, dt_bg_gc_max_segments_to_check_every_round, 15, "Max segments to check in every gc round, value less than or equal to 0 means gc no segments.")\ - M(SettingFloat, dt_bg_gc_ratio_threhold_to_trigger_gc, 1.2, "Trigger segment's gc when the ratio of invalid version exceed this threhold. Values smaller than or equal to 1.0 means gc all segments")\ M(SettingUInt64, dt_insert_max_rows, 0, "Max rows of insert blocks when write into DeltaTree Engine. By default 0 means no limit.")\ M(SettingBool, dt_enable_rough_set_filter, true, "Whether to parse where expression as Rough Set Index filter or not.") \ M(SettingBool, dt_raw_filter_range, true, "Do range filter or not when read data in raw mode in DeltaTree Engine.")\ diff --git a/dbms/src/Storages/CMakeLists.txt b/dbms/src/Storages/CMakeLists.txt index 8cab10d5cbf..42c9842eec3 100644 --- a/dbms/src/Storages/CMakeLists.txt +++ b/dbms/src/Storages/CMakeLists.txt @@ -1,13 +1,5 @@ add_subdirectory (System) add_subdirectory (Page) -set (save_CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS}) -set (save_CMAKE_C_FLAGS ${CMAKE_C_FLAGS}) -set (_save ${ENABLE_TESTS}) -set (ENABLE_TESTS 0) -add_subdirectory (DeltaMerge/File/dtpb/cpp) -set (ENABLE_TESTS ${_save}) -set (CMAKE_CXX_FLAGS ${save_CMAKE_CXX_FLAGS}) -set (CMAKE_C_FLAGS ${save_CMAKE_C_FLAGS}) if (ENABLE_TESTS) add_subdirectory (tests EXCLUDE_FROM_ALL) diff --git a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp index 957c4e028fb..c85cb78d114 100644 --- a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp @@ -176,35 +176,6 @@ Block DMVersionFilterBlockInputStream::read(FilterPtr & res_filter, bool r } } - // Let's set effective. - effective.resize(rows); - - { - UInt8 * effective_pos = effective.data(); - size_t handle_pos = 0; - size_t next_handle_pos = handle_pos + 1; - for (size_t i = 0; i < batch_rows; ++i) - { - (*effective_pos) - = compare(rowkey_column->getRowKeyValue(handle_pos), rowkey_column->getRowKeyValue(next_handle_pos)) != 0; - ++effective_pos; - ++handle_pos; - ++next_handle_pos; - } - } - - { - UInt8 * effective_pos = effective.data(); - UInt8 * filter_pos = filter.data(); - for (size_t i = 0; i < batch_rows; ++i) - { - (*effective_pos) &= (*filter_pos); - - ++effective_pos; - ++filter_pos; - } - } - // Let's set not_clean. not_clean.resize(rows); @@ -245,32 +216,6 @@ Block DMVersionFilterBlockInputStream::read(FilterPtr & res_filter, bool r ++filter_pos; } } - - // Let's calculate gc_hint_version - gc_hint_version = UINT64_MAX; - { - UInt8 * filter_pos = filter.data(); - size_t handle_pos = 0; - size_t next_handle_pos = handle_pos + 1; - auto * version_pos = const_cast(version_col_data->data()); - auto * delete_pos = const_cast(delete_col_data->data()); - for (size_t i = 0; i < batch_rows; ++i) - { - if (*filter_pos) - gc_hint_version = std::min(gc_hint_version, - calculateRowGcHintVersion(rowkey_column->getRowKeyValue(handle_pos), - *version_pos, - rowkey_column->getRowKeyValue(next_handle_pos), - true, - *delete_pos)); - - ++filter_pos; - ++handle_pos; - ++next_handle_pos; - ++version_pos; - ++delete_pos; - } - } } else { @@ -296,11 +241,6 @@ Block DMVersionFilterBlockInputStream::read(FilterPtr & res_filter, bool r { filter[rows - 1] = cur_version >= version_limit || !deleted; not_clean[rows - 1] = filter[rows - 1] && deleted; - effective[rows - 1] = filter[rows - 1]; - if (filter[rows - 1]) - gc_hint_version = std::min( - gc_hint_version, - calculateRowGcHintVersion(cur_handle, cur_version, /* just a placeholder */ cur_handle, false, deleted)); } else { @@ -321,10 +261,6 @@ Block DMVersionFilterBlockInputStream::read(FilterPtr & res_filter, bool r filter[rows - 1] = cur_version >= version_limit || ((compare(cur_handle, next_handle) != 0 || next_version > version_limit) && !deleted); not_clean[rows - 1] = filter[rows - 1] && (compare(cur_handle, next_handle) == 0 || deleted); - effective[rows - 1] = filter[rows - 1] && (compare(cur_handle, next_handle) != 0); - if (filter[rows - 1]) - gc_hint_version - = std::min(gc_hint_version, calculateRowGcHintVersion(cur_handle, cur_version, next_handle, true, deleted)); } else { @@ -338,7 +274,6 @@ Block DMVersionFilterBlockInputStream::read(FilterPtr & res_filter, bool r if constexpr (MODE == DM_VERSION_FILTER_MODE_COMPACT) { not_clean_rows += countBytesInFilter(not_clean); - effective_num_rows += countBytesInFilter(effective); } ++total_blocks; diff --git a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h index 1c14fe55553..1881140d7e9 100644 --- a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h @@ -48,8 +48,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream "Total rows: " << total_rows << ", pass: " << DB::toString((Float64)passed_rows * 100 / total_rows, 2) << "%, complete pass: " << DB::toString((Float64)complete_passed * 100 / total_blocks, 2) << "%, complete not pass: " << DB::toString((Float64)complete_not_passed * 100 / total_blocks, 2) - << "%, not clean: " << DB::toString((Float64)not_clean_rows * 100 / passed_rows, 2) - << "%, effective: " << DB::toString((Float64)effective_num_rows * 100 / passed_rows, 2) << "%"); + << "%, not clean: " << DB::toString((Float64)not_clean_rows * 100 / passed_rows, 2) << "%"); } void readPrefix() override; @@ -66,9 +65,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream Block read(FilterPtr & res_filter, bool return_filter) override; - size_t getEffectiveNumRows() const { return effective_num_rows; } size_t getNotCleanRows() const { return not_clean_rows; } - UInt64 getGCHintVersion() const { return gc_hint_version; } private: inline void checkWithNextIndex(size_t i) @@ -87,10 +84,6 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream filter[i] = cur_version >= version_limit || ((compare(cur_handle, next_handle) != 0 || next_version > version_limit) && !deleted); not_clean[i] = filter[i] && (compare(cur_handle, next_handle) == 0 || deleted); - effective[i] = filter[i] && (compare(cur_handle, next_handle) != 0); - if (filter[i]) - gc_hint_version - = std::min(gc_hint_version, calculateRowGcHintVersion(cur_handle, cur_version, next_handle, true, deleted)); } else { @@ -122,50 +115,6 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream } } -private: - inline UInt64 calculateRowGcHintVersion( - const RowKeyValueRef & cur_handle, UInt64 cur_version, const RowKeyValueRef & next_handle, bool next_handle_valid, bool deleted) - { - // The rules to calculate gc_hint_version of every pk, - // 1. If the oldest version is delete, then the result is the oldest version. - // 2. Otherwise, if the pk has just a single version, the result is UInt64_MAX(means just ignore this kind of pk). - // 3. Otherwise, the result is the second oldest version. - bool matched = false; - if (is_first_oldest_version && deleted) - { - // rule 1 - matched = true; - } - else if (is_second_oldest_version && gc_hint_version_pending) - { - // rule 3 - matched = true; - } - gc_hint_version_pending = !matched; - - // update status variable for next row if need - if (next_handle_valid) - { - if (compare(cur_handle, next_handle) != 0) - { - is_first_oldest_version = true; - is_second_oldest_version = false; - } - else if (is_first_oldest_version && (compare(cur_handle, next_handle) == 0)) - { - is_first_oldest_version = false; - is_second_oldest_version = true; - } - else - { - is_first_oldest_version = false; - is_second_oldest_version = false; - } - } - - return matched ? cur_version : UINT64_MAX; - } - private: UInt64 version_limit; bool is_common_handle; @@ -176,22 +125,9 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream size_t delete_col_pos; IColumn::Filter filter{}; - // effective = selected & handle not equals with next - IColumn::Filter effective{}; // not_clean = selected & (handle equals with next || deleted) IColumn::Filter not_clean{}; - // Calculate per block, when gc_safe_point exceed this version, there must be some data obsolete in this block - // First calculate the gc_hint_version of every pk according to the following rules, - // see the comments in `calculateRowGcHintVersion` to see how to calculate it for every pk - // Then the block's gc_hint_version is the minimum value of all pk's gc_hint_version - UInt64 gc_hint_version; - - // auxiliary variable for the calculation of gc_hint_version - bool is_first_oldest_version = true; - bool is_second_oldest_version = false; - bool gc_hint_version_pending = true; - Block raw_block; //PaddedPODArray const * handle_col_data = nullptr; @@ -205,7 +141,6 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream size_t complete_passed = 0; size_t complete_not_passed = 0; size_t not_clean_rows = 0; - size_t effective_num_rows = 0; Logger * log; }; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 20598cb4ff2..24435335454 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -161,7 +161,6 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context, original_table_handle_define(handle), background_pool(db_context.getBackgroundPool()), blockable_background_pool(db_context.getBlockableBackgroundPool()), - next_gc_check_key(is_common_handle ? RowKeyValue::COMMON_HANDLE_MIN_KEY : RowKeyValue::INT_HANDLE_MIN_KEY), log(&Logger::get("DeltaMergeStore[" + db_name + "." + table_name + "]")) { LOG_INFO(log, "Restore DeltaMerge Store start [" << db_name << "." << table_name << "]"); @@ -356,7 +355,7 @@ DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB: auto * ctx = new DMContext(db_context.getGlobalContext(), path_pool, storage_pool, - latest_gc_safe_point.load(std::memory_order_acquire), + latest_gc_safe_point, settings.not_compress_columns, is_common_handle, rowkey_column_size, @@ -802,7 +801,7 @@ void DeltaMergeStore::mergeDeltaAll(const Context & context) for (auto & segment : all_segments) { - segmentMergeDelta(*dm_context, segment, TaskRunThread::Thread_FG); + segmentMergeDelta(*dm_context, segment, true); } } @@ -1158,7 +1157,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const .Observe(watch.elapsedSeconds()); }); - return segmentMergeDelta(*dm_context, segment, TaskRunThread::Thread_FG); + return segmentMergeDelta(*dm_context, segment, true); } return {}; }; @@ -1255,19 +1254,6 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const try_place_delta_index(); } -bool DeltaMergeStore::updateGCSafePoint() -{ - if (auto pd_client = global_context.getTMTContext().getPDClient(); !pd_client->isMock()) - { - auto safe_point = PDClientHelper::getGCSafePointWithRetry(pd_client, - /* ignore_cache= */ false, - global_context.getSettingsRef().safe_point_update_interval_seconds); - latest_gc_safe_point.store(safe_point, std::memory_order_release); - return true; - } - return false; -} - bool DeltaMergeStore::handleBackgroundTask(bool heavy) { auto task = background_tasks.nextTask(heavy, log); @@ -1275,12 +1261,18 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy) return false; // Update GC safe point before background task - // Foreground task don't get GC safe point from remote, but we better make it as up to date as possible. - if (updateGCSafePoint()) + /// Note that `task.dm_context->db_context` will be free after query is finish. We should not use that in background task. + if (auto pd_client = global_context.getTMTContext().getPDClient(); !pd_client->isMock()) { - /// Note that `task.dm_context->db_context` will be free after query is finish. We should not use that in background task. - task.dm_context->min_version = latest_gc_safe_point.load(std::memory_order_relaxed); - LOG_DEBUG(log, "Task" << toString(task.type) << " GC safe point: " << task.dm_context->min_version); + auto safe_point = PDClientHelper::getGCSafePointWithRetry(pd_client, + /* ignore_cache= */ false, + global_context.getSettingsRef().safe_point_update_interval_seconds); + + LOG_DEBUG(log, "Task" << toString(task.type) << " GC safe point: " << safe_point); + + // Foreground task don't get GC safe point from remote, but we better make it as up to date as possible. + latest_gc_safe_point = safe_point; + task.dm_context->min_version = safe_point; } SegmentPtr left, right; @@ -1299,7 +1291,7 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy) break; case MergeDelta: { FAIL_POINT_PAUSE(FailPoints::pause_before_dt_background_delta_merge); - left = segmentMergeDelta(*task.dm_context, task.segment, TaskRunThread::Thread_BG_Thread_Pool); + left = segmentMergeDelta(*task.dm_context, task.segment, false); type = ThreadType::BG_MergeDelta; // Wake up all waiting threads if failpoint is enabled FailPointHelper::disableFailPoint(FailPoints::pause_until_dt_background_delta_merge); @@ -1346,146 +1338,6 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy) return true; } -namespace GC -{ -// Returns true if it needs gc. -// This is for optimization purpose, does not mean to be accurate. -bool shouldCompact(const SegmentPtr & seg, DB::Timestamp gc_safepoint, double ratio_threshold, Logger * log) -{ - // Always GC. - if (ratio_threshold < 1.0) - return true; - - auto & property = seg->getStable()->getStableProperty(); - LOG_DEBUG(log, property.toDebugString()); - // No data older than safe_point to GC. - if (property.gc_hint_version > gc_safepoint) - return false; - // A lot of MVCC versions to GC. - if (property.num_versions > property.num_rows * ratio_threshold) - return true; - // A lot of non-effective MVCC versions to GC. - if (property.num_versions > property.num_puts * ratio_threshold) - return true; - return false; -} -} // namespace GC - -UInt64 DeltaMergeStore::onSyncGc(Int64 limit) -{ - if (shutdown_called.load(std::memory_order_relaxed)) - return 0; - - if (!updateGCSafePoint()) - return 0; - - { - std::shared_lock lock(read_write_mutex); - // avoid gc on empty tables - if (segments.size() == 1) - { - const auto & seg = segments.begin()->second; - if (seg->getStable()->getRows() == 0) - return 0; - } - } - - DB::Timestamp gc_safe_point = latest_gc_safe_point.load(std::memory_order_acquire); - LOG_DEBUG(log, - "GC on table " << table_name << " start with key: " << next_gc_check_key.toDebugString() - << ", gc_safe_point: " << gc_safe_point); - - UInt64 check_segments_num = 0; - Int64 gc_segments_num = 0; - while (gc_segments_num < limit) - { - SegmentPtr segment; - // If the store is shut down, give up running GC on it. - if (shutdown_called.load(std::memory_order_relaxed)) - break; - { - std::shared_lock lock(read_write_mutex); - - auto segment_it = segments.upper_bound(next_gc_check_key.toRowKeyValueRef()); - if (segment_it == segments.end()) - segment_it = segments.begin(); - - // we have check all segments, stop here - if (check_segments_num >= segments.size()) - break; - check_segments_num++; - - segment = segment_it->second; - next_gc_check_key = segment_it->first.toRowKeyValue(); - } - - if (segment->hasAbandoned()) - continue; - - if (segment->getLastCheckGCSafePoint() >= gc_safe_point) - continue; - - const auto segment_id = segment->segmentId(); - RowKeyRange segment_range = segment->getRowKeyRange(); - if (segment->getDelta()->isUpdating()) - { - LOG_DEBUG(log, - "GC is skipped Segment [" << segment_id << "] [range=" << segment_range.toDebugString() << "] [table=" << table_name - << "]"); - continue; - } - - // Avoid recheck this segment when gc_safe_point doesn't change regardless whether we trigger this segment's DeltaMerge or not. - // Because after we calculate StableProperty and compare it with this gc_safe_point, - // there is no need to recheck it again using the same gc_safe_point. - // On the other hand, if it should do DeltaMerge using this gc_safe_point, and the DeltaMerge is interruptted by other process, - // it's still worth to wait another gc_safe_point to check this segment again. - segment->setLastCheckGCSafePoint(gc_safe_point); - - auto dm_context = newDMContext(global_context, global_context.getSettingsRef()); - dm_context->min_version = gc_safe_point; - // calculate StableProperty if needed - if (!segment->getStable()->isStablePropertyCached()) - segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle()); - - try - { - // Check whether we should apply gc on this segment - const bool should_compact - = GC::shouldCompact(segment, gc_safe_point, global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc, log); - bool finish_gc_on_segment = false; - if (should_compact) - { - ThreadType type = ThreadType::BG_GC; - segment = segmentMergeDelta(*dm_context, segment, TaskRunThread::Thread_BG_GC); - if (segment) - { - // Continue to check whether we need to apply more tasks on this segment - checkSegmentUpdate(dm_context, segment, type); - gc_segments_num++; - finish_gc_on_segment = true; - LOG_INFO(log, - "GC-merge-delta done Segment [" << segment_id << "] [range=" << segment_range.toDebugString() - << "] [table=" << table_name << "]"); - } - } - if (!finish_gc_on_segment) - LOG_DEBUG(log, - "GC is skipped Segment [" << segment_id << "] [range=" << segment_range.toDebugString() - << "] [table=" << table_name << "]"); - } - catch (Exception & e) - { - e.addMessage("while apply gc Segment [" + DB::toString(segment_id) + "] [range=" + segment_range.toDebugString() - + "] [table=" + table_name + "]"); - e.rethrow(); - } - } - - LOG_DEBUG(log, "Finish GC on " << gc_segments_num << " segments [table=" + table_name + "]"); - return gc_segments_num; -} - SegmentPair DeltaMergeStore::segmentSplit(DMContext & dm_context, const SegmentPtr & segment, bool is_foreground) { LOG_DEBUG(log, @@ -1706,9 +1558,11 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le check(dm_context.db_context); } -SegmentPtr DeltaMergeStore::segmentMergeDelta(DMContext & dm_context, const SegmentPtr & segment, const TaskRunThread run_thread) +SegmentPtr DeltaMergeStore::segmentMergeDelta(DMContext & dm_context, const SegmentPtr & segment, bool is_foreground) { - LOG_DEBUG(log, toString(run_thread) << " merge delta, segment [" << segment->segmentId() << "], safe point:" << dm_context.min_version); + LOG_DEBUG(log, + (is_foreground ? "Foreground" : "Background") + << " merge delta, segment [" << segment->segmentId() << "], safe point:" << dm_context.min_version); SegmentSnapshotPtr segment_snap; ColumnDefinesPtr schema_snap; @@ -1738,46 +1592,24 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(DMContext & dm_context, const Segm CurrentMetrics::Increment cur_dm_total_bytes{CurrentMetrics::DT_DeltaMergeTotalBytes, (Int64)segment_snap->getBytes()}; CurrentMetrics::Increment cur_dm_total_rows{CurrentMetrics::DT_DeltaMergeTotalRows, (Int64)segment_snap->getRows()}; - switch (run_thread) - { - case TaskRunThread::Thread_BG_Thread_Pool: - GET_METRIC(dm_context.metrics, tiflash_storage_subtask_count, type_delta_merge).Increment(); - break; - case TaskRunThread::Thread_FG: + if (is_foreground) GET_METRIC(dm_context.metrics, tiflash_storage_subtask_count, type_delta_merge_fg).Increment(); - break; - case TaskRunThread::Thread_BG_GC: - GET_METRIC(dm_context.metrics, tiflash_storage_subtask_count, type_delta_merge_bg_gc).Increment(); - break; - default: - break; - } + else + GET_METRIC(dm_context.metrics, tiflash_storage_subtask_count, type_delta_merge).Increment(); Stopwatch watch_delta_merge; SCOPE_EXIT({ - switch (run_thread) - { - case TaskRunThread::Thread_BG_Thread_Pool: - GET_METRIC(dm_context.metrics, tiflash_storage_subtask_duration_seconds, type_delta_merge) - .Observe(watch_delta_merge.elapsedSeconds()); - break; - case TaskRunThread::Thread_FG: + if (is_foreground) GET_METRIC(dm_context.metrics, tiflash_storage_subtask_duration_seconds, type_delta_merge_fg) .Observe(watch_delta_merge.elapsedSeconds()); - break; - case TaskRunThread::Thread_BG_GC: - GET_METRIC(dm_context.metrics, tiflash_storage_subtask_duration_seconds, type_delta_merge_bg_gc) + else + GET_METRIC(dm_context.metrics, tiflash_storage_subtask_duration_seconds, type_delta_merge) .Observe(watch_delta_merge.elapsedSeconds()); - break; - default: - break; - } }); - bool need_rate_limit = (run_thread != TaskRunThread::Thread_FG); - WriteBatches wbs(storage_pool, need_rate_limit ? dm_context.db_context.getRateLimiter() : nullptr); + WriteBatches wbs(storage_pool, is_foreground ? nullptr : dm_context.db_context.getRateLimiter()); - auto new_stable = segment->prepareMergeDelta(dm_context, schema_snap, segment_snap, wbs, need_rate_limit); + auto new_stable = segment->prepareMergeDelta(dm_context, schema_snap, segment_snap, wbs, !is_foreground); wbs.writeLogAndData(); new_stable->enableDMFilesGC(); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 38af1a8d115..5cd212a00fb 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -151,7 +151,6 @@ class DeltaMergeStore : private boost::noncopyable BG_MergeDelta, BG_Compact, BG_Flush, - BG_GC, }; enum TaskType @@ -164,13 +163,6 @@ class DeltaMergeStore : private boost::noncopyable PlaceIndex, }; - enum TaskRunThread - { - Thread_BG_Thread_Pool, - Thread_FG, - Thread_BG_GC, - }; - static std::string toString(ThreadType type) { switch (type) @@ -191,23 +183,6 @@ class DeltaMergeStore : private boost::noncopyable return "BG_Compact"; case BG_Flush: return "BG_Flush"; - case BG_GC: - return "BG_GC"; - default: - return "Unknown"; - } - } - - static std::string toString(TaskRunThread type) - { - switch (type) - { - case Thread_BG_Thread_Pool: - return "BackgroundThreadPool"; - case Thread_FG: - return "Foreground"; - case Thread_BG_GC: - return "BackgroundGCThread"; default: return "Unknown"; } @@ -368,8 +343,6 @@ class DeltaMergeStore : private boost::noncopyable bool isCommonHandle() const { return is_common_handle; } size_t getRowKeyColumnSize() const { return rowkey_column_size; } - UInt64 onSyncGc(Int64 limit); - public: /// Methods mainly used by region split. @@ -394,9 +367,7 @@ class DeltaMergeStore : private boost::noncopyable SegmentPair segmentSplit(DMContext & dm_context, const SegmentPtr & segment, bool is_foreground); void segmentMerge(DMContext & dm_context, const SegmentPtr & left, const SegmentPtr & right, bool is_foreground); - SegmentPtr segmentMergeDelta(DMContext & dm_context, const SegmentPtr & segment, const TaskRunThread thread); - - bool updateGCSafePoint(); + SegmentPtr segmentMergeDelta(DMContext & dm_context, const SegmentPtr & segment, bool is_foreground); bool handleBackgroundTask(bool heavy); @@ -446,9 +417,7 @@ class DeltaMergeStore : private boost::noncopyable MergeDeltaTaskPool background_tasks; - std::atomic latest_gc_safe_point = 0; - - RowKeyValue next_gc_check_key; + DB::Timestamp latest_gc_safe_point = 0; // Synchronize between write threads and read threads. mutable std::shared_mutex read_write_mutex; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp index ca2ebd6203f..4d3af9f6142 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp @@ -117,7 +117,7 @@ DMFilePtr DMFile::restore(const FileProviderPtr & file_provider, UInt64 file_id, DMFilePtr dmfile(new DMFile( file_id, ref_id, parent_path, single_file_mode ? Mode::SINGLE_FILE : Mode::FOLDER, Status::READABLE, &Logger::get("DMFile"))); if (read_meta) - dmfile->readMetadata(file_provider); + dmfile->readMeta(file_provider); return dmfile; } @@ -183,20 +183,15 @@ const EncryptionPath DMFile::encryptionMarkPath(const FileNameBase & file_name_b const EncryptionPath DMFile::encryptionMetaPath() const { - return EncryptionPath(encryptionBasePath(), isSingleFileMode() ? "" : metaFileName()); + return EncryptionPath(encryptionBasePath(), isSingleFileMode() ? "" : "meta.txt"); } const EncryptionPath DMFile::encryptionPackStatPath() const { - return EncryptionPath(encryptionBasePath(), isSingleFileMode() ? "" : packStatFileName()); + return EncryptionPath(encryptionBasePath(), isSingleFileMode() ? "" : "pack"); } -const EncryptionPath DMFile::encryptionPackPropertyPath() const -{ - return EncryptionPath(encryptionBasePath(), isSingleFileMode() ? "" : packPropertyFileName()); -} - -DMFile::OffsetAndSize DMFile::writeMetaToBuffer(WriteBuffer & buffer) +std::tuple DMFile::writeMeta(WriteBuffer & buffer) { size_t meta_offset = buffer.count(); writeString("DTFile format: ", buffer); @@ -207,7 +202,7 @@ DMFile::OffsetAndSize DMFile::writeMetaToBuffer(WriteBuffer & buffer) return std::make_tuple(meta_offset, meta_size); } -DMFile::OffsetAndSize DMFile::writePackStatToBuffer(WriteBuffer & buffer) +std::tuple DMFile::writePack(WriteBuffer & buffer) { size_t pack_offset = buffer.count(); for (auto & stat : pack_stats) @@ -218,16 +213,6 @@ DMFile::OffsetAndSize DMFile::writePackStatToBuffer(WriteBuffer & buffer) return std::make_tuple(pack_offset, pack_size); } -DMFile::OffsetAndSize DMFile::writePackPropertyToBuffer(WriteBuffer & buffer) -{ - size_t offset = buffer.count(); - String tmp_buf; - pack_properties.SerializeToString(&tmp_buf); - writeStringBinary(tmp_buf, buffer); - size_t size = buffer.count() - offset; - return std::make_tuple(offset, size); -} - void DMFile::writeMeta(const FileProviderPtr & file_provider, const RateLimiterPtr & rate_limiter) { String meta_path = metaPath(); @@ -235,30 +220,12 @@ void DMFile::writeMeta(const FileProviderPtr & file_provider, const RateLimiterP { WriteBufferFromFileProvider buf(file_provider, tmp_meta_path, encryptionMetaPath(), false, rate_limiter, 4096); - writeMetaToBuffer(buf); + writeMeta(buf); buf.sync(); } Poco::File(tmp_meta_path).renameTo(meta_path); } -void DMFile::writePackProperty(const FileProviderPtr & file_provider, const RateLimiterPtr & rate_limiter) -{ - String property_path = packPropertyPath(); - String tmp_property_path = property_path + ".tmp"; - { - WriteBufferFromFileProvider buf(file_provider, tmp_property_path, encryptionPackPropertyPath(), false, rate_limiter, 4096); - writePackPropertyToBuffer(buf); - buf.sync(); - } - Poco::File(tmp_property_path).renameTo(property_path); -} - -void DMFile::writeMetadata(const FileProviderPtr & file_provider, const RateLimiterPtr & rate_limiter) -{ - writePackProperty(file_provider, rate_limiter); - writeMeta(file_provider, rate_limiter); -} - void DMFile::upgradeMetaIfNeed(const FileProviderPtr & file_provider, DMFileFormat::Version ver) { if (unlikely(mode != Mode::FOLDER)) @@ -292,42 +259,7 @@ void DMFile::upgradeMetaIfNeed(const FileProviderPtr & file_provider, DMFileForm } } -void DMFile::readMeta(const FileProviderPtr & file_provider, const MetaPackInfo & meta_pack_info) -{ - auto buf = openForRead(file_provider, metaPath(), encryptionMetaPath(), meta_pack_info.meta_size); - buf.seek(meta_pack_info.meta_offset); - - DMFileFormat::Version ver; // Binary version - assertString("DTFile format: ", buf); - DB::readText(ver, buf); - assertString("\n", buf); - readText(column_stats, ver, buf); - // No need to upgrade meta when mode is Mode::SINGLE_FILE - if (mode == Mode::FOLDER) - { - upgradeMetaIfNeed(file_provider, ver); - } -} - -void DMFile::readPackStat(const FileProviderPtr & file_provider, const MetaPackInfo & meta_pack_info) -{ - size_t packs = meta_pack_info.pack_stat_size / sizeof(PackStat); - pack_stats.resize(packs); - auto buf = openForRead(file_provider, packStatPath(), encryptionPackStatPath(), meta_pack_info.pack_stat_size); - buf.seek(meta_pack_info.pack_stat_offset); - buf.readStrict((char *)pack_stats.data(), sizeof(PackStat) * packs); -} - -void DMFile::readPackProperty(const FileProviderPtr & file_provider, const MetaPackInfo & meta_pack_info) -{ - String tmp_buf; - auto buf = openForRead(file_provider, packPropertyPath(), encryptionPackPropertyPath(), meta_pack_info.pack_property_size); - buf.seek(meta_pack_info.pack_property_offset); - readStringBinary(tmp_buf, buf); - pack_properties.ParseFromString(tmp_buf); -} - -void DMFile::readMetadata(const FileProviderPtr & file_provider) +void DMFile::readMeta(const FileProviderPtr & file_provider) { Footer footer; if (isSingleFileMode()) @@ -336,16 +268,15 @@ void DMFile::readMetadata(const FileProviderPtr & file_provider) /// TODO: Redesign the file format for single file mode (https://github.com/pingcap/tics/issues/1798) Poco::File file(path()); ReadBufferFromFileProvider buf(file_provider, path(), EncryptionPath(encryptionBasePath(), "")); - buf.seek(file.getSize() - sizeof(Footer), SEEK_SET); - DB::readIntBinary(footer.meta_pack_info.pack_property_offset, buf); - DB::readIntBinary(footer.meta_pack_info.pack_property_size, buf); DB::readIntBinary(footer.meta_pack_info.meta_offset, buf); DB::readIntBinary(footer.meta_pack_info.meta_size, buf); DB::readIntBinary(footer.meta_pack_info.pack_stat_offset, buf); DB::readIntBinary(footer.meta_pack_info.pack_stat_size, buf); + DB::readIntBinary(footer.sub_file_stat_offset, buf); DB::readIntBinary(footer.sub_file_num, buf); + // initialize sub file state buf.seek(footer.sub_file_stat_offset, SEEK_SET); SubFileStat sub_file_stat; @@ -360,23 +291,38 @@ void DMFile::readMetadata(const FileProviderPtr & file_provider) } else { - if (auto file = Poco::File(packPropertyPath()); file.exists()) - footer.meta_pack_info.pack_property_size = file.getSize(); - footer.meta_pack_info.meta_size = Poco::File(metaPath()).getSize(); footer.meta_pack_info.pack_stat_size = Poco::File(packStatPath()).getSize(); } - if (footer.meta_pack_info.pack_property_size != 0) - readPackProperty(file_provider, footer.meta_pack_info); + { + auto buf = openForRead(file_provider, metaPath(), encryptionMetaPath(), footer.meta_pack_info.meta_size); + buf.seek(footer.meta_pack_info.meta_offset); + + DMFileFormat::Version ver; // Binary version + assertString("DTFile format: ", buf); + DB::readText(ver, buf); + assertString("\n", buf); + readText(column_stats, ver, buf); + // No need to upgrade meta when mode is Mode::SINGLE_FILE + if (mode == Mode::FOLDER) + { + upgradeMetaIfNeed(file_provider, ver); + } + } - readMeta(file_provider, footer.meta_pack_info); - readPackStat(file_provider, footer.meta_pack_info); + { + size_t packs = footer.meta_pack_info.pack_stat_size / sizeof(PackStat); + pack_stats.resize(packs); + auto buf = openForRead(file_provider, packStatPath(), encryptionPackStatPath(), footer.meta_pack_info.pack_stat_size); + buf.seek(footer.meta_pack_info.pack_stat_offset); + buf.readStrict((char *)pack_stats.data(), sizeof(PackStat) * packs); + } } void DMFile::finalizeForFolderMode(const FileProviderPtr & file_provider, const RateLimiterPtr & rate_limiter) { - writeMetadata(file_provider, rate_limiter); + writeMeta(file_provider, rate_limiter); if (unlikely(status != Status::WRITING)) throw Exception("Expected WRITING status, now " + statusString(status)); Poco::File old_file(path()); @@ -401,20 +347,17 @@ void DMFile::finalizeForFolderMode(const FileProviderPtr & file_provider, const void DMFile::finalizeForSingleFileMode(WriteBuffer & buffer) { Footer footer; - std::tie(footer.meta_pack_info.pack_property_offset, footer.meta_pack_info.pack_property_size) = writePackPropertyToBuffer(buffer); - std::tie(footer.meta_pack_info.meta_offset, footer.meta_pack_info.meta_size) = writeMetaToBuffer(buffer); - std::tie(footer.meta_pack_info.pack_stat_offset, footer.meta_pack_info.pack_stat_size) = writePackStatToBuffer(buffer); - - footer.sub_file_stat_offset = buffer.count(); - footer.sub_file_num = sub_file_stats.size(); + std::tie(footer.meta_pack_info.meta_offset, footer.meta_pack_info.meta_size) = writeMeta(buffer); + std::tie(footer.meta_pack_info.pack_stat_offset, footer.meta_pack_info.pack_stat_size) = writePack(buffer); + footer.sub_file_stat_offset = buffer.count(); + footer.sub_file_num = sub_file_stats.size(); + footer.file_format_version = DMSingleFileFormatVersion::SINGLE_FILE_VERSION_BASE; for (auto & iter : sub_file_stats) { writeStringBinary(iter.first, buffer); writeIntBinary(iter.second.offset, buffer); writeIntBinary(iter.second.size, buffer); } - writeIntBinary(footer.meta_pack_info.pack_property_offset, buffer); - writeIntBinary(footer.meta_pack_info.pack_property_size, buffer); writeIntBinary(footer.meta_pack_info.meta_offset, buffer); writeIntBinary(footer.meta_pack_info.meta_size, buffer); writeIntBinary(footer.meta_pack_info.pack_stat_offset, buffer); diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h index eb4fe945b2c..d09815b4505 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h @@ -1,10 +1,5 @@ #pragma once -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wunused-parameter" -#include -#pragma GCC diagnostic pop - #include #include #include @@ -81,17 +76,12 @@ class DMFile : private boost::noncopyable struct MetaPackInfo { - UInt64 pack_property_offset; - UInt64 pack_property_size; UInt64 meta_offset; UInt64 meta_size; UInt64 pack_stat_offset; UInt64 pack_stat_size; - MetaPackInfo() - : pack_property_offset(0), pack_property_size(0), meta_offset(0), meta_size(0), pack_stat_offset(0), pack_stat_size(0) - { - } + MetaPackInfo() : meta_offset(0), meta_size(0), pack_stat_offset(0), pack_stat_size(0) {} }; struct Footer @@ -112,8 +102,6 @@ class DMFile : private boost::noncopyable }; using PackStats = PaddedPODArray; - // `PackProperties` is similar to `PackStats` except it uses protobuf to do serialization - using PackProperties = dtpb::PackProperties; static DMFilePtr create(UInt64 file_id, const String & parent_path, bool single_file_mode = false); @@ -165,7 +153,6 @@ class DMFile : private boost::noncopyable size_t getPacks() const { return pack_stats.size(); } const PackStats & getPackStats() const { return pack_stats; } - PackProperties & getPackProperties() { return pack_properties; } const ColumnStat & getColumnStat(ColId col_id) const { @@ -196,7 +183,6 @@ class DMFile : private boost::noncopyable String ngcPath() const; String metaPath() const { return subFilePath(metaFileName()); } String packStatPath() const { return subFilePath(packStatFileName()); } - String packPropertyPath() const { return subFilePath(packPropertyFileName()); } using FileNameBase = String; String colDataPath(const FileNameBase & file_name_base) const { return subFilePath(colDataFileName(file_name_base)); } @@ -220,7 +206,6 @@ class DMFile : private boost::noncopyable const EncryptionPath encryptionMarkPath(const FileNameBase & file_name_base) const; const EncryptionPath encryptionMetaPath() const; const EncryptionPath encryptionPackStatPath() const; - const EncryptionPath encryptionPackPropertyPath() const; static FileNameBase getFileNameBase(ColId col_id, const IDataType::SubstreamPath & substream = {}) { @@ -229,25 +214,15 @@ class DMFile : private boost::noncopyable static String metaFileName() { return "meta.txt"; } static String packStatFileName() { return "pack"; } - static String packPropertyFileName() { return "property"; } - static String colDataFileName(const FileNameBase & file_name_base) { return file_name_base + ".dat"; } static String colIndexFileName(const FileNameBase & file_name_base) { return file_name_base + ".idx"; } static String colMarkFileName(const FileNameBase & file_name_base) { return file_name_base + ".mrk"; } - using OffsetAndSize = std::tuple; - OffsetAndSize writeMetaToBuffer(WriteBuffer & buffer); - OffsetAndSize writePackStatToBuffer(WriteBuffer & buffer); - OffsetAndSize writePackPropertyToBuffer(WriteBuffer & buffer); + std::tuple writeMeta(WriteBuffer & buffer); + std::tuple writePack(WriteBuffer & buffer); void writeMeta(const FileProviderPtr & file_provider, const RateLimiterPtr & rate_limiter); - void writePackProperty(const FileProviderPtr & file_provider, const RateLimiterPtr & rate_limiter); - void readMeta(const FileProviderPtr & file_provider, const MetaPackInfo & meta_pack_info); - void readPackStat(const FileProviderPtr & file_provider, const MetaPackInfo & meta_pack_info); - void readPackProperty(const FileProviderPtr & file_provider, const MetaPackInfo & meta_pack_info); - - void writeMetadata(const FileProviderPtr & file_provider, const RateLimiterPtr & rate_limiter); - void readMetadata(const FileProviderPtr & file_provider); + void readMeta(const FileProviderPtr & file_provider); void upgradeMetaIfNeed(const FileProviderPtr & file_provider, DMFileFormat::Version ver); @@ -279,9 +254,8 @@ class DMFile : private boost::noncopyable UInt64 ref_id; // It is a reference to file_id, could be the same. String parent_path; - PackStats pack_stats; - PackProperties pack_properties; - ColumnStats column_stats; + PackStats pack_stats; + ColumnStats column_stats; Mode mode; Status status; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h index a08a56e92a9..d3197b2db19 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h @@ -21,8 +21,7 @@ class DMFileBlockInputStream : public SkippableBlockInputStream const RSOperatorPtr & filter, const ColumnCachePtr & column_cache_, const IdSetPtr & read_packs, - size_t expected_size = DMFILE_READ_ROWS_THRESHOLD, - bool read_one_pack_every_time_ = false) + size_t expected_size = DMFILE_READ_ROWS_THRESHOLD) : reader(dmfile, read_columns, // clean read @@ -40,8 +39,7 @@ class DMFileBlockInputStream : public SkippableBlockInputStream context.getSettingsRef().min_bytes_to_use_direct_io, context.getSettingsRef().max_read_buffer_size, context.getFileProvider(), - expected_size, - read_one_pack_every_time_) + expected_size) { } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockOutputStream.h b/dbms/src/Storages/DeltaMerge/File/DMFileBlockOutputStream.h index 1ec2aaa7b32..92893fe1581 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockOutputStream.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockOutputStream.h @@ -40,8 +40,7 @@ class DMFileBlockOutputStream const DMFilePtr getFile() const { return writer.getFile(); } - using BlockProperty = DMFileWriter::BlockProperty; - void write(const Block & block, const BlockProperty & block_property) { writer.write(block, block_property); } + void write(const Block & block, size_t not_clean_rows) { writer.write(block, not_clean_rows); } void writePrefix() {} diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 47d26539ae0..cb731beb042 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -153,8 +153,7 @@ DMFileReader::DMFileReader(const DMFilePtr & dmfile_, size_t aio_threshold, size_t max_read_buffer_size, const FileProviderPtr & file_provider_, - size_t rows_threshold_per_read_, - bool read_one_pack_every_time_) + size_t rows_threshold_per_read_) : dmfile(dmfile_), read_columns(read_columns_), enable_clean_read(enable_clean_read_), @@ -169,7 +168,6 @@ DMFileReader::DMFileReader(const DMFilePtr & dmfile_, column_cache(column_cache_), rows_threshold_per_read(rows_threshold_per_read_), file_provider(file_provider_), - read_one_pack_every_time(read_one_pack_every_time_), single_file_mode(dmfile_->isSingleFileMode()), log(&Logger::get("DMFileReader")) { @@ -240,9 +238,9 @@ Block DMFileReader::read() // Find max continuing rows we can read. size_t start_pack_id = next_pack_id; - // When single_file_mode is true, or read_one_pack_every_time is true, we can just read one pack every time. + // When single_file_mode is true, we can just read one pack every time. // 0 means no limit - size_t read_pack_limit = (single_file_mode || read_one_pack_every_time) ? 1 : 0; + size_t read_pack_limit = single_file_mode ? 1 : 0; auto & pack_stats = dmfile->getPackStats(); size_t read_rows = 0; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index 146426aad52..c1eb485f2d7 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -72,8 +72,7 @@ class DMFileReader size_t aio_threshold, size_t max_read_buffer_size, const FileProviderPtr & file_provider_, - size_t rows_threshold_per_read_ = DMFILE_READ_ROWS_THRESHOLD, - bool read_one_pack_every_time_ = false); + size_t rows_threshold_per_read_ = DMFILE_READ_ROWS_THRESHOLD); Block getHeader() const { return toEmptyBlock(read_columns); } @@ -122,9 +121,6 @@ class DMFileReader FileProviderPtr file_provider; - // read_one_pack_every_time is used to create info for every pack - const bool read_one_pack_every_time; - const bool single_file_mode; Logger * log; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp index 1ad99630840..c2c874043b0 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp @@ -85,11 +85,11 @@ void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index) } -void DMFileWriter::write(const Block & block, const BlockProperty & block_property) +void DMFileWriter::write(const Block & block, size_t not_clean_rows) { DMFile::PackStat stat; stat.rows = block.rows(); - stat.not_clean = block_property.not_clean_rows; + stat.not_clean = not_clean_rows; stat.bytes = block.bytes(); // This is bytes of pack data in memory. auto del_mark_column = tryGetByColumnId(block, TAG_COLUMN_ID).column; @@ -113,11 +113,6 @@ void DMFileWriter::write(const Block & block, const BlockProperty & block_proper } dmfile->addPack(stat); - - auto & properties = dmfile->getPackProperties(); - auto * property = properties.add_property(); - property->set_num_rows(block_property.effective_num_rows); - property->set_gc_hint_version(block_property.gc_hint_version); } void DMFileWriter::finalize() diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h index 4f4a0332857..e4be0b60de2 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h @@ -129,13 +129,6 @@ class DMFileWriter }; using SingleFileStreamPtr = std::shared_ptr; - struct BlockProperty - { - size_t not_clean_rows; - size_t effective_num_rows; - size_t gc_hint_version; - }; - struct Flags { private: @@ -180,7 +173,6 @@ class DMFileWriter } }; - public: DMFileWriter(const DMFilePtr & dmfile_, const ColumnDefines & write_columns_, @@ -188,7 +180,7 @@ class DMFileWriter const RateLimiterPtr & rate_limiter_, const Options & options_); - void write(const Block & block, const BlockProperty & block_property); + void write(const Block & block, size_t not_clean_rows); void finalize(); const DMFilePtr getFile() const { return dmfile; } diff --git a/dbms/src/Storages/DeltaMerge/File/dtpb/cpp/CMakeLists.txt b/dbms/src/Storages/DeltaMerge/File/dtpb/cpp/CMakeLists.txt deleted file mode 100644 index f34cf2644e5..00000000000 --- a/dbms/src/Storages/DeltaMerge/File/dtpb/cpp/CMakeLists.txt +++ /dev/null @@ -1,26 +0,0 @@ -cmake_minimum_required (VERSION 2.8) - -set (CMAKE_CXX_STANDARD 17) - -if (CMAKE_VERSION VERSION_LESS "3.8.0") - set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++1z") -else () - set (CMAKE_CXX_STANDARD 17) - set (CMAKE_CXX_EXTENSIONS 0) # https://cmake.org/cmake/help/latest/prop_tgt/CXX_EXTENSIONS.html#prop_tgt:CXX_EXTENSIONS - set (CMAKE_CXX_STANDARD_REQUIRED ON) - set (CXX_FLAGS_INTERNAL_COMPILER "-std=c++1z") - set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++17") -endif () - -if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/dbms/src/Storages/DeltaMerge/File/dtpb/cpp/dtpb/dmfile.pb.h") - message (FATAL_ERROR "dmfile cpp files in DeltaMerge/File/dtpb/cpp/dtpb is missing. Try go to DeltaMerge/File/dtpb, and run ./generate_cpp.sh") -endif () - -file(GLOB __tmp RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} dtpb/*.cc dtpb/*.h) -list(APPEND generated_files ${__tmp}) - -set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-unused-parameter") -set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-parameter") - -add_library(dtpb ${generated_files}) -target_include_directories(dtpb PUBLIC ./) diff --git a/dbms/src/Storages/DeltaMerge/File/dtpb/cpp/dtpb/dmfile.pb.cc b/dbms/src/Storages/DeltaMerge/File/dtpb/cpp/dtpb/dmfile.pb.cc deleted file mode 100644 index 991ee5cd6ec..00000000000 --- a/dbms/src/Storages/DeltaMerge/File/dtpb/cpp/dtpb/dmfile.pb.cc +++ /dev/null @@ -1,750 +0,0 @@ -// Generated by the protocol buffer compiler. DO NOT EDIT! -// source: dmfile.proto - -#include "dmfile.pb.h" - -#include - -#include -#include -#include -#include -#include -#include -#include -#include -// @@protoc_insertion_point(includes) -#include -extern PROTOBUF_INTERNAL_EXPORT_dmfile_2eproto ::PROTOBUF_NAMESPACE_ID::internal::SCCInfo<0> scc_info_PackProperty_dmfile_2eproto; -namespace dtpb { -class PackPropertyDefaultTypeInternal { - public: - ::PROTOBUF_NAMESPACE_ID::internal::ExplicitlyConstructed _instance; -} _PackProperty_default_instance_; -class PackPropertiesDefaultTypeInternal { - public: - ::PROTOBUF_NAMESPACE_ID::internal::ExplicitlyConstructed _instance; -} _PackProperties_default_instance_; -} // namespace dtpb -static void InitDefaultsscc_info_PackProperties_dmfile_2eproto() { - GOOGLE_PROTOBUF_VERIFY_VERSION; - - { - void* ptr = &::dtpb::_PackProperties_default_instance_; - new (ptr) ::dtpb::PackProperties(); - ::PROTOBUF_NAMESPACE_ID::internal::OnShutdownDestroyMessage(ptr); - } - ::dtpb::PackProperties::InitAsDefaultInstance(); -} - -::PROTOBUF_NAMESPACE_ID::internal::SCCInfo<1> scc_info_PackProperties_dmfile_2eproto = - {{ATOMIC_VAR_INIT(::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase::kUninitialized), 1, InitDefaultsscc_info_PackProperties_dmfile_2eproto}, { - &scc_info_PackProperty_dmfile_2eproto.base,}}; - -static void InitDefaultsscc_info_PackProperty_dmfile_2eproto() { - GOOGLE_PROTOBUF_VERIFY_VERSION; - - { - void* ptr = &::dtpb::_PackProperty_default_instance_; - new (ptr) ::dtpb::PackProperty(); - ::PROTOBUF_NAMESPACE_ID::internal::OnShutdownDestroyMessage(ptr); - } - ::dtpb::PackProperty::InitAsDefaultInstance(); -} - -::PROTOBUF_NAMESPACE_ID::internal::SCCInfo<0> scc_info_PackProperty_dmfile_2eproto = - {{ATOMIC_VAR_INIT(::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase::kUninitialized), 0, InitDefaultsscc_info_PackProperty_dmfile_2eproto}, {}}; - -static ::PROTOBUF_NAMESPACE_ID::Metadata file_level_metadata_dmfile_2eproto[2]; -static constexpr ::PROTOBUF_NAMESPACE_ID::EnumDescriptor const** file_level_enum_descriptors_dmfile_2eproto = nullptr; -static constexpr ::PROTOBUF_NAMESPACE_ID::ServiceDescriptor const** file_level_service_descriptors_dmfile_2eproto = nullptr; - -const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_dmfile_2eproto::offsets[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = { - PROTOBUF_FIELD_OFFSET(::dtpb::PackProperty, _has_bits_), - PROTOBUF_FIELD_OFFSET(::dtpb::PackProperty, _internal_metadata_), - ~0u, // no _extensions_ - ~0u, // no _oneof_case_ - ~0u, // no _weak_field_map_ - PROTOBUF_FIELD_OFFSET(::dtpb::PackProperty, gc_hint_version_), - PROTOBUF_FIELD_OFFSET(::dtpb::PackProperty, num_rows_), - 0, - 1, - PROTOBUF_FIELD_OFFSET(::dtpb::PackProperties, _has_bits_), - PROTOBUF_FIELD_OFFSET(::dtpb::PackProperties, _internal_metadata_), - ~0u, // no _extensions_ - ~0u, // no _oneof_case_ - ~0u, // no _weak_field_map_ - PROTOBUF_FIELD_OFFSET(::dtpb::PackProperties, property_), - ~0u, -}; -static const ::PROTOBUF_NAMESPACE_ID::internal::MigrationSchema schemas[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = { - { 0, 7, sizeof(::dtpb::PackProperty)}, - { 9, 15, sizeof(::dtpb::PackProperties)}, -}; - -static ::PROTOBUF_NAMESPACE_ID::Message const * const file_default_instances[] = { - reinterpret_cast(&::dtpb::_PackProperty_default_instance_), - reinterpret_cast(&::dtpb::_PackProperties_default_instance_), -}; - -const char descriptor_table_protodef_dmfile_2eproto[] = - "\n\014dmfile.proto\022\004dtpb\"9\n\014PackProperty\022\027\n\017" - "gc_hint_version\030\001 \002(\004\022\020\n\010num_rows\030\002 \002(\004\"" - "6\n\016PackProperties\022$\n\010property\030\001 \003(\0132\022.dt" - "pb.PackProperty" - ; -static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_dmfile_2eproto_deps[1] = { -}; -static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_dmfile_2eproto_sccs[2] = { - &scc_info_PackProperties_dmfile_2eproto.base, - &scc_info_PackProperty_dmfile_2eproto.base, -}; -static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_dmfile_2eproto_once; -static bool descriptor_table_dmfile_2eproto_initialized = false; -const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_dmfile_2eproto = { - &descriptor_table_dmfile_2eproto_initialized, descriptor_table_protodef_dmfile_2eproto, "dmfile.proto", 135, - &descriptor_table_dmfile_2eproto_once, descriptor_table_dmfile_2eproto_sccs, descriptor_table_dmfile_2eproto_deps, 2, 0, - schemas, file_default_instances, TableStruct_dmfile_2eproto::offsets, - file_level_metadata_dmfile_2eproto, 2, file_level_enum_descriptors_dmfile_2eproto, file_level_service_descriptors_dmfile_2eproto, -}; - -// Force running AddDescriptors() at dynamic initialization time. -static bool dynamic_init_dummy_dmfile_2eproto = ( ::PROTOBUF_NAMESPACE_ID::internal::AddDescriptors(&descriptor_table_dmfile_2eproto), true); -namespace dtpb { - -// =================================================================== - -void PackProperty::InitAsDefaultInstance() { -} -class PackProperty::HasBitSetters { - public: - using HasBits = decltype(std::declval()._has_bits_); - static void set_has_gc_hint_version(HasBits* has_bits) { - (*has_bits)[0] |= 1u; - } - static void set_has_num_rows(HasBits* has_bits) { - (*has_bits)[0] |= 2u; - } -}; - -#if !defined(_MSC_VER) || _MSC_VER >= 1900 -const int PackProperty::kGcHintVersionFieldNumber; -const int PackProperty::kNumRowsFieldNumber; -#endif // !defined(_MSC_VER) || _MSC_VER >= 1900 - -PackProperty::PackProperty() - : ::PROTOBUF_NAMESPACE_ID::Message(), _internal_metadata_(nullptr) { - SharedCtor(); - // @@protoc_insertion_point(constructor:dtpb.PackProperty) -} -PackProperty::PackProperty(const PackProperty& from) - : ::PROTOBUF_NAMESPACE_ID::Message(), - _internal_metadata_(nullptr), - _has_bits_(from._has_bits_) { - _internal_metadata_.MergeFrom(from._internal_metadata_); - ::memcpy(&gc_hint_version_, &from.gc_hint_version_, - static_cast(reinterpret_cast(&num_rows_) - - reinterpret_cast(&gc_hint_version_)) + sizeof(num_rows_)); - // @@protoc_insertion_point(copy_constructor:dtpb.PackProperty) -} - -void PackProperty::SharedCtor() { - ::memset(&gc_hint_version_, 0, static_cast( - reinterpret_cast(&num_rows_) - - reinterpret_cast(&gc_hint_version_)) + sizeof(num_rows_)); -} - -PackProperty::~PackProperty() { - // @@protoc_insertion_point(destructor:dtpb.PackProperty) - SharedDtor(); -} - -void PackProperty::SharedDtor() { -} - -void PackProperty::SetCachedSize(int size) const { - _cached_size_.Set(size); -} -const PackProperty& PackProperty::default_instance() { - ::PROTOBUF_NAMESPACE_ID::internal::InitSCC(&::scc_info_PackProperty_dmfile_2eproto.base); - return *internal_default_instance(); -} - - -void PackProperty::Clear() { -// @@protoc_insertion_point(message_clear_start:dtpb.PackProperty) - ::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0; - // Prevent compiler warnings about cached_has_bits being unused - (void) cached_has_bits; - - cached_has_bits = _has_bits_[0]; - if (cached_has_bits & 0x00000003u) { - ::memset(&gc_hint_version_, 0, static_cast( - reinterpret_cast(&num_rows_) - - reinterpret_cast(&gc_hint_version_)) + sizeof(num_rows_)); - } - _has_bits_.Clear(); - _internal_metadata_.Clear(); -} - -#if GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER -const char* PackProperty::_InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) { -#define CHK_(x) if (PROTOBUF_PREDICT_FALSE(!(x))) goto failure - HasBitSetters::HasBits has_bits{}; - while (!ctx->Done(&ptr)) { - ::PROTOBUF_NAMESPACE_ID::uint32 tag; - ptr = ::PROTOBUF_NAMESPACE_ID::internal::ReadTag(ptr, &tag); - CHK_(ptr); - switch (tag >> 3) { - // required uint64 gc_hint_version = 1; - case 1: - if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 8)) { - HasBitSetters::set_has_gc_hint_version(&has_bits); - gc_hint_version_ = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint(&ptr); - CHK_(ptr); - } else goto handle_unusual; - continue; - // required uint64 num_rows = 2; - case 2: - if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 16)) { - HasBitSetters::set_has_num_rows(&has_bits); - num_rows_ = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint(&ptr); - CHK_(ptr); - } else goto handle_unusual; - continue; - default: { - handle_unusual: - if ((tag & 7) == 4 || tag == 0) { - ctx->SetLastTag(tag); - goto success; - } - ptr = UnknownFieldParse(tag, &_internal_metadata_, ptr, ctx); - CHK_(ptr != nullptr); - continue; - } - } // switch - } // while -success: - _has_bits_.Or(has_bits); - return ptr; -failure: - ptr = nullptr; - goto success; -#undef CHK_ -} -#else // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER -bool PackProperty::MergePartialFromCodedStream( - ::PROTOBUF_NAMESPACE_ID::io::CodedInputStream* input) { -#define DO_(EXPRESSION) if (!PROTOBUF_PREDICT_TRUE(EXPRESSION)) goto failure - ::PROTOBUF_NAMESPACE_ID::uint32 tag; - // @@protoc_insertion_point(parse_start:dtpb.PackProperty) - for (;;) { - ::std::pair<::PROTOBUF_NAMESPACE_ID::uint32, bool> p = input->ReadTagWithCutoffNoLastTag(127u); - tag = p.first; - if (!p.second) goto handle_unusual; - switch (::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::GetTagFieldNumber(tag)) { - // required uint64 gc_hint_version = 1; - case 1: { - if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (8 & 0xFF)) { - HasBitSetters::set_has_gc_hint_version(&_has_bits_); - DO_((::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadPrimitive< - ::PROTOBUF_NAMESPACE_ID::uint64, ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::TYPE_UINT64>( - input, &gc_hint_version_))); - } else { - goto handle_unusual; - } - break; - } - - // required uint64 num_rows = 2; - case 2: { - if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (16 & 0xFF)) { - HasBitSetters::set_has_num_rows(&_has_bits_); - DO_((::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadPrimitive< - ::PROTOBUF_NAMESPACE_ID::uint64, ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::TYPE_UINT64>( - input, &num_rows_))); - } else { - goto handle_unusual; - } - break; - } - - default: { - handle_unusual: - if (tag == 0) { - goto success; - } - DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SkipField( - input, tag, _internal_metadata_.mutable_unknown_fields())); - break; - } - } - } -success: - // @@protoc_insertion_point(parse_success:dtpb.PackProperty) - return true; -failure: - // @@protoc_insertion_point(parse_failure:dtpb.PackProperty) - return false; -#undef DO_ -} -#endif // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER - -void PackProperty::SerializeWithCachedSizes( - ::PROTOBUF_NAMESPACE_ID::io::CodedOutputStream* output) const { - // @@protoc_insertion_point(serialize_start:dtpb.PackProperty) - ::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0; - (void) cached_has_bits; - - cached_has_bits = _has_bits_[0]; - // required uint64 gc_hint_version = 1; - if (cached_has_bits & 0x00000001u) { - ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteUInt64(1, this->gc_hint_version(), output); - } - - // required uint64 num_rows = 2; - if (cached_has_bits & 0x00000002u) { - ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteUInt64(2, this->num_rows(), output); - } - - if (_internal_metadata_.have_unknown_fields()) { - ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFields( - _internal_metadata_.unknown_fields(), output); - } - // @@protoc_insertion_point(serialize_end:dtpb.PackProperty) -} - -::PROTOBUF_NAMESPACE_ID::uint8* PackProperty::InternalSerializeWithCachedSizesToArray( - ::PROTOBUF_NAMESPACE_ID::uint8* target) const { - // @@protoc_insertion_point(serialize_to_array_start:dtpb.PackProperty) - ::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0; - (void) cached_has_bits; - - cached_has_bits = _has_bits_[0]; - // required uint64 gc_hint_version = 1; - if (cached_has_bits & 0x00000001u) { - target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteUInt64ToArray(1, this->gc_hint_version(), target); - } - - // required uint64 num_rows = 2; - if (cached_has_bits & 0x00000002u) { - target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteUInt64ToArray(2, this->num_rows(), target); - } - - if (_internal_metadata_.have_unknown_fields()) { - target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFieldsToArray( - _internal_metadata_.unknown_fields(), target); - } - // @@protoc_insertion_point(serialize_to_array_end:dtpb.PackProperty) - return target; -} - -size_t PackProperty::RequiredFieldsByteSizeFallback() const { -// @@protoc_insertion_point(required_fields_byte_size_fallback_start:dtpb.PackProperty) - size_t total_size = 0; - - if (has_gc_hint_version()) { - // required uint64 gc_hint_version = 1; - total_size += 1 + - ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::UInt64Size( - this->gc_hint_version()); - } - - if (has_num_rows()) { - // required uint64 num_rows = 2; - total_size += 1 + - ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::UInt64Size( - this->num_rows()); - } - - return total_size; -} -size_t PackProperty::ByteSizeLong() const { -// @@protoc_insertion_point(message_byte_size_start:dtpb.PackProperty) - size_t total_size = 0; - - if (_internal_metadata_.have_unknown_fields()) { - total_size += - ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::ComputeUnknownFieldsSize( - _internal_metadata_.unknown_fields()); - } - if (((_has_bits_[0] & 0x00000003) ^ 0x00000003) == 0) { // All required fields are present. - // required uint64 gc_hint_version = 1; - total_size += 1 + - ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::UInt64Size( - this->gc_hint_version()); - - // required uint64 num_rows = 2; - total_size += 1 + - ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::UInt64Size( - this->num_rows()); - - } else { - total_size += RequiredFieldsByteSizeFallback(); - } - ::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0; - // Prevent compiler warnings about cached_has_bits being unused - (void) cached_has_bits; - - int cached_size = ::PROTOBUF_NAMESPACE_ID::internal::ToCachedSize(total_size); - SetCachedSize(cached_size); - return total_size; -} - -void PackProperty::MergeFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) { -// @@protoc_insertion_point(generalized_merge_from_start:dtpb.PackProperty) - GOOGLE_DCHECK_NE(&from, this); - const PackProperty* source = - ::PROTOBUF_NAMESPACE_ID::DynamicCastToGenerated( - &from); - if (source == nullptr) { - // @@protoc_insertion_point(generalized_merge_from_cast_fail:dtpb.PackProperty) - ::PROTOBUF_NAMESPACE_ID::internal::ReflectionOps::Merge(from, this); - } else { - // @@protoc_insertion_point(generalized_merge_from_cast_success:dtpb.PackProperty) - MergeFrom(*source); - } -} - -void PackProperty::MergeFrom(const PackProperty& from) { -// @@protoc_insertion_point(class_specific_merge_from_start:dtpb.PackProperty) - GOOGLE_DCHECK_NE(&from, this); - _internal_metadata_.MergeFrom(from._internal_metadata_); - ::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0; - (void) cached_has_bits; - - cached_has_bits = from._has_bits_[0]; - if (cached_has_bits & 0x00000003u) { - if (cached_has_bits & 0x00000001u) { - gc_hint_version_ = from.gc_hint_version_; - } - if (cached_has_bits & 0x00000002u) { - num_rows_ = from.num_rows_; - } - _has_bits_[0] |= cached_has_bits; - } -} - -void PackProperty::CopyFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) { -// @@protoc_insertion_point(generalized_copy_from_start:dtpb.PackProperty) - if (&from == this) return; - Clear(); - MergeFrom(from); -} - -void PackProperty::CopyFrom(const PackProperty& from) { -// @@protoc_insertion_point(class_specific_copy_from_start:dtpb.PackProperty) - if (&from == this) return; - Clear(); - MergeFrom(from); -} - -bool PackProperty::IsInitialized() const { - if ((_has_bits_[0] & 0x00000003) != 0x00000003) return false; - return true; -} - -void PackProperty::Swap(PackProperty* other) { - if (other == this) return; - InternalSwap(other); -} -void PackProperty::InternalSwap(PackProperty* other) { - using std::swap; - _internal_metadata_.Swap(&other->_internal_metadata_); - swap(_has_bits_[0], other->_has_bits_[0]); - swap(gc_hint_version_, other->gc_hint_version_); - swap(num_rows_, other->num_rows_); -} - -::PROTOBUF_NAMESPACE_ID::Metadata PackProperty::GetMetadata() const { - return GetMetadataStatic(); -} - - -// =================================================================== - -void PackProperties::InitAsDefaultInstance() { -} -class PackProperties::HasBitSetters { - public: - using HasBits = decltype(std::declval()._has_bits_); -}; - -#if !defined(_MSC_VER) || _MSC_VER >= 1900 -const int PackProperties::kPropertyFieldNumber; -#endif // !defined(_MSC_VER) || _MSC_VER >= 1900 - -PackProperties::PackProperties() - : ::PROTOBUF_NAMESPACE_ID::Message(), _internal_metadata_(nullptr) { - SharedCtor(); - // @@protoc_insertion_point(constructor:dtpb.PackProperties) -} -PackProperties::PackProperties(const PackProperties& from) - : ::PROTOBUF_NAMESPACE_ID::Message(), - _internal_metadata_(nullptr), - _has_bits_(from._has_bits_), - property_(from.property_) { - _internal_metadata_.MergeFrom(from._internal_metadata_); - // @@protoc_insertion_point(copy_constructor:dtpb.PackProperties) -} - -void PackProperties::SharedCtor() { - ::PROTOBUF_NAMESPACE_ID::internal::InitSCC(&scc_info_PackProperties_dmfile_2eproto.base); -} - -PackProperties::~PackProperties() { - // @@protoc_insertion_point(destructor:dtpb.PackProperties) - SharedDtor(); -} - -void PackProperties::SharedDtor() { -} - -void PackProperties::SetCachedSize(int size) const { - _cached_size_.Set(size); -} -const PackProperties& PackProperties::default_instance() { - ::PROTOBUF_NAMESPACE_ID::internal::InitSCC(&::scc_info_PackProperties_dmfile_2eproto.base); - return *internal_default_instance(); -} - - -void PackProperties::Clear() { -// @@protoc_insertion_point(message_clear_start:dtpb.PackProperties) - ::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0; - // Prevent compiler warnings about cached_has_bits being unused - (void) cached_has_bits; - - property_.Clear(); - _has_bits_.Clear(); - _internal_metadata_.Clear(); -} - -#if GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER -const char* PackProperties::_InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) { -#define CHK_(x) if (PROTOBUF_PREDICT_FALSE(!(x))) goto failure - while (!ctx->Done(&ptr)) { - ::PROTOBUF_NAMESPACE_ID::uint32 tag; - ptr = ::PROTOBUF_NAMESPACE_ID::internal::ReadTag(ptr, &tag); - CHK_(ptr); - switch (tag >> 3) { - // repeated .dtpb.PackProperty property = 1; - case 1: - if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 10)) { - ptr -= 1; - do { - ptr += 1; - ptr = ctx->ParseMessage(add_property(), ptr); - CHK_(ptr); - if (!ctx->DataAvailable(ptr)) break; - } while (::PROTOBUF_NAMESPACE_ID::internal::UnalignedLoad<::PROTOBUF_NAMESPACE_ID::uint8>(ptr) == 10); - } else goto handle_unusual; - continue; - default: { - handle_unusual: - if ((tag & 7) == 4 || tag == 0) { - ctx->SetLastTag(tag); - goto success; - } - ptr = UnknownFieldParse(tag, &_internal_metadata_, ptr, ctx); - CHK_(ptr != nullptr); - continue; - } - } // switch - } // while -success: - return ptr; -failure: - ptr = nullptr; - goto success; -#undef CHK_ -} -#else // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER -bool PackProperties::MergePartialFromCodedStream( - ::PROTOBUF_NAMESPACE_ID::io::CodedInputStream* input) { -#define DO_(EXPRESSION) if (!PROTOBUF_PREDICT_TRUE(EXPRESSION)) goto failure - ::PROTOBUF_NAMESPACE_ID::uint32 tag; - // @@protoc_insertion_point(parse_start:dtpb.PackProperties) - for (;;) { - ::std::pair<::PROTOBUF_NAMESPACE_ID::uint32, bool> p = input->ReadTagWithCutoffNoLastTag(127u); - tag = p.first; - if (!p.second) goto handle_unusual; - switch (::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::GetTagFieldNumber(tag)) { - // repeated .dtpb.PackProperty property = 1; - case 1: { - if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (10 & 0xFF)) { - DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadMessage( - input, add_property())); - } else { - goto handle_unusual; - } - break; - } - - default: { - handle_unusual: - if (tag == 0) { - goto success; - } - DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SkipField( - input, tag, _internal_metadata_.mutable_unknown_fields())); - break; - } - } - } -success: - // @@protoc_insertion_point(parse_success:dtpb.PackProperties) - return true; -failure: - // @@protoc_insertion_point(parse_failure:dtpb.PackProperties) - return false; -#undef DO_ -} -#endif // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER - -void PackProperties::SerializeWithCachedSizes( - ::PROTOBUF_NAMESPACE_ID::io::CodedOutputStream* output) const { - // @@protoc_insertion_point(serialize_start:dtpb.PackProperties) - ::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0; - (void) cached_has_bits; - - // repeated .dtpb.PackProperty property = 1; - for (unsigned int i = 0, - n = static_cast(this->property_size()); i < n; i++) { - ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteMessageMaybeToArray( - 1, - this->property(static_cast(i)), - output); - } - - if (_internal_metadata_.have_unknown_fields()) { - ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFields( - _internal_metadata_.unknown_fields(), output); - } - // @@protoc_insertion_point(serialize_end:dtpb.PackProperties) -} - -::PROTOBUF_NAMESPACE_ID::uint8* PackProperties::InternalSerializeWithCachedSizesToArray( - ::PROTOBUF_NAMESPACE_ID::uint8* target) const { - // @@protoc_insertion_point(serialize_to_array_start:dtpb.PackProperties) - ::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0; - (void) cached_has_bits; - - // repeated .dtpb.PackProperty property = 1; - for (unsigned int i = 0, - n = static_cast(this->property_size()); i < n; i++) { - target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite:: - InternalWriteMessageToArray( - 1, this->property(static_cast(i)), target); - } - - if (_internal_metadata_.have_unknown_fields()) { - target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFieldsToArray( - _internal_metadata_.unknown_fields(), target); - } - // @@protoc_insertion_point(serialize_to_array_end:dtpb.PackProperties) - return target; -} - -size_t PackProperties::ByteSizeLong() const { -// @@protoc_insertion_point(message_byte_size_start:dtpb.PackProperties) - size_t total_size = 0; - - if (_internal_metadata_.have_unknown_fields()) { - total_size += - ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::ComputeUnknownFieldsSize( - _internal_metadata_.unknown_fields()); - } - ::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0; - // Prevent compiler warnings about cached_has_bits being unused - (void) cached_has_bits; - - // repeated .dtpb.PackProperty property = 1; - { - unsigned int count = static_cast(this->property_size()); - total_size += 1UL * count; - for (unsigned int i = 0; i < count; i++) { - total_size += - ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::MessageSize( - this->property(static_cast(i))); - } - } - - int cached_size = ::PROTOBUF_NAMESPACE_ID::internal::ToCachedSize(total_size); - SetCachedSize(cached_size); - return total_size; -} - -void PackProperties::MergeFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) { -// @@protoc_insertion_point(generalized_merge_from_start:dtpb.PackProperties) - GOOGLE_DCHECK_NE(&from, this); - const PackProperties* source = - ::PROTOBUF_NAMESPACE_ID::DynamicCastToGenerated( - &from); - if (source == nullptr) { - // @@protoc_insertion_point(generalized_merge_from_cast_fail:dtpb.PackProperties) - ::PROTOBUF_NAMESPACE_ID::internal::ReflectionOps::Merge(from, this); - } else { - // @@protoc_insertion_point(generalized_merge_from_cast_success:dtpb.PackProperties) - MergeFrom(*source); - } -} - -void PackProperties::MergeFrom(const PackProperties& from) { -// @@protoc_insertion_point(class_specific_merge_from_start:dtpb.PackProperties) - GOOGLE_DCHECK_NE(&from, this); - _internal_metadata_.MergeFrom(from._internal_metadata_); - ::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0; - (void) cached_has_bits; - - property_.MergeFrom(from.property_); -} - -void PackProperties::CopyFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) { -// @@protoc_insertion_point(generalized_copy_from_start:dtpb.PackProperties) - if (&from == this) return; - Clear(); - MergeFrom(from); -} - -void PackProperties::CopyFrom(const PackProperties& from) { -// @@protoc_insertion_point(class_specific_copy_from_start:dtpb.PackProperties) - if (&from == this) return; - Clear(); - MergeFrom(from); -} - -bool PackProperties::IsInitialized() const { - if (!::PROTOBUF_NAMESPACE_ID::internal::AllAreInitialized(this->property())) return false; - return true; -} - -void PackProperties::Swap(PackProperties* other) { - if (other == this) return; - InternalSwap(other); -} -void PackProperties::InternalSwap(PackProperties* other) { - using std::swap; - _internal_metadata_.Swap(&other->_internal_metadata_); - swap(_has_bits_[0], other->_has_bits_[0]); - CastToBase(&property_)->InternalSwap(CastToBase(&other->property_)); -} - -::PROTOBUF_NAMESPACE_ID::Metadata PackProperties::GetMetadata() const { - return GetMetadataStatic(); -} - - -// @@protoc_insertion_point(namespace_scope) -} // namespace dtpb -PROTOBUF_NAMESPACE_OPEN -template<> PROTOBUF_NOINLINE ::dtpb::PackProperty* Arena::CreateMaybeMessage< ::dtpb::PackProperty >(Arena* arena) { - return Arena::CreateInternal< ::dtpb::PackProperty >(arena); -} -template<> PROTOBUF_NOINLINE ::dtpb::PackProperties* Arena::CreateMaybeMessage< ::dtpb::PackProperties >(Arena* arena) { - return Arena::CreateInternal< ::dtpb::PackProperties >(arena); -} -PROTOBUF_NAMESPACE_CLOSE - -// @@protoc_insertion_point(global_scope) -#include diff --git a/dbms/src/Storages/DeltaMerge/File/dtpb/cpp/dtpb/dmfile.pb.h b/dbms/src/Storages/DeltaMerge/File/dtpb/cpp/dtpb/dmfile.pb.h deleted file mode 100644 index b4d2431065f..00000000000 --- a/dbms/src/Storages/DeltaMerge/File/dtpb/cpp/dtpb/dmfile.pb.h +++ /dev/null @@ -1,451 +0,0 @@ -// Generated by the protocol buffer compiler. DO NOT EDIT! -// source: dmfile.proto - -#ifndef GOOGLE_PROTOBUF_INCLUDED_dmfile_2eproto -#define GOOGLE_PROTOBUF_INCLUDED_dmfile_2eproto - -#include -#include - -#include -#if PROTOBUF_VERSION < 3008000 -#error This file was generated by a newer version of protoc which is -#error incompatible with your Protocol Buffer headers. Please update -#error your headers. -#endif -#if 3008000 < PROTOBUF_MIN_PROTOC_VERSION -#error This file was generated by an older version of protoc which is -#error incompatible with your Protocol Buffer headers. Please -#error regenerate this file with a newer version of protoc. -#endif - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include // IWYU pragma: export -#include // IWYU pragma: export -#include -// @@protoc_insertion_point(includes) -#include -#define PROTOBUF_INTERNAL_EXPORT_dmfile_2eproto -PROTOBUF_NAMESPACE_OPEN -namespace internal { -class AnyMetadata; -} // namespace internal -PROTOBUF_NAMESPACE_CLOSE - -// Internal implementation detail -- do not use these members. -struct TableStruct_dmfile_2eproto { - static const ::PROTOBUF_NAMESPACE_ID::internal::ParseTableField entries[] - PROTOBUF_SECTION_VARIABLE(protodesc_cold); - static const ::PROTOBUF_NAMESPACE_ID::internal::AuxillaryParseTableField aux[] - PROTOBUF_SECTION_VARIABLE(protodesc_cold); - static const ::PROTOBUF_NAMESPACE_ID::internal::ParseTable schema[2] - PROTOBUF_SECTION_VARIABLE(protodesc_cold); - static const ::PROTOBUF_NAMESPACE_ID::internal::FieldMetadata field_metadata[]; - static const ::PROTOBUF_NAMESPACE_ID::internal::SerializationTable serialization_table[]; - static const ::PROTOBUF_NAMESPACE_ID::uint32 offsets[]; -}; -extern const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_dmfile_2eproto; -namespace dtpb { -class PackProperties; -class PackPropertiesDefaultTypeInternal; -extern PackPropertiesDefaultTypeInternal _PackProperties_default_instance_; -class PackProperty; -class PackPropertyDefaultTypeInternal; -extern PackPropertyDefaultTypeInternal _PackProperty_default_instance_; -} // namespace dtpb -PROTOBUF_NAMESPACE_OPEN -template<> ::dtpb::PackProperties* Arena::CreateMaybeMessage<::dtpb::PackProperties>(Arena*); -template<> ::dtpb::PackProperty* Arena::CreateMaybeMessage<::dtpb::PackProperty>(Arena*); -PROTOBUF_NAMESPACE_CLOSE -namespace dtpb { - -// =================================================================== - -class PackProperty : - public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:dtpb.PackProperty) */ { - public: - PackProperty(); - virtual ~PackProperty(); - - PackProperty(const PackProperty& from); - PackProperty(PackProperty&& from) noexcept - : PackProperty() { - *this = ::std::move(from); - } - - inline PackProperty& operator=(const PackProperty& from) { - CopyFrom(from); - return *this; - } - inline PackProperty& operator=(PackProperty&& from) noexcept { - if (GetArenaNoVirtual() == from.GetArenaNoVirtual()) { - if (this != &from) InternalSwap(&from); - } else { - CopyFrom(from); - } - return *this; - } - - inline const ::PROTOBUF_NAMESPACE_ID::UnknownFieldSet& unknown_fields() const { - return _internal_metadata_.unknown_fields(); - } - inline ::PROTOBUF_NAMESPACE_ID::UnknownFieldSet* mutable_unknown_fields() { - return _internal_metadata_.mutable_unknown_fields(); - } - - static const ::PROTOBUF_NAMESPACE_ID::Descriptor* descriptor() { - return GetDescriptor(); - } - static const ::PROTOBUF_NAMESPACE_ID::Descriptor* GetDescriptor() { - return GetMetadataStatic().descriptor; - } - static const ::PROTOBUF_NAMESPACE_ID::Reflection* GetReflection() { - return GetMetadataStatic().reflection; - } - static const PackProperty& default_instance(); - - static void InitAsDefaultInstance(); // FOR INTERNAL USE ONLY - static inline const PackProperty* internal_default_instance() { - return reinterpret_cast( - &_PackProperty_default_instance_); - } - static constexpr int kIndexInFileMessages = - 0; - - void Swap(PackProperty* other); - friend void swap(PackProperty& a, PackProperty& b) { - a.Swap(&b); - } - - // implements Message ---------------------------------------------- - - inline PackProperty* New() const final { - return CreateMaybeMessage(nullptr); - } - - PackProperty* New(::PROTOBUF_NAMESPACE_ID::Arena* arena) const final { - return CreateMaybeMessage(arena); - } - void CopyFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final; - void MergeFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final; - void CopyFrom(const PackProperty& from); - void MergeFrom(const PackProperty& from); - PROTOBUF_ATTRIBUTE_REINITIALIZES void Clear() final; - bool IsInitialized() const final; - - size_t ByteSizeLong() const final; - #if GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER - const char* _InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) final; - #else - bool MergePartialFromCodedStream( - ::PROTOBUF_NAMESPACE_ID::io::CodedInputStream* input) final; - #endif // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER - void SerializeWithCachedSizes( - ::PROTOBUF_NAMESPACE_ID::io::CodedOutputStream* output) const final; - ::PROTOBUF_NAMESPACE_ID::uint8* InternalSerializeWithCachedSizesToArray( - ::PROTOBUF_NAMESPACE_ID::uint8* target) const final; - int GetCachedSize() const final { return _cached_size_.Get(); } - - private: - inline void SharedCtor(); - inline void SharedDtor(); - void SetCachedSize(int size) const final; - void InternalSwap(PackProperty* other); - friend class ::PROTOBUF_NAMESPACE_ID::internal::AnyMetadata; - static ::PROTOBUF_NAMESPACE_ID::StringPiece FullMessageName() { - return "dtpb.PackProperty"; - } - private: - inline ::PROTOBUF_NAMESPACE_ID::Arena* GetArenaNoVirtual() const { - return nullptr; - } - inline void* MaybeArenaPtr() const { - return nullptr; - } - public: - - ::PROTOBUF_NAMESPACE_ID::Metadata GetMetadata() const final; - private: - static ::PROTOBUF_NAMESPACE_ID::Metadata GetMetadataStatic() { - ::PROTOBUF_NAMESPACE_ID::internal::AssignDescriptors(&::descriptor_table_dmfile_2eproto); - return ::descriptor_table_dmfile_2eproto.file_level_metadata[kIndexInFileMessages]; - } - - public: - - // nested types ---------------------------------------------------- - - // accessors ------------------------------------------------------- - - // required uint64 gc_hint_version = 1; - bool has_gc_hint_version() const; - void clear_gc_hint_version(); - static const int kGcHintVersionFieldNumber = 1; - ::PROTOBUF_NAMESPACE_ID::uint64 gc_hint_version() const; - void set_gc_hint_version(::PROTOBUF_NAMESPACE_ID::uint64 value); - - // required uint64 num_rows = 2; - bool has_num_rows() const; - void clear_num_rows(); - static const int kNumRowsFieldNumber = 2; - ::PROTOBUF_NAMESPACE_ID::uint64 num_rows() const; - void set_num_rows(::PROTOBUF_NAMESPACE_ID::uint64 value); - - // @@protoc_insertion_point(class_scope:dtpb.PackProperty) - private: - class HasBitSetters; - - // helper for ByteSizeLong() - size_t RequiredFieldsByteSizeFallback() const; - - ::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_; - ::PROTOBUF_NAMESPACE_ID::internal::HasBits<1> _has_bits_; - mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_; - ::PROTOBUF_NAMESPACE_ID::uint64 gc_hint_version_; - ::PROTOBUF_NAMESPACE_ID::uint64 num_rows_; - friend struct ::TableStruct_dmfile_2eproto; -}; -// ------------------------------------------------------------------- - -class PackProperties : - public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:dtpb.PackProperties) */ { - public: - PackProperties(); - virtual ~PackProperties(); - - PackProperties(const PackProperties& from); - PackProperties(PackProperties&& from) noexcept - : PackProperties() { - *this = ::std::move(from); - } - - inline PackProperties& operator=(const PackProperties& from) { - CopyFrom(from); - return *this; - } - inline PackProperties& operator=(PackProperties&& from) noexcept { - if (GetArenaNoVirtual() == from.GetArenaNoVirtual()) { - if (this != &from) InternalSwap(&from); - } else { - CopyFrom(from); - } - return *this; - } - - inline const ::PROTOBUF_NAMESPACE_ID::UnknownFieldSet& unknown_fields() const { - return _internal_metadata_.unknown_fields(); - } - inline ::PROTOBUF_NAMESPACE_ID::UnknownFieldSet* mutable_unknown_fields() { - return _internal_metadata_.mutable_unknown_fields(); - } - - static const ::PROTOBUF_NAMESPACE_ID::Descriptor* descriptor() { - return GetDescriptor(); - } - static const ::PROTOBUF_NAMESPACE_ID::Descriptor* GetDescriptor() { - return GetMetadataStatic().descriptor; - } - static const ::PROTOBUF_NAMESPACE_ID::Reflection* GetReflection() { - return GetMetadataStatic().reflection; - } - static const PackProperties& default_instance(); - - static void InitAsDefaultInstance(); // FOR INTERNAL USE ONLY - static inline const PackProperties* internal_default_instance() { - return reinterpret_cast( - &_PackProperties_default_instance_); - } - static constexpr int kIndexInFileMessages = - 1; - - void Swap(PackProperties* other); - friend void swap(PackProperties& a, PackProperties& b) { - a.Swap(&b); - } - - // implements Message ---------------------------------------------- - - inline PackProperties* New() const final { - return CreateMaybeMessage(nullptr); - } - - PackProperties* New(::PROTOBUF_NAMESPACE_ID::Arena* arena) const final { - return CreateMaybeMessage(arena); - } - void CopyFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final; - void MergeFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final; - void CopyFrom(const PackProperties& from); - void MergeFrom(const PackProperties& from); - PROTOBUF_ATTRIBUTE_REINITIALIZES void Clear() final; - bool IsInitialized() const final; - - size_t ByteSizeLong() const final; - #if GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER - const char* _InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) final; - #else - bool MergePartialFromCodedStream( - ::PROTOBUF_NAMESPACE_ID::io::CodedInputStream* input) final; - #endif // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER - void SerializeWithCachedSizes( - ::PROTOBUF_NAMESPACE_ID::io::CodedOutputStream* output) const final; - ::PROTOBUF_NAMESPACE_ID::uint8* InternalSerializeWithCachedSizesToArray( - ::PROTOBUF_NAMESPACE_ID::uint8* target) const final; - int GetCachedSize() const final { return _cached_size_.Get(); } - - private: - inline void SharedCtor(); - inline void SharedDtor(); - void SetCachedSize(int size) const final; - void InternalSwap(PackProperties* other); - friend class ::PROTOBUF_NAMESPACE_ID::internal::AnyMetadata; - static ::PROTOBUF_NAMESPACE_ID::StringPiece FullMessageName() { - return "dtpb.PackProperties"; - } - private: - inline ::PROTOBUF_NAMESPACE_ID::Arena* GetArenaNoVirtual() const { - return nullptr; - } - inline void* MaybeArenaPtr() const { - return nullptr; - } - public: - - ::PROTOBUF_NAMESPACE_ID::Metadata GetMetadata() const final; - private: - static ::PROTOBUF_NAMESPACE_ID::Metadata GetMetadataStatic() { - ::PROTOBUF_NAMESPACE_ID::internal::AssignDescriptors(&::descriptor_table_dmfile_2eproto); - return ::descriptor_table_dmfile_2eproto.file_level_metadata[kIndexInFileMessages]; - } - - public: - - // nested types ---------------------------------------------------- - - // accessors ------------------------------------------------------- - - // repeated .dtpb.PackProperty property = 1; - int property_size() const; - void clear_property(); - static const int kPropertyFieldNumber = 1; - ::dtpb::PackProperty* mutable_property(int index); - ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::dtpb::PackProperty >* - mutable_property(); - const ::dtpb::PackProperty& property(int index) const; - ::dtpb::PackProperty* add_property(); - const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::dtpb::PackProperty >& - property() const; - - // @@protoc_insertion_point(class_scope:dtpb.PackProperties) - private: - class HasBitSetters; - - ::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_; - ::PROTOBUF_NAMESPACE_ID::internal::HasBits<1> _has_bits_; - mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_; - ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::dtpb::PackProperty > property_; - friend struct ::TableStruct_dmfile_2eproto; -}; -// =================================================================== - - -// =================================================================== - -#ifdef __GNUC__ - #pragma GCC diagnostic push - #pragma GCC diagnostic ignored "-Wstrict-aliasing" -#endif // __GNUC__ -// PackProperty - -// required uint64 gc_hint_version = 1; -inline bool PackProperty::has_gc_hint_version() const { - return (_has_bits_[0] & 0x00000001u) != 0; -} -inline void PackProperty::clear_gc_hint_version() { - gc_hint_version_ = PROTOBUF_ULONGLONG(0); - _has_bits_[0] &= ~0x00000001u; -} -inline ::PROTOBUF_NAMESPACE_ID::uint64 PackProperty::gc_hint_version() const { - // @@protoc_insertion_point(field_get:dtpb.PackProperty.gc_hint_version) - return gc_hint_version_; -} -inline void PackProperty::set_gc_hint_version(::PROTOBUF_NAMESPACE_ID::uint64 value) { - _has_bits_[0] |= 0x00000001u; - gc_hint_version_ = value; - // @@protoc_insertion_point(field_set:dtpb.PackProperty.gc_hint_version) -} - -// required uint64 num_rows = 2; -inline bool PackProperty::has_num_rows() const { - return (_has_bits_[0] & 0x00000002u) != 0; -} -inline void PackProperty::clear_num_rows() { - num_rows_ = PROTOBUF_ULONGLONG(0); - _has_bits_[0] &= ~0x00000002u; -} -inline ::PROTOBUF_NAMESPACE_ID::uint64 PackProperty::num_rows() const { - // @@protoc_insertion_point(field_get:dtpb.PackProperty.num_rows) - return num_rows_; -} -inline void PackProperty::set_num_rows(::PROTOBUF_NAMESPACE_ID::uint64 value) { - _has_bits_[0] |= 0x00000002u; - num_rows_ = value; - // @@protoc_insertion_point(field_set:dtpb.PackProperty.num_rows) -} - -// ------------------------------------------------------------------- - -// PackProperties - -// repeated .dtpb.PackProperty property = 1; -inline int PackProperties::property_size() const { - return property_.size(); -} -inline void PackProperties::clear_property() { - property_.Clear(); -} -inline ::dtpb::PackProperty* PackProperties::mutable_property(int index) { - // @@protoc_insertion_point(field_mutable:dtpb.PackProperties.property) - return property_.Mutable(index); -} -inline ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::dtpb::PackProperty >* -PackProperties::mutable_property() { - // @@protoc_insertion_point(field_mutable_list:dtpb.PackProperties.property) - return &property_; -} -inline const ::dtpb::PackProperty& PackProperties::property(int index) const { - // @@protoc_insertion_point(field_get:dtpb.PackProperties.property) - return property_.Get(index); -} -inline ::dtpb::PackProperty* PackProperties::add_property() { - // @@protoc_insertion_point(field_add:dtpb.PackProperties.property) - return property_.Add(); -} -inline const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::dtpb::PackProperty >& -PackProperties::property() const { - // @@protoc_insertion_point(field_list:dtpb.PackProperties.property) - return property_; -} - -#ifdef __GNUC__ - #pragma GCC diagnostic pop -#endif // __GNUC__ -// ------------------------------------------------------------------- - - -// @@protoc_insertion_point(namespace_scope) - -} // namespace dtpb - -// @@protoc_insertion_point(global_scope) - -#include -#endif // GOOGLE_PROTOBUF_INCLUDED_GOOGLE_PROTOBUF_INCLUDED_dmfile_2eproto diff --git a/dbms/src/Storages/DeltaMerge/File/dtpb/generate_cpp.sh b/dbms/src/Storages/DeltaMerge/File/dtpb/generate_cpp.sh deleted file mode 100755 index 9400dbf0450..00000000000 --- a/dbms/src/Storages/DeltaMerge/File/dtpb/generate_cpp.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/bash -set -euo pipefail - -rm -rf cpp/dtpb && mkdir -p cpp/dtpb -echo "generate cpp code..." -protoc -I=./proto --cpp_out=./cpp/dtpb ./proto/*.proto diff --git a/dbms/src/Storages/DeltaMerge/File/dtpb/proto/dmfile.proto b/dbms/src/Storages/DeltaMerge/File/dtpb/proto/dmfile.proto deleted file mode 100644 index 4ba188238c8..00000000000 --- a/dbms/src/Storages/DeltaMerge/File/dtpb/proto/dmfile.proto +++ /dev/null @@ -1,14 +0,0 @@ -syntax = "proto2"; - -package dtpb; - -message PackProperty { - // when gc_safe_point exceed this version, there must be some data obsolete in this pack - required uint64 gc_hint_version = 1; - // effective rows(multiple versions of one row is count as one include delete) - required uint64 num_rows = 2; -} - -message PackProperties { - repeated PackProperty property = 1; -} diff --git a/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.cpp index f3152312326..7e07de53e39 100644 --- a/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.cpp @@ -238,11 +238,9 @@ const RegionPtr BoundedSSTFilesToBlockInputStream::getRegion() const return _raw_child->region; } -std::tuple // -BoundedSSTFilesToBlockInputStream::getMvccStatistics() const +size_t BoundedSSTFilesToBlockInputStream::getMvccStatistics() const { - return std::make_tuple( - mvcc_compact_stream->getEffectiveNumRows(), mvcc_compact_stream->getNotCleanRows(), mvcc_compact_stream->getGCHintVersion()); + return mvcc_compact_stream->getNotCleanRows(); } } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h b/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h index 97585d3091c..210896dade1 100644 --- a/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h @@ -120,8 +120,8 @@ class BoundedSSTFilesToBlockInputStream final const RegionPtr getRegion() const; - // Return values: (effective rows, not clean rows, gc hint version) - std::tuple getMvccStatistics() const; + // Return values: not clean rows + size_t getMvccStatistics() const; private: const ColId pk_column_id; diff --git a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp index c2fa9c3099c..be0c61d6523 100644 --- a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp @@ -129,10 +129,7 @@ bool SSTFilesToDTFilesOutputStream::newDTFileStream() void SSTFilesToDTFilesOutputStream::write() { - size_t last_effective_num_rows = 0; - size_t last_not_clean_rows = 0; - size_t cur_effective_num_rows = 0; - size_t cur_not_clean_rows = 0; + size_t last_not_clean_rows = 0; while (true) { @@ -177,16 +174,11 @@ void SSTFilesToDTFilesOutputStream::write() } // Write block to the output stream - DMFileBlockOutputStream::BlockProperty property; - std::tie(cur_effective_num_rows, cur_not_clean_rows, property.gc_hint_version) // - = child->getMvccStatistics(); - property.effective_num_rows = cur_effective_num_rows - last_effective_num_rows; - property.not_clean_rows = cur_not_clean_rows - last_not_clean_rows; - dt_stream->write(block, property); + auto cur_not_clean_rows = child->getMvccStatistics(); + dt_stream->write(block, cur_not_clean_rows - last_not_clean_rows); commit_rows += block.rows(); - last_effective_num_rows = cur_effective_num_rows; - last_not_clean_rows = cur_not_clean_rows; + last_not_clean_rows = cur_not_clean_rows; } } diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 631cda19001..90c164a96f1 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -89,13 +89,9 @@ DMFilePtr writeIntoNewDMFile(DMContext & dm_context, // output_stream->writePrefix(); while (true) { - size_t last_effective_num_rows = 0; - size_t last_not_clean_rows = 0; + size_t last_not_clean_rows = 0; if (mvcc_stream) - { - last_effective_num_rows = mvcc_stream->getEffectiveNumRows(); - last_not_clean_rows = mvcc_stream->getNotCleanRows(); - } + last_not_clean_rows = mvcc_stream->getNotCleanRows(); Block block = input_stream->read(); if (!block) @@ -103,22 +99,11 @@ DMFilePtr writeIntoNewDMFile(DMContext & dm_context, // if (!block.rows()) continue; - // When the input_stream is not mvcc, we assume the rows in this input_stream is most valid and make it not tend to be gc. - size_t cur_effective_num_rows = block.rows(); - size_t cur_not_clean_rows = 1; - size_t gc_hint_version = UINT64_MAX; + size_t cur_not_clean_rows = 1; if (mvcc_stream) - { - cur_effective_num_rows = mvcc_stream->getEffectiveNumRows(); - cur_not_clean_rows = mvcc_stream->getNotCleanRows(); - gc_hint_version = mvcc_stream->getGCHintVersion(); - } + cur_not_clean_rows = mvcc_stream->getNotCleanRows(); - DMFileBlockOutputStream::BlockProperty block_property; - block_property.effective_num_rows = cur_effective_num_rows - last_effective_num_rows; - block_property.not_clean_rows = cur_not_clean_rows - last_not_clean_rows; - block_property.gc_hint_version = gc_hint_version; - output_stream->write(block, block_property); + output_stream->write(block, cur_not_clean_rows - last_not_clean_rows); } input_stream->readSuffix(); @@ -594,9 +579,6 @@ SegmentPtr Segment::applyMergeDelta(DMContext & context, new_delta, new_stable); - // avoid recheck whether to do DeltaMerge using the same gc_safe_point - new_me->setLastCheckGCSafePoint(context.min_version); - // Store new meta data new_me->serialize(wbs.meta); @@ -1083,10 +1065,7 @@ StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, auto left_stream = getStream(left, left_snap); auto right_stream = getStream(right, right_snap); - BlockInputStreamPtr merged_stream = std::make_shared(BlockInputStreams{left_stream, right_stream}); - // for the purpose to calculate StableProperty of the new segment - merged_stream = std::make_shared>( - merged_stream, *schema_snap, dm_context.min_version, dm_context.is_common_handle); + auto merged_stream = std::make_shared(BlockInputStreams{left_stream, right_stream}); auto merged_stable_id = left->stable->getId(); auto merged_stable = createNewStable(dm_context, schema_snap, merged_stream, merged_stable_id, wbs, need_rate_limit); diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index 8367069359f..7085173fe6e 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -243,10 +243,6 @@ class Segment : private boost::noncopyable RowsAndBytes getRowsAndBytesInRange(DMContext & dm_context, const SegmentSnapshotPtr & segment_snap, const RowKeyRange & check_range, bool is_exact); - DB::Timestamp getLastCheckGCSafePoint() { return last_check_gc_safe_point.load(std::memory_order_relaxed); } - - void setLastCheckGCSafePoint(DB::Timestamp gc_safe_point) { last_check_gc_safe_point.store(gc_safe_point, std::memory_order_relaxed); } - private: ReadInfo getReadInfo(const DMContext & dm_context, const ColumnDefines & read_columns, @@ -324,8 +320,6 @@ class Segment : private boost::noncopyable const PageId segment_id; const PageId next_segment_id; - std::atomic last_check_gc_safe_point = 0; - const DeltaValueSpacePtr delta; const StableValueSpacePtr stable; diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index ece6d2e8bb3..d5433620b9f 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -1,8 +1,6 @@ #include -#include #include #include -#include #include #include #include @@ -10,12 +8,6 @@ namespace DB { - -namespace ErrorCodes -{ -extern const int LOGICAL_ERROR; -} - namespace DM { @@ -150,122 +142,6 @@ void StableValueSpace::recordRemovePacksPages(WriteBatches & wbs) const } } -void StableValueSpace::calculateStableProperty(const DMContext & context, const RowKeyRange & rowkey_range, bool is_common_handle) -{ - property.gc_hint_version = std::numeric_limits::max(); - property.num_versions = 0; - property.num_puts = 0; - property.num_rows = 0; - for (size_t i = 0; i < files.size(); i++) - { - auto & file = files[i]; - auto & pack_stats = file->getPackStats(); - auto & pack_properties = file->getPackProperties(); - if (pack_stats.empty()) - continue; - // if PackPropertys of this DMFile is empty, this must be an old format file generated by previous version. - // so we need to create file property for this file. - // but to keep dmfile immutable, we just cache the result in memory. - // - // `new_pack_properties` is the temporary container for the calculation result of this StableValueSpace's pack property. - // Note that `pack_stats` stores the stat of the whole underlying DTFile, - // and this Segment may share this DTFile with other Segment. So `pack_stats` may be larger than `new_pack_properties`. - DMFile::PackProperties new_pack_properties; - if (pack_properties.property_size() == 0) - { - LOG_DEBUG(log, "Try to calculate StableProperty from column data for stable " << id); - ColumnDefines read_columns; - read_columns.emplace_back(getExtraHandleColumnDefine(is_common_handle)); - read_columns.emplace_back(getVersionColumnDefine()); - read_columns.emplace_back(getTagColumnDefine()); - // Note we `RowKeyRange::newAll` instead of `segment_range`, - // because we need to calculate StableProperty based on the whole DTFile, - // and then use related info for this StableValueSpace. - // - // If we pass `segment_range` instead, - // then the returned stream is a `SkippableBlockInputStream` which will complicate the implementation - BlockInputStreamPtr data_stream - = std::make_shared(context.db_context, - std::numeric_limits::max(), - false, - file, - read_columns, - rowkey_range, - nullptr, - nullptr, - IdSetPtr{}, - UINT64_MAX, // because we just read one pack at a time - true); - auto mvcc_stream = std::make_shared>( - data_stream, read_columns, 0, is_common_handle); - mvcc_stream->readPrefix(); - while (true) - { - size_t last_effective_num_rows = mvcc_stream->getEffectiveNumRows(); - - Block block = mvcc_stream->read(); - if (!block) - break; - if (!block.rows()) - continue; - - size_t cur_effective_num_rows = mvcc_stream->getEffectiveNumRows(); - size_t gc_hint_version = mvcc_stream->getGCHintVersion(); - auto * pack_property = new_pack_properties.add_property(); - pack_property->set_num_rows(cur_effective_num_rows - last_effective_num_rows); - pack_property->set_gc_hint_version(gc_hint_version); - } - mvcc_stream->readSuffix(); - } - auto pack_filter = DMFilePackFilter::loadFrom(file, - context.db_context.getGlobalContext().getMinMaxIndexCache(), - rowkey_range, - EMPTY_FILTER, - {}, - context.db_context.getFileProvider()); - auto & use_packs = pack_filter.getUsePacks(); - size_t new_pack_properties_index = 0; - bool use_new_pack_properties = pack_properties.property_size() == 0; - if (use_new_pack_properties) - { - size_t use_packs_count = 0; - for (auto is_used : use_packs) - { - if (is_used) - use_packs_count += 1; - } - if (unlikely((size_t)new_pack_properties.property_size() != use_packs_count)) - { - throw Exception("new_pack_propertys size " + std::to_string(new_pack_properties.property_size()) - + " doesn't match use packs size " + std::to_string(use_packs_count), - ErrorCodes::LOGICAL_ERROR); - } - } - for (size_t pack_id = 0; pack_id < use_packs.size(); pack_id++) - { - if (!use_packs[pack_id]) - continue; - property.num_versions += pack_stats[pack_id].rows; - property.num_puts += pack_stats[pack_id].rows - pack_stats[pack_id].not_clean; - if (use_new_pack_properties) - { - auto & pack_property = new_pack_properties.property(new_pack_properties_index); - property.num_rows += pack_property.num_rows(); - property.gc_hint_version = std::min(property.gc_hint_version, pack_property.gc_hint_version()); - new_pack_properties_index += 1; - } - else - { - auto & pack_property = pack_properties.property(pack_id); - property.num_rows += pack_property.num_rows(); - property.gc_hint_version = std::min(property.gc_hint_version, pack_property.gc_hint_version()); - } - } - } - is_property_cached.store(true, std::memory_order_release); -} - - // ================================================ // StableValueSpace::Snapshot // ================================================ diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.h b/dbms/src/Storages/DeltaMerge/StableValueSpace.h index d438be335d8..a3d6c750b34 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.h @@ -45,31 +45,6 @@ class StableValueSpace : public std::enable_shared_from_this void recordRemovePacksPages(WriteBatches & wbs) const; - bool isStablePropertyCached() const { return is_property_cached.load(std::memory_order_acquire); } - - struct StableProperty - { - // when gc_safe_point exceed this version, there must be some data obsolete - UInt64 gc_hint_version; - // number of rows including all puts and deletes - UInt64 num_versions; - // number of visible rows using the latest timestamp - UInt64 num_puts; - // number of rows having at least one version(include delete) - UInt64 num_rows; - - const String toDebugString() const - { - return "StableProperty: gc_hint_version [" + std::to_string(this->gc_hint_version) + "] num_versions [" - + std::to_string(this->num_versions) + "] num_puts[" + std::to_string(this->num_puts) + "] num_rows[" - + std::to_string(this->num_rows) + "]"; - } - }; - - const StableProperty & getStableProperty() const { return property; } - - void calculateStableProperty(const DMContext & context, const RowKeyRange & rowkey_range, bool is_common_handle); - struct Snapshot; using SnapshotPtr = std::shared_ptr; @@ -150,9 +125,6 @@ class StableValueSpace : public std::enable_shared_from_this UInt64 valid_bytes; DMFiles files; - StableProperty property; - std::atomic is_property_cached = false; - Logger * log; }; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp index def30faa33f..065af0a5450 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp @@ -152,15 +152,6 @@ try const size_t num_rows_write = 128; - DMFileBlockOutputStream::BlockProperty block_property1; - block_property1.effective_num_rows = 1; - block_property1.gc_hint_version = 1; - DMFileBlockOutputStream::BlockProperty block_property2; - block_property2.effective_num_rows = 2; - block_property2.gc_hint_version = 2; - std::vector block_propertys; - block_propertys.push_back(block_property1); - block_propertys.push_back(block_property2); { // Prepare for write // Block 1: [0, 64) @@ -169,11 +160,9 @@ try Block block2 = DMTestEnv::prepareSimpleWriteBlock(num_rows_write / 2, num_rows_write, false); auto stream = std::make_shared(dbContext(), dm_file, *cols); stream->writePrefix(); - stream->write(block1, block_property1); - stream->write(block2, block_property2); + stream->write(block1, 0); + stream->write(block2, 0); stream->writeSuffix(); - - ASSERT_EQ(dm_file->getPackProperties().property_size(), 2); } @@ -210,17 +199,10 @@ try /// Test restore the file from disk and read { - dm_file = restoreDMFile(); - - // Test dt property read success - auto propertys = dm_file->getPackProperties(); - ASSERT_EQ(propertys.property_size(), 2); - for (int i = 0; i < propertys.property_size(); i++) - { - auto & property = propertys.property(i); - ASSERT_EQ((size_t)property.num_rows(), (size_t)block_propertys[i].effective_num_rows); - ASSERT_EQ((size_t)property.gc_hint_version(), (size_t)block_propertys[i].effective_num_rows); - } + auto id = dm_file->fileId(); + dm_file.reset(); + auto file_provider = dbContext().getFileProvider(); + dm_file = DMFile::restore(file_provider, id, 0, parent_path, /*read_meta=*/true); } { // Test read after restore @@ -280,11 +262,9 @@ try Block block1 = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write / 2, false); Block block2 = DMTestEnv::prepareSimpleWriteBlock(num_rows_write / 2, num_rows_write, false); auto stream = std::make_shared(dbContext(), dm_file, *cols); - - DMFileBlockOutputStream::BlockProperty block_property; stream->writePrefix(); - stream->write(block1, block_property); - stream->write(block2, block_property); + stream->write(block1, 0); + stream->write(block2, 0); stream->writeSuffix(); } @@ -324,11 +304,9 @@ try Block block1 = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write / 2, false); Block block2 = DMTestEnv::prepareSimpleWriteBlock(num_rows_write / 2, num_rows_write, false); auto stream = std::make_shared(dbContext(), dm_file, *cols); - - DMFileBlockOutputStream::BlockProperty block_property; stream->writePrefix(); - stream->write(block1, block_property); - stream->write(block2, block_property); + stream->write(block1, 0); + stream->write(block2, 0); stream->writeSuffix(); } @@ -394,11 +372,9 @@ try Block block1 = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write / 2, false); Block block2 = DMTestEnv::prepareSimpleWriteBlock(num_rows_write / 2, num_rows_write, false); auto stream = std::make_shared(dbContext(), dm_file, *cols); - - DMFileBlockOutputStream::BlockProperty block_property; stream->writePrefix(); - stream->write(block1, block_property); - stream->write(block2, block_property); + stream->write(block1, 0); + stream->write(block2, 0); stream->writeSuffix(); } @@ -468,13 +444,11 @@ try auto stream = std::make_shared(dbContext(), dm_file, *cols); stream->writePrefix(); size_t pk_beg = 0; - - DMFileBlockOutputStream::BlockProperty block_property; for (size_t i = 0; i < nparts; ++i) { auto pk_end = (i == nparts - 1) ? num_rows_write : (pk_beg + num_rows_write / nparts); Block block = DMTestEnv::prepareSimpleWriteBlock(pk_beg, pk_end, false); - stream->write(block, block_property); + stream->write(block, 0); pk_beg += num_rows_write / nparts; } stream->writeSuffix(); @@ -569,8 +543,6 @@ try { // Prepare some packs in DMFile auto stream = std::make_shared(dbContext(), dm_file, *cols); - - DMFileBlockOutputStream::BlockProperty block_property; stream->writePrefix(); size_t pk_beg = 0; for (size_t i = 0; i < nparts; ++i) @@ -586,7 +558,7 @@ try ColumnWithTypeAndName i64(std::move(col), i64_cd.type, i64_cd.name, i64_cd.id); block.insert(i64); - stream->write(block, block_property); + stream->write(block, 0); pk_beg += num_rows_write / nparts; } stream->writeSuffix(); @@ -673,8 +645,6 @@ try { // Prepare some packs in DMFile auto stream = std::make_shared(dbContext(), dm_file, *cols); - - DMFileBlockOutputStream::BlockProperty block_property; stream->writePrefix(); size_t pk_beg = 0; for (size_t i = 0; i < nparts; ++i) @@ -690,7 +660,7 @@ try ColumnWithTypeAndName i64(std::move(col), i64_cd.type, i64_cd.name, i64_cd.id); block.insert(i64); - stream->write(block, block_property); + stream->write(block, 0); pk_beg += num_rows_write / nparts; } stream->writeSuffix(); @@ -772,15 +742,13 @@ try { // Prepare some packs in DMFile auto stream = std::make_shared(dbContext(), dm_file, *cols); - - DMFileBlockOutputStream::BlockProperty block_property; stream->writePrefix(); size_t pk_beg = 0; for (size_t i = 0; i < nparts; ++i) { auto pk_end = (i == nparts - 1) ? num_rows_write : (pk_beg + num_rows_write / nparts); Block block = DMTestEnv::prepareSimpleWriteBlock(pk_beg, pk_end, false); - stream->write(block, block_property); + stream->write(block, 0); pk_beg += num_rows_write / nparts; } stream->writeSuffix(); @@ -895,10 +863,8 @@ try block.insert(f64); auto stream = std::make_unique(dbContext(), dm_file, *cols); - - DMFileBlockOutputStream::BlockProperty block_property; stream->writePrefix(); - stream->write(block, block_property); + stream->write(block, 0); stream->writeSuffix(); } @@ -964,10 +930,8 @@ try block.insert(str); auto stream = std::make_unique(dbContext(), dm_file, *cols); - - DMFileBlockOutputStream::BlockProperty block_property; stream->writePrefix(); - stream->write(block, block_property); + stream->write(block, 0); stream->writeSuffix(); } @@ -1033,10 +997,8 @@ try block.insert(nullable_col); auto stream = std::make_shared(dbContext(), dm_file, *cols); - - DMFileBlockOutputStream::BlockProperty block_property; stream->writePrefix(); - stream->write(block, block_property); + stream->write(block, 0); stream->writeSuffix(); } @@ -1202,11 +1164,9 @@ try is_common_handle, rowkey_column_size); auto stream = std::make_shared(dbContext(), dm_file, *cols); - - DMFileBlockOutputStream::BlockProperty block_property; stream->writePrefix(); - stream->write(block1, block_property); - stream->write(block2, block_property); + stream->write(block1, 0); + stream->write(block2, 0); stream->writeSuffix(); } @@ -1256,8 +1216,6 @@ try { // Prepare some packs in DMFile auto stream = std::make_shared(dbContext(), dm_file, *cols); - - DMFileBlockOutputStream::BlockProperty block_property; stream->writePrefix(); size_t pk_beg = 0; for (size_t i = 0; i < nparts; ++i) @@ -1272,7 +1230,7 @@ try EXTRA_HANDLE_COLUMN_STRING_TYPE, is_common_handle, rowkey_column_size); - stream->write(block, block_property); + stream->write(block, 0); pk_beg += num_rows_write / nparts; } stream->writeSuffix(); @@ -1384,9 +1342,8 @@ class DMFile_DDL_Test : public DMFile_Test block.insert(f64); auto stream = std::make_unique(dbContext(), dm_file, *cols_before_ddl); - DMFileBlockOutputStream::BlockProperty block_property; stream->writePrefix(); - stream->write(block, block_property); + stream->write(block, 0); stream->writeSuffix(); return {num_rows_write, *cols_before_ddl}; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index a390f4d4c4d..f3480cd6719 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -1567,97 +1567,6 @@ try } CATCH -TEST_F(Segment_test, CalculateDTFileProperty) -try -{ - Settings settings = dmContext().db_context.getSettings(); - settings.dt_segment_stable_pack_rows = 10; - - segment = reload(DMTestEnv::getDefaultColumns(), std::move(settings)); - - const size_t num_rows_write_every_round = 100; - const size_t write_round = 3; - const size_t tso = 10000; - for (size_t i = 0; i < write_round; i++) - { - size_t start = num_rows_write_every_round * i; - Block block = DMTestEnv::prepareSimpleWriteBlock(start, start + num_rows_write_every_round, false, tso); - // write to segment - segment->write(dmContext(), block); - segment = segment->mergeDelta(dmContext(), tableColumns()); - } - - { - auto & stable = segment->getStable(); - ASSERT_GT(stable->getDMFiles()[0]->getPacks(), (size_t)1); - ASSERT_EQ(stable->getRows(), num_rows_write_every_round * write_round); - // caculate StableProperty - ASSERT_EQ(stable->isStablePropertyCached(), false); - auto start = RowKeyValue::fromHandle(0); - auto end = RowKeyValue::fromHandle(num_rows_write_every_round); - RowKeyRange range(start, end, false, 1); - // calculate the StableProperty for packs in the key range [0, num_rows_write_every_round) - stable->calculateStableProperty(dmContext(), range, false); - ASSERT_EQ(stable->isStablePropertyCached(), true); - auto & property = stable->getStableProperty(); - ASSERT_EQ(property.gc_hint_version, UINT64_MAX); - ASSERT_EQ(property.num_versions, num_rows_write_every_round); - ASSERT_EQ(property.num_puts, num_rows_write_every_round); - ASSERT_EQ(property.num_rows, num_rows_write_every_round); - } -} -CATCH - -TEST_F(Segment_test, CalculateDTFilePropertyWithPropertyFileDeleted) -try -{ - Settings settings = dmContext().db_context.getSettings(); - settings.dt_segment_stable_pack_rows = 10; - - segment = reload(DMTestEnv::getDefaultColumns(), std::move(settings)); - - const size_t num_rows_write_every_round = 100; - const size_t write_round = 3; - const size_t tso = 10000; - for (size_t i = 0; i < write_round; i++) - { - size_t start = num_rows_write_every_round * i; - Block block = DMTestEnv::prepareSimpleWriteBlock(start, start + num_rows_write_every_round, false, tso); - // write to segment - segment->write(dmContext(), block); - segment = segment->mergeDelta(dmContext(), tableColumns()); - } - - { - auto & stable = segment->getStable(); - auto & dmfiles = stable->getDMFiles(); - ASSERT_GT(dmfiles[0]->getPacks(), (size_t)1); - auto & dmfile = dmfiles[0]; - auto file_path = dmfile->path(); - // check property file exists and then delete it - ASSERT_EQ(Poco::File(file_path + "/property").exists(), true); - Poco::File(file_path + "/property").remove(); - ASSERT_EQ(Poco::File(file_path + "/property").exists(), false); - // clear PackProperties to force it to calculate from scratch - dmfile->getPackProperties().clear_property(); - ASSERT_EQ(dmfile->getPackProperties().property_size(), 0); - // caculate StableProperty - ASSERT_EQ(stable->isStablePropertyCached(), false); - auto start = RowKeyValue::fromHandle(0); - auto end = RowKeyValue::fromHandle(num_rows_write_every_round); - RowKeyRange range(start, end, false, 1); - // calculate the StableProperty for packs in the key range [0, num_rows_write_every_round) - stable->calculateStableProperty(dmContext(), range, false); - ASSERT_EQ(stable->isStablePropertyCached(), true); - auto & property = stable->getStableProperty(); - ASSERT_EQ(property.gc_hint_version, UINT64_MAX); - ASSERT_EQ(property.num_versions, num_rows_write_every_round); - ASSERT_EQ(property.num_puts, num_rows_write_every_round); - ASSERT_EQ(property.num_rows, num_rows_write_every_round); - } -} -CATCH - INSTANTIATE_TEST_CASE_P(SegmentWriteType, Segment_DDL_test, ::testing::Combine( // diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_version_filter.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_version_filter.cpp index e78a074c38f..aaace769dbd 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_version_filter.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_version_filter.cpp @@ -172,208 +172,8 @@ TEST(VersionFilter_test, MVCCCommonHandle) TEST(VersionFilter_test, Compact) { - // TODO: currently it just test data statistics, add test for data correctness BlocksList blocks; - - { - Int64 pk_value = 4; - blocks.push_back(DMTestEnv::prepareOneRowBlock(pk_value, 10, 0, str_col_name, "hello", false, 1)); - blocks.push_back(DMTestEnv::prepareOneRowBlock(pk_value, 20, 0, str_col_name, "world", false, 1)); - blocks.push_back(DMTestEnv::prepareOneRowBlock(pk_value, 30, 1, str_col_name, "", false, 1)); - blocks.push_back(DMTestEnv::prepareOneRowBlock(pk_value, 40, 0, str_col_name, "Flash", false, 1)); - Int64 pk_value2 = 5; - blocks.push_back(DMTestEnv::prepareOneRowBlock(pk_value2, 10, 0, str_col_name, "hello", false, 1)); - blocks.push_back(DMTestEnv::prepareOneRowBlock(pk_value2, 20, 0, str_col_name, "world", false, 1)); - blocks.push_back(DMTestEnv::prepareOneRowBlock(pk_value2, 30, 1, str_col_name, "", false, 1)); - Int64 pk_value3 = 6; - blocks.push_back(DMTestEnv::prepareOneRowBlock(pk_value3, 10, 1, str_col_name, "hello", false, 1)); - } - - ColumnDefines columns = getColumnDefinesFromBlock(blocks.back()); - - { - auto in = genInputStream(blocks, columns, 40, false); - auto * mvcc_stream = typeid_cast *>(in.get()); - ASSERT_NE(mvcc_stream, nullptr); - UInt64 gc_hint_version = UINT64_MAX; - in->readPrefix(); - while (true) - { - Block block = in->read(); - if (!block) - break; - if (!block.rows()) - continue; - gc_hint_version = std::min(mvcc_stream->getGCHintVersion(), gc_hint_version); - } - ASSERT_EQ(mvcc_stream->getEffectiveNumRows(), (size_t)1); - ASSERT_EQ(mvcc_stream->getNotCleanRows(), (size_t)0); - ASSERT_EQ(gc_hint_version, (size_t)UINT64_MAX); - in->readSuffix(); - } - { - auto in = genInputStream(blocks, columns, 30, false); - auto * mvcc_stream = typeid_cast *>(in.get()); - ASSERT_NE(mvcc_stream, nullptr); - UInt64 gc_hint_version = UINT64_MAX; - in->readPrefix(); - while (true) - { - Block block = in->read(); - if (!block) - break; - if (!block.rows()) - continue; - gc_hint_version = std::min(mvcc_stream->getGCHintVersion(), gc_hint_version); - } - ASSERT_EQ(mvcc_stream->getEffectiveNumRows(), (size_t)2); - ASSERT_EQ(mvcc_stream->getNotCleanRows(), (size_t)2); - ASSERT_EQ(gc_hint_version, (size_t)30); - in->readSuffix(); - } - { - auto in = genInputStream(blocks, columns, 20, false); - auto * mvcc_stream = typeid_cast *>(in.get()); - ASSERT_NE(mvcc_stream, nullptr); - UInt64 gc_hint_version = UINT64_MAX; - in->readPrefix(); - while (true) - { - Block block = in->read(); - if (!block) - break; - if (!block.rows()) - continue; - gc_hint_version = std::min(mvcc_stream->getGCHintVersion(), gc_hint_version); - } - ASSERT_EQ(mvcc_stream->getEffectiveNumRows(), (size_t)2); - ASSERT_EQ(mvcc_stream->getNotCleanRows(), (size_t)4); - ASSERT_EQ(gc_hint_version, (size_t)30); - in->readSuffix(); - } - { - auto in = genInputStream(blocks, columns, 10, false); - auto * mvcc_stream = typeid_cast *>(in.get()); - ASSERT_NE(mvcc_stream, nullptr); - UInt64 gc_hint_version = UINT64_MAX; - in->readPrefix(); - while (true) - { - Block block = in->read(); - if (!block) - break; - if (!block.rows()) - continue; - gc_hint_version = std::min(mvcc_stream->getGCHintVersion(), gc_hint_version); - } - ASSERT_EQ(mvcc_stream->getEffectiveNumRows(), (size_t)3); - ASSERT_EQ(mvcc_stream->getNotCleanRows(), (size_t)7); - ASSERT_EQ(gc_hint_version, (size_t)10); - in->readSuffix(); - } -} - -TEST(VersionFilter_test, CompactCommonHandle) -{ - // TODO: currently it just test data statistics, add test for data correctness - BlocksList blocks; - - { - Int64 pk_value = 4; - blocks.push_back(DMTestEnv::prepareOneRowBlock(pk_value, 10, 0, str_col_name, "hello", true, 2)); - blocks.push_back(DMTestEnv::prepareOneRowBlock(pk_value, 20, 0, str_col_name, "world", true, 2)); - blocks.push_back(DMTestEnv::prepareOneRowBlock(pk_value, 30, 1, str_col_name, "", true, 2)); - blocks.push_back(DMTestEnv::prepareOneRowBlock(pk_value, 40, 0, str_col_name, "Flash", true, 2)); - Int64 pk_value2 = 5; - blocks.push_back(DMTestEnv::prepareOneRowBlock(pk_value2, 10, 0, str_col_name, "hello", true, 2)); - blocks.push_back(DMTestEnv::prepareOneRowBlock(pk_value2, 20, 0, str_col_name, "world", true, 2)); - blocks.push_back(DMTestEnv::prepareOneRowBlock(pk_value2, 30, 1, str_col_name, "", true, 2)); - Int64 pk_value3 = 6; - blocks.push_back(DMTestEnv::prepareOneRowBlock(pk_value3, 10, 1, str_col_name, "hello", true, 2)); - } - - ColumnDefines columns = getColumnDefinesFromBlock(blocks.back()); - - { - auto in = genInputStream(blocks, columns, 40, true); - auto * mvcc_stream = typeid_cast *>(in.get()); - ASSERT_NE(mvcc_stream, nullptr); - UInt64 gc_hint_version = UINT64_MAX; - in->readPrefix(); - while (true) - { - Block block = in->read(); - if (!block) - break; - if (!block.rows()) - continue; - gc_hint_version = std::min(mvcc_stream->getGCHintVersion(), gc_hint_version); - } - ASSERT_EQ(mvcc_stream->getEffectiveNumRows(), (size_t)1); - ASSERT_EQ(mvcc_stream->getNotCleanRows(), (size_t)0); - ASSERT_EQ(gc_hint_version, (size_t)UINT64_MAX); - in->readSuffix(); - } - { - auto in = genInputStream(blocks, columns, 30, true); - auto * mvcc_stream = typeid_cast *>(in.get()); - ASSERT_NE(mvcc_stream, nullptr); - UInt64 gc_hint_version = UINT64_MAX; - in->readPrefix(); - while (true) - { - Block block = in->read(); - if (!block) - break; - if (!block.rows()) - continue; - gc_hint_version = std::min(mvcc_stream->getGCHintVersion(), gc_hint_version); - } - ASSERT_EQ(mvcc_stream->getEffectiveNumRows(), (size_t)2); - ASSERT_EQ(mvcc_stream->getNotCleanRows(), (size_t)2); - ASSERT_EQ(gc_hint_version, (size_t)30); - in->readSuffix(); - } - { - auto in = genInputStream(blocks, columns, 20, true); - auto * mvcc_stream = typeid_cast *>(in.get()); - ASSERT_NE(mvcc_stream, nullptr); - UInt64 gc_hint_version = UINT64_MAX; - in->readPrefix(); - while (true) - { - Block block = in->read(); - if (!block) - break; - if (!block.rows()) - continue; - gc_hint_version = std::min(mvcc_stream->getGCHintVersion(), gc_hint_version); - } - ASSERT_EQ(mvcc_stream->getEffectiveNumRows(), (size_t)2); - ASSERT_EQ(mvcc_stream->getNotCleanRows(), (size_t)4); - ASSERT_EQ(gc_hint_version, (size_t)30); - in->readSuffix(); - } - { - auto in = genInputStream(blocks, columns, 10, true); - auto * mvcc_stream = typeid_cast *>(in.get()); - ASSERT_NE(mvcc_stream, nullptr); - UInt64 gc_hint_version = UINT64_MAX; - in->readPrefix(); - while (true) - { - Block block = in->read(); - if (!block) - break; - if (!block.rows()) - continue; - gc_hint_version = std::min(mvcc_stream->getGCHintVersion(), gc_hint_version); - } - ASSERT_EQ(mvcc_stream->getEffectiveNumRows(), (size_t)3); - ASSERT_EQ(mvcc_stream->getNotCleanRows(), (size_t)7); - ASSERT_EQ(gc_hint_version, (size_t)10); - in->readSuffix(); - } + // TODO fill this test } } // namespace tests diff --git a/dbms/src/Storages/GCManager.cpp b/dbms/src/Storages/GCManager.cpp deleted file mode 100644 index 47d9085f7a2..00000000000 --- a/dbms/src/Storages/GCManager.cpp +++ /dev/null @@ -1,84 +0,0 @@ -#include -#include - -namespace DB -{ -namespace ErrorCodes -{ -extern const int TABLE_IS_DROPPED; -} // namespace ErrorCodes - -bool GCManager::work() -{ - auto & global_settings = global_context.getSettingsRef(); - // TODO: remove this when `BackgroundProcessingPool` supports specify task running interval - if (gc_check_stop_watch.elapsedSeconds() < global_settings.dt_bg_gc_check_interval) - return false; - Int64 gc_segments_limit = global_settings.dt_bg_gc_max_segments_to_check_every_round; - if (gc_segments_limit <= 0) - { - gc_check_stop_watch.restart(); - return false; - } - LOG_INFO(log, "Start GC with table id: " << next_table_id); - // Get a storage snapshot with weak_ptrs first - // TODO: avoid gc on storage which have no data? - std::map> storages; - for (const auto & [table_id, storage] : global_context.getTMTContext().getStorages().getAllStorage()) - storages.emplace(table_id, storage); - auto iter = storages.begin(); - if (next_table_id != InvalidTableID) - iter = storages.lower_bound(next_table_id); - - UInt64 checked_storage_num = 0; - while (true) - { - // The TiFlash process receive a signal to terminate. - if (global_context.getTMTContext().getTerminated()) - break; - // All storages have been checked, stop here - if (checked_storage_num >= storages.size()) - break; - if (iter == storages.end()) - iter = storages.begin(); - checked_storage_num++; - auto storage = iter->second.lock(); - iter++; - // The storage has been free - if (!storage) - continue; - - try - { - TableLockHolder table_read_lock = storage->lockForShare(RWLock::NO_QUERY); - // Block this thread and do GC on the storage - // It is OK if any schema changes is apply to the storage while doing GC, so we - // do not acquire structure lock on the storage. - auto gc_segments_num = storage->onSyncGc(gc_segments_limit); - gc_segments_limit = gc_segments_limit - gc_segments_num; - LOG_TRACE(log, "GCManager gc " << gc_segments_num << " segments of table " << storage->getTableInfo().id); - // Reach the limit on the number of segments to be gc, stop here - if (gc_segments_limit <= 0) - break; - } - catch (DB::Exception & e) - { - // If the storage is physical dropped, just ignore and continue - if (e.code() != ErrorCodes::TABLE_IS_DROPPED) - tryLogCurrentException(__PRETTY_FUNCTION__); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } - if (iter == storages.end()) - iter = storages.begin(); - next_table_id = iter->first; - LOG_INFO(log, "End GC and next gc will start with table id: " << next_table_id); - gc_check_stop_watch.restart(); - // Always return false - return false; -} - -} // namespace DB diff --git a/dbms/src/Storages/GCManager.h b/dbms/src/Storages/GCManager.h deleted file mode 100644 index db809d76f7c..00000000000 --- a/dbms/src/Storages/GCManager.h +++ /dev/null @@ -1,26 +0,0 @@ -#pragma once - -#include -#include - -namespace DB -{ -class GCManager -{ -public: - GCManager(Context & context) : global_context{context.getGlobalContext()}, log(&Logger::get("GCManager")) {}; - - ~GCManager() = default; - - bool work(); - -private: - Context & global_context; - - TableID next_table_id = InvalidTableID; - - AtomicStopwatch gc_check_stop_watch; - - Logger * log; -}; -} // namespace DB diff --git a/dbms/src/Storages/IManageableStorage.h b/dbms/src/Storages/IManageableStorage.h index 1510aab1c26..08babb67ddf 100644 --- a/dbms/src/Storages/IManageableStorage.h +++ b/dbms/src/Storages/IManageableStorage.h @@ -55,9 +55,6 @@ class IManageableStorage : public IStorage virtual void deleteRows(const Context &, size_t /*rows*/) { throw Exception("Unsupported"); } - // `limit` is the max number of segments to gc, return value is the number of segments gced - virtual UInt64 onSyncGc(Int64 /*limit*/) { throw Exception("Unsupported"); } - virtual void mergeDelta(const Context &) { throw Exception("Unsupported"); } virtual BlockInputStreamPtr listSegments(const Context &) { throw Exception("Unsupported"); } diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index b109db31f7a..2dd5e9d1cda 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -668,12 +668,6 @@ void StorageDeltaMerge::ingestFiles( return store->ingestFiles(global_context, settings, range, file_ids, clear_data_in_range); } -UInt64 StorageDeltaMerge::onSyncGc(Int64 limit) -{ - store->onSyncGc(limit); - return 0; -} - size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM::RowKeyRange & range) { size_t rows = 0; diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index ce3a1e72635..991c64addaf 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -60,8 +60,6 @@ class StorageDeltaMerge : public ext::shared_ptr_helper, publ void ingestFiles( const DM::RowKeyRange & range, const std::vector & file_ids, bool clear_data_in_range, const Settings & settings); - UInt64 onSyncGc(Int64) override; - void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, diff --git a/dbms/src/Storages/Transaction/BackgroundService.cpp b/dbms/src/Storages/Transaction/BackgroundService.cpp index 61daf1b1a32..a7515f6c7e3 100644 --- a/dbms/src/Storages/Transaction/BackgroundService.cpp +++ b/dbms/src/Storages/Transaction/BackgroundService.cpp @@ -72,7 +72,6 @@ BackgroundService::BackgroundService(TMTContext & tmt_) else { LOG_INFO(log, "Configuration raft.disable_bg_flush is set to true, background flush tasks are disabled."); - storage_gc_handle = background_pool.addTask([this] { return tmt.getGCManager().work(); }, false); } } @@ -106,11 +105,6 @@ BackgroundService::~BackgroundService() background_pool.removeTask(region_handle); region_handle = nullptr; } - if (storage_gc_handle) - { - background_pool.removeTask(storage_gc_handle); - storage_gc_handle = nullptr; - } } } // namespace DB diff --git a/dbms/src/Storages/Transaction/BackgroundService.h b/dbms/src/Storages/Transaction/BackgroundService.h index 915a8dc396e..187b915666b 100644 --- a/dbms/src/Storages/Transaction/BackgroundService.h +++ b/dbms/src/Storages/Transaction/BackgroundService.h @@ -39,7 +39,6 @@ class BackgroundService : boost::noncopyable BackgroundProcessingPool::TaskHandle single_thread_task_handle; BackgroundProcessingPool::TaskHandle table_flush_handle; BackgroundProcessingPool::TaskHandle region_handle; - BackgroundProcessingPool::TaskHandle storage_gc_handle; }; } // namespace DB diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp index 780039a9086..81e334a3222 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.cpp +++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp @@ -606,7 +606,7 @@ Block GenRegionBlockDatawithSchema(const RegionPtr & region, Block res_block; // No committed data, just return if (!data_list_read) - return std::move(res_block); + return res_block; auto context = tmt.getContext(); auto metrics = context.getTiFlashMetrics(); @@ -638,7 +638,7 @@ Block GenRegionBlockDatawithSchema(const RegionPtr & region, // Remove committed data RemoveRegionCommitCache(region, *data_list_read); - return std::move(res_block); + return res_block; } } // namespace DB diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp index b9ac900bd1e..cc13f49b8f1 100644 --- a/dbms/src/Storages/Transaction/TMTContext.cpp +++ b/dbms/src/Storages/Transaction/TMTContext.cpp @@ -19,7 +19,6 @@ TMTContext::TMTContext(Context & context_, const TiFlashRaftConfig & raft_config kvstore(std::make_shared(context, raft_config.snapshot_apply_method)), region_table(context), background_service(nullptr), - gc_manager(context), cluster(raft_config.pd_addrs.size() == 0 ? std::make_shared() : std::make_shared(raft_config.pd_addrs, cluster_config)), ignore_databases(raft_config.ignore_databases), @@ -56,7 +55,6 @@ BackgroundService & TMTContext::getBackgroundService() { return *background_serv const BackgroundService & TMTContext::getBackgroundService() const { return *background_service; } -GCManager & TMTContext::getGCManager() { return gc_manager; } Context & TMTContext::getContext() { return context; } diff --git a/dbms/src/Storages/Transaction/TMTContext.h b/dbms/src/Storages/Transaction/TMTContext.h index 87e3117f81d..658380dc430 100644 --- a/dbms/src/Storages/Transaction/TMTContext.h +++ b/dbms/src/Storages/Transaction/TMTContext.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include #include @@ -24,9 +23,6 @@ using BackGroundServicePtr = std::unique_ptr; class MPPTaskManager; using MPPTaskManagerPtr = std::shared_ptr; -class GCManager; -using GCManagerPtr = std::shared_ptr; - struct TiFlashRaftConfig; class TMTContext : private boost::noncopyable @@ -44,8 +40,6 @@ class TMTContext : private boost::noncopyable const BackgroundService & getBackgroundService() const; BackgroundService & getBackgroundService(); - GCManager & getGCManager(); - Context & getContext(); bool isInitialized() const; @@ -83,7 +77,6 @@ class TMTContext : private boost::noncopyable ManagedStorages storages; RegionTable region_table; BackGroundServicePtr background_service; - GCManager gc_manager; private: KVClusterPtr cluster; diff --git a/release-centos7/build/build-tiflash-ci.sh b/release-centos7/build/build-tiflash-ci.sh index 0c98d26b485..16cdca2ba01 100755 --- a/release-centos7/build/build-tiflash-ci.sh +++ b/release-centos7/build/build-tiflash-ci.sh @@ -39,13 +39,6 @@ if [ -d "$SRCPATH/contrib/tipb" ]; then cd - fi -if [ -d "$SRCPATH/dbms/src/Storages/DeltaMerge/File/dtpb" ]; then - cd "$SRCPATH/dbms/src/Storages/DeltaMerge/File/dtpb" - rm -rf cpp/dtpb - ./generate_cpp.sh - cd - -fi - rm -rf ${SRCPATH}/libs/libtiflash-proxy mkdir -p ${SRCPATH}/libs/libtiflash-proxy