@@ -78,8 +78,13 @@ public class HadoopFileIO implements HadoopConfigurable, DelegateFileIO {
    */
   private final AtomicBoolean bulkDeleteConfigured = new AtomicBoolean(false);

-  /** Dynamically loaded accessor of Hadoop Wrapped IO classes. */
-  private transient DynamicWrappedIO wrappedIO;
+  /**
+   * Dynamically loaded accessor of Hadoop Wrapped IO classes.
+   * Marked as volatile because its creation in
+   * {@link #maybeUseBulkDeleteApi()} is synchronized, and IDEs
+   * otherwise complain about mixed synchronized/unsynchronized access.
+   */
+  private volatile transient DynamicWrappedIO wrappedIO;

   /**
    * Flag to indicate that bulk delete is present and should be used.
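
The `volatile` added above is the standard safe-publication fix for a field that is written under a lock but read without one. A minimal sketch of the pattern, with illustrative names (this is not the Iceberg code; `helper` stands in for `wrappedIO`):

```java
// Minimal sketch: a helper created inside a synchronized method but read by
// unsynchronized callers must be volatile, or readers may observe a stale
// (null) reference even after creation has completed.
class LazyBinding {
  private volatile Object helper; // stands in for DynamicWrappedIO

  synchronized Object getOrCreate() {
    if (helper == null) {
      helper = new Object(); // expensive reflective setup in the real class
    }
    return helper;
  }

  boolean isLoaded() {
    return helper != null; // unsynchronized read: relies on the volatile
  }
}
```
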
@@ -197,7 +202,7 @@ public void deletePrefix(String prefix) {
    *
    * @return true if bulk delete should be used.
    */
-  private synchronized boolean useBulkDeleteApi() {
+  private synchronized boolean maybeUseBulkDeleteApi() {
     if (!bulkDeleteConfigured.compareAndSet(false, true)) {
       // configured already, so return.
       return useBulkDelete;
@@ -210,14 +215,12 @@ private synchronized boolean useBulkDeleteApi() {
       // library is configured to use bulk delete, so try to load it
       // and probe for the bulk delete methods being found.
       // this is only satisfied on Hadoop releases with the WrappedIO class.
-      if (wrappedIO == null) {
-        wrappedIO = new DynamicWrappedIO(this.getClass().getClassLoader());
-        useBulkDelete = wrappedIO.bulkDeleteAvailable();
-        if (useBulkDelete) {
-          LOG.debug("Bulk delete is enabled and available");
-        } else {
-          LOG.debug("Bulk delete enabled but not available");
-        }
+      wrappedIO = new DynamicWrappedIO(this.getClass().getClassLoader());
+      useBulkDelete = wrappedIO.bulkDeleteAvailable();
+      if (useBulkDelete) {
+        LOG.debug("Bulk delete is enabled and available");
+      } else {
+        LOG.debug("Bulk delete enabled but not available");
       }
     }
     return useBulkDelete;
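
The `compareAndSet` above guarantees the reflective probe runs exactly once, with every later caller getting the cached answer. A standalone sketch of that probe-once pattern, assuming the Hadoop wrapped-IO entry point lives at the class name below (treat that name as an assumption):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Probe-once pattern: the CAS flips on the first call only, so the
// class-loading probe is never repeated, even across racing callers.
class BulkDeleteProbe {
  private final AtomicBoolean configured = new AtomicBoolean(false);
  private volatile boolean useBulkDelete;

  synchronized boolean maybeUseBulkDelete() {
    if (!configured.compareAndSet(false, true)) {
      return useBulkDelete; // already probed: return the cached answer
    }
    try {
      // succeeds only on Hadoop releases that ship the WrappedIO class
      Class.forName("org.apache.hadoop.io.wrappedio.WrappedIO");
      useBulkDelete = true;
    } catch (ClassNotFoundException e) {
      useBulkDelete = false;
    }
    return useBulkDelete;
  }
}
```
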
@@ -230,13 +233,22 @@ private synchronized boolean useBulkDeleteApi() {
    * @return true if bulk delete is enabled.
    */
   boolean isBulkDeleteApiUsed() {
-    return useBulkDeleteApi();
+    return maybeUseBulkDeleteApi();
   }

+  /**
+   * Delete files.
+   * <p>
+   * If the Hadoop bulk deletion API is available and enabled, it is invoked through
+   * {@link #bulkDeleteFiles(Iterable)}.
+   * Otherwise, each file is deleted individually in the thread pool.
+   * @param pathsToDelete the paths to delete
+   * @throws BulkDeletionFailureException if one or more files could not be deleted.
+   */
   @Override
   public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
     AtomicInteger failureCount = new AtomicInteger(0);
-    if (useBulkDeleteApi()) {
+    if (maybeUseBulkDeleteApi()) {
       // bulk delete.
       failureCount.set(bulkDeleteFiles(pathsToDelete));
     } else {
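
The `else` branch (the per-file path the javadoc mentions) is not shown in this hunk. A rough sketch of what "each file is deleted individually in the thread pool" could look like; `deleteOne()` is a hypothetical stand-in for the per-file `FileSystem.delete()` call, not an Iceberg API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Non-bulk fallback: one task per file, failures counted in an AtomicInteger
// as in the diff above.
final class FallbackDeleter {
  static int deleteIndividually(ExecutorService pool, Iterable<String> paths)
      throws InterruptedException, ExecutionException {
    AtomicInteger failureCount = new AtomicInteger(0);
    List<Future<?>> futures = new ArrayList<>();
    for (String path : paths) {
      futures.add(pool.submit(() -> {
        if (!deleteOne(path)) {
          failureCount.incrementAndGet();
        }
      }));
    }
    for (Future<?> f : futures) {
      f.get(); // propagate unexpected task failures
    }
    return failureCount.get();
  }

  private static boolean deleteOne(String path) {
    return true; // placeholder: the real code deletes via the FileSystem
  }
}
```
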
@@ -295,30 +307,59 @@ private int bulkDeleteFiles(Iterable<String> pathnames) {
     // the root path of each filesystem.
     SetMultimap<Path, Path> fsMap =
         Multimaps.newSetMultimap(Maps.newHashMap(), Sets::newHashSet);
+
+    // this map of filesystem root to page size reduces the number of
+    // reflective invocations on the filesystems, and any work they do.
+    // this ensures that on scale tests with the default "page size == 1" bulk
+    // delete implementation, execution time is no slower than the classic
+    // delete implementation.
+    Map<Path, Integer> fsPageSizeMap = Maps.newHashMap();
+
+    // deletion tasks submitted.
     List<Future<List<Map.Entry<Path, String>>>> deletionTasks = Lists.newArrayList();
+
     final Path rootPath = new Path("/");
     final Configuration conf = hadoopConf.get();
-    int deletedFiles = 0;
+    int totalFailedDeletions = 0;
+
     for (String name : pathnames) {
       Path target = new Path(name);
-      FileSystem fs = Util.getFs(target, conf);
+      final FileSystem fs;
+      try {
+        fs = Util.getFs(target, conf);
+      } catch (Exception e) {
+        // any failure to find/load a filesystem
+        LOG.warn("Failed to get filesystem for path: {}", target, e);
+        totalFailedDeletions++;
+        continue;
+      }
       // build root path of the filesystem.
       Path fsRoot = fs.makeQualified(rootPath);
+      int pageSize;
+      if (!fsPageSizeMap.containsKey(fsRoot)) {
+        pageSize = wrappedIO.bulkDelete_pageSize(fs, rootPath);
+        fsPageSizeMap.put(fsRoot, pageSize);
+      } else {
+        pageSize = fsPageSizeMap.get(fsRoot);
+      }
+
       // retrieve or create the set of paths for the specific filesystem.
       Set<Path> pathsForFilesystem = fsMap.get(fsRoot);
+      // add the target. This updates the value in the map.
       pathsForFilesystem.add(target);

-      // determine the page size for the target filesystem.
-      int pageSize = wrappedIO.bulkDelete_pageSize(fs, target);
-
-      // the page size has been reached.
-      // for classic filesystems page size == 1 so this happens every time.
-      // hence: try and keep it efficient.
       if (pathsForFilesystem.size() == pageSize) {
-        LOG.debug("Queueing batch delete for filesystem {}: file count {}", fsRoot, pageSize);
+        // the page size has been reached.
+        // for classic filesystems page size == 1, so this happens every time.
+        // hence: try to keep it efficient.
+
+        // copy the live path set; this MUST be done outside the
+        // asynchronously submitted closure.
         HashSet<Path> paths = Sets.newHashSet(pathsForFilesystem);
+        // submit the batch deletion task.
         deletionTasks.add(executorService().submit(() ->
             deleteBatch(fs, fsRoot, paths)));
+        // remove all paths for this fs from the map.
         fsMap.removeAll(fsRoot);
       }
     }
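
The core of this hunk is a batching planner: group paths by filesystem root, probe each store's page size once, and flush a batch whenever it fills. A standalone sketch of that logic; `rootOf()` and `pageSizeOf()` are stand-ins for `fs.makeQualified(new Path("/"))` and the reflective `bulkDelete_pageSize()` probe, not Iceberg APIs:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.ToIntFunction;

// Groups paths per store and emits a batch each time one reaches the
// store's page size; leftovers are flushed at the end, mirroring the
// remainder loop in the next hunk.
final class BatchPlanner {
  static List<List<String>> planBatches(
      Iterable<String> paths,
      Function<String, String> rootOf,
      ToIntFunction<String> pageSizeOf) {
    Map<String, Integer> pageSizes = new HashMap<>();   // fsPageSizeMap in the diff
    Map<String, Set<String>> pending = new HashMap<>(); // fsMap in the diff
    List<List<String>> batches = new ArrayList<>();
    for (String path : paths) {
      String root = rootOf.apply(path);
      // probe each store's page size once, then reuse the cached value
      int pageSize = pageSizes.computeIfAbsent(root, pageSizeOf::applyAsInt);
      Set<String> forStore = pending.computeIfAbsent(root, r -> new HashSet<>());
      forStore.add(path);
      if (forStore.size() == pageSize) {
        batches.add(new ArrayList<>(forStore)); // copy before reuse, as in the diff
        pending.remove(root);
      }
    }
    // flush the partially filled remainders
    pending.values().forEach(remainder -> batches.add(new ArrayList<>(remainder)));
    return batches;
  }
}
```
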
@@ -328,14 +369,14 @@ private int bulkDeleteFiles(Iterable<String> pathnames) {
     for (Map.Entry<Path, Collection<Path>> pathsToDeleteByFileSystem :
         fsMap.asMap().entrySet()) {
       Path fsRoot = pathsToDeleteByFileSystem.getKey();
-
       deletionTasks.add(executorService().submit(() ->
           deleteBatch(Util.getFs(fsRoot, conf),
               fsRoot,
               pathsToDeleteByFileSystem.getValue())));
     }

-    int totalFailedDeletions = 0;
+    // Wait for all deletion tasks to complete and count the failures.
+    LOG.debug("Waiting for {} deletion tasks to complete", deletionTasks.size());

     for (Future<List<Map.Entry<Path, String>>> deletionTask : deletionTasks) {
       try {
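
A sketch of the wait-and-count step that follows: each batch task returns the (path, error) entries it failed to delete, and the caller sums them. The catch branch here is a guess, since the real handling sits outside this hunk:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Drains the futures in submission order and totals the failed paths.
final class DeletionWaiter {
  static int awaitDeletionTasks(List<Future<List<Map.Entry<String, String>>>> tasks)
      throws InterruptedException {
    int totalFailedDeletions = 0;
    for (Future<List<Map.Entry<String, String>>> task : tasks) {
      try {
        totalFailedDeletions += task.get().size();
      } catch (ExecutionException e) {
        totalFailedDeletions++; // the whole batch failed to execute
      }
    }
    return totalFailedDeletions;
  }
}
```
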
@@ -362,11 +403,12 @@ private int bulkDeleteFiles(Iterable<String> pathnames) {
    * @param fsRoot root of the filesystem (all paths to delete must be under this).
    * @param paths paths to delete.
    *
-   * @return the list of paths that couldn't be deleted.
+   * @return the list of paths which couldn't be deleted.
    * @throws UncheckedIOException if an IOE was raised in the invoked methods.
    */
   private List<Map.Entry<Path, String>> deleteBatch(FileSystem fs, final Path fsRoot, Collection<Path> paths) {

+    LOG.debug("Deleting batch of {} files under {}", paths.size(), fsRoot);
     return wrappedIO.bulkDelete_delete(fs, fsRoot, paths);
   }

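
`deleteBatch` delegates to the dynamically loaded binding. A rough sketch of what a `DynamicWrappedIO`-style binding involves: resolve the static Hadoop `WrappedIO` method once by reflection, then report availability. The class and method names below are assumptions about the Hadoop 3.4+ wrapped-IO API, and the real binding class is more thorough:

```java
import java.lang.reflect.Method;
import java.util.Collection;

// Resolves the bulk-delete entry point once; absence on older Hadoop
// releases degrades gracefully to "unavailable".
final class WrappedIOBinding {
  private final Method bulkDeleteDelete; // null when the API is unavailable

  WrappedIOBinding(ClassLoader loader) {
    Method found = null;
    try {
      Class<?> wrappedIO = loader.loadClass("org.apache.hadoop.io.wrappedio.WrappedIO");
      Class<?> fileSystem = loader.loadClass("org.apache.hadoop.fs.FileSystem");
      Class<?> path = loader.loadClass("org.apache.hadoop.fs.Path");
      found = wrappedIO.getMethod("bulkDelete_delete", fileSystem, path, Collection.class);
    } catch (ReflectiveOperationException e) {
      // older Hadoop: leave the method null and report unavailable
    }
    bulkDeleteDelete = found;
  }

  boolean bulkDeleteAvailable() {
    return bulkDeleteDelete != null;
  }
}
```
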