From c5965a3afe60c182b146b1515c9590095fc5f7ed Mon Sep 17 00:00:00 2001 From: harayuanwang Date: Thu, 10 Nov 2022 11:07:43 -0500 Subject: [PATCH 1/4] add rsi-example --- java/rsi-example | 1 + 1 file changed, 1 insertion(+) create mode 160000 java/rsi-example diff --git a/java/rsi-example b/java/rsi-example new file mode 160000 index 00000000..d2c692a3 --- /dev/null +++ b/java/rsi-example @@ -0,0 +1 @@ +Subproject commit d2c692a3397f8e4d1a7026b5e27c26708d41bc2c From 5ef25702b73d78e9965631d933cb2d62601b95b1 Mon Sep 17 00:00:00 2001 From: harayuanwang Date: Thu, 10 Nov 2022 11:14:15 -0500 Subject: [PATCH 2/4] add rsi-example --- java/rsi-example | 1 - java/rsi-example/README.md | 102 +++++++++++++ java/rsi-example/pom.xml | 90 +++++++++++ .../main/java/rsi/example/amqp/Listener.java | 102 +++++++++++++ .../java/rsi/example/common/RSIService.java | 70 +++++++++ .../java/rsi/example/common/Retailer.java | 74 +++++++++ .../main/java/rsi/example/mqtt/Listener.java | 143 ++++++++++++++++++ .../main/java/rsi/example/stomp/Listener.java | 110 ++++++++++++++ 8 files changed, 691 insertions(+), 1 deletion(-) delete mode 160000 java/rsi-example create mode 100644 java/rsi-example/README.md create mode 100644 java/rsi-example/pom.xml create mode 100644 java/rsi-example/src/main/java/rsi/example/amqp/Listener.java create mode 100644 java/rsi-example/src/main/java/rsi/example/common/RSIService.java create mode 100644 java/rsi-example/src/main/java/rsi/example/common/Retailer.java create mode 100644 java/rsi-example/src/main/java/rsi/example/mqtt/Listener.java create mode 100644 java/rsi-example/src/main/java/rsi/example/stomp/Listener.java diff --git a/java/rsi-example b/java/rsi-example deleted file mode 160000 index d2c692a3..00000000 --- a/java/rsi-example +++ /dev/null @@ -1 +0,0 @@ -Subproject commit d2c692a3397f8e4d1a7026b5e27c26708d41bc2c diff --git a/java/rsi-example/README.md b/java/rsi-example/README.md new file mode 100644 index 00000000..edfd4397 --- /dev/null +++ b/java/rsi-example/README.md @@ -0,0 +1,102 @@ +# RSI Demo + +This directory contains the code samples for Reactive Streams Ingestion (RSI) integrated with ActiveMQ. +To view the details, visit [Design High-Speed Data Ingestion Services Using MQTT, AMQP, and STOMP](https://blogs.oracle.com/...). + +## Prerequisites +- Oracle Database 19c +- JDK 19 +- ActiveMQ 5.17.2 +- RSI 21.7.0.0 +- Maven 3.8.1 + +## Create a table +The `retailer` table is created with the following statement + +```sql +CREATE TABLE retailer ( + rank int, + msr int, + retailer varchar(255), + name varchar(255), + city varchar(255), + phone varchar(255), + terminal_type varchar(255), + weeks_active int, + instant_sales_amt varchar(255), + online_sales_amt varchar(255), + total_sales_amt varchar(255) +); +``` + +### Start ActiveMQ +Download ActiveMQ from the [Apache ActiveMQ website](https://activemq.apache.org/components/classic/download/). +Go to the ActiveMQ directory and run the following command to start up ActiveMQ: + +```shell +$ cd apache-activemq-5.17.2 +$ ./bin/activemq start +INFO: Loading '/Users/tinglwang/Downloads/apache-activemq-5.17.2//bin/env' +INFO: Using java '/Library/Java/JavaVirtualMachines/jdk-11.0.5.jdk/Contents/Home/bin/java' +INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details +INFO: pidfile created : '/Users/tinglwang/Downloads/apache-activemq-5.17.2//data/activemq.pid' (pid '61766') +``` + +### Configure Listener.java +- To connect to your own database, configure the `URL`, `username` and `password` parameters. + +### Running the sample application +Run the following command to start the listener. Note: change the target class to rsi.demo.mqtt.Listener and rsi.demo.stomp.Listener if you want to use the MQTT or STOMP protocol. +The "--enable-preview" argument is required since Virtual Thread is a preview feature in JDK 19. +```shell +$ mvn package +$ java --enable-preview -cp ./target/rsi-demo-0.1.0.jar rsi.demo.amqp.Listener +SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". +SLF4J: Defaulting to no-operation (NOP) logger implementation +SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. +Sep 30, 2022 4:26:00 PM oracle.rsi.logging.ClioSupport _log +INFO: :::Database type is non-sharded. +``` + +### Sending messages to ActiveMQ +The default path is http://localhost:8161/api/message/event?type=topic and the port number is 8161. You also need to configure the credentials with the default ActiveMQ username and password "admin:admin". +The easiest way is to send the request using cURL command. Post the data that we want to stream to the database as follows. + +```shell +$ curl -i -H Accept:application/json -XPOST http://localhost:8161/api/message/event?type=topic -u admin:admin -H Content-Type:application/json -d '{\ +"rank": 1,\ +"msr": 217,\ +"retailer": "100224",\ +"name": "Freddys One Stop",\ +"city": "Roland",\ +"phone": "(918) 503-6288",\ +"terminal_type": "Extrema",\ +"weeks_active": 37,\ +"instant_sales_amt": "$318,600.00 ",\ +"online_sales_amt": "$509,803.00 ",\ +"total_sales_amt": "$828,403.00 "}' +HTTP/1.1 200 OK +Date: Fri, 30 Sep 2022 20:34:28 GMT +X-FRAME-OPTIONS: SAMEORIGIN +X-XSS-Protection: 1; mode=block +X-Content-Type-Options: nosniff +Set-Cookie: JSESSIONID=node01f5up6hqo6g6ljv8l3a2cpzc5.node0; Path=/api; HttpOnly +Expires: Thu, 01 Jan 1970 00:00:00 GMT +messageID: ID:tinglwan-mac-49875-1664560122069-5:5:1:1:1 +Content-Length: 12 +``` + +### Close the listener and cleanup +To close the listener, simply send a "SHUTDOWN" message to ActiveMQ and it will do the job. According to ActiveMQ's documentation, adding the "body" parameter is critical otherwise the web servlet will not read the body from the -d parameter, and this will cause an error. + +```shell +curl -XPOST -u admin:admin http://localhost:8161/api/message/event?type=topic -d "body=SHUTDOWN" +``` + +Alternatively, you can run the test with Apache JMeter. Go to [Design High-Speed Data Ingestion Services Using MQTT, AMQP, and STOMP](https://blogs.oracle.com/...) for more details. + +Once you've completed the test, stop ActiveMQ by running the following command: + +```shell +$ ./bin/activemq stop +``` \ No newline at end of file diff --git a/java/rsi-example/pom.xml b/java/rsi-example/pom.xml new file mode 100644 index 00000000..5d74eb24 --- /dev/null +++ b/java/rsi-example/pom.xml @@ -0,0 +1,90 @@ + + 4.0.0 + + com.oracle.database.jdbc + rsi-example + 0.1.0 + jar + + RSI Example + Design High-Speed Data Ingestion Services Using MQTT, AMQP, and STOMP + + + 19 + 19 + + + + + + com.oracle.database.jdbc + rsi + 21.7.0.0 + + + com.oracle.database.jdbc + ojdbc11 + 21.7.0.0 + + + com.oracle.database.jdbc + ucp11 + 21.7.0.0 + + + + + org.apache.geronimo.specs + geronimo-jms_1.1_spec + 1.1 + + + org.apache.qpid + qpid-jms-client + 1.6.0 + + + org.fusesource.mqtt-client + mqtt-client + 1.16 + + + org.fusesource.stompjms + stompjms-client + 1.19 + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.10.1 + + ${maven.compiler.source} + ${maven.compiler.target} + --enable-preview + + + + + + org.fusesource.mvnplugins + maven-uberize-plugin + 1.14 + + + package + + uberize + + + + + + + + + diff --git a/java/rsi-example/src/main/java/rsi/example/amqp/Listener.java b/java/rsi-example/src/main/java/rsi/example/amqp/Listener.java new file mode 100644 index 00000000..c07c0927 --- /dev/null +++ b/java/rsi-example/src/main/java/rsi/example/amqp/Listener.java @@ -0,0 +1,102 @@ +package rsi.example.amqp; + +import oracle.rsi.PushPublisher; +import oracle.rsi.ReactiveStreamsIngestion; +import oracle.sql.json.OracleJsonFactory; +import oracle.sql.json.OracleJsonObject; +import org.apache.qpid.jms.JmsConnectionFactory; +import rsi.example.common.Retailer; +import rsi.example.common.RSIService; + +import javax.jms.*; +import java.io.ByteArrayInputStream; + +public class Listener { + private static final String ACTIVEMQ_USER = "admin"; + private static final String ACTIVEMQ_PASSWORD = "password"; + private static final String ACTIVEMQ_HOST = "localhost"; + private static final int ACTIVEMQ_PORT = 5672; + private static final String TOPIC_NAME = "event"; + + private static final String DB_URL = "jdbc:oracle:thin:@" + + "(description= (retry_count=20)(retry_delay=3)(address=(protocol=tcps)(port=1521)(host=adb.us-phoenix-1.oraclecloud.com))(connect_data=(service_name=gebqqvpozhjbqbs_azuretestdb_high.adb.oraclecloud.com))(security=(ssl_server_dn_match=yes)))"; + private static final String DB_USERNAME = "admin"; + private static final String DB_PASSWORD = "Example01*manager"; + + private static final OracleJsonFactory JSON_FACTORY = new OracleJsonFactory(); + private static final RSIService RSI_SERVICE = new RSIService(); + + public static void main(String[] args) throws Exception { + // Setup ActiveMQ connection and consumer + String connectionURI = "amqp://" + ACTIVEMQ_HOST + ":" + ACTIVEMQ_PORT; + JmsConnectionFactory factory = new JmsConnectionFactory(connectionURI); + + Connection connection = factory. + createConnection(ACTIVEMQ_USER, ACTIVEMQ_PASSWORD); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination destination = session. + createTopic(TOPIC_NAME); + + MessageConsumer consumer = session.createConsumer(destination); + + long start = System.currentTimeMillis(); + long count = 1; + + // Start up RSI + RSI_SERVICE.setUrl(DB_URL); + RSI_SERVICE.setUsername(DB_USERNAME); + RSI_SERVICE.setPassword(DB_PASSWORD); + RSI_SERVICE.setScheme(DB_USERNAME); + RSI_SERVICE.setEntity(Retailer.class); + ReactiveStreamsIngestion rsi = RSI_SERVICE.start(); + PushPublisher pushPublisher = ReactiveStreamsIngestion.pushPublisher(); + pushPublisher.subscribe(rsi.subscriber()); + + System.out.println("Waiting for messages..."); + + while (true) { + Message msg = consumer.receive(); + if (msg instanceof TextMessage) { + String body = ((TextMessage) msg).getText(); + + if (body.trim().equals("SHUTDOWN")) { + long diff = System.currentTimeMillis() - start; + System.out.println(String.format("Received %d in %.2f seconds", count, (1.0 * diff / 1000.0))); + connection.close(); + + // close RSI and worker threads + pushPublisher.close(); + RSI_SERVICE.stop(); + + try { + Thread.sleep(10); + } catch (Exception e) { + } + System.exit(1); + + } else { + // Create OracleJsonObject from the incoming message + OracleJsonObject jsonObject = JSON_FACTORY + .createJsonTextValue( + new ByteArrayInputStream(body.getBytes())) + .asJsonObject(); + + // Push the data +// pushPublisher.accept(new Retailer(jsonObject)); + + if (count == 1) { + start = System.currentTimeMillis(); + } else if (count % 100 == 0) { + System.out.println(String.format("Received %d messages.", count)); + } + count++; + } + + } else { + System.out.println("Unexpected message type: " + msg.getClass()); + } + } + } +} diff --git a/java/rsi-example/src/main/java/rsi/example/common/RSIService.java b/java/rsi-example/src/main/java/rsi/example/common/RSIService.java new file mode 100644 index 00000000..13372292 --- /dev/null +++ b/java/rsi-example/src/main/java/rsi/example/common/RSIService.java @@ -0,0 +1,70 @@ +package rsi.example.common; + +import oracle.rsi.ReactiveStreamsIngestion; + +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public final class RSIService { + private ExecutorService workers; + private ReactiveStreamsIngestion rsi; + + private String url; + private String username; + private String password; + private String scheme; + private Class entity; + + public ReactiveStreamsIngestion start() { + if (rsi != null) { + return rsi; + } + +// workers = Executors.newVirtualThreadPerTaskExecutor(); + workers = Executors.newFixedThreadPool(2); + + rsi = ReactiveStreamsIngestion + .builder() + .url(url) + .username(username) + .password(password) + .schema(scheme) + .entity(entity) + .executor(workers) + .bufferInterval(Duration.ofMinutes(60)) + .build(); + + return rsi; + } + + public void stop() { + if (rsi != null) { + rsi.close(); + } + + if (workers != null) { + workers.shutdown(); + } + } + + public void setUrl(String url) { + this.url = url; + } + + public void setUsername(String username) { + this.username = username; + } + + public void setPassword(String password) { + this.password = password; + } + + public void setScheme(String scheme) { + this.scheme = scheme; + } + + public void setEntity(Class entity) { + this.entity = entity; + } +} diff --git a/java/rsi-example/src/main/java/rsi/example/common/Retailer.java b/java/rsi-example/src/main/java/rsi/example/common/Retailer.java new file mode 100644 index 00000000..dc99e30f --- /dev/null +++ b/java/rsi-example/src/main/java/rsi/example/common/Retailer.java @@ -0,0 +1,74 @@ +package rsi.example.common; + +import oracle.rsi.StreamEntity; +import oracle.rsi.StreamField; +import oracle.sql.json.OracleJsonObject; +import oracle.sql.json.OracleJsonValue; + +import java.util.stream.Stream; + +@StreamEntity(tableName = "retailer") +public class Retailer { + + public Retailer(OracleJsonObject jsonObject) { + Stream + .of(this.getClass().getDeclaredFields()) + .filter(df -> (df.getAnnotation(StreamField.class) != null)) + .forEach(f -> { + f.setAccessible(true); + + String fieldName = f.getName(); + OracleJsonValue jsonValue = jsonObject.get(fieldName); + OracleJsonValue.OracleJsonType type = jsonValue.getOracleJsonType(); + + try { + switch (type) { + case DECIMAL: + f.setInt(this, jsonValue.asJsonDecimal().intValue()); + break; + case STRING: + f.set(this, jsonValue.asJsonString().getString()); + break; + default: + throw new IllegalArgumentException("unknown type"); + } + } catch (IllegalAccessException ex) { + ex.printStackTrace(); + } + }); + } + + @StreamField + public int rank; + + @StreamField + public int msr; + + @StreamField + public String retailer; + + @StreamField + public String name; + + @StreamField + public String city; + + @StreamField + public String phone; + + @StreamField + public String terminal_type; + + @StreamField + public int weeks_active; + + @StreamField + public String instant_sales_amt; + + @StreamField + public String online_sales_amt; + + @StreamField + public String total_sales_amt; + +} diff --git a/java/rsi-example/src/main/java/rsi/example/mqtt/Listener.java b/java/rsi-example/src/main/java/rsi/example/mqtt/Listener.java new file mode 100644 index 00000000..83bc7351 --- /dev/null +++ b/java/rsi-example/src/main/java/rsi/example/mqtt/Listener.java @@ -0,0 +1,143 @@ +package rsi.example.mqtt; + +import oracle.rsi.PushPublisher; +import oracle.rsi.ReactiveStreamsIngestion; +import oracle.sql.json.OracleJsonFactory; +import oracle.sql.json.OracleJsonObject; +import org.fusesource.hawtbuf.Buffer; +import org.fusesource.hawtbuf.UTF8Buffer; +import org.fusesource.mqtt.client.*; +import rsi.example.common.Retailer; +import rsi.example.common.RSIService; + +import java.io.ByteArrayInputStream; + +public class Listener { + private static final String ACTIVEMQ_HOST = "localhost"; + private static final int ACTIVEMQ_PORT = 1883; + private static final String ACTIVEMQ_USER = "admin"; + private static final String ACTIVEMQ_PASSWORD = "password"; + private static final String DESTINATION = "/topic/event"; + private static final String TOPIC_NAME = "event"; + + private static final String DB_URL = "jdbc:oracle:thin:@" + + "(description= (retry_count=20)(retry_delay=3)(address=(protocol=tcps)(port=1521)(host=adb.us-phoenix-1.oraclecloud.com))(connect_data=(service_name=gebqqvpozhjbqbs_azuretestdb_high.adb.oraclecloud.com))(security=(ssl_server_dn_match=yes)))"; + private static final String DB_USERNAME = "admin"; + private static final String DB_PASSWORD = "Example01*manager"; + + private static final OracleJsonFactory JSON_FACTORY = new OracleJsonFactory(); + private static final RSIService RSI_SERVICE = new RSIService(); + + public static void main(String[] args) throws Exception { + // Setup ActiveMQ connection and consumer + MQTT mqtt = new MQTT(); + mqtt.setHost(ACTIVEMQ_HOST, ACTIVEMQ_PORT); + mqtt.setUserName(ACTIVEMQ_USER); + mqtt.setPassword(ACTIVEMQ_PASSWORD); + + final CallbackConnection connection = mqtt.callbackConnection(); + connection.listener(new org.fusesource.mqtt.client.Listener() { + long count = 0; + long start = System.currentTimeMillis(); + + PushPublisher pushPublisher; + + public void onConnected() { + // Start up RSI + RSI_SERVICE.setUrl(DB_URL); + RSI_SERVICE.setUsername(DB_USERNAME); + RSI_SERVICE.setPassword(DB_PASSWORD); + RSI_SERVICE.setScheme(DB_USERNAME); + RSI_SERVICE.setEntity(Retailer.class); + ReactiveStreamsIngestion rsi = RSI_SERVICE.start(); + pushPublisher = ReactiveStreamsIngestion.pushPublisher(); + pushPublisher.subscribe(rsi.subscriber()); + } + public void onDisconnected() { + RSI_SERVICE.stop(); + } + public void onFailure(Throwable value) { + try { + pushPublisher.close(); + } catch (Exception e) { + e.printStackTrace(); + } + RSI_SERVICE.stop(); + + value.printStackTrace(); + System.exit(-2); + } + public void onPublish(UTF8Buffer topic, Buffer msg, Runnable ack) { + String body = msg.utf8().toString(); + + if (body.trim().equals("SHUTDOWN")) { + long diff = System.currentTimeMillis() - start; + System.out.println(String.format("Received %d in %.2f seconds", count, (1.0*diff/1000.0))); + + try { + pushPublisher.close(); + } catch (Exception e) { + e.printStackTrace(); + } + RSI_SERVICE.stop(); + + connection.disconnect(new Callback() { + @Override + public void onSuccess(Void value) { + System.exit(0); + } + @Override + public void onFailure(Throwable value) { + value.printStackTrace(); + System.exit(-2); + } + }); + } else { + OracleJsonObject jsonObject = JSON_FACTORY + .createJsonTextValue( + new ByteArrayInputStream(body.getBytes())) + .asJsonObject(); + + // Push the data +// pushPublisher.accept(new Retailer(jsonObject)); + + if( count == 0 ) { + start = System.currentTimeMillis(); + } + if( count % 100 == 0 ) { + System.out.println(String.format("Received %d messages.", count)); + } + count ++; + } + ack.run(); + } + }); + + connection.connect(new Callback() { + @Override + public void onSuccess(Void value) { + Topic[] topics = {new Topic(TOPIC_NAME, QoS.AT_LEAST_ONCE)}; + connection.subscribe(topics, new Callback() { + public void onSuccess(byte[] qoses) { + } + public void onFailure(Throwable value) { + value.printStackTrace(); + System.exit(-2); + } + }); + } + @Override + public void onFailure(Throwable value) { + value.printStackTrace(); + System.exit(-2); + } + }); + + // Wait forever.. + synchronized (Listener.class) { + while(true) + Listener.class.wait(); + } + } +} + diff --git a/java/rsi-example/src/main/java/rsi/example/stomp/Listener.java b/java/rsi-example/src/main/java/rsi/example/stomp/Listener.java new file mode 100644 index 00000000..be4224b1 --- /dev/null +++ b/java/rsi-example/src/main/java/rsi/example/stomp/Listener.java @@ -0,0 +1,110 @@ +package rsi.example.stomp; + +import oracle.rsi.PushPublisher; +import oracle.rsi.ReactiveStreamsIngestion; +import oracle.sql.json.OracleJsonFactory; +import oracle.sql.json.OracleJsonObject; +import org.fusesource.stomp.jms.StompJmsConnectionFactory; +import org.fusesource.stomp.jms.StompJmsDestination; +import org.fusesource.stomp.jms.message.StompJmsBytesMessage; +import rsi.example.common.Retailer; +import rsi.example.common.RSIService; + +import javax.jms.*; +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; + +public class Listener { + private static final String ACTIVEMQ_HOST = "localhost"; + private static final int ACTIVEMQ_PORT = 61613; + private static final String DESTINATION = "/topic/event"; + + private static final String DB_URL = "jdbc:oracle:thin:@" + + "(description= (retry_count=20)(retry_delay=3)(address=(protocol=tcps)(port=1521)(host=adb.us-phoenix-1.oraclecloud.com))(connect_data=(service_name=gebqqvpozhjbqbs_azuretestdb_high.adb.oraclecloud.com))(security=(ssl_server_dn_match=yes)))"; + private static final String DB_USERNAME = "admin"; + private static final String DB_PASSWORD = "Example01*manager"; + + private static final OracleJsonFactory JSON_FACTORY = new OracleJsonFactory(); + private static final RSIService RSI_SERVICE = new RSIService(); + + public static void main(String[] args) throws Exception { + StompJmsConnectionFactory factory = new StompJmsConnectionFactory(); + factory.setBrokerURI("tcp://" + ACTIVEMQ_HOST + ":" + ACTIVEMQ_PORT); + + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination dest = new StompJmsDestination(DESTINATION); + + MessageConsumer consumer = session.createConsumer(dest); + + long start = System.currentTimeMillis(); + long count = 1; + + // Start up RSI + RSI_SERVICE.setUrl(DB_URL); + RSI_SERVICE.setUsername(DB_USERNAME); + RSI_SERVICE.setPassword(DB_PASSWORD); + RSI_SERVICE.setScheme(DB_USERNAME); + RSI_SERVICE.setEntity(Retailer.class); + ReactiveStreamsIngestion rsi = RSI_SERVICE.start(); + PushPublisher pushPublisher = ReactiveStreamsIngestion.pushPublisher(); + pushPublisher.subscribe(rsi.subscriber()); + + System.out.println("Waiting for messages..."); + while (true) { + Message msg = consumer.receive(); + + if (msg instanceof TextMessage || msg instanceof StompJmsBytesMessage) { + String body = getBody(msg); + if (body.trim().equals("SHUTDOWN")) { + long diff = System.currentTimeMillis() - start; + System.out.println(String.format("Received %d in %.2f seconds", count, (1.0 * diff / 1000.0))); + + // close RSI and worker threads + pushPublisher.close(); + RSI_SERVICE.stop(); + break; + + } else { + // Create OracleJsonObject from the incoming message + OracleJsonObject jsonObject = JSON_FACTORY + .createJsonTextValue( + new ByteArrayInputStream(body.getBytes())) + .asJsonObject(); + + // Push the data + pushPublisher.accept(new Retailer(jsonObject)); + + if (count == 0) { + start = System.currentTimeMillis(); + } + if (count % 1000 == 0) { + System.out.println(String.format("Received %d messages.", count)); + } + count++; + } + + } else { + System.out.println("Unexpected message type: " + msg.getClass()); + } + } + connection.close(); + } + + private static String getBody(Message msg) throws JMSException { + if (msg instanceof TextMessage) { + return ((TextMessage) msg).getText(); + + } else if (msg instanceof StompJmsBytesMessage) { + StompJmsBytesMessage stompMsg = (StompJmsBytesMessage)msg; + + byte bytesArray[] = new byte[(int) stompMsg.getBodyLength()]; + stompMsg.readBytes(bytesArray); + return new String(bytesArray, StandardCharsets.UTF_8); + + } else { + throw new IllegalArgumentException("Unexpected message type: " + msg.getClass()); + } + } +} From 7d391f86510c54715c66bf259312f1c8d28fb040 Mon Sep 17 00:00:00 2001 From: harayuanwang Date: Thu, 10 Nov 2022 11:17:43 -0500 Subject: [PATCH 3/4] modify README.md --- java/rsi-example/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/java/rsi-example/README.md b/java/rsi-example/README.md index edfd4397..bf544440 100644 --- a/java/rsi-example/README.md +++ b/java/rsi-example/README.md @@ -1,4 +1,4 @@ -# RSI Demo +# Design High-Speed Data Ingestion Services Using MQTT, AMQP, and STOMP This directory contains the code samples for Reactive Streams Ingestion (RSI) integrated with ActiveMQ. To view the details, visit [Design High-Speed Data Ingestion Services Using MQTT, AMQP, and STOMP](https://blogs.oracle.com/...). @@ -99,4 +99,4 @@ Once you've completed the test, stop ActiveMQ by running the following command: ```shell $ ./bin/activemq stop -``` \ No newline at end of file +``` From 009f8661a8b494e270ab5a4e3d35ef3fb7c485cf Mon Sep 17 00:00:00 2001 From: harayuanwang Date: Thu, 10 Nov 2022 16:51:43 -0500 Subject: [PATCH 4/4] add JavaDoc and copyrights --- .../main/java/rsi/example/amqp/Listener.java | 37 ++++++++--- .../java/rsi/example/common/RSIService.java | 61 +++++++++++++++++-- .../java/rsi/example/common/Retailer.java | 27 +++++++- .../main/java/rsi/example/mqtt/Listener.java | 38 +++++++++--- .../main/java/rsi/example/stomp/Listener.java | 33 ++++++++-- 5 files changed, 169 insertions(+), 27 deletions(-) diff --git a/java/rsi-example/src/main/java/rsi/example/amqp/Listener.java b/java/rsi-example/src/main/java/rsi/example/amqp/Listener.java index c07c0927..ecd00733 100644 --- a/java/rsi-example/src/main/java/rsi/example/amqp/Listener.java +++ b/java/rsi-example/src/main/java/rsi/example/amqp/Listener.java @@ -1,3 +1,19 @@ +/* + Copyright (c) 2021, 2022, Oracle and/or its affiliates. + This software is dual-licensed to you under the Universal Permissive License + (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License + 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose + either license. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + https://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ package rsi.example.amqp; import oracle.rsi.PushPublisher; @@ -11,6 +27,11 @@ import javax.jms.*; import java.io.ByteArrayInputStream; +/** + * A listener class that listens to inputs from the topic in ActiveMQ using AMQP + * protocol. RSI service starts at the time when the listener is up. Once the data + * is received, RSI streams the records into the database. + */ public class Listener { private static final String ACTIVEMQ_USER = "admin"; private static final String ACTIVEMQ_PASSWORD = "password"; @@ -18,10 +39,12 @@ public class Listener { private static final int ACTIVEMQ_PORT = 5672; private static final String TOPIC_NAME = "event"; - private static final String DB_URL = "jdbc:oracle:thin:@" + - "(description= (retry_count=20)(retry_delay=3)(address=(protocol=tcps)(port=1521)(host=adb.us-phoenix-1.oraclecloud.com))(connect_data=(service_name=gebqqvpozhjbqbs_azuretestdb_high.adb.oraclecloud.com))(security=(ssl_server_dn_match=yes)))"; - private static final String DB_USERNAME = "admin"; - private static final String DB_PASSWORD = "Example01*manager"; + // TODO: replace the DB_URL with yours. + private static final String DB_URL = "jdbc:oracle:thin:@"; + // TODO: replace the DB_USERNAME with your username. + private static final String DB_USERNAME = ""; + // TODO: replace the DB_PASSWORD with your password. + private static final String DB_PASSWORD = ""; private static final OracleJsonFactory JSON_FACTORY = new OracleJsonFactory(); private static final RSIService RSI_SERVICE = new RSIService(); @@ -48,7 +71,7 @@ public static void main(String[] args) throws Exception { RSI_SERVICE.setUrl(DB_URL); RSI_SERVICE.setUsername(DB_USERNAME); RSI_SERVICE.setPassword(DB_PASSWORD); - RSI_SERVICE.setScheme(DB_USERNAME); + RSI_SERVICE.setSchema(DB_USERNAME); RSI_SERVICE.setEntity(Retailer.class); ReactiveStreamsIngestion rsi = RSI_SERVICE.start(); PushPublisher pushPublisher = ReactiveStreamsIngestion.pushPublisher(); @@ -84,11 +107,11 @@ public static void main(String[] args) throws Exception { .asJsonObject(); // Push the data -// pushPublisher.accept(new Retailer(jsonObject)); + pushPublisher.accept(new Retailer(jsonObject)); if (count == 1) { start = System.currentTimeMillis(); - } else if (count % 100 == 0) { + } else if (count % 1000 == 0) { System.out.println(String.format("Received %d messages.", count)); } count++; diff --git a/java/rsi-example/src/main/java/rsi/example/common/RSIService.java b/java/rsi-example/src/main/java/rsi/example/common/RSIService.java index 13372292..c4b163b8 100644 --- a/java/rsi-example/src/main/java/rsi/example/common/RSIService.java +++ b/java/rsi-example/src/main/java/rsi/example/common/RSIService.java @@ -1,3 +1,19 @@ +/* + Copyright (c) 2021, 2022, Oracle and/or its affiliates. + This software is dual-licensed to you under the Universal Permissive License + (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License + 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose + either license. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + https://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ package rsi.example.common; import oracle.rsi.ReactiveStreamsIngestion; @@ -6,30 +22,44 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +/** + * A class that builds RSI service. + */ public final class RSIService { + + /** ExecutorService that uses virtual threads from JDK 19 **/ private ExecutorService workers; + private ReactiveStreamsIngestion rsi; + /** URL of the target database **/ private String url; + /** Username of the database user **/ private String username; + /** Password of the database user **/ private String password; - private String scheme; + /** Database schema to be used **/ + private String schema; + /** Reference to a POJO class that represents objects in the database **/ private Class entity; + /** + * Start RSI + * @return {@link oracle.rsi.ReactiveStreamsIngestion} object + */ public ReactiveStreamsIngestion start() { if (rsi != null) { return rsi; } -// workers = Executors.newVirtualThreadPerTaskExecutor(); - workers = Executors.newFixedThreadPool(2); + workers = Executors.newVirtualThreadPerTaskExecutor(); rsi = ReactiveStreamsIngestion .builder() .url(url) .username(username) .password(password) - .schema(scheme) + .schema(schema) .entity(entity) .executor(workers) .bufferInterval(Duration.ofMinutes(60)) @@ -38,6 +68,9 @@ public ReactiveStreamsIngestion start() { return rsi; } + /** + * Stop RSI + */ public void stop() { if (rsi != null) { rsi.close(); @@ -48,22 +81,38 @@ public void stop() { } } + /** + * Set URL + * @param url URL of the target database. + */ public void setUrl(String url) { this.url = url; } + /** + * @param username Username of the database user + */ public void setUsername(String username) { this.username = username; } + /** + * @param password Password of the database user + */ public void setPassword(String password) { this.password = password; } - public void setScheme(String scheme) { - this.scheme = scheme; + /** + * @param schema Schema to be used + */ + public void setSchema(String schema) { + this.schema = schema; } + /** + * @param entity A POJO class that represents objects in the database + */ public void setEntity(Class entity) { this.entity = entity; } diff --git a/java/rsi-example/src/main/java/rsi/example/common/Retailer.java b/java/rsi-example/src/main/java/rsi/example/common/Retailer.java index dc99e30f..6d28cece 100644 --- a/java/rsi-example/src/main/java/rsi/example/common/Retailer.java +++ b/java/rsi-example/src/main/java/rsi/example/common/Retailer.java @@ -1,4 +1,19 @@ -package rsi.example.common; +/* + Copyright (c) 2021, 2022, Oracle and/or its affiliates. + This software is dual-licensed to you under the Universal Permissive License + (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License + 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose + either license. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + https://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/package rsi.example.common; import oracle.rsi.StreamEntity; import oracle.rsi.StreamField; @@ -7,9 +22,19 @@ import java.util.stream.Stream; +/** + * A POJO that defines the record to be streamed. + * The class that is annotated with @StreamEntity means that its instances can be + * stored in a database. Every instance of an entity represents a row in the table. + * Fields that are annotated with @StreamField will be mapped to the corresponding + * columns in the table. + */ @StreamEntity(tableName = "retailer") public class Retailer { + /** + * @param jsonObject jsonObject which is converted from the JSON payload + */ public Retailer(OracleJsonObject jsonObject) { Stream .of(this.getClass().getDeclaredFields()) diff --git a/java/rsi-example/src/main/java/rsi/example/mqtt/Listener.java b/java/rsi-example/src/main/java/rsi/example/mqtt/Listener.java index 83bc7351..5ea127c6 100644 --- a/java/rsi-example/src/main/java/rsi/example/mqtt/Listener.java +++ b/java/rsi-example/src/main/java/rsi/example/mqtt/Listener.java @@ -1,3 +1,19 @@ +/* + Copyright (c) 2021, 2022, Oracle and/or its affiliates. + This software is dual-licensed to you under the Universal Permissive License + (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License + 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose + either license. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + https://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ package rsi.example.mqtt; import oracle.rsi.PushPublisher; @@ -12,18 +28,24 @@ import java.io.ByteArrayInputStream; +/** + * A listener class that listens to inputs from the topic in ActiveMQ using MQTT + * protocol. RSI service starts at the time when the listener is up. Once the data + * is received, RSI streams the records into the database. + */ public class Listener { private static final String ACTIVEMQ_HOST = "localhost"; private static final int ACTIVEMQ_PORT = 1883; private static final String ACTIVEMQ_USER = "admin"; private static final String ACTIVEMQ_PASSWORD = "password"; - private static final String DESTINATION = "/topic/event"; private static final String TOPIC_NAME = "event"; - private static final String DB_URL = "jdbc:oracle:thin:@" + - "(description= (retry_count=20)(retry_delay=3)(address=(protocol=tcps)(port=1521)(host=adb.us-phoenix-1.oraclecloud.com))(connect_data=(service_name=gebqqvpozhjbqbs_azuretestdb_high.adb.oraclecloud.com))(security=(ssl_server_dn_match=yes)))"; - private static final String DB_USERNAME = "admin"; - private static final String DB_PASSWORD = "Example01*manager"; + // TODO: replace the DB_URL with yours. + private static final String DB_URL = "jdbc:oracle:thin:@"; + // TODO: replace the DB_USERNAME with your username. + private static final String DB_USERNAME = ""; + // TODO: replace the DB_PASSWORD with your password. + private static final String DB_PASSWORD = ""; private static final OracleJsonFactory JSON_FACTORY = new OracleJsonFactory(); private static final RSIService RSI_SERVICE = new RSIService(); @@ -47,7 +69,7 @@ public void onConnected() { RSI_SERVICE.setUrl(DB_URL); RSI_SERVICE.setUsername(DB_USERNAME); RSI_SERVICE.setPassword(DB_PASSWORD); - RSI_SERVICE.setScheme(DB_USERNAME); + RSI_SERVICE.setSchema(DB_USERNAME); RSI_SERVICE.setEntity(Retailer.class); ReactiveStreamsIngestion rsi = RSI_SERVICE.start(); pushPublisher = ReactiveStreamsIngestion.pushPublisher(); @@ -99,12 +121,12 @@ public void onFailure(Throwable value) { .asJsonObject(); // Push the data -// pushPublisher.accept(new Retailer(jsonObject)); + pushPublisher.accept(new Retailer(jsonObject)); if( count == 0 ) { start = System.currentTimeMillis(); } - if( count % 100 == 0 ) { + if( count % 1000 == 0 ) { System.out.println(String.format("Received %d messages.", count)); } count ++; diff --git a/java/rsi-example/src/main/java/rsi/example/stomp/Listener.java b/java/rsi-example/src/main/java/rsi/example/stomp/Listener.java index be4224b1..b0475cbd 100644 --- a/java/rsi-example/src/main/java/rsi/example/stomp/Listener.java +++ b/java/rsi-example/src/main/java/rsi/example/stomp/Listener.java @@ -1,3 +1,19 @@ +/* + Copyright (c) 2021, 2022, Oracle and/or its affiliates. + This software is dual-licensed to you under the Universal Permissive License + (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License + 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose + either license. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + https://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ package rsi.example.stomp; import oracle.rsi.PushPublisher; @@ -14,15 +30,22 @@ import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; +/** + * A listener class that listens to inputs from the topic in ActiveMQ using STOMP + * protocol. RSI service starts at the time when the listener is up. Once the data + * is received, RSI streams the records into the database. + */ public class Listener { private static final String ACTIVEMQ_HOST = "localhost"; private static final int ACTIVEMQ_PORT = 61613; private static final String DESTINATION = "/topic/event"; - private static final String DB_URL = "jdbc:oracle:thin:@" + - "(description= (retry_count=20)(retry_delay=3)(address=(protocol=tcps)(port=1521)(host=adb.us-phoenix-1.oraclecloud.com))(connect_data=(service_name=gebqqvpozhjbqbs_azuretestdb_high.adb.oraclecloud.com))(security=(ssl_server_dn_match=yes)))"; - private static final String DB_USERNAME = "admin"; - private static final String DB_PASSWORD = "Example01*manager"; + // TODO: replace the DB_URL with yours. + private static final String DB_URL = "jdbc:oracle:thin:@"; + // TODO: replace the DB_USERNAME with your username. + private static final String DB_USERNAME = ""; + // TODO: replace the DB_PASSWORD with your password. + private static final String DB_PASSWORD = ""; private static final OracleJsonFactory JSON_FACTORY = new OracleJsonFactory(); private static final RSIService RSI_SERVICE = new RSIService(); @@ -45,7 +68,7 @@ public static void main(String[] args) throws Exception { RSI_SERVICE.setUrl(DB_URL); RSI_SERVICE.setUsername(DB_USERNAME); RSI_SERVICE.setPassword(DB_PASSWORD); - RSI_SERVICE.setScheme(DB_USERNAME); + RSI_SERVICE.setSchema(DB_USERNAME); RSI_SERVICE.setEntity(Retailer.class); ReactiveStreamsIngestion rsi = RSI_SERVICE.start(); PushPublisher pushPublisher = ReactiveStreamsIngestion.pushPublisher();