Skip to content

Adding RSI example #225

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions java/rsi-example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Design High-Speed Data Ingestion Services Using MQTT, AMQP, and STOMP

This directory contains the code samples for Reactive Streams Ingestion (RSI) integrated with ActiveMQ.
To view the details, visit [Design High-Speed Data Ingestion Services Using MQTT, AMQP, and STOMP](https://blogs.oracle.com/...).

## Prerequisites
- Oracle Database 19c
- JDK 19
- ActiveMQ 5.17.2
- RSI 21.7.0.0
- Maven 3.8.1

## Create a table
The `retailer` table is created with the following statement

```sql
CREATE TABLE retailer (
rank int,
msr int,
retailer varchar(255),
name varchar(255),
city varchar(255),
phone varchar(255),
terminal_type varchar(255),
weeks_active int,
instant_sales_amt varchar(255),
online_sales_amt varchar(255),
total_sales_amt varchar(255)
);
```

### Start ActiveMQ
Download ActiveMQ from the [Apache ActiveMQ website](https://activemq.apache.org/components/classic/download/).
Go to the ActiveMQ directory and run the following command to start up ActiveMQ:

```shell
$ cd apache-activemq-5.17.2
$ ./bin/activemq start
INFO: Loading '/Users/tinglwang/Downloads/apache-activemq-5.17.2//bin/env'
INFO: Using java '/Library/Java/JavaVirtualMachines/jdk-11.0.5.jdk/Contents/Home/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/Users/tinglwang/Downloads/apache-activemq-5.17.2//data/activemq.pid' (pid '61766')
```

### Configure Listener.java
- To connect to your own database, configure the `URL`, `username` and `password` parameters.

### Running the sample application
Run the following command to start the listener. Note: change the target class to rsi.demo.mqtt.Listener and rsi.demo.stomp.Listener if you want to use the MQTT or STOMP protocol.
The "--enable-preview" argument is required since Virtual Thread is a preview feature in JDK 19.
```shell
$ mvn package
$ java --enable-preview -cp ./target/rsi-demo-0.1.0.jar rsi.demo.amqp.Listener
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Sep 30, 2022 4:26:00 PM oracle.rsi.logging.ClioSupport _log
INFO: :::Database type is non-sharded.
```

### Sending messages to ActiveMQ
The default path is http://localhost:8161/api/message/event?type=topic and the port number is 8161. You also need to configure the credentials with the default ActiveMQ username and password "admin:admin".
The easiest way is to send the request using cURL command. Post the data that we want to stream to the database as follows.

```shell
$ curl -i -H Accept:application/json -XPOST http://localhost:8161/api/message/event?type=topic -u admin:admin -H Content-Type:application/json -d '{\
"rank": 1,\
"msr": 217,\
"retailer": "100224",\
"name": "Freddys One Stop",\
"city": "Roland",\
"phone": "(918) 503-6288",\
"terminal_type": "Extrema",\
"weeks_active": 37,\
"instant_sales_amt": "$318,600.00 ",\
"online_sales_amt": "$509,803.00 ",\
"total_sales_amt": "$828,403.00 "}'
HTTP/1.1 200 OK
Date: Fri, 30 Sep 2022 20:34:28 GMT
X-FRAME-OPTIONS: SAMEORIGIN
X-XSS-Protection: 1; mode=block
X-Content-Type-Options: nosniff
Set-Cookie: JSESSIONID=node01f5up6hqo6g6ljv8l3a2cpzc5.node0; Path=/api; HttpOnly
Expires: Thu, 01 Jan 1970 00:00:00 GMT
messageID: ID:tinglwan-mac-49875-1664560122069-5:5:1:1:1
Content-Length: 12
```

### Close the listener and cleanup
To close the listener, simply send a "SHUTDOWN" message to ActiveMQ and it will do the job. According to ActiveMQ's documentation, adding the "body" parameter is critical otherwise the web servlet will not read the body from the -d parameter, and this will cause an error.

```shell
curl -XPOST -u admin:admin http://localhost:8161/api/message/event?type=topic -d "body=SHUTDOWN"
```

Alternatively, you can run the test with Apache JMeter. Go to [Design High-Speed Data Ingestion Services Using MQTT, AMQP, and STOMP](https://blogs.oracle.com/...) for more details.

Once you've completed the test, stop ActiveMQ by running the following command:

```shell
$ ./bin/activemq stop
```
90 changes: 90 additions & 0 deletions java/rsi-example/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.oracle.database.jdbc</groupId>
<artifactId>rsi-example</artifactId>
<version>0.1.0</version>
<packaging>jar</packaging>

<name>RSI Example</name>
<description>Design High-Speed Data Ingestion Services Using MQTT, AMQP, and STOMP</description>

<properties>
<maven.compiler.target>19</maven.compiler.target>
<maven.compiler.source>19</maven.compiler.source>
</properties>

<dependencies>
<!-- RSI -->
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>rsi</artifactId>
<version>21.7.0.0</version>
</dependency>
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc11</artifactId>
<version>21.7.0.0</version>
</dependency>
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ucp11</artifactId>
<version>21.7.0.0</version>
</dependency>

<!-- AMQP, MQTT, STOMP -->
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>1.16</version>
</dependency>
<dependency>
<groupId>org.fusesource.stompjms</groupId>
<artifactId>stompjms-client</artifactId>
<version>1.19</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.10.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<compilerArgs>--enable-preview</compilerArgs>
</configuration>
</plugin>

<!-- include all the dependencies into the jar so it's easier to execute the example -->
<plugin>
<groupId>org.fusesource.mvnplugins</groupId>
<artifactId>maven-uberize-plugin</artifactId>
<version>1.14</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>uberize</goal>
</goals>
</execution>
</executions>
</plugin>

</plugins>
</build>

</project>
125 changes: 125 additions & 0 deletions java/rsi-example/src/main/java/rsi/example/amqp/Listener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
Copyright (c) 2021, 2022, Oracle and/or its affiliates.
This software is dual-licensed to you under the Universal Permissive License
(UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License
2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
either license.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rsi.example.amqp;

import oracle.rsi.PushPublisher;
import oracle.rsi.ReactiveStreamsIngestion;
import oracle.sql.json.OracleJsonFactory;
import oracle.sql.json.OracleJsonObject;
import org.apache.qpid.jms.JmsConnectionFactory;
import rsi.example.common.Retailer;
import rsi.example.common.RSIService;

import javax.jms.*;
import java.io.ByteArrayInputStream;

/**
* A listener class that listens to inputs from the topic in ActiveMQ using AMQP
* protocol. RSI service starts at the time when the listener is up. Once the data
* is received, RSI streams the records into the database.
*/
public class Listener {
private static final String ACTIVEMQ_USER = "admin";
private static final String ACTIVEMQ_PASSWORD = "password";
private static final String ACTIVEMQ_HOST = "localhost";
private static final int ACTIVEMQ_PORT = 5672;
private static final String TOPIC_NAME = "event";

// TODO: replace the DB_URL with yours.
private static final String DB_URL = "jdbc:oracle:thin:@<your-connection-string>";
// TODO: replace the DB_USERNAME with your username.
private static final String DB_USERNAME = "<your-username>";
// TODO: replace the DB_PASSWORD with your password.
private static final String DB_PASSWORD = "<your-password>";

private static final OracleJsonFactory JSON_FACTORY = new OracleJsonFactory();
private static final RSIService RSI_SERVICE = new RSIService();

public static void main(String[] args) throws Exception {
// Setup ActiveMQ connection and consumer
String connectionURI = "amqp://" + ACTIVEMQ_HOST + ":" + ACTIVEMQ_PORT;
JmsConnectionFactory factory = new JmsConnectionFactory(connectionURI);

Connection connection = factory.
createConnection(ACTIVEMQ_USER, ACTIVEMQ_PASSWORD);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Destination destination = session.
createTopic(TOPIC_NAME);

MessageConsumer consumer = session.createConsumer(destination);

long start = System.currentTimeMillis();
long count = 1;

// Start up RSI
RSI_SERVICE.setUrl(DB_URL);
RSI_SERVICE.setUsername(DB_USERNAME);
RSI_SERVICE.setPassword(DB_PASSWORD);
RSI_SERVICE.setSchema(DB_USERNAME);
RSI_SERVICE.setEntity(Retailer.class);
ReactiveStreamsIngestion rsi = RSI_SERVICE.start();
PushPublisher<Retailer> pushPublisher = ReactiveStreamsIngestion.pushPublisher();
pushPublisher.subscribe(rsi.subscriber());

System.out.println("Waiting for messages...");

while (true) {
Message msg = consumer.receive();
if (msg instanceof TextMessage) {
String body = ((TextMessage) msg).getText();

if (body.trim().equals("SHUTDOWN")) {
long diff = System.currentTimeMillis() - start;
System.out.println(String.format("Received %d in %.2f seconds", count, (1.0 * diff / 1000.0)));
connection.close();

// close RSI and worker threads
pushPublisher.close();
RSI_SERVICE.stop();

try {
Thread.sleep(10);
} catch (Exception e) {
}
System.exit(1);

} else {
// Create OracleJsonObject from the incoming message
OracleJsonObject jsonObject = JSON_FACTORY
.createJsonTextValue(
new ByteArrayInputStream(body.getBytes()))
.asJsonObject();

// Push the data
pushPublisher.accept(new Retailer(jsonObject));

if (count == 1) {
start = System.currentTimeMillis();
} else if (count % 1000 == 0) {
System.out.println(String.format("Received %d messages.", count));
}
count++;
}

} else {
System.out.println("Unexpected message type: " + msg.getClass());
}
}
}
}
Loading