From 7d7313fcc0ed6a7bb0a397c63b4bb7e03854f2df Mon Sep 17 00:00:00 2001
From: Yichao Yang <1048262223@qq.com>
Date: Sun, 16 Aug 2020 18:57:34 +0800
Subject: [PATCH 01/53] [Fix][checkstyle] Fix checkstyle static type order
checker (#3520)
---
style/checkstyle.xml | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
diff --git a/style/checkstyle.xml b/style/checkstyle.xml
index 2a74b99a275e..d7283abafa16 100644
--- a/style/checkstyle.xml
+++ b/style/checkstyle.xml
@@ -219,12 +219,15 @@
^sun\.misc\.BASE64Decoder,
^jdk\.internal\.jline\.internal\.Nullable"/>
-
+
-
+
-
+
+
+
+
@@ -269,7 +272,9 @@
-
+
+
+
From ad89f433f16d65ddcab3186baa84e14762d03e52 Mon Sep 17 00:00:00 2001
From: dengc367
Date: Mon, 17 Aug 2020 17:28:46 +0800
Subject: [PATCH 02/53] [Feature]modify some cases from rockxsj:Feature-presto
to add presto datasource support (#3468)
* Feature presto (#1)
* * add presto datasource support
update .gitigonre to igonre some files
* * use another presto driver
* * add LICENSE files about presto-jdbc
* * just for test sonar
Co-authored-by: rockxsj
* modify the io.prestosql.jdbc.PrestoDriver to com.facebook.presto.jdbc.PrestoDriver
* add presto connection in sql node
Co-authored-by: rockxsj
---
.../org/apache/dolphinscheduler/common/Constants.java | 2 +-
dolphinscheduler-ui/src/js/conf/home/store/dag/state.js | 5 +++++
script/scp-hosts.sh | 8 ++++----
3 files changed, 10 insertions(+), 5 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 4f22e012410e..072a67f44f29 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -898,7 +898,7 @@ private Constants() {
public static final String COM_ORACLE_JDBC_DRIVER = "oracle.jdbc.driver.OracleDriver";
public static final String COM_SQLSERVER_JDBC_DRIVER = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
public static final String COM_DB2_JDBC_DRIVER = "com.ibm.db2.jcc.DB2Driver";
- public static final String COM_PRESTO_JDBC_DRIVER = "io.prestosql.jdbc.PrestoDriver";
+ public static final String COM_PRESTO_JDBC_DRIVER = "com.facebook.presto.jdbc.PrestoDriver";
/**
* database type
diff --git a/dolphinscheduler-ui/src/js/conf/home/store/dag/state.js b/dolphinscheduler-ui/src/js/conf/home/store/dag/state.js
index 05dfa7716148..e3c75b838fe0 100644
--- a/dolphinscheduler-ui/src/js/conf/home/store/dag/state.js
+++ b/dolphinscheduler-ui/src/js/conf/home/store/dag/state.js
@@ -96,6 +96,11 @@ export default {
id: 7,
code: 'DB2',
disabled: false
+ },
+ {
+ id: 8,
+ code: 'PRESTO',
+ disabled: false
}
],
// Alarm interface
diff --git a/script/scp-hosts.sh b/script/scp-hosts.sh
index 4a94cffa2950..9da94ab79cbc 100644
--- a/script/scp-hosts.sh
+++ b/script/scp-hosts.sh
@@ -33,8 +33,8 @@ for workerGroup in ${workersGroup[@]}
do
echo $workerGroup;
worker=`echo $workerGroup|awk -F':' '{print $1}'`
- groupName=`echo $workerGroup|awk -F':' '{print $2}'`
- workersGroupMap+=([$worker]=$groupName)
+ groupsName=`echo $workerGroup|awk -F':' '{print $2}'`
+ workersGroupMap+=([$worker]=$groupsName)
done
@@ -53,7 +53,7 @@ do
do
# if worker in workersGroupMap
if [[ "${workersGroupMap[${host}]}" ]] && [[ "${dsDir}" == "conf" ]]; then
- sed -i ${txt} "s#worker.group.*#worker.group=${workersGroupMap[${host}]}#g" ${dsDir}/worker.properties
+ sed -i ${txt} "s:.*worker.groups.*:worker.groups=${workersGroupMap[${host}]}:g" ${dsDir}/worker.properties
fi
echo "start to scp $dsDir to $host/$installPath"
@@ -61,4 +61,4 @@ do
done
echo "scp dirs to $host/$installPath complete"
-done
\ No newline at end of file
+done
From 0b7b6d4e2a600bb3af36363c4814da6b169317bf Mon Sep 17 00:00:00 2001
From: XiaotaoYi
Date: Tue, 18 Aug 2020 07:09:39 +0800
Subject: [PATCH 03/53] =?UTF-8?q?[bug-3480][server]fix=20ds=20muti-level?=
=?UTF-8?q?=20directory=20in=20zk,=20which=20lead=20to=20fa=E2=80=A6=20(#3?=
=?UTF-8?q?532)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
* [bug-3480][server]fix ds muti-level directory in zk, which lead to fail to assign work
* miss whitespace for if statement
---
.../server/registry/ZookeeperNodeManager.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
index f039fb55326d..b1a5edee383f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
@@ -151,10 +151,10 @@ protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String
private String parseGroup(String path){
String[] parts = path.split("\\/");
- if(parts.length != 6){
+ if (parts.length < 6) {
throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path));
}
- String group = parts[4];
+ String group = parts[parts.length - 2];
return group;
}
}
From 59610a56610a4c7131818e3d1c70366d2bdd25ab Mon Sep 17 00:00:00 2001
From: Hsu Pu
Date: Tue, 18 Aug 2020 14:46:00 +0800
Subject: [PATCH 04/53] [Improvement][dao,server] unit test for ConditionsTask
(#3385)
---
.../dao/entity/TaskInstance.java | 29 ++-
.../dao/entity/TaskInstanceTest.java | 32 ++++
.../server/master/ConditionsTaskTest.java | 171 ++++++++++++------
pom.xml | 3 +-
4 files changed, 160 insertions(+), 75 deletions(-)
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index a90d9271547c..b82da62b0229 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -16,24 +16,23 @@
*/
package org.apache.dolphinscheduler.dao.entity;
-import com.baomidou.mybatisplus.annotation.IdType;
-import com.baomidou.mybatisplus.annotation.TableField;
-import com.fasterxml.jackson.annotation.JsonFormat;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode;
-import org.apache.dolphinscheduler.common.utils.*;
-import com.baomidou.mybatisplus.annotation.IdType;
-import com.baomidou.mybatisplus.annotation.TableId;
-import com.baomidou.mybatisplus.annotation.TableName;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
import java.util.Date;
import java.util.Map;
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.fasterxml.jackson.annotation.JsonFormat;
+
/**
* task instance
*/
@@ -382,16 +381,16 @@ public void setAppLink(String appLink) {
this.appLink = appLink;
}
-
-
- public String getDependency(){
-
- if(this.dependency != null){
+ public String getDependency() {
+ if (this.dependency != null) {
return this.dependency;
}
TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
+ return taskNode == null ? null : taskNode.getDependence();
+ }
- return taskNode.getDependence();
+ public void setDependency(String dependency) {
+ this.dependency = dependency;
}
public Flag getFlag() {
@@ -495,10 +494,6 @@ public boolean taskCanRetry() {
}
}
- public void setDependency(String dependency) {
- this.dependency = dependency;
- }
-
public Priority getTaskInstancePriority() {
return taskInstancePriority;
}
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java
index 9c596708725a..5742c95a5de1 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java
@@ -16,6 +16,9 @@
*/
package org.apache.dolphinscheduler.dao.entity;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
import org.junit.Assert;
import org.junit.Test;
@@ -43,7 +46,36 @@ public void testTaskInstanceIsSubProcess() {
//sub process
taskInstance.setTaskType("DEPENDENT");
Assert.assertTrue(taskInstance.isDependTask());
+ }
+
+ /**
+ * test for TaskInstance.getDependence
+ */
+ @Test
+ public void testTaskInstanceGetDependence() {
+ TaskInstance taskInstance;
+ TaskNode taskNode;
+
+ taskInstance = new TaskInstance();
+ taskInstance.setTaskJson(null);
+ Assert.assertNull(taskInstance.getDependency());
+
+ taskInstance = new TaskInstance();
+ taskNode = new TaskNode();
+ taskNode.setDependence(null);
+ taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
+ Assert.assertNull(taskInstance.getDependency());
+ taskInstance = new TaskInstance();
+ taskNode = new TaskNode();
+ // expect a JSON here, and will be unwrap when toJsonString
+ taskNode.setDependence("\"A\"");
+ taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
+ Assert.assertEquals("A", taskInstance.getDependency());
+ taskInstance = new TaskInstance();
+ taskInstance.setTaskJson(null);
+ taskInstance.setDependency("{}");
+ Assert.assertEquals("{}", taskInstance.getDependency());
}
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
index f1ee8ccf11c1..61058de8640f 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
@@ -16,14 +16,26 @@
*/
package org.apache.dolphinscheduler.server.master;
-
+import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.model.DependentItem;
+import org.apache.dolphinscheduler.common.model.DependentTaskModel;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
+import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.ConditionsTaskExecThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -34,99 +46,144 @@
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
-import java.util.ArrayList;
-import java.util.List;
-
@RunWith(MockitoJUnitRunner.Silent.class)
public class ConditionsTaskTest {
-
private static final Logger logger = LoggerFactory.getLogger(DependentTaskTest.class);
- private ProcessService processService;
- private ApplicationContext applicationContext;
+ /**
+ * TaskNode.runFlag : task can be run normally
+ */
+ public static final String FLOWNODE_RUN_FLAG_NORMAL = "NORMAL";
+ private ProcessService processService;
- private MasterConfig config;
+ private ProcessInstance processInstance;
@Before
public void before() {
- config = new MasterConfig();
+ ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
+ SpringApplicationContext springApplicationContext = new SpringApplicationContext();
+ springApplicationContext.setApplicationContext(applicationContext);
+
+ MasterConfig config = new MasterConfig();
+ Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
config.setMasterTaskCommitRetryTimes(3);
config.setMasterTaskCommitInterval(1000);
+
processService = Mockito.mock(ProcessService.class);
- applicationContext = Mockito.mock(ApplicationContext.class);
- SpringApplicationContext springApplicationContext = new SpringApplicationContext();
- springApplicationContext.setApplicationContext(applicationContext);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
- Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
+ processInstance = getProcessInstance();
Mockito.when(processService
- .findTaskInstanceById(252612))
- .thenReturn(getTaskInstance());
+ .findProcessInstanceById(processInstance.getId()))
+ .thenReturn(processInstance);
+ }
- Mockito.when(processService.saveTaskInstance(getTaskInstance()))
- .thenReturn(true);
+ private TaskInstance testBasicInit(ExecutionStatus expectResult) {
+ TaskInstance taskInstance = getTaskInstance(getTaskNode(), processInstance);
- Mockito.when(processService.findProcessInstanceById(10112))
- .thenReturn(getProcessInstance());
+ // for MasterBaseTaskExecThread.submit
+ Mockito.when(processService
+ .submitTask(taskInstance))
+ .thenReturn(taskInstance);
+ // for MasterBaseTaskExecThread.call
+ Mockito.when(processService
+ .findTaskInstanceById(taskInstance.getId()))
+ .thenReturn(taskInstance);
+ // for ConditionsTaskExecThread.initTaskParameters
+ Mockito.when(processService
+ .saveTaskInstance(taskInstance))
+ .thenReturn(true);
+ // for ConditionsTaskExecThread.updateTaskState
+ Mockito.when(processService
+ .updateTaskInstance(taskInstance))
+ .thenReturn(true);
+ // for ConditionsTaskExecThread.waitTaskQuit
+ List conditions = Stream.of(
+ getTaskInstanceForValidTaskList(1001, "1", expectResult)
+ ).collect(Collectors.toList());
Mockito.when(processService
- .findValidTaskListByProcessId(10112))
- .thenReturn(getTaskInstances());
+ .findValidTaskListByProcessId(processInstance.getId()))
+ .thenReturn(conditions);
+
+ return taskInstance;
}
@Test
- public void testCondition(){
- TaskInstance taskInstance = getTaskInstance();
- String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"depTasks\":\"1\",\"status\":\"SUCCESS\"}],\"relation\":\"AND\"}],\"relation\":\"AND\"}";
- String conditionResult = "{\"successNode\":[\"2\"],\"failedNode\":[\"3\"]}";
+ public void testBasicSuccess() throws Exception {
+ TaskInstance taskInstance = testBasicInit(ExecutionStatus.SUCCESS);
+ ConditionsTaskExecThread taskExecThread = new ConditionsTaskExecThread(taskInstance);
+ taskExecThread.call();
+ Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecThread.getTaskInstance().getState());
+ }
- taskInstance.setDependency(dependString);
- Mockito.when(processService.submitTask(taskInstance))
- .thenReturn(taskInstance);
- ConditionsTaskExecThread conditions =
- new ConditionsTaskExecThread(taskInstance);
+ @Test
+ public void testBasicFailure() throws Exception {
+ TaskInstance taskInstance = testBasicInit(ExecutionStatus.FAILURE);
+ ConditionsTaskExecThread taskExecThread = new ConditionsTaskExecThread(taskInstance);
+ taskExecThread.call();
+ Assert.assertEquals(ExecutionStatus.FAILURE, taskExecThread.getTaskInstance().getState());
+ }
- try {
- conditions.call();
- } catch (Exception e) {
- e.printStackTrace();
- }
+ private TaskNode getTaskNode() {
+ TaskNode taskNode = new TaskNode();
+ taskNode.setId("tasks-1000");
+ taskNode.setName("C");
+ taskNode.setType(TaskType.CONDITIONS.toString());
+ taskNode.setRunFlag(FLOWNODE_RUN_FLAG_NORMAL);
- Assert.assertEquals(ExecutionStatus.SUCCESS, conditions.getTaskInstance().getState());
- }
+ DependentItem dependentItem = new DependentItem();
+ dependentItem.setDepTasks("1");
+ dependentItem.setStatus(ExecutionStatus.SUCCESS);
+ DependentTaskModel dependentTaskModel = new DependentTaskModel();
+ dependentTaskModel.setDependItemList(Stream.of(dependentItem).collect(Collectors.toList()));
+ dependentTaskModel.setRelation(DependentRelation.AND);
- private TaskInstance getTaskInstance(){
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setId(252612);
- taskInstance.setName("C");
- taskInstance.setTaskType("CONDITIONS");
- taskInstance.setProcessInstanceId(10112);
- taskInstance.setProcessDefinitionId(100001);
- return taskInstance;
- }
+ DependentParameters dependentParameters = new DependentParameters();
+ dependentParameters.setDependTaskList(Stream.of(dependentTaskModel).collect(Collectors.toList()));
+ dependentParameters.setRelation(DependentRelation.AND);
+ // in: AND(AND(1 is SUCCESS))
+ taskNode.setDependence(JSONUtils.toJsonString(dependentParameters));
+ ConditionsParameters conditionsParameters = new ConditionsParameters();
+ conditionsParameters.setSuccessNode(Stream.of("2").collect(Collectors.toList()));
+ conditionsParameters.setFailedNode(Stream.of("3").collect(Collectors.toList()));
- private List getTaskInstances(){
- List list = new ArrayList<>();
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setId(199999);
- taskInstance.setName("1");
- taskInstance.setState(ExecutionStatus.SUCCESS);
- list.add(taskInstance);
- return list;
+ // out: SUCCESS => 2, FAILED => 3
+ taskNode.setConditionResult(JSONUtils.toJsonString(conditionsParameters));
+
+ return taskNode;
}
- private ProcessInstance getProcessInstance(){
+ private ProcessInstance getProcessInstance() {
ProcessInstance processInstance = new ProcessInstance();
- processInstance.setId(10112);
- processInstance.setProcessDefinitionId(100001);
+ processInstance.setId(1000);
+ processInstance.setProcessDefinitionId(1000);
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
return processInstance;
}
+ private TaskInstance getTaskInstance(TaskNode taskNode, ProcessInstance processInstance) {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setId(1000);
+ taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
+ taskInstance.setName(taskNode.getName());
+ taskInstance.setTaskType(taskNode.getType());
+ taskInstance.setProcessInstanceId(processInstance.getId());
+ taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId());
+ return taskInstance;
+ }
+
+ private TaskInstance getTaskInstanceForValidTaskList(int id, String name, ExecutionStatus state) {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setId(id);
+ taskInstance.setName(name);
+ taskInstance.setState(state);
+ return taskInstance;
+ }
}
diff --git a/pom.xml b/pom.xml
index 5a3590affd48..2937a814fc1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -797,6 +797,7 @@
**/dao/mapper/CommandMapperTest.java**/dao/mapper/ConnectionFactoryTest.java**/dao/mapper/DataSourceMapperTest.java
+ **/dao/entity/TaskInstanceTest.java**/dao/entity/UdfFuncTest.java**/remote/JsonSerializerTest.java**/remote/RemoveTaskLogResponseCommandTest.java
@@ -820,7 +821,7 @@
**/server/master/AlertManagerTest.java**/server/master/MasterCommandTest.java**/server/master/DependentTaskTest.java
-
+ **/server/master/ConditionsTaskTest.java**/server/master/MasterExecThreadTest.java**/server/master/ParamsTest.java**/server/register/ZookeeperNodeManagerTest.java
From 00e0003aed207adc33d1c4068977b172fa21d3f0 Mon Sep 17 00:00:00 2001
From: xingchun-chen <55787491+xingchun-chen@users.noreply.github.com>
Date: Tue, 18 Aug 2020 17:27:36 +0800
Subject: [PATCH 05/53] [test-2995][e2e]Add task connection (#3524)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
* Add task connection
* Optimize test cases
* Modify variable format
* Optimize test cases
* Update BrowserCommon.java
* Update BrowserCommon.java
* Update WorkflowDefineLocator.java
Co-authored-by: chenxingchun <438044805@qq.com>
---
.../common/BrowserCommon.java | 40 +++++++++++++++----
.../project/WorkflowDefineLocator.java | 12 +++++-
.../page/project/WorkflowDefinePage.java | 5 +++
3 files changed, 48 insertions(+), 9 deletions(-)
diff --git a/e2e/src/test/java/org/apache/dolphinscheduler/common/BrowserCommon.java b/e2e/src/test/java/org/apache/dolphinscheduler/common/BrowserCommon.java
index 437e81cfbb0d..2eade95488fd 100644
--- a/e2e/src/test/java/org/apache/dolphinscheduler/common/BrowserCommon.java
+++ b/e2e/src/test/java/org/apache/dolphinscheduler/common/BrowserCommon.java
@@ -223,22 +223,46 @@ public Actions moveToElement(By locator) {
/**
* mouse drag element
*
- * @param source_locator BY
- * @param target_locator BY
+ * @param sourceLocator BY
+ * @param targetLocator BY
*/
- public void dragAndDrop(By source_locator, By target_locator) {
- WebElement sourceElement = locateElement(source_locator);
- WebElement targetElement = locateElement(target_locator);
+ public void dragAndDrop(By sourceLocator, By targetLocator) {
+ WebElement sourceElement = locateElement(sourceLocator);
+ WebElement targetElement = locateElement(targetLocator);
actions.dragAndDrop(sourceElement, targetElement).perform();
actions.release();
}
- public void moveToDragElement(By target_locator, int X, int Y) {
- WebElement targetElement = locateElement(target_locator);
- actions.dragAndDropBy(targetElement, X, Y).perform();
+ public void moveToDragElement(By targetLocator, int x, int y) {
+ WebElement targetElement = locateElement(targetLocator);
+ actions.dragAndDropBy(targetElement, x, y).perform();
actions.release();
}
+ /**
+ * Right mouse click on the element
+ *
+ * @param locator By
+ * @return actions
+ */
+ public void mouseRightClickElement(By locator) {
+ WebElement mouseRightClickElement = locateElement(locator);
+ actions.contextClick(mouseRightClickElement).perform();
+ }
+
+ /**
+ * The mouse moves from a position to a specified positionØ
+ *
+ * @param sourceLocator BY
+ * @param targetLocator BY
+ * @return actions
+ */
+ public void mouseMovePosition(By sourceLocator, By targetLocator) throws InterruptedException {
+ WebElement sourceElement = locateElement(sourceLocator);
+ WebElement targetElement = locateElement(targetLocator);
+ actions.dragAndDrop(sourceElement,targetElement).perform();
+ actions.click();
+ }
/**
* jump page
diff --git a/e2e/src/test/java/org/apache/dolphinscheduler/locator/project/WorkflowDefineLocator.java b/e2e/src/test/java/org/apache/dolphinscheduler/locator/project/WorkflowDefineLocator.java
index bfb237ccd27b..a70b22eacf7a 100644
--- a/e2e/src/test/java/org/apache/dolphinscheduler/locator/project/WorkflowDefineLocator.java
+++ b/e2e/src/test/java/org/apache/dolphinscheduler/locator/project/WorkflowDefineLocator.java
@@ -120,12 +120,22 @@ public class WorkflowDefineLocator {
//click submit button
public static final By CLICK_SUBMIT_BUTTON = By.xpath("//div[3]/div/button[2]/span");
+ //copy task
+ public static final By MOUSE_RIGHT_CLICK = By.xpath("//div[2]/div[2]/div/div/div/div/div[2]");
+ public static final By COPY_TASK = By.xpath("//a[3]/span");
+
+ //click line
+ public static final By CLICK_LINE = By.xpath("//a[@id='line']/button/i");
+
+ public static final By LINE_SOURCES_TASK = By.xpath("//div[@id='canvas']/div[1]/div[2]");
+
+ public static final By LINE_TARGET_TASK = By.xpath("//div[@id='canvas']/div[2]/div[2]");
/**
* save workflow
*/
//click save workflow button
- public static final By CLICK_SAVE_WORKFLOW_BUTTON = By.xpath("//div[2]/div[1]/div[2]/button[2]");
+ public static final By CLICK_SAVE_WORKFLOW_BUTTON = By.xpath("//div[2]/div[1]/div[2]/button[2]/span");
//input workflow name
public static final By INPUT_WORKFLOW_NAME = By.xpath("//input");
diff --git a/e2e/src/test/java/org/apache/dolphinscheduler/page/project/WorkflowDefinePage.java b/e2e/src/test/java/org/apache/dolphinscheduler/page/project/WorkflowDefinePage.java
index 9e462805da45..83442562ca45 100644
--- a/e2e/src/test/java/org/apache/dolphinscheduler/page/project/WorkflowDefinePage.java
+++ b/e2e/src/test/java/org/apache/dolphinscheduler/page/project/WorkflowDefinePage.java
@@ -131,6 +131,11 @@ public boolean createWorkflow() throws InterruptedException {
System.out.println("move to Dag Element ");
moveToDragElement(WorkflowDefineLocator.MOUSE_MOVE_SHELL_AT_DAG,-300,-100);
+ System.out.println("copy task");
+ mouseRightClickElement(WorkflowDefineLocator.MOUSE_RIGHT_CLICK);
+ clickButton(WorkflowDefineLocator.COPY_TASK);
+ clickButton(WorkflowDefineLocator.CLICK_LINE);
+ mouseMovePosition(WorkflowDefineLocator.LINE_SOURCES_TASK,WorkflowDefineLocator.LINE_TARGET_TASK);
return ifTitleContains(WorkflowDefineData.CREATE_WORKFLOW_TITLE);
}
From a4ee351a3af1b05bafab48699f846e2b3ac226eb Mon Sep 17 00:00:00 2001
From: vanilla111 <1115690319@qq.com>
Date: Wed, 19 Aug 2020 11:36:30 +0800
Subject: [PATCH 06/53] delay execution of tasks and improve some designs
(#3427)
---
.../api/dto/TaskCountDto.java | 24 +-
.../dolphinscheduler/common/Constants.java | 17 +-
.../common/enums/ExecutionStatus.java | 109 +++--
.../common/enums/TaskStateType.java | 10 +-
.../common/model/TaskNode.java | 50 +-
.../common/utils/DateUtils.java | 444 +++++++++++++++++-
.../common/enums/ExecutionStatusTest.java | 32 ++
.../dao/entity/TaskInstance.java | 127 +++--
.../remote/codec/NettyDecoder.java | 1 +
.../builder/TaskExecutionContextBuilder.java | 2 +
.../server/entity/TaskExecutionContext.java | 171 ++++---
.../runner/ConditionsTaskExecThread.java | 1 +
.../runner/DependentTaskExecThread.java | 1 +
.../runner/MasterBaseTaskExecThread.java | 21 +-
.../master/runner/MasterExecThread.java | 54 ++-
.../master/runner/MasterTaskExecThread.java | 17 +-
.../processor/TaskExecuteProcessor.java | 39 +-
.../worker/runner/TaskExecuteThread.java | 98 +++-
.../worker/task/AbstractCommandExecutor.java | 22 +-
.../TaskPriorityQueueConsumerTest.java | 3 +-
.../dispatch/ExecutorDispatcherTest.java | 3 +-
.../executor/NettyExecutorManagerTest.java | 3 +-
.../host/RoundRobinHostManagerTest.java | 3 +-
.../queue/TaskResponseServiceTest.java | 4 +-
.../master/registry/MasterRegistryTest.java | 4 +-
.../worker/runner/TaskExecuteThreadTest.java | 171 +++++++
.../service/process/ProcessService.java | 98 +++-
pom.xml | 2 +
sql/dolphinscheduler-postgre.sql | 4 +-
sql/dolphinscheduler_mysql.sql | 2 +
.../mysql/dolphinscheduler_ddl.sql | 1 -
.../mysql/dolphinscheduler_ddl.sql | 58 +++
.../mysql/dolphinscheduler_dml.sql | 16 +
.../postgresql/dolphinscheduler_ddl.sql | 52 ++
.../postgresql/dolphinscheduler_dml.sql | 16 +
35 files changed, 1383 insertions(+), 297 deletions(-)
create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/enums/ExecutionStatusTest.java
create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
create mode 100644 sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql
create mode 100644 sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_dml.sql
create mode 100644 sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql
create mode 100644 sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_dml.sql
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java
index fa7588f2edd4..35aaaf34dd41 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java
@@ -42,9 +42,10 @@ public TaskCountDto(List taskInstanceStateCounts) {
countTaskDtos(taskInstanceStateCounts);
}
- private void countTaskDtos(List taskInstanceStateCounts){
+ private void countTaskDtos(List taskInstanceStateCounts) {
int submittedSuccess = 0;
- int runningExeution = 0;
+ int runningExecution = 0;
+ int delayExecution = 0;
int readyPause = 0;
int pause = 0;
int readyStop = 0;
@@ -55,15 +56,18 @@ private void countTaskDtos(List taskInstanceStateCounts){
int kill = 0;
int waittingThread = 0;
- for(ExecuteStatusCount taskInstanceStateCount : taskInstanceStateCounts){
+ for (ExecuteStatusCount taskInstanceStateCount : taskInstanceStateCounts) {
ExecutionStatus status = taskInstanceStateCount.getExecutionStatus();
totalCount += taskInstanceStateCount.getCount();
- switch (status){
+ switch (status) {
case SUBMITTED_SUCCESS:
submittedSuccess += taskInstanceStateCount.getCount();
break;
case RUNNING_EXECUTION:
- runningExeution += taskInstanceStateCount.getCount();
+ runningExecution += taskInstanceStateCount.getCount();
+ break;
+ case DELAY_EXECUTION:
+ delayExecution += taskInstanceStateCount.getCount();
break;
case READY_PAUSE:
readyPause += taskInstanceStateCount.getCount();
@@ -93,13 +97,14 @@ private void countTaskDtos(List taskInstanceStateCounts){
waittingThread += taskInstanceStateCount.getCount();
break;
- default:
- break;
+ default:
+ break;
}
}
this.taskCountDtos = new ArrayList<>();
this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.SUBMITTED_SUCCESS, submittedSuccess));
- this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.RUNNING_EXECUTION, runningExeution));
+ this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.RUNNING_EXECUTION, runningExecution));
+ this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.DELAY_EXECUTION, delayExecution));
this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.READY_PAUSE, readyPause));
this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.PAUSE, pause));
this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.READY_STOP, readyStop));
@@ -111,8 +116,7 @@ private void countTaskDtos(List taskInstanceStateCounts){
this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.WAITTING_THREAD, waittingThread));
}
-
- public List getTaskCountDtos(){
+ public List getTaskCountDtos() {
return taskCountDtos;
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 072a67f44f29..cee83e73bcbc 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -138,7 +138,7 @@ private Constants() {
/**
* python home
*/
- public static final String PYTHON_HOME="PYTHON_HOME";
+ public static final String PYTHON_HOME = "PYTHON_HOME";
/**
* resource.view.suffixs
@@ -366,7 +366,6 @@ private Constants() {
public static final double DEFAULT_WORKER_RESERVED_MEMORY = OSUtils.totalMemorySize() / 10;
-
/**
* default log cache rows num,output when reach the number
*/
@@ -752,7 +751,7 @@ private Constants() {
/**
- * preview schedule execute count
+ * preview schedule execute count
*/
public static final int PREVIEW_SCHEDULE_EXECUTE_COUNT = 5;
@@ -832,6 +831,7 @@ private Constants() {
public static final int[] NOT_TERMINATED_STATES = new int[]{
ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
+ ExecutionStatus.DELAY_EXECUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
ExecutionStatus.READY_STOP.ordinal(),
ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
@@ -852,18 +852,17 @@ private Constants() {
/**
* data total
*/
- public static final String COUNT = "count";
+ public static final String COUNT = "count";
/**
* page size
*/
- public static final String PAGE_SIZE = "pageSize";
+ public static final String PAGE_SIZE = "pageSize";
/**
* current page no
*/
- public static final String PAGE_NUMBER = "pageNo";
-
+ public static final String PAGE_NUMBER = "pageNo";
/**
@@ -966,11 +965,11 @@ private Constants() {
/**
* authorize writable perm
*/
- public static final int AUTHORIZE_WRITABLE_PERM=7;
+ public static final int AUTHORIZE_WRITABLE_PERM = 7;
/**
* authorize readable perm
*/
- public static final int AUTHORIZE_READABLE_PERM=4;
+ public static final int AUTHORIZE_READABLE_PERM = 4;
/**
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
index 6ea02ef0967e..f6ac2cf5ab43 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
@@ -14,16 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.common.enums;
+import java.util.HashMap;
import com.baomidou.mybatisplus.annotation.EnumValue;
-import java.util.HashMap;
-
/**
* running status for workflow and task nodes
- *
*/
public enum ExecutionStatus {
@@ -41,6 +40,7 @@ public enum ExecutionStatus {
* 9 kill
* 10 waiting thread
* 11 waiting depend node complete
+ * 12 delay execution
*/
SUBMITTED_SUCCESS(0, "submit success"),
RUNNING_EXECUTION(1, "running"),
@@ -53,9 +53,10 @@ public enum ExecutionStatus {
NEED_FAULT_TOLERANCE(8, "need fault tolerance"),
KILL(9, "kill"),
WAITTING_THREAD(10, "waiting thread"),
- WAITTING_DEPEND(11, "waiting depend node complete");
+ WAITTING_DEPEND(11, "waiting depend node complete"),
+ DELAY_EXECUTION(12, "delay execution");
- ExecutionStatus(int code, String descp){
+ ExecutionStatus(int code, String descp) {
this.code = code;
this.descp = descp;
}
@@ -64,77 +65,85 @@ public enum ExecutionStatus {
private final int code;
private final String descp;
- private static HashMap EXECUTION_STATUS_MAP=new HashMap<>();
+ private static HashMap EXECUTION_STATUS_MAP = new HashMap<>();
static {
- for (ExecutionStatus executionStatus:ExecutionStatus.values()){
- EXECUTION_STATUS_MAP.put(executionStatus.code,executionStatus);
- }
+ for (ExecutionStatus executionStatus : ExecutionStatus.values()) {
+ EXECUTION_STATUS_MAP.put(executionStatus.code, executionStatus);
+ }
}
- /**
- * status is success
- * @return status
- */
- public boolean typeIsSuccess(){
- return this == SUCCESS;
- }
-
- /**
- * status is failure
- * @return status
- */
- public boolean typeIsFailure(){
- return this == FAILURE || this == NEED_FAULT_TOLERANCE || this == KILL;
- }
-
- /**
- * status is finished
- * @return status
- */
- public boolean typeIsFinished(){
-
- return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause()
- || typeIsStop();
- }
+ /**
+ * status is success
+ *
+ * @return status
+ */
+ public boolean typeIsSuccess() {
+ return this == SUCCESS;
+ }
+
+ /**
+ * status is failure
+ *
+ * @return status
+ */
+ public boolean typeIsFailure() {
+ return this == FAILURE || this == NEED_FAULT_TOLERANCE || this == KILL;
+ }
+
+ /**
+ * status is finished
+ *
+ * @return status
+ */
+ public boolean typeIsFinished() {
+ return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause()
+ || typeIsStop();
+ }
/**
* status is waiting thread
+ *
* @return status
*/
- public boolean typeIsWaitingThread(){
- return this == WAITTING_THREAD;
- }
+ public boolean typeIsWaitingThread() {
+ return this == WAITTING_THREAD;
+ }
/**
* status is pause
+ *
* @return status
*/
- public boolean typeIsPause(){
- return this == PAUSE;
- }
+ public boolean typeIsPause() {
+ return this == PAUSE;
+ }
+
/**
* status is pause
+ *
* @return status
*/
- public boolean typeIsStop(){
+ public boolean typeIsStop() {
return this == STOP;
}
/**
* status is running
+ *
* @return status
*/
- public boolean typeIsRunning(){
- return this == RUNNING_EXECUTION || this == WAITTING_DEPEND;
- }
+ public boolean typeIsRunning() {
+ return this == RUNNING_EXECUTION || this == WAITTING_DEPEND || this == DELAY_EXECUTION;
+ }
/**
* status is cancel
+ *
* @return status
*/
- public boolean typeIsCancel(){
- return this == KILL || this == STOP ;
+ public boolean typeIsCancel() {
+ return this == KILL || this == STOP;
}
public int getCode() {
@@ -145,10 +154,10 @@ public String getDescp() {
return descp;
}
- public static ExecutionStatus of(int status){
- if(EXECUTION_STATUS_MAP.containsKey(status)){
- return EXECUTION_STATUS_MAP.get(status);
- }
+ public static ExecutionStatus of(int status) {
+ if (EXECUTION_STATUS_MAP.containsKey(status)) {
+ return EXECUTION_STATUS_MAP.get(status);
+ }
throw new IllegalArgumentException("invalid status : " + status);
}
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java
index 11ab8560b7ec..36766a7f4d04 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java
@@ -31,12 +31,13 @@ public enum TaskStateType {
/**
* convert task state to execute status integer array ;
+ *
* @param taskStateType task state type
* @return result of execution status
*/
- public static int[] convert2ExecutStatusIntArray(TaskStateType taskStateType){
+ public static int[] convert2ExecutStatusIntArray(TaskStateType taskStateType) {
- switch (taskStateType){
+ switch (taskStateType) {
case SUCCESS:
return new int[]{ExecutionStatus.SUCCESS.ordinal()};
case FAILED:
@@ -51,14 +52,15 @@ public static int[] convert2ExecutStatusIntArray(TaskStateType taskStateType){
case RUNNING:
return new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
+ ExecutionStatus.DELAY_EXECUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
ExecutionStatus.READY_STOP.ordinal()};
case WAITTING:
return new int[]{
ExecutionStatus.SUBMITTED_SUCCESS.ordinal()
};
- default:
- break;
+ default:
+ break;
}
return new int[0];
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
index f79439645735..cd3e573b16dc 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
@@ -136,6 +136,11 @@ public class TaskNode {
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String timeout;
+ /**
+ * delay execution time.
+ */
+ private int delayTime;
+
public String getId() {
return id;
}
@@ -310,24 +315,25 @@ public boolean isConditionsTask(){
@Override
public String toString() {
- return "TaskNode{" +
- "id='" + id + '\'' +
- ", name='" + name + '\'' +
- ", desc='" + desc + '\'' +
- ", type='" + type + '\'' +
- ", runFlag='" + runFlag + '\'' +
- ", loc='" + loc + '\'' +
- ", maxRetryTimes=" + maxRetryTimes +
- ", retryInterval=" + retryInterval +
- ", params='" + params + '\'' +
- ", preTasks='" + preTasks + '\'' +
- ", extras='" + extras + '\'' +
- ", depList=" + depList +
- ", dependence='" + dependence + '\'' +
- ", taskInstancePriority=" + taskInstancePriority +
- ", timeout='" + timeout + '\'' +
- ", workerGroup='" + workerGroup + '\'' +
- '}';
+ return "TaskNode{"
+ + "id='" + id + '\''
+ + ", name='" + name + '\''
+ + ", desc='" + desc + '\''
+ + ", type='" + type + '\''
+ + ", runFlag='" + runFlag + '\''
+ + ", loc='" + loc + '\''
+ + ", maxRetryTimes=" + maxRetryTimes
+ + ", retryInterval=" + retryInterval
+ + ", params='" + params + '\''
+ + ", preTasks='" + preTasks + '\''
+ + ", extras='" + extras + '\''
+ + ", depList=" + depList
+ + ", dependence='" + dependence + '\''
+ + ", taskInstancePriority=" + taskInstancePriority
+ + ", timeout='" + timeout + '\''
+ + ", workerGroup='" + workerGroup + '\''
+ + ", delayTime=" + delayTime
+ + '}';
}
public String getWorkerGroup() {
@@ -353,4 +359,12 @@ public Integer getWorkerGroupId() {
public void setWorkerGroupId(Integer workerGroupId) {
this.workerGroupId = workerGroupId;
}
+
+ public int getDelayTime() {
+ return delayTime;
+ }
+
+ public void setDelayTime(int delayTime) {
+ this.delayTime = delayTime;
+ }
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java
index 1033816e8e87..6cd1d5867e76 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java
@@ -1 +1,443 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Calendar;
import java.util.Date;
/**
* date utils
*/
public class DateUtils {
private static final Logger logger = LoggerFactory.getLogger(DateUtils.class);
/**
* date to local datetime
*
* @param date date
* @return local datetime
*/
private static LocalDateTime date2LocalDateTime(Date date) {
return LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault());
}
/**
* local datetime to date
*
* @param localDateTime local datetime
* @return date
*/
private static Date localDateTime2Date(LocalDateTime localDateTime) {
Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant();
return Date.from(instant);
}
/**
* get current date str
*
* @return date string
*/
public static String getCurrentTime() {
return getCurrentTime(Constants.YYYY_MM_DD_HH_MM_SS);
}
/**
* get the date string in the specified format of the current time
*
* @param format date format
* @return date string
*/
public static String getCurrentTime(String format) {
return LocalDateTime.now().format(DateTimeFormatter.ofPattern(format));
}
/**
* get the formatted date string
*
* @param date date
* @param format e.g. yyyy-MM-dd HH:mm:ss
* @return date string
*/
public static String format(Date date, String format) {
return format(date2LocalDateTime(date), format);
}
/**
* get the formatted date string
*
* @param localDateTime local data time
* @param format yyyy-MM-dd HH:mm:ss
* @return date string
*/
public static String format(LocalDateTime localDateTime, String format) {
return localDateTime.format(DateTimeFormatter.ofPattern(format));
}
/**
* convert time to yyyy-MM-dd HH:mm:ss format
*
* @param date date
* @return date string
*/
public static String dateToString(Date date) {
return format(date, Constants.YYYY_MM_DD_HH_MM_SS);
}
/**
* convert string to date and time
*
* @param date date
* @param format format
* @return date
*/
public static Date parse(String date, String format) {
try {
LocalDateTime ldt = LocalDateTime.parse(date, DateTimeFormatter.ofPattern(format));
return localDateTime2Date(ldt);
} catch (Exception e) {
logger.error("error while parse date:" + date, e);
}
return null;
}
/**
* convert date str to yyyy-MM-dd HH:mm:ss format
*
* @param str date string
* @return yyyy-MM-dd HH:mm:ss format
*/
public static Date stringToDate(String str) {
return parse(str, Constants.YYYY_MM_DD_HH_MM_SS);
}
/**
* get seconds between two dates
*
* @param d1 date1
* @param d2 date2
* @return differ seconds
*/
public static long differSec(Date d1, Date d2) {
if(d1 == null || d2 == null){
return 0;
}
return (long) Math.ceil(differMs(d1, d2) / 1000.0);
}
/**
* get ms between two dates
*
* @param d1 date1
* @param d2 date2
* @return differ ms
*/
public static long differMs(Date d1, Date d2) {
return Math.abs(d1.getTime() - d2.getTime());
}
/**
* get hours between two dates
*
* @param d1 date1
* @param d2 date2
* @return differ hours
*/
public static long diffHours(Date d1, Date d2) {
return (long) Math.ceil(diffMin(d1, d2) / 60.0);
}
/**
* get minutes between two dates
*
* @param d1 date1
* @param d2 date2
* @return differ minutes
*/
public static long diffMin(Date d1, Date d2) {
return (long) Math.ceil(differSec(d1, d2) / 60.0);
}
/**
* get the date of the specified date in the days before and after
*
* @param date date
* @param day day
* @return the date of the specified date in the days before and after
*/
public static Date getSomeDay(Date date, int day) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
calendar.add(Calendar.DATE, day);
return calendar.getTime();
}
/**
* get the hour of day.
*
* @param date date
* @return hour of day
*/
public static int getHourIndex(Date date) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
return calendar.get(Calendar.HOUR_OF_DAY);
}
/**
* compare two dates
*
* @param future future date
* @param old old date
* @return true if future time greater than old time
*/
public static boolean compare(Date future, Date old) {
return future.getTime() > old.getTime();
}
/**
* convert schedule string to date
*
* @param schedule schedule
* @return convert schedule string to date
*/
public static Date getScheduleDate(String schedule) {
return stringToDate(schedule);
}
/**
* format time to readable
*
* @param ms ms
* @return format time
*/
public static String format2Readable(long ms) {
long days = ms / (1000 * 60 * 60 * 24);
long hours = (ms % (1000 * 60 * 60 * 24)) / (1000 * 60 * 60);
long minutes = (ms % (1000 * 60 * 60)) / (1000 * 60);
long seconds = (ms % (1000 * 60)) / 1000;
return String.format("%02d %02d:%02d:%02d", days, hours, minutes, seconds);
}
/**
* get monday
*
* note: Set the first day of the week to Monday, the default is Sunday
* @param date date
* @return get monday
*/
public static Date getMonday(Date date) {
Calendar cal = Calendar.getInstance();
cal.setTime(date);
cal.setFirstDayOfWeek(Calendar.MONDAY);
cal.set(Calendar.DAY_OF_WEEK, Calendar.MONDAY);
return cal.getTime();
}
/**
* get sunday
*
* note: Set the first day of the week to Monday, the default is Sunday
* @param date date
* @return get sunday
*/
public static Date getSunday(Date date) {
Calendar cal = Calendar.getInstance();
cal.setTime(date);
cal.setFirstDayOfWeek(Calendar.MONDAY);
cal.set(Calendar.DAY_OF_WEEK, Calendar.SUNDAY);
return cal.getTime();
}
/**
* get first day of month
*
* @param date date
* @return first day of month
* */
public static Date getFirstDayOfMonth(Date date) {
Calendar cal = Calendar.getInstance();
cal.setTime(date);
cal.set(Calendar.DAY_OF_MONTH, 1);
return cal.getTime();
}
/**
* get some hour of day
*
* @param date date
* @param offsetHour hours
* @return some hour of day
* */
public static Date getSomeHourOfDay(Date date, int offsetHour) {
Calendar cal = Calendar.getInstance();
cal.setTime(date);
cal.set(Calendar.HOUR_OF_DAY, cal.get(Calendar.HOUR_OF_DAY) + offsetHour);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
return cal.getTime();
}
/**
* get last day of month
*
* @param date date
* @return get last day of month
*/
public static Date getLastDayOfMonth(Date date) {
Calendar cal = Calendar.getInstance();
cal.setTime(date);
cal.add(Calendar.MONTH, 1);
cal.set(Calendar.DAY_OF_MONTH, 1);
cal.add(Calendar.DAY_OF_MONTH, -1);
return cal.getTime();
}
/**
* return YYYY-MM-DD 00:00:00
*
* @param inputDay date
* @return start day
*/
public static Date getStartOfDay(Date inputDay) {
Calendar cal = Calendar.getInstance();
cal.setTime(inputDay);
cal.set(Calendar.HOUR_OF_DAY, 0);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
return cal.getTime();
}
/**
* return YYYY-MM-DD 23:59:59
*
* @param inputDay day
* @return end of day
*/
public static Date getEndOfDay(Date inputDay) {
Calendar cal = Calendar.getInstance();
cal.setTime(inputDay);
cal.set(Calendar.HOUR_OF_DAY, 23);
cal.set(Calendar.MINUTE, 59);
cal.set(Calendar.SECOND, 59);
cal.set(Calendar.MILLISECOND, 999);
return cal.getTime();
}
/**
* return YYYY-MM-DD 00:00:00
*
* @param inputDay day
* @return start of hour
*/
public static Date getStartOfHour(Date inputDay) {
Calendar cal = Calendar.getInstance();
cal.setTime(inputDay);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
return cal.getTime();
}
/**
* return YYYY-MM-DD 23:59:59
*
* @param inputDay day
* @return end of hour
*/
public static Date getEndOfHour(Date inputDay) {
Calendar cal = Calendar.getInstance();
cal.setTime(inputDay);
cal.set(Calendar.MINUTE, 59);
cal.set(Calendar.SECOND, 59);
cal.set(Calendar.MILLISECOND, 999);
return cal.getTime();
}
/**
* get current date
* @return current date
*/
public static Date getCurrentDate() {
return DateUtils.parse(DateUtils.getCurrentTime(),
Constants.YYYY_MM_DD_HH_MM_SS);
}
/**
* get date
* @param date date
* @param calendarField calendarField
* @param amount amount
* @return date
*/
public static Date add(final Date date, final int calendarField, final int amount) {
if (date == null) {
throw new IllegalArgumentException("The date must not be null");
}
final Calendar c = Calendar.getInstance();
c.setTime(date);
c.add(calendarField, amount);
return c.getTime();
}
}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.utils;
+
+import org.apache.dolphinscheduler.common.Constants;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Calendar;
+import java.util.Date;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * date utils
+ */
+public class DateUtils {
+
+ private static final Logger logger = LoggerFactory.getLogger(DateUtils.class);
+
+ /**
+ * date to local datetime
+ *
+ * @param date date
+ * @return local datetime
+ */
+ private static LocalDateTime date2LocalDateTime(Date date) {
+ return LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault());
+ }
+
+ /**
+ * local datetime to date
+ *
+ * @param localDateTime local datetime
+ * @return date
+ */
+ private static Date localDateTime2Date(LocalDateTime localDateTime) {
+ Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant();
+ return Date.from(instant);
+ }
+
+ /**
+ * get current date str
+ *
+ * @return date string
+ */
+ public static String getCurrentTime() {
+ return getCurrentTime(Constants.YYYY_MM_DD_HH_MM_SS);
+ }
+
+ /**
+ * get the date string in the specified format of the current time
+ *
+ * @param format date format
+ * @return date string
+ */
+ public static String getCurrentTime(String format) {
+ return LocalDateTime.now().format(DateTimeFormatter.ofPattern(format));
+ }
+
+ /**
+ * get the formatted date string
+ *
+ * @param date date
+ * @param format e.g. yyyy-MM-dd HH:mm:ss
+ * @return date string
+ */
+ public static String format(Date date, String format) {
+ return format(date2LocalDateTime(date), format);
+ }
+
+ /**
+ * get the formatted date string
+ *
+ * @param localDateTime local data time
+ * @param format yyyy-MM-dd HH:mm:ss
+ * @return date string
+ */
+ public static String format(LocalDateTime localDateTime, String format) {
+ return localDateTime.format(DateTimeFormatter.ofPattern(format));
+ }
+
+ /**
+ * convert time to yyyy-MM-dd HH:mm:ss format
+ *
+ * @param date date
+ * @return date string
+ */
+ public static String dateToString(Date date) {
+ return format(date, Constants.YYYY_MM_DD_HH_MM_SS);
+ }
+
+ /**
+ * convert string to date and time
+ *
+ * @param date date
+ * @param format format
+ * @return date
+ */
+ public static Date parse(String date, String format) {
+ try {
+ LocalDateTime ldt = LocalDateTime.parse(date, DateTimeFormatter.ofPattern(format));
+ return localDateTime2Date(ldt);
+ } catch (Exception e) {
+ logger.error("error while parse date:" + date, e);
+ }
+ return null;
+ }
+
+ /**
+ * convert date str to yyyy-MM-dd HH:mm:ss format
+ *
+ * @param str date string
+ * @return yyyy-MM-dd HH:mm:ss format
+ */
+ public static Date stringToDate(String str) {
+ return parse(str, Constants.YYYY_MM_DD_HH_MM_SS);
+ }
+
+ /**
+ * get seconds between two dates
+ *
+ * @param d1 date1
+ * @param d2 date2
+ * @return differ seconds
+ */
+ public static long differSec(Date d1, Date d2) {
+ if (d1 == null || d2 == null) {
+ return 0;
+ }
+ return (long) Math.ceil(differMs(d1, d2) / 1000.0);
+ }
+
+ /**
+ * get ms between two dates
+ *
+ * @param d1 date1
+ * @param d2 date2
+ * @return differ ms
+ */
+ public static long differMs(Date d1, Date d2) {
+ return Math.abs(d1.getTime() - d2.getTime());
+ }
+
+ /**
+ * get hours between two dates
+ *
+ * @param d1 date1
+ * @param d2 date2
+ * @return differ hours
+ */
+ public static long diffHours(Date d1, Date d2) {
+ return (long) Math.ceil(diffMin(d1, d2) / 60.0);
+ }
+
+ /**
+ * get minutes between two dates
+ *
+ * @param d1 date1
+ * @param d2 date2
+ * @return differ minutes
+ */
+ public static long diffMin(Date d1, Date d2) {
+ return (long) Math.ceil(differSec(d1, d2) / 60.0);
+ }
+
+ /**
+ * get the date of the specified date in the days before and after
+ *
+ * @param date date
+ * @param day day
+ * @return the date of the specified date in the days before and after
+ */
+ public static Date getSomeDay(Date date, int day) {
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTime(date);
+ calendar.add(Calendar.DATE, day);
+ return calendar.getTime();
+ }
+
+ /**
+ * get the hour of day.
+ *
+ * @param date date
+ * @return hour of day
+ */
+ public static int getHourIndex(Date date) {
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTime(date);
+ return calendar.get(Calendar.HOUR_OF_DAY);
+ }
+
+ /**
+ * compare two dates
+ *
+ * @param future future date
+ * @param old old date
+ * @return true if future time greater than old time
+ */
+ public static boolean compare(Date future, Date old) {
+ return future.getTime() > old.getTime();
+ }
+
+ /**
+ * convert schedule string to date
+ *
+ * @param schedule schedule
+ * @return convert schedule string to date
+ */
+ public static Date getScheduleDate(String schedule) {
+ return stringToDate(schedule);
+ }
+
+ /**
+ * format time to readable
+ *
+ * @param ms ms
+ * @return format time
+ */
+ public static String format2Readable(long ms) {
+
+ long days = ms / (1000 * 60 * 60 * 24);
+ long hours = (ms % (1000 * 60 * 60 * 24)) / (1000 * 60 * 60);
+ long minutes = (ms % (1000 * 60 * 60)) / (1000 * 60);
+ long seconds = (ms % (1000 * 60)) / 1000;
+
+ return String.format("%02d %02d:%02d:%02d", days, hours, minutes, seconds);
+
+ }
+
+ /**
+ * get monday
+ *
+ * note: Set the first day of the week to Monday, the default is Sunday
+ *
+ * @param date date
+ * @return get monday
+ */
+ public static Date getMonday(Date date) {
+ Calendar cal = Calendar.getInstance();
+
+ cal.setTime(date);
+
+ cal.setFirstDayOfWeek(Calendar.MONDAY);
+ cal.set(Calendar.DAY_OF_WEEK, Calendar.MONDAY);
+
+ return cal.getTime();
+ }
+
+ /**
+ * get sunday
+ *
+ * note: Set the first day of the week to Monday, the default is Sunday
+ *
+ * @param date date
+ * @return get sunday
+ */
+ public static Date getSunday(Date date) {
+ Calendar cal = Calendar.getInstance();
+ cal.setTime(date);
+
+ cal.setFirstDayOfWeek(Calendar.MONDAY);
+ cal.set(Calendar.DAY_OF_WEEK, Calendar.SUNDAY);
+
+ return cal.getTime();
+ }
+
+ /**
+ * get first day of month
+ *
+ * @param date date
+ * @return first day of month
+ */
+ public static Date getFirstDayOfMonth(Date date) {
+ Calendar cal = Calendar.getInstance();
+
+ cal.setTime(date);
+ cal.set(Calendar.DAY_OF_MONTH, 1);
+
+ return cal.getTime();
+ }
+
+ /**
+ * get some hour of day
+ *
+ * @param date date
+ * @param offsetHour hours
+ * @return some hour of day
+ */
+ public static Date getSomeHourOfDay(Date date, int offsetHour) {
+ Calendar cal = Calendar.getInstance();
+
+ cal.setTime(date);
+ cal.set(Calendar.HOUR_OF_DAY, cal.get(Calendar.HOUR_OF_DAY) + offsetHour);
+ cal.set(Calendar.MINUTE, 0);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+
+ return cal.getTime();
+ }
+
+ /**
+ * get last day of month
+ *
+ * @param date date
+ * @return get last day of month
+ */
+ public static Date getLastDayOfMonth(Date date) {
+ Calendar cal = Calendar.getInstance();
+
+ cal.setTime(date);
+
+ cal.add(Calendar.MONTH, 1);
+ cal.set(Calendar.DAY_OF_MONTH, 1);
+ cal.add(Calendar.DAY_OF_MONTH, -1);
+
+ return cal.getTime();
+ }
+
+ /**
+ * return YYYY-MM-DD 00:00:00
+ *
+ * @param inputDay date
+ * @return start day
+ */
+ public static Date getStartOfDay(Date inputDay) {
+ Calendar cal = Calendar.getInstance();
+ cal.setTime(inputDay);
+ cal.set(Calendar.HOUR_OF_DAY, 0);
+ cal.set(Calendar.MINUTE, 0);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+ return cal.getTime();
+ }
+
+ /**
+ * return YYYY-MM-DD 23:59:59
+ *
+ * @param inputDay day
+ * @return end of day
+ */
+ public static Date getEndOfDay(Date inputDay) {
+ Calendar cal = Calendar.getInstance();
+ cal.setTime(inputDay);
+ cal.set(Calendar.HOUR_OF_DAY, 23);
+ cal.set(Calendar.MINUTE, 59);
+ cal.set(Calendar.SECOND, 59);
+ cal.set(Calendar.MILLISECOND, 999);
+ return cal.getTime();
+ }
+
+ /**
+ * return YYYY-MM-DD 00:00:00
+ *
+ * @param inputDay day
+ * @return start of hour
+ */
+ public static Date getStartOfHour(Date inputDay) {
+ Calendar cal = Calendar.getInstance();
+ cal.setTime(inputDay);
+ cal.set(Calendar.MINUTE, 0);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+ return cal.getTime();
+ }
+
+ /**
+ * return YYYY-MM-DD 23:59:59
+ *
+ * @param inputDay day
+ * @return end of hour
+ */
+ public static Date getEndOfHour(Date inputDay) {
+ Calendar cal = Calendar.getInstance();
+ cal.setTime(inputDay);
+ cal.set(Calendar.MINUTE, 59);
+ cal.set(Calendar.SECOND, 59);
+ cal.set(Calendar.MILLISECOND, 999);
+ return cal.getTime();
+ }
+
+ /**
+ * get current date
+ *
+ * @return current date
+ */
+ public static Date getCurrentDate() {
+ return DateUtils.parse(DateUtils.getCurrentTime(),
+ Constants.YYYY_MM_DD_HH_MM_SS);
+ }
+
+ /**
+ * get date
+ *
+ * @param date date
+ * @param calendarField calendarField
+ * @param amount amount
+ * @return date
+ */
+ public static Date add(final Date date, final int calendarField, final int amount) {
+ if (date == null) {
+ throw new IllegalArgumentException("The date must not be null");
+ }
+ final Calendar c = Calendar.getInstance();
+ c.setTime(date);
+ c.add(calendarField, amount);
+ return c.getTime();
+ }
+
+ /**
+ * starting from the current time, get how many seconds are left before the target time.
+ * targetTime = baseTime + intervalSeconds
+ *
+ * @param baseTime base time
+ * @param intervalSeconds a period of time
+ * @return the number of seconds
+ */
+ public static long getRemainTime(Date baseTime, long intervalSeconds) {
+ if (baseTime == null) {
+ return 0;
+ }
+ long usedTime = (System.currentTimeMillis() - baseTime.getTime()) / 1000;
+ return intervalSeconds - usedTime;
+ }
+}
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/enums/ExecutionStatusTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/enums/ExecutionStatusTest.java
new file mode 100644
index 000000000000..6d4be78aefae
--- /dev/null
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/enums/ExecutionStatusTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.enums;
+
+import junit.framework.TestCase;
+
+/**
+ * execution status test.
+ */
+public class ExecutionStatusTest extends TestCase {
+
+ public void testTypeIsRunning() {
+ assertTrue(ExecutionStatus.RUNNING_EXECUTION.typeIsRunning());
+ assertTrue(ExecutionStatus.WAITTING_DEPEND.typeIsRunning());
+ assertTrue(ExecutionStatus.DELAY_EXECUTION.typeIsRunning());
+ }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index b82da62b0229..9688200b2c3b 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -42,7 +42,7 @@ public class TaskInstance implements Serializable {
/**
* id
*/
- @TableId(value="id", type=IdType.AUTO)
+ @TableId(value = "id", type = IdType.AUTO)
private int id;
/**
@@ -51,7 +51,6 @@ public class TaskInstance implements Serializable {
private String name;
-
/**
* task type
*/
@@ -83,22 +82,28 @@ public class TaskInstance implements Serializable {
*/
private ExecutionStatus state;
+ /**
+ * task first submit time.
+ */
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+ private Date firstSubmitTime;
+
/**
* task submit time
*/
- @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date submitTime;
/**
* task start time
*/
- @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date startTime;
/**
* task end time
*/
- @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date endTime;
/**
@@ -214,11 +219,14 @@ public class TaskInstance implements Serializable {
@TableField(exist = false)
- private Map resources;
-
+ private Map resources;
+ /**
+ * delay execution time.
+ */
+ private int delayTime;
- public void init(String host,Date startTime,String executePath){
+ public void init(String host, Date startTime, String executePath) {
this.host = host;
this.startTime = startTime;
this.executePath = executePath;
@@ -297,6 +305,14 @@ public void setState(ExecutionStatus state) {
this.state = state;
}
+ public Date getFirstSubmitTime() {
+ return firstSubmitTime;
+ }
+
+ public void setFirstSubmitTime(Date firstSubmitTime) {
+ this.firstSubmitTime = firstSubmitTime;
+ }
+
public Date getSubmitTime() {
return submitTime;
}
@@ -361,7 +377,7 @@ public void setRetryTimes(int retryTimes) {
this.retryTimes = retryTimes;
}
- public Boolean isTaskSuccess(){
+ public Boolean isTaskSuccess() {
return this.state == ExecutionStatus.SUCCESS;
}
@@ -400,6 +416,7 @@ public Flag getFlag() {
public void setFlag(Flag flag) {
this.flag = flag;
}
+
public String getProcessInstanceName() {
return processInstanceName;
}
@@ -464,33 +481,33 @@ public void setResources(Map resources) {
this.resources = resources;
}
- public boolean isSubProcess(){
+ public boolean isSubProcess() {
return TaskType.SUB_PROCESS.equals(TaskType.valueOf(this.taskType));
}
- public boolean isDependTask(){
+ public boolean isDependTask() {
return TaskType.DEPENDENT.equals(TaskType.valueOf(this.taskType));
}
- public boolean isConditionsTask(){
+ public boolean isConditionsTask() {
return TaskType.CONDITIONS.equals(TaskType.valueOf(this.taskType));
}
-
/**
* determine if you can try again
+ *
* @return can try result
*/
public boolean taskCanRetry() {
- if(this.isSubProcess()){
+ if (this.isSubProcess()) {
return false;
}
- if(this.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE){
+ if (this.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
return true;
- }else {
+ } else {
return (this.getState().typeIsFailure()
- && this.getRetryTimes() < this.getMaxRetryTimes());
+ && this.getRetryTimes() < this.getMaxRetryTimes());
}
}
@@ -526,40 +543,50 @@ public void setDependentResult(String dependentResult) {
this.dependentResult = dependentResult;
}
+ public int getDelayTime() {
+ return delayTime;
+ }
+
+ public void setDelayTime(int delayTime) {
+ this.delayTime = delayTime;
+ }
+
@Override
public String toString() {
- return "TaskInstance{" +
- "id=" + id +
- ", name='" + name + '\'' +
- ", taskType='" + taskType + '\'' +
- ", processDefinitionId=" + processDefinitionId +
- ", processInstanceId=" + processInstanceId +
- ", processInstanceName='" + processInstanceName + '\'' +
- ", taskJson='" + taskJson + '\'' +
- ", state=" + state +
- ", submitTime=" + submitTime +
- ", startTime=" + startTime +
- ", endTime=" + endTime +
- ", host='" + host + '\'' +
- ", executePath='" + executePath + '\'' +
- ", logPath='" + logPath + '\'' +
- ", retryTimes=" + retryTimes +
- ", alertFlag=" + alertFlag +
- ", processInstance=" + processInstance +
- ", processDefine=" + processDefine +
- ", pid=" + pid +
- ", appLink='" + appLink + '\'' +
- ", flag=" + flag +
- ", dependency='" + dependency + '\'' +
- ", duration=" + duration +
- ", maxRetryTimes=" + maxRetryTimes +
- ", retryInterval=" + retryInterval +
- ", taskInstancePriority=" + taskInstancePriority +
- ", processInstancePriority=" + processInstancePriority +
- ", dependentResult='" + dependentResult + '\'' +
- ", workerGroup='" + workerGroup + '\'' +
- ", executorId=" + executorId +
- ", executorName='" + executorName + '\'' +
- '}';
+ return "TaskInstance{"
+ + "id=" + id
+ + ", name='" + name + '\''
+ + ", taskType='" + taskType + '\''
+ + ", processDefinitionId=" + processDefinitionId
+ + ", processInstanceId=" + processInstanceId
+ + ", processInstanceName='" + processInstanceName + '\''
+ + ", taskJson='" + taskJson + '\''
+ + ", state=" + state
+ + ", firstSubmitTime=" + firstSubmitTime
+ + ", submitTime=" + submitTime
+ + ", startTime=" + startTime
+ + ", endTime=" + endTime
+ + ", host='" + host + '\''
+ + ", executePath='" + executePath + '\''
+ + ", logPath='" + logPath + '\''
+ + ", retryTimes=" + retryTimes
+ + ", alertFlag=" + alertFlag
+ + ", processInstance=" + processInstance
+ + ", processDefine=" + processDefine
+ + ", pid=" + pid
+ + ", appLink='" + appLink + '\''
+ + ", flag=" + flag
+ + ", dependency='" + dependency + '\''
+ + ", duration=" + duration
+ + ", maxRetryTimes=" + maxRetryTimes
+ + ", retryInterval=" + retryInterval
+ + ", taskInstancePriority=" + taskInstancePriority
+ + ", processInstancePriority=" + processInstancePriority
+ + ", dependentResult='" + dependentResult + '\''
+ + ", workerGroup='" + workerGroup + '\''
+ + ", executorId=" + executorId
+ + ", executorName='" + executorName + '\''
+ + ", delayTime=" + delayTime
+ + '}';
}
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
index a69022214de4..179ae1bef8a5 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
@@ -75,6 +75,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List