Skip to content

Commit

Permalink
[Improvement][server] WATERDROP task plug-in optimization in switch c…
Browse files Browse the repository at this point in the history
…ase code cleaning. (#3652)

* WATERDROP switch case code checkstyle.

* add TaskManagerTest.

* TaskManagerTest checkstyle.

* TaskManagerTest checkstyle.

* TaskManagerTest add pom maven-surefire

* TaskManagerTest update.
  • Loading branch information
zhuangchong authored Sep 14, 2020
1 parent 565b8d3 commit fad2852
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,8 @@ public static AbstractParameters getParameters(String taskType, String parameter
switch (EnumUtils.getEnum(TaskType.class, taskType)) {
case SUB_PROCESS:
return JSONUtils.parseObject(parameter, SubProcessParameters.class);
case WATERDROP:
return JSONUtils.parseObject(parameter, ShellParameters.class);
case SHELL:
case WATERDROP:
return JSONUtils.parseObject(parameter, ShellParameters.class);
case PROCEDURE:
return JSONUtils.parseObject(parameter, ProcedureParameters.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task;

package org.apache.dolphinscheduler.server.worker.task;

import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.EnumUtils;
Expand All @@ -30,49 +30,47 @@
import org.apache.dolphinscheduler.server.worker.task.spark.SparkTask;
import org.apache.dolphinscheduler.server.worker.task.sql.SqlTask;
import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopTask;

import org.slf4j.Logger;

/**
* task manaster
*/
public class TaskManager {

/**
* create new task
* @param taskExecutionContext taskExecutionContext
* @param logger logger
* @return AbstractTask
* @throws IllegalArgumentException illegal argument exception
*/
public static AbstractTask newTask(TaskExecutionContext taskExecutionContext,
Logger logger)
throws IllegalArgumentException {
switch (EnumUtils.getEnum(TaskType.class,taskExecutionContext.getTaskType())) {
case SHELL:
return new ShellTask(taskExecutionContext, logger);
case WATERDROP:
return new ShellTask(taskExecutionContext, logger);
case PROCEDURE:
return new ProcedureTask(taskExecutionContext, logger);
case SQL:
return new SqlTask(taskExecutionContext, logger);
case MR:
return new MapReduceTask(taskExecutionContext, logger);
case SPARK:
return new SparkTask(taskExecutionContext, logger);
case FLINK:
return new FlinkTask(taskExecutionContext, logger);
case PYTHON:
return new PythonTask(taskExecutionContext, logger);
case HTTP:
return new HttpTask(taskExecutionContext, logger);
case DATAX:
return new DataxTask(taskExecutionContext, logger);
case SQOOP:
return new SqoopTask(taskExecutionContext, logger);
default:
logger.error("unsupport task type: {}", taskExecutionContext.getTaskType());
throw new IllegalArgumentException("not support task type");
/**
* create new task
* @param taskExecutionContext taskExecutionContext
* @param logger logger
* @return AbstractTask
* @throws IllegalArgumentException illegal argument exception
*/
public static AbstractTask newTask(TaskExecutionContext taskExecutionContext, Logger logger) throws IllegalArgumentException {
switch (EnumUtils.getEnum(TaskType.class,taskExecutionContext.getTaskType())) {
case SHELL:
case WATERDROP:
return new ShellTask(taskExecutionContext, logger);
case PROCEDURE:
return new ProcedureTask(taskExecutionContext, logger);
case SQL:
return new SqlTask(taskExecutionContext, logger);
case MR:
return new MapReduceTask(taskExecutionContext, logger);
case SPARK:
return new SparkTask(taskExecutionContext, logger);
case FLINK:
return new FlinkTask(taskExecutionContext, logger);
case PYTHON:
return new PythonTask(taskExecutionContext, logger);
case HTTP:
return new HttpTask(taskExecutionContext, logger);
case DATAX:
return new DataxTask(taskExecutionContext, logger);
case SQOOP:
return new SqoopTask(taskExecutionContext, logger);
default:
logger.error("unsupport task type: {}", taskExecutionContext.getTaskType());
throw new IllegalArgumentException("not support task type");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.worker.task;

import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;

import java.util.Date;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class})
public class TaskManagerTest {

private TaskExecutionContext taskExecutionContext;

private Logger taskLogger;

private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager;

@Before
public void before() {
// init task execution context, logger
taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setProcessId(12345);
taskExecutionContext.setProcessDefineId(1);
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskType("");
taskExecutionContext.setFirstSubmitTime(new Date());
taskExecutionContext.setDelayTime(0);
taskExecutionContext.setLogPath("/tmp/test.log");
taskExecutionContext.setHost("localhost");
taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4");

taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(
LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()
));

taskExecutionContextCacheManager = new TaskExecutionContextCacheManagerImpl();
taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);

PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
.thenReturn(taskExecutionContextCacheManager);
}

@Test
public void testNewTask() {

taskExecutionContext.setTaskType("SHELL");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
taskExecutionContext.setTaskType("WATERDROP");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
taskExecutionContext.setTaskType("HTTP");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
taskExecutionContext.setTaskType("MR");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
taskExecutionContext.setTaskType("SPARK");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
taskExecutionContext.setTaskType("FLINK");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
taskExecutionContext.setTaskType("PYTHON");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
taskExecutionContext.setTaskType("DATAX");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
taskExecutionContext.setTaskType("SQOOP");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
//taskExecutionContext.setTaskType(null);
//Assert.assertNull(TaskManager.newTask(taskExecutionContext,taskLogger));
//taskExecutionContext.setTaskType("XXX");
//Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
}
}
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,7 @@
<!--<include>**/server/worker/task/datax/DataxTaskTest.java</include>-->
<!--<include>**/server/worker/task/http/HttpTaskTest.java</include>-->
<include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
<include>**/server/worker/task/TaskManagerTest.java</include>
<include>**/server/worker/EnvFileTest.java</include>
<include>**/server/worker/runner/TaskExecuteThreadTest.java</include>
<include>**/service/quartz/cron/CronUtilsTest.java</include>
Expand Down

0 comments on commit fad2852

Please sign in to comment.