diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp index d749459f8422d6..7c27e91007d0d9 100644 --- a/be/src/cloud/cloud_base_compaction.cpp +++ b/be/src/cloud/cloud_base_compaction.cpp @@ -344,7 +344,7 @@ Status CloudBaseCompaction::modify_rowsets() { compaction_job->set_size_output_rowsets(_output_rowset->total_disk_size()); compaction_job->set_num_input_segments(_input_segments); compaction_job->set_num_output_segments(_output_rowset->num_segments()); - compaction_job->set_num_input_rowsets(_input_rowsets.size()); + compaction_job->set_num_input_rowsets(num_input_rowsets()); compaction_job->set_num_output_rowsets(1); compaction_job->add_input_versions(_input_rowsets.front()->start_version()); compaction_job->add_input_versions(_input_rowsets.back()->end_version()); diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index 81c8f78f613993..d5e6373d64adc6 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -270,7 +270,7 @@ Status CloudCumulativeCompaction::modify_rowsets() { compaction_job->set_size_output_rowsets(_output_rowset->total_disk_size()); compaction_job->set_num_input_segments(_input_segments); compaction_job->set_num_output_segments(_output_rowset->num_segments()); - compaction_job->set_num_input_rowsets(_input_rowsets.size()); + compaction_job->set_num_input_rowsets(num_input_rowsets()); compaction_job->set_num_output_rowsets(1); compaction_job->add_input_versions(_input_rowsets.front()->start_version()); compaction_job->add_input_versions(_input_rowsets.back()->end_version()); diff --git a/be/src/cloud/cloud_delta_writer.cpp b/be/src/cloud/cloud_delta_writer.cpp index 6fac6a13873e63..8b0f1fc5d2ab4f 100644 --- a/be/src/cloud/cloud_delta_writer.cpp +++ b/be/src/cloud/cloud_delta_writer.cpp @@ -20,6 +20,7 @@ #include "cloud/cloud_meta_mgr.h" #include "cloud/cloud_rowset_builder.h" #include "cloud/cloud_storage_engine.h" +#include "cloud/config.h" #include "olap/delta_writer.h" #include "runtime/thread_context.h" @@ -108,11 +109,31 @@ const RowsetMetaSharedPtr& CloudDeltaWriter::rowset_meta() { Status CloudDeltaWriter::commit_rowset() { std::lock_guard lock(_mtx); + + // Handle empty rowset (no data written) if (!_is_init) { - // No data to write, but still need to write a empty rowset kv to keep version continuous - RETURN_IF_ERROR(_rowset_builder->init()); - RETURN_IF_ERROR(_rowset_builder->build_rowset()); + return _commit_empty_rowset(); + } + + // Handle normal rowset with data + return _engine.meta_mgr().commit_rowset(*rowset_meta(), ""); +} + +Status CloudDeltaWriter::_commit_empty_rowset() { + // If skip writing empty rowset metadata is enabled, + // we do not prepare rowset to meta service. + if (config::skip_writing_empty_rowset_metadata) { + rowset_builder()->set_skip_writing_rowset_metadata(true); + } + + RETURN_IF_ERROR(_rowset_builder->init()); + RETURN_IF_ERROR(_rowset_builder->build_rowset()); + + // If skip writing empty rowset metadata is enabled, we do not commit rowset to meta service. + if (config::skip_writing_empty_rowset_metadata) { + return Status::OK(); } + // write a empty rowset kv to keep version continuous return _engine.meta_mgr().commit_rowset(*rowset_meta(), ""); } diff --git a/be/src/cloud/cloud_delta_writer.h b/be/src/cloud/cloud_delta_writer.h index 994410f438d5ca..b4c9e6eda6f7f6 100644 --- a/be/src/cloud/cloud_delta_writer.h +++ b/be/src/cloud/cloud_delta_writer.h @@ -58,6 +58,9 @@ class CloudDeltaWriter final : public BaseDeltaWriter { // Convert `_rowset_builder` from `BaseRowsetBuilder` to `CloudRowsetBuilder` CloudRowsetBuilder* rowset_builder(); + // Handle commit for empty rowset (when no data is written) + Status _commit_empty_rowset(); + bthread::Mutex _mtx; CloudStorageEngine& _engine; std::shared_ptr _resource_ctx; diff --git a/be/src/cloud/cloud_full_compaction.cpp b/be/src/cloud/cloud_full_compaction.cpp index 8aadcf9ffe0ea0..2e351400f20e4e 100644 --- a/be/src/cloud/cloud_full_compaction.cpp +++ b/be/src/cloud/cloud_full_compaction.cpp @@ -224,7 +224,7 @@ Status CloudFullCompaction::modify_rowsets() { }) compaction_job->set_num_input_segments(_input_segments); compaction_job->set_num_output_segments(_output_rowset->num_segments()); - compaction_job->set_num_input_rowsets(_input_rowsets.size()); + compaction_job->set_num_input_rowsets(num_input_rowsets()); compaction_job->set_num_output_rowsets(1); compaction_job->add_input_versions(_input_rowsets.front()->start_version()); compaction_job->add_input_versions(_input_rowsets.back()->end_version()); diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 86de6080585a4a..7ba6113def1cb5 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -159,6 +159,7 @@ bvar::Window> g_cloud_ms_rpc_timeout_count_window( "cloud_meta_mgr_rpc_timeout_qps", &g_cloud_meta_mgr_rpc_timeout_count, 30); bvar::LatencyRecorder g_cloud_be_mow_get_dbm_lock_backoff_sleep_time( "cloud_be_mow_get_dbm_lock_backoff_sleep_time"); +bvar::Adder g_cloud_version_hole_filled_count("cloud_version_hole_filled_count"); class MetaServiceProxy { public: @@ -740,6 +741,12 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet, options.warmup_delta_data || config::enable_warmup_immediately_on_new_rowset); } + + // Fill version holes + int64_t partition_max_version = + resp.has_partition_max_version() ? resp.partition_max_version() : -1; + RETURN_IF_ERROR(fill_version_holes(tablet, partition_max_version, wlock)); + tablet->last_base_compaction_success_time_ms = stats.last_base_compaction_time_ms(); tablet->last_cumu_compaction_success_time_ms = stats.last_cumu_compaction_time_ms(); tablet->set_base_compaction_cnt(stats.base_compaction_cnt()); @@ -1590,5 +1597,121 @@ int64_t CloudMetaMgr::get_inverted_index_file_szie(const RowsetMeta& rs_meta) { return total_inverted_index_size; } +Status CloudMetaMgr::fill_version_holes(CloudTablet* tablet, int64_t max_version, + std::unique_lock& wlock) { + if (max_version <= 0) { + return Status::OK(); + } + + Versions existing_versions; + for (const auto& rs : tablet->tablet_meta()->all_rs_metas()) { + existing_versions.emplace_back(rs->version()); + } + + // If there are no existing versions, it may be a new tablet for restore, so skip filling holes. + if (existing_versions.empty()) { + return Status::OK(); + } + + std::vector hole_rowsets; + // sort the existing versions in ascending order + std::sort(existing_versions.begin(), existing_versions.end(), + [](const Version& a, const Version& b) { + // simple because 2 versions are certainly not overlapping + return a.first < b.first; + }); + + int64_t last_version = -1; + for (const Version& version : existing_versions) { + // missing versions are those that are not in the existing_versions + if (version.first > last_version + 1) { + // there is a hole between versions + auto prev_non_hole_rowset = tablet->get_rowset_by_version(version); + for (int64_t ver = last_version + 1; ver < version.first; ++ver) { + RowsetSharedPtr hole_rowset; + RETURN_IF_ERROR(create_empty_rowset_for_hole( + tablet, ver, prev_non_hole_rowset->rowset_meta(), &hole_rowset)); + hole_rowsets.push_back(hole_rowset); + } + LOG(INFO) << "Created empty rowset for version hole, from " << last_version + 1 + << " to " << version.first - 1 << " for tablet " << tablet->tablet_id(); + } + last_version = version.second; + } + + if (last_version + 1 <= max_version) { + LOG(INFO) << "Created empty rowset for version hole, from " << last_version + 1 << " to " + << max_version << " for tablet " << tablet->tablet_id(); + for (; last_version + 1 <= max_version; ++last_version) { + RowsetSharedPtr hole_rowset; + auto prev_non_hole_rowset = tablet->get_rowset_by_version(existing_versions.back()); + RETURN_IF_ERROR(create_empty_rowset_for_hole( + tablet, last_version + 1, prev_non_hole_rowset->rowset_meta(), &hole_rowset)); + hole_rowsets.push_back(hole_rowset); + } + } + + if (!hole_rowsets.empty()) { + size_t hole_count = hole_rowsets.size(); + tablet->add_rowsets(std::move(hole_rowsets), false, wlock, false); + g_cloud_version_hole_filled_count << hole_count; + } + return Status::OK(); +} + +Status CloudMetaMgr::create_empty_rowset_for_hole(CloudTablet* tablet, int64_t version, + RowsetMetaSharedPtr prev_rowset_meta, + RowsetSharedPtr* rowset) { + // Create a RowsetMeta for the empty rowset + auto rs_meta = std::make_shared(); + + // Generate a deterministic rowset ID for the hole (same tablet_id + version = same rowset_id) + RowsetId hole_rowset_id; + hole_rowset_id.init(2, 0, tablet->tablet_id(), version); + rs_meta->set_rowset_id(hole_rowset_id); + + // Generate a deterministic load_id for the hole rowset (same tablet_id + version = same load_id) + PUniqueId load_id; + load_id.set_hi(tablet->tablet_id()); + load_id.set_lo(version); + rs_meta->set_load_id(load_id); + + // Copy schema and other metadata from template + rs_meta->set_tablet_schema(prev_rowset_meta->tablet_schema()); + rs_meta->set_rowset_type(prev_rowset_meta->rowset_type()); + rs_meta->set_tablet_schema_hash(prev_rowset_meta->tablet_schema_hash()); + rs_meta->set_resource_id(prev_rowset_meta->resource_id()); + + // Basic tablet information + rs_meta->set_tablet_id(tablet->tablet_id()); + rs_meta->set_index_id(tablet->index_id()); + rs_meta->set_partition_id(tablet->partition_id()); + rs_meta->set_tablet_uid(tablet->tablet_uid()); + rs_meta->set_version(Version(version, version)); + rs_meta->set_txn_id(version); + + rs_meta->set_num_rows(0); + rs_meta->set_total_disk_size(0); + rs_meta->set_data_disk_size(0); + rs_meta->set_index_disk_size(0); + rs_meta->set_empty(true); + rs_meta->set_num_segments(0); + rs_meta->set_segments_overlap(NONOVERLAPPING); + rs_meta->set_rowset_state(VISIBLE); + rs_meta->set_creation_time(UnixSeconds()); + rs_meta->set_newest_write_timestamp(UnixSeconds()); + + Status s = RowsetFactory::create_rowset(nullptr, "", rs_meta, rowset); + if (!s.ok()) { + LOG_WARNING("Failed to create empty rowset for hole") + .tag("tablet_id", tablet->tablet_id()) + .tag("version", version) + .error(s); + return s; + } + (*rowset)->set_hole_rowset(true); + + return Status::OK(); +} #include "common/compile_check_end.h" } // namespace doris::cloud diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h index 30b5ecd581c534..3c488b5635a904 100644 --- a/be/src/cloud/cloud_meta_mgr.h +++ b/be/src/cloud/cloud_meta_mgr.h @@ -27,6 +27,7 @@ #include "cloud/cloud_tablet.h" #include "common/status.h" +#include "olap/rowset/rowset_fwd.h" #include "olap/rowset/rowset_meta.h" #include "util/s3_util.h" @@ -148,6 +149,15 @@ class CloudMetaMgr { void remove_delete_bitmap_update_lock(int64_t table_id, int64_t lock_id, int64_t initiator, int64_t tablet_id); + // Fill version holes by creating empty rowsets for missing versions + Status fill_version_holes(CloudTablet* tablet, int64_t max_version, + std::unique_lock& wlock); + + // Create an empty rowset to fill a version hole + Status create_empty_rowset_for_hole(CloudTablet* tablet, int64_t version, + RowsetMetaSharedPtr prev_rowset_meta, + RowsetSharedPtr* rowset); + private: bool sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64_t old_max_version, std::ranges::range auto&& rs_metas, @@ -157,6 +167,7 @@ class CloudMetaMgr { std::ranges::range auto&& rs_metas, const TabletStatsPB& stats, const TabletIndexPB& idx, DeleteBitmap* delete_bitmap, bool full_sync = false, SyncRowsetStats* sync_stats = nullptr); + void check_table_size_correctness(const RowsetMeta& rs_meta); int64_t get_segment_file_size(const RowsetMeta& rs_meta); int64_t get_inverted_index_file_szie(const RowsetMeta& rs_meta); diff --git a/be/src/cloud/cloud_rowset_builder.cpp b/be/src/cloud/cloud_rowset_builder.cpp index d135292a7817d0..2f9697e59d99ef 100644 --- a/be/src/cloud/cloud_rowset_builder.cpp +++ b/be/src/cloud/cloud_rowset_builder.cpp @@ -81,7 +81,9 @@ Status CloudRowsetBuilder::init() { _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor()->create_token(); - RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta(), "")); + if (!_skip_writing_rowset_metadata) { + RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta(), "")); + } _is_init = true; return Status::OK(); diff --git a/be/src/cloud/cloud_rowset_builder.h b/be/src/cloud/cloud_rowset_builder.h index 05e24e663825fc..afa5d7c7574b6d 100644 --- a/be/src/cloud/cloud_rowset_builder.h +++ b/be/src/cloud/cloud_rowset_builder.h @@ -39,6 +39,8 @@ class CloudRowsetBuilder final : public BaseRowsetBuilder { Status set_txn_related_delete_bitmap(); + void set_skip_writing_rowset_metadata(bool skip) { _skip_writing_rowset_metadata = skip; } + private: // Convert `_tablet` from `BaseTablet` to `CloudTablet` CloudTablet* cloud_tablet(); @@ -46,6 +48,10 @@ class CloudRowsetBuilder final : public BaseRowsetBuilder { Status check_tablet_version_count(); CloudStorageEngine& _engine; + + // whether to skip writing rowset metadata to meta service. + // This is used for empty rowset when config::skip_writing_empty_rowset_metadata is true. + bool _skip_writing_rowset_metadata = false; }; } // namespace doris diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 88a3a66c2ab972..84ccba2ce4805d 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -701,7 +701,6 @@ std::vector CloudTablet::recycle_cached_data( void CloudTablet::reset_approximate_stats(int64_t num_rowsets, int64_t num_segments, int64_t num_rows, int64_t data_size) { - _approximate_num_rowsets.store(num_rowsets, std::memory_order_relaxed); _approximate_num_segments.store(num_segments, std::memory_order_relaxed); _approximate_num_rows.store(num_rows, std::memory_order_relaxed); _approximate_data_size.store(data_size, std::memory_order_relaxed); @@ -712,10 +711,16 @@ void CloudTablet::reset_approximate_stats(int64_t num_rowsets, int64_t num_segme if (v.second < cp) { continue; } - cumu_num_deltas += r->is_segments_overlapping() ? r->num_segments() : 1; ++cumu_num_rowsets; } + // num_rowsets may be less than the size of _rs_version_map when there are some hole rowsets + // in the version map, so we use the max value to ensure that the approximate number + // of rowsets is at least the size of _rs_version_map. + // Note that this is not the exact number of rowsets, but an approximate number. + int64_t approximate_num_rowsets = + std::max(num_rowsets, static_cast(_rs_version_map.size())); + _approximate_num_rowsets.store(approximate_num_rowsets, std::memory_order_relaxed); _approximate_cumu_num_rowsets.store(cumu_num_rowsets, std::memory_order_relaxed); _approximate_cumu_num_deltas.store(cumu_num_deltas, std::memory_order_relaxed); } diff --git a/be/src/cloud/cloud_tablets_channel.cpp b/be/src/cloud/cloud_tablets_channel.cpp index cbc6a5d525da81..3d27138d08abbb 100644 --- a/be/src/cloud/cloud_tablets_channel.cpp +++ b/be/src/cloud/cloud_tablets_channel.cpp @@ -22,6 +22,7 @@ #include "cloud/cloud_delta_writer.h" #include "cloud/cloud_meta_mgr.h" #include "cloud/cloud_storage_engine.h" +#include "cloud/config.h" #include "olap/delta_writer.h" #include "runtime/tablets_channel.h" @@ -62,6 +63,7 @@ Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& reques _build_tablet_to_rowidxs(request, &tablet_to_rowidxs); std::unordered_set partition_ids; + std::vector writers; { // add_batch may concurrency with inc_open but not under _lock. // so need to protect it with _tablet_writers_lock. @@ -72,8 +74,11 @@ Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& reques return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id); } partition_ids.insert(tablet_writer_it->second->partition_id()); + writers.push_back(static_cast(tablet_writer_it->second.get())); } - if (!partition_ids.empty()) { + if (config::skip_writing_empty_rowset_metadata && !writers.empty()) { + RETURN_IF_ERROR(CloudDeltaWriter::batch_init(writers)); + } else if (!partition_ids.empty()) { RETURN_IF_ERROR(_init_writers_by_partition_ids(partition_ids)); } } diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index 96e53d66b40f5a..b60e5efbfeb437 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -91,6 +91,8 @@ DEFINE_mInt32(meta_service_conflict_error_retry_times, "10"); DEFINE_Bool(enable_check_storage_vault, "true"); +DEFINE_mBool(skip_writing_empty_rowset_metadata, "true"); + DEFINE_mInt64(warmup_tablet_replica_info_cache_ttl_sec, "600"); DEFINE_mInt64(warm_up_rowset_slow_log_ms, "1000"); diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index 3a949746818ef5..7301af17b1505c 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -108,6 +108,9 @@ DECLARE_mInt32(delete_bitmap_lock_expiration_seconds); DECLARE_mInt32(get_delete_bitmap_lock_max_retry_times); +// Skip writing empty rowset metadata to meta service +DECLARE_mBool(skip_writing_empty_rowset_metadata); + // enable large txn lazy commit in meta-service `commit_txn` DECLARE_mBool(enable_cloud_txn_lazy_commit); diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index c0c394ae635fa3..72a80cfbd61912 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -1541,8 +1541,16 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext& } // Use the storage resource of the previous rowset - ctx.storage_resource = - *DORIS_TRY(_input_rowsets.back()->rowset_meta()->remote_storage_resource()); + // when multiple hole rowsets doing compaction, those rowsets may not have a storage resource. + // case: + // [0-1, 2-2, 3-3, 4-4, 5-5], 2-5 are hole rowsets. + // 0-1 current doesn't have a resource_id, so 2-5 also have no resource_id. + // Because there is no data to write, so we can skip setting the storage resource. + if (!_input_rowsets.back()->is_hole_rowset() || + !_input_rowsets.back()->rowset_meta()->resource_id().empty()) { + ctx.storage_resource = + *DORIS_TRY(_input_rowsets.back()->rowset_meta()->remote_storage_resource()); + } ctx.txn_id = boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) & std::numeric_limits::max(); // MUST be positive @@ -1597,5 +1605,16 @@ void CloudCompactionMixin::update_compaction_level() { } } +// should skip hole rowsets, ortherwise the count will be wrong in ms +int64_t CloudCompactionMixin::num_input_rowsets() const { + int64_t count = 0; + for (const auto& r : _input_rowsets) { + if (!r->is_hole_rowset()) { + count++; + } + } + return count; +} + #include "common/compile_check_end.h" } // namespace doris diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 4f267c091f5df6..edefc3a527b0eb 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -205,6 +205,8 @@ class CloudCompactionMixin : public Compaction { int64_t initiator() const; + int64_t num_input_rowsets() const; + protected: CloudTablet* cloud_tablet() { return static_cast(_tablet.get()); } diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index a10dc3866e1010..6b105ed37e8020 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -318,6 +318,11 @@ class Rowset : public std::enable_shared_from_this, public MetadataAdder std::vector get_index_file_names(); + // check if the rowset is a hole rowset + bool is_hole_rowset() const { return _is_hole_rowset; } + // set the rowset as a hole rowset + void set_hole_rowset(bool is_hole_rowset) { _is_hole_rowset = is_hole_rowset; } + protected: friend class RowsetFactory; @@ -358,6 +363,11 @@ class Rowset : public std::enable_shared_from_this, public MetadataAdder // , skip index compaction std::set skip_index_compaction; + + // only used for cloud mode, it indicates whether this rowset is a hole rowset. + // a hole rowset is a rowset that has no data, but is used to fill the version gap + // it is used to ensure that the version sequence is continuous. + bool _is_hole_rowset = false; }; // `rs_metas` MUST already be sorted by `RowsetMeta::comparator` diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index 879cffd6c74552..1d1c59ed56935c 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -67,6 +67,10 @@ class RowsetMeta : public MetadataAdder { const std::string& resource_id() const { return _rowset_meta_pb.resource_id(); } + void set_resource_id(const std::string& resource_id) { + _rowset_meta_pb.set_resource_id(resource_id); + } + bool is_local() const { return !_rowset_meta_pb.has_resource_id(); } bool has_variant_type_in_schema() const; diff --git a/be/test/cloud/cloud_meta_mgr_test.cpp b/be/test/cloud/cloud_meta_mgr_test.cpp index b938d949553881..b925dd6f645086 100644 --- a/be/test/cloud/cloud_meta_mgr_test.cpp +++ b/be/test/cloud/cloud_meta_mgr_test.cpp @@ -21,6 +21,15 @@ #include #include +#include + +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet.h" +#include "olap/olap_common.h" +#include "olap/rowset/rowset_factory.h" +#include "olap/rowset/rowset_meta.h" +#include "olap/tablet_meta.h" +#include "util/uid_util.h" namespace doris { using namespace cloud; @@ -88,4 +97,531 @@ TEST_F(CloudMetaMgrTest, bthread_fork_join_test) { // clang-format on } +TEST_F(CloudMetaMgrTest, test_fill_version_holes_no_holes) { + CloudStorageEngine engine(EngineOptions {}); + CloudMetaMgr meta_mgr; + + TabletMetaSharedPtr tablet_meta( + new TabletMeta(1001, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, + UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F)); + auto tablet = std::make_shared(engine, std::make_shared(*tablet_meta)); + + // Add consecutive versions: 0, 1, 2, 3, 4 + std::vector rowsets; + for (int64_t version = 0; version <= 4; ++version) { + auto rs_meta = std::make_shared(); + rs_meta->set_tablet_id(1001); + rs_meta->set_index_id(2); + rs_meta->set_partition_id(15673); + rs_meta->set_tablet_uid(UniqueId(9, 10)); + rs_meta->set_version(Version(version, version)); + rs_meta->set_rowset_type(BETA_ROWSET); + rs_meta->set_rowset_id(engine.next_rowset_id()); + rs_meta->set_num_rows(100); + rs_meta->set_empty(false); + rs_meta->set_tablet_schema(tablet->tablet_schema()); + + // Create rowset and add it to tablet + RowsetSharedPtr rowset; + auto status = RowsetFactory::create_rowset(tablet->tablet_schema(), tablet->tablet_path(), + rs_meta, &rowset); + EXPECT_TRUE(status.ok()); + rowsets.push_back(rowset); + } + + // Add all rowsets to tablet + { + std::unique_lock lock(tablet->get_header_lock()); + tablet->add_rowsets(std::move(rowsets), false, lock, false); + } + + // Test fill_version_holes directly - should not add any rowsets since there are no holes + std::unique_lock wlock(tablet->get_header_lock()); + Status status = meta_mgr.fill_version_holes(tablet.get(), 4, wlock); + EXPECT_TRUE(status.ok()); + + // Verify tablet still has the same number of rowsets (no holes to fill) + EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 5); + // Verify rows number is correct + for (const auto& rs_meta : tablet->tablet_meta()->all_rs_metas()) { + EXPECT_EQ(rs_meta->num_rows(), 100); + } +} + +TEST_F(CloudMetaMgrTest, test_fill_version_holes_with_holes) { + CloudStorageEngine engine(EngineOptions {}); + CloudMetaMgr meta_mgr; + + TabletMetaSharedPtr tablet_meta( + new TabletMeta(1001, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, + UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F)); + auto tablet = std::make_shared(engine, std::make_shared(*tablet_meta)); + + // Add non-consecutive versions: 0, 2, 4 (missing 1, 3) + std::vector versions = {0, 2, 4}; + std::vector rowsets; + for (int64_t version : versions) { + auto rs_meta = std::make_shared(); + rs_meta->set_tablet_id(1001); + rs_meta->set_index_id(2); + rs_meta->set_partition_id(15673); + rs_meta->set_tablet_uid(UniqueId(9, 10)); + rs_meta->set_version(Version(version, version)); + rs_meta->set_rowset_type(BETA_ROWSET); + rs_meta->set_rowset_id(engine.next_rowset_id()); + rs_meta->set_num_rows(100); + rs_meta->set_empty(false); + rs_meta->set_tablet_schema(tablet->tablet_schema()); + + // Create rowset and add it to list + RowsetSharedPtr rowset; + auto status = RowsetFactory::create_rowset(tablet->tablet_schema(), tablet->tablet_path(), + rs_meta, &rowset); + EXPECT_TRUE(status.ok()); + rowsets.push_back(rowset); + } + + // Add all rowsets to tablet + { + std::unique_lock lock(tablet->get_header_lock()); + tablet->add_rowsets(std::move(rowsets), false, lock, false); + } + + // Initially we have 3 rowsets (versions 0, 2, 4) + EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 3); + + // Test fill_version_holes directly to fill missing versions 1 and 3 + std::unique_lock wlock(tablet->get_header_lock()); + Status status = meta_mgr.fill_version_holes(tablet.get(), 4, wlock); + EXPECT_TRUE(status.ok()); + + // After filling holes, we should have 5 rowsets (versions 0, 1, 2, 3, 4) + EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 5); + + // Verify all versions are present + auto rs_metas = tablet->tablet_meta()->all_rs_metas(); + std::set found_versions; + for (const auto& rs_meta : rs_metas) { + found_versions.insert(rs_meta->version().first); + } + EXPECT_EQ(found_versions.size(), 5); + EXPECT_TRUE(found_versions.contains(0)); + EXPECT_TRUE(found_versions.contains(1)); + EXPECT_TRUE(found_versions.contains(2)); + EXPECT_TRUE(found_versions.contains(3)); + EXPECT_TRUE(found_versions.contains(4)); + + // Verify the hole rowsets (versions 1 and 3) are empty + for (const auto& rs_meta : rs_metas) { + if (rs_meta->version().first == 1 || rs_meta->version().first == 3) { + EXPECT_TRUE(rs_meta->empty()); + EXPECT_EQ(rs_meta->num_rows(), 0); + } else { + EXPECT_FALSE(rs_meta->empty()); + EXPECT_EQ(rs_meta->num_rows(), 100); + } + } +} + +// Test create_empty_rowset_for_hole function +TEST_F(CloudMetaMgrTest, test_create_empty_rowset_for_hole) { + CloudStorageEngine engine(EngineOptions {}); + CloudMetaMgr meta_mgr; + + TabletMetaSharedPtr tablet_meta( + new TabletMeta(1001, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, + UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F)); + auto tablet = std::make_shared(engine, std::make_shared(*tablet_meta)); + + // Create a previous rowset meta to pass as reference + auto prev_rs_meta = std::make_shared(); + prev_rs_meta->set_tablet_id(1001); + prev_rs_meta->set_index_id(2); + prev_rs_meta->set_partition_id(15673); + prev_rs_meta->set_tablet_uid(UniqueId(9, 10)); + prev_rs_meta->set_version(Version(1, 1)); + prev_rs_meta->set_rowset_type(BETA_ROWSET); + prev_rs_meta->set_rowset_id(engine.next_rowset_id()); + prev_rs_meta->set_num_rows(100); + prev_rs_meta->set_empty(false); + prev_rs_meta->set_tablet_schema(tablet->tablet_schema()); + + // Test creating an empty rowset for version hole + RowsetSharedPtr hole_rowset; + Status status = + meta_mgr.create_empty_rowset_for_hole(tablet.get(), 2, prev_rs_meta, &hole_rowset); + EXPECT_TRUE(status.ok()) << "Failed to create empty rowset for hole: " << status; + EXPECT_NE(hole_rowset, nullptr); + + // Verify the hole rowset properties + auto hole_rs_meta = hole_rowset->rowset_meta(); + EXPECT_EQ(hole_rs_meta->tablet_id(), 15673); + EXPECT_EQ(hole_rs_meta->index_id(), 0); + EXPECT_EQ(hole_rs_meta->partition_id(), 2); + EXPECT_EQ(hole_rs_meta->tablet_uid(), UniqueId(9, 10)); + EXPECT_EQ(hole_rs_meta->version(), Version(2, 2)); + EXPECT_EQ(hole_rs_meta->rowset_type(), BETA_ROWSET); + EXPECT_EQ(hole_rs_meta->num_rows(), 0); + EXPECT_EQ(hole_rs_meta->total_disk_size(), 0); + EXPECT_EQ(hole_rs_meta->data_disk_size(), 0); + EXPECT_EQ(hole_rs_meta->index_disk_size(), 0); + EXPECT_TRUE(hole_rs_meta->empty()); + EXPECT_EQ(hole_rs_meta->num_segments(), 0); + EXPECT_EQ(hole_rs_meta->segments_overlap(), NONOVERLAPPING); + EXPECT_EQ(hole_rs_meta->rowset_state(), VISIBLE); + EXPECT_TRUE(hole_rowset->is_hole_rowset()); + EXPECT_EQ(hole_rowset->txn_id(), 2); // txn_id should match version + RowsetId expected_rowset_id; + expected_rowset_id.init(2, 0, 15673, 2); + EXPECT_EQ(hole_rowset->rowset_meta()->rowset_id(), expected_rowset_id); + + // Test creating multiple hole rowsets with different versions + RowsetSharedPtr hole_rowset_v3; + status = meta_mgr.create_empty_rowset_for_hole(tablet.get(), 3, prev_rs_meta, &hole_rowset_v3); + EXPECT_TRUE(status.ok()); + EXPECT_NE(hole_rowset_v3, nullptr); + EXPECT_EQ(hole_rowset_v3->rowset_meta()->version(), Version(3, 3)); + EXPECT_TRUE(hole_rowset_v3->is_hole_rowset()); + + // Verify different hole rowsets have different rowset IDs + EXPECT_NE(hole_rowset->rowset_meta()->rowset_id(), hole_rowset_v3->rowset_meta()->rowset_id()); +} + +TEST_F(CloudMetaMgrTest, test_fill_version_holes_edge_cases) { + CloudStorageEngine engine(EngineOptions {}); + CloudMetaMgr meta_mgr; + + // Test case 1: max_version <= 0 + { + TabletMetaSharedPtr tablet_meta(new TabletMeta( + 1001, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10), + TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F)); + auto tablet = + std::make_shared(engine, std::make_shared(*tablet_meta)); + + std::unique_lock wlock(tablet->get_header_lock()); + Status status = meta_mgr.fill_version_holes(tablet.get(), 0, wlock); + EXPECT_TRUE(status.ok()); + + status = meta_mgr.fill_version_holes(tablet.get(), -1, wlock); + EXPECT_TRUE(status.ok()); + + EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 0); + } + + // Test case 2: empty tablet (no existing versions) + { + TabletMetaSharedPtr tablet_meta(new TabletMeta( + 1002, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10), + TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F)); + auto tablet = + std::make_shared(engine, std::make_shared(*tablet_meta)); + + std::unique_lock wlock(tablet->get_header_lock()); + Status status = meta_mgr.fill_version_holes(tablet.get(), 5, wlock); + EXPECT_TRUE(status.ok()); + + // Should still have no rowsets + EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 0); + } +} + +TEST_F(CloudMetaMgrTest, test_fill_version_holes_trailing_holes) { + CloudStorageEngine engine(EngineOptions {}); + CloudMetaMgr meta_mgr; + + TabletMetaSharedPtr tablet_meta( + new TabletMeta(1003, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, + UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F)); + auto tablet = std::make_shared(engine, std::make_shared(*tablet_meta)); + + // Add only versions 0, 1, 2 but max_version is 5 (missing 3, 4, 5) + std::vector rowsets; + for (int64_t version = 0; version <= 2; ++version) { + auto rs_meta = std::make_shared(); + rs_meta->set_tablet_id(1003); + rs_meta->set_index_id(2); + rs_meta->set_partition_id(15673); + rs_meta->set_tablet_uid(UniqueId(9, 10)); + rs_meta->set_version(Version(version, version)); + rs_meta->set_rowset_type(BETA_ROWSET); + rs_meta->set_rowset_id(engine.next_rowset_id()); + rs_meta->set_num_rows(100); + rs_meta->set_empty(false); + rs_meta->set_tablet_schema(tablet->tablet_schema()); + + RowsetSharedPtr rowset; + auto status = RowsetFactory::create_rowset(tablet->tablet_schema(), tablet->tablet_path(), + rs_meta, &rowset); + EXPECT_TRUE(status.ok()); + rowsets.push_back(rowset); + } + + // Add all rowsets to tablet + { + std::unique_lock lock(tablet->get_header_lock()); + tablet->add_rowsets(std::move(rowsets), false, lock, false); + } + + // Initially we have 3 rowsets (versions 0, 1, 2) + EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 3); + + // Test fill_version_holes to fill trailing holes (versions 3, 4, 5) + std::unique_lock wlock(tablet->get_header_lock()); + Status status = meta_mgr.fill_version_holes(tablet.get(), 5, wlock); + EXPECT_TRUE(status.ok()); + + // After filling holes, we should have 6 rowsets (versions 0, 1, 2, 3, 4, 5) + EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 6); + + // Verify all versions are present + auto rs_metas = tablet->tablet_meta()->all_rs_metas(); + std::set found_versions; + for (const auto& rs_meta : rs_metas) { + found_versions.insert(rs_meta->version().first); + } + EXPECT_EQ(found_versions.size(), 6); + for (int64_t v = 0; v <= 5; ++v) { + EXPECT_TRUE(found_versions.contains(v)) << "Missing version " << v; + } + + // Verify the trailing hole rowsets (versions 3, 4, 5) are empty + for (const auto& rs_meta : rs_metas) { + if (rs_meta->version().first >= 3) { + EXPECT_TRUE(rs_meta->empty()) + << "Version " << rs_meta->version().first << " should be empty"; + EXPECT_EQ(rs_meta->num_rows(), 0); + EXPECT_EQ(rs_meta->total_disk_size(), 0); + } else { + EXPECT_FALSE(rs_meta->empty()) + << "Version " << rs_meta->version().first << " should not be empty"; + EXPECT_EQ(rs_meta->num_rows(), 100); + } + } +} + +TEST_F(CloudMetaMgrTest, test_fill_version_holes_single_hole) { + CloudStorageEngine engine(EngineOptions {}); + CloudMetaMgr meta_mgr; + + TabletMetaSharedPtr tablet_meta( + new TabletMeta(1004, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, + UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F)); + auto tablet = std::make_shared(engine, std::make_shared(*tablet_meta)); + + // Add versions 0, 2 (missing only version 1) + std::vector versions = {0, 2}; + std::vector rowsets; + for (int64_t version : versions) { + auto rs_meta = std::make_shared(); + rs_meta->set_tablet_id(1004); + rs_meta->set_index_id(2); + rs_meta->set_partition_id(15673); + rs_meta->set_tablet_uid(UniqueId(9, 10)); + rs_meta->set_version(Version(version, version)); + rs_meta->set_rowset_type(BETA_ROWSET); + rs_meta->set_rowset_id(engine.next_rowset_id()); + rs_meta->set_num_rows(100); + rs_meta->set_empty(false); + rs_meta->set_tablet_schema(tablet->tablet_schema()); + + RowsetSharedPtr rowset; + auto status = RowsetFactory::create_rowset(tablet->tablet_schema(), tablet->tablet_path(), + rs_meta, &rowset); + EXPECT_TRUE(status.ok()); + rowsets.push_back(rowset); + } + + // Add all rowsets to tablet + { + std::unique_lock lock(tablet->get_header_lock()); + tablet->add_rowsets(std::move(rowsets), false, lock, false); + } + + // Initially we have 2 rowsets (versions 0, 2) + EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 2); + + // Test fill_version_holes to fill single hole (version 1) + std::unique_lock wlock(tablet->get_header_lock()); + Status status = meta_mgr.fill_version_holes(tablet.get(), 2, wlock); + EXPECT_TRUE(status.ok()); + + // After filling holes, we should have 3 rowsets (versions 0, 1, 2) + EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 3); + + // Verify all versions are present + auto rs_metas = tablet->tablet_meta()->all_rs_metas(); + std::set found_versions; + for (const auto& rs_meta : rs_metas) { + found_versions.insert(rs_meta->version().first); + } + EXPECT_EQ(found_versions.size(), 3); + EXPECT_TRUE(found_versions.contains(0)); + EXPECT_TRUE(found_versions.contains(1)); + EXPECT_TRUE(found_versions.contains(2)); + + // Verify the hole rowset (version 1) is empty + for (const auto& rs_meta : rs_metas) { + if (rs_meta->version().first == 1) { + EXPECT_TRUE(rs_meta->empty()); + EXPECT_EQ(rs_meta->num_rows(), 0); + EXPECT_EQ(rs_meta->total_disk_size(), 0); + } else { + EXPECT_FALSE(rs_meta->empty()); + EXPECT_EQ(rs_meta->num_rows(), 100); + } + } +} + +TEST_F(CloudMetaMgrTest, test_fill_version_holes_multiple_consecutive_holes) { + CloudStorageEngine engine(EngineOptions {}); + CloudMetaMgr meta_mgr; + + TabletMetaSharedPtr tablet_meta( + new TabletMeta(1005, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, + UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F)); + auto tablet = std::make_shared(engine, std::make_shared(*tablet_meta)); + + // Add versions 0, 5 (missing 1, 2, 3, 4 - multiple consecutive holes) + std::vector versions = {0, 5}; + std::vector rowsets; + for (int64_t version : versions) { + auto rs_meta = std::make_shared(); + rs_meta->set_tablet_id(1005); + rs_meta->set_index_id(2); + rs_meta->set_partition_id(15673); + rs_meta->set_tablet_uid(UniqueId(9, 10)); + rs_meta->set_version(Version(version, version)); + rs_meta->set_rowset_type(BETA_ROWSET); + rs_meta->set_rowset_id(engine.next_rowset_id()); + rs_meta->set_num_rows(100); + rs_meta->set_empty(false); + rs_meta->set_tablet_schema(tablet->tablet_schema()); + + RowsetSharedPtr rowset; + auto status = RowsetFactory::create_rowset(tablet->tablet_schema(), tablet->tablet_path(), + rs_meta, &rowset); + EXPECT_TRUE(status.ok()); + rowsets.push_back(rowset); + } + + // Add all rowsets to tablet + { + std::unique_lock lock(tablet->get_header_lock()); + tablet->add_rowsets(std::move(rowsets), false, lock, false); + } + + // Initially we have 2 rowsets (versions 0, 5) + EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 2); + + // Test fill_version_holes to fill multiple consecutive holes (versions 1, 2, 3, 4) + std::unique_lock wlock(tablet->get_header_lock()); + Status status = meta_mgr.fill_version_holes(tablet.get(), 5, wlock); + EXPECT_TRUE(status.ok()); + + // After filling holes, we should have 6 rowsets (versions 0, 1, 2, 3, 4, 5) + EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 6); + + // Verify all versions are present + auto rs_metas = tablet->tablet_meta()->all_rs_metas(); + std::set found_versions; + for (const auto& rs_meta : rs_metas) { + found_versions.insert(rs_meta->version().first); + } + EXPECT_EQ(found_versions.size(), 6); + for (int64_t v = 0; v <= 5; ++v) { + EXPECT_TRUE(found_versions.contains(v)) << "Missing version " << v; + } + + // Verify the hole rowsets (versions 1, 2, 3, 4) are empty + for (const auto& rs_meta : rs_metas) { + if (rs_meta->version().first >= 1 && rs_meta->version().first <= 4) { + EXPECT_TRUE(rs_meta->empty()) + << "Version " << rs_meta->version().first << " should be empty"; + EXPECT_EQ(rs_meta->num_rows(), 0); + EXPECT_EQ(rs_meta->total_disk_size(), 0); + } else { + EXPECT_FALSE(rs_meta->empty()) + << "Version " << rs_meta->version().first << " should not be empty"; + EXPECT_EQ(rs_meta->num_rows(), 100); + } + } +} + +TEST_F(CloudMetaMgrTest, test_fill_version_holes_mixed_holes) { + CloudStorageEngine engine(EngineOptions {}); + CloudMetaMgr meta_mgr; + + TabletMetaSharedPtr tablet_meta( + new TabletMeta(1006, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, + UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F)); + auto tablet = std::make_shared(engine, std::make_shared(*tablet_meta)); + + // Add versions 0, 2, 5, 6 (missing 1, 3, 4 and potential trailing holes up to max_version) + std::vector versions = {0, 2, 5, 6}; + std::vector rowsets; + for (int64_t version : versions) { + auto rs_meta = std::make_shared(); + rs_meta->set_tablet_id(1006); + rs_meta->set_index_id(2); + rs_meta->set_partition_id(15673); + rs_meta->set_tablet_uid(UniqueId(9, 10)); + rs_meta->set_version(Version(version, version)); + rs_meta->set_rowset_type(BETA_ROWSET); + rs_meta->set_rowset_id(engine.next_rowset_id()); + rs_meta->set_num_rows(100); + rs_meta->set_empty(false); + rs_meta->set_tablet_schema(tablet->tablet_schema()); + + RowsetSharedPtr rowset; + auto status = RowsetFactory::create_rowset(tablet->tablet_schema(), tablet->tablet_path(), + rs_meta, &rowset); + EXPECT_TRUE(status.ok()); + rowsets.push_back(rowset); + } + + // Add all rowsets to tablet + { + std::unique_lock lock(tablet->get_header_lock()); + tablet->add_rowsets(std::move(rowsets), false, lock, false); + } + + // Initially we have 4 rowsets (versions 0, 2, 5, 6) + EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 4); + + // Test fill_version_holes with max_version = 8 (should fill 1, 3, 4, 7, 8) + std::unique_lock wlock(tablet->get_header_lock()); + Status status = meta_mgr.fill_version_holes(tablet.get(), 8, wlock); + EXPECT_TRUE(status.ok()); + + // After filling holes, we should have 9 rowsets (versions 0-8) + EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 9); + + // Verify all versions are present + auto rs_metas = tablet->tablet_meta()->all_rs_metas(); + std::set found_versions; + for (const auto& rs_meta : rs_metas) { + found_versions.insert(rs_meta->version().first); + } + EXPECT_EQ(found_versions.size(), 9); + for (int64_t v = 0; v <= 8; ++v) { + EXPECT_TRUE(found_versions.contains(v)) << "Missing version " << v; + } + + // Verify the hole rowsets (versions 1, 3, 4, 7, 8) are empty + std::set original_versions = {0, 2, 5, 6}; + std::set hole_versions = {1, 3, 4, 7, 8}; + for (const auto& rs_meta : rs_metas) { + int64_t version = rs_meta->version().first; + if (hole_versions.contains(version)) { + EXPECT_TRUE(rs_meta->empty()) << "Version " << version << " should be empty"; + EXPECT_EQ(rs_meta->num_rows(), 0); + EXPECT_EQ(rs_meta->total_disk_size(), 0); + } else if (original_versions.contains(version)) { + EXPECT_FALSE(rs_meta->empty()) << "Version " << version << " should not be empty"; + EXPECT_EQ(rs_meta->num_rows(), 100); + } + } +} + } // namespace doris diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index b51738bc47138f..5b33b53ea7e037 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -2736,7 +2736,8 @@ void MetaServiceImpl::get_partition_pending_txn_id(std::string_view instance_id, int64_t table_id, int64_t partition_id, int64_t tablet_id, std::stringstream& ss, MetaServiceCode& code, std::string& msg, - int64_t& first_txn_id, Transaction* txn) { + int64_t& first_txn_id, + int64_t& partition_version, Transaction* txn) { std::string ver_val; std::string ver_key = partition_version_key({instance_id, db_id, table_id, partition_id}); TxnErrorCode err = txn->get(ver_key, &ver_val); @@ -2769,6 +2770,7 @@ void MetaServiceImpl::get_partition_pending_txn_id(std::string_view instance_id, } else { first_txn_id = -1; } + partition_version = version_pb.version(); } void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller, @@ -2842,13 +2844,15 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller, // there is maybe a lazy commit txn when call get_rowset // we need advance lazy commit txn here int64_t first_txn_id = -1; + int64_t partition_version = -1; if (!is_versioned_read) { get_partition_pending_txn_id(instance_id, idx.db_id(), idx.table_id(), idx.partition_id(), tablet_id, ss, code, msg, - first_txn_id, txn.get()); + first_txn_id, partition_version, txn.get()); if (code != MetaServiceCode::OK) { return; } + response->set_partition_max_version(partition_version); } else { err = reader.get_partition_pending_txn_id(txn.get(), idx.partition_id(), &first_txn_id); diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h index 10eb04a5ec729f..2c64c532fa93be 100644 --- a/cloud/src/meta-service/meta_service.h +++ b/cloud/src/meta-service/meta_service.h @@ -429,7 +429,8 @@ class MetaServiceImpl : public cloud::MetaService { void get_partition_pending_txn_id(std::string_view instance_id, int64_t db_id, int64_t table_id, int64_t partition_id, int64_t tablet_id, std::stringstream& ss, MetaServiceCode& code, - std::string& msg, int64_t& first_txn_id, Transaction* txn); + std::string& msg, int64_t& first_txn_id, + int64_t& partition_version, Transaction* txn); // Get versions in batch, Only for versioned read. std::pair batch_get_table_versions( diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp index 779f9d5feaaafa..e76147b19b5c3f 100644 --- a/cloud/src/meta-service/meta_service_job.cpp +++ b/cloud/src/meta-service/meta_service_job.cpp @@ -1127,7 +1127,9 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string TEST_SYNC_POINT_CALLBACK("process_compaction_job::loop_input_done", &num_rowsets); - if (num_rowsets < 1) { + // compaction.num_input_rowsets is 0 when multiple hole rowsets are compacted, + // we can continue to process the job for this case. + if (num_rowsets < 1 && compaction.num_input_rowsets() > 0) { SS << "too few input rowsets, tablet_id=" << tablet_id << " num_rowsets=" << num_rowsets; code = MetaServiceCode::UNDEFINED_ERR; msg = ss.str(); diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 324165e139a802..b16e5f76519769 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -2256,27 +2256,13 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id, RecyclerMetricsContext& TEST_SYNC_POINT_CALLBACK("InstanceRecycler::recycle_tablet.create_rowset_meta", &resp); for (const auto& rs_meta : resp.rowset_meta()) { - /* - * For compatibility, we skip the loop for [0-1] here. - * The purpose of this loop is to delete object files, - * and since [0-1] only has meta and doesn't have object files, - * skipping it doesn't affect system correctness. - * - * If not skipped, the check "if (!rs_meta.has_resource_id())" below - * would return error -1 directly, causing the recycle operation to fail. - * - * [0-1] doesn't have resource id is a bug. - * In the future, we will fix this problem, after that, - * we can remove this if statement. - * - * TODO(Yukang-Lian): remove this if statement when [0-1] has resource id in the future. - */ - - if (rs_meta.end_version() == 1) { - // Assert that [0-1] has no resource_id to make sure - // this if statement will not be forgetted to remove - // when the resource id bug is fixed - DCHECK(!rs_meta.has_resource_id()) << "rs_meta" << rs_meta.ShortDebugString(); + // The rowset has no resource id and segments when it was generated by compaction + // with multiple hole rowsets or it's version is [0-1], so we can skip it. + if (!rs_meta.has_resource_id() && rs_meta.num_segments() == 0) { + LOG_INFO("rowset meta does not have a resource id and no segments, skip this rowset") + .tag("rs_meta", rs_meta.ShortDebugString()) + .tag("instance_id", instance_id_) + .tag("tablet_id", tablet_id); recycle_rowsets_number += 1; continue; } diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index f9034c79234c7c..698dc0f6520a57 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -1233,6 +1233,8 @@ message GetRowsetResponse { optional TabletStatsPB stats = 3; // Return dict value if SchemaOp is RETURN_DICT optional SchemaCloudDictionary schema_dict = 4; + // The current max version of the partition + optional int64 partition_max_version = 5; } message GetSchemaDictRequest {