Skip to content

Commit

Permalink
HBASE-23679 FileSystem objects leak when cleaned up in cleanupBulkLoad
Browse files Browse the repository at this point in the history
The cleanupBulkLoad method is only called for the first Region in the
table which was being bulk loaded into. This means that potentially N-1
other RegionServers (where N is the number of RegionServers) will leak
one FileSystem object into the FileSystem cache which will never be
cleaned up. We need to do this clean-up as a part of secureBulkLoadHFiles
otherwise we cannot guarantee that heap usage won't grow unbounded.

Closes apache#1029

Signed-off-by: Sean Busbey <busbey@apache.org>
(cherry picked from commit 3d63bff)

Change-Id: Iffe6bea060649421d5c6c14fdf1400bdfcd2765e
  • Loading branch information
joshelser authored and Jenkins committed Jan 21, 2020
1 parent 8f78b24 commit 856282f
Showing 1 changed file with 20 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,26 +151,15 @@ public String prepareBulkLoad(final HRegion region, final PrepareBulkLoadRequest

public void cleanupBulkLoad(final HRegion region, final CleanupBulkLoadRequest request)
throws IOException {
try {
region.getCoprocessorHost().preCleanupBulkLoad(getActiveUser());
region.getCoprocessorHost().preCleanupBulkLoad(getActiveUser());

Path path = new Path(request.getBulkToken());
if (!fs.delete(path, true)) {
if (fs.exists(path)) {
throw new IOException("Failed to clean up " + path);
}
}
LOG.info("Cleaned up " + path + " successfully.");
} finally {
UserGroupInformation ugi = getActiveUser().getUGI();
try {
if (!UserGroupInformation.getLoginUser().equals(ugi) && !isUserReferenced(ugi)) {
FileSystem.closeAllForUGI(ugi);
}
} catch (IOException e) {
LOG.error("Failed to close FileSystem for: " + ugi, e);
Path path = new Path(request.getBulkToken());
if (!fs.delete(path, true)) {
if (fs.exists(path)) {
throw new IOException("Failed to clean up " + path);
}
}
LOG.trace("Cleaned up {} successfully.", path);
}

private Consumer<HRegion> fsCreatedListener;
Expand Down Expand Up @@ -279,6 +268,13 @@ public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
public Map<byte[], List<Path>> run() {
FileSystem fs = null;
try {
/*
* This is creating and caching a new FileSystem instance. Other code called
* "beneath" this method will rely on this FileSystem instance being in the
* cache. This is important as those methods make _no_ attempt to close this
* FileSystem instance. It is critical that here, in SecureBulkLoadManager,
* we are tracking the lifecycle and closing the FS when safe to do so.
*/
fs = FileSystem.get(conf);
for(Pair<byte[], String> el: familyPaths) {
Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
Expand All @@ -303,6 +299,13 @@ public Map<byte[], List<Path>> run() {
});
} finally {
decrementUgiReference(ugi);
try {
if (!UserGroupInformation.getLoginUser().equals(ugi) && !isUserReferenced(ugi)) {
FileSystem.closeAllForUGI(ugi);
}
} catch (IOException e) {
LOG.error("Failed to close FileSystem for: {}", ugi, e);
}
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
}
Expand Down

0 comments on commit 856282f

Please sign in to comment.