Skip to content

Commit

Permalink
[Fixed] close connection when flush fails during drain.
Browse files Browse the repository at this point in the history
Fixeds #324

Signed-off-by: Matthias Hanel <mh@synadia.com>
  • Loading branch information
matthiashanel committed Aug 5, 2020
1 parent 4dbb1d4 commit fd8ab65
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/examples/java/io/nats/examples/NatsSub.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

import io.nats.client.Connection;
import io.nats.client.Message;
Expand Down
11 changes: 8 additions & 3 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -1791,8 +1791,13 @@ public CompletableFuture<Boolean> drain(Duration timeout) throws TimeoutExceptio
cons.sendUnsubForDrain();
});

this.flush(timeout); // Flush and wait up to the timeout, if this fails, let the caller know

try {
this.flush(timeout); // Flush and wait up to the timeout, if this fails, let the caller know
} catch (Exception e) {
this.close(false);
throw e;
}

consumers.forEach((cons) -> {
cons.markUnsubedForDrain();
});
Expand Down Expand Up @@ -1843,7 +1848,7 @@ public CompletableFuture<Boolean> drain(Duration timeout) throws TimeoutExceptio
this.processException(e);
} finally {
try {
this.close();// close the connection after the last flush
this.close(false);// close the connection after the last flush
} catch (InterruptedException e) {
this.processException(e);
}
Expand Down
33 changes: 33 additions & 0 deletions src/test/java/io/nats/client/impl/DrainTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,39 @@

public class DrainTests {

@Test
public void testLeftOverTimer() throws Exception {
try (NatsTestServer ts = new NatsTestServer(false);
final Connection subCon = Nats.connect(new Options.Builder().server(ts.getURI()).maxReconnects(0).build())){
assertTrue("Connected Status", Connection.Status.CONNECTED == subCon.getStatus());

Subscription sub = subCon.subscribe("draintest");
subCon.flush(Duration.ofSeconds(1)); // Get the sub to the server, so drain has things to do

assertTrue("Timer exists", Thread.getAllStackTraces().keySet().stream().anyMatch(thread -> {
return thread.getName().equals("Nats Connection Timer");
}));

ts.shutdown(); // shut down the server to fail drain and subsequent close
boolean timedOut = false;
try {
subCon.drain(Duration.ofSeconds(1));
} catch (java.util.concurrent.TimeoutException e) {
timedOut = true;
} finally {
subCon.close();
assertFalse("Timer gone", Thread.getAllStackTraces().keySet().stream().anyMatch(thread -> {
return thread.getName().equals("Nats Connection Timer");
}));
}
assertTrue("Expect timeout exception", timedOut);
assertTrue("Expect closed connection", Connection.Status.CLOSED == subCon.getStatus());
assertFalse("Timer gone", Thread.getAllStackTraces().keySet().stream().anyMatch(thread -> {
return thread.getName().equals("Nats Connection Timer");
}));
}
}

@Test
public void testSimpleSubDrain() throws Exception {
try (NatsTestServer ts = new NatsTestServer(false);
Expand Down

0 comments on commit fd8ab65

Please sign in to comment.