From caef57b54c0eb084bb5a59c89ba03bfdaea94d08 Mon Sep 17 00:00:00 2001 From: Matt Stump Date: Mon, 17 Sep 2012 12:45:12 -0700 Subject: [PATCH] updated so that it works with newer versions of cassandra, the interface for ConfigHelper was split into seperate methods for input and output eg setInputPartitioner --- pom.xml | 2 +- .../cascading/cassandra/CassandraTap.java | 23 +- .../hadoop/ColumnFamilyInputFormat.java | 235 ------------- .../hadoop/ColumnFamilyRecordReader.java | 321 ------------------ .../cascading/cassandra/CassandraClient.java | 2 +- .../cassandra/CassandraFlowTest.java | 4 +- 6 files changed, 18 insertions(+), 569 deletions(-) delete mode 100644 src/main/java/org/pingles/cascading/cassandra/hadoop/ColumnFamilyInputFormat.java delete mode 100644 src/main/java/org/pingles/cascading/cassandra/hadoop/ColumnFamilyRecordReader.java diff --git a/pom.xml b/pom.xml index 4191266..bcbf793 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ org.apache.cassandra cassandra-all - 1.0.8 + 1.1.5 cascading diff --git a/src/main/java/org/pingles/cascading/cassandra/CassandraTap.java b/src/main/java/org/pingles/cascading/cassandra/CassandraTap.java index cb58eed..e68b91e 100644 --- a/src/main/java/org/pingles/cascading/cassandra/CassandraTap.java +++ b/src/main/java/org/pingles/cascading/cassandra/CassandraTap.java @@ -11,7 +11,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; -import org.pingles.cascading.cassandra.hadoop.ColumnFamilyInputFormat; +import org.apache.cassandra.hadoop.ColumnFamilyInputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; @@ -50,7 +50,7 @@ public CassandraTap( this.rpcPort = rpcPort; this.columnFamilyName = columnFamilyName; this.keyspace = keyspace; - this.pathUUID = java.util.UUID.randomUUID().toString(); + this.pathUUID = java.util.UUID.randomUUID().toString(); } @Override @@ -61,18 +61,19 @@ public void sinkInit(JobConf conf) throws IOException { super.sinkInit(conf); ConfigHelper.setOutputColumnFamily(conf, keyspace, columnFamilyName); - ConfigHelper.setPartitioner(conf, - "org.apache.cassandra.dht.RandomPartitioner"); + ConfigHelper.setOutputPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner"); endpointInit(conf); } @Override public void sourceInit(JobConf conf) throws IOException { + LOGGER.info("Created Cassandra tap {}", getPath()); LOGGER.info("Sourcing from column family: {}", columnFamilyName); FileInputFormat.addInputPaths(conf, getPath().toString()); conf.setInputFormat(ColumnFamilyInputFormat.class); ConfigHelper.setInputColumnFamily(conf, keyspace, columnFamilyName); + ConfigHelper.setInputPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner"); endpointInit(conf); super.sourceInit(conf); @@ -80,15 +81,19 @@ public void sourceInit(JobConf conf) throws IOException { protected void endpointInit(JobConf conf) throws IOException { if (initialAddress != null) { - ConfigHelper.setInitialAddress(conf, initialAddress); - } else if (ConfigHelper.getInitialAddress(conf) == null) { - ConfigHelper.setInitialAddress(conf, DEFAULT_ADDRESS); + ConfigHelper.setInputInitialAddress(conf, initialAddress); + ConfigHelper.setOutputInitialAddress(conf, initialAddress); + } else if (ConfigHelper.getInputInitialAddress(conf) == null) { + ConfigHelper.setInputInitialAddress(conf, DEFAULT_ADDRESS); + ConfigHelper.setOutputInitialAddress(conf, DEFAULT_ADDRESS); } if (rpcPort != null) { - ConfigHelper.setRpcPort(conf, rpcPort.toString()); + ConfigHelper.setInputRpcPort(conf, rpcPort.toString()); + ConfigHelper.setOutputRpcPort(conf, rpcPort.toString()); } else if (conf.get(THRIFT_PORT_KEY) == null) { - ConfigHelper.setRpcPort(conf, DEFAULT_RPC_PORT); + ConfigHelper.setInputRpcPort(conf, DEFAULT_RPC_PORT); + ConfigHelper.setOutputRpcPort(conf, DEFAULT_RPC_PORT); } } diff --git a/src/main/java/org/pingles/cascading/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/main/java/org/pingles/cascading/cassandra/hadoop/ColumnFamilyInputFormat.java deleted file mode 100644 index 27f8138..0000000 --- a/src/main/java/org/pingles/cascading/cassandra/hadoop/ColumnFamilyInputFormat.java +++ /dev/null @@ -1,235 +0,0 @@ -package org.pingles.cascading.cassandra.hadoop; - -import org.apache.cassandra.db.IColumn; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.hadoop.ConfigHelper; -import org.apache.cassandra.thrift.Cassandra; -import org.apache.cassandra.thrift.InvalidRequestException; -import org.apache.cassandra.thrift.KeyRange; -import org.apache.cassandra.thrift.TBinaryProtocol; -import org.apache.cassandra.thrift.TokenRange; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.thrift.TException; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Random; -import java.util.SortedMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import static java.lang.System.nanoTime; - -public class ColumnFamilyInputFormat implements InputFormat> { - private static final Logger LOGGER = LoggerFactory.getLogger(ColumnFamilyInputFormat.class); - private String keyspace; - private String cfName; - - private void validateConfiguration(JobConf conf) { - if (ConfigHelper.getInputKeyspace(conf) == null) { - throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()"); - } - if (ConfigHelper.getInputColumnFamily(conf) == null) { - throw new UnsupportedOperationException("you must set the column family with setColumnFamily()"); - } - if (ConfigHelper.getInputSlicePredicate(conf) == null) - { - throw new UnsupportedOperationException("you must set the predicate with setPredicate"); - } - - } - - public InputSplit[] getSplits(JobConf jobConf, int i) throws IOException { - validateConfiguration(jobConf); - - List masterRangeNodes = getRangeMap(jobConf); - keyspace = ConfigHelper.getInputKeyspace(jobConf); - cfName = ConfigHelper.getInputColumnFamily(jobConf); - - ExecutorService executor = Executors.newCachedThreadPool(); - ArrayList splits = new ArrayList(); - - try { - List>> splitFutures = new ArrayList>>(); - KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(jobConf); - IPartitioner partitioner = null; - Range jobRange = null; - - if (jobKeyRange != null) { - partitioner = ConfigHelper.getPartitioner(jobConf); - assert partitioner.preservesOrder() : "ConfigHelper.setInputKeyRange(..) can only be used with a order preserving paritioner"; - assert jobKeyRange.start_key == null : "only start_token supported"; - assert jobKeyRange.end_key == null : "only end_token supported"; - jobRange = new Range(partitioner.getTokenFactory().fromString(jobKeyRange.start_token), partitioner.getTokenFactory().fromString(jobKeyRange.end_token), partitioner); - } - - for (TokenRange range : masterRangeNodes) { - if (jobRange == null) { - splitFutures.add(executor.submit(new SplitCallable(range, jobConf))); - } - if (jobRange != null) { - Range dhtRange = new Range(partitioner.getTokenFactory().fromString(range.start_token), - partitioner.getTokenFactory().fromString(range.end_token), - partitioner); - - if (dhtRange.intersects(jobRange)) { - for (Range intersection: dhtRange.intersectionWith(jobRange)) - { - range.start_token = partitioner.getTokenFactory().toString(intersection.left); - range.end_token = partitioner.getTokenFactory().toString(intersection.right); - // for each range, pick a live owner and ask it to compute bite-sized splits - splitFutures.add(executor.submit(new SplitCallable(range, jobConf))); - } - } - } - } - - // wait until we have all the results back - for (Future> futureInputSplits : splitFutures) { - try { - splits.addAll(futureInputSplits.get()); - } catch (Exception e) { - throw new IOException("Could not get input splits", e); - } - } - } finally { - executor.shutdownNow(); - } - - assert splits.size() > 0; - Collections.shuffle(splits, new Random(nanoTime())); - - LOGGER.info("Splits are: {}", StringUtils.join(splits, ",")); - - return splits.toArray(new InputSplit[splits.size()]); - } - - private List getRangeMap(JobConf jobConf) throws IOException { - Cassandra.Client client = getClientFromAddressList(jobConf); - - List map; - try { - map = client.describe_ring(ConfigHelper.getInputKeyspace(jobConf)); - } catch (TException e) { - throw new RuntimeException(e); - } catch (InvalidRequestException e) { - throw new RuntimeException(e); - } - return map; - } - - public RecordReader> getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException { - return new ColumnFamilyRecordReader((ColumnFamilySplit) inputSplit, jobConf); - } - - public static Cassandra.Client getClientFromAddressList(JobConf jobConf) throws IOException - { - String[] addresses = ConfigHelper.getInitialAddress(jobConf).split(","); - Cassandra.Client client = null; - List exceptions = new ArrayList(); - for (String address : addresses) { - try { - client = createConnection(address, ConfigHelper.getRpcPort(jobConf), true); - break; - } catch (IOException ioe) { - exceptions.add(ioe); - } - } - if (client == null) { - LOGGER.error("failed to connect to any initial addresses"); - for (IOException ioe : exceptions) { - LOGGER.error("", ioe); - } - throw exceptions.get(exceptions.size() - 1); - } - return client; - } - - public static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws IOException - { - TSocket socket = new TSocket(host, port); - TTransport trans = framed ? new TFramedTransport(socket) : socket; - try { - trans.open(); - } catch (TTransportException e) { - throw new IOException("unable to connect to server", e); - } - return new Cassandra.Client(new TBinaryProtocol(trans)); - } - - /** - * Gets a token range and splits it up according to the suggested - * size into input splits that Hadoop can use. - */ - class SplitCallable implements Callable> - { - - private final TokenRange range; - private final JobConf conf; - - public SplitCallable(TokenRange tr, JobConf conf) { - this.range = tr; - this.conf = conf; - } - - public List call() throws Exception { - ArrayList splits = new ArrayList(); - List tokens = getSubSplits(keyspace, cfName, range, conf); - assert range.rpc_endpoints.size() == range.endpoints.size() : "rpc_endpoints size must match endpoints size"; - // turn the sub-ranges into InputSplits - String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]); - // hadoop needs hostname, not ip - int endpointIndex = 0; - for (String endpoint: range.rpc_endpoints) { - String endpoint_address = endpoint; - if (endpoint_address == null || endpoint_address.equals("0.0.0.0")) - endpoint_address = range.endpoints.get(endpointIndex); - endpoints[endpointIndex++] = InetAddress.getByName(endpoint_address).getHostName(); - } - - for (int i = 1; i < tokens.size(); i++) { - ColumnFamilySplit split = new ColumnFamilySplit(tokens.get(i - 1), tokens.get(i), endpoints); - LOGGER.debug("adding " + split); - splits.add(split); - } - return splits; - } - } - - private List getSubSplits(String keyspace, String cfName, TokenRange range, JobConf conf) throws IOException { - int splitsize = ConfigHelper.getInputSplitSize(conf); - for (String host : range.rpc_endpoints) { - try { - Cassandra.Client client = createConnection(host, ConfigHelper.getRpcPort(conf), true); - client.set_keyspace(keyspace); - return client.describe_splits(cfName, range.start_token, range.end_token, splitsize); - } catch (IOException e) { - LOGGER.debug("failed connect to endpoint " + host, e); - } catch (TException e) { - throw new RuntimeException(e); - } catch (InvalidRequestException e) { - throw new RuntimeException(e); - } - } - throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ",")); - } -} diff --git a/src/main/java/org/pingles/cascading/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/main/java/org/pingles/cascading/cassandra/hadoop/ColumnFamilyRecordReader.java deleted file mode 100644 index ca8cc30..0000000 --- a/src/main/java/org/pingles/cascading/cassandra/hadoop/ColumnFamilyRecordReader.java +++ /dev/null @@ -1,321 +0,0 @@ -package org.pingles.cascading.cassandra.hadoop; - -import com.google.common.collect.AbstractIterator; -import org.apache.cassandra.config.ConfigurationException; -import org.apache.cassandra.db.IColumn; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.TypeParser; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.hadoop.ConfigHelper; -import org.apache.cassandra.thrift.AuthenticationException; -import org.apache.cassandra.thrift.AuthenticationRequest; -import org.apache.cassandra.thrift.AuthorizationException; -import org.apache.cassandra.thrift.Cassandra; -import org.apache.cassandra.thrift.CfDef; -import org.apache.cassandra.thrift.Column; -import org.apache.cassandra.thrift.ColumnOrSuperColumn; -import org.apache.cassandra.thrift.ColumnParent; -import org.apache.cassandra.thrift.ConsistencyLevel; -import org.apache.cassandra.thrift.CounterColumn; -import org.apache.cassandra.thrift.CounterSuperColumn; -import org.apache.cassandra.thrift.InvalidRequestException; -import org.apache.cassandra.thrift.KeyRange; -import org.apache.cassandra.thrift.KeySlice; -import org.apache.cassandra.thrift.KsDef; -import org.apache.cassandra.thrift.SlicePredicate; -import org.apache.cassandra.thrift.SuperColumn; -import org.apache.cassandra.thrift.TBinaryProtocol; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.thrift.TException; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; - -public class ColumnFamilyRecordReader implements RecordReader> { - private static final Logger LOGGER = LoggerFactory.getLogger(ColumnFamilyInputFormat.class); - - private final ColumnFamilySplit inputSplit; - private final JobConf jobConf; - private final SlicePredicate predicate; - private final int totalRowCount; - private final int batchRowCount; - private final String columnFamily; - private final ConsistencyLevel consistencyLevel; - private final String keyspace; - private final String initialAddress; - private TSocket socket; - private Cassandra.Client client; - private Pair> currentRow; - private int rpcPort; - private ColumnFamilyRecordReader.RowIterator rowIterator; - private ByteBuffer keyBytesBuffer; - - public ColumnFamilyRecordReader(ColumnFamilySplit inputSplit, JobConf jobConf) { - this.inputSplit = inputSplit; - this.jobConf = jobConf; - - this.predicate = ConfigHelper.getInputSlicePredicate(jobConf); - this.totalRowCount = ConfigHelper.getInputSplitSize(jobConf); - this.batchRowCount = ConfigHelper.getRangeBatchSize(jobConf); - this.columnFamily = ConfigHelper.getInputColumnFamily(jobConf); - this.consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getReadConsistencyLevel(jobConf)); - this.keyspace = ConfigHelper.getInputKeyspace(jobConf); - this.initialAddress = ConfigHelper.getInitialAddress(jobConf); - this.rpcPort = ConfigHelper.getRpcPort(jobConf); - this.keyBytesBuffer = ByteBuffer.allocate(4096); - - try { - this.client = createClient(); - } catch (TException e) { - throw new RuntimeException(e); - } catch (InvalidRequestException e) { - throw new RuntimeException(e); - } catch (AuthorizationException e) { - throw new RuntimeException(e); - } catch (AuthenticationException e) { - throw new RuntimeException(e); - } - - this.rowIterator = new RowIterator(); - } - - private TSocket getSocket() throws TTransportException { - if (socket != null && socket.isOpen()) { - return socket; - } - socket = new TSocket(initialAddress, rpcPort); - socket.open(); - return socket; - } - - private Cassandra.Client createClient() throws TException, InvalidRequestException, AuthorizationException, AuthenticationException { - TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(getSocket())); - Cassandra.Client c = new Cassandra.Client(binaryProtocol); - - c.set_keyspace(keyspace); - - if (ConfigHelper.getInputKeyspaceUserName(jobConf) != null) { - Map creds = new HashMap(); - creds.put("username", ConfigHelper.getInputKeyspaceUserName(jobConf)); - creds.put("password", ConfigHelper.getInputKeyspacePassword(jobConf)); - AuthenticationRequest authRequest = new AuthenticationRequest(creds); - c.login(authRequest); - } - - return c; - } - - public boolean next(ByteBuffer keyBuffer, SortedMap valueMap) throws IOException { - if (!rowIterator.hasNext()) - return false; - currentRow = rowIterator.next(); - - ByteBuffer currentKey = currentRow.left; - SortedMap currentValue = currentRow.right; - keyBuffer.clear(); - keyBuffer.put(currentKey); - keyBuffer.flip(); - - valueMap.clear(); - valueMap.putAll(currentValue); - - return true; - } - - public ByteBuffer createKey() { - return keyBytesBuffer; - } - - private SortedMap emptyMap() { - return new TreeMap(); - } - - public SortedMap createValue() { - return emptyMap(); - } - - public long getPos() throws IOException { - return rowIterator.rowsRead(); - } - - public void close() throws IOException { - if (socket != null && socket.isOpen()) { - socket.close(); - socket = null; - client = null; - } - } - - public float getProgress() throws IOException { - return ((float)rowIterator.rowsRead()) / totalRowCount; - } - - private class RowIterator extends AbstractIterator>> { - private List rows; - private String startToken; - private int totalRead = 0; - private int i = 0; - private final AbstractType comparator; - private final AbstractType subComparator; - private final IPartitioner partitioner; - - private RowIterator() { - try { - partitioner = FBUtilities.newPartitioner(client.describe_partitioner()); - - // Get the Keyspace metadata, then get the specific CF metadata - // in order to populate the sub/comparator. - KsDef ks_def = client.describe_keyspace(keyspace); - List cfnames = new ArrayList(); - for (CfDef cfd : ks_def.cf_defs) - cfnames.add(cfd.name); - int idx = cfnames.indexOf(columnFamily); - CfDef cf_def = ks_def.cf_defs.get(idx); - - comparator = TypeParser.parse(cf_def.comparator_type); - subComparator = cf_def.subcomparator_type == null ? null : TypeParser.parse(cf_def.subcomparator_type); - } catch (ConfigurationException e) { - throw new RuntimeException("unable to load sub/comparator", e); - } catch (TException e) { - throw new RuntimeException("error communicating via Thrift", e); - } catch (Exception e) { - throw new RuntimeException("unable to load keyspace " + keyspace, e); - } - } - - private void maybeInit() { - // check if we need another batch - if (rows != null && i >= rows.size()) - rows = null; - - if (rows != null) - return; - - if (startToken == null) { - startToken = inputSplit.getStartToken(); - } else if (startToken.equals(inputSplit.getEndToken())) { - rows = null; - return; - } - - KeyRange keyRange = new KeyRange(batchRowCount); - keyRange.setStart_token(startToken); - keyRange.setEnd_token(inputSplit.getEndToken()); - try { - rows = client.get_range_slices(new ColumnParent(columnFamily), predicate, keyRange, consistencyLevel); - - // nothing new? reached the end - if (rows.isEmpty()) { - rows = null; - return; - } - - // Pre-compute the last row key, before removing empty rows - ByteBuffer lastRowKey = rows.get(rows.size() - 1).key; - - // only remove empty rows if the slice predicate is empty - if (isPredicateEmpty(predicate)) { - Iterator rowsIterator = rows.iterator(); - while (rowsIterator.hasNext()) - if (rowsIterator.next().columns.isEmpty()) - rowsIterator.remove(); - } - - // reset to iterate through the new batch - i = 0; - - // prepare for the next slice to be read - startToken = partitioner.getTokenFactory().toString(partitioner.getToken(lastRowKey)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - /** - * @return total number of rows read by this record reader - */ - public int rowsRead() { - return totalRead; - } - - protected Pair> computeNext() { - maybeInit(); - if (rows == null) - return endOfData(); - - totalRead++; - KeySlice ks = rows.get(i++); - SortedMap map = new TreeMap(comparator); - for (ColumnOrSuperColumn cosc : ks.columns) { - IColumn column = unthriftify(cosc); - map.put(column.name(), column); - } - return new Pair>(ks.key, map); - } - - private IColumn unthriftify(ColumnOrSuperColumn cosc) { - if (cosc.counter_column != null) - return unthriftifyCounter(cosc.counter_column); - if (cosc.counter_super_column != null) - return unthriftifySuperCounter(cosc.counter_super_column); - if (cosc.super_column != null) - return unthriftifySuper(cosc.super_column); - assert cosc.column != null; - return unthriftifySimple(cosc.column); - } - - private IColumn unthriftifySuper(SuperColumn super_column) { - org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator); - for (Column column : super_column.columns) { - sc.addColumn(unthriftifySimple(column)); - } - return sc; - } - - private IColumn unthriftifySimple(Column column) { - return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp); - } - - private IColumn unthriftifyCounter(CounterColumn column) { - //CounterColumns read the nodeID from the System table, so need the StorageService running and access - //to cassandra.yaml. To avoid a Hadoop needing access to yaml return a regular Column. - return new org.apache.cassandra.db.Column(column.name, ByteBufferUtil.bytes(column.value), 0); - } - - private IColumn unthriftifySuperCounter(CounterSuperColumn superColumn) { - org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(superColumn.name, subComparator); - for (CounterColumn column : superColumn.columns) - sc.addColumn(unthriftifyCounter(column)); - return sc; - } - } - - private boolean isPredicateEmpty(SlicePredicate predicate) { - if (predicate != null) { - if (predicate.isSetColumn_names()) { - return false; - } - if (predicate.getSlice_range().getStart() != null && predicate.getSlice_range().getFinish() != null) { - return false; - } - } - - return true; - } -} diff --git a/src/test/java/org/pingles/cascading/cassandra/CassandraClient.java b/src/test/java/org/pingles/cascading/cassandra/CassandraClient.java index 6205870..5d61132 100644 --- a/src/test/java/org/pingles/cascading/cassandra/CassandraClient.java +++ b/src/test/java/org/pingles/cascading/cassandra/CassandraClient.java @@ -142,7 +142,7 @@ public void useKeyspace(String keyspaceName) throws TException, InvalidRequestEx client.recv_set_keyspace(); } - public void truncate(String columnFamilyName) throws TException, InvalidRequestException, UnavailableException { + public void truncate(String columnFamilyName) throws TException, InvalidRequestException, UnavailableException, TimedOutException { LOGGER.info("Truncating {}", columnFamilyName); client.send_truncate(columnFamilyName); client.recv_truncate(); diff --git a/src/test/java/org/pingles/cascading/cassandra/CassandraFlowTest.java b/src/test/java/org/pingles/cascading/cassandra/CassandraFlowTest.java index 0828600..9fc0864 100644 --- a/src/test/java/org/pingles/cascading/cassandra/CassandraFlowTest.java +++ b/src/test/java/org/pingles/cascading/cassandra/CassandraFlowTest.java @@ -79,7 +79,7 @@ public void beforeTest() throws Exception { } @After - public void afterTest() throws TException, InvalidRequestException, UnavailableException { + public void afterTest() throws TException, InvalidRequestException, UnavailableException, TimedOutException { try { client.truncate(columnFamilyName); } finally { @@ -124,7 +124,7 @@ public void testNarrowRowSinkWithStringColumnNamesSpecified() throws Exception { assertEquals("b", getTestBytes("2", "lower")); assertEquals("B", getTestBytes("2", "upper")); } - + @Test public void testWideRowAsSource() throws Exception { client.put(columnFamilyName, toBytes("21"), toBytes("lower"), toBytes("a"));