Skip to content

Commit

Permalink
Merge pull request #3523 from FOCONIS/streaming-tests
Browse files Browse the repository at this point in the history
set defaultFetchBuffer in findEach/findList in SQL and DTO Queries
  • Loading branch information
rbygrave authored Dec 18, 2024
2 parents 1c038f4 + 4d08dce commit 4367546
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,9 @@ public interface SpiSqlBinding extends SpiCancelableQuery {
*/
int getBufferFetchSizeHint();

/**
* Set the JDBC fetchSize buffer hint if not explicitly set.
*/
void setDefaultFetchBuffer(int fetchSize);

}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ public String getBindLog() {

protected abstract void requestComplete();

/**
* Set the JDBC buffer fetchSize hint if not set explicitly.
*/
public void setDefaultFetchBuffer(int fetchSize) {
query.setDefaultFetchBuffer(fetchSize);
}
/**
* Close the underlying resources.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,12 @@ AutoTuneService createAutoTuneService(SpiEbeanServer server) {
}

DtoQueryEngine createDtoQueryEngine() {
return new DtoQueryEngine(binder);
return new DtoQueryEngine(binder, config.getJdbcFetchSizeFindEach(), config.getJdbcFetchSizeFindList());
}

RelationalQueryEngine createRelationalQueryEngine() {
return new DefaultRelationalQueryEngine(binder, config.getDatabaseBooleanTrue(), config.getPlatformConfig().getDbUuid().useBinaryOptimized());
return new DefaultRelationalQueryEngine(binder, config.getDatabaseBooleanTrue(), config.getPlatformConfig().getDbUuid().useBinaryOptimized(),
config.getJdbcFetchSizeFindEach(), config.getJdbcFetchSizeFindList());
}

OrmQueryEngine createOrmQueryEngine() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,17 @@ public final class DefaultRelationalQueryEngine implements RelationalQueryEngine
private final String dbTrueValue;
private final boolean binaryOptimizedUUID;
private final TimedMetricMap timedMetricMap;
private final int defaultFetchSizeFindEach;
private final int defaultFetchSizeFindList;

public DefaultRelationalQueryEngine(Binder binder, String dbTrueValue, boolean binaryOptimizedUUID) {
public DefaultRelationalQueryEngine(Binder binder, String dbTrueValue, boolean binaryOptimizedUUID,
int defaultFetchSizeFindEach, int defaultFetchSizeFindList) {
this.binder = binder;
this.dbTrueValue = dbTrueValue == null ? "true" : dbTrueValue;
this.binaryOptimizedUUID = binaryOptimizedUUID;
this.timedMetricMap = MetricFactory.get().createTimedMetricMap("sql.query.");
this.defaultFetchSizeFindEach = defaultFetchSizeFindEach;
this.defaultFetchSizeFindList = defaultFetchSizeFindList;
}

@Override
Expand All @@ -59,6 +64,9 @@ private String errMsg(String msg, String sql) {
@Override
public void findEach(RelationalQueryRequest request, RowConsumer consumer) {
try {
if (defaultFetchSizeFindEach > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindEach);
}
request.executeSql(binder, SpiQuery.Type.ITERATE);
request.mapEach(consumer);
request.logSummary();
Expand All @@ -74,6 +82,9 @@ public void findEach(RelationalQueryRequest request, RowConsumer consumer) {
@Override
public <T> void findEach(RelationalQueryRequest request, RowReader<T> reader, Predicate<T> consumer) {
try {
if (defaultFetchSizeFindEach > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindEach);
}
request.executeSql(binder, SpiQuery.Type.ITERATE);
while (request.next()) {
if (!consumer.test(reader.read())) {
Expand Down Expand Up @@ -109,6 +120,9 @@ public <T> T findOne(RelationalQueryRequest request, RowMapper<T> mapper) {
@Override
public <T> List<T> findList(RelationalQueryRequest request, RowReader<T> reader) {
try {
if (defaultFetchSizeFindList > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindList);
}
request.executeSql(binder, SpiQuery.Type.LIST);
List<T> rows = new ArrayList<>();
while (request.next()) {
Expand All @@ -129,6 +143,9 @@ public <T> List<T> findList(RelationalQueryRequest request, RowReader<T> reader)
public <T> T findSingleAttribute(RelationalQueryRequest request, Class<T> cls) {
ScalarType<T> scalarType = (ScalarType<T>) binder.getScalarType(cls);
try {
if (defaultFetchSizeFindList > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindList);
}
request.executeSql(binder, SpiQuery.Type.ATTRIBUTE);
final DataReader dataReader = binder.createDataReader(request.resultSet());
T value = null;
Expand All @@ -151,6 +168,9 @@ public <T> T findSingleAttribute(RelationalQueryRequest request, Class<T> cls) {
public <T> List<T> findSingleAttributeList(RelationalQueryRequest request, Class<T> cls) {
ScalarType<T> scalarType = (ScalarType<T>) binder.getScalarType(cls);
try {
if (defaultFetchSizeFindList > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindList);
}
request.executeSql(binder, SpiQuery.Type.ATTRIBUTE);
final DataReader dataReader = binder.createDataReader(request.resultSet());
List<T> rows = new ArrayList<>();
Expand All @@ -173,6 +193,9 @@ public <T> List<T> findSingleAttributeList(RelationalQueryRequest request, Class
public <T> void findSingleAttributeEach(RelationalQueryRequest request, Class<T> cls, Consumer<T> consumer) {
ScalarType<T> scalarType = (ScalarType<T>) binder.getScalarType(cls);
try {
if (defaultFetchSizeFindEach > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindEach);
}
request.executeSql(binder, SpiQuery.Type.ATTRIBUTE);
final DataReader dataReader = binder.createDataReader(request.resultSet());
while (dataReader.next()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import io.ebeaninternal.api.SpiQuery;
import io.ebeaninternal.server.core.DtoQueryRequest;
import io.ebeaninternal.server.persist.Binder;

import jakarta.persistence.PersistenceException;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -15,13 +15,20 @@
public final class DtoQueryEngine {

private final Binder binder;
private final int defaultFetchSizeFindEach;
private final int defaultFetchSizeFindList;

public DtoQueryEngine(Binder binder) {
public DtoQueryEngine(Binder binder, int defaultFetchSizeFindEach, int defaultFetchSizeFindList) {
this.binder = binder;
this.defaultFetchSizeFindEach = defaultFetchSizeFindEach;
this.defaultFetchSizeFindList = defaultFetchSizeFindList;
}

public <T> List<T> findList(DtoQueryRequest<T> request) {
try {
if (defaultFetchSizeFindList > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindList);
}
request.executeSql(binder, SpiQuery.Type.LIST);
List<T> rows = new ArrayList<>();
while (request.next()) {
Expand All @@ -38,6 +45,9 @@ public <T> List<T> findList(DtoQueryRequest<T> request) {

public <T> QueryIterator<T> findIterate(DtoQueryRequest<T> request) {
try {
if (defaultFetchSizeFindEach > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindEach);
}
request.executeSql(binder, SpiQuery.Type.ITERATE);
return new DtoQueryIterator<>(request);
} catch (SQLException e) {
Expand All @@ -46,7 +56,10 @@ public <T> QueryIterator<T> findIterate(DtoQueryRequest<T> request) {
}

public <T> void findEach(DtoQueryRequest<T> request, Consumer<T> consumer) {
try {
try {
if (defaultFetchSizeFindEach > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindEach);
}
request.executeSql(binder, SpiQuery.Type.ITERATE);
while (request.next()) {
consumer.accept(request.readNextBean());
Expand All @@ -61,6 +74,9 @@ public <T> void findEach(DtoQueryRequest<T> request, Consumer<T> consumer) {
public <T> void findEach(DtoQueryRequest<T> request, int batchSize, Consumer<List<T>> consumer) {
try {
List<T> buffer = new ArrayList<>();
if (defaultFetchSizeFindEach > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindEach);
}
request.executeSql(binder, SpiQuery.Type.ITERATE);
while (request.next()) {
buffer.add(request.readNextBean());
Expand All @@ -82,6 +98,9 @@ public <T> void findEach(DtoQueryRequest<T> request, int batchSize, Consumer<Lis

public <T> void findEachWhile(DtoQueryRequest<T> request, Predicate<T> consumer) {
try {
if (defaultFetchSizeFindEach > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindEach);
}
request.executeSql(binder, SpiQuery.Type.ITERATE);
while (request.next()) {
if (!consumer.test(request.readNextBean())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,13 @@ public BindParams getBindParams() {
return bindParams;
}

@Override
public void setDefaultFetchBuffer(int fetchSize) {
if (bufferFetchSizeHint == 0) {
bufferFetchSizeHint = fetchSize;
}
}

@Override
public DtoQuery<T> setBufferFetchSizeHint(int bufferFetchSizeHint) {
this.bufferFetchSizeHint = bufferFetchSizeHint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ public BindParams getBindParams() {
return bindParams;
}

@Override
public final void setDefaultFetchBuffer(int fetchSize) {
if (bufferFetchSizeHint == 0) {
bufferFetchSizeHint = fetchSize;
}
}

@Override
public DefaultRelationalQuery setBufferFetchSizeHint(int bufferFetchSizeHint) {
this.bufferFetchSizeHint = bufferFetchSizeHint;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package org.tests.query.other;

import io.ebean.DB;
import io.ebean.Transaction;
import io.ebean.annotation.Platform;
import io.ebean.xtest.BaseTestCase;
import io.ebean.xtest.ForPlatform;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.tests.model.basic.EBasic;

import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.assertj.core.api.Assertions.assertThat;

/**
* This test ensures that MariaDb uses the correct streaming result in findEach queries.
* See Issue #56.
*/
public class TestFindIterateMariaDb extends BaseTestCase {

@BeforeEach
public void setup() {
// we need at least > fetchSize beans
if (DB.find(EBasic.class).findCount() < 1000) {
for (int i = 0; i < 1000; i++) {
EBasic dumbModel = new EBasic();
dumbModel.setName("Goodbye now");
DB.save(dumbModel);
}
}
}

public static class DtoBasic {
private String name;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}

@Test
@ForPlatform(Platform.MARIADB)
public void testStreamingOnSqlQueryFindEach() {

try (Transaction txn = DB.beginTransaction()) {
AtomicBoolean mariadbStreaming = new AtomicBoolean();
DB.sqlQuery("select name from e_basic").findEach(bean -> {
if (!mariadbStreaming.get() && isMariaDbStreaming()) {
mariadbStreaming.set(true);
}
});
assertThat(mariadbStreaming.get()).isTrue();
}
}

@Test
@ForPlatform(Platform.MARIADB)
public void testStreamingOnOrmFindEach() {

try (Transaction txn = DB.beginTransaction()) {
AtomicBoolean mariadbStreaming = new AtomicBoolean();
DB.find(EBasic.class).findEach(bean -> {
if (!mariadbStreaming.get() && isMariaDbStreaming()) {
mariadbStreaming.set(true);
}
});
assertThat(mariadbStreaming.get()).isTrue();

}
}

@Test
@ForPlatform(Platform.MARIADB)
public void testStreamingOnOrmAsDtoFindEach() {

try (Transaction txn = DB.beginTransaction()) {
AtomicBoolean mariadbStreaming = new AtomicBoolean();
DB.find(EBasic.class).select("name").asDto(DtoBasic.class).findEach(bean -> {
if (!mariadbStreaming.get() && isMariaDbStreaming()) {
mariadbStreaming.set(true);
}
});
assertThat(mariadbStreaming.get()).isTrue();

}
}

@Test
@ForPlatform(Platform.MARIADB)
public void testStreamingOnDtoFindEach() {

try (Transaction txn = DB.beginTransaction()) {
AtomicBoolean mariadbStreaming = new AtomicBoolean();
DB.findDto(DtoBasic.class, "select name from e_basic").findEach(bean -> {
if (!mariadbStreaming.get() && isMariaDbStreaming()) {
mariadbStreaming.set(true);
}
});
assertThat(mariadbStreaming.get()).isTrue();

}
}
@Test
@ForPlatform(Platform.MARIADB)
public void testStreamingOnDtoFindEachWhile() {

try (Transaction txn = DB.beginTransaction()) {
AtomicBoolean mariadbStreaming = new AtomicBoolean();
DB.findDto(DtoBasic.class, "select name from e_basic").findEachWhile(bean -> {
if (!mariadbStreaming.get() && isMariaDbStreaming()) {
mariadbStreaming.set(true);
}
return false;
});
assertThat(mariadbStreaming.get()).isTrue();

}
}

@Test
@ForPlatform(Platform.MARIADB)
public void testStreamingOnDtoFindEachBatch() {

try (Transaction txn = DB.beginTransaction()) {
AtomicBoolean mariadbStreaming = new AtomicBoolean();
DB.findDto(DtoBasic.class, "select name from e_basic").findEach(2000, bean -> {
if (!mariadbStreaming.get() && isMariaDbStreaming()) {
mariadbStreaming.set(true);
}
});
assertThat(mariadbStreaming.get()).isTrue();

}
}

/**
* Take a look into the current connection. We are streaming, if there is a result set.
*/
private boolean isMariaDbStreaming() {
try {
org.mariadb.jdbc.Connection conn = Transaction.current().connection().unwrap(org.mariadb.jdbc.Connection.class);
org.mariadb.jdbc.client.impl.StandardClient client = (org.mariadb.jdbc.client.impl.StandardClient) conn.getClient();
Field field = client.getClass().getDeclaredField("streamMsg");
field.setAccessible(true);
return field.get(client) != null;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

}

0 comments on commit 4367546

Please sign in to comment.