diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchOperations.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchOperations.java index 068e360cd..77e961b4a 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchOperations.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchOperations.java @@ -20,10 +20,12 @@ import org.springframework.data.cassandra.core.cql.WriteOptions; import org.springframework.util.Assert; +import com.datastax.oss.driver.api.core.cql.BatchableStatement; + /** * Batch operations for insert/update/delete actions on a table. {@link CassandraBatchOperations} use logged Cassandra - * {@code BATCH}es for single entities and collections of entities. A {@link CassandraBatchOperations} instance cannot - * be modified/used once it was executed. + * {@code BATCH}es for single entities, collections of entities, and {@link BatchableStatement statements}. A + * {@link CassandraBatchOperations} instance cannot be modified/used once it was executed. *

* Batches are atomic by default. In the context of a Cassandra batch operation, atomic means that if any of the batch * succeeds, all of it will. Statement order does not matter within a batch. {@link CassandraBatchOperations} applies @@ -57,6 +59,36 @@ public interface CassandraBatchOperations { */ CassandraBatchOperations withTimestamp(long timestamp); + /** + * Add a {@link BatchableStatement statement} to the batch. + * + * @param statement the batchable statement such as {@code INSERT}, {@code UPDATE}, {@code DELETE}. + * @return {@code this} {@link CassandraBatchOperations}. + * @throws IllegalStateException if the batch was already executed. + * @since 4.4 + */ + CassandraBatchOperations addStatement(BatchableStatement statement); + + /** + * Add {@link BatchableStatement statements} to the batch. + * + * @param statements the batchable statements such as {@code INSERT}, {@code UPDATE}, {@code DELETE}. + * @return {@code this} {@link CassandraBatchOperations}. + * @throws IllegalStateException if the batch was already executed. + * @since 4.4 + */ + CassandraBatchOperations addStatements(BatchableStatement... statements); + + /** + * Add {@link BatchableStatement statements} to the batch. + * + * @param statements the batchable statements such as {@code INSERT}, {@code UPDATE}, {@code DELETE}. + * @return {@code this} {@link CassandraBatchOperations}. + * @throws IllegalStateException if the batch was already executed. + * @since 4.4 + */ + CassandraBatchOperations addStatements(Iterable> statements); + /** * Add an insert to the batch. * diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchTemplate.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchTemplate.java index 8189da6dc..3122181dc 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchTemplate.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchTemplate.java @@ -27,11 +27,14 @@ import org.springframework.data.cassandra.core.mapping.CassandraPersistentEntity; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; +import org.springframework.util.StringUtils; import com.datastax.oss.driver.api.core.cql.BatchStatement; import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder; import com.datastax.oss.driver.api.core.cql.BatchType; +import com.datastax.oss.driver.api.core.cql.BatchableStatement; import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.Statement; /** * Default implementation for {@link CassandraBatchOperations}. @@ -128,6 +131,37 @@ public CassandraBatchOperations withTimestamp(long timestamp) { return this; } + @Override + public CassandraBatchOperations addStatement(BatchableStatement statement) { + + Assert.notNull(statement, "Statement must not be null"); + + this.batch.addStatement(statement); + + return this; + } + + @Override + public CassandraBatchOperations addStatements(BatchableStatement... statements) { + + Assert.notNull(statements, "Statements must not be null"); + + this.batch.addStatements(statements); + + return this; + } + + @Override + @SuppressWarnings("unchecked") + public CassandraBatchOperations addStatements(Iterable> statements) { + + Assert.notNull(statements, "Statements must not be null"); + + this.batch.addStatements((Iterable>) statements); + + return this; + } + @Override public CassandraBatchOperations insert(Object... entities) { @@ -147,6 +181,7 @@ public CassandraBatchOperations insert(Iterable entities, WriteOptions option assertNotExecuted(); Assert.notNull(entities, "Entities must not be null"); Assert.notNull(options, "WriteOptions must not be null"); + assertNotStatement("insert", entities); assertNotQueryOptions(entities); CassandraMappingContext mappingContext = getMappingContext(); @@ -154,6 +189,7 @@ public CassandraBatchOperations insert(Iterable entities, WriteOptions option for (Object entity : entities) { Assert.notNull(entity, "Entity must not be null"); + assertNotStatement("insert", entity); BasicCassandraPersistentEntity persistentEntity = mappingContext .getRequiredPersistentEntity(entity.getClass()); @@ -161,7 +197,7 @@ public CassandraBatchOperations insert(Iterable entities, WriteOptions option SimpleStatement insertQuery = getStatementFactory() .insert(entity, options, persistentEntity, persistentEntity.getTableName()).build(); - this.batch.addStatement(insertQuery); + addStatement(insertQuery); } return this; @@ -191,13 +227,14 @@ public CassandraBatchOperations update(Iterable entities, WriteOptions option for (Object entity : entities) { Assert.notNull(entity, "Entity must not be null"); + assertNotStatement("update", entity); CassandraPersistentEntity persistentEntity = getRequiredPersistentEntity(entity.getClass()); SimpleStatement update = getStatementFactory() .update(entity, options, persistentEntity, persistentEntity.getTableName()).build(); - this.batch.addStatement(update); + addStatement(update); } return this; @@ -227,13 +264,14 @@ public CassandraBatchOperations delete(Iterable entities, WriteOptions option for (Object entity : entities) { Assert.notNull(entity, "Entity must not be null"); + assertNotStatement("delete", entity); CassandraPersistentEntity persistentEntity = getRequiredPersistentEntity(entity.getClass()); SimpleStatement delete = getStatementFactory() .delete(entity, options, this.getConverter(), persistentEntity.getTableName()).build(); - this.batch.addStatement(delete); + addStatement(delete); } return this; @@ -257,4 +295,11 @@ private void assertNotExecuted() { private CassandraPersistentEntity getRequiredPersistentEntity(Class entityType) { return getMappingContext().getRequiredPersistentEntity(ClassUtils.getUserClass(entityType)); } + + private static void assertNotStatement(String operation, Object o) { + if (o instanceof Statement) { + throw new IllegalArgumentException(String.format("%s cannot use a Statement: %s. Use only entities for %s", + StringUtils.capitalize(operation), ClassUtils.getDescriptiveType(o), operation)); + } + } } diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchOperations.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchOperations.java index c0d5abf9b..fb7c02cab 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchOperations.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchOperations.java @@ -15,18 +15,22 @@ */ package org.springframework.data.cassandra.core; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.Collections; import org.reactivestreams.Subscriber; + import org.springframework.data.cassandra.core.cql.WriteOptions; import org.springframework.util.Assert; +import com.datastax.oss.driver.api.core.cql.BatchableStatement; + /** * Reactive Batch operations for insert/update/delete actions on a table. {@link ReactiveCassandraBatchOperations} use - * logged Cassandra {@code BATCH}es for single entities and collections of entities. A - * {@link ReactiveCassandraBatchOperations} instance cannot be modified/used once it was executed. + * logged Cassandra {@code BATCH}es for single entities, collections of entities, and {@link BatchableStatement + * statements}. A {@link ReactiveCassandraBatchOperations} instance cannot be modified/used once it was executed. *

* Batches are atomic by default. In the context of a Cassandra batch operation, atomic means that if any of the batch * succeeds, all of it will. Statement order does not matter within a batch. {@link ReactiveCassandraBatchOperations} @@ -61,6 +65,62 @@ public interface ReactiveCassandraBatchOperations { */ ReactiveCassandraBatchOperations withTimestamp(long timestamp); + /** + * Add a {@link BatchableStatement statement} to the batch. + * + * @param statement the batchable statement such as {@code INSERT}, {@code UPDATE}, {@code DELETE}. + * @return {@code this} {@link ReactiveCassandraBatchOperations}. + * @throws IllegalStateException if the batch was already executed. + * @since 4.4 + */ + default ReactiveCassandraBatchOperations addStatement(BatchableStatement statement) { + return addStatement(Mono.just(statement)); + } + + /** + * Add a Mono of {@link BatchableStatement statement} to the batch. + * + * @param statement the batchable statement such as {@code INSERT}, {@code UPDATE}, {@code DELETE}. + * @return {@code this} {@link ReactiveCassandraBatchOperations}. + * @throws IllegalStateException if the batch was already executed. + * @since 4.4 + */ + ReactiveCassandraBatchOperations addStatement(Mono> statement); + + /** + * Add {@link BatchableStatement statements} to the batch. + * + * @param statements the batchable statements such as {@code INSERT}, {@code UPDATE}, {@code DELETE}. + * @return {@code this} {@link ReactiveCassandraBatchOperations}. + * @throws IllegalStateException if the batch was already executed. + * @since 4.4 + */ + default ReactiveCassandraBatchOperations addStatements(BatchableStatement... statements) { + return addStatements(Flux.fromArray(statements).toIterable()); + } + + /** + * Add {@link BatchableStatement statements} to the batch. + * + * @param statements the batchable statements such as {@code INSERT}, {@code UPDATE}, {@code DELETE}. + * @return {@code this} {@link ReactiveCassandraBatchOperations}. + * @throws IllegalStateException if the batch was already executed. + * @since 4.4 + */ + default ReactiveCassandraBatchOperations addStatements(Iterable> statements) { + return addStatements(Mono.just(statements)); + } + + /** + * Add Mono of {@link BatchableStatement statements} to the batch. + * + * @param statements the batchable statements such as {@code INSERT}, {@code UPDATE}, {@code DELETE}. + * @return {@code this} {@link ReactiveCassandraBatchOperations}. + * @throws IllegalStateException if the batch was already executed. + * @since 4.4 + */ + ReactiveCassandraBatchOperations addStatements(Mono>> statements); + /** * Add an insert to the batch. * diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchTemplate.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchTemplate.java index a88b6ed2a..c0ca8d86c 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchTemplate.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchTemplate.java @@ -34,12 +34,14 @@ import org.springframework.data.cassandra.core.mapping.CassandraPersistentEntity; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; +import org.springframework.util.StringUtils; import com.datastax.oss.driver.api.core.cql.BatchStatement; import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder; import com.datastax.oss.driver.api.core.cql.BatchType; import com.datastax.oss.driver.api.core.cql.BatchableStatement; import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.Statement; /** * Default implementation for {@link ReactiveCassandraBatchOperations}. @@ -59,7 +61,7 @@ class ReactiveCassandraBatchTemplate implements ReactiveCassandraBatchOperations private final CassandraMappingContext mappingContext; - private final List>>> batchMonos = new CopyOnWriteArrayList<>(); + private final List>>> batchMonos = new CopyOnWriteArrayList<>(); private final ReactiveCassandraOperations operations; @@ -126,6 +128,7 @@ protected StatementFactory getStatementFactory() { } @Override + @SuppressWarnings("unchecked") public Mono execute() { return Mono.defer(() -> { @@ -140,7 +143,6 @@ public Mono execute() { this.batch.addStatements((List>) statements); return this.operations.getReactiveCqlOperations().queryForResultSet(this.batch.build()); - }) // .flatMap(resultSet -> resultSet.rows().collectList() .map(rows -> new WriteResult(resultSet.getAllExecutionInfo(), resultSet.wasApplied(), rows))); @@ -159,6 +161,27 @@ public ReactiveCassandraBatchOperations withTimestamp(long timestamp) { return this; } + @Override + public ReactiveCassandraBatchOperations addStatement(Mono> statement) { + + Assert.notNull(statement, "Statement mono must not be null"); + + this.batchMonos.add(statement.map(List::of)); + + return this; + } + + @Override + public ReactiveCassandraBatchOperations addStatements( + Mono>> statements) { + + Assert.notNull(statements, "Statements mono must not be null"); + + this.batchMonos.add(statements); + + return this; + } + @Override public ReactiveCassandraBatchOperations insert(Object... entities) { @@ -185,7 +208,7 @@ public ReactiveCassandraBatchOperations insert(Iterable entities, WriteOption Assert.notNull(options, "WriteOptions must not be null"); assertNotQueryOptions(entities); - this.batchMonos.add(Mono.just(doInsert(entities, options))); + addStatements(doInsert(entities, options)); return this; } @@ -197,7 +220,7 @@ public ReactiveCassandraBatchOperations insert(Mono> entit Assert.notNull(entities, "Entities must not be null"); Assert.notNull(options, "WriteOptions must not be null"); - this.batchMonos.add(entities.map(entity -> doInsert(entity, options))); + addStatements(entities.map(entity -> doInsert(entity, options))); return this; } @@ -210,6 +233,7 @@ private Collection doInsert(Iterable entities, WriteOptions for (Object entity : entities) { Assert.notNull(entity, "Entity must not be null"); + assertNotStatement("insert", entity); BasicCassandraPersistentEntity persistentEntity = mappingContext .getRequiredPersistentEntity(entity.getClass()); @@ -249,7 +273,7 @@ public ReactiveCassandraBatchOperations update(Iterable entities, WriteOption Assert.notNull(options, "WriteOptions must not be null"); assertNotQueryOptions(entities); - this.batchMonos.add(Mono.just(doUpdate(entities, options))); + addStatements(Mono.just(doUpdate(entities, options))); return this; } @@ -261,7 +285,7 @@ public ReactiveCassandraBatchOperations update(Mono> entit Assert.notNull(entities, "Entities must not be null"); Assert.notNull(options, "WriteOptions must not be null"); - this.batchMonos.add(entities.map(entity -> doUpdate(entity, options))); + addStatements(entities.map(entity -> doUpdate(entity, options))); return this; } @@ -273,6 +297,7 @@ private Collection doUpdate(Iterable entities, WriteOptions for (Object entity : entities) { Assert.notNull(entity, "Entity must not be null"); + assertNotStatement("update", entity); CassandraPersistentEntity persistentEntity = getRequiredPersistentEntity(entity.getClass()); @@ -311,7 +336,7 @@ public ReactiveCassandraBatchOperations delete(Iterable entities, WriteOption Assert.notNull(options, "WriteOptions must not be null"); assertNotQueryOptions(entities); - this.batchMonos.add(Mono.just(doDelete(entities, options))); + addStatements(Mono.just(doDelete(entities, options))); return this; } @@ -323,7 +348,7 @@ public ReactiveCassandraBatchOperations delete(Mono> entit Assert.notNull(entities, "Entities must not be null"); Assert.notNull(options, "WriteOptions must not be null"); - this.batchMonos.add(entities.map(it -> doDelete(it, options))); + addStatements(entities.map(it -> doDelete(it, options))); return this; } @@ -346,6 +371,7 @@ private Collection doDelete(Iterable entities, WriteOptions for (Object entity : entities) { Assert.notNull(entity, "Entity must not be null"); + assertNotStatement("delete", entity); CassandraPersistentEntity persistentEntity = getRequiredPersistentEntity(entity.getClass()); diff --git a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/CassandraBatchTemplateIntegrationTests.java b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/CassandraBatchTemplateIntegrationTests.java index 98b0f1394..628805a25 100644 --- a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/CassandraBatchTemplateIntegrationTests.java +++ b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/CassandraBatchTemplateIntegrationTests.java @@ -18,10 +18,12 @@ import static org.assertj.core.api.Assertions.*; import java.util.Arrays; +import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; + import org.springframework.data.cassandra.core.cql.WriteOptions; import org.springframework.data.cassandra.domain.FlatGroup; import org.springframework.data.cassandra.domain.Group; @@ -32,6 +34,7 @@ import com.datastax.oss.driver.api.core.cql.BatchType; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; /** * Integration tests for {@link CassandraBatchTemplate}. @@ -61,6 +64,33 @@ void setUp() throws Exception { template.insert(mike); } + @Test // GH-1499 + void shouldAddStatements() { + + CassandraBatchOperations batchOperations = new CassandraBatchTemplate(template, BatchType.LOGGED); + + List statements = List.of(SimpleStatement + .newInstance("INSERT INTO GROUP (groupname, hash_prefix, username) VALUES('users', '0x1', 'walter')")); + + batchOperations.addStatements(statements).execute(); + + Group loaded = template.selectOneById(walter.getId(), Group.class); + + assertThat(loaded.getId().getUsername()).isEqualTo(walter.getId().getUsername()); + } + + @Test // GH-1499 + void insertUpdateDeleteShouldRejectStatements() { + + CassandraBatchOperations batchOperations = new CassandraBatchTemplate(template, BatchType.LOGGED); + + SimpleStatement statement = SimpleStatement.newInstance("FOO"); + + assertThatIllegalArgumentException().isThrownBy(() -> batchOperations.insert(statement)); + assertThatIllegalArgumentException().isThrownBy(() -> batchOperations.update(statement)); + assertThatIllegalArgumentException().isThrownBy(() -> batchOperations.delete(statement)); + } + @Test // DATACASS-288 void shouldInsertEntities() { diff --git a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchTemplateIntegrationTests.java b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchTemplateIntegrationTests.java index 8dfadd5b3..f1d4e4534 100644 --- a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchTemplateIntegrationTests.java +++ b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchTemplateIntegrationTests.java @@ -23,11 +23,13 @@ import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; + import org.springframework.data.cassandra.ReactiveResultSet; import org.springframework.data.cassandra.core.convert.MappingCassandraConverter; import org.springframework.data.cassandra.core.cql.ReactiveCqlTemplate; @@ -41,6 +43,7 @@ import com.datastax.oss.driver.api.core.cql.BatchType; import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; /** * Integration tests for {@link ReactiveCassandraBatchTemplate}. @@ -76,6 +79,34 @@ void setUp() { .verifyComplete(); } + @Test // GH-1499 + void shouldAddStatements() { + + ReactiveCassandraBatchOperations batchOperations = new ReactiveCassandraBatchTemplate(template, BatchType.LOGGED); + + List statements = List.of(SimpleStatement + .newInstance("INSERT INTO GROUP (groupname, hash_prefix, username) VALUES('users', '0x1', 'walter')")); + + batchOperations.addStatements(statements).execute().as(StepVerifier::create).expectNextCount(1).verifyComplete(); + + template.selectOneById(walter.getId(), Group.class) // + .as(StepVerifier::create) // + .assertNext(loaded -> assertThat(loaded.getId().getUsername()).isEqualTo(walter.getId().getUsername())) + .verifyComplete(); + } + + @Test // GH-1499 + void insertUpdateDeleteShouldRejectStatements() { + + ReactiveCassandraBatchOperations batchOperations = new ReactiveCassandraBatchTemplate(template, BatchType.LOGGED); + + SimpleStatement statement = SimpleStatement.newInstance("FOO"); + + assertThatIllegalArgumentException().isThrownBy(() -> batchOperations.insert(statement)); + assertThatIllegalArgumentException().isThrownBy(() -> batchOperations.update(statement)); + assertThatIllegalArgumentException().isThrownBy(() -> batchOperations.delete(statement)); + } + @Test // #1135 void insertAsVarargsShouldRejectQueryOptions() { assertThatIllegalArgumentException()