Skip to content

Commit 8fe6a78

Browse files
authored
[JDBC] Part2: Add Relational JDBC module (#1287)
1 parent bb7bce4 commit 8fe6a78

File tree

19 files changed

+2734
-7
lines changed

19 files changed

+2734
-7
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ Apache Polaris is organized into the following modules:
6161
- Persistence modules:
6262
- `polaris-jpa-model` - The JPA entity definitions
6363
- `polaris-eclipselink` - The Eclipselink implementation of the MetaStoreManager interface
64+
- `polaris-relational-jdbc` - The JDBC implementation of BasePersistence to be used via AtomicMetaStoreManager
6465

6566
Apache Polaris is built using Gradle with Java 21+ and Docker 27+.
6667

bom/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ dependencies {
4242
api(project(":polaris-service-common"))
4343

4444
api(project(":polaris-eclipselink"))
45+
api(project(":polaris-relational-jdbc"))
4546
api(project(":polaris-jpa-model"))
4647

4748
api(project(":polaris-quarkus-admin"))
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
plugins { id("polaris-server") }
21+
22+
dependencies {
23+
implementation(project(":polaris-core"))
24+
implementation(libs.slf4j.api)
25+
implementation(libs.guava)
26+
27+
compileOnly(libs.jakarta.annotation.api)
28+
compileOnly(libs.jakarta.enterprise.cdi.api)
29+
compileOnly(libs.jakarta.inject.api)
30+
31+
implementation(libs.smallrye.common.annotation) // @Identifier
32+
33+
testImplementation(libs.mockito.junit.jupiter)
34+
35+
testImplementation(libs.h2)
36+
testImplementation(testFixtures(project(":polaris-core")))
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.extension.persistence.relational.jdbc;
20+
21+
import static java.nio.charset.StandardCharsets.UTF_8;
22+
23+
import jakarta.annotation.Nonnull;
24+
import java.io.BufferedReader;
25+
import java.io.IOException;
26+
import java.io.InputStreamReader;
27+
import java.sql.Connection;
28+
import java.sql.ResultSet;
29+
import java.sql.SQLException;
30+
import java.sql.Statement;
31+
import java.util.ArrayList;
32+
import java.util.List;
33+
import java.util.Objects;
34+
import java.util.function.Function;
35+
import java.util.function.Predicate;
36+
import javax.sql.DataSource;
37+
import org.apache.polaris.extension.persistence.relational.jdbc.models.Converter;
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
40+
41+
public class DatasourceOperations {
42+
private static final Logger LOGGER = LoggerFactory.getLogger(DatasourceOperations.class);
43+
44+
private static final String ALREADY_EXISTS_STATE_POSTGRES = "42P07";
45+
private static final String CONSTRAINT_VIOLATION_SQL_CODE = "23505";
46+
47+
private final DataSource datasource;
48+
49+
public DatasourceOperations(DataSource datasource) {
50+
this.datasource = datasource;
51+
}
52+
53+
/**
54+
* Execute SQL script
55+
*
56+
* @param scriptFilePath : Path of SQL script.
57+
* @throws SQLException : Exception while executing the script.
58+
*/
59+
public void executeScript(String scriptFilePath) throws SQLException {
60+
ClassLoader classLoader = DatasourceOperations.class.getClassLoader();
61+
try (Connection connection = borrowConnection();
62+
Statement statement = connection.createStatement()) {
63+
boolean autoCommit = connection.getAutoCommit();
64+
connection.setAutoCommit(true);
65+
try {
66+
BufferedReader reader =
67+
new BufferedReader(
68+
new InputStreamReader(
69+
Objects.requireNonNull(classLoader.getResourceAsStream(scriptFilePath)),
70+
UTF_8));
71+
StringBuilder sqlBuffer = new StringBuilder();
72+
String line;
73+
while ((line = reader.readLine()) != null) {
74+
line = line.trim();
75+
if (!line.isEmpty() && !line.startsWith("--")) { // Ignore empty lines and comments
76+
sqlBuffer.append(line).append("\n");
77+
if (line.endsWith(";")) { // Execute statement when semicolon is found
78+
String sql = sqlBuffer.toString().trim();
79+
try {
80+
int rowsUpdated = statement.executeUpdate(sql);
81+
LOGGER.debug("Query {} executed {} rows affected", sql, rowsUpdated);
82+
} catch (SQLException e) {
83+
LOGGER.error("Error executing query {}", sql, e);
84+
// re:throw this as unhandled exception
85+
throw new RuntimeException(e);
86+
}
87+
sqlBuffer.setLength(0); // Clear the buffer for the next statement
88+
}
89+
}
90+
}
91+
} finally {
92+
connection.setAutoCommit(autoCommit);
93+
}
94+
} catch (IOException e) {
95+
LOGGER.debug("Error reading the script file", e);
96+
throw new RuntimeException(e);
97+
} catch (SQLException e) {
98+
LOGGER.debug("Error executing the script file", e);
99+
throw e;
100+
}
101+
}
102+
103+
/**
104+
* Executes SELECT Query
105+
*
106+
* @param query : Query to executed
107+
* @param entityClass : Class of the entity being selected
108+
* @param transformer : Transformation of entity class to Result class
109+
* @param entityFilter : Filter to applied on the Result class
110+
* @param limit : Limit to to enforced.
111+
* @return List of Result class objects
112+
* @param <T> : Entity class
113+
* @param <R> : Result class
114+
* @throws SQLException : Exception during the query execution.
115+
*/
116+
public <T, R> List<R> executeSelect(
117+
@Nonnull String query,
118+
@Nonnull Class<T> entityClass,
119+
@Nonnull Function<T, R> transformer,
120+
Predicate<R> entityFilter,
121+
int limit)
122+
throws SQLException {
123+
try (Connection connection = borrowConnection();
124+
Statement statement = connection.createStatement();
125+
ResultSet resultSet = statement.executeQuery(query)) {
126+
List<R> resultList = new ArrayList<>();
127+
while (resultSet.next() && resultList.size() < limit) {
128+
Converter<T> object =
129+
(Converter<T>)
130+
entityClass.getDeclaredConstructor().newInstance(); // Create a new instance
131+
R entity = transformer.apply(object.fromResultSet(resultSet));
132+
if (entityFilter == null || entityFilter.test(entity)) {
133+
resultList.add(entity);
134+
}
135+
}
136+
return resultList;
137+
} catch (SQLException e) {
138+
LOGGER.debug("Error executing query {}", query, e);
139+
throw e;
140+
} catch (Exception e) {
141+
throw new RuntimeException(e);
142+
}
143+
}
144+
145+
/**
146+
* Executes the UPDATE or INSERT Query
147+
*
148+
* @param query : query to be executed
149+
* @return : Number of rows modified / inserted.
150+
* @throws SQLException : Exception during Query Execution.
151+
*/
152+
public int executeUpdate(String query) throws SQLException {
153+
try (Connection connection = borrowConnection();
154+
Statement statement = connection.createStatement()) {
155+
boolean autoCommit = connection.getAutoCommit();
156+
connection.setAutoCommit(true);
157+
try {
158+
return statement.executeUpdate(query);
159+
} catch (SQLException e) {
160+
LOGGER.debug("Error executing query {}", query, e);
161+
throw e;
162+
} finally {
163+
connection.setAutoCommit(autoCommit);
164+
}
165+
}
166+
}
167+
168+
/**
169+
* Transaction callback to be executed.
170+
*
171+
* @param callback : TransactionCallback to be executed within transaction
172+
* @throws SQLException : Exception caught during transaction execution.
173+
*/
174+
public void runWithinTransaction(TransactionCallback callback) throws SQLException {
175+
try (Connection connection = borrowConnection()) {
176+
boolean autoCommit = connection.getAutoCommit();
177+
connection.setAutoCommit(false);
178+
boolean success = false;
179+
try {
180+
try (Statement statement = connection.createStatement()) {
181+
success = callback.execute(statement);
182+
}
183+
} finally {
184+
if (success) {
185+
connection.commit();
186+
} else {
187+
connection.rollback();
188+
}
189+
connection.setAutoCommit(autoCommit);
190+
}
191+
} catch (SQLException e) {
192+
LOGGER.debug("Caught Error while executing transaction", e);
193+
throw e;
194+
}
195+
}
196+
197+
// Interface for transaction callback
198+
public interface TransactionCallback {
199+
boolean execute(Statement statement) throws SQLException;
200+
}
201+
202+
public boolean isConstraintViolation(SQLException e) {
203+
return CONSTRAINT_VIOLATION_SQL_CODE.equals(e.getSQLState());
204+
}
205+
206+
public boolean isAlreadyExistsException(SQLException e) {
207+
return ALREADY_EXISTS_STATE_POSTGRES.equals(e.getSQLState());
208+
}
209+
210+
private Connection borrowConnection() throws SQLException {
211+
return datasource.getConnection();
212+
}
213+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.extension.persistence.relational.jdbc;
20+
21+
import java.security.SecureRandom;
22+
23+
public class IdGenerator {
24+
private static final IdGenerator idGenerator = new IdGenerator();
25+
26+
public static IdGenerator getIdGenerator() {
27+
return idGenerator;
28+
}
29+
30+
private final SecureRandom secureRandom = new SecureRandom();
31+
private static final long LONG_MAX_ID = 0x7fffffffffffffffL;
32+
33+
private IdGenerator() {}
34+
35+
public long nextId() {
36+
// Make sure this is a positive number.
37+
// conflicting ids don't get accepted and is enforced by table constraints.
38+
return secureRandom.nextLong() & LONG_MAX_ID;
39+
}
40+
}

0 commit comments

Comments
 (0)