From b3c905b065307ca8072ebde2de416d7e57c924dd Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Mon, 14 Jun 2021 21:31:55 +0100 Subject: [PATCH] KAFKA-12948: Remove node from ClusterConnectionStates.connectingNodes when node is removed --- .../clients/ClusterConnectionStates.java | 1 + .../kafka/clients/NetworkClientTest.java | 33 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java index b9d2b13a602d1..524d54b0923b5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -387,6 +387,7 @@ private void updateConnectionSetupTimeout(NodeConnectionState nodeState) { */ public void remove(String id) { nodeState.remove(id); + connectingNodes.remove(id); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 47b5b201141fd..4fbfd4293409a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -1064,6 +1064,39 @@ public void testFailedConnectionToFirstAddressAfterReconnect() { assertEquals(2, mockHostResolver.resolutionCount()); } + @Test + public void testCloseConnectingNode() { + Cluster cluster = TestUtils.clusterWith(2); + Node node0 = cluster.nodeById(0); + Node node1 = cluster.nodeById(1); + client.ready(node0, time.milliseconds()); + selector.serverConnectionBlocked(node0.idString()); + client.poll(1, time.milliseconds()); + client.close(node0.idString()); + + // Poll without any connections should return without exceptions + client.poll(0, time.milliseconds()); + assertFalse(NetworkClientUtils.isReady(client, node0, time.milliseconds())); + assertFalse(NetworkClientUtils.isReady(client, node1, time.milliseconds())); + + // Connection to new node should work + client.ready(node1, time.milliseconds()); + ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(defaultApiVersionsResponse(), ApiKeys.API_VERSIONS.latestVersion(), 0); + selector.delayedReceive(new DelayedReceive(node1.idString(), new NetworkReceive(node1.idString(), buffer))); + while (!client.ready(node1, time.milliseconds())) + client.poll(1, time.milliseconds()); + assertTrue(client.isReady(node1, time.milliseconds())); + selector.clear(); + + // New connection to node closed earlier should work + client.ready(node0, time.milliseconds()); + buffer = RequestTestUtils.serializeResponseWithHeader(defaultApiVersionsResponse(), ApiKeys.API_VERSIONS.latestVersion(), 1); + selector.delayedReceive(new DelayedReceive(node0.idString(), new NetworkReceive(node0.idString(), buffer))); + while (!client.ready(node0, time.milliseconds())) + client.poll(1, time.milliseconds()); + assertTrue(client.isReady(node0, time.milliseconds())); + } + private RequestHeader parseHeader(ByteBuffer buffer) { buffer.getInt(); // skip size return RequestHeader.parse(buffer.slice());