diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepositoryTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepositoryTest.java index 6b87761a70..52bb8a94cd 100644 --- a/server/integration-test/src/test/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepositoryTest.java +++ b/server/integration-test/src/test/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepositoryTest.java @@ -65,6 +65,35 @@ public void clearAll() { repository.deleteAll(); } + @Test + public void updateStatusByIdIn_entityExists_returnNotEmpty() { + ServiceTaskInstanceEntity entity = createEntity(); + entity.setStatus(FlowNodeStatus.CANCELLED); + entity = this.repository.save(entity); + this.repository.updateStatusByIdIn(Collections.singletonList(entity.getId()), FlowNodeStatus.FAILED); + Optional optional = this.repository.findById(entity.getId()); + entity.setStatus(FlowNodeStatus.FAILED); + Assert.assertEquals(entity, optional.get()); + } + + @Test + public void findByIdInAndStatus_entityExists_returnNotNull() { + ServiceTaskInstanceEntity entity = createEntity(); + entity.setStatus(FlowNodeStatus.CANCELLED); + entity = this.repository.save(entity); + List actual = this.repository.findByIdInAndStatus( + Collections.singletonList(entity.getId()), FlowNodeStatus.CANCELLED); + Assert.assertEquals(Collections.singletonList(entity), actual); + } + + @Test + public void findByStatus_entityExists_returnNotEmpty() { + ServiceTaskInstanceEntity entity = createEntity(); + entity = this.repository.save(entity); + List actual = this.repository.findByStatus(entity.getStatus()); + Assert.assertEquals(Collections.singletonList(entity), actual); + } + @Test public void testSaveSerivceTaskInstanceEntity() { ServiceTaskInstanceEntity entity = createEntity(); diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/metadb/task/TaskRepositoryTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/metadb/task/TaskRepositoryTest.java new file mode 100644 index 0000000000..53415cbd60 --- /dev/null +++ b/server/integration-test/src/test/java/com/oceanbase/odc/metadb/task/TaskRepositoryTest.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.metadb.task; + +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Optional; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; + +import com.oceanbase.odc.ServiceTestEnv; +import com.oceanbase.odc.test.tool.TestRandom; + +public class TaskRepositoryTest extends ServiceTestEnv { + + @Autowired + private TaskRepository taskRepository; + + @Before + public void setUp() { + this.taskRepository.deleteAll(); + } + + @Test + public void updateLastHeartbeatTimeById_updateSucceed_returnNotNull() { + TaskEntity taskEntity = TestRandom.nextObject(TaskEntity.class); + taskEntity.setId(null); + taskEntity.setLastHeartbeatTime(null); + taskEntity = this.taskRepository.save(taskEntity); + this.taskRepository.updateLastHeartbeatTimeById(taskEntity.getId()); + Optional optional = this.taskRepository.findById(taskEntity.getId()); + Assert.assertNotNull(optional.get().getLastHeartbeatTime()); + } + + @Test + public void findAllByLastHeartbeatTimeBeforeAndIdIn_findBeforeNow_returnNotNull() { + TaskEntity taskEntity = TestRandom.nextObject(TaskEntity.class); + taskEntity.setId(null); + taskEntity.setLastHeartbeatTime(null); + taskEntity = this.taskRepository.save(taskEntity); + this.taskRepository.updateLastHeartbeatTimeById(taskEntity.getId()); + Optional optional = this.taskRepository.findById(taskEntity.getId()); + Date heartbeatTime = new Date(optional.get().getLastHeartbeatTime().getTime() + 1); + List actual = this.taskRepository.findAllByLastHeartbeatTimeBeforeAndIdIn( + heartbeatTime, Collections.singletonList(taskEntity.getId())); + Assert.assertFalse(actual.isEmpty()); + } + + @Test + public void findAllByLastHeartbeatTimeBeforeAndIdIn_findBeforeLastHeartbeatTime_returnEmpty() { + TaskEntity taskEntity = TestRandom.nextObject(TaskEntity.class); + taskEntity.setId(null); + taskEntity.setLastHeartbeatTime(null); + taskEntity = this.taskRepository.save(taskEntity); + this.taskRepository.updateLastHeartbeatTimeById(taskEntity.getId()); + Optional optional = this.taskRepository.findById(taskEntity.getId()); + List actual = this.taskRepository.findAllByLastHeartbeatTimeBeforeAndIdIn( + optional.get().getLastHeartbeatTime(), Collections.singletonList(taskEntity.getId())); + Assert.assertTrue(actual.isEmpty()); + } + +} diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/tool/TestFlowableDelegateImpl.java b/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/tool/TestFlowableDelegateImpl.java index cfb0617a1e..402518c519 100644 --- a/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/tool/TestFlowableDelegateImpl.java +++ b/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/tool/TestFlowableDelegateImpl.java @@ -15,9 +15,14 @@ */ package com.oceanbase.odc.service.flow.tool; +import java.util.Collections; +import java.util.List; + import org.flowable.engine.delegate.DelegateExecution; +import org.flowable.engine.delegate.ExecutionListener; import com.oceanbase.odc.core.flow.BaseFlowableDelegate; +import com.oceanbase.odc.service.flow.listener.ServiceTaskExecutingCompleteListener; import lombok.extern.slf4j.Slf4j; @@ -29,4 +34,9 @@ protected void run(DelegateExecution execution) { log.info("Service task run, currentActivityId={}, processInstanceId={}", execution.getCurrentActivityId(), execution.getProcessInstanceId()); } + + @Override + protected List> getExecutionListenerClasses() { + return Collections.singletonList(ServiceTaskExecutingCompleteListener.class); + } } diff --git a/server/odc-core/src/main/java/com/oceanbase/odc/core/flow/BaseFlowableDelegate.java b/server/odc-core/src/main/java/com/oceanbase/odc/core/flow/BaseFlowableDelegate.java index da27ee6dc6..be3bb10c07 100644 --- a/server/odc-core/src/main/java/com/oceanbase/odc/core/flow/BaseFlowableDelegate.java +++ b/server/odc-core/src/main/java/com/oceanbase/odc/core/flow/BaseFlowableDelegate.java @@ -15,7 +15,10 @@ */ package com.oceanbase.odc.core.flow; +import java.util.List; + import org.flowable.engine.delegate.DelegateExecution; +import org.flowable.engine.delegate.ExecutionListener; import org.flowable.engine.delegate.JavaDelegate; import lombok.extern.slf4j.Slf4j; @@ -36,6 +39,8 @@ public abstract class BaseFlowableDelegate implements JavaDelegate { */ protected abstract void run(DelegateExecution execution) throws Exception; + protected abstract List> getExecutionListenerClasses(); + @Override public void execute(DelegateExecution execution) { try { diff --git a/server/odc-migrate/src/main/resources/migrate/common/V_4_3_0_6__add_flow_task_heartbeat.sql b/server/odc-migrate/src/main/resources/migrate/common/V_4_3_0_6__add_flow_task_heartbeat.sql new file mode 100644 index 0000000000..66d3a5228e --- /dev/null +++ b/server/odc-migrate/src/main/resources/migrate/common/V_4_3_0_6__add_flow_task_heartbeat.sql @@ -0,0 +1 @@ +alter table `task_task` add column `last_heartbeat_time` datetime default null comment 'Last heartbeat time'; \ No newline at end of file diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepository.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepository.java index a76673aa42..763d3039ac 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepository.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepository.java @@ -21,9 +21,11 @@ import java.util.Set; import java.util.function.Function; +import javax.persistence.LockModeType; import javax.transaction.Transactional; import org.springframework.data.jpa.repository.JpaSpecificationExecutor; +import org.springframework.data.jpa.repository.Lock; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; @@ -62,6 +64,18 @@ public interface ServiceTaskInstanceRepository extends OdcJpaRepository findByFlowInstanceIdIn(Set flowInstanceIds); + @Transactional + @Lock(value = LockModeType.PESSIMISTIC_WRITE) + List findByIdInAndStatus(Collection ids, FlowNodeStatus status); + + @Transactional + @Query(value = "update flow_instance_node_task set status=:#{#status.name()} where id in (:ids)", + nativeQuery = true) + @Modifying + int updateStatusByIdIn(@Param("ids") List ids, @Param("status") FlowNodeStatus status); + + List findByStatus(FlowNodeStatus status); + @Query(value = "select na.* from flow_instance_node_task as na inner join flow_instance_node as n on na.id=n.instance_id " + "where n.instance_type=:#{#instanceType.name()} and n.activity_id=:activityId and n.flow_instance_id=:flowInstanceId", nativeQuery = true) diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskEntity.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskEntity.java index 367ba2bbb6..58ab56113e 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskEntity.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskEntity.java @@ -166,4 +166,7 @@ public class TaskEntity { @Column(name = "job_id") private Long jobId; + @Column(name = "last_heartbeat_time") + private Date lastHeartbeatTime; + } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskRepository.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskRepository.java index 424a13c929..f9daf8998e 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskRepository.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/task/TaskRepository.java @@ -15,6 +15,7 @@ */ package com.oceanbase.odc.metadb.task; +import java.util.Date; import java.util.List; import java.util.Optional; import java.util.Set; @@ -43,6 +44,13 @@ public interface TaskRepository extends JpaRepository, JpaSpec @Modifying int update(@Param("param") TaskEntity entity); + @Transactional + @Query(value = "update task_task set last_heartbeat_time=now() where id=:id", nativeQuery = true) + @Modifying + int updateLastHeartbeatTimeById(@Param("id") Long id); + + List findAllByLastHeartbeatTimeBeforeAndIdIn(Date lastHeartbeatTime, List ids); + @Transactional @Query("update TaskEntity set parametersJson=:#{#param.parametersJson} where id=:#{#param.id}") @Modifying diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java new file mode 100644 index 0000000000..889cae6a43 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowSchedules.java @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.flow; + +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.transaction.support.TransactionTemplate; + +import com.oceanbase.odc.core.shared.constant.FlowStatus; +import com.oceanbase.odc.metadb.flow.FlowInstanceRepository; +import com.oceanbase.odc.metadb.flow.ServiceTaskInstanceEntity; +import com.oceanbase.odc.metadb.flow.ServiceTaskInstanceRepository; +import com.oceanbase.odc.metadb.task.TaskEntity; +import com.oceanbase.odc.metadb.task.TaskRepository; +import com.oceanbase.odc.service.flow.model.FlowNodeStatus; +import com.oceanbase.odc.service.flow.task.model.FlowTaskProperties; +import com.oceanbase.odc.service.flow.task.model.RuntimeTaskConstants; +import com.oceanbase.tools.dbbrowser.util.DBSchemaAccessorUtil; + +import lombok.extern.slf4j.Slf4j; + +/** + * {@link FlowSchedules} + * + * @author yh263208 + * @date 2024-05-22 15:33 + * @since ODC-release_4.3.0 + */ +@Slf4j +@Component +public class FlowSchedules { + + private static final Integer OB_MAX_IN_SIZE = 2000; + @Autowired + private TransactionTemplate transactionTemplate; + @Autowired + private FlowTaskProperties flowTaskProperties; + @Autowired + private TaskRepository taskRepository; + @Autowired + private FlowInstanceRepository flowInstanceRepository; + @Autowired + private ServiceTaskInstanceRepository serviceTaskInstanceRepository; + + @Scheduled(fixedDelayString = "${odc.flow.task.heartbeat-timeout-check-interval-millis:15000}") + public void cancelHeartbeatTimeoutFlow() { + long timeoutSeconds = this.flowTaskProperties.getHeartbeatTimeoutSeconds(); + if (timeoutSeconds <= 0) { + return; + } + long minTimeoutSeconds = RuntimeTaskConstants.DEFAULT_TASK_CHECK_INTERVAL_SECONDS * 3; + if (timeoutSeconds < minTimeoutSeconds) { + timeoutSeconds = minTimeoutSeconds; + } + try { + List taskInstanceEntities = this.serviceTaskInstanceRepository + .findByStatus(FlowNodeStatus.EXECUTING).stream() + .filter(e -> e.getTargetTaskId() != null).collect(Collectors.toList()); + if (CollectionUtils.isEmpty(taskInstanceEntities)) { + return; + } + List taskIds = taskInstanceEntities.stream() + .map(ServiceTaskInstanceEntity::getTargetTaskId).distinct().collect(Collectors.toList()); + Date timeoutBound = new Date(System.currentTimeMillis() - timeoutSeconds * 1000); + List heartbeatTimeoutTasks = DBSchemaAccessorUtil.partitionFind(taskIds, + OB_MAX_IN_SIZE, ids -> taskRepository.findAllByLastHeartbeatTimeBeforeAndIdIn(timeoutBound, ids)); + if (CollectionUtils.isEmpty(heartbeatTimeoutTasks)) { + return; + } + Set heartbeatTimeoutTaskIds = heartbeatTimeoutTasks.stream() + .map(TaskEntity::getId).collect(Collectors.toSet()); + log.info("Find the task with heartbeat timeout, timeoutSeconds={}, earliestHeartbeatTime={}, " + + "heartbeatTimeoutTaskIds={}", timeoutSeconds, timeoutBound, heartbeatTimeoutTaskIds); + /** + * we just find such flow task instances: + *

+ * 1. heartbeat timeout 2. flow task instance is executing + *

+ */ + cancelFlowTaskInstanceAndFlowInstance(taskInstanceEntities.stream() + .filter(e -> heartbeatTimeoutTaskIds.contains(e.getTargetTaskId())) + .map(ServiceTaskInstanceEntity::getId).distinct().collect(Collectors.toList())); + } catch (Exception e) { + log.warn("Failed to sync flow instance's status", e); + } + } + + private void cancelFlowTaskInstanceAndFlowInstance(List candidateFlowTaskInstanceIds) { + this.transactionTemplate.executeWithoutResult(tx -> { + try { + List candidates = serviceTaskInstanceRepository + .findByIdInAndStatus(candidateFlowTaskInstanceIds, FlowNodeStatus.EXECUTING); + List candidateIds = candidates.stream() + .map(ServiceTaskInstanceEntity::getId).distinct().collect(Collectors.toList()); + List result = DBSchemaAccessorUtil.partitionFind(candidateIds, + OB_MAX_IN_SIZE, ids -> Collections.singletonList( + serviceTaskInstanceRepository.updateStatusByIdIn(ids, FlowNodeStatus.FAILED))); + log.info("Update flow task instance status succeed, affectRows={}, flowTaskInstIds={}", + result, candidateIds); + List flowInstIds = candidates.stream().map(ServiceTaskInstanceEntity::getFlowInstanceId) + .distinct().collect(Collectors.toList()); + result = DBSchemaAccessorUtil.partitionFind(flowInstIds, + OB_MAX_IN_SIZE, ids -> Collections.singletonList( + flowInstanceRepository.updateStatusByIds(ids, FlowStatus.EXECUTION_FAILED))); + log.info("Update flow instance status succeed, affectRows={}, flowInstIds={}", result, flowInstIds); + } catch (Exception e) { + log.warn("Failed to sync flow instance's status", e); + tx.setRollbackOnly(); + } + }); + } + +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java index 2d92f725fe..2af2bb8ea0 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java @@ -778,19 +778,20 @@ private URL generatePresignedUrl(String objectName, Long expirationSeconds) thro } private void setDownloadUrlsIfNecessary(Long taskId, List results) { - if (cloudObjectStorageService.supported()) { - for (FlowTaskResult result : results) { - TaskDownloadUrls urls = databaseChangeOssUrlCache.get(taskId); - if (result instanceof AbstractFlowTaskResult) { - ((AbstractFlowTaskResult) result) - .setFullLogDownloadUrl(urls.getLogDownloadUrl()); - } - if (result instanceof DatabaseChangeResult) { - ((DatabaseChangeResult) result).setZipFileDownloadUrl(urls.getDatabaseChangeZipFileDownloadUrl()); - if (Objects.nonNull(((DatabaseChangeResult) result).getRollbackPlanResult())) { - ((DatabaseChangeResult) result).getRollbackPlanResult() - .setResultFileDownloadUrl(urls.getRollBackPlanResultFileDownloadUrl()); - } + if (!cloudObjectStorageService.supported()) { + return; + } + for (FlowTaskResult result : results) { + TaskDownloadUrls urls = databaseChangeOssUrlCache.get(taskId); + if (result instanceof AbstractFlowTaskResult) { + ((AbstractFlowTaskResult) result) + .setFullLogDownloadUrl(urls.getLogDownloadUrl()); + } + if (result instanceof DatabaseChangeResult) { + ((DatabaseChangeResult) result).setZipFileDownloadUrl(urls.getDatabaseChangeZipFileDownloadUrl()); + if (Objects.nonNull(((DatabaseChangeResult) result).getRollbackPlanResult())) { + ((DatabaseChangeResult) result).getRollbackPlanResult() + .setResultFileDownloadUrl(urls.getRollBackPlanResultFileDownloadUrl()); } } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/FlowInstanceConfigurer.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/FlowInstanceConfigurer.java index fcc3e7e1a6..cc4f5b5153 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/FlowInstanceConfigurer.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/FlowInstanceConfigurer.java @@ -51,7 +51,6 @@ import com.oceanbase.odc.service.flow.listener.BaseTaskBindUserTaskListener; import com.oceanbase.odc.service.flow.listener.BaseTaskExecutingCompleteListener; import com.oceanbase.odc.service.flow.listener.GatewayExecutingCompleteListener; -import com.oceanbase.odc.service.flow.listener.ServiceTaskExecutingCompleteListener; import com.oceanbase.odc.service.flow.listener.ServiceTaskPendingExpiredListener; import com.oceanbase.odc.service.flow.listener.ServiceTaskPendingListener; import com.oceanbase.odc.service.flow.model.ExecutionStrategyConfig; @@ -144,7 +143,6 @@ public FlowInstanceConfigurer next(@NonNull FlowApprovalInstance nextNode) { public FlowInstanceConfigurer next(@NonNull FlowTaskInstance nextNode) { return next(nextNode, serviceTaskBuilder -> { - serviceTaskBuilder.addExecutionListener(ServiceTaskExecutingCompleteListener.class); serviceTaskBuilder.setAsynchronous(true); }, userTaskBuilder -> { userTaskBuilder.addExecutionListener(ServiceTaskPendingListener.class); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/OdcFlowInstanceConfigurer.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/OdcFlowInstanceConfigurer.java index 3b66f27195..74003e162b 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/OdcFlowInstanceConfigurer.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/OdcFlowInstanceConfigurer.java @@ -24,7 +24,6 @@ import com.oceanbase.odc.service.flow.FlowableAdaptor; import com.oceanbase.odc.service.flow.listener.PreCheckServiceTaskFailedListener; import com.oceanbase.odc.service.flow.listener.ServiceTaskCancelledListener; -import com.oceanbase.odc.service.flow.listener.ServiceTaskExecutingCompleteListener; import com.oceanbase.odc.service.flow.listener.ServiceTaskExpiredListener; import com.oceanbase.odc.service.flow.listener.ServiceTaskFailedListener; import com.oceanbase.odc.service.flow.listener.ServiceTaskPendingExpiredListener; @@ -85,7 +84,6 @@ public FlowInstanceConfigurer next(@NonNull FlowTaskInstance nextNode) { }); } return next(nextNode, serviceTaskBuilder -> { - serviceTaskBuilder.addExecutionListener(ServiceTaskExecutingCompleteListener.class); serviceTaskBuilder.setAsynchronous(true); ErrorBoundaryEventBuilder cancelErrBuilder = setHandleableError(nextNode, serviceTaskBuilder, ErrorCodes.FlowTaskInstanceCancelled); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/listener/ServiceTaskExecutingCompleteListener.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/listener/ServiceTaskExecutingCompleteListener.java index cb480977e2..e490701509 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/listener/ServiceTaskExecutingCompleteListener.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/listener/ServiceTaskExecutingCompleteListener.java @@ -55,7 +55,7 @@ protected FlowNodeStatus doModifyStatusOnStart(FlowTaskInstance target) { @Override protected FlowNodeStatus doModifyStatusOnEnd(FlowTaskInstance target) { - return FlowNodeStatus.EXECUTING; + return internalModify(target, FlowNodeStatus.COMPLETED); } @Override diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java index b77eb3b2d2..98c5f3563e 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java @@ -27,6 +27,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.apache.commons.collections4.CollectionUtils; import org.flowable.engine.delegate.DelegateExecution; import org.springframework.beans.factory.annotation.Autowired; @@ -77,8 +78,6 @@ @Slf4j public abstract class BaseODCFlowTaskDelegate extends BaseRuntimeFlowableDelegate { - @Autowired - private TaskService taskService; @Autowired protected HostProperties hostProperties; @Autowired @@ -119,6 +118,7 @@ private void initMonitorExecutor() { int interval = RuntimeTaskConstants.DEFAULT_TASK_CHECK_INTERVAL_SECONDS; scheduleExecutor.scheduleAtFixedRate(() -> { try { + updateHeartbeatTime(); if (taskLatch.getCount() > 0) { onProgressUpdate(taskId, taskService); } @@ -358,6 +358,9 @@ private void doSetDownloadLogUrl() throws IOException, NotFoundException { } List flowTaskResults = flowTaskInstanceService.getTaskResultFromEntity(taskEntity, false); + if (CollectionUtils.isEmpty(flowTaskResults)) { + return; + } Verify.singleton(flowTaskResults, "flowTaskResults"); if (flowTaskResults.get(0) instanceof AbstractFlowTaskResult) { FlowTaskResult flowTaskResult = flowTaskResults.get(0); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseRuntimeFlowableDelegate.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseRuntimeFlowableDelegate.java index 924095add7..47796784a9 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseRuntimeFlowableDelegate.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseRuntimeFlowableDelegate.java @@ -15,6 +15,8 @@ */ package com.oceanbase.odc.service.flow.task; +import java.util.Collections; +import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.concurrent.Callable; @@ -27,6 +29,7 @@ import javax.annotation.PostConstruct; import org.flowable.engine.delegate.DelegateExecution; +import org.flowable.engine.delegate.ExecutionListener; import org.springframework.beans.factory.annotation.Autowired; import com.oceanbase.odc.common.event.EventPublisher; @@ -44,7 +47,9 @@ import com.oceanbase.odc.service.flow.exception.ServiceTaskError; import com.oceanbase.odc.service.flow.instance.FlowTaskInstance; import com.oceanbase.odc.service.flow.listener.ActiveTaskStatisticsListener; +import com.oceanbase.odc.service.flow.listener.ServiceTaskExecutingCompleteListener; import com.oceanbase.odc.service.flow.model.ExecutionStrategyConfig; +import com.oceanbase.odc.service.task.TaskService; import lombok.Getter; import lombok.NonNull; @@ -66,6 +71,8 @@ public abstract class BaseRuntimeFlowableDelegate extends BaseFlowableDelegat @Getter private Long targetTaskInstanceId; @Getter + private Long targetTaskId; + @Getter private TaskType taskType; @Getter private Long flowInstanceId; @@ -76,6 +83,8 @@ public abstract class BaseRuntimeFlowableDelegate extends BaseFlowableDelegat @Autowired private EventPublisher eventPublisher; @Autowired + protected TaskService taskService; + @Autowired private FlowInstanceRepository flowInstanceRepository; private volatile T returnObject = null; private volatile Exception thrown = null; @@ -181,6 +190,7 @@ private void initTargetTaskInstanceId(DelegateExecution execution) { this.targetTaskInstanceId = flowTaskInstance.getId(); this.taskType = flowTaskInstance.getTaskType(); + this.targetTaskId = flowTaskInstance.getTargetTaskId(); this.strategyConfig = flowTaskInstance.getStrategyConfig(); flowTaskInstance.dealloc(); } @@ -194,6 +204,19 @@ public void bindToFlowTaskInstance(@NonNull FlowTaskInstance taskInstance) { } } + public void updateHeartbeatTime() { + try { + this.taskService.updateHeartbeatTime(getTargetTaskId()); + } catch (Exception e) { + log.warn("Failed to update heartbeat time, taskId={}", getTargetTaskId(), e); + } + } + + @Override + public List> getExecutionListenerClasses() { + return Collections.singletonList(ServiceTaskExecutingCompleteListener.class); + } + protected void updateFlowInstanceStatus(@NonNull FlowStatus flowStatus) { this.retryExecutor.run(() -> { try { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/CreateExternalApprovalTask.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/CreateExternalApprovalTask.java index 4e20ae393b..932e688e6c 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/CreateExternalApprovalTask.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/CreateExternalApprovalTask.java @@ -16,9 +16,12 @@ package com.oceanbase.odc.service.flow.task; +import java.util.Collections; +import java.util.List; import java.util.Optional; import org.flowable.engine.delegate.DelegateExecution; +import org.flowable.engine.delegate.ExecutionListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.i18n.LocaleContextHolder; @@ -33,6 +36,7 @@ import com.oceanbase.odc.metadb.regulation.risklevel.RiskLevelRepository; import com.oceanbase.odc.service.flow.FlowableAdaptor; import com.oceanbase.odc.service.flow.instance.FlowApprovalInstance; +import com.oceanbase.odc.service.flow.listener.ServiceTaskExecutingCompleteListener; import com.oceanbase.odc.service.flow.model.FlowNodeStatus; import com.oceanbase.odc.service.flow.task.model.RuntimeTaskConstants; import com.oceanbase.odc.service.flow.util.FlowTaskUtil; @@ -140,4 +144,9 @@ private FlowApprovalInstance getFlowApprovalInstance(DelegateExecution execution return instanceOpt.get(); } + @Override + public List> getExecutionListenerClasses() { + return Collections.singletonList(ServiceTaskExecutingCompleteListener.class); + } + } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/FlowTaskSubmitter.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/FlowTaskSubmitter.java index f057e2c198..bef000a7ec 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/FlowTaskSubmitter.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/FlowTaskSubmitter.java @@ -71,15 +71,22 @@ public void execute(DelegateExecution execution) { CustomDelegateExecution executionFacade = new CustomDelegateExecution(execution); long flowInstanceId = FlowTaskUtil.getFlowInstanceId(executionFacade); String activityId = executionFacade.getCurrentActivityId(); - List> defs = - FlowUtil.getBoundaryEventDefinitions(execution.getProcessDefinitionId(), - activityId, ErrorEventDefinition.class); + List> defs = FlowUtil.getBoundaryEventDefinitions( + execution.getProcessDefinitionId(), activityId, ErrorEventDefinition.class); threadPoolTaskExecutor.submit(() -> { FlowTaskInstance flowTaskInstance = null; try { flowTaskInstance = getFlowTaskInstance(flowInstanceId, activityId); - getDelegateInstance(flowTaskInstance).execute(executionFacade); - updateFlowTaskInstance(flowTaskInstance.getId(), FlowNodeStatus.COMPLETED); + BaseRuntimeFlowableDelegate delegate = getDelegateInstance(flowTaskInstance); + delegate.updateHeartbeatTime(); + List> list = delegate.getExecutionListenerClasses(); + if (CollectionUtils.isNotEmpty(list)) { + list.forEach(c -> doCallListener(FlowConstants.EXECUTION_START_EVENT_NAME, executionFacade, c)); + } + delegate.execute(executionFacade); + if (CollectionUtils.isNotEmpty(list)) { + list.forEach(c -> doCallListener(FlowConstants.EXECUTION_END_EVENT_NAME, executionFacade, c)); + } flowTaskCallBackApprovalService.approval(executionFacade.getProcessInstanceId(), flowTaskInstance.getId(), executionFacade.getFutureVariable()); } catch (Exception e) { @@ -97,8 +104,7 @@ public void execute(DelegateExecution execution) { private void updateFlowTaskInstance(long flowTaskInstanceId, FlowNodeStatus flowNodeStatus) { try { - int affectRows = - serviceTaskRepository.updateStatusById(flowTaskInstanceId, flowNodeStatus); + int affectRows = serviceTaskRepository.updateStatusById(flowTaskInstanceId, flowNodeStatus); log.info("Modify node instance status successfully, instanceId={}, flowNodeStatus={}, affectRows={}", flowTaskInstanceId, flowNodeStatus, affectRows); } catch (Exception ex) { @@ -106,11 +112,10 @@ private void updateFlowTaskInstance(long flowTaskInstanceId, FlowNodeStatus flow } } - private void handleException(CustomDelegateExecution executionFacade, FlowTaskInstance flowTaskInstance, - Exception e, - List> defs) { - String processDefinitionId = executionFacade.getProcessDefinitionId(); - String activityId = executionFacade.getCurrentActivityId(); + private void handleException(CustomDelegateExecution execution, FlowTaskInstance flowTaskInstance, + Exception e, List> defs) { + String processDefinitionId = execution.getProcessDefinitionId(); + String activityId = execution.getCurrentActivityId(); if (defs.isEmpty()) { log.warn("No error boundary is defined to handle events, processInstanceId={}, activityId={}", processDefinitionId, activityId); @@ -121,13 +126,12 @@ private void handleException(CustomDelegateExecution executionFacade, FlowTaskIn ErrorEventDefinition eed = eventDefinition.getEventDefinition(); String acceptErrorCode = eed.getErrorCode(); if (Objects.equals(acceptErrorCode, targetErrorCode)) { - flowTaskCallBackApprovalService.reject(executionFacade.getProcessInstanceId(), - flowTaskInstance.getId(), executionFacade.getFutureVariable()); - if (CollectionUtils.isNotEmpty(eventDefinition.getFlowableListeners())) { - callListener(FlowConstants.EXECUTION_START_EVENT_NAME, executionFacade, - eventDefinition.getFlowableListeners()); - callListener(FlowConstants.EXECUTION_END_EVENT_NAME, executionFacade, - eventDefinition.getFlowableListeners()); + flowTaskCallBackApprovalService.reject(execution.getProcessInstanceId(), + flowTaskInstance.getId(), execution.getFutureVariable()); + List listeners = eventDefinition.getFlowableListeners(); + if (CollectionUtils.isNotEmpty(listeners)) { + callListener(FlowConstants.EXECUTION_START_EVENT_NAME, execution, listeners); + callListener(FlowConstants.EXECUTION_END_EVENT_NAME, execution, listeners); } return; } @@ -140,27 +144,34 @@ private void handleException(CustomDelegateExecution executionFacade, FlowTaskIn processDefinitionId, activityId, defs.stream().map(a -> a.getEventDefinition().getErrorCode()).collect(Collectors.toList())); } - flowTaskCallBackApprovalService.approval(executionFacade.getProcessInstanceId(), - flowTaskInstance.getId(), executionFacade.getFutureVariable()); + flowTaskCallBackApprovalService.approval(execution.getProcessInstanceId(), + flowTaskInstance.getId(), execution.getFutureVariable()); } - private void callListener(String eventName, DelegateExecution execution, List flowableListeners) { - flowableListeners.stream().filter(fl -> eventName.equals(fl.getEvent())).forEach(fl -> { + private void callListener(String eventName, DelegateExecution execution, List listeners) { + listeners.stream().filter(fl -> eventName.equals(fl.getEvent())).forEach(fl -> { try { - Class clazz = Class.forName(fl.getImplementation(), false, - Thread.currentThread().getContextClassLoader()); - if (ExecutionListener.class.isAssignableFrom(clazz)) { - ExecutionListener listener = - (ExecutionListener) BeanInjectedClassDelegate.instantiateDelegate(clazz); - execution.setEventName(fl.getEvent()); - listener.notify(execution); - } - } catch (Exception ex) { - log.warn("Call execution listener occur error: ", ex); + doCallListener(fl.getEvent(), execution, + Class.forName(fl.getImplementation(), false, Thread.currentThread().getContextClassLoader())); + } catch (Exception e) { + log.warn("Failed to load execution class, className={}", fl.getImplementation(), e); } }); } + private void doCallListener(String eventName, DelegateExecution execution, Class listenerClass) { + try { + if (ExecutionListener.class.isAssignableFrom(listenerClass)) { + ExecutionListener listener = + (ExecutionListener) BeanInjectedClassDelegate.instantiateDelegate(listenerClass); + execution.setEventName(eventName); + listener.notify(execution); + } + } catch (Exception e) { + log.warn("Failed to call execution listener", e); + } + } + private BaseRuntimeFlowableDelegate getDelegateInstance(FlowTaskInstance flowTaskInstance) throws Exception { OdcRuntimeDelegateMapper mapper = new OdcRuntimeDelegateMapper(); Class> delegateClass = diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java index cc7b98495e..0b317a04f8 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java @@ -19,10 +19,10 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.stream.Collectors; import org.flowable.engine.delegate.DelegateExecution; +import org.flowable.engine.delegate.ExecutionListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; @@ -32,6 +32,7 @@ import com.oceanbase.odc.core.shared.constant.TaskErrorStrategy; import com.oceanbase.odc.core.shared.constant.TaskType; import com.oceanbase.odc.metadb.flow.FlowInstanceEntity; +import com.oceanbase.odc.metadb.flow.FlowInstanceRepository; import com.oceanbase.odc.metadb.flow.ServiceTaskInstanceRepository; import com.oceanbase.odc.metadb.task.TaskEntity; import com.oceanbase.odc.service.connection.database.DatabaseService; @@ -42,7 +43,6 @@ import com.oceanbase.odc.service.databasechange.model.DatabaseChangingRecord; import com.oceanbase.odc.service.flow.FlowInstanceService; import com.oceanbase.odc.service.flow.FlowableAdaptor; -import com.oceanbase.odc.service.flow.instance.FlowTaskInstance; import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq; import com.oceanbase.odc.service.flow.model.FlowInstanceDetailResp; import com.oceanbase.odc.service.flow.model.FlowNodeStatus; @@ -51,7 +51,6 @@ import com.oceanbase.odc.service.flow.task.model.MultipleDatabaseChangeParameters; import com.oceanbase.odc.service.flow.task.model.MultipleDatabaseChangeTaskResult; import com.oceanbase.odc.service.flow.util.FlowTaskUtil; -import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; import com.oceanbase.odc.service.task.TaskService; import lombok.extern.slf4j.Slf4j; @@ -67,25 +66,20 @@ public class MultipleDatabaseChangeRuntimeFlowableTask extends BaseODCFlowTaskDe private volatile boolean isFailure = false; private Integer batchId; - private Integer batchSum; - List> orderedDatabaseIds; - + private List> orderedDatabaseIds; private FlowTaskExecutionStrategy flowTaskExecutionStrategy; - private MultipleDatabaseChangeParameters multipleDatabaseChangeParameters; - @Autowired private FlowInstanceService flowInstanceService; - - @Autowired - private ServiceTaskInstanceRepository serviceTaskInstanceRepository; @Autowired - private AuthenticationFacade authenticationFacade; + private FlowInstanceRepository flowInstanceRepository; @Autowired private DatabaseService databaseService; @Autowired private FlowableAdaptor flowableAdaptor; + @Autowired + private ServiceTaskInstanceRepository serviceTaskInstanceRepository; @Override public boolean cancel(boolean mayInterruptIfRunning, Long taskId, TaskService taskService) { @@ -119,6 +113,8 @@ protected Void start(Long taskId, TaskService taskService, DelegateExecution exe throws InterruptedException { MultipleDatabaseChangeTraceContextHolder.trace(taskId); try { + this.flowInstanceRepository.updateStatusById(getFlowInstanceId(), FlowStatus.EXECUTING); + this.serviceTaskInstanceRepository.updateStatusById(getTargetTaskInstanceId(), FlowNodeStatus.EXECUTING); FlowInstanceDetailResp flowInstanceDetailResp = flowInstanceService.detail(getFlowInstanceId()); this.flowTaskExecutionStrategy = flowInstanceDetailResp.getExecutionStrategy(); TaskEntity detail = taskService.detail(taskId); @@ -131,7 +127,7 @@ protected Void start(Long taskId, TaskService taskService, DelegateExecution exe } else { this.batchId = value; } - log.info("multiple database task start, taskId={}, batchId={}", taskId, + log.info("Multiple database task start, taskId={}, batchId={}", taskId, this.batchId + 1); multipleDatabaseChangeParameters.setBatchId(this.batchId + 1); detail.setParametersJson(JsonUtils.toJson(multipleDatabaseChangeParameters)); @@ -191,7 +187,7 @@ protected Void start(Long taskId, TaskService taskService, DelegateExecution exe } return null; } catch (Exception e) { - log.warn("multiple database task failed, taskId={}, batchId={}", taskId, + log.warn("Multiple database task failed, taskId={}, batchId={}", taskId, this.batchId == null ? null : this.batchId + 1, e); this.isFailure = true; this.isSuccessful = false; @@ -205,7 +201,7 @@ protected Void start(Long taskId, TaskService taskService, DelegateExecution exe protected void onFailure(Long taskId, TaskService taskService) { try { MultipleDatabaseChangeTraceContextHolder.trace(taskId); - log.warn("multiple database task failed, taskId={}, batchId={}", taskId, + log.warn("Multiple database task failed, taskId={}, batchId={}", taskId, this.batchId == null ? null : this.batchId + 1); updateFlowInstanceStatus(FlowStatus.EXECUTION_FAILED); taskService.fail(taskId, 100, generateResult()); @@ -219,14 +215,10 @@ protected void onFailure(Long taskId, TaskService taskService) { protected void onSuccessful(Long taskId, TaskService taskService) { try { MultipleDatabaseChangeTraceContextHolder.trace(taskId); - log.info("multiple database task succeed, taskId={}, batchId={}", taskId, this.batchId + 1); + log.info("Multiple database task succeed, taskId={}, batchId={}", taskId, this.batchId + 1); if (this.batchId == batchSum - 1) { - List list = flowInstanceService.getFlowInstanceByParentId( - getFlowInstanceId()); - boolean allSucceeded = list.stream() - .allMatch( - flowInstanceEntity -> FlowStatus.EXECUTION_SUCCEEDED == flowInstanceEntity.getStatus()); - if (allSucceeded) { + List list = flowInstanceService.getFlowInstanceByParentId(getFlowInstanceId()); + if (list.stream().allMatch(e -> FlowStatus.EXECUTION_SUCCEEDED == e.getStatus())) { updateFlowInstanceStatus(FlowStatus.EXECUTION_SUCCEEDED); } else { updateFlowInstanceStatus(FlowStatus.EXECUTION_FAILED); @@ -236,14 +228,14 @@ protected void onSuccessful(Long taskId, TaskService taskService) { taskService.updateProgress(taskId, (this.batchId + 1) * 100D / this.batchSum); } super.onSuccessful(taskId, taskService); - Optional taskInstanceByActivityId = flowableAdaptor.getTaskInstanceByActivityId( - getActivityId(), getFlowInstanceId()); if (!this.isSuccessful) { - serviceTaskInstanceRepository.updateStatusById(taskInstanceByActivityId.get().getId(), - FlowNodeStatus.FAILED); + this.serviceTaskInstanceRepository.updateStatusById(getTargetTaskInstanceId(), FlowNodeStatus.FAILED); + } else { + this.serviceTaskInstanceRepository.updateStatusById(getTargetTaskInstanceId(), + FlowNodeStatus.COMPLETED); } } catch (Exception e) { - log.warn("multiple database task failed, taskId={}, batchId={}", taskId, + log.warn("Multiple database task failed, taskId={}, batchId={}", taskId, this.batchId == null ? null : this.batchId + 1, e); } finally { MultipleDatabaseChangeTraceContextHolder.clear(); @@ -255,7 +247,7 @@ protected void onTimeout(Long taskId, TaskService taskService) { try { MultipleDatabaseChangeTraceContextHolder.trace(taskId); taskService.fail(taskId, 100, generateResult()); - log.warn("multiple database task timeout, taskId={}, batchId={}", taskId, + log.warn("Multiple database task timeout, taskId={}, batchId={}", taskId, this.batchId == null ? null : this.batchId + 1); } finally { MultipleDatabaseChangeTraceContextHolder.clear(); @@ -280,20 +272,16 @@ private MultipleDatabaseChangeTaskResult generateResult() { flowInstanceDetailResp -> flowInstanceDetailResp)); List idList = this.orderedDatabaseIds.stream().flatMap(Collection::stream).collect(Collectors.toList()); List databaseList = databaseService.listDatabasesDetailsByIds(idList); - ArrayList databaseChangingRecords = new ArrayList<>(); + ArrayList records = new ArrayList<>(); for (Database database : databaseList) { - FlowInstanceDetailResp flowInstanceDetailResp = databaseId2FlowInstanceDetailResp.get(database.getId()); - DatabaseChangingRecord databaseChangingRecord = new DatabaseChangingRecord(); - databaseChangingRecord.setDatabase(new DatabaseChangeDatabase(database)); - databaseChangingRecord.setFlowInstanceDetailResp( - flowInstanceDetailResp != null ? new DatabaseChangeFlowInstanceDetailResp(flowInstanceDetailResp) - : null); - databaseChangingRecord.setStatus(flowInstanceDetailResp != null ? flowInstanceDetailResp.getStatus() - : FlowStatus.WAIT_FOR_EXECUTION); - databaseChangingRecords.add(databaseChangingRecord); + FlowInstanceDetailResp resp = databaseId2FlowInstanceDetailResp.get(database.getId()); + DatabaseChangingRecord record = new DatabaseChangingRecord(); + record.setDatabase(new DatabaseChangeDatabase(database)); + record.setFlowInstanceDetailResp(resp != null ? new DatabaseChangeFlowInstanceDetailResp(resp) : null); + record.setStatus(resp != null ? resp.getStatus() : FlowStatus.WAIT_FOR_EXECUTION); + records.add(record); } - result.setDatabaseChangingRecordList(databaseChangingRecords); - + result.setDatabaseChangingRecordList(records); return result; } @@ -308,4 +296,9 @@ private Boolean isContinue(FlowTaskExecutionStrategy flowTaskExecutionStrategy, } } + @Override + public List> getExecutionListenerClasses() { + return null; + } + } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/FlowTaskProperties.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/FlowTaskProperties.java index 36e27b0de2..1fe10cd3bb 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/FlowTaskProperties.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/FlowTaskProperties.java @@ -47,4 +47,8 @@ public class FlowTaskProperties { @Value("${odc.task.async.index-change-max-timeout-millis:432000000}") private long indexChangeMaxTimeoutMillisecond; + + @Value("${odc.flow.task.heartbeat-timeout-seconds:180}") + private long heartbeatTimeoutSeconds; + } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/TaskService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/TaskService.java index 55529608ba..499cc0ae8b 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/TaskService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/TaskService.java @@ -344,6 +344,10 @@ public void updateJobId(Long id, Long jobId) { taskRepository.updateJobId(id, jobId); } + @Transactional(rollbackFor = Exception.class) + public void updateHeartbeatTime(Long id) { + taskRepository.updateLastHeartbeatTimeById(id); + } private TaskEntity nullSafeFindById(Long id) { return taskRepository.findById(id)