Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang committed Jul 29, 2024
1 parent 1cee3af commit c1da59a
Show file tree
Hide file tree
Showing 29 changed files with 1,201 additions and 40 deletions.
16 changes: 12 additions & 4 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,20 @@ Status CloudBaseCompaction::prepare_compact() {
// tablet not found
cloud_tablet()->clear_cache();
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION_FAIL) {
(dynamic_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
auto* cloud_tablet = (static_cast<CloudTablet*>(_tablet.get()));
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
"failed in schema change. "
"failed in schema change. The input version end must "
"less than or equal to alter_version."
"current alter version in BE is not correct."
"input_version_start="
<< compaction_job->input_versions(0)
<< " input_version_end=" << compaction_job->input_versions(1)
<< " current alter_version=" << cloud_tablet->alter_version()
<< " schema_change_alter_version=" << resp.alter_version();
std::string msg = ss.str();
LOG(WARNING) << msg;
cloud_tablet->set_alter_version(resp.alter_version());
return Status::InternalError(msg);
}
return st;
Expand Down Expand Up @@ -329,16 +333,20 @@ Status CloudBaseCompaction::modify_rowsets() {
if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
cloud_tablet()->clear_cache();
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION_FAIL) {
(dynamic_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
auto* cloud_tablet = (static_cast<CloudTablet*>(_tablet.get()));
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
"failed in schema change. "
"failed in schema change. The input version end must "
"less than or equal to alter_version."
"current alter version in BE is not correct."
"input_version_start="
<< compaction_job->input_versions(0)
<< " input_version_end=" << compaction_job->input_versions(1)
<< " current alter_version=" << cloud_tablet->alter_version()
<< " schema_change_alter_version=" << resp.alter_version();
std::string msg = ss.str();
LOG(WARNING) << msg;
cloud_tablet->set_alter_version(resp.alter_version());
return Status::InternalError(msg);
}
return st;
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ Status CloudCumulativeCompaction::prepare_compact() {

compaction_job->add_input_versions(_input_rowsets.front()->start_version());
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
// Set input version range to let meta-service judge version range conflict
compaction_job->set_judge_input_versions_range(config::enable_parallel_cumu_compaction);
// Set input version range to let meta-service check version range conflict
compaction_job->set_check_input_versions_range(config::enable_parallel_cumu_compaction);
cloud::StartTabletJobResponse resp;
st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
if (!st.ok()) {
Expand Down
18 changes: 10 additions & 8 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,20 +154,22 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
};
if (_version != max_version + 1 || should_sync_rowsets_produced_by_compaction()) {
auto sync_st = tablet->sync_rowsets();
if (sync_st.is<ErrorCode::INVALID_TABLET_STATE>()) [[unlikely]] {
_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
"tablet_id: "
<< _tablet_id << " txn_id: " << _transaction_id
<< ", request_version=" << _version;
return sync_st;
}
if (!sync_st.ok()) {
LOG(WARNING) << "failed to sync rowsets. tablet_id=" << _tablet_id
<< ", txn_id=" << _transaction_id << ", status=" << sync_st;
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, sync_st);
return sync_st;
}
if (tablet->tablet_state() != TABLET_RUNNING) [[unlikely]] {
_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
"tablet_id: "
<< _tablet_id << " txn_id: " << _transaction_id
<< ", request_version=" << _version;
return Status::Error<ErrorCode::INVALID_TABLET_STATE>(
"invalid tablet state {}. tablet_id={}", tablet->tablet_state(),
tablet->tablet_id());
}
}
auto sync_rowset_time_us = MonotonicMicros() - t2;
max_version = tablet->max_version_unlocked();
Expand Down
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,8 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
tablet->last_sync_time_s = now;

if (tablet->enable_unique_key_merge_on_write()) {
if (tablet->enable_unique_key_merge_on_write() &&
tablet->tablet_state() == TABLET_RUNNING) {
DeleteBitmap delete_bitmap(tablet_id);
int64_t old_max_version = req.start_version() - 1;
auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(),
Expand Down
53 changes: 53 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ Status CloudTablet::capture_rs_readers(const Version& spec_version,
// There are only two tablet_states RUNNING and NOT_READY in cloud mode
// This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS.
Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) {
RETURN_IF_ERROR(sync_if_not_running());

if (query_version > 0) {
std::shared_lock rlock(_meta_lock);
if (_max_version >= query_version) {
Expand All @@ -133,6 +135,57 @@ Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data)
return st;
}

// Sync tablet meta and all rowset meta if not running.
// This could happen when BE didn't finish schema change job and another BE committed this schema change job.
// It should be a quite rare situation.
Status CloudTablet::sync_if_not_running() {
if (tablet_state() == TABLET_RUNNING) {
return Status::OK();
}

// Serially execute sync to reduce unnecessary network overhead
std::lock_guard lock(_sync_meta_lock);

{
std::shared_lock rlock(_meta_lock);
if (tablet_state() == TABLET_RUNNING) {
return Status::OK();
}
}

TabletMetaSharedPtr tablet_meta;
auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta);
if (!st.ok()) {
if (st.is<ErrorCode::NOT_FOUND>()) {
clear_cache();
}
return st;
}

if (tablet_meta->tablet_state() != TABLET_RUNNING) [[unlikely]] {
// MoW may go to here when load while schema change
return Status::OK();
}

TimestampedVersionTracker empty_tracker;
{
std::lock_guard wlock(_meta_lock);
RETURN_IF_ERROR(set_tablet_state(TABLET_RUNNING));
_rs_version_map.clear();
_stale_rs_version_map.clear();
std::swap(_timestamped_version_tracker, empty_tracker);
_tablet_meta->clear_rowsets();
_tablet_meta->clear_stale_rowset();
_max_version = -1;
}

st = _engine.meta_mgr().sync_tablet_rowsets(this);
if (st.is<ErrorCode::NOT_FOUND>()) {
clear_cache();
}
return st;
}

TabletSchemaSPtr CloudTablet::merged_tablet_schema() const {
std::shared_lock rdlock(_meta_lock);
TabletSchemaSPtr target_schema;
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ class CloudTablet final : public BaseTablet {

static void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets);

Status sync_if_not_running();

CloudStorageEngine& _engine;

// this mutex MUST ONLY be used when sync meta
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,6 @@ DEFINE_mBool(save_load_error_log_to_s3, "false");

DEFINE_mInt32(sync_load_for_tablets_thread, "32");

DEFINE_mBool(enable_new_tablet_do_compaction, "true");

} // namespace doris::config
1 change: 1 addition & 0 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ DECLARE_mInt32(tablet_sync_interval_s);

// Cloud compaction config
DECLARE_mInt64(min_compaction_failure_interval_ms);
DECLARE_mBool(enable_new_tablet_do_compaction);
// For cloud read/write separate mode
DECLARE_mInt64(base_compaction_freeze_interval_s);
DECLARE_mInt64(cu_compaction_freeze_interval_s);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1437,7 +1437,7 @@ Status BaseTablet::update_delete_bitmap_without_lock(
<< ", rnd:" << rnd << ", percent: " << percent;
}
});
int64_t cur_version = rowset->end_version();
int64_t cur_version = rowset->start_version();
std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_IF_ERROR(std::dynamic_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));

Expand Down
1 change: 0 additions & 1 deletion be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <memory>
#include <mutex>
#include <roaring/roaring.hh>
#include <thread>
#include <tuple>
#include <utility>

Expand Down
49 changes: 33 additions & 16 deletions cloud/src/meta-service/meta_service_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,18 @@ namespace doris::cloud {
static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1;
static constexpr int SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID = -2;

// check compaction input_versions are valid during schema change.
// If the schema change job doesnt have alter version, it dont need to check
// because the schema change job is come from old version BE.
// we will check they in prepare compaction and commit compaction.
// 1. When if base compaction, we need to guarantee the end version
// is less than or equal to alter_version.
// 2. When if cu compaction, we need to guarantee the start version
// is large than alter_version.
bool check_compaction_input_verions(const TabletCompactionJobPB& compaction,
const TabletJobInfoPB& job_pb) {
if (!job_pb.schema_change().has_alter_version()) return true;
if (!job_pb.has_schema_change() || !job_pb.schema_change().has_alter_version()) return true;
// compaction need to know [start_version, end_version]
DCHECK_EQ(compaction.input_versions_size(), 2) << proto_to_json(compaction);
DCHECK_LE(compaction.input_versions(0), compaction.input_versions(1))
<< proto_to_json(compaction);
Expand Down Expand Up @@ -138,7 +147,7 @@ void start_compaction_job(MetaServiceCode& code, std::string& msg, std::stringst
}
while (err == TxnErrorCode::TXN_OK) {
job_pb.ParseFromString(job_val);
if (job_pb.has_schema_change() && !check_compaction_input_verions(compaction, job_pb)) {
if (!check_compaction_input_verions(compaction, job_pb)) {
SS << "Check compaction input versions failed in schema change. input_version_start="
<< compaction.input_versions(0)
<< " input_version_end=" << compaction.input_versions(1)
Expand Down Expand Up @@ -178,8 +187,10 @@ void start_compaction_job(MetaServiceCode& code, std::string& msg, std::stringst
// for MOW table, so priority should be given to performing full
// compaction operations and canceling other types of compaction.
compactions.Clear();
} else if (!compaction.has_judge_input_versions_range() ||
!compaction.judge_input_versions_range()) {
} else if ((!compaction.has_check_input_versions_range() &&
compaction.input_versions().empty()) ||
(compaction.has_check_input_versions_range() &&
!compaction.check_input_versions_range())) {
// Unknown input version range, doesn't support parallel compaction of same type
for (auto& c : compactions) {
if (c.type() != compaction.type() && c.type() != TabletCompactionJobPB::FULL)
Expand Down Expand Up @@ -997,8 +1008,9 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
}

// MUST check initiator to let the retried BE commit this schema_change job.
if (schema_change.id() != recorded_schema_change.id() ||
schema_change.initiator() != recorded_schema_change.initiator()) {
if (request->action() == FinishTabletJobRequest::COMMIT &&
(schema_change.id() != recorded_schema_change.id() ||
schema_change.initiator() != recorded_schema_change.initiator())) {
SS << "unmatched job id or initiator, recorded_id=" << recorded_schema_change.id()
<< " given_id=" << schema_change.id()
<< " recorded_job=" << proto_to_json(recorded_schema_change)
Expand All @@ -1020,16 +1032,21 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
// Abort
//==========================================================================
if (request->action() == FinishTabletJobRequest::ABORT) {
// TODO(cyx)
// remove schema change
recorded_job.clear_schema_change();
auto job_val = recorded_job.SerializeAsString();
txn->put(job_key, job_val);
txn->remove(new_tablet_job_key);
INSTANCE_LOG(INFO) << "remove schema_change job tablet_id=" << tablet_id
<< " key=" << hex(job_key);

need_commit = true;
if (schema_change.new_tablet_idx().index_id() ==
recorded_schema_change.new_tablet_idx().index_id() &&
schema_change.new_tablet_idx().tablet_id() ==
recorded_schema_change.new_tablet_idx().tablet_id()) {
// TODO(cyx)
// remove schema change
recorded_job.clear_schema_change();
auto job_val = recorded_job.SerializeAsString();
txn->put(job_key, job_val);
txn->remove(new_tablet_job_key);
INSTANCE_LOG(INFO) << "remove schema_change job tablet_id=" << tablet_id
<< " key=" << hex(job_key);

need_commit = true;
}
return;
}

Expand Down
2 changes: 1 addition & 1 deletion cloud/test/meta_service_job_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void start_compaction_job(MetaService* meta_service, int64_t tablet_id, const st
if (input_version.first > 0 && input_version.second > 0) {
compaction->add_input_versions(input_version.first);
compaction->add_input_versions(input_version.second);
compaction->set_judge_input_versions_range(true);
compaction->set_check_input_versions_range(true);
}
meta_service->start_tablet_job(&cntl, &req, &res, nullptr);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,15 @@ protected void onCancel() {
Long partitionId = partitionEntry.getKey();
Map<Long, Long> rollupTabletIdToBaseTabletId = partitionEntry.getValue();
for (Map.Entry<Long, Long> tabletEntry : rollupTabletIdToBaseTabletId.entrySet()) {
Long rollupTabletId = tabletEntry.getKey();
Long baseTabletId = tabletEntry.getValue();
((CloudInternalCatalog) Env.getCurrentInternalCatalog())
.removeSchemaChangeJob(dbId, tableId, baseIndexId, partitionId, baseTabletId);
.removeSchemaChangeJob(dbId, tableId, baseIndexId, rollupIndexId,
partitionId, baseTabletId, rollupTabletId);
}
LOG.info("Cancel RollupJob. Remove SchemaChangeJob in ms." +
"dbId:{}, tableId:{}, rollupIndexId: {} partitionId:{}. tabletSize:{}",
dbId, tableId, rollupIndexId, partitionId, rollupTabletIdToBaseTabletId.size());
}
break;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,15 @@ protected void onCancel() {
Long originIndexId = indexIdMap.get(shadowIndexId);
Map<Long, Long> shadowTabletIdToOriginTabletId = data.getValue();
for (Map.Entry<Long, Long> entry : shadowTabletIdToOriginTabletId.entrySet()) {
Long shadowTabletId = entry.getKey();
Long originTabletId = entry.getValue();
((CloudInternalCatalog) Env.getCurrentInternalCatalog())
.removeSchemaChangeJob(dbId, tableId, originIndexId, partitionId, originTabletId);
.removeSchemaChangeJob(dbId, tableId, originIndexId, shadowIndexId,
partitionId, originTabletId, shadowTabletId);
}
LOG.info("Cancel SchemaChange. Remove SchemaChangeJob in ms." +
"dbId:{}, tableId:{}, originIndexId:{}, partitionId:{}. tabletSize:{}",
dbId, tableId, originIndexId, partitionId, shadowTabletIdToOriginTabletId.size());
}
break;
} catch (Exception e) {
Expand Down
Loading

0 comments on commit c1da59a

Please sign in to comment.