diff --git a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/AddressInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/AddressInfo.java new file mode 100644 index 00000000000..c07c7faf695 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/AddressInfo.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.heartbeat; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class AddressInfo { + + /** + * Ip of component + */ + private String ip; + + /** + * Port of component + */ + private String port; + + /** + * Report source type of component + */ + private String reportSourceType; + + private String protocolType; + +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java index 3a54b714f78..43ede0823bb 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java @@ -50,6 +50,8 @@ public class HeartbeatMsg { */ private String port; + private List addressInfos; + /** * ProtocolType of component */ @@ -60,6 +62,11 @@ public class HeartbeatMsg { */ private String componentType; + /** + * Type of report source + */ + private String reportSourceType; + /** * Report time millis of component */ diff --git a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ReportResourceType.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ReportResourceType.java new file mode 100644 index 00000000000..3001622df6f --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ReportResourceType.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.heartbeat; + +/** + * Constants of reportResource + */ +public class ReportResourceType { + + public static final String INLONG = "INLONG"; + +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeResponse.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeResponse.java index cccf61d428d..7b53ef96db9 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeResponse.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeResponse.java @@ -33,6 +33,8 @@ public class DataProxyNodeResponse { @Deprecated private Integer clusterId; + private String reportSourceType; + /** * Is the DataProxy cluster an intranet? 0: no, 1: yes */ diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/dataproxy/DataProxyClusterNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/dataproxy/DataProxyClusterNodeDTO.java new file mode 100644 index 00000000000..af44f1c7b52 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/dataproxy/DataProxyClusterNodeDTO.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.cluster.dataproxy; + +import org.apache.inlong.common.heartbeat.ReportResourceType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.JsonUtils; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import javax.validation.constraints.NotNull; + +/** + * DataProxy cluster node info + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@ApiModel("DataProxy cluster node info") +public class DataProxyClusterNodeDTO { + + @ApiModelProperty("Report source type") + private String reportSourceType = ReportResourceType.INLONG; + + /** + * Get the dto instance from the request + */ + public static DataProxyClusterNodeDTO getFromRequest(DataProxyClusterNodeRequest request) { + return DataProxyClusterNodeDTO.builder() + .reportSourceType(request.getReportSourceType()) + .build(); + } + + /** + * Get the dto instance from the JSON string. + */ + public static DataProxyClusterNodeDTO getFromJson(@NotNull String extParams) { + try { + return JsonUtils.parseObject(extParams, DataProxyClusterNodeDTO.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT.getMessage() + ": " + e.getMessage()); + } + } + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/dataproxy/DataProxyClusterNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/dataproxy/DataProxyClusterNodeRequest.java new file mode 100644 index 00000000000..2d9c7007b2b --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/dataproxy/DataProxyClusterNodeRequest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.cluster.dataproxy; + +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * Inlong cluster node request for Agent + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = ClusterType.DATAPROXY) +@ApiModel("Inlong cluster node request for dataproxy") +public class DataProxyClusterNodeRequest extends ClusterNodeRequest { + + @ApiModelProperty("Report source type") + private String reportSourceType; + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/dataproxy/DataProxyClusterNodeResponse.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/dataproxy/DataProxyClusterNodeResponse.java new file mode 100644 index 00000000000..56469c6681d --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/dataproxy/DataProxyClusterNodeResponse.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.cluster.dataproxy; + +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * Dataproxy cluster node response + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = ClusterType.DATAPROXY) +@ApiModel("Inlong cluster node response for dataproxy") +public class DataProxyClusterNodeResponse extends ClusterNodeResponse { + + @ApiModelProperty("Report source type") + private String reportSourceType; + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java index dd35ec8e2d6..5062e4e7300 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java @@ -407,6 +407,16 @@ List listNodeByGroupId( */ DataProxyNodeResponse getDataProxyNodes(String inlongGroupId, String protocolType); + /** + * Query data proxy nodes by the given inlong group id and protocol type + * + * @param clusterName inlong cluster name + * @param protocolType protocol type + * @param reportSourceType report source type + * @return data proxy node response + */ + DataProxyNodeResponse getDataProxyNodesByCluster(String clusterName, String protocolType, String reportSourceType); + /** * Get the configuration of DataProxy through the cluster name to which DataProxy belongs. * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java index 123221ff93c..f1bcfa3ab10 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java @@ -19,6 +19,7 @@ import org.apache.inlong.common.constant.Constants; import org.apache.inlong.common.constant.MQType; +import org.apache.inlong.common.heartbeat.ReportResourceType; import org.apache.inlong.common.pojo.audit.AuditConfig; import org.apache.inlong.common.pojo.audit.MQInfo; import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster; @@ -60,6 +61,7 @@ import org.apache.inlong.manager.pojo.cluster.TenantClusterTagInfo; import org.apache.inlong.manager.pojo.cluster.TenantClusterTagPageRequest; import org.apache.inlong.manager.pojo.cluster.TenantClusterTagRequest; +import org.apache.inlong.manager.pojo.cluster.dataproxy.DataProxyClusterNodeDTO; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.UpdateResult; @@ -1178,6 +1180,60 @@ public DataProxyNodeResponse getDataProxyNodes(String groupId, String protocolTy return response; } + @Override + public DataProxyNodeResponse getDataProxyNodesByCluster(String clusterName, String protocolType, + String reportSourceType) { + LOGGER.debug("begin to get data proxy nodes for clusterName={}, protocol={}", clusterName, protocolType); + InlongClusterEntity clusterEntity = clusterMapper.selectByNameAndType(clusterName, ClusterType.DATAPROXY); + DataProxyNodeResponse response = new DataProxyNodeResponse(); + if (clusterEntity == null) { + LOGGER.debug("not any dataproxy cluster for clusterName={}, protocol={}", clusterName, protocolType); + return response; + } + List nodeEntities = + clusterNodeMapper.selectByParentId(clusterEntity.getId(), protocolType); + if (CollectionUtils.isEmpty(nodeEntities)) { + LOGGER.debug("not any data proxy node for clusterName={}, protocol={}", clusterName, protocolType); + return response; + } + // all cluster nodes belong to the same clusterId + response.setClusterId(clusterEntity.getId()); + // TODO consider the data proxy load and re-balance + List nodeList = new ArrayList<>(); + for (InlongClusterNodeEntity nodeEntity : nodeEntities) { + if (Objects.equals(nodeEntity.getStatus(), NodeStatus.HEARTBEAT_TIMEOUT.getStatus())) { + LOGGER.debug("dataproxy node was timeout, parentId={} ip={} port={}", nodeEntity.getParentId(), + nodeEntity.getIp(), nodeEntity.getPort()); + continue; + } + if (StringUtils.isNotBlank(nodeEntity.getExtParams())) { + DataProxyClusterNodeDTO dataProxyClusterNodeDTO = DataProxyClusterNodeDTO.getFromJson( + nodeEntity.getExtParams()); + if (StringUtils.isBlank(dataProxyClusterNodeDTO.getReportSourceType())) { + dataProxyClusterNodeDTO.setReportSourceType(ReportResourceType.INLONG); + } + if (StringUtils.isNotBlank(reportSourceType) && !Objects.equals( + dataProxyClusterNodeDTO.getReportSourceType(), reportSourceType)) { + continue; + } + } + DataProxyNodeInfo nodeInfo = new DataProxyNodeInfo(); + nodeInfo.setId(nodeEntity.getId()); + nodeInfo.setIp(nodeEntity.getIp()); + nodeInfo.setPort(nodeEntity.getPort()); + nodeInfo.setProtocolType(nodeEntity.getProtocolType()); + nodeInfo.setNodeLoad(nodeEntity.getNodeLoad()); + nodeList.add(nodeInfo); + } + response.setNodeList(nodeList); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("success to get dp nodes for clusterName={}, protocol={}, result={}", + clusterName, protocolType, response); + } + return response; + } + private List getClusterNodes(String groupId, String clusterType, String protocolType) { InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId); if (groupEntity == null) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java index 291218ecd5f..53466f13cc4 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java @@ -19,6 +19,7 @@ import org.apache.inlong.common.enums.NodeSrvStatus; import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager; +import org.apache.inlong.common.heartbeat.AddressInfo; import org.apache.inlong.common.heartbeat.ComponentHeartbeat; import org.apache.inlong.common.heartbeat.HeartbeatMsg; import org.apache.inlong.manager.common.consts.InlongConstants; @@ -38,6 +39,7 @@ import org.apache.inlong.manager.pojo.cluster.ClusterInfo; import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest; import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO; +import org.apache.inlong.manager.pojo.cluster.dataproxy.DataProxyClusterNodeDTO; import org.apache.inlong.manager.service.cluster.InlongClusterOperator; import org.apache.inlong.manager.service.cluster.InlongClusterOperatorFactory; @@ -153,10 +155,25 @@ public void reportHeartbeat(HeartbeatMsg heartbeat) { // if the heartbeat was not in the cache, insert or update the node by the heartbeat info HeartbeatMsg lastHeartbeat = heartbeatCache.getIfPresent(componentHeartbeat); + if (heartbeat.getAddressInfos() != null) { + heartbeat.setPort(Joiner.on(InlongConstants.COMMA) + .join(heartbeat.getAddressInfos().stream().map(AddressInfo::getPort).collect(Collectors.toList()))); + heartbeat.setIp(Joiner.on(InlongConstants.COMMA) + .join(heartbeat.getAddressInfos().stream().map(AddressInfo::getIp).collect(Collectors.toList()))); + heartbeat.setReportSourceType(Joiner.on(InlongConstants.COMMA).join(heartbeat.getAddressInfos().stream() + .map(AddressInfo::getReportSourceType).collect(Collectors.toList()))); + } // protocolType may be null, and the protocolTypes' length may be less than ports' length String[] ports = heartbeat.getPort().split(InlongConstants.COMMA); String[] ips = heartbeat.getIp().split(InlongConstants.COMMA); + String[] reportSourceTypes = null; + if (StringUtils.isNotBlank(heartbeat.getReportSourceType()) && ports.length > 1) { + reportSourceTypes = heartbeat.getReportSourceType().split(InlongConstants.COMMA); + if (reportSourceTypes.length < ports.length) { + reportSourceTypes = null; + } + } String protocolType = heartbeat.getProtocolType(); String[] protocolTypes = null; if (StringUtils.isNotBlank(protocolType) && ports.length > 1) { @@ -173,6 +190,11 @@ public void reportHeartbeat(HeartbeatMsg heartbeat) { assert heartbeatMsg != null; heartbeatMsg.setPort(ports[i].trim()); heartbeatMsg.setIp(ips[i].trim()); + if (reportSourceTypes != null) { + heartbeatMsg.setReportSourceType(reportSourceTypes[i].trim()); + } else { + heartbeatMsg.setReportSourceType(heartbeat.getReportSourceType()); + } if (protocolTypes != null) { heartbeatMsg.setProtocolType(protocolTypes[i]); } else { @@ -301,12 +323,26 @@ private int updateClusterNode(InlongClusterNodeEntity clusterNode, HeartbeatMsg private void insertOrUpdateNodeGroup(InlongClusterNodeEntity clusterNode, HeartbeatMsg heartbeat) { Set groupSet = StringUtils.isBlank(heartbeat.getNodeGroup()) ? new HashSet<>() : Arrays.stream(heartbeat.getNodeGroup().split(InlongConstants.COMMA)).collect(Collectors.toSet()); - AgentClusterNodeDTO agentClusterNodeDTO = new AgentClusterNodeDTO(); - if (StringUtils.isNotBlank(clusterNode.getExtParams())) { - agentClusterNodeDTO = AgentClusterNodeDTO.getFromJson(clusterNode.getExtParams()); - agentClusterNodeDTO.setAgentGroup(Joiner.on(InlongConstants.COMMA).join(groupSet)); + String extParams = null; + switch (clusterNode.getType()) { + case ClusterType.AGENT: + AgentClusterNodeDTO agentClusterNodeDTO = new AgentClusterNodeDTO(); + if (StringUtils.isNotBlank(clusterNode.getExtParams())) { + agentClusterNodeDTO = AgentClusterNodeDTO.getFromJson(clusterNode.getExtParams()); + agentClusterNodeDTO.setAgentGroup(Joiner.on(InlongConstants.COMMA).join(groupSet)); + } + extParams = GSON.toJson(agentClusterNodeDTO); + break; + case ClusterType.DATAPROXY: + DataProxyClusterNodeDTO dataProxyClusterNodeDTO = new DataProxyClusterNodeDTO(); + if (StringUtils.isNotBlank(clusterNode.getExtParams())) { + dataProxyClusterNodeDTO = DataProxyClusterNodeDTO.getFromJson(clusterNode.getExtParams()); + dataProxyClusterNodeDTO.setReportSourceType(heartbeat.getReportSourceType()); + } + extParams = GSON.toJson(dataProxyClusterNodeDTO); + break; } - clusterNode.setExtParams(GSON.toJson(agentClusterNodeDTO)); + clusterNode.setExtParams(extParams); } private int deleteClusterNode(InlongClusterNodeEntity clusterNode) { diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java index cea54c8c822..3c382760008 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java @@ -59,6 +59,14 @@ public Response getIpList(@PathVariable String inlongGrou return Response.success(clusterService.getDataProxyNodes(inlongGroupId, protocolType)); } + @PostMapping(value = "/dataproxy/getIpListByClusterName/{clusterName}") + @ApiOperation(value = "Get data proxy IP list by clusterName") + public Response getIpListByClusterName(@PathVariable String clusterName, + @RequestParam(required = false) String protocolType, + @RequestParam(required = false) String reportSourceType) { + return Response.success(clusterService.getDataProxyNodesByCluster(clusterName, protocolType, reportSourceType)); + } + @PostMapping("/dataproxy/getConfig") @ApiOperation(value = "Get data proxy topic list") public Response getConfig(@RequestBody DataProxyConfigRequest request) {