Skip to content

Commit d985f92

Browse files
branch-3.0: [fix](cloud-schema-change) Make SC tablet job abort logic really work #50908 (#51203)
Cherry-picked from #50908 Co-authored-by: Siyang Tang <tangsiyang@selectdb.com>
1 parent 2576d6b commit d985f92

File tree

7 files changed

+119
-7
lines changed

7 files changed

+119
-7
lines changed

be/src/cloud/cloud_schema_change_job.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,13 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
122122
}
123123
return st;
124124
}
125+
DBUG_EXECUTE_IF("CloudSchemaChangeJob::process_alter_tablet.alter_fail", {
126+
auto res =
127+
Status::InternalError("inject alter tablet failed. base_tablet={}, new_tablet={}",
128+
request.base_tablet_id, request.new_tablet_id);
129+
LOG(WARNING) << "inject error. res=" << res;
130+
return res;
131+
});
125132
if (request.alter_version > 1) {
126133
// [0-1] is a placeholder rowset, no need to convert
127134
RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2, start_resp.alter_version()},

cloud/src/meta-service/meta_service_job.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1086,7 +1086,9 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
10861086

10871087
// MUST check initiator to let the retried BE commit this schema_change job.
10881088
if (schema_change.id() != recorded_schema_change.id() ||
1089-
schema_change.initiator() != recorded_schema_change.initiator()) {
1089+
(schema_change.initiator() != recorded_schema_change.initiator() &&
1090+
request->action() != FinishTabletJobRequest::ABORT)) {
1091+
// abort is from FE, so initiator differ from the original one, just skip this check
10901092
SS << "unmatched job id or initiator, recorded_id=" << recorded_schema_change.id()
10911093
<< " given_id=" << schema_change.id()
10921094
<< " recorded_job=" << proto_to_json(recorded_schema_change)

fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ protected void onCancel() {
137137
Long rollupTabletId = tabletEntry.getKey();
138138
Long baseTabletId = tabletEntry.getValue();
139139
((CloudInternalCatalog) Env.getCurrentInternalCatalog())
140-
.removeSchemaChangeJob(dbId, tableId, baseIndexId, rollupIndexId,
140+
.removeSchemaChangeJob(jobId, dbId, tableId, baseIndexId, rollupIndexId,
141141
partitionId, baseTabletId, rollupTabletId);
142142
}
143143
LOG.info("Cancel RollupJob. Remove SchemaChangeJob in ms."

fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,8 @@ protected void onCancel() {
140140
Long shadowTabletId = entry.getKey();
141141
Long originTabletId = entry.getValue();
142142
((CloudInternalCatalog) Env.getCurrentInternalCatalog())
143-
.removeSchemaChangeJob(dbId, tableId, originIndexId, shadowIndexId,
144-
partitionId, originTabletId, shadowTabletId);
143+
.removeSchemaChangeJob(jobId, dbId, tableId, originIndexId, shadowIndexId,
144+
partitionId, originTabletId, shadowTabletId);
145145
}
146146
LOG.info("Cancel SchemaChange. Remove SchemaChangeJob in ms."
147147
+ "dbId:{}, tableId:{}, originIndexId:{}, partitionId:{}. tabletSize:{}",

fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,6 @@ protected void runRunningJob() throws AlterCancelException {
665665
commitShadowIndex();
666666
// all partitions are good
667667
onFinished(tbl);
668-
pruneMeta();
669668

670669
LOG.info("schema change job finished: {}", jobId);
671670

@@ -677,6 +676,7 @@ protected void runRunningJob() throws AlterCancelException {
677676
// Write edit log with table's write lock held, to avoid adding partitions before writing edit log,
678677
// else it will try to transform index in newly added partition while replaying and result in failure.
679678
Env.getCurrentEnv().getEditLog().logAlterJob(this);
679+
pruneMeta();
680680
} finally {
681681
tbl.writeUnlock();
682682
}
@@ -791,7 +791,6 @@ protected synchronized boolean cancelImpl(String errMsg) {
791791

792792
cancelInternal();
793793

794-
pruneMeta();
795794
this.errMsg = errMsg;
796795
this.finishedTimeMs = System.currentTimeMillis();
797796
changeTableState(dbId, tableId, OlapTableState.NORMAL);
@@ -800,6 +799,7 @@ protected synchronized boolean cancelImpl(String errMsg) {
800799
Env.getCurrentEnv().getEditLog().logAlterJob(this);
801800
LOG.info("cancel {} job {}, err: {}", this.type, jobId, errMsg);
802801
onCancel();
802+
pruneMeta();
803803

804804
return true;
805805
}
@@ -937,6 +937,7 @@ private void replayRunningJob(SchemaChangeJobV2 replayedJob) {
937937
LOG.info("replay finished schema change job: {} table id: {}", jobId, tableId);
938938
changeTableState(dbId, tableId, OlapTableState.NORMAL);
939939
LOG.info("set table's state to NORMAL when replay finished, table id: {}, job id: {}", tableId, jobId);
940+
pruneMeta();
940941
}
941942

942943
/**
@@ -952,6 +953,7 @@ private void replayCancelled(SchemaChangeJobV2 replayedJob) {
952953
LOG.info("replay cancelled schema change job: {}", jobId);
953954
changeTableState(dbId, tableId, OlapTableState.NORMAL);
954955
LOG.info("set table's state to NORMAL when replay cancelled, table id: {}, job id: {}", tableId, jobId);
956+
pruneMeta();
955957
}
956958

957959
@Override

fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -838,7 +838,7 @@ private void dropCloudPartition(long dbId, long tableId, List<Long> partitionIds
838838
}
839839
}
840840

841-
public void removeSchemaChangeJob(long dbId, long tableId, long indexId, long newIndexId,
841+
public void removeSchemaChangeJob(long jobId, long dbId, long tableId, long indexId, long newIndexId,
842842
long partitionId, long tabletId, long newTabletId)
843843
throws DdlException {
844844
Cloud.FinishTabletJobRequest.Builder finishTabletJobRequestBuilder = Cloud.FinishTabletJobRequest.newBuilder();
@@ -867,6 +867,7 @@ public void removeSchemaChangeJob(long dbId, long tableId, long indexId, long ne
867867
newtabletIndexPBBuilder.setTabletId(newTabletId);
868868
final Cloud.TabletIndexPB newtabletIndex = newtabletIndexPBBuilder.build();
869869
schemaChangeJobPBBuilder.setNewTabletIdx(newtabletIndex);
870+
schemaChangeJobPBBuilder.setId(String.valueOf(jobId));
870871
final Cloud.TabletSchemaChangeJobPB tabletSchemaChangeJobPb =
871872
schemaChangeJobPBBuilder.build();
872873

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
import org.apache.doris.regression.util.NodeType
19+
20+
import org.codehaus.groovy.runtime.IOGroovyMethods
21+
import org.awaitility.Awaitility
22+
23+
suite("test_base_compaction_after_sc_fail", "nonConcurrent") {
    // Cloud-only scenario: exercises base compaction on a tablet whose
    // schema change job was aborted via BE-side error injection.
    if (!isCloudMode()) {
        return
    }

    def tableName = "test_base_compaction_after_sc_fail"

    sql """ DROP TABLE IF EXISTS ${tableName} """
    sql """
        CREATE TABLE IF NOT EXISTS ${tableName} (
            `k1` int NOT NULL,
            `c1` int,
            `c2` int,
            `c3` int
        ) DUPLICATE KEY(k1)
        DISTRIBUTED BY HASH(k1) BUCKETS 1
        PROPERTIES (
        "disable_auto_compaction" = "true",
        "replication_num" = "1");
    """

    // Locate the BE hosting the single tablet so we can sanity-check that a
    // backend exists before enabling the injection on all BEs.
    def allBackends = sql_return_maparray('show backends')
    def tablets = sql_return_maparray("SHOW TABLETS FROM ${tableName}")
    def targetBeId = tablets[0].BackendId
    def baseTabletId = tablets[0].TabletId
    def targetBe = allBackends.stream()
            .filter(be -> be.BackendId == targetBeId)
            .findFirst()
            .orElse(null)
    assertNotNull(targetBe)

    def debugPoint = "CloudSchemaChangeJob::process_alter_tablet.alter_fail"

    GetDebugPoint().clearDebugPointsForAllBEs()

    try {
        // Force every schema change attempt on the BEs to fail, so the alter
        // job below ends up CANCELLED.
        GetDebugPoint().enableDebugPointForAllBEs(debugPoint)

        sql """ ALTER TABLE ${tableName} MODIFY COLUMN c1 VARCHAR(44) """

        // Poll the latest alter job until it reaches CANCELLED; give up after
        // ~1000 * 10ms of waiting.
        def awaitCancelled = {
            for (int attempt = 0; ; attempt++) {
                def rows = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
                Thread.sleep(10)
                if (rows[0][9].toString() == "CANCELLED") {
                    return
                }
                assert attempt < 1000
            }
        }
        awaitCancelled()

        // Produce many small rowsets (insert + delete pairs) to give
        // compaction real work to do.
        def loadAndDelete = {
            101.times {
                sql """ INSERT INTO ${tableName} VALUES(1, "2", 3, 4) """
                sql """ DELETE FROM ${tableName} WHERE k1=1 """
            }
        }

        // Three rounds of load + cumulative compaction, then a base
        // compaction — which must succeed even after the aborted SC job.
        for (int round = 0; round < 3; round++) {
            loadAndDelete()
            trigger_and_wait_compaction(tableName, "cumulative")
        }

        trigger_and_wait_compaction(tableName, "base")

    } finally {
        GetDebugPoint().disableDebugPointForAllBEs(debugPoint)
    }

}

0 commit comments

Comments
 (0)