Skip to content

Commit

Permalink
Update rabbitmq (#5)
Browse files Browse the repository at this point in the history

Co-authored-by: Yannik Schröder <schroeder_yannik@web.de>
Co-authored-by: Pascal Schulze <pascal.schulze@student.hpi.uni-potsdam.de>
  • Loading branch information
3 people authored Mar 17, 2021
1 parent 828662a commit 0c76cee
Show file tree
Hide file tree
Showing 30 changed files with 638 additions and 477 deletions.
2 changes: 1 addition & 1 deletion flink-connectors/flink-connector-rabbitmq2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ must be aware that this may be subject to conditions declared in the
Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL")
and the Apache License version 2 ("ASL").

This connector allows consuming messages from and publishing to RabbitMQ. It supports the
This connector allows consuming messages from and publishing to RabbitMQ. It implements the
Source API specified in [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
and the Sink API specified in [FLIP-143](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API).

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,26 @@
# License of the Rabbit MQ Connector

Flink's RabbitMQ connector defines a Maven dependency on the
"RabbitMQ AMQP Java Client", is triple-licensed under the Mozilla Public License 1.1 ("MPL"),
the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL").

Flink itself neither reuses source code from the "RabbitMQ AMQP Java Client"
nor packages binaries from the "RabbitMQ AMQP Java Client".

Users that create and publish derivative work based on Flink's
RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
must be aware that this may be subject to conditions declared in the
Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL")
and the Apache License version 2 ("ASL").

# RabbitMQ Sink

Flink's RabbitMQ connector provides a sink which enables you to publish your stream directly
to a RabbitMQ exchange in three different consistency modes: at-most-once, at-least-once,
and exactly-once. Furthermore, user defined publish options can be used to customize each message
options in regard to exchange and publish settings in the RabbitMQ context.

## Consistency Behaviour
## Consistency Mode
With __at-most-once__, the sink will simply take each message and publish the serialization of it
(with publish options if given) to RabbitMQ. At this point the sink gives up the ownership of the message.

Expand All @@ -25,7 +40,7 @@ will be buffered until a checkpoint is triggered. On each checkpoint all message
published/committed as one transaction to ensure the reception acknowledge by RabbitMQ.
If successful, all messages which were committed will be given up, otherwise they will be stored
and tried to commit again in the next transaction during the next checkpoint.
This behaviour ensures that each message will be stored in RabbitMQ exactly once but also has
This consistency mode ensures that each message will be stored in RabbitMQ exactly once but also has
a performance drawback. Committing many messages will take time and will thus increase the overall
time it takes to do a checkpoint. This can result in checkpoint delays and in peaks where
checkpoint have either many or just a few messages. This also correlates to the latency of each message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,23 +100,20 @@ private RabbitMQSink(
String queueName,
SerializationSchema<T> serializationSchema,
ConsistencyMode consistencyMode,
SerializableReturnListener returnListener,
@Nullable SerializableReturnListener returnListener,
@Nullable RabbitMQSinkPublishOptions<T> publishOptions) {
this.connectionConfig = connectionConfig;
this.queueName = queueName;
this.serializationSchema = serializationSchema;
this.consistencyMode = consistencyMode;
this.returnListener = returnListener;
this.publishOptions = publishOptions;
this.connectionConfig = requireNonNull(connectionConfig);
this.queueName = requireNonNull(queueName);
this.serializationSchema = requireNonNull(serializationSchema);
this.consistencyMode = requireNonNull(consistencyMode);

requireNonNull(connectionConfig);
requireNonNull(queueName);
requireNonNull(serializationSchema);
this.returnListener = returnListener;

Preconditions.checkState(
verifyPublishOptions(),
"If consistency mode is stronger than at-most-once and publish options are defined"
+ "then publish options need a deserialization schema");
this.publishOptions = publishOptions;
}

private boolean verifyPublishOptions() {
Expand All @@ -132,6 +129,13 @@ private boolean verifyPublishOptions() {
return publishOptions.getDeserializationSchema().isPresent();
}

/**
* Get a {@link RabbitMQSinkBuilder} for the sink.
*
* @param <T> type of the sink
* @return a sink builder
* @see RabbitMQSinkBuilder
*/
public static <T> RabbitMQSinkBuilder<T> builder() {
return new RabbitMQSinkBuilder<>();
}
Expand All @@ -148,40 +152,48 @@ public static <T> RabbitMQSinkBuilder<T> builder() {
public SinkWriter<T, Void, RabbitMQSinkWriterState<T>> createWriter(
InitContext context, List<RabbitMQSinkWriterState<T>> states) {
try {
switch (consistencyMode) {
case AT_MOST_ONCE:
return new RabbitMQSinkWriterAtMostOnce<>(
connectionConfig,
queueName,
serializationSchema,
publishOptions,
returnListener);
case AT_LEAST_ONCE:
return new RabbitMQSinkWriterAtLeastOnce<>(
connectionConfig,
queueName,
serializationSchema,
publishOptions,
returnListener,
states);
case EXACTLY_ONCE:
return new RabbitMQSinkWriterExactlyOnce<>(
connectionConfig,
queueName,
serializationSchema,
publishOptions,
returnListener,
states);
default:
throw new RuntimeException(
"Error in creating a SinkWriter: "
+ "No valid consistency mode was specified.");
}
RabbitMQSinkWriterBase<T> sinkWriter = createSpecializedWriter();
// Setup RabbitMQ needs to be called before recover from states as the writer might send
// messages to RabbitMQ on recover.
sinkWriter.setupRabbitMQ();
sinkWriter.recoverFromStates(states);
return sinkWriter;
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}

private RabbitMQSinkWriterBase<T> createSpecializedWriter() throws IllegalStateException {
switch (consistencyMode) {
case AT_MOST_ONCE:
return new RabbitMQSinkWriterAtMostOnce<>(
connectionConfig,
queueName,
serializationSchema,
publishOptions,
returnListener);
case AT_LEAST_ONCE:
return new RabbitMQSinkWriterAtLeastOnce<>(
connectionConfig,
queueName,
serializationSchema,
publishOptions,
returnListener);
case EXACTLY_ONCE:
return new RabbitMQSinkWriterExactlyOnce<>(
connectionConfig,
queueName,
serializationSchema,
publishOptions,
returnListener);
default:
throw new IllegalStateException(
"Error in creating a SinkWriter: No valid consistency mode ("
+ consistencyMode
+ ") was specified.");
}
}

@Override
public Optional<Committer<Void>> createCommitter() {
return Optional.empty();
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 0c76cee

Please sign in to comment.