Merge updateToHBase24 from LucaCanali: apache#88
The purpose of this change is to match our hbase-server version with the jar
provided on EMR. While our HBase is listed as 2.2.6, the library code we
have from Amazon is for 2.4.1+. There was a breaking change in 2.4.1,
and this mismatch prevents bulkLoadThinRows() from working.
ianawilson committed Dec 27, 2021
2 parents e0074c8 + cecb422 commit 45971a1
Showing 10 changed files with 146 additions and 46 deletions.
@@ -35,6 +35,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -69,6 +70,11 @@ private static final class CheckMutation {
List<String> topics = new ArrayList<>();
}

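// Table#getRegionLocator() is declared by the Table interface in the HBase 2.4
// line this build targets; the Kafka bridge has no region locations to serve,
// so the operation is intentionally unsupported here.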
@Override
public RegionLocator getRegionLocator() throws IOException {
throw new UnsupportedOperationException();
}

public KafkaTableForBridge(TableName tableName,
Configuration conf,
TopicRoutingRules routingRules,
42 changes: 32 additions & 10 deletions pom.xml
@@ -43,7 +43,7 @@
<groupId>org.apache.hbase.connectors</groupId>
<artifactId>hbase-connectors</artifactId>
<!-- See https://maven.apache.org/maven-ci-friendly.html -->
<version>1.0.1.sprout-emr</version>
<version>${revision}</version>
<name>Apache HBase Connectors</name>
<packaging>pom</packaging>
<description>
@@ -136,33 +136,33 @@
<developers/>
<properties>
<!-- See https://maven.apache.org/maven-ci-friendly.html -->
<revision>1.0.1.sprout-emr</revision>
<revision>1.0.1.sprout-emr-hbase2.4</revision>
<maven.javadoc.skip>true</maven.javadoc.skip>
<maven.build.timestamp.format>yyyy-MM-dd'T'HH:mm</maven.build.timestamp.format>
<buildDate>${maven.build.timestamp}</buildDate>
<compileSource>1.8</compileSource>
<java.min.version>${compileSource}</java.min.version>
<maven.min.version>3.5.0</maven.min.version>
<hbase.version>2.2.6</hbase.version>
<hbase.version>2.4.8</hbase.version>
<exec.maven.version>1.6.0</exec.maven.version>
<audience-annotations.version>0.5.0</audience-annotations.version>
<junit.version>4.12</junit.version>
<hbase-thirdparty.version>2.2.1</hbase-thirdparty.version>
<hbase-thirdparty.version>3.5.1</hbase-thirdparty.version>
<hadoop-two.version>2.8.5</hadoop-two.version>
<hadoop-three.version>3.2.1</hadoop-three.version>
<hadoop.version>${hadoop-two.version}</hadoop.version>
<hadoop.version>${hadoop-three.version}</hadoop.version>
<slf4j.version>1.7.25</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<checkstyle.version>8.18</checkstyle.version>
<maven.checkstyle.version>3.1.0</maven.checkstyle.version>
<surefire.version>3.0.0-M4</surefire.version>
<enforcer.version>3.0.0-M3</enforcer.version>
<checkstyle.version>8.45.1</checkstyle.version>
<maven.checkstyle.version>3.1.2</maven.checkstyle.version>
<surefire.version>3.0.0-M5</surefire.version>
<enforcer.version>3.0.0</enforcer.version>
<extra.enforcer.version>1.2</extra.enforcer.version>
<restrict-imports.enforcer.version>0.14.0</restrict-imports.enforcer.version>
<!--Internally we use a different version of protobuf. See hbase-protocol-shaded-->
<external.protobuf.version>2.5.0</external.protobuf.version>
<protobuf.plugin.version>0.5.0</protobuf.plugin.version>
<commons-io.version>2.5</commons-io.version>
<commons-io.version>2.11.0</commons-io.version>
<avro.version>1.7.7</avro.version>
<commons-lang3.version>3.6</commons-lang3.version>
<!--This property is for hadoops netty. HBase netty
@@ -696,5 +696,27 @@
</plugins>
</build>
</profile>
<!-- this profile should match the name of the release profile in the root asf pom -->
<profile>
<id>apache-release</id>
<build>
<plugins>
<!-- This should insert itself in place of the normal deploy plugin and then
handle either closing or dropping the staging repository for us depending
on if the build succeeds.
-->
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>1.6.8</version>
<extensions>true</extensions>
<configuration>
<nexusUrl>https://repository.apache.org/</nexusUrl>
<serverId>apache.releases.https</serverId>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
26 changes: 16 additions & 10 deletions spark/README.md
@@ -18,19 +18,25 @@ limitations under the License.

# Apache HBase&trade; Spark Connector

## Scala and Spark Versions
## Spark, Scala and Configurable Options

To generate an artifact for a different [spark version](https://mvnrepository.com/artifact/org.apache.spark/spark-core) and/or [scala version](https://www.scala-lang.org/download/all.html), pass command-line options as follows (changing version numbers appropriately):
To generate an artifact for a different [Spark version](https://mvnrepository.com/artifact/org.apache.spark/spark-core) and/or [Scala version](https://www.scala-lang.org/download/all.html),
[Hadoop version](https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core), or [HBase version](https://mvnrepository.com/artifact/org.apache.hbase/hbase), pass command-line options as follows (changing version numbers appropriately):

```
$ mvn -Dspark.version=2.2.2 -Dscala.version=2.11.7 -Dscala.binary.version=2.11 clean install
$ mvn -Dspark.version=3.1.2 -Dscala.version=2.12.10 -Dhadoop-three.version=3.2.0 -Dscala.binary.version=2.12 -Dhbase.version=2.4.8 clean install
```

---
To build the connector with Spark 3.0, compile it with scala 2.12.
Additional configurations that you can customize are the Spark version, HBase version, and Hadoop version.
Example:
Note: to build the connector for Spark 2.x, compile with `-Dscala.binary.version=2.11` and activate the Hadoop 2 profile with `-Dhadoop.profile=2.0`.

## Configuration and Installation
**Client-side** (Spark) configuration:
- The HBase configuration file `hbase-site.xml` should be made available to Spark; it can be copied to `$SPARK_CONF_DIR` (default is `$SPARK_HOME/conf`).

**Server-side** (HBase region servers) configuration:
- The following jars need to be on the CLASSPATH of the HBase region servers:
  - scala-library, hbase-spark, and hbase-spark-protocol-shaded.
- The server-side configuration is needed for column filter pushdown.
  - If you cannot perform the server-side configuration, consider disabling pushdown with `.option("hbase.spark.pushdown.columnfilter", false)` (see the sketch below).
- The Scala library version must match the Scala version (2.11 or 2.12) used to compile the connector.

```
$ mvn -Dspark.version=3.0.1 -Dscala.version=2.12.10 -Dscala.binary.version=2.12 -Dhbase.version=2.2.4 -Dhadoop.profile=3.0 -Dhadoop-three.version=3.2.0 -DskipTests clean package
```
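
For the pushdown note above, a minimal read sketch (not part of this commit), assuming the connector's Data Source name `org.apache.hadoop.hbase.spark` and the `hbase.table`/`hbase.columns.mapping` options; the table and column names are placeholders:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hbase-read-sketch").getOrCreate()

// Hypothetical table and columns; the mapping pairs Spark fields with the
// HBase row key (:key) and column-family:qualifier coordinates.
val df = spark.read
  .format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping", "id STRING :key, col1 STRING cf1:col1")
  .option("hbase.table", "my_table")
  // Turn off server-side column filter pushdown when the hbase-spark jars
  // are not on the region servers' classpath.
  .option("hbase.spark.pushdown.columnfilter", false)
  .load()

df.show()
```

Leaving `hbase.spark.pushdown.columnfilter` at its default keeps filtering on the region servers, which requires the server-side jars listed above.
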
20 changes: 9 additions & 11 deletions spark/hbase-spark/pom.xml
@@ -286,14 +286,16 @@
<skipTests>true</skipTests>
</properties>
</profile>
<!-- profile against Hadoop 2.x: This is the default. -->
<!--
profile for building against Hadoop 2.x. Activate using:
mvn -Dhadoop.profile=2.0
-->
<profile>
<id>hadoop-2.0</id>
<activation>
<property>
<!--Below formatting for dev-support/generate-hadoopX-poms.sh-->
<!--h2-->
<name>!hadoop.profile</name>
<name>hadoop.profile</name>
<value>2.0</value>
</property>
</activation>
<dependencies>
@@ -357,20 +359,16 @@
</dependency>
</dependencies>
</profile>
<!--
profile for building against Hadoop 3.0.x. Activate using:
mvn -Dhadoop.profile=3.0
-->
<!-- profile against Hadoop 3.x: This is the default. -->
<profile>
<id>hadoop-3.0</id>
<activation>
<property>
<name>hadoop.profile</name>
<value>3.0</value>
<name>!hadoop.profile</name>
</property>
</activation>
<properties>
<hadoop.version>3.0</hadoop.version>
<hadoop.version>3.2.0</hadoop.version>
</properties>
<dependencies>
<dependency>
@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
import org.apache.hadoop.hbase.io.hfile.{HFile, CacheConfig, HFileContextBuilder, HFileWriterImpl}
import org.apache.hadoop.hbase.regionserver.{HStore, HStoreFile, StoreFileWriter, BloomType}
import org.apache.hadoop.hbase.regionserver.{HStoreFile, StoreFileWriter, StoreUtils, BloomType}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.broadcast.Broadcast
@@ -902,8 +902,8 @@ class HBaseContext(@transient val sc: SparkContext,
tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
val contextBuilder = new HFileContextBuilder()
.withCompression(Algorithm.valueOf(familyOptions.compression))
.withChecksumType(HStore.getChecksumType(conf))
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
.withChecksumType(StoreUtils.getChecksumType(conf))
.withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
.withBlockSize(familyOptions.blockSize)

if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
@@ -919,7 +919,7 @@ class HBaseContext(@transient val sc: SparkContext,
new WriterLength(0,
new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
.withBloomType(BloomType.valueOf(familyOptions.bloomType))
.withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
.withFileContext(hFileContext)
.withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", "")))
.withFavoredNodes(favoredNodes).build())

@@ -240,8 +240,8 @@ class NaiveEncoder extends BytesEncoder with Logging{
val value = Bytes.toInt(filterBytes, offset2 + 1)
compare(in.compareTo(value), ops)
case LongEnc | TimestampEnc =>
val in = Bytes.toInt(input, offset1)
val value = Bytes.toInt(filterBytes, offset2 + 1)
val in = Bytes.toLong(input, offset1)
val value = Bytes.toLong(filterBytes, offset2 + 1)
compare(in.compareTo(value), ops)
case FloatEnc =>
val in = Bytes.toFloat(input, offset1)
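
A minimal sketch (not part of this commit) of the Long/Timestamp decoding bug fixed above: `Bytes.toBytes(10L)` produces eight bytes, and decoding them with `Bytes.toInt` reads only the first four, so the comparison value comes out as 0 instead of 10.

```scala
import org.apache.hadoop.hbase.util.Bytes

// Sketch only: Bytes.toBytes(10L) is 8 big-endian bytes (0x00...0x0A).
// Bytes.toInt consumes just the first 4 of them, so the decoded value is 0,
// which is why Long/Timestamp filter comparisons were wrong before this fix.
val encoded = Bytes.toBytes(10L)
val truncated = Bytes.toInt(encoded, 0)  // 0 -- only the high 32 bits are read
val correct = Bytes.toLong(encoded, 0)   // 10
assert(truncated != 10 && correct == 10L)
```
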
@@ -284,8 +284,8 @@ public void testBulkGet() throws IOException {

final JavaRDD<String> stringJavaRDD =
HBASE_CONTEXT.bulkGet(TableName.valueOf(tableName), 2, rdd,
new GetFunction(),
new ResultFunction());
new GetFunction(),
new ResultFunction());

Assert.assertEquals(stringJavaRDD.count(), 5);
}
@@ -391,7 +391,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
for ( i <- 0 until f1FileList.length) {
val reader = HFile.createReader(fs, f1FileList(i).getPath,
new CacheConfig(config), true, config)
assert(reader.getCompressionAlgorithm.getName.equals("gz"))
assert(reader.getTrailer.getCompressionCodec().getName.equals("gz"))
assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
}

@@ -401,7 +401,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
for ( i <- 0 until f2FileList.length) {
val reader = HFile.createReader(fs, f2FileList(i).getPath,
new CacheConfig(config), true, config)
assert(reader.getCompressionAlgorithm.getName.equals("none"))
assert(reader.getTrailer.getCompressionCodec().getName.equals("none"))
assert(reader.getDataBlockEncoding.name().equals("NONE"))
}

@@ -870,7 +870,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
for ( i <- 0 until f1FileList.length) {
val reader = HFile.createReader(fs, f1FileList(i).getPath,
new CacheConfig(config), true, config)
assert(reader.getCompressionAlgorithm.getName.equals("gz"))
assert(reader.getTrailer.getCompressionCodec().getName.equals("gz"))
assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
}

@@ -880,7 +880,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
for ( i <- 0 until f2FileList.length) {
val reader = HFile.createReader(fs, f2FileList(i).getPath,
new CacheConfig(config), true, config)
assert(reader.getCompressionAlgorithm.getName.equals("none"))
assert(reader.getTrailer.getCompressionCodec().getName.equals("none"))
assert(reader.getDataBlockEncoding.name().equals("NONE"))
}

@@ -279,6 +279,74 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
assert(!builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
}

test("Long Type") {
val greaterLogic = new GreaterThanLogicExpression("Col1", 0)
greaterLogic.setEncoder(encoder)
val greaterAndEqualLogic = new GreaterThanOrEqualLogicExpression("Col1", 0)
greaterAndEqualLogic.setEncoder(encoder)
val lessLogic = new LessThanLogicExpression("Col1", 0)
lessLogic.setEncoder(encoder)
val lessAndEqualLogic = new LessThanOrEqualLogicExpression("Col1", 0)
lessAndEqualLogic.setEncoder(encoder)
val equalLogic = new EqualLogicExpression("Col1", 0, false)
val notEqualLogic = new EqualLogicExpression("Col1", 0, true)

val columnToCurrentRowValueMap = new util.HashMap[String, ByteArrayComparable]()
columnToCurrentRowValueMap.put("Col1", new ByteArrayComparable(Bytes.toBytes(10L)))
val valueFromQueryValueArray = new Array[Array[Byte]](1)

// greater than
valueFromQueryValueArray(0) = encoder.encode(LongType, 10L)
assert(!greaterLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))

valueFromQueryValueArray(0) = encoder.encode(LongType, 20L)
assert(!greaterLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))

// greater than or equal
valueFromQueryValueArray(0) = encoder.encode(LongType, 5L)
assert(greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
valueFromQueryValueArray))

valueFromQueryValueArray(0) = encoder.encode(LongType, 10L)
assert(greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
valueFromQueryValueArray))

valueFromQueryValueArray(0) = encoder.encode(LongType, 20L)
assert(!greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
valueFromQueryValueArray))

//less than
valueFromQueryValueArray(0) = encoder.encode(LongType, 10L)
assert(!lessLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))

valueFromQueryValueArray(0) = encoder.encode(LongType, 5L)
assert(!lessLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))

// less than or equal
valueFromQueryValueArray(0) = encoder.encode(LongType, 20L)
assert(lessAndEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))

valueFromQueryValueArray(0) = encoder.encode(LongType, 20L)
assert(lessAndEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))

valueFromQueryValueArray(0) = encoder.encode(LongType, 10L)
assert(lessAndEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))

// equal to
valueFromQueryValueArray(0) = Bytes.toBytes(10L)
assert(equalLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))

valueFromQueryValueArray(0) = Bytes.toBytes(5L)
assert(!equalLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))

// not equal to
valueFromQueryValueArray(0) = Bytes.toBytes(10L)
assert(!notEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))

valueFromQueryValueArray(0) = Bytes.toBytes(5L)
assert(notEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
}

test("String Type") {
val leftLogic = new LessThanLogicExpression("Col1", 0)
leftLogic.setEncoder(encoder)
6 changes: 3 additions & 3 deletions spark/pom.xml
@@ -44,9 +44,9 @@

<properties>
<protobuf.plugin.version>0.6.1</protobuf.plugin.version>
<hbase-thirdparty.version>2.2.1</hbase-thirdparty.version>
<jackson.version>2.9.10</jackson.version>
<spark.version>3.1.1</spark.version>
<hbase-thirdparty.version>3.5.1</hbase-thirdparty.version>
<jackson.version>2.12.5</jackson.version>
<spark.version>3.1.2</spark.version>
<!-- The following version is in sync with Spark's choice
Please take caution when this version is modified -->
<scala.version>2.12.10</scala.version>
