Address code review comments
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
reta committed Mar 3, 2023
1 parent 0d1c957 commit 4cd6423
Showing 2 changed files with 189 additions and 3 deletions.
79 changes: 79 additions & 0 deletions docs/content/docs/connectors/datastream/opensearch.md
@@ -162,6 +162,85 @@ This will buffer elements before sending them in bulk to the cluster. The `BulkP
executes bulk requests one at a time, i.e. there will be no two concurrent
flushes of the buffered actions in progress.

## Opensearch AsyncSink

The example below shows how to configure and create an AsyncSink (see [FLIP-171](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink)):

{{< tabs "b1732edd-4218-470e-adad-b1ebb4021a1b" >}}
{{< tab "Java" >}}

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.opensearch.sink.OpensearchAsyncSink;
import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.http.HttpHost;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

DataStream<String> input = ...;

input.sinkTo(
OpensearchAsyncSink.<String>builder()
.setHosts(new HttpHost("localhost", 9200, "http"))
.setElementConverter((element, context) -> createIndexRequest(element))
.build());


private static IndexRequest createIndexRequest(String element) {
Map<String, Object> json = new HashMap<>();
json.put("data", element);

return Requests.indexRequest()
.index("my-index")
.id(element)
.source(json);
}
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.api.connector.sink.SinkWriter
import org.apache.flink.connector.opensearch.sink.{OpensearchAsyncSink, RequestIndexer}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.http.HttpHost
import org.opensearch.action.index.IndexRequest
import org.opensearch.client.Requests

val input: DataStream[String] = ...

input.sinkTo(
OpensearchAsyncSink[String]
.builder()
.setMaxBatchSize(1) // Instructs the AsyncSink to emit after every element, otherwise they would be buffered
.setHosts(new HttpHost("127.0.0.1", 9200, "http"))
.setElementConverter((element: String, context: SinkWriter.Context) => createIndexRequest(element))
.build())

def createIndexRequest(element: (String)): IndexRequest = {

val json = Map(
"data" -> element.asInstanceOf[AnyRef]
)

Requests.indexRequest.index("my-index").source(mapAsJavaMap(json))
}
```

{{< /tab >}}
{{< /tabs >}}

Note that the example only demonstrates performing a single index
request for each incoming element. More generally, the `ElementConverter`
can produce requests of different types (e.g.,
`DeleteRequest`, `UpdateRequest`, etc.).
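
As a minimal, self-contained sketch of that idea, the snippet below routes each element to a different request type. The `Request`, `Index` and `Delete` types are hypothetical stand-ins for the OpenSearch request classes, and the `-` prefix that marks a deletion is an assumption made purely for illustration. A real converter would build `IndexRequest` or `DeleteRequest` instances inside the lambda passed to `setElementConverter`.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-ins for the OpenSearch request classes; none of these
// types belong to the connector or to the OpenSearch client.
class RequestRoutingSketch {

    interface Request {
        String id();
    }

    // Stand-in for an index request: carries a document id and a source map.
    static final class Index implements Request {
        private final String id;
        private final Map<String, Object> source;

        Index(String id, Map<String, Object> source) {
            this.id = id;
            this.source = source;
        }

        @Override
        public String id() {
            return id;
        }

        Map<String, Object> source() {
            return source;
        }
    }

    // Stand-in for a delete request: carries only the document id.
    static final class Delete implements Request {
        private final String id;

        Delete(String id) {
            this.id = id;
        }

        @Override
        public String id() {
            return id;
        }
    }

    // Assumed convention: an element starting with "-" deletes the document
    // named by the rest of the string; any other element is indexed as-is.
    static Request convert(String element) {
        if (element.startsWith("-")) {
            return new Delete(element.substring(1));
        }
        return new Index(element, Collections.<String, Object>singletonMap("data", element));
    }

    public static void main(String[] args) {
        for (String element : new String[] {"doc-1", "-doc-1"}) {
            Request request = convert(element);
            System.out.println(request.getClass().getSimpleName() + " " + request.id());
        }
    }
}
```

The converter stays a pure function of the element, so the choice of request type depends only on the data flowing through the stream.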

Internally, each parallel instance of the Flink Opensearch AsyncSink uses
`RestHighLevelClient::bulkAsync` to send action requests to the cluster.
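
The buffering and asynchronous bulk submission can be pictured with the simplified sketch below. It is not the connector's implementation: `AsyncBatchSketch` and its `bulkSender` function are hypothetical, with `bulkSender` standing in for a call such as `RestHighLevelClient::bulkAsync`. Elements are buffered until `maxBatchSize` is reached and are then handed over as one batch without blocking the writer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical illustration of "buffer, then send a bulk asynchronously".
class AsyncBatchSketch<T> {
    private final int maxBatchSize;
    // Stand-in for an asynchronous bulk call; the future completes with the
    // number of requests that were acknowledged.
    private final Function<List<T>, CompletableFuture<Integer>> bulkSender;
    private final List<T> buffer = new ArrayList<>();
    private final List<CompletableFuture<Integer>> inFlight = new ArrayList<>();

    AsyncBatchSketch(int maxBatchSize, Function<List<T>, CompletableFuture<Integer>> bulkSender) {
        this.maxBatchSize = maxBatchSize;
        this.bulkSender = bulkSender;
    }

    // Buffers the element and flushes once the batch is full.
    void write(T element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Hands a copy of the buffered elements to the sender and returns
    // immediately; the resulting future is tracked as in-flight.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        inFlight.add(bulkSender.apply(batch));
    }

    // Waits for all in-flight batches and returns the acknowledged total.
    int awaitAll() {
        int total = 0;
        for (CompletableFuture<Integer> future : inFlight) {
            total += future.join();
        }
        inFlight.clear();
        return total;
    }

    public static void main(String[] args) {
        AsyncBatchSketch<String> sink =
                new AsyncBatchSketch<String>(2, batch -> CompletableFuture.supplyAsync(batch::size));
        sink.write("a");
        sink.write("b");
        sink.write("c");
        sink.flush(); // push the remaining partial batch
        System.out.println("acknowledged: " + sink.awaitAll());
    }
}
```

With `maxBatchSize` set to 1, as in the Scala example above, every `write` call triggers a flush, so no element waits in the buffer.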

### Opensearch Sinks and Fault Tolerance

With Flink’s checkpointing enabled, the Flink Opensearch Sink guarantees
@@ -25,7 +25,9 @@
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.index.VersionType;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -37,7 +39,6 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.Mockito.mock;

class DocSerdeRequestTest {
@ParameterizedTest
@@ -65,9 +66,8 @@ void serde(DocWriteRequest<?> request) throws IOException {
}

@Test
-@SuppressWarnings("unchecked")
void unsupportedRequestType() throws IOException {
-final DocSerdeRequest serialized = DocSerdeRequest.from(mock(DocWriteRequest.class));
+final DocSerdeRequest serialized = DocSerdeRequest.from(new DummyDocWriteRequest());
try (final ByteArrayOutputStream bytes = new ByteArrayOutputStream()) {
try (final DataOutputStream out = new DataOutputStream(bytes)) {
assertThatThrownBy(() -> serialized.writeTo(out))
@@ -89,4 +89,111 @@ private static Stream<Arguments> requests() {
.id("id")
.source(Collections.singletonMap("action", "index"))));
}

private static class DummyDocWriteRequest implements DocWriteRequest<Object> {
@Override
public String[] indices() {
throw new UnsupportedOperationException();
}

@Override
public long ramBytesUsed() {
throw new UnsupportedOperationException();
}

@Override
public Object index(String index) {
throw new UnsupportedOperationException();
}

@Override
public String index() {
throw new UnsupportedOperationException();
}

@Override
public Object type(String type) {
throw new UnsupportedOperationException();
}

@Override
public String type() {
throw new UnsupportedOperationException();
}

@Override
public Object defaultTypeIfNull(String defaultType) {
throw new UnsupportedOperationException();
}

@Override
public String id() {
throw new UnsupportedOperationException();
}

@Override
public IndicesOptions indicesOptions() {
throw new UnsupportedOperationException();
}

@Override
public Object routing(String routing) {
throw new UnsupportedOperationException();
}

@Override
public String routing() {
throw new UnsupportedOperationException();
}

@Override
public long version() {
throw new UnsupportedOperationException();
}

@Override
public Object version(long version) {
throw new UnsupportedOperationException();
}

@Override
public VersionType versionType() {
throw new UnsupportedOperationException();
}

@Override
public Object versionType(VersionType versionType) {
throw new UnsupportedOperationException();
}

@Override
public Object setIfSeqNo(long seqNo) {
throw new UnsupportedOperationException();
}

@Override
public Object setIfPrimaryTerm(long term) {
throw new UnsupportedOperationException();
}

@Override
public long ifSeqNo() {
throw new UnsupportedOperationException();
}

@Override
public long ifPrimaryTerm() {
throw new UnsupportedOperationException();
}

@Override
public OpType opType() {
throw new UnsupportedOperationException();
}

@Override
public boolean isRequireAlias() {
throw new UnsupportedOperationException();
}
}
}