Skip to content

Commit

Permalink
HBASE-23679 FileSystem objects leak when cleaned up in cleanupBulkLoad
Browse files Browse the repository at this point in the history
The cleanupBulkLoad method is only called for the first Region in the
table which was being bulk loaded into. This means that potentially N-1
other RegionServers (where N is the number of RegionServers) will leak
one FileSystem object into the FileSystem cache which will never be
cleaned up. We need to do this clean-up as a part of secureBulkLoadHFiles
otherwise we cannot guarantee that heap usage won't grow unbounded.

Closes #1029
  • Loading branch information
joshelser committed Jan 13, 2020
1 parent 0bf933b commit 63bf5f7
Showing 1 changed file with 20 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,26 +153,15 @@ public String prepareBulkLoad(final HRegion region, final PrepareBulkLoadRequest

public void cleanupBulkLoad(final HRegion region, final CleanupBulkLoadRequest request)
throws IOException {
try {
region.getCoprocessorHost().preCleanupBulkLoad(getActiveUser());
region.getCoprocessorHost().preCleanupBulkLoad(getActiveUser());

Path path = new Path(request.getBulkToken());
if (!fs.delete(path, true)) {
if (fs.exists(path)) {
throw new IOException("Failed to clean up " + path);
}
}
LOG.info("Cleaned up " + path + " successfully.");
} finally {
UserGroupInformation ugi = getActiveUser().getUGI();
try {
if (!UserGroupInformation.getLoginUser().equals(ugi) && !isUserReferenced(ugi)) {
FileSystem.closeAllForUGI(ugi);
}
} catch (IOException e) {
LOG.error("Failed to close FileSystem for: " + ugi, e);
Path path = new Path(request.getBulkToken());
if (!fs.delete(path, true)) {
if (fs.exists(path)) {
throw new IOException("Failed to clean up " + path);
}
}
LOG.trace("Cleaned up {} successfully.", path);
}

private Consumer<HRegion> fsCreatedListener;
Expand Down Expand Up @@ -281,6 +270,13 @@ public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
public Map<byte[], List<Path>> run() {
FileSystem fs = null;
try {
/*
* This is creating and caching a new FileSystem instance. Other code called
* "beneath" this method will rely on this FileSystem instance being in the
* cache. This is important as those methods make _no_ attempt to close this
* FileSystem instance. It is critical that here, in SecureBulkLoadManager,
* we are tracking the lifecycle and closing the FS when safe to do so.
*/
fs = FileSystem.get(conf);
for(Pair<byte[], String> el: familyPaths) {
Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
Expand All @@ -305,6 +301,13 @@ public Map<byte[], List<Path>> run() {
});
} finally {
decrementUgiReference(ugi);
try {
if (!UserGroupInformation.getLoginUser().equals(ugi) && !isUserReferenced(ugi)) {
FileSystem.closeAllForUGI(ugi);
}
} catch (IOException e) {
LOG.error("Failed to close FileSystem for: " + ugi, e);
}
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
}
Expand Down

0 comments on commit 63bf5f7

Please sign in to comment.