Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add close() method to connectionPool so connections are closed when origins are reloaded #551

Merged
merged 8 commits into from
Dec 4, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class SimpleConnectionPool implements ConnectionPool, Connection.Listener
private final AtomicInteger terminatedConnections = new AtomicInteger();
private final AtomicInteger connectionFailures = new AtomicInteger();
private final AtomicInteger connectionsInEstablishment = new AtomicInteger();
private volatile boolean active;


public SimpleConnectionPool(Origin origin, ConnectionPoolSettings poolSettings, Connection.Factory connectionFactory) {
Expand All @@ -65,6 +66,7 @@ public SimpleConnectionPool(Origin origin, ConnectionPoolSettings poolSettings,
this.connectionFactory = requireNonNull(connectionFactory);
this.availableConnections = new ConcurrentLinkedDeque<>();
this.waitingSubscribers = new ConcurrentLinkedDeque<>();
this.active = true;
}

public Origin getOrigin() {
Expand All @@ -73,25 +75,29 @@ public Origin getOrigin() {

@Override
public Publisher<Connection> borrowConnection() {
return Mono.<Connection>create(sink -> {
Connection connection = dequeue();
if (connection != null) {
attemptBorrowConnection(sink, connection);
} else {
if (waitingSubscribers.size() < poolSettings.maxPendingConnectionsPerHost()) {
this.waitingSubscribers.add(sink);
sink.onDispose(() -> waitingSubscribers.remove(sink));
newConnection();
if (active) {
return Mono.<Connection>create(sink -> {
Connection connection = dequeue();
if (connection != null) {
attemptBorrowConnection(sink, connection);
} else {
sink.error(new MaxPendingConnectionsExceededException(
origin,
poolSettings.maxPendingConnectionsPerHost(),
poolSettings.maxPendingConnectionsPerHost()));
if (waitingSubscribers.size() < poolSettings.maxPendingConnectionsPerHost()) {
this.waitingSubscribers.add(sink);
sink.onDispose(() -> waitingSubscribers.remove(sink));
newConnection();
} else {
sink.error(new MaxPendingConnectionsExceededException(
origin,
poolSettings.maxPendingConnectionsPerHost(),
poolSettings.maxPendingConnectionsPerHost()));
}
}
}
}).timeout(
Duration.ofMillis(poolSettings.pendingConnectionTimeoutMillis()),
Mono.error(() -> new MaxPendingConnectionTimeoutException(origin, connectionSettings.connectTimeoutMillis())));
}).timeout(
Duration.ofMillis(poolSettings.pendingConnectionTimeoutMillis()),
Mono.error(() -> new MaxPendingConnectionTimeoutException(origin, connectionSettings.connectTimeoutMillis())));
} else {
throw new IllegalStateException("Pool is closed");
dvlato marked this conversation as resolved.
Show resolved Hide resolved
}
}

private void newConnection() {
Expand Down Expand Up @@ -156,19 +162,25 @@ private void attemptBorrowConnection(MonoSink<Connection> sink, Connection conne
}

public boolean returnConnection(Connection connection) {
borrowedCount.decrementAndGet();
if (connection.isConnected()) {
queueNewConnection(connection);
if (active) {
borrowedCount.decrementAndGet();
if (connection.isConnected()) {
queueNewConnection(connection);
}
dvlato marked this conversation as resolved.
Show resolved Hide resolved
} else {
connection.close();
}
return false;
}

public boolean closeConnection(Connection connection) {
connection.close();
borrowedCount.decrementAndGet();
closedConnections.incrementAndGet();
if (active) {
borrowedCount.decrementAndGet();
closedConnections.incrementAndGet();

newConnection();
newConnection();
}
dvlato marked this conversation as resolved.
Show resolved Hide resolved
return true;
}

Expand All @@ -187,8 +199,19 @@ public ConnectionPoolSettings settings() {

@Override
public void connectionClosed(Connection connection) {
terminatedConnections.incrementAndGet();
availableConnections.remove(connection);
if (active) {
terminatedConnections.incrementAndGet();
availableConnections.remove(connection);
}
}

@Override
public void close() {
active = false;
Connection con;
while ((con = availableConnections.poll()) != null) {
con.close();
}
}

public ConnectionPool.Stats stats() {
Expand Down Expand Up @@ -237,6 +260,7 @@ public int connectionsInEstablishment() {
return connectionsInEstablishment.get();
}


@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand Down