Skip to content

Commit fbea6a6

Browse files
steveloughran authored and slfan1989 committed
HADOOP-18830. Cut S3 Select (#6144)
Cut out S3 Select

* leave public/unstable constants alone
* s3guard tool will fail with error
* s3a fs path capability will fail
* openFile() will fail with specific error
* s3 select doc updated
* Cut eventstream jar
* New test: ITestSelectUnsupported verifies new failure handling above

Contributed by Steve Loughran
1 parent 4344537 commit fbea6a6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+264
-6496
lines changed

hadoop-project/pom.xml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1121,11 +1121,6 @@
11211121
</exclusion>
11221122
</exclusions>
11231123
</dependency>
1124-
<dependency>
1125-
<groupId>software.amazon.eventstream</groupId>
1126-
<artifactId>eventstream</artifactId>
1127-
<version>${aws.eventstream.version}</version>
1128-
</dependency>
11291124
<dependency>
11301125
<groupId>org.apache.mina</groupId>
11311126
<artifactId>mina-core</artifactId>

hadoop-tools/hadoop-aws/pom.xml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -508,11 +508,6 @@
508508
<artifactId>bundle</artifactId>
509509
<scope>compile</scope>
510510
</dependency>
511-
<dependency>
512-
<groupId>software.amazon.eventstream</groupId>
513-
<artifactId>eventstream</artifactId>
514-
<scope>test</scope>
515-
</dependency>
516511
<dependency>
517512
<groupId>org.assertj</groupId>
518513
<artifactId>assertj-core</artifactId>

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 9 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,6 @@
8383
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
8484
import software.amazon.awssdk.services.s3.model.S3Error;
8585
import software.amazon.awssdk.services.s3.model.S3Object;
86-
import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
87-
import software.amazon.awssdk.services.s3.model.SelectObjectContentResponseHandler;
8886
import software.amazon.awssdk.services.s3.model.StorageClass;
8987
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
9088
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
@@ -194,8 +192,6 @@
194192
import org.apache.hadoop.fs.s3a.commit.PutTracker;
195193
import org.apache.hadoop.fs.s3a.commit.MagicCommitIntegration;
196194
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
197-
import org.apache.hadoop.fs.s3a.select.SelectBinding;
198-
import org.apache.hadoop.fs.s3a.select.SelectConstants;
199195
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
200196
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
201197
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
@@ -299,7 +295,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
299295

300296
private S3Client s3Client;
301297

302-
/** Async client is used for transfer manager and s3 select. */
298+
/** Async client is used for transfer manager. */
303299
private S3AsyncClient s3AsyncClient;
304300

305301
// initial callback policy is fail-once; it's there just to assist
@@ -1725,8 +1721,7 @@ public FSDataInputStream open(Path f, int bufferSize)
17251721
/**
17261722
* Opens an FSDataInputStream at the indicated Path.
17271723
* The {@code fileInformation} parameter controls how the file
1728-
* is opened, whether it is normal vs. an S3 select call,
1729-
* can a HEAD be skipped, etc.
1724+
* is opened, can a HEAD be skipped, etc.
17301725
* @param path the file to open
17311726
* @param fileInformation information about the file to open
17321727
* @throws IOException IO failure.
@@ -1853,13 +1848,6 @@ public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> operation) {
18531848
private final class WriteOperationHelperCallbacksImpl
18541849
implements WriteOperationHelper.WriteOperationHelperCallbacks {
18551850

1856-
@Override
1857-
public CompletableFuture<Void> selectObjectContent(
1858-
SelectObjectContentRequest request,
1859-
SelectObjectContentResponseHandler responseHandler) {
1860-
return getS3AsyncClient().selectObjectContent(request, responseHandler);
1861-
}
1862-
18631851
@Override
18641852
public CompleteMultipartUploadResponse completeMultipartUpload(
18651853
CompleteMultipartUploadRequest request) {
@@ -1872,7 +1860,7 @@ public CompleteMultipartUploadResponse completeMultipartUpload(
18721860
* using FS state as well as the status.
18731861
* @param fileStatus file status.
18741862
* @param auditSpan audit span.
1875-
* @return a context for read and select operations.
1863+
* @return a context for read operations.
18761864
*/
18771865
@VisibleForTesting
18781866
protected S3AReadOpContext createReadContext(
@@ -5452,13 +5440,6 @@ public boolean hasPathCapability(final Path path, final String capability)
54525440
// capability depends on FS configuration
54535441
return isMagicCommitEnabled();
54545442

5455-
case SelectConstants.S3_SELECT_CAPABILITY:
5456-
// select is only supported if enabled and client side encryption is
5457-
// disabled.
5458-
return !isCSEEnabled
5459-
&& SelectBinding.isSelectEnabled(getConf())
5460-
&& !s3ExpressStore;
5461-
54625443
case CommonPathCapabilities.FS_CHECKSUMS:
54635444
// capability depends on FS configuration
54645445
return getConf().getBoolean(ETAG_CHECKSUM_ENABLED,
@@ -5572,85 +5553,6 @@ public AWSCredentialProviderList shareCredentials(final String purpose) {
55725553
return credentials.share();
55735554
}
55745555

5575-
/**
5576-
* This is a proof of concept of a select API.
5577-
* @param source path to source data
5578-
* @param options request configuration from the builder.
5579-
* @param fileInformation any passed in information.
5580-
* @return the stream of the results
5581-
* @throws IOException IO failure
5582-
*/
5583-
@Retries.RetryTranslated
5584-
@AuditEntryPoint
5585-
private FSDataInputStream select(final Path source,
5586-
final Configuration options,
5587-
final OpenFileSupport.OpenFileInformation fileInformation)
5588-
throws IOException {
5589-
requireSelectSupport(source);
5590-
final AuditSpan auditSpan = entryPoint(OBJECT_SELECT_REQUESTS, source);
5591-
final Path path = makeQualified(source);
5592-
String expression = fileInformation.getSql();
5593-
final S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path,
5594-
fileInformation);
5595-
5596-
// readahead range can be dynamically set
5597-
S3ObjectAttributes objectAttributes = createObjectAttributes(
5598-
path, fileStatus);
5599-
ChangeDetectionPolicy changePolicy = fileInformation.getChangePolicy();
5600-
S3AReadOpContext readContext = createReadContext(
5601-
fileStatus,
5602-
auditSpan);
5603-
fileInformation.applyOptions(readContext);
5604-
5605-
if (changePolicy.getSource() != ChangeDetectionPolicy.Source.None
5606-
&& fileStatus.getEtag() != null) {
5607-
// if there is change detection, and the status includes at least an
5608-
// etag,
5609-
// check that the object metadata lines up with what is expected
5610-
// based on the object attributes (which may contain an eTag or
5611-
// versionId).
5612-
// This is because the select API doesn't offer this.
5613-
// (note: this is trouble for version checking as cannot force the old
5614-
// version in the final read; nor can we check the etag match)
5615-
ChangeTracker changeTracker =
5616-
new ChangeTracker(uri.toString(),
5617-
changePolicy,
5618-
readContext.getS3AStatisticsContext()
5619-
.newInputStreamStatistics()
5620-
.getChangeTrackerStatistics(),
5621-
objectAttributes);
5622-
5623-
// will retry internally if wrong version detected
5624-
Invoker readInvoker = readContext.getReadInvoker();
5625-
getObjectMetadata(path, changeTracker, readInvoker, "select");
5626-
}
5627-
// instantiate S3 Select support using the current span
5628-
// as the active span for operations.
5629-
SelectBinding selectBinding = new SelectBinding(
5630-
createWriteOperationHelper(auditSpan));
5631-
5632-
// build and execute the request
5633-
return selectBinding.select(
5634-
readContext,
5635-
expression,
5636-
options,
5637-
objectAttributes);
5638-
}
5639-
5640-
/**
5641-
* Verify the FS supports S3 Select.
5642-
* @param source source file.
5643-
* @throws UnsupportedOperationException if not.
5644-
*/
5645-
private void requireSelectSupport(final Path source) throws
5646-
UnsupportedOperationException {
5647-
if (!isCSEEnabled && !SelectBinding.isSelectEnabled(getConf())) {
5648-
5649-
throw new UnsupportedOperationException(
5650-
SelectConstants.SELECT_UNSUPPORTED);
5651-
}
5652-
}
5653-
56545556
/**
56555557
* Get the file status of the source file.
56565558
* If in the fileInformation parameter return that
@@ -5681,16 +5583,14 @@ private S3AFileStatus extractOrFetchSimpleFileStatus(
56815583
}
56825584

56835585
/**
5684-
* Initiate the open() or select() operation.
5586+
* Initiate the open() operation.
56855587
* This is invoked from both the FileSystem and FileContext APIs.
56865588
* It's declared as an audit entry point but the span creation is pushed
5687-
* down into the open/select methods it ultimately calls.
5589+
* down into the open operations it ultimately calls.
56885590
* @param rawPath path to the file
56895591
* @param parameters open file parameters from the builder.
5690-
* @return a future which will evaluate to the opened/selected file.
5592+
* @return a future which will evaluate to the opened file.
56915593
* @throws IOException failure to resolve the link.
5692-
* @throws PathIOException operation is a select request but S3 select is
5693-
* disabled
56945594
* @throws IllegalArgumentException unknown mandatory key
56955595
*/
56965596
@Override
@@ -5706,20 +5606,9 @@ public CompletableFuture<FSDataInputStream> openFileWithOptions(
57065606
parameters,
57075607
getDefaultBlockSize());
57085608
CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
5709-
if (!fileInformation.isS3Select()) {
5710-
// normal path.
5711-
unboundedThreadPool.submit(() ->
5712-
LambdaUtils.eval(result,
5713-
() -> executeOpen(path, fileInformation)));
5714-
} else {
5715-
// it is a select statement.
5716-
// fail fast if the operation is not available
5717-
requireSelectSupport(path);
5718-
// submit the query
5719-
unboundedThreadPool.submit(() ->
5720-
LambdaUtils.eval(result,
5721-
() -> select(path, parameters.getOptions(), fileInformation)));
5722-
}
5609+
unboundedThreadPool.submit(() ->
5610+
LambdaUtils.eval(result,
5611+
() -> executeOpen(path, fileInformation)));
57235612
return result;
57245613
}
57255614

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
/**
2626
* This class holds attributes of an object independent of the
2727
* file status type.
28-
* It is used in {@link S3AInputStream} and the select equivalent.
28+
* It is used in {@link S3AInputStream}
2929
* as a way to reduce parameters being passed
3030
* to the constructor of such class,
3131
* and elsewhere to be a source-neutral representation of a file status.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -265,10 +265,6 @@ public enum Statistic {
265265
StoreStatisticNames.OBJECT_PUT_BYTES_PENDING,
266266
"number of bytes queued for upload/being actively uploaded",
267267
TYPE_GAUGE),
268-
OBJECT_SELECT_REQUESTS(
269-
StoreStatisticNames.OBJECT_SELECT_REQUESTS,
270-
"Count of S3 Select requests issued",
271-
TYPE_COUNTER),
272268
STREAM_READ_ABORTED(
273269
StreamStatisticNames.STREAM_READ_ABORTED,
274270
"Count of times the TCP stream was aborted",

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java

Lines changed: 0 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.io.FileNotFoundException;
2323
import java.io.IOException;
2424
import java.util.List;
25-
import java.util.concurrent.CompletableFuture;
2625
import java.util.concurrent.atomic.AtomicInteger;
2726

2827
import software.amazon.awssdk.core.sync.RequestBody;
@@ -33,8 +32,6 @@
3332
import software.amazon.awssdk.services.s3.model.MultipartUpload;
3433
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
3534
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
36-
import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
37-
import software.amazon.awssdk.services.s3.model.SelectObjectContentResponseHandler;
3835
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
3936
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
4037
import org.slf4j.Logger;
@@ -49,16 +46,11 @@
4946
import org.apache.hadoop.fs.s3a.api.RequestFactory;
5047
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
5148
import org.apache.hadoop.fs.s3a.impl.StoreContext;
52-
import org.apache.hadoop.fs.s3a.select.SelectEventStreamPublisher;
53-
import org.apache.hadoop.fs.s3a.select.SelectObjectContentHelper;
5449
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
55-
import org.apache.hadoop.fs.s3a.select.SelectBinding;
5650
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
5751
import org.apache.hadoop.fs.store.audit.AuditSpan;
5852
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
59-
import org.apache.hadoop.util.DurationInfo;
6053
import org.apache.hadoop.util.functional.CallableRaisingIOE;
61-
import org.apache.hadoop.util.Preconditions;
6254

6355
import static org.apache.hadoop.util.Preconditions.checkNotNull;
6456
import static org.apache.hadoop.fs.s3a.Invoker.*;
@@ -82,7 +74,6 @@
8274
* <li>Other low-level access to S3 functions, for private use.</li>
8375
* <li>Failure handling, including converting exceptions to IOEs.</li>
8476
* <li>Integration with instrumentation.</li>
85-
* <li>Evolution to add more low-level operations, such as S3 select.</li>
8677
* </ul>
8778
*
8879
* This API is for internal use only.
@@ -615,63 +606,6 @@ public Configuration getConf() {
615606
return conf;
616607
}
617608

618-
public SelectObjectContentRequest.Builder newSelectRequestBuilder(Path path) {
619-
try (AuditSpan span = getAuditSpan()) {
620-
return getRequestFactory().newSelectRequestBuilder(
621-
storeContext.pathToKey(path));
622-
}
623-
}
624-
625-
/**
626-
* Execute an S3 Select operation.
627-
* On a failure, the request is only logged at debug to avoid the
628-
* select exception being printed.
629-
*
630-
* @param source source for selection
631-
* @param request Select request to issue.
632-
* @param action the action for use in exception creation
633-
* @return response
634-
* @throws IOException failure
635-
*/
636-
@Retries.RetryTranslated
637-
public SelectEventStreamPublisher select(
638-
final Path source,
639-
final SelectObjectContentRequest request,
640-
final String action)
641-
throws IOException {
642-
// no setting of span here as the select binding is (statically) created
643-
// without any span.
644-
String bucketName = request.bucket();
645-
Preconditions.checkArgument(bucket.equals(bucketName),
646-
"wrong bucket: %s", bucketName);
647-
if (LOG.isDebugEnabled()) {
648-
LOG.debug("Initiating select call {} {}",
649-
source, request.expression());
650-
LOG.debug(SelectBinding.toString(request));
651-
}
652-
return invoker.retry(
653-
action,
654-
source.toString(),
655-
true,
656-
withinAuditSpan(getAuditSpan(), () -> {
657-
try (DurationInfo ignored =
658-
new DurationInfo(LOG, "S3 Select operation")) {
659-
try {
660-
return SelectObjectContentHelper.select(
661-
writeOperationHelperCallbacks, source, request, action);
662-
} catch (Throwable e) {
663-
LOG.error("Failure of S3 Select request against {}",
664-
source);
665-
LOG.debug("S3 Select request against {}:\n{}",
666-
source,
667-
SelectBinding.toString(request),
668-
e);
669-
throw e;
670-
}
671-
}
672-
}));
673-
}
674-
675609
@Override
676610
public AuditSpan createSpan(final String operation,
677611
@Nullable final String path1,
@@ -705,15 +639,6 @@ public RequestFactory getRequestFactory() {
705639
*/
706640
public interface WriteOperationHelperCallbacks {
707641

708-
/**
709-
* Initiates a select request.
710-
* @param request selectObjectContent request
711-
* @param t selectObjectContent request handler
712-
* @return selectObjectContentResult
713-
*/
714-
CompletableFuture<Void> selectObjectContent(SelectObjectContentRequest request,
715-
SelectObjectContentResponseHandler t);
716-
717642
/**
718643
* Initiates a complete multi-part upload request.
719644
* @param request Complete multi-part upload request

0 commit comments

Comments
 (0)