Skip to content

Commit f32d77d

Browse files
authored
Lazy iteration over JDBC ResultSet (#1487)
* refactor * autolint * polish * autolint * changes per review * autolint * unwrapping caller * changes per review
1 parent 328c2e5 commit f32d77d

File tree

4 files changed

+143
-59
lines changed

4 files changed

+143
-59
lines changed

extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@
3131
import java.util.ArrayList;
3232
import java.util.List;
3333
import java.util.Objects;
34+
import java.util.function.Consumer;
3435
import java.util.function.Function;
35-
import java.util.function.Predicate;
36+
import java.util.stream.Stream;
3637
import javax.sql.DataSource;
3738
import org.apache.polaris.extension.persistence.relational.jdbc.models.Converter;
3839

@@ -90,41 +91,56 @@ public void executeScript(String scriptFilePath) throws SQLException {
9091
}
9192

9293
/**
93-
* Executes SELECT Query
94+
* Executes SELECT Query and returns the results after applying a transformer
9495
*
9596
* @param query : Query to executed
96-
* @param entityClass : Class of the entity being selected
97-
* @param transformer : Transformation of entity class to Result class
98-
* @param entityFilter : Filter to applied on the Result class
99-
* @param limit : Limit to to enforced.
100-
* @return List of Result class objects
101-
* @param <T> : Entity class
102-
* @param <R> : Result class
97+
* @param converterInstance : An instance of the type being selected, used to convert to a
98+
* business entity like PolarisBaseEntity
99+
* @param transformer Transformation of entity class to Result class
100+
* @return The list of results yielded by the query
101+
* @param <T> : Persistence entity class
102+
* @param <R> : Business entity class
103103
* @throws SQLException : Exception during the query execution.
104104
*/
105105
public <T, R> List<R> executeSelect(
106106
@Nonnull String query,
107-
@Nonnull Class<T> entityClass,
108-
@Nonnull Function<T, R> transformer,
109-
Predicate<R> entityFilter,
110-
int limit)
107+
@Nonnull Converter<T> converterInstance,
108+
@Nonnull Function<T, R> transformer)
109+
throws SQLException {
110+
ArrayList<R> results = new ArrayList<>();
111+
executeSelectOverStream(
112+
query, converterInstance, stream -> stream.map(transformer).forEach(results::add));
113+
return results;
114+
}
115+
116+
/**
117+
* Executes SELECT Query and takes a consumer over the results. For callers that want more
118+
* sophisticated control over how query results are handled.
119+
*
120+
* @param query : Query to executed
121+
* @param converterInstance : An entity of the type being selected
122+
* @param consumer : An function to consume the returned results
123+
* @param <T> : Entity class
124+
* @throws SQLException : Exception during the query execution.
125+
*/
126+
public <T> void executeSelectOverStream(
127+
@Nonnull String query,
128+
@Nonnull Converter<T> converterInstance,
129+
@Nonnull Consumer<Stream<T>> consumer)
111130
throws SQLException {
112131
try (Connection connection = borrowConnection();
113132
Statement statement = connection.createStatement();
114133
ResultSet resultSet = statement.executeQuery(query)) {
115-
List<R> resultList = new ArrayList<>();
116-
while (resultSet.next() && resultList.size() < limit) {
117-
Converter<T> object =
118-
(Converter<T>)
119-
entityClass.getDeclaredConstructor().newInstance(); // Create a new instance
120-
R entity = transformer.apply(object.fromResultSet(resultSet));
121-
if (entityFilter == null || entityFilter.test(entity)) {
122-
resultList.add(entity);
123-
}
124-
}
125-
return resultList;
134+
ResultSetIterator<T> iterator = new ResultSetIterator<>(resultSet, converterInstance);
135+
consumer.accept(iterator.toStream());
126136
} catch (SQLException e) {
127137
throw e;
138+
} catch (RuntimeException e) {
139+
if (e.getCause() instanceof SQLException) {
140+
throw (SQLException) e.getCause();
141+
} else {
142+
throw e;
143+
}
128144
} catch (Exception e) {
129145
throw new RuntimeException(e);
130146
}

extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java

Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -297,8 +297,7 @@ public PolarisBaseEntity lookupEntityByName(
297297
private PolarisBaseEntity getPolarisBaseEntity(String query) {
298298
try {
299299
List<PolarisBaseEntity> results =
300-
datasourceOperations.executeSelect(
301-
query, ModelEntity.class, ModelEntity::toEntity, null, Integer.MAX_VALUE);
300+
datasourceOperations.executeSelect(query, new ModelEntity(), ModelEntity::toEntity);
302301
if (results.isEmpty()) {
303302
return null;
304303
} else if (results.size() > 1) {
@@ -322,8 +321,7 @@ public List<PolarisBaseEntity> lookupEntities(
322321
if (entityIds == null || entityIds.isEmpty()) return new ArrayList<>();
323322
String query = generateSelectQueryWithEntityIds(realmId, entityIds);
324323
try {
325-
return datasourceOperations.executeSelect(
326-
query, ModelEntity.class, ModelEntity::toEntity, null, Integer.MAX_VALUE);
324+
return datasourceOperations.executeSelect(query, new ModelEntity(), ModelEntity::toEntity);
327325
} catch (SQLException e) {
328326
throw new RuntimeException(
329327
String.format("Failed to retrieve polaris entities due to %s", e.getMessage()), e);
@@ -412,9 +410,17 @@ public <T> List<T> listEntities(
412410
// absence of transaction.
413411
String query = QueryGenerator.generateSelectQuery(new ModelEntity(), params);
414412
try {
415-
List<PolarisBaseEntity> results =
416-
datasourceOperations.executeSelect(
417-
query, ModelEntity.class, ModelEntity::toEntity, entityFilter, limit);
413+
List<PolarisBaseEntity> results = new ArrayList<>();
414+
datasourceOperations.executeSelectOverStream(
415+
query,
416+
new ModelEntity(),
417+
stream -> {
418+
stream
419+
.map(ModelEntity::toEntity)
420+
.filter(entityFilter)
421+
.limit(limit)
422+
.forEach(results::add);
423+
});
418424
return results == null
419425
? Collections.emptyList()
420426
: results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList());
@@ -461,11 +467,7 @@ public PolarisGrantRecord lookupGrantRecord(
461467
try {
462468
List<PolarisGrantRecord> results =
463469
datasourceOperations.executeSelect(
464-
query,
465-
ModelGrantRecord.class,
466-
ModelGrantRecord::toGrantRecord,
467-
null,
468-
Integer.MAX_VALUE);
470+
query, new ModelGrantRecord(), ModelGrantRecord::toGrantRecord);
469471
if (results.size() > 1) {
470472
throw new IllegalStateException(
471473
String.format(
@@ -496,11 +498,7 @@ public List<PolarisGrantRecord> loadAllGrantRecordsOnSecurable(
496498
try {
497499
List<PolarisGrantRecord> results =
498500
datasourceOperations.executeSelect(
499-
query,
500-
ModelGrantRecord.class,
501-
ModelGrantRecord::toGrantRecord,
502-
null,
503-
Integer.MAX_VALUE);
501+
query, new ModelGrantRecord(), ModelGrantRecord::toGrantRecord);
504502
return results == null ? Collections.emptyList() : results;
505503
} catch (SQLException e) {
506504
throw new RuntimeException(
@@ -522,11 +520,7 @@ public List<PolarisGrantRecord> loadAllGrantRecordsOnGrantee(
522520
try {
523521
List<PolarisGrantRecord> results =
524522
datasourceOperations.executeSelect(
525-
query,
526-
ModelGrantRecord.class,
527-
ModelGrantRecord::toGrantRecord,
528-
null,
529-
Integer.MAX_VALUE);
523+
query, new ModelGrantRecord(), ModelGrantRecord::toGrantRecord);
530524
return results == null ? Collections.emptyList() : results;
531525
} catch (SQLException e) {
532526
throw new RuntimeException(
@@ -553,8 +547,7 @@ public boolean hasChildren(
553547
String query = generateSelectQuery(new ModelEntity(), params);
554548
try {
555549
List<ModelEntity> results =
556-
datasourceOperations.executeSelect(
557-
query, ModelEntity.class, Function.identity(), null, Integer.MAX_VALUE);
550+
datasourceOperations.executeSelect(query, new ModelEntity(), Function.identity());
558551
return results != null && !results.isEmpty();
559552
} catch (SQLException e) {
560553
throw new RuntimeException(
@@ -574,10 +567,8 @@ public PolarisPrincipalSecrets loadPrincipalSecrets(
574567
List<PolarisPrincipalSecrets> results =
575568
datasourceOperations.executeSelect(
576569
query,
577-
ModelPrincipalAuthenticationData.class,
578-
ModelPrincipalAuthenticationData::toPrincipalAuthenticationData,
579-
null,
580-
Integer.MAX_VALUE);
570+
new ModelPrincipalAuthenticationData(),
571+
ModelPrincipalAuthenticationData::toPrincipalAuthenticationData);
581572
return results == null || results.isEmpty() ? null : results.getFirst();
582573
} catch (SQLException e) {
583574
LOGGER.error(
@@ -880,10 +871,8 @@ private List<PolarisPolicyMappingRecord> fetchPolicyMappingRecords(String query)
880871
List<PolarisPolicyMappingRecord> results =
881872
datasourceOperations.executeSelect(
882873
query,
883-
ModelPolicyMappingRecord.class,
884-
ModelPolicyMappingRecord::toPolicyMappingRecord,
885-
null,
886-
Integer.MAX_VALUE);
874+
new ModelPolicyMappingRecord(),
875+
ModelPolicyMappingRecord::toPolicyMappingRecord);
887876
return results == null ? Collections.emptyList() : results;
888877
} catch (SQLException e) {
889878
throw new RuntimeException(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.extension.persistence.relational.jdbc;
20+
21+
import java.sql.ResultSet;
22+
import java.sql.SQLException;
23+
import java.util.Iterator;
24+
import java.util.NoSuchElementException;
25+
import java.util.Spliterator;
26+
import java.util.Spliterators;
27+
import java.util.stream.Stream;
28+
import java.util.stream.StreamSupport;
29+
import org.apache.polaris.extension.persistence.relational.jdbc.models.Converter;
30+
31+
/**
32+
* Used to wrap a ResultSet and to build a stream from the data it contains. This data structure
33+
* will not close the ResultSet passed in, so the caller is still responsible for managing its
34+
* lifecycle
35+
*/
36+
public class ResultSetIterator<T> implements Iterator<T> {
37+
private final ResultSet resultSet;
38+
private final Converter<T> converterInstance;
39+
private boolean hasNext;
40+
41+
public ResultSetIterator(ResultSet resultSet, Converter<T> converterInstance)
42+
throws SQLException {
43+
this.resultSet = resultSet;
44+
this.converterInstance = converterInstance;
45+
advance();
46+
}
47+
48+
private void advance() throws SQLException {
49+
try {
50+
hasNext = resultSet.next();
51+
} catch (SQLException e) {
52+
hasNext = false;
53+
throw e;
54+
}
55+
}
56+
57+
@Override
58+
public boolean hasNext() {
59+
return hasNext;
60+
}
61+
62+
@Override
63+
public T next() {
64+
if (!hasNext) {
65+
throw new NoSuchElementException();
66+
}
67+
try {
68+
T object = converterInstance.fromResultSet(resultSet);
69+
advance();
70+
return object;
71+
} catch (Exception e) {
72+
throw new RuntimeException(e);
73+
}
74+
}
75+
76+
public Stream<T> toStream() {
77+
Spliterator<T> spliterator = Spliterators.spliteratorUnknownSize(this, 0);
78+
return StreamSupport.stream(spliterator, false);
79+
}
80+
}

extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.function.Function;
2828
import javax.sql.DataSource;
2929
import org.apache.polaris.extension.persistence.relational.jdbc.DatasourceOperations;
30+
import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelEntity;
3031
import org.junit.jupiter.api.BeforeEach;
3132
import org.junit.jupiter.api.Test;
3233
import org.junit.jupiter.api.extension.ExtendWith;
@@ -76,9 +77,7 @@ void testExecuteSelect_exception() throws Exception {
7677

7778
assertThrows(
7879
SQLException.class,
79-
() ->
80-
datasourceOperations.executeSelect(
81-
query, Object.class, Function.identity(), null, Integer.MAX_VALUE));
80+
() -> datasourceOperations.executeSelect(query, new ModelEntity(), Function.identity()));
8281
}
8382

8483
@Test

0 commit comments

Comments
 (0)