1717 */
1818package org .apache .hadoop .hdfs .server .federation .router ;
1919
20+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_HEALTH_MONITOR_TIMEOUT ;
21+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT ;
2022import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_HEARTBEAT_INTERVAL_MS ;
2123import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT ;
2224
2527import java .net .InetSocketAddress ;
2628import java .net .URI ;
2729import java .util .Map ;
30+ import java .util .concurrent .TimeUnit ;
2831
2932import org .apache .hadoop .conf .Configuration ;
3033import org .apache .hadoop .ha .HAServiceProtocol ;
@@ -85,6 +88,10 @@ public class NamenodeHeartbeatService extends PeriodicService {
8588 private NNHAServiceTarget localTarget ;
8689 /** Cache HA protocol. */
8790 private HAServiceProtocol localTargetHAProtocol ;
91+ /** Cache NN protocol. */
92+ private NamenodeProtocol namenodeProtocol ;
93+ /** Cache Client protocol. */
94+ private ClientProtocol clientProtocol ;
8895 /** RPC address for the namenode. */
8996 private String rpcAddress ;
9097 /** Service RPC address for the namenode. */
@@ -100,6 +107,9 @@ public class NamenodeHeartbeatService extends PeriodicService {
100107
101108 private String resolvedHost ;
102109 private String originalNnId ;
110+
111+ private int healthMonitorTimeoutMs = (int ) DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT ;
112+
103113 /**
104114 * Create a new Namenode status updater.
105115 * @param resolver Namenode resolver service to handle NN registration.
@@ -211,6 +221,15 @@ protected void serviceInit(Configuration configuration) throws Exception {
211221 DFS_ROUTER_HEARTBEAT_INTERVAL_MS ,
212222 DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT ));
213223
224+ long timeoutMs = conf .getTimeDuration (DFS_ROUTER_HEALTH_MONITOR_TIMEOUT ,
225+ DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT , TimeUnit .MILLISECONDS );
226+ if (timeoutMs < 0 ) {
227+ LOG .warn ("Invalid value {} configured for {} should be greater than or equal to 0. " +
228+ "Using value of : 0ms instead." , timeoutMs , DFS_ROUTER_HEALTH_MONITOR_TIMEOUT );
229+ this .healthMonitorTimeoutMs = 0 ;
230+ } else {
231+ this .healthMonitorTimeoutMs = (int ) timeoutMs ;
232+ }
214233
215234 super .serviceInit (configuration );
216235 }
@@ -309,66 +328,26 @@ protected NamenodeStatusReport getNamenodeStatusReport() {
309328 LOG .debug ("Probing NN at service address: {}" , serviceAddress );
310329
311330 URI serviceURI = new URI ("hdfs://" + serviceAddress );
312- // Read the filesystem info from RPC (required)
313- NamenodeProtocol nn = NameNodeProxies
314- .createProxy (this .conf , serviceURI , NamenodeProtocol .class )
315- .getProxy ();
316331
317- if (nn != null ) {
318- NamespaceInfo info = nn .versionRequest ();
319- if (info != null ) {
320- report .setNamespaceInfo (info );
321- }
322- }
332+ // Read the filesystem info from RPC (required)
333+ updateNameSpaceInfoParameters (serviceURI , report );
323334 if (!report .registrationValid ()) {
324335 return report ;
325336 }
326337
327338 // Check for safemode from the client protocol. Currently optional, but
328339 // should be required at some point for QoS
329- try {
330- ClientProtocol client = NameNodeProxies
331- .createProxy (this .conf , serviceURI , ClientProtocol .class )
332- .getProxy ();
333- if (client != null ) {
334- boolean isSafeMode = client .setSafeMode (
335- SafeModeAction .SAFEMODE_GET , false );
336- report .setSafeMode (isSafeMode );
337- }
338- } catch (Exception e ) {
339- LOG .error ("Cannot fetch safemode state for {}" , getNamenodeDesc (), e );
340- }
340+ updateSafeModeParameters (serviceURI , report );
341341
342342 // Read the stats from JMX (optional)
343343 updateJMXParameters (webAddress , report );
344344
345- if (localTarget != null ) {
346- // Try to get the HA status
347- try {
348- // Determine if NN is active
349- // TODO: dynamic timeout
350- if (localTargetHAProtocol == null ) {
351- localTargetHAProtocol = localTarget .getHealthMonitorProxy (conf , 30 *1000 );
352- LOG .debug ("Get HA status with address {}" , lifelineAddress );
353- }
354- HAServiceStatus status = localTargetHAProtocol .getServiceStatus ();
355- report .setHAServiceState (status .getState ());
356- } catch (Throwable e ) {
357- if (e .getMessage ().startsWith ("HA for namenode is not enabled" )) {
358- LOG .error ("HA for {} is not enabled" , getNamenodeDesc ());
359- localTarget = null ;
360- } else {
361- // Failed to fetch HA status, ignoring failure
362- LOG .error ("Cannot fetch HA status for {}: {}" ,
363- getNamenodeDesc (), e .getMessage (), e );
364- }
365- localTargetHAProtocol = null ;
366- }
367- }
368- } catch (IOException e ) {
345+ // Try to get the HA status
346+ updateHAStatusParameters (report );
347+ } catch (IOException e ) {
369348 LOG .error ("Cannot communicate with {}: {}" ,
370349 getNamenodeDesc (), e .getMessage ());
371- } catch (Throwable e ) {
350+ } catch (Throwable e ) {
372351 // Generic error that we don't know about
373352 LOG .error ("Unexpected exception while communicating with {}: {}" ,
374353 getNamenodeDesc (), e .getMessage (), e );
@@ -399,6 +378,59 @@ private static String getNnHeartBeatServiceName(String nsId, String nnId) {
399378 (nnId == null ? "" : " " + nnId );
400379 }
401380
381+ /**
382+ * Get the namespace information for a Namenode via RPC and add them to the report.
383+ * @param serviceURI Server address of the Namenode to monitor.
384+ * @param report Namenode status report updating with namespace information data.
385+ * @throws IOException This method will throw IOException up, because RBF need
386+ * use Namespace Info to identify this NS. If there are some IOExceptions,
387+ * RBF doesn't need to get other information from NameNode,
388+ * so throw IOException up.
389+ */
390+ private void updateNameSpaceInfoParameters (URI serviceURI ,
391+ NamenodeStatusReport report ) throws IOException {
392+ try {
393+ if (this .namenodeProtocol == null ) {
394+ this .namenodeProtocol = NameNodeProxies .createProxy (this .conf , serviceURI ,
395+ NamenodeProtocol .class ).getProxy ();
396+ }
397+ if (namenodeProtocol != null ) {
398+ NamespaceInfo info = namenodeProtocol .versionRequest ();
399+ if (info != null ) {
400+ report .setNamespaceInfo (info );
401+ }
402+ }
403+ } catch (IOException e ) {
404+ this .namenodeProtocol = null ;
405+ throw e ;
406+ }
407+ }
408+
409+ /**
410+ * Get the safemode information for a Namenode via RPC and add them to the report.
411+ * Safemode is only one status of NameNode and is useless for RBF identify one NameNode.
412+ * So If there are some IOExceptions, RBF can just ignore it and try to collect
413+ * other information form namenode continue.
414+ * @param serviceURI Server address of the Namenode to monitor.
415+ * @param report Namenode status report updating with safemode information data.
416+ */
417+ private void updateSafeModeParameters (URI serviceURI , NamenodeStatusReport report ) {
418+ try {
419+ if (this .clientProtocol == null ) {
420+ this .clientProtocol = NameNodeProxies
421+ .createProxy (this .conf , serviceURI , ClientProtocol .class )
422+ .getProxy ();
423+ }
424+ if (clientProtocol != null ) {
425+ boolean isSafeMode = clientProtocol .setSafeMode (SafeModeAction .SAFEMODE_GET , false );
426+ report .setSafeMode (isSafeMode );
427+ }
428+ } catch (Exception e ) {
429+ LOG .error ("Cannot fetch safemode state for {}" , getNamenodeDesc (), e );
430+ this .clientProtocol = null ;
431+ }
432+ }
433+
402434 /**
403435 * Get the parameters for a Namenode from JMX and add them to the report.
404436 * @param address Web interface of the Namenode to monitor.
@@ -415,6 +447,34 @@ private void updateJMXParameters(
415447 }
416448 }
417449
450+ /**
451+ * Get the HA status for a Namenode via RPC and add them to the report.
452+ * @param report Namenode status report updating with HA status information data.
453+ */
454+ private void updateHAStatusParameters (NamenodeStatusReport report ) {
455+ if (localTarget != null ) {
456+ try {
457+ // Determine if NN is active
458+ if (localTargetHAProtocol == null ) {
459+ localTargetHAProtocol = localTarget .getHealthMonitorProxy (
460+ conf , this .healthMonitorTimeoutMs );
461+ LOG .debug ("Get HA status with address {}" , lifelineAddress );
462+ }
463+ HAServiceStatus status = localTargetHAProtocol .getServiceStatus ();
464+ report .setHAServiceState (status .getState ());
465+ } catch (Throwable e ) {
466+ if (e .getMessage ().startsWith ("HA for namenode is not enabled" )) {
467+ LOG .error ("HA for {} is not enabled" , getNamenodeDesc ());
468+ localTarget = null ;
469+ } else {
470+ // Failed to fetch HA status, ignoring failure
471+ LOG .error ("Cannot fetch HA status for {}" , getNamenodeDesc (), e );
472+ }
473+ localTargetHAProtocol = null ;
474+ }
475+ }
476+ }
477+
418478 /**
419479 * Fetches NamenodeInfo metrics from namenode.
420480 * @param address Web interface of the Namenode to monitor.
0 commit comments