Skip to content

Commit 01b6239

Browse files
steveloughran authored and deepakdamri committed
HADOOP-13786 Add S3A committer for zero-rename commits to S3 endpoints.
Contributed by Steve Loughran and Ryan Blue.
1 parent 1fdd52f commit 01b6239

File tree

7 files changed: +225 -5 lines changed

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,22 @@ public boolean hasOutputPath() {
9191
return getOutputPath() != null;
9292
}
9393

94+
/**
95+
* Get the final directory where work will be placed once the job
96+
* is committed. This may be null, in which case, there is no output
97+
* path to write data to.
98+
* @return the path where final output of the job should be placed.
99+
*/
100+
public abstract Path getOutputPath();
101+
102+
/**
103+
* Predicate: is there an output path?
104+
* @return true if we have an output path set, else false.
105+
*/
106+
public boolean hasOutputPath() {
107+
return getOutputPath() != null;
108+
}
109+
94110
/**
95111
* Get the directory that the task should write results into.
96112
* Warning: there's no guarantee that this work path is on the same

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class S3ObjectAttributes {
3535
private final S3AEncryptionMethods serverSideEncryptionAlgorithm;
3636
private final String serverSideEncryptionKey;
3737

38-
public S3ObjectAttributes(
38+
S3ObjectAttributes(
3939
String bucket,
4040
String key,
4141
S3AEncryptionMethods serverSideEncryptionAlgorithm,
@@ -46,19 +46,19 @@ public S3ObjectAttributes(
4646
this.serverSideEncryptionKey = serverSideEncryptionKey;
4747
}
4848

49-
public String getBucket() {
49+
String getBucket() {
5050
return bucket;
5151
}
5252

53-
public String getKey() {
53+
String getKey() {
5454
return key;
5555
}
5656

57-
public S3AEncryptionMethods getServerSideEncryptionAlgorithm() {
57+
S3AEncryptionMethods getServerSideEncryptionAlgorithm() {
5858
return serverSideEncryptionAlgorithm;
5959
}
6060

61-
public String getServerSideEncryptionKey() {
61+
String getServerSideEncryptionKey() {
6262
return serverSideEncryptionKey;
6363
}
6464
}

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ public void testFakeDirectoryDeletion() throws Throwable {
200200
// before the internal behavior w/ or w/o metadata store.
201201
// assumeFalse(fs.hasMetadataStore());
202202

203+
skipDuringFaultInjection(fs);
204+
203205
Path srcBaseDir = path("src");
204206
mkdirs(srcBaseDir);
205207
MetricDiff deleteRequests =
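
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java

Lines changed: 70 additions & 0 deletions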
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a.commit.magic;
20+
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.fs.Path;
23+
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
24+
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
25+
26+
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
27+
28+
/**
29+
* Full integration test for the Magic Committer.
30+
*
31+
* There's no need to disable the committer setting for the filesystem here,
32+
* because the committers are being instantiated in their own processes;
33+
* the settings in {@link #applyCustomConfigOptions(Configuration)} are
34+
* passed down to these processes.
35+
*/
36+
public class ITMagicCommitMRJob extends AbstractITCommitMRJob {
37+
38+
/**
39+
* Need consistency here.
40+
* @return false
41+
*/
42+
@Override
43+
public boolean useInconsistentClient() {
44+
return false;
45+
}
46+
47+
@Override
48+
protected String committerName() {
49+
return MagicS3GuardCommitter.NAME;
50+
}
51+
52+
/**
53+
* Turn on the magic commit support for the FS, else nothing will work.
54+
* @param conf configuration
55+
*/
56+
@Override
57+
protected void applyCustomConfigOptions(Configuration conf) {
58+
conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
59+
}
60+
61+
/**
62+
* Check that the magic dir was cleaned up.
63+
* {@inheritDoc}
64+
*/
65+
@Override
66+
protected void customPostExecutionValidation(Path destPath,
67+
SuccessData successData) throws Exception {
68+
assertPathDoesNotExist("No cleanup", new Path(destPath, MAGIC));
69+
}
70+
}
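
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java

Lines changed: 33 additions & 0 deletions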
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a.commit.staging.integration;
20+
21+
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
22+
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
23+
24+
/**
25+
* Full integration test for the directory committer.
26+
*/
27+
public class ITDirectoryCommitMRJob extends AbstractITCommitMRJob {
28+
29+
@Override
30+
protected String committerName() {
31+
return DirectoryStagingCommitter.NAME;
32+
}
33+
}
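
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITPartitionCommitMRJob.java

Lines changed: 33 additions & 0 deletions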
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a.commit.staging.integration;
20+
21+
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
22+
import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
23+
24+
/**
25+
* Full integration test for the partition committer.
26+
*/
27+
public class ITPartitionCommitMRJob extends AbstractITCommitMRJob {
28+
29+
@Override
30+
protected String committerName() {
31+
return PartitionedStagingCommitter.NAME;
32+
}
33+
}
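
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJob.java

Lines changed: 66 additions & 0 deletions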
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a.commit.staging.integration;
20+
21+
import org.junit.Test;
22+
23+
import org.hamcrest.core.StringContains;
24+
import org.hamcrest.core.StringEndsWith;
25+
import org.apache.hadoop.conf.Configuration;
26+
import org.apache.hadoop.fs.FileSystem;
27+
import org.apache.hadoop.fs.Path;
28+
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
29+
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
30+
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
31+
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants;
32+
import org.apache.hadoop.security.UserGroupInformation;
33+
34+
import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory;
35+
36+
/**
37+
* Full integration test for the staging committer.
38+
*/
39+
public class ITStagingCommitMRJob extends AbstractITCommitMRJob {
40+
41+
@Override
42+
protected String committerName() {
43+
return StagingCommitter.NAME;
44+
}
45+
46+
/**
47+
* Verify that staging commit dirs are made absolute under the user's
48+
* home directory, so, in a secure cluster, private.
49+
*/
50+
@Test
51+
public void testStagingDirectory() throws Throwable {
52+
FileSystem hdfs = getDFS();
53+
Configuration conf = hdfs.getConf();
54+
conf.set(CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH,
55+
"private");
56+
Path dir = getMultipartUploadCommitsDirectory(conf, "UUID");
57+
assertThat(dir.toString(), StringEndsWith.endsWith(
58+
"UUID/"
59+
+ StagingCommitterConstants.STAGING_UPLOADS));
60+
assertTrue("path unqualified", dir.isAbsolute());
61+
String self = UserGroupInformation.getCurrentUser().getShortUserName();
62+
assertThat(dir.toString(),
63+
StringContains.containsString("/user/" + self + "/private"));
64+
}
65+
66+
}

0 commit comments

Comments
 (0)