Skip to content

Commit

Permalink
flink task support(flink 任务支持) (apache#711)
Browse files Browse the repository at this point in the history
* flink任务支持

* flink任务支持

* Update zh_CN.js

* Update FlinkArgsUtils.java

* Update .escheduler_env.sh
  • Loading branch information
LoveEvenWong authored and gary0416 committed Sep 23, 2019
1 parent 87d11a2 commit 6ecfc3e
Show file tree
Hide file tree
Showing 16 changed files with 1,153 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -906,4 +906,18 @@ public final class Constants {
* hive conf
*/
public static final String HIVE_CONF = "hiveconf:";

//flink 任务
public static final String FLINK_YARN_CLUSTER = "yarn-cluster";
public static final String FLINK_RUN_MODE = "-m";
public static final String FLINK_YARN_SLOT = "-ys";
public static final String FLINK_APP_NAME = "-ynm";
public static final String FLINK_TASK_MANAGE = "-yn";

public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
public static final String FLINK_TASK_MANAGE_MEM = "-ytm";
public static final String FLINK_detach = "-d";
public static final String FLINK_MAIN_CLASS = "-c";


}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ public enum TaskType {
* 5 SPARK
* 6 PYTHON
* 7 DEPENDENT
* 8 FLINK
*/
SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT;
SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT,FLINK;

public static boolean typeIsNormalTask(String typeName) {
TaskType taskType = TaskType.valueOf(typeName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.task.flink;

import cn.escheduler.common.enums.ProgramType;
import cn.escheduler.common.process.ResourceInfo;
import cn.escheduler.common.task.AbstractParameters;

import java.util.List;
import java.util.stream.Collectors;

/**
* spark parameters
*/
public class FlinkParameters extends AbstractParameters {

/**
* major jar
*/
private ResourceInfo mainJar;

/**
* major class
*/
private String mainClass;

/**
* deploy mode yarn-cluster yarn-client yarn-local
*/
private String deployMode;

/**
* arguments
*/
private String mainArgs;

/**
* slot个数
*/
private int slot;

/**
*Yarn application的名字
*/

private String appName;

/**
* taskManager 数量
*/
private int taskManager;

/**
* jobManagerMemory 内存大小
*/
private String jobManagerMemory ;

/**
* taskManagerMemory内存大小
*/
private String taskManagerMemory;

/**
* resource list
*/
private List<ResourceInfo> resourceList;

/**
* The YARN queue to submit to
*/
private String queue;

/**
* other arguments
*/
private String others;

/**
* program type
* 0 JAVA,1 SCALA,2 PYTHON
*/
private ProgramType programType;

public ResourceInfo getMainJar() {
return mainJar;
}

public void setMainJar(ResourceInfo mainJar) {
this.mainJar = mainJar;
}

public String getMainClass() {
return mainClass;
}

public void setMainClass(String mainClass) {
this.mainClass = mainClass;
}

public String getDeployMode() {
return deployMode;
}

public void setDeployMode(String deployMode) {
this.deployMode = deployMode;
}

public String getMainArgs() {
return mainArgs;
}

public void setMainArgs(String mainArgs) {
this.mainArgs = mainArgs;
}

public int getSlot() {
return slot;
}

public void setSlot(int slot) {
this.slot = slot;
}

public String getAppName() {
return appName;
}

public void setAppName(String appName) {
this.appName = appName;
}

public int getTaskManager() {
return taskManager;
}

public void setTaskManager(int taskManager) {
this.taskManager = taskManager;
}

public String getJobManagerMemory() {
return jobManagerMemory;
}

public void setJobManagerMemory(String jobManagerMemory) {
this.jobManagerMemory = jobManagerMemory;
}

public String getTaskManagerMemory() {
return taskManagerMemory;
}

public void setTaskManagerMemory(String taskManagerMemory) {
this.taskManagerMemory = taskManagerMemory;
}

public String getQueue() {
return queue;
}

public void setQueue(String queue) {
this.queue = queue;
}

public List<ResourceInfo> getResourceList() {
return resourceList;
}

public void setResourceList(List<ResourceInfo> resourceList) {
this.resourceList = resourceList;
}

public String getOthers() {
return others;
}

public void setOthers(String others) {
this.others = others;
}

public ProgramType getProgramType() {
return programType;
}

public void setProgramType(ProgramType programType) {
this.programType = programType;
}

@Override
public boolean checkParameters() {
return mainJar != null && programType != null;
}


@Override
public List<String> getResourceFilesList() {
if(resourceList !=null ) {
this.resourceList.add(mainJar);
return resourceList.stream()
.map(p -> p.getRes()).collect(Collectors.toList());
}
return null;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import cn.escheduler.common.enums.TaskType;
import cn.escheduler.common.task.AbstractParameters;
import cn.escheduler.common.task.dependent.DependentParameters;
import cn.escheduler.common.task.flink.FlinkParameters;
import cn.escheduler.common.task.mr.MapreduceParameters;
import cn.escheduler.common.task.procedure.ProcedureParameters;
import cn.escheduler.common.task.python.PythonParameters;
Expand Down Expand Up @@ -63,6 +64,8 @@ public static AbstractParameters getParameters(String taskType, String parameter
return JSONUtils.parseObject(parameter, PythonParameters.class);
case DEPENDENT:
return JSONUtils.parseObject(parameter, DependentParameters.class);
case FLINK:
return JSONUtils.parseObject(parameter, FlinkParameters.class);
default:
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.server.utils;


import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.ProgramType;
import cn.escheduler.common.task.flink.FlinkParameters;
import org.apache.commons.lang.StringUtils;

import java.util.ArrayList;
import java.util.List;


/**
* spark args utils
*/
public class FlinkArgsUtils {

/**
* build args
* @param param
* @return
*/
public static List<String> buildArgs(FlinkParameters param) {
List<String> args = new ArrayList<>();

args.add(Constants.FLINK_RUN_MODE); //-m

args.add(Constants.FLINK_YARN_CLUSTER); //yarn-cluster

if (param.getSlot() != 0) {
args.add(Constants.FLINK_YARN_SLOT);
args.add(String.format("%d", param.getSlot())); //-ys
}

if (StringUtils.isNotEmpty(param.getAppName())) { //-ynm
args.add(Constants.FLINK_APP_NAME);
args.add(param.getAppName());
}

if (param.getTaskManager() != 0) { //-yn
args.add(Constants.FLINK_TASK_MANAGE);
args.add(String.format("%d", param.getTaskManager()));
}

if (StringUtils.isNotEmpty(param.getJobManagerMemory())) {
args.add(Constants.FLINK_JOB_MANAGE_MEM);
args.add(param.getJobManagerMemory()); //-yjm
}

if (StringUtils.isNotEmpty(param.getTaskManagerMemory())) { // -ytm
args.add(Constants.FLINK_TASK_MANAGE_MEM);
args.add(param.getTaskManagerMemory());
}
args.add(Constants.FLINK_detach); //-d


if(param.getProgramType() !=null ){
if(param.getProgramType()!=ProgramType.PYTHON){
if (StringUtils.isNotEmpty(param.getMainClass())) {
args.add(Constants.FLINK_MAIN_CLASS); //-c
args.add(param.getMainClass()); //main class
}
}
}

if (param.getMainJar() != null) {
args.add(param.getMainJar().getRes());
}


// --files --conf --libjar ...
if (StringUtils.isNotEmpty(param.getOthers())) {
String others = param.getOthers();
if(!others.contains("--queue")){
if (StringUtils.isNotEmpty(param.getQueue())) {
args.add(Constants.SPARK_QUEUE);
args.add(param.getQueue());
}
}
args.add(param.getOthers());
}else if (StringUtils.isNotEmpty(param.getQueue())) {
args.add(Constants.SPARK_QUEUE);
args.add(param.getQueue());

}

if (StringUtils.isNotEmpty(param.getMainArgs())) {
args.add(param.getMainArgs());
}

return args;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import cn.escheduler.common.enums.TaskType;
import cn.escheduler.common.process.Property;
import cn.escheduler.common.task.AbstractParameters;
import cn.escheduler.common.task.flink.FlinkParameters;
import cn.escheduler.common.task.mr.MapreduceParameters;
import cn.escheduler.common.task.procedure.ProcedureParameters;
import cn.escheduler.common.task.python.PythonParameters;
Expand Down Expand Up @@ -178,6 +179,8 @@ private Class getCurTaskParamsClass(){
case SPARK:
paramsClass = SparkParameters.class;
break;
case FLINK:
paramsClass = FlinkParameters.class;
case PYTHON:
paramsClass = PythonParameters.class;
break;
Expand Down
Loading

0 comments on commit 6ecfc3e

Please sign in to comment.