examples/src/main/python/hbase_inputformat.py (21 changes: 13 additions & 8 deletions)
@@ -16,6 +16,7 @@
#

import sys
+import json

from pyspark import SparkContext

Contributor:
If we're expanding the example to illustrate dealing with a Result that contains multiple records, it would probably be a good idea to expand the sample data in the comments below to make use of that feature (i.e. have the sample data contain multiple records (column families?)).

Contributor Author:
Great idea. I will modify the sample data to show multiple records in a columnFamily and in a columnFamily:qualifier.

@@ -25,24 +26,24 @@
hbase(main):016:0> create 'test', 'f1'
0 row(s) in 1.0430 seconds

-hbase(main):017:0> put 'test', 'row1', 'f1', 'value1'
+hbase(main):017:0> put 'test', 'row1', 'f1:a', 'value1'
0 row(s) in 0.0130 seconds

-hbase(main):018:0> put 'test', 'row2', 'f1', 'value2'
+hbase(main):018:0> put 'test', 'row1', 'f1:b', 'value2'
0 row(s) in 0.0030 seconds

-hbase(main):019:0> put 'test', 'row3', 'f1', 'value3'
+hbase(main):019:0> put 'test', 'row2', 'f1', 'value3'
0 row(s) in 0.0050 seconds

-hbase(main):020:0> put 'test', 'row4', 'f1', 'value4'
+hbase(main):020:0> put 'test', 'row3', 'f1', 'value4'
0 row(s) in 0.0110 seconds

hbase(main):021:0> scan 'test'
ROW COLUMN+CELL
-row1 column=f1:, timestamp=1401883411986, value=value1
-row2 column=f1:, timestamp=1401883415212, value=value2
-row3 column=f1:, timestamp=1401883417858, value=value3
-row4 column=f1:, timestamp=1401883420805, value=value4
+row1 column=f1:a, timestamp=1401883411986, value=value1
+row1 column=f1:b, timestamp=1401883415212, value=value2
+row2 column=f1:, timestamp=1401883417858, value=value3
+row3 column=f1:, timestamp=1401883420805, value=value4
4 row(s) in 0.0240 seconds
"""
if __name__ == "__main__":
@@ -61,6 +62,8 @@
    table = sys.argv[2]
    sc = SparkContext(appName="HBaseInputFormat")

+    # Other options for configuring scan behavior are available. More information available at
+    # https://github.com/apache/hbase/blob/master/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
    conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
    keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
@@ -72,6 +75,8 @@
        keyConverter=keyConv,
        valueConverter=valueConv,
        conf=conf)
+    hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).mapValues(json.loads)
+
    output = hbase_rdd.collect()
    for (k, v) in output:
        print (k, v)
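
The comment added above points to TableInputFormat for further scan configuration. As a rough sketch (not part of this PR), the conf dictionary can carry additional TableInputFormat properties to narrow the scan before it ever reaches Spark. The key names below mirror constants defined in org.apache.hadoop.hbase.mapreduce.TableInputFormat; the specific row keys and column list are made up for illustration:

import sys
import json

from pyspark import SparkContext

# host and table as in the example above
host = sys.argv[1]
table = sys.argv[2]
sc = SparkContext(appName="HBaseInputFormatScanOptions")

# Extra TableInputFormat properties narrow the scan server-side.
# The row keys and column list here are illustrative only.
conf = {
    "hbase.zookeeper.quorum": host,
    "hbase.mapreduce.inputtable": table,
    "hbase.mapreduce.scan.columns": "f1:a f1:b",  # space-separated family:qualifier pairs
    "hbase.mapreduce.scan.row.start": "row1",     # first row key included in the scan
    "hbase.mapreduce.scan.row.stop": "row3",      # scan stops before this row key
}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter=keyConv,
    valueConverter=valueConv,
    conf=conf)
# Same post-processing as the example: one JSON record per HBase cell.
hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).mapValues(json.loads)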

examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
@@ -18,7 +18,7 @@
package org.apache.spark.examples

import org.apache.hadoop.hbase.client.HBaseAdmin
-import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
+import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import org.apache.spark._
@@ -36,7 +36,7 @@ object HBaseTest {
    // Initialize hBase table if necessary
    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(args(0))) {
-      val tableDesc = new HTableDescriptor(args(0))
+      val tableDesc = new HTableDescriptor(TableName.valueOf(args(0)))
Contributor:
Is this an API change in HBase?

Contributor Author:
Yes. The old constructor is deprecated.

      admin.createTable(tableDesc)
    }

examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
@@ -18,20 +18,34 @@
package org.apache.spark.examples.pythonconverters

import scala.collection.JavaConversions._
+import scala.util.parsing.json.JSONObject

import org.apache.spark.api.python.Converter
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.KeyValue.Type
+import org.apache.hadoop.hbase.CellUtil

/**
- * Implementation of [[org.apache.spark.api.python.Converter]] that converts an
- * HBase Result to a String
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts all
+ * the records in an HBase Result to a String
 */
class HBaseResultToStringConverter extends Converter[Any, String] {
  override def convert(obj: Any): String = {
+    import collection.JavaConverters._
    val result = obj.asInstanceOf[Result]
-    Bytes.toStringBinary(result.value())
+    val output = result.listCells.asScala.map(cell =>
+        Map(
+          "row" -> Bytes.toStringBinary(CellUtil.cloneRow(cell)),
+          "columnFamily" -> Bytes.toStringBinary(CellUtil.cloneFamily(cell)),
+          "qualifier" -> Bytes.toStringBinary(CellUtil.cloneQualifier(cell)),
+          "timestamp" -> cell.getTimestamp.toString,
+          "type" -> Type.codeToType(cell.getTypeByte).toString,
+          "value" -> Bytes.toStringBinary(CellUtil.cloneValue(cell))
+        )
+    )
+    output.map(JSONObject(_).toString()).mkString("\n")
Contributor:
How about JSONObject(output).toString()?

Contributor Author:
output is a Buffer[Map[String, String]], since there can be several records in an HBase Result.
However, JSONObject has only one constructor, JSONObject(obj: Map[String, Any]), so JSONObject(output).toString() would cause a compilation failure. ^^

Contributor:
That makes sense. JSON will escape any \n inside a String, so it's safe to use \n as the separator.

Contributor Author:
Great! In fact, HBase itself will escape \n too. That's why I chose \n in the first place.
Thanks!

  }
}
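
To make the contract between this converter and the Python example concrete, here is a hedged sketch of what the converter hands back for row1 in the sample data above: two cells become two JSON objects joined by a bare newline, and the Python side recovers one record per cell. The exact JSON text below is illustrative, not captured output:

import json

# Hypothetical converter output for row1, which holds two cells (f1:a, f1:b):
# one JSON object per cell, joined with "\n".
value = ('{"row": "row1", "columnFamily": "f1", "qualifier": "a", '
         '"timestamp": "1401883411986", "type": "Put", "value": "value1"}\n'
         '{"row": "row1", "columnFamily": "f1", "qualifier": "b", '
         '"timestamp": "1401883415212", "type": "Put", "value": "value2"}')

# JSON escapes any newline embedded in a stored value (it appears as
# backslash-n in the encoded text), so splitting on a literal newline
# cleanly separates the per-cell records -- the same split the example
# performs with flatMapValues before json.loads.
records = [json.loads(rec) for rec in value.split("\n")]
assert records[0]["qualifier"] == "a"
assert records[1]["value"] == "value2"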
