From 1f48b3a6b32467c5608d81c3bd0f57b3674e7732 Mon Sep 17 00:00:00 2001 From: Rob Bygrave Date: Sat, 3 Feb 2024 08:20:58 +1300 Subject: [PATCH] (fix) Future queries do not trigger flush on BatchedPstmtHolder for #3319 This change is that findFutureCount, findFutureList, findFutureIds do not trigger a flush on BatchedPstmtHolder. This is to address the possible ConcurrentModificationException that could occur at BatchedPstmtHolder.closeStatements(BatchedPstmtHolder.java:153) --- .../java/io/ebeaninternal/api/SpiQuery.java | 10 +++ .../server/core/DefaultServer.java | 3 + .../server/query/DefaultOrmQueryEngine.java | 4 + .../server/querydefn/DefaultOrmQuery.java | 11 +++ .../tests/query/TestFindFutureRowCount.java | 76 +++++++++++++++++++ 5 files changed, 104 insertions(+) diff --git a/ebean-core/src/main/java/io/ebeaninternal/api/SpiQuery.java b/ebean-core/src/main/java/io/ebeaninternal/api/SpiQuery.java index 89f2173eb1..5669dcf579 100644 --- a/ebean-core/src/main/java/io/ebeaninternal/api/SpiQuery.java +++ b/ebean-core/src/main/java/io/ebeaninternal/api/SpiQuery.java @@ -654,6 +654,16 @@ public static TemporalMode of(SpiQuery query) { */ ObjectGraphNode parentNode(); + /** + * Set that this is a future query that will execute in the background. + */ + void usingFuture(); + + /** + * Return true if this is a future query. + */ + boolean isUsingFuture(); + /** * Return false when this is a lazy load or refresh query for a bean. *

diff --git a/ebean-core/src/main/java/io/ebeaninternal/server/core/DefaultServer.java b/ebean-core/src/main/java/io/ebeaninternal/server/core/DefaultServer.java index 8b2dc60e31..c4ee13dc69 100644 --- a/ebean-core/src/main/java/io/ebeaninternal/server/core/DefaultServer.java +++ b/ebean-core/src/main/java/io/ebeaninternal/server/core/DefaultServer.java @@ -1273,6 +1273,7 @@ public int update(SpiQuery query) { @Override public FutureRowCount findFutureCount(SpiQuery query) { SpiQuery copy = query.copy(); + copy.usingFuture(); boolean createdTransaction = false; SpiTransaction transaction = query.transaction(); if (transaction == null) { @@ -1291,6 +1292,7 @@ public FutureRowCount findFutureCount(SpiQuery query) { @Override public FutureIds findFutureIds(SpiQuery query) { SpiQuery copy = query.copy(); + copy.usingFuture(); boolean createdTransaction = false; SpiTransaction transaction = query.transaction(); if (transaction == null) { @@ -1309,6 +1311,7 @@ public FutureIds findFutureIds(SpiQuery query) { @Override public FutureList findFutureList(SpiQuery query) { SpiQuery spiQuery = query.copy(); + spiQuery.usingFuture(); // FutureList query always run in it's own persistence content spiQuery.setPersistenceContext(new DefaultPersistenceContext()); if (!spiQuery.isDisableReadAudit()) { diff --git a/ebean-core/src/main/java/io/ebeaninternal/server/query/DefaultOrmQueryEngine.java b/ebean-core/src/main/java/io/ebeaninternal/server/query/DefaultOrmQueryEngine.java index 48eb512a86..3679950862 100644 --- a/ebean-core/src/main/java/io/ebeaninternal/server/query/DefaultOrmQueryEngine.java +++ b/ebean-core/src/main/java/io/ebeaninternal/server/query/DefaultOrmQueryEngine.java @@ -51,6 +51,10 @@ public boolean isMultiValueSupported(Class cls) { * Flushes the jdbc batch by default unless explicitly turned off on the transaction. */ private void flushJdbcBatchOnQuery(OrmQueryRequest request) { + if (request.query().isUsingFuture()) { + // future queries never invoke a flush + return; + } SpiTransaction t = request.transaction(); if (t.isFlushOnQuery()) { // before we perform a query, we need to flush any diff --git a/ebean-core/src/main/java/io/ebeaninternal/server/querydefn/DefaultOrmQuery.java b/ebean-core/src/main/java/io/ebeaninternal/server/querydefn/DefaultOrmQuery.java index 0e9e5f7d44..f877be1267 100644 --- a/ebean-core/src/main/java/io/ebeaninternal/server/querydefn/DefaultOrmQuery.java +++ b/ebean-core/src/main/java/io/ebeaninternal/server/querydefn/DefaultOrmQuery.java @@ -58,6 +58,7 @@ public class DefaultOrmQuery extends AbstractQuery implements SpiQuery { private Type type; private String label; private Mode mode = Mode.NORMAL; + private boolean usingFuture; private Object tenantId; /** * Holds query in structured form. @@ -971,6 +972,16 @@ public final void setMode(Mode mode) { this.mode = mode; } + @Override + public final void usingFuture() { + this.usingFuture = true; + } + + @Override + public final boolean isUsingFuture() { + return usingFuture; + } + @Override public final boolean isUsageProfiling() { return usageProfiling; diff --git a/ebean-test/src/test/java/org/tests/query/TestFindFutureRowCount.java b/ebean-test/src/test/java/org/tests/query/TestFindFutureRowCount.java index 886e7cf00a..5b0f6f384d 100644 --- a/ebean-test/src/test/java/org/tests/query/TestFindFutureRowCount.java +++ b/ebean-test/src/test/java/org/tests/query/TestFindFutureRowCount.java @@ -6,11 +6,87 @@ import org.tests.model.basic.EBasic; import java.util.List; +import java.util.concurrent.ExecutionException; import static org.assertj.core.api.Assertions.assertThat; class TestFindFutureRowCount extends BaseTestCase { + @Test + void futureCount_doesNotTriggerFlush() throws ExecutionException, InterruptedException { + try (Transaction transaction = DB.beginTransaction()) { + transaction.setBatchMode(true); + transaction.setBatchSize(100); + + EBasic basic = new EBasic("count_batched"); + DB.save(basic); + + FutureRowCount futureCount = DB.find(EBasic.class) + .where().eq("name", "count_batched") + .findFutureCount(); + + assertThat(futureCount.get()).isEqualTo(0); + + transaction.flush(); + + FutureRowCount futureCountAfter = DB.find(EBasic.class) + .where().eq("name", "count_batched") + .findFutureCount(); + + assertThat(futureCountAfter.get()).isEqualTo(1); + } + } + + @Test + void futureList_doesNotTriggerFlush() throws ExecutionException, InterruptedException { + try (Transaction transaction = DB.beginTransaction()) { + transaction.setBatchMode(true); + transaction.setBatchSize(100); + + EBasic basic = new EBasic("list_batched"); + DB.save(basic); + + FutureList futureList = DB.find(EBasic.class) + .where().eq("name", "list_batched") + .findFutureList(); + + assertThat(futureList.get()).isEmpty(); + + transaction.flush(); + + FutureList futureListAfter = DB.find(EBasic.class) + .where().eq("name", "list_batched") + .findFutureList(); + + assertThat(futureListAfter.get()).hasSize(1); + } + } + + @Test + void futureIds_doesNotTriggerFlush() throws ExecutionException, InterruptedException { + try (Transaction transaction = DB.beginTransaction()) { + transaction.setBatchMode(true); + transaction.setBatchSize(100); + + EBasic basic = new EBasic("ids_batched"); + DB.save(basic); + + FutureIds futureIds = DB.find(EBasic.class) + .where().eq("name", "ids_batched") + .findFutureIds(); + + assertThat(futureIds.get()).isEmpty(); + + transaction.flush(); + + FutureIds futureIdsAfter = DB.find(EBasic.class) + .where().eq("name", "ids_batched") + .findFutureIds(); + + assertThat(futureIdsAfter.get()).hasSize(1); + } + } + @Test void count_when_inTransaction() throws Exception { try (Transaction transaction = DB.beginTransaction()) {