[Spark Load] Create spark load's repository in HDFS for dependencies #4163
Conversation
```java
.setAppResource(appResourceHdfsPath)
.setMainClass(SparkEtlJob.class.getCanonicalName())
.setAppName(String.format(ETL_JOB_NAME, loadLabel))
.setSparkHome(spark_home)
```
What is this for?
It is the sparkHome argument of SparkLauncher.
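For context, a minimal, self-contained sketch of how these SparkLauncher calls fit together (the paths, label, and Spark home below are hypothetical placeholders, not values from this PR):

```java
import java.io.IOException;

import org.apache.spark.launcher.SparkLauncher;

public class LaunchSketch {
    public static void main(String[] args) throws IOException {
        // setSparkHome() points the launcher at a local Spark client install;
        // without it, SparkLauncher falls back to the SPARK_HOME env variable.
        Process spark = new SparkLauncher()
                .setMaster("yarn")                                            // assumption: yarn mode
                .setAppResource("hdfs://ns1/repo/spark-dpp.jar")              // hypothetical path
                .setMainClass("org.apache.doris.load.loadv2.etl.SparkEtlJob") // class referenced in the diff
                .setAppName("doris_etl_example_label")                        // hypothetical label
                .setSparkHome("/opt/spark")                                   // hypothetical client dir
                .launch();
        // The caller would then monitor the child process / application state.
    }
}
```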
```java
}

private void initRepository() throws LoadException {
    LOG.info("start to init remote repository");
```
The whole initRepository needs to be protected by a lock.
Added a synchronized lock in SparkEtlJobHandler keyed by cluster id.
Now initRepository operations are protected by the lock if they are in the same cluster.
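A minimal sketch of the per-cluster locking idea (class and field names here are illustrative, not the actual Doris code):

```java
import java.util.concurrent.ConcurrentHashMap;

public class RepositoryInitSketch {
    // One lock object per cluster id; computeIfAbsent creates it atomically.
    private final ConcurrentHashMap<String, Object> clusterLocks = new ConcurrentHashMap<>();

    public void initRepositoryForCluster(String clusterId) {
        Object lock = clusterLocks.computeIfAbsent(clusterId, k -> new Object());
        synchronized (lock) {
            // Only one thread per cluster initializes the remote repository;
            // threads working on different clusters proceed in parallel.
            // ... check dppVersion / MD5 and upload dependencies here ...
        }
    }
}
```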
```java
    sparkConfigs.put("spark.yarn.archive", jobArchiveHdfsPath);
}
if (Strings.isNullOrEmpty(sparkConfigs.get("spark.yarn.stage.dir"))) {
    sparkConfigs.put("spark.yarn.stage.dir", jobStageHdfsPath);
```
What does this spark.yarn.stage.dir mean?
It is a path in HDFS, generated by Spark itself, used to save temporary configuration for the Spark application.
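To illustrate how such configs reach Spark, every entry of the resource's config map can be forwarded to the launcher via setConf. A hedged sketch (keys taken from the diff above, values hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.launcher.SparkLauncher;

public class ConfSketch {
    public static void main(String[] args) {
        Map<String, String> sparkConfigs = new HashMap<>();
        // Defaults the FE fills in when the user did not set them (paths hypothetical):
        sparkConfigs.put("spark.yarn.archive", "hdfs://ns1/working_dir/repository/spark2x.zip");
        sparkConfigs.put("spark.yarn.stage.dir", "hdfs://ns1/working_dir/stage");

        SparkLauncher launcher = new SparkLauncher();
        // SparkLauncher.setConf() accepts any key starting with "spark.".
        for (Map.Entry<String, String> entry : sparkConfigs.entrySet()) {
            launcher.setConf(entry.getKey(), entry.getValue());
        }
    }
}
```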
```java
// update archive and stage configs here
Map<String, String> sparkConfigs = resource.getSparkConfigs();
if (Strings.isNullOrEmpty(sparkConfigs.get("spark.yarn.archive"))) {
```
In what situation will the spark.yarn.archive config NOT be empty?
If the user has set spark.yarn.archive in the resource, we prefer to use the archive set by the user; otherwise we use the archive generated by SparkRepository.
```java
.setAppResource(appResourceHdfsPath)
.setMainClass(SparkEtlJob.class.getCanonicalName())
.setAppName(String.format(ETL_JOB_NAME, loadLabel))
.setSparkHome(sparkHome)
```
Why do we need to set the Spark home here?
Is it compatible with an open source Spark environment?
The Spark home is configurable. Users in an open source environment need to configure this parameter in fe.conf.
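For example, assuming the FE config key is spark_home_default_dir (an assumption; verify the exact key against your Doris version's docs), fe.conf would contain something like:

```
# fe.conf -- key name assumed, value is a placeholder
spark_home_default_dir = /path/to/spark-2.x.x-bin-hadoop2.7
```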
morningman left a comment:
LGTM
Proposed changes
Please see the main description in issue #4101
Summary
Currently, when users use Spark load, they have to upload the dependent jars to HDFS every time.
This CL adds a self-generated repository under the working_dir folder in HDFS for saving the dependencies of the Spark DPP program and the Spark platform.
Note that the dependencies we upload to the repository include:
1. spark-dpp.jar
2. spark2x.zip

1 is the DPP library built with the spark-dpp submodule. See details about the spark-dpp submodule in PR #4146.
2 is the Spark 2.x.x platform library, which contains all jars in $SPARK_HOME/jars.
The repository structure will be like this:
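(The original illustration is not reproduced here; a hedged reconstruction from the naming used in this description, where exact file names may differ, is:)

```
{working_dir}/<repository>/
    __archive_{dppVersion}/
        spark-dpp.jar
        spark2x.zip
```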
The following conditions will force the FE to upload the dependencies:
1. The FE finds that its dppVersion is absent in the repository.
2. The MD5 value of a remote file does not match the local file (see the sketch below).
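A hedged sketch of the MD5 comparison behind condition 2 (helper names are hypothetical; the actual Doris check may differ):

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.MessageDigest;

public class Md5CheckSketch {
    // Compute the MD5 hex digest of a local file.
    static String md5Of(String path) throws Exception {
        MessageDigest md = MessageDigest.getInstance("MD5");
        try (InputStream in = Files.newInputStream(Paths.get(path))) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                md.update(buf, 0, n);
            }
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : md.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    // Re-upload when the remote copy's recorded MD5 differs from the local file's.
    static boolean needUpload(String localPath, String remoteMd5) throws Exception {
        return !md5Of(localPath).equalsIgnoreCase(remoteMd5);
    }
}
```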
Before the FE uploads the dependencies, it will create an archive directory named `__archive_{dppVersion}` under the repository.
Types of changes
Checklist