Skip to content
This repository was archived by the owner on Jul 9, 2021. It is now read-only.

Add management of the schema in direct netezza import #33

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/java/org/apache/sqoop/manager/DirectNetezzaManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
Expand Down Expand Up @@ -86,6 +87,8 @@ public class DirectNetezzaManager extends NetezzaManager {
public static final String NETEZZA_TABLE_ENCODING_LONG_ARG =
"encoding";

public static final String NETEZZA_SCHEMA_OPT = "netezza.schema";
public static final String NETEZZA_TABLE_SCHEMA_LONG_ARG = "schema";

private static final String QUERY_CHECK_DICTIONARY_FOR_TABLE =
"SELECT 1 FROM _V_TABLE WHERE OWNER= ? "
Expand Down Expand Up @@ -293,6 +296,9 @@ protected RelatedOptions getNetezzaExtraOpts() {
netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_TABLE_ENCODING_OPT)
.hasArg().withDescription("Table encoding")
.withLongOpt(NETEZZA_TABLE_ENCODING_LONG_ARG).create());
netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_SCHEMA_OPT)
.hasArg().withDescription("Allow Schema")
.withLongOpt(NETEZZA_TABLE_SCHEMA_LONG_ARG).create());
return netezzaOpts;
}

Expand All @@ -302,6 +308,8 @@ private void handleNetezzaExtraArgs(SqoopOptions opts)
Configuration conf = opts.getConf();

String[] extraArgs = opts.getExtraArgs();

LOG.debug("extraArgs " + Arrays.toString(extraArgs));

RelatedOptions netezzaOpts = getNetezzaExtraOpts();
CommandLine cmdLine = new GnuParser().parse(netezzaOpts, extraArgs, true);
Expand All @@ -320,6 +328,12 @@ private void handleNetezzaExtraArgs(SqoopOptions opts)
conf.set(NETEZZA_TABLE_ENCODING_OPT, encoding);
}

if (cmdLine.hasOption(NETEZZA_TABLE_SCHEMA_LONG_ARG)) {
String schemaName = cmdLine.getOptionValue(NETEZZA_TABLE_SCHEMA_LONG_ARG);
LOG.info("We will use schema " + schemaName);
conf.set(NETEZZA_SCHEMA_OPT, schemaName);
}

conf.setBoolean(NETEZZA_CTRL_CHARS_OPT,
cmdLine.hasOption(NETEZZA_CTRL_CHARS_LONG_ARG));

Expand Down Expand Up @@ -353,6 +367,20 @@ public boolean isORMFacilitySelfManaged() {
public boolean isDirectModeHCatSupported() {
return true;
}

@Override
public String escapeTableName(String tableName) {
Configuration conf = options.getConf();

String schema = conf.get(NETEZZA_SCHEMA_OPT);
// Return table name including schema if requested
if (schema != null && !schema.isEmpty()) {
return escapeIdentifier(schema) + "." + escapeIdentifier(tableName);
}

return escapeIdentifier(tableName);
}



public static String getLocalLogDir(TaskAttemptID attemptId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,15 @@ private String getSqlStatement(int myId) throws IOException {
char ec = (char) conf.getInt(DelimiterSet.OUTPUT_ESCAPED_BY_KEY, 0);

String nullValue = conf.get(DirectNetezzaManager.NETEZZA_NULL_VALUE);
String schema = conf.get(DirectNetezzaManager.NETEZZA_SCHEMA_OPT);


String tableName = dbc.getInputTableName();
if (schema!=null)
{
tableName = schema +"."+tableName;
}


int errorThreshold = conf.getInt(
DirectNetezzaManager.NETEZZA_ERROR_THRESHOLD_OPT, 1);
String logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
Expand Down Expand Up @@ -129,7 +136,7 @@ private String getSqlStatement(int myId) throws IOException {
sqlStmt.append(',').append(cols[i]);
}
}
sqlStmt.append(" FROM ").append(dbc.getInputTableName()).append(' ');
sqlStmt.append(" FROM ").append(tableName).append(' ');
sqlStmt.append("WHERE (DATASLICEID % ");
sqlStmt.append(numMappers).append(") = ").append(myId);
if (inputConds != null && inputConds.length() > 0) {
Expand Down