diff --git a/tdenginereader/doc/tdenginereader-CN.md b/tdenginereader/doc/tdenginereader-CN.md index aa3751efb8..e950275640 100644 --- a/tdenginereader/doc/tdenginereader-CN.md +++ b/tdenginereader/doc/tdenginereader-CN.md @@ -22,7 +22,7 @@ TDengineReader 通过 TDengine 的 JDBC driver 查询获取数据。 "reader": { "name": "tdenginereader", "parameter": { - "user": "root", + "username": "root", "password": "taosdata", "connection": [ { @@ -165,24 +165,8 @@ TDengineReader 通过 TDengine 的 JDBC driver 查询获取数据。 #### 4.1.1 数据特征 -建表语句: - -单行记录类似于: - #### 4.1.2 机器参数 -* 执行DataX的机器参数为: - 1. cpu: - 2. mem: - 3. net: 千兆双网卡 - 4. disc: DataX 数据不落磁盘,不统计此项 - -* TDengine数据库机器参数为: - 1. cpu: - 2. mem: - 3. net: 千兆双网卡 - 4. disc: - #### 4.1.3 DataX jvm 参数 -Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError @@ -201,9 +185,6 @@ TDengineReader 通过 TDengine 的 JDBC driver 查询获取数据。 说明: -1. 这里的单表,主键类型为 bigint(20),自增。 -2. batchSize 和 通道个数,对性能影响较大。 - #### 4.2.4 性能测试小结 1. diff --git a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java index 332ddf3aff..379777941d 100644 --- a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java +++ b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java @@ -93,7 +93,7 @@ public void init() { } if (start >= end) throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, - "The parameter [" + Key.BEGIN_DATETIME + "] should be less than the parameter [" + Key.END_DATETIME + "]."); + "The parameter " + Key.BEGIN_DATETIME + ": " + beginDatetime + " should be less than the parameter " + Key.END_DATETIME + ": " + endDatetime + "."); } @@ -119,7 +119,6 @@ public List split(int adviceNumber) { } } - LOG.info("Configuration: {}", configurations); return configurations; } } @@ -142,15 +141,14 @@ public static class Task extends Reader.Task { try { Class.forName("com.taosdata.jdbc.TSDBDriver"); Class.forName("com.taosdata.jdbc.rs.RestfulDriver"); - } catch (ClassNotFoundException e) { - e.printStackTrace(); + } catch (ClassNotFoundException ignored) { + LOG.warn(ignored.getMessage(), ignored); } } @Override public void init() { this.readerSliceConfig = super.getPluginJobConf(); - LOG.info("getPluginJobConf: {}", readerSliceConfig); String user = readerSliceConfig.getString(Key.USERNAME); String password = readerSliceConfig.getString(Key.PASSWORD); @@ -174,7 +172,12 @@ public void init() { @Override public void destroy() { - + try { + if (conn != null) + conn.close(); + } catch (SQLException e) { + LOG.error(e.getMessage(), e); + } } @Override @@ -199,22 +202,15 @@ public void startRead(RecordSender recordSender) { sqlList.addAll(querySql); } - try (Statement stmt = conn.createStatement()) { - for (String sql : sqlList) { + for (String sql : sqlList) { + try (Statement stmt = conn.createStatement()) { ResultSet rs = stmt.executeQuery(sql); while (rs.next()) { Record record = buildRecord(recordSender, rs, mandatoryEncoding); recordSender.sendToWriter(record); } - } - } catch (SQLException e) { - throw DataXException.asDataXException(TDengineReaderErrorCode.RUNTIME_EXCEPTION, e.getMessage(), e); - } finally { - try { - if (conn != null) - conn.close(); } catch (SQLException e) { - e.printStackTrace(); + LOG.error(e.getMessage(), e); } } } diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Constants.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Constants.java index d62c8f3273..e0445219e8 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Constants.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Constants.java @@ -1,8 +1,6 @@ package com.alibaba.datax.plugin.writer.tdenginewriter; public class Constants { - public static final String DEFAULT_USERNAME = "root"; - public static final String DEFAULT_PASSWORD = "taosdata"; - public static final int DEFAULT_BATCH_SIZE = 1; + public static final int DEFAULT_BATCH_SIZE = 1000; public static final boolean DEFAULT_IGNORE_TAGS_UNMATCHED = false; } \ No newline at end of file diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java index 4ee91ce08b..9801a32394 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java @@ -52,14 +52,13 @@ public void setSchemaManager(SchemaManager schemaManager) { try { Class.forName("com.taosdata.jdbc.TSDBDriver"); Class.forName("com.taosdata.jdbc.rs.RestfulDriver"); - } catch (ClassNotFoundException e) { - e.printStackTrace(); + } catch (ClassNotFoundException ignored) { } } public DefaultDataHandler(Configuration configuration, TaskPluginCollector taskPluginCollector) { - this.username = configuration.getString(Key.USERNAME, Constants.DEFAULT_USERNAME); - this.password = configuration.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD); + this.username = configuration.getString(Key.USERNAME); + this.password = configuration.getString(Key.PASSWORD); this.jdbcUrl = configuration.getString(Key.JDBC_URL); this.batchSize = configuration.getInt(Key.BATCH_SIZE, Constants.DEFAULT_BATCH_SIZE); this.tables = configuration.getList(Key.TABLE, String.class); @@ -73,14 +72,15 @@ public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) { int count = 0; int affectedRows = 0; - try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) { LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established."); - // prepare table_name -> table_meta - this.schemaManager = new SchemaManager(conn); - this.tableMetas = schemaManager.loadTableMeta(tables); - // prepare table_name -> column_meta - this.columnMetas = schemaManager.loadColumnMetas(tables); + if (schemaManager == null) { + // prepare table_name -> table_meta + this.schemaManager = new SchemaManager(conn); + this.tableMetas = schemaManager.loadTableMeta(tables); + // prepare table_name -> column_meta + this.columnMetas = schemaManager.loadColumnMetas(tables); + } List recordBatch = new ArrayList<>(); Record record; @@ -89,10 +89,11 @@ public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) { recordBatch.add(record); } else { try { - affectedRows = writeBatch(conn, recordBatch); - } catch (SQLException e) { + recordBatch.add(record); + affectedRows += writeBatch(conn, recordBatch); + } catch (Exception e) { LOG.warn("use one row insert. because:" + e.getMessage()); - affectedRows = writeEachRow(conn, recordBatch); + affectedRows += writeEachRow(conn, recordBatch); } recordBatch.clear(); } @@ -101,10 +102,10 @@ public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) { if (!recordBatch.isEmpty()) { try { - affectedRows = writeBatch(conn, recordBatch); - } catch (SQLException e) { + affectedRows += writeBatch(conn, recordBatch); + } catch (Exception e) { LOG.warn("use one row insert. because:" + e.getMessage()); - affectedRows = writeEachRow(conn, recordBatch); + affectedRows += writeEachRow(conn, recordBatch); } recordBatch.clear(); } @@ -126,8 +127,8 @@ private int writeEachRow(Connection conn, List recordBatch) { recordList.add(record); try { affectedRows += writeBatch(conn, recordList); - } catch (SQLException e) { - LOG.error(e.getMessage()); + } catch (Exception e) { + LOG.error(e.getMessage(), e); this.taskPluginCollector.collectDirtyRecord(record, e); } } @@ -145,7 +146,7 @@ private int writeEachRow(Connection conn, List recordBatch) { * 3. 对于tb,拼sql,例如:data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => insert into tb(ts, f1, f2) values(ts, f1, f2) * 4. 对于t,拼sql,例如:data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, f3, t1, t2] insert into t(ts, f1, f2, f3, t1, t2) values(ts, f1, f2, f3, t1, t2) */ - public int writeBatch(Connection conn, List recordBatch) throws SQLException { + public int writeBatch(Connection conn, List recordBatch) throws Exception { int affectedRows = 0; for (String table : tables) { TableMeta tableMeta = tableMetas.get(table); @@ -173,31 +174,62 @@ public int writeBatch(Connection conn, List recordBatch) throws SQLExcep * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], ) * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], ) */ - private int writeBatchToSupTableBySQL(Connection conn, String table, List recordBatch) throws SQLException { + private int writeBatchToSupTableBySQL(Connection conn, String table, List recordBatch) throws Exception { List columnMetas = this.columnMetas.get(table); StringBuilder sb = new StringBuilder("insert into"); for (Record record : recordBatch) { sb.append(" ").append(record.getColumn(indexOf("tbname")).asString()) .append(" using ").append(table) - .append(" tags") - .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { - return colMeta.isTag; - }).map(colMeta -> { - return buildColumnValue(colMeta, record); - }).collect(Collectors.joining(",", "(", ")"))) - .append(" ") - .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { + .append(" tags"); +// sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { +// return colMeta.isTag; +// }).map(colMeta -> { +// return buildColumnValue(colMeta, record); +// }).collect(Collectors.joining(",", "(", ")"))); + sb.append("("); + for (int i = 0; i < columns.size(); i++) { + ColumnMeta colMeta = columnMetas.get(i); + if (!columns.contains(colMeta.field)) + continue; + if (!colMeta.isTag) + continue; + String tagValue = buildColumnValue(colMeta, record); + if (i == 0) { + sb.append(tagValue); + } else { + sb.append(",").append(tagValue); + } + } + sb.append(")"); + + sb.append(" ").append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { return !colMeta.isTag; }).map(colMeta -> { return colMeta.field; }).collect(Collectors.joining(",", "(", ")"))) - .append(" values") - .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { - return !colMeta.isTag; - }).map(colMeta -> { - return buildColumnValue(colMeta, record); - }).collect(Collectors.joining(",", "(", ")"))); + .append(" values"); + +// sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { +// return !colMeta.isTag; +// }).map(colMeta -> { +// return buildColumnValue(colMeta, record); +// }).collect(Collectors.joining(",", "(", ")"))); + sb.append("("); + for (int i = 0; i < columnMetas.size(); i++) { + ColumnMeta colMeta = columnMetas.get(i); + if (!columns.contains(colMeta.field)) + continue; + if (colMeta.isTag) + continue; + String colValue = buildColumnValue(colMeta, record); + if (i == 0) { + sb.append(colValue); + } else { + sb.append(",").append(colValue); + } + } + sb.append(")"); } String sql = sb.toString(); @@ -213,10 +245,11 @@ private int executeUpdate(Connection conn, String sql) throws SQLException { return count; } - private String buildColumnValue(ColumnMeta colMeta, Record record) { + private String buildColumnValue(ColumnMeta colMeta, Record record) throws Exception { Column column = record.getColumn(indexOf(colMeta.field)); TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision(); - switch (column.getType()) { + Column.Type type = column.getType(); + switch (type) { case DATE: { Date value = column.asDate(); switch (timestampPrecision) { @@ -243,8 +276,9 @@ private String buildColumnValue(ColumnMeta colMeta, Record record) { case DOUBLE: case INT: case LONG: + column.asString(); default: - return column.asString(); + throw new Exception("invalid column type: " + type); } } @@ -392,7 +426,7 @@ private String buildSchemalessColumnValue(ColumnMeta colMeta, Record record) { * else * insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)]) */ - private int writeBatchToSubTable(Connection conn, String table, List recordBatch) throws SQLException { + private int writeBatchToSubTable(Connection conn, String table, List recordBatch) throws Exception { List columnMetas = this.columnMetas.get(table); StringBuilder sb = new StringBuilder(); @@ -419,11 +453,25 @@ private int writeBatchToSubTable(Connection conn, String table, List rec if (ignoreTagsUnmatched && !tagsAllMatch) continue; - sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { - return !colMeta.isTag; - }).map(colMeta -> { - return buildColumnValue(colMeta, record); - }).collect(Collectors.joining(", ", "(", ") "))); +// sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { +// return !colMeta.isTag; +// }).map(colMeta -> { +// return buildColumnValue(colMeta, record); +// }).collect(Collectors.joining(", ", "(", ") "))); + sb.append("("); + for (int i = 0; i < columnMetas.size(); i++) { + ColumnMeta colMeta = columnMetas.get(i); + if (colMeta.isTag) + continue; + String colValue = buildColumnValue(colMeta, record); + if (i == 0) { + sb.append(colValue); + } else { + sb.append(",").append(colValue); + } + } + sb.append(")"); + validRecords++; } @@ -462,7 +510,7 @@ private boolean equals(Column column, ColumnMeta colMeta) { * table: ["weather"], column: ["ts, f1, f2, f3, t1, t2"] * sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...) */ - private int writeBatchToNormalTable(Connection conn, String table, List recordBatch) throws SQLException { + private int writeBatchToNormalTable(Connection conn, String table, List recordBatch) throws Exception { List columnMetas = this.columnMetas.get(table); StringBuilder sb = new StringBuilder(); @@ -474,9 +522,22 @@ private int writeBatchToNormalTable(Connection conn, String table, List .append(" values "); for (Record record : recordBatch) { - sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> { - return buildColumnValue(colMeta, record); - }).collect(Collectors.joining(",", "(", ")"))); +// sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> { +// return buildColumnValue(colMeta, record); +// }).collect(Collectors.joining(",", "(", ")"))); + sb.append("("); + for (int i = 0; i < columnMetas.size(); i++) { + ColumnMeta colMeta = columnMetas.get(i); + if (!columns.contains(colMeta.field)) + continue; + String colValue = buildColumnValue(colMeta, record); + if (i == 0) { + sb.append(colValue); + } else { + sb.append(",").append(colValue); + } + } + sb.append(")"); } String sql = sb.toString(); @@ -492,4 +553,4 @@ private int indexOf(String colName) throws DataXException { "cannot find col: " + colName + " in columns: " + columns); } -} +} \ No newline at end of file diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java index c48b7942dc..f3bdbfbc31 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java @@ -10,6 +10,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.*; +import java.util.stream.Collectors; public class SchemaManager { private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class); @@ -123,12 +124,14 @@ public Map> loadColumnMetas(List tables) throws } } } catch (SQLException e) { - e.printStackTrace(); + LOG.error(e.getMessage(), e); } colMeta.value = value; }); - LOG.debug("load column metadata of " + table + ": " + Arrays.toString(columnMetaList.toArray())); + LOG.debug("load column metadata of " + table + ": " + + columnMetaList.stream().map(ColumnMeta::toString).collect(Collectors.joining(",", "[", "]")) + ); ret.put(table, columnMetaList); } return ret; @@ -142,7 +145,9 @@ private TableMeta buildSupTableMeta(ResultSet rs) throws SQLException { tableMeta.tags = rs.getInt("tags"); tableMeta.tables = rs.getInt("tables"); - LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta); + if (LOG.isDebugEnabled()){ + LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta); + } return tableMeta; } diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java index 469449e63a..e1f2bc291c 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java @@ -4,10 +4,10 @@ public enum TDengineWriterErrorCode implements ErrorCode { - REQUIRED_VALUE("TDengineWriter-00", "缺失必要的值"), - ILLEGAL_VALUE("TDengineWriter-01", "值非法"), - RUNTIME_EXCEPTION("TDengineWriter-02", "运行时异常"), - TYPE_ERROR("TDengineWriter-03", "Datax类型无法正确映射到TDengine类型"); + REQUIRED_VALUE("TDengineWriter-00", "parameter value is missing"), + ILLEGAL_VALUE("TDengineWriter-01", "invalid parameter value"), + RUNTIME_EXCEPTION("TDengineWriter-02", "runtime exception"), + TYPE_ERROR("TDengineWriter-03", "data type mapping error"); private final String code; private final String description; diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java index e0acacb8bd..1034b74a88 100644 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java @@ -28,7 +28,7 @@ public class DefaultDataHandlerTest { private final TaskPluginCollector taskPluginCollector = new TDengineWriter.Task().getTaskPluginCollector(); @Test - public void writeSupTableBySQL() throws SQLException { + public void writeSupTableBySQL() throws Exception { // given createSupAndSubTable(); Configuration configuration = Configuration.from("{" + @@ -68,7 +68,7 @@ public void writeSupTableBySQL() throws SQLException { } @Test - public void writeSupTableBySQL_2() throws SQLException { + public void writeSupTableBySQL_2() throws Exception { // given createSupAndSubTable(); Configuration configuration = Configuration.from("{" + @@ -106,7 +106,7 @@ public void writeSupTableBySQL_2() throws SQLException { } @Test - public void writeSupTableBySchemaless() throws SQLException { + public void writeSupTableBySchemaless() throws Exception { // given createSupTable(); Configuration configuration = Configuration.from("{" + @@ -146,7 +146,7 @@ public void writeSupTableBySchemaless() throws SQLException { } @Test - public void writeSubTableWithTableName() throws SQLException { + public void writeSubTableWithTableName() throws Exception { // given createSupAndSubTable(); Configuration configuration = Configuration.from("{" + @@ -185,7 +185,7 @@ public void writeSubTableWithTableName() throws SQLException { } @Test - public void writeSubTableWithoutTableName() throws SQLException { + public void writeSubTableWithoutTableName() throws Exception { // given createSupAndSubTable(); Configuration configuration = Configuration.from("{" + @@ -224,7 +224,7 @@ public void writeSubTableWithoutTableName() throws SQLException { } @Test - public void writeNormalTable() throws SQLException { + public void writeNormalTable() throws Exception { // given createSupAndSubTable(); Configuration configuration = Configuration.from("{" +