From 1bbd501153ccd7f96dc4a736b71d0d1eeea11d79 Mon Sep 17 00:00:00 2001 From: rgan Date: Mon, 1 Aug 2016 22:27:54 -0400 Subject: [PATCH] Use mysql to import data from hdfs to the database when sqoop.mysql.export.clause is set --- .../sqoop/mapreduce/MySQLExportMapper.java | 146 +++++++++++++----- 1 file changed, 110 insertions(+), 36 deletions(-) diff --git a/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java b/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java index 0cfb0b349..bbb96a8f8 100644 --- a/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java @@ -74,6 +74,12 @@ public class MySQLExportMapper public static final long DEFAULT_CHECKPOINT_SLEEP_MS = 0; + public static final String MYSQL_CHARSET_KEY = + "sqoop.mysql.export.charset"; + + public static final String MYSQL_CLAUSE = + "sqoop.mysql.export.clause"; + // Configured value for MYSQL_CHECKPOINT_SLEEP_KEY. protected long checkpointSleepMs; @@ -111,8 +117,9 @@ public class MySQLExportMapper private void initMySQLImportProcess() throws IOException { File taskAttemptDir = TaskId.getLocalWorkPath(conf); + String tableName = conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE"); this.fifoFile = new File(taskAttemptDir, - conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE") + ".txt"); + tableName + ".txt"); String filename = fifoFile.toString(); // Create the FIFO itself. @@ -126,9 +133,6 @@ private void initMySQLImportProcess() throws IOException { "Could not create FIFO to interface with mysqlimport", ioe); } - // Now open the connection to mysqlimport. 
- ArrayList<String> args = new ArrayList<String>(); - String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY); String databaseName = JdbcUrl.getDatabaseName(connectString); String hostname = JdbcUrl.getHostName(connectString); @@ -138,24 +142,64 @@ private void initMySQLImportProcess() throws IOException { throw new IOException("Could not determine database name"); } - args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path. - String password = DBConfiguration.getPassword((JobConf) conf); - - if (null != password && password.length() > 0) { - passwordFile = new File(MySQLUtils.writePasswordFile(conf)); - args.add("--defaults-file=" + passwordFile); + ArrayList<String> args = null; + // Use mysql to do the import if sqoop.mysql.export.clause is set + // e.g. hadoop jar build/sqoop-1.4.7-SNAPSHOT.jar org.apache.sqoop.Sqoop export -Dsqoop.mysql.export.clause="set @bh_dataformat='mysql'" + // --connect "jdbc:mysql://localhost/test?user=xxx&password=xxx" --table countries + // --export-dir /user/test/countries.txt --fields-terminated-by '\t' --direct --password xxx --username xxx + String mysqlClause = conf.get(MYSQL_CLAUSE); + if (mysqlClause != null) { + args = getMysqlCommandWithClause(filename, tableName, mysqlClause); + } else { + args = getMysqlImportCommand(filename); } - String username = conf.get(MySQLUtils.USERNAME_KEY); - if (null != username) { - args.add("--user=" + username); - } - args.add("--host=" + hostname); - if (-1 != port) { - args.add("--port=" + Integer.toString(port)); + // Begin the export in an external process. + LOG.info("Starting import with arguments:"); + for (String arg : args) { + LOG.info(" " + arg); } + // Actually start mysqlimport. + mysqlImportProcess = Runtime.getRuntime().exec(args.toArray(new String[0])); + + // Log everything it writes to stderr. + // Ignore anything on stdout. 
+ this.outSink = new NullAsyncSink(); + this.outSink.processStream(mysqlImportProcess.getInputStream()); + + this.errSink = new LoggingAsyncSink(LOG); + this.errSink.processStream(mysqlImportProcess.getErrorStream()); + + // Open the named FIFO after starting mysqlimport. + this.importStream = new BufferedOutputStream( + new FileOutputStream(fifoFile)); + + // At this point, mysqlimport is running and hooked up to our FIFO. + // The mapper just needs to populate it with data. + + this.bytesWritten = 0; + } + + private ArrayList<String> getMysqlCommandWithClause(String filename, + String tablename, String clause) throws IOException { + ArrayList<String> args = new ArrayList<String>(); + args.add("mysql"); // needs to be on the path. + String databaseName = setupDbHostUserPwd(args); + args.add(databaseName); + args.add("-e"); + args.add(clause + ";" + String.format("load data local infile '%s' into table %s", filename, tablename)); // NOTE(review): clause, filename and tablename are spliced into the statement unescaped + return args; + } + + private ArrayList<String> getMysqlImportCommand(String filename) throws IOException { + // Now open the connection to mysqlimport. + ArrayList<String> args = new ArrayList<String>(); + args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path. + + String databaseName = setupDbHostUserPwd(args); + args.add("--compress"); args.add("--local"); args.add("--silent"); @@ -207,32 +251,61 @@ private void initMySQLImportProcess() throws IOException { // These two arguments are positional and must be last. args.add(databaseName); args.add(filename); + return args; + } + - // Begin the export in an external process. 
- LOG.debug("Starting mysqlimport with arguments:"); - for (String arg : args) { - LOG.debug(" " + arg); + private String setupDbHostUserPwd(ArrayList<String> args) throws IOException { + String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY); + String databaseName = JdbcUrl.getDatabaseName(connectString); + String hostname = JdbcUrl.getHostName(connectString); + int port = JdbcUrl.getPort(connectString); + + if (null == databaseName) { + throw new IOException("Could not determine database name"); } - // Actually start mysqlimport. - mysqlImportProcess = Runtime.getRuntime().exec(args.toArray(new String[0])); + String password = DBConfiguration.getPassword((JobConf) conf); - // Log everything it writes to stderr. - // Ignore anything on stdout. - this.outSink = new NullAsyncSink(); - this.outSink.processStream(mysqlImportProcess.getInputStream()); + if (null != password && password.length() > 0) { + passwordFile = new File(MySQLUtils.writePasswordFile(conf)); + args.add("--defaults-file=" + passwordFile); + } - this.errSink = new LoggingAsyncSink(LOG); - this.errSink.processStream(mysqlImportProcess.getErrorStream()); + String username = conf.get(MySQLUtils.USERNAME_KEY); + if (null != username) { + args.add("--user=" + username); + } - // Open the named FIFO after starting mysqlimport. - this.importStream = new BufferedOutputStream( - new FileOutputStream(fifoFile)); + args.add("--host=" + hostname); + if (-1 != port) { + args.add("--port=" + Integer.toString(port)); + } + return databaseName; + } - // At this point, mysqlimport is running and hooked up to our FIFO. - // The mapper just needs to populate it with data. 
+ private String setupDbNameHostUserPwd(ArrayList<String> args) throws IOException { // NOTE(review): no call site in this patch; near-duplicate of setupDbHostUserPwd that reads the password without using it - candidate for removal + String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY); + String databaseName = JdbcUrl.getDatabaseName(connectString); + String hostname = JdbcUrl.getHostName(connectString); + int port = JdbcUrl.getPort(connectString); - this.bytesWritten = 0; + if (null == databaseName) { + throw new IOException("Could not determine database name"); + } + + args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path. + String password = DBConfiguration.getPassword((JobConf) conf); + + String username = conf.get(MySQLUtils.USERNAME_KEY); + if (null != username) { + args.add("--user=" + username); + } + + args.add("--host=" + hostname); + if (-1 != port) { + args.add("--port=" + Integer.toString(port)); + } + return databaseName; } @Override @@ -319,7 +392,8 @@ protected void setup(Context context) { this.conf = context.getConfiguration(); // TODO: Support additional encodings. - this.mysqlCharSet = MySQLUtils.MYSQL_DEFAULT_CHARSET; + this.mysqlCharSet = conf.get(MYSQL_CHARSET_KEY, + MySQLUtils.MYSQL_DEFAULT_CHARSET); this.checkpointDistInBytes = conf.getLong( MYSQL_CHECKPOINT_BYTES_KEY, DEFAULT_CHECKPOINT_BYTES);