Skip to content

Commit

Permalink
Prevent application crash on init time (#447)
Browse files Browse the repository at this point in the history
* Add health check configurable on init

* Checks on NebulaConfig edited

* Resolve lint problems

* Update TestConnectionPool.java
  • Loading branch information
aminmaghsodi authored Apr 11, 2022
1 parent e320716 commit 894a97a
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@ public class NebulaPoolConfig implements Serializable {
// 0 means never delete
private int idleTime = 0;

// the interval time to check idle connection, unit ms, -1 means no check
// The interval time to check idle connection, unit ms, -1 means no check
private int intervalIdle = -1;

// the wait time to get idle connection, unit ms
// The wait time to get idle connection, unit ms
private int waitTime = 0;

// set to true to turn on ssl encrypted traffic
// The minimum rate of healthy servers to all servers. if 1 it means all servers should be
// available on init.
private double minClusterHealthRate = 1;

// Set to true to turn on ssl encrypted traffic
private boolean enableSsl = false;

// ssl param is required if ssl is turned on
// SSL param is required if ssl is turned on
private SSLParam sslParam = null;

public boolean isEnableSsl() {
Expand Down Expand Up @@ -107,4 +111,13 @@ public NebulaPoolConfig setWaitTime(int waitTime) {
this.waitTime = waitTime;
return this;
}

public double getMinClusterHealthRate() {
return minClusterHealthRate;
}

public NebulaPoolConfig setMinClusterHealthRate(double minClusterHealthRate) {
this.minClusterHealthRate = minClusterHealthRate;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ private void checkConfig(NebulaPoolConfig config) {
throw new InvalidConfigException(
"Config waitTime:" + config.getWaitTime() + " is illegal");
}

if (config.getMinClusterHealthRate() < 0) {
throw new InvalidConfigException(
"Config minClusterHealthRate:" + config.getMinClusterHealthRate()
+ " is illegal");
}
}

/**
Expand All @@ -90,8 +96,10 @@ public boolean init(List<HostAddress> addresses, NebulaPoolConfig config)
this.waitTime = config.getWaitTime();
List<HostAddress> newAddrs = hostToIp(addresses);
this.loadBalancer = config.isEnableSsl()
? new RoundRobinLoadBalancer(newAddrs, config.getTimeout(), config.getSslParam())
: new RoundRobinLoadBalancer(newAddrs, config.getTimeout());
? new RoundRobinLoadBalancer(newAddrs, config.getTimeout(), config.getSslParam(),
config.getMinClusterHealthRate())
: new RoundRobinLoadBalancer(newAddrs, config.getTimeout(),
config.getMinClusterHealthRate());
ConnObjectPool objectPool = new ConnObjectPool(this.loadBalancer, config);
this.objectPool = new GenericObjectPool<>(objectPool);
GenericObjectPoolConfig objConfig = new GenericObjectPoolConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,28 @@ public class RoundRobinLoadBalancer implements LoadBalancer {
private static final int S_BAD = 1;
private final List<HostAddress> addresses = new ArrayList<>();
private final Map<HostAddress, Integer> serversStatus = new ConcurrentHashMap<>();
private final double minClusterHealthRate;
private final int timeout;
private final AtomicInteger pos = new AtomicInteger(0);
private final int delayTime = 60; // unit seconds
private final int delayTime = 60; // Unit seconds
private final ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1);
private SSLParam sslParam;
private boolean enabledSsl;

public RoundRobinLoadBalancer(List<HostAddress> addresses, int timeout) {
public RoundRobinLoadBalancer(List<HostAddress> addresses, int timeout,
double minClusterHealthRate) {
this.timeout = timeout;
for (HostAddress addr : addresses) {
this.addresses.add(addr);
this.serversStatus.put(addr, S_BAD);
}
this.minClusterHealthRate = minClusterHealthRate;
schedule.scheduleAtFixedRate(this::scheduleTask, 0, delayTime, TimeUnit.SECONDS);
}

public RoundRobinLoadBalancer(List<HostAddress> addresses, int timeout, SSLParam sslParam) {
this(addresses, timeout);
public RoundRobinLoadBalancer(List<HostAddress> addresses, int timeout, SSLParam sslParam,
double minClusterHealthRate) {
this(addresses, timeout, minClusterHealthRate);
this.sslParam = sslParam;
this.enabledSsl = true;
}
Expand All @@ -63,11 +67,11 @@ public HostAddress getAddress() {
}

public void updateServersStatus() {
for (HostAddress addr : addresses) {
if (ping(addr)) {
serversStatus.put(addr, S_OK);
for (HostAddress hostAddress : addresses) {
if (ping(hostAddress)) {
serversStatus.put(hostAddress, S_OK);
} else {
serversStatus.put(addr, S_BAD);
serversStatus.put(hostAddress, S_BAD);
}
}
}
Expand All @@ -93,12 +97,16 @@ public boolean ping(HostAddress addr) {

public boolean isServersOK() {
this.updateServersStatus();
for (HostAddress addr : addresses) {
if (serversStatus.get(addr) == S_BAD) {
return false;
double numServersWithOkStatus = 0;
for (HostAddress hostAddress : addresses) {
if (serversStatus.get(hostAddress) == S_OK) {
numServersWithOkStatus++;
}
}
return true;

// Check health rate.
double okServersRate = numServersWithOkStatus / addresses.size();
return okServersRate >= minClusterHealthRate;
}

private void scheduleTask() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public void testGetSession() {
// set idle time
nebulaPoolConfig.setIdleTime(2000);
nebulaPoolConfig.setIntervalIdle(1000);
nebulaPoolConfig.setMinClusterHealthRate(0);
// set wait time
nebulaPoolConfig.setWaitTime(1000);
List<HostAddress> addresses = Collections.singletonList(
Expand Down

0 comments on commit 894a97a

Please sign in to comment.