Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

set defaultFetchBuffer in findEach/findList in relational/orm Queries #3523

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,9 @@ public interface SpiSqlBinding extends SpiCancelableQuery {
*/
int getBufferFetchSizeHint();

/**
* Set the JDBC fetchSize buffer hint if not explicitly set.
*/
void setDefaultFetchBuffer(int fetchSize);

}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ public String getBindLog() {

protected abstract void requestComplete();

/**
* Set the JDBC buffer fetchSize hint if not set explicitly.
*/
public void setDefaultFetchBuffer(int fetchSize) {
query.setDefaultFetchBuffer(fetchSize);
}
/**
* Close the underlying resources.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,12 @@ AutoTuneService createAutoTuneService(SpiEbeanServer server) {
}

DtoQueryEngine createDtoQueryEngine() {
return new DtoQueryEngine(binder);
return new DtoQueryEngine(binder, config.getJdbcFetchSizeFindEach(), config.getJdbcFetchSizeFindList());
}

RelationalQueryEngine createRelationalQueryEngine() {
return new DefaultRelationalQueryEngine(binder, config.getDatabaseBooleanTrue(), config.getPlatformConfig().getDbUuid().useBinaryOptimized());
return new DefaultRelationalQueryEngine(binder, config.getDatabaseBooleanTrue(), config.getPlatformConfig().getDbUuid().useBinaryOptimized(),
config.getJdbcFetchSizeFindEach(), config.getJdbcFetchSizeFindList());
}

OrmQueryEngine createOrmQueryEngine() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,17 @@ public final class DefaultRelationalQueryEngine implements RelationalQueryEngine
private final String dbTrueValue;
private final boolean binaryOptimizedUUID;
private final TimedMetricMap timedMetricMap;
private final int defaultFetchSizeFindEach;
private final int defaultFetchSizeFindList;

public DefaultRelationalQueryEngine(Binder binder, String dbTrueValue, boolean binaryOptimizedUUID) {
public DefaultRelationalQueryEngine(Binder binder, String dbTrueValue, boolean binaryOptimizedUUID,
int defaultFetchSizeFindEach, int defaultFetchSizeFindList) {
this.binder = binder;
this.dbTrueValue = dbTrueValue == null ? "true" : dbTrueValue;
this.binaryOptimizedUUID = binaryOptimizedUUID;
this.timedMetricMap = MetricFactory.get().createTimedMetricMap("sql.query.");
this.defaultFetchSizeFindEach = defaultFetchSizeFindEach;
this.defaultFetchSizeFindList = defaultFetchSizeFindList;
}

@Override
Expand All @@ -59,6 +64,9 @@ private String errMsg(String msg, String sql) {
@Override
public void findEach(RelationalQueryRequest request, RowConsumer consumer) {
try {
if (defaultFetchSizeFindEach > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindEach);
}
request.executeSql(binder, SpiQuery.Type.ITERATE);
request.mapEach(consumer);
request.logSummary();
Expand All @@ -74,6 +82,9 @@ public void findEach(RelationalQueryRequest request, RowConsumer consumer) {
@Override
public <T> void findEach(RelationalQueryRequest request, RowReader<T> reader, Predicate<T> consumer) {
try {
if (defaultFetchSizeFindEach > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindEach);
}
request.executeSql(binder, SpiQuery.Type.ITERATE);
while (request.next()) {
if (!consumer.test(reader.read())) {
Expand Down Expand Up @@ -109,6 +120,9 @@ public <T> T findOne(RelationalQueryRequest request, RowMapper<T> mapper) {
@Override
public <T> List<T> findList(RelationalQueryRequest request, RowReader<T> reader) {
try {
if (defaultFetchSizeFindList > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindList);
}
request.executeSql(binder, SpiQuery.Type.LIST);
List<T> rows = new ArrayList<>();
while (request.next()) {
Expand All @@ -129,6 +143,9 @@ public <T> List<T> findList(RelationalQueryRequest request, RowReader<T> reader)
public <T> T findSingleAttribute(RelationalQueryRequest request, Class<T> cls) {
ScalarType<T> scalarType = (ScalarType<T>) binder.getScalarType(cls);
try {
if (defaultFetchSizeFindList > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindList);
}
request.executeSql(binder, SpiQuery.Type.ATTRIBUTE);
final DataReader dataReader = binder.createDataReader(request.resultSet());
T value = null;
Expand All @@ -151,6 +168,9 @@ public <T> T findSingleAttribute(RelationalQueryRequest request, Class<T> cls) {
public <T> List<T> findSingleAttributeList(RelationalQueryRequest request, Class<T> cls) {
ScalarType<T> scalarType = (ScalarType<T>) binder.getScalarType(cls);
try {
if (defaultFetchSizeFindList > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindList);
}
request.executeSql(binder, SpiQuery.Type.ATTRIBUTE);
final DataReader dataReader = binder.createDataReader(request.resultSet());
List<T> rows = new ArrayList<>();
Expand All @@ -173,6 +193,9 @@ public <T> List<T> findSingleAttributeList(RelationalQueryRequest request, Class
public <T> void findSingleAttributeEach(RelationalQueryRequest request, Class<T> cls, Consumer<T> consumer) {
ScalarType<T> scalarType = (ScalarType<T>) binder.getScalarType(cls);
try {
if (defaultFetchSizeFindEach > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindEach);
}
request.executeSql(binder, SpiQuery.Type.ATTRIBUTE);
final DataReader dataReader = binder.createDataReader(request.resultSet());
while (dataReader.next()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import io.ebeaninternal.api.SpiQuery;
import io.ebeaninternal.server.core.DtoQueryRequest;
import io.ebeaninternal.server.persist.Binder;

import jakarta.persistence.PersistenceException;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -15,13 +15,20 @@
public final class DtoQueryEngine {

private final Binder binder;
private final int defaultFetchSizeFindEach;
private final int defaultFetchSizeFindList;

public DtoQueryEngine(Binder binder) {
public DtoQueryEngine(Binder binder, int defaultFetchSizeFindEach, int defaultFetchSizeFindList) {
this.binder = binder;
this.defaultFetchSizeFindEach = defaultFetchSizeFindEach;
this.defaultFetchSizeFindList = defaultFetchSizeFindList;
}

public <T> List<T> findList(DtoQueryRequest<T> request) {
try {
if (defaultFetchSizeFindList > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindList);
}
request.executeSql(binder, SpiQuery.Type.LIST);
List<T> rows = new ArrayList<>();
while (request.next()) {
Expand All @@ -38,6 +45,9 @@ public <T> List<T> findList(DtoQueryRequest<T> request) {

public <T> QueryIterator<T> findIterate(DtoQueryRequest<T> request) {
try {
if (defaultFetchSizeFindEach > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindEach);
}
request.executeSql(binder, SpiQuery.Type.ITERATE);
return new DtoQueryIterator<>(request);
} catch (SQLException e) {
Expand All @@ -46,7 +56,10 @@ public <T> QueryIterator<T> findIterate(DtoQueryRequest<T> request) {
}

public <T> void findEach(DtoQueryRequest<T> request, Consumer<T> consumer) {
try {
try {
if (defaultFetchSizeFindEach > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindEach);
}
request.executeSql(binder, SpiQuery.Type.ITERATE);
while (request.next()) {
consumer.accept(request.readNextBean());
Expand All @@ -61,6 +74,9 @@ public <T> void findEach(DtoQueryRequest<T> request, Consumer<T> consumer) {
public <T> void findEach(DtoQueryRequest<T> request, int batchSize, Consumer<List<T>> consumer) {
try {
List<T> buffer = new ArrayList<>();
if (defaultFetchSizeFindEach > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindEach);
}
request.executeSql(binder, SpiQuery.Type.ITERATE);
while (request.next()) {
buffer.add(request.readNextBean());
Expand All @@ -82,6 +98,9 @@ public <T> void findEach(DtoQueryRequest<T> request, int batchSize, Consumer<Lis

public <T> void findEachWhile(DtoQueryRequest<T> request, Predicate<T> consumer) {
try {
if (defaultFetchSizeFindEach > 0) {
request.setDefaultFetchBuffer(defaultFetchSizeFindEach);
}
request.executeSql(binder, SpiQuery.Type.ITERATE);
while (request.next()) {
if (!consumer.test(request.readNextBean())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,13 @@ public BindParams getBindParams() {
return bindParams;
}

@Override
public void setDefaultFetchBuffer(int fetchSize) {
if (bufferFetchSizeHint == 0) {
bufferFetchSizeHint = fetchSize;
}
}

@Override
public DtoQuery<T> setBufferFetchSizeHint(int bufferFetchSizeHint) {
this.bufferFetchSizeHint = bufferFetchSizeHint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ public BindParams getBindParams() {
return bindParams;
}

@Override
public final void setDefaultFetchBuffer(int fetchSize) {
if (bufferFetchSizeHint == 0) {
bufferFetchSizeHint = fetchSize;
}
}

@Override
public DefaultRelationalQuery setBufferFetchSizeHint(int bufferFetchSizeHint) {
this.bufferFetchSizeHint = bufferFetchSizeHint;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package org.tests.query.other;

import io.ebean.DB;
import io.ebean.Transaction;
import io.ebean.annotation.Platform;
import io.ebean.xtest.BaseTestCase;
import io.ebean.xtest.ForPlatform;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.tests.model.basic.EBasic;

import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.assertj.core.api.Assertions.assertThat;

/**
* This test ensures that MariaDb uses the correct streaming result in findEach queries.
* See Issue #56.
*/
public class TestFindIterateMariaDb extends BaseTestCase {

@BeforeEach
public void setup() {
// we need at least > fetchSize beans
if (DB.find(EBasic.class).findCount() < 1000) {
for (int i = 0; i < 1000; i++) {
EBasic dumbModel = new EBasic();
dumbModel.setName("Goodbye now");
DB.save(dumbModel);
}
}
}

public static class DtoBasic {
private String name;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}

@Test
@ForPlatform(Platform.MARIADB)
public void testStreamingOnSqlQueryFindEach() {

try (Transaction txn = DB.beginTransaction()) {
AtomicBoolean mariadbStreaming = new AtomicBoolean();
DB.sqlQuery("select name from e_basic").findEach(bean -> {
if (!mariadbStreaming.get() && isMariaDbStreaming()) {
mariadbStreaming.set(true);
}
});
assertThat(mariadbStreaming.get()).isTrue();
}
}

@Test
@ForPlatform(Platform.MARIADB)
public void testStreamingOnOrmFindEach() {

try (Transaction txn = DB.beginTransaction()) {
AtomicBoolean mariadbStreaming = new AtomicBoolean();
DB.find(EBasic.class).findEach(bean -> {
if (!mariadbStreaming.get() && isMariaDbStreaming()) {
mariadbStreaming.set(true);
}
});
assertThat(mariadbStreaming.get()).isTrue();

}
}

@Test
@ForPlatform(Platform.MARIADB)
public void testStreamingOnOrmAsDtoFindEach() {

try (Transaction txn = DB.beginTransaction()) {
AtomicBoolean mariadbStreaming = new AtomicBoolean();
DB.find(EBasic.class).select("name").asDto(DtoBasic.class).findEach(bean -> {
if (!mariadbStreaming.get() && isMariaDbStreaming()) {
mariadbStreaming.set(true);
}
});
assertThat(mariadbStreaming.get()).isTrue();

}
}

@Test
@ForPlatform(Platform.MARIADB)
public void testStreamingOnDtoFindEach() {

try (Transaction txn = DB.beginTransaction()) {
AtomicBoolean mariadbStreaming = new AtomicBoolean();
DB.findDto(DtoBasic.class, "select name from e_basic").findEach(bean -> {
if (!mariadbStreaming.get() && isMariaDbStreaming()) {
mariadbStreaming.set(true);
}
});
assertThat(mariadbStreaming.get()).isTrue();

}
}
@Test
@ForPlatform(Platform.MARIADB)
public void testStreamingOnDtoFindEachWhile() {

try (Transaction txn = DB.beginTransaction()) {
AtomicBoolean mariadbStreaming = new AtomicBoolean();
DB.findDto(DtoBasic.class, "select name from e_basic").findEachWhile(bean -> {
if (!mariadbStreaming.get() && isMariaDbStreaming()) {
mariadbStreaming.set(true);
}
return false;
});
assertThat(mariadbStreaming.get()).isTrue();

}
}

@Test
@ForPlatform(Platform.MARIADB)
public void testStreamingOnDtoFindEachBatch() {

try (Transaction txn = DB.beginTransaction()) {
AtomicBoolean mariadbStreaming = new AtomicBoolean();
DB.findDto(DtoBasic.class, "select name from e_basic").findEach(2000, bean -> {
if (!mariadbStreaming.get() && isMariaDbStreaming()) {
mariadbStreaming.set(true);
}
});
assertThat(mariadbStreaming.get()).isTrue();

}
}

/**
* Take a look into the current connection. We are streaming, if there is a result set.
*/
private boolean isMariaDbStreaming() {
try {
org.mariadb.jdbc.Connection conn = Transaction.current().connection().unwrap(org.mariadb.jdbc.Connection.class);
org.mariadb.jdbc.client.impl.StandardClient client = (org.mariadb.jdbc.client.impl.StandardClient) conn.getClient();
Field field = client.getClass().getDeclaredField("streamMsg");
field.setAccessible(true);
return field.get(client) != null;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

}