Skip to content

Commit f0144a7

Browse files
author
Bhavay Pahuja
committed
HADOOP-14837: Glacier read restored objects support
1 parent 8243da8 commit f0144a7

File tree

9 files changed

+311
-8
lines changed

9 files changed

+311
-8
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1495,6 +1495,12 @@ private Constants() {
14951495
*/
14961496
public static final int DEFAULT_PREFETCH_MAX_BLOCKS_COUNT = 4;
14971497

1498+
/**
1499+
* Read Restored Glacier objects config.
1500+
* Value = {@value}
1501+
*/
1502+
public static final String READ_RESTORED_GLACIER_OBJECTS = "fs.s3a.glacier.read.restored.objects";
1503+
14981504
/**
14991505
* The bucket region header.
15001506
*/

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public class Listing extends AbstractStoreOperation {
7676

7777
private static final Logger LOG = S3AFileSystem.LOG;
7878
private final boolean isCSEEnabled;
79+
private final S3ObjectStorageClassFilter s3ObjectStorageClassFilter;
7980

8081
static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
8182
new AcceptAllButS3nDirs();
@@ -87,6 +88,7 @@ public Listing(ListingOperationCallbacks listingOperationCallbacks,
8788
super(storeContext);
8889
this.listingOperationCallbacks = listingOperationCallbacks;
8990
this.isCSEEnabled = storeContext.isCSEEnabled();
91+
this.s3ObjectStorageClassFilter = storeContext.s3ObjectsStorageClassFilter();
9092
}
9193

9294
/**
@@ -143,7 +145,8 @@ public FileStatusListingIterator createFileStatusListingIterator(
143145
return new FileStatusListingIterator(
144146
createObjectListingIterator(listPath, request, span),
145147
filter,
146-
acceptor);
148+
acceptor
149+
);
147150
}
148151

149152
/**
@@ -307,6 +310,30 @@ interface FileStatusAcceptor {
307310
* @return true if the status is accepted else false
308311
*/
309312
boolean accept(FileStatus status);
313+
314+
/**
315+
* Resolve a list of file status acceptors into a single FileStatusAcceptor
316+
* @param acceptors List of file status acceptors
317+
* @return combined FileStatusAcceptor
318+
*/
319+
static FileStatusAcceptor resolveFileStatusAcceptors(List<FileStatusAcceptor> acceptors) {
320+
return new FileStatusAcceptor() {
321+
@Override
322+
public boolean accept(Path keyPath, S3Object s3Object) {
323+
return acceptors.stream().map(acceptor -> acceptor.accept(keyPath, s3Object)).reduce(true, Boolean::logicalAnd);
324+
}
325+
326+
@Override
327+
public boolean accept(Path keyPath, String commonPrefix) {
328+
return acceptors.stream().map(acceptor -> acceptor.accept(keyPath, commonPrefix)).reduce(true, Boolean::logicalAnd);
329+
}
330+
331+
@Override
332+
public boolean accept(FileStatus status) {
333+
return acceptors.stream().map(acceptor -> acceptor.accept(status)).reduce(true, Boolean::logicalAnd);
334+
}
335+
};
336+
}
310337
}
311338

312339
/**
@@ -462,7 +489,8 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
462489
LOG.debug("{}: {}", keyPath, stringify(s3Object));
463490
}
464491
// Skip over keys that are ourselves and old S3N _$folder$ files
465-
if (acceptor.accept(keyPath, s3Object) && filter.accept(keyPath)) {
492+
// Skip or include Glacier-storage-class objects according to the fs.s3a.glacier.read.restored.objects setting
493+
if ( s3ObjectStorageClassFilter.getFilter().apply(s3Object) && acceptor.accept(keyPath, s3Object) && filter.accept(keyPath)) {
466494
S3AFileStatus status = createFileStatus(keyPath, s3Object,
467495
listingOperationCallbacks.getDefaultBlockSize(keyPath),
468496
getStoreContext().getUsername(),

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@
145145
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
146146
import org.apache.hadoop.fs.s3a.impl.StoreContext;
147147
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
148+
import org.apache.hadoop.fs.s3a.Listing.AcceptAllButSelfAndS3nDirs;
148149
import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
149150
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
150151
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
@@ -445,6 +446,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
445446
*/
446447
private boolean isCSEEnabled;
447448

449+
private S3ObjectStorageClassFilter s3ObjectStorageClassFilter;
450+
448451
/**
449452
* Bucket AccessPoint.
450453
*/
@@ -581,6 +584,12 @@ public void initialize(URI name, Configuration originalConf)
581584

582585
s3aInternals = createS3AInternals();
583586

587+
s3ObjectStorageClassFilter = Optional.ofNullable(conf.get(READ_RESTORED_GLACIER_OBJECTS))
588+
.map(String::trim)
589+
.map(String::toUpperCase)
590+
.map(S3ObjectStorageClassFilter::valueOf)
591+
.orElse(S3ObjectStorageClassFilter.READ_ALL);
592+
584593
// look for encryption data
585594
// DT Bindings may override this
586595
setEncryptionSecrets(
@@ -2477,7 +2486,7 @@ public RemoteIterator<S3ALocatedFileStatus> listFilesAndDirectoryMarkers(
24772486
true,
24782487
includeSelf
24792488
? Listing.ACCEPT_ALL_BUT_S3N
2480-
: new Listing.AcceptAllButSelfAndS3nDirs(path),
2489+
: new AcceptAllButSelfAndS3nDirs(path),
24812490
status
24822491
);
24832492
}
@@ -3861,7 +3870,8 @@ public S3AFileStatus probePathStatus(final Path path,
38613870
@Override
38623871
public RemoteIterator<S3ALocatedFileStatus> listFilesIterator(final Path path,
38633872
final boolean recursive) throws IOException {
3864-
return S3AFileSystem.this.innerListFiles(path, recursive, Listing.ACCEPT_ALL_BUT_S3N, null);
3873+
return S3AFileSystem.this.innerListFiles(path, recursive,
3874+
Listing.ACCEPT_ALL_BUT_S3N, null);
38653875
}
38663876
}
38673877

@@ -5197,7 +5207,7 @@ public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
51975207
*
51985208
* @param f path
51995209
* @param recursive recursive listing?
5200-
* @param acceptor file status filter
5210+
* @param acceptor file status filters
52015211
* @param status optional status of path to list.
52025212
* @return an iterator over the listing.
52035213
* @throws IOException failure
@@ -5224,7 +5234,7 @@ private RemoteIterator<S3ALocatedFileStatus> innerListFiles(
52245234
listing.getListFilesAssumingDir(path,
52255235
recursive,
52265236
acceptor,
5227-
getActiveAuditSpan());
5237+
getActiveAuditSpan());
52285238
// If there are no list entries present, we
52295239
// fallback to file existence check as the path
52305240
// can be a file or empty directory.
@@ -5769,6 +5779,7 @@ public StoreContext createStoreContext() {
57695779
.setContextAccessors(new ContextAccessorsImpl())
57705780
.setAuditor(getAuditor())
57715781
.setEnableCSE(isCSEEnabled)
5782+
.setS3ObjectStorageClassFilter(s3ObjectStorageClassFilter)
57725783
.build();
57735784
}
57745785

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a;
20+
21+
import java.util.EnumSet;
import java.util.Set;
import java.util.function.Function;

import software.amazon.awssdk.services.s3.model.ObjectStorageClass;
import software.amazon.awssdk.services.s3.model.S3Object;

import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
26+
27+
28+
/**
29+
* S3ObjectStorageClassFilter will filter the S3 files based on the fs.s3a.glacier.read.restored.objects configuration set in S3AFileSystem
30+
* The config can have 3 values:
31+
* READ_ALL: This would conform to the current default behavior of not taking into account the storage classes retrieved from S3. This will be done to keep the current behavior for the customers and not changing the experience for them.
32+
* SKIP_ALL_GLACIER: If this value is set then this will ignore any S3 Objects which are tagged with Glacier storage classes and retrieve the others.
33+
* READ_RESTORED_GLACIER_OBJECTS: If this value is set then restored status of the Glacier object will be checked, if restored the objects would be read like normal S3 objects else they will be ignored as the objects would not have been retrieved from the S3 Glacier.
34+
*/
35+
public enum S3ObjectStorageClassFilter {
36+
READ_ALL(o -> true),
37+
SKIP_ALL_GLACIER(S3ObjectStorageClassFilter::isNotGlacierObject),
38+
READ_RESTORED_GLACIER_OBJECTS(S3ObjectStorageClassFilter::isCompletedRestoredObject);
39+
40+
private static final Set<ObjectStorageClass> GLACIER_STORAGE_CLASSES = Sets.newHashSet(ObjectStorageClass.GLACIER, ObjectStorageClass.DEEP_ARCHIVE);
41+
42+
private final Function<S3Object, Boolean> filter;
43+
44+
S3ObjectStorageClassFilter(Function<S3Object, Boolean> filter) {
45+
this.filter = filter;
46+
}
47+
48+
private static boolean isNotGlacierObject(S3Object object) {
49+
return !GLACIER_STORAGE_CLASSES.contains(object.storageClass());
50+
}
51+
52+
private static boolean isGlacierObject(S3Object object) {
53+
return GLACIER_STORAGE_CLASSES.contains(object.storageClass());
54+
}
55+
56+
private static boolean isCompletedRestoredObject(S3Object object) {
57+
if(isGlacierObject(object) ) {
58+
return object.restoreStatus() != null && !object.restoreStatus().isRestoreInProgress();
59+
}
60+
return true;
61+
}
62+
63+
public Function<S3Object, Boolean> getFilter() {
64+
return filter;
65+
}
66+
67+
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
4343
import software.amazon.awssdk.services.s3.model.MetadataDirective;
4444
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
45+
import software.amazon.awssdk.services.s3.model.OptionalObjectAttributes;
4546
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
4647
import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
4748
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
@@ -624,6 +625,7 @@ public ListObjectsV2Request.Builder newListObjectsV2RequestBuilder(
624625
final ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder()
625626
.bucket(bucket)
626627
.maxKeys(maxKeys)
628+
.optionalObjectAttributes(OptionalObjectAttributes.RESTORE_STATUS) // Optional Attribute to get the Restored Status of the Glacier Objects
627629
.prefix(key);
628630

629631
if (delimiter != null) {

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.CompletableFuture;
2626
import java.util.concurrent.ExecutorService;
2727

28+
import org.apache.hadoop.fs.s3a.S3ObjectStorageClassFilter;
2829
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
2930

3031
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
@@ -117,6 +118,8 @@ public class StoreContext implements ActiveThreadSpanSource<AuditSpan> {
117118
/** Is client side encryption enabled? */
118119
private final boolean isCSEEnabled;
119120

121+
private final S3ObjectStorageClassFilter s3ObjectStorageClassFilter;
122+
120123
/**
121124
* Instantiate.
122125
*/
@@ -137,7 +140,8 @@ public class StoreContext implements ActiveThreadSpanSource<AuditSpan> {
137140
final boolean useListV1,
138141
final ContextAccessors contextAccessors,
139142
final AuditSpanSource<AuditSpanS3A> auditor,
140-
final boolean isCSEEnabled) {
143+
final boolean isCSEEnabled,
144+
final S3ObjectStorageClassFilter s3ObjectStorageClassFilter) {
141145
this.fsURI = fsURI;
142146
this.bucket = bucket;
143147
this.configuration = configuration;
@@ -158,6 +162,7 @@ public class StoreContext implements ActiveThreadSpanSource<AuditSpan> {
158162
this.contextAccessors = contextAccessors;
159163
this.auditor = auditor;
160164
this.isCSEEnabled = isCSEEnabled;
165+
this.s3ObjectStorageClassFilter = s3ObjectStorageClassFilter;
161166
}
162167

163168
public URI getFsURI() {
@@ -411,4 +416,8 @@ public RequestFactory getRequestFactory() {
411416
public boolean isCSEEnabled() {
412417
return isCSEEnabled;
413418
}
419+
420+
public S3ObjectStorageClassFilter s3ObjectsStorageClassFilter() {
421+
return s3ObjectStorageClassFilter;
422+
}
414423
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.ExecutorService;
2323

2424
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.fs.s3a.S3ObjectStorageClassFilter;
2526
import org.apache.hadoop.fs.s3a.Invoker;
2627
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
2728
import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
@@ -69,6 +70,8 @@ public class StoreContextBuilder {
6970

7071
private boolean isCSEEnabled;
7172

73+
private S3ObjectStorageClassFilter s3ObjectStorageClassFilter;
74+
7275
public StoreContextBuilder setFsURI(final URI fsURI) {
7376
this.fsURI = fsURI;
7477
return this;
@@ -175,6 +178,12 @@ public StoreContextBuilder setEnableCSE(
175178
return this;
176179
}
177180

181+
public StoreContextBuilder setS3ObjectStorageClassFilter(
182+
S3ObjectStorageClassFilter value) {
183+
s3ObjectStorageClassFilter = value;
184+
return this;
185+
}
186+
178187
public StoreContext build() {
179188
return new StoreContext(fsURI,
180189
bucket,
@@ -192,6 +201,7 @@ public StoreContext build() {
192201
useListV1,
193202
contextAccessors,
194203
auditor,
195-
isCSEEnabled);
204+
isCSEEnabled,
205+
s3ObjectStorageClassFilter);
196206
}
197207
}

hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1261,6 +1261,20 @@ The switch to turn S3A auditing on or off.
12611261
</description>
12621262
</property>
12631263

1264+
<!--
1265+
The switch to control how S3A handles glacier storage classes.
1266+
-->
1267+
<property>
1268+
<name>fs.s3a.glacier.read.restored.objects</name>
1269+
<value>READ_ALL</value>
1270+
<description>
1271+
The config can have 3 values:
1272+
1273+
* READ_ALL: Retains the default behavior of ignoring the storage class of objects retrieved from S3, so reading an unrestored Glacier-class file still fails; existing users see no change in behavior.
1274+
* SKIP_ALL_GLACIER: Skips any S3 objects tagged with a Glacier storage class; only objects in other storage classes are retrieved.
1275+
* READ_RESTORED_GLACIER_OBJECTS: Checks the restore status of each Glacier object; objects whose restore has completed are read like normal S3 objects, while unrestored objects are skipped because their data has not yet been retrieved from Glacier.
1276+
</description>
1277+
</property>
12641278
```
12651279
## <a name="retry_and_recovery"></a>Retry and Recovery
12661280

0 commit comments

Comments
 (0)