-
Notifications
You must be signed in to change notification settings - Fork 566
Oracle Advanced Queuing
Since: 2.3.0
Helidon AQ messaging connector is using JMS api where following logic applies,
JMS Queue maps to an AQ single-consumer queue, and Topic maps to a multiconsumer queue
. (see docs for more details)
When enqueueing message to an AQ multi-consumer queue, keep in mind it behaves as topic when dequeued with JMS client as non-durable consumer, that means message is sent to all active subscribers. Messages wont be sent to any consumer which subscribe in the future.
A consumer that is added as a subscriber to a queue is only able to dequeue messages that are enqueued after the subscriber is added.
(see docs for more details)
Each message remains in the queue until it is consumed by all its intended consumers.
(see docs for more details)
That means, messages are treated as topics when enqueued to multi-consumer queue without recipient list, but with recipient list it behaves like a multiple small queues for each recipient.
Required depencencies:
<dependency>
<groupId>io.helidon.microprofile.messaging</groupId>
<artifactId>helidon-microprofile-messaging</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.messaging.aq</groupId>
<artifactId>helidon-messaging-aq</artifactId>
</dependency>
<!-- When using Oracle UCP for database connection -->
<dependency>
<groupId>io.helidon.integrations.cdi</groupId>
<artifactId>helidon-integrations-cdi-datasource-ucp</artifactId>
<scope>runtime</scope>
</dependency>
Helidon config:
javax.sql.DataSource:
aq-test-ds:
connectionFactoryClassName: oracle.jdbc.pool.OracleDataSource
URL: jdbc:oracle:thin:@localhost:1521:XE
user: frank
password: frank
mp.messaging:
connector:
helidon-aq:
acknowledge-mode: CLIENT_ACKNOWLEDGE
data-source: aq-test-ds
incoming:
from-multi-consumer-queue-red:
connector: helidon-aq
destination: MULTI_CONSUMER_QUEUE
type: topic # AQ multi consumer queue is mapped to JMS topic
subscriber-name: RED # AQ multi consumer queue recipient name
durable: true # needs to be durable for named consumer
from-multi-consumer-queue-blue:
connector: helidon-aq
destination: MULTI_CONSUMER_QUEUE
type: topic
subscriber-name: BLUE
durable: true
from-multi-consumer-queue-anonymous:
connector: helidon-aq
destination: MULTI_CONSUMER_QUEUE
type: topic
Consuming Java bean:
@ApplicationScoped
public class RedBlueService {
@Incoming("from-multi-consumer-queue-red")
public void red(AqMessage<String> msg) {
System.out.println("RED> " + msg.getPayload());
}
@Incoming("from-multi-consumer-queue-blue")
public void blue(AqMessage<String> msg) {
System.out.println("BLUE> " + msg.getPayload());
}
@Incoming("from-multi-consumer-queue-anonymous")
public void anonymous(AqMessage<String> msg) {
System.out.println("ANONYMOUS> " + msg.getPayload());
}
}
Prepare multi consumer queue:
DECLARE
queue_name VARCHAR2(32);
queue_tab VARCHAR2(32);
BEGIN
queue_name := 'FRANK.MULTI_CONSUMER_QUEUE';
queue_tab := queue_name || '_TAB';
DBMS_AQADM.CREATE_QUEUE_TABLE(
queue_table => queue_tab,
multiple_consumers => TRUE,
queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE');
DBMS_AQADM.CREATE_QUEUE(queue_name, queue_tab);
DBMS_AQADM.START_QUEUE(queue_name);
-- Register named subscribers in advance
-- so messages with recipient list can be queued for them
DBMS_AQADM.ADD_SUBSCRIBER(queue_name, sys.aq$_agent('YELLOW', NULL, NULL));
DBMS_AQADM.ADD_SUBSCRIBER(queue_name, sys.aq$_agent('RED', NULL, NULL));
DBMS_AQADM.ADD_SUBSCRIBER(queue_name, sys.aq$_agent('BLUE', NULL, NULL));
END;
Enqueue message without recipient list (topic to all non-durable subscribers, queued for already subscribed YELLOW, RED and BLUE):
DECLARE
enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
recipients DBMS_AQ.AQ$_RECIPIENT_LIST_T;
message_handle RAW(16);
msg SYS.AQ$_JMS_TEXT_MESSAGE;
BEGIN
msg := SYS.AQ$_JMS_TEXT_MESSAGE.construct;
msg.set_text('for all ' || CURRENT_TIMESTAMP);
DBMS_AQ.ENQUEUE(
queue_name => 'FRANK.MULTI_CONSUMER_QUEUE',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => msg,
msgid => message_handle);
COMMIT;
END;
Now start the Helidon app with our AQ consumers prepared above:
java -jar ./target/quickstart-mp.jar
...
... : Server started on http://localhost:8080 (and all other host addresses) in 1433 milliseconds (since JVM startup).
...
RED> for all 13-MAY-21 10.29.48.208405000 AM UTC
BLUE> for all 13-MAY-21 10.29.48.208405000 AM UTC
☝️ You can see there is no message for ANONYMOUS, because for it multi consumer queue behaves as topic, but RED and BLUE got the message which waited for them enqueued respectively.