Skip to content

Commit

Permalink
[INLONG-8393][DataProxy] Optimize the HeartbeatManager class (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
gosonzhang authored Dec 23, 2023
1 parent 5870135 commit 9d745b8
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.dataproxy.config;

import org.apache.inlong.common.heartbeat.AddressInfo;
import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigRequest;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
Expand All @@ -25,7 +26,6 @@
import org.apache.inlong.dataproxy.config.holder.GroupIdNumConfigHolder;
import org.apache.inlong.dataproxy.config.holder.MetaConfigHolder;
import org.apache.inlong.dataproxy.config.holder.SourceReportConfigHolder;
import org.apache.inlong.dataproxy.config.holder.SourceReportInfo;
import org.apache.inlong.dataproxy.config.holder.WeightConfigHolder;
import org.apache.inlong.dataproxy.config.holder.WhiteListConfigHolder;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
Expand All @@ -47,6 +47,7 @@
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -204,12 +205,13 @@ public boolean isIllegalIP(String strRemoteIP) {
|| whitelistConfigHolder.isIllegalIP(strRemoteIP);
}

public void addSourceReportInfo(String sourceIp, String sourcePort, String protocolType) {
sourceReportConfigHolder.addSourceInfo(sourceIp, sourcePort, protocolType);
public void addSourceReportInfo(String sourceIp,
String sourcePort, String rptSrcType, String protocolType) {
sourceReportConfigHolder.addSourceInfo(sourceIp, sourcePort, rptSrcType, protocolType);
}

public SourceReportInfo getSourceReportInfo() {
return sourceReportConfigHolder.getSourceReportInfo();
public Map<String, AddressInfo> getSrcAddressInfos() {
return sourceReportConfigHolder.getSrcAddressInfos();
}

public boolean isMqClusterReady() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

package org.apache.inlong.dataproxy.config.holder;

import org.apache.inlong.common.heartbeat.AddressInfo;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
*
Expand All @@ -36,57 +36,28 @@ public class SourceReportConfigHolder {
public static final Logger LOG =
LoggerFactory.getLogger(SourceReportConfigHolder.class);

private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Map<String, SourceReportInfo> sourceConfMap = new HashMap<>();
// Begin: this part can be optimized later
// after optimizing the implementation of the heartbeat reporting interface
// between Manager and DataProxy, the report is directly configured according to
// the SourceReportInfo of each source, instead of splicing report items separately.
private String ipSet = "";
private String portSet = "";
private String protocolTypeSet = "";
// end
private final Map<String, AddressInfo> srcAddressMap = new HashMap<>();

public SourceReportConfigHolder() {

}

public void addSourceInfo(String sourceIp, String sourcePort, String protocolType) {
public void addSourceInfo(String sourceIp,
String sourcePort, String rptSrcType, String protocolType) {
if (StringUtils.isEmpty(sourceIp)
|| StringUtils.isEmpty(sourcePort)
|| StringUtils.isEmpty(rptSrcType)
|| StringUtils.isEmpty(protocolType)) {
LOG.warn("[Source Report Holder] found empty parameter!, add values is {}, {}, {}",
sourceIp, sourcePort, protocolType);
LOG.warn("[Source Report Holder] found empty parameter!, add values is {}, {}, {}, {}",
sourceIp, sourcePort, rptSrcType, protocolType);
return;
}
String recordKey = sourceIp + "#" + sourcePort + "#" + protocolType;
SourceReportInfo sourceReportInfo =
new SourceReportInfo(sourceIp, sourcePort, protocolType);
try {
readWriteLock.writeLock().lock();
if (sourceConfMap.putIfAbsent(recordKey, sourceReportInfo) == null) {
if (ipSet.isEmpty()) {
ipSet = sourceIp;
portSet = sourcePort;
protocolTypeSet = protocolType;
} else {
ipSet += "," + sourceIp;
portSet += "," + sourcePort;
protocolTypeSet += "," + protocolType;
}
}
} finally {
readWriteLock.writeLock().unlock();
}
srcAddressMap.put(recordKey, new AddressInfo(sourceIp, sourcePort, rptSrcType, protocolType));
}

public SourceReportInfo getSourceReportInfo() {
try {
readWriteLock.readLock().lock();
return new SourceReportInfo(ipSet, portSet, protocolTypeSet);
} finally {
readWriteLock.readLock().unlock();
}
public Map<String, AddressInfo> getSrcAddressInfos() {
return this.srcAddressMap;
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import org.apache.inlong.common.enums.ComponentTypeEnum;
import org.apache.inlong.common.enums.NodeSrvStatus;
import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
import org.apache.inlong.common.heartbeat.AddressInfo;
import org.apache.inlong.common.heartbeat.GroupHeartbeat;
import org.apache.inlong.common.heartbeat.HeartbeatMsg;
import org.apache.inlong.common.heartbeat.StreamHeartbeat;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.SourceReportInfo;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.utils.HttpUtils;

Expand All @@ -43,6 +43,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -132,16 +133,14 @@ private synchronized CloseableHttpClient constructHttpClient() {
private HeartbeatMsg buildHeartbeat() {
ConfigManager configManager = ConfigManager.getInstance();
HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
SourceReportInfo reportInfo = configManager.getSourceReportInfo();
if (!validReportInfo(reportInfo)) {
Map<String, AddressInfo> reportInfoMap = configManager.getSrcAddressInfos();
if (reportInfoMap.isEmpty()) {
return null;
}
heartbeatMsg.setAddressInfos(new ArrayList<>(reportInfoMap.values()));
heartbeatMsg.setNodeSrvStatus(ConfigManager.getInstance().isMqClusterReady()
? NodeSrvStatus.OK
: NodeSrvStatus.SERVICE_UNREADY);
heartbeatMsg.setIp(reportInfo.getIp());
heartbeatMsg.setPort(reportInfo.getPort());
heartbeatMsg.setProtocolType(reportInfo.getProtocolType());
heartbeatMsg.setComponentType(ComponentTypeEnum.DataProxy.getType());
heartbeatMsg.setReportTime(System.currentTimeMillis());
heartbeatMsg.setLoad(0xffff);
Expand Down Expand Up @@ -183,8 +182,4 @@ private HeartbeatMsg buildHeartbeat() {
heartbeatMsg.setStreamHeartbeats(streamHeartbeats);
return heartbeatMsg;
}

private boolean validReportInfo(SourceReportInfo reportInfo) {
return StringUtils.isNotBlank(reportInfo.getIp()) && StringUtils.isNotBlank(reportInfo.getPort());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.dataproxy.source;

import org.apache.inlong.common.heartbeat.ReportResourceType;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.dataproxy.admin.ProxyServiceMBean;
import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
Expand Down Expand Up @@ -86,6 +87,8 @@ public abstract class BaseSource
// source serviced port
protected int srcPort;
protected String strPort;
// report source name
protected String rptSrcType;
// message factory name
protected String msgFactoryName;
// message handler name
Expand Down Expand Up @@ -143,8 +146,14 @@ public void configure(Context context) {
this.srcHost = getHostIp(context);
this.srcPort = getHostPort(context);
this.strPort = String.valueOf(this.srcPort);
// get source logic type
String tmpVal = context.getString(
SourceConstants.SRCCXT_LOGIC_TYPE_NAME, ReportResourceType.INLONG);
Preconditions.checkArgument(StringUtils.isNotBlank(tmpVal),
SourceConstants.SRCCXT_LOGIC_TYPE_NAME + " config is blank");
this.rptSrcType = tmpVal.trim().toUpperCase();
// get message factory
String tmpVal = context.getString(SourceConstants.SRCCXT_MSG_FACTORY_NAME,
tmpVal = context.getString(SourceConstants.SRCCXT_MSG_FACTORY_NAME,
ServerMessageFactory.class.getName()).trim();
Preconditions.checkArgument(StringUtils.isNotBlank(tmpVal),
SourceConstants.SRCCXT_MSG_FACTORY_NAME + " config is blank");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ public synchronized void startSource() {
this.getCachedSrcName(), srcHost, srcPort, e);
System.exit(-1);
}
ConfigManager.getInstance().addSourceReportInfo(
srcHost, String.valueOf(srcPort), getProtocolName().toUpperCase());
logger.info("Source {} started at ({}:{})!", this.getCachedSrcName(), srcHost, srcPort);
ConfigManager.getInstance().addSourceReportInfo(srcHost,
String.valueOf(srcPort), rptSrcType, getProtocolName().toUpperCase());
logger.info("Source {} started at ({}:{}), rptSrcType={}!",
this.getCachedSrcName(), srcHost, srcPort, rptSrcType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,10 @@ public synchronized void startSource() {
this.getCachedSrcName(), srcHost, srcPort, e);
System.exit(-1);
}
ConfigManager.getInstance().addSourceReportInfo(
srcHost, String.valueOf(srcPort), getProtocolName().toUpperCase());
logger.info("Source {} started at ({}:{})!", this.getCachedSrcName(), srcHost, srcPort);
ConfigManager.getInstance().addSourceReportInfo(srcHost,
String.valueOf(srcPort), rptSrcType, getProtocolName().toUpperCase());
logger.info("Source {} started at ({}:{}), rptSrcType={}!",
this.getCachedSrcName(), srcHost, srcPort, rptSrcType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ public void startSource() {
this.getCachedSrcName(), srcHost, srcPort, e);
System.exit(-1);
}
ConfigManager.getInstance().addSourceReportInfo(
srcHost, String.valueOf(srcPort), getProtocolName().toUpperCase());
logger.info("Source {} started at ({}:{})!", this.getCachedSrcName(), srcHost, srcPort);
ConfigManager.getInstance().addSourceReportInfo(srcHost,
String.valueOf(srcPort), rptSrcType, getProtocolName().toUpperCase());
logger.info("Source {} started at ({}:{}), rptSrcType={}!",
this.getCachedSrcName(), srcHost, srcPort, rptSrcType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class SourceConstants {
public static final String SRCCXT_CONFIG_PORT = "port";
// system env source port
public static final String SYSENV_HOST_PORT = "inlongHostPort";
// source logic type name
public static final String SRCCXT_LOGIC_TYPE_NAME = "logic-type-name";
// message factory name
public static final String SRCCXT_MSG_FACTORY_NAME = "msg-factory-name";
// message handler name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

package org.apache.inlong.dataproxy.config.holder;

import org.apache.inlong.common.heartbeat.AddressInfo;
import org.apache.inlong.dataproxy.config.ConfigManager;

import org.junit.Assert;
import org.junit.Test;

import java.util.Map;

/**
* Test for {@link SourceReportConfigHolder}
*/
Expand All @@ -30,21 +33,20 @@ public class TestSourceReportHolder {
@Test
public void testCase() {
// first get
SourceReportInfo reportInfo = ConfigManager.getInstance().getSourceReportInfo();
Assert.assertEquals(reportInfo.getIp(), "");
Assert.assertEquals(reportInfo.getPort(), "");
Assert.assertEquals(reportInfo.getProtocolType(), "");
Map<String, AddressInfo> srcAddressInfoMap = ConfigManager.getInstance().getSrcAddressInfos();
Assert.assertTrue(srcAddressInfoMap.isEmpty());
// add
ConfigManager.getInstance().addSourceReportInfo("0.0.0.0", "46801", "UDP");
ConfigManager.getInstance().addSourceReportInfo("0.0.0.0", "46801", "TCP");
ConfigManager.getInstance().addSourceReportInfo("127.0.0.1", "46802", "TCP");
ConfigManager.getInstance().addSourceReportInfo("127.0.0.1", "46803", "HTTP");
ConfigManager.getInstance().addSourceReportInfo("0.0.0.0", "46801", "TCP");
ConfigManager.getInstance().addSourceReportInfo("127.0.0.1", "46803", "HTTP");
ConfigManager.getInstance().addSourceReportInfo("0.0.0.0", "46801", "INLONG", "UDP");
ConfigManager.getInstance().addSourceReportInfo("0.0.0.0", "46801", "INLONG", "TCP");
ConfigManager.getInstance().addSourceReportInfo("127.0.0.1", "46802", "TEST", "TCP");
ConfigManager.getInstance().addSourceReportInfo("127.0.0.1", "46803", "TEST", "HTTP");
ConfigManager.getInstance().addSourceReportInfo("0.0.0.0", "46801", "TEST", "TCP");
ConfigManager.getInstance().addSourceReportInfo("127.0.0.1", "46803", "TEST", "HTTP");
// get and check
reportInfo = ConfigManager.getInstance().getSourceReportInfo();
Assert.assertEquals(reportInfo.getIp(), "0.0.0.0,0.0.0.0,127.0.0.1,127.0.0.1");
Assert.assertEquals(reportInfo.getPort(), "46801,46801,46802,46803");
Assert.assertEquals(reportInfo.getProtocolType(), "UDP,TCP,TCP,HTTP");
srcAddressInfoMap = ConfigManager.getInstance().getSrcAddressInfos();
Assert.assertEquals(srcAddressInfoMap.size(), 4);
String recordKey = "127.0.0.1" + "#" + "46803" + "#" + "HTTP";
AddressInfo addressInfo = srcAddressInfoMap.get(recordKey);
Assert.assertEquals(addressInfo.getReportSourceType(), "TEST");
}
}

0 comments on commit 9d745b8

Please sign in to comment.