1+ /*
2+ * Licensed to the Apache Software Foundation (ASF) under one
3+ * or more contributor license agreements. See the NOTICE file
4+ * distributed with this work for additional information
5+ * regarding copyright ownership. The ASF licenses this file
6+ * to you under the Apache License, Version 2.0 (the
7+ * "License"); you may not use this file except in compliance
8+ * with the License. You may obtain a copy of the License at
9+ *
10+ * http://www.apache.org/licenses/LICENSE-2.0
11+ *
12+ * Unless required by applicable law or agreed to in writing, software
13+ * distributed under the License is distributed on an "AS IS" BASIS,
14+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+ * See the License for the specific language governing permissions and
16+ * limitations under the License.
17+ */
18+
119package org .apache .hadoop .fs .s3a .select ;
220
321import java .io .ByteArrayInputStream ;
422import java .io .InputStream ;
523import java .io .SequenceInputStream ;
6- import java .util .Enumeration ;
7- import java .util .Iterator ;
8- import java .util .NoSuchElementException ;
9- import java .util .concurrent .ArrayBlockingQueue ;
10- import java .util .concurrent .BlockingQueue ;
1124import java .util .concurrent .CompletableFuture ;
1225import java .util .function .Consumer ;
1326
1427import org .reactivestreams .Subscriber ;
15- import org .reactivestreams .Subscription ;
1628
1729import software .amazon .awssdk .core .async .SdkPublisher ;
1830import software .amazon .awssdk .http .AbortableInputStream ;
2335import software .amazon .awssdk .utils .ToString ;
2436import software .amazon .awssdk .utils .Validate ;
2537
38+ /**
39+ * Async publisher of {@link SelectObjectContentEventStream}s returned
40+ * from a SelectObjectContent call.
41+ */
2642public final class SelectEventStreamPublisher implements
2743 SdkPublisher <SelectObjectContentEventStream > {
2844
29- private final CompletableFuture <Void > future ;
45+ private final CompletableFuture <Void > selectOperationFuture ;
3046 private final SelectObjectContentResponse response ;
3147 private final SdkPublisher <SelectObjectContentEventStream > publisher ;
3248
49+ /**
50+ * Create the publisher.
51+ * @param selectOperationFuture SelectObjectContent future
52+ * @param response SelectObjectContent response
53+ * @param publisher SelectObjectContentEventStream publisher to wrap
54+ */
3355 public SelectEventStreamPublisher (
34- CompletableFuture <Void > future ,
56+ CompletableFuture <Void > selectOperationFuture ,
3557 SelectObjectContentResponse response ,
3658 SdkPublisher <SelectObjectContentEventStream > publisher ) {
37- this .future = Validate . paramNotNull ( future , "future" ) ;
38- this .response = Validate . paramNotNull ( response , "response" ) ;
39- this .publisher = Validate . paramNotNull ( publisher , "publisher" ) ;
59+ this .selectOperationFuture = selectOperationFuture ;
60+ this .response = response ;
61+ this .publisher = publisher ;
4062 }
4163
42- public void cancel () {
43- future .cancel (true );
64+ /**
65+ * Retrieve an input stream to the subset of the S3 object that matched the select query.
66+ * This is equivalent to loading the content of all RecordsEvents into an InputStream.
67+ * This will lazily-load the content from S3, minimizing the amount of memory used.
68+ * @param onEndEvent callback on the end event
69+ * @return the input stream
70+ */
71+ public AbortableInputStream toRecordsInputStream (Consumer <EndEvent > onEndEvent ) {
72+ SdkPublisher <InputStream > recordInputStreams = this .publisher
73+ .filter (e -> {
74+ if (e instanceof RecordsEvent ) {
75+ return true ;
76+ } else if (e instanceof EndEvent ) {
77+ onEndEvent .accept ((EndEvent ) e );
78+ }
79+ return false ;
80+ })
81+ .map (e -> ((RecordsEvent ) e ).payload ().asInputStream ());
82+
83+ // Subscribe to the async publisher using an enumeration that will
84+ // buffer a single chunk (RecordsEvent's payload) at a time and
85+ // block until it is consumed.
86+ // Also inject an empty stream as the first element that
87+ // SequenceInputStream will request on construction.
88+ BlockingEnumeration enumeration =
89+ new BlockingEnumeration (recordInputStreams , 1 , EMPTY_STREAM );
90+ return AbortableInputStream .create (
91+ new SequenceInputStream (enumeration ),
92+ this ::cancel );
4493 }
4594
95+ /**
96+ * The response from the SelectObjectContent call.
97+ * @return the response object
98+ */
4699 public SelectObjectContentResponse response () {
47100 return response ;
48101 }
@@ -52,6 +105,13 @@ public void subscribe(Subscriber<? super SelectObjectContentEventStream> subscri
52105 publisher .subscribe (subscriber );
53106 }
54107
108+ /**
109+ * Cancel the operation.
110+ */
111+ public void cancel () {
112+ selectOperationFuture .cancel (true );
113+ }
114+
55115 @ Override
56116 public String toString () {
57117 return ToString .builder ("SelectObjectContentEventStream" )
@@ -60,25 +120,6 @@ public String toString() {
60120 .build ();
61121 }
62122
63- public AbortableInputStream toRecordsInputStream (Consumer <EndEvent > onEndEvent ) {
64- SdkPublisher <InputStream > recordInputStreams = this .publisher
65- .filter (e -> {
66- if (e instanceof RecordsEvent ) {
67- return true ;
68- } else if (e instanceof EndEvent ) {
69- onEndEvent .accept ((EndEvent ) e );
70- }
71- return false ;
72- })
73- .map (e -> ((RecordsEvent ) e ).payload ().asInputStream ());
74-
75- return AbortableInputStream .create (
76- new SequenceInputStream (
77- new BlockingEnumeration (recordInputStreams , 1 ,
78- EMPTY_STREAM )),
79- this ::cancel );
80- }
81-
82123 private static final InputStream EMPTY_STREAM =
83124 new ByteArrayInputStream (new byte [0 ]);
84125}
0 commit comments