Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang committed Aug 1, 2024
1 parent ffb0907 commit 59894ff
Show file tree
Hide file tree
Showing 14 changed files with 333 additions and 32 deletions.
4 changes: 3 additions & 1 deletion be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ Status CloudBaseCompaction::prepare_compact() {
compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4);
cloud::StartTabletJobResponse resp;
auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
if (resp.has_alter_version()) {
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
}
if (!st.ok()) {
if (resp.status().code() == cloud::STALE_TABLET_CACHE) {
// set last_sync_time to 0 to force sync tablet next time
Expand All @@ -113,7 +116,6 @@ Status CloudBaseCompaction::prepare_compact() {
<< " schema_change_alter_version=" << resp.alter_version();
std::string msg = ss.str();
LOG(WARNING) << msg;
cloud_tablet->set_alter_version(resp.alter_version());
return Status::InternalError(msg);
}
return st;
Expand Down
5 changes: 4 additions & 1 deletion be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,13 @@ Status CloudCumulativeCompaction::modify_rowsets() {

cloud::FinishTabletJobResponse resp;
auto st = _engine.meta_mgr().commit_tablet_job(job, &resp);
if (resp.has_alter_version()) {
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
}
if (!st.ok()) {
if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
cloud_tablet()->clear_cache();
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
(dynamic_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
"failed in schema change. "
Expand All @@ -288,6 +290,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
}
return st;
}

auto& stats = resp.stats();
LOG(INFO) << "tablet stats=" << stats.ShortDebugString();
{
Expand Down
43 changes: 25 additions & 18 deletions cloud/src/meta-service/meta_service_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,10 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
txn->put(job_key, job_val);
INSTANCE_LOG(INFO) << "remove compaction job tabelt_id=" << tablet_id
<< " key=" << hex(job_key);

response->set_alter_version(recorded_job.has_schema_change() &&
recorded_job.schema_change().has_alter_version()
? recorded_job.schema_change().alter_version()
: -1);
need_commit = true;
}

Expand Down Expand Up @@ -1007,9 +1010,8 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
}

// MUST check initiator to let the retried BE commit this schema_change job.
if (request->action() == FinishTabletJobRequest::COMMIT &&
(schema_change.id() != recorded_schema_change.id() ||
schema_change.initiator() != recorded_schema_change.initiator())) {
if (schema_change.id() != recorded_schema_change.id() ||
schema_change.initiator() != recorded_schema_change.initiator()) {
SS << "unmatched job id or initiator, recorded_id=" << recorded_schema_change.id()
<< " given_id=" << schema_change.id()
<< " recorded_job=" << proto_to_json(recorded_schema_change)
Expand All @@ -1031,21 +1033,22 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
{instance_id, new_table_id, new_index_id, new_partition_id, new_tablet_id});

std::string new_tablet_job_val;
TabletJobInfoPB new_recorded_job;
err = txn->get(new_tablet_job_key, &new_tablet_job_val);
if (err != TxnErrorCode::TXN_OK) {
SS << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "job not found," : "internal error,")
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
SS << "internal error,"
<< " instance_id=" << instance_id << " tablet_id=" << new_tablet_id
<< " job=" << proto_to_json(request->job()) << " err=" << err;
msg = ss.str();
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::INVALID_ARGUMENT
: cast_as<ErrCategory::READ>(err);
return;
}
TabletJobInfoPB new_recorded_job;
if (!new_recorded_job.ParseFromString(new_tablet_job_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed new tablet recorded job";
return;
} else if (err == TxnErrorCode::TXN_OK) {
if (!new_recorded_job.ParseFromString(new_tablet_job_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed new tablet recorded job";
return;
}
}

//==========================================================================
Expand All @@ -1058,11 +1061,13 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
recorded_schema_change.new_tablet_idx().tablet_id()) {
// remove schema change
recorded_job.clear_schema_change();
new_recorded_job.clear_schema_change();
auto job_val = recorded_job.SerializeAsString();
new_tablet_job_val = new_recorded_job.SerializeAsString();
txn->put(job_key, job_val);
txn->put(new_tablet_job_key, new_tablet_job_val);
if (!new_tablet_job_val.empty()) {
new_recorded_job.clear_schema_change();
new_tablet_job_val = new_recorded_job.SerializeAsString();
txn->put(new_tablet_job_key, new_tablet_job_val);
}
INSTANCE_LOG(INFO) << "remove schema_change job tablet_id=" << tablet_id
<< " key=" << hex(job_key);

Expand Down Expand Up @@ -1226,11 +1231,13 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
// remove schema_change job
//==========================================================================
recorded_job.clear_schema_change();
new_recorded_job.clear_schema_change();
auto job_val = recorded_job.SerializeAsString();
txn->put(job_key, job_val);
new_tablet_job_val = new_recorded_job.SerializeAsString();
txn->put(new_tablet_job_key, new_tablet_job_val);
if (!new_tablet_job_val.empty()) {
new_recorded_job.clear_schema_change();
new_tablet_job_val = new_recorded_job.SerializeAsString();
txn->put(new_tablet_job_key, new_tablet_job_val);
}
INSTANCE_LOG(INFO) << "remove schema_change job tablet_id=" << tablet_id
<< " key=" << hex(job_key);

Expand Down
10 changes: 7 additions & 3 deletions cloud/test/meta_service_job_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -687,8 +687,11 @@ TEST(MetaServiceJobTest, ProcessSchemaChangeArguments) {
recorded_sc->set_id("sc1");
recorded_sc->set_initiator("BE1");
job_val = recorded_job.SerializeAsString();
auto new_job_key =
job_tablet_key({instance_id, table_id, new_index_id, partition_id, new_tablet_id});
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->put(job_key, job_val);
txn->put(new_job_key, job_val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
meta_service->finish_tablet_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << res.status().msg();
Expand Down Expand Up @@ -2342,12 +2345,12 @@ TEST(MetaServiceJobTest, DoCompactionWhenSC) {
StartTabletJobResponse res;
start_compaction_job(meta_service.get(), tablet_id, "job1", "BE1", 0, 7,
TabletCompactionJobPB::CUMULATIVE, res, {7, 10});
ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_CHECK_ALTER_VERSION_FAIL);
ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_CHECK_ALTER_VERSION);
res.Clear();

start_compaction_job(meta_service.get(), tablet_id, "job1", "BE1", 0, 7,
TabletCompactionJobPB::BASE, res, {0, 10});
ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_CHECK_ALTER_VERSION_FAIL);
ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_CHECK_ALTER_VERSION);
res.Clear();

start_compaction_job(meta_service.get(), tablet_id, "job1", "BE1", 0, 7,
Expand Down Expand Up @@ -2499,7 +2502,8 @@ TEST(MetaServiceJobTest, CancelSC) {
FinishTabletJobResponse finish_res;
finish_schema_change_job(meta_service.get(), tablet_id, new_tablet_id, "job_sc", "BE1", {},
finish_res, FinishTabletJobRequest::ABORT);
ASSERT_EQ(finish_res.status().code(), MetaServiceCode::OK);
ASSERT_NE(finish_res.status().msg().find("unmatched job id or initiator"),
std::string::npos);
}
{
std::unique_ptr<Transaction> txn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ protected void onCancel() {
Long rollupTabletId = tabletEntry.getKey();
Long baseTabletId = tabletEntry.getValue();
((CloudInternalCatalog) Env.getCurrentInternalCatalog())
.removeSchemaChangeJob(dbId, tableId, baseIndexId, rollupIndexId,
.removeSchemaChangeJob(dbId, tableId, baseIndexId, rollupIndexId,
partitionId, baseTabletId, rollupTabletId);
}
LOG.info("Cancel RollupJob. Remove SchemaChangeJob in ms." +
"dbId:{}, tableId:{}, rollupIndexId: {} partitionId:{}. tabletSize:{}",
LOG.info("Cancel RollupJob. Remove SchemaChangeJob in ms."
+ "dbId:{}, tableId:{}, rollupIndexId: {} partitionId:{}. tabletSize:{}",
dbId, tableId, rollupIndexId, partitionId, rollupTabletIdToBaseTabletId.size());
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,11 @@ protected void onCancel() {
Long shadowTabletId = entry.getKey();
Long originTabletId = entry.getValue();
((CloudInternalCatalog) Env.getCurrentInternalCatalog())
.removeSchemaChangeJob(dbId, tableId, originIndexId, shadowIndexId,
.removeSchemaChangeJob(dbId, tableId, originIndexId, shadowIndexId,
partitionId, originTabletId, shadowTabletId);
}
LOG.info("Cancel SchemaChange. Remove SchemaChangeJob in ms." +
"dbId:{}, tableId:{}, originIndexId:{}, partitionId:{}. tabletSize:{}",
LOG.info("Cancel SchemaChange. Remove SchemaChangeJob in ms."
+ "dbId:{}, tableId:{}, originIndexId:{}, partitionId:{}. tabletSize:{}",
dbId, tableId, originIndexId, partitionId, shadowTabletIdToOriginTabletId.size());
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -874,8 +874,8 @@ public void removeSchemaChangeJob(long dbId, long tableId, long indexId, long ne
newtabletIndexPBBuilder.setTabletId(newTabletId);
final Cloud.TabletIndexPB newtabletIndex = newtabletIndexPBBuilder.build();
schemaChangeJobPBBuilder.setNewTabletIdx(newtabletIndex);
final Cloud.TabletSchemaChangeJobPB tabletSchemaChangeJobPb =
schemaChangeJobPBBuilder.build();
final Cloud.TabletSchemaChangeJobPB tabletSchemaChangeJobPb =
schemaChangeJobPBBuilder.build();

tabletJobInfoPBBuilder.setSchemaChange(tabletSchemaChangeJobPb);

Expand Down
2 changes: 1 addition & 1 deletion gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1340,7 +1340,7 @@ enum MetaServiceCode {
JOB_ALREADY_SUCCESS = 5002;
ROUTINE_LOAD_DATA_INCONSISTENT = 5003;
ROUTINE_LOAD_PROGRESS_NOT_FOUND = 5004;
JOB_CHECK_ALTER_VERSION_FAIL = 5005;
JOB_CHECK_ALTER_VERSION = 5005;

// Rate limit
MAX_QPS_LIMIT = 6001;
Expand Down
1 change: 1 addition & 0 deletions regression-test/pipeline/cloud_p0/conf/be_custom.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ save_load_error_log_to_s3 = true
enable_stream_load_record = true
stream_load_record_batch_size = 500
webserver_num_workers = 128
enable_new_tablet_do_compaction = true
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ suite('test_schema_change_with_compaction10') {
options.cloudMode = true
options.enableDebugPoints()
options.beConfigs += [ "enable_java_support=false" ]
options.beConfigs += [ "enable_new_tablet_do_compaction=true" ]
options.beConfigs += [ "disable_auto_compaction=true" ]
options.beNum = 1
docker(options) {
Expand Down
Loading

0 comments on commit 59894ff

Please sign in to comment.