Skip to content

Commit 9083b40

Browse files
Adding RSI example (#225)
1 parent 5a4b032 commit 9083b40

File tree

7 files changed

+833
-0
lines changed

7 files changed

+833
-0
lines changed

java/rsi-example/README.md

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
# Design High-Speed Data Ingestion Services Using MQTT, AMQP, and STOMP
2+
3+
This directory contains the code samples for Reactive Streams Ingestion (RSI) integrated with ActiveMQ.
4+
To view the details, visit [Design High-Speed Data Ingestion Services Using MQTT, AMQP, and STOMP](https://blogs.oracle.com/...).
5+
6+
## Prerequisites
7+
- Oracle Database 19c
8+
- JDK 19
9+
- ActiveMQ 5.17.2
10+
- RSI 21.7.0.0
11+
- Maven 3.8.1
12+
13+
## Create a table
14+
The `retailer` table is created with the following statement
15+
16+
```sql
17+
CREATE TABLE retailer (
18+
rank int,
19+
msr int,
20+
retailer varchar(255),
21+
name varchar(255),
22+
city varchar(255),
23+
phone varchar(255),
24+
terminal_type varchar(255),
25+
weeks_active int,
26+
instant_sales_amt varchar(255),
27+
online_sales_amt varchar(255),
28+
total_sales_amt varchar(255)
29+
);
30+
```
31+
32+
### Start ActiveMQ
33+
Download ActiveMQ from the [Apache ActiveMQ website](https://activemq.apache.org/components/classic/download/).
34+
Go to the ActiveMQ directory and run the following command to start up ActiveMQ:
35+
36+
```shell
37+
$ cd apache-activemq-5.17.2
38+
$ ./bin/activemq start
39+
INFO: Loading '/Users/tinglwang/Downloads/apache-activemq-5.17.2//bin/env'
40+
INFO: Using java '/Library/Java/JavaVirtualMachines/jdk-11.0.5.jdk/Contents/Home/bin/java'
41+
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
42+
INFO: pidfile created : '/Users/tinglwang/Downloads/apache-activemq-5.17.2//data/activemq.pid' (pid '61766')
43+
```
44+
45+
### Configure Listener.java
46+
- To connect to your own database, configure the `URL`, `username` and `password` parameters.
47+
48+
### Running the sample application
49+
Run the following command to start the listener. Note: change the target class to rsi.demo.mqtt.Listener and rsi.demo.stomp.Listener if you want to use the MQTT or STOMP protocol.
50+
The "--enable-preview" argument is required since Virtual Thread is a preview feature in JDK 19.
51+
```shell
52+
$ mvn package
53+
$ java --enable-preview -cp ./target/rsi-demo-0.1.0.jar rsi.demo.amqp.Listener
54+
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
55+
SLF4J: Defaulting to no-operation (NOP) logger implementation
56+
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
57+
Sep 30, 2022 4:26:00 PM oracle.rsi.logging.ClioSupport _log
58+
INFO: :::Database type is non-sharded.
59+
```
60+
61+
### Sending messages to ActiveMQ
62+
The default path is http://localhost:8161/api/message/event?type=topic and the port number is 8161. You also need to configure the credentials with the default ActiveMQ username and password "admin:admin".
63+
The easiest way is to send the request using cURL command. Post the data that we want to stream to the database as follows.
64+
65+
```shell
66+
$ curl -i -H Accept:application/json -XPOST http://localhost:8161/api/message/event?type=topic -u admin:admin -H Content-Type:application/json -d '{\
67+
"rank": 1,\
68+
"msr": 217,\
69+
"retailer": "100224",\
70+
"name": "Freddys One Stop",\
71+
"city": "Roland",\
72+
"phone": "(918) 503-6288",\
73+
"terminal_type": "Extrema",\
74+
"weeks_active": 37,\
75+
"instant_sales_amt": "$318,600.00 ",\
76+
"online_sales_amt": "$509,803.00 ",\
77+
"total_sales_amt": "$828,403.00 "}'
78+
HTTP/1.1 200 OK
79+
Date: Fri, 30 Sep 2022 20:34:28 GMT
80+
X-FRAME-OPTIONS: SAMEORIGIN
81+
X-XSS-Protection: 1; mode=block
82+
X-Content-Type-Options: nosniff
83+
Set-Cookie: JSESSIONID=node01f5up6hqo6g6ljv8l3a2cpzc5.node0; Path=/api; HttpOnly
84+
Expires: Thu, 01 Jan 1970 00:00:00 GMT
85+
messageID: ID:tinglwan-mac-49875-1664560122069-5:5:1:1:1
86+
Content-Length: 12
87+
```
88+
89+
### Close the listener and cleanup
90+
To close the listener, simply send a "SHUTDOWN" message to ActiveMQ and it will do the job. According to ActiveMQ's documentation, adding the "body" parameter is critical otherwise the web servlet will not read the body from the -d parameter, and this will cause an error.
91+
92+
```shell
93+
curl -XPOST -u admin:admin http://localhost:8161/api/message/event?type=topic -d "body=SHUTDOWN"
94+
```
95+
96+
Alternatively, you can run the test with Apache JMeter. Go to [Design High-Speed Data Ingestion Services Using MQTT, AMQP, and STOMP](https://blogs.oracle.com/...) for more details.
97+
98+
Once you've completed the test, stop ActiveMQ by running the following command:
99+
100+
```shell
101+
$ ./bin/activemq stop
102+
```

java/rsi-example/pom.xml

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
<modelVersion>4.0.0</modelVersion>
4+
5+
<groupId>com.oracle.database.jdbc</groupId>
6+
<artifactId>rsi-example</artifactId>
7+
<version>0.1.0</version>
8+
<packaging>jar</packaging>
9+
10+
<name>RSI Example</name>
11+
<description>Design High-Speed Data Ingestion Services Using MQTT, AMQP, and STOMP</description>
12+
13+
<properties>
14+
<maven.compiler.target>19</maven.compiler.target>
15+
<maven.compiler.source>19</maven.compiler.source>
16+
</properties>
17+
18+
<dependencies>
19+
<!-- RSI -->
20+
<dependency>
21+
<groupId>com.oracle.database.jdbc</groupId>
22+
<artifactId>rsi</artifactId>
23+
<version>21.7.0.0</version>
24+
</dependency>
25+
<dependency>
26+
<groupId>com.oracle.database.jdbc</groupId>
27+
<artifactId>ojdbc11</artifactId>
28+
<version>21.7.0.0</version>
29+
</dependency>
30+
<dependency>
31+
<groupId>com.oracle.database.jdbc</groupId>
32+
<artifactId>ucp11</artifactId>
33+
<version>21.7.0.0</version>
34+
</dependency>
35+
36+
<!-- AMQP, MQTT, STOMP -->
37+
<dependency>
38+
<groupId>org.apache.geronimo.specs</groupId>
39+
<artifactId>geronimo-jms_1.1_spec</artifactId>
40+
<version>1.1</version>
41+
</dependency>
42+
<dependency>
43+
<groupId>org.apache.qpid</groupId>
44+
<artifactId>qpid-jms-client</artifactId>
45+
<version>1.6.0</version>
46+
</dependency>
47+
<dependency>
48+
<groupId>org.fusesource.mqtt-client</groupId>
49+
<artifactId>mqtt-client</artifactId>
50+
<version>1.16</version>
51+
</dependency>
52+
<dependency>
53+
<groupId>org.fusesource.stompjms</groupId>
54+
<artifactId>stompjms-client</artifactId>
55+
<version>1.19</version>
56+
</dependency>
57+
</dependencies>
58+
59+
<build>
60+
<plugins>
61+
<plugin>
62+
<groupId>org.apache.maven.plugins</groupId>
63+
<artifactId>maven-compiler-plugin</artifactId>
64+
<version>3.10.1</version>
65+
<configuration>
66+
<source>${maven.compiler.source}</source>
67+
<target>${maven.compiler.target}</target>
68+
<compilerArgs>--enable-preview</compilerArgs>
69+
</configuration>
70+
</plugin>
71+
72+
<!-- include all the dependencies into the jar so it's easier to execute the example -->
73+
<plugin>
74+
<groupId>org.fusesource.mvnplugins</groupId>
75+
<artifactId>maven-uberize-plugin</artifactId>
76+
<version>1.14</version>
77+
<executions>
78+
<execution>
79+
<phase>package</phase>
80+
<goals>
81+
<goal>uberize</goal>
82+
</goals>
83+
</execution>
84+
</executions>
85+
</plugin>
86+
87+
</plugins>
88+
</build>
89+
90+
</project>
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
Copyright (c) 2021, 2022, Oracle and/or its affiliates.
3+
This software is dual-licensed to you under the Universal Permissive License
4+
(UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License
5+
2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
6+
either license.
7+
Licensed under the Apache License, Version 2.0 (the "License");
8+
you may not use this file except in compliance with the License.
9+
You may obtain a copy of the License at
10+
https://www.apache.org/licenses/LICENSE-2.0
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
package rsi.example.amqp;
18+
19+
import oracle.rsi.PushPublisher;
20+
import oracle.rsi.ReactiveStreamsIngestion;
21+
import oracle.sql.json.OracleJsonFactory;
22+
import oracle.sql.json.OracleJsonObject;
23+
import org.apache.qpid.jms.JmsConnectionFactory;
24+
import rsi.example.common.Retailer;
25+
import rsi.example.common.RSIService;
26+
27+
import javax.jms.*;
28+
import java.io.ByteArrayInputStream;
29+
30+
/**
31+
* A listener class that listens to inputs from the topic in ActiveMQ using AMQP
32+
* protocol. RSI service starts at the time when the listener is up. Once the data
33+
* is received, RSI streams the records into the database.
34+
*/
35+
public class Listener {
36+
private static final String ACTIVEMQ_USER = "admin";
37+
private static final String ACTIVEMQ_PASSWORD = "password";
38+
private static final String ACTIVEMQ_HOST = "localhost";
39+
private static final int ACTIVEMQ_PORT = 5672;
40+
private static final String TOPIC_NAME = "event";
41+
42+
// TODO: replace the DB_URL with yours.
43+
private static final String DB_URL = "jdbc:oracle:thin:@<your-connection-string>";
44+
// TODO: replace the DB_USERNAME with your username.
45+
private static final String DB_USERNAME = "<your-username>";
46+
// TODO: replace the DB_PASSWORD with your password.
47+
private static final String DB_PASSWORD = "<your-password>";
48+
49+
private static final OracleJsonFactory JSON_FACTORY = new OracleJsonFactory();
50+
private static final RSIService RSI_SERVICE = new RSIService();
51+
52+
public static void main(String[] args) throws Exception {
53+
// Setup ActiveMQ connection and consumer
54+
String connectionURI = "amqp://" + ACTIVEMQ_HOST + ":" + ACTIVEMQ_PORT;
55+
JmsConnectionFactory factory = new JmsConnectionFactory(connectionURI);
56+
57+
Connection connection = factory.
58+
createConnection(ACTIVEMQ_USER, ACTIVEMQ_PASSWORD);
59+
connection.start();
60+
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
61+
62+
Destination destination = session.
63+
createTopic(TOPIC_NAME);
64+
65+
MessageConsumer consumer = session.createConsumer(destination);
66+
67+
long start = System.currentTimeMillis();
68+
long count = 1;
69+
70+
// Start up RSI
71+
RSI_SERVICE.setUrl(DB_URL);
72+
RSI_SERVICE.setUsername(DB_USERNAME);
73+
RSI_SERVICE.setPassword(DB_PASSWORD);
74+
RSI_SERVICE.setSchema(DB_USERNAME);
75+
RSI_SERVICE.setEntity(Retailer.class);
76+
ReactiveStreamsIngestion rsi = RSI_SERVICE.start();
77+
PushPublisher<Retailer> pushPublisher = ReactiveStreamsIngestion.pushPublisher();
78+
pushPublisher.subscribe(rsi.subscriber());
79+
80+
System.out.println("Waiting for messages...");
81+
82+
while (true) {
83+
Message msg = consumer.receive();
84+
if (msg instanceof TextMessage) {
85+
String body = ((TextMessage) msg).getText();
86+
87+
if (body.trim().equals("SHUTDOWN")) {
88+
long diff = System.currentTimeMillis() - start;
89+
System.out.println(String.format("Received %d in %.2f seconds", count, (1.0 * diff / 1000.0)));
90+
connection.close();
91+
92+
// close RSI and worker threads
93+
pushPublisher.close();
94+
RSI_SERVICE.stop();
95+
96+
try {
97+
Thread.sleep(10);
98+
} catch (Exception e) {
99+
}
100+
System.exit(1);
101+
102+
} else {
103+
// Create OracleJsonObject from the incoming message
104+
OracleJsonObject jsonObject = JSON_FACTORY
105+
.createJsonTextValue(
106+
new ByteArrayInputStream(body.getBytes()))
107+
.asJsonObject();
108+
109+
// Push the data
110+
pushPublisher.accept(new Retailer(jsonObject));
111+
112+
if (count == 1) {
113+
start = System.currentTimeMillis();
114+
} else if (count % 1000 == 0) {
115+
System.out.println(String.format("Received %d messages.", count));
116+
}
117+
count++;
118+
}
119+
120+
} else {
121+
System.out.println("Unexpected message type: " + msg.getClass());
122+
}
123+
}
124+
}
125+
}

0 commit comments

Comments
 (0)