diff --git a/pom.xml b/pom.xml
index 4191266..bcbf793 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,7 +16,7 @@
    <groupId>org.apache.cassandra</groupId>
    <artifactId>cassandra-all</artifactId>
-   <version>1.0.8</version>
+   <version>1.1.5</version>
cascading
diff --git a/src/main/java/org/pingles/cascading/cassandra/CassandraTap.java b/src/main/java/org/pingles/cascading/cassandra/CassandraTap.java
index cb58eed..e68b91e 100644
--- a/src/main/java/org/pingles/cascading/cassandra/CassandraTap.java
+++ b/src/main/java/org/pingles/cascading/cassandra/CassandraTap.java
@@ -11,7 +11,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
-import org.pingles.cascading.cassandra.hadoop.ColumnFamilyInputFormat;
+import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
@@ -50,7 +50,7 @@ public CassandraTap(
this.rpcPort = rpcPort;
this.columnFamilyName = columnFamilyName;
this.keyspace = keyspace;
- this.pathUUID = java.util.UUID.randomUUID().toString();
+ this.pathUUID = java.util.UUID.randomUUID().toString();
}
@Override
@@ -61,18 +61,19 @@ public void sinkInit(JobConf conf) throws IOException {
super.sinkInit(conf);
ConfigHelper.setOutputColumnFamily(conf, keyspace, columnFamilyName);
- ConfigHelper.setPartitioner(conf,
- "org.apache.cassandra.dht.RandomPartitioner");
+ ConfigHelper.setOutputPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");
endpointInit(conf);
}
@Override
public void sourceInit(JobConf conf) throws IOException {
+ LOGGER.info("Created Cassandra tap {}", getPath());
LOGGER.info("Sourcing from column family: {}", columnFamilyName);
FileInputFormat.addInputPaths(conf, getPath().toString());
conf.setInputFormat(ColumnFamilyInputFormat.class);
ConfigHelper.setInputColumnFamily(conf, keyspace, columnFamilyName);
+ ConfigHelper.setInputPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");
endpointInit(conf);
super.sourceInit(conf);
@@ -80,15 +81,19 @@ public void sourceInit(JobConf conf) throws IOException {
protected void endpointInit(JobConf conf) throws IOException {
if (initialAddress != null) {
- ConfigHelper.setInitialAddress(conf, initialAddress);
- } else if (ConfigHelper.getInitialAddress(conf) == null) {
- ConfigHelper.setInitialAddress(conf, DEFAULT_ADDRESS);
+ ConfigHelper.setInputInitialAddress(conf, initialAddress);
+ ConfigHelper.setOutputInitialAddress(conf, initialAddress);
+ } else if (ConfigHelper.getInputInitialAddress(conf) == null) {
+ ConfigHelper.setInputInitialAddress(conf, DEFAULT_ADDRESS);
+ ConfigHelper.setOutputInitialAddress(conf, DEFAULT_ADDRESS);
}
if (rpcPort != null) {
- ConfigHelper.setRpcPort(conf, rpcPort.toString());
+ ConfigHelper.setInputRpcPort(conf, rpcPort.toString());
+ ConfigHelper.setOutputRpcPort(conf, rpcPort.toString());
} else if (conf.get(THRIFT_PORT_KEY) == null) {
- ConfigHelper.setRpcPort(conf, DEFAULT_RPC_PORT);
+ ConfigHelper.setInputRpcPort(conf, DEFAULT_RPC_PORT);
+ ConfigHelper.setOutputRpcPort(conf, DEFAULT_RPC_PORT);
}
}
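For reference, a minimal sketch of the configuration the tap now produces against the Cassandra 1.1 ConfigHelper, where each initial address, RPC port and partitioner setting is split into input and output variants. The class name ConfigSketch and all keyspace, column family, address and port values are placeholders for illustration, not values taken from this patch.

    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.hadoop.mapred.JobConf;

    public class ConfigSketch {
        public static void main(String[] args) {
            JobConf conf = new JobConf();
            // Both the source and sink sides must be configured explicitly in 1.1.
            ConfigHelper.setInputColumnFamily(conf, "MyKeyspace", "MyColumnFamily");
            ConfigHelper.setOutputColumnFamily(conf, "MyKeyspace", "MyColumnFamily");
            ConfigHelper.setInputInitialAddress(conf, "127.0.0.1");
            ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");
            ConfigHelper.setInputRpcPort(conf, "9160");
            ConfigHelper.setOutputRpcPort(conf, "9160");
            ConfigHelper.setInputPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");
            ConfigHelper.setOutputPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");
        }
    }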
diff --git a/src/main/java/org/pingles/cascading/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/main/java/org/pingles/cascading/cassandra/hadoop/ColumnFamilyInputFormat.java
deleted file mode 100644
index 27f8138..0000000
--- a/src/main/java/org/pingles/cascading/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ /dev/null
@@ -1,235 +0,0 @@
-package org.pingles.cascading.cassandra.hadoop;
-
-import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.KeyRange;
-import org.apache.cassandra.thrift.TBinaryProtocol;
-import org.apache.cassandra.thrift.TokenRange;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.SortedMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import static java.lang.System.nanoTime;
-
-public class ColumnFamilyInputFormat implements InputFormat<ByteBuffer, SortedMap<ByteBuffer, IColumn>> {
- private static final Logger LOGGER = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
- private String keyspace;
- private String cfName;
-
- private void validateConfiguration(JobConf conf) {
- if (ConfigHelper.getInputKeyspace(conf) == null) {
- throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()");
- }
- if (ConfigHelper.getInputColumnFamily(conf) == null) {
- throw new UnsupportedOperationException("you must set the column family with setColumnFamily()");
- }
- if (ConfigHelper.getInputSlicePredicate(conf) == null)
- {
- throw new UnsupportedOperationException("you must set the predicate with setPredicate");
- }
-
- }
-
- public InputSplit[] getSplits(JobConf jobConf, int i) throws IOException {
- validateConfiguration(jobConf);
-
- List<TokenRange> masterRangeNodes = getRangeMap(jobConf);
- keyspace = ConfigHelper.getInputKeyspace(jobConf);
- cfName = ConfigHelper.getInputColumnFamily(jobConf);
-
- ExecutorService executor = Executors.newCachedThreadPool();
- ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
-
- try {
- List<Future<List<InputSplit>>> splitFutures = new ArrayList<Future<List<InputSplit>>>();
- KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(jobConf);
- IPartitioner partitioner = null;
- Range jobRange = null;
-
- if (jobKeyRange != null) {
- partitioner = ConfigHelper.getPartitioner(jobConf);
- assert partitioner.preservesOrder() : "ConfigHelper.setInputKeyRange(..) can only be used with a order preserving paritioner";
- assert jobKeyRange.start_key == null : "only start_token supported";
- assert jobKeyRange.end_key == null : "only end_token supported";
- jobRange = new Range(partitioner.getTokenFactory().fromString(jobKeyRange.start_token), partitioner.getTokenFactory().fromString(jobKeyRange.end_token), partitioner);
- }
-
- for (TokenRange range : masterRangeNodes) {
- if (jobRange == null) {
- splitFutures.add(executor.submit(new SplitCallable(range, jobConf)));
- }
- if (jobRange != null) {
- Range dhtRange = new Range(partitioner.getTokenFactory().fromString(range.start_token),
- partitioner.getTokenFactory().fromString(range.end_token),
- partitioner);
-
- if (dhtRange.intersects(jobRange)) {
- for (Range intersection: dhtRange.intersectionWith(jobRange))
- {
- range.start_token = partitioner.getTokenFactory().toString(intersection.left);
- range.end_token = partitioner.getTokenFactory().toString(intersection.right);
- // for each range, pick a live owner and ask it to compute bite-sized splits
- splitFutures.add(executor.submit(new SplitCallable(range, jobConf)));
- }
- }
- }
- }
-
- // wait until we have all the results back
- for (Future<List<InputSplit>> futureInputSplits : splitFutures) {
- try {
- splits.addAll(futureInputSplits.get());
- } catch (Exception e) {
- throw new IOException("Could not get input splits", e);
- }
- }
- } finally {
- executor.shutdownNow();
- }
-
- assert splits.size() > 0;
- Collections.shuffle(splits, new Random(nanoTime()));
-
- LOGGER.info("Splits are: {}", StringUtils.join(splits, ","));
-
- return splits.toArray(new InputSplit[splits.size()]);
- }
-
- private List<TokenRange> getRangeMap(JobConf jobConf) throws IOException {
- Cassandra.Client client = getClientFromAddressList(jobConf);
-
- List<TokenRange> map;
- try {
- map = client.describe_ring(ConfigHelper.getInputKeyspace(jobConf));
- } catch (TException e) {
- throw new RuntimeException(e);
- } catch (InvalidRequestException e) {
- throw new RuntimeException(e);
- }
- return map;
- }
-
- public RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
- return new ColumnFamilyRecordReader((ColumnFamilySplit) inputSplit, jobConf);
- }
-
- public static Cassandra.Client getClientFromAddressList(JobConf jobConf) throws IOException
- {
- String[] addresses = ConfigHelper.getInitialAddress(jobConf).split(",");
- Cassandra.Client client = null;
- List<IOException> exceptions = new ArrayList<IOException>();
- for (String address : addresses) {
- try {
- client = createConnection(address, ConfigHelper.getRpcPort(jobConf), true);
- break;
- } catch (IOException ioe) {
- exceptions.add(ioe);
- }
- }
- if (client == null) {
- LOGGER.error("failed to connect to any initial addresses");
- for (IOException ioe : exceptions) {
- LOGGER.error("", ioe);
- }
- throw exceptions.get(exceptions.size() - 1);
- }
- return client;
- }
-
- public static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws IOException
- {
- TSocket socket = new TSocket(host, port);
- TTransport trans = framed ? new TFramedTransport(socket) : socket;
- try {
- trans.open();
- } catch (TTransportException e) {
- throw new IOException("unable to connect to server", e);
- }
- return new Cassandra.Client(new TBinaryProtocol(trans));
- }
-
- /**
- * Gets a token range and splits it up according to the suggested
- * size into input splits that Hadoop can use.
- */
- class SplitCallable implements Callable<List<InputSplit>>
- {
-
- private final TokenRange range;
- private final JobConf conf;
-
- public SplitCallable(TokenRange tr, JobConf conf) {
- this.range = tr;
- this.conf = conf;
- }
-
- public List<InputSplit> call() throws Exception {
- ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
- List<String> tokens = getSubSplits(keyspace, cfName, range, conf);
- assert range.rpc_endpoints.size() == range.endpoints.size() : "rpc_endpoints size must match endpoints size";
- // turn the sub-ranges into InputSplits
- String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
- // hadoop needs hostname, not ip
- int endpointIndex = 0;
- for (String endpoint: range.rpc_endpoints) {
- String endpoint_address = endpoint;
- if (endpoint_address == null || endpoint_address.equals("0.0.0.0"))
- endpoint_address = range.endpoints.get(endpointIndex);
- endpoints[endpointIndex++] = InetAddress.getByName(endpoint_address).getHostName();
- }
-
- for (int i = 1; i < tokens.size(); i++) {
- ColumnFamilySplit split = new ColumnFamilySplit(tokens.get(i - 1), tokens.get(i), endpoints);
- LOGGER.debug("adding " + split);
- splits.add(split);
- }
- return splits;
- }
- }
-
- private List<String> getSubSplits(String keyspace, String cfName, TokenRange range, JobConf conf) throws IOException {
- int splitsize = ConfigHelper.getInputSplitSize(conf);
- for (String host : range.rpc_endpoints) {
- try {
- Cassandra.Client client = createConnection(host, ConfigHelper.getRpcPort(conf), true);
- client.set_keyspace(keyspace);
- return client.describe_splits(cfName, range.start_token, range.end_token, splitsize);
- } catch (IOException e) {
- LOGGER.debug("failed connect to endpoint " + host, e);
- } catch (TException e) {
- throw new RuntimeException(e);
- } catch (InvalidRequestException e) {
- throw new RuntimeException(e);
- }
- }
- throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ","));
- }
-}
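The local copy of ColumnFamilyInputFormat deleted above appears redundant once the project depends on Cassandra 1.1, whose bundled org.apache.cassandra.hadoop.ColumnFamilyInputFormat also implements the old org.apache.hadoop.mapred.InputFormat interface; the import change in CassandraTap.java switches to it. A minimal sketch of the wiring, with InputFormatSketch as a hypothetical helper name:

    import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
    import org.apache.hadoop.mapred.JobConf;

    public class InputFormatSketch {
        public static void configure(JobConf conf) {
            // The stock 1.1 input format stands in for the deleted org.pingles copy.
            conf.setInputFormat(ColumnFamilyInputFormat.class);
        }
    }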
diff --git a/src/main/java/org/pingles/cascading/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/main/java/org/pingles/cascading/cassandra/hadoop/ColumnFamilyRecordReader.java
deleted file mode 100644
index ca8cc30..0000000
--- a/src/main/java/org/pingles/cascading/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ /dev/null
@@ -1,321 +0,0 @@
-package org.pingles.cascading.cassandra.hadoop;
-
-import com.google.common.collect.AbstractIterator;
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.thrift.AuthenticationException;
-import org.apache.cassandra.thrift.AuthenticationRequest;
-import org.apache.cassandra.thrift.AuthorizationException;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.CfDef;
-import org.apache.cassandra.thrift.Column;
-import org.apache.cassandra.thrift.ColumnOrSuperColumn;
-import org.apache.cassandra.thrift.ColumnParent;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.CounterColumn;
-import org.apache.cassandra.thrift.CounterSuperColumn;
-import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.KeyRange;
-import org.apache.cassandra.thrift.KeySlice;
-import org.apache.cassandra.thrift.KsDef;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SuperColumn;
-import org.apache.cassandra.thrift.TBinaryProtocol;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-public class ColumnFamilyRecordReader implements RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> {
- private static final Logger LOGGER = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
-
- private final ColumnFamilySplit inputSplit;
- private final JobConf jobConf;
- private final SlicePredicate predicate;
- private final int totalRowCount;
- private final int batchRowCount;
- private final String columnFamily;
- private final ConsistencyLevel consistencyLevel;
- private final String keyspace;
- private final String initialAddress;
- private TSocket socket;
- private Cassandra.Client client;
- private Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> currentRow;
- private int rpcPort;
- private ColumnFamilyRecordReader.RowIterator rowIterator;
- private ByteBuffer keyBytesBuffer;
-
- public ColumnFamilyRecordReader(ColumnFamilySplit inputSplit, JobConf jobConf) {
- this.inputSplit = inputSplit;
- this.jobConf = jobConf;
-
- this.predicate = ConfigHelper.getInputSlicePredicate(jobConf);
- this.totalRowCount = ConfigHelper.getInputSplitSize(jobConf);
- this.batchRowCount = ConfigHelper.getRangeBatchSize(jobConf);
- this.columnFamily = ConfigHelper.getInputColumnFamily(jobConf);
- this.consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getReadConsistencyLevel(jobConf));
- this.keyspace = ConfigHelper.getInputKeyspace(jobConf);
- this.initialAddress = ConfigHelper.getInitialAddress(jobConf);
- this.rpcPort = ConfigHelper.getRpcPort(jobConf);
- this.keyBytesBuffer = ByteBuffer.allocate(4096);
-
- try {
- this.client = createClient();
- } catch (TException e) {
- throw new RuntimeException(e);
- } catch (InvalidRequestException e) {
- throw new RuntimeException(e);
- } catch (AuthorizationException e) {
- throw new RuntimeException(e);
- } catch (AuthenticationException e) {
- throw new RuntimeException(e);
- }
-
- this.rowIterator = new RowIterator();
- }
-
- private TSocket getSocket() throws TTransportException {
- if (socket != null && socket.isOpen()) {
- return socket;
- }
- socket = new TSocket(initialAddress, rpcPort);
- socket.open();
- return socket;
- }
-
- private Cassandra.Client createClient() throws TException, InvalidRequestException, AuthorizationException, AuthenticationException {
- TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(getSocket()));
- Cassandra.Client c = new Cassandra.Client(binaryProtocol);
-
- c.set_keyspace(keyspace);
-
- if (ConfigHelper.getInputKeyspaceUserName(jobConf) != null) {
- Map<String, String> creds = new HashMap<String, String>();
- creds.put("username", ConfigHelper.getInputKeyspaceUserName(jobConf));
- creds.put("password", ConfigHelper.getInputKeyspacePassword(jobConf));
- AuthenticationRequest authRequest = new AuthenticationRequest(creds);
- c.login(authRequest);
- }
-
- return c;
- }
-
- public boolean next(ByteBuffer keyBuffer, SortedMap<ByteBuffer, IColumn> valueMap) throws IOException {
- if (!rowIterator.hasNext())
- return false;
- currentRow = rowIterator.next();
-
- ByteBuffer currentKey = currentRow.left;
- SortedMap<ByteBuffer, IColumn> currentValue = currentRow.right;
- keyBuffer.clear();
- keyBuffer.put(currentKey);
- keyBuffer.flip();
-
- valueMap.clear();
- valueMap.putAll(currentValue);
-
- return true;
- }
-
- public ByteBuffer createKey() {
- return keyBytesBuffer;
- }
-
- private SortedMap<ByteBuffer, IColumn> emptyMap() {
- return new TreeMap<ByteBuffer, IColumn>();
- }
-
- public SortedMap<ByteBuffer, IColumn> createValue() {
- return emptyMap();
- }
-
- public long getPos() throws IOException {
- return rowIterator.rowsRead();
- }
-
- public void close() throws IOException {
- if (socket != null && socket.isOpen()) {
- socket.close();
- socket = null;
- client = null;
- }
- }
-
- public float getProgress() throws IOException {
- return ((float)rowIterator.rowsRead()) / totalRowCount;
- }
-
- private class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>> {
- private List<KeySlice> rows;
- private String startToken;
- private int totalRead = 0;
- private int i = 0;
- private final AbstractType comparator;
- private final AbstractType subComparator;
- private final IPartitioner partitioner;
-
- private RowIterator() {
- try {
- partitioner = FBUtilities.newPartitioner(client.describe_partitioner());
-
- // Get the Keyspace metadata, then get the specific CF metadata
- // in order to populate the sub/comparator.
- KsDef ks_def = client.describe_keyspace(keyspace);
- List<String> cfnames = new ArrayList<String>();
- for (CfDef cfd : ks_def.cf_defs)
- cfnames.add(cfd.name);
- int idx = cfnames.indexOf(columnFamily);
- CfDef cf_def = ks_def.cf_defs.get(idx);
-
- comparator = TypeParser.parse(cf_def.comparator_type);
- subComparator = cf_def.subcomparator_type == null ? null : TypeParser.parse(cf_def.subcomparator_type);
- } catch (ConfigurationException e) {
- throw new RuntimeException("unable to load sub/comparator", e);
- } catch (TException e) {
- throw new RuntimeException("error communicating via Thrift", e);
- } catch (Exception e) {
- throw new RuntimeException("unable to load keyspace " + keyspace, e);
- }
- }
-
- private void maybeInit() {
- // check if we need another batch
- if (rows != null && i >= rows.size())
- rows = null;
-
- if (rows != null)
- return;
-
- if (startToken == null) {
- startToken = inputSplit.getStartToken();
- } else if (startToken.equals(inputSplit.getEndToken())) {
- rows = null;
- return;
- }
-
- KeyRange keyRange = new KeyRange(batchRowCount);
- keyRange.setStart_token(startToken);
- keyRange.setEnd_token(inputSplit.getEndToken());
- try {
- rows = client.get_range_slices(new ColumnParent(columnFamily), predicate, keyRange, consistencyLevel);
-
- // nothing new? reached the end
- if (rows.isEmpty()) {
- rows = null;
- return;
- }
-
- // Pre-compute the last row key, before removing empty rows
- ByteBuffer lastRowKey = rows.get(rows.size() - 1).key;
-
- // only remove empty rows if the slice predicate is empty
- if (isPredicateEmpty(predicate)) {
- Iterator<KeySlice> rowsIterator = rows.iterator();
- while (rowsIterator.hasNext())
- if (rowsIterator.next().columns.isEmpty())
- rowsIterator.remove();
- }
-
- // reset to iterate through the new batch
- i = 0;
-
- // prepare for the next slice to be read
- startToken = partitioner.getTokenFactory().toString(partitioner.getToken(lastRowKey));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * @return total number of rows read by this record reader
- */
- public int rowsRead() {
- return totalRead;
- }
-
- protected Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> computeNext() {
- maybeInit();
- if (rows == null)
- return endOfData();
-
- totalRead++;
- KeySlice ks = rows.get(i++);
- SortedMap<ByteBuffer, IColumn> map = new TreeMap<ByteBuffer, IColumn>(comparator);
- for (ColumnOrSuperColumn cosc : ks.columns) {
- IColumn column = unthriftify(cosc);
- map.put(column.name(), column);
- }
- return new Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>(ks.key, map);
- }
-
- private IColumn unthriftify(ColumnOrSuperColumn cosc) {
- if (cosc.counter_column != null)
- return unthriftifyCounter(cosc.counter_column);
- if (cosc.counter_super_column != null)
- return unthriftifySuperCounter(cosc.counter_super_column);
- if (cosc.super_column != null)
- return unthriftifySuper(cosc.super_column);
- assert cosc.column != null;
- return unthriftifySimple(cosc.column);
- }
-
- private IColumn unthriftifySuper(SuperColumn super_column) {
- org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator);
- for (Column column : super_column.columns) {
- sc.addColumn(unthriftifySimple(column));
- }
- return sc;
- }
-
- private IColumn unthriftifySimple(Column column) {
- return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp);
- }
-
- private IColumn unthriftifyCounter(CounterColumn column) {
- //CounterColumns read the nodeID from the System table, so need the StorageService running and access
- //to cassandra.yaml. To avoid a Hadoop needing access to yaml return a regular Column.
- return new org.apache.cassandra.db.Column(column.name, ByteBufferUtil.bytes(column.value), 0);
- }
-
- private IColumn unthriftifySuperCounter(CounterSuperColumn superColumn) {
- org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(superColumn.name, subComparator);
- for (CounterColumn column : superColumn.columns)
- sc.addColumn(unthriftifyCounter(column));
- return sc;
- }
- }
-
- private boolean isPredicateEmpty(SlicePredicate predicate) {
- if (predicate != null) {
- if (predicate.isSetColumn_names()) {
- return false;
- }
- if (predicate.getSlice_range().getStart() != null && predicate.getSlice_range().getFinish() != null) {
- return false;
- }
- }
-
- return true;
- }
-}
diff --git a/src/test/java/org/pingles/cascading/cassandra/CassandraClient.java b/src/test/java/org/pingles/cascading/cassandra/CassandraClient.java
index 6205870..5d61132 100644
--- a/src/test/java/org/pingles/cascading/cassandra/CassandraClient.java
+++ b/src/test/java/org/pingles/cascading/cassandra/CassandraClient.java
@@ -142,7 +142,7 @@ public void useKeyspace(String keyspaceName) throws TException, InvalidRequestEx
client.recv_set_keyspace();
}
- public void truncate(String columnFamilyName) throws TException, InvalidRequestException, UnavailableException {
+ public void truncate(String columnFamilyName) throws TException, InvalidRequestException, UnavailableException, TimedOutException {
LOGGER.info("Truncating {}", columnFamilyName);
client.send_truncate(columnFamilyName);
client.recv_truncate();
diff --git a/src/test/java/org/pingles/cascading/cassandra/CassandraFlowTest.java b/src/test/java/org/pingles/cascading/cassandra/CassandraFlowTest.java
index 0828600..9fc0864 100644
--- a/src/test/java/org/pingles/cascading/cassandra/CassandraFlowTest.java
+++ b/src/test/java/org/pingles/cascading/cassandra/CassandraFlowTest.java
@@ -79,7 +79,7 @@ public void beforeTest() throws Exception {
}
@After
- public void afterTest() throws TException, InvalidRequestException, UnavailableException {
+ public void afterTest() throws TException, InvalidRequestException, UnavailableException, TimedOutException {
try {
client.truncate(columnFamilyName);
} finally {
@@ -124,7 +124,7 @@ public void testNarrowRowSinkWithStringColumnNamesSpecified() throws Exception {
assertEquals("b", getTestBytes("2", "lower"));
assertEquals("B", getTestBytes("2", "upper"));
}
-
+
@Test
public void testWideRowAsSource() throws Exception {
client.put(columnFamilyName, toBytes("21"), toBytes("lower"), toBytes("a"));