Skip to content

Commit

Permalink
HADOOP-16570. S3A committers encounter scale issues.
Browse files Browse the repository at this point in the history
Contributed by Steve Loughran.

This addresses two scale issues which has surfaced in large scale benchmarks
of the S3A Committers.

* Thread pools are not cleaned up.
  This now happens, with tests.

* OOM on job commit for jobs with many thousands of tasks,
  each generating tens of (very large) files.

Instead of loading all pending commits into memory as a single list, the list
of files to load is the sole list which is passed around; .pendingset files are
loaded and processed in isolation -and reloaded if necessary for any
abort/rollback operation.

The parallel commit/abort/revert operations now work at the .pendingset level,
rather than that of individual pending commit files. The existing parallelized
Tasks API is still used to commit those files, but with a null thread pool, so
as to serialize the operations.

Change-Id: I5c8240cd31800eaa83d112358770ca0eb2bca797
  • Loading branch information
steveloughran committed Oct 4, 2019
1 parent aa24add commit 6574f27
Show file tree
Hide file tree
Showing 22 changed files with 1,123 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -837,4 +837,10 @@ private Constants() {
public static final String AWS_SERVICE_IDENTIFIER_S3 = "S3";
public static final String AWS_SERVICE_IDENTIFIER_DDB = "DDB";
public static final String AWS_SERVICE_IDENTIFIER_STS = "STS";

/**
* How long to wait for the thread pool to terminate when cleaning up.
* Value: {@value} seconds.
*/
public static final int THREAD_POOL_SHUTDOWN_DELAY_SECONDS = 30;
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.concurrent.HadoopExecutors;

import static org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
Expand Down Expand Up @@ -3062,6 +3063,12 @@ public void close() throws IOException {
transfers.shutdownNow(true);
transfers = null;
}
HadoopExecutors.shutdown(boundedThreadPool, LOG,
THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
boundedThreadPool = null;
HadoopExecutors.shutdown(unboundedThreadPool, LOG,
THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
unboundedThreadPool = null;
S3AUtils.closeAll(LOG, metadataStore, instrumentation);
metadataStore = null;
instrumentation = null;
Expand Down Expand Up @@ -4064,7 +4071,7 @@ public List<MultipartUpload> listMultipartUploads(String prefix)
*/
@Retries.OnceRaw
void abortMultipartUpload(String destKey, String uploadId) {
LOG.info("Aborting multipart upload {} to {}", uploadId, destKey);
LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey);
getAmazonS3Client().abortMultipartUpload(
new AbortMultipartUploadRequest(getBucket(),
destKey,
Expand All @@ -4084,7 +4091,7 @@ void abortMultipartUpload(MultipartUpload upload) {
uploadId = upload.getUploadId();
if (LOG.isInfoEnabled()) {
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
LOG.info("Aborting multipart upload {} to {} initiated by {} on {}",
LOG.debug("Aborting multipart upload {} to {} initiated by {} on {}",
uploadId, destKey, upload.getInitiator(),
df.format(upload.getInitiated()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,11 +611,14 @@ public void getMetrics(MetricsCollector collector, boolean all) {

public void close() {
synchronized (metricsSystemLock) {
// it is critical to close each quantile, as they start a scheduled
// task in a shared thread pool.
putLatencyQuantile.stop();
throttleRateQuantile.stop();
metricsSystem.unregisterSource(metricsSourceName);
int activeSources = --metricsSourceActiveCounter;
if (activeSources == 0) {
LOG.debug("Shutting down metrics publisher");
metricsSystem.publishMetricsNow();
metricsSystem.shutdown();
metricsSystem = null;
Expand Down
Loading

0 comments on commit 6574f27

Please sign in to comment.