2424import java .lang .reflect .Method ;
2525import java .lang .reflect .Proxy ;
2626import java .net .URI ;
27+ import java .util .concurrent .Callable ;
28+ import java .util .concurrent .ExecutionException ;
29+ import java .util .concurrent .RejectedExecutionException ;
30+ import java .util .concurrent .Future ;
2731import java .util .concurrent .TimeUnit ;
32+ import java .util .concurrent .TimeoutException ;
2833import java .util .List ;
2934
3035import org .apache .hadoop .classification .InterfaceAudience ;
4651import org .apache .hadoop .ipc .RemoteException ;
4752import org .apache .hadoop .ipc .RpcInvocationHandler ;
4853import org .apache .hadoop .ipc .StandbyException ;
54+ import org .apache .hadoop .util .BlockingThreadPoolExecutorService ;
4955import org .apache .hadoop .util .Time ;
5056import org .slf4j .Logger ;
5157import org .slf4j .LoggerFactory ;
@@ -88,6 +94,17 @@ public class ObserverReadProxyProvider<T>
8894 /** Observer probe retry period default to 10 min. */
8995 static final long OBSERVER_PROBE_RETRY_PERIOD_DEFAULT = 60 * 10 * 1000 ;
9096
97+ /**
98+ * Timeout in ms to cancel the ha-state probe rpc request for a namenode.
99+ * To disable timeout, set it to 0 or a negative value.
100+ */
101+ static final String NAMENODE_HA_STATE_PROBE_TIMEOUT =
102+ HdfsClientConfigKeys .Failover .PREFIX + "namenode.ha-state.probe.timeout" ;
103+ /**
104+ * Default to disable namenode ha-state probe timeout.
105+ */
106+ static final long NAMENODE_HA_STATE_PROBE_TIMEOUT_DEFAULT = 0 ;
107+
91108 /** The inner proxy provider used for active/standby failover. */
92109 private final AbstractNNFailoverProxyProvider <T > failoverProxy ;
93110 /** List of all NameNode proxies. */
@@ -155,12 +172,22 @@ public class ObserverReadProxyProvider<T>
155172 */
156173 private long observerProbeRetryPeriodMs ;
157174
175+ /**
176+ * Timeout in ms when we try to get the HA state of a namenode.
177+ */
178+ private long namenodeHAStateProbeTimeoutMs ;
179+
158180 /**
159181 * The previous time where zero observer were found. If there was observer,
160182 * or it is initialization, this is set to 0.
161183 */
162184 private long lastObserverProbeTime ;
163185
186+ /**
187+ * Threadpool to send the getHAServiceState requests.
188+ */
189+ private final BlockingThreadPoolExecutorService nnProbingThreadPool ;
190+
164191 /**
165192 * By default ObserverReadProxyProvider uses
166193 * {@link ConfiguredFailoverProxyProvider} for failover.
@@ -213,6 +240,8 @@ public ObserverReadProxyProvider(
213240 observerProbeRetryPeriodMs = conf .getTimeDuration (
214241 OBSERVER_PROBE_RETRY_PERIOD_KEY ,
215242 OBSERVER_PROBE_RETRY_PERIOD_DEFAULT , TimeUnit .MILLISECONDS );
243+ namenodeHAStateProbeTimeoutMs = conf .getTimeDuration (NAMENODE_HA_STATE_PROBE_TIMEOUT ,
244+ NAMENODE_HA_STATE_PROBE_TIMEOUT_DEFAULT , TimeUnit .MILLISECONDS );
216245
217246 // TODO : make this configurable or remove this variable
218247 if (wrappedProxy instanceof ClientProtocol ) {
@@ -222,6 +251,15 @@ public ObserverReadProxyProvider(
222251 + "class does not implement {}" , uri , ClientProtocol .class .getName ());
223252 this .observerReadEnabled = false ;
224253 }
254+
255+ /*
256+ * At most 4 threads will be running and each thread will die after 10
257+ * seconds of no use. Up to 132 tasks (4 active + 128 waiting) can be
258+ * submitted simultaneously.
259+ */
260+ nnProbingThreadPool =
261+ BlockingThreadPoolExecutorService .newInstance (4 , 128 , 10L , TimeUnit .SECONDS ,
262+ "nn-ha-state-probing" );
225263 }
226264
227265 public AlignmentContext getAlignmentContext () {
@@ -285,13 +323,62 @@ private synchronized NNProxyInfo<T> changeProxy(NNProxyInfo<T> initial) {
285323 }
286324 currentIndex = (currentIndex + 1 ) % nameNodeProxies .size ();
287325 currentProxy = createProxyIfNeeded (nameNodeProxies .get (currentIndex ));
288- currentProxy .setCachedState (getHAServiceState (currentProxy ));
326+ currentProxy .setCachedState (getHAServiceStateWithTimeout (currentProxy ));
289327 LOG .debug ("Changed current proxy from {} to {}" ,
290328 initial == null ? "none" : initial .proxyInfo ,
291329 currentProxy .proxyInfo );
292330 return currentProxy ;
293331 }
294332
333+ /**
334+ * Execute getHAServiceState() call with a timeout, to avoid a long wait when
335+ * an NN becomes irresponsive to rpc requests
336+ * (when a thread/heap dump is being taken, e.g.).
337+ *
338+ * For each getHAServiceState() call, a task is created and submitted to a
339+ * threadpool for execution. We will wait for a response up to
340+ * namenodeHAStateProbeTimeoutSec and cancel these requests if they time out.
341+ *
342+ * The implementation is split into two functions so that we can unit test
343+ * the second function.
344+ */
345+ HAServiceState getHAServiceStateWithTimeout (final NNProxyInfo <T > proxyInfo ) {
346+ try {
347+ Future <HAServiceState > task = nnProbingThreadPool .submit (() -> getHAServiceState (proxyInfo ));
348+ return getHAServiceStateWithTimeout (proxyInfo , task );
349+ } catch (RejectedExecutionException e ) {
350+ LOG .warn ("Run out of threads to submit the request to query HA state. "
351+ + "Ok to return null and we will fallback to use active NN to serve "
352+ + "this request." );
353+ return null ;
354+ }
355+ }
356+
357+ HAServiceState getHAServiceStateWithTimeout (final NNProxyInfo <T > proxyInfo ,
358+ Future <HAServiceState > task ) {
359+ HAServiceState state = null ;
360+ try {
361+ if (namenodeHAStateProbeTimeoutMs > 0 ) {
362+ state = task .get (namenodeHAStateProbeTimeoutMs , TimeUnit .MILLISECONDS );
363+ } else {
364+ // Disable timeout by waiting indefinitely when namenodeHAStateProbeTimeoutSec is set to 0
365+ // or a negative value.
366+ state = task .get ();
367+ }
368+ LOG .debug ("HA State for {} is {}" , proxyInfo .proxyInfo , state );
369+ } catch (TimeoutException e ) {
370+ // Cancel the task on timeout
371+ String msg = String .format ("Cancel NN probe task due to timeout for %s" , proxyInfo .proxyInfo );
372+ LOG .warn (msg , e );
373+ task .cancel (true );
374+ } catch (InterruptedException |ExecutionException e ) {
375+ String msg = String .format ("Exception in NN probe task for %s" , proxyInfo .proxyInfo );
376+ LOG .warn (msg , e );
377+ }
378+
379+ return state ;
380+ }
381+
295382 /**
296383 * Fetch the service state from a proxy. If it is unable to be fetched,
297384 * assume it is in standby state, but log the exception.
@@ -554,6 +641,7 @@ public synchronized void close() throws IOException {
554641 }
555642 }
556643 failoverProxy .close ();
644+ nnProbingThreadPool .shutdown ();
557645 }
558646
559647 @ Override
0 commit comments