diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java index efc85cfc387a..90e92279f9a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java @@ -151,26 +151,15 @@ public String prepareBulkLoad(final HRegion region, final PrepareBulkLoadRequest public void cleanupBulkLoad(final HRegion region, final CleanupBulkLoadRequest request) throws IOException { - try { - region.getCoprocessorHost().preCleanupBulkLoad(getActiveUser()); + region.getCoprocessorHost().preCleanupBulkLoad(getActiveUser()); - Path path = new Path(request.getBulkToken()); - if (!fs.delete(path, true)) { - if (fs.exists(path)) { - throw new IOException("Failed to clean up " + path); - } - } - LOG.info("Cleaned up " + path + " successfully."); - } finally { - UserGroupInformation ugi = getActiveUser().getUGI(); - try { - if (!UserGroupInformation.getLoginUser().equals(ugi) && !isUserReferenced(ugi)) { - FileSystem.closeAllForUGI(ugi); - } - } catch (IOException e) { - LOG.error("Failed to close FileSystem for: " + ugi, e); + Path path = new Path(request.getBulkToken()); + if (!fs.delete(path, true)) { + if (fs.exists(path)) { + throw new IOException("Failed to clean up " + path); } } + LOG.trace("Cleaned up {} successfully.", path); } private Consumer fsCreatedListener; @@ -279,6 +268,13 @@ public Map> secureBulkLoadHFiles(final HRegion region, public Map> run() { FileSystem fs = null; try { + /* + * This is creating and caching a new FileSystem instance. Other code called + * "beneath" this method will rely on this FileSystem instance being in the + * cache. This is important as those methods make _no_ attempt to close this + * FileSystem instance. It is critical that here, in SecureBulkLoadManager, + * we are tracking the lifecycle and closing the FS when safe to do so. + */ fs = FileSystem.get(conf); for(Pair el: familyPaths) { Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst())); @@ -303,6 +299,13 @@ public Map> run() { }); } finally { decrementUgiReference(ugi); + try { + if (!UserGroupInformation.getLoginUser().equals(ugi) && !isUserReferenced(ugi)) { + FileSystem.closeAllForUGI(ugi); + } + } catch (IOException e) { + LOG.error("Failed to close FileSystem for: {}", ugi, e); + } if (region.getCoprocessorHost() != null) { region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map); }