[FLINK-21373][connectors/rabbitmq2] RabbitMQ Connector using FLIP-143 Sink API

RabbitMQ Connector using new Sink API
https://issues.apache.org/jira/browse/FLINK-21373

Co-authored-by: Yannik Schröder <schroeder_yannik@web.de>
Co-authored-by: Jan Westphal <jan.westphal306@gmail.com>
3 people committed Mar 18, 2021
1 parent c8e408f commit a5cf90e
Showing 19 changed files with 1,791 additions and 13 deletions.
18 changes: 18 additions & 0 deletions flink-connectors/flink-connector-rabbitmq2/pom.xml
@@ -1,4 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
@@ -0,0 +1,78 @@
# License of the RabbitMQ Connector

Flink's RabbitMQ connector defines a Maven dependency on the
"RabbitMQ AMQP Java Client", which is triple-licensed under the Mozilla Public License 1.1 ("MPL"),
the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL").

Flink itself neither reuses source code from the "RabbitMQ AMQP Java Client"
nor packages binaries from the "RabbitMQ AMQP Java Client".

Users who create and publish derivative work based on Flink's
RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
must be aware that this may be subject to conditions declared in the
Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL")
and the Apache License version 2 ("ASL").

# RabbitMQ Sink

Flink's RabbitMQ connector provides a sink that publishes your stream directly
to a RabbitMQ exchange in one of three consistency modes: at-most-once, at-least-once,
or exactly-once. In addition, user-defined publish options can customize, per message, the
exchange and publish settings used in the RabbitMQ context.

## Consistency Mode
With __at-most-once__, the sink simply takes each message and publishes its serialized form
(with publish options, if given) to RabbitMQ. At that point the sink gives up ownership of the message.

With __at-least-once__, the sink publishes in the same way as with at-most-once, but ownership of
the message does not end as soon as it is published. The sink keeps each message together with its
individual publishing id and buffers it until RabbitMQ acknowledges that the message was received.
Because messages must be buffered while their asynchronous acknowledgments are pending, this mode
requires _checkpointing to be enabled_. On each checkpoint, all messages that have been buffered
(and thus sent) but not yet acknowledged are stored. At the same time, each checkpoint triggers a
resend of all unacknowledged messages, since we have to assume that something went wrong while
publishing them. Because it can take a moment for RabbitMQ to acknowledge a message, this can, and
probably will, result in duplicated messages, which is why this mode is at-least-once.
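
The bookkeeping described above can be illustrated with a small, self-contained sketch. It is a toy model only: `AtLeastOnceBuffer` and all of its members are hypothetical names, it uses no RabbitMQ or Flink classes, and resending under a fresh publish id is an assumption rather than a statement about the connector's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/** Toy model of at-least-once bookkeeping (hypothetical, not the connector's code). */
class AtLeastOnceBuffer {
    // Stand-in for the broker: records every publish it sees, including resends.
    final List<String> published = new ArrayList<>();

    // Messages that were sent but not yet acknowledged, keyed by publish id.
    private final SortedMap<Long, String> outstanding = new TreeMap<>();
    private long nextPublishId = 1;

    /** Publishes a message and keeps it until the broker acknowledges it. */
    long send(String message) {
        long id = nextPublishId++;
        outstanding.put(id, message);
        published.add(message);
        return id;
    }

    /** Broker acknowledgment: ownership of the message ends here. */
    void ack(long publishId) {
        outstanding.remove(publishId);
    }

    /** On checkpoint: snapshot the unacknowledged messages, then resend them. */
    List<String> checkpoint() {
        List<String> snapshot = new ArrayList<>(outstanding.values());
        outstanding.clear();
        for (String message : snapshot) {
            send(message); // assumption: a resend gets a fresh publish id
        }
        return snapshot;
    }

    int outstandingCount() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        AtLeastOnceBuffer sink = new AtLeastOnceBuffer();
        long a = sink.send("a");
        sink.send("b");
        sink.ack(a); // "a" is confirmed, "b" is still pending
        System.out.println(sink.checkpoint()); // [b]
        System.out.println(sink.published);    // [a, b, b]
    }
}
```

The second line of output shows the duplicate: "b" was published twice because its acknowledgment had not arrived before the checkpoint.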

By contrast, the __exactly-once__ mode does not send messages as they arrive. All incoming messages
are buffered until a checkpoint is triggered. On each checkpoint, all buffered messages are
published and committed as one transaction, so that RabbitMQ acknowledges their reception.
If the commit succeeds, the sink gives up all committed messages; otherwise they are stored
and committed again in the next transaction, during the next checkpoint.
This consistency mode ensures that each message is stored in RabbitMQ exactly once, but it has
a performance drawback. Committing many messages takes time and thus increases the overall
time a checkpoint takes. This can delay checkpoints and lead to peaks in which a
checkpoint contains either many or just a few messages. It also affects the latency of each message.
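
As a rough illustration of the buffer-then-commit behaviour, here is a self-contained sketch. `ExactlyOnceBuffer` and `TransactionalPublisher` are hypothetical names introduced for this example; the sketch uses no RabbitMQ or Flink classes and only models the retry-on-failed-commit logic described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of exactly-once buffering (hypothetical, not the connector's code). */
class ExactlyOnceBuffer {
    /** Stand-in for a transactional publish; returns false if the commit fails. */
    interface TransactionalPublisher {
        boolean commit(List<String> batch);
    }

    private final List<String> buffered = new ArrayList<>();
    private final TransactionalPublisher publisher;

    ExactlyOnceBuffer(TransactionalPublisher publisher) {
        this.publisher = publisher;
    }

    /** Incoming messages are only buffered, never published directly. */
    void write(String message) {
        buffered.add(message);
    }

    /** On checkpoint: commit everything buffered so far as one transaction. */
    boolean checkpoint() {
        if (buffered.isEmpty()) {
            return true;
        }
        boolean committed = publisher.commit(new ArrayList<>(buffered));
        if (committed) {
            buffered.clear(); // give up ownership only after a successful commit
        }
        return committed; // on failure the batch stays for the next checkpoint
    }

    int bufferedCount() {
        return buffered.size();
    }

    public static void main(String[] args) {
        List<String> broker = new ArrayList<>();
        boolean[] brokerUp = {false};
        ExactlyOnceBuffer sink = new ExactlyOnceBuffer(batch -> {
            if (!brokerUp[0]) {
                return false; // simulated failed commit
            }
            broker.addAll(batch);
            return true;
        });

        sink.write("a");
        sink.write("b");
        System.out.println(sink.checkpoint()); // false: nothing reached the broker
        brokerUp[0] = true;
        sink.write("c");
        System.out.println(sink.checkpoint()); // true
        System.out.println(broker);            // [a, b, c]
    }
}
```

Note how the second checkpoint carries three messages instead of one, which is the kind of uneven checkpoint load the paragraph above warns about.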

## How to use it
```java
// Template: replace each <...> placeholder with your own value.
RabbitMQSink<T> sink =
    RabbitMQSink.<T>builder()
        .setConnectionConfig(<RMQConnectionConfig>)
        .setQueueName(<RabbitMQ Queue Name>)
        .setSerializationSchema(<Serialization Schema>)
        .setConsistencyMode(<ConsistencyMode>)
        .build();

// ******************* An example usage looks like this *******************

// Connection settings for a local broker with the default guest account.
RMQConnectionConfig rmqConnectionConfig =
    new RMQConnectionConfig.Builder()
        .setHost("localhost")
        .setVirtualHost("/")
        .setUserName("guest")
        .setPassword("guest")
        .setPort(5672)
        .build();

// A sink that publishes strings to "publish-queue" with at-most-once semantics.
RabbitMQSink<String> rmqSink =
    RabbitMQSink.<String>builder()
        .setConnectionConfig(rmqConnectionConfig)
        .setQueueName("publish-queue")
        .setSerializationSchema(new SimpleStringSchema())
        .setConsistencyMode(ConsistencyMode.AT_MOST_ONCE)
        .build();

// Attach the sink to an existing DataStream<String>.
(DataStream<String>).sinkTo(rmqSink);
```