HBase 2.4 #10

Closed
wants to merge 8 commits into from
2 changes: 1 addition & 1 deletion Jenkinsfile
@@ -4,5 +4,5 @@ sproutMultiModuleBuild {
squads = ['listening']
slackChannel = '#eng-listening-platform'
runECRLogin = 'true'
mavenProperties = '-Dspark.version=3.1.1 -Dscala.version=2.12.10 -Dscala.binary.version=2.12 -Dhbase.version=2.2.6 -Dhadoop.profile=3.0 -Dhadoop-three.version=3.2.1 -DskipTests'
mavenProperties = ''
}
6 changes: 3 additions & 3 deletions hbase-connectors-assembly/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>hbase-connectors</artifactId>
<groupId>org.apache.hbase.connectors</groupId>
<version>1.0.1.sprout-emr</version>
<version>${revision}</version>
<relativePath>../</relativePath>
</parent>
<artifactId>hbase-connectors-assembly</artifactId>
@@ -40,12 +40,12 @@
<dependency>
<groupId>org.apache.hbase.connectors.kafka</groupId>
<artifactId>hbase-kafka-proxy</artifactId>
<version>1.0.1.sprout-emr</version>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase.connectors.kafka</groupId>
<artifactId>hbase-kafka-model</artifactId>
<version>1.0.1.sprout-emr</version>
<version>${revision}</version>
</dependency>
</dependencies>
<build>
2 changes: 1 addition & 1 deletion kafka/hbase-kafka-model/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.hbase.connectors</groupId>
<artifactId>kafka</artifactId>
<version>1.0.1.sprout-emr</version>
<version>${revision}</version>
<relativePath>../</relativePath>
</parent>
<groupId>org.apache.hbase.connectors.kafka</groupId>
2 changes: 1 addition & 1 deletion kafka/hbase-kafka-proxy/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.hbase.connectors</groupId>
<artifactId>kafka</artifactId>
<version>1.0.1.sprout-emr</version>
<version>${revision}</version>
<relativePath>../</relativePath>
</parent>
<groupId>org.apache.hbase.connectors.kafka</groupId>
@@ -35,6 +35,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -69,6 +70,11 @@ private static final class CheckMutation {
List<String> topics = new ArrayList<>();
}

@Override
public RegionLocator getRegionLocator() throws IOException {
throw new UnsupportedOperationException();
}

public KafkaTableForBridge(TableName tableName,
Configuration conf,
TopicRoutingRules routingRules,
4 changes: 2 additions & 2 deletions kafka/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.hbase.connectors</groupId>
<artifactId>hbase-connectors</artifactId>
<version>1.0.1.sprout-emr</version>
<version>${revision}</version>
<relativePath>../</relativePath>
</parent>
<artifactId>kafka</artifactId>
@@ -48,7 +48,7 @@
<dependency>
<groupId>org.apache.hbase.connectors.kafka</groupId>
<artifactId>hbase-kafka-model</artifactId>
<version>1.0.1.sprout-emr</version>
<version>${revision}</version>
</dependency>
</dependencies>
</dependencyManagement>
42 changes: 32 additions & 10 deletions pom.xml
@@ -43,7 +43,7 @@
<groupId>org.apache.hbase.connectors</groupId>
<artifactId>hbase-connectors</artifactId>
<!-- See https://maven.apache.org/maven-ci-friendly.html -->
<version>1.0.1.sprout-emr</version>
<version>${revision}</version>
<name>Apache HBase Connectors</name>
<packaging>pom</packaging>
<description>
@@ -136,33 +136,33 @@
<developers/>
<properties>
<!-- See https://maven.apache.org/maven-ci-friendly.html -->
<revision>1.0.1.sprout-emr</revision>
<revision>1.0.1.sprout-emr-hbase24</revision>
<maven.javadoc.skip>true</maven.javadoc.skip>
<maven.build.timestamp.format>yyyy-MM-dd'T'HH:mm</maven.build.timestamp.format>
<buildDate>${maven.build.timestamp}</buildDate>
<compileSource>1.8</compileSource>
<java.min.version>${compileSource}</java.min.version>
<maven.min.version>3.5.0</maven.min.version>
<hbase.version>2.2.6</hbase.version>
<hbase.version>2.4.8</hbase.version>
<exec.maven.version>1.6.0</exec.maven.version>
<audience-annotations.version>0.5.0</audience-annotations.version>
<junit.version>4.12</junit.version>
<hbase-thirdparty.version>2.2.1</hbase-thirdparty.version>
<hbase-thirdparty.version>3.5.1</hbase-thirdparty.version>
<hadoop-two.version>2.8.5</hadoop-two.version>
<hadoop-three.version>3.2.1</hadoop-three.version>
<hadoop.version>${hadoop-two.version}</hadoop.version>
<hadoop.version>${hadoop-three.version}</hadoop.version>
<slf4j.version>1.7.25</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<checkstyle.version>8.18</checkstyle.version>
<maven.checkstyle.version>3.1.0</maven.checkstyle.version>
<surefire.version>3.0.0-M4</surefire.version>
<enforcer.version>3.0.0-M3</enforcer.version>
<checkstyle.version>8.45.1</checkstyle.version>
<maven.checkstyle.version>3.1.2</maven.checkstyle.version>
<surefire.version>3.0.0-M5</surefire.version>
<enforcer.version>3.0.0</enforcer.version>
<extra.enforcer.version>1.2</extra.enforcer.version>
<restrict-imports.enforcer.version>0.14.0</restrict-imports.enforcer.version>
<!--Internally we use a different version of protobuf. See hbase-protocol-shaded-->
<external.protobuf.version>2.5.0</external.protobuf.version>
<protobuf.plugin.version>0.5.0</protobuf.plugin.version>
<commons-io.version>2.5</commons-io.version>
<commons-io.version>2.11.0</commons-io.version>
<avro.version>1.7.7</avro.version>
<commons-lang3.version>3.6</commons-lang3.version>
<!--This property is for hadoops netty. HBase netty
@@ -696,5 +696,27 @@
</plugins>
</build>
</profile>
<!-- this profile should match the name of the release profile in the root asf pom -->
<profile>
<id>apache-release</id>
<build>
<plugins>
<!-- This should insert itself in place of the normal deploy plugin and then
handle either closing or dropping the staging repository for us depending
on if the build succeeds.
-->
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>1.6.8</version>
<extensions>true</extensions>
<configuration>
<nexusUrl>https://repository.apache.org/</nexusUrl>
<serverId>apache.releases.https</serverId>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
26 changes: 16 additions & 10 deletions spark/README.md
@@ -18,19 +18,25 @@ limitations under the License.

# Apache HBase&trade; Spark Connector

## Scala and Spark Versions
## Spark, Scala and Configurable Options

To generate an artifact for a different [spark version](https://mvnrepository.com/artifact/org.apache.spark/spark-core) and/or [scala version](https://www.scala-lang.org/download/all.html), pass command-line options as follows (changing version numbers appropriately):
To generate an artifact for a different [Spark version](https://mvnrepository.com/artifact/org.apache.spark/spark-core), [Scala version](https://www.scala-lang.org/download/all.html),
[Hadoop version](https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core), or [HBase version](https://mvnrepository.com/artifact/org.apache.hbase/hbase), pass command-line options as follows (changing version numbers appropriately):

```
$ mvn -Dspark.version=2.2.2 -Dscala.version=2.11.7 -Dscala.binary.version=2.11 clean install
$ mvn -Dspark.version=3.1.2 -Dscala.version=2.12.10 -Dhadoop-three.version=3.2.0 -Dscala.binary.version=2.12 -Dhbase.version=2.4.8 clean install
```

---
To build the connector with Spark 3.0, compile it with scala 2.12.
Additional configurations that you can customize are the Spark version, HBase version, and Hadoop version.
Example:
Note: to build the connector with Spark 2.x, compile it with `-Dscala.binary.version=2.11` and use the Hadoop 2 profile via `-Dhadoop.profile=2.0`.

## Configuration and Installation
**Client-side** (Spark) configuration:
- The HBase configuration file `hbase-site.xml` should be made available to Spark; it can be copied to `$SPARK_CONF_DIR` (default: `$SPARK_HOME/conf`).

**Server-side** (HBase region servers) configuration:
- The following jars need to be in the CLASSPATH of the HBase region servers:
  - scala-library, hbase-spark, and hbase-spark-protocol-shaded.
- The server-side configuration is needed for column filter pushdown.
- If you cannot perform the server-side configuration, consider disabling pushdown with `.option("hbase.spark.pushdown.columnfilter", false)`, as in the sketch below.
- The Scala library version must match the Scala version (2.11 or 2.12) used for compiling the connector.
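For reference, a minimal read-path sketch with pushdown disabled; the table name, column mapping, and filter below are illustrative assumptions, not part of this change:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hbase-spark-read").getOrCreate()

// Hypothetical table and schema mapping; adjust to your own.
val df = spark.read
  .format("org.apache.hadoop.hbase.spark")
  .option("hbase.table", "example_table")
  .option("hbase.columns.mapping", "id STRING :key, name STRING cf:name")
  // Build the HBase connection from hbase-site.xml on the classpath
  // instead of a pre-created HBaseContext.
  .option("hbase.spark.use.hbasecontext", false)
  // Skip server-side column filter pushdown when the jars listed above
  // are not installed on the region servers.
  .option("hbase.spark.pushdown.columnfilter", false)
  .load()

df.filter("name = 'foo'").show()
```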

```
$ mvn -Dspark.version=3.0.1 -Dscala.version=2.12.10 -Dscala.binary.version=2.12 -Dhbase.version=2.2.4 -Dhadoop.profile=3.0 -Dhadoop-three.version=3.2.0 -DskipTests clean package
```
4 changes: 2 additions & 2 deletions spark/hbase-spark-it/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.hbase.connectors</groupId>
<artifactId>spark</artifactId>
<version>1.0.1.sprout-emr</version>
<version>${revision}</version>
<relativePath>../</relativePath>
</parent>
<groupId>org.apache.hbase.connectors.spark</groupId>
@@ -186,7 +186,7 @@
<dependency>
<groupId>org.apache.hbase.connectors.spark</groupId>
<artifactId>hbase-spark</artifactId>
<version>1.0.1.sprout-emr</version>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
2 changes: 1 addition & 1 deletion spark/hbase-spark-protocol-shaded/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.hbase.connectors</groupId>
<artifactId>spark</artifactId>
<version>1.0.1.sprout-emr</version>
<version>${revision}</version>
<relativePath>../</relativePath>
</parent>

2 changes: 1 addition & 1 deletion spark/hbase-spark-protocol/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.hbase.connectors</groupId>
<artifactId>spark</artifactId>
<version>1.0.1.sprout-emr</version>
<version>${revision}</version>
<relativePath>../</relativePath>
</parent>

22 changes: 10 additions & 12 deletions spark/hbase-spark/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.hbase.connectors</groupId>
<artifactId>spark</artifactId>
<version>1.0.1.sprout-emr</version>
<version>${revision}</version>
<relativePath>../</relativePath>
</parent>

@@ -286,14 +286,16 @@
<skipTests>true</skipTests>
</properties>
</profile>
<!-- profile against Hadoop 2.x: This is the default. -->
<!--
profile for building against Hadoop 2.x. Activate using:
mvn -Dhadoop.profile=2.0
-->
<profile>
<id>hadoop-2.0</id>
<activation>
<property>
<!--Below formatting for dev-support/generate-hadoopX-poms.sh-->
<!--h2-->
<name>!hadoop.profile</name>
<name>hadoop.profile</name>
<value>2.0</value>
</property>
</activation>
<dependencies>
@@ -357,20 +359,16 @@
</dependency>
</dependencies>
</profile>
<!--
profile for building against Hadoop 3.0.x. Activate using:
mvn -Dhadoop.profile=3.0
-->
<!-- profile against Hadoop 3.x: This is the default. -->
<profile>
<id>hadoop-3.0</id>
<activation>
<property>
<name>hadoop.profile</name>
<value>3.0</value>
<name>!hadoop.profile</name>
</property>
</activation>
<properties>
<hadoop.version>3.0</hadoop.version>
<hadoop.version>3.2.0</hadoop.version>
</properties>
<dependencies>
<dependency>
@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
import org.apache.hadoop.hbase.io.hfile.{HFile, CacheConfig, HFileContextBuilder, HFileWriterImpl}
import org.apache.hadoop.hbase.regionserver.{HStore, HStoreFile, StoreFileWriter, BloomType}
import org.apache.hadoop.hbase.regionserver.{HStoreFile, StoreFileWriter, StoreUtils, BloomType}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.broadcast.Broadcast
@@ -902,8 +902,8 @@ class HBaseContext(@transient val sc: SparkContext,
tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
val contextBuilder = new HFileContextBuilder()
.withCompression(Algorithm.valueOf(familyOptions.compression))
.withChecksumType(HStore.getChecksumType(conf))
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
.withChecksumType(StoreUtils.getChecksumType(conf))
.withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
.withBlockSize(familyOptions.blockSize)

if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
@@ -919,7 +919,7 @@
new WriterLength(0,
new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
.withBloomType(BloomType.valueOf(familyOptions.bloomType))
.withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
.withFileContext(hFileContext)
.withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", "")))
.withFavoredNodes(favoredNodes).build())

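The HBaseContext hunks above track two HBase 2.4 API moves: the checksum helpers now live on StoreUtils instead of HStore, and StoreFileWriter.Builder is used without an explicit comparator. A condensed sketch of the updated write path, with the configuration, filesystem, and output directory as placeholder assumptions:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFileContextBuilder}
import org.apache.hadoop.hbase.regionserver.{BloomType, StoreFileWriter, StoreUtils}

// Placeholder inputs; in the connector these come from the bulk-load job
// configuration and the per-column-family options.
def newFamilyWriter(conf: Configuration, fs: FileSystem, familyDir: Path): StoreFileWriter = {
  val hFileContext = new HFileContextBuilder()
    .withChecksumType(StoreUtils.getChecksumType(conf))         // was HStore.getChecksumType
    .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)) // was HStore.getBytesPerChecksum
    .build()
  new StoreFileWriter.Builder(conf, new CacheConfig(conf), fs)
    .withBloomType(BloomType.NONE)
    .withFileContext(hFileContext)  // no .withComparator(...) against HBase 2.4
    .withFilePath(new Path(familyDir, "example_hfile"))
    .build()
}
```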
@@ -240,8 +240,8 @@ class NaiveEncoder extends BytesEncoder with Logging{
val value = Bytes.toInt(filterBytes, offset2 + 1)
compare(in.compareTo(value), ops)
case LongEnc | TimestampEnc =>
val in = Bytes.toInt(input, offset1)
val value = Bytes.toInt(filterBytes, offset2 + 1)
val in = Bytes.toLong(input, offset1)
val value = Bytes.toLong(filterBytes, offset2 + 1)
compare(in.compareTo(value), ops)
case FloatEnc =>
val in = Bytes.toFloat(input, offset1)
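The NaiveEncoder hunk above fixes the decode width for LongEnc and TimestampEnc: Bytes.toInt reads only four bytes, so any encoded long outside the int range was compared against a truncated value. A small illustration (the example value is arbitrary):

```scala
import org.apache.hadoop.hbase.util.Bytes

val encoded = Bytes.toBytes(4294967296L)  // 2^32, stored as 8 big-endian bytes

val truncated = Bytes.toInt(encoded, 0)   // old code path: reads 4 bytes -> 1
val full      = Bytes.toLong(encoded, 0)  // fixed code path: reads 8 bytes -> 4294967296

println(s"toInt=$truncated toLong=$full")
```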
@@ -284,8 +284,8 @@ public void testBulkGet() throws IOException {

final JavaRDD<String> stringJavaRDD =
HBASE_CONTEXT.bulkGet(TableName.valueOf(tableName), 2, rdd,
new GetFunction(),
new ResultFunction());
new GetFunction(),
new ResultFunction());

Assert.assertEquals(stringJavaRDD.count(), 5);
}
@@ -391,7 +391,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
for ( i <- 0 until f1FileList.length) {
val reader = HFile.createReader(fs, f1FileList(i).getPath,
new CacheConfig(config), true, config)
assert(reader.getCompressionAlgorithm.getName.equals("gz"))
assert(reader.getTrailer.getCompressionCodec().getName.equals("gz"))
assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
}

@@ -401,7 +401,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
for ( i <- 0 until f2FileList.length) {
val reader = HFile.createReader(fs, f2FileList(i).getPath,
new CacheConfig(config), true, config)
assert(reader.getCompressionAlgorithm.getName.equals("none"))
assert(reader.getTrailer.getCompressionCodec().getName.equals("none"))
assert(reader.getDataBlockEncoding.name().equals("NONE"))
}

@@ -870,7 +870,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
for ( i <- 0 until f1FileList.length) {
val reader = HFile.createReader(fs, f1FileList(i).getPath,
new CacheConfig(config), true, config)
assert(reader.getCompressionAlgorithm.getName.equals("gz"))
assert(reader.getTrailer.getCompressionCodec().getName.equals("gz"))
assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
}

@@ -880,7 +880,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
for ( i <- 0 until f2FileList.length) {
val reader = HFile.createReader(fs, f2FileList(i).getPath,
new CacheConfig(config), true, config)
assert(reader.getCompressionAlgorithm.getName.equals("none"))
assert(reader.getTrailer.getCompressionCodec().getName.equals("none"))
assert(reader.getDataBlockEncoding.name().equals("NONE"))
}
