diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 818d136a83f..234c8a603b8 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,25 +1,7 @@ - - - - + - + \ No newline at end of file diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java index 9e5594136b8..ab5b504188c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java @@ -17,6 +17,11 @@ package org.apache.flink.cdc.connectors.base.source.assigner.splitter; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.Column; +import io.debezium.relational.Table; +import io.debezium.relational.TableId; +import io.debezium.relational.history.TableChanges; import org.apache.flink.cdc.common.annotation.Experimental; import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect; @@ -27,17 +32,10 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.FlinkRuntimeException; - -import io.debezium.jdbc.JdbcConnection; -import io.debezium.relational.Column; -import io.debezium.relational.Table; -import io.debezium.relational.TableId; -import io.debezium.relational.history.TableChanges; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; - import java.math.BigDecimal; import java.sql.SQLException; import java.util.ArrayList; @@ -56,379 +54,362 @@ /** The {@code ChunkSplitter} used to split table into a set of chunks for JDBC data source. */ @Experimental public abstract class JdbcSourceChunkSplitter implements ChunkSplitter { - private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceChunkSplitter.class); - protected final JdbcSourceConfig sourceConfig; - protected final JdbcDataSourceDialect dialect; - - public JdbcSourceChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) { - this.sourceConfig = sourceConfig; - this.dialect = dialect; + private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceChunkSplitter.class); + protected final JdbcSourceConfig sourceConfig; + protected final JdbcDataSourceDialect dialect; + + public JdbcSourceChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) { + this.sourceConfig = sourceConfig; + this.dialect = dialect; + } + + /** Generates all snapshot splits (chunks) for the give table path. 
*/ + @Override + public Collection generateSplits(TableId tableId) { + try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) { + + LOG.info("Start splitting table {} into chunks...", tableId); + long start = System.currentTimeMillis(); + + Table table = Objects.requireNonNull(dialect.queryTableSchema(jdbc, tableId)).getTable(); + Column splitColumn = getSplitColumn(table, sourceConfig.getChunkKeyColumn()); + final List chunks; + try { + chunks = splitTableIntoChunks(jdbc, tableId, splitColumn); + } catch (SQLException e) { + throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e); + } + + // convert chunks into splits + List splits = new ArrayList<>(); + RowType splitType = getSplitType(splitColumn); + for (int i = 0; i < chunks.size(); i++) { + ChunkRange chunk = chunks.get(i); + SnapshotSplit split = + createSnapshotSplit( + jdbc, tableId, i, splitType, chunk.getChunkStart(), chunk.getChunkEnd()); + splits.add(split); + } + + long end = System.currentTimeMillis(); + LOG.info( + "Split table {} into {} chunks, time cost: {}ms.", tableId, splits.size(), end - start); + return splits; + } catch (Exception e) { + throw new FlinkRuntimeException( + String.format("Generate Splits for table %s error", tableId), e); } - - /** Generates all snapshot splits (chunks) for the give table path. */ - @Override - public Collection generateSplits(TableId tableId) { - try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) { - - LOG.info("Start splitting table {} into chunks...", tableId); - long start = System.currentTimeMillis(); - - Table table = - Objects.requireNonNull(dialect.queryTableSchema(jdbc, tableId)).getTable(); - Column splitColumn = getSplitColumn(table, sourceConfig.getChunkKeyColumn()); - final List chunks; - try { - chunks = splitTableIntoChunks(jdbc, tableId, splitColumn); - } catch (SQLException e) { - throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e); - } - - // convert chunks into splits - List splits = new ArrayList<>(); - RowType splitType = getSplitType(splitColumn); - for (int i = 0; i < chunks.size(); i++) { - ChunkRange chunk = chunks.get(i); - SnapshotSplit split = - createSnapshotSplit( - jdbc, - tableId, - i, - splitType, - chunk.getChunkStart(), - chunk.getChunkEnd()); - splits.add(split); - } - - long end = System.currentTimeMillis(); - LOG.info( - "Split table {} into {} chunks, time cost: {}ms.", - tableId, - splits.size(), - end - start); - return splits; - } catch (Exception e) { - throw new FlinkRuntimeException( - String.format("Generate Splits for table %s error", tableId), e); - } + } + + /** + * Query the maximum value of the next chunk, and the next chunk must be greater than or equal to + * includedLowerBound value [min_1, max_1), [min_2, max_2),... [min_n, null). Each + * time this method is called it will return max1, max2... + * + *

Each database has a different syntax for limiting the number of rows returned, for example, `LIMIT N` in + * MySQL or PostgreSQL, `TOP(N)` in SQL Server, `FETCH FIRST N ROWS ONLY` in DB2. + * + * @param jdbc JDBC connection. + * @param tableId table identity. + * @param splitColumn column. + * @param chunkSize chunk size. + * @param includedLowerBound the previous chunk end value. + * @return next chunk end value. + */ + protected abstract Object queryNextChunkMax( + JdbcConnection jdbc, + TableId tableId, + Column splitColumn, + int chunkSize, + Object includedLowerBound) + throws SQLException; + + /** + * Approximate total number of entries in the table being split. + * + *

Each database has different system table to lookup up approximate total number. For example, + * `pg_class` in postgres, `sys.dm_db_partition_stats` in sqlserver, `SYSCAT.TABLE` in db2. + * + * @param jdbc JDBC connection. + * @param tableId table identity. + * @return approximate row count. + */ + protected abstract Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) + throws SQLException; + + /** + * Checks whether split column is evenly distributed across its range. + * + * @param splitColumn split column. + * @return true that means split column with type BIGINT, INT, DECIMAL. + */ + protected boolean isEvenlySplitColumn(Column splitColumn) { + DataType flinkType = fromDbzColumn(splitColumn); + LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot(); + + // currently, we only support the optimization that split column with type BIGINT, INT, + // DECIMAL + return typeRoot == LogicalTypeRoot.BIGINT + || typeRoot == LogicalTypeRoot.INTEGER + || typeRoot == LogicalTypeRoot.DECIMAL; + } + + /** + * Get a corresponding Flink data type from a debezium {@link Column}. + * + * @param splitColumn dbz split column. + * @return flink data type + */ + protected abstract DataType fromDbzColumn(Column splitColumn); + + /** Returns the distribution factor of the given table. */ + protected double calculateDistributionFactor( + TableId tableId, Object min, Object max, long approximateRowCnt) { + + if (!min.getClass().equals(max.getClass())) { + throw new IllegalStateException( + String.format( + "Unsupported operation type, the MIN value type %s is different with MAX value type %s.", + min.getClass().getSimpleName(), max.getClass().getSimpleName())); } - - /** - * Query the maximum value of the next chunk, and the next chunk must be greater than or equal - * to includedLowerBound value [min_1, max_1), [min_2, max_2),... [min_n, null). - * Each time this method is called it will return max1, max2... - * - *

Each database has different grammar to get limit number of data, for example, `limit N` in - * mysql or postgres, `top(N)` in sqlserver , `FETCH FIRST %S ROWS ONLY` in DB2. - * - * @param jdbc JDBC connection. - * @param tableId table identity. - * @param splitColumn column. - * @param chunkSize chunk size. - * @param includedLowerBound the previous chunk end value. - * @return next chunk end value. - */ - protected abstract Object queryNextChunkMax( - JdbcConnection jdbc, - TableId tableId, - Column splitColumn, - int chunkSize, - Object includedLowerBound) - throws SQLException; - - /** - * Approximate total number of entries in the lookup table. - * - *

Each database has different system table to lookup up approximate total number. For - * example, `pg_class` in postgres, `sys.dm_db_partition_stats` in sqlserver, `SYSCAT.TABLE` in - * db2. - * - * @param jdbc JDBC connection. - * @param tableId table identity. - * @return approximate row count. - */ - protected abstract Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) - throws SQLException; - - /** - * Checks whether split column is evenly distributed across its range. - * - * @param splitColumn split column. - * @return true that means split column with type BIGINT, INT, DECIMAL. - */ - protected boolean isEvenlySplitColumn(Column splitColumn) { - DataType flinkType = fromDbzColumn(splitColumn); - LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot(); - - // currently, we only support the optimization that split column with type BIGINT, INT, - // DECIMAL - return typeRoot == LogicalTypeRoot.BIGINT - || typeRoot == LogicalTypeRoot.INTEGER - || typeRoot == LogicalTypeRoot.DECIMAL; + if (approximateRowCnt == 0) { + return Double.MAX_VALUE; } - - /** - * Get a corresponding Flink data type from a debezium {@link Column}. - * - * @param splitColumn dbz split column. - * @return flink data type - */ - protected abstract DataType fromDbzColumn(Column splitColumn); - - /** Returns the distribution factor of the given table. */ - protected double calculateDistributionFactor( - TableId tableId, Object min, Object max, long approximateRowCnt) { - - if (!min.getClass().equals(max.getClass())) { - throw new IllegalStateException( - String.format( - "Unsupported operation type, the MIN value type %s is different with MAX value type %s.", - min.getClass().getSimpleName(), max.getClass().getSimpleName())); - } - if (approximateRowCnt == 0) { - return Double.MAX_VALUE; - } - BigDecimal difference = ObjectUtils.minus(max, min); - // factor = (max - min + 1) / rowCount - final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1)); - double distributionFactor = - subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, ROUND_CEILING).doubleValue(); - LOG.info( - "The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}", - tableId, - distributionFactor, - min, - max, - approximateRowCnt); - return distributionFactor; + BigDecimal difference = ObjectUtils.minus(max, min); + // factor = (max - min + 1) / rowCount + final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1)); + double distributionFactor = + subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, ROUND_CEILING).doubleValue(); + LOG.info( + "The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}", + tableId, + distributionFactor, + min, + max, + approximateRowCnt); + return distributionFactor; + } + + /** + * Get the column which is seen as chunk key. + * + * @param table table identity. + * @param chunkKeyColumn column name which is seen as chunk key, if chunkKeyColumn is null, use + * primary key instead. @Column the column which is seen as chunk key. + */ + protected Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) { + return JdbcChunkUtils.getSplitColumn(table, chunkKeyColumn); + } + + /** ChunkEnd less than or equal to max. */ + protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) { + return ObjectUtils.compare(chunkEnd, max) <= 0; + } + + /** ChunkEnd greater than or equal to max. 
*/ + protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) { + return ObjectUtils.compare(chunkEnd, max) >= 0; + } + + /** + * Query the maximum and minimum value of the column in the table. e.g. query string + * SELECT MIN(%s) FROM %s WHERE %s > ? + * + * @param jdbc JDBC connection. + * @param tableId table identity. + * @param splitColumn column. + * @return maximum and minimum value. + */ + protected Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column splitColumn) + throws SQLException { + return JdbcChunkUtils.queryMinMax( + jdbc, jdbc.quotedTableIdString(tableId), jdbc.quotedColumnIdString(splitColumn.name())); + } + + /** + * Query the minimum value of the column in the table, and the minimum value must greater than the + * excludedLowerBound value. e.g. prepare query string + * SELECT MIN(%s) FROM %s WHERE %s > ? + * + * @param jdbc JDBC connection. + * @param tableId table identity. + * @param splitColumn column. + * @param excludedLowerBound the minimum value should be greater than this value. + * @return minimum value. + */ + protected Object queryMin( + JdbcConnection jdbc, TableId tableId, Column splitColumn, Object excludedLowerBound) + throws SQLException { + return JdbcChunkUtils.queryMin( + jdbc, + jdbc.quotedTableIdString(tableId), + jdbc.quotedColumnIdString(splitColumn.name()), + excludedLowerBound); + } + + /** + * convert dbz column to Flink row type. + * + * @param splitColumn split column. + * @return flink row type. + */ + private RowType getSplitType(Column splitColumn) { + return (RowType) ROW(FIELD(splitColumn.name(), fromDbzColumn(splitColumn))).getLogicalType(); + } + + /** + * We can use evenly-sized chunks or unevenly-sized chunks when split table into chunks, using + * evenly-sized chunks which is much efficient, using unevenly-sized chunks which will request + * many queries and is not efficient. + */ + private List splitTableIntoChunks( + JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException { + final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn); + final Object min = minMax[0]; + final Object max = minMax[1]; + if (min == null || max == null || min.equals(max)) { + // empty table, or only one row, return full table scan as a chunk + return Collections.singletonList(ChunkRange.all()); } - /** - * Get the column which is seen as chunk key. - * - * @param table table identity. - * @param chunkKeyColumn column name which is seen as chunk key, if chunkKeyColumn is null, use - * primary key instead. @Column the column which is seen as chunk key. 
- */ - protected Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) { - return JdbcChunkUtils.getSplitColumn(table, chunkKeyColumn); + final int chunkSize = sourceConfig.getSplitSize(); + final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper(); + final double distributionFactorLower = sourceConfig.getDistributionFactorLower(); + + if (isEvenlySplitColumn(splitColumn)) { + long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId); + double distributionFactor = calculateDistributionFactor(tableId, min, max, approximateRowCnt); + + boolean dataIsEvenlyDistributed = + doubleCompare(distributionFactor, distributionFactorLower) >= 0 + && doubleCompare(distributionFactor, distributionFactorUpper) <= 0; + + if (dataIsEvenlyDistributed) { + // the minimum dynamic chunk size is at least 1 + final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1); + return splitEvenlySizedChunks( + tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize); + } else { + return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); + } + } else { + return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); } - - /** ChunkEnd less than or equal to max. */ - protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) { - return ObjectUtils.compare(chunkEnd, max) <= 0; - } - - /** ChunkEnd greater than or equal to max. */ - protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) { - return ObjectUtils.compare(chunkEnd, max) >= 0; - } - - /** - * Query the maximum and minimum value of the column in the table. e.g. query string - * SELECT MIN(%s) FROM %s WHERE %s > ? - * - * @param jdbc JDBC connection. - * @param tableId table identity. - * @param splitColumn column. - * @return maximum and minimum value. - */ - protected Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column splitColumn) - throws SQLException { - return JdbcChunkUtils.queryMinMax( - jdbc, - jdbc.quotedTableIdString(tableId), - jdbc.quotedColumnIdString(splitColumn.name())); + } + + /** + * Split table into evenly sized chunks based on the numeric min and max value of split column, + * and tumble chunks in step size. + */ + private List splitEvenlySizedChunks( + TableId tableId, + Object min, + Object max, + long approximateRowCnt, + int chunkSize, + int dynamicChunkSize) { + LOG.info( + "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}", + tableId, + approximateRowCnt, + chunkSize, + dynamicChunkSize); + if (approximateRowCnt <= chunkSize) { + // there is no more than one chunk, return full table as a chunk + return Collections.singletonList(ChunkRange.all()); } - /** - * Query the minimum value of the column in the table, and the minimum value must greater than - * the excludedLowerBound value. e.g. prepare query string - * SELECT MIN(%s) FROM %s WHERE %s > ? - * - * @param jdbc JDBC connection. - * @param tableId table identity. - * @param splitColumn column. - * @param excludedLowerBound the minimum value should be greater than this value. - * @return minimum value. 
- */ - protected Object queryMin( - JdbcConnection jdbc, TableId tableId, Column splitColumn, Object excludedLowerBound) - throws SQLException { - return JdbcChunkUtils.queryMin( - jdbc, - jdbc.quotedColumnIdString(splitColumn.name()), - jdbc.quotedTableIdString(tableId), - excludedLowerBound); + final List splits = new ArrayList<>(); + Object chunkStart = null; + Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize); + while (ObjectUtils.compare(chunkEnd, max) <= 0) { + splits.add(ChunkRange.of(chunkStart, chunkEnd)); + chunkStart = chunkEnd; + try { + chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize); + } catch (ArithmeticException e) { + // Stop chunk split to avoid dead loop when number overflows. + break; + } } - - /** - * convert dbz column to Flink row type. - * - * @param splitColumn split column. - * @return flink row type. - */ - private RowType getSplitType(Column splitColumn) { - return (RowType) - ROW(FIELD(splitColumn.name(), fromDbzColumn(splitColumn))).getLogicalType(); + // add the ending split + splits.add(ChunkRange.of(chunkStart, null)); + return splits; + } + + /** Split table into unevenly sized chunks by continuously calculating next chunk max value. */ + private List splitUnevenlySizedChunks( + JdbcConnection jdbc, + TableId tableId, + Column splitColumn, + Object min, + Object max, + int chunkSize) + throws SQLException { + LOG.info("Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize); + final List splits = new ArrayList<>(); + Object chunkStart = null; + Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize); + int count = 0; + while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max, splitColumn)) { + // we start from [null, min + chunk_size) and avoid [null, min) + splits.add(ChunkRange.of(chunkStart, chunkEnd)); + // may sleep a while to avoid DDOS on PostgreSQL server + maySleep(count++, tableId); + chunkStart = chunkEnd; + chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize); } - - /** - * We can use evenly-sized chunks or unevenly-sized chunks when split table into chunks, using - * evenly-sized chunks which is much efficient, using unevenly-sized chunks which will request - * many queries and is not efficient. 
- */ - private List splitTableIntoChunks( - JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException { - final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn); - final Object min = minMax[0]; - final Object max = minMax[1]; - if (min == null || max == null || min.equals(max)) { - // empty table, or only one row, return full table scan as a chunk - return Collections.singletonList(ChunkRange.all()); - } - - final int chunkSize = sourceConfig.getSplitSize(); - final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper(); - final double distributionFactorLower = sourceConfig.getDistributionFactorLower(); - - if (isEvenlySplitColumn(splitColumn)) { - long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId); - double distributionFactor = - calculateDistributionFactor(tableId, min, max, approximateRowCnt); - - boolean dataIsEvenlyDistributed = - doubleCompare(distributionFactor, distributionFactorLower) >= 0 - && doubleCompare(distributionFactor, distributionFactorUpper) <= 0; - - if (dataIsEvenlyDistributed) { - // the minimum dynamic chunk size is at least 1 - final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1); - return splitEvenlySizedChunks( - tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize); - } else { - return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); - } - } else { - return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); - } + // add the ending split + splits.add(ChunkRange.of(chunkStart, null)); + return splits; + } + + private Object nextChunkEnd( + JdbcConnection jdbc, + Object previousChunkEnd, + TableId tableId, + Column splitColumn, + Object max, + int chunkSize) + throws SQLException { + // chunk end might be null when max values are removed + Object chunkEnd = queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd); + if (Objects.equals(previousChunkEnd, chunkEnd)) { + // we don't allow equal chunk start and end, + // should query the next one larger than chunkEnd + chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd); } - - /** - * Split table into evenly sized chunks based on the numeric min and max value of split column, - * and tumble chunks in step size. - */ - private List splitEvenlySizedChunks( - TableId tableId, - Object min, - Object max, - long approximateRowCnt, - int chunkSize, - int dynamicChunkSize) { - LOG.info( - "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}", - tableId, - approximateRowCnt, - chunkSize, - dynamicChunkSize); - if (approximateRowCnt <= chunkSize) { - // there is no more than one chunk, return full table as a chunk - return Collections.singletonList(ChunkRange.all()); - } - - final List splits = new ArrayList<>(); - Object chunkStart = null; - Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize); - while (ObjectUtils.compare(chunkEnd, max) <= 0) { - splits.add(ChunkRange.of(chunkStart, chunkEnd)); - chunkStart = chunkEnd; - try { - chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize); - } catch (ArithmeticException e) { - // Stop chunk split to avoid dead loop when number overflows. 
- break; - } - } - // add the ending split - splits.add(ChunkRange.of(chunkStart, null)); - return splits; + if (isChunkEndGeMax(chunkEnd, max, splitColumn)) { + return null; + } else { + return chunkEnd; } - - /** Split table into unevenly sized chunks by continuously calculating next chunk max value. */ - private List splitUnevenlySizedChunks( - JdbcConnection jdbc, - TableId tableId, - Column splitColumn, - Object min, - Object max, - int chunkSize) - throws SQLException { - LOG.info( - "Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize); - final List splits = new ArrayList<>(); - Object chunkStart = null; - Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize); - int count = 0; - while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max, splitColumn)) { - // we start from [null, min + chunk_size) and avoid [null, min) - splits.add(ChunkRange.of(chunkStart, chunkEnd)); - // may sleep a while to avoid DDOS on PostgreSQL server - maySleep(count++, tableId); - chunkStart = chunkEnd; - chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize); - } - // add the ending split - splits.add(ChunkRange.of(chunkStart, null)); - return splits; - } - - private Object nextChunkEnd( - JdbcConnection jdbc, - Object previousChunkEnd, - TableId tableId, - Column splitColumn, - Object max, - int chunkSize) - throws SQLException { - // chunk end might be null when max values are removed - Object chunkEnd = - queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd); - if (Objects.equals(previousChunkEnd, chunkEnd)) { - // we don't allow equal chunk start and end, - // should query the next one larger than chunkEnd - chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd); - } - if (isChunkEndGeMax(chunkEnd, max, splitColumn)) { - return null; - } else { - return chunkEnd; - } - } - - private SnapshotSplit createSnapshotSplit( - JdbcConnection jdbc, - TableId tableId, - int chunkId, - RowType splitKeyType, - Object chunkStart, - Object chunkEnd) { - // currently, we only support single split column - Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart}; - Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd}; - Map schema = new HashMap<>(); - schema.put(tableId, dialect.queryTableSchema(jdbc, tableId)); - return new SnapshotSplit( - tableId, chunkId, splitKeyType, splitStart, splitEnd, null, schema); - } - - private void maySleep(int count, TableId tableId) { - // every 10 queries to sleep 0.1s - if (count % 10 == 0) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - // nothing to do - } - LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId); - } + } + + private SnapshotSplit createSnapshotSplit( + JdbcConnection jdbc, + TableId tableId, + int chunkId, + RowType splitKeyType, + Object chunkStart, + Object chunkEnd) { + // currently, we only support single split column + Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart}; + Object[] splitEnd = chunkEnd == null ? 
null : new Object[] {chunkEnd}; + Map schema = new HashMap<>(); + schema.put(tableId, dialect.queryTableSchema(jdbc, tableId)); + return new SnapshotSplit(tableId, chunkId, splitKeyType, splitStart, splitEnd, null, schema); + } + + private void maySleep(int count, TableId tableId) { + // every 10 queries to sleep 0.1s + if (count % 10 == 0) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // nothing to do + } + LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId); } + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/pom.xml index 05b1e63ed63..1e8aaa25774 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/pom.xml +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/pom.xml @@ -150,6 +150,12 @@ limitations under the License. 1.8.0 test + + org.apache.flink + flink-cdc-base + 3.3-SNAPSHOT + compile + diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java index 9570f40ed23..798c42a6d42 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java @@ -242,7 +242,7 @@ protected void readChangeEvents() throws Exception { // use startTs of row as messageTs, use commitTs of row as fetchTs reportMetrics(committedRow.getStartTs(), committedRow.getCommitTs()); } catch (Exception e) { - e.printStackTrace(); + LOG.error("read change event error.", e); } } }); @@ -269,7 +269,14 @@ protected void flushRows(final long timestamp) throws Exception { final Cdcpb.Event.Row prewriteRow = prewrites.remove(RowKeyWithTs.ofStart(commitRow)); // if pull cdc event block when region split, cdc event will lose. 
- committedEvents.offer(prewriteRow); + try { + committedEvents.offer(prewriteRow); + } catch (NullPointerException e) { + LOG.error( + "Flush rows npe, remove pre writes error.flush row ts:{},commits {}", + timestamp, + commitRow); + } } } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBDialect.java new file mode 100644 index 00000000000..c1f3485b799 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBDialect.java @@ -0,0 +1,84 @@ +package org.apache.flink.cdc.connectors.tidb.source; + +import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; +import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect; +import org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory; +import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter; +import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; +import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase; +import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask; +import org.apache.flink.cdc.connectors.tidb.source.schema.TiDBSchema; + +import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.TableId; +import io.debezium.relational.history.TableChanges; + +import java.util.List; +import java.util.Map; + +public class TiDBDialect implements JdbcDataSourceDialect { + private static final long serialVersionUID = 1L; + + private transient TiDBSchema tiDBSchema; + + @Override + public String getName() { + return "TiDB"; + } + + @Override + public Offset displayCurrentOffset(JdbcSourceConfig sourceConfig) { + return null; + } + + @Override + public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) { + return false; + } + + @Override + public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) { + return null; + } + + @Override + public FetchTask.Context createFetchTaskContext(JdbcSourceConfig sourceConfig) { + return null; + } + + @Override + public boolean isIncludeDataCollection(JdbcSourceConfig sourceConfig, TableId tableId) { + return false; + } + + @Override + public List discoverDataCollections(JdbcSourceConfig sourceConfig) { + return null; + } + + @Override + public Map discoverDataCollectionSchemas( + JdbcSourceConfig sourceConfig) { + return null; + } + + @Override + public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) { + return null; + } + + @Override + public JdbcConnectionPoolFactory getPooledDataSourceFactory() { + return null; + } + + @Override + public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) { + return null; + } + + @Override + public FetchTask createFetchTask(SourceSplitBase sourceSplitBase) { + return null; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBSourceBuilder.java new file mode 100644 index 00000000000..2585b0d9556 --- /dev/null +++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBSourceBuilder.java @@ -0,0 +1,156 @@ +package org.apache.flink.cdc.connectors.tidb.source; + +import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfigFactory; +import org.apache.flink.cdc.connectors.base.options.StartupOptions; +import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; +import org.apache.flink.cdc.connectors.tidb.source.config.TiDBSourceConfigFactory; +import org.apache.flink.cdc.connectors.tidb.source.offset.LogMessageOffsetFactory; +import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; + +import java.time.Duration; +import java.util.Properties; + +import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull; + +public class TiDBSourceBuilder { + private final TiDBSourceConfigFactory configFactory = new TiDBSourceConfigFactory(); + private LogMessageOffsetFactory offsetFactory; + private DebeziumDeserializationSchema deserializer; + private TiDBDialect dialect; + + public TiDBSourceBuilder startupOptions(StartupOptions startupOptions) { + this.configFactory.startupOptions(startupOptions); + return this; + } + + public TiDBSourceBuilder hostname(String hostname) { + this.configFactory.hostname(hostname); + return this; + } + + public TiDBSourceBuilder port(int port) { + this.configFactory.port(port); + return this; + } + + public TiDBSourceBuilder driverClassName(String driverClassName) { + this.configFactory.driverClassName(driverClassName); + return this; + } + + public TiDBSourceBuilder databaseList(String... databaseList) { + this.configFactory.databaseList(databaseList); + return this; + } + + public TiDBSourceBuilder tableList(String... tableList) { + this.configFactory.tableList(tableList); + return this; + } + + public TiDBSourceBuilder username(String username) { + this.configFactory.username(username); + return this; + } + + public TiDBSourceBuilder password(String password) { + this.configFactory.password(password); + return this; + } + + public TiDBSourceBuilder debeziumProperties(Properties properties) { + this.configFactory.debeziumProperties(properties); + return this; + } + + public TiDBSourceBuilder tikvProperties(Properties properties) { + this.configFactory.tikvProperties(properties); + return this; + } + + public TiDBSourceBuilder serverTimeZone(String timeZone) { + this.configFactory.serverTimeZone(timeZone); + return this; + } + + public TiDBSourceBuilder connectTimeout(Duration connectTimeout) { + this.configFactory.connectTimeout(connectTimeout); + return this; + } + + public TiDBSourceBuilder connectionPoolSize(int connectionPoolSize) { + this.configFactory.connectionPoolSize(connectionPoolSize); + return this; + } + + public TiDBSourceBuilder connectMaxRetries(int connectMaxRetries) { + this.configFactory.connectMaxRetries(connectMaxRetries); + return this; + } + + public TiDBSourceBuilder chunkKeyColumn(String chunkKeyColumn) { + this.configFactory.chunkKeyColumn(chunkKeyColumn); + return this; + } + + /** + * The split size (number of rows) of table snapshot, captured tables are split into multiple + * splits when read the snapshot of table. + */ + public TiDBSourceBuilder splitSize(int splitSize) { + this.configFactory.splitSize(splitSize); + return this; + } + + /** The maximum fetch size for per poll when read table snapshot. 
*/ + public TiDBSourceBuilder fetchSize(int fetchSize) { + this.configFactory.fetchSize(fetchSize); + return this; + } + + public TiDBSourceBuilder splitMetaGroupSize(int splitMetaGroupSize) { + this.configFactory.splitMetaGroupSize(splitMetaGroupSize); + return this; + } + + public TiDBSourceBuilder distributionFactorUpper(double distributionFactorUpper) { + this.configFactory.distributionFactorUpper(distributionFactorUpper); + return this; + } + + /** + * The lower bound of split key evenly distribution factor, the factor is used to determine + * whether the table is evenly distribution or not. + */ + public TiDBSourceBuilder distributionFactorLower(double distributionFactorLower) { + this.configFactory.distributionFactorLower(distributionFactorLower); + return this; + } + + public TiDBSourceBuilder scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) { + this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled); + return this; + } + + public TiDBSourceBuilder deserializer(DebeziumDeserializationSchema deserializer) { + this.deserializer = deserializer; + return this; + } + + public TiDBIncrementalSource build() { + this.offsetFactory = new LogMessageOffsetFactory(); + this.dialect = new TiDBDialect(); + return new TiDBIncrementalSource<>( + configFactory, checkNotNull(deserializer), offsetFactory, dialect); + } + + public static class TiDBIncrementalSource extends JdbcIncrementalSource { + public TiDBIncrementalSource( + JdbcSourceConfigFactory configFactory, + DebeziumDeserializationSchema deserializationSchema, + LogMessageOffsetFactory offsetFactory, + TiDBDialect dataSourceDialect) { + super(configFactory, deserializationSchema, offsetFactory, dataSourceDialect); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBConnectorConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBConnectorConfig.java new file mode 100644 index 00000000000..f1895695485 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBConnectorConfig.java @@ -0,0 +1,55 @@ +package org.apache.flink.cdc.connectors.tidb.source.config; + +import io.debezium.config.Configuration; +import io.debezium.connector.SourceInfoStructMaker; +import io.debezium.relational.ColumnFilterMode; +import io.debezium.relational.RelationalDatabaseConnectorConfig; +import io.debezium.relational.TableId; +import io.debezium.relational.Tables; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class TiDBConnectorConfig extends RelationalDatabaseConnectorConfig { + protected static final String LOGICAL_NAME = "tidb_cdc_connector"; + protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = Integer.MIN_VALUE; + // todo + protected static final List BUILT_IN_DB_NAMES = + Collections.unmodifiableList( + Arrays.asList("information_schema", "mysql", "tidb", "LBACSYS", "ORAAUDITOR")); + private final TiDBSourceConfig sourceConfig; + + public TiDBConnectorConfig(TiDBSourceConfig sourceConfig) { + // todo + super( + Configuration.from(sourceConfig.getDbzProperties()), + LOGICAL_NAME, + Tables.TableFilter.fromPredicate( + tableId -> + "mysql".equalsIgnoreCase(sourceConfig.getCompatibleMode()) + ? 
!BUILT_IN_DB_NAMES.contains(tableId.catalog()) + : !BUILT_IN_DB_NAMES.contains(tableId.schema())), + TableId::identifier, + DEFAULT_SNAPSHOT_FETCH_SIZE, + "mysql".equalsIgnoreCase(sourceConfig.getCompatibleMode()) + ? ColumnFilterMode.CATALOG + : ColumnFilterMode.SCHEMA); + this.sourceConfig = sourceConfig; + } + + @Override + public String getContextName() { + return "TiDB"; + } + + @Override + public String getConnectorName() { + return "TiDB"; + } + + @Override + protected SourceInfoStructMaker getSourceInfoStructMaker(Version version) { + return null; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfig.java new file mode 100644 index 00000000000..d4c03e720e1 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfig.java @@ -0,0 +1,94 @@ +package org.apache.flink.cdc.connectors.tidb.source.config; + +import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; +import org.apache.flink.cdc.connectors.base.options.StartupOptions; + +import io.debezium.config.Configuration; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; + +public class TiDBSourceConfig extends JdbcSourceConfig { + private static final long serialVersionUID = 1L; + private final String compatibleMode; + private final String pdAddresses; + + private final String hostMapping; + + public TiDBSourceConfig( + String compatibleMode, + StartupOptions startupOptions, + List databaseList, + List tableList, + String pdAddresses, + String hostMapping, + int splitSize, + int splitMetaGroupSize, + double distributionFactorUpper, + double distributionFactorLower, + boolean includeSchemaChanges, + boolean closeIdleReaders, + Properties dbzProperties, + Configuration dbzConfiguration, + String driverClassName, + String hostname, + int port, + String username, + String password, + int fetchSize, + String serverTimeZone, + Duration connectTimeout, + int connectMaxRetries, + int connectionPoolSize, + String chunkKeyColumn, + boolean skipSnapshotBackfill, + boolean isScanNewlyAddedTableEnabled) { + super( + startupOptions, + databaseList, + null, + tableList, + splitSize, + splitMetaGroupSize, + distributionFactorUpper, + distributionFactorLower, + includeSchemaChanges, + closeIdleReaders, + dbzProperties, + dbzConfiguration, + driverClassName, + hostname, + port, + username, + password, + fetchSize, + serverTimeZone, + connectTimeout, + connectMaxRetries, + connectionPoolSize, + chunkKeyColumn, + skipSnapshotBackfill, + isScanNewlyAddedTableEnabled); + this.compatibleMode = compatibleMode; + this.pdAddresses = pdAddresses; + this.hostMapping = hostMapping; + } + + public String getCompatibleMode() { + return compatibleMode; + } + + public String getPdAddresses() { + return pdAddresses; + } + + public String getHostMapping() { + return hostMapping; + } + + @Override + public TiDBConnectorConfig getDbzConnectorConfig() { + return new TiDBConnectorConfig(this); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfigFactory.java 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfigFactory.java new file mode 100644 index 00000000000..5e5438f6699 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfigFactory.java @@ -0,0 +1,105 @@ +package org.apache.flink.cdc.connectors.tidb.source.config; + +import io.debezium.config.Configuration; +import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; +import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfigFactory; + +import java.util.Properties; + +import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull; +import static org.apache.flink.cdc.connectors.base.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished; + +/** A factory to initialize {@link TiDBSourceConfig}. */ +@SuppressWarnings("UnusedReturnValue") +public class TiDBSourceConfigFactory extends JdbcSourceConfigFactory { + private static final long serialVersionUID = 1L; + private String compatibleMode; + private String driverClassName; + private String pdAddresses; + + private String hostMapping; + + private Properties tikvProperties; + + public JdbcSourceConfigFactory compatibleMode(String compatibleMode) { + this.compatibleMode = compatibleMode; + return this; + } + + public JdbcSourceConfigFactory driverClassName(String driverClassName) { + this.driverClassName = driverClassName; + return this; + } + + public JdbcSourceConfigFactory pdAddresses(String pdAddresses) { + this.pdAddresses = pdAddresses; + return this; + } + + public JdbcSourceConfigFactory hostMapping(String hostMapping) { + this.hostMapping = hostMapping; + return this; + } + + public JdbcSourceConfigFactory tikvProperties(Properties tikvProperties) { + this.tikvProperties = tikvProperties; + return this; + } + + @Override + public JdbcSourceConfig create(int subtask) { + checkSupportCheckpointsAfterTasksFinished(closeIdleReaders); + Properties props = new Properties(); + props.setProperty("database.server.name", "tidb_cdc"); + props.setProperty("database.hostname", checkNotNull(hostname)); + props.setProperty("database.port", String.valueOf(port)); + props.setProperty("database.user", checkNotNull(username)); + props.setProperty("database.password", checkNotNull(password)); + props.setProperty("database.dbname", checkNotNull(databaseList.get(0))); + props.setProperty("database.connect.timeout.ms", String.valueOf(connectTimeout.toMillis())); + + // table filter + // props.put("database.include.list", String.join(",", databaseList)); + if (tableList != null) { + props.put("table.include.list", String.join(",", tableList)); + } + // value converter + props.put("decimal.handling.mode", "precise"); + props.put("time.precision.mode", "adaptive_time_microseconds"); + props.put("binary.handling.mode", "bytes"); + + if (dbzProperties != null) { + props.putAll(dbzProperties); + } + + Configuration dbzConfiguration = Configuration.from(props); + return new TiDBSourceConfig( + compatibleMode, + startupOptions, + databaseList, + tableList, + pdAddresses, + hostMapping, + splitSize, + splitMetaGroupSize, + distributionFactorUpper, + distributionFactorLower, + includeSchemaChanges, + closeIdleReaders, + dbzProperties, + dbzConfiguration, + driverClassName, + hostname, + port, + username, + password, + fetchSize, + serverTimeZone, + connectTimeout, + connectMaxRetries, + connectionPoolSize, + 
chunkKeyColumn, + skipSnapshotBackfill, + scanNewlyAddedTableEnabled); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceOptions.java new file mode 100644 index 00000000000..e4122c88896 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceOptions.java @@ -0,0 +1,3 @@ +package org.apache.flink.cdc.connectors.tidb.source.config; + +public class TiDBSourceOptions {} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/enumetator/TiDBSourceEnumerator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/enumetator/TiDBSourceEnumerator.java new file mode 100644 index 00000000000..69a0ae592b2 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/enumetator/TiDBSourceEnumerator.java @@ -0,0 +1,3 @@ +package org.apache.flink.cdc.connectors.tidb.source.enumetator; + +public class TiDBSourceEnumerator {} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/offset/LogMessageOffsetFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/offset/LogMessageOffsetFactory.java new file mode 100644 index 00000000000..c733630858d --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/offset/LogMessageOffsetFactory.java @@ -0,0 +1,39 @@ +package org.apache.flink.cdc.connectors.tidb.source.offset; + +import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; +import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory; + +import java.util.Map; + +public class LogMessageOffsetFactory extends OffsetFactory { + + @Override + public Offset newOffset(Map offset) { + return null; + } + + @Override + public Offset newOffset(String filename, Long position) { + return null; + } + + @Override + public Offset newOffset(Long position) { + return null; + } + + @Override + public Offset createTimestampOffset(long timestampMillis) { + return null; + } + + @Override + public Offset createInitialOffset() { + return null; + } + + @Override + public Offset createNoStoppingOffset() { + return null; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/schema/TiDBSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/schema/TiDBSchema.java new file mode 100644 index 00000000000..d91486002e1 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/schema/TiDBSchema.java @@ -0,0 +1,14 @@ +package org.apache.flink.cdc.connectors.tidb.source.schema; + +import io.debezium.relational.TableId; +import 
io.debezium.relational.history.TableChanges; + +import java.util.Map; + +public class TiDBSchema { + private final Map schemasByTableId; + + public TiDBSchema(Map schemasByTableId) { + this.schemasByTableId = schemasByTableId; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/splitter/TiDBChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/splitter/TiDBChunkSplitter.java new file mode 100644 index 00000000000..1718a7ce78d --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/splitter/TiDBChunkSplitter.java @@ -0,0 +1,38 @@ +package org.apache.flink.cdc.connectors.tidb.source.splitter; + +import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.Column; +import io.debezium.relational.TableId; +import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; +import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect; +import org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter; +import org.apache.flink.table.types.DataType; + +import java.sql.SQLException; + +public class TiDBChunkSplitter extends JdbcSourceChunkSplitter { + public TiDBChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) { + super(sourceConfig, dialect); + } + + @Override + protected Object queryNextChunkMax( + JdbcConnection jdbc, + TableId tableId, + Column splitColumn, + int chunkSize, + Object includedLowerBound) + throws SQLException { + return null; + } + + @Override + protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException { + return null; + } + + @Override + protected DataType fromDbzColumn(Column splitColumn) { + return null; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/cdc/CDCClient.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/cdc/CDCClient.java index c62fe0c99af..9fc50de64bb 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/cdc/CDCClient.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/cdc/CDCClient.java @@ -89,7 +89,7 @@ public CDCClient(final TiSession session, final KeyRange keyRange, final CDCConf try { eventsBuffer.put(event); } catch (InterruptedException e) { - e.printStackTrace(); + LOGGER.error("Events buffer put error!", e); } }; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/cdc/RegionCDCClient.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/cdc/RegionCDCClient.java index be7f8e7fe15..d4d6cf113ff 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/cdc/RegionCDCClient.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/cdc/RegionCDCClient.java @@ -207,12 +207,12 @@ public void onError(final Throwable error) { private void onError(final Throwable error, long resolvedTs) { LOGGER.error( - "region CDC error: region: {}, resolvedTs:{}, error: {}", + "RegionCDC on error: region: {}, resolvedTs:{}, error: {}", region.getId(), resolvedTs, error); 
- running.set(false); - eventConsumer.accept(CDCEvent.error(region.getId(), error, resolvedTs)); + // running.set(false); + // eventConsumer.accept(CDCEvent.error(region.getId(), error, resolvedTs)); } @Override @@ -231,12 +231,13 @@ public void onNext(final ChangeDataEvent event) { if (event.hasResolvedTs()) { final ResolvedTs resolvedTs = event.getResolvedTs(); this.resolvedTs = resolvedTs.getTs(); - if (resolvedTs.getRegionsList().indexOf(region.getId()) >= 0) { + if (resolvedTs.getRegionsList().contains(region.getId())) { submitEvent(CDCEvent.resolvedTsEvent(region.getId(), resolvedTs.getTs())); } } } } catch (final Exception e) { + LOGGER.error("Region CDC Client error:", e); onError(e, resolvedTs); } } @@ -248,9 +249,14 @@ private void onErrorEventHandle(final ChangeDataEvent event) { .filter(errEvent -> errEvent.hasError()) .collect(Collectors.toList()); if (errorEvents != null && errorEvents.size() > 0) { + errorEvents.forEach( + e -> { + LOGGER.error("RegionCDC on error event handle :{}.", e); + }); onError( new RuntimeException( - "regionCDC error:" + errorEvents.get(0).getError().toString()), + "RegionCDC on error event handle:" + + errorEvents.get(0).getError().toString()), this.resolvedTs); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/common/util/ChannelFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/common/util/ChannelFactory.java index b415343c571..90ba7f0126e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/common/util/ChannelFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/common/util/ChannelFactory.java @@ -159,28 +159,30 @@ public ManagedChannel getChannel(String addressStr, HostMapping hostMapping) { // So a coarse grain lock is ok here NettyChannelBuilder builder = NettyChannelBuilder.forAddress( - mappedAddr.getHost(), mappedAddr.getPort()) - .maxInboundMessageSize(maxFrameSize) - .keepAliveTime(keepaliveTime, TimeUnit.SECONDS) - .keepAliveTimeout(keepaliveTimeout, TimeUnit.SECONDS) - .keepAliveWithoutCalls(true) - .idleTimeout(idleTimeout, TimeUnit.SECONDS); + mappedAddr.getHost(), mappedAddr.getPort()); - if (sslContextBuilder == null) { - return builder.usePlaintext().build(); - } else { - SslContext sslContext = null; - try { - sslContext = sslContextBuilder.build(); - } catch (SSLException e) { - logger.error("create ssl context failed!", e); - return null; + builder.maxInboundMessageSize(maxFrameSize) + // .negotiationType(NegotiationType.TLS) + .enableRetry() + .keepAliveTime(keepaliveTime, TimeUnit.SECONDS) + .keepAliveTimeout(keepaliveTimeout, TimeUnit.SECONDS) + .keepAliveWithoutCalls(true) + .idleTimeout(idleTimeout, TimeUnit.SECONDS); + try { + if (sslContextBuilder == null) { + return builder.usePlaintext().build(); + } else { + SslContext sslContext = sslContextBuilder.build(); + return builder.sslContext(sslContext).build(); } - return builder.sslContext(sslContext).build(); + } catch (SSLException e) { + logger.error("create ssl context failed!", e); + return null; } }); } + @Override public void close() { for (ManagedChannel ch : connPool.values()) { ch.shutdown(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/TiDBTestBase.java 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/TiDBTestBase.java index 7c32539e4d9..e43bcea0d6d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/TiDBTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/TiDBTestBase.java @@ -20,7 +20,6 @@ import org.apache.flink.test.util.AbstractTestBase; import com.alibaba.dcm.DnsCacheManipulator; -import org.apache.commons.lang3.RandomUtils; import org.awaitility.Awaitility; import org.awaitility.core.ConditionTimeoutException; import org.junit.AfterClass; @@ -67,13 +66,13 @@ public class TiDBTestBase extends AbstractTestBase { public static final int TIDB_PORT = 4000; public static final int TIKV_PORT_ORIGIN = 20160; public static final int PD_PORT_ORIGIN = 2379; - public static int pdPort = PD_PORT_ORIGIN + RandomUtils.nextInt(0, 1000); + public static int pdPort = PD_PORT_ORIGIN + 10; @ClassRule public static final Network NETWORK = Network.newNetwork(); @ClassRule public static final GenericContainer PD = - new FixedHostPortGenericContainer<>("pingcap/pd:v6.1.0") + new FixedHostPortGenericContainer<>("pingcap/pd:v6.5.3") .withFileSystemBind("src/test/resources/config/pd.toml", "/pd.toml") .withFixedExposedPort(pdPort, PD_PORT_ORIGIN) .withCommand( @@ -93,7 +92,7 @@ public class TiDBTestBase extends AbstractTestBase { @ClassRule public static final GenericContainer TIKV = - new FixedHostPortGenericContainer<>("pingcap/tikv:v6.1.0") + new FixedHostPortGenericContainer<>("pingcap/tikv:v6.5.3") .withFixedExposedPort(TIKV_PORT_ORIGIN, TIKV_PORT_ORIGIN) .withFileSystemBind("src/test/resources/config/tikv.toml", "/tikv.toml") .withCommand( @@ -111,7 +110,7 @@ public class TiDBTestBase extends AbstractTestBase { @ClassRule public static final GenericContainer TIDB = - new GenericContainer<>("pingcap/tidb:v6.1.0") + new GenericContainer<>("pingcap/tidb:v6.5.3") .withExposedPorts(TIDB_PORT) .withFileSystemBind("src/test/resources/config/tidb.toml", "/tidb.toml") .withCommand( @@ -137,9 +136,9 @@ public static void startContainers() throws Exception { @AfterClass public static void stopContainers() { - DnsCacheManipulator.removeDnsCache(PD_SERVICE_NAME); - DnsCacheManipulator.removeDnsCache(TIKV_SERVICE_NAME); - Stream.of(TIKV, PD, TIDB).forEach(GenericContainer::stop); + // DnsCacheManipulator.removeDnsCache(PD_SERVICE_NAME); + // DnsCacheManipulator.removeDnsCache(TIKV_SERVICE_NAME); + // Stream.of(TIKV, PD, TIDB).forEach(GenericContainer::stop); } public String getJdbcUrl(String databaseName) {
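A quick worked example of the even-split arithmetic in the reformatted JdbcSourceChunkSplitter above, assuming the flink-cdc-base defaults (chunk size 8096, even-distribution factor bounds 0.05 and 1000.0 — those defaults come from the base connector options, not from this patch): for a split column with min = 1, max = 404800 and an approximate row count of 404800, calculateDistributionFactor returns (404800 - 1 + 1) / 404800 = 1.0. That lies inside the bounds, so the table counts as evenly distributed, the dynamic chunk size becomes max((int) (1.0 * 8096), 1) = 8096, and splitEvenlySizedChunks emits [null, 8097), [8097, 16193), ... without issuing any further per-chunk queries. A sparser column over the same range, say 400 rows, yields a factor of 404800 / 400 = 1012.0, which exceeds the upper bound, so splitTableIntoChunks falls back to splitUnevenlySizedChunks and walks the table with queryNextChunkMax instead.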
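The TiDBChunkSplitter introduced above still returns null from queryNextChunkMax, queryApproximateRowCnt and fromDbzColumn. As a rough illustration of the contract those stubs have to satisfy, here is a hedged sketch of queryNextChunkMax against TiDB's MySQL-compatible SQL; the LIMIT-subquery pattern and the quotedTableIdString / quotedColumnIdString helpers are borrowed from the base splitter shown earlier, and the method body is an assumption, not code from this patch.

    // Would live in TiDBChunkSplitter; needs java.sql.PreparedStatement and java.sql.ResultSet
    // in addition to the Debezium imports the class already declares.
    @Override
    protected Object queryNextChunkMax(
            JdbcConnection jdbc,
            TableId tableId,
            Column splitColumn,
            int chunkSize,
            Object includedLowerBound)
            throws SQLException {
        String col = jdbc.quotedColumnIdString(splitColumn.name());
        // Take the next chunkSize rows at or above the previous chunk end and return the
        // largest split-key value among them, as the abstract contract above describes.
        String sql =
                String.format(
                        "SELECT MAX(%s) FROM (SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC LIMIT %d) AS T",
                        col, col, jdbc.quotedTableIdString(tableId), col, col, chunkSize);
        try (PreparedStatement ps = jdbc.connection().prepareStatement(sql)) {
            ps.setObject(1, includedLowerBound);
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next() ? rs.getObject(1) : null;
            }
        }
    }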
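And a minimal usage sketch for the TiDBSourceBuilder added in this patch, assuming the builder is generic in its output type (as the other flink-cdc source builders are) and that the remaining dialect stubs get filled in. Hostnames, database and table names are placeholders, and JsonDebeziumDeserializationSchema comes from flink-cdc's debezium module rather than from this diff.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.tidb.source.TiDBSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TiDBSourceUsageSketch {
    public static void main(String[] args) throws Exception {
        TiDBSourceBuilder.TiDBIncrementalSource<String> source =
                new TiDBSourceBuilder<String>()
                        .hostname("localhost") // placeholder TiDB endpoint
                        .port(4000)
                        .databaseList("app_db") // placeholder database
                        .tableList("app_db.orders") // placeholder table
                        .username("root")
                        .password("")
                        .startupOptions(StartupOptions.initial())
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Incremental snapshot sources rely on checkpointing to track split progress.
        env.enableCheckpointing(3000L);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "TiDB Incremental Source").print();
        env.execute("TiDB CDC usage sketch");
    }
}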