2121import com .google .common .base .Joiner ;
2222import com .google .common .base .Preconditions ;
2323import com .google .common .collect .Lists ;
24+ import org .apache .commons .logging .Log ;
25+ import org .apache .commons .logging .LogFactory ;
26+ import org .apache .commons .logging .impl .Log4JLogger ;
2427import org .apache .hadoop .HadoopIllegalArgumentException ;
2528import org .apache .hadoop .classification .InterfaceAudience ;
2629import org .apache .hadoop .conf .Configuration ;
7275import org .apache .hadoop .util .JvmPauseMonitor ;
7376import org .apache .hadoop .util .ServicePlugin ;
7477import org .apache .hadoop .util .StringUtils ;
75- import org .apache .log4j .LogManager ;
78+ import org .apache .log4j .Appender ;
79+ import org .apache .log4j .AsyncAppender ;
7680import org .slf4j .Logger ;
7781import org .slf4j .LoggerFactory ;
7882
83+ import javax .management .Attribute ;
84+ import javax .management .AttributeList ;
85+ import javax .management .MBeanAttributeInfo ;
86+ import javax .management .MBeanInfo ;
87+ import javax .management .MBeanServer ;
88+ import javax .management .MalformedObjectNameException ;
7989import javax .management .ObjectName ;
8090
8191import java .io .IOException ;
8292import java .io .PrintStream ;
93+ import java .lang .management .ManagementFactory ;
8394import java .net .InetSocketAddress ;
8495import java .net .URI ;
8596import java .security .PrivilegedExceptionAction ;
8697import java .util .ArrayList ;
8798import java .util .Arrays ;
8899import java .util .Collection ;
100+ import java .util .Collections ;
101+ import java .util .HashSet ;
89102import java .util .List ;
103+ import java .util .Set ;
104+ import java .util .concurrent .ScheduledThreadPoolExecutor ;
105+ import java .util .concurrent .TimeUnit ;
90106import java .util .concurrent .atomic .AtomicBoolean ;
91107
92108import static org .apache .hadoop .fs .CommonConfigurationKeysPublic .FS_DEFAULT_NAME_KEY ;
112128import static org .apache .hadoop .hdfs .DFSConfigKeys .DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY ;
113129import static org .apache .hadoop .hdfs .DFSConfigKeys .DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY ;
114130import static org .apache .hadoop .hdfs .DFSConfigKeys .DFS_NAMENODE_KEYTAB_FILE_KEY ;
131+ import static org .apache .hadoop .hdfs .DFSConfigKeys .DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT ;
132+ import static org .apache .hadoop .hdfs .DFSConfigKeys .DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY ;
115133import static org .apache .hadoop .hdfs .DFSConfigKeys .DFS_NAMENODE_NAME_DIR_KEY ;
116134import static org .apache .hadoop .hdfs .DFSConfigKeys .DFS_NAMENODE_PLUGINS_KEY ;
117135import static org .apache .hadoop .hdfs .DFSConfigKeys .DFS_NAMENODE_RPC_ADDRESS_KEY ;
@@ -303,7 +321,10 @@ public long getProtocolVersion(String protocol,
303321 LoggerFactory .getLogger ("BlockStateChange" );
304322 public static final HAState ACTIVE_STATE = new ActiveState ();
305323 public static final HAState STANDBY_STATE = new StandbyState ();
306-
324+
325+ public static final Log MetricsLog =
326+ LogFactory .getLog ("NameNodeMetricsLog" );
327+
307328 protected FSNamesystem namesystem ;
308329 protected final Configuration conf ;
309330 protected final NamenodeRole role ;
@@ -329,6 +350,8 @@ public long getProtocolVersion(String protocol,
329350 private JvmPauseMonitor pauseMonitor ;
330351 private ObjectName nameNodeStatusBeanName ;
331352 SpanReceiverHost spanReceiverHost ;
353+ ScheduledThreadPoolExecutor metricsLoggerTimer ;
354+
332355 /**
333356 * The namenode address that clients will use to access this namenode
334357 * or the name service. For HA configurations using logical URI, it
@@ -662,6 +685,68 @@ protected void initialize(Configuration conf) throws IOException {
662685 metrics .getJvmMetrics ().setPauseMonitor (pauseMonitor );
663686
664687 startCommonServices (conf );
688+ startMetricsLogger (conf );
689+ }
690+
691+ /**
692+ * Start a timer to periodically write NameNode metrics to the log
693+ * file. This behavior can be disabled by configuration.
694+ * @param conf
695+ */
696+ protected void startMetricsLogger (Configuration conf ) {
697+ long metricsLoggerPeriodSec =
698+ conf .getInt (DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY ,
699+ DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT );
700+
701+ if (metricsLoggerPeriodSec <= 0 ) {
702+ return ;
703+ }
704+
705+ makeMetricsLoggerAsync ();
706+
707+ // Schedule the periodic logging.
708+ metricsLoggerTimer = new ScheduledThreadPoolExecutor (1 );
709+ metricsLoggerTimer .setExecuteExistingDelayedTasksAfterShutdownPolicy (
710+ false );
711+ metricsLoggerTimer .scheduleWithFixedDelay (new MetricsLoggerTask (),
712+ metricsLoggerPeriodSec ,
713+ metricsLoggerPeriodSec ,
714+ TimeUnit .SECONDS );
715+ }
716+
717+ /**
718+ * Make the metrics logger async and add all pre-existing appenders
719+ * to the async appender.
720+ */
721+ private static void makeMetricsLoggerAsync () {
722+ if (!(MetricsLog instanceof Log4JLogger )) {
723+ LOG .warn (
724+ "Metrics logging will not be async since the logger is not log4j" );
725+ return ;
726+ }
727+ org .apache .log4j .Logger logger = ((Log4JLogger ) MetricsLog ).getLogger ();
728+ logger .setAdditivity (false ); // Don't pollute NN logs with metrics dump
729+
730+ @ SuppressWarnings ("unchecked" )
731+ List <Appender > appenders = Collections .list (logger .getAllAppenders ());
732+ // failsafe against trying to async it more than once
733+ if (!appenders .isEmpty () && !(appenders .get (0 ) instanceof AsyncAppender )) {
734+ AsyncAppender asyncAppender = new AsyncAppender ();
735+ // change logger to have an async appender containing all the
736+ // previously configured appenders
737+ for (Appender appender : appenders ) {
738+ logger .removeAppender (appender );
739+ asyncAppender .addAppender (appender );
740+ }
741+ logger .addAppender (asyncAppender );
742+ }
743+ }
744+
745+ protected void stopMetricsLogger () {
746+ if (metricsLoggerTimer != null ) {
747+ metricsLoggerTimer .shutdown ();
748+ metricsLoggerTimer = null ;
749+ }
665750 }
666751
667752 /**
@@ -867,6 +952,7 @@ public void stop() {
867952 } catch (ServiceFailedException e ) {
868953 LOG .warn ("Encountered exception while exiting state " , e );
869954 } finally {
955+ stopMetricsLogger ();
870956 stopCommonServices ();
871957 if (metrics != null ) {
872958 metrics .shutdown ();
@@ -1846,4 +1932,91 @@ void checkHaStateChange(StateChangeRequestInfo req)
18461932 break ;
18471933 }
18481934 }
1935+
1936+ private static class MetricsLoggerTask implements Runnable {
1937+ private static final int MAX_LOGGED_VALUE_LEN = 128 ;
1938+ private static ObjectName OBJECT_NAME = null ;
1939+
1940+ static {
1941+ try {
1942+ OBJECT_NAME = new ObjectName ("Hadoop:*" );
1943+ } catch (MalformedObjectNameException m ) {
1944+ // This should not occur in practice since we pass
1945+ // a valid pattern to the constructor above.
1946+ }
1947+ }
1948+
1949+ /**
1950+ * Write NameNode metrics to the metrics appender when invoked.
1951+ */
1952+ @ Override
1953+ public void run () {
1954+ // Skip querying metrics if there are no known appenders.
1955+ if (!MetricsLog .isInfoEnabled () ||
1956+ !hasAppenders (MetricsLog ) ||
1957+ OBJECT_NAME == null ) {
1958+ return ;
1959+ }
1960+
1961+ MetricsLog .info (" >> Begin NameNode metrics dump" );
1962+ final MBeanServer server = ManagementFactory .getPlatformMBeanServer ();
1963+
1964+ // Iterate over each MBean.
1965+ for (final ObjectName mbeanName : server .queryNames (OBJECT_NAME , null )) {
1966+ try {
1967+ MBeanInfo mBeanInfo = server .getMBeanInfo (mbeanName );
1968+ final String mBeanNameName = MBeans .getMbeanNameName (mbeanName );
1969+ final Set <String > attributeNames = getFilteredAttributes (mBeanInfo );
1970+
1971+ final AttributeList attributes =
1972+ server .getAttributes (mbeanName ,
1973+ attributeNames .toArray (new String [attributeNames .size ()]));
1974+
1975+ for (Object o : attributes ) {
1976+ final Attribute attribute = (Attribute ) o ;
1977+ final Object value = attribute .getValue ();
1978+ final String valueStr =
1979+ (value != null ) ? value .toString () : "null" ;
1980+ // Truncate the value if it is too long
1981+ MetricsLog .info (mBeanNameName + ":" + attribute .getName () + "=" +
1982+ (valueStr .length () < MAX_LOGGED_VALUE_LEN ? valueStr :
1983+ valueStr .substring (0 , MAX_LOGGED_VALUE_LEN ) + "..." ));
1984+ }
1985+ } catch (Exception e ) {
1986+ MetricsLog .error ("Failed to get NameNode metrics for mbean " +
1987+ mbeanName .toString (), e );
1988+ }
1989+ }
1990+ MetricsLog .info (" << End NameNode metrics dump" );
1991+ }
1992+
1993+ private static boolean hasAppenders (Log logger ) {
1994+ if (!(logger instanceof Log4JLogger )) {
1995+ // Don't bother trying to determine the presence of appenders.
1996+ return true ;
1997+ }
1998+ Log4JLogger log4JLogger = ((Log4JLogger ) MetricsLog );
1999+ return log4JLogger .getLogger ().getAllAppenders ().hasMoreElements ();
2000+ }
2001+
2002+ /**
2003+ * Get the list of attributes for the MBean, filtering out a few
2004+ * attribute types.
2005+ */
2006+ private static Set <String > getFilteredAttributes (
2007+ MBeanInfo mBeanInfo ) {
2008+ Set <String > attributeNames = new HashSet <>();
2009+ for (MBeanAttributeInfo attributeInfo : mBeanInfo .getAttributes ()) {
2010+ if (!attributeInfo .getType ().equals (
2011+ "javax.management.openmbean.TabularData" ) &&
2012+ !attributeInfo .getType ().equals (
2013+ "javax.management.openmbean.CompositeData" ) &&
2014+ !attributeInfo .getType ().equals (
2015+ "[Ljavax.management.openmbean.CompositeData;" )) {
2016+ attributeNames .add (attributeInfo .getName ());
2017+ }
2018+ }
2019+ return attributeNames ;
2020+ }
2021+ }
18492022}
0 commit comments