From 21b807c1aa285ebb0b4f40395c408a18403b8b3d Mon Sep 17 00:00:00 2001
From: yh263208
Date: Wed, 22 May 2024 14:00:01 +0800
Subject: [PATCH 01/17] format the code
---
...ipleDatabaseChangeRuntimeFlowableTask.java | 36 ++++++-------------
1 file changed, 11 insertions(+), 25 deletions(-)
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java
index dd7c6c1bf4..fc649b7601 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java
@@ -51,7 +51,6 @@
import com.oceanbase.odc.service.flow.task.model.MultipleDatabaseChangeParameters;
import com.oceanbase.odc.service.flow.task.model.MultipleDatabaseChangeTaskResult;
import com.oceanbase.odc.service.flow.util.FlowTaskUtil;
-import com.oceanbase.odc.service.iam.auth.AuthenticationFacade;
import com.oceanbase.odc.service.task.TaskService;
import lombok.extern.slf4j.Slf4j;
@@ -67,25 +66,18 @@ public class MultipleDatabaseChangeRuntimeFlowableTask extends BaseODCFlowTaskDe
private volatile boolean isFailure = false;
private Integer batchId;
-
private Integer batchSum;
- List> orderedDatabaseIds;
-
+ private List> orderedDatabaseIds;
private FlowTaskExecutionStrategy flowTaskExecutionStrategy;
-
private MultipleDatabaseChangeParameters multipleDatabaseChangeParameters;
-
@Autowired
private FlowInstanceService flowInstanceService;
-
- @Autowired
- private ServiceTaskInstanceRepository serviceTaskInstanceRepository;
- @Autowired
- private AuthenticationFacade authenticationFacade;
@Autowired
private DatabaseService databaseService;
@Autowired
private FlowableAdaptor flowableAdaptor;
+ @Autowired
+ private ServiceTaskInstanceRepository serviceTaskInstanceRepository;
@Override
public boolean cancel(boolean mayInterruptIfRunning, Long taskId, TaskService taskService) {
@@ -131,7 +123,7 @@ protected Void start(Long taskId, TaskService taskService, DelegateExecution exe
} else {
this.batchId = value;
}
- log.info("multiple database task start, taskId={}, batchId={}", taskId,
+ log.info("Multiple database task start, taskId={}, batchId={}", taskId,
this.batchId + 1);
multipleDatabaseChangeParameters.setBatchId(this.batchId + 1);
detail.setParametersJson(JsonUtils.toJson(multipleDatabaseChangeParameters));
@@ -201,8 +193,6 @@ protected Void start(Long taskId, TaskService taskService, DelegateExecution exe
}
}
-
-
@Override
protected void onFailure(Long taskId, TaskService taskService) {
try {
@@ -223,12 +213,9 @@ protected void onSuccessful(Long taskId, TaskService taskService) {
MultipleDatabaseChangeTraceContextHolder.trace(taskId);
log.info("multiple database task succeed, taskId={}, batchId={}", taskId, this.batchId + 1);
if (this.batchId == batchSum - 1) {
- List list = flowInstanceService.getFlowInstanceByParentId(
- getFlowInstanceId());
- boolean allSucceeded = list.stream()
- .allMatch(
- flowInstanceEntity -> FlowStatus.EXECUTION_SUCCEEDED == flowInstanceEntity.getStatus());
- if (allSucceeded) {
+ List list = flowInstanceService
+ .getFlowInstanceByParentId(getFlowInstanceId());
+ if (list.stream().allMatch(e -> FlowStatus.EXECUTION_SUCCEEDED == e.getStatus())) {
updateFlowInstanceStatus(FlowStatus.EXECUTION_SUCCEEDED);
} else {
updateFlowInstanceStatus(FlowStatus.EXECUTION_FAILED);
@@ -238,11 +225,11 @@ protected void onSuccessful(Long taskId, TaskService taskService) {
taskService.updateProgress(taskId, (this.batchId + 1) * 100D / this.batchSum);
}
super.onSuccessful(taskId, taskService);
- Optional taskInstanceByActivityId = flowableAdaptor.getTaskInstanceByActivityId(
- getActivityId(), getFlowInstanceId());
+ Optional taskInstanceByActivityId = flowableAdaptor
+ .getTaskInstanceByActivityId(getActivityId(), getFlowInstanceId());
if (!this.isSuccessful) {
- serviceTaskInstanceRepository.updateStatusById(taskInstanceByActivityId.get().getId(),
- FlowNodeStatus.FAILED);
+ serviceTaskInstanceRepository.updateStatusById(
+ taskInstanceByActivityId.get().getId(), FlowNodeStatus.FAILED);
}
} catch (Exception e) {
log.warn("multiple database task failed, taskId={}, batchId={}", taskId,
@@ -295,7 +282,6 @@ private MultipleDatabaseChangeTaskResult generateResult() {
databaseChangingRecords.add(databaseChangingRecord);
}
result.setDatabaseChangingRecordList(databaseChangingRecords);
-
return result;
}
From e0a448f09390ae4d785314731f5dd1a6e9c9be92 Mon Sep 17 00:00:00 2001
From: yh263208
Date: Wed, 22 May 2024 14:01:33 +0800
Subject: [PATCH 02/17] opt log printing
---
.../MultipleDatabaseChangeRuntimeFlowableTask.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java
index fc649b7601..dee0b62789 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java
@@ -183,7 +183,7 @@ protected Void start(Long taskId, TaskService taskService, DelegateExecution exe
}
return null;
} catch (Exception e) {
- log.warn("multiple database task failed, taskId={}, batchId={}", taskId,
+ log.warn("Multiple database task failed, taskId={}, batchId={}", taskId,
this.batchId == null ? null : this.batchId + 1, e);
this.isFailure = true;
this.isSuccessful = false;
@@ -197,7 +197,7 @@ protected Void start(Long taskId, TaskService taskService, DelegateExecution exe
protected void onFailure(Long taskId, TaskService taskService) {
try {
MultipleDatabaseChangeTraceContextHolder.trace(taskId);
- log.warn("multiple database task failed, taskId={}, batchId={}", taskId,
+ log.warn("Multiple database task failed, taskId={}, batchId={}", taskId,
this.batchId == null ? null : this.batchId + 1);
updateFlowInstanceStatus(FlowStatus.EXECUTION_FAILED);
taskService.fail(taskId, 100, generateResult());
@@ -211,7 +211,7 @@ protected void onFailure(Long taskId, TaskService taskService) {
protected void onSuccessful(Long taskId, TaskService taskService) {
try {
MultipleDatabaseChangeTraceContextHolder.trace(taskId);
- log.info("multiple database task succeed, taskId={}, batchId={}", taskId, this.batchId + 1);
+ log.info("Multiple database task succeed, taskId={}, batchId={}", taskId, this.batchId + 1);
if (this.batchId == batchSum - 1) {
List list = flowInstanceService
.getFlowInstanceByParentId(getFlowInstanceId());
@@ -232,7 +232,7 @@ protected void onSuccessful(Long taskId, TaskService taskService) {
taskInstanceByActivityId.get().getId(), FlowNodeStatus.FAILED);
}
} catch (Exception e) {
- log.warn("multiple database task failed, taskId={}, batchId={}", taskId,
+ log.warn("Multiple database task failed, taskId={}, batchId={}", taskId,
this.batchId == null ? null : this.batchId + 1, e);
} finally {
MultipleDatabaseChangeTraceContextHolder.clear();
@@ -244,7 +244,7 @@ protected void onTimeout(Long taskId, TaskService taskService) {
try {
MultipleDatabaseChangeTraceContextHolder.trace(taskId);
taskService.fail(taskId, 100, generateResult());
- log.warn("multiple database task timeout, taskId={}, batchId={}", taskId,
+ log.warn("Multiple database task timeout, taskId={}, batchId={}", taskId,
this.batchId == null ? null : this.batchId + 1);
} finally {
MultipleDatabaseChangeTraceContextHolder.clear();
From ae391fb891e46cc272204cc46e68582252745cc8 Mon Sep 17 00:00:00 2001
From: yh263208
Date: Wed, 22 May 2024 15:22:47 +0800
Subject: [PATCH 03/17] add heart beat time
---
.../odc/metadb/task/TaskRepositoryTest.java | 49 +++++++++++++++++++
.../V_4_3_0_6__add_flow_task_heartbeat.sql | 1 +
.../oceanbase/odc/metadb/task/TaskEntity.java | 3 ++
.../odc/metadb/task/TaskRepository.java | 5 ++
.../service/flow/FlowTaskInstanceService.java | 27 +++++-----
.../flow/task/BaseODCFlowTaskDelegate.java | 5 ++
.../odc/service/task/TaskService.java | 4 ++
7 files changed, 81 insertions(+), 13 deletions(-)
create mode 100644 server/integration-test/src/test/java/com/oceanbase/odc/metadb/task/TaskRepositoryTest.java
create mode 100644 server/odc-migrate/src/main/resources/migrate/common/V_4_3_0_6__add_flow_task_heartbeat.sql
diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/metadb/task/TaskRepositoryTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/metadb/task/TaskRepositoryTest.java
new file mode 100644
index 0000000000..41486cf595
--- /dev/null
+++ b/server/integration-test/src/test/java/com/oceanbase/odc/metadb/task/TaskRepositoryTest.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2023 OceanBase.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.oceanbase.odc.metadb.task;
+
+import java.util.Optional;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import com.oceanbase.odc.ServiceTestEnv;
+import com.oceanbase.odc.test.tool.TestRandom;
+
+public class TaskRepositoryTest extends ServiceTestEnv {
+
+ @Autowired
+ private TaskRepository taskRepository;
+
+ @Before
+ public void setUp() {
+ this.taskRepository.deleteAll();
+ }
+
+ @Test
+ public void updateLastHeartbeatTimeById_updateSucceed_returnNotNull() {
+ TaskEntity taskEntity = TestRandom.nextObject(TaskEntity.class);
+ taskEntity.setId(null);
+ taskEntity.setLastHeartbeatTime(null);
+ taskEntity = this.taskRepository.save(taskEntity);
+ this.taskRepository.updateLastHeartbeatTimeById(taskEntity.getId());
+ Optional optional = this.taskRepository.findById(taskEntity.getId());
+ Assert.assertNotNull(optional.get().getLastHeartbeatTime());
+ }
+
+}
diff --git a/server/odc-migrate/src/main/resources/migrate/common/V_4_3_0_6__add_flow_task_heartbeat.sql b/server/odc-migrate/src/main/resources/migrate/common/V_4_3_0_6__add_flow_task_heartbeat.sql
new file mode 100644
index 0000000000..326665da3e
--- /dev/null
+++ b/server/odc-migrate/src/main/resources/migrate/common/V_4_3_0_6__add_flow_task_heartbeat.sql
@@ -0,0 +1 @@
+alter table `task_task` add column `last_heartbeat_time` datetime default null comment 'Last heart beat time';
\ No newline at end of file
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskEntity.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskEntity.java
index 367ba2bbb6..58ab56113e 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskEntity.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskEntity.java
@@ -166,4 +166,7 @@ public class TaskEntity {
@Column(name = "job_id")
private Long jobId;
+ @Column(name = "last_heartbeat_time")
+ private Date lastHeartbeatTime;
+
}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskRepository.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskRepository.java
index 424a13c929..34ef3163b1 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskRepository.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskRepository.java
@@ -43,6 +43,11 @@ public interface TaskRepository extends JpaRepository, JpaSpec
@Modifying
int update(@Param("param") TaskEntity entity);
+ @Transactional
+ @Query(value = "update task_task set last_heartbeat_time=now() where id=:id", nativeQuery = true)
+ @Modifying
+ int updateLastHeartbeatTimeById(@Param("id") Long id);
+
@Transactional
@Query("update TaskEntity set parametersJson=:#{#param.parametersJson} where id=:#{#param.id}")
@Modifying
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java
index 2d92f725fe..2af2bb8ea0 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java
@@ -778,19 +778,20 @@ private URL generatePresignedUrl(String objectName, Long expirationSeconds) thro
}
private void setDownloadUrlsIfNecessary(Long taskId, List extends FlowTaskResult> results) {
- if (cloudObjectStorageService.supported()) {
- for (FlowTaskResult result : results) {
- TaskDownloadUrls urls = databaseChangeOssUrlCache.get(taskId);
- if (result instanceof AbstractFlowTaskResult) {
- ((AbstractFlowTaskResult) result)
- .setFullLogDownloadUrl(urls.getLogDownloadUrl());
- }
- if (result instanceof DatabaseChangeResult) {
- ((DatabaseChangeResult) result).setZipFileDownloadUrl(urls.getDatabaseChangeZipFileDownloadUrl());
- if (Objects.nonNull(((DatabaseChangeResult) result).getRollbackPlanResult())) {
- ((DatabaseChangeResult) result).getRollbackPlanResult()
- .setResultFileDownloadUrl(urls.getRollBackPlanResultFileDownloadUrl());
- }
+ if (!cloudObjectStorageService.supported()) {
+ return;
+ }
+ for (FlowTaskResult result : results) {
+ TaskDownloadUrls urls = databaseChangeOssUrlCache.get(taskId);
+ if (result instanceof AbstractFlowTaskResult) {
+ ((AbstractFlowTaskResult) result)
+ .setFullLogDownloadUrl(urls.getLogDownloadUrl());
+ }
+ if (result instanceof DatabaseChangeResult) {
+ ((DatabaseChangeResult) result).setZipFileDownloadUrl(urls.getDatabaseChangeZipFileDownloadUrl());
+ if (Objects.nonNull(((DatabaseChangeResult) result).getRollbackPlanResult())) {
+ ((DatabaseChangeResult) result).getRollbackPlanResult()
+ .setResultFileDownloadUrl(urls.getRollBackPlanResultFileDownloadUrl());
}
}
}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java
index 46a3354787..a50a604ace 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java
@@ -126,6 +126,11 @@ private void initMonitorExecutor() {
scheduleExecutor.scheduleAtFixedRate(() -> {
try {
if (taskLatch.getCount() > 0) {
+ try {
+ this.taskService.updateHeartbeatTime(taskId);
+ } catch (Exception e) {
+ log.warn("Failed to update heartbeat time, taskId={}", taskId, e);
+ }
onProgressUpdate(taskId, taskService);
}
} catch (Exception e) {
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/TaskService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/TaskService.java
index 55529608ba..499cc0ae8b 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/TaskService.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/TaskService.java
@@ -344,6 +344,10 @@ public void updateJobId(Long id, Long jobId) {
taskRepository.updateJobId(id, jobId);
}
+ @Transactional(rollbackFor = Exception.class)
+ public void updateHeartbeatTime(Long id) {
+ taskRepository.updateLastHeartbeatTimeById(id);
+ }
private TaskEntity nullSafeFindById(Long id) {
return taskRepository.findById(id)
From 22a1eabcea2d60860bd2930f3be5957aada7f872 Mon Sep 17 00:00:00 2001
From: yh263208
Date: Wed, 22 May 2024 16:41:15 +0800
Subject: [PATCH 04/17] add heartbeat scan task
---
.../ServiceTaskInstanceRepositoryTest.java | 12 ++
.../odc/metadb/task/TaskRepositoryTest.java | 28 +++++
.../flow/ServiceTaskInstanceRepository.java | 2 +
.../odc/metadb/task/TaskRepository.java | 3 +
.../odc/service/flow/FlowSchedules.java | 108 ++++++++++++++++++
.../flow/task/model/FlowTaskProperties.java | 4 +
6 files changed, 157 insertions(+)
create mode 100644 server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepositoryTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepositoryTest.java
index 6b87761a70..16683943e2 100644
--- a/server/integration-test/src/test/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepositoryTest.java
+++ b/server/integration-test/src/test/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepositoryTest.java
@@ -17,9 +17,11 @@
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Random;
+import java.util.Set;
import java.util.UUID;
import org.junit.After;
@@ -65,6 +67,16 @@ public void clearAll() {
repository.deleteAll();
}
+ @Test
+ public void findByTargetTaskIdIn_entityExists_returnNotEmpty() {
+ ServiceTaskInstanceEntity entity = createEntity();
+ entity = this.repository.save(entity);
+ Set ids = new HashSet<>();
+ ids.add(entity.getTargetTaskId());
+ List actual = this.repository.findByTargetTaskIdIn(ids);
+ Assert.assertEquals(Collections.singletonList(entity), actual);
+ }
+
@Test
public void testSaveSerivceTaskInstanceEntity() {
ServiceTaskInstanceEntity entity = createEntity();
diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/metadb/task/TaskRepositoryTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/metadb/task/TaskRepositoryTest.java
index 41486cf595..2ccaedb46f 100644
--- a/server/integration-test/src/test/java/com/oceanbase/odc/metadb/task/TaskRepositoryTest.java
+++ b/server/integration-test/src/test/java/com/oceanbase/odc/metadb/task/TaskRepositoryTest.java
@@ -15,6 +15,8 @@
*/
package com.oceanbase.odc.metadb.task;
+import java.util.Date;
+import java.util.List;
import java.util.Optional;
import org.junit.Assert;
@@ -46,4 +48,30 @@ public void updateLastHeartbeatTimeById_updateSucceed_returnNotNull() {
Assert.assertNotNull(optional.get().getLastHeartbeatTime());
}
+ @Test
+ public void findAllByLastHeartbeatTimeBefore_findBeforeNow_returnNotNull() {
+ TaskEntity taskEntity = TestRandom.nextObject(TaskEntity.class);
+ taskEntity.setId(null);
+ taskEntity.setLastHeartbeatTime(null);
+ taskEntity = this.taskRepository.save(taskEntity);
+ this.taskRepository.updateLastHeartbeatTimeById(taskEntity.getId());
+ Optional optional = this.taskRepository.findById(taskEntity.getId());
+ Date heartbeatTime = new Date(optional.get().getLastHeartbeatTime().getTime() + 1);
+ List actual = this.taskRepository.findAllByLastHeartbeatTimeBefore(heartbeatTime);
+ Assert.assertFalse(actual.isEmpty());
+ }
+
+ @Test
+ public void findAllByLastHeartbeatTimeBefore_findBeforeLastHeartbeatTime_returnEmpty() {
+ TaskEntity taskEntity = TestRandom.nextObject(TaskEntity.class);
+ taskEntity.setId(null);
+ taskEntity.setLastHeartbeatTime(null);
+ taskEntity = this.taskRepository.save(taskEntity);
+ this.taskRepository.updateLastHeartbeatTimeById(taskEntity.getId());
+ Optional optional = this.taskRepository.findById(taskEntity.getId());
+ List actual = this.taskRepository
+ .findAllByLastHeartbeatTimeBefore(optional.get().getLastHeartbeatTime());
+ Assert.assertTrue(actual.isEmpty());
+ }
+
}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepository.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepository.java
index a76673aa42..ea614b936b 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepository.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepository.java
@@ -62,6 +62,8 @@ public interface ServiceTaskInstanceRepository extends OdcJpaRepository findByFlowInstanceIdIn(Set flowInstanceIds);
+ List findByTargetTaskIdIn(Set flowInstanceIds);
+
@Query(value = "select na.* from flow_instance_node_task as na inner join flow_instance_node as n on na.id=n.instance_id "
+ "where n.instance_type=:#{#instanceType.name()} and n.activity_id=:activityId and n.flow_instance_id=:flowInstanceId",
nativeQuery = true)
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskRepository.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskRepository.java
index 34ef3163b1..d040e0170a 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskRepository.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskRepository.java
@@ -15,6 +15,7 @@
*/
package com.oceanbase.odc.metadb.task;
+import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -48,6 +49,8 @@ public interface TaskRepository extends JpaRepository, JpaSpec
@Modifying
int updateLastHeartbeatTimeById(@Param("id") Long id);
+ List findAllByLastHeartbeatTimeBefore(Date lastHeartbeatTime);
+
@Transactional
@Query("update TaskEntity set parametersJson=:#{#param.parametersJson} where id=:#{#param.id}")
@Modifying
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
new file mode 100644
index 0000000000..7388f4b282
--- /dev/null
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2023 OceanBase.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.oceanbase.odc.service.flow;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import com.oceanbase.odc.core.shared.constant.FlowStatus;
+import com.oceanbase.odc.metadb.flow.FlowInstanceRepository;
+import com.oceanbase.odc.metadb.flow.ServiceTaskInstanceEntity;
+import com.oceanbase.odc.metadb.flow.ServiceTaskInstanceRepository;
+import com.oceanbase.odc.metadb.task.TaskEntity;
+import com.oceanbase.odc.metadb.task.TaskRepository;
+import com.oceanbase.odc.service.flow.model.FlowNodeStatus;
+import com.oceanbase.odc.service.flow.task.model.FlowTaskProperties;
+import com.oceanbase.odc.service.flow.task.model.RuntimeTaskConstants;
+import com.oceanbase.tools.dbbrowser.util.DBSchemaAccessorUtil;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * {@link FlowSchedules}
+ *
+ * @author yh263208
+ * @date 2024-05-22 15:33
+ * @since ODC-release_4.3.0
+ */
+@Slf4j
+@Component
+public class FlowSchedules {
+
+ @Autowired
+ private FlowTaskProperties flowTaskProperties;
+ @Autowired
+ private TaskRepository taskRepository;
+ @Autowired
+ private FlowInstanceRepository flowInstanceRepository;
+ @Autowired
+ private ServiceTaskInstanceRepository serviceTaskInstanceRepository;
+
+ @Scheduled(fixedDelayString = "${odc.flow.task.heartbeat-timeout-check-interval-millis:15000}")
+ public void cancelHeartbeatTimeoutFlow() {
+ try {
+ long minTimeoutSeconds = RuntimeTaskConstants.DEFAULT_TASK_CHECK_INTERVAL_SECONDS + 5;
+ long timeoutSeconds = this.flowTaskProperties.getHeartbeatTimeoutSeconds();
+ if (timeoutSeconds < minTimeoutSeconds) {
+ timeoutSeconds = minTimeoutSeconds;
+ }
+ Date timeoutBound = new Date(System.currentTimeMillis() - timeoutSeconds * 1000);
+ List taskEntities = this.taskRepository.findAllByLastHeartbeatTimeBefore(timeoutBound);
+ if (CollectionUtils.isEmpty(taskEntities)) {
+ return;
+ }
+ List taskIds = taskEntities.stream().map(TaskEntity::getId).collect(Collectors.toList());
+ log.info("Find the task with heartbeat timeout, timeoutSeconds={}, earliestHeartbeatTime={}, taskIds={}",
+ timeoutSeconds, timeoutBound, taskIds);
+ /**
+ * we just find such flow task instances:
+ *
+ * 1. heartbeat timeout 2. flow task instance is executing
+ *
+ */
+ List candidates = DBSchemaAccessorUtil.partitionFind(taskIds,
+ DBSchemaAccessorUtil.OB_MAX_IN_SIZE,
+ ids -> serviceTaskInstanceRepository.findByTargetTaskIdIn(new HashSet<>(ids))).stream()
+ .filter(entity -> entity.getStatus() == FlowNodeStatus.EXECUTING).collect(Collectors.toList());
+ Set flowTaskInstIds = candidates.stream()
+ .map(ServiceTaskInstanceEntity::getId).collect(Collectors.toSet());
+ List flowInstIds = candidates.stream().map(ServiceTaskInstanceEntity::getFlowInstanceId)
+ .distinct().collect(Collectors.toList());
+ log.info("Find heartbeat timeout flow task instance, timeoutSeconds={}, earliestHeartbeatTime={}, "
+ + "flowTaskInstIds={}, flowInstIds={}", timeoutSeconds, timeoutBound, flowTaskInstIds, flowInstIds);
+ List executeResult =
+ DBSchemaAccessorUtil.partitionFind(flowInstIds, DBSchemaAccessorUtil.OB_MAX_IN_SIZE, ids -> {
+ int affectRows = flowInstanceRepository.updateStatusByIds(ids, FlowStatus.CANCELLED);
+ List result = new ArrayList<>();
+ result.add(affectRows);
+ return result;
+ });
+ log.info("Update flow instance's status succeed, affectRows={}", executeResult);
+ } catch (Exception e) {
+ log.warn("Failed to sync flow instance's status", e);
+ }
+ }
+
+}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/FlowTaskProperties.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/FlowTaskProperties.java
index 36e27b0de2..0550f49287 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/FlowTaskProperties.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/FlowTaskProperties.java
@@ -47,4 +47,8 @@ public class FlowTaskProperties {
@Value("${odc.task.async.index-change-max-timeout-millis:432000000}")
private long indexChangeMaxTimeoutMillisecond;
+
+ @Value("${odc.flow.task.heartbeat-timeout-seconds:15}")
+ private long heartbeatTimeoutSeconds;
+
}
From c61d31b1468932f5923c7e0654fefa0316d1bb71 Mon Sep 17 00:00:00 2001
From: yh263208
Date: Wed, 22 May 2024 16:47:10 +0800
Subject: [PATCH 05/17] format the code
---
.../odc/service/flow/task/FlowTaskSubmitter.java | 11 ++++-------
1 file changed, 4 insertions(+), 7 deletions(-)
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/FlowTaskSubmitter.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/FlowTaskSubmitter.java
index f057e2c198..da6e8b127f 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/FlowTaskSubmitter.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/FlowTaskSubmitter.java
@@ -71,9 +71,8 @@ public void execute(DelegateExecution execution) {
CustomDelegateExecution executionFacade = new CustomDelegateExecution(execution);
long flowInstanceId = FlowTaskUtil.getFlowInstanceId(executionFacade);
String activityId = executionFacade.getCurrentActivityId();
- List> defs =
- FlowUtil.getBoundaryEventDefinitions(execution.getProcessDefinitionId(),
- activityId, ErrorEventDefinition.class);
+ List> defs = FlowUtil.getBoundaryEventDefinitions(
+ execution.getProcessDefinitionId(), activityId, ErrorEventDefinition.class);
threadPoolTaskExecutor.submit(() -> {
FlowTaskInstance flowTaskInstance = null;
try {
@@ -97,8 +96,7 @@ public void execute(DelegateExecution execution) {
private void updateFlowTaskInstance(long flowTaskInstanceId, FlowNodeStatus flowNodeStatus) {
try {
- int affectRows =
- serviceTaskRepository.updateStatusById(flowTaskInstanceId, flowNodeStatus);
+ int affectRows = serviceTaskRepository.updateStatusById(flowTaskInstanceId, flowNodeStatus);
log.info("Modify node instance status successfully, instanceId={}, flowNodeStatus={}, affectRows={}",
flowTaskInstanceId, flowNodeStatus, affectRows);
} catch (Exception ex) {
@@ -107,8 +105,7 @@ private void updateFlowTaskInstance(long flowTaskInstanceId, FlowNodeStatus flow
}
private void handleException(CustomDelegateExecution executionFacade, FlowTaskInstance flowTaskInstance,
- Exception e,
- List> defs) {
+ Exception e, List> defs) {
String processDefinitionId = executionFacade.getProcessDefinitionId();
String activityId = executionFacade.getCurrentActivityId();
if (defs.isEmpty()) {
From 3e5c12ca8212584cab67f8a370274eaa68b7dd8f Mon Sep 17 00:00:00 2001
From: yh263208
Date: Wed, 22 May 2024 16:54:34 +0800
Subject: [PATCH 06/17] format the code
---
.../service/flow/task/FlowTaskSubmitter.java | 18 +++++++++---------
1 file changed, 9 insertions(+), 9 deletions(-)
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/FlowTaskSubmitter.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/FlowTaskSubmitter.java
index da6e8b127f..f43412a4fa 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/FlowTaskSubmitter.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/FlowTaskSubmitter.java
@@ -104,10 +104,10 @@ private void updateFlowTaskInstance(long flowTaskInstanceId, FlowNodeStatus flow
}
}
- private void handleException(CustomDelegateExecution executionFacade, FlowTaskInstance flowTaskInstance,
+ private void handleException(CustomDelegateExecution execution, FlowTaskInstance flowTaskInstance,
Exception e, List> defs) {
- String processDefinitionId = executionFacade.getProcessDefinitionId();
- String activityId = executionFacade.getCurrentActivityId();
+ String processDefinitionId = execution.getProcessDefinitionId();
+ String activityId = execution.getCurrentActivityId();
if (defs.isEmpty()) {
log.warn("No error boundary is defined to handle events, processInstanceId={}, activityId={}",
processDefinitionId, activityId);
@@ -118,12 +118,12 @@ private void handleException(CustomDelegateExecution executionFacade, FlowTaskIn
ErrorEventDefinition eed = eventDefinition.getEventDefinition();
String acceptErrorCode = eed.getErrorCode();
if (Objects.equals(acceptErrorCode, targetErrorCode)) {
- flowTaskCallBackApprovalService.reject(executionFacade.getProcessInstanceId(),
- flowTaskInstance.getId(), executionFacade.getFutureVariable());
+ flowTaskCallBackApprovalService.reject(execution.getProcessInstanceId(),
+ flowTaskInstance.getId(), execution.getFutureVariable());
if (CollectionUtils.isNotEmpty(eventDefinition.getFlowableListeners())) {
- callListener(FlowConstants.EXECUTION_START_EVENT_NAME, executionFacade,
+ callListener(FlowConstants.EXECUTION_START_EVENT_NAME, execution,
eventDefinition.getFlowableListeners());
- callListener(FlowConstants.EXECUTION_END_EVENT_NAME, executionFacade,
+ callListener(FlowConstants.EXECUTION_END_EVENT_NAME, execution,
eventDefinition.getFlowableListeners());
}
return;
@@ -137,8 +137,8 @@ private void handleException(CustomDelegateExecution executionFacade, FlowTaskIn
processDefinitionId, activityId,
defs.stream().map(a -> a.getEventDefinition().getErrorCode()).collect(Collectors.toList()));
}
- flowTaskCallBackApprovalService.approval(executionFacade.getProcessInstanceId(),
- flowTaskInstance.getId(), executionFacade.getFutureVariable());
+ flowTaskCallBackApprovalService.approval(execution.getProcessInstanceId(),
+ flowTaskInstance.getId(), execution.getFutureVariable());
}
private void callListener(String eventName, DelegateExecution execution, List flowableListeners) {
From 356894a369efca97279730b5b1d45f26e4b74d57 Mon Sep 17 00:00:00 2001
From: yh263208
Date: Wed, 22 May 2024 17:44:05 +0800
Subject: [PATCH 07/17] fix illegal multiple task status
---
.../odc/core/flow/BaseFlowableDelegate.java | 5 ++
.../flow/instance/FlowInstanceConfigurer.java | 2 -
.../instance/OdcFlowInstanceConfigurer.java | 2 -
.../ServiceTaskExecutingCompleteListener.java | 2 +-
.../task/BaseRuntimeFlowableDelegate.java | 9 ++++
.../flow/task/CreateExternalApprovalTask.java | 9 ++++
.../service/flow/task/FlowTaskSubmitter.java | 51 ++++++++++++-------
...ipleDatabaseChangeRuntimeFlowableTask.java | 36 +++++++------
8 files changed, 73 insertions(+), 43 deletions(-)
diff --git a/server/odc-core/src/main/java/com/oceanbase/odc/core/flow/BaseFlowableDelegate.java b/server/odc-core/src/main/java/com/oceanbase/odc/core/flow/BaseFlowableDelegate.java
index da27ee6dc6..be3bb10c07 100644
--- a/server/odc-core/src/main/java/com/oceanbase/odc/core/flow/BaseFlowableDelegate.java
+++ b/server/odc-core/src/main/java/com/oceanbase/odc/core/flow/BaseFlowableDelegate.java
@@ -15,7 +15,10 @@
*/
package com.oceanbase.odc.core.flow;
+import java.util.List;
+
import org.flowable.engine.delegate.DelegateExecution;
+import org.flowable.engine.delegate.ExecutionListener;
import org.flowable.engine.delegate.JavaDelegate;
import lombok.extern.slf4j.Slf4j;
@@ -36,6 +39,8 @@ public abstract class BaseFlowableDelegate implements JavaDelegate {
*/
protected abstract void run(DelegateExecution execution) throws Exception;
+ protected abstract List> getExecutionListenerClasses();
+
@Override
public void execute(DelegateExecution execution) {
try {
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/FlowInstanceConfigurer.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/FlowInstanceConfigurer.java
index fcc3e7e1a6..cc4f5b5153 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/FlowInstanceConfigurer.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/FlowInstanceConfigurer.java
@@ -51,7 +51,6 @@
import com.oceanbase.odc.service.flow.listener.BaseTaskBindUserTaskListener;
import com.oceanbase.odc.service.flow.listener.BaseTaskExecutingCompleteListener;
import com.oceanbase.odc.service.flow.listener.GatewayExecutingCompleteListener;
-import com.oceanbase.odc.service.flow.listener.ServiceTaskExecutingCompleteListener;
import com.oceanbase.odc.service.flow.listener.ServiceTaskPendingExpiredListener;
import com.oceanbase.odc.service.flow.listener.ServiceTaskPendingListener;
import com.oceanbase.odc.service.flow.model.ExecutionStrategyConfig;
@@ -144,7 +143,6 @@ public FlowInstanceConfigurer next(@NonNull FlowApprovalInstance nextNode) {
public FlowInstanceConfigurer next(@NonNull FlowTaskInstance nextNode) {
return next(nextNode, serviceTaskBuilder -> {
- serviceTaskBuilder.addExecutionListener(ServiceTaskExecutingCompleteListener.class);
serviceTaskBuilder.setAsynchronous(true);
}, userTaskBuilder -> {
userTaskBuilder.addExecutionListener(ServiceTaskPendingListener.class);
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/OdcFlowInstanceConfigurer.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/OdcFlowInstanceConfigurer.java
index 3b66f27195..74003e162b 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/OdcFlowInstanceConfigurer.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/OdcFlowInstanceConfigurer.java
@@ -24,7 +24,6 @@
import com.oceanbase.odc.service.flow.FlowableAdaptor;
import com.oceanbase.odc.service.flow.listener.PreCheckServiceTaskFailedListener;
import com.oceanbase.odc.service.flow.listener.ServiceTaskCancelledListener;
-import com.oceanbase.odc.service.flow.listener.ServiceTaskExecutingCompleteListener;
import com.oceanbase.odc.service.flow.listener.ServiceTaskExpiredListener;
import com.oceanbase.odc.service.flow.listener.ServiceTaskFailedListener;
import com.oceanbase.odc.service.flow.listener.ServiceTaskPendingExpiredListener;
@@ -85,7 +84,6 @@ public FlowInstanceConfigurer next(@NonNull FlowTaskInstance nextNode) {
});
}
return next(nextNode, serviceTaskBuilder -> {
- serviceTaskBuilder.addExecutionListener(ServiceTaskExecutingCompleteListener.class);
serviceTaskBuilder.setAsynchronous(true);
ErrorBoundaryEventBuilder cancelErrBuilder =
setHandleableError(nextNode, serviceTaskBuilder, ErrorCodes.FlowTaskInstanceCancelled);
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/listener/ServiceTaskExecutingCompleteListener.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/listener/ServiceTaskExecutingCompleteListener.java
index cb480977e2..e490701509 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/listener/ServiceTaskExecutingCompleteListener.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/listener/ServiceTaskExecutingCompleteListener.java
@@ -55,7 +55,7 @@ protected FlowNodeStatus doModifyStatusOnStart(FlowTaskInstance target) {
@Override
protected FlowNodeStatus doModifyStatusOnEnd(FlowTaskInstance target) {
- return FlowNodeStatus.EXECUTING;
+ return internalModify(target, FlowNodeStatus.COMPLETED);
}
@Override
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseRuntimeFlowableDelegate.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseRuntimeFlowableDelegate.java
index 924095add7..0870cff37c 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseRuntimeFlowableDelegate.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseRuntimeFlowableDelegate.java
@@ -15,6 +15,8 @@
*/
package com.oceanbase.odc.service.flow.task;
+import java.util.Collections;
+import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
@@ -27,6 +29,7 @@
import javax.annotation.PostConstruct;
import org.flowable.engine.delegate.DelegateExecution;
+import org.flowable.engine.delegate.ExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import com.oceanbase.odc.common.event.EventPublisher;
@@ -44,6 +47,7 @@
import com.oceanbase.odc.service.flow.exception.ServiceTaskError;
import com.oceanbase.odc.service.flow.instance.FlowTaskInstance;
import com.oceanbase.odc.service.flow.listener.ActiveTaskStatisticsListener;
+import com.oceanbase.odc.service.flow.listener.ServiceTaskExecutingCompleteListener;
import com.oceanbase.odc.service.flow.model.ExecutionStrategyConfig;
import lombok.Getter;
@@ -194,6 +198,11 @@ public void bindToFlowTaskInstance(@NonNull FlowTaskInstance taskInstance) {
}
}
+ @Override
+ public List> getExecutionListenerClasses() {
+ return Collections.singletonList(ServiceTaskExecutingCompleteListener.class);
+ }
+
protected void updateFlowInstanceStatus(@NonNull FlowStatus flowStatus) {
this.retryExecutor.run(() -> {
try {
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/CreateExternalApprovalTask.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/CreateExternalApprovalTask.java
index 4e20ae393b..932e688e6c 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/CreateExternalApprovalTask.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/CreateExternalApprovalTask.java
@@ -16,9 +16,12 @@
package com.oceanbase.odc.service.flow.task;
+import java.util.Collections;
+import java.util.List;
import java.util.Optional;
import org.flowable.engine.delegate.DelegateExecution;
+import org.flowable.engine.delegate.ExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.i18n.LocaleContextHolder;
@@ -33,6 +36,7 @@
import com.oceanbase.odc.metadb.regulation.risklevel.RiskLevelRepository;
import com.oceanbase.odc.service.flow.FlowableAdaptor;
import com.oceanbase.odc.service.flow.instance.FlowApprovalInstance;
+import com.oceanbase.odc.service.flow.listener.ServiceTaskExecutingCompleteListener;
import com.oceanbase.odc.service.flow.model.FlowNodeStatus;
import com.oceanbase.odc.service.flow.task.model.RuntimeTaskConstants;
import com.oceanbase.odc.service.flow.util.FlowTaskUtil;
@@ -140,4 +144,9 @@ private FlowApprovalInstance getFlowApprovalInstance(DelegateExecution execution
return instanceOpt.get();
}
+ @Override
+ public List> getExecutionListenerClasses() {
+ return Collections.singletonList(ServiceTaskExecutingCompleteListener.class);
+ }
+
}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/FlowTaskSubmitter.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/FlowTaskSubmitter.java
index f43412a4fa..ae73d35cd8 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/FlowTaskSubmitter.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/FlowTaskSubmitter.java
@@ -77,8 +77,15 @@ public void execute(DelegateExecution execution) {
FlowTaskInstance flowTaskInstance = null;
try {
flowTaskInstance = getFlowTaskInstance(flowInstanceId, activityId);
- getDelegateInstance(flowTaskInstance).execute(executionFacade);
- updateFlowTaskInstance(flowTaskInstance.getId(), FlowNodeStatus.COMPLETED);
+ BaseRuntimeFlowableDelegate> delegate = getDelegateInstance(flowTaskInstance);
+ List> list = delegate.getExecutionListenerClasses();
+ if (CollectionUtils.isNotEmpty(list)) {
+ list.forEach(c -> doCallListener(FlowConstants.EXECUTION_START_EVENT_NAME, executionFacade, c));
+ }
+ delegate.execute(executionFacade);
+ if (CollectionUtils.isNotEmpty(list)) {
+ list.forEach(c -> doCallListener(FlowConstants.EXECUTION_END_EVENT_NAME, executionFacade, c));
+ }
flowTaskCallBackApprovalService.approval(executionFacade.getProcessInstanceId(),
flowTaskInstance.getId(), executionFacade.getFutureVariable());
} catch (Exception e) {
@@ -120,11 +127,10 @@ private void handleException(CustomDelegateExecution execution, FlowTaskInstance
if (Objects.equals(acceptErrorCode, targetErrorCode)) {
flowTaskCallBackApprovalService.reject(execution.getProcessInstanceId(),
flowTaskInstance.getId(), execution.getFutureVariable());
- if (CollectionUtils.isNotEmpty(eventDefinition.getFlowableListeners())) {
- callListener(FlowConstants.EXECUTION_START_EVENT_NAME, execution,
- eventDefinition.getFlowableListeners());
- callListener(FlowConstants.EXECUTION_END_EVENT_NAME, execution,
- eventDefinition.getFlowableListeners());
+ List listeners = eventDefinition.getFlowableListeners();
+ if (CollectionUtils.isNotEmpty(listeners)) {
+ callListener(FlowConstants.EXECUTION_START_EVENT_NAME, execution, listeners);
+ callListener(FlowConstants.EXECUTION_END_EVENT_NAME, execution, listeners);
}
return;
}
@@ -141,23 +147,30 @@ private void handleException(CustomDelegateExecution execution, FlowTaskInstance
flowTaskInstance.getId(), execution.getFutureVariable());
}
- private void callListener(String eventName, DelegateExecution execution, List flowableListeners) {
- flowableListeners.stream().filter(fl -> eventName.equals(fl.getEvent())).forEach(fl -> {
+ private void callListener(String eventName, DelegateExecution execution, List listeners) {
+ listeners.stream().filter(fl -> eventName.equals(fl.getEvent())).forEach(fl -> {
try {
- Class> clazz = Class.forName(fl.getImplementation(), false,
- Thread.currentThread().getContextClassLoader());
- if (ExecutionListener.class.isAssignableFrom(clazz)) {
- ExecutionListener listener =
- (ExecutionListener) BeanInjectedClassDelegate.instantiateDelegate(clazz);
- execution.setEventName(fl.getEvent());
- listener.notify(execution);
- }
- } catch (Exception ex) {
- log.warn("Call execution listener occur error: ", ex);
+ doCallListener(fl.getEvent(), execution,
+ Class.forName(fl.getImplementation(), false, Thread.currentThread().getContextClassLoader()));
+ } catch (Exception e) {
+ log.warn("Failed to load execution class, className={}", fl.getImplementation(), e);
}
});
}
+ private void doCallListener(String eventName, DelegateExecution execution, Class> listenerClass) {
+ try {
+ if (ExecutionListener.class.isAssignableFrom(listenerClass)) {
+ ExecutionListener listener =
+ (ExecutionListener) BeanInjectedClassDelegate.instantiateDelegate(listenerClass);
+ execution.setEventName(eventName);
+ listener.notify(execution);
+ }
+ } catch (Exception e) {
+ log.warn("Failed to call execution listener", e);
+ }
+ }
+
private BaseRuntimeFlowableDelegate> getDelegateInstance(FlowTaskInstance flowTaskInstance) throws Exception {
OdcRuntimeDelegateMapper mapper = new OdcRuntimeDelegateMapper();
Class extends BaseRuntimeFlowableDelegate>> delegateClass =
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java
index dee0b62789..5c818c7015 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java
@@ -19,10 +19,10 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.stream.Collectors;
import org.flowable.engine.delegate.DelegateExecution;
+import org.flowable.engine.delegate.ExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
@@ -42,7 +42,6 @@
import com.oceanbase.odc.service.databasechange.model.DatabaseChangingRecord;
import com.oceanbase.odc.service.flow.FlowInstanceService;
import com.oceanbase.odc.service.flow.FlowableAdaptor;
-import com.oceanbase.odc.service.flow.instance.FlowTaskInstance;
import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq;
import com.oceanbase.odc.service.flow.model.FlowInstanceDetailResp;
import com.oceanbase.odc.service.flow.model.FlowNodeStatus;
@@ -111,6 +110,7 @@ protected Void start(Long taskId, TaskService taskService, DelegateExecution exe
throws InterruptedException {
MultipleDatabaseChangeTraceContextHolder.trace(taskId);
try {
+ serviceTaskInstanceRepository.updateStatusById(getTargetTaskInstanceId(), FlowNodeStatus.EXECUTING);
FlowInstanceDetailResp flowInstanceDetailResp = flowInstanceService.detail(getFlowInstanceId());
this.flowTaskExecutionStrategy = flowInstanceDetailResp.getExecutionStrategy();
TaskEntity detail = taskService.detail(taskId);
@@ -213,8 +213,7 @@ protected void onSuccessful(Long taskId, TaskService taskService) {
MultipleDatabaseChangeTraceContextHolder.trace(taskId);
log.info("Multiple database task succeed, taskId={}, batchId={}", taskId, this.batchId + 1);
if (this.batchId == batchSum - 1) {
- List list = flowInstanceService
- .getFlowInstanceByParentId(getFlowInstanceId());
+ List list = flowInstanceService.getFlowInstanceByParentId(getFlowInstanceId());
if (list.stream().allMatch(e -> FlowStatus.EXECUTION_SUCCEEDED == e.getStatus())) {
updateFlowInstanceStatus(FlowStatus.EXECUTION_SUCCEEDED);
} else {
@@ -225,11 +224,8 @@ protected void onSuccessful(Long taskId, TaskService taskService) {
taskService.updateProgress(taskId, (this.batchId + 1) * 100D / this.batchSum);
}
super.onSuccessful(taskId, taskService);
- Optional taskInstanceByActivityId = flowableAdaptor
- .getTaskInstanceByActivityId(getActivityId(), getFlowInstanceId());
if (!this.isSuccessful) {
- serviceTaskInstanceRepository.updateStatusById(
- taskInstanceByActivityId.get().getId(), FlowNodeStatus.FAILED);
+ serviceTaskInstanceRepository.updateStatusById(getTargetTaskInstanceId(), FlowNodeStatus.FAILED);
}
} catch (Exception e) {
log.warn("Multiple database task failed, taskId={}, batchId={}", taskId,
@@ -269,19 +265,16 @@ private MultipleDatabaseChangeTaskResult generateResult() {
flowInstanceDetailResp -> flowInstanceDetailResp));
List idList = this.orderedDatabaseIds.stream().flatMap(Collection::stream).collect(Collectors.toList());
List databaseList = databaseService.listDatabasesDetailsByIds(idList);
- ArrayList databaseChangingRecords = new ArrayList<>();
+ ArrayList records = new ArrayList<>();
for (Database database : databaseList) {
- FlowInstanceDetailResp flowInstanceDetailResp = databaseId2FlowInstanceDetailResp.get(database.getId());
- DatabaseChangingRecord databaseChangingRecord = new DatabaseChangingRecord();
- databaseChangingRecord.setDatabase(new DatabaseChangeDatabase(database));
- databaseChangingRecord.setFlowInstanceDetailResp(
- flowInstanceDetailResp != null ? new DatabaseChangeFlowInstanceDetailResp(flowInstanceDetailResp)
- : null);
- databaseChangingRecord.setStatus(flowInstanceDetailResp != null ? flowInstanceDetailResp.getStatus()
- : FlowStatus.WAIT_FOR_EXECUTION);
- databaseChangingRecords.add(databaseChangingRecord);
+ FlowInstanceDetailResp resp = databaseId2FlowInstanceDetailResp.get(database.getId());
+ DatabaseChangingRecord record = new DatabaseChangingRecord();
+ record.setDatabase(new DatabaseChangeDatabase(database));
+ record.setFlowInstanceDetailResp(resp != null ? new DatabaseChangeFlowInstanceDetailResp(resp) : null);
+ record.setStatus(resp != null ? resp.getStatus() : FlowStatus.WAIT_FOR_EXECUTION);
+ records.add(record);
}
- result.setDatabaseChangingRecordList(databaseChangingRecords);
+ result.setDatabaseChangingRecordList(records);
return result;
}
@@ -296,4 +289,9 @@ private Boolean isContinue(FlowTaskExecutionStrategy flowTaskExecutionStrategy,
}
}
+ @Override
+ public List> getExecutionListenerClasses() {
+ return null;
+ }
+
}
From 3adf811f907a58c64a92cf95df0c3984293d373c Mon Sep 17 00:00:00 2001
From: yh263208
Date: Wed, 22 May 2024 19:19:06 +0800
Subject: [PATCH 08/17] opt code
---
.../ServiceTaskInstanceRepositoryTest.java | 17 ++++--
.../odc/metadb/task/TaskRepositoryTest.java | 12 +++--
.../flow/tool/TestFlowableDelegateImpl.java | 10 ++++
.../flow/ServiceTaskInstanceRepository.java | 8 ++-
.../odc/metadb/task/TaskRepository.java | 2 +-
.../odc/service/flow/FlowSchedules.java | 53 +++++++++++--------
6 files changed, 69 insertions(+), 33 deletions(-)
diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepositoryTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepositoryTest.java
index 16683943e2..62af04f2c3 100644
--- a/server/integration-test/src/test/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepositoryTest.java
+++ b/server/integration-test/src/test/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepositoryTest.java
@@ -17,11 +17,9 @@
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Random;
-import java.util.Set;
import java.util.UUID;
import org.junit.After;
@@ -70,10 +68,19 @@ public void clearAll() {
@Test
public void findByTargetTaskIdIn_entityExists_returnNotEmpty() {
ServiceTaskInstanceEntity entity = createEntity();
+ entity.setStatus(FlowNodeStatus.CANCELLED);
entity = this.repository.save(entity);
- Set ids = new HashSet<>();
- ids.add(entity.getTargetTaskId());
- List actual = this.repository.findByTargetTaskIdIn(ids);
+ this.repository.updateStatusByIdIn(Collections.singletonList(entity.getId()), FlowNodeStatus.FAILED);
+ Optional optional = this.repository.findById(entity.getId());
+ entity.setStatus(FlowNodeStatus.FAILED);
+ Assert.assertEquals(entity, optional.get());
+ }
+
+ @Test
+ public void findByStatus_entityExists_returnNotEmpty() {
+ ServiceTaskInstanceEntity entity = createEntity();
+ entity = this.repository.save(entity);
+ List actual = this.repository.findByStatus(entity.getStatus());
Assert.assertEquals(Collections.singletonList(entity), actual);
}
diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/metadb/task/TaskRepositoryTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/metadb/task/TaskRepositoryTest.java
index 2ccaedb46f..53415cbd60 100644
--- a/server/integration-test/src/test/java/com/oceanbase/odc/metadb/task/TaskRepositoryTest.java
+++ b/server/integration-test/src/test/java/com/oceanbase/odc/metadb/task/TaskRepositoryTest.java
@@ -15,6 +15,7 @@
*/
package com.oceanbase.odc.metadb.task;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;
@@ -49,7 +50,7 @@ public void updateLastHeartbeatTimeById_updateSucceed_returnNotNull() {
}
@Test
- public void findAllByLastHeartbeatTimeBefore_findBeforeNow_returnNotNull() {
+ public void findAllByLastHeartbeatTimeBeforeAndIdIn_findBeforeNow_returnNotNull() {
TaskEntity taskEntity = TestRandom.nextObject(TaskEntity.class);
taskEntity.setId(null);
taskEntity.setLastHeartbeatTime(null);
@@ -57,20 +58,21 @@ public void findAllByLastHeartbeatTimeBefore_findBeforeNow_returnNotNull() {
this.taskRepository.updateLastHeartbeatTimeById(taskEntity.getId());
Optional optional = this.taskRepository.findById(taskEntity.getId());
Date heartbeatTime = new Date(optional.get().getLastHeartbeatTime().getTime() + 1);
- List actual = this.taskRepository.findAllByLastHeartbeatTimeBefore(heartbeatTime);
+ List actual = this.taskRepository.findAllByLastHeartbeatTimeBeforeAndIdIn(
+ heartbeatTime, Collections.singletonList(taskEntity.getId()));
Assert.assertFalse(actual.isEmpty());
}
@Test
- public void findAllByLastHeartbeatTimeBefore_findBeforeLastHeartbeatTime_returnEmpty() {
+ public void findAllByLastHeartbeatTimeBeforeAndIdIn_findBeforeLastHeartbeatTime_returnEmpty() {
TaskEntity taskEntity = TestRandom.nextObject(TaskEntity.class);
taskEntity.setId(null);
taskEntity.setLastHeartbeatTime(null);
taskEntity = this.taskRepository.save(taskEntity);
this.taskRepository.updateLastHeartbeatTimeById(taskEntity.getId());
Optional optional = this.taskRepository.findById(taskEntity.getId());
- List actual = this.taskRepository
- .findAllByLastHeartbeatTimeBefore(optional.get().getLastHeartbeatTime());
+ List actual = this.taskRepository.findAllByLastHeartbeatTimeBeforeAndIdIn(
+ optional.get().getLastHeartbeatTime(), Collections.singletonList(taskEntity.getId()));
Assert.assertTrue(actual.isEmpty());
}
diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/tool/TestFlowableDelegateImpl.java b/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/tool/TestFlowableDelegateImpl.java
index cfb0617a1e..402518c519 100644
--- a/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/tool/TestFlowableDelegateImpl.java
+++ b/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/tool/TestFlowableDelegateImpl.java
@@ -15,9 +15,14 @@
*/
package com.oceanbase.odc.service.flow.tool;
+import java.util.Collections;
+import java.util.List;
+
import org.flowable.engine.delegate.DelegateExecution;
+import org.flowable.engine.delegate.ExecutionListener;
import com.oceanbase.odc.core.flow.BaseFlowableDelegate;
+import com.oceanbase.odc.service.flow.listener.ServiceTaskExecutingCompleteListener;
import lombok.extern.slf4j.Slf4j;
@@ -29,4 +34,9 @@ protected void run(DelegateExecution execution) {
log.info("Service task run, currentActivityId={}, processInstanceId={}", execution.getCurrentActivityId(),
execution.getProcessInstanceId());
}
+
+ @Override
+ protected List> getExecutionListenerClasses() {
+ return Collections.singletonList(ServiceTaskExecutingCompleteListener.class);
+ }
}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepository.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepository.java
index ea614b936b..7476fb224d 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepository.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepository.java
@@ -62,7 +62,13 @@ public interface ServiceTaskInstanceRepository extends OdcJpaRepository findByFlowInstanceIdIn(Set flowInstanceIds);
- List findByTargetTaskIdIn(Set flowInstanceIds);
+ @Transactional
+ @Query(value = "update flow_instance_node_task set status=:#{#status.name()} where id in (:ids)",
+ nativeQuery = true)
+ @Modifying
+ int updateStatusByIdIn(@Param("ids") List ids, @Param("status") FlowNodeStatus status);
+
+ List findByStatus(FlowNodeStatus status);
@Query(value = "select na.* from flow_instance_node_task as na inner join flow_instance_node as n on na.id=n.instance_id "
+ "where n.instance_type=:#{#instanceType.name()} and n.activity_id=:activityId and n.flow_instance_id=:flowInstanceId",
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskRepository.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskRepository.java
index d040e0170a..f9daf8998e 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskRepository.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskRepository.java
@@ -49,7 +49,7 @@ public interface TaskRepository extends JpaRepository, JpaSpec
@Modifying
int updateLastHeartbeatTimeById(@Param("id") Long id);
- List findAllByLastHeartbeatTimeBefore(Date lastHeartbeatTime);
+ List findAllByLastHeartbeatTimeBeforeAndIdIn(Date lastHeartbeatTime, List ids);
@Transactional
@Query("update TaskEntity set parametersJson=:#{#param.parametersJson} where id=:#{#param.id}")
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
index 7388f4b282..11fdf43368 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
@@ -15,9 +15,8 @@
*/
package com.oceanbase.odc.service.flow;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -68,38 +67,50 @@ public void cancelHeartbeatTimeoutFlow() {
if (timeoutSeconds < minTimeoutSeconds) {
timeoutSeconds = minTimeoutSeconds;
}
+ List taskInstanceEntities = this.serviceTaskInstanceRepository
+ .findByStatus(FlowNodeStatus.EXECUTING).stream()
+ .filter(e -> e.getTargetTaskId() != null).collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(taskInstanceEntities)) {
+ return;
+ }
+ List taskIds = taskInstanceEntities.stream()
+ .map(ServiceTaskInstanceEntity::getTargetTaskId).distinct().collect(Collectors.toList());
Date timeoutBound = new Date(System.currentTimeMillis() - timeoutSeconds * 1000);
- List taskEntities = this.taskRepository.findAllByLastHeartbeatTimeBefore(timeoutBound);
- if (CollectionUtils.isEmpty(taskEntities)) {
+ List heartbeatTimeoutTasks = DBSchemaAccessorUtil.partitionFind(taskIds,
+ DBSchemaAccessorUtil.OB_MAX_IN_SIZE,
+ ids -> taskRepository.findAllByLastHeartbeatTimeBeforeAndIdIn(timeoutBound, ids));
+ if (CollectionUtils.isEmpty(heartbeatTimeoutTasks)) {
return;
}
- List taskIds = taskEntities.stream().map(TaskEntity::getId).collect(Collectors.toList());
- log.info("Find the task with heartbeat timeout, timeoutSeconds={}, earliestHeartbeatTime={}, taskIds={}",
- timeoutSeconds, timeoutBound, taskIds);
+ Set heartbeatTimeoutTaskIds = heartbeatTimeoutTasks.stream()
+ .map(TaskEntity::getId).collect(Collectors.toSet());
+ log.info("Find the task with heartbeat timeout, timeoutSeconds={}, earliestHeartbeatTime={}, "
+ + "heartbeatTimeoutTaskIds={}", timeoutSeconds, timeoutBound, heartbeatTimeoutTaskIds);
/**
* we just find such flow task instances:
*
* 1. heartbeat timeout 2. flow task instance is executing
*
*/
- List candidates = DBSchemaAccessorUtil.partitionFind(taskIds,
- DBSchemaAccessorUtil.OB_MAX_IN_SIZE,
- ids -> serviceTaskInstanceRepository.findByTargetTaskIdIn(new HashSet<>(ids))).stream()
- .filter(entity -> entity.getStatus() == FlowNodeStatus.EXECUTING).collect(Collectors.toList());
- Set flowTaskInstIds = candidates.stream()
- .map(ServiceTaskInstanceEntity::getId).collect(Collectors.toSet());
+ List candidates = taskInstanceEntities.stream()
+ .filter(e -> heartbeatTimeoutTaskIds.contains(e.getTargetTaskId()))
+ .collect(Collectors.toList());
+ List candidateIds = candidates.stream()
+ .map(ServiceTaskInstanceEntity::getId).distinct().collect(Collectors.toList());
List flowInstIds = candidates.stream().map(ServiceTaskInstanceEntity::getFlowInstanceId)
.distinct().collect(Collectors.toList());
log.info("Find heartbeat timeout flow task instance, timeoutSeconds={}, earliestHeartbeatTime={}, "
- + "flowTaskInstIds={}, flowInstIds={}", timeoutSeconds, timeoutBound, flowTaskInstIds, flowInstIds);
- List executeResult =
- DBSchemaAccessorUtil.partitionFind(flowInstIds, DBSchemaAccessorUtil.OB_MAX_IN_SIZE, ids -> {
- int affectRows = flowInstanceRepository.updateStatusByIds(ids, FlowStatus.CANCELLED);
- List result = new ArrayList<>();
- result.add(affectRows);
- return result;
- });
+ + "flowTaskInstIds={}, flowInstIds={}", timeoutSeconds, timeoutBound, candidateIds, flowInstIds);
+ List executeResult = DBSchemaAccessorUtil.partitionFind(flowInstIds,
+ DBSchemaAccessorUtil.OB_MAX_IN_SIZE,
+ ids -> Collections.singletonList(flowInstanceRepository
+ .updateStatusByIds(ids, FlowStatus.CANCELLED)));
log.info("Update flow instance's status succeed, affectRows={}", executeResult);
+ executeResult = DBSchemaAccessorUtil.partitionFind(candidateIds,
+ DBSchemaAccessorUtil.OB_MAX_IN_SIZE,
+ ids -> Collections.singletonList(serviceTaskInstanceRepository
+ .updateStatusByIdIn(ids, FlowNodeStatus.CANCELLED)));
+ log.info("Update flow task instance's status succeed, affectRows={}", executeResult);
} catch (Exception e) {
log.warn("Failed to sync flow instance's status", e);
}
From b7552fbfc5a10893a4783da0d369ace13ed6cdf2 Mon Sep 17 00:00:00 2001
From: yh263208
Date: Wed, 22 May 2024 19:33:16 +0800
Subject: [PATCH 09/17] opt code
---
.../java/com/oceanbase/odc/service/flow/FlowSchedules.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
index 11fdf43368..8e324f3753 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
@@ -105,12 +105,14 @@ public void cancelHeartbeatTimeoutFlow() {
DBSchemaAccessorUtil.OB_MAX_IN_SIZE,
ids -> Collections.singletonList(flowInstanceRepository
.updateStatusByIds(ids, FlowStatus.CANCELLED)));
- log.info("Update flow instance's status succeed, affectRows={}", executeResult);
+ log.info("Update flow instance's status succeed, affectRows={}, flowInstIds={}", executeResult,
+ flowInstIds);
executeResult = DBSchemaAccessorUtil.partitionFind(candidateIds,
DBSchemaAccessorUtil.OB_MAX_IN_SIZE,
ids -> Collections.singletonList(serviceTaskInstanceRepository
.updateStatusByIdIn(ids, FlowNodeStatus.CANCELLED)));
- log.info("Update flow task instance's status succeed, affectRows={}", executeResult);
+ log.info("Update flow task instance's status succeed, affectRows={}, "
+ + "flowTaskInstIds={}", executeResult, candidateIds);
} catch (Exception e) {
log.warn("Failed to sync flow instance's status", e);
}
From 5dcf2c68d6711099aa49a06f747e67fd2d4b8c18 Mon Sep 17 00:00:00 2001
From: yh263208
Date: Wed, 22 May 2024 20:39:49 +0800
Subject: [PATCH 10/17] optimize code
---
.../service/flow/task/BaseODCFlowTaskDelegate.java | 14 +++++++++-----
.../flow/task/BaseRuntimeFlowableDelegate.java | 3 +++
.../MultipleDatabaseChangeRuntimeFlowableTask.java | 11 +++++++++--
3 files changed, 21 insertions(+), 7 deletions(-)
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java
index c0302ca0e8..14acc0d2d5 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java
@@ -27,6 +27,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.collections4.CollectionUtils;
import org.flowable.engine.delegate.DelegateExecution;
import org.springframework.beans.factory.annotation.Autowired;
@@ -119,12 +120,12 @@ private void initMonitorExecutor() {
int interval = RuntimeTaskConstants.DEFAULT_TASK_CHECK_INTERVAL_SECONDS;
scheduleExecutor.scheduleAtFixedRate(() -> {
try {
+ try {
+ this.taskService.updateHeartbeatTime(getTargetTaskId());
+ } catch (Exception e) {
+ log.warn("Failed to update heartbeat time, taskId={}", getTargetTaskId(), e);
+ }
if (taskLatch.getCount() > 0) {
- try {
- this.taskService.updateHeartbeatTime(taskId);
- } catch (Exception e) {
- log.warn("Failed to update heartbeat time, taskId={}", taskId, e);
- }
onProgressUpdate(taskId, taskService);
}
} catch (Exception e) {
@@ -363,6 +364,9 @@ private void doSetDownloadLogUrl() throws IOException, NotFoundException {
}
List extends FlowTaskResult> flowTaskResults =
flowTaskInstanceService.getTaskResultFromEntity(taskEntity, false);
+ if (CollectionUtils.isEmpty(flowTaskResults)) {
+ return;
+ }
Verify.singleton(flowTaskResults, "flowTaskResults");
if (flowTaskResults.get(0) instanceof AbstractFlowTaskResult) {
FlowTaskResult flowTaskResult = flowTaskResults.get(0);
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseRuntimeFlowableDelegate.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseRuntimeFlowableDelegate.java
index 0870cff37c..7e7878e554 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseRuntimeFlowableDelegate.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseRuntimeFlowableDelegate.java
@@ -70,6 +70,8 @@ public abstract class BaseRuntimeFlowableDelegate extends BaseFlowableDelegat
@Getter
private Long targetTaskInstanceId;
@Getter
+ private Long targetTaskId;
+ @Getter
private TaskType taskType;
@Getter
private Long flowInstanceId;
@@ -185,6 +187,7 @@ private void initTargetTaskInstanceId(DelegateExecution execution) {
this.targetTaskInstanceId = flowTaskInstance.getId();
this.taskType = flowTaskInstance.getTaskType();
+ this.targetTaskId = flowTaskInstance.getTargetTaskId();
this.strategyConfig = flowTaskInstance.getStrategyConfig();
flowTaskInstance.dealloc();
}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java
index 5c818c7015..b5d5ef39ff 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java
@@ -32,6 +32,7 @@
import com.oceanbase.odc.core.shared.constant.TaskErrorStrategy;
import com.oceanbase.odc.core.shared.constant.TaskType;
import com.oceanbase.odc.metadb.flow.FlowInstanceEntity;
+import com.oceanbase.odc.metadb.flow.FlowInstanceRepository;
import com.oceanbase.odc.metadb.flow.ServiceTaskInstanceRepository;
import com.oceanbase.odc.metadb.task.TaskEntity;
import com.oceanbase.odc.service.connection.database.DatabaseService;
@@ -72,6 +73,8 @@ public class MultipleDatabaseChangeRuntimeFlowableTask extends BaseODCFlowTaskDe
@Autowired
private FlowInstanceService flowInstanceService;
@Autowired
+ private FlowInstanceRepository flowInstanceRepository;
+ @Autowired
private DatabaseService databaseService;
@Autowired
private FlowableAdaptor flowableAdaptor;
@@ -110,7 +113,8 @@ protected Void start(Long taskId, TaskService taskService, DelegateExecution exe
throws InterruptedException {
MultipleDatabaseChangeTraceContextHolder.trace(taskId);
try {
- serviceTaskInstanceRepository.updateStatusById(getTargetTaskInstanceId(), FlowNodeStatus.EXECUTING);
+ this.flowInstanceRepository.updateStatusById(getFlowInstanceId(), FlowStatus.EXECUTING);
+ this.serviceTaskInstanceRepository.updateStatusById(getTargetTaskInstanceId(), FlowNodeStatus.EXECUTING);
FlowInstanceDetailResp flowInstanceDetailResp = flowInstanceService.detail(getFlowInstanceId());
this.flowTaskExecutionStrategy = flowInstanceDetailResp.getExecutionStrategy();
TaskEntity detail = taskService.detail(taskId);
@@ -225,7 +229,10 @@ protected void onSuccessful(Long taskId, TaskService taskService) {
}
super.onSuccessful(taskId, taskService);
if (!this.isSuccessful) {
- serviceTaskInstanceRepository.updateStatusById(getTargetTaskInstanceId(), FlowNodeStatus.FAILED);
+ this.serviceTaskInstanceRepository.updateStatusById(getTargetTaskInstanceId(), FlowNodeStatus.FAILED);
+ } else {
+ this.serviceTaskInstanceRepository.updateStatusById(getTargetTaskInstanceId(),
+ FlowNodeStatus.COMPLETED);
}
} catch (Exception e) {
log.warn("Multiple database task failed, taskId={}, batchId={}", taskId,
From 8a1ce5a59aea57e8dd675a0781892de1bd3e5163 Mon Sep 17 00:00:00 2001
From: yh263208
Date: Thu, 23 May 2024 10:05:18 +0800
Subject: [PATCH 11/17] opt code
---
.../oceanbase/odc/service/flow/FlowSchedules.java | 13 ++++++++-----
1 file changed, 8 insertions(+), 5 deletions(-)
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
index 8e324f3753..9d43de031a 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
@@ -61,12 +61,15 @@ public class FlowSchedules {
@Scheduled(fixedDelayString = "${odc.flow.task.heartbeat-timeout-check-interval-millis:15000}")
public void cancelHeartbeatTimeoutFlow() {
+ long timeoutSeconds = this.flowTaskProperties.getHeartbeatTimeoutSeconds();
+ if (timeoutSeconds <= 0) {
+ return;
+ }
+ long minTimeoutSeconds = RuntimeTaskConstants.DEFAULT_TASK_CHECK_INTERVAL_SECONDS * 3;
+ if (timeoutSeconds < minTimeoutSeconds) {
+ timeoutSeconds = minTimeoutSeconds;
+ }
try {
- long minTimeoutSeconds = RuntimeTaskConstants.DEFAULT_TASK_CHECK_INTERVAL_SECONDS + 5;
- long timeoutSeconds = this.flowTaskProperties.getHeartbeatTimeoutSeconds();
- if (timeoutSeconds < minTimeoutSeconds) {
- timeoutSeconds = minTimeoutSeconds;
- }
List taskInstanceEntities = this.serviceTaskInstanceRepository
.findByStatus(FlowNodeStatus.EXECUTING).stream()
.filter(e -> e.getTargetTaskId() != null).collect(Collectors.toList());
From e6c14684e6f456669b78277c4c7aeebac67dfe7d Mon Sep 17 00:00:00 2001
From: yh263208
Date: Thu, 23 May 2024 10:11:59 +0800
Subject: [PATCH 12/17] add switch off
---
.../oceanbase/odc/service/flow/FlowSchedules.java | 14 ++++++--------
1 file changed, 6 insertions(+), 8 deletions(-)
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
index 9d43de031a..6477ba3c7c 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
@@ -50,6 +50,7 @@
@Component
public class FlowSchedules {
+ private static final Integer OB_MAX_IN_SIZE = 2000;
@Autowired
private FlowTaskProperties flowTaskProperties;
@Autowired
@@ -80,8 +81,7 @@ public void cancelHeartbeatTimeoutFlow() {
.map(ServiceTaskInstanceEntity::getTargetTaskId).distinct().collect(Collectors.toList());
Date timeoutBound = new Date(System.currentTimeMillis() - timeoutSeconds * 1000);
List heartbeatTimeoutTasks = DBSchemaAccessorUtil.partitionFind(taskIds,
- DBSchemaAccessorUtil.OB_MAX_IN_SIZE,
- ids -> taskRepository.findAllByLastHeartbeatTimeBeforeAndIdIn(timeoutBound, ids));
+ OB_MAX_IN_SIZE, ids -> taskRepository.findAllByLastHeartbeatTimeBeforeAndIdIn(timeoutBound, ids));
if (CollectionUtils.isEmpty(heartbeatTimeoutTasks)) {
return;
}
@@ -105,15 +105,13 @@ public void cancelHeartbeatTimeoutFlow() {
log.info("Find heartbeat timeout flow task instance, timeoutSeconds={}, earliestHeartbeatTime={}, "
+ "flowTaskInstIds={}, flowInstIds={}", timeoutSeconds, timeoutBound, candidateIds, flowInstIds);
List executeResult = DBSchemaAccessorUtil.partitionFind(flowInstIds,
- DBSchemaAccessorUtil.OB_MAX_IN_SIZE,
- ids -> Collections.singletonList(flowInstanceRepository
- .updateStatusByIds(ids, FlowStatus.CANCELLED)));
+ OB_MAX_IN_SIZE, ids -> Collections.singletonList(
+ flowInstanceRepository.updateStatusByIds(ids, FlowStatus.CANCELLED)));
log.info("Update flow instance's status succeed, affectRows={}, flowInstIds={}", executeResult,
flowInstIds);
executeResult = DBSchemaAccessorUtil.partitionFind(candidateIds,
- DBSchemaAccessorUtil.OB_MAX_IN_SIZE,
- ids -> Collections.singletonList(serviceTaskInstanceRepository
- .updateStatusByIdIn(ids, FlowNodeStatus.CANCELLED)));
+ OB_MAX_IN_SIZE, ids -> Collections.singletonList(
+ serviceTaskInstanceRepository.updateStatusByIdIn(ids, FlowNodeStatus.CANCELLED)));
log.info("Update flow task instance's status succeed, affectRows={}, "
+ "flowTaskInstIds={}", executeResult, candidateIds);
} catch (Exception e) {
From 8ceeef3d96fc814bfe5a4371775428a337c19f09 Mon Sep 17 00:00:00 2001
From: yh263208
Date: Thu, 23 May 2024 10:18:20 +0800
Subject: [PATCH 13/17] modify comment
---
.../migrate/common/V_4_3_0_6__add_flow_task_heartbeat.sql | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/server/odc-migrate/src/main/resources/migrate/common/V_4_3_0_6__add_flow_task_heartbeat.sql b/server/odc-migrate/src/main/resources/migrate/common/V_4_3_0_6__add_flow_task_heartbeat.sql
index 326665da3e..66d3a5228e 100644
--- a/server/odc-migrate/src/main/resources/migrate/common/V_4_3_0_6__add_flow_task_heartbeat.sql
+++ b/server/odc-migrate/src/main/resources/migrate/common/V_4_3_0_6__add_flow_task_heartbeat.sql
@@ -1 +1 @@
-alter table `task_task` add column `last_heartbeat_time` datetime default null comment 'Last heart beat time';
\ No newline at end of file
+alter table `task_task` add column `last_heartbeat_time` datetime default null comment 'Last heartbeat time';
\ No newline at end of file
From c0772a1bdec5278b6f66943cd8ad464a1b9ddd29 Mon Sep 17 00:00:00 2001
From: yh263208
Date: Thu, 23 May 2024 14:13:24 +0800
Subject: [PATCH 14/17] update heartbeat time when task is started
---
.../service/flow/task/BaseODCFlowTaskDelegate.java | 8 +-------
.../flow/task/BaseRuntimeFlowableDelegate.java | 11 +++++++++++
.../odc/service/flow/task/FlowTaskSubmitter.java | 1 +
3 files changed, 13 insertions(+), 7 deletions(-)
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java
index 14acc0d2d5..98c5f3563e 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java
@@ -78,8 +78,6 @@
@Slf4j
public abstract class BaseODCFlowTaskDelegate extends BaseRuntimeFlowableDelegate {
- @Autowired
- private TaskService taskService;
@Autowired
protected HostProperties hostProperties;
@Autowired
@@ -120,11 +118,7 @@ private void initMonitorExecutor() {
int interval = RuntimeTaskConstants.DEFAULT_TASK_CHECK_INTERVAL_SECONDS;
scheduleExecutor.scheduleAtFixedRate(() -> {
try {
- try {
- this.taskService.updateHeartbeatTime(getTargetTaskId());
- } catch (Exception e) {
- log.warn("Failed to update heartbeat time, taskId={}", getTargetTaskId(), e);
- }
+ updateHeartbeatTime();
if (taskLatch.getCount() > 0) {
onProgressUpdate(taskId, taskService);
}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseRuntimeFlowableDelegate.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseRuntimeFlowableDelegate.java
index 7e7878e554..47796784a9 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseRuntimeFlowableDelegate.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseRuntimeFlowableDelegate.java
@@ -49,6 +49,7 @@
import com.oceanbase.odc.service.flow.listener.ActiveTaskStatisticsListener;
import com.oceanbase.odc.service.flow.listener.ServiceTaskExecutingCompleteListener;
import com.oceanbase.odc.service.flow.model.ExecutionStrategyConfig;
+import com.oceanbase.odc.service.task.TaskService;
import lombok.Getter;
import lombok.NonNull;
@@ -82,6 +83,8 @@ public abstract class BaseRuntimeFlowableDelegate extends BaseFlowableDelegat
@Autowired
private EventPublisher eventPublisher;
@Autowired
+ protected TaskService taskService;
+ @Autowired
private FlowInstanceRepository flowInstanceRepository;
private volatile T returnObject = null;
private volatile Exception thrown = null;
@@ -201,6 +204,14 @@ public void bindToFlowTaskInstance(@NonNull FlowTaskInstance taskInstance) {
}
}
+ public void updateHeartbeatTime() {
+ try {
+ this.taskService.updateHeartbeatTime(getTargetTaskId());
+ } catch (Exception e) {
+ log.warn("Failed to update heartbeat time, taskId={}", getTargetTaskId(), e);
+ }
+ }
+
@Override
public List> getExecutionListenerClasses() {
return Collections.singletonList(ServiceTaskExecutingCompleteListener.class);
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/FlowTaskSubmitter.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/FlowTaskSubmitter.java
index ae73d35cd8..bef000a7ec 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/FlowTaskSubmitter.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/FlowTaskSubmitter.java
@@ -78,6 +78,7 @@ public void execute(DelegateExecution execution) {
try {
flowTaskInstance = getFlowTaskInstance(flowInstanceId, activityId);
BaseRuntimeFlowableDelegate> delegate = getDelegateInstance(flowTaskInstance);
+ delegate.updateHeartbeatTime();
List> list = delegate.getExecutionListenerClasses();
if (CollectionUtils.isNotEmpty(list)) {
list.forEach(c -> doCallListener(FlowConstants.EXECUTION_START_EVENT_NAME, executionFacade, c));
From 1056b87a0642c80e94fb989c3bc27f172823fd8b Mon Sep 17 00:00:00 2001
From: yh263208
Date: Thu, 23 May 2024 14:42:29 +0800
Subject: [PATCH 15/17] add transaction control
---
.../ServiceTaskInstanceRepositoryTest.java | 12 ++++-
.../flow/ServiceTaskInstanceRepository.java | 6 +++
.../odc/service/flow/FlowSchedules.java | 48 ++++++++++++-------
3 files changed, 47 insertions(+), 19 deletions(-)
diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepositoryTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepositoryTest.java
index 62af04f2c3..52bb8a94cd 100644
--- a/server/integration-test/src/test/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepositoryTest.java
+++ b/server/integration-test/src/test/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepositoryTest.java
@@ -66,7 +66,7 @@ public void clearAll() {
}
@Test
- public void findByTargetTaskIdIn_entityExists_returnNotEmpty() {
+ public void updateStatusByIdIn_entityExists_returnNotEmpty() {
ServiceTaskInstanceEntity entity = createEntity();
entity.setStatus(FlowNodeStatus.CANCELLED);
entity = this.repository.save(entity);
@@ -76,6 +76,16 @@ public void findByTargetTaskIdIn_entityExists_returnNotEmpty() {
Assert.assertEquals(entity, optional.get());
}
+ @Test
+ public void findByIdInAndStatus_entityExists_returnNotNull() {
+ ServiceTaskInstanceEntity entity = createEntity();
+ entity.setStatus(FlowNodeStatus.CANCELLED);
+ entity = this.repository.save(entity);
+ List actual = this.repository.findByIdInAndStatus(
+ Collections.singletonList(entity.getId()), FlowNodeStatus.CANCELLED);
+ Assert.assertEquals(Collections.singletonList(entity), actual);
+ }
+
@Test
public void findByStatus_entityExists_returnNotEmpty() {
ServiceTaskInstanceEntity entity = createEntity();
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepository.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepository.java
index 7476fb224d..763d3039ac 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepository.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepository.java
@@ -21,9 +21,11 @@
import java.util.Set;
import java.util.function.Function;
+import javax.persistence.LockModeType;
import javax.transaction.Transactional;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
+import org.springframework.data.jpa.repository.Lock;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
@@ -62,6 +64,10 @@ public interface ServiceTaskInstanceRepository extends OdcJpaRepository findByFlowInstanceIdIn(Set flowInstanceIds);
+ @Transactional
+ @Lock(value = LockModeType.PESSIMISTIC_WRITE)
+ List findByIdInAndStatus(Collection ids, FlowNodeStatus status);
+
@Transactional
@Query(value = "update flow_instance_node_task set status=:#{#status.name()} where id in (:ids)",
nativeQuery = true)
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
index 6477ba3c7c..a7ceecda4c 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
@@ -25,6 +25,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
+import org.springframework.transaction.support.TransactionTemplate;
import com.oceanbase.odc.core.shared.constant.FlowStatus;
import com.oceanbase.odc.metadb.flow.FlowInstanceRepository;
@@ -52,6 +53,8 @@ public class FlowSchedules {
private static final Integer OB_MAX_IN_SIZE = 2000;
@Autowired
+ private TransactionTemplate transactionTemplate;
+ @Autowired
private FlowTaskProperties flowTaskProperties;
@Autowired
private TaskRepository taskRepository;
@@ -95,28 +98,37 @@ public void cancelHeartbeatTimeoutFlow() {
* 1. heartbeat timeout 2. flow task instance is executing
*
*/
- List candidates = taskInstanceEntities.stream()
+ cancelFlowTaskInstanceAndFlowInstance(taskInstanceEntities.stream()
.filter(e -> heartbeatTimeoutTaskIds.contains(e.getTargetTaskId()))
- .collect(Collectors.toList());
- List candidateIds = candidates.stream()
- .map(ServiceTaskInstanceEntity::getId).distinct().collect(Collectors.toList());
- List flowInstIds = candidates.stream().map(ServiceTaskInstanceEntity::getFlowInstanceId)
- .distinct().collect(Collectors.toList());
- log.info("Find heartbeat timeout flow task instance, timeoutSeconds={}, earliestHeartbeatTime={}, "
- + "flowTaskInstIds={}, flowInstIds={}", timeoutSeconds, timeoutBound, candidateIds, flowInstIds);
- List executeResult = DBSchemaAccessorUtil.partitionFind(flowInstIds,
- OB_MAX_IN_SIZE, ids -> Collections.singletonList(
- flowInstanceRepository.updateStatusByIds(ids, FlowStatus.CANCELLED)));
- log.info("Update flow instance's status succeed, affectRows={}, flowInstIds={}", executeResult,
- flowInstIds);
- executeResult = DBSchemaAccessorUtil.partitionFind(candidateIds,
- OB_MAX_IN_SIZE, ids -> Collections.singletonList(
- serviceTaskInstanceRepository.updateStatusByIdIn(ids, FlowNodeStatus.CANCELLED)));
- log.info("Update flow task instance's status succeed, affectRows={}, "
- + "flowTaskInstIds={}", executeResult, candidateIds);
+ .map(ServiceTaskInstanceEntity::getId).distinct().collect(Collectors.toList()));
} catch (Exception e) {
log.warn("Failed to sync flow instance's status", e);
}
}
+ private void cancelFlowTaskInstanceAndFlowInstance(List candidateFlowTaskInstanceIds) {
+ this.transactionTemplate.executeWithoutResult(tx -> {
+ try {
+ List candidates = serviceTaskInstanceRepository
+ .findByIdInAndStatus(candidateFlowTaskInstanceIds, FlowNodeStatus.EXECUTING);
+ List candidateIds = candidates.stream()
+ .map(ServiceTaskInstanceEntity::getId).distinct().collect(Collectors.toList());
+ List result = DBSchemaAccessorUtil.partitionFind(candidateIds,
+ OB_MAX_IN_SIZE, ids -> Collections.singletonList(
+ serviceTaskInstanceRepository.updateStatusByIdIn(ids, FlowNodeStatus.CANCELLED)));
+ log.info("Update flow task instance status succeed, affectRows={}, flowTaskInstIds={}",
+ result, candidateIds);
+ List flowInstIds = candidates.stream().map(ServiceTaskInstanceEntity::getFlowInstanceId)
+ .distinct().collect(Collectors.toList());
+ result = DBSchemaAccessorUtil.partitionFind(flowInstIds,
+ OB_MAX_IN_SIZE, ids -> Collections.singletonList(
+ flowInstanceRepository.updateStatusByIds(ids, FlowStatus.CANCELLED)));
+ log.info("Update flow instance status succeed, affectRows={}, flowInstIds={}", result, flowInstIds);
+ } catch (Exception e) {
+ log.warn("Failed to sync flow instance's status", e);
+ tx.setRollbackOnly();
+ }
+ });
+ }
+
}
From e7d38e078340841bb22ccc8127954f824fe51497 Mon Sep 17 00:00:00 2001
From: yh263208
Date: Thu, 23 May 2024 14:43:52 +0800
Subject: [PATCH 16/17] change default heartbeat timeout to 180s
---
.../odc/service/flow/task/model/FlowTaskProperties.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/FlowTaskProperties.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/FlowTaskProperties.java
index 0550f49287..1fe10cd3bb 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/FlowTaskProperties.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/FlowTaskProperties.java
@@ -48,7 +48,7 @@ public class FlowTaskProperties {
@Value("${odc.task.async.index-change-max-timeout-millis:432000000}")
private long indexChangeMaxTimeoutMillisecond;
- @Value("${odc.flow.task.heartbeat-timeout-seconds:15}")
+ @Value("${odc.flow.task.heartbeat-timeout-seconds:180}")
private long heartbeatTimeoutSeconds;
}
From 5840b53a309087d89025a3c0d03485e282284709 Mon Sep 17 00:00:00 2001
From: yh263208
Date: Thu, 23 May 2024 20:20:34 +0800
Subject: [PATCH 17/17] response to cr comment
---
.../java/com/oceanbase/odc/service/flow/FlowSchedules.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
index a7ceecda4c..889cae6a43 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java
@@ -115,14 +115,14 @@ private void cancelFlowTaskInstanceAndFlowInstance(List candidateFlowTaskI
.map(ServiceTaskInstanceEntity::getId).distinct().collect(Collectors.toList());
List result = DBSchemaAccessorUtil.partitionFind(candidateIds,
OB_MAX_IN_SIZE, ids -> Collections.singletonList(
- serviceTaskInstanceRepository.updateStatusByIdIn(ids, FlowNodeStatus.CANCELLED)));
+ serviceTaskInstanceRepository.updateStatusByIdIn(ids, FlowNodeStatus.FAILED)));
log.info("Update flow task instance status succeed, affectRows={}, flowTaskInstIds={}",
result, candidateIds);
List flowInstIds = candidates.stream().map(ServiceTaskInstanceEntity::getFlowInstanceId)
.distinct().collect(Collectors.toList());
result = DBSchemaAccessorUtil.partitionFind(flowInstIds,
OB_MAX_IN_SIZE, ids -> Collections.singletonList(
- flowInstanceRepository.updateStatusByIds(ids, FlowStatus.CANCELLED)));
+ flowInstanceRepository.updateStatusByIds(ids, FlowStatus.EXECUTION_FAILED)));
log.info("Update flow instance status succeed, affectRows={}, flowInstIds={}", result, flowInstIds);
} catch (Exception e) {
log.warn("Failed to sync flow instance's status", e);