camel-rocketmq has been officially released as part of Apache Camel!
Maven: https://search.maven.org/artifact/org.apache.camel/camel-rocketmq
Please use the following dependency:
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-rocketmq</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version -->
</dependency>
Source code: https://github.com/apache/camel/tree/main/components/camel-rocketmq
Maven:
<dependency>
<groupId>icu.wwj.camel</groupId>
<artifactId>camel-rocketmq</artifactId>
<version>3.2.0</version>
</dependency>
Versions:
Camel version | Component version | Release |
---|---|---|
3.2.0 | 3.2.0-* | 3.2.0 |
2.25.0 | 2.25.0-* | 2.25.0-0.0.1 |
Older versions use the groupId vip.wuweijie.camel, which can be found at:
https://repo.maven.apache.org/maven2/vip/wuweijie/camel/camel-rocketmq/
from("rocketmq:from_topic?namesrvAddr=localhost:9876&consumerGroup=consumer")
.to("rocketmq:to_topic?namesrvAddr=localhost:9876&producerGroup=producer");
The InOut pattern is based on the message key. When the producer sends a message, a messageKey is generated and appended to the message's key.
After the message is sent, a consumer listens to the topic configured by the replyToTopic parameter.
When a message from replyToTopic contains the key, the reply has been received and routing continues.
If requestTimeoutMillis elapses and no reply has been received, an exception is thrown.
from("rocketmq:{{inout.rocketmq.topic.from}}?namesrvAddr={{rocketmq.namesrv.addr}}" +
        "&consumerGroup={{inout.rocketmq.consumer.group}}" +
        "&requestTimeoutMillis=10000")
    // Send the request and wait for the matching reply on replyToTopic
    .inOut("rocketmq:{{inout.rocketmq.topic.to}}?namesrvAddr={{rocketmq.namesrv.addr}}" +
        "&producerGroup={{inout.rocketmq.producer.group}}" +
        "&replyToTopic={{inout.rocketmq.reply.to.topic}}" +
        "&requestTimeoutMillis={{inout.request.timeout}}" +
        "&replyToConsumerGroup={{inout.rocketmq.reply.to.consumer}}"
    )
    .to("log:InOutRoute?showAll=true");
Note: In the InOut pattern, the message is not routed any further until the reply has been received.
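The key-based correlation described above can be sketched in plain Java. This is only an illustration of the idea, not the component's actual implementation: the class `ReplyCorrelator` and its methods `register`, `onReply`, and `await` are hypothetical names, and the reply body is simplified to a `String`.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of request/reply correlation by message key.
final class ReplyCorrelator {
    // Requests that are still waiting for a reply, indexed by message key.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // On send: generate a key and remember that a reply is expected for it.
    String register() {
        String messageKey = UUID.randomUUID().toString();
        pending.put(messageKey, new CompletableFuture<>());
        return messageKey;
    }

    // On a message from the reply topic: complete the matching request, if any.
    boolean onReply(String messageKey, String body) {
        CompletableFuture<String> future = pending.get(messageKey);
        return future != null && future.complete(body);
    }

    // Block until the reply arrives; throw if the timeout elapses first.
    String await(String messageKey, long timeoutMillis) {
        CompletableFuture<String> future = pending.get(messageKey);
        if (future == null) {
            throw new IllegalArgumentException("No pending request for key " + messageKey);
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("No reply within " + timeoutMillis + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for reply", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Reply failed", e.getCause());
        } finally {
            pending.remove(messageKey); // the request is finished either way
        }
    }

    public static void main(String[] args) {
        ReplyCorrelator correlator = new ReplyCorrelator();
        String key = correlator.register();
        correlator.onReply(key, "pong");
        System.out.println(correlator.await(key, 1000)); // prints pong
    }
}
```

Replies whose key does not match a pending request are ignored in this sketch, which is why the route only continues once a reply carrying the generated key shows up.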
Name | Type | Description | Default |
---|---|---|---|
topicName | common | (Required) Topic to consume from or produce to | |
namesrvAddr | common | NameServer addresses (comma-separated) | localhost:9876 |
accessKey | common | RocketMQ ACL accessKey | |
secretKey | common | RocketMQ ACL secretKey | |
consumerGroup | consumer | Consumer group name | |
subscribeTags | consumer | Subscribe tags expression | * |
producerGroup | producer | Producer group name | |
sendTag | producer | Send message's tag | |
waitForSendResult | producer | Block until message sent | false |
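As a small illustration of how these parameters end up in an endpoint URI of the form `rocketmq:topicName?name=value&...` (the form used in the routes in this README), here is a sketch of a helper that assembles such a URI. The class `RocketMqUri` and its `build` method are hypothetical and not part of the component. The sketch does no URL encoding, so it assumes values without special characters.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: builds "rocketmq:<topicName>?<name>=<value>&..." strings.
final class RocketMqUri {
    static String build(String topicName, Map<String, String> options) {
        // Join the options in the map's iteration order.
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        String base = "rocketmq:" + topicName;
        return query.isEmpty() ? base : base + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("namesrvAddr", "localhost:9876");
        options.put("producerGroup", "producer");
        // prints rocketmq:to_topic?namesrvAddr=localhost:9876&producerGroup=producer
        System.out.println(build("to_topic", options));
    }
}
```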
Name | Type | Description | Default |
---|---|---|---|
replyToTopic | producer | Topic to listen on for replies | |
replyToConsumerGroup | producer | Consumer group used to consume replies | |
requestTimeoutMillis | producer | Milliseconds to wait for a reply before timing out | 10000 |
requestTimeoutCheckerIntervalMillis | advanced | Interval of the timeout checker (milliseconds) | 1000 |
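To show how a request timeout and a checker interval can interact, the following sketch models one pass of a periodic timeout checker. It is an assumption about the general technique, not a description of the component's internals: `TimeoutSweeper`, `track`, `replied`, and `sweep` are hypothetical names, and time is passed in explicitly to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a periodic timeout checker for pending requests.
final class TimeoutSweeper {
    // Deadline (epoch millis) per message key that is still waiting for a reply.
    private final Map<String, Long> deadlines = new ConcurrentHashMap<>();

    // Start tracking a request sent at nowMillis.
    void track(String messageKey, long nowMillis, long requestTimeoutMillis) {
        deadlines.put(messageKey, nowMillis + requestTimeoutMillis);
    }

    // A reply arrived in time: stop tracking the request.
    void replied(String messageKey) {
        deadlines.remove(messageKey);
    }

    // One checker pass: remove and return every key whose deadline has passed.
    List<String> sweep(long nowMillis) {
        List<String> expired = new ArrayList<>();
        deadlines.forEach((key, deadline) -> {
            if (deadline <= nowMillis && deadlines.remove(key, deadline)) {
                expired.add(key);
            }
        });
        return expired;
    }

    public static void main(String[] args) {
        TimeoutSweeper sweeper = new TimeoutSweeper();
        sweeper.track("key-1", 0L, 10_000L);
        System.out.println(sweeper.sweep(9_000L));  // prints []
        System.out.println(sweeper.sweep(10_000L)); // prints [key-1]
    }
}
```

In a design like this, a scheduler would call `sweep` once per checker interval, so a timeout may be noticed up to one interval after the deadline has actually passed.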
Constant | Value | Description |
---|---|---|
RocketMQConstants.OVERRIDE_TOPIC_NAME | rocketmq.OVERRIDE_TOPIC_NAME | Override the message's Topic |
RocketMQConstants.OVERRIDE_TAG | rocketmq.OVERRIDE_TAG | Override the message's Tag |
RocketMQConstants.OVERRIDE_MESSAGE_KEY | rocketmq.OVERRIDE_MESSAGE_KEY | Set the message's Key |
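The override headers can be read as "use the header value if present, otherwise fall back to the endpoint's own setting". Here is a minimal sketch of that lookup, assuming this fallback behavior. The class `OverrideHeaders` and its `resolve` method are hypothetical, and only the header names are taken from the table above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: header value wins over the endpoint's configured value.
final class OverrideHeaders {
    static final String OVERRIDE_TOPIC_NAME = "rocketmq.OVERRIDE_TOPIC_NAME";
    static final String OVERRIDE_TAG = "rocketmq.OVERRIDE_TAG";

    static String resolve(Map<String, Object> headers, String headerName, String endpointValue) {
        Object override = headers.get(headerName);
        return override != null ? override.toString() : endpointValue;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(OVERRIDE_TOPIC_NAME, "OVERRIDE_TO");
        System.out.println(resolve(headers, OVERRIDE_TOPIC_NAME, "to_topic")); // prints OVERRIDE_TO
        System.out.println(resolve(headers, OVERRIDE_TAG, "defaultTag"));      // prints defaultTag
    }
}
```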
from("rocketmq:{{override.rocketmq.topic.from}}?namesrvAddr={{rocketmq.namesrv.addr}}&consumerGroup={{override.rocketmq.consumer.group}}")
    .process(exchange -> {
        // These headers take effect when the message is sent by the producer below
        exchange.getMessage().setHeader(RocketMQConstants.OVERRIDE_TOPIC_NAME, "OVERRIDE_TO");
        exchange.getMessage().setHeader(RocketMQConstants.OVERRIDE_TAG, "OVERRIDE_TAG");
        exchange.getMessage().setHeader(RocketMQConstants.OVERRIDE_MESSAGE_KEY, "OVERRIDE_MESSAGE_KEY");
    })
    .to("rocketmq:{{override.rocketmq.topic.to}}"
        + "?namesrvAddr={{rocketmq.namesrv.addr}}"
        + "&producerGroup={{override.rocketmq.producer.group}}"
    )
    .to("log:RocketRoute?showAll=true");