diff --git a/CHANGES.md b/CHANGES.md index 5e350802d64..bac536234b7 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -50,6 +50,7 @@ | [INLONG-8006](https://github.com/apache/inlong/issues/8006) | [Improve][Manager] Set displayname for the auto-registered cluster | | [INLONG-7999](https://github.com/apache/inlong/issues/7999) | [Improve][Manager] Support PostgreSQL data node | | [INLONG-7996](https://github.com/apache/inlong/issues/7996) | [Improve][Manager] Support issued kafka consumer group to sort | +| [INLONG-7987](https://github.com/apache/inlong/issues/7987) | [Improve][Manager] Add a heartbeat timeout status to the source | | [INLONG-7981](https://github.com/apache/inlong/issues/7981) | [Bug][Manager] Failed to stop source correctly when suspend a group | | [INLONG-7948](https://github.com/apache/inlong/issues/7948) | [Improve][Manager] Add user authentication when operate inlong consume | | [INLONG-7946](https://github.com/apache/inlong/issues/7946) | [Improve][Manager] Add user authentication when bind clusterTag | diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java index e361daadcf7..7c72f916efb 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java @@ -44,6 +44,7 @@ public static SimpleSourceStatus parseByStatus(int status) { case SOURCE_STOP: return FROZEN; case SOURCE_FAILED: + case HEARTBEAT_TIMEOUT: return FAILED; case TO_BE_ISSUED_DELETE: case BEEN_ISSUED_DELETE: diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java index 99118bebbfc..306a9e60ccb 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java @@ -34,6 +34,7 @@ public enum SourceStatus { SOURCE_NORMAL(101, "normal"), SOURCE_FAILED(102, "failed"), SOURCE_STOP(104, "stop"), + HEARTBEAT_TIMEOUT(105, "heartbeat timeout"), // if not approved SOURCE_NEW(110, "new created"), @@ -76,7 +77,7 @@ public enum SourceStatus { */ public static final Set ALLOWED_UPDATE = Sets.newHashSet( SOURCE_NEW.getCode(), SOURCE_FAILED.getCode(), SOURCE_STOP.getCode(), - SOURCE_NORMAL.getCode()); + SOURCE_NORMAL.getCode(), HEARTBEAT_TIMEOUT.getCode()); public static final Set TOBE_ISSUED_SET = Sets.newHashSet( TO_BE_ISSUED_ADD, TO_BE_ISSUED_DELETE, TO_BE_ISSUED_RETRY, @@ -87,59 +88,75 @@ public enum SourceStatus { static { // new - SOURCE_STATE_AUTOMATON.put(SOURCE_NEW, Sets.newHashSet(SOURCE_DISABLE, SOURCE_NEW, TO_BE_ISSUED_ADD)); + SOURCE_STATE_AUTOMATON.put(SOURCE_NEW, + Sets.newHashSet(SOURCE_DISABLE, SOURCE_NEW, TO_BE_ISSUED_ADD, HEARTBEAT_TIMEOUT)); // normal SOURCE_STATE_AUTOMATON.put(SOURCE_NORMAL, Sets.newHashSet(SOURCE_DISABLE, SOURCE_NORMAL, SOURCE_FAILED, TO_BE_ISSUED_DELETE, TO_BE_ISSUED_RETRY, TO_BE_ISSUED_BACKTRACK, TO_BE_ISSUED_STOP, TO_BE_ISSUED_ACTIVE, - TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC, TO_BE_ISSUED_MAKEUP)); + TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC, TO_BE_ISSUED_MAKEUP, HEARTBEAT_TIMEOUT)); // failed - SOURCE_STATE_AUTOMATON.put(SOURCE_FAILED, Sets.newHashSet(SOURCE_DISABLE, SOURCE_FAILED, TO_BE_ISSUED_RETRY)); + SOURCE_STATE_AUTOMATON.put(SOURCE_FAILED, + Sets.newHashSet(SOURCE_DISABLE, SOURCE_FAILED, TO_BE_ISSUED_RETRY, HEARTBEAT_TIMEOUT)); // frozen - SOURCE_STATE_AUTOMATON.put(SOURCE_STOP, Sets.newHashSet(SOURCE_DISABLE, SOURCE_STOP, TO_BE_ISSUED_ACTIVE)); + SOURCE_STATE_AUTOMATON.put(SOURCE_STOP, + Sets.newHashSet(SOURCE_DISABLE, SOURCE_STOP, TO_BE_ISSUED_ACTIVE, HEARTBEAT_TIMEOUT)); // [xxx] bo be issued - HashSet tobeAddNext = Sets.newHashSet(BEEN_ISSUED_ADD, SOURCE_DISABLE); + HashSet tobeAddNext = Sets.newHashSet(BEEN_ISSUED_ADD, SOURCE_DISABLE, HEARTBEAT_TIMEOUT); tobeAddNext.addAll(TOBE_ISSUED_SET); SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ADD, tobeAddNext); - HashSet tobeDeleteNext = Sets.newHashSet(BEEN_ISSUED_DELETE); + HashSet tobeDeleteNext = Sets.newHashSet(BEEN_ISSUED_DELETE, HEARTBEAT_TIMEOUT); tobeDeleteNext.addAll(TOBE_ISSUED_SET); SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_DELETE, Sets.newHashSet(tobeDeleteNext)); - HashSet tobeRetryNext = Sets.newHashSet(BEEN_ISSUED_RETRY); + HashSet tobeRetryNext = Sets.newHashSet(BEEN_ISSUED_RETRY, HEARTBEAT_TIMEOUT); tobeRetryNext.addAll(TOBE_ISSUED_SET); SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_RETRY, Sets.newHashSet(tobeRetryNext)); - HashSet tobeBacktrackNext = Sets.newHashSet(BEEN_ISSUED_BACKTRACK); + HashSet tobeBacktrackNext = Sets.newHashSet(BEEN_ISSUED_BACKTRACK, HEARTBEAT_TIMEOUT); tobeBacktrackNext.addAll(TOBE_ISSUED_SET); SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_BACKTRACK, Sets.newHashSet(tobeBacktrackNext)); - HashSet tobeFrozenNext = Sets.newHashSet(BEEN_ISSUED_STOP); + HashSet tobeFrozenNext = Sets.newHashSet(BEEN_ISSUED_STOP, HEARTBEAT_TIMEOUT); tobeFrozenNext.addAll(TOBE_ISSUED_SET); SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_STOP, Sets.newHashSet(tobeFrozenNext)); - HashSet tobeActiveNext = Sets.newHashSet(BEEN_ISSUED_ACTIVE); + HashSet tobeActiveNext = Sets.newHashSet(BEEN_ISSUED_ACTIVE, HEARTBEAT_TIMEOUT); tobeActiveNext.addAll(TOBE_ISSUED_SET); SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ACTIVE, Sets.newHashSet(tobeActiveNext)); - HashSet tobeCheckNext = Sets.newHashSet(BEEN_ISSUED_CHECK); + HashSet tobeCheckNext = Sets.newHashSet(BEEN_ISSUED_CHECK, HEARTBEAT_TIMEOUT); tobeCheckNext.addAll(TOBE_ISSUED_SET); SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_CHECK, Sets.newHashSet(tobeCheckNext)); - HashSet tobeRedoMetricNext = Sets.newHashSet(BEEN_ISSUED_REDO_METRIC); + HashSet tobeRedoMetricNext = Sets.newHashSet(BEEN_ISSUED_REDO_METRIC, HEARTBEAT_TIMEOUT); tobeRedoMetricNext.addAll(TOBE_ISSUED_SET); SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_REDO_METRIC, Sets.newHashSet(tobeRedoMetricNext)); - HashSet tobeMakeupNext = Sets.newHashSet(BEEN_ISSUED_MAKEUP); + HashSet tobeMakeupNext = Sets.newHashSet(BEEN_ISSUED_MAKEUP, HEARTBEAT_TIMEOUT); tobeMakeupNext.addAll(TOBE_ISSUED_SET); SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_MAKEUP, Sets.newHashSet(tobeMakeupNext)); // [xxx] been issued - SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ADD, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED)); - SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_DELETE, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED)); - SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_RETRY, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED)); - SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_BACKTRACK, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED)); - SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_STOP, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED)); - SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ACTIVE, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED)); - SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_CHECK, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED)); - SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_REDO_METRIC, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED)); - SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_MAKEUP, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED)); + SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ADD, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT)); + SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_DELETE, + Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT)); + SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_RETRY, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT)); + SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_BACKTRACK, + Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT)); + SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_STOP, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT)); + SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ACTIVE, + Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT)); + SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_CHECK, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT)); + SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_REDO_METRIC, + Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT)); + SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_MAKEUP, + Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT)); + SOURCE_STATE_AUTOMATON.put(HEARTBEAT_TIMEOUT, + Sets.newHashSet(SOURCE_DISABLE, SOURCE_NORMAL, SOURCE_FAILED, SOURCE_STOP, TO_BE_ISSUED_ADD, + TO_BE_ISSUED_DELETE, + TO_BE_ISSUED_RETRY, TO_BE_ISSUED_BACKTRACK, TO_BE_ISSUED_STOP, TO_BE_ISSUED_ACTIVE, + TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC, TO_BE_ISSUED_MAKEUP, BEEN_ISSUED_ADD, + BEEN_ISSUED_DELETE, BEEN_ISSUED_RETRY, BEEN_ISSUED_BACKTRACK, BEEN_ISSUED_STOP, + BEEN_ISSUED_ACTIVE, BEEN_ISSUED_CHECK, BEEN_ISSUED_REDO_METRIC, BEEN_ISSUED_MAKEUP, + HEARTBEAT_TIMEOUT)); } private final Integer code; diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java index 2c7537577e1..c85c83712e3 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java @@ -47,6 +47,11 @@ List selectByParentId(@Param("parentId") Integer parent int updateByIdSelective(InlongClusterNodeEntity record); + /** + * Update the status to `nextStatus` by the given id. + */ + int updateStatus(@Param("id") Integer id, @Param("nextStatus") Integer nextStatus, @Param("status") Integer status); + int deleteById(Integer id); } \ No newline at end of file diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java index 99a06ac340a..6d8610ab382 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java @@ -125,6 +125,13 @@ List selectByStatusAndCluster(@Param("statusList") List selectNeedUpdateIdsByClusterAndDataNode(@Param("clusterName") String clusterName, @Param("nodeName") String nodeName, @Param("sourceType") String sourceType); + /** + * Query need update tasks by the given status list and type List. + */ + List selectHeartbeatTimeoutIds(@Param("sourceTypeList") List sourceTypeList, + @Param("agentIp") String agentIp, + @Param("clusterName") String clusterName); + int updateByPrimaryKeySelective(StreamSourceEntity record); int updateByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId, @@ -164,6 +171,21 @@ int updateIpAndUuid(@Param("id") Integer id, @Param("agentIp") String agentIp, @ void updateStatusByIds(@Param("idList") List idList, @Param("status") Integer status, @Param("operator") String operator); + /** + * Update the source status + * + * @param idList source id list + * @param operator operator name + */ + void rollbackTimeoutStatusByIds(@Param("idList") List idList, @Param("operator") String operator); + + /** + * Update the source status when it has been deleted + * + * @param beforeSeconds the modified time was beforeSeconds seconds ago + */ + void updateStatusToTimeout(@Param("beforeSeconds") Integer beforeSeconds); + /** * Physical delete stream sources by group id and stream id */ diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml index 2dd8b216f82..7bf692fe5af 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml @@ -201,7 +201,23 @@ where id = #{id,jdbcType=INTEGER} and version = #{version,jdbcType=INTEGER} - + + update inlong_cluster_node + + + status = #{nextStatus,jdbcType=INTEGER} + + + + is_deleted = 0 + + and status = #{status,jdbcType=INTEGER} + + + and id = #{id,jdbcType=INTEGER} + + + delete from inlong_cluster_node diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml index 7c2df01992d..d29200b8747 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml @@ -342,6 +342,24 @@ + update stream_source @@ -484,9 +502,11 @@ update stream_source previous_status = status, - status = #{status, jdbcType=INTEGER}, - modifier = #{operator, jdbcType=VARCHAR}, - version = version + 1 + status = #{status, jdbcType=INTEGER}, + + modifier = #{operator, jdbcType=VARCHAR}, + + version = version + 1 is_deleted = 0 @@ -498,6 +518,39 @@ + + update stream_source + + status = previous_status, + previous_status = 105, + + modifier = #{operator, jdbcType=VARCHAR}, + + version = version + 1 + + + is_deleted = 0 + and status = 105 + + and id in + + #{item} + + + + + + update stream_source + + previous_status = status, + status = 105 + + + is_deleted = 0 + and modify_time <= DATE_ADD(NOW(), INTERVAL -#{beforeSeconds, jdbcType=INTEGER} SECOND) + and status in (200, 201, 202, 203, 204, 205, 206, 207, 208) + + delete diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java index b1aff06aea6..0d6d5d68e4e 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java @@ -38,7 +38,7 @@ public class BindTagRequest { @ApiModelProperty(value = "Cluster tag") @NotBlank(message = "clusterTag cannot be blank") @Length(min = 1, max = 128, message = "length must be between 1 and 128") - @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'") + @Pattern(regexp = "^[a-z0-9_.-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'") private String clusterTag; @ApiModelProperty(value = "Cluster-ID list which needs to bind tag") diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java index 305a9087735..1548bafa0f5 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java @@ -43,7 +43,7 @@ public class ClusterTagRequest { @ApiModelProperty(value = "Cluster tag") @NotBlank(groups = SaveValidation.class, message = "clusterTag cannot be blank") @Length(min = 1, max = 128, message = "length must be between 1 and 128") - @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'") + @Pattern(regexp = "^[a-z0-9_.-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'") private String clusterTag; @ApiModelProperty(value = "Extended params") diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java index 4b07ef0b3bb..d9306609c77 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java @@ -67,7 +67,7 @@ public class SourceRequest { @ApiModelProperty("Source name, unique in one stream") @NotBlank(groups = SaveValidation.class, message = "sourceName cannot be blank") @Length(min = 1, max = 100, message = "sourceName length must be between 1 and 100") - @Pattern(regexp = "^[a-z0-9_-]{1,100}$", message = "sourceName only supports lowercase letters, numbers, '-', or '_'") + @Pattern(regexp = "^[a-z0-9_.-]{1,100}$", message = "sourceName only supports lowercase letters, numbers, '-', or '_'") private String sourceName; @ApiModelProperty("Ip of the agent running the task") @@ -85,7 +85,7 @@ public class SourceRequest { @ApiModelProperty("Inlong cluster node label for filtering stream source collect task") @Length(min = 1, max = 128, message = "length must be between 1 and 128") - @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'") + @Pattern(regexp = "^[a-z0-9_.-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'") private String inlongClusterNodeGroup; @ApiModelProperty("Data node name") diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java index d8fb62fbcb2..7ba844f8a8e 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java @@ -1240,7 +1240,7 @@ public DataProxyConfig getDataProxyConfig(String clusterTag, String clusterName) clusterEntityList.forEach(e -> tagSet.addAll(Arrays.asList(e.getClusterTags().split(InlongConstants.COMMA)))); List clusterTagList = new ArrayList<>(tagSet); InlongGroupPageRequest groupRequest = InlongGroupPageRequest.builder() - .status(GroupStatus.CONFIG_SUCCESSFUL.getCode()) + .statusList(Arrays.asList(GroupStatus.CONFIG_SUCCESSFUL.getCode(), GroupStatus.RESTARTED.getCode())) .clusterTagList(clusterTagList) .build(); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java index ff44bfb9c05..1faf0156132 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java @@ -19,6 +19,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.gson.Gson; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.RandomStringUtils; @@ -67,11 +68,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Isolation; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; +import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -81,6 +84,11 @@ import java.util.Locale; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -98,7 +106,18 @@ public class AgentServiceImpl implements AgentService { private static final int MODULUS_100 = 100; private static final int TASK_FETCH_SIZE = 2; private static final Gson GSON = new Gson(); - + private final ExecutorService executorService = new ThreadPoolExecutor( + 5, + 10, + 10L, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(100), + new ThreadFactoryBuilder().setNameFormat("async-agent-%s").build(), + new CallerRunsPolicy()); + @Value("${source.update.enabled:false}") + private Boolean enabled; + @Value("${source.update.before.seconds:60}") + private Integer beforeSeconds; @Autowired private StreamSourceEntityMapper sourceMapper; @Autowired @@ -114,6 +133,18 @@ public class AgentServiceImpl implements AgentService { @Autowired private InlongClusterNodeEntityMapper clusterNodeMapper; + /** + * Start the update task + */ + @PostConstruct + private void startHeartbeatTask() { + if (enabled) { + UpdateTaskRunnable taskRunnable = new UpdateTaskRunnable(); + this.executorService.execute(taskRunnable); + LOGGER.info("update task status started successfully"); + } + } + @Override public Boolean reportSnapshot(TaskSnapshotRequest request) { return snapshotOperator.snapshot(request); @@ -129,6 +160,8 @@ public void report(TaskRequest request) { throw new BusinessException("agent request or agent ip was empty, just return"); } + preTimeoutTasks(request); + // Update task status, other tasks with status 20x will change to 30x in next request if (CollectionUtils.isEmpty(request.getCommandInfo())) { LOGGER.info("task result was empty in request: {}, just return", request); @@ -422,6 +455,16 @@ private void preProcessLabelFileTasks(TaskRequest taskRequest) { }); } + private void preTimeoutTasks(TaskRequest taskRequest) { + // If the agent report succeeds, restore the source status + List needUpdateIds = sourceMapper.selectHeartbeatTimeoutIds(Lists.newArrayList(SourceType.FILE), + taskRequest.getAgentIp(), taskRequest.getClusterName()); + // restore state for all source by ip and type + if (CollectionUtils.isNotEmpty(needUpdateIds)) { + sourceMapper.rollbackTimeoutStatusByIds(needUpdateIds, null); + } + } + private InlongClusterNodeEntity selectByIpAndCluster(String clusterName, String ip) { InlongClusterEntity clusterEntity = clusterMapper.selectByNameAndType(clusterName, ClusterType.AGENT); if (clusterEntity == null) { @@ -602,4 +645,22 @@ private boolean matchGroup(StreamSourceEntity sourceEntity, InlongClusterNodeEnt return sourceGroups.stream().anyMatch(clusterNodeGroups::contains); } + /** + * update task status when task timeout + */ + private class UpdateTaskRunnable implements Runnable { + + @Override + public void run() { + while (true) { + try { + sourceMapper.updateStatusToTimeout(beforeSeconds); + Thread.sleep(beforeSeconds * 1000); + } catch (Throwable t) { + LOGGER.error("update task status runnable error", t); + } + } + } + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java index 20ee092fd1a..8f2ec1b4eed 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java @@ -23,17 +23,21 @@ import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.Scheduler; import com.google.common.base.Joiner; +import com.google.common.collect.Lists; import com.google.gson.Gson; import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.inlong.common.enums.NodeSrvStatus; import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager; import org.apache.inlong.common.heartbeat.ComponentHeartbeat; import org.apache.inlong.common.heartbeat.HeartbeatMsg; import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.common.enums.ClusterStatus; +import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.NodeStatus; import org.apache.inlong.manager.common.util.JsonUtils; @@ -42,23 +46,28 @@ import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity; import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper; +import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest; import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO; import org.apache.inlong.manager.service.cluster.InlongClusterOperator; import org.apache.inlong.manager.service.cluster.InlongClusterOperatorFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Arrays; import java.util.HashSet; +import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @Slf4j +@Lazy @Component public class HeartbeatManager implements AbstractHeartbeatManager { @@ -75,6 +84,8 @@ public class HeartbeatManager implements AbstractHeartbeatManager { private InlongClusterEntityMapper clusterMapper; @Autowired private InlongClusterNodeEntityMapper clusterNodeMapper; + @Autowired + private StreamSourceEntityMapper sourceMapper; /** * Check whether the configuration information carried in the heartbeat has been updated @@ -88,11 +99,15 @@ private static boolean heartbeatConfigModified(HeartbeatMsg oldHB, HeartbeatMsg if (oldHB == null) { return true; } - return oldHB.getNodeGroup() != newHB.getNodeGroup() || oldHB.getLoad() != newHB.getLoad(); + return !Objects.equals(oldHB.getNodeGroup(), newHB.getNodeGroup()) || !Objects.equals(oldHB.getLoad(), + newHB.getLoad()); } @PostConstruct public void init() { + // When the manager restarts, set the heartbeat timeout state of all nodes + // and wait for the heartbeat report of the corresponding node + clusterNodeMapper.updateStatus(null, NodeStatus.HEARTBEAT_TIMEOUT.getStatus(), NodeStatus.NORMAL.getStatus()); long expireTime = heartbeatInterval() * 2L; Scheduler evictScheduler = Scheduler.forScheduledExecutorService(Executors.newSingleThreadScheduledExecutor()); heartbeatCache = Caffeine.newBuilder() @@ -162,6 +177,16 @@ public void reportHeartbeat(HeartbeatMsg heartbeat) { handlerNum += insertClusterNode(clusterInfo, heartbeatMsg, clusterInfo.getCreator()); } else { handlerNum += updateClusterNode(clusterNode, heartbeatMsg); + // If the agent report succeeds, restore the source status + if (Objects.equals(clusterNode.getType(), ClusterType.AGENT)) { + // If the agent report succeeds, restore the source status + List needUpdateIds = sourceMapper.selectHeartbeatTimeoutIds( + Lists.newArrayList(SourceType.FILE), heartbeat.getIp(), heartbeat.getClusterName()); + // restore state for all source by ip and type + if (CollectionUtils.isNotEmpty(needUpdateIds)) { + sourceMapper.rollbackTimeoutStatusByIds(needUpdateIds, null); + } + } } } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java index ec7071c98ae..052a818f69b 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java @@ -45,6 +45,7 @@ import org.apache.inlong.manager.pojo.heartbeat.StreamHeartbeatResponse; import org.apache.inlong.manager.service.core.HeartbeatService; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import java.util.List; @@ -59,6 +60,7 @@ public class HeartbeatServiceImpl implements HeartbeatService { private static final Gson GSON = new Gson(); @Autowired + @Lazy private HeartbeatManager heartbeatManager; @Autowired private ComponentHeartbeatEntityMapper componentHeartbeatMapper; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java index 2e3d3228319..fdad06090f6 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java @@ -62,7 +62,7 @@ public TaskEvent event() { @Override public ListenerResult listen(WorkflowContext context) throws Exception { - log.info("operate stream source for context={}", context); + log.info("operate stream source for groupId={}", context.getProcessForm().getInlongGroupId()); InlongGroupInfo groupInfo = getGroupInfo(context.getProcessForm()); final String groupId = groupInfo.getInlongGroupId(); List streamResponses = streamService.listBriefWithSink(groupId); @@ -104,6 +104,7 @@ public boolean checkIfOp(StreamSource streamSource, List unOperate SourceStatus sourceStatus = SourceStatus.forCode(status); // template sources are filtered and processed in corresponding subclass listeners if (sourceStatus == SourceStatus.SOURCE_NORMAL || sourceStatus == SourceStatus.SOURCE_STOP + || sourceStatus == SourceStatus.HEARTBEAT_TIMEOUT || CollectionUtils.isNotEmpty(streamSource.getSubSourceList())) { return true; } else if (sourceStatus == SourceStatus.SOURCE_FAILED || sourceStatus == SourceStatus.SOURCE_DISABLE) { @@ -118,7 +119,8 @@ public boolean checkIfOp(StreamSource streamSource, List unOperate if (sourceStatus != SourceStatus.SOURCE_NORMAL && sourceStatus != SourceStatus.SOURCE_STOP && sourceStatus != SourceStatus.SOURCE_DISABLE - && sourceStatus != SourceStatus.SOURCE_FAILED) { + && sourceStatus != SourceStatus.SOURCE_FAILED + && sourceStatus != SourceStatus.HEARTBEAT_TIMEOUT) { log.error("stream source ={} cannot be operated for status={}", streamSource, sourceStatus); unOperatedSources.add(streamSource); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java index a6a25b5e547..6c3cf444285 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java @@ -167,26 +167,29 @@ public void updateOpt(SourceRequest request, Integer groupStatus, Integer groupM // setting updated parameters of stream source entity. setTargetEntity(request, entity); entity.setModifier(operator); - entity.setPreviousStatus(entity.getStatus()); // re-issue task if necessary if (InlongConstants.STANDARD_MODE.equals(groupMode)) { + SourceStatus sourceStatus = SourceStatus.forCode(entity.getStatus()); + Integer nextStatus = entity.getStatus(); if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) { - entity.setStatus(SourceStatus.TO_BE_ISSUED_RETRY.getCode()); + nextStatus = SourceStatus.TO_BE_ISSUED_RETRY.getCode(); } else { switch (SourceStatus.forCode(entity.getStatus())) { case SOURCE_NORMAL: - entity.setStatus(SourceStatus.TO_BE_ISSUED_RETRY.getCode()); + case HEARTBEAT_TIMEOUT: + nextStatus = SourceStatus.TO_BE_ISSUED_RETRY.getCode(); break; case SOURCE_FAILED: - entity.setStatus(SourceStatus.SOURCE_NEW.getCode()); + nextStatus = SourceStatus.SOURCE_NEW.getCode(); break; default: // others leave it be break; } } + entity.setStatus(nextStatus); } int rowCount = sourceMapper.updateByPrimaryKeySelective(entity); diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties index 53dd907c1f0..5bd94df1d53 100644 --- a/inlong-manager/manager-web/src/main/resources/application-dev.properties +++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties @@ -85,3 +85,8 @@ data.cleansing.batchSize=100 # Whether to use ZooKeeper to manage the Sort task config, default is false, which means not using ZooKeeper sort.enable.zookeeper=false + +# If turned on, synchronizing change the source status when the agent heartbeat times out +source.update.enabled=false +source.update.before.seconds=60 + diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties index dfac2d81895..c07e79f049c 100644 --- a/inlong-manager/manager-web/src/main/resources/application-prod.properties +++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties @@ -84,3 +84,8 @@ data.cleansing.batchSize=100 # Whether to use ZooKeeper to manage the Sort task config, default is false, which means not using ZooKeeper sort.enable.zookeeper=false + +# If turned on, synchronizing change the source status when the agent heartbeat times out +source.update.enabled=false +source.update.before.seconds=60 + diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties index 53dd907c1f0..13822969a48 100644 --- a/inlong-manager/manager-web/src/main/resources/application-test.properties +++ b/inlong-manager/manager-web/src/main/resources/application-test.properties @@ -85,3 +85,7 @@ data.cleansing.batchSize=100 # Whether to use ZooKeeper to manage the Sort task config, default is false, which means not using ZooKeeper sort.enable.zookeeper=false + +# If turned on, synchronizing change the source status when the agent heartbeat times out +source.update.enabled=false +source.update.before.seconds=60