Skip to content

Commit 263413e

Browse files
author
Inigo Goiri
committed
HDFS-15112. RBF: Do not return FileNotFoundException when a subcluster is unavailable.
1 parent edbbc03 commit 263413e

File tree

3 files changed

+114
-23
lines changed

3 files changed

+114
-23
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
2222
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
2323

24+
import java.io.EOFException;
2425
import java.io.FileNotFoundException;
2526
import java.io.IOException;
2627
import java.lang.reflect.Constructor;
@@ -436,8 +437,7 @@ private Object invokeMethod(
436437
this.rpcMonitor.proxyOpFailureStandby();
437438
}
438439
failover = true;
439-
} else if (ioe instanceof ConnectException ||
440-
ioe instanceof ConnectTimeoutException) {
440+
} else if (isUnavailableException(ioe)) {
441441
if (this.rpcMonitor != null) {
442442
this.rpcMonitor.proxyOpFailureCommunicate();
443443
}
@@ -503,8 +503,7 @@ private Object invokeMethod(
503503
if (ioe instanceof StandbyException) {
504504
LOG.error("{} at {} is in Standby: {}",
505505
nnKey, addr, ioe.getMessage());
506-
} else if (ioe instanceof ConnectException ||
507-
ioe instanceof ConnectTimeoutException) {
506+
} else if (isUnavailableException(ioe)) {
508507
exConnect++;
509508
LOG.error("{} at {} cannot be reached: {}",
510509
nnKey, addr, ioe.getMessage());
@@ -563,8 +562,7 @@ private Object invoke(String nsId, int retryCount, final Method method,
563562
// failover, invoker looks for standby exceptions for failover.
564563
if (ioe instanceof StandbyException) {
565564
throw ioe;
566-
} else if (ioe instanceof ConnectException ||
567-
ioe instanceof ConnectTimeoutException) {
565+
} else if (isUnavailableException(ioe)) {
568566
throw ioe;
569567
} else {
570568
throw new StandbyException(ioe.getMessage());
@@ -578,6 +576,27 @@ private Object invoke(String nsId, int retryCount, final Method method,
578576
}
579577
}
580578

579+
/**
580+
* Check if the exception comes from an unavailable subcluster.
581+
* @param ioe IOException to check.
582+
* @return If the exception comes from an unavailable subcluster.
583+
*/
584+
public static boolean isUnavailableException(IOException ioe) {
585+
if (ioe instanceof ConnectException ||
586+
ioe instanceof ConnectTimeoutException ||
587+
ioe instanceof EOFException ||
588+
ioe instanceof StandbyException) {
589+
return true;
590+
}
591+
if (ioe instanceof RetriableException) {
592+
Throwable cause = ioe.getCause();
593+
if (cause instanceof NoNamenodesAvailableException) {
594+
return true;
595+
}
596+
}
597+
return false;
598+
}
599+
581600
/**
582601
* Check if the cluster of given nameservice id is available.
583602
* @param nsId nameservice ID.
@@ -833,8 +852,7 @@ public <T> T invokeSequential(
833852

834853
final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
835854
final Method m = remoteMethod.getMethod();
836-
IOException firstThrownException = null;
837-
IOException lastThrownException = null;
855+
List<IOException> thrownExceptions = new ArrayList<>();
838856
Object firstResult = null;
839857
// Invoke in priority order
840858
for (final RemoteLocationContext loc : locations) {
@@ -862,29 +880,33 @@ public <T> T invokeSequential(
862880
ioe = processException(ioe, loc);
863881

864882
// Record it and move on
865-
lastThrownException = ioe;
866-
if (firstThrownException == null) {
867-
firstThrownException = lastThrownException;
868-
}
883+
thrownExceptions.add(ioe);
869884
} catch (Exception e) {
870885
// Unusual error, ClientProtocol calls always use IOException (or
871886
// RemoteException). Re-wrap in IOException for compatibility with
872887
// ClientProtcol.
873888
LOG.error("Unexpected exception {} proxying {} to {}",
874889
e.getClass(), m.getName(), ns, e);
875-
lastThrownException = new IOException(
890+
IOException ioe = new IOException(
876891
"Unexpected exception proxying API " + e.getMessage(), e);
877-
if (firstThrownException == null) {
878-
firstThrownException = lastThrownException;
879-
}
892+
thrownExceptions.add(ioe);
880893
}
881894
}
882895

883-
if (firstThrownException != null) {
884-
// re-throw the last exception thrown for compatibility
885-
throw firstThrownException;
896+
if (!thrownExceptions.isEmpty()) {
897+
// An unavailable subcluster may be the actual cause
898+
// We cannot surface other exceptions (e.g., FileNotFoundException)
899+
for (int i = 0; i < thrownExceptions.size(); i++) {
900+
IOException ioe = thrownExceptions.get(i);
901+
if (isUnavailableException(ioe)) {
902+
throw ioe;
903+
}
904+
}
905+
906+
// re-throw the first exception thrown for compatibility
907+
throw thrownExceptions.get(0);
886908
}
887-
// Return the last result, whether it is the value we are looking for or a
909+
// Return the first result, whether it is the value or not
888910
@SuppressWarnings("unchecked")
889911
T ret = (T)firstResult;
890912
return ret;

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,12 @@ RemoteLocation getCreateLocation(
653653
}
654654
} catch (FileNotFoundException fne) {
655655
// Ignore if the file is not found
656+
} catch (IOException ioe) {
657+
if (RouterRpcClient.isUnavailableException(ioe)) {
658+
LOG.debug("Ignore unavailable exception: {}", ioe);
659+
} else {
660+
throw ioe;
661+
}
656662
}
657663
}
658664
return createLocation;

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.apache.hadoop.hdfs.server.federation.MockNamenode.registerSubclusters;
2626
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
2727
import static org.junit.Assert.assertEquals;
28+
import static org.junit.Assert.assertNotNull;
2829
import static org.junit.Assert.assertNull;
2930
import static org.junit.Assert.assertTrue;
3031
import static org.junit.Assert.fail;
@@ -40,6 +41,7 @@
4041
import java.util.HashMap;
4142
import java.util.List;
4243
import java.util.Map;
44+
import java.util.Map.Entry;
4345
import java.util.Random;
4446
import java.util.UUID;
4547
import java.util.concurrent.Callable;
@@ -51,6 +53,7 @@
5153

5254
import org.apache.hadoop.conf.Configuration;
5355
import org.apache.hadoop.fs.ContentSummary;
56+
import org.apache.hadoop.fs.FSDataInputStream;
5457
import org.apache.hadoop.fs.FSDataOutputStream;
5558
import org.apache.hadoop.fs.FileStatus;
5659
import org.apache.hadoop.fs.FileSystem;
@@ -72,6 +75,7 @@
7275
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
7376
import org.apache.hadoop.ipc.RemoteException;
7477
import org.apache.hadoop.security.UserGroupInformation;
78+
import org.apache.hadoop.test.LambdaTestUtils;
7579
import org.junit.After;
7680
import org.junit.Before;
7781
import org.junit.Test;
@@ -153,9 +157,6 @@ public void setup() throws Exception {
153157
registerSubclusters(
154158
routers, namenodes.values(), Collections.singleton("ns1"));
155159

156-
LOG.info("Stop ns1 to simulate an unavailable subcluster");
157-
namenodes.get("ns1").stop();
158-
159160
service = Executors.newFixedThreadPool(10);
160161
}
161162

@@ -209,6 +210,9 @@ private void updateMountPointFaultTolerant(final String mountPoint)
209210
@Test
210211
public void testWriteWithFailedSubcluster() throws Exception {
211212

213+
LOG.info("Stop ns1 to simulate an unavailable subcluster");
214+
namenodes.get("ns1").stop();
215+
212216
// Run the actual tests with each approach
213217
final List<Callable<Boolean>> tasks = new ArrayList<>();
214218
final List<DestinationOrder> orders = asList(
@@ -609,4 +613,63 @@ private FileSystem getRandomRouterFileSystem() throws Exception {
609613
return userUgi.doAs(
610614
(PrivilegedExceptionAction<FileSystem>) () -> getFileSystem(router));
611615
}
616+
617+
@Test
618+
public void testReadWithFailedSubcluster() throws Exception {
619+
620+
DestinationOrder order = DestinationOrder.HASH_ALL;
621+
final String mountPoint = "/" + order + "-testread";
622+
final Path mountPath = new Path(mountPoint);
623+
LOG.info("Setup {} with order {}", mountPoint, order);
624+
createMountTableEntry(
625+
routers, mountPoint, order, namenodes.keySet());
626+
627+
FileSystem fs = getRandomRouterFileSystem();
628+
629+
// Create a file (we don't write because we have no mock Datanodes)
630+
final Path fileexisting = new Path(mountPath, "fileexisting");
631+
final Path filenotexisting = new Path(mountPath, "filenotexisting");
632+
FSDataOutputStream os = fs.create(fileexisting);
633+
assertNotNull(os);
634+
os.close();
635+
636+
// We should be able to read existing files
637+
FSDataInputStream fsdis = fs.open(fileexisting);
638+
assertNotNull("We should be able to read the file", fsdis);
639+
// We shouldn't be able to read non-existing files
640+
LambdaTestUtils.intercept(FileNotFoundException.class,
641+
() -> fs.open(filenotexisting));
642+
643+
// Check the subcluster where the file got created
644+
String nsIdWithFile = null;
645+
for (Entry<String, MockNamenode> entry : namenodes.entrySet()) {
646+
String nsId = entry.getKey();
647+
MockNamenode nn = entry.getValue();
648+
int rpc = nn.getRPCPort();
649+
FileSystem nnfs = getFileSystem(rpc);
650+
651+
try {
652+
FileStatus fileStatus = nnfs.getFileStatus(fileexisting);
653+
assertNotNull(fileStatus);
654+
assertNull("The file cannot be in two subclusters", nsIdWithFile);
655+
nsIdWithFile = nsId;
656+
} catch (FileNotFoundException fnfe) {
657+
LOG.debug("File not found in {}", nsId);
658+
}
659+
}
660+
assertNotNull("The file has to be in one subcluster", nsIdWithFile);
661+
662+
LOG.info("Stop {} to simulate an unavailable subcluster", nsIdWithFile);
663+
namenodes.get(nsIdWithFile).stop();
664+
665+
// We should not get FileNotFoundException anymore
666+
try {
667+
fs.open(fileexisting);
668+
fail("It should throw an unavailable cluster exception");
669+
} catch(RemoteException re) {
670+
IOException ioe = re.unwrapRemoteException();
671+
assertTrue("Expected an unavailable exception for:" + ioe.getClass(),
672+
RouterRpcClient.isUnavailableException(ioe));
673+
}
674+
}
612675
}

0 commit comments

Comments
 (0)