From 1d2c3efc690f6c408a99a20942b8fa81c77fb703 Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Mon, 13 Jan 2020 18:30:30 -0500 Subject: [PATCH] HBASE-23679 FileSystem objects leak when cleaned up in cleanupBulkLoad The cleanupBulkLoad method is only called for the first Region in the table which was being bulk loaded into. This means that potentially N-1 other RegionServers (where N is the number of RegionServers) will leak one FileSystem object into the FileSystem cache which will never be cleaned up. We need to do this clean-up as a part of secureBulkLoadHFiles otherwise we cannot guarantee that heap usage won't grow unbounded. Closes #1029 Signed-off-by: Sean Busbey --- .../regionserver/SecureBulkLoadManager.java | 37 ++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java index bccc8fed459b..599c89820c25 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java @@ -153,26 +153,15 @@ public String prepareBulkLoad(final HRegion region, final PrepareBulkLoadRequest public void cleanupBulkLoad(final HRegion region, final CleanupBulkLoadRequest request) throws IOException { - try { - region.getCoprocessorHost().preCleanupBulkLoad(getActiveUser()); + region.getCoprocessorHost().preCleanupBulkLoad(getActiveUser()); - Path path = new Path(request.getBulkToken()); - if (!fs.delete(path, true)) { - if (fs.exists(path)) { - throw new IOException("Failed to clean up " + path); - } - } - LOG.info("Cleaned up " + path + " successfully."); - } finally { - UserGroupInformation ugi = getActiveUser().getUGI(); - try { - if (!UserGroupInformation.getLoginUser().equals(ugi) && !isUserReferenced(ugi)) { - FileSystem.closeAllForUGI(ugi); - } - } catch (IOException e) { - LOG.error("Failed to close FileSystem for: " + ugi, e); + Path path = new Path(request.getBulkToken()); + if (!fs.delete(path, true)) { + if (fs.exists(path)) { + throw new IOException("Failed to clean up " + path); } } + LOG.trace("Cleaned up {} successfully.", path); } private Consumer fsCreatedListener; @@ -281,6 +270,13 @@ public Map> secureBulkLoadHFiles(final HRegion region, public Map> run() { FileSystem fs = null; try { + /* + * This is creating and caching a new FileSystem instance. Other code called + * "beneath" this method will rely on this FileSystem instance being in the + * cache. This is important as those methods make _no_ attempt to close this + * FileSystem instance. It is critical that here, in SecureBulkLoadManager, + * we are tracking the lifecycle and closing the FS when safe to do so. + */ fs = FileSystem.get(conf); for(Pair el: familyPaths) { Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst())); @@ -305,6 +301,13 @@ public Map> run() { }); } finally { decrementUgiReference(ugi); + try { + if (!UserGroupInformation.getLoginUser().equals(ugi) && !isUserReferenced(ugi)) { + FileSystem.closeAllForUGI(ugi); + } + } catch (IOException e) { + LOG.error("Failed to close FileSystem for: {}", ugi, e); + } if (region.getCoprocessorHost() != null) { region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map); }