[FLINK-21373][connectors/rabbitmq2] RabbitMQ Connector using FLIP-143…
… Sink API RabbitMQ Connector using new Sink API https://issues.apache.org/jira/browse/FLINK-21373 Co-authored-by: Yannik Schröder <schroeder_yannik@web.de> Co-authored-by: Jan Westphal <jan.westphal306@gmail.com>
1 parent cbb2773, commit 4d34b9b
Showing 19 changed files with 1,789 additions and 7 deletions.
78 changes: 78 additions & 0 deletions
...tor-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/README.md
@@ -0,0 +1,78 @@
# License of the RabbitMQ Connector

Flink's RabbitMQ connector defines a Maven dependency on the
"RabbitMQ AMQP Java Client", which is triple-licensed under the Mozilla Public License 1.1 ("MPL"),
the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL").

Flink itself neither reuses source code from the "RabbitMQ AMQP Java Client"
nor packages binaries from the "RabbitMQ AMQP Java Client".

Users that create and publish derivative work based on Flink's
RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
must be aware that this may be subject to conditions declared in the
Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL")
and the Apache License version 2 ("ASL").

# RabbitMQ Sink

Flink's RabbitMQ connector provides a sink that publishes a stream directly
to a RabbitMQ exchange in one of three consistency modes: at-most-once, at-least-once,
and exactly-once. In addition, user-defined publish options can be used to customize the
exchange and publish settings of each message.

## Consistency Mode
With __at-most-once__, the sink simply takes each message and publishes its serialized form
(with publish options, if given) to RabbitMQ. At this point the sink gives up ownership of the message.

For __at-least-once__, the same process as for at-most-once is executed, except that ownership of
the message does not end immediately after publishing. The sink keeps the publishing id of each
message together with the message itself and buffers both until it receives the corresponding
acknowledgment from RabbitMQ. Because messages must be buffered while their asynchronous
acknowledgments are pending, this mode requires _checkpointing to be enabled_. On each checkpoint,
all messages that have been sent (and thus buffered) but not yet acknowledged are stored. At the
same time, each checkpoint triggers a resend of all unacknowledged messages, since the sink has to
assume that something went wrong while publishing them. Because it can take a moment until RabbitMQ
acknowledges a message, this can, and probably will, result in duplicated messages, which is why
this mode guarantees at-least-once rather than exactly-once delivery.

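The at-least-once bookkeeping described above can be illustrated with a minimal, broker-free sketch. It is a simplified model and not the connector's actual implementation: the class `AtLeastOnceBufferSketch`, its method names, and the in-memory `published` list standing in for RabbitMQ are all assumptions made for illustration. It also assumes that a resent message is tracked under a fresh publishing id.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

/** Simplified, hypothetical model of at-least-once buffering (not the connector's code). */
class AtLeastOnceBufferSketch {
    // Publishing id -> message that was sent but has not been acknowledged yet.
    private final ConcurrentSkipListMap<Long, String> unacked = new ConcurrentSkipListMap<>();
    // Stand-in for the broker: records every publish attempt in order.
    final List<String> published = new ArrayList<>();
    private long nextPublishId = 1;

    /** Publishes a message and keeps it buffered until it is acknowledged. */
    long send(String message) {
        long id = nextPublishId++;
        unacked.put(id, message);
        published.add(message);
        return id;
    }

    /** Called when the (simulated) broker acknowledges a publishing id. */
    void onAck(long publishId) {
        unacked.remove(publishId);
    }

    /** On checkpoint: snapshot the unacknowledged messages and send them again. */
    List<String> snapshotAndResend() {
        List<String> pending = new ArrayList<>(unacked.values());
        unacked.clear();
        for (String message : pending) {
            send(message); // buffered again under a fresh publishing id (assumption)
        }
        return pending;
    }

    int pendingCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        AtLeastOnceBufferSketch sink = new AtLeastOnceBufferSketch();
        long a = sink.send("a");
        sink.send("b");
        sink.onAck(a); // "a" is confirmed, "b" is still pending
        List<String> state = sink.snapshotAndResend();
        System.out.println(state);          // [b]
        System.out.println(sink.published); // [a, b, b]
    }
}
```

In this model, "b" reaches the broker twice because its acknowledgment had not arrived before the checkpoint, which is exactly the duplication the paragraph above warns about.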
By contrast, the __exactly-once__ mode does not send messages as they arrive. All incoming messages
are buffered until a checkpoint is triggered. On each checkpoint, all buffered messages are
published and committed as one transaction to ensure that RabbitMQ acknowledges their reception.
If the commit succeeds, the sink gives up ownership of the committed messages; otherwise they are
stored and committed again in the next transaction during the next checkpoint.
This consistency mode ensures that each message is stored in RabbitMQ exactly once, but it has
a performance drawback. Committing many messages takes time and thus increases the overall
time it takes to complete a checkpoint. This can delay checkpoints and lead to peaks where some
checkpoints contain many messages and others only a few. It also affects the latency of each
message, since a message is not published before the next checkpoint.

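The buffer-then-commit behaviour of the exactly-once mode can be sketched in the same spirit. Again, this is a simplified model under stated assumptions and not the connector's implementation: `ExactlyOnceBufferSketch` and the `TransactionalBroker` interface are hypothetical names, and a commit is reduced to a single call that either succeeds or fails as a whole.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of exactly-once buffering (not the connector's code). */
class ExactlyOnceBufferSketch {
    /** Hypothetical stand-in for a transactional channel; returns false if the commit fails. */
    interface TransactionalBroker {
        boolean commit(List<String> batch);
    }

    private final List<String> buffer = new ArrayList<>();
    private final TransactionalBroker broker;

    ExactlyOnceBufferSketch(TransactionalBroker broker) {
        this.broker = broker;
    }

    /** Incoming messages are only buffered, never published directly. */
    void write(String message) {
        buffer.add(message);
    }

    /** On checkpoint: try to commit the whole buffer as one transaction. */
    boolean onCheckpoint() {
        if (buffer.isEmpty()) {
            return true;
        }
        boolean committed = broker.commit(new ArrayList<>(buffer));
        if (committed) {
            buffer.clear(); // ownership is given up only after a successful commit
        }
        return committed;   // on failure the buffer is retried at the next checkpoint
    }

    int bufferedCount() {
        return buffer.size();
    }

    public static void main(String[] args) {
        List<String> stored = new ArrayList<>();
        boolean[] brokerUp = {false};
        ExactlyOnceBufferSketch sink =
                new ExactlyOnceBufferSketch(batch -> brokerUp[0] && stored.addAll(batch));
        sink.write("a");
        sink.write("b");
        sink.onCheckpoint();        // commit fails, both messages stay buffered
        brokerUp[0] = true;
        sink.write("c");
        sink.onCheckpoint();        // commits a, b and c in one transaction
        System.out.println(stored); // [a, b, c]
    }
}
```

The sketch also makes the latency trade-off visible: "a" and "b" are not stored until the second checkpoint, and that checkpoint has to commit a larger batch.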
## How to use it
```java
// General form: replace each <...> placeholder with a concrete value.
RabbitMQSink<T> sink =
    RabbitMQSink.<T>builder()
        .setConnectionConfig(<RMQConnectionConfig>)
        .setQueueName(<RabbitMQ Queue Name>)
        .setSerializationSchema(<Serialization Schema>)
        .setConsistencyMode(<ConsistencyMode>)
        .build();

// ******************* An example usage looks like this *******************

// Connection settings for a local RabbitMQ broker with default credentials.
RMQConnectionConfig rmqConnectionConfig =
    new RMQConnectionConfig.Builder()
        .setHost("localhost")
        .setVirtualHost("/")
        .setUserName("guest")
        .setPassword("guest")
        .setPort(5672)
        .build();

// Sink that publishes strings to "publish-queue" with at-most-once semantics.
RabbitMQSink<String> rmqSink =
    RabbitMQSink.<String>builder()
        .setConnectionConfig(rmqConnectionConfig)
        .setQueueName("publish-queue")
        .setSerializationSchema(new SimpleStringSchema())
        .setConsistencyMode(ConsistencyMode.AT_MOST_ONCE)
        .build();

// Attach the sink to an existing DataStream<String>.
(DataStream<String>).sinkTo(rmqSink);
```