Skip to content

Commit

Permalink
AwsSdk2Transport implementation (#177)
Browse files Browse the repository at this point in the history
* Implement AwsSdk2Transport
Implementation of automatic request and response compression.
Integration test
Make sure bulk requests work
Properly parse 403 errors from OpenSearch service (they don't follow OS format)
Ensure that every transport error with a status code is reported as an OpenSearchError

Signed-off-by: Matt Timmermans <mtimmermans@tripadvisor.com>

* Fix license headers on new files

Signed-off-by: Matt Timmermans <mtimmermans@tripadvisor.com>

* New AWS sdk release isn't all in the cache yet

Signed-off-by: Matt Timmermans <mtimmermans@tripadvisor.com>

* Fix license headers in new file

Signed-off-by: Matt Timmermans <mtimmermans@tripadvisor.com>

Co-authored-by: Matt Timmermans <mtimmermans@tripadvisor.com>
  • Loading branch information
mtimmerm and mtimmermansTa authored Jul 13, 2022
1 parent f8d9a5e commit 2336053
Show file tree
Hide file tree
Showing 12 changed files with 1,791 additions and 0 deletions.
17 changes: 17 additions & 0 deletions java-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ java {

withJavadocJar()
withSourcesJar()

registerFeature("awsSdk2Support") {
usingSourceSet(sourceSets.get("main"))
}
}

tasks.withType<ProcessResources> {
Expand Down Expand Up @@ -119,6 +123,10 @@ val integrationTest = task<Test>("integrationTest") {
systemProperty("https", System.getProperty("https", "true"))
systemProperty("user", System.getProperty("user", "admin"))
systemProperty("password", System.getProperty("password", "admin"))
systemProperty("tests.awsSdk2support.domainHost",
System.getProperty("tests.awsSdk2support.domainHost", null))
systemProperty("tests.awsSdk2support.domainRegion",
System.getProperty("tests.awsSdk2support.domainRegion", "us-east-1"))
}

dependencies {
Expand Down Expand Up @@ -154,6 +162,15 @@ dependencies {
implementation("com.fasterxml.jackson.core", "jackson-databind", jacksonDatabindVersion)
testImplementation("com.fasterxml.jackson.datatype", "jackson-datatype-jakarta-jsonp", jacksonVersion)

// For AwsSdk2Transport
"awsSdk2SupportImplementation"("software.amazon.awssdk","sdk-core","[2.15,3.0)")
"awsSdk2SupportImplementation"("software.amazon.awssdk","auth","[2.15,3.0)")
testImplementation("software.amazon.awssdk","sdk-core","[2.15,3.0)")
testImplementation("software.amazon.awssdk","auth","[2.15,3.0)")
testImplementation("software.amazon.awssdk","aws-crt-client","[2.15,3.0)")
testImplementation("software.amazon.awssdk","apache-client","[2.15,3.0)")
testImplementation("software.amazon.awssdk","sts","[2.15,3.0)")

// EPL-2.0 OR BSD-3-Clause
// https://eclipse-ee4j.github.io/yasson/
implementation("org.eclipse", "yasson", "2.0.2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@

import org.opensearch.client.util.ObjectBuilder;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
Expand All @@ -59,6 +62,10 @@ default TransportOptions with(Consumer<Builder> fn) {
return builder.build();
}

static Builder builder() {
return new BuilderImpl();
}

interface Builder extends ObjectBuilder<TransportOptions> {

Builder addHeader(String name, String value);
Expand All @@ -67,4 +74,94 @@ interface Builder extends ObjectBuilder<TransportOptions> {

Builder onWarnings(Function<List<String>, Boolean> listener);
}

class BuilderImpl implements Builder {
protected List<Map.Entry<String, String>> headers = Collections.emptyList();
protected Map<String,String> queryParameters = Collections.emptyMap();
protected Function<List<String>, Boolean> onWarnings = null;

public BuilderImpl() {
}

public BuilderImpl(TransportOptions src) {
Collection<Map.Entry<String, String>> srcHeaders = src.headers();
if (srcHeaders != null && !srcHeaders.isEmpty()) {
headers = new ArrayList<>(srcHeaders);
}
Map<String,String> srcParams = src.queryParameters();
if (srcParams != null && !srcParams.isEmpty()) {
queryParameters = new HashMap<>(srcParams);
}
onWarnings = src.onWarnings();
}

@Override
public Builder addHeader(String name, String value) {
if (headers.isEmpty()) {
headers = new ArrayList<>();
}
headers.add(Map.entry(name, value));
return this;
}

@Override
public Builder setParameter(String name, String value) {
if (value == null) {
if (!queryParameters.isEmpty()) {
queryParameters.remove(name);
}
} else {
if (queryParameters.isEmpty()) {
queryParameters = new HashMap<>();
}
queryParameters.put(name, value);
}
return this;
}

@Override
public Builder onWarnings(Function<List<String>, Boolean> listener) {
onWarnings = listener;
return this;
}

@Override
public TransportOptions build() {
return new DefaultImpl(this);
}
}

class DefaultImpl implements TransportOptions {
private final List<Map.Entry<String, String>> headers;
private final Map<String, String> params;
private final Function<List<String>, Boolean> onWarnings;

protected DefaultImpl(BuilderImpl builder) {
this.headers = builder.headers.isEmpty() ? Collections.emptyList() : List.copyOf(builder.headers);
this.params = builder.queryParameters.isEmpty() ?
Collections.emptyMap() :
Map.copyOf(builder.queryParameters);
this.onWarnings = builder.onWarnings;
}

@Override
public Collection<Map.Entry<String, String>> headers() {
return headers;
}

@Override
public Map<String, String> queryParameters() {
return params;
}

@Override
public Function<List<String>, Boolean> onWarnings() {
return onWarnings;
}

@Override
public Builder toBuilder() {
return new BuilderImpl(this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client.transport.aws;

import org.reactivestreams.Subscriber;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;

import javax.annotation.CheckForNull;
import java.nio.ByteBuffer;
import java.util.Optional;

/**
* An implementation of AWS {@SdkHttpContentPublisher} that transfers a pre-existing
* byte array
*/
class AsyncByteArrayContentPublisher implements SdkHttpContentPublisher {
private final AsyncRequestBody delegate;

AsyncByteArrayContentPublisher(@CheckForNull byte[] data) {
if (data == null) {
delegate = AsyncRequestBody.empty();
} else {
delegate = AsyncRequestBody.fromBytes(data);
}
}

@Override
public Optional<Long> contentLength() {
return delegate.contentLength();
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
delegate.subscribe(s);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client.transport.aws;

import org.reactivestreams.Publisher;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* An implementation of AWS {@link SdkAsyncHttpResponseHandler} that captures the response,
* and the content as a byte array.
*/
final class AsyncCapturingResponseHandler implements SdkAsyncHttpResponseHandler {
private final CompletableFuture<SdkHttpResponse> responseFuture;
private final AsyncCapturingSubscriber bodySubscriber = new AsyncCapturingSubscriber();
private final AtomicBoolean subscribed = new AtomicBoolean(false);

AsyncCapturingResponseHandler() {
responseFuture = new CompletableFuture<>();
}

public CompletableFuture<SdkHttpResponse> getHeaderPromise() {
return responseFuture;
}

public CompletableFuture<byte[]> getBodyPromise() {
return bodySubscriber.getPromise();
}

@Override
public void onHeaders(SdkHttpResponse response) {
responseFuture.complete(response);
}

@Override
public void onStream(Publisher<ByteBuffer> publisher) {
if (!subscribed.getAndSet(true)) {
publisher.subscribe(bodySubscriber);
}
}

@Override
public void onError(Throwable e) {
responseFuture.completeExceptionally(e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client.transport.aws;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

/**
* A reactive subscriber that captures a byte stream into a byte array
*/
class AsyncCapturingSubscriber implements Subscriber<ByteBuffer> {
private final ByteArrayOutputStream buffer;
private final CompletableFuture<byte[]> promise;
private Subscription subscription;

AsyncCapturingSubscriber() {
buffer = new ByteArrayOutputStream();
promise = new CompletableFuture<>();
}

public CompletableFuture<byte[]> getPromise() {
return promise;
}

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
}

@Override
public void onNext(ByteBuffer buf) {
try {
if (buf != null && buf.remaining() > 0) {
if (buf.hasArray()) {
buffer.write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
} else {
byte[] data = new byte[buf.remaining()];
buf.asReadOnlyBuffer().get(data);
buffer.write(data);
}
}
this.subscription.request(1);
} catch (Throwable e) {
promise.completeExceptionally(e);
}
}

@Override
public void onError(Throwable e) {
if (e == null) {
e = new IllegalArgumentException("Subscriber.onError called with null paramter");
}
promise.completeExceptionally(e);
}

@Override
public void onComplete() {
promise.complete(buffer.toByteArray());
}
}
Loading

0 comments on commit 2336053

Please sign in to comment.