Skip to content
This repository was archived by the owner on Jul 9, 2021. It is now read-only.

Use mysql to import data from hdfs to the database when sqoop.mysql.export.clause is set #25

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 110 additions & 36 deletions src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ public class MySQLExportMapper<KEYIN, VALIN>

public static final long DEFAULT_CHECKPOINT_SLEEP_MS = 0;

// Configuration key naming the character set used for the export.
// setup() reads it into mysqlCharSet and falls back to
// MySQLUtils.MYSQL_DEFAULT_CHARSET when the key is not set.
public static final String MYSQL_CHARSET_KEY =
"sqoop.mysql.export.charset";

// Configuration key holding an SQL clause to run before the data is loaded.
// When it is set, initMySQLImportProcess() builds a "mysql -e" command via
// getMysqlCommandWithClause() instead of the mysqlimport command; the clause
// is placed in front of the "load data local infile" statement.
public static final String MYSQL_CLAUSE =
"sqoop.mysql.export.clause";

// Configured value for MYSQL_CHECKPOINT_SLEEP_KEY.
protected long checkpointSleepMs;

Expand Down Expand Up @@ -111,8 +117,9 @@ public class MySQLExportMapper<KEYIN, VALIN>
private void initMySQLImportProcess() throws IOException {
File taskAttemptDir = TaskId.getLocalWorkPath(conf);

String tableName = conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE");
this.fifoFile = new File(taskAttemptDir,
conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE") + ".txt");
tableName + ".txt");
String filename = fifoFile.toString();

// Create the FIFO itself.
Expand All @@ -126,9 +133,6 @@ private void initMySQLImportProcess() throws IOException {
"Could not create FIFO to interface with mysqlimport", ioe);
}

// Now open the connection to mysqlimport.
ArrayList<String> args = new ArrayList<String>();

String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
String databaseName = JdbcUrl.getDatabaseName(connectString);
String hostname = JdbcUrl.getHostName(connectString);
Expand All @@ -138,24 +142,64 @@ private void initMySQLImportProcess() throws IOException {
throw new IOException("Could not determine database name");
}

args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path.
String password = DBConfiguration.getPassword((JobConf) conf);

if (null != password && password.length() > 0) {
passwordFile = new File(MySQLUtils.writePasswordFile(conf));
args.add("--defaults-file=" + passwordFile);
ArrayList<String> args = null;
// Use mysql to do the import if sqoop.mysql.export.clause is set
// e.g. hadoop jar build/sqoop-1.4.7-SNAPSHOT.jar org.apache.sqoop.Sqoop export -Dsqoop.mysql.export.clause="set @bh_dataformat='mysql'"
// --connect "jdbc:mysql://localhost/test?user=xxx&password=xxx" --table countries
// --export-dir /user/test/countries.txt --fields-terminated-by '\t' --direct --password xxx --username xxx
String mysqlClause = conf.get(MYSQL_CLAUSE);
if (mysqlClause != null) {
args = getMysqlCommandWithClause(filename, tableName, mysqlClause);
} else {
args = getMysqlImportCommand(filename);
}

String username = conf.get(MySQLUtils.USERNAME_KEY);
if (null != username) {
args.add("--user=" + username);
}

args.add("--host=" + hostname);
if (-1 != port) {
args.add("--port=" + Integer.toString(port));
// Begin the export in an external process.
LOG.info("Starting import with arguments:");
for (String arg : args) {
LOG.info(" " + arg);
}

// Actually start mysqlimport.
mysqlImportProcess = Runtime.getRuntime().exec(args.toArray(new String[0]));

// Log everything it writes to stderr.
// Ignore anything on stdout.
this.outSink = new NullAsyncSink();
this.outSink.processStream(mysqlImportProcess.getInputStream());

this.errSink = new LoggingAsyncSink(LOG);
this.errSink.processStream(mysqlImportProcess.getErrorStream());

// Open the named FIFO after starting mysqlimport.
this.importStream = new BufferedOutputStream(
new FileOutputStream(fifoFile));

// At this point, mysqlimport is running and hooked up to our FIFO.
// The mapper just needs to populate it with data.

this.bytesWritten = 0;
}

/**
 * Builds the argument vector for running the {@code mysql} client so that it
 * executes the user-supplied clause and then loads the given file into the
 * target table with {@code load data local infile}.
 *
 * @param filename path of the file the client should read rows from
 * @param tablename name of the destination table; inserted into the
 *     statement verbatim
 * @param clause SQL to execute before the load; trailing semicolons and
 *     surrounding whitespace are ignored, and a blank clause is skipped
 * @return the command followed by its arguments
 * @throws IOException if {@code setupDbHostUserPwd} fails, e.g. because the
 *     database name cannot be determined from the connect string
 */
private ArrayList<String> getMysqlCommandWithClause(String filename,
    String tablename, String clause) throws IOException {
  ArrayList<String> args = new ArrayList<String>();
  args.add("mysql"); // needs to be on the path.

  // Connection options go first: setupDbHostUserPwd may emit
  // --defaults-file, which the client only accepts as its first option.
  String databaseName = setupDbHostUserPwd(args);

  // LOCAL loading is disabled by default in many client builds, so ask for
  // it explicitly; otherwise "load data local infile" is rejected.
  args.add("--local-infile=1");
  args.add(databaseName);
  args.add("-e");

  // Normalize the clause so that a value that already ends in ';' (or is
  // blank) does not produce an empty statement in front of the load.
  String prefix = clause.trim();
  while (prefix.endsWith(";")) {
    prefix = prefix.substring(0, prefix.length() - 1).trim();
  }

  // The file name sits inside a single-quoted SQL string literal, so escape
  // the characters that would terminate or alter it (assumes the server's
  // default sql_mode, where backslash is the escape character).
  String quotedFile = filename.replace("\\", "\\\\").replace("'", "''");

  StringBuilder sql = new StringBuilder();
  if (!prefix.isEmpty()) {
    sql.append(prefix).append(';');
  }
  sql.append("load data local infile '").append(quotedFile)
      .append("' into table ").append(tablename);

  // NOTE(review): no field/line delimiter or character-set options are
  // added to the statement, so the server's LOAD DATA defaults apply --
  // confirm this matches the format the mapper writes to the file.
  args.add(sql.toString());
  return args;
}

private ArrayList<String> getMysqlImportCommand(String filename) throws IOException {
// Now open the connection to mysqlimport.
ArrayList<String> args = new ArrayList<String>();
args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path.

String databaseName = setupDbHostUserPwd(args);

args.add("--compress");
args.add("--local");
args.add("--silent");
Expand Down Expand Up @@ -207,32 +251,61 @@ private void initMySQLImportProcess() throws IOException {
// These two arguments are positional and must be last.
args.add(databaseName);
args.add(filename);
return args;
}

// Begin the export in an external process.
LOG.debug("Starting mysqlimport with arguments:");
for (String arg : args) {
LOG.debug(" " + arg);
private String setupDbHostUserPwd(ArrayList<String> args) throws IOException {
String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
String databaseName = JdbcUrl.getDatabaseName(connectString);
String hostname = JdbcUrl.getHostName(connectString);
int port = JdbcUrl.getPort(connectString);

if (null == databaseName) {
throw new IOException("Could not determine database name");
}

// Actually start mysqlimport.
mysqlImportProcess = Runtime.getRuntime().exec(args.toArray(new String[0]));
String password = DBConfiguration.getPassword((JobConf) conf);

// Log everything it writes to stderr.
// Ignore anything on stdout.
this.outSink = new NullAsyncSink();
this.outSink.processStream(mysqlImportProcess.getInputStream());
if (null != password && password.length() > 0) {
passwordFile = new File(MySQLUtils.writePasswordFile(conf));
args.add("--defaults-file=" + passwordFile);
}

this.errSink = new LoggingAsyncSink(LOG);
this.errSink.processStream(mysqlImportProcess.getErrorStream());
String username = conf.get(MySQLUtils.USERNAME_KEY);
if (null != username) {
args.add("--user=" + username);
}

// Open the named FIFO after starting mysqlimport.
this.importStream = new BufferedOutputStream(
new FileOutputStream(fifoFile));
args.add("--host=" + hostname);
if (-1 != port) {
args.add("--port=" + Integer.toString(port));
}
return databaseName;
}

// At this point, mysqlimport is running and hooked up to our FIFO.
// The mapper just needs to populate it with data.
private String setupDbNameHostUserPwd(ArrayList<String> args) throws IOException {
String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
String databaseName = JdbcUrl.getDatabaseName(connectString);
String hostname = JdbcUrl.getHostName(connectString);
int port = JdbcUrl.getPort(connectString);

this.bytesWritten = 0;
if (null == databaseName) {
throw new IOException("Could not determine database name");
}

args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path.
String password = DBConfiguration.getPassword((JobConf) conf);

String username = conf.get(MySQLUtils.USERNAME_KEY);
if (null != username) {
args.add("--user=" + username);
}

args.add("--host=" + hostname);
if (-1 != port) {
args.add("--port=" + Integer.toString(port));
}
return databaseName;
}

@Override
Expand Down Expand Up @@ -319,7 +392,8 @@ protected void setup(Context context) {
this.conf = context.getConfiguration();

// TODO: Support additional encodings.
this.mysqlCharSet = MySQLUtils.MYSQL_DEFAULT_CHARSET;
this.mysqlCharSet = conf.get(MYSQL_CHARSET_KEY,
MySQLUtils.MYSQL_DEFAULT_CHARSET);

this.checkpointDistInBytes = conf.getLong(
MYSQL_CHECKPOINT_BYTES_KEY, DEFAULT_CHECKPOINT_BYTES);
Expand Down