@@ -17,11 +17,13 @@

package org.apache.doris.catalog;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.ResourceDesc;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.proc.BaseProcResult;

import org.apache.doris.load.loadv2.SparkRepository;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -144,6 +146,26 @@ public SparkResource getCopiedResource() {
return new SparkResource(name, Maps.newHashMap(sparkConfigs), workingDir, broker, brokerProperties);
}

// Each SparkResource has one and only one SparkRepository.
// This method gets the remote archive that matches the DPP version from the remote repository.
public synchronized SparkRepository.SparkArchive prepareArchive() throws LoadException {
String remoteRepositoryPath = workingDir + "/" + Catalog.getCurrentCatalog().getClusterId()
+ "/" + SparkRepository.REPOSITORY_DIR + name;
BrokerDesc brokerDesc = new BrokerDesc(broker, getBrokerPropertiesWithoutPrefix());
SparkRepository repository = new SparkRepository(remoteRepositoryPath, brokerDesc);
// This checks and uploads the remote archive.
repository.prepare();
SparkRepository.SparkArchive archive = repository.getCurrentArchive();
// Normally, an archive should contain a DPP library and a SPARK library
Preconditions.checkState(archive.libraries.size() == 2);
SparkRepository.SparkLibrary dppLibrary = archive.getDppLibrary();
SparkRepository.SparkLibrary spark2xLibrary = archive.getSpark2xLibrary();
if (dppLibrary == null || spark2xLibrary == null) {
throw new LoadException("failed to get libraries from remote archive");
}
return archive;
}

public boolean isYarnMaster() {
return getMaster().equalsIgnoreCase(YARN_MASTER);
}
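For context, a minimal sketch of how a caller might consume prepareArchive(); the helper name and signature are illustrative assumptions, while the SparkArchive accessors and the remotePath field come from this PR (see the SparkEtlJobHandler change further down for the real call site):

// Illustrative only: resolve the two library paths a spark load job needs.
static Pair<String, String> resolveLibraryPaths(SparkResource resource) throws LoadException {
    SparkRepository.SparkArchive archive = resource.prepareArchive();
    // prepareArchive() throws LoadException if either library is missing from the archive.
    String dppJarPath = archive.getDppLibrary().remotePath;           // spark-submit app resource
    String sparkArchivePath = archive.getSpark2xLibrary().remotePath; // spark.yarn.archive
    return new Pair<>(dppJarPath, sparkArchivePath);
}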
18 changes: 18 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -509,12 +509,30 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int hadoop_load_default_timeout_second = 86400 * 3; // 3 day

// Configurations for spark load
/**
* Default spark dpp version
*/
@ConfField
public static String spark_dpp_version = "1_0_0";
/**
* Default spark load timeout
*/
@ConfField(mutable = true, masterOnly = true)
public static int spark_load_default_timeout_second = 86400; // 1 day

/**
* Default spark home dir
*/
@ConfField(mutable = true, masterOnly = true)
public static String spark_home_default_dir = PaloFe.DORIS_HOME_DIR + "/lib/spark2x";

/**
* Default spark dependencies path
*/
@ConfField
public static String spark_resource_path = "";

/**
* Default number of waiting jobs for routine load and version 2 of load
* This is a desired number.
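For context, a hedged example of how the spark load settings added above might appear in fe.conf; the values below are illustrative only (the shipped defaults are spark_dpp_version = 1_0_0, a one-day timeout, DORIS_HOME/lib/spark2x, and an empty spark_resource_path):

# illustrative fe.conf entries for spark load
spark_dpp_version = 1_0_0
spark_load_default_timeout_second = 86400
spark_home_default_dir = /opt/doris/lib/spark2x
spark_resource_path = /path/to/spark-dpp-dependencies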
@@ -23,8 +23,11 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TBrokerCheckPathExistRequest;
import org.apache.doris.thrift.TBrokerCheckPathExistResponse;
import org.apache.doris.thrift.TBrokerCloseReaderRequest;
import org.apache.doris.thrift.TBrokerCloseWriterRequest;
import org.apache.doris.thrift.TBrokerDeletePathRequest;
@@ -45,7 +48,6 @@
import org.apache.doris.thrift.TBrokerVersion;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloBrokerService;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

@@ -349,6 +351,38 @@ public static void deletePath(String path, BrokerDesc brokerDesc) throws UserExc
}
}

public static boolean checkPathExist(String remotePath, BrokerDesc brokerDesc) throws UserException {
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBrokerAddressAndClient(brokerDesc);
TPaloBrokerService.Client client = pair.first;
TNetworkAddress address = pair.second;
boolean failed = true;
try {
TBrokerCheckPathExistRequest req = new TBrokerCheckPathExistRequest(TBrokerVersion.VERSION_ONE,
remotePath, brokerDesc.getProperties());
TBrokerCheckPathExistResponse rep = client.checkPathExist(req);
if (rep.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
throw new UserException("Broker check path exist failed. path=" + remotePath + ", broker=" + address +
", msg=" + rep.getOpStatus().getMessage());
}
failed = false;
return rep.isPathExist;
} catch (TException e) {
LOG.warn("Broker check path exist failed, path={}, address={}, exception={}", remotePath, address, e);
throw new UserException("Broker check path exist exception. path=" + remotePath + ",broker=" + address);
} finally {
returnClient(client, address, failed);
}
}

public static Pair<TPaloBrokerService.Client, TNetworkAddress> getBrokerAddressAndClient(BrokerDesc brokerDesc) throws UserException {
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = new Pair<TPaloBrokerService.Client, TNetworkAddress>(null, null);
TNetworkAddress address = getAddress(brokerDesc);
TPaloBrokerService.Client client = borrowClient(address);
pair.first = client;
pair.second = address;
return pair;
}
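For context, a minimal sketch of how the new checkPathExist helper might be combined with the existing BrokerUtil.writeFile to avoid re-uploading a file that is already present remotely; this surrounding logic is an illustrative assumption, not code from this PR:

// Illustrative only: upload a local file through the broker unless the remote copy already exists.
public static void uploadIfAbsent(String localPath, String remotePath, BrokerDesc brokerDesc) throws UserException {
    if (checkPathExist(remotePath, brokerDesc)) {
        return; // remote copy already present, nothing to upload
    }
    writeFile(localPath, remotePath, brokerDesc);
}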

private static TNetworkAddress getAddress(BrokerDesc brokerDesc) throws UserException {
FsBroker broker = null;
try {
@@ -17,9 +17,9 @@

package org.apache.doris.load.loadv2;

import org.apache.doris.PaloFe;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.catalog.SparkResource;
import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
@@ -30,6 +30,12 @@
import org.apache.doris.load.loadv2.etl.SparkEtlJob;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TEtlState;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -47,13 +53,6 @@
import org.apache.spark.launcher.SparkAppHandle.State;
import org.apache.spark.launcher.SparkLauncher;

import com.google.common.base.Strings;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
@@ -70,9 +69,7 @@
public class SparkEtlJobHandler {
private static final Logger LOG = LogManager.getLogger(SparkEtlJobHandler.class);

private static final String APP_RESOURCE_NAME = "palo-fe.jar";
private static final String CONFIG_FILE_NAME = "jobconfig.json";
private static final String APP_RESOURCE_LOCAL_PATH = PaloFe.DORIS_HOME_DIR + "/lib/" + APP_RESOURCE_NAME;
private static final String JOB_CONFIG_DIR = "configs";
private static final String ETL_JOB_NAME = "doris__%s";
// 5min
@@ -92,12 +89,34 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo
// delete outputPath
deleteEtlOutputPath(etlJobConfig.outputPath, brokerDesc);

// upload app resource and jobconfig to hdfs
// prepare dpp archive
SparkRepository.SparkArchive archive = resource.prepareArchive();
SparkRepository.SparkLibrary dppLibrary = archive.getDppLibrary();
SparkRepository.SparkLibrary spark2xLibrary = archive.getSpark2xLibrary();

// spark home
String sparkHome = Config.spark_home_default_dir;
// etl config path
String configsHdfsDir = etlJobConfig.outputPath + "/" + JOB_CONFIG_DIR + "/";
String appResourceHdfsPath = configsHdfsDir + APP_RESOURCE_NAME;
// etl config json path
String jobConfigHdfsPath = configsHdfsDir + CONFIG_FILE_NAME;
// spark submit app resource path
String appResourceHdfsPath = dppLibrary.remotePath;
// spark yarn archive path
String jobArchiveHdfsPath = spark2xLibrary.remotePath;
// spark yarn stage dir
String jobStageHdfsPath = resource.getWorkingDir();

// update archive and stage configs here
Map<String, String> sparkConfigs = resource.getSparkConfigs();
if (Strings.isNullOrEmpty(sparkConfigs.get("spark.yarn.archive"))) {
Contributor:
In what situation will the spark.yarn.archive config NOT be empty?

Member Author:
If the user has set spark.yarn.archive in the resource, we prefer to use the archive set by the user; otherwise we use the archive generated by SparkRepository.

sparkConfigs.put("spark.yarn.archive", jobArchiveHdfsPath);
}
if (Strings.isNullOrEmpty(sparkConfigs.get("spark.yarn.stage.dir"))) {
sparkConfigs.put("spark.yarn.stage.dir", jobStageHdfsPath);
Contributor:
What does this spark.yarn.stage.dir mean?

Member Author:
A path in HDFS generated by Spark itself, used to save temporary configuration for the Spark application.

}

try {
BrokerUtil.writeFile(APP_RESOURCE_LOCAL_PATH, appResourceHdfsPath, brokerDesc);
byte[] configData = etlJobConfig.configToJson().getBytes("UTF-8");
BrokerUtil.writeFile(configData, jobConfigHdfsPath, brokerDesc);
} catch (UserException | UnsupportedEncodingException e) {
@@ -114,7 +133,9 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo
.setAppResource(appResourceHdfsPath)
.setMainClass(SparkEtlJob.class.getCanonicalName())
.setAppName(String.format(ETL_JOB_NAME, loadLabel))
.setSparkHome(sparkHome)
Contributor:
Why do we need to set the Spark home here?
Is it compatible with an open source Spark environment?

Member Author:
The Spark home is configurable. Users in an open source environment need to configure this parameter in fe.conf.

.addAppArgs(jobConfigHdfsPath);

// spark configs
for (Map.Entry<String, String> entry : resource.getSparkConfigs().entrySet()) {
launcher.setConf(entry.getKey(), entry.getValue());