Skip to content

Commit

Permalink
[hotfix-769][clickhouse] repair bug due to ru.yandex.clickhouse.Click…
Browse files Browse the repository at this point in the history
…HouseConnectionImpl#setAutoCommit(false)
  • Loading branch information
Paddy0523 authored and FlechazoW committed May 10, 2022
1 parent eb25dbf commit 2d301c3
Showing 1 changed file with 65 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,20 @@

import com.dtstack.flinkx.connector.clickhouse.util.ClickhouseUtil;
import com.dtstack.flinkx.connector.jdbc.source.JdbcInputFormat;
import com.dtstack.flinkx.connector.jdbc.source.JdbcInputSplit;
import com.dtstack.flinkx.util.ColumnBuildUtil;
import com.dtstack.flinkx.util.TableUtil;

import org.apache.flink.core.io.InputSplit;
import org.apache.flink.table.types.logical.RowType;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;

/**
* @program: flinkx
Expand All @@ -31,6 +42,60 @@
*/
public class ClickhouseInputFormat extends JdbcInputFormat {

@Override
public void openInternal(InputSplit inputSplit) {
JdbcInputSplit jdbcInputSplit = (JdbcInputSplit) inputSplit;
initMetric(jdbcInputSplit);
if (!canReadData(jdbcInputSplit)) {
LOG.warn(
"Not read data when the start location are equal to end location, start = {}, end = {}",
jdbcInputSplit.getStartLocation(),
jdbcInputSplit.getEndLocation());
hasNext = false;
return;
}

String querySQL = null;
try {
dbConn = getConnection();

Pair<List<String>, List<String>> pair = null;
List<String> fullColumnList = new LinkedList<>();
List<String> fullColumnTypeList = new LinkedList<>();
if (StringUtils.isBlank(jdbcConf.getCustomSql())) {
pair = getTableMetaData();
fullColumnList = pair.getLeft();
fullColumnTypeList = pair.getRight();
}
Pair<List<String>, List<String>> columnPair =
ColumnBuildUtil.handleColumnList(
jdbcConf.getColumn(), fullColumnList, fullColumnTypeList);
columnNameList = columnPair.getLeft();
columnTypeList = columnPair.getRight();

querySQL = buildQuerySql(jdbcInputSplit);
jdbcConf.setQuerySql(querySQL);
executeQuery(jdbcInputSplit.getStartLocation());
if (!resultSet.isClosed()) {
columnCount = resultSet.getMetaData().getColumnCount();
}
// 增量任务
isUpdateLocation =
jdbcConf.isIncrement() && !jdbcConf.isPolling() && !jdbcConf.isUseMaxFunc();
RowType rowType =
TableUtil.createRowType(
columnNameList, columnTypeList, jdbcDialect.getRawTypeConverter());
setRowConverter(
rowConverter == null
? jdbcDialect.getColumnConverter(rowType, jdbcConf)
: rowConverter);
} catch (SQLException se) {
String expMsg = se.getMessage();
expMsg = querySQL == null ? expMsg : expMsg + "\n querySQL: " + querySQL;
throw new IllegalArgumentException("open() failed." + expMsg, se);
}
}

@Override
protected Connection getConnection() throws SQLException {
return ClickhouseUtil.getConnection(
Expand Down

0 comments on commit 2d301c3

Please sign in to comment.