Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-7987][Manager] Add heartbeat timeout status to the source #7989

Merged
merged 18 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
| [INLONG-8006](https://github.com/apache/inlong/issues/8006) | [Improve][Manager] Set displayname for the auto-registered cluster |
| [INLONG-7999](https://github.com/apache/inlong/issues/7999) | [Improve][Manager] Support PostgreSQL data node |
| [INLONG-7996](https://github.com/apache/inlong/issues/7996) | [Improve][Manager] Support issued kafka consumer group to sort |
| [INLONG-7987](https://github.com/apache/inlong/issues/7987) | [Improve][Manager] Add a heartbeat timeout status to the source |
| [INLONG-7981](https://github.com/apache/inlong/issues/7981) | [Bug][Manager] Failed to stop source correctly when suspend a group |
| [INLONG-7948](https://github.com/apache/inlong/issues/7948) | [Improve][Manager] Add user authentication when operate inlong consume |
| [INLONG-7946](https://github.com/apache/inlong/issues/7946) | [Improve][Manager] Add user authentication when bind clusterTag |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public static SimpleSourceStatus parseByStatus(int status) {
case SOURCE_STOP:
return FROZEN;
case SOURCE_FAILED:
case HEARTBEAT_TIMEOUT:
return FAILED;
case TO_BE_ISSUED_DELETE:
case BEEN_ISSUED_DELETE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public enum SourceStatus {
SOURCE_NORMAL(101, "normal"),
SOURCE_FAILED(102, "failed"),
SOURCE_STOP(104, "stop"),
HEARTBEAT_TIMEOUT(105, "heartbeat timeout"),

// if not approved
SOURCE_NEW(110, "new created"),
Expand Down Expand Up @@ -76,7 +77,7 @@ public enum SourceStatus {
*/
public static final Set<Integer> ALLOWED_UPDATE = Sets.newHashSet(
SOURCE_NEW.getCode(), SOURCE_FAILED.getCode(), SOURCE_STOP.getCode(),
SOURCE_NORMAL.getCode());
SOURCE_NORMAL.getCode(), HEARTBEAT_TIMEOUT.getCode());

public static final Set<SourceStatus> TOBE_ISSUED_SET = Sets.newHashSet(
TO_BE_ISSUED_ADD, TO_BE_ISSUED_DELETE, TO_BE_ISSUED_RETRY,
Expand All @@ -87,59 +88,75 @@ public enum SourceStatus {

static {
// new
SOURCE_STATE_AUTOMATON.put(SOURCE_NEW, Sets.newHashSet(SOURCE_DISABLE, SOURCE_NEW, TO_BE_ISSUED_ADD));
SOURCE_STATE_AUTOMATON.put(SOURCE_NEW,
Sets.newHashSet(SOURCE_DISABLE, SOURCE_NEW, TO_BE_ISSUED_ADD, HEARTBEAT_TIMEOUT));

// normal
SOURCE_STATE_AUTOMATON.put(SOURCE_NORMAL,
Sets.newHashSet(SOURCE_DISABLE, SOURCE_NORMAL, SOURCE_FAILED, TO_BE_ISSUED_DELETE,
TO_BE_ISSUED_RETRY, TO_BE_ISSUED_BACKTRACK, TO_BE_ISSUED_STOP, TO_BE_ISSUED_ACTIVE,
TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC, TO_BE_ISSUED_MAKEUP));
TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC, TO_BE_ISSUED_MAKEUP, HEARTBEAT_TIMEOUT));

// failed
SOURCE_STATE_AUTOMATON.put(SOURCE_FAILED, Sets.newHashSet(SOURCE_DISABLE, SOURCE_FAILED, TO_BE_ISSUED_RETRY));
SOURCE_STATE_AUTOMATON.put(SOURCE_FAILED,
Sets.newHashSet(SOURCE_DISABLE, SOURCE_FAILED, TO_BE_ISSUED_RETRY, HEARTBEAT_TIMEOUT));

// frozen
SOURCE_STATE_AUTOMATON.put(SOURCE_STOP, Sets.newHashSet(SOURCE_DISABLE, SOURCE_STOP, TO_BE_ISSUED_ACTIVE));
SOURCE_STATE_AUTOMATON.put(SOURCE_STOP,
Sets.newHashSet(SOURCE_DISABLE, SOURCE_STOP, TO_BE_ISSUED_ACTIVE, HEARTBEAT_TIMEOUT));

// [xxx] to be issued
HashSet<SourceStatus> tobeAddNext = Sets.newHashSet(BEEN_ISSUED_ADD, SOURCE_DISABLE);
HashSet<SourceStatus> tobeAddNext = Sets.newHashSet(BEEN_ISSUED_ADD, SOURCE_DISABLE, HEARTBEAT_TIMEOUT);
tobeAddNext.addAll(TOBE_ISSUED_SET);
SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ADD, tobeAddNext);
HashSet<SourceStatus> tobeDeleteNext = Sets.newHashSet(BEEN_ISSUED_DELETE);
HashSet<SourceStatus> tobeDeleteNext = Sets.newHashSet(BEEN_ISSUED_DELETE, HEARTBEAT_TIMEOUT);
tobeDeleteNext.addAll(TOBE_ISSUED_SET);
SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_DELETE, Sets.newHashSet(tobeDeleteNext));
HashSet<SourceStatus> tobeRetryNext = Sets.newHashSet(BEEN_ISSUED_RETRY);
HashSet<SourceStatus> tobeRetryNext = Sets.newHashSet(BEEN_ISSUED_RETRY, HEARTBEAT_TIMEOUT);
tobeRetryNext.addAll(TOBE_ISSUED_SET);
SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_RETRY, Sets.newHashSet(tobeRetryNext));
HashSet<SourceStatus> tobeBacktrackNext = Sets.newHashSet(BEEN_ISSUED_BACKTRACK);
HashSet<SourceStatus> tobeBacktrackNext = Sets.newHashSet(BEEN_ISSUED_BACKTRACK, HEARTBEAT_TIMEOUT);
tobeBacktrackNext.addAll(TOBE_ISSUED_SET);
SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_BACKTRACK, Sets.newHashSet(tobeBacktrackNext));
HashSet<SourceStatus> tobeFrozenNext = Sets.newHashSet(BEEN_ISSUED_STOP);
HashSet<SourceStatus> tobeFrozenNext = Sets.newHashSet(BEEN_ISSUED_STOP, HEARTBEAT_TIMEOUT);
tobeFrozenNext.addAll(TOBE_ISSUED_SET);
SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_STOP, Sets.newHashSet(tobeFrozenNext));
HashSet<SourceStatus> tobeActiveNext = Sets.newHashSet(BEEN_ISSUED_ACTIVE);
HashSet<SourceStatus> tobeActiveNext = Sets.newHashSet(BEEN_ISSUED_ACTIVE, HEARTBEAT_TIMEOUT);
tobeActiveNext.addAll(TOBE_ISSUED_SET);
SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ACTIVE, Sets.newHashSet(tobeActiveNext));
HashSet<SourceStatus> tobeCheckNext = Sets.newHashSet(BEEN_ISSUED_CHECK);
HashSet<SourceStatus> tobeCheckNext = Sets.newHashSet(BEEN_ISSUED_CHECK, HEARTBEAT_TIMEOUT);
tobeCheckNext.addAll(TOBE_ISSUED_SET);
SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_CHECK, Sets.newHashSet(tobeCheckNext));
HashSet<SourceStatus> tobeRedoMetricNext = Sets.newHashSet(BEEN_ISSUED_REDO_METRIC);
HashSet<SourceStatus> tobeRedoMetricNext = Sets.newHashSet(BEEN_ISSUED_REDO_METRIC, HEARTBEAT_TIMEOUT);
tobeRedoMetricNext.addAll(TOBE_ISSUED_SET);
SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_REDO_METRIC, Sets.newHashSet(tobeRedoMetricNext));
HashSet<SourceStatus> tobeMakeupNext = Sets.newHashSet(BEEN_ISSUED_MAKEUP);
HashSet<SourceStatus> tobeMakeupNext = Sets.newHashSet(BEEN_ISSUED_MAKEUP, HEARTBEAT_TIMEOUT);
tobeMakeupNext.addAll(TOBE_ISSUED_SET);
SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_MAKEUP, Sets.newHashSet(tobeMakeupNext));

// [xxx] been issued
SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ADD, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_DELETE, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_RETRY, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_BACKTRACK, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_STOP, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ACTIVE, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_CHECK, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_REDO_METRIC, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_MAKEUP, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ADD, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT));
SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_DELETE,
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT));
SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_RETRY, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT));
SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_BACKTRACK,
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT));
SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_STOP, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT));
SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ACTIVE,
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT));
SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_CHECK, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT));
SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_REDO_METRIC,
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT));
SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_MAKEUP,
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT));
SOURCE_STATE_AUTOMATON.put(HEARTBEAT_TIMEOUT,
Sets.newHashSet(SOURCE_DISABLE, SOURCE_NORMAL, SOURCE_FAILED, SOURCE_STOP, TO_BE_ISSUED_ADD,
TO_BE_ISSUED_DELETE,
TO_BE_ISSUED_RETRY, TO_BE_ISSUED_BACKTRACK, TO_BE_ISSUED_STOP, TO_BE_ISSUED_ACTIVE,
TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC, TO_BE_ISSUED_MAKEUP, BEEN_ISSUED_ADD,
BEEN_ISSUED_DELETE, BEEN_ISSUED_RETRY, BEEN_ISSUED_BACKTRACK, BEEN_ISSUED_STOP,
BEEN_ISSUED_ACTIVE, BEEN_ISSUED_CHECK, BEEN_ISSUED_REDO_METRIC, BEEN_ISSUED_MAKEUP,
HEARTBEAT_TIMEOUT));
}

private final Integer code;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ List<InlongClusterNodeEntity> selectByParentId(@Param("parentId") Integer parent

int updateByIdSelective(InlongClusterNodeEntity record);

/**
 * Update the status to `nextStatus` for the given id, but only when the
 * node's current status matches the given `status` (compare-and-set).
 */
int updateStatus(@Param("id") Integer id, @Param("nextStatus") Integer nextStatus, @Param("status") Integer status);

int deleteById(Integer id);

}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ List<StreamSourceEntity> selectByStatusAndCluster(@Param("statusList") List<Inte
List<Integer> selectNeedUpdateIdsByClusterAndDataNode(@Param("clusterName") String clusterName,
@Param("nodeName") String nodeName, @Param("sourceType") String sourceType);

/**
 * Query the ids of sources in heartbeat-timeout status, optionally filtered
 * by source type list and agent IP, within the given cluster.
 */
List<Integer> selectHeartbeatTimeoutIds(@Param("sourceTypeList") List<String> sourceTypeList,
@Param("agentIp") String agentIp,
@Param("clusterName") String clusterName);

int updateByPrimaryKeySelective(StreamSourceEntity record);

int updateByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId,
Expand Down Expand Up @@ -164,6 +171,21 @@ int updateIpAndUuid(@Param("id") Integer id, @Param("agentIp") String agentIp, @
void updateStatusByIds(@Param("idList") List<Integer> idList, @Param("status") Integer status,
@Param("operator") String operator);

/**
 * Roll back sources from heartbeat-timeout status to their previous status.
 *
 * @param idList source id list; if null or empty, all timeout sources are rolled back
 * @param operator operator name
 */
void rollbackTimeoutStatusByIds(@Param("idList") List<Integer> idList, @Param("operator") String operator);

/**
 * Update the source status to heartbeat-timeout for sources that have not
 * been modified recently.
 *
 * @param beforeSeconds the modified time was beforeSeconds seconds ago
 */
void updateStatusToTimeout(@Param("beforeSeconds") Integer beforeSeconds);

/**
* Physical delete stream sources by group id and stream id
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,23 @@
where id = #{id,jdbcType=INTEGER}
and version = #{version,jdbcType=INTEGER}
</update>

<!-- Compare-and-set style status transition for a cluster node: sets status to
     nextStatus only for the non-deleted row whose current status matches the
     given status.
     NOTE(review): every predicate except is_deleted is wrapped in an <if>
     guard - a null id would update ALL non-deleted rows, and a null nextStatus
     yields an empty <set> (SQL error). Confirm callers always pass non-null
     id and nextStatus. -->
<update id="updateStatus">
    update inlong_cluster_node
    <set>
        <if test="nextStatus != null">
            status = #{nextStatus,jdbcType=INTEGER}
        </if>
    </set>
    <where>
        is_deleted = 0
        <if test="status != null">
            and status = #{status,jdbcType=INTEGER}
        </if>
        <if test="id != null">
            and id = #{id,jdbcType=INTEGER}
        </if>
    </where>
</update>
<delete id="deleteById" parameterType="java.lang.Integer">
delete
from inlong_cluster_node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,24 @@
</if>
</where>
</select>
<!-- Select the ids of sources stuck in heartbeat-timeout, scoped to one
     cluster and optionally narrowed by source type and agent IP.
     105 is the HEARTBEAT_TIMEOUT code in SourceStatus - keep this literal in
     sync with that enum.
     Note that the inlong_cluster_name filter is unconditional, so clusterName
     must be non-null; the other filters are optional. -->
<select id="selectHeartbeatTimeoutIds" resultType="java.lang.Integer">
    select id
    from stream_source
    <where>
        is_deleted = 0
        and status = 105
        <if test="sourceTypeList != null and sourceTypeList.size()>0">
            and source_type in
            <foreach item="item" index="index" collection="sourceTypeList" open="(" close=")" separator=",">
                #{item}
            </foreach>
        </if>
        <if test="agentIp != null and agentIp != ''">
            and agent_ip = #{agentIp, jdbcType=VARCHAR}
        </if>
        and inlong_cluster_name = #{clusterName, jdbcType=VARCHAR}
    </where>
</select>
<update id="updateByRelatedId">
update stream_source
<set>
Expand Down Expand Up @@ -484,9 +502,11 @@
update stream_source
<set>
previous_status = status,
status = #{status, jdbcType=INTEGER},
modifier = #{operator, jdbcType=VARCHAR},
version = version + 1
status = #{status, jdbcType=INTEGER},
<if test="operator != null and operator !=''">
modifier = #{operator, jdbcType=VARCHAR},
</if>
version = version + 1
</set>
<where>
is_deleted = 0
Expand All @@ -498,6 +518,39 @@
</if>
</where>
</update>
<!-- Roll back sources from heartbeat-timeout (105 = SourceStatus.HEARTBEAT_TIMEOUT)
     to the status they held before the timeout, recording 105 as the new
     previous_status and bumping the optimistic-lock version.
     NOTE(review): the id filter is inside an <if> guard - calling with a null
     or empty idList rolls back EVERY timeout source; confirm that is the
     intended bulk-recovery behavior. -->
<update id="rollbackTimeoutStatusByIds">
    update stream_source
    <set>
        status = previous_status,
        previous_status = 105,
        <if test="operator != null and operator !=''">
            modifier = #{operator, jdbcType=VARCHAR},
        </if>
        version = version + 1
    </set>
    <where>
        is_deleted = 0
        and status = 105
        <if test="idList != null and idList.size() > 0">
            and id in
            <foreach item="item" index="index" collection="idList" open="(" close=")" separator=",">
                #{item}
            </foreach>
        </if>
    </where>
</update>
<!-- Mark stale sources as heartbeat-timeout: any non-deleted source whose
     modify_time is older than beforeSeconds seconds and whose status is in
     the 200-208 range (the issued/in-flight task statuses - presumably the
     TO_BE_ISSUED/BEEN_ISSUED codes in SourceStatus; TODO confirm) is moved to
     status 105 (HEARTBEAT_TIMEOUT), preserving the old status in
     previous_status so it can be rolled back later.
     NOTE(review): unlike the sibling updates in this mapper, this statement
     does not bump `version`; confirm skipping the optimistic-lock increment
     is intentional for this background transition. -->
<update id="updateStatusToTimeout">
    update stream_source
    <set>
        previous_status = status,
        status = 105
    </set>
    <where>
        is_deleted = 0
        and modify_time &lt;= DATE_ADD(NOW(), INTERVAL -#{beforeSeconds, jdbcType=INTEGER} SECOND)
        and status in (200, 201, 202, 203, 204, 205, 206, 207, 208)
    </where>
</update>

<delete id="deleteByRelatedId">
delete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class BindTagRequest {
@ApiModelProperty(value = "Cluster tag")
@NotBlank(message = "clusterTag cannot be blank")
@Length(min = 1, max = 128, message = "length must be between 1 and 128")
@Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
@Pattern(regexp = "^[a-z0-9_.-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
private String clusterTag;

@ApiModelProperty(value = "Cluster-ID list which needs to bind tag")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class ClusterTagRequest {
@ApiModelProperty(value = "Cluster tag")
@NotBlank(groups = SaveValidation.class, message = "clusterTag cannot be blank")
@Length(min = 1, max = 128, message = "length must be between 1 and 128")
@Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
@Pattern(regexp = "^[a-z0-9_.-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
private String clusterTag;

@ApiModelProperty(value = "Extended params")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class SourceRequest {
@ApiModelProperty("Source name, unique in one stream")
@NotBlank(groups = SaveValidation.class, message = "sourceName cannot be blank")
@Length(min = 1, max = 100, message = "sourceName length must be between 1 and 100")
@Pattern(regexp = "^[a-z0-9_-]{1,100}$", message = "sourceName only supports lowercase letters, numbers, '-', or '_'")
@Pattern(regexp = "^[a-z0-9_.-]{1,100}$", message = "sourceName only supports lowercase letters, numbers, '-', or '_'")
private String sourceName;

@ApiModelProperty("Ip of the agent running the task")
Expand All @@ -85,7 +85,7 @@ public class SourceRequest {

@ApiModelProperty("Inlong cluster node label for filtering stream source collect task")
@Length(min = 1, max = 128, message = "length must be between 1 and 128")
@Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
@Pattern(regexp = "^[a-z0-9_.-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
private String inlongClusterNodeGroup;

@ApiModelProperty("Data node name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1240,7 +1240,7 @@ public DataProxyConfig getDataProxyConfig(String clusterTag, String clusterName)
clusterEntityList.forEach(e -> tagSet.addAll(Arrays.asList(e.getClusterTags().split(InlongConstants.COMMA))));
List<String> clusterTagList = new ArrayList<>(tagSet);
InlongGroupPageRequest groupRequest = InlongGroupPageRequest.builder()
.status(GroupStatus.CONFIG_SUCCESSFUL.getCode())
.statusList(Arrays.asList(GroupStatus.CONFIG_SUCCESSFUL.getCode(), GroupStatus.RESTARTED.getCode()))
.clusterTagList(clusterTagList)
.build();

Expand Down
Loading