SparkEtlJobHandler.java

@@ -77,7 +77,7 @@ public class SparkEtlJobHandler {
     private static final String YARN_KILL_CMD = "%s --config %s application -kill %s";
 
     public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobConfig, SparkResource resource,
-                             BrokerDesc brokerDesc, SparkPendingTaskAttachment attachment) throws LoadException {
+                             BrokerDesc brokerDesc, SparkLoadAppHandle handle, SparkPendingTaskAttachment attachment) throws LoadException {
         // delete outputPath
         deleteEtlOutputPath(etlJobConfig.outputPath, brokerDesc);
 
@@ -142,13 +142,12 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo
         }
 
         // start app
-        SparkLoadAppHandle handle = null;
         State state = null;
         String appId = null;
         String errMsg = "start spark app failed. error: ";
         try {
             Process process = launcher.launch();
-            handle = new SparkLoadAppHandle(process);
+            handle.setProcess(process);
             if (!FeConstants.runningUnitTest) {
                 SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle);
                 logMonitor.setSubmitTimeoutMs(GET_APPID_TIMEOUT_MS);
@@ -264,8 +263,18 @@ public EtlStatus getEtlJobStatus(SparkLoadAppHandle handle, String appId, long l
     }
 
     public void killEtlJob(SparkLoadAppHandle handle, String appId, long loadJobId, SparkResource resource) throws LoadException {
-        Preconditions.checkNotNull(appId);
         if (resource.isYarnMaster()) {
+            // The appId may be empty while the load job is still in the PENDING phase, because the
+            // appId is parsed from the spark launcher process's output (the launcher process submits
+            // the job and then reports the appId). In that case the spark job has not been submitted
+            // to yarn yet, so we only need to kill the spark launcher process.
+            if (Strings.isNullOrEmpty(appId)) {
+                appId = handle.getAppId();
+                if (Strings.isNullOrEmpty(appId)) {
+                    handle.kill();
+                    return;
+                }
+            }
             // prepare yarn config
             String configDir = resource.prepareYarnConfig();
             // yarn client path
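
For context, the fallback above must handle a spark job that was killed before yarn ever saw it. Below is a minimal, self-contained sketch of that control flow; it is not the Doris implementation: the Handle class, its fields, and the printed yarn command are simplified stand-ins.

    import com.google.common.base.Strings;

    public class KillFallbackSketch {
        // Hypothetical stand-in for SparkLoadAppHandle; the real class tracks more state.
        static class Handle {
            volatile String appId;      // parsed later from the launcher's output; may still be null
            volatile Process process;   // the spark launcher process itself

            String getAppId() { return appId; }

            void kill() {
                // No yarn application exists yet, so killing the launcher process is enough.
                if (process != null && process.isAlive()) {
                    process.destroyForcibly();
                }
            }
        }

        static void killEtlJob(Handle handle, String appId) {
            if (Strings.isNullOrEmpty(appId)) {
                // The monitor thread may have parsed an appId since the caller last looked.
                appId = handle.getAppId();
                if (Strings.isNullOrEmpty(appId)) {
                    handle.kill();   // the job never reached yarn: stop the launcher and return
                    return;
                }
            }
            // With a known appId, the real code shells out to yarn instead of printing.
            System.out.println("yarn application -kill " + appId);
        }
    }
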
SparkLauncherMonitor.java

@@ -106,6 +106,11 @@ public void setRedirectLogPath(String redirectLogPath) throws IOException {
     // UNKNOWN/SUBMITTED for a long time.
     @Override
     public void run() {
+        if (handle.getState() == SparkLoadAppHandle.State.KILLED) {
+            // The handle was already killed; make sure the launcher process dies with it.
+            process.destroyForcibly();
+            return;
+        }
         BufferedReader outReader = null;
         String line = null;
         long startTime = System.currentTimeMillis();
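
The early return above guards a race: kill() may run before the monitor thread is ever scheduled, and the thread must not start tailing a process that should already be dead. A rough sketch of the pattern, assuming an AtomicBoolean in place of the handle's KILLED state:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class LogMonitorSketch implements Runnable {
        private final Process process;
        private final AtomicBoolean killed;   // stand-in for handle.getState() == KILLED

        public LogMonitorSketch(Process process, AtomicBoolean killed) {
            this.process = process;
            this.killed = killed;
        }

        @Override
        public void run() {
            if (killed.get()) {
                // Killed before the monitor started: destroy the process, skip log parsing.
                process.destroyForcibly();
                return;
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(process.getInputStream()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // a real monitor would parse the appId and state transitions here
                }
            } catch (IOException e) {
                // the launcher's output stream closed unexpectedly; nothing left to monitor
            }
        }
    }
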
SparkLoadAppHandle.java

@@ -134,6 +134,10 @@ public void kill() {
 
     public String getLogPath() { return this.logPath; }
 
+    public void setProcess(Process process) {
+        this.process = process;
+    }
+
     public void setState(State state) {
         this.state = state;
         this.fireEvent(false);

SparkLoadJob.java

@@ -715,6 +715,10 @@ protected long getEtlStartTimestamp() {
         return etlStartTimestamp;
     }
 
+    public SparkLoadAppHandle getHandle() {
+        return sparkLoadAppHandle;
+    }
+
     public void clearSparkLauncherLog() {
         String logPath = sparkLoadAppHandle.getLogPath();
         if (!Strings.isNullOrEmpty(logPath)) {

SparkLoadPendingTask.java

@@ -57,16 +57,15 @@
 import org.apache.doris.load.loadv2.etl.EtlJobConfig.FilePatternVersion;
 import org.apache.doris.load.loadv2.etl.EtlJobConfig.SourceType;
 import org.apache.doris.transaction.TransactionState;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -84,6 +83,7 @@ public class SparkLoadPendingTask extends LoadTask {
     private final long loadJobId;
     private final long transactionId;
     private EtlJobConfig etlJobConfig;
+    private SparkLoadAppHandle sparkLoadAppHandle;
 
     public SparkLoadPendingTask(SparkLoadJob loadTaskCallback,
                                 Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToBrokerFileGroups,
@@ -98,6 +98,7 @@ public SparkLoadPendingTask(SparkLoadJob loadTaskCallback,
         this.loadJobId = loadTaskCallback.getId();
         this.loadLabel = loadTaskCallback.getLabel();
         this.transactionId = loadTaskCallback.getTransactionId();
+        this.sparkLoadAppHandle = loadTaskCallback.getHandle();
         this.failMsg = new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL);
     }
 
@@ -115,7 +116,7 @@ private void submitEtlJob() throws LoadException {
 
         // handler submit etl job
         SparkEtlJobHandler handler = new SparkEtlJobHandler();
-        handler.submitEtlJob(loadJobId, loadLabel, etlJobConfig, resource, brokerDesc, sparkAttachment);
+        handler.submitEtlJob(loadJobId, loadLabel, etlJobConfig, resource, brokerDesc, sparkLoadAppHandle, sparkAttachment);
         LOG.info("submit spark etl job success. load job id: {}, attachment: {}", loadJobId, sparkAttachment);
     }
 
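
Taken together, these changes move ownership of the SparkLoadAppHandle out of submitEtlJob and onto the load job itself, so the submit path and a later cancel operate on the same object. A condensed sketch of that flow, using hypothetical simplified types rather than the real Doris classes:

    public class HandleOwnershipSketch {
        // Hypothetical stand-in for SparkLoadAppHandle.
        static class Handle {
            private volatile Process process;
            void setProcess(Process p) { this.process = p; }
            void kill() {
                if (process != null && process.isAlive()) {
                    process.destroyForcibly();
                }
            }
        }

        // Hypothetical stand-in for SparkLoadJob: one handle for the job's lifetime.
        static class LoadJob {
            private final Handle handle = new Handle();
            Handle getHandle() { return handle; }
        }

        public static void main(String[] args) throws Exception {
            LoadJob job = new LoadJob();

            // Pending-task path: reuse the job's handle when launching the ETL process.
            // "sleep 60" is a placeholder for the real spark launcher command line.
            job.getHandle().setProcess(new ProcessBuilder("sleep", "60").start());

            // Cancel path: the same handle is reachable through the job, so the launcher
            // process can be killed even if no yarn appId was ever parsed from its output.
            job.getHandle().kill();
        }
    }
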
SparkEtlJobHandlerTest.java

@@ -167,7 +167,7 @@ public void testSubmitEtlJob(@Mocked BrokerUtil brokerUtil, @Mocked SparkLaunche
         BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap());
         SparkPendingTaskAttachment attachment = new SparkPendingTaskAttachment(pendingTaskId);
         SparkEtlJobHandler handler = new SparkEtlJobHandler();
-        handler.submitEtlJob(loadJobId, label, etlJobConfig, resource, brokerDesc, attachment);
+        handler.submitEtlJob(loadJobId, label, etlJobConfig, resource, brokerDesc, handle, attachment);
 
         // check submit etl job success
         Assert.assertEquals(appId, attachment.getAppId());
@@ -203,7 +203,7 @@ public void testSubmitEtlJobFailed(@Mocked BrokerUtil brokerUtil, @Mocked SparkL
         BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap());
         SparkPendingTaskAttachment attachment = new SparkPendingTaskAttachment(pendingTaskId);
         SparkEtlJobHandler handler = new SparkEtlJobHandler();
-        handler.submitEtlJob(loadJobId, label, etlJobConfig, resource, brokerDesc, attachment);
+        handler.submitEtlJob(loadJobId, label, etlJobConfig, resource, brokerDesc, handle, attachment);
     }
 
     @Test