@@ -69,6 +69,11 @@ public class ClientContext {
6969 */
7070 private final String name ;
7171
72+ /**
73+ * The client conf used to initialize context.
74+ */
75+ private final DfsClientConf dfsClientConf ;
76+
7277 /**
7378 * String representation of the configuration.
7479 */
@@ -130,6 +135,17 @@ public class ClientContext {
130135 */
131136 private volatile DeadNodeDetector deadNodeDetector = null ;
132137
138+ /**
139+ * The switch for the {@link LocatedBlocksRefresher}.
140+ */
141+ private final boolean locatedBlocksRefresherEnabled ;
142+
143+ /**
144+ * Periodically refresh the {@link org.apache.hadoop.hdfs.protocol.LocatedBlocks} backing
145+ * registered {@link DFSInputStream}s, to take advantage of changes in block placement.
146+ */
147+ private volatile LocatedBlocksRefresher locatedBlocksRefresher = null ;
148+
133149 /**
134150 * Count the reference of ClientContext.
135151 */
@@ -146,6 +162,7 @@ private ClientContext(String name, DfsClientConf conf,
146162 final ShortCircuitConf scConf = conf .getShortCircuitConf ();
147163
148164 this .name = name ;
165+ this .dfsClientConf = conf ;
149166 this .confString = scConf .confAsString ();
150167 this .clientShortCircuitNum = conf .getClientShortCircuitNum ();
151168 this .shortCircuitCache = new ShortCircuitCache [this .clientShortCircuitNum ];
@@ -164,6 +181,7 @@ private ClientContext(String name, DfsClientConf conf,
164181 this .byteArrayManager = ByteArrayManager .newInstance (
165182 conf .getWriteByteArrayManagerConf ());
166183 this .deadNodeDetectionEnabled = conf .isDeadNodeDetectionEnabled ();
184+ this .locatedBlocksRefresherEnabled = conf .isLocatedBlocksRefresherEnabled ();
167185 initTopologyResolution (config );
168186 }
169187
@@ -301,6 +319,21 @@ public DeadNodeDetector getDeadNodeDetector() {
301319 return deadNodeDetector ;
302320 }
303321
322+ /**
323+ * If true, LocatedBlocksRefresher will be periodically refreshing LocatedBlocks
324+ * of registered DFSInputStreams.
325+ */
326+ public boolean isLocatedBlocksRefresherEnabled () {
327+ return locatedBlocksRefresherEnabled ;
328+ }
329+
330+ /**
331+ * Obtain LocatedBlocksRefresher of the current client.
332+ */
333+ public LocatedBlocksRefresher getLocatedBlocksRefresher () {
334+ return locatedBlocksRefresher ;
335+ }
336+
304337 /**
305338 * Increment the counter. Start the dead node detector thread if there is no
306339 * reference.
@@ -311,6 +344,10 @@ synchronized void reference() {
311344 deadNodeDetector = new DeadNodeDetector (name , configuration );
312345 deadNodeDetector .start ();
313346 }
347+ if (locatedBlocksRefresherEnabled && locatedBlocksRefresher == null ) {
348+ locatedBlocksRefresher = new LocatedBlocksRefresher (name , configuration , dfsClientConf );
349+ locatedBlocksRefresher .start ();
350+ }
314351 }
315352
316353 /**
@@ -324,5 +361,10 @@ synchronized void unreference() {
324361 deadNodeDetector .shutdown ();
325362 deadNodeDetector = null ;
326363 }
364+
365+ if (counter == 0 && locatedBlocksRefresherEnabled && locatedBlocksRefresher != null ) {
366+ locatedBlocksRefresher .shutdown ();
367+ locatedBlocksRefresher = null ;
368+ }
327369 }
328370}
0 commit comments