diff --git a/java/rsi-atp-vt/README.md b/java/rsi-atp-vt/README.md new file mode 100644 index 00000000..532e41f5 --- /dev/null +++ b/java/rsi-atp-vt/README.md @@ -0,0 +1,2 @@ +## Java - Oracle Developers on Medium.com +[High-throughput stream processing with the Java Library for Reactive Streams Ingestion (RSI), Virtual Threads, and the Oracle ATP Database](https://medium.com/oracledevs/high-throughput-stream-processing-with-the-java-library-for-reactive-streams-ingestion-rsi-8aa0fc9d167) diff --git a/java/rsi-atp-vt/pom.xml b/java/rsi-atp-vt/pom.xml new file mode 100644 index 00000000..6bae7093 --- /dev/null +++ b/java/rsi-atp-vt/pom.xml @@ -0,0 +1,121 @@ + + + + 4.0.0 + + com.oracle.dev.jdbc + rsi-atp-vt + 1.0-SNAPSHOT + + rsi-atp-vt + A simple rsi-atp-vt. + https://github.com/juarezjuniorgithub/rsi-atp-vt + + + UTF-8 + 19 + 19 + + + + + junit + junit + 3.8.1 + + + + com.oracle.database.jdbc + rsi + 21.6.0.0.1 + + + com.oracle.database.jdbc + ojdbc11 + 21.6.0.0.1 + + + com.oracle.database.jdbc + ucp11 + 21.6.0.0.1 + + + com.oracle.ojdbc + ons + 19.3.0.0 + + + com.oracle.database.jdbc + oraclepki + 19.11.0.0 + + + com.oracle.database.security + osdt_core + 21.6.0.0.1 + + + com.oracle.database.security + osdt_cert + 21.6.0.0.1 + + + + + + + + + maven-clean-plugin + 3.1.0 + + + maven-site-plugin + 3.7.1 + + + maven-project-info-reports-plugin + 3.0.0 + + + + maven-resources-plugin + 3.0.2 + + + maven-compiler-plugin + 3.8.0 + + + --enable-preview + + + + + maven-surefire-plugin + 2.22.1 + + + maven-jar-plugin + 3.0.2 + + + maven-install-plugin + 2.5.2 + + + maven-deploy-plugin + 2.8.2 + + + + + + + + + maven-project-info-reports-plugin + + + + diff --git a/java/rsi-atp-vt/script/rsi.sql b/java/rsi-atp-vt/script/rsi.sql new file mode 100644 index 00000000..cf94dcf5 --- /dev/null +++ b/java/rsi-atp-vt/script/rsi.sql @@ -0,0 +1,9 @@ +CREATE USER RSIATP_USER IDENTIFIED BY ""; +GRANT DWROLE TO RSIATP_USER; +GRANT UNLIMITED TABLESPACE TO RSIATP_USER; + +CREATE TABLE RSIATP_USER.CUSTOMERS ( + ID NUMBER(20,0), + NAME VARCHAR2(20 BYTE), + REGION VARCHAR2(20 BYTE) +); diff --git a/java/rsi-atp-vt/src/main/java/com/oracle/dev/jdbc/Customer.java b/java/rsi-atp-vt/src/main/java/com/oracle/dev/jdbc/Customer.java new file mode 100644 index 00000000..c36f58a5 --- /dev/null +++ b/java/rsi-atp-vt/src/main/java/com/oracle/dev/jdbc/Customer.java @@ -0,0 +1,48 @@ +/* + Copyright (c) 2021, 2022, Oracle and/or its affiliates. + + This software is dual-licensed to you under the Universal Permissive License + (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License + 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose + either license. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.oracle.dev.jdbc; + +import oracle.rsi.StreamEntity; +import oracle.rsi.StreamField; + +@StreamEntity(tableName = "customers") +public class Customer { + + public Customer(long id, String name, String region) { + super(); + this.id = id; + this.name = name; + this.region = region; + } + + @StreamField + public long id; + + @StreamField + public String name; + + @StreamField(columnName = "region") + public String region; + + String someRandomField; + +} \ No newline at end of file diff --git a/java/rsi-atp-vt/src/main/java/com/oracle/dev/jdbc/DatabaseConfig.java b/java/rsi-atp-vt/src/main/java/com/oracle/dev/jdbc/DatabaseConfig.java new file mode 100644 index 00000000..ca9a4607 --- /dev/null +++ b/java/rsi-atp-vt/src/main/java/com/oracle/dev/jdbc/DatabaseConfig.java @@ -0,0 +1,72 @@ +/* + Copyright (c) 2021, 2022, Oracle and/or its affiliates. + + This software is dual-licensed to you under the Universal Permissive License + (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License + 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose + either license. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.oracle.dev.jdbc; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Properties; + +/** + *

+ * Configuration for connecting code samples to an Oracle Database instance. + *

+ */ +public class DatabaseConfig { + + private static final Properties CONFIG = new Properties(); + + static { + try { + var fileStream = Files + .newInputStream(Path.of("C:\\java-projects\\rsi-atp-vt\\src\\main\\resources\\config.properties")); + CONFIG.load(fileStream); + } catch (IOException e) { + e.printStackTrace(); + } + } + + private static final String DB_USER = CONFIG.getProperty("DB_USER"); + + private static final String DB_URL = CONFIG.getProperty("DB_URL"); + + private static final String DB_PASSWORD = CONFIG.getProperty("DB_PASSWORD"); + + private static final String DB_SCHEMA = CONFIG.getProperty("DB_SCHEMA"); + + public static String getDbUser() { + return DB_USER; + } + + public static String getDbUrl() { + return DB_URL; + } + + public static String getDbPassword() { + return DB_PASSWORD; + } + + public static String getDbSchema() { + return DB_SCHEMA; + } + +} diff --git a/java/rsi-atp-vt/src/main/java/com/oracle/dev/jdbc/RSIApp.java b/java/rsi-atp-vt/src/main/java/com/oracle/dev/jdbc/RSIApp.java new file mode 100644 index 00000000..ce8f3386 --- /dev/null +++ b/java/rsi-atp-vt/src/main/java/com/oracle/dev/jdbc/RSIApp.java @@ -0,0 +1,84 @@ +/* + Copyright (c) 2021, 2022, Oracle and/or its affiliates. + + This software is dual-licensed to you under the Universal Permissive License + (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License + 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose + either license. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.oracle.dev.jdbc; + +import java.sql.SQLException; +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.IntStream; + +import oracle.rsi.PushPublisher; +import oracle.rsi.ReactiveStreamsIngestion; + +public class RSIApp { + + private ExecutorService workerThreadPool; + private ReactiveStreamsIngestion rsi; + private PushPublisher publisher; + + public static void main(String[] args) throws SQLException { + RSIApp app = new RSIApp(); + app.init(); + app.publish(); + } + + private void init() { + // virtual threads + workerThreadPool = Executors.newVirtualThreadPerTaskExecutor(); + rsi = ReactiveStreamsIngestion.builder().url(DatabaseConfig.getDbUrl()).username(DatabaseConfig.getDbUser()) + .password(DatabaseConfig.getDbPassword()).schema(DatabaseConfig.getDbSchema()) + .executor(workerThreadPool).bufferRows(200).bufferInterval(Duration.ofSeconds(20)) + .entity(Customer.class).build(); + } + + private void publish() { + publisher = ReactiveStreamsIngestion.pushPublisher(); + publisher.subscribe(rsi.subscriber()); + // virtual threads + var threads = IntStream.range(0, 150).mapToObj(i -> Thread.startVirtualThread(() -> { + publisher.accept(new Object[] { ThreadLocalRandom.current().nextLong(), "Duke Java", "West" }); + })).toList(); + + for (var thread : threads) { + try { + thread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + destroy(); + } + + private void destroy() { + try { + publisher.close(); + rsi.close(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + workerThreadPool.shutdown(); + } + } + +} diff --git a/java/rsi-atp-vt/src/main/resources/config.properties b/java/rsi-atp-vt/src/main/resources/config.properties new file mode 100644 index 00000000..083e598d --- /dev/null +++ b/java/rsi-atp-vt/src/main/resources/config.properties @@ -0,0 +1,6 @@ +# https://docs.oracle.com/en/cloud/paas/autonomous-database/adbsa/connect-jdbc-thin-wallet.html +# jdbc:oracle:thin:@dbname_medium?TNS_ADMIN= +DB_URL=jdbc:oracle:thin:@rsitestdb_medium?TNS_ADMIN= +DB_USER= +DB_PASSWORD= +DB_SCHEMA= \ No newline at end of file diff --git a/java/rsi-atp-vt/terraform/adb.tf b/java/rsi-atp-vt/terraform/adb.tf new file mode 100644 index 00000000..caf72dee --- /dev/null +++ b/java/rsi-atp-vt/terraform/adb.tf @@ -0,0 +1,16 @@ +resource "oci_database_autonomous_database" "autonomous_database" { + compartment_id = var.compartment_ocid + db_name = upper(replace(var.name, "-", "")) + display_name = format("%s%s", "adb-", var.name) + db_workload = "OLTP" + is_dedicated = false + is_free_tier = false + cpu_core_count = 1 + data_storage_size_in_tbs = 1 + admin_password = var.adb_admin_password + is_mtls_connection_required = true + customer_contacts { + email = var.adb_customer_contacts_email + } + +} diff --git a/java/rsi-atp-vt/terraform/config.tf b/java/rsi-atp-vt/terraform/config.tf new file mode 100644 index 00000000..e52742e0 --- /dev/null +++ b/java/rsi-atp-vt/terraform/config.tf @@ -0,0 +1,7 @@ +terraform { + required_providers { + oci = { + source = "oracle/oci" + } + } +} \ No newline at end of file diff --git a/java/rsi-atp-vt/terraform/outputs.tf b/java/rsi-atp-vt/terraform/outputs.tf new file mode 100644 index 00000000..f3344af7 --- /dev/null +++ b/java/rsi-atp-vt/terraform/outputs.tf @@ -0,0 +1,3 @@ +output "adb_connection_strings" { + value = oci_database_autonomous_database.autonomous_database.connection_strings +} \ No newline at end of file diff --git a/java/rsi-atp-vt/terraform/variables.tf b/java/rsi-atp-vt/terraform/variables.tf new file mode 100644 index 00000000..3cdf3d1a --- /dev/null +++ b/java/rsi-atp-vt/terraform/variables.tf @@ -0,0 +1,25 @@ +# general +variable "compartment_ocid" { + type = string + default = "" + description = "OCID of your compartment" +} + +# naming +variable "name" { + type = string + default = "" + description = "ADB db name" +} + +# adb +variable "adb_admin_password" { + type = string + default = "" + description = "ADB pw" +} +variable "adb_customer_contacts_email" { + type = string + default = "" + description = "email address" +}