This repository was archived by the owner on Jul 9, 2021. It is now read-only.

phoenix integration with 1.4.6 #10

Open
Wants to merge 6 commits into base: branch-1.4.6
5 changes: 4 additions & 1 deletion build.xml
@@ -191,7 +191,10 @@
</if>

<!-- Set dependency versions that are working with all Hadoop versions-->
<property name="hbase95.version" value="0.95.2-hadoop${hbasecompatprofile}-SNAPSHOT" />
<property name="hbase95.version" value="0.98.9-hadoop${hbasecompatprofile}" />

<!-- Set phoenix version-->
<property name="phoenix.version" value="4.6.0-HBase-0.98"/>

<!-- Load system-wide and project-wide default properties set by
the user, to avoid needing to override with -D. -->
22 changes: 16 additions & 6 deletions ivy.xml
@@ -37,6 +37,7 @@ under the License.
extends="runtime"
description="artifacts needed to compile/test the application"/>
<conf name="accumulo" visibility="private" />
<conf name="phoenix" visibility="private" />
<conf name="hbase94" visibility="private" />
<conf name="hbase95" visibility="private" extends="hbasecompat${hbasecompatprofile}" />
<conf name="hbasecompat1" visibility="private" />
@@ -46,15 +47,15 @@ under the License.
<conf name="avrohadoop2" visibility="private" />
<conf name="hcatalog13" visibility="private" />
<conf name="hadoop23" visibility="private"
extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo" />
extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo,phoenix" />
<conf name="hadoop20" visibility="private"
extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo" />
extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo,phoenix" />
<conf name="hadoop100" visibility="private"
extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo" />
extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo,phoenix" />
<conf name="hadoop200" visibility="private"
extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo" />
extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo,phoenix" />
<conf name="hadoop210" visibility="private"
extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo" />
extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo,phoenix" />

<conf name="test" visibility="private" extends="common,runtime"/>
<conf name="hadoop23test" visibility="private" extends="test,hadoop23" />
@@ -291,7 +292,16 @@ under the License.
<exclude org="log4j" module="log4j"/>
</dependency>


<dependency org="org.apache.phoenix" name="phoenix-core" rev="${phoenix.version}" conf="phoenix->default">
<artifact name="phoenix-core" type="jar"/>
<artifact name="phoenix-core" type="test-jar" ext="jar" m:classifier="tests"/>
<exclude org="org.apache.hbase" module="hbase-common"/>
<exclude org="org.apache.hbase" module="hbase-prefix-tree"/>
<exclude org="org.apache.hbase" module="hbase-client"/>
<exclude org="org.apache.hbase" module="hbase-server"/>
<exclude org="org.apache.hbase" module="hbase-protocol"/>
</dependency>

<dependency org="org.apache.hive.hcatalog" name="hive-hcatalog-core"
rev="${hcatalog.version}" conf="hcatalog13->default">
<artifact name="hive-hcatalog-core" type="jar"/>
3 changes: 3 additions & 0 deletions ivy/ivysettings.xml
@@ -33,6 +33,9 @@ under the License.
-->
<property name="repo.maven.org" value="http://repo1.maven.org/maven2/"
override="false"/>
<property name="release.apache.org"
value="https://repository.apache.org/content/repositories/releases/"
override="false"/>
<property name="snapshot.apache.org"
value="https://repository.apache.org/content/repositories/snapshots/"
override="false"/>
3 changes: 3 additions & 0 deletions src/docs/user/import.txt
@@ -561,6 +561,9 @@ include::hbase.txt[]
include::accumulo-args.txt[]
include::accumulo.txt[]

include::phoenix-args.txt[]
include::phoenix.txt[]

include::codegen-args.txt[]

As mentioned earlier, a byproduct of importing a table to HDFS is a
33 changes: 33 additions & 0 deletions src/docs/user/phoenix-args.txt
@@ -0,0 +1,33 @@

////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////


.Phoenix arguments:
[grid="all"]
`-------------------------------------`-----------------------------------
Argument                                Description
--------------------------------------------------------------------------
+\--phoenix-table <table-name>+         Specifies the Phoenix table to use\
                                        as the target instead of HDFS
+\--phoenix-column-mapping <col-map>+   (Optional) Comma-separated mapping\
                                        of Sqoop columns to Phoenix columns.\
                                        Within each pair, the two names are\
                                        separated by a semicolon (;)
+\--phoenix-bulkload+                   Enables bulk loading
--------------------------------------------------------------------------

47 changes: 47 additions & 0 deletions src/docs/user/phoenix.txt
@@ -0,0 +1,47 @@

////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////


Importing Data Into Phoenix
^^^^^^^^^^^^^^^^^^^^^^^^^^^

Sqoop supports additional import targets beyond HDFS and Hive. Sqoop
can also import records into a table in Phoenix.

By specifying +\--phoenix-table+, you instruct Sqoop to import
into a Phoenix table (backed by HBase) rather than a directory in HDFS. Sqoop
will import data into the table specified as the argument to +\--phoenix-table+.

NOTE: This function is incompatible with direct import (parameter
+\--direct+).

If the column names in the input table differ from the column names
in Phoenix, specify +\--phoenix-column-mapping+. The argument is a
comma-separated list of pairs of input column name and Phoenix column name;
within each pair, the two names are separated by a semicolon (;). For
example: +sqoop_col1;phoenix_col1,sqoop_col2;phoenix_col2+.
The mapper transforms each row of the input table into an upsert
into Phoenix and persists it using +PhoenixOutputFormat+.


If the target Phoenix table does not exist, the Sqoop job will
exit with an error. You should create the target table before running an import.

To decrease the load on HBase, Sqoop can perform bulk loading as opposed to
direct writes. To use bulk loading, enable it with +\--phoenix-bulkload+.
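
For example, the following invocation (a hypothetical sketch; the connection
string, credentials, and table names are placeholders) imports a database
table into an existing Phoenix table using bulk loading:

----
$ sqoop import --connect jdbc:mysql://db.example.com/corp --username someuser \
    --table EMPLOYEES --phoenix-table EMPLOYEES_PHX \
    --phoenix-column-mapping "id;EMP_ID,name;EMP_NAME" --phoenix-bulkload
----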
2 changes: 1 addition & 1 deletion src/docs/user/validation.txt
@@ -101,7 +101,7 @@ The following are the limitations in the current implementation:

* all-tables option
* free-form query option
* Data imported into Hive, HBase or Accumulo
* Data imported into Hive, HBase, Phoenix or Accumulo
* table import with --where argument
* incremental imports

57 changes: 56 additions & 1 deletion src/java/org/apache/sqoop/SqoopOptions.java
@@ -316,6 +316,15 @@ public String toString() {
// explicit split by cols
@StoredAsProperty("reset.onemapper") private boolean autoResetToOneMapper;

// Phoenix table to import into.
@StoredAsProperty("phoenix.table") private String phoenixTable;

// Phoenix column mapping to Sqoop columns.
@StoredAsProperty("phoenix.column.mapping") private String phoenixColumnMapping;

// Whether this is a bulk-load job.
@StoredAsProperty("phoenix.bulk.load.enabled") private boolean phoenixBulkLoadEnabled;

// These next two fields are not serialized to the metastore.
// If this SqoopOptions is created by reading a saved job, these will
// be populated by the JobStorage to facilitate updating the same
@@ -432,7 +441,7 @@ private void setDelimiterProperties(Properties props,
putProperty(props, prefix + ".escape",
Integer.toString((int) values.getEscapedBy()));
putProperty(props, prefix + ".enclose.required",
Boolean.toString(values.isEncloseRequired()));
Boolean.toString(values.isEncloseRequired()));
}

/** Take a comma-delimited list of input and split the elements
@@ -1894,6 +1903,7 @@ public void setConf(Configuration config) {
this.conf = config;
}


/**
* @return command-line arguments after a '-'.
*/
@@ -2404,6 +2414,51 @@ public void setAccumuloZookeepers(String zookeepers) {
this.accumuloZookeepers = zookeepers;
}

/**
* Gets the Phoenix table to import into.
* @return the Phoenix table name
*/
public String getPhoenixTable() {
return phoenixTable;
}

/**
* Sets the target Phoenix table.
* @param phoenixTable the Phoenix table name
*/
public void setPhoenixTable(String phoenixTable) {
this.phoenixTable = phoenixTable;
}

/**
* Returns the one-to-one mapping between database columns and Phoenix
* columns for the table. The pattern is
* dbcolumn1;phoenixcolumn1,dbcolumn2;phoenixcolumn2.
* @return the column mapping string
*/
public String getPhoenixColumnMapping() {
return phoenixColumnMapping;
}

/**
* Sets the database column to Phoenix column mapping.
* @param phoenixColumnMapping the column mapping string
*/
public void setPhoenixColumnMapping(String phoenixColumnMapping) {
this.phoenixColumnMapping = phoenixColumnMapping;
}

/**
* Returns whether the load into Phoenix uses bulk loading.
* @return true if bulk loading is enabled
*/
public boolean isPhoenixBulkLoadEnabled() {
return phoenixBulkLoadEnabled;
}

public void setPhoenixBulkLoadEnabled(boolean phoenixBulkLoadEnabled) {
this.phoenixBulkLoadEnabled = phoenixBulkLoadEnabled;
}

public void setConnManagerClassName(String connManagerClass) {
this.connManagerClassName = connManagerClass;
}
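The mapping string stored by setPhoenixColumnMapping follows the
dbcolumn1;phoenixcolumn1,dbcolumn2;phoenixcolumn2 pattern described in the
Javadoc above. The parsing of that string is not part of this file's diff; the
following is only a hypothetical sketch of the format (the class and method
names are illustrative, not code from this patch):

import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper illustrating the --phoenix-column-mapping format. */
final class PhoenixColumnMappingExample {

  /**
   * Splits "dbcol1;phoenixcol1,dbcol2;phoenixcol2" into an ordered map of
   * database column -> Phoenix column.
   */
  static Map<String, String> parse(String mapping) {
    Map<String, String> result = new LinkedHashMap<String, String>();
    if (mapping == null || mapping.isEmpty()) {
      return result;
    }
    for (String pair : mapping.split(",")) {
      String[] cols = pair.split(";");
      if (cols.length != 2) {
        throw new IllegalArgumentException("Invalid mapping entry: " + pair);
      }
      result.put(cols[0].trim(), cols[1].trim());
    }
    return result;
  }

  public static void main(String[] args) {
    // Prints: {id=EMP_ID, name=EMP_NAME}
    System.out.println(parse("id;EMP_ID,name;EMP_NAME"));
  }
}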
8 changes: 8 additions & 0 deletions src/java/org/apache/sqoop/manager/ConnManager.java
@@ -813,6 +813,14 @@ public boolean isDirectModeHBaseSupported() {
public boolean isDirectModeAccumuloSupported() {
return false;
}

/**
* By default, direct mode is not compatible with Phoenix imports.
* @return whether direct mode is allowed with Phoenix.
*/
public boolean isDirectModePhoenixSupported() {
return false;
}

}

24 changes: 23 additions & 1 deletion src/java/org/apache/sqoop/manager/SqlManager.java
@@ -44,6 +44,9 @@
import org.apache.sqoop.mapreduce.AccumuloImportJob;
import org.apache.sqoop.mapreduce.HBaseBulkImportJob;
import org.apache.sqoop.mapreduce.JdbcCallExportJob;
import org.apache.sqoop.mapreduce.PhoenixBulkImportJob;
import org.apache.sqoop.mapreduce.PhoenixImportJob;
import org.apache.sqoop.phoenix.PhoenixUtil;
import org.apache.sqoop.util.LoggingUtils;
import org.apache.sqoop.util.SqlTypeMap;

@@ -661,12 +664,21 @@ public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
+ "classpath, cannot import to Accumulo!");
}
importer = new AccumuloImportJob(opts, context);
} else if(opts.getPhoenixTable() != null) {
if(!PhoenixUtil.isPhoenixJarPresent()) {
throw new ImportException("Phoenix jars are not present in "
+ "classpath, cannot import to Phoenix!");
}
if(opts.isPhoenixBulkLoadEnabled()) {
importer = new PhoenixBulkImportJob(opts, context);
} else {
importer = new PhoenixImportJob(opts, context);
}
} else {
// Import to HDFS.
importer = new DataDrivenImportJob(opts, context.getInputFormat(),
context);
}

checkTableImportOptions(context);

String splitCol = getSplitColumn(opts, tableName);
@@ -704,6 +716,16 @@ public void importQuery(com.cloudera.sqoop.manager.ImportJobContext context)
+ " cannot import to Accumulo!");
}
importer = new AccumuloImportJob(opts, context);
} else if(opts.getPhoenixTable() != null) {
if(!PhoenixUtil.isPhoenixJarPresent()) {
throw new ImportException("Phoenix jars are not present in "
+ "classpath, cannot import to Phoenix!");
}
if(opts.isPhoenixBulkLoadEnabled()) {
importer = new PhoenixBulkImportJob(opts, context);
} else {
importer = new PhoenixImportJob(opts, context);
}
} else {
// Import to HDFS.
importer = new DataDrivenImportJob(opts, context.getInputFormat(),
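Both import paths guard on PhoenixUtil.isPhoenixJarPresent(), a helper added
elsewhere in this patch but not shown in this diff. It presumably follows the
same pattern Sqoop uses for its HBase and Accumulo checks: probe the classpath
for a representative class. A minimal sketch under that assumption (the class
name probed is a guess, not taken from the patch):

package org.apache.sqoop.phoenix;

/** Hypothetical sketch of the classpath probe referenced by SqlManager. */
public final class PhoenixUtil {

  private PhoenixUtil() { }

  /** Returns true if Phoenix classes can be loaded from the classpath. */
  public static boolean isPhoenixJarPresent() {
    try {
      Class.forName("org.apache.phoenix.mapreduce.PhoenixOutputFormat");
      return true;
    } catch (ClassNotFoundException cnfe) {
      return false;
    }
  }
}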
5 changes: 5 additions & 0 deletions src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
@@ -231,6 +231,11 @@ public void runImport(String tableName, String ormJarFile, String splitByCol,
throw new IOException("Direct mode is incompatible with "
+ "HBase. Please remove the parameter --direct");
}
if (options.getPhoenixTable() != null && options.isDirect()
&& !getContext().getConnManager().isDirectModePhoenixSupported()) {
throw new IOException("Direct mode is incompatible with "
+ "Phoenix. Please remove the parameter --direct");
}
if (null != tableName) {
LOG.info("Beginning import of " + tableName);
} else {