From 2cf673ee972249e935a0d971fb40c4327dad8724 Mon Sep 17 00:00:00 2001 From: ouyangwulink Date: Mon, 13 May 2024 19:24:54 +0800 Subject: [PATCH 01/10] [FLINK-35337][cdc] Keep up with the latest version of tikv client --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index fdef5e72aac..d14a2e7df45 100644 --- a/pom.xml +++ b/pom.xml @@ -74,7 +74,7 @@ limitations under the License. 1.19 17.0 1.9.8.Final - 3.2.0 + 6.5.4 2.2.0 1.18.3 1.3 From 2206f0fc538af517f37d2e1f8ca1614463920ac0 Mon Sep 17 00:00:00 2001 From: ouyangwulink Date: Fri, 30 Aug 2024 10:32:21 +0800 Subject: [PATCH 02/10] Fix TiDB exception at hzbank. --- .../flink/cdc/connectors/tidb/TDBSourceOptions.java | 8 +++++--- .../flink/cdc/connectors/tidb/table/TiDBTableSource.java | 5 ++--- pom.xml | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TDBSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TDBSourceOptions.java index fc468e68512..a1a11d22a1d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TDBSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TDBSourceOptions.java @@ -22,11 +22,11 @@ import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; +import org.apache.commons.lang3.StringUtils; import org.tikv.common.ConfigUtils; import org.tikv.common.TiConfiguration; import java.util.Map; -import java.util.Optional; /** Configurations for {@link TiDBSource}. */ public class TDBSourceOptions { @@ -64,7 +64,7 @@ private TDBSourceOptions() {} .stringType() .noDefaultValue() .withDescription( - "TiKV cluster's host-mapping used to configure public IP and intranet IP mapping. When the TiKV cluster is running on the intranet, you can map a set of intranet IPs to public IPs for an outside Flink cluster to access. The format is {Intranet IP1}:{Public IP1};{Intranet IP2}:{Public IP2}, e.g. 192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9."); + "This is route map used to configure public IP and intranet IP mapping. When the TiDB cluster is running on the intranet, you can map a set of intranet IPs to public IPs for an outside Flink cluster to access. The format is {Intranet IP1}:{Public IP1};{Intranet IP2}:{Public IP2}, e.g. 
192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9."); public static final ConfigOption TIKV_GRPC_TIMEOUT = ConfigOptions.key(ConfigUtils.TIKV_GRPC_TIMEOUT) .longType() @@ -94,7 +94,9 @@ public static TiConfiguration getTiConfiguration( final Configuration configuration = Configuration.fromMap(options); final TiConfiguration tiConf = TiConfiguration.createDefault(pdAddrsStr); - Optional.of(new UriHostMapping(hostMapping)).ifPresent(tiConf::setHostMapping); + if (StringUtils.isNotBlank(hostMapping)) { + tiConf.setHostMapping(new UriHostMapping(hostMapping)); + } configuration.getOptional(TIKV_GRPC_TIMEOUT).ifPresent(tiConf::setTimeout); configuration.getOptional(TIKV_GRPC_SCAN_TIMEOUT).ifPresent(tiConf::setScanTimeout); configuration diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSource.java index f9310462548..37188438454 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSource.java @@ -33,8 +33,6 @@ import org.tikv.common.TiConfiguration; -import javax.annotation.Nullable; - import java.util.Collections; import java.util.List; import java.util.Map; @@ -54,7 +52,8 @@ public class TiDBTableSource implements ScanTableSource, SupportsReadingMetadata private final String database; private final String tableName; private final String pdAddresses; - @Nullable private final String hostMapping; + + private final String hostMapping; private final StartupOptions startupOptions; private final Map options; diff --git a/pom.xml b/pom.xml index d14a2e7df45..fdef5e72aac 100644 --- a/pom.xml +++ b/pom.xml @@ -74,7 +74,7 @@ limitations under the License. 1.19 17.0 1.9.8.Final - 6.5.4 + 3.2.0 2.2.0 1.18.3 1.3 From 3e014a74b464b1c8a4a27b5e8bbcaae9001d249b Mon Sep 17 00:00:00 2001 From: ouyangwulink Date: Tue, 21 May 2024 22:22:04 +0800 Subject: [PATCH 03/10] [FLINK-35354] Add doc for host mapping feature --- .../flink/cdc/connectors/tidb/TDBSourceOptions.java | 8 +++----- .../flink/cdc/connectors/tidb/table/TiDBTableSource.java | 4 ++-- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TDBSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TDBSourceOptions.java index a1a11d22a1d..fc468e68512 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TDBSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TDBSourceOptions.java @@ -22,11 +22,11 @@ import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; -import org.apache.commons.lang3.StringUtils; import org.tikv.common.ConfigUtils; import org.tikv.common.TiConfiguration; import java.util.Map; +import java.util.Optional; /** Configurations for {@link TiDBSource}. 
*/ public class TDBSourceOptions { @@ -64,7 +64,7 @@ private TDBSourceOptions() {} .stringType() .noDefaultValue() .withDescription( - "This is route map used to configure public IP and intranet IP mapping. When the TiDB cluster is running on the intranet, you can map a set of intranet IPs to public IPs for an outside Flink cluster to access. The format is {Intranet IP1}:{Public IP1};{Intranet IP2}:{Public IP2}, e.g. 192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9."); + "TiKV cluster's host-mapping used to configure public IP and intranet IP mapping. When the TiKV cluster is running on the intranet, you can map a set of intranet IPs to public IPs for an outside Flink cluster to access. The format is {Intranet IP1}:{Public IP1};{Intranet IP2}:{Public IP2}, e.g. 192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9."); public static final ConfigOption TIKV_GRPC_TIMEOUT = ConfigOptions.key(ConfigUtils.TIKV_GRPC_TIMEOUT) .longType() @@ -94,9 +94,7 @@ public static TiConfiguration getTiConfiguration( final Configuration configuration = Configuration.fromMap(options); final TiConfiguration tiConf = TiConfiguration.createDefault(pdAddrsStr); - if (StringUtils.isNotBlank(hostMapping)) { - tiConf.setHostMapping(new UriHostMapping(hostMapping)); - } + Optional.of(new UriHostMapping(hostMapping)).ifPresent(tiConf::setHostMapping); configuration.getOptional(TIKV_GRPC_TIMEOUT).ifPresent(tiConf::setTimeout); configuration.getOptional(TIKV_GRPC_SCAN_TIMEOUT).ifPresent(tiConf::setScanTimeout); configuration diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSource.java index 37188438454..5e89c1623e8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSource.java @@ -31,6 +31,7 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; +import com.sun.istack.internal.Nullable; import org.tikv.common.TiConfiguration; import java.util.Collections; @@ -52,8 +53,7 @@ public class TiDBTableSource implements ScanTableSource, SupportsReadingMetadata private final String database; private final String tableName; private final String pdAddresses; - - private final String hostMapping; + @Nullable private final String hostMapping; private final StartupOptions startupOptions; private final Map options; From 0a2cc0575b370a0a81a36353f75a555370044211 Mon Sep 17 00:00:00 2001 From: ouyangwulink Date: Wed, 22 May 2024 09:03:48 +0800 Subject: [PATCH 04/10] [FLINK-35354] fixed annotation import --- .../flink/cdc/connectors/tidb/table/TiDBTableSource.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSource.java index 5e89c1623e8..f9310462548 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSource.java +++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSource.java @@ -31,9 +31,10 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; -import com.sun.istack.internal.Nullable; import org.tikv.common.TiConfiguration; +import javax.annotation.Nullable; + import java.util.Collections; import java.util.List; import java.util.Map; From 80d24d5da49c7b8843f81b5cb87543ead585c5a6 Mon Sep 17 00:00:00 2001 From: ouyangwulink Date: Tue, 9 Jul 2024 13:50:20 +0800 Subject: [PATCH 05/10] edit region close logic --- .idea/vcs.xml | 22 +- .../tidb/TiKVRichParallelSourceFunction.java | 2 +- .../src/main/java/org/tikv/cdc/CDCClient.java | 2 +- .../java/org/tikv/cdc/RegionCDCClient.java | 16 +- .../org/tikv/common/util/ChannelFactory.java | 34 +- .../cdc/connectors/tidb/TiDBTestBase.java | 15 +- .../cdc/connectors/tests/TiDBE2eITCase.java | 409 +++++++++--------- 7 files changed, 239 insertions(+), 261 deletions(-) diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 818d136a83f..234c8a603b8 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,25 +1,7 @@ - - - - + - + \ No newline at end of file diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java index 9570f40ed23..7f8b0b6fa61 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java @@ -242,7 +242,7 @@ protected void readChangeEvents() throws Exception { // use startTs of row as messageTs, use commitTs of row as fetchTs reportMetrics(committedRow.getStartTs(), committedRow.getCommitTs()); } catch (Exception e) { - e.printStackTrace(); + LOG.error("read change event error.", e); } } }); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/cdc/CDCClient.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/cdc/CDCClient.java index c62fe0c99af..9fc50de64bb 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/cdc/CDCClient.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/cdc/CDCClient.java @@ -89,7 +89,7 @@ public CDCClient(final TiSession session, final KeyRange keyRange, final CDCConf try { eventsBuffer.put(event); } catch (InterruptedException e) { - e.printStackTrace(); + LOGGER.error("Events buffer put error!", e); } }; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/cdc/RegionCDCClient.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/cdc/RegionCDCClient.java index be7f8e7fe15..d4d6cf113ff 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/cdc/RegionCDCClient.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/cdc/RegionCDCClient.java @@ -207,12 +207,12 @@ public 
void onError(final Throwable error) { private void onError(final Throwable error, long resolvedTs) { LOGGER.error( - "region CDC error: region: {}, resolvedTs:{}, error: {}", + "RegionCDC on error: region: {}, resolvedTs:{}, error: {}", region.getId(), resolvedTs, error); - running.set(false); - eventConsumer.accept(CDCEvent.error(region.getId(), error, resolvedTs)); + // running.set(false); + // eventConsumer.accept(CDCEvent.error(region.getId(), error, resolvedTs)); } @Override @@ -231,12 +231,13 @@ public void onNext(final ChangeDataEvent event) { if (event.hasResolvedTs()) { final ResolvedTs resolvedTs = event.getResolvedTs(); this.resolvedTs = resolvedTs.getTs(); - if (resolvedTs.getRegionsList().indexOf(region.getId()) >= 0) { + if (resolvedTs.getRegionsList().contains(region.getId())) { submitEvent(CDCEvent.resolvedTsEvent(region.getId(), resolvedTs.getTs())); } } } } catch (final Exception e) { + LOGGER.error("Region CDC Client error:", e); onError(e, resolvedTs); } } @@ -248,9 +249,14 @@ private void onErrorEventHandle(final ChangeDataEvent event) { .filter(errEvent -> errEvent.hasError()) .collect(Collectors.toList()); if (errorEvents != null && errorEvents.size() > 0) { + errorEvents.forEach( + e -> { + LOGGER.error("RegionCDC on error event handle :{}.", e); + }); onError( new RuntimeException( - "regionCDC error:" + errorEvents.get(0).getError().toString()), + "RegionCDC on error event handle:" + + errorEvents.get(0).getError().toString()), this.resolvedTs); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/common/util/ChannelFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/common/util/ChannelFactory.java index b415343c571..90ba7f0126e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/common/util/ChannelFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/tikv/common/util/ChannelFactory.java @@ -159,28 +159,30 @@ public ManagedChannel getChannel(String addressStr, HostMapping hostMapping) { // So a coarse grain lock is ok here NettyChannelBuilder builder = NettyChannelBuilder.forAddress( - mappedAddr.getHost(), mappedAddr.getPort()) - .maxInboundMessageSize(maxFrameSize) - .keepAliveTime(keepaliveTime, TimeUnit.SECONDS) - .keepAliveTimeout(keepaliveTimeout, TimeUnit.SECONDS) - .keepAliveWithoutCalls(true) - .idleTimeout(idleTimeout, TimeUnit.SECONDS); + mappedAddr.getHost(), mappedAddr.getPort()); - if (sslContextBuilder == null) { - return builder.usePlaintext().build(); - } else { - SslContext sslContext = null; - try { - sslContext = sslContextBuilder.build(); - } catch (SSLException e) { - logger.error("create ssl context failed!", e); - return null; + builder.maxInboundMessageSize(maxFrameSize) + // .negotiationType(NegotiationType.TLS) + .enableRetry() + .keepAliveTime(keepaliveTime, TimeUnit.SECONDS) + .keepAliveTimeout(keepaliveTimeout, TimeUnit.SECONDS) + .keepAliveWithoutCalls(true) + .idleTimeout(idleTimeout, TimeUnit.SECONDS); + try { + if (sslContextBuilder == null) { + return builder.usePlaintext().build(); + } else { + SslContext sslContext = sslContextBuilder.build(); + return builder.sslContext(sslContext).build(); } - return builder.sslContext(sslContext).build(); + } catch (SSLException e) { + logger.error("create ssl context failed!", e); + return null; } }); } + @Override public void close() { for (ManagedChannel ch : 
connPool.values()) { ch.shutdown(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/TiDBTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/TiDBTestBase.java index 7c32539e4d9..e43bcea0d6d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/TiDBTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/TiDBTestBase.java @@ -20,7 +20,6 @@ import org.apache.flink.test.util.AbstractTestBase; import com.alibaba.dcm.DnsCacheManipulator; -import org.apache.commons.lang3.RandomUtils; import org.awaitility.Awaitility; import org.awaitility.core.ConditionTimeoutException; import org.junit.AfterClass; @@ -67,13 +66,13 @@ public class TiDBTestBase extends AbstractTestBase { public static final int TIDB_PORT = 4000; public static final int TIKV_PORT_ORIGIN = 20160; public static final int PD_PORT_ORIGIN = 2379; - public static int pdPort = PD_PORT_ORIGIN + RandomUtils.nextInt(0, 1000); + public static int pdPort = PD_PORT_ORIGIN + 10; @ClassRule public static final Network NETWORK = Network.newNetwork(); @ClassRule public static final GenericContainer PD = - new FixedHostPortGenericContainer<>("pingcap/pd:v6.1.0") + new FixedHostPortGenericContainer<>("pingcap/pd:v6.5.3") .withFileSystemBind("src/test/resources/config/pd.toml", "/pd.toml") .withFixedExposedPort(pdPort, PD_PORT_ORIGIN) .withCommand( @@ -93,7 +92,7 @@ public class TiDBTestBase extends AbstractTestBase { @ClassRule public static final GenericContainer TIKV = - new FixedHostPortGenericContainer<>("pingcap/tikv:v6.1.0") + new FixedHostPortGenericContainer<>("pingcap/tikv:v6.5.3") .withFixedExposedPort(TIKV_PORT_ORIGIN, TIKV_PORT_ORIGIN) .withFileSystemBind("src/test/resources/config/tikv.toml", "/tikv.toml") .withCommand( @@ -111,7 +110,7 @@ public class TiDBTestBase extends AbstractTestBase { @ClassRule public static final GenericContainer TIDB = - new GenericContainer<>("pingcap/tidb:v6.1.0") + new GenericContainer<>("pingcap/tidb:v6.5.3") .withExposedPorts(TIDB_PORT) .withFileSystemBind("src/test/resources/config/tidb.toml", "/tidb.toml") .withCommand( @@ -137,9 +136,9 @@ public static void startContainers() throws Exception { @AfterClass public static void stopContainers() { - DnsCacheManipulator.removeDnsCache(PD_SERVICE_NAME); - DnsCacheManipulator.removeDnsCache(TIKV_SERVICE_NAME); - Stream.of(TIKV, PD, TIDB).forEach(GenericContainer::stop); + // DnsCacheManipulator.removeDnsCache(PD_SERVICE_NAME); + // DnsCacheManipulator.removeDnsCache(TIKV_SERVICE_NAME); + // Stream.of(TIKV, PD, TIDB).forEach(GenericContainer::stop); } public String getJdbcUrl(String databaseName) { diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/TiDBE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/TiDBE2eITCase.java index 232d6f7b7b8..cf07ffa2106 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/TiDBE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/TiDBE2eITCase.java @@ -20,7 +20,6 @@ import org.apache.flink.cdc.common.test.utils.JdbcProxy; import 
org.apache.flink.cdc.common.test.utils.TestUtils; import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment; - import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -54,230 +53,220 @@ /** End-to-end tests for tidb-cdc connector uber jar. */ public class TiDBE2eITCase extends FlinkContainerTestEnvironment { - private static final Logger LOG = LoggerFactory.getLogger(TiDBE2eITCase.class); - private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); - - public static final String PD_SERVICE_NAME = "pd0"; - public static final String TIKV_SERVICE_NAME = "tikv0"; - public static final String TIDB_SERVICE_NAME = "tidb0"; + private static final Logger LOG = LoggerFactory.getLogger(TiDBE2eITCase.class); + private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); - public static final String TIDB_USER = "root"; - public static final String TIDB_PASSWORD = ""; + public static final String PD_SERVICE_NAME = "pd0"; + public static final String TIKV_SERVICE_NAME = "tikv0"; + public static final String TIDB_SERVICE_NAME = "tidb0"; - public static final int TIDB_PORT = 4000; - public static final int TIKV_PORT = 20160; - public static final int PD_PORT = 2379; + public static final String TIDB_USER = "root"; + public static final String TIDB_PASSWORD = ""; - private static final Path tidbCdcJar = TestUtils.getResource("tidb-cdc-connector.jar"); - private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + public static final int TIDB_PORT = 4000; + public static final int TIKV_PORT = 20160; + public static final int PD_PORT = 2379; - @ClassRule - public static final GenericContainer PD = - new GenericContainer<>("pingcap/pd:v6.1.0") - .withExposedPorts(PD_PORT) - .withFileSystemBind("src/test/resources/docker/tidb/pd.toml", "/pd.toml") - .withCommand( - "--name=pd0", - "--client-urls=http://0.0.0.0:2379", - "--peer-urls=http://0.0.0.0:2380", - "--advertise-client-urls=http://pd0:2379", - "--advertise-peer-urls=http://pd0:2380", - "--initial-cluster=pd0=http://pd0:2380", - "--data-dir=/data/pd0", - "--config=/pd.toml", - "--log-file=/logs/pd0.log") - .withNetwork(NETWORK) - .withNetworkMode("host") - .withNetworkAliases(PD_SERVICE_NAME) - .withStartupTimeout(Duration.ofSeconds(120)) - .withLogConsumer(new Slf4jLogConsumer(LOG)); + private static final Path tidbCdcJar = TestUtils.getResource("tidb-cdc-connector.jar"); + private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); - @ClassRule - public static final GenericContainer TIKV = - new GenericContainer<>("pingcap/tikv:v6.1.0") - .withExposedPorts(TIKV_PORT) - .withFileSystemBind("src/test/resources/docker/tidb/tikv.toml", "/tikv.toml") - .withCommand( - "--addr=0.0.0.0:20160", - "--advertise-addr=tikv0:20160", - "--data-dir=/data/tikv0", - "--pd=pd0:2379", - "--config=/tikv.toml", - "--log-file=/logs/tikv0.log") - .withNetwork(NETWORK) - .dependsOn(PD) - .withNetworkAliases(TIKV_SERVICE_NAME) - .withStartupTimeout(Duration.ofSeconds(120)) - .withLogConsumer(new Slf4jLogConsumer(LOG)); + @ClassRule + public static final GenericContainer PD = + new GenericContainer<>("pingcap/pd:v6.1.0") + .withExposedPorts(PD_PORT) + .withFileSystemBind("src/test/resources/docker/tidb/pd.toml", "/pd.toml") + .withCommand( + "--name=pd0", + "--client-urls=http://0.0.0.0:2379", + "--peer-urls=http://0.0.0.0:2380", + "--advertise-client-urls=http://pd0:2379", + "--advertise-peer-urls=http://pd0:2380", + 
"--initial-cluster=pd0=http://pd0:2380", + "--data-dir=/data/pd0", + "--config=/pd.toml", + "--log-file=/logs/pd0.log") + .withNetwork(NETWORK) + .withNetworkMode("host") + .withNetworkAliases(PD_SERVICE_NAME) + .withStartupTimeout(Duration.ofSeconds(120)) + .withLogConsumer(new Slf4jLogConsumer(LOG)); - @ClassRule - public static final GenericContainer TIDB = - new GenericContainer<>("pingcap/tidb:v6.1.0") - .withExposedPorts(TIDB_PORT) - .withFileSystemBind("src/test/resources/docker/tidb/tidb.toml", "/tidb.toml") - .withCommand( - "--store=tikv", - "--path=pd0:2379", - "--config=/tidb.toml", - "--advertise-address=tidb0") - .withNetwork(NETWORK) - .dependsOn(TIKV) - .withNetworkAliases(TIDB_SERVICE_NAME) - .withStartupTimeout(Duration.ofSeconds(120)) - .withLogConsumer(new Slf4jLogConsumer(LOG)); + @ClassRule + public static final GenericContainer TIKV = + new GenericContainer<>("pingcap/tikv:v6.1.0") + .withExposedPorts(TIKV_PORT) + .withFileSystemBind("src/test/resources/docker/tidb/tikv.toml", "/tikv.toml") + .withCommand( + "--addr=0.0.0.0:20160", + "--advertise-addr=tikv0:20160", + "--data-dir=/data/tikv0", + "--pd=pd0:2379", + "--config=/tikv.toml", + "--log-file=/logs/tikv0.log") + .withNetwork(NETWORK) + .dependsOn(PD) + .withNetworkAliases(TIKV_SERVICE_NAME) + .withStartupTimeout(Duration.ofSeconds(120)) + .withLogConsumer(new Slf4jLogConsumer(LOG)); - @Before - public void before() { - LOG.info("Starting containers..."); - Startables.deepStart(Stream.of(PD, TIKV, TIDB)).join(); - LOG.info("Containers are started."); - super.before(); - initializeTidbTable("tidb_inventory"); - } + @ClassRule + public static final GenericContainer TIDB = + new GenericContainer<>("pingcap/tidb:v6.1.0") + .withExposedPorts(TIDB_PORT) + .withFileSystemBind("src/test/resources/docker/tidb/tidb.toml", "/tidb.toml") + .withCommand( + "--store=tikv", "--path=pd0:2379", "--config=/tidb.toml", "--advertise-address=tidb0") + .withNetwork(NETWORK) + .dependsOn(TIKV) + .withNetworkAliases(TIDB_SERVICE_NAME) + .withStartupTimeout(Duration.ofSeconds(120)) + .withLogConsumer(new Slf4jLogConsumer(LOG)); - @After - public void after() { - LOG.info("Stopping containers..."); - Stream.of(TIDB, TIKV, PD).forEach(GenericContainer::stop); - log.info("Containers are stopped."); - super.after(); - } + @Before + public void before() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(PD, TIKV, TIDB)).join(); + LOG.info("Containers are started."); + super.before(); + initializeTidbTable("tidb_inventory"); + } - @Test - public void testTIDBCDC() throws Exception { - List sqlLines = - Arrays.asList( - "CREATE TABLE tidb_source (", - " `id` INT NOT NULL,", - " name STRING,", - " description STRING,", - " weight DECIMAL(20, 10),", - " PRIMARY KEY (`id`) NOT ENFORCED", - ") WITH (", - " 'connector' = 'tidb-cdc',", - " 'tikv.grpc.timeout_in_ms' = '20000',", - " 'pd-addresses' = '" + PD_SERVICE_NAME + ":" + PD_PORT + "',", - " 'database-name' = 'inventory',", - " 'table-name' = 'products'", - ");", - "CREATE TABLE products_sink (", - " `id` INT NOT NULL,", - " name STRING,", - " description STRING,", - " weight DECIMAL(10,3),", - " primary key (`id`) not enforced", - ") WITH (", - " 'connector' = 'jdbc',", - String.format( - " 'url' = 'jdbc:mysql://%s:3306/%s',", - INTER_CONTAINER_MYSQL_ALIAS, - mysqlInventoryDatabase.getDatabaseName()), - " 'table-name' = 'products_sink',", - " 'username' = '" + MYSQL_TEST_USER + "',", - " 'password' = '" + MYSQL_TEST_PASSWORD + "'", - ");", - "INSERT INTO products_sink", 
- "SELECT * FROM tidb_source;"); + @After + public void after() { + LOG.info("Stopping containers..."); + Stream.of(TIDB, TIKV, PD).forEach(GenericContainer::stop); + log.info("Containers are stopped."); + super.after(); + } - submitSQLJob(sqlLines, tidbCdcJar, jdbcJar, mysqlDriverJar); - waitUntilJobRunning(Duration.ofSeconds(30)); + @Test + public void testTIDBCDC() throws Exception { + List sqlLines = + Arrays.asList( + "CREATE TABLE tidb_source (", + " `id` INT NOT NULL,", + " name STRING,", + " description STRING,", + " weight DECIMAL(20, 10),", + " PRIMARY KEY (`id`) NOT ENFORCED", + ") WITH (", + " 'connector' = 'tidb-cdc',", + " 'tikv.grpc.timeout_in_ms' = '20000',", + " 'pd-addresses' = '" + PD_SERVICE_NAME + ":" + PD_PORT + "',", + " 'database-name' = 'inventory',", + " 'table-name' = 'products'", + ");", + "CREATE TABLE products_sink (", + " `id` INT NOT NULL,", + " name STRING,", + " description STRING,", + " weight DECIMAL(10,3),", + " primary key (`id`) not enforced", + ") WITH (", + " 'connector' = 'jdbc',", + String.format( + " 'url' = 'jdbc:mysql://%s:3306/%s',", + INTER_CONTAINER_MYSQL_ALIAS, mysqlInventoryDatabase.getDatabaseName()), + " 'table-name' = 'products_sink',", + " 'username' = '" + MYSQL_TEST_USER + "',", + " 'password' = '" + MYSQL_TEST_PASSWORD + "'", + ");", + "INSERT INTO products_sink", + "SELECT * FROM tidb_source;"); - // generate binlogs - try (Connection connection = getTidbJdbcConnection("inventory"); - Statement statement = connection.createStatement()) { - statement.execute( - "UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); - statement.execute("UPDATE products SET weight='5.1' WHERE id=107;"); - statement.execute( - "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 - statement.execute( - "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); - statement.execute( - "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); - statement.execute("UPDATE products SET weight='5.17' WHERE id=111;"); - statement.execute("DELETE FROM products WHERE id=111;"); + submitSQLJob(sqlLines, tidbCdcJar, jdbcJar, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); - ResultSet resultSet = statement.executeQuery("SELECT count(1) FROM products"); - int recordCount = 0; - while (resultSet.next()) { - recordCount = resultSet.getInt(1); - } - assertEquals(recordCount, 10); - } catch (SQLException e) { - LOG.error("Update table for CDC failed.", e); - throw e; - } + // generate binlogs + try (Connection connection = getTidbJdbcConnection("inventory"); + Statement statement = connection.createStatement()) { + statement.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); + statement.execute("UPDATE products SET weight='5.1' WHERE id=107;"); + statement.execute( + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 + statement.execute( + "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); + statement.execute( + "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + statement.execute("UPDATE products SET weight='5.17' WHERE id=111;"); + statement.execute("DELETE FROM products WHERE id=111;"); - // assert final results - String mysqlJdbcUrl = - String.format( - "jdbc:mysql://%s:%s/%s", - MYSQL.getHost(), - MYSQL.getDatabasePort(), - 
mysqlInventoryDatabase.getDatabaseName()); - JdbcProxy proxy = - new JdbcProxy( - mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, MYSQL_DRIVER_CLASS); - List expectResult = - Arrays.asList( - "101,scooter,Small 2-wheel scooter,3.14", - "102,car battery,12V car battery,8.1", - "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8", - "104,hammer,12oz carpenter's hammer,0.75", - "105,hammer,14oz carpenter's hammer,0.875", - "106,hammer,18oz carpenter hammer,1.0", - "107,rocks,box of assorted rocks,5.1", - "108,jacket,water resistent black wind breaker,0.1", - "109,spare tire,24 inch spare tire,22.2", - "110,jacket,new water resistent white wind breaker,0.5"); - proxy.checkResultWithTimeout( - expectResult, - "products_sink", - new String[] {"id", "name", "description", "weight"}, - 360000L); + ResultSet resultSet = statement.executeQuery("SELECT count(1) FROM products"); + int recordCount = 0; + while (resultSet.next()) { + recordCount = resultSet.getInt(1); + } + assertEquals(recordCount, 10); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; } - protected Connection getTidbJdbcConnection(String databaseName) throws SQLException { - return DriverManager.getConnection( - "jdbc:mysql://" - + TIDB.getContainerIpAddress() - + ":" - + TIDB.getMappedPort(TIDB_PORT) - + "/" - + databaseName, - TIDB_USER, - TIDB_PASSWORD); - } + // assert final results + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), MYSQL.getDatabasePort(), mysqlInventoryDatabase.getDatabaseName()); + JdbcProxy proxy = + new JdbcProxy(mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, MYSQL_DRIVER_CLASS); + List expectResult = + Arrays.asList( + "101,scooter,Small 2-wheel scooter,3.14", + "102,car battery,12V car battery,8.1", + "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8", + "104,hammer,12oz carpenter's hammer,0.75", + "105,hammer,14oz carpenter's hammer,0.875", + "106,hammer,18oz carpenter hammer,1.0", + "107,rocks,box of assorted rocks,5.1", + "108,jacket,water resistent black wind breaker,0.1", + "109,spare tire,24 inch spare tire,22.2", + "110,jacket,new water resistent white wind breaker,0.5"); + proxy.checkResultWithTimeout( + expectResult, + "products_sink", + new String[] {"id", "name", "description", "weight"}, + 360000L); + } + + protected Connection getTidbJdbcConnection(String databaseName) throws SQLException { + return DriverManager.getConnection( + "jdbc:mysql://" + + TIDB.getContainerIpAddress() + + ":" + + TIDB.getMappedPort(TIDB_PORT) + + "/" + + databaseName, + TIDB_USER, + TIDB_PASSWORD); + } - /** - * Executes a JDBC statement using the default jdbc config without autocommitting the - * connection. - */ - protected void initializeTidbTable(String sqlFile) { - final String ddlFile = String.format("ddl/%s.sql", sqlFile); - final URL ddlTestFile = TiDBE2eITCase.class.getClassLoader().getResource(ddlFile); - assertNotNull("Cannot locate " + ddlFile, ddlTestFile); - try (Connection connection = getTidbJdbcConnection(""); - Statement statement = connection.createStatement()) { - final List statements = - Arrays.stream( - Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream() - .map(String::trim) - .filter(x -> !x.startsWith("--") && !x.isEmpty()) - .map( - x -> { - final Matcher m = - COMMENT_PATTERN.matcher(x); - return m.matches() ? 
m.group(1) : x; - }) - .collect(Collectors.joining("\n")) - .split(";")) - .collect(Collectors.toList()); - for (String stmt : statements) { - statement.execute(stmt); - } - } catch (Exception e) { - throw new RuntimeException(e); - } + /** + * Executes a JDBC statement using the default jdbc config without autocommitting the connection. + */ + protected void initializeTidbTable(String sqlFile) { + final String ddlFile = String.format("ddl/%s.sql", sqlFile); + final URL ddlTestFile = TiDBE2eITCase.class.getClassLoader().getResource(ddlFile); + assertNotNull("Cannot locate " + ddlFile, ddlTestFile); + try (Connection connection = getTidbJdbcConnection(""); + Statement statement = connection.createStatement()) { + final List statements = + Arrays.stream( + Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream() + .map(String::trim) + .filter(x -> !x.startsWith("--") && !x.isEmpty()) + .map( + x -> { + final Matcher m = COMMENT_PATTERN.matcher(x); + return m.matches() ? m.group(1) : x; + }) + .collect(Collectors.joining("\n")) + .split(";")) + .collect(Collectors.toList()); + for (String stmt : statements) { + statement.execute(stmt); + } + } catch (Exception e) { + throw new RuntimeException(e); } + } } From eea7d70967871a378b436dcd5e2c8eca0e19b7ea Mon Sep 17 00:00:00 2001 From: ouyangwulink Date: Fri, 16 Aug 2024 10:55:08 +0800 Subject: [PATCH 06/10] log flush rows error. --- .../tidb/TiKVRichParallelSourceFunction.java | 9 +- .../cdc/connectors/tests/TiDBE2eITCase.java | 409 +++++++++--------- 2 files changed, 218 insertions(+), 200 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java index 7f8b0b6fa61..798c42a6d42 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java @@ -269,7 +269,14 @@ protected void flushRows(final long timestamp) throws Exception { final Cdcpb.Event.Row prewriteRow = prewrites.remove(RowKeyWithTs.ofStart(commitRow)); // if pull cdc event block when region split, cdc event will lose. 
- committedEvents.offer(prewriteRow); + try { + committedEvents.offer(prewriteRow); + } catch (NullPointerException e) { + LOG.error( + "Flush rows npe, remove pre writes error.flush row ts:{},commits {}", + timestamp, + commitRow); + } } } } diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/TiDBE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/TiDBE2eITCase.java index cf07ffa2106..232d6f7b7b8 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/TiDBE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/TiDBE2eITCase.java @@ -20,6 +20,7 @@ import org.apache.flink.cdc.common.test.utils.JdbcProxy; import org.apache.flink.cdc.common.test.utils.TestUtils; import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment; + import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -53,220 +54,230 @@ /** End-to-end tests for tidb-cdc connector uber jar. */ public class TiDBE2eITCase extends FlinkContainerTestEnvironment { - private static final Logger LOG = LoggerFactory.getLogger(TiDBE2eITCase.class); - private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); + private static final Logger LOG = LoggerFactory.getLogger(TiDBE2eITCase.class); + private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); + + public static final String PD_SERVICE_NAME = "pd0"; + public static final String TIKV_SERVICE_NAME = "tikv0"; + public static final String TIDB_SERVICE_NAME = "tidb0"; - public static final String PD_SERVICE_NAME = "pd0"; - public static final String TIKV_SERVICE_NAME = "tikv0"; - public static final String TIDB_SERVICE_NAME = "tidb0"; + public static final String TIDB_USER = "root"; + public static final String TIDB_PASSWORD = ""; - public static final String TIDB_USER = "root"; - public static final String TIDB_PASSWORD = ""; + public static final int TIDB_PORT = 4000; + public static final int TIKV_PORT = 20160; + public static final int PD_PORT = 2379; - public static final int TIDB_PORT = 4000; - public static final int TIKV_PORT = 20160; - public static final int PD_PORT = 2379; + private static final Path tidbCdcJar = TestUtils.getResource("tidb-cdc-connector.jar"); + private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); - private static final Path tidbCdcJar = TestUtils.getResource("tidb-cdc-connector.jar"); - private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + @ClassRule + public static final GenericContainer PD = + new GenericContainer<>("pingcap/pd:v6.1.0") + .withExposedPorts(PD_PORT) + .withFileSystemBind("src/test/resources/docker/tidb/pd.toml", "/pd.toml") + .withCommand( + "--name=pd0", + "--client-urls=http://0.0.0.0:2379", + "--peer-urls=http://0.0.0.0:2380", + "--advertise-client-urls=http://pd0:2379", + "--advertise-peer-urls=http://pd0:2380", + "--initial-cluster=pd0=http://pd0:2380", + "--data-dir=/data/pd0", + "--config=/pd.toml", + "--log-file=/logs/pd0.log") + .withNetwork(NETWORK) + .withNetworkMode("host") + .withNetworkAliases(PD_SERVICE_NAME) + .withStartupTimeout(Duration.ofSeconds(120)) + .withLogConsumer(new Slf4jLogConsumer(LOG)); - @ClassRule - public static final GenericContainer PD = - new GenericContainer<>("pingcap/pd:v6.1.0") - .withExposedPorts(PD_PORT) - 
.withFileSystemBind("src/test/resources/docker/tidb/pd.toml", "/pd.toml") - .withCommand( - "--name=pd0", - "--client-urls=http://0.0.0.0:2379", - "--peer-urls=http://0.0.0.0:2380", - "--advertise-client-urls=http://pd0:2379", - "--advertise-peer-urls=http://pd0:2380", - "--initial-cluster=pd0=http://pd0:2380", - "--data-dir=/data/pd0", - "--config=/pd.toml", - "--log-file=/logs/pd0.log") - .withNetwork(NETWORK) - .withNetworkMode("host") - .withNetworkAliases(PD_SERVICE_NAME) - .withStartupTimeout(Duration.ofSeconds(120)) - .withLogConsumer(new Slf4jLogConsumer(LOG)); + @ClassRule + public static final GenericContainer TIKV = + new GenericContainer<>("pingcap/tikv:v6.1.0") + .withExposedPorts(TIKV_PORT) + .withFileSystemBind("src/test/resources/docker/tidb/tikv.toml", "/tikv.toml") + .withCommand( + "--addr=0.0.0.0:20160", + "--advertise-addr=tikv0:20160", + "--data-dir=/data/tikv0", + "--pd=pd0:2379", + "--config=/tikv.toml", + "--log-file=/logs/tikv0.log") + .withNetwork(NETWORK) + .dependsOn(PD) + .withNetworkAliases(TIKV_SERVICE_NAME) + .withStartupTimeout(Duration.ofSeconds(120)) + .withLogConsumer(new Slf4jLogConsumer(LOG)); - @ClassRule - public static final GenericContainer TIKV = - new GenericContainer<>("pingcap/tikv:v6.1.0") - .withExposedPorts(TIKV_PORT) - .withFileSystemBind("src/test/resources/docker/tidb/tikv.toml", "/tikv.toml") - .withCommand( - "--addr=0.0.0.0:20160", - "--advertise-addr=tikv0:20160", - "--data-dir=/data/tikv0", - "--pd=pd0:2379", - "--config=/tikv.toml", - "--log-file=/logs/tikv0.log") - .withNetwork(NETWORK) - .dependsOn(PD) - .withNetworkAliases(TIKV_SERVICE_NAME) - .withStartupTimeout(Duration.ofSeconds(120)) - .withLogConsumer(new Slf4jLogConsumer(LOG)); + @ClassRule + public static final GenericContainer TIDB = + new GenericContainer<>("pingcap/tidb:v6.1.0") + .withExposedPorts(TIDB_PORT) + .withFileSystemBind("src/test/resources/docker/tidb/tidb.toml", "/tidb.toml") + .withCommand( + "--store=tikv", + "--path=pd0:2379", + "--config=/tidb.toml", + "--advertise-address=tidb0") + .withNetwork(NETWORK) + .dependsOn(TIKV) + .withNetworkAliases(TIDB_SERVICE_NAME) + .withStartupTimeout(Duration.ofSeconds(120)) + .withLogConsumer(new Slf4jLogConsumer(LOG)); - @ClassRule - public static final GenericContainer TIDB = - new GenericContainer<>("pingcap/tidb:v6.1.0") - .withExposedPorts(TIDB_PORT) - .withFileSystemBind("src/test/resources/docker/tidb/tidb.toml", "/tidb.toml") - .withCommand( - "--store=tikv", "--path=pd0:2379", "--config=/tidb.toml", "--advertise-address=tidb0") - .withNetwork(NETWORK) - .dependsOn(TIKV) - .withNetworkAliases(TIDB_SERVICE_NAME) - .withStartupTimeout(Duration.ofSeconds(120)) - .withLogConsumer(new Slf4jLogConsumer(LOG)); + @Before + public void before() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(PD, TIKV, TIDB)).join(); + LOG.info("Containers are started."); + super.before(); + initializeTidbTable("tidb_inventory"); + } - @Before - public void before() { - LOG.info("Starting containers..."); - Startables.deepStart(Stream.of(PD, TIKV, TIDB)).join(); - LOG.info("Containers are started."); - super.before(); - initializeTidbTable("tidb_inventory"); - } + @After + public void after() { + LOG.info("Stopping containers..."); + Stream.of(TIDB, TIKV, PD).forEach(GenericContainer::stop); + log.info("Containers are stopped."); + super.after(); + } - @After - public void after() { - LOG.info("Stopping containers..."); - Stream.of(TIDB, TIKV, PD).forEach(GenericContainer::stop); - log.info("Containers are 
stopped."); - super.after(); - } + @Test + public void testTIDBCDC() throws Exception { + List sqlLines = + Arrays.asList( + "CREATE TABLE tidb_source (", + " `id` INT NOT NULL,", + " name STRING,", + " description STRING,", + " weight DECIMAL(20, 10),", + " PRIMARY KEY (`id`) NOT ENFORCED", + ") WITH (", + " 'connector' = 'tidb-cdc',", + " 'tikv.grpc.timeout_in_ms' = '20000',", + " 'pd-addresses' = '" + PD_SERVICE_NAME + ":" + PD_PORT + "',", + " 'database-name' = 'inventory',", + " 'table-name' = 'products'", + ");", + "CREATE TABLE products_sink (", + " `id` INT NOT NULL,", + " name STRING,", + " description STRING,", + " weight DECIMAL(10,3),", + " primary key (`id`) not enforced", + ") WITH (", + " 'connector' = 'jdbc',", + String.format( + " 'url' = 'jdbc:mysql://%s:3306/%s',", + INTER_CONTAINER_MYSQL_ALIAS, + mysqlInventoryDatabase.getDatabaseName()), + " 'table-name' = 'products_sink',", + " 'username' = '" + MYSQL_TEST_USER + "',", + " 'password' = '" + MYSQL_TEST_PASSWORD + "'", + ");", + "INSERT INTO products_sink", + "SELECT * FROM tidb_source;"); - @Test - public void testTIDBCDC() throws Exception { - List sqlLines = - Arrays.asList( - "CREATE TABLE tidb_source (", - " `id` INT NOT NULL,", - " name STRING,", - " description STRING,", - " weight DECIMAL(20, 10),", - " PRIMARY KEY (`id`) NOT ENFORCED", - ") WITH (", - " 'connector' = 'tidb-cdc',", - " 'tikv.grpc.timeout_in_ms' = '20000',", - " 'pd-addresses' = '" + PD_SERVICE_NAME + ":" + PD_PORT + "',", - " 'database-name' = 'inventory',", - " 'table-name' = 'products'", - ");", - "CREATE TABLE products_sink (", - " `id` INT NOT NULL,", - " name STRING,", - " description STRING,", - " weight DECIMAL(10,3),", - " primary key (`id`) not enforced", - ") WITH (", - " 'connector' = 'jdbc',", - String.format( - " 'url' = 'jdbc:mysql://%s:3306/%s',", - INTER_CONTAINER_MYSQL_ALIAS, mysqlInventoryDatabase.getDatabaseName()), - " 'table-name' = 'products_sink',", - " 'username' = '" + MYSQL_TEST_USER + "',", - " 'password' = '" + MYSQL_TEST_PASSWORD + "'", - ");", - "INSERT INTO products_sink", - "SELECT * FROM tidb_source;"); + submitSQLJob(sqlLines, tidbCdcJar, jdbcJar, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); - submitSQLJob(sqlLines, tidbCdcJar, jdbcJar, mysqlDriverJar); - waitUntilJobRunning(Duration.ofSeconds(30)); + // generate binlogs + try (Connection connection = getTidbJdbcConnection("inventory"); + Statement statement = connection.createStatement()) { + statement.execute( + "UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); + statement.execute("UPDATE products SET weight='5.1' WHERE id=107;"); + statement.execute( + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 + statement.execute( + "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); + statement.execute( + "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + statement.execute("UPDATE products SET weight='5.17' WHERE id=111;"); + statement.execute("DELETE FROM products WHERE id=111;"); - // generate binlogs - try (Connection connection = getTidbJdbcConnection("inventory"); - Statement statement = connection.createStatement()) { - statement.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); - statement.execute("UPDATE products SET weight='5.1' WHERE id=107;"); - statement.execute( - "INSERT INTO products VALUES (default,'jacket','water resistent white wind 
breaker',0.2);"); // 110 - statement.execute( - "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); - statement.execute( - "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); - statement.execute("UPDATE products SET weight='5.17' WHERE id=111;"); - statement.execute("DELETE FROM products WHERE id=111;"); + ResultSet resultSet = statement.executeQuery("SELECT count(1) FROM products"); + int recordCount = 0; + while (resultSet.next()) { + recordCount = resultSet.getInt(1); + } + assertEquals(recordCount, 10); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } - ResultSet resultSet = statement.executeQuery("SELECT count(1) FROM products"); - int recordCount = 0; - while (resultSet.next()) { - recordCount = resultSet.getInt(1); - } - assertEquals(recordCount, 10); - } catch (SQLException e) { - LOG.error("Update table for CDC failed.", e); - throw e; + // assert final results + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + mysqlInventoryDatabase.getDatabaseName()); + JdbcProxy proxy = + new JdbcProxy( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, MYSQL_DRIVER_CLASS); + List expectResult = + Arrays.asList( + "101,scooter,Small 2-wheel scooter,3.14", + "102,car battery,12V car battery,8.1", + "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8", + "104,hammer,12oz carpenter's hammer,0.75", + "105,hammer,14oz carpenter's hammer,0.875", + "106,hammer,18oz carpenter hammer,1.0", + "107,rocks,box of assorted rocks,5.1", + "108,jacket,water resistent black wind breaker,0.1", + "109,spare tire,24 inch spare tire,22.2", + "110,jacket,new water resistent white wind breaker,0.5"); + proxy.checkResultWithTimeout( + expectResult, + "products_sink", + new String[] {"id", "name", "description", "weight"}, + 360000L); } - // assert final results - String mysqlJdbcUrl = - String.format( - "jdbc:mysql://%s:%s/%s", - MYSQL.getHost(), MYSQL.getDatabasePort(), mysqlInventoryDatabase.getDatabaseName()); - JdbcProxy proxy = - new JdbcProxy(mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, MYSQL_DRIVER_CLASS); - List expectResult = - Arrays.asList( - "101,scooter,Small 2-wheel scooter,3.14", - "102,car battery,12V car battery,8.1", - "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8", - "104,hammer,12oz carpenter's hammer,0.75", - "105,hammer,14oz carpenter's hammer,0.875", - "106,hammer,18oz carpenter hammer,1.0", - "107,rocks,box of assorted rocks,5.1", - "108,jacket,water resistent black wind breaker,0.1", - "109,spare tire,24 inch spare tire,22.2", - "110,jacket,new water resistent white wind breaker,0.5"); - proxy.checkResultWithTimeout( - expectResult, - "products_sink", - new String[] {"id", "name", "description", "weight"}, - 360000L); - } - - protected Connection getTidbJdbcConnection(String databaseName) throws SQLException { - return DriverManager.getConnection( - "jdbc:mysql://" - + TIDB.getContainerIpAddress() - + ":" - + TIDB.getMappedPort(TIDB_PORT) - + "/" - + databaseName, - TIDB_USER, - TIDB_PASSWORD); - } + protected Connection getTidbJdbcConnection(String databaseName) throws SQLException { + return DriverManager.getConnection( + "jdbc:mysql://" + + TIDB.getContainerIpAddress() + + ":" + + TIDB.getMappedPort(TIDB_PORT) + + "/" + + databaseName, + TIDB_USER, + TIDB_PASSWORD); + } - /** - * Executes a JDBC statement using the default 
jdbc config without autocommitting the connection. - */ - protected void initializeTidbTable(String sqlFile) { - final String ddlFile = String.format("ddl/%s.sql", sqlFile); - final URL ddlTestFile = TiDBE2eITCase.class.getClassLoader().getResource(ddlFile); - assertNotNull("Cannot locate " + ddlFile, ddlTestFile); - try (Connection connection = getTidbJdbcConnection(""); - Statement statement = connection.createStatement()) { - final List statements = - Arrays.stream( - Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream() - .map(String::trim) - .filter(x -> !x.startsWith("--") && !x.isEmpty()) - .map( - x -> { - final Matcher m = COMMENT_PATTERN.matcher(x); - return m.matches() ? m.group(1) : x; - }) - .collect(Collectors.joining("\n")) - .split(";")) - .collect(Collectors.toList()); - for (String stmt : statements) { - statement.execute(stmt); - } - } catch (Exception e) { - throw new RuntimeException(e); + /** + * Executes a JDBC statement using the default jdbc config without autocommitting the + * connection. + */ + protected void initializeTidbTable(String sqlFile) { + final String ddlFile = String.format("ddl/%s.sql", sqlFile); + final URL ddlTestFile = TiDBE2eITCase.class.getClassLoader().getResource(ddlFile); + assertNotNull("Cannot locate " + ddlFile, ddlTestFile); + try (Connection connection = getTidbJdbcConnection(""); + Statement statement = connection.createStatement()) { + final List statements = + Arrays.stream( + Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream() + .map(String::trim) + .filter(x -> !x.startsWith("--") && !x.isEmpty()) + .map( + x -> { + final Matcher m = + COMMENT_PATTERN.matcher(x); + return m.matches() ? m.group(1) : x; + }) + .collect(Collectors.joining("\n")) + .split(";")) + .collect(Collectors.toList()); + for (String stmt : statements) { + statement.execute(stmt); + } + } catch (Exception e) { + throw new RuntimeException(e); + } } - } } From 9294b879421c15706283d288be1f56a4db1e0dc8 Mon Sep 17 00:00:00 2001 From: ouyangwulink Date: Mon, 28 Oct 2024 11:08:52 +0800 Subject: [PATCH 07/10] Init tidb --- .../cdc/connectors/tidb/source/config/TiDBSourceOptions.java | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceOptions.java diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceOptions.java new file mode 100644 index 00000000000..e4122c88896 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceOptions.java @@ -0,0 +1,3 @@ +package org.apache.flink.cdc.connectors.tidb.source.config; + +public class TiDBSourceOptions {} From ef38a4b0f2139926057443fd88517e992108b6b7 Mon Sep 17 00:00:00 2001 From: ouyangwulink Date: Mon, 28 Oct 2024 18:07:34 +0800 Subject: [PATCH 08/10] tidb incremental --- .../flink-connector-tidb-cdc/pom.xml | 6 ++ .../connectors/tidb/source/TiDBDialect.java | 84 ++++++++++++++++ .../source/config/TiDBConnectorConfig.java | 47 +++++++++ .../tidb/source/config/TiDBSourceConfig.java | 95 +++++++++++++++++++ .../enumetator/TiDBSourceEnumerator.java | 3 + .../tidb/source/schema/TiDBSchema.java | 14 +++ 
6 files changed, 249 insertions(+) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBDialect.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBConnectorConfig.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfig.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/enumetator/TiDBSourceEnumerator.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/schema/TiDBSchema.java diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/pom.xml index 05b1e63ed63..1e8aaa25774 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/pom.xml +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/pom.xml @@ -150,6 +150,12 @@ limitations under the License. 1.8.0 test + + org.apache.flink + flink-cdc-base + 3.3-SNAPSHOT + compile + diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBDialect.java new file mode 100644 index 00000000000..c1f3485b799 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBDialect.java @@ -0,0 +1,84 @@ +package org.apache.flink.cdc.connectors.tidb.source; + +import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; +import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect; +import org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory; +import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter; +import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; +import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase; +import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask; +import org.apache.flink.cdc.connectors.tidb.source.schema.TiDBSchema; + +import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.TableId; +import io.debezium.relational.history.TableChanges; + +import java.util.List; +import java.util.Map; + +public class TiDBDialect implements JdbcDataSourceDialect { + private static final long serialVersionUID = 1L; + + private transient TiDBSchema tiDBSchema; + + @Override + public String getName() { + return "TiDB"; + } + + @Override + public Offset displayCurrentOffset(JdbcSourceConfig sourceConfig) { + return null; + } + + @Override + public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) { + return false; + } + + @Override + public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) { + return null; + } + + @Override + public FetchTask.Context createFetchTaskContext(JdbcSourceConfig sourceConfig) { + return null; + } + + @Override + public boolean 
isIncludeDataCollection(JdbcSourceConfig sourceConfig, TableId tableId) { + return false; + } + + @Override + public List discoverDataCollections(JdbcSourceConfig sourceConfig) { + return null; + } + + @Override + public Map discoverDataCollectionSchemas( + JdbcSourceConfig sourceConfig) { + return null; + } + + @Override + public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) { + return null; + } + + @Override + public JdbcConnectionPoolFactory getPooledDataSourceFactory() { + return null; + } + + @Override + public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) { + return null; + } + + @Override + public FetchTask createFetchTask(SourceSplitBase sourceSplitBase) { + return null; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBConnectorConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBConnectorConfig.java new file mode 100644 index 00000000000..d7f6dc7d606 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBConnectorConfig.java @@ -0,0 +1,47 @@ +package org.apache.flink.cdc.connectors.tidb.source.config; + +import io.debezium.config.Configuration; +import io.debezium.connector.SourceInfoStructMaker; +import io.debezium.relational.ColumnFilterMode; +import io.debezium.relational.RelationalDatabaseConnectorConfig; +import io.debezium.relational.Selectors; +import io.debezium.relational.Tables; + +public class TiDBConnectorConfig extends RelationalDatabaseConnectorConfig { + protected static final String LOGICAL_NAME = "tidb_cdc_connector"; + + public TiDBConnectorConfig( + Configuration config, + String logicalName, + Tables.TableFilter systemTablesFilter, + Selectors.TableIdToStringMapper tableIdMapper, + int defaultSnapshotFetchSize, + ColumnFilterMode columnFilterMode) { + super( + config, + logicalName, + systemTablesFilter, + tableIdMapper, + defaultSnapshotFetchSize, + columnFilterMode); + } + + public TiDBConnectorConfig(TiDBSourceConfig tiDBSourceConfig) { + this(null, null, null, null, 0, null); + } + + @Override + public String getContextName() { + return "TiDB"; + } + + @Override + public String getConnectorName() { + return "TiDB"; + } + + @Override + protected SourceInfoStructMaker getSourceInfoStructMaker(Version version) { + return null; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfig.java new file mode 100644 index 00000000000..9438c193271 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfig.java @@ -0,0 +1,95 @@ +package org.apache.flink.cdc.connectors.tidb.source.config; + +import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; +import org.apache.flink.cdc.connectors.base.options.StartupOptions; + +import io.debezium.config.Configuration; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; + +public class TiDBSourceConfig extends JdbcSourceConfig { + private 
static final long serialVersionUID = 1L; + private final String compatibleMode; + private final String pdAddresses; + + private final String hostMapping; + + public TiDBSourceConfig( + String compatibleMode, + StartupOptions startupOptions, + List databaseList, + List schemaList, + List tableList, + String pdAddresses, + String hostMapping, + int splitSize, + int splitMetaGroupSize, + double distributionFactorUpper, + double distributionFactorLower, + boolean includeSchemaChanges, + boolean closeIdleReaders, + Properties dbzProperties, + Configuration dbzConfiguration, + String driverClassName, + String hostname, + int port, + String username, + String password, + int fetchSize, + String serverTimeZone, + Duration connectTimeout, + int connectMaxRetries, + int connectionPoolSize, + String chunkKeyColumn, + boolean skipSnapshotBackfill, + boolean isScanNewlyAddedTableEnabled) { + super( + startupOptions, + databaseList, + schemaList, + tableList, + splitSize, + splitMetaGroupSize, + distributionFactorUpper, + distributionFactorLower, + includeSchemaChanges, + closeIdleReaders, + dbzProperties, + dbzConfiguration, + driverClassName, + hostname, + port, + username, + password, + fetchSize, + serverTimeZone, + connectTimeout, + connectMaxRetries, + connectionPoolSize, + chunkKeyColumn, + skipSnapshotBackfill, + isScanNewlyAddedTableEnabled); + this.compatibleMode = compatibleMode; + this.pdAddresses = pdAddresses; + this.hostMapping = hostMapping; + } + + public String getCompatibleMode() { + return compatibleMode; + } + + public String getPdAddresses() { + return pdAddresses; + } + + public String getHostMapping() { + return hostMapping; + } + + @Override + public TiDBConnectorConfig getDbzConnectorConfig() { + return new TiDBConnectorConfig(this); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/enumetator/TiDBSourceEnumerator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/enumetator/TiDBSourceEnumerator.java new file mode 100644 index 00000000000..69a0ae592b2 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/enumetator/TiDBSourceEnumerator.java @@ -0,0 +1,3 @@ +package org.apache.flink.cdc.connectors.tidb.source.enumetator; + +public class TiDBSourceEnumerator {} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/schema/TiDBSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/schema/TiDBSchema.java new file mode 100644 index 00000000000..d91486002e1 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/schema/TiDBSchema.java @@ -0,0 +1,14 @@ +package org.apache.flink.cdc.connectors.tidb.source.schema; + +import io.debezium.relational.TableId; +import io.debezium.relational.history.TableChanges; + +import java.util.Map; + +public class TiDBSchema { + private final Map schemasByTableId; + + public TiDBSchema(Map schemasByTableId) { + this.schemasByTableId = schemasByTableId; + } +} From 1d1a198e9fbcbf8c995ef970b4562670a3cfae3b Mon Sep 17 00:00:00 2001 From: ouyangwulink Date: Tue, 29 Oct 2024 18:38:12 +0800 Subject: [PATCH 09/10] add 1029 
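This commit adds the builder-facing pieces of the incremental TiDB source: TiDBSourceBuilder, TiDBSourceConfigFactory, LogMessageOffsetFactory and TiDBChunkSplitter (see the diffs below). For orientation, a minimal usage sketch of the builder follows; it only uses setters that appear in this patch, while the JSON deserializer, the connection values and the Flink job wiring are illustrative assumptions, not part of the change (generic signatures also appear stripped in this rendering, so the sketch assumes the builder is parameterized by the output type):

// Illustrative sketch only -- not part of this patch. Connection values,
// JsonDebeziumDeserializationSchema and the job wiring are assumptions.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.tidb.source.TiDBSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TiDBIncrementalSourceSketch {
    public static void main(String[] args) throws Exception {
        // Build the incremental source with the fluent setters added in this commit.
        TiDBSourceBuilder.TiDBIncrementalSource<String> source =
                new TiDBSourceBuilder<String>()
                        .hostname("localhost")
                        .port(4000)
                        .databaseList("app_db")
                        .tableList("app_db.orders")
                        .username("root")
                        .password("")
                        .startupOptions(StartupOptions.initial())
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000); // snapshot split progress is tracked via checkpoints
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "TiDB Incremental Source")
                .print();
        env.execute("tidb-cdc-sketch");
    }
}

Note that pd-address and host-mapping settings are only exposed on TiDBSourceConfigFactory in this patch, not on the builder, so they do not appear in the sketch.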
--- .../tidb/source/TiDBSourceBuilder.java | 156 ++++++++++++++++++ .../source/config/TiDBConnectorConfig.java | 44 +++-- .../tidb/source/config/TiDBSourceConfig.java | 3 +- .../config/TiDBSourceConfigFactory.java | 105 ++++++++++++ .../offset/LogMessageOffsetFactory.java | 39 +++++ .../source/splitter/TiDBChunkSplitter.java | 38 +++++ 6 files changed, 365 insertions(+), 20 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBSourceBuilder.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfigFactory.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/offset/LogMessageOffsetFactory.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/splitter/TiDBChunkSplitter.java diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBSourceBuilder.java new file mode 100644 index 00000000000..2585b0d9556 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBSourceBuilder.java @@ -0,0 +1,156 @@ +package org.apache.flink.cdc.connectors.tidb.source; + +import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfigFactory; +import org.apache.flink.cdc.connectors.base.options.StartupOptions; +import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; +import org.apache.flink.cdc.connectors.tidb.source.config.TiDBSourceConfigFactory; +import org.apache.flink.cdc.connectors.tidb.source.offset.LogMessageOffsetFactory; +import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; + +import java.time.Duration; +import java.util.Properties; + +import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull; + +public class TiDBSourceBuilder { + private final TiDBSourceConfigFactory configFactory = new TiDBSourceConfigFactory(); + private LogMessageOffsetFactory offsetFactory; + private DebeziumDeserializationSchema deserializer; + private TiDBDialect dialect; + + public TiDBSourceBuilder startupOptions(StartupOptions startupOptions) { + this.configFactory.startupOptions(startupOptions); + return this; + } + + public TiDBSourceBuilder hostname(String hostname) { + this.configFactory.hostname(hostname); + return this; + } + + public TiDBSourceBuilder port(int port) { + this.configFactory.port(port); + return this; + } + + public TiDBSourceBuilder driverClassName(String driverClassName) { + this.configFactory.driverClassName(driverClassName); + return this; + } + + public TiDBSourceBuilder databaseList(String... databaseList) { + this.configFactory.databaseList(databaseList); + return this; + } + + public TiDBSourceBuilder tableList(String... 
tableList) { + this.configFactory.tableList(tableList); + return this; + } + + public TiDBSourceBuilder username(String username) { + this.configFactory.username(username); + return this; + } + + public TiDBSourceBuilder password(String password) { + this.configFactory.password(password); + return this; + } + + public TiDBSourceBuilder debeziumProperties(Properties properties) { + this.configFactory.debeziumProperties(properties); + return this; + } + + public TiDBSourceBuilder tikvProperties(Properties properties) { + this.configFactory.tikvProperties(properties); + return this; + } + + public TiDBSourceBuilder serverTimeZone(String timeZone) { + this.configFactory.serverTimeZone(timeZone); + return this; + } + + public TiDBSourceBuilder connectTimeout(Duration connectTimeout) { + this.configFactory.connectTimeout(connectTimeout); + return this; + } + + public TiDBSourceBuilder connectionPoolSize(int connectionPoolSize) { + this.configFactory.connectionPoolSize(connectionPoolSize); + return this; + } + + public TiDBSourceBuilder connectMaxRetries(int connectMaxRetries) { + this.configFactory.connectMaxRetries(connectMaxRetries); + return this; + } + + public TiDBSourceBuilder chunkKeyColumn(String chunkKeyColumn) { + this.configFactory.chunkKeyColumn(chunkKeyColumn); + return this; + } + + /** + * The split size (number of rows) of table snapshot, captured tables are split into multiple + * splits when read the snapshot of table. + */ + public TiDBSourceBuilder splitSize(int splitSize) { + this.configFactory.splitSize(splitSize); + return this; + } + + /** The maximum fetch size for per poll when read table snapshot. */ + public TiDBSourceBuilder fetchSize(int fetchSize) { + this.configFactory.fetchSize(fetchSize); + return this; + } + + public TiDBSourceBuilder splitMetaGroupSize(int splitMetaGroupSize) { + this.configFactory.splitMetaGroupSize(splitMetaGroupSize); + return this; + } + + public TiDBSourceBuilder distributionFactorUpper(double distributionFactorUpper) { + this.configFactory.distributionFactorUpper(distributionFactorUpper); + return this; + } + + /** + * The lower bound of split key evenly distribution factor, the factor is used to determine + * whether the table is evenly distribution or not. 
+ */ + public TiDBSourceBuilder distributionFactorLower(double distributionFactorLower) { + this.configFactory.distributionFactorLower(distributionFactorLower); + return this; + } + + public TiDBSourceBuilder scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) { + this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled); + return this; + } + + public TiDBSourceBuilder deserializer(DebeziumDeserializationSchema deserializer) { + this.deserializer = deserializer; + return this; + } + + public TiDBIncrementalSource build() { + this.offsetFactory = new LogMessageOffsetFactory(); + this.dialect = new TiDBDialect(); + return new TiDBIncrementalSource<>( + configFactory, checkNotNull(deserializer), offsetFactory, dialect); + } + + public static class TiDBIncrementalSource extends JdbcIncrementalSource { + public TiDBIncrementalSource( + JdbcSourceConfigFactory configFactory, + DebeziumDeserializationSchema deserializationSchema, + LogMessageOffsetFactory offsetFactory, + TiDBDialect dataSourceDialect) { + super(configFactory, deserializationSchema, offsetFactory, dataSourceDialect); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBConnectorConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBConnectorConfig.java index d7f6dc7d606..f1895695485 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBConnectorConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBConnectorConfig.java @@ -4,30 +4,38 @@ import io.debezium.connector.SourceInfoStructMaker; import io.debezium.relational.ColumnFilterMode; import io.debezium.relational.RelationalDatabaseConnectorConfig; -import io.debezium.relational.Selectors; +import io.debezium.relational.TableId; import io.debezium.relational.Tables; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + public class TiDBConnectorConfig extends RelationalDatabaseConnectorConfig { protected static final String LOGICAL_NAME = "tidb_cdc_connector"; + protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = Integer.MIN_VALUE; + // todo + protected static final List BUILT_IN_DB_NAMES = + Collections.unmodifiableList( + Arrays.asList("information_schema", "mysql", "tidb", "LBACSYS", "ORAAUDITOR")); + private final TiDBSourceConfig sourceConfig; - public TiDBConnectorConfig( - Configuration config, - String logicalName, - Tables.TableFilter systemTablesFilter, - Selectors.TableIdToStringMapper tableIdMapper, - int defaultSnapshotFetchSize, - ColumnFilterMode columnFilterMode) { + public TiDBConnectorConfig(TiDBSourceConfig sourceConfig) { + // todo super( - config, - logicalName, - systemTablesFilter, - tableIdMapper, - defaultSnapshotFetchSize, - columnFilterMode); - } - - public TiDBConnectorConfig(TiDBSourceConfig tiDBSourceConfig) { - this(null, null, null, null, 0, null); + Configuration.from(sourceConfig.getDbzProperties()), + LOGICAL_NAME, + Tables.TableFilter.fromPredicate( + tableId -> + "mysql".equalsIgnoreCase(sourceConfig.getCompatibleMode()) + ? 
!BUILT_IN_DB_NAMES.contains(tableId.catalog()) + : !BUILT_IN_DB_NAMES.contains(tableId.schema())), + TableId::identifier, + DEFAULT_SNAPSHOT_FETCH_SIZE, + "mysql".equalsIgnoreCase(sourceConfig.getCompatibleMode()) + ? ColumnFilterMode.CATALOG + : ColumnFilterMode.SCHEMA); + this.sourceConfig = sourceConfig; } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfig.java index 9438c193271..d4c03e720e1 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfig.java @@ -20,7 +20,6 @@ public TiDBSourceConfig( String compatibleMode, StartupOptions startupOptions, List databaseList, - List schemaList, List tableList, String pdAddresses, String hostMapping, @@ -48,7 +47,7 @@ public TiDBSourceConfig( super( startupOptions, databaseList, - schemaList, + null, tableList, splitSize, splitMetaGroupSize, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfigFactory.java new file mode 100644 index 00000000000..5e5438f6699 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfigFactory.java @@ -0,0 +1,105 @@ +package org.apache.flink.cdc.connectors.tidb.source.config; + +import io.debezium.config.Configuration; +import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; +import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfigFactory; + +import java.util.Properties; + +import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull; +import static org.apache.flink.cdc.connectors.base.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished; + +/** A factory to initialize {@link TiDBSourceConfig}. 
*/ +@SuppressWarnings("UnusedReturnValue") +public class TiDBSourceConfigFactory extends JdbcSourceConfigFactory { + private static final long serialVersionUID = 1L; + private String compatibleMode; + private String driverClassName; + private String pdAddresses; + + private String hostMapping; + + private Properties tikvProperties; + + public JdbcSourceConfigFactory compatibleMode(String compatibleMode) { + this.compatibleMode = compatibleMode; + return this; + } + + public JdbcSourceConfigFactory driverClassName(String driverClassName) { + this.driverClassName = driverClassName; + return this; + } + + public JdbcSourceConfigFactory pdAddresses(String pdAddresses) { + this.pdAddresses = pdAddresses; + return this; + } + + public JdbcSourceConfigFactory hostMapping(String hostMapping) { + this.hostMapping = hostMapping; + return this; + } + + public JdbcSourceConfigFactory tikvProperties(Properties tikvProperties) { + this.tikvProperties = tikvProperties; + return this; + } + + @Override + public JdbcSourceConfig create(int subtask) { + checkSupportCheckpointsAfterTasksFinished(closeIdleReaders); + Properties props = new Properties(); + props.setProperty("database.server.name", "tidb_cdc"); + props.setProperty("database.hostname", checkNotNull(hostname)); + props.setProperty("database.port", String.valueOf(port)); + props.setProperty("database.user", checkNotNull(username)); + props.setProperty("database.password", checkNotNull(password)); + props.setProperty("database.dbname", checkNotNull(databaseList.get(0))); + props.setProperty("database.connect.timeout.ms", String.valueOf(connectTimeout.toMillis())); + + // table filter + // props.put("database.include.list", String.join(",", databaseList)); + if (tableList != null) { + props.put("table.include.list", String.join(",", tableList)); + } + // value converter + props.put("decimal.handling.mode", "precise"); + props.put("time.precision.mode", "adaptive_time_microseconds"); + props.put("binary.handling.mode", "bytes"); + + if (dbzProperties != null) { + props.putAll(dbzProperties); + } + + Configuration dbzConfiguration = Configuration.from(props); + return new TiDBSourceConfig( + compatibleMode, + startupOptions, + databaseList, + tableList, + pdAddresses, + hostMapping, + splitSize, + splitMetaGroupSize, + distributionFactorUpper, + distributionFactorLower, + includeSchemaChanges, + closeIdleReaders, + dbzProperties, + dbzConfiguration, + driverClassName, + hostname, + port, + username, + password, + fetchSize, + serverTimeZone, + connectTimeout, + connectMaxRetries, + connectionPoolSize, + chunkKeyColumn, + skipSnapshotBackfill, + scanNewlyAddedTableEnabled); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/offset/LogMessageOffsetFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/offset/LogMessageOffsetFactory.java new file mode 100644 index 00000000000..c733630858d --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/offset/LogMessageOffsetFactory.java @@ -0,0 +1,39 @@ +package org.apache.flink.cdc.connectors.tidb.source.offset; + +import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; +import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory; + +import java.util.Map; + +public class LogMessageOffsetFactory extends 
OffsetFactory { + + @Override + public Offset newOffset(Map offset) { + return null; + } + + @Override + public Offset newOffset(String filename, Long position) { + return null; + } + + @Override + public Offset newOffset(Long position) { + return null; + } + + @Override + public Offset createTimestampOffset(long timestampMillis) { + return null; + } + + @Override + public Offset createInitialOffset() { + return null; + } + + @Override + public Offset createNoStoppingOffset() { + return null; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/splitter/TiDBChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/splitter/TiDBChunkSplitter.java new file mode 100644 index 00000000000..1718a7ce78d --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/splitter/TiDBChunkSplitter.java @@ -0,0 +1,38 @@ +package org.apache.flink.cdc.connectors.tidb.source.splitter; + +import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.Column; +import io.debezium.relational.TableId; +import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; +import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect; +import org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter; +import org.apache.flink.table.types.DataType; + +import java.sql.SQLException; + +public class TiDBChunkSplitter extends JdbcSourceChunkSplitter { + public TiDBChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) { + super(sourceConfig, dialect); + } + + @Override + protected Object queryNextChunkMax( + JdbcConnection jdbc, + TableId tableId, + Column splitColumn, + int chunkSize, + Object includedLowerBound) + throws SQLException { + return null; + } + + @Override + protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException { + return null; + } + + @Override + protected DataType fromDbzColumn(Column splitColumn) { + return null; + } +} From 51c82679c08643c918d57c5f7c27601590a2d892 Mon Sep 17 00:00:00 2001 From: wulin Date: Thu, 12 Dec 2024 15:27:35 +0800 Subject: [PATCH 10/10] [FLINK-36895] The JdbcSourceChunkSplitter#queryMin method passed a parameter with tableName/coiumnName reversed. 
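In plain terms: inside the large re-indentation below, the functional change is a single argument swap in JdbcSourceChunkSplitter#queryMin. The old code passed the quoted split-column identifier where JdbcChunkUtils.queryMin expects the quoted table identifier (and vice versa), so the `SELECT MIN(%s) FROM %s WHERE %s > ?` template described in the javadoc was filled with the wrong objects. Extracted from the diff for clarity:

// Before (removed below): the quoted column went into the table-name slot.
return JdbcChunkUtils.queryMin(
        jdbc,
        jdbc.quotedColumnIdString(splitColumn.name()),
        jdbc.quotedTableIdString(tableId),
        excludedLowerBound);

// After (added below): quoted table identifier first, then the quoted split column.
return JdbcChunkUtils.queryMin(
        jdbc,
        jdbc.quotedTableIdString(tableId),
        jdbc.quotedColumnIdString(splitColumn.name()),
        excludedLowerBound);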
--- .../splitter/JdbcSourceChunkSplitter.java | 719 +++++++++--------- 1 file changed, 350 insertions(+), 369 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java index 9e5594136b8..ab5b504188c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java @@ -17,6 +17,11 @@ package org.apache.flink.cdc.connectors.base.source.assigner.splitter; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.Column; +import io.debezium.relational.Table; +import io.debezium.relational.TableId; +import io.debezium.relational.history.TableChanges; import org.apache.flink.cdc.common.annotation.Experimental; import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect; @@ -27,17 +32,10 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.FlinkRuntimeException; - -import io.debezium.jdbc.JdbcConnection; -import io.debezium.relational.Column; -import io.debezium.relational.Table; -import io.debezium.relational.TableId; -import io.debezium.relational.history.TableChanges; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; - import java.math.BigDecimal; import java.sql.SQLException; import java.util.ArrayList; @@ -56,379 +54,362 @@ /** The {@code ChunkSplitter} used to split table into a set of chunks for JDBC data source. */ @Experimental public abstract class JdbcSourceChunkSplitter implements ChunkSplitter { - private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceChunkSplitter.class); - protected final JdbcSourceConfig sourceConfig; - protected final JdbcDataSourceDialect dialect; - - public JdbcSourceChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) { - this.sourceConfig = sourceConfig; - this.dialect = dialect; + private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceChunkSplitter.class); + protected final JdbcSourceConfig sourceConfig; + protected final JdbcDataSourceDialect dialect; + + public JdbcSourceChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) { + this.sourceConfig = sourceConfig; + this.dialect = dialect; + } + + /** Generates all snapshot splits (chunks) for the give table path. 
*/ + @Override + public Collection generateSplits(TableId tableId) { + try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) { + + LOG.info("Start splitting table {} into chunks...", tableId); + long start = System.currentTimeMillis(); + + Table table = Objects.requireNonNull(dialect.queryTableSchema(jdbc, tableId)).getTable(); + Column splitColumn = getSplitColumn(table, sourceConfig.getChunkKeyColumn()); + final List chunks; + try { + chunks = splitTableIntoChunks(jdbc, tableId, splitColumn); + } catch (SQLException e) { + throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e); + } + + // convert chunks into splits + List splits = new ArrayList<>(); + RowType splitType = getSplitType(splitColumn); + for (int i = 0; i < chunks.size(); i++) { + ChunkRange chunk = chunks.get(i); + SnapshotSplit split = + createSnapshotSplit( + jdbc, tableId, i, splitType, chunk.getChunkStart(), chunk.getChunkEnd()); + splits.add(split); + } + + long end = System.currentTimeMillis(); + LOG.info( + "Split table {} into {} chunks, time cost: {}ms.", tableId, splits.size(), end - start); + return splits; + } catch (Exception e) { + throw new FlinkRuntimeException( + String.format("Generate Splits for table %s error", tableId), e); } - - /** Generates all snapshot splits (chunks) for the give table path. */ - @Override - public Collection generateSplits(TableId tableId) { - try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) { - - LOG.info("Start splitting table {} into chunks...", tableId); - long start = System.currentTimeMillis(); - - Table table = - Objects.requireNonNull(dialect.queryTableSchema(jdbc, tableId)).getTable(); - Column splitColumn = getSplitColumn(table, sourceConfig.getChunkKeyColumn()); - final List chunks; - try { - chunks = splitTableIntoChunks(jdbc, tableId, splitColumn); - } catch (SQLException e) { - throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e); - } - - // convert chunks into splits - List splits = new ArrayList<>(); - RowType splitType = getSplitType(splitColumn); - for (int i = 0; i < chunks.size(); i++) { - ChunkRange chunk = chunks.get(i); - SnapshotSplit split = - createSnapshotSplit( - jdbc, - tableId, - i, - splitType, - chunk.getChunkStart(), - chunk.getChunkEnd()); - splits.add(split); - } - - long end = System.currentTimeMillis(); - LOG.info( - "Split table {} into {} chunks, time cost: {}ms.", - tableId, - splits.size(), - end - start); - return splits; - } catch (Exception e) { - throw new FlinkRuntimeException( - String.format("Generate Splits for table %s error", tableId), e); - } + } + + /** + * Query the maximum value of the next chunk, and the next chunk must be greater than or equal to + * includedLowerBound value [min_1, max_1), [min_2, max_2),... [min_n, null). Each + * time this method is called it will return max1, max2... + * + *
<p>
Each database has different grammar to get limit number of data, for example, `limit N` in + * mysql or postgres, `top(N)` in sqlserver , `FETCH FIRST %S ROWS ONLY` in DB2. + * + * @param jdbc JDBC connection. + * @param tableId table identity. + * @param splitColumn column. + * @param chunkSize chunk size. + * @param includedLowerBound the previous chunk end value. + * @return next chunk end value. + */ + protected abstract Object queryNextChunkMax( + JdbcConnection jdbc, + TableId tableId, + Column splitColumn, + int chunkSize, + Object includedLowerBound) + throws SQLException; + + /** + * Approximate total number of entries in the lookup table. + * + *
<p>
Each database has different system table to lookup up approximate total number. For example, + * `pg_class` in postgres, `sys.dm_db_partition_stats` in sqlserver, `SYSCAT.TABLE` in db2. + * + * @param jdbc JDBC connection. + * @param tableId table identity. + * @return approximate row count. + */ + protected abstract Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) + throws SQLException; + + /** + * Checks whether split column is evenly distributed across its range. + * + * @param splitColumn split column. + * @return true that means split column with type BIGINT, INT, DECIMAL. + */ + protected boolean isEvenlySplitColumn(Column splitColumn) { + DataType flinkType = fromDbzColumn(splitColumn); + LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot(); + + // currently, we only support the optimization that split column with type BIGINT, INT, + // DECIMAL + return typeRoot == LogicalTypeRoot.BIGINT + || typeRoot == LogicalTypeRoot.INTEGER + || typeRoot == LogicalTypeRoot.DECIMAL; + } + + /** + * Get a corresponding Flink data type from a debezium {@link Column}. + * + * @param splitColumn dbz split column. + * @return flink data type + */ + protected abstract DataType fromDbzColumn(Column splitColumn); + + /** Returns the distribution factor of the given table. */ + protected double calculateDistributionFactor( + TableId tableId, Object min, Object max, long approximateRowCnt) { + + if (!min.getClass().equals(max.getClass())) { + throw new IllegalStateException( + String.format( + "Unsupported operation type, the MIN value type %s is different with MAX value type %s.", + min.getClass().getSimpleName(), max.getClass().getSimpleName())); } - - /** - * Query the maximum value of the next chunk, and the next chunk must be greater than or equal - * to includedLowerBound value [min_1, max_1), [min_2, max_2),... [min_n, null). - * Each time this method is called it will return max1, max2... - * - *
<p>
Each database has different grammar to get limit number of data, for example, `limit N` in - * mysql or postgres, `top(N)` in sqlserver , `FETCH FIRST %S ROWS ONLY` in DB2. - * - * @param jdbc JDBC connection. - * @param tableId table identity. - * @param splitColumn column. - * @param chunkSize chunk size. - * @param includedLowerBound the previous chunk end value. - * @return next chunk end value. - */ - protected abstract Object queryNextChunkMax( - JdbcConnection jdbc, - TableId tableId, - Column splitColumn, - int chunkSize, - Object includedLowerBound) - throws SQLException; - - /** - * Approximate total number of entries in the lookup table. - * - *
<p>
Each database has different system table to lookup up approximate total number. For - * example, `pg_class` in postgres, `sys.dm_db_partition_stats` in sqlserver, `SYSCAT.TABLE` in - * db2. - * - * @param jdbc JDBC connection. - * @param tableId table identity. - * @return approximate row count. - */ - protected abstract Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) - throws SQLException; - - /** - * Checks whether split column is evenly distributed across its range. - * - * @param splitColumn split column. - * @return true that means split column with type BIGINT, INT, DECIMAL. - */ - protected boolean isEvenlySplitColumn(Column splitColumn) { - DataType flinkType = fromDbzColumn(splitColumn); - LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot(); - - // currently, we only support the optimization that split column with type BIGINT, INT, - // DECIMAL - return typeRoot == LogicalTypeRoot.BIGINT - || typeRoot == LogicalTypeRoot.INTEGER - || typeRoot == LogicalTypeRoot.DECIMAL; + if (approximateRowCnt == 0) { + return Double.MAX_VALUE; } - - /** - * Get a corresponding Flink data type from a debezium {@link Column}. - * - * @param splitColumn dbz split column. - * @return flink data type - */ - protected abstract DataType fromDbzColumn(Column splitColumn); - - /** Returns the distribution factor of the given table. */ - protected double calculateDistributionFactor( - TableId tableId, Object min, Object max, long approximateRowCnt) { - - if (!min.getClass().equals(max.getClass())) { - throw new IllegalStateException( - String.format( - "Unsupported operation type, the MIN value type %s is different with MAX value type %s.", - min.getClass().getSimpleName(), max.getClass().getSimpleName())); - } - if (approximateRowCnt == 0) { - return Double.MAX_VALUE; - } - BigDecimal difference = ObjectUtils.minus(max, min); - // factor = (max - min + 1) / rowCount - final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1)); - double distributionFactor = - subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, ROUND_CEILING).doubleValue(); - LOG.info( - "The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}", - tableId, - distributionFactor, - min, - max, - approximateRowCnt); - return distributionFactor; + BigDecimal difference = ObjectUtils.minus(max, min); + // factor = (max - min + 1) / rowCount + final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1)); + double distributionFactor = + subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, ROUND_CEILING).doubleValue(); + LOG.info( + "The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}", + tableId, + distributionFactor, + min, + max, + approximateRowCnt); + return distributionFactor; + } + + /** + * Get the column which is seen as chunk key. + * + * @param table table identity. + * @param chunkKeyColumn column name which is seen as chunk key, if chunkKeyColumn is null, use + * primary key instead. @Column the column which is seen as chunk key. + */ + protected Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) { + return JdbcChunkUtils.getSplitColumn(table, chunkKeyColumn); + } + + /** ChunkEnd less than or equal to max. */ + protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) { + return ObjectUtils.compare(chunkEnd, max) <= 0; + } + + /** ChunkEnd greater than or equal to max. 
*/ + protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) { + return ObjectUtils.compare(chunkEnd, max) >= 0; + } + + /** + * Query the maximum and minimum value of the column in the table. e.g. query string + * SELECT MIN(%s) FROM %s WHERE %s > ? + * + * @param jdbc JDBC connection. + * @param tableId table identity. + * @param splitColumn column. + * @return maximum and minimum value. + */ + protected Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column splitColumn) + throws SQLException { + return JdbcChunkUtils.queryMinMax( + jdbc, jdbc.quotedTableIdString(tableId), jdbc.quotedColumnIdString(splitColumn.name())); + } + + /** + * Query the minimum value of the column in the table, and the minimum value must greater than the + * excludedLowerBound value. e.g. prepare query string + * SELECT MIN(%s) FROM %s WHERE %s > ? + * + * @param jdbc JDBC connection. + * @param tableId table identity. + * @param splitColumn column. + * @param excludedLowerBound the minimum value should be greater than this value. + * @return minimum value. + */ + protected Object queryMin( + JdbcConnection jdbc, TableId tableId, Column splitColumn, Object excludedLowerBound) + throws SQLException { + return JdbcChunkUtils.queryMin( + jdbc, + jdbc.quotedTableIdString(tableId), + jdbc.quotedColumnIdString(splitColumn.name()), + excludedLowerBound); + } + + /** + * convert dbz column to Flink row type. + * + * @param splitColumn split column. + * @return flink row type. + */ + private RowType getSplitType(Column splitColumn) { + return (RowType) ROW(FIELD(splitColumn.name(), fromDbzColumn(splitColumn))).getLogicalType(); + } + + /** + * We can use evenly-sized chunks or unevenly-sized chunks when split table into chunks, using + * evenly-sized chunks which is much efficient, using unevenly-sized chunks which will request + * many queries and is not efficient. + */ + private List splitTableIntoChunks( + JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException { + final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn); + final Object min = minMax[0]; + final Object max = minMax[1]; + if (min == null || max == null || min.equals(max)) { + // empty table, or only one row, return full table scan as a chunk + return Collections.singletonList(ChunkRange.all()); } - /** - * Get the column which is seen as chunk key. - * - * @param table table identity. - * @param chunkKeyColumn column name which is seen as chunk key, if chunkKeyColumn is null, use - * primary key instead. @Column the column which is seen as chunk key. 
- */ - protected Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) { - return JdbcChunkUtils.getSplitColumn(table, chunkKeyColumn); + final int chunkSize = sourceConfig.getSplitSize(); + final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper(); + final double distributionFactorLower = sourceConfig.getDistributionFactorLower(); + + if (isEvenlySplitColumn(splitColumn)) { + long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId); + double distributionFactor = calculateDistributionFactor(tableId, min, max, approximateRowCnt); + + boolean dataIsEvenlyDistributed = + doubleCompare(distributionFactor, distributionFactorLower) >= 0 + && doubleCompare(distributionFactor, distributionFactorUpper) <= 0; + + if (dataIsEvenlyDistributed) { + // the minimum dynamic chunk size is at least 1 + final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1); + return splitEvenlySizedChunks( + tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize); + } else { + return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); + } + } else { + return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); } - - /** ChunkEnd less than or equal to max. */ - protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) { - return ObjectUtils.compare(chunkEnd, max) <= 0; - } - - /** ChunkEnd greater than or equal to max. */ - protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) { - return ObjectUtils.compare(chunkEnd, max) >= 0; - } - - /** - * Query the maximum and minimum value of the column in the table. e.g. query string - * SELECT MIN(%s) FROM %s WHERE %s > ? - * - * @param jdbc JDBC connection. - * @param tableId table identity. - * @param splitColumn column. - * @return maximum and minimum value. - */ - protected Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column splitColumn) - throws SQLException { - return JdbcChunkUtils.queryMinMax( - jdbc, - jdbc.quotedTableIdString(tableId), - jdbc.quotedColumnIdString(splitColumn.name())); + } + + /** + * Split table into evenly sized chunks based on the numeric min and max value of split column, + * and tumble chunks in step size. + */ + private List splitEvenlySizedChunks( + TableId tableId, + Object min, + Object max, + long approximateRowCnt, + int chunkSize, + int dynamicChunkSize) { + LOG.info( + "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}", + tableId, + approximateRowCnt, + chunkSize, + dynamicChunkSize); + if (approximateRowCnt <= chunkSize) { + // there is no more than one chunk, return full table as a chunk + return Collections.singletonList(ChunkRange.all()); } - /** - * Query the minimum value of the column in the table, and the minimum value must greater than - * the excludedLowerBound value. e.g. prepare query string - * SELECT MIN(%s) FROM %s WHERE %s > ? - * - * @param jdbc JDBC connection. - * @param tableId table identity. - * @param splitColumn column. - * @param excludedLowerBound the minimum value should be greater than this value. - * @return minimum value. 
- */ - protected Object queryMin( - JdbcConnection jdbc, TableId tableId, Column splitColumn, Object excludedLowerBound) - throws SQLException { - return JdbcChunkUtils.queryMin( - jdbc, - jdbc.quotedColumnIdString(splitColumn.name()), - jdbc.quotedTableIdString(tableId), - excludedLowerBound); + final List splits = new ArrayList<>(); + Object chunkStart = null; + Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize); + while (ObjectUtils.compare(chunkEnd, max) <= 0) { + splits.add(ChunkRange.of(chunkStart, chunkEnd)); + chunkStart = chunkEnd; + try { + chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize); + } catch (ArithmeticException e) { + // Stop chunk split to avoid dead loop when number overflows. + break; + } } - - /** - * convert dbz column to Flink row type. - * - * @param splitColumn split column. - * @return flink row type. - */ - private RowType getSplitType(Column splitColumn) { - return (RowType) - ROW(FIELD(splitColumn.name(), fromDbzColumn(splitColumn))).getLogicalType(); + // add the ending split + splits.add(ChunkRange.of(chunkStart, null)); + return splits; + } + + /** Split table into unevenly sized chunks by continuously calculating next chunk max value. */ + private List splitUnevenlySizedChunks( + JdbcConnection jdbc, + TableId tableId, + Column splitColumn, + Object min, + Object max, + int chunkSize) + throws SQLException { + LOG.info("Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize); + final List splits = new ArrayList<>(); + Object chunkStart = null; + Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize); + int count = 0; + while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max, splitColumn)) { + // we start from [null, min + chunk_size) and avoid [null, min) + splits.add(ChunkRange.of(chunkStart, chunkEnd)); + // may sleep a while to avoid DDOS on PostgreSQL server + maySleep(count++, tableId); + chunkStart = chunkEnd; + chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize); } - - /** - * We can use evenly-sized chunks or unevenly-sized chunks when split table into chunks, using - * evenly-sized chunks which is much efficient, using unevenly-sized chunks which will request - * many queries and is not efficient. 
- */ - private List splitTableIntoChunks( - JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException { - final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn); - final Object min = minMax[0]; - final Object max = minMax[1]; - if (min == null || max == null || min.equals(max)) { - // empty table, or only one row, return full table scan as a chunk - return Collections.singletonList(ChunkRange.all()); - } - - final int chunkSize = sourceConfig.getSplitSize(); - final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper(); - final double distributionFactorLower = sourceConfig.getDistributionFactorLower(); - - if (isEvenlySplitColumn(splitColumn)) { - long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId); - double distributionFactor = - calculateDistributionFactor(tableId, min, max, approximateRowCnt); - - boolean dataIsEvenlyDistributed = - doubleCompare(distributionFactor, distributionFactorLower) >= 0 - && doubleCompare(distributionFactor, distributionFactorUpper) <= 0; - - if (dataIsEvenlyDistributed) { - // the minimum dynamic chunk size is at least 1 - final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1); - return splitEvenlySizedChunks( - tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize); - } else { - return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); - } - } else { - return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); - } + // add the ending split + splits.add(ChunkRange.of(chunkStart, null)); + return splits; + } + + private Object nextChunkEnd( + JdbcConnection jdbc, + Object previousChunkEnd, + TableId tableId, + Column splitColumn, + Object max, + int chunkSize) + throws SQLException { + // chunk end might be null when max values are removed + Object chunkEnd = queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd); + if (Objects.equals(previousChunkEnd, chunkEnd)) { + // we don't allow equal chunk start and end, + // should query the next one larger than chunkEnd + chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd); } - - /** - * Split table into evenly sized chunks based on the numeric min and max value of split column, - * and tumble chunks in step size. - */ - private List splitEvenlySizedChunks( - TableId tableId, - Object min, - Object max, - long approximateRowCnt, - int chunkSize, - int dynamicChunkSize) { - LOG.info( - "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}", - tableId, - approximateRowCnt, - chunkSize, - dynamicChunkSize); - if (approximateRowCnt <= chunkSize) { - // there is no more than one chunk, return full table as a chunk - return Collections.singletonList(ChunkRange.all()); - } - - final List splits = new ArrayList<>(); - Object chunkStart = null; - Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize); - while (ObjectUtils.compare(chunkEnd, max) <= 0) { - splits.add(ChunkRange.of(chunkStart, chunkEnd)); - chunkStart = chunkEnd; - try { - chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize); - } catch (ArithmeticException e) { - // Stop chunk split to avoid dead loop when number overflows. 
- break; - } - } - // add the ending split - splits.add(ChunkRange.of(chunkStart, null)); - return splits; + if (isChunkEndGeMax(chunkEnd, max, splitColumn)) { + return null; + } else { + return chunkEnd; } - - /** Split table into unevenly sized chunks by continuously calculating next chunk max value. */ - private List splitUnevenlySizedChunks( - JdbcConnection jdbc, - TableId tableId, - Column splitColumn, - Object min, - Object max, - int chunkSize) - throws SQLException { - LOG.info( - "Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize); - final List splits = new ArrayList<>(); - Object chunkStart = null; - Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize); - int count = 0; - while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max, splitColumn)) { - // we start from [null, min + chunk_size) and avoid [null, min) - splits.add(ChunkRange.of(chunkStart, chunkEnd)); - // may sleep a while to avoid DDOS on PostgreSQL server - maySleep(count++, tableId); - chunkStart = chunkEnd; - chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize); - } - // add the ending split - splits.add(ChunkRange.of(chunkStart, null)); - return splits; - } - - private Object nextChunkEnd( - JdbcConnection jdbc, - Object previousChunkEnd, - TableId tableId, - Column splitColumn, - Object max, - int chunkSize) - throws SQLException { - // chunk end might be null when max values are removed - Object chunkEnd = - queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd); - if (Objects.equals(previousChunkEnd, chunkEnd)) { - // we don't allow equal chunk start and end, - // should query the next one larger than chunkEnd - chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd); - } - if (isChunkEndGeMax(chunkEnd, max, splitColumn)) { - return null; - } else { - return chunkEnd; - } - } - - private SnapshotSplit createSnapshotSplit( - JdbcConnection jdbc, - TableId tableId, - int chunkId, - RowType splitKeyType, - Object chunkStart, - Object chunkEnd) { - // currently, we only support single split column - Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart}; - Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd}; - Map schema = new HashMap<>(); - schema.put(tableId, dialect.queryTableSchema(jdbc, tableId)); - return new SnapshotSplit( - tableId, chunkId, splitKeyType, splitStart, splitEnd, null, schema); - } - - private void maySleep(int count, TableId tableId) { - // every 10 queries to sleep 0.1s - if (count % 10 == 0) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - // nothing to do - } - LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId); - } + } + + private SnapshotSplit createSnapshotSplit( + JdbcConnection jdbc, + TableId tableId, + int chunkId, + RowType splitKeyType, + Object chunkStart, + Object chunkEnd) { + // currently, we only support single split column + Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart}; + Object[] splitEnd = chunkEnd == null ? 
null : new Object[] {chunkEnd}; + Map schema = new HashMap<>(); + schema.put(tableId, dialect.queryTableSchema(jdbc, tableId)); + return new SnapshotSplit(tableId, chunkId, splitKeyType, splitStart, splitEnd, null, schema); + } + + private void maySleep(int count, TableId tableId) { + // every 10 queries to sleep 0.1s + if (count % 10 == 0) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // nothing to do + } + LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId); } + } }
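Apart from the queryMin fix and the re-indentation, the splitter keeps its original behaviour: it queries the min/max of the split column and the approximate row count, computes a distribution factor, and either stretches the chunk step (even case) or probes chunk boundaries one query at a time (uneven case). A small self-contained sketch of the even-split arithmetic follows; the concrete numbers, the 8096 chunk size and the 0.05 / 1000.0 bounds are assumptions for illustration, since the real values come from JdbcSourceConfig:

// Worked example of the chunk-sizing arithmetic in JdbcSourceChunkSplitter.
// The numbers, the 8096 chunk size and the 0.05 / 1000.0 bounds are assumptions;
// the real values come from JdbcSourceConfig.
import java.math.BigDecimal;
import java.math.RoundingMode;

public class DistributionFactorSketch {
    public static void main(String[] args) {
        long min = 1L;                      // MIN(split column) from queryMinMax
        long max = 1_000_000L;              // MAX(split column) from queryMinMax
        long approximateRowCnt = 800_000L;  // from queryApproximateRowCnt
        int chunkSize = 8096;               // configured snapshot chunk size

        // factor = (max - min + 1) / rowCount, rounded up to 4 decimal places
        BigDecimal subRowCnt = BigDecimal.valueOf(max - min).add(BigDecimal.ONE);
        double distributionFactor =
                subRowCnt.divide(BigDecimal.valueOf(approximateRowCnt), 4, RoundingMode.CEILING)
                        .doubleValue();     // 1_000_000 / 800_000 = 1.25

        // The table counts as evenly distributed when lowerBound <= factor <= upperBound
        // (e.g. 0.05 and 1000.0); the chunk step is then scaled by the factor, at least 1.
        int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);

        System.out.printf("factor=%.4f, dynamicChunkSize=%d%n", distributionFactor, dynamicChunkSize);
        // Prints: factor=1.2500, dynamicChunkSize=10120
    }
}

With these numbers the factor of 1.25 falls inside the even-distribution window, so each snapshot split covers roughly 10,120 key values instead of the configured 8,096; outside the window the splitter falls back to splitUnevenlySizedChunks and issues a queryNextChunkMax per chunk.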