Skip to content

Commit cfc11d2

Browse files
snmvaughanSteve Vaughan Jr
andauthored
HADOOP-18365. Update the remote address when a change is detected (#4692) (#4768)
Back port to branch-3.3, to avoid reconnecting to the old address after detecting that the address has been updated. * Use a stable hashCode to allow safe IP addr changes * Add test that updated address is used Once the address has been updated, it will be used in future calls. Test verifies that a second request succeeds and that it uses the existing updated address instead of having to re-resolve. Co-authored-by: Steve Vaughan Jr <s_vaughan@apple.com>
1 parent 51ddd02 commit cfc11d2

File tree

2 files changed

+110
-6
lines changed
  • hadoop-common-project/hadoop-common/src

2 files changed

+110
-6
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ public synchronized Writable getRpcResponse() {
418418
* socket: responses may be delivered out of order. */
419419
private class Connection extends Thread {
420420
private InetSocketAddress server; // server ip:port
421-
private final ConnectionId remoteId; // connection id
421+
private final ConnectionId remoteId; // connection id
422422
private AuthMethod authMethod; // authentication method
423423
private AuthProtocol authProtocol;
424424
private int serviceClass;
@@ -644,6 +644,9 @@ private synchronized boolean updateAddress() throws IOException {
644644
LOG.warn("Address change detected. Old: " + server.toString() +
645645
" New: " + currentAddr.toString());
646646
server = currentAddr;
647+
// Update the remote address so that reconnections are with the updated address.
648+
// This avoids thrashing.
649+
remoteId.setAddress(currentAddr);
647650
UserGroupInformation ticket = remoteId.getTicket();
648651
this.setName("IPC Client (" + socketFactory.hashCode()
649652
+ ") connection to " + server.toString() + " from "
@@ -1698,9 +1701,9 @@ private Connection getConnection(ConnectionId remoteId,
16981701
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
16991702
@InterfaceStability.Evolving
17001703
public static class ConnectionId {
1701-
InetSocketAddress address;
1702-
UserGroupInformation ticket;
1703-
final Class<?> protocol;
1704+
private InetSocketAddress address;
1705+
private final UserGroupInformation ticket;
1706+
private final Class<?> protocol;
17041707
private static final int PRIME = 16777619;
17051708
private final int rpcTimeout;
17061709
private final int maxIdleTime; //connections will be culled if it was idle for
@@ -1751,7 +1754,28 @@ public static class ConnectionId {
17511754
InetSocketAddress getAddress() {
17521755
return address;
17531756
}
1754-
1757+
1758+
/**
1759+
* This is used to update the remote address when an address change is detected. This method
1760+
* ensures that the {@link #hashCode()} won't change.
1761+
*
1762+
* @param address the updated address
1763+
* @throws IllegalArgumentException if the hostname or port doesn't match
1764+
* @see Connection#updateAddress()
1765+
*/
1766+
void setAddress(InetSocketAddress address) {
1767+
if (!Objects.equals(this.address.getHostName(), address.getHostName())) {
1768+
throw new IllegalArgumentException("Hostname must match: " + this.address + " vs "
1769+
+ address);
1770+
}
1771+
if (this.address.getPort() != address.getPort()) {
1772+
throw new IllegalArgumentException("Port must match: " + this.address + " vs " + address);
1773+
}
1774+
1775+
this.address = address;
1776+
}
1777+
1778+
17551779
Class<?> getProtocol() {
17561780
return protocol;
17571781
}
@@ -1858,7 +1882,11 @@ && isEqual(this.protocol, that.protocol)
18581882
@Override
18591883
public int hashCode() {
18601884
int result = connectionRetryPolicy.hashCode();
1861-
result = PRIME * result + ((address == null) ? 0 : address.hashCode());
1885+
// We calculate based on the host name and port without the IP address, since the hashCode
1886+
// must be stable even if the IP address is updated.
1887+
result = PRIME * result + ((address == null || address.getHostName() == null) ? 0 :
1888+
address.getHostName().hashCode());
1889+
result = PRIME * result + ((address == null) ? 0 : address.getPort());
18621890
result = PRIME * result + (doPing ? 1231 : 1237);
18631891
result = PRIME * result + maxIdleTime;
18641892
result = PRIME * result + pingInterval;

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.ipc;
2020

21+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2122
import static org.junit.Assert.assertEquals;
2223
import static org.junit.Assert.assertFalse;
2324
import static org.junit.Assert.assertNotNull;
@@ -815,6 +816,81 @@ public Void call() throws IOException {
815816
}
816817
}
817818

819+
/**
820+
* The {@link ConnectionId#hashCode} has to be stable despite updates that occur as the the
821+
* address evolves over time. The {@link ConnectionId} is used as a primary key in maps, so
822+
* its hashCode can't change.
823+
*
824+
* @throws IOException if there is a client or server failure
825+
*/
826+
@Test
827+
public void testStableHashCode() throws IOException {
828+
Server server = new TestServer(5, false);
829+
try {
830+
server.start();
831+
832+
// Leave host unresolved to start. Use "localhost" as opposed
833+
// to local IP from NetUtils.getConnectAddress(server) to force
834+
// resolution later
835+
InetSocketAddress unresolvedAddr = InetSocketAddress.createUnresolved(
836+
"localhost", NetUtils.getConnectAddress(server).getPort());
837+
838+
// Setup: Create a ConnectionID using an unresolved address, and get it's hashCode to serve
839+
// as a point of comparison.
840+
int rpcTimeout = MIN_SLEEP_TIME * 2;
841+
final ConnectionId remoteId = getConnectionId(unresolvedAddr, rpcTimeout, conf);
842+
int expected = remoteId.hashCode();
843+
844+
// Start client
845+
Client.setConnectTimeout(conf, 100);
846+
Client client = new Client(LongWritable.class, conf);
847+
try {
848+
// Test: Call should re-resolve host and succeed
849+
LongWritable param = new LongWritable(RANDOM.nextLong());
850+
client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId,
851+
RPC.RPC_SERVICE_CLASS_DEFAULT, null);
852+
int actual = remoteId.hashCode();
853+
854+
// Verify: The hashCode should match, although the InetAddress is different since it has
855+
// now been resolved
856+
assertThat(remoteId.getAddress()).isNotEqualTo(unresolvedAddr);
857+
assertThat(remoteId.getAddress().getHostName()).isEqualTo(unresolvedAddr.getHostName());
858+
assertThat(remoteId.hashCode()).isEqualTo(expected);
859+
860+
// Test: Call should succeed without having to re-resolve
861+
InetSocketAddress expectedSocketAddress = remoteId.getAddress();
862+
param = new LongWritable(RANDOM.nextLong());
863+
client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId,
864+
RPC.RPC_SERVICE_CLASS_DEFAULT, null);
865+
866+
// Verify: The same instance of the InetSocketAddress has been used to make the second
867+
// call
868+
assertThat(remoteId.getAddress()).isSameAs(expectedSocketAddress);
869+
870+
// Verify: The hashCode is protected against updates to the host name
871+
String hostName = InetAddress.getLocalHost().getHostName();
872+
InetSocketAddress mismatchedHostName = NetUtils.createSocketAddr(
873+
InetAddress.getLocalHost().getHostName(),
874+
remoteId.getAddress().getPort());
875+
assertThatExceptionOfType(IllegalArgumentException.class)
876+
.isThrownBy(() -> remoteId.setAddress(mismatchedHostName))
877+
.withMessageStartingWith("Hostname must match");
878+
879+
// Verify: The hashCode is protected against updates to the port
880+
InetSocketAddress mismatchedPort = NetUtils.createSocketAddr(
881+
remoteId.getAddress().getHostName(),
882+
remoteId.getAddress().getPort() + 1);
883+
assertThatExceptionOfType(IllegalArgumentException.class)
884+
.isThrownBy(() -> remoteId.setAddress(mismatchedPort))
885+
.withMessageStartingWith("Port must match");
886+
} finally {
887+
client.stop();
888+
}
889+
} finally {
890+
server.stop();
891+
}
892+
}
893+
818894
@Test(timeout=60000)
819895
public void testIpcFlakyHostResolution() throws IOException {
820896
// start server

0 commit comments

Comments
 (0)