Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#839 enhancement : add Spark Task Component can switch Spark Version #1494

Merged
merged 4 commits into from
Dec 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;

import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;

/**
 * Spark versions that can be selected for a Spark task.
 *
 * <p>Each constant carries a numeric code and a textual description. Accessors
 * for both fields come from the class-level {@code @Getter} annotation.
 */
@Getter
public enum SparkVersion {

    /** code 0, description "SPARK1" */
    SPARK1(0, "SPARK1"),

    /** code 1, description "SPARK2" */
    SPARK2(1, "SPARK2");

    /** numeric code of the version; marked with {@code @EnumValue} */
    @EnumValue
    private final int code;

    /** textual description of the version */
    private final String descp;

    /**
     * @param versionCode numeric code stored in {@code code}
     * @param description text stored in {@code descp}
     */
    SparkVersion(int versionCode, String description) {
        this.descp = description;
        this.code = versionCode;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ public class SparkParameters extends AbstractParameters {
*/
private ProgramType programType;

/**
* spark version
*/
private String sparkVersion;

public ResourceInfo getMainJar() {
return mainJar;
}
Expand Down Expand Up @@ -200,9 +205,17 @@ public void setProgramType(ProgramType programType) {
this.programType = programType;
}

/**
 * Returns the spark version configured for this task.
 *
 * @return the value of the {@code sparkVersion} field; may be {@code null} if it was never set
 */
public String getSparkVersion() {
return sparkVersion;
}

/**
 * Sets the spark version configured for this task.
 *
 * @param sparkVersion value stored in the {@code sparkVersion} field as-is (no validation here)
 */
public void setSparkVersion(String sparkVersion) {
this.sparkVersion = sparkVersion;
}

/**
 * Checks that the mandatory Spark task parameters are present.
 *
 * @return {@code true} only when {@code mainJar}, {@code programType} and
 *         {@code sparkVersion} are all non-null
 */
@Override
public boolean checkParameters() {
    // NOTE(review): parameters saved without a sparkVersion fail this check — confirm
    // that is intended for task definitions created before the field existed.
    return mainJar != null && programType != null && sparkVersion != null;
}


Expand All @@ -211,7 +224,7 @@ public List<String> getResourceFilesList() {
if(resourceList !=null ) {
this.resourceList.add(mainJar);
return resourceList.stream()
.map(p -> p.getRes()).collect(Collectors.toList());
.map(ResourceInfo::getRes).collect(Collectors.toList());
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.dolphinscheduler.server.worker.task.spark;

import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.enums.SparkVersion;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
Expand All @@ -25,7 +27,6 @@
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;

import java.util.ArrayList;
Expand All @@ -38,9 +39,14 @@
public class SparkTask extends AbstractYarnTask {

/**
* spark command
* spark1 command
*/
private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit";

/**
* spark2 command
*/
private static final String SPARK_COMMAND = "spark-submit";
private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";

/**
* spark parameters
Expand Down Expand Up @@ -89,7 +95,14 @@ public void init() {
protected String buildCommand() {
List<String> args = new ArrayList<>();

args.add(SPARK_COMMAND);
//spark version
String sparkCommand = SPARK2_COMMAND;

if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
sparkCommand = SPARK1_COMMAND;
}

args.add(sparkCommand);

// other parameters
args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.spark;

import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.enums.SparkVersion;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;


/**
 * Unit test for the spark-submit command selection driven by the
 * {@code sparkVersion} task parameter.
 *
 * NOTE(review): the test does not instantiate SparkTask; it parses the task
 * parameters and assembles the command inline, using its own copies of the
 * command constants.
 */
public class SparkTaskTest {

private static final Logger logger = LoggerFactory.getLogger(SparkTaskTest.class);

/**
* spark1 command: spark-submit resolved under ${SPARK_HOME1}
*/
private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit";

/**
* spark2 command: spark-submit resolved under ${SPARK_HOME2}
*/
private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";

/**
 * Parses a SPARK2 task parameter JSON, builds the argument list and checks
 * that the first token of the joined command is the spark2 command.
 */
@Test
public void testSparkTaskInit() {

TaskProps taskProps = new TaskProps();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ut has no assert check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i will add assert check for this


// NOTE(review): spark1Params is declared but never used below; only spark2Params is applied.
String spark1Params = "{" +
"\"mainArgs\":\"\", " +
"\"driverMemory\":\"1G\", " +
"\"executorMemory\":\"2G\", " +
"\"programType\":\"SCALA\", " +
"\"mainClass\":\"basicetl.GlobalUserCar\", " +
"\"driverCores\":\"2\", " +
"\"deployMode\":\"cluster\", " +
"\"executorCores\":2, " +
"\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, " +
"\"sparkVersion\":\"SPARK1\", " +
"\"numExecutors\":\"10\", " +
"\"localParams\":[], " +
"\"others\":\"\", " +
"\"resourceList\":[]" +
"}";

// Same parameters as above except for "sparkVersion":"SPARK2".
String spark2Params = "{" +
"\"mainArgs\":\"\", " +
"\"driverMemory\":\"1G\", " +
"\"executorMemory\":\"2G\", " +
"\"programType\":\"SCALA\", " +
"\"mainClass\":\"basicetl.GlobalUserCar\", " +
"\"driverCores\":\"2\", " +
"\"deployMode\":\"cluster\", " +
"\"executorCores\":2, " +
"\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, " +
"\"sparkVersion\":\"SPARK2\", " +
"\"numExecutors\":\"10\", " +
"\"localParams\":[], " +
"\"others\":\"\", " +
"\"resourceList\":[]" +
"}";

taskProps.setTaskParams(spark2Params);

logger.info("spark task params {}", taskProps.getTaskParams());

SparkParameters sparkParameters = JSONUtils.parseObject(taskProps.getTaskParams(), SparkParameters.class);

// Java assert keyword: only evaluated when the JVM runs with assertions enabled (-ea).
assert sparkParameters != null;
if (!sparkParameters.checkParameters()) {
throw new RuntimeException("spark task params is not valid");
}
sparkParameters.setQueue(taskProps.getQueue());

// mainArgs is "" in the JSON above, so presumably this branch is skipped for this input.
if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) {
String args = sparkParameters.getMainArgs();

/**
* combining local and global parameters
*/
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
sparkParameters.getLocalParametersMap(),
taskProps.getCmdTypeIfComplement(),
taskProps.getScheduleTime());
if (paramsMap != null) {
args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
}
sparkParameters.setMainArgs(args);
}

List<String> args = new ArrayList<>();

// spark version: defaults to the spark2 command, switched to spark1 only for "SPARK1"
String sparkCommand = SPARK2_COMMAND;

if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
sparkCommand = SPARK1_COMMAND;
}

args.add(sparkCommand);

// other parameters
args.addAll(SparkArgsUtils.buildArgs(sparkParameters));

String sparkArgs = String.join(" ", args);

logger.info("spark task command : {}", sparkArgs);

// NOTE(review): JUnit's assertEquals takes (expected, actual); the arguments here are
// passed in (actual, expected) order, which only affects the failure message.
Assert.assertEquals(sparkArgs.split(" ")[0], SPARK2_COMMAND );

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,22 @@
</x-select>
</div>
</m-list-box>
<!-- Spark version selector: one option per entry of sparkVersionList; disabled when isDetails is set -->
<m-list-box>
  <div slot="text">{{$t('Spark Version')}}</div>
  <div slot="content">
    <x-select
            style="width: 130px;"
            v-model="sparkVersion"
            :disabled="isDetails">
      <x-option
              v-for="version in sparkVersionList"
              :key="version.code"
              :value="version.code"
              :label="version.code">
      </x-option>
    </x-select>
  </div>
</m-list-box>
<m-list-box v-if="programType !== 'PYTHON'">
<div slot="text">{{$t('Main class')}}</div>
<div slot="content">
Expand Down Expand Up @@ -224,7 +240,11 @@
// Program type
programType: 'SCALA',
// Program type(List)
programTypeList: [{ code: 'JAVA' }, { code: 'SCALA' }, { code: 'PYTHON' }]
programTypeList: [{ code: 'JAVA' }, { code: 'SCALA' }, { code: 'PYTHON' }],
// Spark version
sparkVersion: 'SPARK2',
// Spark version (list)
sparkVersionList: [{ code: 'SPARK2' }, { code: 'SPARK1' }]
}
},
props: {
Expand Down Expand Up @@ -318,7 +338,8 @@
executorCores: this.executorCores,
mainArgs: this.mainArgs,
others: this.others,
programType: this.programType
programType: this.programType,
sparkVersion: this.sparkVersion
})
return true
},
Expand Down Expand Up @@ -366,6 +387,7 @@
this.mainArgs = o.params.mainArgs || ''
this.others = o.params.others
this.programType = o.params.programType || 'SCALA'
this.sparkVersion = o.params.sparkVersion || 'SPARK2'

// backfill resourceList
let resourceList = o.params.resourceList || []
Expand Down
4 changes: 2 additions & 2 deletions dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ export default {
'Please enter a positive integer': 'Please enter a positive integer',
'Program Type': 'Program Type',
'Main class': 'Main class',
'Please enter main class': 'Please enter main class',
'Main jar package': 'Main jar package',
'Please enter main jar package': 'Please enter main jar package',
'Command-line parameters': 'Command-line parameters',
Expand Down Expand Up @@ -505,5 +504,6 @@ export default {
'There is no data for this period of time': 'There is no data for this period of time',
'IP address cannot be empty': 'IP address cannot be empty',
'Please enter the correct IP': 'Please enter the correct IP',
'Please generate token': 'Please generate token'
'Please generate token': 'Please generate token',
'Spark Version': 'Spark Version'
}
3 changes: 2 additions & 1 deletion dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
Original file line number Diff line number Diff line change
Expand Up @@ -504,5 +504,6 @@ export default {
'There is no data for this period of time': '该时间段无数据',
'IP address cannot be empty': 'IP地址不能为空',
'Please enter the correct IP': '请输入正确的IP',
'Please generate token': '请生成Token'
'Please generate token': '请生成Token',
'Spark Version': 'Spark版本'
}