Skip to content

Commit 284cd10

Browse files
committed
HADOOP-13786 magic commit tracker PUTs a 0-byte file to the original destination; handles apps which expect the newly written file to *exist*. It doesn't handle them trying to read it though
1 parent 394584b commit 284cd10

File tree

4 files changed

+20
-3
lines changed

4 files changed

+20
-3
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,6 +1313,7 @@ PutObjectRequest newPutObjectRequest(String key,
13131313
ObjectMetadata metadata,
13141314
InputStream inputStream) {
13151315
Preconditions.checkNotNull(inputStream);
1316+
Preconditions.checkArgument(StringUtils.isNotEmpty(key), "Null/empty key");
13161317
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
13171318
inputStream, metadata);
13181319
setOptionalPutRequestParameters(putObjectRequest);

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/PendingCommitFSIntegration.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import org.apache.hadoop.fs.Path;
2727
import org.apache.hadoop.fs.s3a.S3AFileSystem;
28+
import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker;
2829

2930
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
3031

@@ -88,7 +89,9 @@ public DefaultPutTracker getTracker(Path path, String key) {
8889
String pendingKey = key + CommitConstants.PENDING_SUFFIX;
8990
tracker = new MagicCommitTracker(path,
9091
owner.getBucket(),
91-
destKey, pendingKey,
92+
key,
93+
destKey,
94+
pendingKey,
9295
owner.createWriteOperationHelper(pendingKey));
9396
} else {
9497
// standard multipart tracking

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitTracker.java renamed to hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.hadoop.fs.s3a.commit;
19+
package org.apache.hadoop.fs.s3a.commit.magic;
2020

2121
import java.io.ByteArrayInputStream;
2222
import java.io.IOException;
@@ -32,16 +32,18 @@
3232
import org.apache.hadoop.classification.InterfaceAudience;
3333
import org.apache.hadoop.fs.Path;
3434
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
35+
import org.apache.hadoop.fs.s3a.commit.DefaultPutTracker;
3536
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
3637

3738
/**
38-
* Put tracker for pending commits.
39+
* Put tracker for Magic commits.
3940
*/
4041
@InterfaceAudience.Private
4142
public class MagicCommitTracker extends DefaultPutTracker {
4243
public static final Logger LOG = LoggerFactory.getLogger(
4344
MagicCommitTracker.class);
4445

46+
private final String originalDestKey;
4547
private final String pendingPartKey;
4648
private final Path path;
4749
private final WriteOperationHelper writer;
@@ -51,18 +53,21 @@ public class MagicCommitTracker extends DefaultPutTracker {
5153
* Pending commit tracker.
5254
* @param path path nominally being written to
5355
* @param bucket dest bucket
56+
* @param originalDestKey the original key, in the magic directory.
5457
* @param destKey key for the destination
5558
* @param pendingPartKey key of the pending part
5659
* @param writer writer instance to use for operations
5760
*/
5861
public MagicCommitTracker(Path path,
5962
String bucket,
63+
String originalDestKey,
6064
String destKey,
6165
String pendingPartKey,
6266
WriteOperationHelper writer) {
6367
super(destKey);
6468
this.bucket = bucket;
6569
this.path = path;
70+
this.originalDestKey = originalDestKey;
6671
this.pendingPartKey = pendingPartKey;
6772
this.writer = writer;
6873
}
@@ -114,6 +119,13 @@ public boolean aboutToComplete(String uploadId,
114119
PutObjectRequest put = writer.newPutRequest(
115120
new ByteArrayInputStream(bytes), bytes.length);
116121
writer.uploadObject(put);
122+
123+
// now put a 0-byte file with the name of the original under-magic path
124+
byte[] EMPTY = new byte[0];
125+
PutObjectRequest originalDestPut = writer.createPutObjectRequest(
126+
originalDestKey,
127+
new ByteArrayInputStream(EMPTY), 0);
128+
writer.uploadObject(originalDestPut);
117129
return false;
118130
}
119131

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitOperations.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.hadoop.fs.Path;
3636
import org.apache.hadoop.fs.s3a.S3AFileSystem;
3737
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
38+
import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker;
3839
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
3940
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitterFactory;
4041
import org.apache.hadoop.mapreduce.TaskAttemptID;

0 commit comments

Comments
 (0)