Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ Status CloudBaseCompaction::modify_rowsets() {
// the tablet to be unable to synchronize the rowset meta changes generated by cumu compaction.
cloud_tablet()->set_base_compaction_cnt(stats.base_compaction_cnt());
if (output_rowset_delete_bitmap) {
_tablet->tablet_meta()->delete_bitmap().merge(*output_rowset_delete_bitmap);
_tablet->tablet_meta()->delete_bitmap()->merge(*output_rowset_delete_bitmap);
}
if (stats.cumulative_compaction_cnt() >= cloud_tablet()->cumulative_compaction_cnt()) {
cloud_tablet()->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
Expand Down
10 changes: 5 additions & 5 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
cloud_tablet()->set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
cloud_tablet()->set_cumulative_layer_point(stats.cumulative_point());
if (output_rowset_delete_bitmap) {
_tablet->tablet_meta()->delete_bitmap().merge(*output_rowset_delete_bitmap);
_tablet->tablet_meta()->delete_bitmap()->merge(*output_rowset_delete_bitmap);
}
if (stats.base_compaction_cnt() >= cloud_tablet()->base_compaction_cnt()) {
cloud_tablet()->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
Expand Down Expand Up @@ -416,7 +416,7 @@ Status CloudCumulativeCompaction::process_old_version_delete_bitmap() {
rowset->rowset_id().to_string();
DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0};
DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id, pre_max_version};
auto d = _tablet->tablet_meta()->delete_bitmap().get_agg(
auto d = _tablet->tablet_meta()->delete_bitmap()->get_agg(
{rowset->rowset_id(), seg_id, pre_max_version});
to_remove_vec.emplace_back(std::make_tuple(_tablet->tablet_id(), start, end));
if (d->isEmpty()) {
Expand All @@ -440,10 +440,10 @@ Status CloudCumulativeCompaction::process_old_version_delete_bitmap() {
_input_rowsets.back()->end_version());
for (auto it = new_delete_bitmap->delete_bitmap.begin();
it != new_delete_bitmap->delete_bitmap.end(); it++) {
_tablet->tablet_meta()->delete_bitmap().set(it->first, it->second);
_tablet->tablet_meta()->delete_bitmap()->set(it->first, it->second);
}
_tablet->tablet_meta()->delete_bitmap().add_to_remove_queue(version.to_string(),
to_remove_vec);
_tablet->tablet_meta()->delete_bitmap()->add_to_remove_queue(version.to_string(),
to_remove_vec);
DBUG_EXECUTE_IF(
"CloudCumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets",
{ static_cast<CloudTablet*>(_tablet.get())->delete_expired_stale_rowsets(); });
Expand Down
10 changes: 6 additions & 4 deletions be/src/cloud/cloud_full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "common/status.h"
#include "cpp/sync_point.h"
#include "gen_cpp/cloud.pb.h"
#include "olap/base_tablet.h"
#include "olap/compaction.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/tablet_meta.h"
Expand Down Expand Up @@ -273,7 +274,7 @@ Status CloudFullCompaction::modify_rowsets() {
cloud_tablet()->set_base_compaction_cnt(stats.base_compaction_cnt());
cloud_tablet()->set_cumulative_layer_point(stats.cumulative_point());
if (output_rowset_delete_bitmap) {
_tablet->tablet_meta()->delete_bitmap().merge(*output_rowset_delete_bitmap);
_tablet->tablet_meta()->delete_bitmap()->merge(*output_rowset_delete_bitmap);
}
if (stats.cumulative_compaction_cnt() >= cloud_tablet()->cumulative_compaction_cnt()) {
cloud_tablet()->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
Expand Down Expand Up @@ -340,8 +341,9 @@ Status CloudFullCompaction::_cloud_full_compaction_update_delete_bitmap(int64_t
int64_t max_version = cloud_tablet()->max_version().second;
DCHECK(max_version >= _output_rowset->version().second);
if (max_version > _output_rowset->version().second) {
RETURN_IF_ERROR(cloud_tablet()->capture_consistent_rowsets_unlocked(
{_output_rowset->version().second + 1, max_version}, &tmp_rowsets));
auto ret = DORIS_TRY(cloud_tablet()->capture_consistent_rowsets_unlocked(
{_output_rowset->version().second + 1, max_version}, CaptureRowsetOps {}));
tmp_rowsets = std::move(ret.rowsets);
}
for (const auto& it : tmp_rowsets) {
int64_t cur_version = it->rowset_meta()->start_version();
Expand Down Expand Up @@ -372,7 +374,7 @@ Status CloudFullCompaction::_cloud_full_compaction_update_delete_bitmap(int64_t
.tag("input_segments", _input_segments)
.tag("input_rowsets_total_size", _input_rowsets_total_size)
.tag("update_bitmap_size", delete_bitmap->delete_bitmap.size());
_tablet->tablet_meta()->delete_bitmap().merge(*delete_bitmap);
_tablet->tablet_meta()->delete_bitmap()->merge(*delete_bitmap);
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
.error(st);
return st;
}
tablet->tablet_meta()->delete_bitmap().merge(delete_bitmap);
tablet->tablet_meta()->delete_bitmap()->merge(delete_bitmap);
if (config::enable_mow_verbose_log && !resp.rowset_meta().empty() &&
delete_bitmap.cardinality() > 0) {
std::vector<std::string> new_rowset_msgs;
Expand Down
26 changes: 13 additions & 13 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_tablet_mgr.h"
#include "common/status.h"
#include "olap/base_tablet.h"
#include "olap/delete_handler.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
Expand Down Expand Up @@ -186,7 +187,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS;
reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
reader_context.delete_bitmap = &_base_tablet->tablet_meta()->delete_bitmap();
reader_context.delete_bitmap = _base_tablet->tablet_meta()->delete_bitmap();
reader_context.version = Version(0, start_resp.alter_version());

for (auto& split : rs_splits) {
Expand Down Expand Up @@ -457,7 +458,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
.tag("alter_version", alter_version);
RETURN_IF_ERROR(_cloud_storage_engine.register_compaction_stop_token(_new_tablet, initiator));
TabletMetaSharedPtr tmp_meta = std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta()));
tmp_meta->delete_bitmap().delete_bitmap.clear();
tmp_meta->delete_bitmap()->delete_bitmap.clear();
std::shared_ptr<CloudTablet> tmp_tablet =
std::make_shared<CloudTablet>(_cloud_storage_engine, tmp_meta);
{
Expand All @@ -466,22 +467,21 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
}

// step 1, process incremental rowset without delete bitmap update lock
std::vector<RowsetSharedPtr> incremental_rowsets;
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
int64_t max_version = tmp_tablet->max_version().second;
LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
<< "incremental rowsets without lock, version: " << start_calc_delete_bitmap_version
<< "-" << max_version << " new_table_id: " << _new_tablet->tablet_id();
if (max_version >= start_calc_delete_bitmap_version) {
RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
{start_calc_delete_bitmap_version, max_version}, &incremental_rowsets));
auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked(
{start_calc_delete_bitmap_version, max_version}, CaptureRowsetOps {}));
DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock",
DBUG_BLOCK);
{
std::unique_lock wlock(tmp_tablet->get_header_lock());
tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
}
for (auto rowset : incremental_rowsets) {
for (auto rowset : ret.rowsets) {
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
}
}
Expand All @@ -497,15 +497,14 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
<< "incremental rowsets with lock, version: " << max_version + 1 << "-"
<< new_max_version << " new_tablet_id: " << _new_tablet->tablet_id();
std::vector<RowsetSharedPtr> new_incremental_rowsets;
if (new_max_version > max_version) {
RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
{max_version + 1, new_max_version}, &new_incremental_rowsets));
auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked(
{max_version + 1, new_max_version}, CaptureRowsetOps {}));
{
std::unique_lock wlock(tmp_tablet->get_header_lock());
tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
}
for (auto rowset : new_incremental_rowsets) {
for (auto rowset : ret.rowsets) {
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
}
}
Expand All @@ -522,13 +521,14 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
}
});

auto& delete_bitmap = tmp_tablet->tablet_meta()->delete_bitmap();
auto delete_bitmap = tmp_tablet->tablet_meta()->delete_bitmap();

// step4, store delete bitmap
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().update_delete_bitmap(
*_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator, &delete_bitmap));
*_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator, delete_bitmap.get()));

_new_tablet->tablet_meta()->delete_bitmap() = delete_bitmap;
auto original_dbm = _new_tablet->tablet_meta()->delete_bitmap();
*original_dbm = std::move(*delete_bitmap);
return Status::OK();
}

Expand Down
45 changes: 7 additions & 38 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "common/logging.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "olap/base_tablet.h"
#include "olap/compaction.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/olap_define.h"
Expand Down Expand Up @@ -69,23 +70,6 @@ bool CloudTablet::exceed_version_limit(int32_t limit) {
return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit;
}

Status CloudTablet::capture_consistent_rowsets_unlocked(
const Version& spec_version, std::vector<RowsetSharedPtr>* rowsets) const {
Versions version_path;
auto st = _timestamped_version_tracker.capture_consistent_versions(spec_version, &version_path);
if (!st.ok()) {
// Check no missed versions or req version is merged
auto missed_versions = get_missed_versions(spec_version.second);
if (missed_versions.empty()) {
st.set_code(VERSION_ALREADY_MERGED); // Reset error code
}
st.append(" tablet_id=" + std::to_string(tablet_id()));
return st;
}
VLOG_DEBUG << "capture consitent versions: " << version_path;
return _capture_consistent_rowsets_unlocked(version_path, rowsets);
}

std::string CloudTablet::tablet_path() const {
return "";
}
Expand All @@ -97,25 +81,10 @@ Status CloudTablet::capture_rs_readers(const Version& spec_version,
LOG_WARNING("CloudTablet.capture_rs_readers.return e-230").tag("tablet_id", tablet_id());
return Status::Error<false>(-230, "injected error");
});
Versions version_path;
std::shared_lock rlock(_meta_lock);
auto st = _timestamped_version_tracker.capture_consistent_versions(spec_version, &version_path);
if (!st.ok()) {
rlock.unlock(); // avoid logging in lock range
// Check no missed versions or req version is merged
auto missed_versions = get_missed_versions(spec_version.second);
if (missed_versions.empty()) {
st.set_code(VERSION_ALREADY_MERGED); // Reset error code
st.append(" versions are already compacted, ");
}
st.append(" tablet_id=" + std::to_string(tablet_id()));
// clang-format off
LOG(WARNING) << st << '\n' << [this]() { std::string json; get_compaction_status(&json); return json; }();
// clang-format on
return st;
}
VLOG_DEBUG << "capture consitent versions: " << version_path;
return capture_rs_readers_unlocked(version_path, rs_splits);
*rs_splits = DORIS_TRY(capture_rs_readers_unlocked(
spec_version, CaptureRowsetOps {.skip_missing_versions = skip_missing_version}));
return Status::OK();
}

Status CloudTablet::merge_rowsets_schema() {
Expand Down Expand Up @@ -461,7 +430,7 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
}
_reconstruct_version_tracker_if_necessary();
}
_tablet_meta->delete_bitmap().remove_stale_delete_bitmap_from_queue(version_to_delete);
_tablet_meta->delete_bitmap()->remove_stale_delete_bitmap_from_queue(version_to_delete);
recycle_cached_data(expired_rowsets);
if (config::enable_mow_verbose_log) {
LOG_INFO("finish delete_expired_stale_rowset for tablet={}", tablet_id());
Expand Down Expand Up @@ -967,7 +936,7 @@ Status CloudTablet::calc_delete_bitmap_for_compaction(
std::size_t missed_rows_size = 0;
calc_compaction_output_rowset_delete_bitmap(
input_rowsets, rowid_conversion, 0, version.second + 1, missed_rows.get(),
location_map.get(), tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
location_map.get(), *tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
if (missed_rows) {
missed_rows_size = missed_rows->size();
if (!allow_delete_in_cumu_compaction) {
Expand Down Expand Up @@ -1002,7 +971,7 @@ Status CloudTablet::calc_delete_bitmap_for_compaction(

calc_compaction_output_rowset_delete_bitmap(
input_rowsets, rowid_conversion, version.second, UINT64_MAX, missed_rows.get(),
location_map.get(), tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
location_map.get(), *tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
int64_t t4 = MonotonicMicros();
if (location_map) {
RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
Expand Down
3 changes: 0 additions & 3 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ class CloudTablet final : public BaseTablet {
Status capture_rs_readers(const Version& spec_version, std::vector<RowSetSplits>* rs_splits,
bool skip_missing_version) override;

Status capture_consistent_rowsets_unlocked(
const Version& spec_version, std::vector<RowsetSharedPtr>* rowsets) const override;

size_t tablet_footprint() override {
return _approximate_data_size.load(std::memory_order_relaxed);
}
Expand Down
20 changes: 13 additions & 7 deletions be/src/cloud/cloud_tablet_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "olap/lru_cache.h"
#include "runtime/memory/cache_policy.h"
Expand Down Expand Up @@ -154,7 +155,7 @@ void set_tablet_access_time_ms(CloudTablet* tablet) {
Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_id, bool warmup_data,
bool sync_delete_bitmap,
SyncRowsetStats* sync_stats,
bool force_use_cache) {
bool local_only) {
// LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr`
class Value : public LRUCacheValueBase {
public:
Expand All @@ -172,12 +173,17 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i
CacheKey key(tablet_id_str);
auto* handle = _cache->lookup(key);

if (handle == nullptr && force_use_cache) {
return ResultError(
Status::InternalError("failed to get cloud tablet from cache {}", tablet_id));
}

if (handle == nullptr) {
if (local_only) {
LOG(INFO) << "tablet=" << tablet_id
<< "does not exists in local tablet cache, because param local_only=true, "
"treat it as an error";
return ResultError(Status::InternalError(
"tablet={} does not exists in local tablet cache, because param "
"local_only=true, "
"treat it as an error",
tablet_id));
}
if (sync_stats) {
++sync_stats->tablet_meta_cache_miss;
}
Expand Down Expand Up @@ -475,7 +481,7 @@ void CloudTabletMgr::get_topn_tablet_delete_bitmap_score(
auto t = tablet_wk.lock();
if (!t) return;
uint64_t delete_bitmap_count =
t.get()->tablet_meta()->delete_bitmap().get_delete_bitmap_count();
t.get()->tablet_meta()->delete_bitmap()->get_delete_bitmap_count();
total_delete_map_count += delete_bitmap_count;
if (delete_bitmap_count > *max_delete_bitmap_score) {
max_delete_bitmap_score_tablet_id = t->tablet_id();
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablet_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class CloudTabletMgr {
Result<std::shared_ptr<CloudTablet>> get_tablet(int64_t tablet_id, bool warmup_data = false,
bool sync_delete_bitmap = true,
SyncRowsetStats* sync_stats = nullptr,
bool force_use_cache = false);
bool local_only = false);

void erase_tablet(int64_t tablet_id);

Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1511,6 +1511,7 @@ DEFINE_mBool(enable_compaction_pause_on_high_memory, "true");

DEFINE_mBool(enable_calc_delete_bitmap_between_segments_concurrently, "false");

DEFINE_mBool(enable_fetch_rowsets_from_peer_replicas, "false");
// the max length of segments key bounds, in bytes
// ATTENTION: as long as this conf has ever been enabled, cluster downgrade and backup recovery will no longer be supported.
DEFINE_mInt32(segments_key_bounds_truncation_threshold, "-1");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1585,6 +1585,7 @@ DECLARE_mBool(enable_compaction_pause_on_high_memory);

DECLARE_mBool(enable_calc_delete_bitmap_between_segments_concurrently);

DECLARE_mBool(enable_fetch_rowsets_from_peer_replicas);
// the max length of segments key bounds, in bytes
// ATTENTION: as long as this conf has ever been enabled, cluster downgrade and backup recovery will no longer be supported.
DECLARE_mInt32(segments_key_bounds_truncation_threshold);
Expand Down
Loading
Loading