Skip to content

Commit

Permalink
Accept BatchableStatement in CassandraBatchOperations.
Browse files Browse the repository at this point in the history
Closes #1499
  • Loading branch information
mp911de committed Jul 24, 2024
1 parent 3143970 commit 1a72451
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import org.springframework.data.cassandra.core.cql.WriteOptions;
import org.springframework.util.Assert;

import com.datastax.oss.driver.api.core.cql.BatchableStatement;

/**
* Batch operations for insert/update/delete actions on a table. {@link CassandraBatchOperations} use logged Cassandra
* {@code BATCH}es for single entities and collections of entities. A {@link CassandraBatchOperations} instance cannot
* be modified/used once it was executed.
* {@code BATCH}es for single entities, collections of entities, and {@link BatchableStatement statements}. A
* {@link CassandraBatchOperations} instance cannot be modified/used once it was executed.
* <p>
* Batches are atomic by default. In the context of a Cassandra batch operation, atomic means that if any of the batch
* succeeds, all of it will. Statement order does not matter within a batch. {@link CassandraBatchOperations} applies
Expand Down Expand Up @@ -57,6 +59,36 @@ public interface CassandraBatchOperations {
*/
CassandraBatchOperations withTimestamp(long timestamp);

/**
* Add a {@link BatchableStatement statement} to the batch.
*
* @param statement the batchable statement such as {@code INSERT}, {@code UPDATE}, {@code DELETE}.
* @return {@code this} {@link CassandraBatchOperations}.
* @throws IllegalStateException if the batch was already executed.
* @since 4.4
*/
CassandraBatchOperations addStatement(BatchableStatement<?> statement);

/**
* Add {@link BatchableStatement statements} to the batch.
*
* @param statements the batchable statements such as {@code INSERT}, {@code UPDATE}, {@code DELETE}.
* @return {@code this} {@link CassandraBatchOperations}.
* @throws IllegalStateException if the batch was already executed.
* @since 4.4
*/
CassandraBatchOperations addStatements(BatchableStatement<?>... statements);

/**
* Add {@link BatchableStatement statements} to the batch.
*
* @param statements the batchable statements such as {@code INSERT}, {@code UPDATE}, {@code DELETE}.
* @return {@code this} {@link CassandraBatchOperations}.
* @throws IllegalStateException if the batch was already executed.
* @since 4.4
*/
CassandraBatchOperations addStatements(Iterable<? extends BatchableStatement<?>> statements);

/**
* Add an insert to the batch.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@
import org.springframework.data.cassandra.core.mapping.CassandraPersistentEntity;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;

import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.BatchableStatement;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;

/**
* Default implementation for {@link CassandraBatchOperations}.
Expand Down Expand Up @@ -128,6 +131,37 @@ public CassandraBatchOperations withTimestamp(long timestamp) {
return this;
}

@Override
public CassandraBatchOperations addStatement(BatchableStatement<?> statement) {

Assert.notNull(statement, "Statement must not be null");

this.batch.addStatement(statement);

return this;
}

@Override
public CassandraBatchOperations addStatements(BatchableStatement<?>... statements) {

Assert.notNull(statements, "Statements must not be null");

this.batch.addStatements(statements);

return this;
}

@Override
@SuppressWarnings("unchecked")
public CassandraBatchOperations addStatements(Iterable<? extends BatchableStatement<?>> statements) {

Assert.notNull(statements, "Statements must not be null");

this.batch.addStatements((Iterable<BatchableStatement<?>>) statements);

return this;
}

@Override
public CassandraBatchOperations insert(Object... entities) {

Expand All @@ -147,21 +181,23 @@ public CassandraBatchOperations insert(Iterable<?> entities, WriteOptions option
assertNotExecuted();
Assert.notNull(entities, "Entities must not be null");
Assert.notNull(options, "WriteOptions must not be null");
assertNotStatement("insert", entities);
assertNotQueryOptions(entities);

CassandraMappingContext mappingContext = getMappingContext();

for (Object entity : entities) {

Assert.notNull(entity, "Entity must not be null");
assertNotStatement("insert", entity);

BasicCassandraPersistentEntity<?> persistentEntity = mappingContext
.getRequiredPersistentEntity(entity.getClass());

SimpleStatement insertQuery = getStatementFactory()
.insert(entity, options, persistentEntity, persistentEntity.getTableName()).build();

this.batch.addStatement(insertQuery);
addStatement(insertQuery);
}

return this;
Expand Down Expand Up @@ -191,13 +227,14 @@ public CassandraBatchOperations update(Iterable<?> entities, WriteOptions option
for (Object entity : entities) {

Assert.notNull(entity, "Entity must not be null");
assertNotStatement("update", entity);

CassandraPersistentEntity<?> persistentEntity = getRequiredPersistentEntity(entity.getClass());

SimpleStatement update = getStatementFactory()
.update(entity, options, persistentEntity, persistentEntity.getTableName()).build();

this.batch.addStatement(update);
addStatement(update);
}

return this;
Expand Down Expand Up @@ -227,13 +264,14 @@ public CassandraBatchOperations delete(Iterable<?> entities, WriteOptions option
for (Object entity : entities) {

Assert.notNull(entity, "Entity must not be null");
assertNotStatement("delete", entity);

CassandraPersistentEntity<?> persistentEntity = getRequiredPersistentEntity(entity.getClass());

SimpleStatement delete = getStatementFactory()
.delete(entity, options, this.getConverter(), persistentEntity.getTableName()).build();

this.batch.addStatement(delete);
addStatement(delete);
}

return this;
Expand All @@ -257,4 +295,11 @@ private void assertNotExecuted() {
private CassandraPersistentEntity<?> getRequiredPersistentEntity(Class<?> entityType) {
return getMappingContext().getRequiredPersistentEntity(ClassUtils.getUserClass(entityType));
}

private static void assertNotStatement(String operation, Object o) {
if (o instanceof Statement<?>) {
throw new IllegalArgumentException(String.format("%s cannot use a Statement: %s. Use only entities for %s",
StringUtils.capitalize(operation), ClassUtils.getDescriptiveType(o), operation));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,22 @@
*/
package org.springframework.data.cassandra.core;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Collections;

import org.reactivestreams.Subscriber;

import org.springframework.data.cassandra.core.cql.WriteOptions;
import org.springframework.util.Assert;

import com.datastax.oss.driver.api.core.cql.BatchableStatement;

/**
* Reactive Batch operations for insert/update/delete actions on a table. {@link ReactiveCassandraBatchOperations} use
* logged Cassandra {@code BATCH}es for single entities and collections of entities. A
* {@link ReactiveCassandraBatchOperations} instance cannot be modified/used once it was executed.
* logged Cassandra {@code BATCH}es for single entities, collections of entities, and {@link BatchableStatement
* statements}. A {@link ReactiveCassandraBatchOperations} instance cannot be modified/used once it was executed.
* <p>
* Batches are atomic by default. In the context of a Cassandra batch operation, atomic means that if any of the batch
* succeeds, all of it will. Statement order does not matter within a batch. {@link ReactiveCassandraBatchOperations}
Expand Down Expand Up @@ -61,6 +65,62 @@ public interface ReactiveCassandraBatchOperations {
*/
ReactiveCassandraBatchOperations withTimestamp(long timestamp);

/**
* Add a {@link BatchableStatement statement} to the batch.
*
* @param statement the batchable statement such as {@code INSERT}, {@code UPDATE}, {@code DELETE}.
* @return {@code this} {@link ReactiveCassandraBatchOperations}.
* @throws IllegalStateException if the batch was already executed.
* @since 4.4
*/
default ReactiveCassandraBatchOperations addStatement(BatchableStatement<?> statement) {
return addStatement(Mono.just(statement));
}

/**
* Add a Mono of {@link BatchableStatement statement} to the batch.
*
* @param statement the batchable statement such as {@code INSERT}, {@code UPDATE}, {@code DELETE}.
* @return {@code this} {@link ReactiveCassandraBatchOperations}.
* @throws IllegalStateException if the batch was already executed.
* @since 4.4
*/
ReactiveCassandraBatchOperations addStatement(Mono<? extends BatchableStatement<?>> statement);

/**
* Add {@link BatchableStatement statements} to the batch.
*
* @param statements the batchable statements such as {@code INSERT}, {@code UPDATE}, {@code DELETE}.
* @return {@code this} {@link ReactiveCassandraBatchOperations}.
* @throws IllegalStateException if the batch was already executed.
* @since 4.4
*/
default ReactiveCassandraBatchOperations addStatements(BatchableStatement<?>... statements) {
return addStatements(Flux.fromArray(statements).toIterable());
}

/**
* Add {@link BatchableStatement statements} to the batch.
*
* @param statements the batchable statements such as {@code INSERT}, {@code UPDATE}, {@code DELETE}.
* @return {@code this} {@link ReactiveCassandraBatchOperations}.
* @throws IllegalStateException if the batch was already executed.
* @since 4.4
*/
default ReactiveCassandraBatchOperations addStatements(Iterable<? extends BatchableStatement<?>> statements) {
return addStatements(Mono.just(statements));
}

/**
* Add Mono of {@link BatchableStatement statements} to the batch.
*
* @param statements the batchable statements such as {@code INSERT}, {@code UPDATE}, {@code DELETE}.
* @return {@code this} {@link ReactiveCassandraBatchOperations}.
* @throws IllegalStateException if the batch was already executed.
* @since 4.4
*/
ReactiveCassandraBatchOperations addStatements(Mono<? extends Iterable<? extends BatchableStatement<?>>> statements);

/**
* Add an insert to the batch.
*
Expand Down
Loading

0 comments on commit 1a72451

Please sign in to comment.