Skip to content

Commit b23728c

Browse files
Added sample code and script for RSI with Virtual Threads and ATP (#221)
1 parent 5c7fe5e commit b23728c

File tree

11 files changed

+393
-0
lines changed

11 files changed

+393
-0
lines changed

java/rsi-atp-vt/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
## Java - Oracle Developers on Medium.com
2+
[High-throughput stream processing with the Java Library for Reactive Streams Ingestion (RSI), Virtual Threads, and the Oracle ATP Database](https://medium.com/oracledevs/high-throughput-stream-processing-with-the-java-library-for-reactive-streams-ingestion-rsi-8aa0fc9d167)

java/rsi-atp-vt/pom.xml

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
3+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
6+
<groupId>com.oracle.dev.jdbc</groupId>
7+
<artifactId>rsi-atp-vt</artifactId>
8+
<version>1.0-SNAPSHOT</version>
9+
10+
<name>rsi-atp-vt</name>
11+
<description>A simple rsi-atp-vt.</description>
12+
<url>https://github.com/juarezjuniorgithub/rsi-atp-vt</url>
13+
14+
<properties>
15+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
16+
<maven.compiler.source>19</maven.compiler.source>
17+
<maven.compiler.target>19</maven.compiler.target>
18+
</properties>
19+
20+
<dependencies>
21+
<dependency>
22+
<groupId>junit</groupId>
23+
<artifactId>junit</artifactId>
24+
<version>3.8.1</version>
25+
</dependency>
26+
<!-- Oracle JDBC JARs -->
27+
<dependency>
28+
<groupId>com.oracle.database.jdbc</groupId>
29+
<artifactId>rsi</artifactId>
30+
<version>21.6.0.0.1</version>
31+
</dependency>
32+
<dependency>
33+
<groupId>com.oracle.database.jdbc</groupId>
34+
<artifactId>ojdbc11</artifactId>
35+
<version>21.6.0.0.1</version>
36+
</dependency>
37+
<dependency>
38+
<groupId>com.oracle.database.jdbc</groupId>
39+
<artifactId>ucp11</artifactId>
40+
<version>21.6.0.0.1</version>
41+
</dependency>
42+
<dependency>
43+
<groupId>com.oracle.ojdbc</groupId>
44+
<artifactId>ons</artifactId>
45+
<version>19.3.0.0</version>
46+
</dependency>
47+
<dependency>
48+
<groupId>com.oracle.database.jdbc</groupId>
49+
<artifactId>oraclepki</artifactId>
50+
<version>19.11.0.0</version>
51+
</dependency>
52+
<dependency>
53+
<groupId>com.oracle.database.security</groupId>
54+
<artifactId>osdt_core</artifactId>
55+
<version>21.6.0.0.1</version>
56+
</dependency>
57+
<dependency>
58+
<groupId>com.oracle.database.security</groupId>
59+
<artifactId>osdt_cert</artifactId>
60+
<version>21.6.0.0.1</version>
61+
</dependency>
62+
</dependencies>
63+
64+
<build>
65+
<pluginManagement>
66+
<!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
67+
<plugins>
68+
<plugin>
69+
<artifactId>maven-clean-plugin</artifactId>
70+
<version>3.1.0</version>
71+
</plugin>
72+
<plugin>
73+
<artifactId>maven-site-plugin</artifactId>
74+
<version>3.7.1</version>
75+
</plugin>
76+
<plugin>
77+
<artifactId>maven-project-info-reports-plugin</artifactId>
78+
<version>3.0.0</version>
79+
</plugin>
80+
<!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
81+
<plugin>
82+
<artifactId>maven-resources-plugin</artifactId>
83+
<version>3.0.2</version>
84+
</plugin>
85+
<plugin>
86+
<artifactId>maven-compiler-plugin</artifactId>
87+
<version>3.8.0</version>
88+
<configuration>
89+
<compilerArgs>
90+
<arg>--enable-preview</arg>
91+
</compilerArgs>
92+
</configuration>
93+
</plugin>
94+
<plugin>
95+
<artifactId>maven-surefire-plugin</artifactId>
96+
<version>2.22.1</version>
97+
</plugin>
98+
<plugin>
99+
<artifactId>maven-jar-plugin</artifactId>
100+
<version>3.0.2</version>
101+
</plugin>
102+
<plugin>
103+
<artifactId>maven-install-plugin</artifactId>
104+
<version>2.5.2</version>
105+
</plugin>
106+
<plugin>
107+
<artifactId>maven-deploy-plugin</artifactId>
108+
<version>2.8.2</version>
109+
</plugin>
110+
</plugins>
111+
</pluginManagement>
112+
</build>
113+
114+
<reporting>
115+
<plugins>
116+
<plugin>
117+
<artifactId>maven-project-info-reports-plugin</artifactId>
118+
</plugin>
119+
</plugins>
120+
</reporting>
121+
</project>

java/rsi-atp-vt/script/rsi.sql

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
CREATE USER RSIATP_USER IDENTIFIED BY "<PASSWORD>";
2+
GRANT DWROLE TO RSIATP_USER;
3+
GRANT UNLIMITED TABLESPACE TO RSIATP_USER;
4+
5+
CREATE TABLE RSIATP_USER.CUSTOMERS (
6+
ID NUMBER(20,0),
7+
NAME VARCHAR2(20 BYTE),
8+
REGION VARCHAR2(20 BYTE)
9+
);
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
Copyright (c) 2021, 2022, Oracle and/or its affiliates.
3+
4+
This software is dual-licensed to you under the Universal Permissive License
5+
(UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License
6+
2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
7+
either license.
8+
9+
Licensed under the Apache License, Version 2.0 (the "License");
10+
you may not use this file except in compliance with the License.
11+
You may obtain a copy of the License at
12+
13+
https://www.apache.org/licenses/LICENSE-2.0
14+
15+
Unless required by applicable law or agreed to in writing, software
16+
distributed under the License is distributed on an "AS IS" BASIS,
17+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
See the License for the specific language governing permissions and
19+
limitations under the License.
20+
*/
21+
22+
package com.oracle.dev.jdbc;
23+
24+
import oracle.rsi.StreamEntity;
25+
import oracle.rsi.StreamField;
26+
27+
@StreamEntity(tableName = "customers")
28+
public class Customer {
29+
30+
public Customer(long id, String name, String region) {
31+
super();
32+
this.id = id;
33+
this.name = name;
34+
this.region = region;
35+
}
36+
37+
@StreamField
38+
public long id;
39+
40+
@StreamField
41+
public String name;
42+
43+
@StreamField(columnName = "region")
44+
public String region;
45+
46+
String someRandomField;
47+
48+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
Copyright (c) 2021, 2022, Oracle and/or its affiliates.
3+
4+
This software is dual-licensed to you under the Universal Permissive License
5+
(UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License
6+
2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
7+
either license.
8+
9+
Licensed under the Apache License, Version 2.0 (the "License");
10+
you may not use this file except in compliance with the License.
11+
You may obtain a copy of the License at
12+
13+
https://www.apache.org/licenses/LICENSE-2.0
14+
15+
Unless required by applicable law or agreed to in writing, software
16+
distributed under the License is distributed on an "AS IS" BASIS,
17+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
See the License for the specific language governing permissions and
19+
limitations under the License.
20+
*/
21+
22+
package com.oracle.dev.jdbc;
23+
24+
import java.io.IOException;
25+
import java.nio.file.Files;
26+
import java.nio.file.Path;
27+
import java.util.Properties;
28+
29+
/**
30+
* <p>
31+
* Configuration for connecting code samples to an Oracle Database instance.
32+
* </p>
33+
*/
34+
public class DatabaseConfig {
35+
36+
private static final Properties CONFIG = new Properties();
37+
38+
static {
39+
try {
40+
var fileStream = Files
41+
.newInputStream(Path.of("C:\\java-projects\\rsi-atp-vt\\src\\main\\resources\\config.properties"));
42+
CONFIG.load(fileStream);
43+
} catch (IOException e) {
44+
e.printStackTrace();
45+
}
46+
}
47+
48+
private static final String DB_USER = CONFIG.getProperty("DB_USER");
49+
50+
private static final String DB_URL = CONFIG.getProperty("DB_URL");
51+
52+
private static final String DB_PASSWORD = CONFIG.getProperty("DB_PASSWORD");
53+
54+
private static final String DB_SCHEMA = CONFIG.getProperty("DB_SCHEMA");
55+
56+
public static String getDbUser() {
57+
return DB_USER;
58+
}
59+
60+
public static String getDbUrl() {
61+
return DB_URL;
62+
}
63+
64+
public static String getDbPassword() {
65+
return DB_PASSWORD;
66+
}
67+
68+
public static String getDbSchema() {
69+
return DB_SCHEMA;
70+
}
71+
72+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
Copyright (c) 2021, 2022, Oracle and/or its affiliates.
3+
4+
This software is dual-licensed to you under the Universal Permissive License
5+
(UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License
6+
2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
7+
either license.
8+
9+
Licensed under the Apache License, Version 2.0 (the "License");
10+
you may not use this file except in compliance with the License.
11+
You may obtain a copy of the License at
12+
13+
https://www.apache.org/licenses/LICENSE-2.0
14+
15+
Unless required by applicable law or agreed to in writing, software
16+
distributed under the License is distributed on an "AS IS" BASIS,
17+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
See the License for the specific language governing permissions and
19+
limitations under the License.
20+
*/
21+
22+
package com.oracle.dev.jdbc;
23+
24+
import java.sql.SQLException;
25+
import java.time.Duration;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Executors;
28+
import java.util.concurrent.ThreadLocalRandom;
29+
import java.util.stream.IntStream;
30+
31+
import oracle.rsi.PushPublisher;
32+
import oracle.rsi.ReactiveStreamsIngestion;
33+
34+
public class RSIApp {
35+
36+
private ExecutorService workerThreadPool;
37+
private ReactiveStreamsIngestion rsi;
38+
private PushPublisher<Object[]> publisher;
39+
40+
public static void main(String[] args) throws SQLException {
41+
RSIApp app = new RSIApp();
42+
app.init();
43+
app.publish();
44+
}
45+
46+
private void init() {
47+
// virtual threads
48+
workerThreadPool = Executors.newVirtualThreadPerTaskExecutor();
49+
rsi = ReactiveStreamsIngestion.builder().url(DatabaseConfig.getDbUrl()).username(DatabaseConfig.getDbUser())
50+
.password(DatabaseConfig.getDbPassword()).schema(DatabaseConfig.getDbSchema())
51+
.executor(workerThreadPool).bufferRows(200).bufferInterval(Duration.ofSeconds(20))
52+
.entity(Customer.class).build();
53+
}
54+
55+
private void publish() {
56+
publisher = ReactiveStreamsIngestion.pushPublisher();
57+
publisher.subscribe(rsi.subscriber());
58+
// virtual threads
59+
var threads = IntStream.range(0, 150).mapToObj(i -> Thread.startVirtualThread(() -> {
60+
publisher.accept(new Object[] { ThreadLocalRandom.current().nextLong(), "Duke Java", "West" });
61+
})).toList();
62+
63+
for (var thread : threads) {
64+
try {
65+
thread.join();
66+
} catch (InterruptedException e) {
67+
e.printStackTrace();
68+
}
69+
}
70+
destroy();
71+
}
72+
73+
private void destroy() {
74+
try {
75+
publisher.close();
76+
rsi.close();
77+
} catch (Exception e) {
78+
e.printStackTrace();
79+
} finally {
80+
workerThreadPool.shutdown();
81+
}
82+
}
83+
84+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# https://docs.oracle.com/en/cloud/paas/autonomous-database/adbsa/connect-jdbc-thin-wallet.html
2+
# jdbc:oracle:thin:@dbname_medium?TNS_ADMIN=<PATH_TO_YOUR_WALLET>
3+
DB_URL=jdbc:oracle:thin:@rsitestdb_medium?TNS_ADMIN=<PATH_TO_YOUR_WALLET>
4+
DB_USER=<DB_USER>
5+
DB_PASSWORD=<DB_PASSWORD>
6+
DB_SCHEMA=<DB_SCHEMA>

java/rsi-atp-vt/terraform/adb.tf

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
resource "oci_database_autonomous_database" "autonomous_database" {
2+
compartment_id = var.compartment_ocid
3+
db_name = upper(replace(var.name, "-", ""))
4+
display_name = format("%s%s", "adb-", var.name)
5+
db_workload = "OLTP"
6+
is_dedicated = false
7+
is_free_tier = false
8+
cpu_core_count = 1
9+
data_storage_size_in_tbs = 1
10+
admin_password = var.adb_admin_password
11+
is_mtls_connection_required = true
12+
customer_contacts {
13+
email = var.adb_customer_contacts_email
14+
}
15+
16+
}

java/rsi-atp-vt/terraform/config.tf

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
terraform {
2+
required_providers {
3+
oci = {
4+
source = "oracle/oci"
5+
}
6+
}
7+
}

java/rsi-atp-vt/terraform/outputs.tf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
output "adb_connection_strings" {
2+
value = oci_database_autonomous_database.autonomous_database.connection_strings
3+
}

0 commit comments

Comments
 (0)