Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(flow): add heartbeat for flow task #2468

Merged
merged 19 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,35 @@ public void clearAll() {
repository.deleteAll();
}

@Test
public void updateStatusByIdIn_entityExists_returnNotEmpty() {
ServiceTaskInstanceEntity entity = createEntity();
entity.setStatus(FlowNodeStatus.CANCELLED);
entity = this.repository.save(entity);
this.repository.updateStatusByIdIn(Collections.singletonList(entity.getId()), FlowNodeStatus.FAILED);
Optional<ServiceTaskInstanceEntity> optional = this.repository.findById(entity.getId());
entity.setStatus(FlowNodeStatus.FAILED);
Assert.assertEquals(entity, optional.get());
}

@Test
public void findByIdInAndStatus_entityExists_returnNotNull() {
ServiceTaskInstanceEntity entity = createEntity();
entity.setStatus(FlowNodeStatus.CANCELLED);
entity = this.repository.save(entity);
List<ServiceTaskInstanceEntity> actual = this.repository.findByIdInAndStatus(
Collections.singletonList(entity.getId()), FlowNodeStatus.CANCELLED);
Assert.assertEquals(Collections.singletonList(entity), actual);
}

@Test
public void findByStatus_entityExists_returnNotEmpty() {
ServiceTaskInstanceEntity entity = createEntity();
entity = this.repository.save(entity);
List<ServiceTaskInstanceEntity> actual = this.repository.findByStatus(entity.getStatus());
Assert.assertEquals(Collections.singletonList(entity), actual);
}

@Test
public void testSaveSerivceTaskInstanceEntity() {
ServiceTaskInstanceEntity entity = createEntity();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2023 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.oceanbase.odc.metadb.task;

import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

import com.oceanbase.odc.ServiceTestEnv;
import com.oceanbase.odc.test.tool.TestRandom;

public class TaskRepositoryTest extends ServiceTestEnv {

@Autowired
private TaskRepository taskRepository;

@Before
public void setUp() {
this.taskRepository.deleteAll();
}

@Test
public void updateLastHeartbeatTimeById_updateSucceed_returnNotNull() {
TaskEntity taskEntity = TestRandom.nextObject(TaskEntity.class);
taskEntity.setId(null);
taskEntity.setLastHeartbeatTime(null);
taskEntity = this.taskRepository.save(taskEntity);
this.taskRepository.updateLastHeartbeatTimeById(taskEntity.getId());
Optional<TaskEntity> optional = this.taskRepository.findById(taskEntity.getId());
Assert.assertNotNull(optional.get().getLastHeartbeatTime());
}

@Test
public void findAllByLastHeartbeatTimeBeforeAndIdIn_findBeforeNow_returnNotNull() {
TaskEntity taskEntity = TestRandom.nextObject(TaskEntity.class);
taskEntity.setId(null);
taskEntity.setLastHeartbeatTime(null);
taskEntity = this.taskRepository.save(taskEntity);
this.taskRepository.updateLastHeartbeatTimeById(taskEntity.getId());
Optional<TaskEntity> optional = this.taskRepository.findById(taskEntity.getId());
Date heartbeatTime = new Date(optional.get().getLastHeartbeatTime().getTime() + 1);
List<TaskEntity> actual = this.taskRepository.findAllByLastHeartbeatTimeBeforeAndIdIn(
heartbeatTime, Collections.singletonList(taskEntity.getId()));
Assert.assertFalse(actual.isEmpty());
}

@Test
public void findAllByLastHeartbeatTimeBeforeAndIdIn_findBeforeLastHeartbeatTime_returnEmpty() {
TaskEntity taskEntity = TestRandom.nextObject(TaskEntity.class);
taskEntity.setId(null);
taskEntity.setLastHeartbeatTime(null);
taskEntity = this.taskRepository.save(taskEntity);
this.taskRepository.updateLastHeartbeatTimeById(taskEntity.getId());
Optional<TaskEntity> optional = this.taskRepository.findById(taskEntity.getId());
List<TaskEntity> actual = this.taskRepository.findAllByLastHeartbeatTimeBeforeAndIdIn(
optional.get().getLastHeartbeatTime(), Collections.singletonList(taskEntity.getId()));
Assert.assertTrue(actual.isEmpty());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@
*/
package com.oceanbase.odc.service.flow.tool;

import java.util.Collections;
import java.util.List;

import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.ExecutionListener;

import com.oceanbase.odc.core.flow.BaseFlowableDelegate;
import com.oceanbase.odc.service.flow.listener.ServiceTaskExecutingCompleteListener;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -29,4 +34,9 @@ protected void run(DelegateExecution execution) {
log.info("Service task run, currentActivityId={}, processInstanceId={}", execution.getCurrentActivityId(),
execution.getProcessInstanceId());
}

@Override
protected List<Class<? extends ExecutionListener>> getExecutionListenerClasses() {
return Collections.singletonList(ServiceTaskExecutingCompleteListener.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*/
package com.oceanbase.odc.core.flow;

import java.util.List;

import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.ExecutionListener;
import org.flowable.engine.delegate.JavaDelegate;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -36,6 +39,8 @@ public abstract class BaseFlowableDelegate implements JavaDelegate {
*/
protected abstract void run(DelegateExecution execution) throws Exception;

protected abstract List<Class<? extends ExecutionListener>> getExecutionListenerClasses();

@Override
public void execute(DelegateExecution execution) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alter table `task_task` add column `last_heartbeat_time` datetime default null comment 'Last heartbeat time';
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import java.util.Set;
import java.util.function.Function;

import javax.persistence.LockModeType;
import javax.transaction.Transactional;

import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Lock;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
Expand Down Expand Up @@ -62,6 +64,18 @@ public interface ServiceTaskInstanceRepository extends OdcJpaRepository<ServiceT

List<ServiceTaskInstanceEntity> findByFlowInstanceIdIn(Set<Long> flowInstanceIds);

@Transactional
@Lock(value = LockModeType.PESSIMISTIC_WRITE)
List<ServiceTaskInstanceEntity> findByIdInAndStatus(Collection<Long> ids, FlowNodeStatus status);

@Transactional
@Query(value = "update flow_instance_node_task set status=:#{#status.name()} where id in (:ids)",
nativeQuery = true)
@Modifying
int updateStatusByIdIn(@Param("ids") List<Long> ids, @Param("status") FlowNodeStatus status);

List<ServiceTaskInstanceEntity> findByStatus(FlowNodeStatus status);

@Query(value = "select na.* from flow_instance_node_task as na inner join flow_instance_node as n on na.id=n.instance_id "
+ "where n.instance_type=:#{#instanceType.name()} and n.activity_id=:activityId and n.flow_instance_id=:flowInstanceId",
nativeQuery = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,7 @@ public class TaskEntity {
@Column(name = "job_id")
private Long jobId;

@Column(name = "last_heartbeat_time")
private Date lastHeartbeatTime;

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.oceanbase.odc.metadb.task;

import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -43,6 +44,13 @@ public interface TaskRepository extends JpaRepository<TaskEntity, Long>, JpaSpec
@Modifying
int update(@Param("param") TaskEntity entity);

@Transactional
yhilmare marked this conversation as resolved.
Show resolved Hide resolved
@Query(value = "update task_task set last_heartbeat_time=now() where id=:id", nativeQuery = true)
@Modifying
int updateLastHeartbeatTimeById(@Param("id") Long id);

List<TaskEntity> findAllByLastHeartbeatTimeBeforeAndIdIn(Date lastHeartbeatTime, List<Long> ids);

@Transactional
@Query("update TaskEntity set parametersJson=:#{#param.parametersJson} where id=:#{#param.id}")
@Modifying
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright (c) 2023 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.oceanbase.odc.service.flow;

import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionTemplate;

import com.oceanbase.odc.core.shared.constant.FlowStatus;
import com.oceanbase.odc.metadb.flow.FlowInstanceRepository;
import com.oceanbase.odc.metadb.flow.ServiceTaskInstanceEntity;
import com.oceanbase.odc.metadb.flow.ServiceTaskInstanceRepository;
import com.oceanbase.odc.metadb.task.TaskEntity;
import com.oceanbase.odc.metadb.task.TaskRepository;
import com.oceanbase.odc.service.flow.model.FlowNodeStatus;
import com.oceanbase.odc.service.flow.task.model.FlowTaskProperties;
import com.oceanbase.odc.service.flow.task.model.RuntimeTaskConstants;
import com.oceanbase.tools.dbbrowser.util.DBSchemaAccessorUtil;

import lombok.extern.slf4j.Slf4j;

/**
* {@link FlowSchedules}
*
* @author yh263208
* @date 2024-05-22 15:33
* @since ODC-release_4.3.0
*/
@Slf4j
@Component
public class FlowSchedules {

private static final Integer OB_MAX_IN_SIZE = 2000;
@Autowired
private TransactionTemplate transactionTemplate;
@Autowired
private FlowTaskProperties flowTaskProperties;
@Autowired
private TaskRepository taskRepository;
@Autowired
private FlowInstanceRepository flowInstanceRepository;
@Autowired
private ServiceTaskInstanceRepository serviceTaskInstanceRepository;

@Scheduled(fixedDelayString = "${odc.flow.task.heartbeat-timeout-check-interval-millis:15000}")
public void cancelHeartbeatTimeoutFlow() {
long timeoutSeconds = this.flowTaskProperties.getHeartbeatTimeoutSeconds();
if (timeoutSeconds <= 0) {
return;
}
long minTimeoutSeconds = RuntimeTaskConstants.DEFAULT_TASK_CHECK_INTERVAL_SECONDS * 3;
yhilmare marked this conversation as resolved.
Show resolved Hide resolved
if (timeoutSeconds < minTimeoutSeconds) {
timeoutSeconds = minTimeoutSeconds;
}
try {
List<ServiceTaskInstanceEntity> taskInstanceEntities = this.serviceTaskInstanceRepository
.findByStatus(FlowNodeStatus.EXECUTING).stream()
.filter(e -> e.getTargetTaskId() != null).collect(Collectors.toList());
if (CollectionUtils.isEmpty(taskInstanceEntities)) {
return;
}
List<Long> taskIds = taskInstanceEntities.stream()
.map(ServiceTaskInstanceEntity::getTargetTaskId).distinct().collect(Collectors.toList());
Date timeoutBound = new Date(System.currentTimeMillis() - timeoutSeconds * 1000);
List<TaskEntity> heartbeatTimeoutTasks = DBSchemaAccessorUtil.partitionFind(taskIds,
OB_MAX_IN_SIZE, ids -> taskRepository.findAllByLastHeartbeatTimeBeforeAndIdIn(timeoutBound, ids));
if (CollectionUtils.isEmpty(heartbeatTimeoutTasks)) {
return;
}
Set<Long> heartbeatTimeoutTaskIds = heartbeatTimeoutTasks.stream()
.map(TaskEntity::getId).collect(Collectors.toSet());
log.info("Find the task with heartbeat timeout, timeoutSeconds={}, earliestHeartbeatTime={}, "
+ "heartbeatTimeoutTaskIds={}", timeoutSeconds, timeoutBound, heartbeatTimeoutTaskIds);
/**
* we just find such flow task instances:
* <p>
* 1. heartbeat timeout 2. flow task instance is executing
* </p>
*/
cancelFlowTaskInstanceAndFlowInstance(taskInstanceEntities.stream()
.filter(e -> heartbeatTimeoutTaskIds.contains(e.getTargetTaskId()))
.map(ServiceTaskInstanceEntity::getId).distinct().collect(Collectors.toList()));
} catch (Exception e) {
log.warn("Failed to sync flow instance's status", e);
}
}

private void cancelFlowTaskInstanceAndFlowInstance(List<Long> candidateFlowTaskInstanceIds) {
this.transactionTemplate.executeWithoutResult(tx -> {
try {
List<ServiceTaskInstanceEntity> candidates = serviceTaskInstanceRepository
.findByIdInAndStatus(candidateFlowTaskInstanceIds, FlowNodeStatus.EXECUTING);
List<Long> candidateIds = candidates.stream()
.map(ServiceTaskInstanceEntity::getId).distinct().collect(Collectors.toList());
List<Integer> result = DBSchemaAccessorUtil.partitionFind(candidateIds,
OB_MAX_IN_SIZE, ids -> Collections.singletonList(
serviceTaskInstanceRepository.updateStatusByIdIn(ids, FlowNodeStatus.FAILED)));
log.info("Update flow task instance status succeed, affectRows={}, flowTaskInstIds={}",
result, candidateIds);
List<Long> flowInstIds = candidates.stream().map(ServiceTaskInstanceEntity::getFlowInstanceId)
.distinct().collect(Collectors.toList());
result = DBSchemaAccessorUtil.partitionFind(flowInstIds,
OB_MAX_IN_SIZE, ids -> Collections.singletonList(
flowInstanceRepository.updateStatusByIds(ids, FlowStatus.EXECUTION_FAILED)));
log.info("Update flow instance status succeed, affectRows={}, flowInstIds={}", result, flowInstIds);
} catch (Exception e) {
log.warn("Failed to sync flow instance's status", e);
tx.setRollbackOnly();
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -778,19 +778,20 @@ private URL generatePresignedUrl(String objectName, Long expirationSeconds) thro
}

private void setDownloadUrlsIfNecessary(Long taskId, List<? extends FlowTaskResult> results) {
if (cloudObjectStorageService.supported()) {
for (FlowTaskResult result : results) {
TaskDownloadUrls urls = databaseChangeOssUrlCache.get(taskId);
if (result instanceof AbstractFlowTaskResult) {
((AbstractFlowTaskResult) result)
.setFullLogDownloadUrl(urls.getLogDownloadUrl());
}
if (result instanceof DatabaseChangeResult) {
((DatabaseChangeResult) result).setZipFileDownloadUrl(urls.getDatabaseChangeZipFileDownloadUrl());
if (Objects.nonNull(((DatabaseChangeResult) result).getRollbackPlanResult())) {
((DatabaseChangeResult) result).getRollbackPlanResult()
.setResultFileDownloadUrl(urls.getRollBackPlanResultFileDownloadUrl());
}
if (!cloudObjectStorageService.supported()) {
return;
}
for (FlowTaskResult result : results) {
TaskDownloadUrls urls = databaseChangeOssUrlCache.get(taskId);
if (result instanceof AbstractFlowTaskResult) {
((AbstractFlowTaskResult) result)
.setFullLogDownloadUrl(urls.getLogDownloadUrl());
}
if (result instanceof DatabaseChangeResult) {
((DatabaseChangeResult) result).setZipFileDownloadUrl(urls.getDatabaseChangeZipFileDownloadUrl());
if (Objects.nonNull(((DatabaseChangeResult) result).getRollbackPlanResult())) {
((DatabaseChangeResult) result).getRollbackPlanResult()
.setResultFileDownloadUrl(urls.getRollBackPlanResultFileDownloadUrl());
}
}
}
Expand Down
Loading