Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AwsSdk2Transport implementation #177

Merged
merged 5 commits into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions java-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ java {

withJavadocJar()
withSourcesJar()

registerFeature("awsSdk2Support") {
usingSourceSet(sourceSets.get("main"))
}
}

tasks.withType<ProcessResources> {
Expand Down Expand Up @@ -119,6 +123,10 @@ val integrationTest = task<Test>("integrationTest") {
systemProperty("https", System.getProperty("https", "true"))
systemProperty("user", System.getProperty("user", "admin"))
systemProperty("password", System.getProperty("password", "admin"))
systemProperty("tests.awsSdk2support.domainHost",
System.getProperty("tests.awsSdk2support.domainHost", null))
systemProperty("tests.awsSdk2support.domainRegion",
System.getProperty("tests.awsSdk2support.domainRegion", "us-east-1"))
}

dependencies {
Expand Down Expand Up @@ -154,6 +162,15 @@ dependencies {
implementation("com.fasterxml.jackson.core", "jackson-databind", jacksonDatabindVersion)
testImplementation("com.fasterxml.jackson.datatype", "jackson-datatype-jakarta-jsonp", jacksonVersion)

// For AwsSdk2Transport
"awsSdk2SupportImplementation"("software.amazon.awssdk","sdk-core","[2.15,3.0)")
"awsSdk2SupportImplementation"("software.amazon.awssdk","auth","[2.15,3.0)")
testImplementation("software.amazon.awssdk","sdk-core","[2.15,3.0)")
testImplementation("software.amazon.awssdk","auth","[2.15,3.0)")
testImplementation("software.amazon.awssdk","aws-crt-client","[2.15,3.0)")
testImplementation("software.amazon.awssdk","apache-client","[2.15,3.0)")
testImplementation("software.amazon.awssdk","sts","[2.15,3.0)")

// EPL-2.0 OR BSD-3-Clause
// https://eclipse-ee4j.github.io/yasson/
implementation("org.eclipse", "yasson", "2.0.2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@

import org.opensearch.client.util.ObjectBuilder;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
Expand All @@ -59,6 +62,10 @@ default TransportOptions with(Consumer<Builder> fn) {
return builder.build();
}

static Builder builder() {
return new BuilderImpl();
}

interface Builder extends ObjectBuilder<TransportOptions> {

Builder addHeader(String name, String value);
Expand All @@ -67,4 +74,94 @@ interface Builder extends ObjectBuilder<TransportOptions> {

Builder onWarnings(Function<List<String>, Boolean> listener);
}

class BuilderImpl implements Builder {
protected List<Map.Entry<String, String>> headers = Collections.emptyList();
protected Map<String,String> queryParameters = Collections.emptyMap();
protected Function<List<String>, Boolean> onWarnings = null;

public BuilderImpl() {
}

public BuilderImpl(TransportOptions src) {
Collection<Map.Entry<String, String>> srcHeaders = src.headers();
if (srcHeaders != null && !srcHeaders.isEmpty()) {
headers = new ArrayList<>(srcHeaders);
}
Map<String,String> srcParams = src.queryParameters();
if (srcParams != null && !srcParams.isEmpty()) {
queryParameters = new HashMap<>(srcParams);
}
onWarnings = src.onWarnings();
}

@Override
public Builder addHeader(String name, String value) {
if (headers.isEmpty()) {
headers = new ArrayList<>();
}
headers.add(Map.entry(name, value));
return this;
}

@Override
public Builder setParameter(String name, String value) {
if (value == null) {
if (!queryParameters.isEmpty()) {
queryParameters.remove(name);
}
} else {
if (queryParameters.isEmpty()) {
queryParameters = new HashMap<>();
}
queryParameters.put(name, value);
}
return this;
}

@Override
public Builder onWarnings(Function<List<String>, Boolean> listener) {
onWarnings = listener;
return this;
}

@Override
public TransportOptions build() {
return new DefaultImpl(this);
}
}

class DefaultImpl implements TransportOptions {
private final List<Map.Entry<String, String>> headers;
private final Map<String, String> params;
private final Function<List<String>, Boolean> onWarnings;

protected DefaultImpl(BuilderImpl builder) {
this.headers = builder.headers.isEmpty() ? Collections.emptyList() : List.copyOf(builder.headers);
this.params = builder.queryParameters.isEmpty() ?
Collections.emptyMap() :
Map.copyOf(builder.queryParameters);
this.onWarnings = builder.onWarnings;
}

@Override
public Collection<Map.Entry<String, String>> headers() {
return headers;
}

@Override
public Map<String, String> queryParameters() {
return params;
}

@Override
public Function<List<String>, Boolean> onWarnings() {
return onWarnings;
}

@Override
public Builder toBuilder() {
return new BuilderImpl(this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
mtimmerm marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is new code too, remove everything except SPDX.

* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.client.transport.aws;

import org.reactivestreams.Subscriber;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;

import javax.annotation.CheckForNull;
import java.nio.ByteBuffer;
import java.util.Optional;

/**
* An implementation of AWS {@SdkHttpContentPublisher} that transfers a pre-existing
* byte array
*/
class AsyncByteArrayContentPublisher implements SdkHttpContentPublisher {
private final AsyncRequestBody delegate;

AsyncByteArrayContentPublisher(@CheckForNull byte[] data) {
if (data == null) {
delegate = AsyncRequestBody.empty();
} else {
delegate = AsyncRequestBody.fromBytes(data);
}
}

@Override
public Optional<Long> contentLength() {
return delegate.contentLength();
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
delegate.subscribe(s);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.client.transport.aws;

import org.reactivestreams.Publisher;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* An implementation of AWS {@link SdkAsyncHttpResponseHandler} that captures the response,
* and the content as a byte array.
*/
final class AsyncCapturingResponseHandler implements SdkAsyncHttpResponseHandler {
private final CompletableFuture<SdkHttpResponse> responseFuture;
private final AsyncCapturingSubscriber bodySubscriber = new AsyncCapturingSubscriber();
private final AtomicBoolean subscribed = new AtomicBoolean(false);

AsyncCapturingResponseHandler() {
responseFuture = new CompletableFuture<>();
}

public CompletableFuture<SdkHttpResponse> getHeaderPromise() {
return responseFuture;
}

public CompletableFuture<byte[]> getBodyPromise() {
return bodySubscriber.getPromise();
}

@Override
public void onHeaders(SdkHttpResponse response) {
responseFuture.complete(response);
}

@Override
public void onStream(Publisher<ByteBuffer> publisher) {
if (!subscribed.getAndSet(true)) {
publisher.subscribe(bodySubscriber);
}
}

@Override
public void onError(Throwable e) {
responseFuture.completeExceptionally(e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.client.transport.aws;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

/**
* A reactive subscriber that captures a byte stream into a byte array
*/
class AsyncCapturingSubscriber implements Subscriber<ByteBuffer> {
private final ByteArrayOutputStream buffer;
private final CompletableFuture<byte[]> promise;
private Subscription subscription;

AsyncCapturingSubscriber() {
buffer = new ByteArrayOutputStream();
promise = new CompletableFuture<>();
}

public CompletableFuture<byte[]> getPromise() {
return promise;
}

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
}

@Override
public void onNext(ByteBuffer buf) {
try {
if (buf != null && buf.remaining() > 0) {
if (buf.hasArray()) {
buffer.write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
} else {
byte[] data = new byte[buf.remaining()];
buf.asReadOnlyBuffer().get(data);
buffer.write(data);
}
}
this.subscription.request(1);
} catch (Throwable e) {
promise.completeExceptionally(e);
}
}

@Override
public void onError(Throwable e) {
if (e == null) {
e = new IllegalArgumentException("Subscriber.onError called with null paramter");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: parameter

}
promise.completeExceptionally(e);
}

@Override
public void onComplete() {
promise.complete(buffer.toByteArray());
}
}
Loading