diff --git a/external/hbase/pom.xml b/external/hbase/pom.xml new file mode 100644 index 0000000000000..9b8c789734834 --- /dev/null +++ b/external/hbase/pom.xml @@ -0,0 +1,140 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent + 1.1.0-SNAPSHOT + ../../pom.xml + + + org.apache.spark + spark-hbase_2.10 + + spark-hbase + + jar + Spark Project External HBase + http://spark.apache.org/ + + + + org.scalatest + scalatest_${scala.binary.version} + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + test-jar + tests + test + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + + + org.apache.hbase + hbase-client + ${hbase-new.version} + + + org.apache.hbase + hbase-client + ${hbase-new.version} + test-jar + tests + test + + + org.apache.hbase + hbase-server + ${hbase-new.version} + + + org.apache.hbase + hbase-server + ${hbase-new.version} + test-jar + tests + test + + + org.apache.hbase + hbase-hadoop1-compat + ${hbase-new.version} + runtime + + + org.apache.hbase + hbase-hadoop1-compat + ${hbase-new.version} + test-jar + tests + test + + + org.apache.hbase + hbase-common + ${hbase-new.version} + + + org.apache.hbase + hbase-common + ${hbase-new.version} + test-jar + tests + test + + + org.apache.hbase + hbase-hadoop-compat + ${hbase-new.version} + test + + + org.apache.hbase + hbase-hadoop-compat + ${hbase-new.version} + test-jar + tests + test + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.scalatest + scalatest-maven-plugin + + + + + \ No newline at end of file diff --git a/external/hbase/src/main/scala/org/apache/spark/hbase/HBaseContext.scala b/external/hbase/src/main/scala/org/apache/spark/hbase/HBaseContext.scala new file mode 100644 index 0000000000000..8ce73b6e4bb58 --- /dev/null +++ b/external/hbase/src/main/scala/org/apache/spark/hbase/HBaseContext.scala @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.hbase + +import org.apache.hadoop.hbase.HBaseConfiguration +import org.apache.spark.rdd.RDD +import java.io.ByteArrayOutputStream +import java.io.DataOutputStream +import java.io.ByteArrayInputStream +import java.io.DataInputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.client.HConnectionManager +import org.apache.spark.api.java.JavaPairRDD +import java.io.OutputStream +import org.apache.hadoop.hbase.client.HTable +import org.apache.hadoop.hbase.client.Scan +import org.apache.hadoop.hbase.client.Get +import java.util.ArrayList +import org.apache.hadoop.hbase.client.Result +import scala.reflect.ClassTag +import org.apache.hadoop.hbase.client.HConnection +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.client.Increment +import org.apache.hadoop.hbase.client.Delete +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.mapreduce.TableInputFormat +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.hbase.mapreduce.TableMapper +import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper +import org.apache.hadoop.hbase.util.Base64 +import org.apache.spark.rdd.HadoopRDD +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.SerializableWritable +import java.util.HashMap +import java.util.concurrent.atomic.AtomicInteger +import org.apache.hadoop.hbase.HConstants +import java.util.concurrent.atomic.AtomicLong +import java.util.Timer +import java.util.TimerTask +import org.apache.hadoop.hbase.client.Mutation +import scala.collection.mutable.MutableList +import org.apache.spark.streaming.dstream.DStream + +/** + * HBaseContext is a façade of simple and complex HBase operations + * like bulk put, get, increment, delete, and scan + * + * HBase Context will take the responsibilities to happen to + * complexity of disseminating the configuration information + * to the working and managing the life cycle of HConnections. + * + * First constructor: + * @param sc Active SparkContext + * @param broadcastedConf This is a Broadcast object that holds a + * serializable Configuration object + * + */ +@serializable class HBaseContext(@transient sc: SparkContext, + broadcastedConf: Broadcast[SerializableWritable[Configuration]]) { + + /** + * Second constructor option: + * @param sc Active SparkContext + * @param config Configuration object to make connection to HBase + */ + def this(@transient sc: SparkContext, @transient config: Configuration) { + this(sc, sc.broadcast(new SerializableWritable(config))) + } + + /** + * A simple enrichment of the traditional Spark RDD foreachPartition. + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. 
All HConnection + * management is handled outside this method + * + * @param RDD Original RDD with data to iterate over + * @param f Function to be given a iterator to iterate through + * the RDD values and a HConnection object to interact + * with HBase + */ + def foreachPartition[T](rdd: RDD[T], + f: (Iterator[T], HConnection) => Unit) = { + rdd.foreachPartition( + it => hbaseForeachPartition(broadcastedConf, it, f)) + } + + /** + * A simple enrichment of the traditional Spark Streaming dStream foreach + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * @param DStream Original DStream with data to iterate over + * @param f Function to be given a iterator to iterate through + * the DStream values and a HConnection object to + * interact with HBase + */ + def streamForeach[T](dstream: DStream[T], + f: (Iterator[T], HConnection) => Unit) = { + dstream.foreach((rdd, time) => { + foreachPartition(rdd, f) + }) + } + + /** + * A simple enrichment of the traditional Spark RDD mapPartition. + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param RDD Original RDD with data to iterate over + * @param mp Function to be given a iterator to iterate through + * the RDD values and a HConnection object to interact + * with HBase + * @return Returns a new RDD generated by the user definition + * function just like normal mapPartition + */ + def mapPartition[T, U: ClassTag](rdd: RDD[T], + mp: (Iterator[T], HConnection) => Iterator[U]): RDD[U] = { + + rdd.mapPartitions[U](it => hbaseMapPartition[T, U](broadcastedConf, + it, + mp), true) + } + + /** + * A simple enrichment of the traditional Spark Streaming DStream + * mapPartition. + * + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param DStream Original DStream with data to iterate over + * @param mp Function to be given a iterator to iterate through + * the DStream values and a HConnection object to + * interact with HBase + * @return Returns a new DStream generated by the user + * definition function just like normal mapPartition + */ + def streamMap[T, U: ClassTag](dstream: DStream[T], + mp: (Iterator[T], HConnection) => Iterator[U]): DStream[U] = { + + dstream.mapPartitions(it => hbaseMapPartition[T, U](broadcastedConf, + it, + mp), true) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take RDD + * and generate puts and send them to HBase. 
+ * The complexity of even the HConnection is + * removed from the developer + * + * @param RDD Original RDD with data to iterate over + * @param tableNm The name of the table to put into + * @param f Function to convert a value in the RDD to a HBase Put + * @autoFlush If autoFlush should be turned on + */ + def bulkPut[T](rdd: RDD[T], tableNm: String, f: (T) => Put, autoFlush: Boolean) { + + rdd.foreachPartition( + it => hbaseForeachPartition[T]( + broadcastedConf, + it, + (iterator, hConnection) => { + val htable = hConnection.getTable(tableNm) + htable.setAutoFlush(autoFlush, true) + iterator.foreach(T => htable.put(f(T))) + htable.flushCommits() + htable.close() + })) + } + + /** + * A simple abstraction over the HBaseContext.streamMapPartition method. + * + * It allow addition support for a user to take a DStream and + * generate puts and send them to HBase. + * + * The complexity of even the HConnection is + * removed from the developer + * + * @param DStream Original DStream with data to iterate over + * @param tableNm The name of the table to put into + * @param f Function to convert a value in the RDD to a HBase Put + * @autoFlush If autoFlush should be turned on + */ + def streamBulkPut[T](dstream: DStream[T], + tableNm: String, + f: (T) => Put, + autoFlush: Boolean) = { + dstream.foreach((rdd, time) => { + bulkPut(rdd, tableNm, f, autoFlush) + }) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take a RDD and + * generate increments and send them to HBase. + * + * The complexity of even the HConnection is + * removed from the developer + * + * @param RDD Original RDD with data to iterate over + * @param tableNm The name of the table to increment to + * @param f Function to convert a value in the RDD to a + * HBase Increments + * @batchSize The number of increments to batch before sending to HBase + */ + def bulkIncrement[T](rdd: RDD[T], tableNm:String, f:(T) => Increment, batchSize: Int) { + bulkMutation(rdd, tableNm, f, batchSize) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take a RDD and generate delete + * and send them to HBase. The complexity of even the HConnection is + * removed from the developer + * + * @param RDD Original RDD with data to iterate over + * @param tableNm The name of the table to delete from + * @param f function to convert a value in the RDD to a + * HBase Deletes + * @batchSize The number of delete to batch before sending to HBase + */ + def bulkDelete[T](rdd: RDD[T], tableNm:String, f:(T) => Delete, batchSize: Int) { + bulkMutation(rdd, tableNm, f, batchSize) + } + + /** + * Under lining function to support all bulk mutations + * + * May be opened up if requested + */ + private def bulkMutation[T](rdd: RDD[T], tableNm: String, f: (T) => Mutation, batchSize: Int) { + rdd.foreachPartition( + it => hbaseForeachPartition[T]( + broadcastedConf, + it, + (iterator, hConnection) => { + val htable = hConnection.getTable(tableNm) + val mutationList = new ArrayList[Mutation] + iterator.foreach(T => { + mutationList.add(f(T)) + if (mutationList.size >= batchSize) { + htable.batch(mutationList) + mutationList.clear() + } + }) + if (mutationList.size() > 0) { + htable.batch(mutationList) + mutationList.clear() + } + htable.close() + })) + } + + /** + * A simple abstraction over the HBaseContext.streamForeach method. 
+ * + * It allow addition support for a user to take a DStream and + * generate Increments and send them to HBase. + * + * The complexity of even the HConnection is + * removed from the developer + * + * @param DStream Original DStream with data to iterate over + * @param tableNm The name of the table to increments into + * @param f function to convert a value in the RDD to a + * HBase Increments + * @batchSize The number of increments to batch before sending to HBase + */ + def streamBulkIncrement[T](dstream: DStream[T], + tableNm: String, + f: (T) => Increment, + batchSize: Int) = { + streamBulkMutation(dstream, tableNm, f, batchSize) + } + + /** + * A simple abstraction over the HBaseContext.streamForeach method. + * + * It allow addition support for a user to take a DStream and + * generate Delete and send them to HBase. + * + * The complexity of even the HConnection is + * removed from the developer + * + * @param DStream Original DStream with data to iterate over + * @param tableNm The name of the table to delete from + * @param f function to convert a value in the RDD to a + * HBase Delete + * @batchSize The number of deletes to batch before sending to HBase + */ + def streamBulkDelete[T](dstream: DStream[T], + tableNm: String, + f: (T) => Delete, + batchSize: Int) = { + streamBulkMutation(dstream, tableNm, f, batchSize) + } + + /** + * Under lining function to support all bulk streaming mutations + * + * May be opened up if requested + */ + private def streamBulkMutation[T](dstream: DStream[T], + tableNm: String, + f: (T) => Mutation, + batchSize: Int) = { + dstream.foreach((rdd, time) => { + bulkMutation(rdd, tableNm, f, batchSize) + }) + } + + + /** + * A simple abstraction over the HBaseContext.mapPartition method. + * + * It allow addition support for a user to take a RDD and generates a + * new RDD based on Gets and the results they bring back from HBase + * + * @param RDD Original RDD with data to iterate over + * @param tableNm The name of the table to get from + * @param makeGet Function to convert a value in the RDD to a + * HBase Get + * @param convertResult This will convert the HBase Result object to + * what ever the user wants to put in the resulting + * RDD + * return New RDD that is created by the Get to HBase + */ + def bulkGet[T, U: ClassTag](tableNm: String, + batchSize: Int, + rdd: RDD[T], + makeGet: (T) => Get, + convertResult: (Result) => U): RDD[U] = { + + val getMapPartition = new GetMapPartition(tableNm, + batchSize, + makeGet, + convertResult) + + rdd.mapPartitions[U](it => hbaseMapPartition[T, U](broadcastedConf, + it, + getMapPartition.run), true) + } + + /** + * A simple abstraction over the HBaseContext.streamMap method. 
+ * + * It allow addition support for a user to take a DStream and + * generates a new DStream based on Gets and the results + * they bring back from HBase + * + * @param DStream Original DStream with data to iterate over + * @param tableNm The name of the table to get from + * @param makeGet Function to convert a value in the DStream to a + * HBase Get + * @param convertResult This will convert the HBase Result object to + * what ever the user wants to put in the resulting + * DStream + * return new DStream that is created by the Get to HBase + */ + def streamBulkGet[T, U: ClassTag](tableNm: String, + batchSize:Int, + dstream: DStream[T], + makeGet: (T) => Get, + convertResult: (Result) => U): DStream[U] = { + + val getMapPartition = new GetMapPartition(tableNm, + batchSize, + makeGet, + convertResult) + + dstream.mapPartitions[U](it => hbaseMapPartition[T, U](broadcastedConf, + it, + getMapPartition.run), true) + } + + /** + * This function will use the native HBase TableInputFormat with the + * given scan object to generate a new RDD + * + * @param tableNm The name of the table to scan + * @param scan The HBase scan object to use to read data from HBase + * @param f Function to convert a Result object from HBase into + * what the user wants in the final generated RDD + * @return New RDD with results from scan + */ + def hbaseRDD[U: ClassTag](tableNm: String, scan: Scan, f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = { + + var job: Job = new Job(broadcastedConf.value.value) + + TableMapReduceUtil.initTableMapperJob(tableNm, scan, classOf[IdentityTableMapper], null, null, job) + + sc.newAPIHadoopRDD(job.getConfiguration(), + classOf[TableInputFormat], + classOf[ImmutableBytesWritable], + classOf[Result]).map(f) + } + + /** + * A overloaded version of HBaseContext hbaseRDD that predefines the + * type of the outputing RDD + * + * @param tableNm The name of the table to scan + * @param scan The HBase scan object to use to read data from HBase + * @return New RDD with results from scan + * + */ + def hbaseRDD(tableNm: String, scans: Scan): RDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])] = { + hbaseRDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])]( + tableNm, + scans, + (r: (ImmutableBytesWritable, Result)) => { + val it = r._2.list().iterator() + val list = new ArrayList[(Array[Byte], Array[Byte], Array[Byte])]() + + while (it.hasNext()) { + val kv = it.next() + list.add((kv.getFamily(), kv.getQualifier(), kv.getValue())) + } + + (r._1.copyBytes(), list) + }) + } + + /** + * Under lining wrapper all foreach functions in HBaseContext + * + */ + private def hbaseForeachPartition[T](configBroadcast: Broadcast[SerializableWritable[Configuration]], + it: Iterator[T], + f: (Iterator[T], HConnection) => Unit) = { + + val config = configBroadcast.value.value + + val hConnection = HConnectionStaticCache.getHConnection(config) + try { + f(it, hConnection) + } finally { + HConnectionStaticCache.finishWithHConnection(config, hConnection) + } + } + + /** + * Under lining wrapper all mapPartition functions in HBaseContext + * + */ + private def hbaseMapPartition[K, U](configBroadcast: Broadcast[SerializableWritable[Configuration]], + it: Iterator[K], + mp: (Iterator[K], HConnection) => Iterator[U]): Iterator[U] = { + + val config = configBroadcast.value.value + + val hConnection = HConnectionStaticCache.getHConnection(config) + + try { + val res = mp(it, hConnection) + res + } finally { + HConnectionStaticCache.finishWithHConnection(config, 
hConnection) + } + } + + /** + * Under lining wrapper all get mapPartition functions in HBaseContext + * + */ + @serializable private class GetMapPartition[T, U: ClassTag](tableNm: String, + batchSize: Int, + makeGet: (T) => Get, + convertResult: (Result) => U) { + + + def run(iterator: Iterator[T], hConnection: HConnection): Iterator[U] = { + val htable = hConnection.getTable(tableNm) + + val gets = new ArrayList[Get]() + var res = List[U]() + + while (iterator.hasNext) { + gets.add(makeGet(iterator.next)) + + if (gets.size() == batchSize) { + var results = htable.get(gets) + res = res ++ results.map(convertResult) + gets.clear() + } + } + if (gets.size() > 0) { + val results = htable.get(gets) + res = res ++ results.map(convertResult) + gets.clear() + } + htable.close() + res.iterator + } + } +} \ No newline at end of file diff --git a/external/hbase/src/main/scala/org/apache/spark/hbase/HConnectionStaticCache.scala b/external/hbase/src/main/scala/org/apache/spark/hbase/HConnectionStaticCache.scala new file mode 100644 index 0000000000000..4d5791e361413 --- /dev/null +++ b/external/hbase/src/main/scala/org/apache/spark/hbase/HConnectionStaticCache.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.hbase + +import org.apache.hadoop.hbase.client.HConnection +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong +import java.util.Timer +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.HConstants +import org.apache.hadoop.hbase.client.HConnectionManager +import java.util.TimerTask +import scala.collection.mutable.MutableList +import org.apache.spark.Logging +import scala.collection.mutable.SynchronizedMap +import scala.collection.mutable.HashMap + +/** + * A static caching class that will manage all HConnection in a worker + * + * The main idea is there is a hashMap with + * HConstants.HBASE_CLIENT_INSTANCE_ID which is ("hbase.client.instance.id") + * + * In that HashMap there is three things + * - HConnection + * - Number of checked out users of the HConnection + * - Time since the HConnection was last used + * + * There is also a Timer thread that will start up every 2 minutes + * When the Timer thread starts up it will look for HConnection with no + * checked out users and a last used time that is older then 1 minute. 
+ * + * This class is not intended to be used by Users + */ +object HConnectionStaticCache extends Logging { + @transient private val hconnectionMap = + new HashMap[String, (HConnection, AtomicInteger, AtomicLong)] with SynchronizedMap[String, (HConnection, AtomicInteger, AtomicLong)] + + @transient private val hconnectionTimeout = 60000 + + @transient private val hconnectionCleaner = new Timer + + hconnectionCleaner.schedule(new hconnectionCleanerTask, hconnectionTimeout * 2) + + /** + * Gets or starts a HConnection based on a config object + */ + def getHConnection(config: Configuration): HConnection = { + val instanceId = config.get(HConstants.HBASE_CLIENT_INSTANCE_ID) + var hconnectionAndCounter = hconnectionMap.get(instanceId).getOrElse(null) + if (hconnectionAndCounter == null) { + hconnectionMap.synchronized { + hconnectionAndCounter = hconnectionMap.get(instanceId).getOrElse(null) + if (hconnectionAndCounter == null) { + + val hConnection = HConnectionManager.createConnection(config) + hconnectionAndCounter = (hConnection, new AtomicInteger, new AtomicLong) + hconnectionMap.put(instanceId, hconnectionAndCounter) + + } + } + logDebug("Created hConnection '" + instanceId + "'"); + } else { + logDebug("Get hConnection from cache '" + instanceId + "'"); + } + hconnectionAndCounter._2.incrementAndGet() + return hconnectionAndCounter._1 + } + + /** + * Tells the cache that a thread is no longer using a HConnection + */ + def finishWithHConnection(config: Configuration, hconnection: HConnection) { + val instanceId = config.get(HConstants.HBASE_CLIENT_INSTANCE_ID) + + var hconnectionAndCounter = hconnectionMap.get(instanceId).getOrElse(null) + if (hconnectionAndCounter != null) { + var usesLeft = hconnectionAndCounter._2.decrementAndGet() + if (usesLeft < 0) { + hconnectionAndCounter._2.set(0) + usesLeft = 0 + } + if (usesLeft == 0) { + hconnectionAndCounter._3.set(System.currentTimeMillis()) + logDebug("Finished last use of hconnection '" + instanceId + "'"); + } else { + logDebug("Finished a use of hconnection '" + instanceId + "' with " + usesLeft + " uses left"); + } + } else { + logWarning("Tried to remove use of '" + instanceId + "' but nothing was there"); + } + } + + /** + * The timer task that cleans up the HashMap of HConnections + */ + protected class hconnectionCleanerTask extends TimerTask { + override def run() { + + logDebug("Running hconnectionCleanerTask:" + hconnectionMap.size); + + val removeList = new MutableList[String] + + // Only collect connections with no checked-out users that have been idle for over a minute + hconnectionMap.foreach(entry => { + if (entry._2._2.get() == 0 && + entry._2._3.get() + 60000 < System.currentTimeMillis()) { + removeList.+=(entry._1) + } + }) + + if (removeList.length > 0) { + hconnectionMap.synchronized { + removeList.foreach(key => { + val v = hconnectionMap.get(key).getOrElse(null) + if (v != null) { + if (v._2.get() == 0 && + v._3.get() + 60000 < System.currentTimeMillis()) { + + logDebug("closing hconnection: " + key); + + v._1.close() + + hconnectionMap.remove(key); + } + } else { + logWarning("Tried to remove use of '" + key + "' but nothing was there"); + } + }) + } + } + } + } + +} \ No newline at end of file diff --git a/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseBulkDeleteExample.scala b/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseBulkDeleteExample.scala new file mode 100644 index 0000000000000..d0db4945e9732 --- /dev/null +++ b/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseBulkDeleteExample.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under
one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.hbase.example + +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.HBaseConfiguration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.util.Bytes +import org.apache.spark.hbase.HBaseContext +import org.apache.hadoop.hbase.client.Delete + +object HBaseBulkDeleteExample { + def main(args: Array[String]) { + if (args.length == 0) { + System.out.println("HBaseBulkDeletesExample {master} {tableName} {columnFamily}"); + return; + } + + val master = args(0); + val tableName = args(1); + + val sc = new SparkContext(master, "HBaseBulkDeletesExample"); + sc.addJar("SparkHBase.jar") + + //[Array[Byte]] + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1")), + (Bytes.toBytes("2")), + (Bytes.toBytes("3")), + (Bytes.toBytes("4")), + (Bytes.toBytes("5")) + ) + ) + + val conf = HBaseConfiguration.create(); + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")); + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")); + + val hbaseContext = new HBaseContext(sc, conf); + hbaseContext.bulkDelete[Array[Byte]](rdd, + tableName, + putRecord => new Delete(putRecord), + 4); + } +} \ No newline at end of file diff --git a/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseBulkGetExample.scala b/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseBulkGetExample.scala new file mode 100644 index 0000000000000..e8ae629b89054 --- /dev/null +++ b/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseBulkGetExample.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.hbase.example + +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.HBaseConfiguration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Get +import org.apache.hadoop.hbase.client.Result +import org.apache.spark.hbase.HBaseContext + +object HBaseBulkGetExample { + def main(args: Array[String]) { + if (args.length == 0) { + System.out.println("GenerateGraphs {master} {tableName}"); + return ; + } + + val master = args(0); + val tableName = args(1); + + val sc = new SparkContext(master, "HBaseBulkGetExample"); + sc.addJar("SparkHBase.jar") + + //[(Array[Byte])] + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1")), + (Bytes.toBytes("2")), + (Bytes.toBytes("3")), + (Bytes.toBytes("4")), + (Bytes.toBytes("5")), + (Bytes.toBytes("6")), + (Bytes.toBytes("7")))) + + val conf = HBaseConfiguration.create() + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")) + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")) + + val hbaseContext = new HBaseContext(sc, conf); + + val getRdd = hbaseContext.bulkGet[Array[Byte], String]( + tableName, + 2, + rdd, + record => { + System.out.println("making Get" ) + new Get(record) + }, + (result: Result) => { + + val it = result.list().iterator() + val B = new StringBuilder + + B.append(Bytes.toString(result.getRow()) + ":") + + while (it.hasNext()) { + val kv = it.next() + val q = Bytes.toString(kv.getQualifier()) + if (q.equals("counter")) { + B.append("(" + Bytes.toString(kv.getQualifier()) + "," + Bytes.toLong(kv.getValue()) + ")") + } else { + B.append("(" + Bytes.toString(kv.getQualifier()) + "," + Bytes.toString(kv.getValue()) + ")") + } + } + B.toString + }) + + + getRdd.collect.foreach(v => System.out.println(v)) + + } +} \ No newline at end of file diff --git a/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseBulkIncrementExample.scala b/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseBulkIncrementExample.scala new file mode 100644 index 0000000000000..b44d1f614f369 --- /dev/null +++ b/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseBulkIncrementExample.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.hbase.example + +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.HBaseConfiguration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Increment +import org.apache.spark.hbase.HBaseContext + +object HBaseBulkIncrementExample { + def main(args: Array[String]) { + if (args.length == 0) { + System.out.println("GenerateGraphs {master} {tableName} {columnFamily}"); + return; + } + + val master = args(0); + val tableName = args(1); + val columnFamily = args(2); + + val sc = new SparkContext(master, "HBaseBulkIncrementsExample"); + sc.addJar("SparkHBase.jar") + + //[(Array[Byte], Array[(Array[Byte], Array[Byte], Long)])] + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("counter"), 1L))), + (Bytes.toBytes("2"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("counter"), 2L))), + (Bytes.toBytes("3"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("counter"), 3L))), + (Bytes.toBytes("4"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("counter"), 4L))), + (Bytes.toBytes("5"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("counter"), 5L))) + ) + ); + + val conf = HBaseConfiguration.create(); + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")); + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")); + + val hbaseContext = new HBaseContext(sc, conf); + hbaseContext.bulkIncrement[(Array[Byte], Array[(Array[Byte], Array[Byte], Long)])](rdd, + tableName, + (incrementRecord) => { + val increment = new Increment(incrementRecord._1) + incrementRecord._2.foreach((incrementValue) => + increment.addColumn(incrementValue._1, incrementValue._2, incrementValue._3)) + increment + }, + 4); + } +} \ No newline at end of file diff --git a/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseBulkPutExample.scala b/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseBulkPutExample.scala new file mode 100644 index 0000000000000..d2e2dd789722d --- /dev/null +++ b/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseBulkPutExample.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.hbase.example + +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.HBaseConfiguration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Put +import org.apache.spark.hbase.HBaseContext + +object HBaseBulkPutExample { + def main(args: Array[String]) { + if (args.length == 0) { + System.out.println("HBaseBulkPutExample {master} {tableName} {columnFamily}"); + return; + } + + val master = args(0); + val tableName = args(1); + val columnFamily = args(2); + + val sc = new SparkContext(master, "HBaseBulkPutExample"); + sc.addJar("SparkHBase.jar") + + //[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])] + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))), + (Bytes.toBytes("2"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))), + (Bytes.toBytes("3"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))), + (Bytes.toBytes("4"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))), + (Bytes.toBytes("5"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5")))) + ) + ) + + val conf = HBaseConfiguration.create(); + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")); + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")); + + val hbaseContext = new HBaseContext(sc, conf); + hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd, + tableName, + (putRecord) => { + val put = new Put(putRecord._1) + putRecord._2.foreach((putValue) => put.add(putValue._1, putValue._2, putValue._3)) + put + }, + true); + } +} \ No newline at end of file diff --git a/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseBulkPutExampleFromFile.scala b/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseBulkPutExampleFromFile.scala new file mode 100644 index 0000000000000..233b4fba117e5 --- /dev/null +++ b/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseBulkPutExampleFromFile.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.hbase.example + +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.HBaseConfiguration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Put +import org.apache.spark.hbase.HBaseContext +import org.apache.hadoop.mapred.TextInputFormat +import org.apache.hadoop.io.LongWritable +import org.apache.hadoop.io.Text + +object HBaseBulkPutExampleFromFile { + def main(args: Array[String]) { + if (args.length == 0) { + System.out.println("HBaseBulkPutExample {master} {tableName} {columnFamily}"); + return; + } + + val master = args(0) + val tableName = args(1) + val columnFamily = args(2) + val inputFile = args(3) + + val sc = new SparkContext(master, "HBaseBulkPutExample"); + sc.addJar("SparkHBase.jar") + + var rdd = sc.hadoopFile( + inputFile, + classOf[TextInputFormat], + classOf[LongWritable], + classOf[Text]).map(v => { + System.out.println("reading-" + v._2.toString()) + v._2.toString() + }) + + val conf = HBaseConfiguration.create(); + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")); + conf.addResource(new Path("/etc/hbase/conf/hdfs-site.xml")); + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")); + + val hbaseContext = new HBaseContext(sc, conf); + hbaseContext.bulkPut[String](rdd, + tableName, + (putRecord) => { + System.out.println("hbase-" + putRecord) + val put = new Put(Bytes.toBytes("Value- " + putRecord)) + put.add(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes(putRecord.length())) + put + }, + true); + } +} \ No newline at end of file diff --git a/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseBulkPutTimestampExample.scala b/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseBulkPutTimestampExample.scala new file mode 100644 index 0000000000000..5c4205ba3ff02 --- /dev/null +++ b/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseBulkPutTimestampExample.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.hbase.example + +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.HBaseConfiguration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Put +import org.apache.spark.hbase.HBaseContext + +object HBaseBulkPutTimestampExample { + def main(args: Array[String]) { + if (args.length == 0) { + System.out.println("HBaseBulkPutTimestampExample {master} {tableName} {columnFamily}"); + return ; + } + + val master = args(0); + val tableName = args(1); + val columnFamily = args(2); + + val sc = new SparkContext(master, "HBaseBulkPutTimestampExample"); + sc.addJar("SparkHBase.jar") + + val rdd = sc.parallelize(Array( + (Bytes.toBytes("6"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))), + (Bytes.toBytes("7"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))), + (Bytes.toBytes("8"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))), + (Bytes.toBytes("9"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))), + (Bytes.toBytes("10"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5")))))); + + val conf = HBaseConfiguration.create(); + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")); + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")); + + val timeStamp = System.currentTimeMillis() + + val hbaseContext = new HBaseContext(sc, conf); + hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd, + tableName, + (putRecord) => { + val put = new Put(putRecord._1) + putRecord._2.foreach((putValue) => put.add(putValue._1, putValue._2, timeStamp, putValue._3)) + put + }, + true); + } +} \ No newline at end of file diff --git a/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseDistributeScanExample.scala b/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseDistributeScanExample.scala new file mode 100644 index 0000000000000..2c336bb918349 --- /dev/null +++ b/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseDistributeScanExample.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.hbase.example + +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.HBaseConfiguration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Get +import org.apache.hadoop.hbase.client.Result +import org.apache.spark.hbase.HBaseContext +import org.apache.hadoop.hbase.client.Scan +import java.util.ArrayList + + +object HBaseDistributedScanExample { + def main(args: Array[String]) { + if (args.length == 0) { + System.out.println("GenerateGraphs {master} {tableName}") + return ; + } + + val master = args(0); + val tableName = args(1); + + val sc = new SparkContext(master, "HBaseDistributedScanExample") + sc.addJar("SparkHBase.jar") + + val conf = HBaseConfiguration.create() + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")) + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")) + + val hbaseContext = new HBaseContext(sc, conf) + + var scan = new Scan() + scan.setCaching(100) + + var getRdd = hbaseContext.hbaseRDD(tableName, scan) + + getRdd.collect.foreach(v => System.out.println(Bytes.toString(v._1))) + + } +} \ No newline at end of file diff --git a/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseStreamingBulkPutExample.scala b/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseStreamingBulkPutExample.scala new file mode 100644 index 0000000000000..2b2215b67c67c --- /dev/null +++ b/external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseStreamingBulkPutExample.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.hbase.example + +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.HBaseConfiguration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Put +import org.apache.spark.hbase.HBaseContext +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.Seconds +import org.apache.spark.SparkConf + +object HBaseStreamingBulkPutExample { + def main(args: Array[String]) { + if (args.length == 0) { + System.out.println("HBaseStreamingBulkPutExample {master} {host} {port} {tableName} {columnFamily}"); + return; + } + + val master = args(0); + val host = args(1); + val port = args(2); + val tableName = args(3); + val columnFamily = args(4); + + System.out.println("master:" + master) + System.out.println("host:" + host) + System.out.println("port:" + Integer.parseInt(port)) + System.out.println("tableName:" + tableName) + System.out.println("columnFamily:" + columnFamily) + + val sparkConf = new SparkConf(); + + sparkConf.set("spark.cleaner.ttl", "120000"); + + val sc = new SparkContext(master, "HBaseStreamingBulkPutExample", sparkConf) + sc.addJar("SparkHBase.jar") + + val ssc = new StreamingContext(sc, Seconds(1)) + + val lines = ssc.socketTextStream(host, Integer.parseInt(port)) + + val conf = HBaseConfiguration.create(); + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")); + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")); + + val hbaseContext = new HBaseContext(sc, conf); + + hbaseContext.streamBulkPut[String](lines, + tableName, + (putRecord) => { + if (putRecord.length() > 0) { + val put = new Put(Bytes.toBytes(putRecord)) + put.add(Bytes.toBytes("c"), Bytes.toBytes("foo"), Bytes.toBytes("bar")) + put + } else { + null + } + }, + false); + + ssc.start(); + + + } +} \ No newline at end of file diff --git a/external/hbase/src/test/scala/org/apache/spark/hbase/HBaseContextSuite.scala b/external/hbase/src/test/scala/org/apache/spark/hbase/HBaseContextSuite.scala new file mode 100644 index 0000000000000..bd2fdaa98e722 --- /dev/null +++ b/external/hbase/src/test/scala/org/apache/spark/hbase/HBaseContextSuite.scala @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.hbase + +import org.apache.spark.hbase.HBaseContext; +import org.apache.spark.streaming.StreamingContext +import org.scalatest.FunSuite +import org.apache.spark.SparkContext._ +import org.apache.spark._ +import org.apache.hadoop.hbase.HBaseTestingUtility +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.HTable +import org.apache.hadoop.hbase.client.Scan +import org.apache.hadoop.hbase.client.Get +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.client.HConnection +import org.apache.hadoop.hbase.client.HConnectionManager +import org.apache.hadoop.hbase.client.Increment +import org.apache.hadoop.hbase.client.Delete +import org.apache.hadoop.hbase.client.Result +import org.apache.hadoop.conf.Configuration + +class HBaseContextSuite extends FunSuite with LocalSparkContext { + + + val tableName = "t1" + val columnFamily = "c" + + val htu = HBaseTestingUtility.createLocalHTU() + + + htu.cleanupTestDir() + println("starting miniclusterFooBar") + htu.startMiniZKCluster(); + htu.startMiniHBaseCluster(1, 1); + println(" - minicluster started") + try { + htu.deleteTable(Bytes.toBytes(tableName)) + } catch { + case e: Exception => { + println(" - no table " + tableName + " found") + } + } + println(" - creating table " + tableName) + htu.createTable(Bytes.toBytes(tableName), Bytes.toBytes(columnFamily)) + println(" - created table") + + override def beforeAll() { + + } + + override def afterAll() { + htu.deleteTable(Bytes.toBytes(tableName)) + println("shuting down minicluster") + htu.shutdownMiniHBaseCluster() + htu.shutdownMiniZKCluster() + println(" - minicluster shut down") + htu.cleanupTestDir() + } + + test("bulkput to test HBase client") { + val config = htu.getConfiguration + val sc = new SparkContext("local", "test") + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")))), + (Bytes.toBytes("2"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo2")))), + (Bytes.toBytes("3"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("c"), Bytes.toBytes("foo3")))), + (Bytes.toBytes("4"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("d"), Bytes.toBytes("foo")))), + (Bytes.toBytes("5"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar")))))) + + val hbaseContext = new HBaseContext(sc, config); + hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd, + tableName, + (putRecord) => { + + val put = new Put(putRecord._1) + putRecord._2.foreach((putValue) => put.add(putValue._1, putValue._2, putValue._3)) + put + + }, + true); + + val connection = HConnectionManager.createConnection(config) + val htable = connection.getTable(Bytes.toBytes("t1")) + + assert(Bytes.toString(htable.get(new Get(Bytes.toBytes("1"))). + getColumnLatest(Bytes.toBytes(columnFamily), Bytes.toBytes("a")). + getValue()).equals("foo1")) + + assert(Bytes.toString(htable.get(new Get(Bytes.toBytes("2"))). + getColumnLatest(Bytes.toBytes(columnFamily), Bytes.toBytes("b")). + getValue()).equals("foo2")) + + assert(Bytes.toString(htable.get(new Get(Bytes.toBytes("3"))). + getColumnLatest(Bytes.toBytes(columnFamily), Bytes.toBytes("c")). + getValue()).equals("foo3")) + + assert(Bytes.toString(htable.get(new Get(Bytes.toBytes("4"))). + getColumnLatest(Bytes.toBytes(columnFamily), Bytes.toBytes("d")). 
+ getValue()).equals("foo")) + + assert(Bytes.toString(htable.get(new Get(Bytes.toBytes("5"))). + getColumnLatest(Bytes.toBytes(columnFamily), Bytes.toBytes("e")). + getValue()).equals("bar")) + } + + test("bulkIncrement to test HBase client") { + val config = htu.getConfiguration + val sc = new SparkContext("local", "test") + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("counter"), 1L))), + (Bytes.toBytes("2"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("counter"), 2L))), + (Bytes.toBytes("3"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("counter"), 3L))), + (Bytes.toBytes("4"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("counter"), 4L))), + (Bytes.toBytes("5"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("counter"), 5L))))) + + val hbaseContext = new HBaseContext(sc, config); + + hbaseContext.bulkIncrement[(Array[Byte], Array[(Array[Byte], Array[Byte], Long)])](rdd, + tableName, + (incrementRecord) => { + val increment = new Increment(incrementRecord._1) + incrementRecord._2.foreach((incrementValue) => + increment.addColumn(incrementValue._1, incrementValue._2, incrementValue._3)) + increment + }, + 4); + + hbaseContext.bulkIncrement[(Array[Byte], Array[(Array[Byte], Array[Byte], Long)])](rdd, + tableName, + (incrementRecord) => { + val increment = new Increment(incrementRecord._1) + incrementRecord._2.foreach((incrementValue) => + increment.addColumn(incrementValue._1, incrementValue._2, incrementValue._3)) + increment + }, + 4); + + val connection = HConnectionManager.createConnection(config) + val htable = connection.getTable(Bytes.toBytes("t1")) + + assert(Bytes.toLong(htable.get(new Get(Bytes.toBytes("1"))). + getColumnLatest(Bytes.toBytes(columnFamily), Bytes.toBytes("counter")). + getValue()) == 2L) + + assert(Bytes.toLong(htable.get(new Get(Bytes.toBytes("2"))). + getColumnLatest(Bytes.toBytes(columnFamily), Bytes.toBytes("counter")). + getValue()) == 4L) + + assert(Bytes.toLong(htable.get(new Get(Bytes.toBytes("3"))). + getColumnLatest(Bytes.toBytes(columnFamily), Bytes.toBytes("counter")). + getValue()) == 6L) + + assert(Bytes.toLong(htable.get(new Get(Bytes.toBytes("4"))). + getColumnLatest(Bytes.toBytes(columnFamily), Bytes.toBytes("counter")). + getValue()) == 8L) + + assert(Bytes.toLong(htable.get(new Get(Bytes.toBytes("5"))). + getColumnLatest(Bytes.toBytes(columnFamily), Bytes.toBytes("counter")). + getValue()) == 10L) + } + + test("bulkDelete to test HBase client") { + val config = htu.getConfiguration + val connection = HConnectionManager.createConnection(config) + val htable = connection.getTable(Bytes.toBytes("t1")) + + var put = new Put(Bytes.toBytes("delete1")) + put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")) + htable.put(put) + put = new Put(Bytes.toBytes("delete2")) + put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2")) + htable.put(put) + put = new Put(Bytes.toBytes("delete3")) + put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + htable.put(put) + + val sc = new SparkContext("local", "test") + val rdd = sc.parallelize(Array( + (Bytes.toBytes("delete1")), + (Bytes.toBytes("delete3")))) + + val hbaseContext = new HBaseContext(sc, config); + hbaseContext.bulkDelete[Array[Byte]](rdd, + tableName, + putRecord => new Delete(putRecord), + 4); + + assert(htable.get(new Get(Bytes.toBytes("delete1"))). 
+ getColumnLatest(Bytes.toBytes(columnFamily), Bytes.toBytes("a")) == null) + assert(htable.get(new Get(Bytes.toBytes("delete3"))). + getColumnLatest(Bytes.toBytes(columnFamily), Bytes.toBytes("a")) == null) + assert(Bytes.toString(htable.get(new Get(Bytes.toBytes("delete2"))). + getColumnLatest(Bytes.toBytes(columnFamily), Bytes.toBytes("a")). + getValue()).equals("foo2")) + } + + test("bulkGet to test HBase client") { + val config = htu.getConfiguration + val connection = HConnectionManager.createConnection(config) + val htable = connection.getTable(Bytes.toBytes("t1")) + + var put = new Put(Bytes.toBytes("get1")) + put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")) + htable.put(put) + put = new Put(Bytes.toBytes("get2")) + put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2")) + htable.put(put) + put = new Put(Bytes.toBytes("get3")) + put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + htable.put(put) + + val sc = new SparkContext("local", "test") + val rdd = sc.parallelize(Array( + (Bytes.toBytes("get1")), + (Bytes.toBytes("get2")), + (Bytes.toBytes("get3")), + (Bytes.toBytes("get4")))) + + val hbaseContext = new HBaseContext(sc, config); + + val getRdd = hbaseContext.bulkGet[Array[Byte], String]( + tableName, + 2, + rdd, + record => { + System.out.println("making Get") + new Get(record) + }, + (result: Result) => { + println("result.isEmpty(): " + result.isEmpty()) + if (result.list() != null) { + val it = result.list().iterator() + val B = new StringBuilder + + B.append(Bytes.toString(result.getRow()) + ":") + + while (it.hasNext()) { + val kv = it.next() + val q = Bytes.toString(kv.getQualifier()) + if (q.equals("counter")) { + B.append("(" + Bytes.toString(kv.getQualifier()) + "," + Bytes.toLong(kv.getValue()) + ")") + } else { + B.append("(" + Bytes.toString(kv.getQualifier()) + "," + Bytes.toString(kv.getValue()) + ")") + } + } + B.toString + } else { + null + } + }) + + val getArray = getRdd.collect + + assert(getArray.length == 4) + assert(getArray.contains("get1:(a,foo1)")) + assert(getArray.contains("get2:(a,foo2)")) + assert(getArray.contains("get3:(a,foo3)")) + + } + + test("distributedScan to test HBase client") { + val config = htu.getConfiguration + val connection = HConnectionManager.createConnection(config) + val htable = connection.getTable(Bytes.toBytes("t1")) + + var put = new Put(Bytes.toBytes("scan1")) + put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")) + htable.put(put) + put = new Put(Bytes.toBytes("scan2")) + put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2")) + htable.put(put) + put = new Put(Bytes.toBytes("scan3")) + put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + htable.put(put) + put = new Put(Bytes.toBytes("scan4")) + put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + htable.put(put) + put = new Put(Bytes.toBytes("scan5")) + put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + htable.put(put) + + val sc = new SparkContext("local", "test") + val hbaseContext = new HBaseContext(sc, config) + + var scan = new Scan() + scan.setCaching(100) + scan.setStartRow(Bytes.toBytes("scan2")) + scan.setStopRow(Bytes.toBytes("scan4_")) + + val scanRdd = hbaseContext.hbaseRDD(tableName, scan) + + val scanList = scanRdd.collect + + assert(scanList.length == 3) + + } + +} \ No newline at end of file diff --git 
a/external/hbase/src/test/scala/org/apache/spark/hbase/LocalSparkContext.scala b/external/hbase/src/test/scala/org/apache/spark/hbase/LocalSparkContext.scala new file mode 100644 index 0000000000000..659e5f947120c --- /dev/null +++ b/external/hbase/src/test/scala/org/apache/spark/hbase/LocalSparkContext.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.hbase + +import _root_.io.netty.util.internal.logging.{Slf4JLoggerFactory, InternalLoggerFactory} +import org.scalatest.BeforeAndAfterAll +import org.scalatest.BeforeAndAfterEach +import org.scalatest.Suite +import org.apache.spark.SparkContext + +/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it after each test. */ +trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self: Suite => + + @transient var sc: SparkContext = _ + + override def beforeAll() { + InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory()) + super.beforeAll() + } + + override def afterEach() { + resetSparkContext() + super.afterEach() + } + + def resetSparkContext() = { + LocalSparkContext.stop(sc) + sc = null + } + +} + +object LocalSparkContext { + def stop(sc: SparkContext) { + if (sc != null) { + sc.stop() + } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.driver.port") + } + + /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */ + def withSpark[T](sc: SparkContext)(f: SparkContext => T) = { + try { + f(sc) + } finally { + stop(sc) + } + } + +} diff --git a/pom.xml b/pom.xml index 4e2d64a833640..bc7d9f82b22a5 100644 --- a/pom.xml +++ b/pom.xml @@ -102,6 +102,7 @@ external/flume external/zeromq external/mqtt + external/hbase examples @@ -123,6 +124,7 @@ 2.4.1 ${hadoop.version} 0.94.6 + 0.96.2-hadoop1 3.4.5 0.12.0 1.4.3
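
For reference, a minimal driver-side sketch (not part of the patch) of how the HBaseContext API added above is intended to be used; it mirrors the bundled examples. The local master, the table name "t1", the column family "c", and the qualifier "q" are illustrative assumptions and must match an existing table and an hbase-site.xml reachable on the classpath.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Scan}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.hbase.HBaseContext

object HBaseContextUsageSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "HBaseContextUsageSketch")
    val conf = HBaseConfiguration.create()  // picks up hbase-site.xml from the classpath

    val hbaseContext = new HBaseContext(sc, conf)

    // bulkPut: one Put per RDD element, written through a cached HConnection on each worker
    val rdd = sc.parallelize(Array(("row1", "value1"), ("row2", "value2")))
    hbaseContext.bulkPut[(String, String)](rdd,
      "t1",
      record => {
        val put = new Put(Bytes.toBytes(record._1))
        put.add(Bytes.toBytes("c"), Bytes.toBytes("q"), Bytes.toBytes(record._2))
        put
      },
      true)  // autoFlush

    // hbaseRDD: distributed scan returning (rowKey, List[(family, qualifier, value)]) pairs
    val scan = new Scan()
    scan.setCaching(100)
    val scanRdd = hbaseContext.hbaseRDD("t1", scan)
    scanRdd.collect.foreach(r => println(Bytes.toString(r._1)))

    sc.stop()
  }
}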