diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp index 963ce1fa587308..cb272a1f5e9ebd 100644 --- a/be/src/cloud/cloud_base_compaction.cpp +++ b/be/src/cloud/cloud_base_compaction.cpp @@ -415,7 +415,7 @@ Status CloudBaseCompaction::modify_rowsets() { // the tablet to be unable to synchronize the rowset meta changes generated by cumu compaction. cloud_tablet()->set_base_compaction_cnt(stats.base_compaction_cnt()); if (output_rowset_delete_bitmap) { - _tablet->tablet_meta()->delete_bitmap().merge(*output_rowset_delete_bitmap); + _tablet->tablet_meta()->delete_bitmap()->merge(*output_rowset_delete_bitmap); } if (stats.cumulative_compaction_cnt() >= cloud_tablet()->cumulative_compaction_cnt()) { cloud_tablet()->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(), diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index 39954097324a99..71234470298d48 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -376,7 +376,7 @@ Status CloudCumulativeCompaction::modify_rowsets() { cloud_tablet()->set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt()); cloud_tablet()->set_cumulative_layer_point(stats.cumulative_point()); if (output_rowset_delete_bitmap) { - _tablet->tablet_meta()->delete_bitmap().merge(*output_rowset_delete_bitmap); + _tablet->tablet_meta()->delete_bitmap()->merge(*output_rowset_delete_bitmap); } if (stats.base_compaction_cnt() >= cloud_tablet()->base_compaction_cnt()) { cloud_tablet()->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(), @@ -416,7 +416,7 @@ Status CloudCumulativeCompaction::process_old_version_delete_bitmap() { rowset->rowset_id().to_string(); DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0}; DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id, pre_max_version}; - auto d = _tablet->tablet_meta()->delete_bitmap().get_agg( + auto d = _tablet->tablet_meta()->delete_bitmap()->get_agg( {rowset->rowset_id(), seg_id, pre_max_version}); to_remove_vec.emplace_back(std::make_tuple(_tablet->tablet_id(), start, end)); if (d->isEmpty()) { @@ -440,10 +440,10 @@ Status CloudCumulativeCompaction::process_old_version_delete_bitmap() { _input_rowsets.back()->end_version()); for (auto it = new_delete_bitmap->delete_bitmap.begin(); it != new_delete_bitmap->delete_bitmap.end(); it++) { - _tablet->tablet_meta()->delete_bitmap().set(it->first, it->second); + _tablet->tablet_meta()->delete_bitmap()->set(it->first, it->second); } - _tablet->tablet_meta()->delete_bitmap().add_to_remove_queue(version.to_string(), - to_remove_vec); + _tablet->tablet_meta()->delete_bitmap()->add_to_remove_queue(version.to_string(), + to_remove_vec); DBUG_EXECUTE_IF( "CloudCumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets", { static_cast(_tablet.get())->delete_expired_stale_rowsets(); }); diff --git a/be/src/cloud/cloud_full_compaction.cpp b/be/src/cloud/cloud_full_compaction.cpp index 6bfab2ec69808d..7358f6d19156a1 100644 --- a/be/src/cloud/cloud_full_compaction.cpp +++ b/be/src/cloud/cloud_full_compaction.cpp @@ -26,6 +26,7 @@ #include "common/status.h" #include "cpp/sync_point.h" #include "gen_cpp/cloud.pb.h" +#include "olap/base_tablet.h" #include "olap/compaction.h" #include "olap/rowset/beta_rowset.h" #include "olap/tablet_meta.h" @@ -273,7 +274,7 @@ Status CloudFullCompaction::modify_rowsets() { cloud_tablet()->set_base_compaction_cnt(stats.base_compaction_cnt()); cloud_tablet()->set_cumulative_layer_point(stats.cumulative_point()); if (output_rowset_delete_bitmap) { - _tablet->tablet_meta()->delete_bitmap().merge(*output_rowset_delete_bitmap); + _tablet->tablet_meta()->delete_bitmap()->merge(*output_rowset_delete_bitmap); } if (stats.cumulative_compaction_cnt() >= cloud_tablet()->cumulative_compaction_cnt()) { cloud_tablet()->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(), @@ -340,8 +341,9 @@ Status CloudFullCompaction::_cloud_full_compaction_update_delete_bitmap(int64_t int64_t max_version = cloud_tablet()->max_version().second; DCHECK(max_version >= _output_rowset->version().second); if (max_version > _output_rowset->version().second) { - RETURN_IF_ERROR(cloud_tablet()->capture_consistent_rowsets_unlocked( - {_output_rowset->version().second + 1, max_version}, &tmp_rowsets)); + auto ret = DORIS_TRY(cloud_tablet()->capture_consistent_rowsets_unlocked( + {_output_rowset->version().second + 1, max_version}, CaptureRowsetOps {})); + tmp_rowsets = std::move(ret.rowsets); } for (const auto& it : tmp_rowsets) { int64_t cur_version = it->rowset_meta()->start_version(); @@ -372,7 +374,7 @@ Status CloudFullCompaction::_cloud_full_compaction_update_delete_bitmap(int64_t .tag("input_segments", _input_segments) .tag("input_rowsets_total_size", _input_rowsets_total_size) .tag("update_bitmap_size", delete_bitmap->delete_bitmap.size()); - _tablet->tablet_meta()->delete_bitmap().merge(*delete_bitmap); + _tablet->tablet_meta()->delete_bitmap()->merge(*delete_bitmap); return Status::OK(); } diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index dff85b3f639258..684d89cb01bb44 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -577,7 +577,7 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet, .error(st); return st; } - tablet->tablet_meta()->delete_bitmap().merge(delete_bitmap); + tablet->tablet_meta()->delete_bitmap()->merge(delete_bitmap); if (config::enable_mow_verbose_log && !resp.rowset_meta().empty() && delete_bitmap.cardinality() > 0) { std::vector new_rowset_msgs; diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index a5707e51bb64a6..405dcbe1a0d3cc 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -27,6 +27,7 @@ #include "cloud/cloud_meta_mgr.h" #include "cloud/cloud_tablet_mgr.h" #include "common/status.h" +#include "olap/base_tablet.h" #include "olap/delete_handler.h" #include "olap/olap_define.h" #include "olap/rowset/beta_rowset.h" @@ -186,7 +187,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx(); reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS; reader_context.batch_size = ALTER_TABLE_BATCH_SIZE; - reader_context.delete_bitmap = &_base_tablet->tablet_meta()->delete_bitmap(); + reader_context.delete_bitmap = _base_tablet->tablet_meta()->delete_bitmap(); reader_context.version = Version(0, start_resp.alter_version()); for (auto& split : rs_splits) { @@ -457,7 +458,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version, .tag("alter_version", alter_version); RETURN_IF_ERROR(_cloud_storage_engine.register_compaction_stop_token(_new_tablet, initiator)); TabletMetaSharedPtr tmp_meta = std::make_shared(*(_new_tablet->tablet_meta())); - tmp_meta->delete_bitmap().delete_bitmap.clear(); + tmp_meta->delete_bitmap()->delete_bitmap.clear(); std::shared_ptr tmp_tablet = std::make_shared(_cloud_storage_engine, tmp_meta); { @@ -466,22 +467,21 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version, } // step 1, process incremental rowset without delete bitmap update lock - std::vector incremental_rowsets; RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get())); int64_t max_version = tmp_tablet->max_version().second; LOG(INFO) << "alter table for mow table, calculate delete bitmap of " << "incremental rowsets without lock, version: " << start_calc_delete_bitmap_version << "-" << max_version << " new_table_id: " << _new_tablet->tablet_id(); if (max_version >= start_calc_delete_bitmap_version) { - RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked( - {start_calc_delete_bitmap_version, max_version}, &incremental_rowsets)); + auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked( + {start_calc_delete_bitmap_version, max_version}, CaptureRowsetOps {})); DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock", DBUG_BLOCK); { std::unique_lock wlock(tmp_tablet->get_header_lock()); tmp_tablet->add_rowsets(_output_rowsets, true, wlock); } - for (auto rowset : incremental_rowsets) { + for (auto rowset : ret.rowsets) { RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset)); } } @@ -497,15 +497,14 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version, LOG(INFO) << "alter table for mow table, calculate delete bitmap of " << "incremental rowsets with lock, version: " << max_version + 1 << "-" << new_max_version << " new_tablet_id: " << _new_tablet->tablet_id(); - std::vector new_incremental_rowsets; if (new_max_version > max_version) { - RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked( - {max_version + 1, new_max_version}, &new_incremental_rowsets)); + auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked( + {max_version + 1, new_max_version}, CaptureRowsetOps {})); { std::unique_lock wlock(tmp_tablet->get_header_lock()); tmp_tablet->add_rowsets(_output_rowsets, true, wlock); } - for (auto rowset : new_incremental_rowsets) { + for (auto rowset : ret.rowsets) { RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset)); } } @@ -522,13 +521,14 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version, } }); - auto& delete_bitmap = tmp_tablet->tablet_meta()->delete_bitmap(); + auto delete_bitmap = tmp_tablet->tablet_meta()->delete_bitmap(); // step4, store delete bitmap RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().update_delete_bitmap( - *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator, &delete_bitmap)); + *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator, delete_bitmap.get())); - _new_tablet->tablet_meta()->delete_bitmap() = delete_bitmap; + auto original_dbm = _new_tablet->tablet_meta()->delete_bitmap(); + *original_dbm = std::move(*delete_bitmap); return Status::OK(); } diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 582a4b771265d2..9b9ae527372296 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -37,6 +37,7 @@ #include "common/logging.h" #include "io/cache/block_file_cache_downloader.h" #include "io/cache/block_file_cache_factory.h" +#include "olap/base_tablet.h" #include "olap/compaction.h" #include "olap/cumulative_compaction_time_series_policy.h" #include "olap/olap_define.h" @@ -69,23 +70,6 @@ bool CloudTablet::exceed_version_limit(int32_t limit) { return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit; } -Status CloudTablet::capture_consistent_rowsets_unlocked( - const Version& spec_version, std::vector* rowsets) const { - Versions version_path; - auto st = _timestamped_version_tracker.capture_consistent_versions(spec_version, &version_path); - if (!st.ok()) { - // Check no missed versions or req version is merged - auto missed_versions = get_missed_versions(spec_version.second); - if (missed_versions.empty()) { - st.set_code(VERSION_ALREADY_MERGED); // Reset error code - } - st.append(" tablet_id=" + std::to_string(tablet_id())); - return st; - } - VLOG_DEBUG << "capture consitent versions: " << version_path; - return _capture_consistent_rowsets_unlocked(version_path, rowsets); -} - std::string CloudTablet::tablet_path() const { return ""; } @@ -97,25 +81,10 @@ Status CloudTablet::capture_rs_readers(const Version& spec_version, LOG_WARNING("CloudTablet.capture_rs_readers.return e-230").tag("tablet_id", tablet_id()); return Status::Error(-230, "injected error"); }); - Versions version_path; std::shared_lock rlock(_meta_lock); - auto st = _timestamped_version_tracker.capture_consistent_versions(spec_version, &version_path); - if (!st.ok()) { - rlock.unlock(); // avoid logging in lock range - // Check no missed versions or req version is merged - auto missed_versions = get_missed_versions(spec_version.second); - if (missed_versions.empty()) { - st.set_code(VERSION_ALREADY_MERGED); // Reset error code - st.append(" versions are already compacted, "); - } - st.append(" tablet_id=" + std::to_string(tablet_id())); - // clang-format off - LOG(WARNING) << st << '\n' << [this]() { std::string json; get_compaction_status(&json); return json; }(); - // clang-format on - return st; - } - VLOG_DEBUG << "capture consitent versions: " << version_path; - return capture_rs_readers_unlocked(version_path, rs_splits); + *rs_splits = DORIS_TRY(capture_rs_readers_unlocked( + spec_version, CaptureRowsetOps {.skip_missing_versions = skip_missing_version})); + return Status::OK(); } Status CloudTablet::merge_rowsets_schema() { @@ -461,7 +430,7 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() { } _reconstruct_version_tracker_if_necessary(); } - _tablet_meta->delete_bitmap().remove_stale_delete_bitmap_from_queue(version_to_delete); + _tablet_meta->delete_bitmap()->remove_stale_delete_bitmap_from_queue(version_to_delete); recycle_cached_data(expired_rowsets); if (config::enable_mow_verbose_log) { LOG_INFO("finish delete_expired_stale_rowset for tablet={}", tablet_id()); @@ -967,7 +936,7 @@ Status CloudTablet::calc_delete_bitmap_for_compaction( std::size_t missed_rows_size = 0; calc_compaction_output_rowset_delete_bitmap( input_rowsets, rowid_conversion, 0, version.second + 1, missed_rows.get(), - location_map.get(), tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get()); + location_map.get(), *tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get()); if (missed_rows) { missed_rows_size = missed_rows->size(); if (!allow_delete_in_cumu_compaction) { @@ -1002,7 +971,7 @@ Status CloudTablet::calc_delete_bitmap_for_compaction( calc_compaction_output_rowset_delete_bitmap( input_rowsets, rowid_conversion, version.second, UINT64_MAX, missed_rows.get(), - location_map.get(), tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get()); + location_map.get(), *tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get()); int64_t t4 = MonotonicMicros(); if (location_map) { RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map)); diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index 2dd1d3c4425a3a..dc357eb7249c95 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -55,9 +55,6 @@ class CloudTablet final : public BaseTablet { Status capture_rs_readers(const Version& spec_version, std::vector* rs_splits, bool skip_missing_version) override; - Status capture_consistent_rowsets_unlocked( - const Version& spec_version, std::vector* rowsets) const override; - size_t tablet_footprint() override { return _approximate_data_size.load(std::memory_order_relaxed); } diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index cbf9a29ee907ff..71932bd70772b9 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -25,6 +25,7 @@ #include "cloud/cloud_storage_engine.h" #include "cloud/cloud_tablet.h" #include "cloud/config.h" +#include "common/logging.h" #include "common/status.h" #include "olap/lru_cache.h" #include "runtime/memory/cache_policy.h" @@ -154,7 +155,7 @@ void set_tablet_access_time_ms(CloudTablet* tablet) { Result> CloudTabletMgr::get_tablet(int64_t tablet_id, bool warmup_data, bool sync_delete_bitmap, SyncRowsetStats* sync_stats, - bool force_use_cache) { + bool local_only) { // LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr` class Value : public LRUCacheValueBase { public: @@ -172,12 +173,17 @@ Result> CloudTabletMgr::get_tablet(int64_t tablet_i CacheKey key(tablet_id_str); auto* handle = _cache->lookup(key); - if (handle == nullptr && force_use_cache) { - return ResultError( - Status::InternalError("failed to get cloud tablet from cache {}", tablet_id)); - } - if (handle == nullptr) { + if (local_only) { + LOG(INFO) << "tablet=" << tablet_id + << "does not exists in local tablet cache, because param local_only=true, " + "treat it as an error"; + return ResultError(Status::InternalError( + "tablet={} does not exists in local tablet cache, because param " + "local_only=true, " + "treat it as an error", + tablet_id)); + } if (sync_stats) { ++sync_stats->tablet_meta_cache_miss; } @@ -475,7 +481,7 @@ void CloudTabletMgr::get_topn_tablet_delete_bitmap_score( auto t = tablet_wk.lock(); if (!t) return; uint64_t delete_bitmap_count = - t.get()->tablet_meta()->delete_bitmap().get_delete_bitmap_count(); + t.get()->tablet_meta()->delete_bitmap()->get_delete_bitmap_count(); total_delete_map_count += delete_bitmap_count; if (delete_bitmap_count > *max_delete_bitmap_score) { max_delete_bitmap_score_tablet_id = t->tablet_id(); diff --git a/be/src/cloud/cloud_tablet_mgr.h b/be/src/cloud/cloud_tablet_mgr.h index a1ce6d2b8cfb1b..2e0ea79f4244f0 100644 --- a/be/src/cloud/cloud_tablet_mgr.h +++ b/be/src/cloud/cloud_tablet_mgr.h @@ -47,7 +47,7 @@ class CloudTabletMgr { Result> get_tablet(int64_t tablet_id, bool warmup_data = false, bool sync_delete_bitmap = true, SyncRowsetStats* sync_stats = nullptr, - bool force_use_cache = false); + bool local_only = false); void erase_tablet(int64_t tablet_id); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 3df7058c12d1ce..9265f3634d6a64 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1511,6 +1511,7 @@ DEFINE_mBool(enable_compaction_pause_on_high_memory, "true"); DEFINE_mBool(enable_calc_delete_bitmap_between_segments_concurrently, "false"); +DEFINE_mBool(enable_fetch_rowsets_from_peer_replicas, "false"); // the max length of segments key bounds, in bytes // ATTENTION: as long as this conf has ever been enabled, cluster downgrade and backup recovery will no longer be supported. DEFINE_mInt32(segments_key_bounds_truncation_threshold, "-1"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 0fa93d0f8f8ef1..8aad438f73dabc 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1585,6 +1585,7 @@ DECLARE_mBool(enable_compaction_pause_on_high_memory); DECLARE_mBool(enable_calc_delete_bitmap_between_segments_concurrently); +DECLARE_mBool(enable_fetch_rowsets_from_peer_replicas); // the max length of segments key bounds, in bytes // ATTENTION: as long as this conf has ever been enabled, cluster downgrade and backup recovery will no longer be supported. DECLARE_mInt32(segments_key_bounds_truncation_threshold); diff --git a/be/src/http/action/delete_bitmap_action.cpp b/be/src/http/action/delete_bitmap_action.cpp index 59783d1c055535..5fc4d0f4388e78 100644 --- a/be/src/http/action/delete_bitmap_action.cpp +++ b/be/src/http/action/delete_bitmap_action.cpp @@ -159,7 +159,7 @@ Status DeleteBitmapAction::_handle_show_local_delete_bitmap_count(HttpRequest* r if (tablet == nullptr) { return Status::NotFound("Tablet not found. tablet_id={}", tablet_id); } - auto dm = tablet->tablet_meta()->delete_bitmap().snapshot(); + auto dm = tablet->tablet_meta()->delete_bitmap()->snapshot(); _show_delete_bitmap(dm, verbose, json_result); return Status::OK(); } @@ -183,7 +183,7 @@ Status DeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest* req, LOG(WARNING) << "failed to sync tablet=" << tablet_id << ", st=" << st; return st; } - auto dm = tablet->tablet_meta()->delete_bitmap().snapshot(); + auto dm = tablet->tablet_meta()->delete_bitmap()->snapshot(); _show_delete_bitmap(dm, verbose, json_result); return Status::OK(); } @@ -210,4 +210,4 @@ void DeleteBitmapAction::handle(HttpRequest* req) { } #include "common/compile_check_end.h" -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 1aa74582d84d9c..56ffc2d100e628 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -516,7 +516,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest if (!s.ok() && !s.is()) { return s; } - if (s.ok() && _tablet_meta->delete_bitmap().contains_agg_without_cache( + if (s.ok() && _tablet_meta->delete_bitmap()->contains_agg_without_cache( {loc.rowset_id, loc.segment_id, version}, loc.row_id)) { // if has sequence col, we continue to compare the sequence_id of // all rowsets, util we find an existing key. @@ -1114,37 +1114,6 @@ void BaseTablet::_rowset_ids_difference(const RowsetIdUnorderedSet& cur, } } -Status BaseTablet::_capture_consistent_rowsets_unlocked( - const std::vector& version_path, std::vector* rowsets) const { - DCHECK(rowsets != nullptr); - rowsets->reserve(version_path.size()); - for (const auto& version : version_path) { - bool is_find = false; - do { - auto it = _rs_version_map.find(version); - if (it != _rs_version_map.end()) { - is_find = true; - rowsets->push_back(it->second); - break; - } - - auto it_expired = _stale_rs_version_map.find(version); - if (it_expired != _stale_rs_version_map.end()) { - is_find = true; - rowsets->push_back(it_expired->second); - break; - } - } while (false); - - if (!is_find) { - return Status::Error( - "fail to find Rowset for version. tablet={}, version={}", tablet_id(), - version.to_string()); - } - } - return Status::OK(); -} - Status BaseTablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, int64_t max_version, int64_t txn_id, const RowsetIdUnorderedSet& rowset_ids, @@ -1564,7 +1533,7 @@ Status BaseTablet::update_delete_bitmap_without_lock( delete_bitmap->remove_sentinel_marks(); } for (auto& iter : delete_bitmap->delete_bitmap) { - self->_tablet_meta->delete_bitmap().merge( + self->_tablet_meta->delete_bitmap()->merge( {std::get<0>(iter.first), std::get<1>(iter.first), cur_version}, iter.second); } @@ -1723,7 +1692,7 @@ void BaseTablet::get_base_rowset_delete_bitmap_count( } base_found = true; uint64_t base_rowset_delete_bitmap_count = - this->tablet_meta()->delete_bitmap().get_count_with_range( + this->tablet_meta()->delete_bitmap()->get_count_with_range( {rowset->rowset_id(), 0, 0}, {rowset->rowset_id(), UINT32_MAX, UINT64_MAX}); if (base_rowset_delete_bitmap_count > *max_base_rowset_delete_bitmap_score) { @@ -1737,6 +1706,16 @@ void BaseTablet::get_base_rowset_delete_bitmap_count( } } +void TabletReadSource::fill_delete_predicates() { + DCHECK_EQ(delete_predicates.size(), 0); + auto delete_pred_view = + rs_splits | std::views::transform([](auto&& split) { + return split.rs_reader->rowset()->rowset_meta(); + }) | + std::views::filter([](const auto& rs_meta) { return rs_meta->has_delete_predicate(); }); + delete_predicates = {delete_pred_view.begin(), delete_pred_view.end()}; +} + int32_t BaseTablet::max_version_config() { int32_t max_version = tablet_meta()->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY ? config::time_series_max_tablet_version_num diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 9dd69d0bd9a20c..6c797e0478bcf7 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -42,6 +42,9 @@ class SegmentCacheHandle; class RowIdConversion; struct PartialUpdateInfo; class PartialUpdateReadPlan; +struct CaptureRowsetOps; +struct CaptureRowsetResult; +struct TabletReadSource; struct TabletWithVersion { BaseTabletSPtr tablet; @@ -107,9 +110,6 @@ class BaseTablet { virtual Result> create_rowset_writer(RowsetWriterContext& context, bool vertical) = 0; - virtual Status capture_consistent_rowsets_unlocked( - const Version& spec_version, std::vector* rowsets) const = 0; - virtual Status capture_rs_readers(const Version& spec_version, std::vector* rs_splits, bool skip_missing_version) = 0; @@ -309,6 +309,20 @@ class BaseTablet { return Status::OK(); } + [[nodiscard]] Result capture_consistent_rowsets_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const; + + [[nodiscard]] Result> capture_consistent_versions_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const; + + [[nodiscard]] Result> capture_rs_readers_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const; + + [[nodiscard]] Result capture_read_source(const Version& version_range, + const CaptureRowsetOps& options); + + Result _remote_capture_rowsets(const Version& version_range) const; + protected: // Find the missed versions until the spec_version. // @@ -326,9 +340,6 @@ class BaseTablet { const RowsetIdUnorderedSet& pre, RowsetIdUnorderedSet* to_add, RowsetIdUnorderedSet* to_del); - Status _capture_consistent_rowsets_unlocked(const std::vector& version_path, - std::vector* rowsets) const; - Status sort_block(vectorized::Block& in_block, vectorized::Block& output_block); mutable std::shared_mutex _meta_lock; @@ -369,4 +380,24 @@ class BaseTablet { Status last_compaction_status = Status::OK(); }; +struct CaptureRowsetOps { + bool skip_missing_versions = false; + bool quiet = false; + bool include_stale_rowsets = true; + bool enable_fetch_rowsets_from_peers = false; +}; + +struct CaptureRowsetResult { + std::vector rowsets; + std::shared_ptr delete_bitmap; +}; + +struct TabletReadSource { + std::vector rs_splits; + std::vector delete_predicates; + std::shared_ptr delete_bitmap; + // Fill delete predicates with `rs_splits` + void fill_delete_predicates(); +}; + } /* namespace doris */ diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 7e312c5847f1dd..77af2b30fe1b34 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -1069,7 +1069,7 @@ Status CompactionMixin::modify_rowsets() { std::size_t missed_rows_size = 0; tablet()->calc_compaction_output_rowset_delete_bitmap( _input_rowsets, *_rowid_conversion, 0, version.second + 1, missed_rows.get(), - location_map.get(), _tablet->tablet_meta()->delete_bitmap(), + location_map.get(), *_tablet->tablet_meta()->delete_bitmap(), &output_rowset_delete_bitmap); if (missed_rows) { missed_rows_size = missed_rows->size(); @@ -1112,7 +1112,7 @@ Status CompactionMixin::modify_rowsets() { ss << ", debug info: "; DeleteBitmap subset_map(_tablet->tablet_id()); for (auto rs : _input_rowsets) { - _tablet->tablet_meta()->delete_bitmap().subset( + _tablet->tablet_meta()->delete_bitmap()->subset( {rs->rowset_id(), 0, 0}, {rs->rowset_id(), rs->num_segments(), version.second + 1}, &subset_map); @@ -1187,7 +1187,7 @@ Status CompactionMixin::modify_rowsets() { // incremental data. tablet()->calc_compaction_output_rowset_delete_bitmap( _input_rowsets, *_rowid_conversion, version.second, UINT64_MAX, - missed_rows.get(), location_map.get(), _tablet->tablet_meta()->delete_bitmap(), + missed_rows.get(), location_map.get(), *_tablet->tablet_meta()->delete_bitmap(), &output_rowset_delete_bitmap); if (missed_rows) { diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index 5cb2a6105d9d55..6389f1ad1334d5 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -635,14 +635,14 @@ Status DataDir::load() { } ++dbm_cnt; auto seg_id = delete_bitmap_pb.segment_ids(i); - auto iter = tablet->tablet_meta()->delete_bitmap().delete_bitmap.find( + auto iter = tablet->tablet_meta()->delete_bitmap()->delete_bitmap.find( {rst_id, seg_id, version}); // This version of delete bitmap already exists - if (iter != tablet->tablet_meta()->delete_bitmap().delete_bitmap.end()) { + if (iter != tablet->tablet_meta()->delete_bitmap()->delete_bitmap.end()) { continue; } auto bitmap = delete_bitmap_pb.segment_delete_bitmaps(i).data(); - tablet->tablet_meta()->delete_bitmap().delete_bitmap[{rst_id, seg_id, version}] = + tablet->tablet_meta()->delete_bitmap()->delete_bitmap[{rst_id, seg_id, version}] = roaring::Roaring::read(bitmap); } return true; diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp index 06641e7a22c2a9..7d3c497f2a214b 100644 --- a/be/src/olap/full_compaction.cpp +++ b/be/src/olap/full_compaction.cpp @@ -27,6 +27,7 @@ #include "common/config.h" #include "common/status.h" +#include "olap/base_tablet.h" #include "olap/compaction.h" #include "olap/cumulative_compaction_policy.h" #include "olap/olap_common.h" @@ -143,8 +144,9 @@ Status FullCompaction::modify_rowsets() { int64_t max_version = tablet()->max_version().second; DCHECK(max_version >= _output_rowset->version().second); if (max_version > _output_rowset->version().second) { - RETURN_IF_ERROR(_tablet->capture_consistent_rowsets_unlocked( - {_output_rowset->version().second + 1, max_version}, &tmp_rowsets)); + auto ret = DORIS_TRY(_tablet->capture_consistent_rowsets_unlocked( + {_output_rowset->version().second + 1, max_version}, CaptureRowsetOps {})); + tmp_rowsets = std::move(ret.rowsets); } for (const auto& it : tmp_rowsets) { @@ -219,7 +221,7 @@ Status FullCompaction::_full_compaction_calc_delete_bitmap(const RowsetSharedPtr [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); for (const auto& [k, v] : delete_bitmap->delete_bitmap) { if (std::get<1>(k) != DeleteBitmap::INVALID_SEGMENT_ID) { - _tablet->tablet_meta()->delete_bitmap().merge( + _tablet->tablet_meta()->delete_bitmap()->merge( {std::get<0>(k), std::get<1>(k), cur_version}, v); } } diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index 01f36d77dc2cc5..c3f59872f62acb 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -68,7 +68,7 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type, reader_params.tablet = tablet; reader_params.reader_type = reader_type; - TabletReader::ReadSource read_source; + TabletReadSource read_source; read_source.rs_splits.reserve(src_rowset_readers.size()); for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) { read_source.rs_splits.emplace_back(rs_reader); @@ -87,7 +87,7 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type, } reader_params.tablet_schema = merge_tablet_schema; if (!tablet->tablet_schema()->cluster_key_idxes().empty()) { - reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap(); + reader_params.delete_bitmap = tablet->tablet_meta()->delete_bitmap(); } if (stats_output && stats_output->rowid_conversion) { @@ -249,7 +249,7 @@ Status Merger::vertical_compact_one_group( reader_params.tablet = tablet; reader_params.reader_type = reader_type; - TabletReader::ReadSource read_source; + TabletReadSource read_source; read_source.rs_splits.reserve(src_rowset_readers.size()); for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) { read_source.rs_splits.emplace_back(rs_reader); @@ -268,7 +268,7 @@ Status Merger::vertical_compact_one_group( reader_params.tablet_schema = merge_tablet_schema; if (!tablet->tablet_schema()->cluster_key_idxes().empty()) { - reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap(); + reader_params.delete_bitmap = tablet->tablet_meta()->delete_bitmap(); } if (is_key && stats_output && stats_output->rowid_conversion) { diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp index 103e6341d7c8b3..f4cd210bbd205a 100644 --- a/be/src/olap/parallel_scanner_builder.cpp +++ b/be/src/olap/parallel_scanner_builder.cpp @@ -57,7 +57,7 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list& // `rs_splits` in `entire read source` will be devided into several partitial read sources // to build several parallel scanners, based on segment rows number. All the partitial read sources // share the same delete predicates from their corresponding entire read source. - TabletReader::ReadSource partitial_read_source; + TabletReadSource partitial_read_source; int64_t rows_collected = 0; for (auto& rs_split : entire_read_source.rs_splits) { auto reader = rs_split.rs_reader; @@ -106,10 +106,11 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list& partitial_read_source.rs_splits.emplace_back(std::move(split)); - scanners.emplace_back( - _build_scanner(tablet, version, _key_ranges, - {std::move(partitial_read_source.rs_splits), - entire_read_source.delete_predicates})); + scanners.emplace_back(_build_scanner( + tablet, version, _key_ranges, + {.rs_splits = std::move(partitial_read_source.rs_splits), + .delete_predicates = entire_read_source.delete_predicates, + .delete_bitmap = entire_read_source.delete_bitmap})); partitial_read_source = {}; split = RowSetSplits(reader->clone()); @@ -150,9 +151,11 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list& split.segment_offsets.second - split.segment_offsets.first); } #endif - scanners.emplace_back(_build_scanner(tablet, version, _key_ranges, - {std::move(partitial_read_source.rs_splits), - entire_read_source.delete_predicates})); + scanners.emplace_back( + _build_scanner(tablet, version, _key_ranges, + {.rs_splits = std::move(partitial_read_source.rs_splits), + .delete_predicates = entire_read_source.delete_predicates, + .delete_bitmap = entire_read_source.delete_bitmap})); } } @@ -199,7 +202,7 @@ Status ParallelScannerBuilder::_load() { std::shared_ptr ParallelScannerBuilder::_build_scanner( BaseTabletSPtr tablet, int64_t version, const std::vector& key_ranges, - TabletReader::ReadSource&& read_source) { + TabletReadSource&& read_source) { NewOlapScanner::Params params {_state, _scanner_profile.get(), key_ranges, std::move(tablet), version, std::move(read_source), _limit, _is_preaggregation}; return NewOlapScanner::create_shared(_parent, std::move(params)); diff --git a/be/src/olap/parallel_scanner_builder.h b/be/src/olap/parallel_scanner_builder.h index 1f371e3129a04f..a746ff5ba5d77d 100644 --- a/be/src/olap/parallel_scanner_builder.h +++ b/be/src/olap/parallel_scanner_builder.h @@ -44,7 +44,7 @@ class ParallelScannerBuilder { public: ParallelScannerBuilder(pipeline::OlapScanLocalState* parent, const std::vector& tablets, - std::vector& read_sources, + std::vector& read_sources, const std::shared_ptr& profile, const std::vector& key_ranges, RuntimeState* state, int64_t limit, bool is_dup_mow_key, bool is_preaggregation) @@ -71,7 +71,7 @@ class ParallelScannerBuilder { std::shared_ptr _build_scanner( BaseTabletSPtr tablet, int64_t version, const std::vector& key_ranges, - TabletReader::ReadSource&& read_source); + TabletReadSource&& read_source); pipeline::OlapScanLocalState* _parent; @@ -94,8 +94,8 @@ class ParallelScannerBuilder { bool _is_preaggregation; std::vector _tablets; std::vector _key_ranges; - std::unordered_map _all_read_sources; - std::vector& _read_sources; + std::unordered_map _all_read_sources; + std::vector& _read_sources; }; } // namespace doris diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h index fd3b4fed56f967..2faed632349c63 100644 --- a/be/src/olap/rowset/rowset_reader_context.h +++ b/be/src/olap/rowset/rowset_reader_context.h @@ -74,7 +74,7 @@ struct RowsetReaderContext { uint64_t* merged_rows = nullptr; // for unique key merge on write bool enable_unique_key_merge_on_write = false; - const DeleteBitmap* delete_bitmap = nullptr; + std::shared_ptr delete_bitmap = nullptr; bool record_rowids = false; RowIdConversion* rowid_conversion; bool is_vertical_compaction = false; diff --git a/be/src/olap/rowset_version_mgr.cpp b/be/src/olap/rowset_version_mgr.cpp new file mode 100644 index 00000000000000..27df2ede4b2dec --- /dev/null +++ b/be/src/olap/rowset_version_mgr.cpp @@ -0,0 +1,452 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "cloud/config.h" +#include "common/status.h" +#include "cpp/sync_point.h" +#include "olap/base_tablet.h" +#include "olap/olap_common.h" +#include "olap/rowset/rowset.h" +#include "olap/rowset/rowset_factory.h" +#include "olap/rowset/rowset_reader.h" +#include "runtime/client_cache.h" +#include "service/backend_options.h" +#include "service/internal_service.h" +#include "util/brpc_client_cache.h" +#include "util/debug_points.h" +#include "util/thrift_rpc_helper.h" +#include "util/time.h" + +namespace doris { + +using namespace ErrorCode; +using namespace std::ranges; + +static bvar::LatencyRecorder g_remote_fetch_tablet_rowsets_single_request_latency( + "remote_fetch_rowsets_single_rpc"); +static bvar::LatencyRecorder g_remote_fetch_tablet_rowsets_latency("remote_fetch_rowsets"); + +[[nodiscard]] Result> BaseTablet::capture_consistent_versions_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const { + std::vector version_path; + auto st = + _timestamped_version_tracker.capture_consistent_versions(version_range, &version_path); + if (!st && !options.quiet) { + auto missed_versions = get_missed_versions_unlocked(version_range.second); + if (missed_versions.empty()) { + LOG(WARNING) << fmt::format( + "version already has been merged. version_range={}, max_version={}, " + "tablet_id={}", + version_range.to_string(), _tablet_meta->max_version().second, tablet_id()); + return ResultError(Status::Error( + "missed versions is empty, version_range={}, max_version={}, tablet_id={}", + version_range.to_string(), _tablet_meta->max_version().second, tablet_id())); + } + LOG(WARNING) << fmt::format("missed version for version_range={}, tablet_id={}, st={}", + version_range.to_string(), tablet_id(), st); + _print_missed_versions(missed_versions); + if (!options.skip_missing_versions) { + return ResultError(std::move(st)); + } + LOG(WARNING) << "force skipping missing version for tablet:" << tablet_id(); + } + DBUG_EXECUTE_IF("Tablet::capture_consistent_versions.inject_failure", { + auto tablet_id = dp->param("tablet_id", -1); + auto skip_by_option = dp->param("skip_by_option", false); + if (skip_by_option && !options.enable_fetch_rowsets_from_peers) { + return version_path; + } + if (tablet_id != -1 && (tablet_id == _tablet_meta->tablet_id()) || tablet_id == -2) { + return ResultError(Status::Error("version already merged")); + } + }); + return version_path; +} + +[[nodiscard]] Result BaseTablet::capture_consistent_rowsets_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const { + CaptureRowsetResult result; + auto& rowsets = result.rowsets; + auto maybe_versions = capture_consistent_versions_unlocked(version_range, options); + if (maybe_versions) { + const auto& version_paths = maybe_versions.value(); + rowsets.reserve(version_paths.size()); + + auto rowset_for_version = [&](const Version& version, + bool include_stale) -> Result { + if (auto it = _rs_version_map.find(version); it != _rs_version_map.end()) { + return it->second; + } else { + VLOG_NOTICE << "fail to find Rowset in rs_version for version. tablet=" + << tablet_id() << ", version='" << version.first << "-" + << version.second; + } + if (include_stale) { + if (auto it = _stale_rs_version_map.find(version); + it != _stale_rs_version_map.end()) { + return it->second; + } else { + LOG(WARNING) << fmt::format( + "fail to find Rowset in stale_rs_version for version. tablet={}, " + "version={}-{}", + tablet_id(), version.first, version.second); + } + } + return ResultError(Status::Error( + "failed to find rowset for version={}", version.to_string())); + }; + + for (const auto& version : version_paths) { + auto ret = rowset_for_version(version, options.include_stale_rowsets); + if (!ret) { + return ResultError(std::move(ret.error())); + } + + rowsets.push_back(std::move(ret.value())); + } + if (keys_type() == KeysType::UNIQUE_KEYS && enable_unique_key_merge_on_write()) { + result.delete_bitmap = _tablet_meta->delete_bitmap(); + } + return result; + } + + if (!config::is_cloud_mode() || !options.enable_fetch_rowsets_from_peers) { + return ResultError(std::move(maybe_versions.error())); + } + auto ret = _remote_capture_rowsets(version_range); + if (!ret) { + auto st = Status::Error( + "version already merged, meet error during remote capturing rowsets, " + "error={}, version_range={}", + ret.error().to_string(), version_range.to_string()); + return ResultError(std::move(st)); + } + return ret; +} + +[[nodiscard]] Result> BaseTablet::capture_rs_readers_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const { + auto maybe_rs_list = capture_consistent_rowsets_unlocked(version_range, options); + if (!maybe_rs_list) { + return ResultError(std::move(maybe_rs_list.error())); + } + const auto& rs_list = maybe_rs_list.value().rowsets; + std::vector rs_splits; + rs_splits.reserve(rs_list.size()); + for (const auto& rs : rs_list) { + RowsetReaderSharedPtr rs_reader; + auto st = rs->create_reader(&rs_reader); + if (!st) { + return ResultError(Status::Error( + "failed to create reader for rowset={}, reason={}", rs->rowset_id().to_string(), + st.to_string())); + } + rs_splits.emplace_back(std::move(rs_reader)); + } + return rs_splits; +} + +[[nodiscard]] Result BaseTablet::capture_read_source( + const Version& version_range, const CaptureRowsetOps& options) { + std::shared_lock rdlock(get_header_lock()); + auto maybe_result = capture_consistent_rowsets_unlocked(version_range, options); + if (!maybe_result) { + return ResultError(std::move(maybe_result.error())); + } + auto rowsets_result = std::move(maybe_result.value()); + TabletReadSource read_source; + read_source.delete_bitmap = std::move(rowsets_result.delete_bitmap); + const auto& rowsets = rowsets_result.rowsets; + read_source.rs_splits.reserve(rowsets.size()); + for (const auto& rs : rowsets) { + RowsetReaderSharedPtr rs_reader; + auto st = rs->create_reader(&rs_reader); + if (!st) { + return ResultError(Status::Error( + "failed to create reader for rowset={}, reason={}", rs->rowset_id().to_string(), + st.to_string())); + } + read_source.rs_splits.emplace_back(std::move(rs_reader)); + } + return read_source; +} + +template +bool call_bthread(bthread_t& th, const bthread_attr_t* attr, Fn&& fn, Args&&... args) { + auto p_wrap_fn = new auto([=] { fn(args...); }); + auto call_back = [](void* ar) -> void* { + auto f = reinterpret_cast(ar); + (*f)(); + delete f; + return nullptr; + }; + return bthread_start_background(&th, attr, call_back, p_wrap_fn) == 0; +} + +struct GetRowsetsCntl : std::enable_shared_from_this { + struct RemoteGetRowsetResult { + std::vector rowsets; + std::unique_ptr delete_bitmap; + }; + + Status start_req_bg() { + task_cnt = req_addrs.size(); + for (const auto& [ip, port] : req_addrs) { + bthread_t tid; + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + + bool succ = call_bthread(tid, &attr, [self = shared_from_this(), &ip, port]() { + LOG(INFO) << "start to get tablet rowsets from peer BE, ip=" << ip; + Defer defer_log {[&ip, port]() { + LOG(INFO) << "finish to get rowsets from peer BE, ip=" << ip + << ", port=" << port; + }}; + + PGetTabletRowsetsRequest req; + req.set_tablet_id(self->tablet_id); + req.set_version_start(self->version_range.first); + req.set_version_end(self->version_range.second); + if (self->delete_bitmap_keys.has_value()) { + req.mutable_delete_bitmap_keys()->CopyFrom(self->delete_bitmap_keys.value()); + } + brpc::Controller cntl; + cntl.set_timeout_ms(60000); + cntl.set_max_retry(3); + PGetTabletRowsetsResponse response; + auto start_tm_us = MonotonicMicros(); +#ifndef BE_TEST + std::shared_ptr stub = + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(ip, port); + if (stub == nullptr) { + self->result = ResultError(Status::InternalError( + "failed to fetch get_tablet_rowsets stub, ip={}, port={}", ip, port)); + return; + } + stub->get_tablet_rowsets(&cntl, &req, &response, nullptr); +#else + TEST_SYNC_POINT_CALLBACK("get_tablet_rowsets", &response); +#endif + g_remote_fetch_tablet_rowsets_single_request_latency + << MonotonicMicros() - start_tm_us; + + std::unique_lock l(self->butex); + if (self->done) { + return; + } + --self->task_cnt; + auto resp_st = Status::create(response.status()); + DBUG_EXECUTE_IF("GetRowsetCntl::start_req_bg.inject_failure", + { resp_st = Status::InternalError("inject error"); }); + if (cntl.Failed() || !resp_st) { + if (self->task_cnt != 0) { + return; + } + std::stringstream reason; + reason << "failed to get rowsets from all replicas, tablet_id=" + << self->tablet_id; + if (cntl.Failed()) { + reason << ", reason=[" << cntl.ErrorCode() << "] " << cntl.ErrorText(); + } else { + reason << ", reason=" << resp_st.to_string(); + } + self->result = ResultError(Status::InternalError(reason.str())); + self->done = true; + self->event.signal(); + return; + } + + Defer done_cb {[&]() { + self->done = true; + self->event.signal(); + }}; + std::vector rs_metas; + for (auto&& rs_pb : response.rowsets()) { + auto rs_meta = std::make_shared(); + if (!rs_meta->init_from_pb(rs_pb)) { + self->result = + ResultError(Status::InternalError("failed to init rowset from pb")); + return; + } + rs_metas.push_back(std::move(rs_meta)); + } + CaptureRowsetResult result; + self->result->rowsets = std::move(rs_metas); + + if (response.has_delete_bitmap()) { + self->result->delete_bitmap = std::make_unique( + DeleteBitmap::from_pb(response.delete_bitmap(), self->tablet_id)); + } + }); + + if (!succ) { + return Status::InternalError( + "failed to create bthread when request rowsets for tablet={}", tablet_id); + } + if (bthread_join(tid, nullptr) != 0) { + return Status::InternalError("failed to join bthread tid={}", tid); + } + } + return Status::OK(); + } + + Result wait_for_ret() { + event.wait(); + return std::move(result); + } + + int64_t tablet_id; + std::vector> req_addrs; + Version version_range; + std::optional delete_bitmap_keys = std::nullopt; + +private: + size_t task_cnt; + + bthread::Mutex butex; + bthread::CountdownEvent event {1}; + bool done = false; + + Result result; +}; + +Result>> get_peer_replicas_addresses( + const int64_t tablet_id) { + auto* cluster_info = ExecEnv::GetInstance()->cluster_info(); + DCHECK_NE(cluster_info, nullptr); + auto master_addr = cluster_info->master_fe_addr; + TGetTabletReplicaInfosRequest req; + req.tablet_ids.push_back(tablet_id); + TGetTabletReplicaInfosResult resp; + auto st = ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&](FrontendServiceConnection& client) { client->getTabletReplicaInfos(resp, req); }); + if (!st) { + return ResultError(Status::InternalError( + "failed to get tablet replica infos, rpc error={}, tablet_id={}", st.to_string(), + tablet_id)); + } + + auto it = resp.tablet_replica_infos.find(tablet_id); + if (it == resp.tablet_replica_infos.end()) { + return ResultError(Status::InternalError("replicas not found, tablet_id={}", tablet_id)); + } + auto replicas = it->second; + auto local_host = BackendOptions::get_localhost(); + bool include_local_host = false; + DBUG_EXECUTE_IF("get_peer_replicas_address.enable_local_host", { include_local_host = true; }); + auto ret_view = + replicas | std::views::filter([&local_host, include_local_host](const auto& replica) { + return local_host.find(replica.host) == std::string::npos || include_local_host; + }) | + std::views::transform([](auto& replica) { + return std::make_pair(std::move(replica.host), replica.brpc_port); + }); + return std::vector(ret_view.begin(), ret_view.end()); +} + +Result BaseTablet::_remote_capture_rowsets( + const Version& version_range) const { + auto start_tm_us = MonotonicMicros(); + Defer defer { + [&]() { g_remote_fetch_tablet_rowsets_latency << MonotonicMicros() - start_tm_us; }}; +#ifndef BE_TEST + auto maybe_be_addresses = get_peer_replicas_addresses(tablet_id()); +#else + Result>> maybe_be_addresses; + TEST_SYNC_POINT_CALLBACK("get_peer_replicas_addresses", &maybe_be_addresses); +#endif + DBUG_EXECUTE_IF("Tablet::_remote_get_rowsets_meta.inject_replica_address_fail", + { maybe_be_addresses = ResultError(Status::InternalError("inject failure")); }); + if (!maybe_be_addresses) { + return ResultError(std::move(maybe_be_addresses.error())); + } + auto be_addresses = std::move(maybe_be_addresses.value()); + if (be_addresses.empty()) { + LOG(WARNING) << "no peers replica for tablet=" << tablet_id(); + return ResultError(Status::InternalError("no replicas for tablet={}", tablet_id())); + } + + auto cntl = std::make_shared(); + cntl->tablet_id = tablet_id(); + cntl->req_addrs = std::move(be_addresses); + cntl->version_range = version_range; + bool is_mow = keys_type() == KeysType::UNIQUE_KEYS && enable_unique_key_merge_on_write(); + CaptureRowsetResult result; + if (is_mow) { + result.delete_bitmap = + std::make_unique(_tablet_meta->delete_bitmap()->snapshot()); + DeleteBitmapPB delete_bitmap_keys; + auto keyset = result.delete_bitmap->delete_bitmap | + std::views::transform([](const auto& kv) { return kv.first; }); + for (const auto& key : keyset) { + const auto& [rs_id, seg_id, version] = key; + delete_bitmap_keys.mutable_rowset_ids()->Add(rs_id.to_string()); + delete_bitmap_keys.mutable_segment_ids()->Add(seg_id); + delete_bitmap_keys.mutable_versions()->Add(version); + } + cntl->delete_bitmap_keys = std::move(delete_bitmap_keys); + } + + RETURN_IF_ERROR_RESULT(cntl->start_req_bg()); + auto maybe_meta = cntl->wait_for_ret(); + if (!maybe_meta) { + auto err = Status::InternalError( + "tried to get rowsets from peer replicas and failed, " + "reason={}", + maybe_meta.error()); + return ResultError(std::move(err)); + } + + auto& remote_meta = maybe_meta.value(); + const auto& rs_metas = remote_meta.rowsets; + for (const auto& rs_meta : rs_metas) { + RowsetSharedPtr rs; + auto st = RowsetFactory::create_rowset(_tablet_meta->tablet_schema(), {}, rs_meta, &rs); + if (!st) { + return ResultError(std::move(st)); + } + result.rowsets.push_back(std::move(rs)); + } + if (is_mow) { + DCHECK_NE(result.delete_bitmap, nullptr); + result.delete_bitmap->merge(*remote_meta.delete_bitmap); + } + return result; +} + +} // namespace doris diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index ae285a94aff1d7..7fd6e96730ee14 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -496,12 +496,12 @@ Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, RowsetWr // copy delete bitmap to new tablet. if (new_tablet->keys_type() == UNIQUE_KEYS && new_tablet->enable_unique_key_merge_on_write()) { DeleteBitmap origin_delete_bitmap(base_tablet->tablet_id()); - base_tablet->tablet_meta()->delete_bitmap().subset( + base_tablet->tablet_meta()->delete_bitmap()->subset( {rowset_reader->rowset()->rowset_id(), 0, 0}, {rowset_reader->rowset()->rowset_id(), UINT32_MAX, INT64_MAX}, &origin_delete_bitmap); for (auto& iter : origin_delete_bitmap.delete_bitmap) { - int ret = new_tablet->tablet_meta()->delete_bitmap().set( + int ret = new_tablet->tablet_meta()->delete_bitmap()->set( {rowset_writer->rowset_id(), std::get<1>(iter.first), std::get<2>(iter.first)}, iter.second); DCHECK(ret == 1); @@ -958,7 +958,7 @@ Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& reques reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx(); reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS; reader_context.batch_size = ALTER_TABLE_BATCH_SIZE; - reader_context.delete_bitmap = &_base_tablet->tablet_meta()->delete_bitmap(); + reader_context.delete_bitmap = _base_tablet->tablet_meta()->delete_bitmap(); reader_context.version = Version(0, end_version); for (auto& rs_split : rs_splits) { res = rs_split.rs_reader->init(&reader_context); @@ -1073,10 +1073,8 @@ Status SchemaChangeJob::_get_versions_to_be_changed(std::vector* versio _base_tablet->tablet_id()); } *max_rowset = rowset; - - RETURN_IF_ERROR(_base_tablet->capture_consistent_versions_unlocked( - Version(0, rowset->version().second), versions_to_be_changed, false, false)); - + *versions_to_be_changed = DORIS_TRY(_base_tablet->capture_consistent_versions_unlocked( + Version(0, rowset->version().second), {})); return Status::OK(); } @@ -1478,8 +1476,9 @@ Status SchemaChangeJob::_calc_delete_bitmap_for_mow_table(int64_t alter_version) << "double write rowsets for version: " << alter_version + 1 << "-" << max_version << " new_tablet=" << _new_tablet->tablet_id(); std::shared_lock rlock(_new_tablet->get_header_lock()); - RETURN_IF_ERROR(_new_tablet->capture_consistent_rowsets_unlocked( - {alter_version + 1, max_version}, &rowsets)); + auto ret = DORIS_TRY(_new_tablet->capture_consistent_rowsets_unlocked( + {alter_version + 1, max_version}, CaptureRowsetOps {})); + rowsets = std::move(ret.rowsets); } for (auto rowset_ptr : rowsets) { std::lock_guard rwlock(_new_tablet->get_rowset_update_lock()); @@ -1497,8 +1496,9 @@ Status SchemaChangeJob::_calc_delete_bitmap_for_mow_table(int64_t alter_version) LOG(INFO) << "alter table for unique with merge-on-write, calculate delete bitmap of " << "incremental rowsets for version: " << max_version + 1 << "-" << new_max_version << " new_tablet=" << _new_tablet->tablet_id(); - RETURN_IF_ERROR(_new_tablet->capture_consistent_rowsets_unlocked( - {max_version + 1, new_max_version}, &rowsets)); + auto ret = DORIS_TRY(_new_tablet->capture_consistent_rowsets_unlocked( + {max_version + 1, new_max_version}, CaptureRowsetOps {})); + rowsets = std::move(ret.rowsets); } for (auto&& rowset_ptr : rowsets) { RETURN_IF_ERROR(Tablet::update_delete_bitmap_without_lock(_new_tablet, rowset_ptr)); diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp index a59ed36bb827cc..6d2c956ac04445 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -40,6 +40,7 @@ #include "common/logging.h" #include "common/status.h" #include "io/fs/local_file_system.h" +#include "olap/base_tablet.h" #include "olap/data_dir.h" #include "olap/olap_common.h" #include "olap/olap_define.h" @@ -567,13 +568,23 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet res = check_version_continuity(consistent_rowsets); if (res.ok() && max_cooldowned_version < version) { // Pick consistent rowsets of remaining required version - res = ref_tablet->capture_consistent_rowsets_unlocked( - {max_cooldowned_version + 1, version}, &consistent_rowsets); + auto ret = ref_tablet->capture_consistent_rowsets_unlocked( + {max_cooldowned_version + 1, version}, CaptureRowsetOps {}); + if (ret) { + consistent_rowsets = std::move(ret->rowsets); + } else { + res = std::move(ret.error()); + } } } else { // get shortest version path - res = ref_tablet->capture_consistent_rowsets_unlocked(Version(0, version), - &consistent_rowsets); + auto ret = ref_tablet->capture_consistent_rowsets_unlocked(Version(0, version), + CaptureRowsetOps {}); + if (ret) { + consistent_rowsets = std::move(ret->rowsets); + } else { + res = std::move(ret.error()); + } } if (!res.ok()) { LOG(WARNING) << "fail to select versions to span. res=" << res; @@ -594,7 +605,7 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet if (ref_tablet->keys_type() == UNIQUE_KEYS && ref_tablet->enable_unique_key_merge_on_write()) { delete_bitmap_snapshot = - ref_tablet->tablet_meta()->delete_bitmap().snapshot(version); + ref_tablet->tablet_meta()->delete_bitmap()->snapshot(version); } } diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 5c4770e3a3344f..403438f1dadad3 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -391,12 +391,14 @@ Status Tablet::revise_tablet_meta(const std::vector& to_add, } if (calc_delete_bitmap_ver.first <= calc_delete_bitmap_ver.second) { - calc_bm_status = capture_consistent_rowsets_unlocked(calc_delete_bitmap_ver, - &calc_delete_bitmap_rowsets); - if (!calc_bm_status.ok()) { - LOG(WARNING) << "fail to capture_consistent_rowsets, res: " << calc_bm_status; + auto ret = capture_consistent_rowsets_unlocked(calc_delete_bitmap_ver, + CaptureRowsetOps {}); + if (!ret) { + LOG(WARNING) << "fail to capture_consistent_rowsets, res: " << ret.error(); + calc_bm_status = std::move(ret.error()); break; } + calc_delete_bitmap_rowsets = std::move(ret->rowsets); // FIXME(plat1ko): Use `const TabletSharedPtr&` as parameter auto self = _engine.tablet_manager()->get_tablet(tablet_id()); CHECK(self); @@ -451,17 +453,16 @@ Status Tablet::revise_tablet_meta(const std::vector& to_add, // that we can capture by version if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) { Version full_version = Version(0, max_version_unlocked()); - std::vector expected_rowsets; - auto st = capture_consistent_rowsets_unlocked(full_version, &expected_rowsets); - DCHECK(st.ok()) << st; - DCHECK_EQ(base_rowsets_for_full_clone.size(), expected_rowsets.size()); - if (st.ok() && base_rowsets_for_full_clone.size() != expected_rowsets.size()) - [[unlikely]] { + auto ret = capture_consistent_rowsets_unlocked(full_version, CaptureRowsetOps {}); + DCHECK(ret) << ret.error(); + DCHECK_EQ(base_rowsets_for_full_clone.size(), ret->rowsets.size()); + + if (ret && base_rowsets_for_full_clone.size() != ret->rowsets.size()) [[unlikely]] { LOG(WARNING) << "full clone succeeded, but the count(" << base_rowsets_for_full_clone.size() << ") of base rowsets used for delete bitmap calculation is not match " "expect count(" - << expected_rowsets.size() << ") we capture from tablet meta"; + << ret->rowsets.size() << ") we capture from tablet meta"; } } } @@ -747,10 +748,9 @@ void Tablet::delete_expired_stale_rowset() { Version test_version = Version(0, lastest_delta->end_version()); stale_version_path_map[*path_id_iter] = version_path; - Status status = - capture_consistent_versions_unlocked(test_version, nullptr, false, false); + auto ret = capture_consistent_versions_unlocked(test_version, {}); // 1. When there is no consistent versions, we must reconstruct the tracker. - if (!status.ok()) { + if (!ret) { // 2. fetch missing version after delete Versions after_missed_versions = get_missed_versions_unlocked(lastest_delta->end_version()); @@ -865,51 +865,11 @@ void Tablet::delete_expired_stale_rowset() { { _engine.start_delete_unused_rowset(); }); } -Status Tablet::capture_consistent_versions_unlocked(const Version& spec_version, - Versions* version_path, - bool skip_missing_version, bool quiet) const { - Status status = - _timestamped_version_tracker.capture_consistent_versions(spec_version, version_path); - if (!status.ok() && !quiet) { - Versions missed_versions = get_missed_versions_unlocked(spec_version.second); - if (missed_versions.empty()) { - // if version_path is null, it may be a compaction check logic. - // so to avoid print too many logs. - if (version_path != nullptr) { - LOG(WARNING) << "tablet:" << tablet_id() - << ", version already has been merged. spec_version: " << spec_version - << ", max_version: " << max_version_unlocked(); - } - status = Status::Error( - "versions are already compacted, spec_version " - "{}, max_version {}, tablet_id {}", - spec_version.second, max_version_unlocked(), tablet_id()); - } else { - if (version_path != nullptr) { - LOG(WARNING) << "status:" << status << ", tablet:" << tablet_id() - << ", missed version for version:" << spec_version; - _print_missed_versions(missed_versions); - if (skip_missing_version) { - LOG(WARNING) << "force skipping missing version for tablet:" << tablet_id(); - return Status::OK(); - } - } - } - } - - DBUG_EXECUTE_IF("TTablet::capture_consistent_versions.inject_failure", { - auto tablet_id = dp->param("tablet_id", -1); - if (tablet_id != -1 && tablet_id == _tablet_meta->tablet_id()) { - status = Status::Error("version already merged"); - } - }); - - return status; -} - Status Tablet::check_version_integrity(const Version& version, bool quiet) { std::shared_lock rdlock(_meta_lock); - return capture_consistent_versions_unlocked(version, nullptr, false, quiet); + [[maybe_unused]] auto _versions = DORIS_TRY( + capture_consistent_versions_unlocked(version, CaptureRowsetOps {.quiet = quiet})); + return Status::OK(); } bool Tablet::exceed_version_limit(int32_t limit) { @@ -939,22 +899,12 @@ void Tablet::acquire_version_and_rowsets( } } -Status Tablet::capture_consistent_rowsets_unlocked(const Version& spec_version, - std::vector* rowsets) const { - std::vector version_path; - RETURN_IF_ERROR( - capture_consistent_versions_unlocked(spec_version, &version_path, false, false)); - RETURN_IF_ERROR(_capture_consistent_rowsets_unlocked(version_path, rowsets)); - return Status::OK(); -} - Status Tablet::capture_rs_readers(const Version& spec_version, std::vector* rs_splits, bool skip_missing_version) { std::shared_lock rlock(_meta_lock); std::vector version_path; - RETURN_IF_ERROR(capture_consistent_versions_unlocked(spec_version, &version_path, - skip_missing_version, false)); - RETURN_IF_ERROR(capture_rs_readers_unlocked(version_path, rs_splits)); + *rs_splits = DORIS_TRY(capture_rs_readers_unlocked( + spec_version, CaptureRowsetOps {.skip_missing_versions = skip_missing_version})); return Status::OK(); } @@ -2514,8 +2464,8 @@ Status Tablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id, for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) { // skip sentinel mark, which is used for delete bitmap correctness check if (std::get<1>(key) != DeleteBitmap::INVALID_SEGMENT_ID) { - _tablet_meta->delete_bitmap().merge({std::get<0>(key), std::get<1>(key), cur_version}, - bitmap); + _tablet_meta->delete_bitmap()->merge({std::get<0>(key), std::get<1>(key), cur_version}, + bitmap); } } @@ -2523,7 +2473,7 @@ Status Tablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id, } void Tablet::merge_delete_bitmap(const DeleteBitmap& delete_bitmap) { - _tablet_meta->delete_bitmap().merge(delete_bitmap); + _tablet_meta->delete_bitmap()->merge(delete_bitmap); } bool Tablet::check_all_rowset_segment() { diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 467fc51f98517c..a8c9df89ff0889 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -180,21 +180,12 @@ class Tablet final : public BaseTablet { /// need to delete flag. void delete_expired_stale_rowset(); - // Given spec_version, find a continuous version path and store it in version_path. - // If quiet is true, then only "does this path exist" is returned. - // If skip_missing_version is true, return ok even there are missing versions. - Status capture_consistent_versions_unlocked(const Version& spec_version, Versions* version_path, - bool skip_missing_version, bool quiet) const; - // if quiet is true, no error log will be printed if there are missing versions Status check_version_integrity(const Version& version, bool quiet = false); bool check_version_exist(const Version& version) const; void acquire_version_and_rowsets( std::vector>* version_rowsets) const; - Status capture_consistent_rowsets_unlocked( - const Version& spec_version, std::vector* rowsets) const override; - // If skip_missing_version is true, skip versions if they are missing. Status capture_rs_readers(const Version& spec_version, std::vector* rs_splits, bool skip_missing_version) override; diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index f74cf2bf1f670e..3e7d48e44af97e 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -1764,7 +1764,7 @@ void TabletManager::get_topn_tablet_delete_bitmap_score( buf.reserve(n + 1); auto handler = [&](const TabletSharedPtr& tablet) { uint64_t delete_bitmap_count = - tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count(); + tablet->tablet_meta()->delete_bitmap()->get_delete_bitmap_count(); total_delete_map_count += delete_bitmap_count; if (delete_bitmap_count > *max_delete_bitmap_score) { max_delete_bitmap_score_tablet_id = tablet->tablet_id(); diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 3a0ff3419ee09c..c1dbc8a93948df 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -455,12 +456,12 @@ void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& tco void TabletMeta::remove_rowset_delete_bitmap(const RowsetId& rowset_id, const Version& version) { if (_enable_unique_key_merge_on_write) { - delete_bitmap().remove({rowset_id, 0, 0}, {rowset_id, UINT32_MAX, 0}); + delete_bitmap()->remove({rowset_id, 0, 0}, {rowset_id, UINT32_MAX, 0}); if (config::enable_mow_verbose_log) { LOG_INFO("delete rowset delete bitmap. tablet={}, rowset={}, version={}", tablet_id(), rowset_id.to_string(), version.to_string()); } - size_t rowset_cache_version_size = delete_bitmap().remove_rowset_cache_version(rowset_id); + size_t rowset_cache_version_size = delete_bitmap()->remove_rowset_cache_version(rowset_id); _check_mow_rowset_cache_version_size(rowset_cache_version_size); } } @@ -705,7 +706,7 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) { auto seg_id = tablet_meta_pb.delete_bitmap().segment_ids(i); auto ver = tablet_meta_pb.delete_bitmap().versions(i); auto bitmap = tablet_meta_pb.delete_bitmap().segment_delete_bitmaps(i).data(); - delete_bitmap().delete_bitmap[{rst_id, seg_id, ver}] = roaring::Roaring::read(bitmap); + delete_bitmap()->delete_bitmap[{rst_id, seg_id, ver}] = roaring::Roaring::read(bitmap); } } @@ -789,7 +790,7 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { stale_rs_ids.insert(rowset->rowset_id()); } DeleteBitmapPB* delete_bitmap_pb = tablet_meta_pb->mutable_delete_bitmap(); - for (auto& [id, bitmap] : delete_bitmap().snapshot().delete_bitmap) { + for (auto& [id, bitmap] : delete_bitmap()->snapshot().delete_bitmap) { auto& [rowset_id, segment_id, ver] = id; if (stale_rs_ids.count(rowset_id) != 0) { continue; @@ -1121,6 +1122,35 @@ DeleteBitmap& DeleteBitmap::operator=(DeleteBitmap&& o) { return *this; } +DeleteBitmap DeleteBitmap::from_pb(const DeleteBitmapPB& pb, int64_t tablet_id) { + size_t len = pb.rowset_ids().size(); + DCHECK_EQ(len, pb.segment_ids().size()); + DCHECK_EQ(len, pb.versions().size()); + DeleteBitmap delete_bitmap(tablet_id); + for (int32_t i = 0; i < len; ++i) { + RowsetId rs_id; + rs_id.init(pb.rowset_ids(i)); + BitmapKey key = {rs_id, pb.segment_ids(i), pb.versions(i)}; + delete_bitmap.delete_bitmap[key] = + roaring::Roaring::read(pb.segment_delete_bitmaps(i).data()); + } + return delete_bitmap; +} + +DeleteBitmapPB DeleteBitmap::to_pb() { + std::shared_lock l(lock); + DeleteBitmapPB ret; + for (const auto& [k, v] : delete_bitmap) { + ret.mutable_rowset_ids()->Add(std::get<0>(k).to_string()); + ret.mutable_segment_ids()->Add(std::get<1>(k)); + ret.mutable_versions()->Add(std::get<2>(k)); + std::string bitmap_data(v.getSizeInBytes(), '\0'); + v.write(bitmap_data.data()); + ret.mutable_segment_delete_bitmaps()->Add(std::move(bitmap_data)); + } + return ret; +} + DeleteBitmap DeleteBitmap::snapshot() const { std::shared_lock l(lock); return DeleteBitmap(*this); @@ -1508,6 +1538,22 @@ std::shared_ptr DeleteBitmap::get_agg_without_cache(const Bitm return bitmap; } +DeleteBitmap DeleteBitmap::diffset(const std::set& key_set) const { + std::shared_lock l(lock); + auto diff_key_set_view = + delete_bitmap | std::ranges::views::transform([](const auto& kv) { return kv.first; }) | + std::ranges::views::filter( + [&key_set](const auto& key) { return !key_set.contains(key); }); + + DeleteBitmap dbm(_tablet_id); + for (const auto& key : diff_key_set_view) { + const auto* bitmap = get(key); + DCHECK_NE(bitmap, nullptr); + dbm.delete_bitmap[key] = *bitmap; + } + return dbm; +} + std::atomic DeleteBitmap::AggCache::s_repr {nullptr}; std::string tablet_state_name(TabletState state) { diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 388ddc439dc31e..63ad0124f862a6 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -241,7 +241,7 @@ class TabletMeta : public MetadataAdder { static void init_column_from_tcolumn(uint32_t unique_id, const TColumn& tcolumn, ColumnPB* column); - DeleteBitmap& delete_bitmap() { return *_delete_bitmap; } + std::shared_ptr delete_bitmap() { return _delete_bitmap; } void remove_rowset_delete_bitmap(const RowsetId& rowset_id, const Version& version); bool enable_unique_key_merge_on_write() const { return _enable_unique_key_merge_on_write; } @@ -417,6 +417,10 @@ class DeleteBitmap { DeleteBitmap(DeleteBitmap&& r); DeleteBitmap& operator=(DeleteBitmap&& r); + static DeleteBitmap from_pb(const DeleteBitmapPB& pb, int64_t tablet_id); + + DeleteBitmapPB to_pb(); + /** * Makes a snapshot of delete bitmap, read lock will be acquired in this * process @@ -566,6 +570,14 @@ class DeleteBitmap { std::set get_rowset_cache_version(); + /** + * Calculate diffset with given `key_set`. All entries with keys contained in this delete bitmap but not + * in given key_set will be added to the output delete bitmap. + * + * @return Deletebitmap containning all entries in diffset + */ + DeleteBitmap diffset(const std::set& key_set) const; + class AggCachePolicy : public LRUCachePolicy { public: AggCachePolicy(size_t capacity) diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp index a0c3a9c5eda95b..fbcacb32870ce4 100644 --- a/be/src/olap/tablet_reader.cpp +++ b/be/src/olap/tablet_reader.cpp @@ -100,16 +100,6 @@ std::string TabletReader::KeysParam::to_string() const { return ss.str(); } -void TabletReader::ReadSource::fill_delete_predicates() { - DCHECK_EQ(delete_predicates.size(), 0); - for (auto&& split : rs_splits) { - auto& rs_meta = split.rs_reader->rowset()->rowset_meta(); - if (rs_meta->has_delete_predicate()) { - delete_predicates.push_back(rs_meta); - } - } -} - TabletReader::~TabletReader() { for (auto* pred : _col_predicates) { delete pred; @@ -657,7 +647,7 @@ Status TabletReader::init_reader_params_and_create_block( reader_params->version = Version(input_rowsets.front()->start_version(), input_rowsets.back()->end_version()); - ReadSource read_source; + TabletReadSource read_source; for (const auto& rowset : input_rowsets) { RowsetReaderSharedPtr rs_reader; RETURN_IF_ERROR(rowset->create_reader(&rs_reader)); @@ -679,9 +669,6 @@ Status TabletReader::init_reader_params_and_create_block( merge_tablet_schema->merge_dropped_columns(*del_pred->tablet_schema()); } reader_params->tablet_schema = merge_tablet_schema; - if (tablet->enable_unique_key_merge_on_write()) { - reader_params->delete_bitmap = &tablet->tablet_meta()->delete_bitmap(); - } reader_params->return_columns.resize(read_tablet_schema->num_columns()); std::iota(reader_params->return_columns.begin(), reader_params->return_columns.end(), 0); diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h index 87af3bb08eb36e..d5aac0b89b5211 100644 --- a/be/src/olap/tablet_reader.h +++ b/be/src/olap/tablet_reader.h @@ -34,6 +34,7 @@ #include "exprs/function_filter.h" #include "gutil/strings/substitute.h" #include "io/io_common.h" +#include "olap/base_tablet.h" #include "olap/delete_handler.h" #include "olap/iterators.h" #include "olap/olap_common.h" @@ -91,12 +92,6 @@ class TabletReader { }; public: - struct ReadSource { - std::vector rs_splits; - std::vector delete_predicates; - // Fill delete predicates with `rs_splits` - void fill_delete_predicates(); - }; // Params for Reader, // mainly include tablet, data version and fetch range. struct ReaderParams { @@ -117,9 +112,12 @@ class TabletReader { return BeExecVersionManager::get_newest_version(); } - void set_read_source(ReadSource read_source) { + void set_read_source(TabletReadSource read_source, bool skip_delete_bitmap = false) { rs_splits = std::move(read_source.rs_splits); delete_predicates = std::move(read_source.delete_predicates); + if (tablet->enable_unique_key_merge_on_write() && !skip_delete_bitmap) { + delete_bitmap = std::move(read_source.delete_bitmap); + } } BaseTabletSPtr tablet; @@ -148,7 +146,7 @@ class TabletReader { std::vector rs_splits; // For unique key table with merge-on-write - DeleteBitmap* delete_bitmap = nullptr; + std::shared_ptr delete_bitmap = nullptr; // return_columns is init from query schema std::vector return_columns; diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp index 05ecfc0401b6d0..c6cf69d54d9eef 100644 --- a/be/src/olap/task/engine_checksum_task.cpp +++ b/be/src/olap/task/engine_checksum_task.cpp @@ -24,6 +24,7 @@ #include #include "io/io_common.h" +#include "olap/base_tablet.h" #include "olap/olap_common.h" #include "olap/olap_define.h" #include "olap/rowset/rowset.h" @@ -81,13 +82,15 @@ Status EngineChecksumTask::_compute_checksum() { vectorized::Block block; { std::shared_lock rdlock(tablet->get_header_lock()); - Status acquire_reader_st = - tablet->capture_consistent_rowsets_unlocked(version, &input_rowsets); - if (!acquire_reader_st.ok()) { + auto ret = tablet->capture_consistent_rowsets_unlocked(version, CaptureRowsetOps {}); + if (ret) { + input_rowsets = std::move(ret->rowsets); + } else { LOG(WARNING) << "fail to captute consistent rowsets. tablet=" << tablet->tablet_id() - << "res=" << acquire_reader_st; - return acquire_reader_st; + << "res=" << ret.error(); + return std::move(ret.error()); } + RETURN_IF_ERROR(TabletReader::init_reader_params_and_create_block( tablet, ReaderType::READER_CHECKSUM, input_rowsets, &reader_params, &block)); } diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index ecf1bdfc6d5c7d..6a9e66f1d383cd 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -984,7 +984,7 @@ Status EngineCloneTask::_finish_full_clone(Tablet* tablet, } } if (tablet->enable_unique_key_merge_on_write()) { - tablet->tablet_meta()->delete_bitmap().merge(cloned_tablet_meta->delete_bitmap()); + tablet->tablet_meta()->delete_bitmap()->merge(*cloned_tablet_meta->delete_bitmap()); } return tablet->revise_tablet_meta(to_add, to_delete, false); // TODO(plat1ko): write cooldown meta to remote if this replica is cooldown replica diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp index 210aa6a8c56f08..d9352a0ea9d0bb 100644 --- a/be/src/olap/task/engine_storage_migration_task.cpp +++ b/be/src/olap/task/engine_storage_migration_task.cpp @@ -32,8 +32,10 @@ #include "common/config.h" #include "common/logging.h" +#include "common/status.h" #include "gutil/strings/numbers.h" #include "io/fs/local_file_system.h" +#include "olap/base_tablet.h" #include "olap/data_dir.h" #include "olap/olap_common.h" #include "olap/olap_define.h" @@ -87,8 +89,10 @@ Status EngineStorageMigrationTask::_get_versions(int32_t start_version, int32_t* << ", start_version=" << start_version << ", end_version=" << *end_version; return Status::OK(); } - return _tablet->capture_consistent_rowsets_unlocked(Version(start_version, *end_version), - consistent_rowsets); + auto ret = DORIS_TRY(_tablet->capture_consistent_rowsets_unlocked( + Version(start_version, *end_version), CaptureRowsetOps {})); + *consistent_rowsets = std::move(ret.rowsets); + return Status::OK(); } bool EngineStorageMigrationTask::_is_timeout() { @@ -354,7 +358,7 @@ void EngineStorageMigrationTask::_generate_new_header( } new_tablet_meta->revise_rs_metas(std::move(rs_metas)); if (_tablet->keys_type() == UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { - DeleteBitmap bm = _tablet->tablet_meta()->delete_bitmap().snapshot(end_version); + DeleteBitmap bm = _tablet->tablet_meta()->delete_bitmap()->snapshot(end_version); new_tablet_meta->revise_delete_bitmap_unlocked(bm); } new_tablet_meta->set_shard_id(new_shard); diff --git a/be/src/olap/task/index_builder.cpp b/be/src/olap/task/index_builder.cpp index 9f72056af8d1c9..ffb226bee58da9 100644 --- a/be/src/olap/task/index_builder.cpp +++ b/be/src/olap/task/index_builder.cpp @@ -822,7 +822,7 @@ Status IndexBuilder::modify_rowsets(const Merger::Statistics* stats) { for (auto i = 0; i < _input_rowsets.size(); ++i) { RowsetId input_rowset_id = _input_rowsets[i]->rowset_id(); RowsetId output_rowset_id = _output_rowsets[i]->rowset_id(); - for (const auto& [k, v] : _tablet->tablet_meta()->delete_bitmap().delete_bitmap) { + for (const auto& [k, v] : _tablet->tablet_meta()->delete_bitmap()->delete_bitmap) { RowsetId rs_id = std::get<0>(k); if (rs_id == input_rowset_id) { DeleteBitmap::BitmapKey output_rs_key = {output_rowset_id, std::get<1>(k), @@ -832,7 +832,7 @@ Status IndexBuilder::modify_rowsets(const Merger::Statistics* stats) { } } } - _tablet->tablet_meta()->delete_bitmap().merge(*delete_bitmap); + _tablet->tablet_meta()->delete_bitmap()->merge(*delete_bitmap); // modify_rowsets will remove the delete_bitmap for input rowsets, // should call it after merge delete_bitmap diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 53a7d8cd0f3490..13d2044cc2ab6c 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -27,6 +27,7 @@ #include "cloud/cloud_tablet.h" #include "cloud/cloud_tablet_hotspot.h" #include "cloud/config.h" +#include "common/config.h" #include "olap/parallel_scanner_builder.h" #include "olap/storage_engine.h" #include "olap/tablet_manager.h" @@ -528,9 +529,11 @@ Status OlapScanLocalState::hold_tablets() { } for (size_t i = 0; i < _scan_ranges.size(); i++) { - RETURN_IF_ERROR(_tablets[i].tablet->capture_rs_readers( - {0, _tablets[i].version}, &_read_sources[i].rs_splits, - RuntimeFilterConsumer::_state->skip_missing_version())); + _read_sources[i] = DORIS_TRY(_tablets[i].tablet->capture_read_source( + {0, _tablets[i].version}, + {.skip_missing_versions = RuntimeFilterConsumer::_state->skip_missing_version(), + .enable_fetch_rowsets_from_peers = + config::enable_fetch_rowsets_from_peer_replicas})); if (!PipelineXLocalState<>::_state->skip_delete_predicate()) { _read_sources[i].fill_delete_predicates(); } diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index 24a1b1b876a354..e1eea0c7822f88 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -229,7 +229,7 @@ class OlapScanLocalState final : public ScanLocalState { std::mutex _profile_mtx; std::vector _tablets; - std::vector _read_sources; + std::vector _read_sources; }; class OlapScanOperatorX final : public ScanOperatorX { diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 423c62c6c4b409..6d5d9055fc3a3e 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -53,6 +53,9 @@ #include #include +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet_mgr.h" +#include "cloud/config.h" #include "common/config.h" #include "common/consts.h" #include "common/exception.h" @@ -162,6 +165,8 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_active_threads, MetricUnit: DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_max_queue_size, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_max_threads, MetricUnit::NOUNIT); +static bvar::LatencyRecorder g_process_remote_fetch_rowsets_latency("process_remote_fetch_rowsets"); + bthread_key_t btls_key; static void thread_context_deleter(void* d) { @@ -2200,4 +2205,66 @@ void PInternalService::get_be_resource(google::protobuf::RpcController* controll } } +void PInternalService::get_tablet_rowsets(google::protobuf::RpcController* controller, + const PGetTabletRowsetsRequest* request, + PGetTabletRowsetsResponse* response, + google::protobuf::Closure* done) { + DCHECK(config::is_cloud_mode()); + auto start_time = GetMonoTimeMicros(); + Defer defer { + [&]() { g_process_remote_fetch_rowsets_latency << GetMonoTimeMicros() - start_time; }}; + brpc::ClosureGuard closure_guard(done); + LOG(INFO) << "process get tablet rowsets, request=" << request->ShortDebugString(); + if (!request->has_tablet_id() || !request->has_version_start() || !request->has_version_end()) { + Status::InvalidArgument("missing params tablet/version_start/version_end") + .to_protobuf(response->mutable_status()); + return; + } + CloudStorageEngine& storage = ExecEnv::GetInstance()->storage_engine().to_cloud(); + + auto maybe_tablet = + storage.tablet_mgr().get_tablet(request->tablet_id(), /*warmup data*/ false, + /*syn_delete_bitmap*/ false, /*delete_bitmap*/ nullptr, + /*local_only*/ true); + if (!maybe_tablet) { + maybe_tablet.error().to_protobuf(response->mutable_status()); + return; + } + auto tablet = maybe_tablet.value(); + Result ret; + { + std::shared_lock l(tablet->get_header_lock()); + ret = tablet->capture_consistent_rowsets_unlocked( + {request->version_start(), request->version_end()}, + CaptureRowsetOps {.enable_fetch_rowsets_from_peers = false}); + } + if (!ret) { + ret.error().to_protobuf(response->mutable_status()); + return; + } + auto rowsets = std::move(ret.value().rowsets); + for (const auto& rs : rowsets) { + RowsetMetaPB meta; + rs->rowset_meta()->to_rowset_pb(&meta); + response->mutable_rowsets()->Add(std::move(meta)); + } + if (request->has_delete_bitmap_keys()) { + DCHECK(tablet->enable_unique_key_merge_on_write()); + auto delete_bitmap = std::move(ret.value().delete_bitmap); + auto keys_pb = request->delete_bitmap_keys(); + size_t len = keys_pb.rowset_ids().size(); + DCHECK_EQ(len, keys_pb.segment_ids().size()); + DCHECK_EQ(len, keys_pb.versions().size()); + std::set keys; + for (size_t i = 0; i < len; ++i) { + RowsetId rs_id; + rs_id.init(keys_pb.rowset_ids(i)); + keys.emplace(rs_id, keys_pb.segment_ids(i), keys_pb.versions(i)); + } + auto diffset = delete_bitmap->diffset(keys).to_pb(); + *response->mutable_delete_bitmap() = std::move(diffset); + } + Status::OK().to_protobuf(response->mutable_status()); +} + } // namespace doris diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 66a0f867393793..7262f4eed2da80 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -240,6 +240,11 @@ class PInternalService : public PBackendService { const PGetBeResourceRequest* request, PGetBeResourceResponse* response, google::protobuf::Closure* done) override; + void get_tablet_rowsets(google::protobuf::RpcController* controller, + const PGetTabletRowsetsRequest* request, + PGetTabletRowsetsResponse* response, + google::protobuf::Closure* done) override; + private: void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index e9c199074ecc4f..c8008c9852583b 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -68,7 +68,7 @@ namespace doris::vectorized { -using ReadSource = TabletReader::ReadSource; +using ReadSource = TabletReadSource; NewOlapScanner::NewOlapScanner(pipeline::ScanLocalStateBase* parent, NewOlapScanner::Params&& params) @@ -97,7 +97,8 @@ NewOlapScanner::NewOlapScanner(pipeline::ScanLocalStateBase* parent, .filter_block_conjuncts {}, .key_group_cluster_key_idxes {}, }) { - _tablet_reader_params.set_read_source(std::move(params.read_source)); + _tablet_reader_params.set_read_source(std::move(params.read_source), + _state->skip_delete_bitmap()); _is_init = false; } @@ -193,12 +194,14 @@ Status NewOlapScanner::init() { ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet); } - auto st = tablet->capture_rs_readers(_tablet_reader_params.version, - &read_source.rs_splits, - _state->skip_missing_version()); - if (!st.ok()) { - LOG(WARNING) << "fail to init reader.res=" << st; - return st; + auto maybe_read_source = tablet->capture_read_source( + _tablet_reader_params.version, + {.skip_missing_versions = _state->skip_missing_version(), + .enable_fetch_rowsets_from_peers = + config::enable_fetch_rowsets_from_peer_replicas}); + if (!maybe_read_source) { + LOG(WARNING) << "fail to init reader. res=" << maybe_read_source.error(); + return maybe_read_source.error(); } if (config::enable_mow_verbose_log && tablet->enable_unique_key_merge_on_write()) { LOG_INFO("finish capture_rs_readers for tablet={}, query_id={}", @@ -309,7 +312,6 @@ Status NewOlapScanner::_init_tablet_reader_params( std::inserter(_tablet_reader_params.function_filters, _tablet_reader_params.function_filters.begin())); - auto& tablet = _tablet_reader_params.tablet; auto& tablet_schema = _tablet_reader_params.tablet_schema; // Merge the columns in delete predicate that not in latest schema in to current tablet schema for (auto& del_pred : _tablet_reader_params.delete_predicates) { @@ -369,10 +371,6 @@ Status NewOlapScanner::_init_tablet_reader_params( _tablet_reader_params.use_page_cache = _state->enable_page_cache(); - if (tablet->enable_unique_key_merge_on_write() && !_state->skip_delete_bitmap()) { - _tablet_reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap(); - } - DBUG_EXECUTE_IF("NewOlapScanner::_init_tablet_reader_params.block", DBUG_BLOCK); if (!_state->skip_storage_engine_merge()) { diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h index fd1246b120ba77..a997ae2adf5af8 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.h +++ b/be/src/vec/exec/scan/new_olap_scanner.h @@ -62,7 +62,7 @@ class NewOlapScanner : public VScanner { std::vector key_ranges; BaseTabletSPtr tablet; int64_t version; - TabletReader::ReadSource read_source; + TabletReadSource read_source; int64_t limit; bool aggregation; }; diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index 7f6aadd6070cce..d3e9115baf1239 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -996,7 +996,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) { StorageReadOptions opts; opts.stats = &stats; opts.tablet_schema = rowset1->tablet_schema(); - opts.delete_bitmap.emplace(0, tablet->tablet_meta()->delete_bitmap().get_agg( + opts.delete_bitmap.emplace(0, tablet->tablet_meta()->delete_bitmap()->get_agg( {rowset1->rowset_id(), 0, cur_version})); std::unique_ptr iter; std::shared_ptr schema = std::make_shared(rowset1->tablet_schema()); @@ -1024,7 +1024,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) { StorageReadOptions opts; opts.stats = &stats; opts.tablet_schema = rowset2->tablet_schema(); - opts.delete_bitmap.emplace(0, tablet->tablet_meta()->delete_bitmap().get_agg( + opts.delete_bitmap.emplace(0, tablet->tablet_meta()->delete_bitmap()->get_agg( {rowset2->rowset_id(), 0, cur_version})); std::unique_ptr iter; std::shared_ptr schema = std::make_shared(rowset2->tablet_schema()); diff --git a/be/test/olap/segcompaction_mow_test.cpp b/be/test/olap/segcompaction_mow_test.cpp index 62a3232889dede..efe40dcb859bca 100644 --- a/be/test/olap/segcompaction_mow_test.cpp +++ b/be/test/olap/segcompaction_mow_test.cpp @@ -237,7 +237,7 @@ class SegCompactionMoWTest : public ::testing::TestWithParam { std::vector return_columns = {0, 1, 2}; reader_context.return_columns = &return_columns; reader_context.stats = &_stats; - reader_context.delete_bitmap = delete_bitmap.get(); + reader_context.delete_bitmap = delete_bitmap; std::vector segment_num_rows; Status s; diff --git a/be/test/olap/tablet_meta_test.cpp b/be/test/olap/tablet_meta_test.cpp index b85c63ef714e8e..b350e5e00616a5 100644 --- a/be/test/olap/tablet_meta_test.cpp +++ b/be/test/olap/tablet_meta_test.cpp @@ -73,23 +73,23 @@ TEST(TabletMetaTest, TestReviseMeta) { } ASSERT_EQ(4, tablet_meta.all_rs_metas().size()); - tablet_meta.delete_bitmap().add({rsids[0], 1, 1}, 1); - tablet_meta.delete_bitmap().add({rsids[1], 0, 2}, 2); - tablet_meta.delete_bitmap().add({rsids[2], 1, 1}, 1); - tablet_meta.delete_bitmap().add({rsids[3], 0, 2}, 3); - tablet_meta.delete_bitmap().add({rsids[3], 0, 4}, 4); - ASSERT_EQ(5, tablet_meta.delete_bitmap().delete_bitmap.size()); + tablet_meta.delete_bitmap()->add({rsids[0], 1, 1}, 1); + tablet_meta.delete_bitmap()->add({rsids[1], 0, 2}, 2); + tablet_meta.delete_bitmap()->add({rsids[2], 1, 1}, 1); + tablet_meta.delete_bitmap()->add({rsids[3], 0, 2}, 3); + tablet_meta.delete_bitmap()->add({rsids[3], 0, 4}, 4); + ASSERT_EQ(5, tablet_meta.delete_bitmap()->delete_bitmap.size()); std::vector new_rowsets; new_rowsets.push_back(src_rowsets[2]->rowset_meta()); new_rowsets.push_back(src_rowsets[3]->rowset_meta()); tablet_meta.revise_rs_metas(std::move(new_rowsets)); // Take a snapshot with max_version=3. - DeleteBitmap snap = tablet_meta.delete_bitmap().snapshot(3); + DeleteBitmap snap = tablet_meta.delete_bitmap()->snapshot(3); tablet_meta.revise_delete_bitmap_unlocked(snap); ASSERT_EQ(2, tablet_meta.all_rs_metas().size()); - ASSERT_EQ(2, tablet_meta.delete_bitmap().delete_bitmap.size()); - for (auto entry : tablet_meta.delete_bitmap().delete_bitmap) { + ASSERT_EQ(2, tablet_meta.delete_bitmap()->delete_bitmap.size()); + for (auto entry : tablet_meta.delete_bitmap()->delete_bitmap) { RowsetId rsid = std::get<0>(entry.first); ASSERT_TRUE(rsid == rsids[2] || rsid == rsids[3]); int64_t version = std::get<2>(entry.first); diff --git a/be/test/olap/test_data/rowset_meta.json b/be/test/olap/test_data/rowset_meta.json index d446e2a34e971d..4fe585978aa620 100644 --- a/be/test/olap/test_data/rowset_meta.json +++ b/be/test/olap/test_data/rowset_meta.json @@ -12,6 +12,10 @@ "data_disk_size": 0, "index_disk_size": 0, "empty": true, + "load_id": { + "hi": 0, + "lo": 0 + }, "creation_time": 1552911435, "tablet_uid": { "hi": 10, diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java index 358fc1023b297a..fce12193e3571f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java @@ -602,6 +602,22 @@ public void clearClusterToBe(String cluster) { secondaryClusterToBackends.remove(cluster); } + public List getAllPrimaryBes() { + List result = new ArrayList(); + primaryClusterToBackends.keySet().forEach(clusterId -> { + List backendIds = primaryClusterToBackends.get(clusterId); + if (backendIds == null || backendIds.isEmpty()) { + return; + } + Long beId = backendIds.get(0); + if (beId != -1) { + Backend backend = Env.getCurrentSystemInfo().getBackend(beId); + result.add(backend); + } + }); + return result; + } + // ATTN: This func is only used by redundant tablet report clean in bes. // Only the master node will do the diff logic, // so just only need to clean up secondaryClusterToBackends on the master node. diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 92cfe98e535f5f..ddb49a0ee0ab7e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -48,6 +48,7 @@ import org.apache.doris.catalog.TabletMeta; import org.apache.doris.catalog.View; import org.apache.doris.cloud.catalog.CloudPartition; +import org.apache.doris.cloud.catalog.CloudReplica; import org.apache.doris.cloud.catalog.CloudTablet; import org.apache.doris.cloud.proto.Cloud.CommitTxnResponse; import org.apache.doris.cluster.ClusterNamespace; @@ -2756,15 +2757,24 @@ public TGetTabletReplicaInfosResult getTabletReplicaInfos(TGetTabletReplicaInfos LOG.warn("replica {} not normal", replica.getId()); continue; } - Backend backend = Env.getCurrentSystemInfo().getBackend(replica.getBackendIdWithoutException()); - if (backend != null) { - TReplicaInfo replicaInfo = new TReplicaInfo(); - replicaInfo.setHost(backend.getHost()); - replicaInfo.setBePort(backend.getBePort()); - replicaInfo.setHttpPort(backend.getHttpPort()); - replicaInfo.setBrpcPort(backend.getBrpcPort()); - replicaInfo.setReplicaId(replica.getId()); - replicaInfos.add(replicaInfo); + List backends; + if (Config.isCloudMode()) { + CloudReplica cloudReplica = (CloudReplica) replica; + backends = cloudReplica.getAllPrimaryBes(); + } else { + Backend backend = Env.getCurrentSystemInfo().getBackend(replica.getBackendIdWithoutException()); + backends = Lists.newArrayList(backend); + } + for (Backend backend : backends) { + if (backend != null) { + TReplicaInfo replicaInfo = new TReplicaInfo(); + replicaInfo.setHost(backend.getHost()); + replicaInfo.setBePort(backend.getBePort()); + replicaInfo.setHttpPort(backend.getHttpPort()); + replicaInfo.setBrpcPort(backend.getBrpcPort()); + replicaInfo.setReplicaId(replica.getId()); + replicaInfos.add(replicaInfo); + } } } tabletReplicaInfos.put(tabletId, replicaInfos); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 150c07fcf9a9ed..20de386b3eba7b 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -989,6 +989,21 @@ message PGetBeResourceResponse { optional PGlobalResourceUsage global_be_resource_usage = 2; } +message PGetTabletRowsetsRequest { + optional int64 tablet_id = 1; + optional int64 version_start = 2; + optional int64 version_end = 3; + + optional DeleteBitmapPB delete_bitmap_keys = 4; +} + +message PGetTabletRowsetsResponse { + required PStatus status = 1; + repeated RowsetMetaPB rowsets = 2; + + optional DeleteBitmapPB delete_bitmap = 3; +} + service PBackendService { rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult); rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult); @@ -1041,5 +1056,6 @@ service PBackendService { rpc test_jdbc_connection(PJdbcTestConnectionRequest) returns (PJdbcTestConnectionResult); rpc alter_vault_sync(PAlterVaultSyncRequest) returns (PAlterVaultSyncResponse); rpc get_be_resource(PGetBeResourceRequest) returns (PGetBeResourceResponse); + rpc get_tablet_rowsets(PGetTabletRowsetsRequest) returns (PGetTabletRowsetsResponse); }; diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_version_already_merged.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_version_already_merged.out new file mode 100644 index 00000000000000..78964812ebfb24 --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_version_already_merged.out @@ -0,0 +1,15 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 +2 2 2 2 +3 3 3 3 +4 4 4 4 +5 5 5 5 + +-- !sql -- +1 1 1 1 +2 2 2 2 +3 3 3 3 +4 4 4 4 +5 5 5 5 + diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf index 5627f048905a27..9b0bc0f8e7184b 100644 --- a/regression-test/pipeline/p0/conf/be.conf +++ b/regression-test/pipeline/p0/conf/be.conf @@ -80,3 +80,5 @@ large_cumu_compaction_task_min_thread_num=3 # This feature has bug, so by default is false, only open it in pipeline to observe enable_parquet_page_index=true + +enable_fetch_rowsets_from_peer_replicas = true diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_version_already_merged.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_version_already_merged.groovy new file mode 100644 index 00000000000000..e1c6669b11b8ea --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_version_already_merged.groovy @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.util.NodeType + +suite("test_cloud_version_already_merged", "nonConcurrent") { + if (!isCloudMode()) { + return + } + def tblName = "test_cloud_version_already_merged" + sql """ DROP TABLE IF EXISTS ${tblName} FORCE; """ + sql """ + CREATE TABLE IF NOT EXISTS ${tblName} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1"); + """ + + sql "insert into ${tblName} values(1,-1,-1,-1);" + sql "insert into ${tblName} values(2,-2,-2,-2);" + sql "insert into ${tblName} values(3,-3,-3,-3);" + sql "insert into ${tblName} values(4,-4,-4,-4)" + sql "insert into ${tblName} values(5,-5,-5,-5)" + sql "insert into ${tblName} values(1,1,1,1);" + sql "insert into ${tblName} values(2,2,2,2);" + sql "insert into ${tblName} values(3,3,3,3);" + sql "insert into ${tblName} values(4,4,4,4)" + sql "insert into ${tblName} values(5,5,5,5)" + + + sql "sync;" + qt_sql "select * from ${tblName} order by k1;" + + def backends = sql_return_maparray('show backends') + def tabletStats = sql_return_maparray("show tablets from ${tblName};") + assert tabletStats.size() == 1 + def tabletId = tabletStats[0].TabletId + def tabletBackendId = tabletStats[0].BackendId + def tabletBackend + for (def be : backends) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + try { + GetDebugPoint().enableDebugPoint(tabletBackend.Host, tabletBackend.HttpPort as int, NodeType.BE, "Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId, skip_by_option: true]) + GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host") + + qt_sql """ SELECT * from ${tblName} ORDER BY k1 """ + + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId]) + GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host") + + test { + sql """ SELECT * from ${tblName} ORDER BY k1 """ + exception "version already merged, meet error during remote capturing rowsets" + } + + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } + + try { + GetDebugPoint().enableDebugPoint(tabletBackend.Host, tabletBackend.HttpPort as int, NodeType.BE, "Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId, skip_by_option: true]) + GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host") + GetDebugPoint().enableDebugPointForAllBEs("GetRowsetCntl::start_req_bg.inject_failure"); + + test { + sql """ SELECT * from ${tblName} ORDER BY k1 """ + exception "version already merged, meet error during remote capturing rowsets" + } + + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } + + try { + GetDebugPoint().enableDebugPoint(tabletBackend.Host, tabletBackend.HttpPort as int, NodeType.BE, "Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId, skip_by_option: true]) + GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host") + GetDebugPoint().enableDebugPointForAllBEs("Tablet::_remote_get_rowsets_meta.inject_replica_address_fail"); + + test { + sql """ SELECT * from ${tblName} ORDER BY k1 """ + exception "version already merged, meet error during remote capturing rowsets" + } + + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } +}