Commit e820baa

HDFS-15417. RBF: Get the datanode report from cache for federation WebHDFS operations (#2080)
1 parent: 834372f

3 files changed: +191 −11 lines

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

Lines changed: 121 additions & 2 deletions

@@ -26,6 +26,8 @@
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE_MS_DEFAULT;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -41,7 +43,19 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
@@ -219,6 +233,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
   private static final ThreadLocal<UserGroupInformation> CUR_USER =
       new ThreadLocal<>();
 
+  /** DN type -> full DN report. */
+  private final LoadingCache<DatanodeReportType, DatanodeInfo[]> dnCache;
+
   /**
    * Construct a router RPC server.
    *
@@ -361,6 +378,23 @@ public RouterRpcServer(Configuration configuration, Router router,
     this.nnProto = new RouterNamenodeProtocol(this);
     this.clientProto = new RouterClientProtocol(conf, this);
     this.routerProto = new RouterUserProtocol(this);
+
+    long dnCacheExpire = conf.getTimeDuration(
+        DN_REPORT_CACHE_EXPIRE,
+        DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS);
+    this.dnCache = CacheBuilder.newBuilder()
+        .build(new DatanodeReportCacheLoader());
+
+    // Actively refresh the dn cache in a configured interval
+    Executors
+        .newSingleThreadScheduledExecutor()
+        .scheduleWithFixedDelay(() -> this.dnCache
+            .asMap()
+            .keySet()
+            .parallelStream()
+            .forEach((key) -> this.dnCache.refresh(key)),
+            0,
+            dnCacheExpire, TimeUnit.MILLISECONDS);
   }
 
   @Override
@@ -868,6 +902,50 @@ public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
     return clientProto.getDatanodeReport(type);
   }
 
+  /**
+   * Get the datanode report from cache.
+   *
+   * @param type Type of the datanode.
+   * @return List of datanodes.
+   * @throws IOException If it cannot get the report.
+   */
+  DatanodeInfo[] getCachedDatanodeReport(DatanodeReportType type)
+      throws IOException {
+    try {
+      DatanodeInfo[] dns = this.dnCache.get(type);
+      if (dns == null) {
+        LOG.debug("Get null DN report from cache");
+        dns = getCachedDatanodeReportImpl(type);
+        this.dnCache.put(type, dns);
+      }
+      return dns;
+    } catch (ExecutionException e) {
+      LOG.error("Cannot get the DN report for {}", type, e);
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      } else {
+        throw new IOException(cause);
+      }
+    }
+  }
+
+  private DatanodeInfo[] getCachedDatanodeReportImpl(
+      final DatanodeReportType type) throws IOException {
+    // We need to get the DNs as a privileged user
+    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+    RouterRpcServer.setCurrentUser(loginUser);
+
+    try {
+      DatanodeInfo[] dns = clientProto.getDatanodeReport(type);
+      LOG.debug("Refresh cached DN report with {} datanodes", dns.length);
+      return dns;
+    } finally {
+      // Reset ugi to remote user for remaining operations.
+      RouterRpcServer.resetCurrentUser();
+    }
+  }
+
   /**
    * Get the datanode report with a timeout.
    * @param type Type of the datanode.
@@ -1748,4 +1826,45 @@ public void refreshSuperUserGroupsConfiguration() throws IOException {
   public String[] getGroupsForUser(String user) throws IOException {
     return routerProto.getGroupsForUser(user);
   }
-}
+
+  /**
+   * Deals with loading datanode report into the cache and refresh.
+   */
+  private class DatanodeReportCacheLoader
+      extends CacheLoader<DatanodeReportType, DatanodeInfo[]> {
+
+    private ListeningExecutorService executorService;
+
+    DatanodeReportCacheLoader() {
+      ThreadFactory threadFactory = new ThreadFactoryBuilder()
+          .setNameFormat("DatanodeReport-Cache-Reload")
+          .setDaemon(true)
+          .build();
+
+      executorService = MoreExecutors.listeningDecorator(
+          Executors.newSingleThreadExecutor(threadFactory));
+    }
+
+    @Override
+    public DatanodeInfo[] load(DatanodeReportType type) throws Exception {
+      return getCachedDatanodeReportImpl(type);
+    }
+
+    /**
+     * Override the reload method to provide an asynchronous implementation,
+     * so that the query will not be slowed down by the cache refresh. It
+     * will return the old cache value and schedule a background refresh.
+     */
+    @Override
+    public ListenableFuture<DatanodeInfo[]> reload(
+        final DatanodeReportType type, DatanodeInfo[] oldValue)
+        throws Exception {
+      return executorService.submit(new Callable<DatanodeInfo[]>() {
+        @Override
+        public DatanodeInfo[] call() throws Exception {
+          return load(type);
+        }
+      });
+    }
+  }
+}
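
The core of the patch is a Guava LoadingCache whose reload() is overridden to run on a single background thread, so callers keep getting the old report while a fresh one is fetched, plus a scheduled task that proactively refreshes every cached key on a fixed delay. Below is a minimal standalone sketch of that same pattern; the names ReportCache and expensiveFetch are hypothetical stand-ins, and only the Guava APIs match the patch above.

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class ReportCache {
  // Single daemon thread that performs reloads, so readers never block.
  private final ListeningExecutorService reloader =
      MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
          new ThreadFactoryBuilder().setDaemon(true).build()));

  private final LoadingCache<String, String> cache =
      CacheBuilder.newBuilder().build(new CacheLoader<String, String>() {
        @Override
        public String load(String key) throws Exception {
          return expensiveFetch(key); // blocking load, only on a cache miss
        }

        @Override
        public ListenableFuture<String> reload(String key, String oldValue) {
          // Asynchronous reload: get() keeps returning oldValue while the
          // replacement is computed in the background.
          return reloader.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
              return load(key);
            }
          });
        }
      });

  public ReportCache(long refreshMs) {
    // Proactively refresh every cached key on a fixed delay, mirroring the
    // scheduleWithFixedDelay loop in the RouterRpcServer constructor.
    Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setDaemon(true).build())
        .scheduleWithFixedDelay(
            () -> cache.asMap().keySet().forEach(cache::refresh),
            0, refreshMs, TimeUnit.MILLISECONDS);
  }

  public String get(String key) throws Exception {
    return cache.get(key);
  }

  private String expensiveFetch(String key) {
    // Stand-in for the real fan-out RPC to every subcluster.
    return key + "@" + System.currentTimeMillis();
  }
}

A cache built this way only blocks a reader on the very first load of a key; every later refresh happens off the request path, which is exactly why the WebHDFS redirect latency drops.
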

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java

Lines changed: 1 addition & 8 deletions

@@ -454,19 +454,12 @@ private URI redirectURI(final Router router, final UserGroupInformation ugi,
   private DatanodeInfo chooseDatanode(final Router router,
       final String path, final HttpOpParam.Op op, final long openOffset,
       final String excludeDatanodes) throws IOException {
-    // We need to get the DNs as a privileged user
     final RouterRpcServer rpcServer = getRPCServer(router);
-    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
-    RouterRpcServer.setCurrentUser(loginUser);
-
     DatanodeInfo[] dns = null;
     try {
-      dns = rpcServer.getDatanodeReport(DatanodeReportType.LIVE);
+      dns = rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
     } catch (IOException e) {
       LOG.error("Cannot get the datanodes from the RPC server", e);
-    } finally {
-      // Reset ugi to remote user for remaining operations.
-      RouterRpcServer.resetCurrentUser();
    }
 
     HashSet<Node> excludes = new HashSet<Node>();
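
With the report served from the cache, chooseDatanode no longer switches to the login user per request; the privileged fetch now happens once per refresh inside getCachedDatanodeReportImpl. For illustration, a hedged sketch of the select-excluding-nodes step that follows the cached lookup; pickDatanode and its plain-String types are hypothetical simplifications, and the real method does more (for example, it can consult block locations for reads).

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

public class DatanodeChooser {
  private static final Random RANDOM = new Random();

  // Pick one live datanode from the cached report, honoring an exclude list,
  // parsed into a HashSet first, in the same shape as the patched method.
  static String pickDatanode(String[] cachedLiveReport, String excludeCsv) {
    Set<String> excludes = new HashSet<>();
    if (excludeCsv != null && !excludeCsv.isEmpty()) {
      excludes.addAll(Arrays.asList(excludeCsv.split(",")));
    }
    List<String> candidates = Arrays.stream(cachedLiveReport)
        .filter(dn -> !excludes.contains(dn))
        .collect(Collectors.toList());
    if (candidates.isEmpty()) {
      return null; // the caller fails the redirect in this case
    }
    return candidates.get(RANDOM.nextInt(candidates.size()));
  }

  public static void main(String[] args) {
    String[] report = {"dn1:9866", "dn2:9866", "dn3:9866"};
    System.out.println(pickDatanode(report, "dn2:9866"));
  }
}
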

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java

Lines changed: 69 additions & 1 deletion

@@ -67,6 +67,8 @@
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
@@ -216,6 +218,12 @@ public static void globalSetUp() throws Exception {
     // Register and verify all NNs with all routers
     cluster.registerNamenodes();
     cluster.waitNamenodeRegistration();
+
+    // We decrease the DN heartbeat expire interval to make them dead faster
+    cluster.getCluster().getNamesystem(0).getBlockManager()
+        .getDatanodeManager().setHeartbeatExpireInterval(5000);
+    cluster.getCluster().getNamesystem(1).getBlockManager()
+        .getDatanodeManager().setHeartbeatExpireInterval(5000);
   }
 
   @AfterClass
@@ -1777,6 +1785,66 @@ public void testgetGroupsForUser() throws IOException {
     assertArrayEquals(group, result);
   }
 
+  @Test
+  public void testGetCachedDatanodeReport() throws Exception {
+    RouterRpcServer rpcServer = router.getRouter().getRpcServer();
+    final DatanodeInfo[] datanodeReport =
+        rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
+
+    // We should have 12 nodes in total
+    assertEquals(12, datanodeReport.length);
+
+    // We should be caching this information
+    DatanodeInfo[] datanodeReport1 =
+        rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
+    assertArrayEquals(datanodeReport1, datanodeReport);
+
+    // Stop one datanode
+    MiniDFSCluster miniDFSCluster = getCluster().getCluster();
+    DataNodeProperties dnprop = miniDFSCluster.stopDataNode(0);
+
+    // We wait until the cached value is updated
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        DatanodeInfo[] dn = null;
+        try {
+          dn = rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
+        } catch (IOException ex) {
+          LOG.error("Error on getCachedDatanodeReport");
+        }
+        return !Arrays.equals(datanodeReport, dn);
+      }
+    }, 500, 5 * 1000);
+
+    // The cache should be updated now
+    final DatanodeInfo[] datanodeReport2 =
+        rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
+    assertEquals(datanodeReport.length - 1, datanodeReport2.length);
+
+    // Restart the DN we just stopped
+    miniDFSCluster.restartDataNode(dnprop);
+    miniDFSCluster.waitActive();
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        DatanodeInfo[] dn = null;
+        try {
+          dn = rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
+        } catch (IOException ex) {
+          LOG.error("Error on getCachedDatanodeReport");
+        }
+        return datanodeReport.length == dn.length;
+      }
+    }, 500, 5 * 1000);
+
+    // The cache should be updated now
+    final DatanodeInfo[] datanodeReport3 =
+        rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
+    assertEquals(datanodeReport.length, datanodeReport3.length);
+  }
+
   /**
    * Check the erasure coding policies in the Router and the Namenode.
    * @return The erasure coding policies.
@@ -1814,4 +1882,4 @@ private DFSClient getFileDFSClient(final String path) {
   }
   return null;
 }
-}
+}
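
Because the refresh is asynchronous, the test polls until the cached report converges rather than asserting immediately after stopping or restarting a datanode. A minimal stand-in for the polling helper it relies on; this is not the Hadoop implementation, and the real GenericTestUtils.waitFor differs in detail (for instance, it takes Guava's Supplier).

import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public final class WaitFor {
  // Re-evaluate the condition every checkEveryMillis until it holds,
  // or fail with a TimeoutException once waitForMillis has elapsed.
  public static void waitFor(Supplier<Boolean> check, long checkEveryMillis,
      long waitForMillis) throws TimeoutException, InterruptedException {
    long deadline = System.currentTimeMillis() + waitForMillis;
    while (!Boolean.TRUE.equals(check.get())) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException(
            "Condition not met within " + waitForMillis + " ms");
      }
      Thread.sleep(checkEveryMillis);
    }
  }
}

Usage mirroring the test's 500 ms poll and 5 s budget: WaitFor.waitFor(() -> reportLength() == expected, 500, 5 * 1000);, where reportLength() is a hypothetical accessor for the cached report size.
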
