-
Notifications
You must be signed in to change notification settings - Fork 13.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-20628][connectors/rabbitmq2] RabbitMQ connector using new connector API #15140
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 4d34b9b (Fri May 28 09:03:52 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the great work! I did a first pass and a few things need some improments
- Remove custom failover mechanics from sink (not your fault but we decided that Flink should handle this in the feature)
- Exception Handling: Usually IOException are handled by Flink can be safely propagated. All other exception should use a RuntimeException but not FlinkRuntimeException because it is meant to be used in internal components.
- Threading in the source reader: is not safe yet please revisit the threading model and check whether object are accessed by different threads concurrently.
- Remove any sleeps from the integration tests and use proper synchronization.
...-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/ConsistencyMode.java
Outdated
Show resolved
Hide resolved
...r-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/RabbitMQConnectionConfig.java
Outdated
Show resolved
Hide resolved
...r-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/RabbitMQConnectionConfig.java
Outdated
Show resolved
Hide resolved
...r-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/RabbitMQConnectionConfig.java
Outdated
Show resolved
Hide resolved
...r-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/RabbitMQConnectionConfig.java
Outdated
Show resolved
Hide resolved
...ctor-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSinkTest.java
Outdated
Show resolved
Hide resolved
...ctor-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSinkTest.java
Outdated
Show resolved
Hide resolved
...ctor-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSinkTest.java
Outdated
Show resolved
Hide resolved
...-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSourceTest.java
Outdated
Show resolved
Hide resolved
...-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSourceTest.java
Outdated
Show resolved
Hide resolved
30eecfc
to
d8420cc
Compare
39aa4d5
to
828662a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are almost done, great job! A few overarching things you still have to address:
- Revisit the visibility of classes/interfaces/enums only make public which should be used by a user (or necessarily need to be...)
- Add not null check to constructor arguments to prevent unexpected null pointer exceptions
- Use MiniCluster for testing and do not use the environment directly. This should allow you to ease a lot of the testing setup and get rid of all static magic.
...onnector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSink.java
Outdated
Show resolved
Hide resolved
...onnector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSink.java
Outdated
Show resolved
Hide resolved
...onnector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSink.java
Show resolved
Hide resolved
...2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/common/RabbitMQSinkConnection.java
Show resolved
Hide resolved
...che/flink/connector/rabbitmq2/source/reader/specialized/RabbitMQSourceReaderExactlyOnce.java
Outdated
Show resolved
Hide resolved
...or-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQBaseTest.java
Show resolved
Hide resolved
...itmq2/src/test/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQContainerClient.java
Outdated
Show resolved
Hide resolved
...ctor-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSinkTest.java
Outdated
Show resolved
Hide resolved
...ctor-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSinkTest.java
Outdated
Show resolved
Hide resolved
0c76cee
to
1feef00
Compare
@flinkbot run azure |
1 similar comment
@flinkbot run azure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The CI failure is unrelated to your changes. It is caused by some infrastructure problems.
I left some minor cleanup comments for the test code but after they are addressed, IMO this PR looks good 👍
...2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/common/RabbitMQSinkConnection.java
Outdated
Show resolved
Hide resolved
.../apache/flink/connector/rabbitmq2/sink/writer/specialized/RabbitMQSinkWriterAtLeastOnce.java
Outdated
Show resolved
Hide resolved
.../apache/flink/connector/rabbitmq2/sink/writer/specialized/RabbitMQSinkWriterExactlyOnce.java
Outdated
Show resolved
Hide resolved
.../apache/flink/connector/rabbitmq2/sink/writer/specialized/RabbitMQSinkWriterExactlyOnce.java
Outdated
Show resolved
Hide resolved
...ctor-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSource.java
Outdated
Show resolved
Hide resolved
...itmq2/src/test/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQContainerClient.java
Outdated
Show resolved
Hide resolved
...itmq2/src/test/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQContainerClient.java
Outdated
Show resolved
Hide resolved
...ctor-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSinkTest.java
Outdated
Show resolved
Hide resolved
...a/org/apache/flink/connector/rabbitmq2/sink/state/RabbitMQSinkWriterStateSerializerTest.java
Outdated
Show resolved
Hide resolved
...-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSourceTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I forgot one thing... Please have two separate commits for the final submission and distribute the changes of the review commit accordingly.
...ava/org/apache/flink/connector/rabbitmq2/source/split/RabbitMQSourceSplitSerializerTest.java
Show resolved
Hide resolved
...-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSourceTest.java
Outdated
Show resolved
Hide resolved
1feef00
to
a5cf90e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From my side, this PR is good to go % some nitty last feedback. We now have to wait until an official apache committer approves and possibly merges this PR.
Good job, and thanks for all your effort!
P.S. After you did the changes, can you rebase to the latest master? I think the CI failure is already fixed on the master branch and the CI will finally be green :)
...c/main/java/org/apache/flink/connector/rabbitmq2/source/reader/RabbitMQSourceReaderBase.java
Outdated
Show resolved
Hide resolved
...or-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQBaseTest.java
Show resolved
Hide resolved
...or-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSinkITCase.java
Show resolved
Hide resolved
...abbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSourceITCase.java
Outdated
Show resolved
Hide resolved
...abbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSourceITCase.java
Show resolved
Hide resolved
...abbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSourceITCase.java
Outdated
Show resolved
Hide resolved
a5cf90e
to
a874800
Compare
a874800
to
4d34b9b
Compare
@flinkbot run azure |
Test failure is unrelated https://issues.apache.org/jira/browse/FLINK-21879 |
@flinkbot run azure |
1 similar comment
@flinkbot run azure |
Excuse me, @pscls Are you still working for this PR ? |
@RocMarshal Not really, we finished it 9 months ago with @fapaul's requested changes. Since then nothing happened on our side. |
@pscls Cool! Would you mind rabasing it from the latest master branch ? Thank you. |
…Source API RabbitMQ Connector using new Source API https://issues.apache.org/jira/browse/FLINK-20628 Co-authored-by: Yannik SchrÃder <schroeder_yannik@web.de> Co-authored-by: Pascal Schulze <pascal.schulze@student.hpi.uni-potsdam.de>
4d34b9b
to
0f4eb4b
Compare
@flinkbot run azure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pscls Thanks for the update. I left some comments. Please let me know what's your opinion.
...onnector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSink.java
Outdated
Show resolved
Hide resolved
...onnector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSink.java
Outdated
Show resolved
Hide resolved
...in/java/org/apache/flink/connector/rabbitmq2/source/enumerator/RabbitMQSourceEnumerator.java
Outdated
Show resolved
Hide resolved
...in/java/org/apache/flink/connector/rabbitmq2/source/enumerator/RabbitMQSourceEnumerator.java
Outdated
Show resolved
Hide resolved
...in/java/org/apache/flink/connector/rabbitmq2/source/enumerator/RabbitMQSourceEnumerator.java
Outdated
Show resolved
Hide resolved
...tmq2/src/main/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQConnectionConfig.java
Outdated
Show resolved
Hide resolved
.../flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/README.md
Outdated
Show resolved
Hide resolved
@pscls , thanks for your contribution of the new RabbitMQ connector. I left my minor comments and you should upgrade the amqp-client version for vulnerabilities. |
0f4eb4b
to
f222068
Compare
...onnector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSink.java
Outdated
Show resolved
Hide resolved
...2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/common/RabbitMQSinkConnection.java
Show resolved
Hide resolved
...2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/common/RabbitMQSinkConnection.java
Show resolved
Hide resolved
@pscls , thanks for your updates. In general the changes look good to me and I have still left certain comments. Please take a look. |
f222068
to
092da6c
Compare
LGTM. @fapaul , could you help to merge this pull request? And @RocMarshal would implement the TableSource and TableSink based on this. |
...tmq2/src/main/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQConnectionConfig.java
Outdated
Show resolved
Hide resolved
… Sink API RabbitMQ Connector using new Sink API https://issues.apache.org/jira/browse/FLINK-21373 Co-authored-by: Yannik SchrÃder <schroeder_yannik@web.de> Co-authored-by: Jan Westphal <jan.westphal306@gmail.com>
092da6c
to
9ef2375
Compare
Sorry for the late reply. Thanks for the additional efforts from all sides. We are currently finalizing the discussions around the external connector repository and thought this PR would be a good candidate since it is already reviewed and has decent test coverage. @RocMarshal @SteNicholas Is it okay for you to leave the PR unmerged for now or do you need this connector urgently? |
@fapaul , @RocMarshal would like to work for the table source and sink for the RabbitMQ connector. IMO, if the PR is unmerged for now, @RocMarshal should implement the table source and sink based on this branch. |
Thank you @SteNicholas @fapaul . |
@SteNicholas @RocMarshal I'm a bit hesitant to merge this PR because we are getting close to the release cut of Flink 1.15 and new connectors are quite often a source of flakiness. The CI doesn't check all use cases (like the Java 11 tests), so merging it could result in flakiness which I would rather avoid at this point. I guess we could either postponing the merging of this PR until the release 1.15 branch has been cut (in a couple of weeks) or we could consider already moving this entire PR to its own external connector repo, as is the current plan already. Something like github.com/apache/flink-connector-rabbitmq - We could use that repo to also test out the testing infrastructure that we need for external connector repositories anyway. Let me know what you think. |
@MartijnVisser , the point mentioned above makes sense to me. Therefore, at present @RocMarshal could introduce the table source and sink based on the branch of this pull request. Thus, this pull request and the table implementation would be merged for Flink 1.16 version. |
@SteNicholas Sounds good. Keep in mind that we might not want to merge in new connectors in 1.16, depending on how quickly the external connector discussion is progressing and the building blocks will be delivered. It could actually be that we start moving out connectors in 1.16. |
@pscls Thanks for your patience! We've started this week with our first external connector repo project, which is moving out the Elasticsearch connector from this repository to https://github.com/apache/flink-connector-elasticsearch I think it would be best to first get that one moved out, so we can understand the actual issues that we might run into. When that one is done, I propose to create a dedicated repo for RabbitMQ and move this code to that repo. What do you think? |
Sounds good to me. Just ping me if there's anything to do from my side. |
@pscls There's now https://github.com/apache/flink-connector-rabbitmq - Would you like to move this PR to that repo, so we can merge it there? |
@MartijnVisser This PR is now moved into the repository new apache/flink-connector-rabbitmq and can be found here: flink-connector-rabbitmq/pull/1 |
Closing this PR since the connector has been moved to https://github.com/apache/flink-connector-rabbitmq and this PR is now available at apache/flink-connector-rabbitmq#1 - It would be great if we can finish this over there :) |
What is the purpose of the change
This pull request ports the RabbitMQ connector implementation to the new Connector’s API described in FLIP-27 and FLIP-143. It includes both source and sink with at-most-once, at-least-once, and exactly-once behavior respectively.
This pull request closes the following issues (separated RabbitMQ connector Source and Sink tickets): FLINK-20628 and FLINK-21373
Brief change log
Verifying this change
This change added tests and can be verified as follows:
All changes are within the flink-connectors/flink-connector-rabbitmq2/ module.
Added Integration Tests can be find under org.apache.flink.connector.rabbitmq2.source and org.apache.flink.connector.rabbitmq2.sink package in the test respective directories.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation