From 192f5dbcaff47b2ce43bf256ccc6cc3f2ede5181 Mon Sep 17 00:00:00 2001 From: sakanaou Date: Tue, 13 Dec 2016 10:48:09 +0100 Subject: [PATCH] STORM-2242: Persisting Trident state honors configured batch.size.rows --- .../trident/state/CassandraState.java | 48 +++++++++++++++---- 1 file changed, 39 insertions(+), 9 deletions(-) diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java index 937b8c97ddd..1cd4db09c4c 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java @@ -23,7 +23,10 @@ import com.datastax.driver.core.BatchStatement; import com.datastax.driver.core.Session; import com.datastax.driver.core.Statement; +import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import org.apache.storm.cassandra.client.CassandraConf; import org.apache.storm.cassandra.client.SimpleClient; import org.apache.storm.cassandra.client.SimpleClientProvider; import org.apache.storm.cassandra.query.CQLResultSetValuesMapper; @@ -49,6 +52,8 @@ public class CassandraState implements State { private final Map conf; private final Options options; + private CassandraConf cassandraConf; + private Session session; private SimpleClient client; @@ -62,6 +67,7 @@ public static final class Options implements Serializable { private CQLStatementTupleMapper cqlStatementTupleMapper; private CQLResultSetValuesMapper cqlResultSetValuesMapper; private BatchStatement.Type batchingType; + private Map cassandraConfig; public Options(SimpleClientProvider clientProvider) { this.clientProvider = clientProvider; @@ -82,6 +88,11 @@ public Options withBatching(BatchStatement.Type batchingType) { return this; } + public Options withCassandraConfig(Map config) { + cassandraConfig = config; + return this; + } + } @Override @@ -97,6 +108,10 @@ public void commit(Long txid) { public void prepare() { Preconditions.checkNotNull(options.cqlStatementTupleMapper, "CassandraState.Options should have cqlStatementTupleMapper"); + Map cassandraClientConfig = options.cassandraConfig != null ? options.cassandraConfig : conf; + + cassandraConf = new CassandraConf(cassandraClientConfig); + client = options.clientProvider.getClient(conf); session = client.connect(); } @@ -113,17 +128,32 @@ public void cleanup() { public void updateState(List tuples, final TridentCollector collector) { - List statements = new ArrayList<>(); - for (TridentTuple tuple : tuples) { - statements.addAll(options.cqlStatementTupleMapper.map(conf, session, tuple)); - } + Iterable> partitionedTuples = Iterables.partition(tuples, cassandraConf.getBatchSizeRows()); + + Iterable> partitionedStatements = Iterables.transform(partitionedTuples, new Function, List>() { + @Override + public List apply(List l) { + List result = new ArrayList<>(); + + if (options.batchingType != null) { + BatchStatement batchStatement = new BatchStatement(options.batchingType); + result.add(batchStatement); + + for (TridentTuple tuple : l) { + batchStatement.addAll(options.cqlStatementTupleMapper.map(conf, session, tuple)); + } + } else { + for (TridentTuple tuple : l) { + result.addAll(options.cqlStatementTupleMapper.map(conf, session, tuple)); + } + } + + return result; + } + }); try { - if (options.batchingType != null) { - BatchStatement batchStatement = new BatchStatement(options.batchingType); - batchStatement.addAll(statements); - session.execute(batchStatement); - } else { + for (List statements : partitionedStatements) { for (Statement statement : statements) { session.execute(statement); }