Skip to content

Commit

Permalink
TxEventQ Kafka Examples (#1013)
Browse files Browse the repository at this point in the history
* TxEventQ Kafka Examples

---------

Signed-off-by: Anders Swanson <anders.swanson@oracle.com>
Co-authored-by: Mark Nelson <mark.x.nelson@oracle.com>
  • Loading branch information
anders-swanson and markxnelson authored Dec 18, 2024
1 parent 6a93ed9 commit 384d887
Show file tree
Hide file tree
Showing 3 changed files with 416 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,11 @@ weight = 4

This section explains advanced features of Transactional Event Queues, including transactional messaging, message propagation between queues and the database, and error handling.


* [Transactional Messaging: Combine Messaging with Database Queries](#transactional-messaging-combine-messaging-with-database-queries)
* [SQL Example](#sql-example)
* [Kafka Example](#kafka-example)
* [Transactional Produce](#transactional-produce)
* [Producer Methods](#producer-methods)
* [Transactional Produce Example](#transactional-produce-example)
* [Transactional Consume](#transactional-consume)
* [Consumer Methods](#consumer-methods)
* [Transactional Consume Example](#transactional-consume-example)
* [Message Propagation](#message-propagation)
* [Queue to Queue Message Propagation](#queue-to-queue-message-propagation)
* [Removing Subscribers and Stopping Propagation](#removing-subscribers-and-stopping-propagation)
* [Stopping Queue Propagation](#stopping-queue-propagation)
* [Using Database Links](#using-database-links)
* [Error Handling](#error-handling)

Expand Down Expand Up @@ -74,205 +66,48 @@ end;

> Note: The same pattern applies to the `dbms_aq.dequeue` procedure, allowing developers to perform DML operations within dequeue transactions.
### Kafka Example

The KafkaProducer and KafkaConsumer classes implemented by the [Kafka Java Client for Oracle Transactional Event Queues](https://github.com/oracle/okafka) provide functionality for transactional messaging, allowing developers to run database queries within a produce or consume transaction.

#### Transactional Produce

To configure a transactional producer, configure the org.oracle.okafka.clients.producer.KafkaProducer class with the `oracle.transactional.producer=true` property.

Once the producer instance is created, initialize database transactions with the `producer.initTransactions()` method.

```java
Properties props = new Properties();
// Use your database service name
props.put("oracle.service.name", "freepdb1");
// Choose PLAINTEXT or SSL as appropriate for your database connection
props.put("security.protocol", "SSL");
// Your database server
props.put("bootstrap.servers", "my-db-server");
// Path to directory containing ojdbc.properties
// If using Oracle Wallet, this directory must contain the unzipped wallet (such as in sqlnet.ora)
props.put("oracle.net.tns_admin", "/my/path/");
props.put("enable.idempotence", "true");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Enable Transactional messaging with the producer
props.put("oracle.transactional.producer", "true");
KafkaProducer<String, String> producer = new KafkaProducer<>(
producerProps
);

// Initialize the producer for database transactions
producer.initTransactions();
```

##### Producer Methods

- To start a database transaction, use the `producer.beginTransaction()` method.
- To commit the transaction, use the `producer.commitTransaction()` method.
- To retrieve the current database connection within the transaction, use the `producer.getDBConnection()` method.
- To abort the transaction, use the `producer.abortTransaction()` method.

##### Transactional Produce Example

The following Java method takes in input record and processes it using a transactional producer. On error, the transaction is aborted and neither the DML nor topic produce are committed to the database. Assume the `processRecord` method does some DML operation with the record, like inserting or updating a table.

```java
public void produce(String record) {
// 1. Begin the current transaction
producer.beginTransaction();

try {
// 2. Create the producer record and prepare to send it to a topic
ProducerRecord<String, String> pr = new ProducerRecord<>(
topic,
Integer.toString(idx),
record
);
producer.send(pr);

// 3. Use the record in a database query
processRecord(record, conn);
} catch (Exception e) {
// 4. On error, abort the transaction
System.out.println("Error processing record", e);
producer.abortTransaction();
}

// 5. Once complete, commit the transaction
producer.commitTransaction();
System.out.println("Processed record");
}
```

#### Transactional Consume

To configure a transactional consumer, configure a org.oracle.okafka.clients.consumer.KafkaConsumer class with `auto.commit=false`. Disabling auto-commit will allow great control of database transactions through the `commitSync()` and `commitAsync()` methods.

```java
Properties props = new Properties();
// Use your database service name
props.put("oracle.service.name", "freepdb1");
// Choose PLAINTEXT or SSL as appropriate for your database connection
props.put("security.protocol", "SSL");
// Your database server
props.put("bootstrap.servers", "my-db-server");
// Path to directory containing ojdbc.properties
// If using Oracle Wallet, this directory must contain the unzipped wallet (such as in sqlnet.ora)
props.put("oracle.net.tns_admin", "/my/path/");

props.put("group.id" , "MY_CONSUMER_GROUP");
// Set auto-commit to false for direct transaction management.
props.put("enable.auto.commit","false");
props.put("max.poll.records", 2000);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```

##### Consumer Methods

- To retrieve the current database connection within the transaction, use the `consumer.getDBConnection()` method.
- To commit the current transaction synchronously, use the `consumer.commitSync()` method.
- To commit the current transaction asynchronously, use the `consumer.commitAsync()` method.

##### Transactional Consume Example

The following Java method demonstrates how to use a KafkaConsumer for transactional messaging. Assume the `processRecord` method does some DML operation with the record, like inserting or updating a table.

```java
public void run() {
this.consumer.subscribe(List.of("topic1"));
while (true) {
try {
// 1. Poll a batch of records from the subscribed topics
ConsumerRecords<String, String> records = consumer.poll(
Duration.ofMillis(100)
);
System.out.println("Consumed records: " + records.count());
// 2. Get the current transaction's database connection
Connection conn = consumer.getDBConnection();
for (ConsumerRecord<String, String> record : records) {
// 3. Do some DML with the record and connection
processRecord(record, conn);
}

// 4. Do a blocking commit on the current batch of records. For non-blocking, use commitAsync()
consumer.commitSync();
} catch (Exception e) {
// 5. Since auto-commit is disabled, transactions are not
// committed when commitSync() is not called.
System.out.println("Unexpected error processing records. Aborting transaction!");
}
}
}
```

## Message Propagation

Messages can be propagated within the same database or across a [database link](https://docs.oracle.com/en/database/oracle/oracle-database/23/sqlrf/CREATE-DATABASE-LINK.html) to different queues or topics. Message propagation is useful for workflows that require message processing d by different consumers or for event-driven actions that need to trigger subsequent processes.

#### Queue to Queue Message Propagation

Create and start two queues. q1 will be the source queue, and q2 will be the propagated queue.
Create and start two queues. `source` will be the source queue, and `dest` will be the propagated destination queue.

```sql
begin
dbms_aqadm.create_transactional_event_queue(
queue_name => 'q1',
queue_name => 'source',
queue_payload_type => 'JSON',
multiple_consumers => true
);
dbms_aqadm.start_queue(
queue_name => 'q1'
);
dbms_aqadm.create_transactional_event_queue(
queue_name => 'q2',
queue_name => 'dest',
queue_payload_type => 'JSON',
multiple_consumers => true
);
dbms_aqadm.start_queue(
queue_name => 'q2'
queue_name => 'source'
);
end;
/
```

Add a subscriber to q2 using the [`DBMS_AQADM.ADD_SUBSCRIBER` procedure](https://docs.oracle.com/en/database/oracle/oracle-database/23/arpls/DBMS_AQADM.html#GUID-2B4498B0-7851-4520-89DD-E07FC4C5B2C7):

```sql
begin
dbms_aqadm.add_subscriber(
queue_name => 'q2',
subscriber => sys.aq$_agent(
'q2_test_subscriber',
null,
null
)
dbms_aqadm.start_queue(
queue_name => 'dest'
);
end;
/
```

Schedule message propagation so messages from q1 are propagated to q2, using the [`DBMS_AQADM.SCHEDULE_PROPAGATION` procedure](https://docs.oracle.com/en/database/oracle/oracle-database/23/arpls/DBMS_AQADM.html#GUID-E97FCD3F-D96B-4B01-A57F-23AC9A110A0D):

Schedule message propagation so messages from `source` are propagated to `dest`, using [`DBMS_AQADM.SCHEDULE_PROPAGATION` procedure](https://docs.oracle.com/en/database/oracle/oracle-database/23/arpls/DBMS_AQADM.html#GUID-E97FCD3F-D96B-4B01-A57F-23AC9A110A0D).
```sql
begin
dbms_aqadm.schedule_propagation(
queue_name => 'q1',
destination_queue => 'q2',
latency => 0, -- latency, in seconds, before propagating
start_time => sysdate, -- begin propagation immediately
duration => null -- propagate until stopped
queue_name => 'source',
destination_queue => 'dest'
);
end;
/
```

Let's enqueue a message into q1. We expect this message to be propagated to q2:
Let's enqueue a message into `source`. We expect this message to be propagated to `dest`:

```sql
declare
Expand All @@ -284,7 +119,7 @@ declare
begin
select json(body) into message;
dbms_aq.enqueue(
queue_name => 'q1',
queue_name => 'source',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
Expand All @@ -295,37 +130,26 @@ end;
/
```

#### Removing Subscribers and Stopping Propagation

You can remove subscribers and stop propagation using the DBMS_AQADM.STOP_PROPAGATION procedures:

If propagation does not occur, check the `JOB_QUEUE_PROCESSES` parameter and ensure its value is high enough. If the value is very low, you may need to update it with a larger value:
```sql
begin
dbms_aqadm.unschedule_propagation(
queue_name => 'q1',
destination_queue => 'q2'
);
end;
/
alter system set job_queue_processes=10;
```

Remove the subscriber:
#### Stopping Queue Propagation

You can stop propagation using the DBMS_AQADM.STOP_PROPAGATION procedures:

```sql
begin
dbms_aqadm.remove_subscriber(
queue_name => 'q2',
subscriber => sys.aq$_agent(
'q2_test_subscriber',
null,
null
)
dbms_aqadm.unschedule_propagation(
queue_name => 'source',
destination_queue => 'dest'
);
end;
/
```

Your can view queue subscribers and propagation schedules from the respective `DBA_QUEUE_SCHEDULES` and `DBA_QUEUE_SUBSCRIBERS` system views.
Your can view queue subscribers and propagation schedules from the respective `DBA_QUEUE_SCHEDULES` and `DBA_QUEUE_SUBSCRIBERS` system views. These views are helpful for debugging propagation issues, including error messages and schedule status.

#### Using Database Links

Expand All @@ -334,19 +158,19 @@ To propagate messages between databases, a [database link](https://docs.oracle.c
```sql
begin
dbms_aqadm.schedule_propagation(
queue_name => 'json_queue_1',
destination => '<database link>.<schema name>' -- replace with your database link and schema name,
destination_queue => 'json_queue_2'
queue_name => 'source',
destination => '<database link>.<schema name>', -- replace with your database link and schema name,
destination_queue => 'dest'
);
end;
/
```

## Error Handling

Error handling is a critical component of message processing, ensuring malformed or otherwise unprocessable messages are handled correctly. Depending on the message payload and exception, an appropriate action should be taken to either replay or store the message for inspection.
Error handling is a critical component of message processing, ensuring malformed or otherwise unprocessable messages are handled correctly. Depending on the message payload and exception, an appropriate action should be taken to either replay, discard, or otherwise process the failed message. If a message cannot be dequeued due to errors, it may be moved to the [exception queue](./message-operations.md#message-expiry-and-exception-queues), if one exists.

If a message cannot be dequeued due to errors, it may be moved to the [exception queue](./message-operations.md#message-expiry-and-exception-queues), if one exists. You can handle such errors by using PL/SQL exception handling mechanisms.
For errors on procedures like enqueue you may also use the standard SQL exception mechanisms:

```sql
declare
Expand Down
Loading

0 comments on commit 384d887

Please sign in to comment.