[FLINK-30488] OpenSearch implementation of Async Sink #5
base: main
Conversation
env.setRestartStrategy(RestartStrategies.noRestart());
}
DataStream<Long> stream = env.fromSequence(1, 5);
try (final StreamExecutionEnvironment env = new LocalStreamEnvironment()) {
Not related, but StreamExecutionEnvironment is AutoCloseable, hence slightly changing the test case.
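A minimal sketch of how the test body could look with try-with-resources, assuming a Flink version where StreamExecutionEnvironment implements AutoCloseable; the sink wiring is elided and only the environment handling is shown, so names and structure here are illustrative rather than the PR's exact test.

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class AsyncSinkTestShape {
    void runPipeline() throws Exception {
        // StreamExecutionEnvironment is AutoCloseable, so the environment is closed
        // even if execute() throws, which keeps the test case a bit tidier.
        try (final StreamExecutionEnvironment env = new LocalStreamEnvironment()) {
            env.setRestartStrategy(RestartStrategies.noRestart());
            DataStream<Long> stream = env.fromSequence(1, 5);
            // ... attach the OpenSearch async sink to `stream` here ...
            env.execute();
        }
    }
}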
@zentol @MartijnVisser would appreciate it if you have time for a review; this adds AsyncSink support for OpenSearch, as discussed initially in [1].
@zentol @MartijnVisser doing my once-per-month ping diligence :-), please.
I'm currently a bit over capacity, and I don't know if the same applies to @zentol, tbh. @dannycranmer, could you potentially help out? You also have experience with the Async API. Or perhaps @hlteoh37?
Sure, I can take a look.
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
not sure what this is used for, shall we remove it?
That's just an example of using the AsyncSink; we used to have them in https://github.com/apache/flink-connector-opensearch/tree/main/flink-connector-opensearch-e2e-tests/src/main/java/org/apache/flink/streaming/tests
Added some comments!
OpensearchAsyncSinkBuilder<Tuple2<String, String>> osSinkBuilder =
        OpensearchAsyncSink.<Tuple2<String, String>>builder()
                .setHosts(new HttpHost("localhost:9200"))
Hm, should we instead define a constant, something like OPENSEARCH_DOMAIN, so users can use the example more easily?
Oh I see, there are 2 places where the same host is used; it should be 1. I will fix that, thank you.
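A small sketch of the suggested cleanup: one constant for the example host, reused wherever the example needs it. The constant name comes from the review suggestion; the value and the surrounding builder code are illustrative.

// Sketch only: a single host definition keeps the example's usages in sync.
private static final HttpHost OPENSEARCH_DOMAIN = new HttpHost("localhost", 9200, "http");

OpensearchAsyncSinkBuilder<Tuple2<String, String>> osSinkBuilder =
        OpensearchAsyncSink.<Tuple2<String, String>>builder()
                .setHosts(OPENSEARCH_DOMAIN);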
int maxBufferedRequests,
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
This is specific to the AWS OpenSearch managed service; it is not applicable to OpenSearch in general.
 */
public OpensearchAsyncSinkBuilder<InputT> setHosts(HttpHost... hosts) {
    checkNotNull(hosts);
    checkState(hosts.length > 0, "Hosts cannot be empty.");
Would checkArgument be a better method to call here?
Hm, also we check this twice: once in the builder and once in the constructor. Would it be better to just validate this in the constructor?
I think checking twice is acceptable here: we should fail as early as possible. Allowing a builder to be constructed with possibly illegal arguments and carried around could raise an exception further down the stack, when the build method is called. By validating early, we prevent that.
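For illustration, a sketch of the setter using org.apache.flink.util.Preconditions with checkArgument for the emptiness check, as suggested above; the field assignment is an assumption and may not match the PR exactly.

public OpensearchAsyncSinkBuilder<InputT> setHosts(HttpHost... hosts) {
    checkNotNull(hosts, "Hosts must not be null.");
    // checkArgument throws IllegalArgumentException, which matches the semantics of
    // validating a method parameter (checkState signals an illegal object state).
    checkArgument(hosts.length > 0, "Hosts cannot be empty.");
    this.hosts = Arrays.asList(hosts); // assumed storage; the actual field may differ
    return this;
}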
}

private void handleFullyFailedBulkRequest(
        Throwable err,
Should we consider logging this error? Otherwise the sink can get stuck in a retry loop without any logs.
Are there any exceptions we want to classify as non-retryable and fail the Flink job? For example "domain doesn't exist" or "insufficient permissions"?
Certainly makes sense!
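A hedged sketch of what the two suggestions could look like inside the writer. The isRetryable helper, the LOG field, and the use of the base writer's fatal-exception consumer are assumptions for illustration, not the PR's actual code.

private void handleFullyFailedBulkRequest(
        Throwable err,
        List<DocSerdeRequest> requestEntries,
        Consumer<List<DocSerdeRequest>> requestResult) {
    // Log the bulk-level failure so a stuck retry loop is visible in the logs.
    LOG.warn("Bulk request with {} entries failed, re-queueing for retry.", requestEntries.size(), err);

    // Assumed classification helper: fail the job for errors that will never succeed,
    // e.g. missing index/domain or insufficient permissions, instead of retrying forever.
    if (!isRetryable(err)) {
        getFatalExceptionCons()
                .accept(new FlinkRuntimeException("Non-retryable OpenSearch error", err));
        return;
    }
    requestResult.accept(requestEntries);
}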
final BulkItemResponse[] items = response.getItems();

for (int i = 0; i < items.length; i++) {
    if (items[i].getFailure() != null) {
Should we consider logging this error?
I don't think so; there could be a massive amount of items in a bulk request, and logging 10k failures (failures are reported per item) would probably flood the logs.
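If per-item logging is too noisy, a possible compromise (a sketch only, not the PR's code) is one summary line per bulk response, counting the failures and including a single sample message:

int failedItems = 0;
String sampleFailure = null;
for (final BulkItemResponse item : response.getItems()) {
    if (item.getFailure() != null) {
        failedItems++;
        if (sampleFailure == null) {
            sampleFailure = item.getFailureMessage();
        }
    }
}
if (failedItems > 0) {
    // One log line per bulk response instead of one per failed item.
    LOG.warn(
            "Bulk request partially failed: {} of {} items, first failure: {}",
            failedItems,
            response.getItems().length,
            sampleFailure);
}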
if (networkClientConfig.getConnectionRequestTimeout() != null
        || networkClientConfig.getConnectionTimeout() != null
        || networkClientConfig.getSocketTimeout() != null) {
    builder.setRequestConfigCallback(
            requestConfigBuilder -> {
                if (networkClientConfig.getConnectionRequestTimeout() != null) {
                    requestConfigBuilder.setConnectionRequestTimeout(
                            networkClientConfig.getConnectionRequestTimeout());
                }
                if (networkClientConfig.getConnectionTimeout() != null) {
                    requestConfigBuilder.setConnectTimeout(
                            networkClientConfig.getConnectionTimeout());
                }
                if (networkClientConfig.getSocketTimeout() != null) {
                    requestConfigBuilder.setSocketTimeout(
                            networkClientConfig.getSocketTimeout());
                }
                return requestConfigBuilder;
            });
}
nit: Seems unnecessary to do 2 null checks. Should we instead just remove the outer if?
The presence of the first if avoids creating the RequestConfigCallback instance at all when there is nothing to configure.
private static DocWriteRequest<?> readDocumentRequest(StreamInput in) throws IOException {
    byte type = in.readByte();
    DocWriteRequest<?> docWriteRequest;
    if (type == 0) {
        docWriteRequest = new IndexRequest(in);
    } else if (type == 1) {
        docWriteRequest = new DeleteRequest(in);
    } else if (type == 2) {
        docWriteRequest = new UpdateRequest(in);
    } else {
        throw new IllegalStateException("Invalid request type [" + type + " ]");
    }
    return docWriteRequest;
These methods are untested. Should we add unit tests for them?
They are tested in the scope of the integration test, OpensearchAsyncSinkITCase, on both the reading and the writing side.
+1 for a unit test. Unless there is a good reason not to, unit tests give quicker feedback.
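A rough shape such a unit test could take, assuming JUnit 5 and AssertJ. The writeTo/readFrom entry points and the getRequest accessor on DocSerdeRequest are assumed names for illustration and may differ from the PR.

@Test
void indexRequestSurvivesSerdeRoundTrip() throws IOException {
    final IndexRequest original =
            new IndexRequest("my-index").id("1").source(Collections.singletonMap("field", "value"));

    // Serialize and deserialize through the DocSerdeRequest helpers (method names assumed),
    // which exercises writeDocumentRequest/readDocumentRequest underneath.
    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DocSerdeRequest.from(original).writeTo(new DataOutputStream(bytes));
    final DocSerdeRequest restored =
            DocSerdeRequest.readFrom(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    final IndexRequest roundTripped = (IndexRequest) restored.getRequest();
    assertThat(roundTripped.index()).isEqualTo("my-index");
    assertThat(roundTripped.id()).isEqualTo("1");
}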
new IndexRequest("my-index")
        .id(element.f0.toString())
        .source(element.f1));
Hmm.. Since we have to implement a DocSerdeRequest, should we consider exposing this in the interface instead of OpenSearch classes? This might be helpful in the event OpenSearch's interface changes.
The DocSerdeRequest is sadly a necessary leaking abstraction (AsyncSink requires Serializable); we should expose it in the places where it is inevitable, but in general we should only operate over OpenSearch APIs.
This is a shame indeed, because Async Sink does not actually need Serializable. https://issues.apache.org/jira/browse/FLINK-27537
Thanks a lot for the review @hlteoh37, I believe I addressed and/or answered all your comments; please let me know if I missed something.
 * @param <T> type of the write request
 */
@PublicEvolving
public class DocSerdeRequest<T> implements Serializable {
I think the class-level generics are redundant here. We are using <?> throughout. Consider changing private final DocWriteRequest<T> request; to private final DocWriteRequest<?> request; and removing the class generics. The class-level parameter makes the Sink interface a bit messy: extends AsyncSinkBase<InputT, DocSerdeRequest<?>>
This is my bad, the T must be constrained, I will fix it. Removed T, not necessary indeed.
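For illustration, the shape the wrapper could take once the class-level parameter is dropped; the constructor and accessor are shown only to make the sketch self-contained, and the PR's actual members may differ.

@PublicEvolving
public class DocSerdeRequest implements Serializable {
    private static final long serialVersionUID = 1L;

    // Wildcard on the wrapped request instead of a class-level type parameter,
    // so the sink can simply use DocSerdeRequest as its request entry type.
    private final DocWriteRequest<?> request;

    private DocSerdeRequest(DocWriteRequest<?> request) {
        this.request = request;
    }

    public static DocSerdeRequest from(DocWriteRequest<?> request) {
        return new DocSerdeRequest(request);
    }

    public DocWriteRequest<?> getRequest() {
        return request;
    }
}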
1000), /* OpensearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION */
nonNullOrDefault(
        getMaxInFlightRequests(), 1), /* BulkProcessor::concurrentRequests */
nonNullOrDefault(getMaxBufferedRequests(), 10000),
Can we also promote the other magic numbers to constants? 10000 and 2 * 1024 * 1024
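A sketch of the suggested cleanup; the constant names are assumptions, the point is simply to replace the inline literals with named defaults.

// Constant names are illustrative; values mirror the defaults discussed above.
private static final int DEFAULT_BULK_FLUSH_MAX_ACTIONS = 1000;
private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 1;
private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10_000;
private static final long DEFAULT_MAX_BATCH_SIZE_IN_BYTES = 2 * 1024 * 1024; // 2 MiB

nonNullOrDefault(getMaxInFlightRequests(), DEFAULT_MAX_IN_FLIGHT_REQUESTS), /* BulkProcessor::concurrentRequests */
nonNullOrDefault(getMaxBufferedRequests(), DEFAULT_MAX_BUFFERED_REQUESTS),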
if (networkClientConfig.getPassword() != null
        && networkClientConfig.getUsername() != null) {
    final CredentialsProvider credentialsProvider =
            new BasicCredentialsProvider();
    credentialsProvider.setCredentials(
            AuthScope.ANY,
            new UsernamePasswordCredentials(
                    networkClientConfig.getUsername(),
                    networkClientConfig.getPassword()));

    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}

if (networkClientConfig.isAllowInsecure().orElse(false)) {
    try {
        httpClientBuilder.setSSLContext(
                SSLContexts.custom()
                        .loadTrustMaterial(new TrustAllStrategy())
                        .build());
    } catch (final NoSuchAlgorithmException
            | KeyStoreException
            | KeyManagementException ex) {
        throw new IllegalStateException(
                "Unable to create custom SSL context", ex);
    }
}

return httpClientBuilder;
nit: Should we move this out to a separate class?
why? That's the only place it is actually needed; keeping it in place seems acceptable.
In my opinion it breaches the single-responsibility philosophy: the writer is responsible for writing and for knowing how to construct the client. I am less concerned with how many times it is used. However, I marked it as a nit since I am not treating this as a blocker.
Thanks @dannycranmer, I think this is a good idea (moreover, I was not correct: there was another place with similar instantiation logic present), so I extracted the utility class.
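Roughly the shape such an extracted helper could take; the class and method names here are assumptions sketched from the writer code shown above, not the PR's actual naming. The writer delegates the HTTP client customization, so the construction logic lives in one place and can be reused.

final class RestClientFactory {

    private RestClientFactory() {}

    static HttpAsyncClientBuilder configureHttpClientBuilder(
            HttpAsyncClientBuilder httpClientBuilder, NetworkClientConfig networkClientConfig) {
        // Basic auth handling, mirrored from the writer code shown above.
        if (networkClientConfig.getPassword() != null
                && networkClientConfig.getUsername() != null) {
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(
                    AuthScope.ANY,
                    new UsernamePasswordCredentials(
                            networkClientConfig.getUsername(), networkClientConfig.getPassword()));
            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
        }
        // The insecure-SSL and timeout handling from the writer would move here as well.
        return httpClientBuilder;
    }
}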
Thanks @dannycranmer, I think I went through all your comments; thanks a lot, really appreciate it.
@Test
@SuppressWarnings("unchecked")
void unsupportedRequestType() throws IOException {
    final DocSerdeRequest serialized = DocSerdeRequest.from(mock(DocWriteRequest.class));
Mockito is banned. Since you only have one usage here can we remove it?
@reta The PR looks good to me minus the Mockito comment. However, I have questions over the approach here: we are adding a new sink alongside the existing sink, so we will end up with two. If this has already been discussed on the mailing lists I missed that, please give me a link :D
Thanks a lot for the review @dannycranmer
This is indeed a good question. I think the main difference between the two lies in the internal APIs the implementation is based upon:
I have covered this part in the docs, thank you.
Updated the documentation, thank you
You mean the
@dannycranmer would appreciate it if you could take a look, thank you.
@reta I am reluctant to introduce a new Sink API based on the internal implementation unless there is a really good/semantic reason. I would prefer to encapsulate the internals via a single Flink layer that can support either. We should keep the Sink API as simple as possible with sensible defaults, and allow advanced users to configure as they wish. For instance, a user should not need to decide which one to use. There could be reasons to have 2x Sinks if they support fundamentally different features/APIs, but I would expect the naming to reflect this. Apologies for raising these concerns late in the process, but I cannot see that this has been considered before. @MartijnVisser what are your thoughts?
Thanks @dannycranmer, I understand your concerns. I will move this pull request to draft (for now) so we could get to it at some point in the future, when migrating off the
OpenSearch implementation of Async Sink (https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), a few TODO items: