From c942daafb4f84cef3bf9ea25e839d01358631f75 Mon Sep 17 00:00:00 2001 From: liuliu Date: Tue, 10 May 2022 11:58:17 +0800 Subject: [PATCH] [hotfix-769][clickhouse] repair bug due to ru.yandex.clickhouse.ClickHouseConnectionImpl#setAutoCommit(false) --- .../source/ClickhouseInputFormat.java | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/flinkx-connectors/flinkx-connector-clickhouse/src/main/java/com/dtstack/flinkx/connector/clickhouse/source/ClickhouseInputFormat.java b/flinkx-connectors/flinkx-connector-clickhouse/src/main/java/com/dtstack/flinkx/connector/clickhouse/source/ClickhouseInputFormat.java index 39eb2d6cf9..5c9af8ef75 100644 --- a/flinkx-connectors/flinkx-connector-clickhouse/src/main/java/com/dtstack/flinkx/connector/clickhouse/source/ClickhouseInputFormat.java +++ b/flinkx-connectors/flinkx-connector-clickhouse/src/main/java/com/dtstack/flinkx/connector/clickhouse/source/ClickhouseInputFormat.java @@ -20,9 +20,20 @@ import com.dtstack.flinkx.connector.clickhouse.util.ClickhouseUtil; import com.dtstack.flinkx.connector.jdbc.source.JdbcInputFormat; +import com.dtstack.flinkx.connector.jdbc.source.JdbcInputSplit; +import com.dtstack.flinkx.util.ColumnBuildUtil; +import com.dtstack.flinkx.util.TableUtil; + +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import java.sql.Connection; import java.sql.SQLException; +import java.util.LinkedList; +import java.util.List; /** * @program: flinkx @@ -31,6 +42,60 @@ */ public class ClickhouseInputFormat extends JdbcInputFormat { + @Override + public void openInternal(InputSplit inputSplit) { + JdbcInputSplit jdbcInputSplit = (JdbcInputSplit) inputSplit; + initMetric(jdbcInputSplit); + if (!canReadData(jdbcInputSplit)) { + LOG.warn( + "Not read data when the start location are equal to end location, start = {}, end = {}", + jdbcInputSplit.getStartLocation(), + jdbcInputSplit.getEndLocation()); + hasNext = false; + return; + } + + String querySQL = null; + try { + dbConn = getConnection(); + + Pair, List> pair = null; + List fullColumnList = new LinkedList<>(); + List fullColumnTypeList = new LinkedList<>(); + if (StringUtils.isBlank(jdbcConf.getCustomSql())) { + pair = getTableMetaData(); + fullColumnList = pair.getLeft(); + fullColumnTypeList = pair.getRight(); + } + Pair, List> columnPair = + ColumnBuildUtil.handleColumnList( + jdbcConf.getColumn(), fullColumnList, fullColumnTypeList); + columnNameList = columnPair.getLeft(); + columnTypeList = columnPair.getRight(); + + querySQL = buildQuerySql(jdbcInputSplit); + jdbcConf.setQuerySql(querySQL); + executeQuery(jdbcInputSplit.getStartLocation()); + if (!resultSet.isClosed()) { + columnCount = resultSet.getMetaData().getColumnCount(); + } + // 增量任务 + isUpdateLocation = + jdbcConf.isIncrement() && !jdbcConf.isPolling() && !jdbcConf.isUseMaxFunc(); + RowType rowType = + TableUtil.createRowType( + columnNameList, columnTypeList, jdbcDialect.getRawTypeConverter()); + setRowConverter( + rowConverter == null + ? jdbcDialect.getColumnConverter(rowType, jdbcConf) + : rowConverter); + } catch (SQLException se) { + String expMsg = se.getMessage(); + expMsg = querySQL == null ? expMsg : expMsg + "\n querySQL: " + querySQL; + throw new IllegalArgumentException("open() failed." + expMsg, se); + } + } + @Override protected Connection getConnection() throws SQLException { return ClickhouseUtil.getConnection(