Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ Status CloudBaseCompaction::modify_rowsets() {
compaction_job->set_size_output_rowsets(_output_rowset->total_disk_size());
compaction_job->set_num_input_segments(_input_segments);
compaction_job->set_num_output_segments(_output_rowset->num_segments());
compaction_job->set_num_input_rowsets(_input_rowsets.size());
compaction_job->set_num_input_rowsets(num_input_rowsets());
compaction_job->set_num_output_rowsets(1);
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
compaction_job->set_size_output_rowsets(_output_rowset->total_disk_size());
compaction_job->set_num_input_segments(_input_segments);
compaction_job->set_num_output_segments(_output_rowset->num_segments());
compaction_job->set_num_input_rowsets(_input_rowsets.size());
compaction_job->set_num_input_rowsets(num_input_rowsets());
compaction_job->set_num_output_rowsets(1);
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
Expand Down
27 changes: 24 additions & 3 deletions be/src/cloud/cloud_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_rowset_builder.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
#include "olap/delta_writer.h"
#include "runtime/thread_context.h"

Expand Down Expand Up @@ -108,11 +109,31 @@ const RowsetMetaSharedPtr& CloudDeltaWriter::rowset_meta() {

Status CloudDeltaWriter::commit_rowset() {
std::lock_guard<bthread::Mutex> lock(_mtx);

// Handle empty rowset (no data written)
if (!_is_init) {
// No data to write, but still need to write a empty rowset kv to keep version continuous
RETURN_IF_ERROR(_rowset_builder->init());
RETURN_IF_ERROR(_rowset_builder->build_rowset());
return _commit_empty_rowset();
}

// Handle normal rowset with data
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
}

Status CloudDeltaWriter::_commit_empty_rowset() {
// If skip writing empty rowset metadata is enabled,
// we do not prepare rowset to meta service.
if (config::skip_writing_empty_rowset_metadata) {
rowset_builder()->set_skip_writing_rowset_metadata(true);
}

RETURN_IF_ERROR(_rowset_builder->init());
RETURN_IF_ERROR(_rowset_builder->build_rowset());

// If skip writing empty rowset metadata is enabled, we do not commit rowset to meta service.
if (config::skip_writing_empty_rowset_metadata) {
return Status::OK();
}
// write a empty rowset kv to keep version continuous
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class CloudDeltaWriter final : public BaseDeltaWriter {
// Convert `_rowset_builder` from `BaseRowsetBuilder` to `CloudRowsetBuilder`
CloudRowsetBuilder* rowset_builder();

// Handle commit for empty rowset (when no data is written)
Status _commit_empty_rowset();

bthread::Mutex _mtx;
CloudStorageEngine& _engine;
std::shared_ptr<ResourceContext> _resource_ctx;
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ Status CloudFullCompaction::modify_rowsets() {
})
compaction_job->set_num_input_segments(_input_segments);
compaction_job->set_num_output_segments(_output_rowset->num_segments());
compaction_job->set_num_input_rowsets(_input_rowsets.size());
compaction_job->set_num_input_rowsets(num_input_rowsets());
compaction_job->set_num_output_rowsets(1);
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
Expand Down
123 changes: 123 additions & 0 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ bvar::Window<bvar::Adder<uint64_t>> g_cloud_ms_rpc_timeout_count_window(
"cloud_meta_mgr_rpc_timeout_qps", &g_cloud_meta_mgr_rpc_timeout_count, 30);
bvar::LatencyRecorder g_cloud_be_mow_get_dbm_lock_backoff_sleep_time(
"cloud_be_mow_get_dbm_lock_backoff_sleep_time");
bvar::Adder<uint64_t> g_cloud_version_hole_filled_count("cloud_version_hole_filled_count");

class MetaServiceProxy {
public:
Expand Down Expand Up @@ -740,6 +741,12 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
options.warmup_delta_data ||
config::enable_warmup_immediately_on_new_rowset);
}

// Fill version holes
int64_t partition_max_version =
resp.has_partition_max_version() ? resp.partition_max_version() : -1;
RETURN_IF_ERROR(fill_version_holes(tablet, partition_max_version, wlock));

tablet->last_base_compaction_success_time_ms = stats.last_base_compaction_time_ms();
tablet->last_cumu_compaction_success_time_ms = stats.last_cumu_compaction_time_ms();
tablet->set_base_compaction_cnt(stats.base_compaction_cnt());
Expand Down Expand Up @@ -1590,5 +1597,121 @@ int64_t CloudMetaMgr::get_inverted_index_file_szie(const RowsetMeta& rs_meta) {
return total_inverted_index_size;
}

Status CloudMetaMgr::fill_version_holes(CloudTablet* tablet, int64_t max_version,
std::unique_lock<std::shared_mutex>& wlock) {
if (max_version <= 0) {
return Status::OK();
}

Versions existing_versions;
for (const auto& rs : tablet->tablet_meta()->all_rs_metas()) {
existing_versions.emplace_back(rs->version());
}

// If there are no existing versions, it may be a new tablet for restore, so skip filling holes.
if (existing_versions.empty()) {
return Status::OK();
}

std::vector<RowsetSharedPtr> hole_rowsets;
// sort the existing versions in ascending order
std::sort(existing_versions.begin(), existing_versions.end(),
[](const Version& a, const Version& b) {
// simple because 2 versions are certainly not overlapping
return a.first < b.first;
});

int64_t last_version = -1;
for (const Version& version : existing_versions) {
// missing versions are those that are not in the existing_versions
if (version.first > last_version + 1) {
// there is a hole between versions
auto prev_non_hole_rowset = tablet->get_rowset_by_version(version);
for (int64_t ver = last_version + 1; ver < version.first; ++ver) {
RowsetSharedPtr hole_rowset;
RETURN_IF_ERROR(create_empty_rowset_for_hole(
tablet, ver, prev_non_hole_rowset->rowset_meta(), &hole_rowset));
hole_rowsets.push_back(hole_rowset);
}
LOG(INFO) << "Created empty rowset for version hole, from " << last_version + 1
<< " to " << version.first - 1 << " for tablet " << tablet->tablet_id();
}
last_version = version.second;
}

if (last_version + 1 <= max_version) {
LOG(INFO) << "Created empty rowset for version hole, from " << last_version + 1 << " to "
<< max_version << " for tablet " << tablet->tablet_id();
for (; last_version + 1 <= max_version; ++last_version) {
RowsetSharedPtr hole_rowset;
auto prev_non_hole_rowset = tablet->get_rowset_by_version(existing_versions.back());
RETURN_IF_ERROR(create_empty_rowset_for_hole(
tablet, last_version + 1, prev_non_hole_rowset->rowset_meta(), &hole_rowset));
hole_rowsets.push_back(hole_rowset);
}
}

if (!hole_rowsets.empty()) {
size_t hole_count = hole_rowsets.size();
tablet->add_rowsets(std::move(hole_rowsets), false, wlock, false);
g_cloud_version_hole_filled_count << hole_count;
}
return Status::OK();
}

Status CloudMetaMgr::create_empty_rowset_for_hole(CloudTablet* tablet, int64_t version,
RowsetMetaSharedPtr prev_rowset_meta,
RowsetSharedPtr* rowset) {
// Create a RowsetMeta for the empty rowset
auto rs_meta = std::make_shared<RowsetMeta>();

// Generate a deterministic rowset ID for the hole (same tablet_id + version = same rowset_id)
RowsetId hole_rowset_id;
hole_rowset_id.init(2, 0, tablet->tablet_id(), version);
rs_meta->set_rowset_id(hole_rowset_id);

// Generate a deterministic load_id for the hole rowset (same tablet_id + version = same load_id)
PUniqueId load_id;
load_id.set_hi(tablet->tablet_id());
load_id.set_lo(version);
rs_meta->set_load_id(load_id);

// Copy schema and other metadata from template
rs_meta->set_tablet_schema(prev_rowset_meta->tablet_schema());
rs_meta->set_rowset_type(prev_rowset_meta->rowset_type());
rs_meta->set_tablet_schema_hash(prev_rowset_meta->tablet_schema_hash());
rs_meta->set_resource_id(prev_rowset_meta->resource_id());

// Basic tablet information
rs_meta->set_tablet_id(tablet->tablet_id());
rs_meta->set_index_id(tablet->index_id());
rs_meta->set_partition_id(tablet->partition_id());
rs_meta->set_tablet_uid(tablet->tablet_uid());
rs_meta->set_version(Version(version, version));
rs_meta->set_txn_id(version);

rs_meta->set_num_rows(0);
rs_meta->set_total_disk_size(0);
rs_meta->set_data_disk_size(0);
rs_meta->set_index_disk_size(0);
rs_meta->set_empty(true);
rs_meta->set_num_segments(0);
rs_meta->set_segments_overlap(NONOVERLAPPING);
rs_meta->set_rowset_state(VISIBLE);
rs_meta->set_creation_time(UnixSeconds());
rs_meta->set_newest_write_timestamp(UnixSeconds());

Status s = RowsetFactory::create_rowset(nullptr, "", rs_meta, rowset);
if (!s.ok()) {
LOG_WARNING("Failed to create empty rowset for hole")
.tag("tablet_id", tablet->tablet_id())
.tag("version", version)
.error(s);
return s;
}
(*rowset)->set_hole_rowset(true);

return Status::OK();
}
#include "common/compile_check_end.h"
} // namespace doris::cloud
11 changes: 11 additions & 0 deletions be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "cloud/cloud_tablet.h"
#include "common/status.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/rowset/rowset_meta.h"
#include "util/s3_util.h"

Expand Down Expand Up @@ -148,6 +149,15 @@ class CloudMetaMgr {
void remove_delete_bitmap_update_lock(int64_t table_id, int64_t lock_id, int64_t initiator,
int64_t tablet_id);

// Fill version holes by creating empty rowsets for missing versions
Status fill_version_holes(CloudTablet* tablet, int64_t max_version,
std::unique_lock<std::shared_mutex>& wlock);

// Create an empty rowset to fill a version hole
Status create_empty_rowset_for_hole(CloudTablet* tablet, int64_t version,
RowsetMetaSharedPtr prev_rowset_meta,
RowsetSharedPtr* rowset);

private:
bool sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64_t old_max_version,
std::ranges::range auto&& rs_metas,
Expand All @@ -157,6 +167,7 @@ class CloudMetaMgr {
std::ranges::range auto&& rs_metas, const TabletStatsPB& stats,
const TabletIndexPB& idx, DeleteBitmap* delete_bitmap,
bool full_sync = false, SyncRowsetStats* sync_stats = nullptr);

void check_table_size_correctness(const RowsetMeta& rs_meta);
int64_t get_segment_file_size(const RowsetMeta& rs_meta);
int64_t get_inverted_index_file_szie(const RowsetMeta& rs_meta);
Expand Down
4 changes: 3 additions & 1 deletion be/src/cloud/cloud_rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ Status CloudRowsetBuilder::init() {

_calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor()->create_token();

RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta(), ""));
if (!_skip_writing_rowset_metadata) {
RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta(), ""));
}

_is_init = true;
return Status::OK();
Expand Down
6 changes: 6 additions & 0 deletions be/src/cloud/cloud_rowset_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,19 @@ class CloudRowsetBuilder final : public BaseRowsetBuilder {

Status set_txn_related_delete_bitmap();

void set_skip_writing_rowset_metadata(bool skip) { _skip_writing_rowset_metadata = skip; }

private:
// Convert `_tablet` from `BaseTablet` to `CloudTablet`
CloudTablet* cloud_tablet();

Status check_tablet_version_count();

CloudStorageEngine& _engine;

// whether to skip writing rowset metadata to meta service.
// This is used for empty rowset when config::skip_writing_empty_rowset_metadata is true.
bool _skip_writing_rowset_metadata = false;
};

} // namespace doris
9 changes: 7 additions & 2 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,6 @@ std::vector<RecycledRowsets> CloudTablet::recycle_cached_data(

void CloudTablet::reset_approximate_stats(int64_t num_rowsets, int64_t num_segments,
int64_t num_rows, int64_t data_size) {
_approximate_num_rowsets.store(num_rowsets, std::memory_order_relaxed);
_approximate_num_segments.store(num_segments, std::memory_order_relaxed);
_approximate_num_rows.store(num_rows, std::memory_order_relaxed);
_approximate_data_size.store(data_size, std::memory_order_relaxed);
Expand All @@ -712,10 +711,16 @@ void CloudTablet::reset_approximate_stats(int64_t num_rowsets, int64_t num_segme
if (v.second < cp) {
continue;
}

cumu_num_deltas += r->is_segments_overlapping() ? r->num_segments() : 1;
++cumu_num_rowsets;
}
// num_rowsets may be less than the size of _rs_version_map when there are some hole rowsets
// in the version map, so we use the max value to ensure that the approximate number
// of rowsets is at least the size of _rs_version_map.
// Note that this is not the exact number of rowsets, but an approximate number.
int64_t approximate_num_rowsets =
std::max(num_rowsets, static_cast<int64_t>(_rs_version_map.size()));
_approximate_num_rowsets.store(approximate_num_rowsets, std::memory_order_relaxed);
_approximate_cumu_num_rowsets.store(cumu_num_rowsets, std::memory_order_relaxed);
_approximate_cumu_num_deltas.store(cumu_num_deltas, std::memory_order_relaxed);
}
Expand Down
7 changes: 6 additions & 1 deletion be/src/cloud/cloud_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "cloud/cloud_delta_writer.h"
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
#include "olap/delta_writer.h"
#include "runtime/tablets_channel.h"

Expand Down Expand Up @@ -62,6 +63,7 @@ Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& reques
_build_tablet_to_rowidxs(request, &tablet_to_rowidxs);

std::unordered_set<int64_t> partition_ids;
std::vector<CloudDeltaWriter*> writers;
{
// add_batch may concurrency with inc_open but not under _lock.
// so need to protect it with _tablet_writers_lock.
Expand All @@ -72,8 +74,11 @@ Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& reques
return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id);
}
partition_ids.insert(tablet_writer_it->second->partition_id());
writers.push_back(static_cast<CloudDeltaWriter*>(tablet_writer_it->second.get()));
}
if (!partition_ids.empty()) {
if (config::skip_writing_empty_rowset_metadata && !writers.empty()) {
RETURN_IF_ERROR(CloudDeltaWriter::batch_init(writers));
} else if (!partition_ids.empty()) {
RETURN_IF_ERROR(_init_writers_by_partition_ids(partition_ids));
}
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ DEFINE_mInt32(meta_service_conflict_error_retry_times, "10");

DEFINE_Bool(enable_check_storage_vault, "true");

DEFINE_mBool(skip_writing_empty_rowset_metadata, "true");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

默认false 比较好


DEFINE_mInt64(warmup_tablet_replica_info_cache_ttl_sec, "600");

DEFINE_mInt64(warm_up_rowset_slow_log_ms, "1000");
Expand Down
3 changes: 3 additions & 0 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ DECLARE_mInt32(delete_bitmap_lock_expiration_seconds);

DECLARE_mInt32(get_delete_bitmap_lock_max_retry_times);

// Skip writing empty rowset metadata to meta service
DECLARE_mBool(skip_writing_empty_rowset_metadata);

// enable large txn lazy commit in meta-service `commit_txn`
DECLARE_mBool(enable_cloud_txn_lazy_commit);

Expand Down
23 changes: 21 additions & 2 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1541,8 +1541,16 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
}

// Use the storage resource of the previous rowset
ctx.storage_resource =
*DORIS_TRY(_input_rowsets.back()->rowset_meta()->remote_storage_resource());
// when multiple hole rowsets doing compaction, those rowsets may not have a storage resource.
// case:
// [0-1, 2-2, 3-3, 4-4, 5-5], 2-5 are hole rowsets.
// 0-1 current doesn't have a resource_id, so 2-5 also have no resource_id.
// Because there is no data to write, so we can skip setting the storage resource.
if (!_input_rowsets.back()->is_hole_rowset() ||
!_input_rowsets.back()->rowset_meta()->resource_id().empty()) {
ctx.storage_resource =
*DORIS_TRY(_input_rowsets.back()->rowset_meta()->remote_storage_resource());
}

ctx.txn_id = boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) &
std::numeric_limits<int64_t>::max(); // MUST be positive
Expand Down Expand Up @@ -1597,5 +1605,16 @@ void CloudCompactionMixin::update_compaction_level() {
}
}

// should skip hole rowsets, ortherwise the count will be wrong in ms
int64_t CloudCompactionMixin::num_input_rowsets() const {
int64_t count = 0;
for (const auto& r : _input_rowsets) {
if (!r->is_hole_rowset()) {
count++;
}
}
return count;
}

#include "common/compile_check_end.h"
} // namespace doris
Loading
Loading