Skip to content

Commit

Permalink
[INLONG-7912][Manager] Fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
fuweng11 committed Apr 27, 2023
1 parent e89456a commit f6835ee
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1156,7 +1156,7 @@ public DataProxyNodeResponse getDataProxyNodes(String groupId, String protocolTy
List<DataProxyNodeInfo> nodeList = new ArrayList<>();
for (InlongClusterNodeEntity nodeEntity : nodeEntities) {
if (Objects.equals(nodeEntity.getStatus(), NodeStatus.HEARTBEAT_TIMEOUT.getStatus())) {
LOGGER.debug("data proxy is heart time out by parentId={}, ip={}, port={}", nodeEntity.getParentId(),
LOGGER.debug("dataproxy node was timeout, parentId={} ip={} port={}", nodeEntity.getParentId(),
nodeEntity.getIp(), nodeEntity.getPort());
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,12 @@ public Boolean updateClusterNode(Integer id, Integer parentId, String clusterTyp
return clusterService.updateNode(request, GLOBAL_OPERATOR);
}

private HeartbeatMsg createHeartbeatMsg(String clusterName, String ip, String port, String type) {
private HeartbeatMsg createHeartbeatMsg(String clusterName, String ip, String port, String type, String protocolType) {
HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
heartbeatMsg.setIp(ip);
heartbeatMsg.setPort(port);
heartbeatMsg.setClusterTag("default_cluster");
heartbeatMsg.setProtocolType(ProtocolType.HTTP);
heartbeatMsg.setProtocolType(protocolType);
heartbeatMsg.setLoad(0xFFFF);
heartbeatMsg.setComponentType(type);
heartbeatMsg.setReportTime(System.currentTimeMillis());
Expand Down Expand Up @@ -330,13 +330,6 @@ public void testDataProxyCluster() {
Integer nodeId2 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip, port2, ProtocolType.TCP);
Assertions.assertNotNull(nodeId2);

// report heartbeat
HeartbeatMsg msg1 = createHeartbeatMsg(clusterName, ip, String.valueOf(port1),
ComponentTypeEnum.DataProxy.getType());
heartbeatManager.reportHeartbeat(msg1);
HeartbeatMsg msg2 = createHeartbeatMsg(clusterName, ip, String.valueOf(port2),
ComponentTypeEnum.DataProxy.getType());
heartbeatManager.reportHeartbeat(msg2);

// create an inlong group which use the clusterTag
String inlongGroupId = "test_cluster_tag_group";
Expand All @@ -345,6 +338,13 @@ public void testDataProxyCluster() {
updateGroupInfo.setInlongClusterTag(clusterTag);
groupService.update(updateGroupInfo.genRequest(), GLOBAL_OPERATOR);

// report heartbeat
HeartbeatMsg msg1 = createHeartbeatMsg(clusterName, ip, String.valueOf(port1),
ComponentTypeEnum.DataProxy.getType(), ProtocolType.TCP);
heartbeatManager.reportHeartbeat(msg1);
HeartbeatMsg msg2 = createHeartbeatMsg(clusterName, ip, String.valueOf(port2),
ComponentTypeEnum.DataProxy.getType(), ProtocolType.TCP);
heartbeatManager.reportHeartbeat(msg2);
// get the data proxy nodes, the first port should is p1, second port is p2
DataProxyNodeResponse nodeResponse = clusterService.getDataProxyNodes(inlongGroupId, ProtocolType.TCP);
List<DataProxyNodeInfo> nodeInfoList = nodeResponse.getNodeList();
Expand All @@ -353,6 +353,13 @@ public void testDataProxyCluster() {
Assertions.assertEquals(port1, nodeInfoList.get(0).getPort());
Assertions.assertEquals(port2, nodeInfoList.get(1).getPort());

// report heartbeat
HeartbeatMsg msg3 = createHeartbeatMsg(clusterName, ip, String.valueOf(port1),
ComponentTypeEnum.DataProxy.getType(), ProtocolType.HTTP);
heartbeatManager.reportHeartbeat(msg3);
HeartbeatMsg msg4 = createHeartbeatMsg(clusterName, ip, String.valueOf(port2),
ComponentTypeEnum.DataProxy.getType(), ProtocolType.HTTP);
heartbeatManager.reportHeartbeat(msg4);
nodeResponse = clusterService.getDataProxyNodes(inlongGroupId, ProtocolType.HTTP);
nodeInfoList = nodeResponse.getNodeList();
nodeInfoList.sort(Comparator.comparingInt(DataProxyNodeInfo::getId));
Expand Down

0 comments on commit f6835ee

Please sign in to comment.