- List<String> rows = new ArrayList<>(size);
- while (size > 0 && iter.hasNext()) {
- Row row = iter.next();
- rows.add(row.toString());
- size--;
- }
- return rows;
- }
-
/**
* Makes some changes to the specified customer table. The resulting changelog strings can be accessed via
* {@link #firstPartStreamEvents}.
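
The per-suite `fetchRows`/`fetchRowData` helpers removed in this patch are consolidated into `TestCaseUtils.fetchAndConvert` in the `flink-cdc-common` test-jar. Below is a minimal sketch of such a generic helper, inferred from the deleted bodies and from call sites like `fetchAndConvert(iterator, size, Row::toString)`; the actual implementation in `flink-cdc-common` may differ in details:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public final class TestCaseUtils {

    /** Drains up to {@code size} elements from the iterator and stringifies each one. */
    public static <T> List<String> fetchAndConvert(
            Iterator<T> iter, int size, Function<T, String> converter) {
        List<String> rows = new ArrayList<>(size);
        while (size > 0 && iter.hasNext()) {
            rows.add(converter.apply(iter.next()));
            size--;
        }
        return rows;
    }
}
```
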
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
index 4b355cac027..9d5a810d9e2 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
@@ -45,6 +45,9 @@
import java.util.List;
import java.util.concurrent.ExecutionException;
+import static org.apache.flink.cdc.common.testutils.TestCaseUtils.fetchAndConvert;
+import static org.apache.flink.cdc.common.testutils.TestCaseUtils.waitForSinkSize;
+import static org.apache.flink.cdc.common.testutils.TestCaseUtils.waitForSnapshotStarted;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -158,7 +161,7 @@ public void testConsumingAllEvents()
statement.execute("DELETE FROM inventory.products WHERE id=111;");
}
- waitForSinkSize("sink", 20);
+ waitForSinkSize("sink", false, 20);
/*
*
@@ -266,7 +269,7 @@ public void testStartupFromLatestOffset() throws Exception {
statement.execute("DELETE FROM inventory.products WHERE id=111;");
}
- waitForSinkSize("sink", 5);
+ waitForSinkSize("sink", false, 5);
String[] expected =
new String[] {"110,jacket,new water resistent white wind breaker,0.500"};
@@ -442,7 +445,7 @@ public void testAllTypes() throws Throwable {
// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM full_types");
- waitForSinkSize("sink", 1);
+ waitForSinkSize("sink", false, 1);
// wait a bit to make sure the replication slot is ready
Thread.sleep(5000);
@@ -452,7 +455,7 @@ public void testAllTypes() throws Throwable {
statement.execute("UPDATE inventory.full_types SET small_c=0 WHERE id=1;");
}
- waitForSinkSize("sink", 3);
+ waitForSinkSize("sink", false, 3);
List<String> expected =
Arrays.asList(
@@ -546,7 +549,7 @@ public void testMetadataColumns() throws Throwable {
}
// waiting for change events finished.
- waitForSinkSize("sink", 16);
+ waitForSinkSize("sink", false, 16);
String databaseName = POSTGRES_CONTAINER.getDatabaseName();
List<String> expected =
@@ -679,7 +682,7 @@ public void testUpsertMode() throws Exception {
statement.execute("DELETE FROM inventory.products WHERE id=111;");
}
- waitForSinkSize("sink", 20);
+ waitForSinkSize("sink", false, 20);
/*
*
@@ -784,7 +787,7 @@ public void testUniqueIndexIncludingFunction() throws Exception {
expected.addAll(Arrays.asList("-U[1, a]", "+U[1, null]"));
CloseableIterator<Row> iterator = tableResult.collect();
- assertEqualsInAnyOrder(expected, fetchRows(iterator, expected.size()));
+ assertEqualsInAnyOrder(expected, fetchAndConvert(iterator, expected.size(), Row::toString));
tableResult.getJobClient().get().cancel().get();
RowUtils.USE_LEGACY_TO_STRING = true;
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.java
index 24b9ec8ff3f..58ebb1cb733 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.java
@@ -41,6 +41,7 @@
import java.util.Optional;
import java.util.concurrent.ExecutionException;
+import static org.apache.flink.cdc.common.testutils.TestCaseUtils.waitForSinkSize;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
@@ -123,19 +124,24 @@ private void testRestartFromSavepoint() throws Exception {
// wait for the source startup, we don't have a better way to wait it, use sleep for now
Thread.sleep(10000L);
- waitForSinkResult(
- "sink",
- Arrays.asList(
- "+I[101, scooter, Small 2-wheel scooter, 3.140]",
- "+I[102, car battery, 12V car battery, 8.100]",
- "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
- "+I[104, hammer, 12oz carpenter's hammer, 0.750]",
- "+I[105, hammer, 14oz carpenter's hammer, 0.875]",
- "+I[106, hammer, 16oz carpenter's hammer, 1.000]",
- "+I[107, rocks, box of assorted rocks, 5.300]",
- "+I[108, jacket, water resistent black wind breaker, 0.100]",
- "+I[109, spare tire, 24 inch spare tire, 22.200]",
- "+I[110, jacket, new water resistent white wind breaker, 0.500]"));
+
+ waitForSinkSize("sink", false, 10);
+
+ List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
+ assertThat(
+ actual,
+ containsInAnyOrder(
+ Arrays.asList(
+ "+I[101, scooter, Small 2-wheel scooter, 3.140]",
+ "+I[102, car battery, 12V car battery, 8.100]",
+ "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
+ "+I[104, hammer, 12oz carpenter's hammer, 0.750]",
+ "+I[105, hammer, 14oz carpenter's hammer, 0.875]",
+ "+I[106, hammer, 16oz carpenter's hammer, 1.000]",
+ "+I[107, rocks, box of assorted rocks, 5.300]",
+ "+I[108, jacket, water resistent black wind breaker, 0.100]",
+ "+I[109, spare tire, 24 inch spare tire, 22.200]",
+ "+I[110, jacket, new water resistent white wind breaker, 0.500]")));
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
jobClient.cancel().get();
@@ -162,7 +168,7 @@ private void testRestartFromSavepoint() throws Exception {
result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
jobClient = result.getJobClient().get();
- waitForSinkSize("sink", 15);
+ waitForSinkSize("sink", false, 15);
String[] expected =
new String[] {
@@ -179,7 +185,7 @@ private void testRestartFromSavepoint() throws Exception {
"+I[112, jacket, new water resistent white wind breaker, 0.500]"
};
- List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
+ actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertThat(actual, containsInAnyOrder(expected));
jobClient.cancel().get();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/PostgresTestUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/PostgresTestUtils.java
index fce9ece5e3a..34eba11fdfc 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/PostgresTestUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/PostgresTestUtils.java
@@ -21,7 +21,6 @@
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.util.CloseableIterator;
import org.apache.commons.lang3.StringUtils;
@@ -86,24 +85,6 @@ public static void restartTaskManager(MiniCluster miniCluster, Runnable afterFai
miniCluster.startTaskManager();
}
- public static void waitForUpsertSinkSize(String sinkName, int expectedSize)
- throws InterruptedException {
- while (upsertSinkSize(sinkName) < expectedSize) {
- Thread.sleep(100);
- }
- }
-
- public static int upsertSinkSize(String sinkName) {
- synchronized (TestValuesTableFactory.class) {
- try {
- return TestValuesTableFactory.getResultsAsStrings(sinkName).size();
- } catch (IllegalArgumentException e) {
- // job is not started yet
- return 0;
- }
- }
- }
-
public static String getTableNameRegex(String[] captureCustomerTables) {
checkState(captureCustomerTables.length > 0);
if (captureCustomerTables.length == 1) {
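
The `boolean` argument threaded through every `waitForSinkSize` call selects between upsert sinks (materialized results via `getResultsAsStrings`) and changelog sinks (raw results via `getRawResultsAsStrings`), which previously required separate `waitForUpsertSinkSize` and `waitForSinkSize` helpers in each suite. A sketch of the consolidated utility, assembled from the deleted bodies; treat it as an approximation, not the verbatim source of `TestCaseUtils`:

```java
import org.apache.flink.table.planner.factories.TestValuesTableFactory;

public final class TestCaseUtils {

    public static void waitForSinkSize(String sinkName, boolean upsert, int expectedSize)
            throws InterruptedException {
        while (sinkSize(sinkName, upsert) < expectedSize) {
            Thread.sleep(100);
        }
    }

    public static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
        while (sinkSize(sinkName, false) == 0) {
            Thread.sleep(100);
        }
    }

    private static int sinkSize(String sinkName, boolean upsert) {
        synchronized (TestValuesTableFactory.class) {
            try {
                // Upsert sinks count materialized rows; others count the raw changelog.
                return upsert
                        ? TestValuesTableFactory.getResultsAsStrings(sinkName).size()
                        : TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
            } catch (IllegalArgumentException e) {
                // Job is not started yet.
                return 0;
            }
        }
    }
}
```
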
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/pom.xml
index a96d3eb57c2..a5996d96180 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/pom.xml
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/pom.xml
@@ -76,6 +76,14 @@ limitations under the License.
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/SqlServerTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/SqlServerTestBase.java
index d107ea5f489..bf5fdb93f31 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/SqlServerTestBase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/SqlServerTestBase.java
@@ -17,7 +17,6 @@
package org.apache.flink.cdc.connectors.sqlserver;
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;
import org.awaitility.Awaitility;
@@ -198,28 +197,4 @@ protected void initializeSqlServerTable(String sqlFile) {
throw new RuntimeException(e);
}
}
-
- protected static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
- while (sinkSize(sinkName) == 0) {
- Thread.sleep(100);
- }
- }
-
- protected static void waitForSinkSize(String sinkName, int expectedSize)
- throws InterruptedException {
- while (sinkSize(sinkName) < expectedSize) {
- Thread.sleep(100);
- }
- }
-
- protected static int sinkSize(String sinkName) {
- synchronized (TestValuesTableFactory.class) {
- try {
- return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
- } catch (IllegalArgumentException e) {
- // job is not started yet
- return 0;
- }
- }
- }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
index e8b231aac2b..f41550e4f7c 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
@@ -47,12 +47,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
-import java.util.function.Function;
-import java.util.stream.Collectors;
import static java.lang.String.format;
+import static org.apache.flink.cdc.common.testutils.TestCaseUtils.fetchAndConvert;
import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.catalog.Column.physical;
@@ -368,7 +366,7 @@ private List<String> testBackfillWhenWritingEvents(
try (CloseableIterator<RowData> iterator =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source")
.executeAndCollect()) {
- records = fetchRowData(iterator, fetchSize, customerTable::stringify);
+ records = fetchAndConvert(iterator, fetchSize, customerTable::stringify);
env.close();
}
return records;
@@ -493,7 +491,8 @@ private void testSqlServerParallelSource(
LOG.info("snapshot data start");
assertEqualsInAnyOrder(
- expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
+ expectedSnapshotData,
+ fetchAndConvert(iterator, expectedSnapshotData.size(), Row::toString));
// second step: check the change stream data
for (String tableId : captureCustomerTables) {
@@ -525,7 +524,9 @@ private void testSqlServerParallelSource(
for (int i = 0; i < captureCustomerTables.length; i++) {
expectedBinlogData.addAll(Arrays.asList(binlogForSingleTable));
}
- assertEqualsInAnyOrder(expectedBinlogData, fetchRows(iterator, expectedBinlogData.size()));
+ assertEqualsInAnyOrder(
+ expectedBinlogData,
+ fetchAndConvert(iterator, expectedBinlogData.size(), Row::toString));
tableResult.getJobClient().get().cancel().get();
}
@@ -550,27 +551,6 @@ private void sleepMs(long millis) {
}
}
- public static List<String> fetchRowData(
- Iterator<RowData> iter, int size, Function<RowData, String> stringifier) {
- List<RowData> rows = new ArrayList<>(size);
- while (size > 0 && iter.hasNext()) {
- RowData row = iter.next();
- rows.add(row);
- size--;
- }
- return rows.stream().map(stringifier).collect(Collectors.toList());
- }
-
- private static List<String> fetchRows(Iterator<Row> iter, int size) {
- List<String> rows = new ArrayList<>(size);
- while (size > 0 && iter.hasNext()) {
- Row row = iter.next();
- rows.add(row.toString());
- size--;
- }
- return rows;
- }
-
private String getTableNameRegex(String[] captureCustomerTables) {
checkState(captureCustomerTables.length > 0);
if (captureCustomerTables.length == 1) {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceTestBase.java
index cd670792e97..1ca9870deba 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceTestBase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceTestBase.java
@@ -24,7 +24,6 @@
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
@@ -198,30 +197,6 @@ protected static void assertEqualsInOrder(List<String> expected, List<String> ac
assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
}
- protected static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
- while (sinkSize(sinkName) == 0) {
- Thread.sleep(100);
- }
- }
-
- protected static void waitForSinkSize(String sinkName, int expectedSize)
- throws InterruptedException {
- while (sinkSize(sinkName) < expectedSize) {
- Thread.sleep(100);
- }
- }
-
- protected static int sinkSize(String sinkName) {
- synchronized (TestValuesTableFactory.class) {
- try {
- return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
- } catch (IllegalArgumentException e) {
- // job is not started yet
- return 0;
- }
- }
- }
-
protected Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
MSSQL_SERVER_CONTAINER.getJdbcUrl(),
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java
index 0b3465e0a7a..122a6a6e0d2 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java
@@ -40,6 +40,8 @@
import java.util.concurrent.ExecutionException;
import static org.apache.flink.api.common.JobStatus.RUNNING;
+import static org.apache.flink.cdc.common.testutils.TestCaseUtils.waitForSinkSize;
+import static org.apache.flink.cdc.common.testutils.TestCaseUtils.waitForSnapshotStarted;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -165,7 +167,7 @@ public void testConsumingAllEvents()
statement.execute("UPDATE inventory.dbo.products SET volume='1.2' WHERE id=110;");
}
- waitForSinkSize("sink", 20);
+ waitForSinkSize("sink", false, 20);
/*
*
@@ -267,7 +269,7 @@ public void testStartupFromLatestOffset() throws Exception {
statement.execute(
"INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('scooter','Big 3-wheel scooter',5.20);");
- waitForSinkSize("sink", 2);
+ waitForSinkSize("sink", false, 2);
String[] expected =
new String[] {
@@ -379,7 +381,7 @@ public void testAllTypes() throws Throwable {
"UPDATE column_type_test.dbo.full_types SET val_int=8888 WHERE id=0;");
}
- waitForSinkSize("sink", 2);
+ waitForSinkSize("sink", false, 2);
List<String> expected =
Arrays.asList(
@@ -463,7 +465,7 @@ public void testMetadataColumns() throws Throwable {
}
// waiting for change events finished.
- waitForSinkSize("sink", 16);
+ waitForSinkSize("sink", false, 16);
List<String> expected =
Arrays.asList(
@@ -549,7 +551,7 @@ private void testUseChunkColumn(String chunkColumn)
// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM evenly_shopping_cart");
- waitForSinkSize("sink", 12);
+ waitForSinkSize("sink", false, 12);
result.getJobClient().get().cancel().get();
}
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTimezoneITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTimezoneITCase.java
index c3d9f985515..f56882db6be 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTimezoneITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTimezoneITCase.java
@@ -36,6 +36,7 @@
import java.util.TimeZone;
import java.util.concurrent.ExecutionException;
+import static org.apache.flink.cdc.common.testutils.TestCaseUtils.waitForSnapshotStarted;
import static org.junit.Assert.assertEquals;
import static org.testcontainers.containers.MSSQLServerContainer.MS_SQL_SERVER_PORT;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/pom.xml
index 05b1e63ed63..869b41f35ba 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/pom.xml
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/pom.xml
@@ -56,6 +56,15 @@ limitations under the License.
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBConnectorITCase.java
index dcc48817804..7fcdeedee37 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBConnectorITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBConnectorITCase.java
@@ -38,6 +38,7 @@
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.flink.cdc.common.testutils.TestCaseUtils.waitForSinkSize;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -100,7 +101,7 @@ public void testConsumingAllEvents() throws Exception {
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM tidb_source");
// wait for snapshot finished and begin binlog
- waitForSinkSize("sink", 9);
+ waitForSinkSize("sink", false, 9);
try (Connection connection = getJdbcConnection("inventory");
Statement statement = connection.createStatement()) {
@@ -118,7 +119,7 @@ public void testConsumingAllEvents() throws Exception {
statement.execute("DELETE FROM products WHERE id=111;");
}
- waitForSinkSize("sink", 16);
+ waitForSinkSize("sink", false, 16);
/*
*
@@ -205,7 +206,7 @@ public void testDeleteColumn() throws Exception {
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM tidb_source");
// wait for snapshot finished and begin binlog
- waitForSinkSize("sink", 9);
+ waitForSinkSize("sink", false, 9);
try (Connection connection = getJdbcConnection("inventory");
Statement statement = connection.createStatement()) {
@@ -220,7 +221,7 @@ public void testDeleteColumn() throws Exception {
statement.execute("DELETE FROM products WHERE id=111;");
}
- waitForSinkSize("sink", 15);
+ waitForSinkSize("sink", false, 15);
List<String> expected =
Arrays.asList(
@@ -284,7 +285,7 @@ public void testAddColumn() throws Exception {
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM tidb_source");
// wait for snapshot finished and begin binlog
- waitForSinkSize("sink", 9);
+ waitForSinkSize("sink", false, 9);
try (Connection connection = getJdbcConnection("inventory");
Statement statement = connection.createStatement()) {
@@ -304,7 +305,7 @@ public void testAddColumn() throws Exception {
statement.execute("DELETE FROM products WHERE id=111;");
}
- waitForSinkSize("sink", 16);
+ waitForSinkSize("sink", false, 16);
List<String> expected =
Arrays.asList(
@@ -374,7 +375,7 @@ public void testMetadataColumns() throws Exception {
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM tidb_source");
// wait for snapshot finished and begin binlog
- waitForSinkSize("sink", 9);
+ waitForSinkSize("sink", false, 9);
try (Connection connection = getJdbcConnection("inventory");
Statement statement = connection.createStatement()) {
@@ -382,7 +383,7 @@ public void testMetadataColumns() throws Exception {
"UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
}
- waitForSinkSize("sink", 10);
+ waitForSinkSize("sink", false, 10);
List<String> expected =
Arrays.asList(
@@ -519,7 +520,7 @@ public void testAllDataTypes() throws Throwable {
"INSERT INTO sink SELECT id, tiny_c, tiny_un_c, small_c, small_un_c, medium_c, medium_un_c, int_c, int_un_c, int11_c, big_c, big_un_c, varchar_c, char_c, real_c, float_c, double_c, decimal_c, numeric_c, big_decimal_c, bit1_c, tiny1_c, boolean_c, date_c, time_c, datetime3_c, datetime6_c, cast(timestamp_c as timestamp), file_uuid, bit_c, text_c, tiny_blob_c, blob_c, medium_blob_c, long_blob_c, year_c, enum_c, set_c, json_c FROM tidb_source");
// wait for snapshot finished and begin binlog
- waitForSinkSize("sink", 1);
+ waitForSinkSize("sink", false, 1);
try (Connection connection = getJdbcConnection("column_type_test");
Statement statement = connection.createStatement()) {
@@ -527,7 +528,7 @@ public void testAllDataTypes() throws Throwable {
"UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;");
}
- waitForSinkSize("sink", 2);
+ waitForSinkSize("sink", false, 2);
List<String> expected =
Arrays.asList(
@@ -599,7 +600,7 @@ public void testTiDBServerTimezone(String timezone) throws Exception {
"INSERT INTO sink select `id`, date_c, time_c,datetime3_c, datetime6_c, cast(timestamp_c as timestamp) FROM tidb_source t");
// wait for snapshot finished and begin binlog
- waitForSinkSize("sink", 1);
+ waitForSinkSize("sink", false, 1);
try (Connection connection = getJdbcConnection("column_type_test");
Statement statement = connection.createStatement()) {
@@ -607,7 +608,7 @@ public void testTiDBServerTimezone(String timezone) throws Exception {
"UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;");
}
- waitForSinkSize("sink", 2);
+ waitForSinkSize("sink", false, 2);
List<String> expected =
Arrays.asList(
@@ -619,24 +620,6 @@ public void testTiDBServerTimezone(String timezone) throws Exception {
result.getJobClient().get().cancel().get();
}
- private static void waitForSinkSize(String sinkName, int expectedSize)
- throws InterruptedException {
- while (sinkSize(sinkName) < expectedSize) {
- Thread.sleep(100);
- }
- }
-
- private static int sinkSize(String sinkName) {
- synchronized (TestValuesTableFactory.class) {
- try {
- return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
- } catch (IllegalArgumentException e) {
- // job is not started yet
- return 0;
- }
- }
- }
-
public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertEqualsInOrder(
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBConnectorRegionITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBConnectorRegionITCase.java
index 47c7ac366a2..7169492c0da 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBConnectorRegionITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBConnectorRegionITCase.java
@@ -35,6 +35,8 @@
import java.sql.ResultSet;
import java.sql.Statement;
+import static org.apache.flink.cdc.common.testutils.TestCaseUtils.waitForSinkSize;
+
/** Integration tests for TiDB change stream event SQL source. */
public class TiDBConnectorRegionITCase extends TiDBTestBase {
@@ -87,7 +89,7 @@ public void testRegionChange() throws Exception {
// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM tidb_source");
- waitForSinkSize("sink", 1);
+ waitForSinkSize("sink", false, 1);
int count = 0;
@@ -125,25 +127,7 @@ public void testRegionChange() throws Exception {
LOG.info("count: {}", count);
}
- waitForSinkSize("sink", count);
+ waitForSinkSize("sink", false, count);
result.getJobClient().get().cancel().get();
}
-
- private static void waitForSinkSize(String sinkName, int expectedSize)
- throws InterruptedException {
- while (sinkSize(sinkName) < expectedSize) {
- Thread.sleep(100);
- }
- }
-
- private static int sinkSize(String sinkName) {
- synchronized (TestValuesTableFactory.class) {
- try {
- return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
- } catch (IllegalArgumentException e) {
- // job is not started yet
- return 0;
- }
- }
- }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-vitess-cdc/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-vitess-cdc/pom.xml
index 39354246ed8..0c9667850c2 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-vitess-cdc/pom.xml
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-vitess-cdc/pom.xml
@@ -83,6 +83,14 @@ limitations under the License.
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-vitess-cdc/src/test/java/org/apache/flink/cdc/connectors/vitess/table/VitessConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-vitess-cdc/src/test/java/org/apache/flink/cdc/connectors/vitess/table/VitessConnectorITCase.java
index c660fa67e84..eb1798239ab 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-vitess-cdc/src/test/java/org/apache/flink/cdc/connectors/vitess/table/VitessConnectorITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-vitess-cdc/src/test/java/org/apache/flink/cdc/connectors/vitess/table/VitessConnectorITCase.java
@@ -24,7 +24,6 @@
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
import org.junit.Before;
import org.junit.Test;
@@ -32,13 +31,14 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
+import static org.apache.flink.cdc.common.testutils.TestCaseUtils.fetchAndConvert;
+import static org.apache.flink.cdc.common.testutils.TestCaseUtils.waitForSinkSize;
+import static org.apache.flink.cdc.common.testutils.TestCaseUtils.waitForSnapshotStarted;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -129,7 +129,7 @@ public void testConsumingAllEvents()
statement.execute("DELETE FROM test.products WHERE id=111;");
}
- waitForSinkSize("sink", 20);
+ waitForSinkSize("sink", false, 20);
List<String> expected =
Arrays.asList(
@@ -207,49 +207,15 @@ public void testAllTypes() throws Throwable {
"-U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true]",
"+U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Bye World, abc, 123.102, 404.4443, 123.4567, 346, true]");
- List<String> actual = fetchRows(result.collect(), expected.size());
+ List<String> actual = fetchAndConvert(result.collect(), expected.size(), Row::toString);
assertEquals(expected, actual);
result.getJobClient().get().cancel().get();
}
- private static List<String> fetchRows(Iterator<Row> iter, int size) {
- List<String> rows = new ArrayList<>(size);
- while (size > 0 && iter.hasNext()) {
- Row row = iter.next();
- rows.add(row.toString());
- size--;
- }
- return rows;
- }
-
public static void assertEqualsInAnyOrder(List<String> actual, List<String> expected) {
assertTrue(actual != null && expected != null);
assertEquals(
actual.stream().sorted().collect(Collectors.toList()),
expected.stream().sorted().collect(Collectors.toList()));
}
-
- private static void waitForSnapshotStarted(CloseableIterator<Row> iterator) throws Exception {
- while (!iterator.hasNext()) {
- Thread.sleep(100);
- }
- }
-
- private static void waitForSinkSize(String sinkName, int expectedSize)
- throws InterruptedException {
- while (sinkSize(sinkName) < expectedSize) {
- Thread.sleep(100);
- }
- }
-
- private static int sinkSize(String sinkName) {
- synchronized (TestValuesTableFactory.class) {
- try {
- return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
- } catch (IllegalArgumentException e) {
- // job is not started yet
- return 0;
- }
- }
- }
}
diff --git a/tools/mig-test/datastream/compile_jobs.rb b/tools/mig-test/datastream/compile_jobs.rb
index 5c906e5bf31..2326a454059 100644
--- a/tools/mig-test/datastream/compile_jobs.rb
+++ b/tools/mig-test/datastream/compile_jobs.rb
@@ -20,7 +20,7 @@
JOB_VERSIONS.each do |version|
puts "Compiling DataStream job for CDC #{version}"
- `cd datastream-#{version} && mvn clean package -DskipTests`
+ system "cd datastream-#{version} && mvn clean package -DskipTests"
end
puts 'Done'
\ No newline at end of file
diff --git a/tools/mig-test/datastream/datastream-3.2.0/pom.xml b/tools/mig-test/datastream/datastream-3.2.0/pom.xml
index c1f556033d1..9eb0212f977 100644
--- a/tools/mig-test/datastream/datastream-3.2.0/pom.xml
+++ b/tools/mig-test/datastream/datastream-3.2.0/pom.xml
@@ -29,7 +29,7 @@ limitations under the License.
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <flink.version>1.18.1</flink.version>
         <flink.cdc.version>3.2.0</flink.cdc.version>
-        <debezium.version>1.9.7.Final</debezium.version>
+        <debezium.version>1.9.8.Final</debezium.version>
         <scala.binary.version>2.12</scala.binary.version>
         <slf4j.version>2.0.13</slf4j.version>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
@@ -136,20 +136,96 @@ limitations under the License.
-                <artifactId>maven-assembly-plugin</artifactId>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
                 <executions>
                     <execution>
+                        <id>shade-flink</id>
                         <phase>package</phase>
                         <goals>
-                            <goal>single</goal>
+                            <goal>shade</goal>
                         </goals>
+                        <configuration>
+                            <shadeTestJar>false</shadeTestJar>
+                            <shadedArtifactAttached>false</shadedArtifactAttached>
+                            <createDependencyReducedPom>true</createDependencyReducedPom>
+                            <dependencyReducedPomLocation>
+                                ${project.basedir}/target/dependency-reduced-pom.xml
+                            </dependencyReducedPomLocation>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>module-info.class</exclude>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <artifactSet>
+                                <includes>
+                                    <include>io.debezium:debezium-api</include>
+                                    <include>io.debezium:debezium-embedded</include>
+                                    <include>io.debezium:debezium-core</include>
+                                    <include>io.debezium:debezium-ddl-parser</include>
+                                    <include>io.debezium:debezium-connector-mysql</include>
+                                    <include>org.apache.flink:flink-connector-debezium</include>
+                                    <include>org.apache.flink:flink-connector-mysql-cdc</include>
+                                    <include>org.antlr:antlr4-runtime</include>
+                                    <include>org.apache.kafka:*</include>
+                                    <include>mysql:mysql-connector-java</include>
+                                    <include>com.zendesk:mysql-binlog-connector-java</include>
+                                    <include>com.fasterxml.*:*</include>
+                                    <include>com.google.guava:*</include>
+                                    <include>com.esri.geometry:esri-geometry-api</include>
+                                    <include>com.zaxxer:HikariCP</include>
+                                </includes>
+                                <excludes>
+                                    <exclude>org.apache.flink:flink-shaded-guava</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.kafka</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.org.apache.kafka
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.antlr</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.org.antlr
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.fasterxml</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.com.fasterxml
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.google</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.com.google
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.esri.geometry</pattern>
+                                    <shadedPattern>org.apache.flink.cdc.connectors.shaded.com.esri.geometry</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.zaxxer</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.com.zaxxer
+                                    </shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
-                        <configuration>
-                            <descriptorRefs>
-                                <descriptorRef>jar-with-dependencies</descriptorRef>
-                            </descriptorRefs>
-                        </configuration>
diff --git a/tools/mig-test/datastream/datastream-3.2.0/src/main/java/DataStreamJob.java b/tools/mig-test/datastream/datastream-3.2.0/src/main/java/DataStreamJob.java
index f821ac0a2de..bfaa2d529bf 100644
--- a/tools/mig-test/datastream/datastream-3.2.0/src/main/java/DataStreamJob.java
+++ b/tools/mig-test/datastream/datastream-3.2.0/src/main/java/DataStreamJob.java
@@ -23,7 +23,7 @@
public class DataStreamJob {
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
@@ -45,10 +45,6 @@ public static void main(String[] args) {
.print()
.setParallelism(1);
- try {
- env.execute();
- } catch (Exception e) {
- // ... unfortunately
- }
+ env.execute();
}
}
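
With the try/catch gone, an execution failure now propagates out of `main` and fails the CLI invocation instead of being silently swallowed. For reference, a minimal sketch of the patched job as a whole; the deserializer, credentials, and database/table names are assumptions, and only the lines visible in the diff above are confirmed:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamJob {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource =
                MySqlSource.<String>builder()
                        .hostname("localhost")
                        .port(3306)
                        .databaseList("mydb")   // assumption
                        .tableList("mydb.girl") // table name per run_migration_test.rb; database is an assumption
                        .username("root")       // assumption
                        .password("")           // assumption
                        .deserializer(new JsonDebeziumDeserializationSchema()) // assumption
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
                .print()
                .setParallelism(1);

        // Propagate failures to the caller instead of swallowing them.
        env.execute();
    }
}
```
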
diff --git a/tools/mig-test/datastream/datastream-3.2.1/pom.xml b/tools/mig-test/datastream/datastream-3.2.1/pom.xml
index c7d680a2f3b..b3e2117720b 100644
--- a/tools/mig-test/datastream/datastream-3.2.1/pom.xml
+++ b/tools/mig-test/datastream/datastream-3.2.1/pom.xml
@@ -136,20 +136,96 @@ limitations under the License.
-                <artifactId>maven-assembly-plugin</artifactId>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
                 <executions>
                     <execution>
+                        <id>shade-flink</id>
                         <phase>package</phase>
                         <goals>
-                            <goal>single</goal>
+                            <goal>shade</goal>
                         </goals>
+                        <configuration>
+                            <shadeTestJar>false</shadeTestJar>
+                            <shadedArtifactAttached>false</shadedArtifactAttached>
+                            <createDependencyReducedPom>true</createDependencyReducedPom>
+                            <dependencyReducedPomLocation>
+                                ${project.basedir}/target/dependency-reduced-pom.xml
+                            </dependencyReducedPomLocation>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>module-info.class</exclude>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <artifactSet>
+                                <includes>
+                                    <include>io.debezium:debezium-api</include>
+                                    <include>io.debezium:debezium-embedded</include>
+                                    <include>io.debezium:debezium-core</include>
+                                    <include>io.debezium:debezium-ddl-parser</include>
+                                    <include>io.debezium:debezium-connector-mysql</include>
+                                    <include>org.apache.flink:flink-connector-debezium</include>
+                                    <include>org.apache.flink:flink-connector-mysql-cdc</include>
+                                    <include>org.antlr:antlr4-runtime</include>
+                                    <include>org.apache.kafka:*</include>
+                                    <include>mysql:mysql-connector-java</include>
+                                    <include>com.zendesk:mysql-binlog-connector-java</include>
+                                    <include>com.fasterxml.*:*</include>
+                                    <include>com.google.guava:*</include>
+                                    <include>com.esri.geometry:esri-geometry-api</include>
+                                    <include>com.zaxxer:HikariCP</include>
+                                </includes>
+                                <excludes>
+                                    <exclude>org.apache.flink:flink-shaded-guava</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.kafka</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.org.apache.kafka
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.antlr</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.org.antlr
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.fasterxml</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.com.fasterxml
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.google</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.com.google
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.esri.geometry</pattern>
+                                    <shadedPattern>org.apache.flink.cdc.connectors.shaded.com.esri.geometry</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.zaxxer</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.com.zaxxer
+                                    </shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
-                        <configuration>
-                            <descriptorRefs>
-                                <descriptorRef>jar-with-dependencies</descriptorRef>
-                            </descriptorRefs>
-                        </configuration>
diff --git a/tools/mig-test/datastream/datastream-3.2.1/src/main/java/DataStreamJob.java b/tools/mig-test/datastream/datastream-3.2.1/src/main/java/DataStreamJob.java
index f821ac0a2de..bfaa2d529bf 100644
--- a/tools/mig-test/datastream/datastream-3.2.1/src/main/java/DataStreamJob.java
+++ b/tools/mig-test/datastream/datastream-3.2.1/src/main/java/DataStreamJob.java
@@ -23,7 +23,7 @@
public class DataStreamJob {
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
@@ -45,10 +45,6 @@ public static void main(String[] args) {
.print()
.setParallelism(1);
- try {
- env.execute();
- } catch (Exception e) {
- // ... unfortunately
- }
+ env.execute();
}
}
diff --git a/tools/mig-test/datastream/datastream-3.3.0/pom.xml b/tools/mig-test/datastream/datastream-3.3.0/pom.xml
index cc65c6c7869..e3ba1b846d9 100644
--- a/tools/mig-test/datastream/datastream-3.3.0/pom.xml
+++ b/tools/mig-test/datastream/datastream-3.3.0/pom.xml
@@ -29,7 +29,7 @@ limitations under the License.
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <flink.version>1.19.1</flink.version>
         <flink.cdc.version>3.3.0</flink.cdc.version>
-        <debezium.version>1.9.7.Final</debezium.version>
+        <debezium.version>1.9.8.Final</debezium.version>
         <scala.binary.version>2.12</scala.binary.version>
         <slf4j.version>2.0.13</slf4j.version>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
@@ -136,20 +136,96 @@ limitations under the License.
-                <artifactId>maven-assembly-plugin</artifactId>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
                 <executions>
                     <execution>
+                        <id>shade-flink</id>
                         <phase>package</phase>
                         <goals>
-                            <goal>single</goal>
+                            <goal>shade</goal>
                         </goals>
+                        <configuration>
+                            <shadeTestJar>false</shadeTestJar>
+                            <shadedArtifactAttached>false</shadedArtifactAttached>
+                            <createDependencyReducedPom>true</createDependencyReducedPom>
+                            <dependencyReducedPomLocation>
+                                ${project.basedir}/target/dependency-reduced-pom.xml
+                            </dependencyReducedPomLocation>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>module-info.class</exclude>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <artifactSet>
+                                <includes>
+                                    <include>io.debezium:debezium-api</include>
+                                    <include>io.debezium:debezium-embedded</include>
+                                    <include>io.debezium:debezium-core</include>
+                                    <include>io.debezium:debezium-ddl-parser</include>
+                                    <include>io.debezium:debezium-connector-mysql</include>
+                                    <include>org.apache.flink:flink-connector-debezium</include>
+                                    <include>org.apache.flink:flink-connector-mysql-cdc</include>
+                                    <include>org.antlr:antlr4-runtime</include>
+                                    <include>org.apache.kafka:*</include>
+                                    <include>mysql:mysql-connector-java</include>
+                                    <include>com.zendesk:mysql-binlog-connector-java</include>
+                                    <include>com.fasterxml.*:*</include>
+                                    <include>com.google.guava:*</include>
+                                    <include>com.esri.geometry:esri-geometry-api</include>
+                                    <include>com.zaxxer:HikariCP</include>
+                                </includes>
+                                <excludes>
+                                    <exclude>org.apache.flink:flink-shaded-guava</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.kafka</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.org.apache.kafka
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.antlr</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.org.antlr
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.fasterxml</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.com.fasterxml
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.google</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.com.google
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.esri.geometry</pattern>
+                                    <shadedPattern>org.apache.flink.cdc.connectors.shaded.com.esri.geometry</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.zaxxer</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.com.zaxxer
+                                    </shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
-                        <configuration>
-                            <descriptorRefs>
-                                <descriptorRef>jar-with-dependencies</descriptorRef>
-                            </descriptorRefs>
-                        </configuration>
diff --git a/tools/mig-test/datastream/datastream-3.3.0/src/main/java/DataStreamJob.java b/tools/mig-test/datastream/datastream-3.3.0/src/main/java/DataStreamJob.java
index f821ac0a2de..bfaa2d529bf 100644
--- a/tools/mig-test/datastream/datastream-3.3.0/src/main/java/DataStreamJob.java
+++ b/tools/mig-test/datastream/datastream-3.3.0/src/main/java/DataStreamJob.java
@@ -23,7 +23,7 @@
public class DataStreamJob {
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
@@ -45,10 +45,6 @@ public static void main(String[] args) {
.print()
.setParallelism(1);
- try {
- env.execute();
- } catch (Exception e) {
- // ... unfortunately
- }
+ env.execute();
}
}
diff --git a/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/pom.xml b/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/pom.xml
index e174d5583cf..f83f6804295 100644
--- a/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/pom.xml
+++ b/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/pom.xml
@@ -29,7 +29,7 @@ limitations under the License.
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <flink.version>1.19.1</flink.version>
         <flink.cdc.version>3.4-SNAPSHOT</flink.cdc.version>
-        <debezium.version>1.9.7.Final</debezium.version>
+        <debezium.version>1.9.8.Final</debezium.version>
         <scala.binary.version>2.12</scala.binary.version>
         <slf4j.version>2.0.13</slf4j.version>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
@@ -136,20 +136,96 @@ limitations under the License.
-                <artifactId>maven-assembly-plugin</artifactId>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
                 <executions>
                     <execution>
+                        <id>shade-flink</id>
                         <phase>package</phase>
                         <goals>
-                            <goal>single</goal>
+                            <goal>shade</goal>
                         </goals>
+                        <configuration>
+                            <shadeTestJar>false</shadeTestJar>
+                            <shadedArtifactAttached>false</shadedArtifactAttached>
+                            <createDependencyReducedPom>true</createDependencyReducedPom>
+                            <dependencyReducedPomLocation>
+                                ${project.basedir}/target/dependency-reduced-pom.xml
+                            </dependencyReducedPomLocation>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>module-info.class</exclude>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <artifactSet>
+                                <includes>
+                                    <include>io.debezium:debezium-api</include>
+                                    <include>io.debezium:debezium-embedded</include>
+                                    <include>io.debezium:debezium-core</include>
+                                    <include>io.debezium:debezium-ddl-parser</include>
+                                    <include>io.debezium:debezium-connector-mysql</include>
+                                    <include>org.apache.flink:flink-connector-debezium</include>
+                                    <include>org.apache.flink:flink-connector-mysql-cdc</include>
+                                    <include>org.antlr:antlr4-runtime</include>
+                                    <include>org.apache.kafka:*</include>
+                                    <include>mysql:mysql-connector-java</include>
+                                    <include>com.zendesk:mysql-binlog-connector-java</include>
+                                    <include>com.fasterxml.*:*</include>
+                                    <include>com.google.guava:*</include>
+                                    <include>com.esri.geometry:esri-geometry-api</include>
+                                    <include>com.zaxxer:HikariCP</include>
+                                </includes>
+                                <excludes>
+                                    <exclude>org.apache.flink:flink-shaded-guava</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.kafka</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.org.apache.kafka
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.antlr</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.org.antlr
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.fasterxml</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.com.fasterxml
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.google</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.com.google
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.esri.geometry</pattern>
+                                    <shadedPattern>org.apache.flink.cdc.connectors.shaded.com.esri.geometry</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.zaxxer</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.com.zaxxer
+                                    </shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
-                        <configuration>
-                            <descriptorRefs>
-                                <descriptorRef>jar-with-dependencies</descriptorRef>
-                            </descriptorRefs>
-                        </configuration>
diff --git a/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/src/main/java/DataStreamJob.java b/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/src/main/java/DataStreamJob.java
index f821ac0a2de..bfaa2d529bf 100644
--- a/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/src/main/java/DataStreamJob.java
+++ b/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/src/main/java/DataStreamJob.java
@@ -23,7 +23,7 @@
public class DataStreamJob {
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
@@ -45,10 +45,6 @@ public static void main(String[] args) {
.print()
.setParallelism(1);
- try {
- env.execute();
- } catch (Exception e) {
- // ... unfortunately
- }
+ env.execute();
}
}
diff --git a/tools/mig-test/datastream/run_migration_test.rb b/tools/mig-test/datastream/run_migration_test.rb
index deb16b0f074..9b355c1c3dd 100644
--- a/tools/mig-test/datastream/run_migration_test.rb
+++ b/tools/mig-test/datastream/run_migration_test.rb
@@ -32,12 +32,18 @@ def exec_sql_source(sql)
`mysql -h 127.0.0.1 -P#{SOURCE_PORT} -uroot --skip-password -e "USE #{DATABASE_NAME}; #{sql}"`
end
+def extract_job_id(output)
+ current_job_id = output.split("\n").filter { _1.start_with?('Job has been submitted with JobID ') }.first&.split&.last
+ raise StandardError, "Failed to submit Flink job. Output: #{output}" unless current_job_id&.length == 32
+ current_job_id
+end
+
def put_mystery_data(mystery)
exec_sql_source("REPLACE INTO girl(id, name) VALUES (17, '#{mystery}');")
end
def ensure_mystery_data(mystery)
- throw StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery
+ raise StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery
end
puts ' Waiting for source to start up...'
@@ -52,8 +58,8 @@ def test_migration_chore(from_version, to_version)
# Clear previous savepoints and logs
`rm -rf savepoints`
- old_job_id = `#{FLINK_HOME}/bin/flink run -p 1 -c DataStreamJob --detached datastream-#{from_version}/target/datastream-job-#{from_version}-jar-with-dependencies.jar`.split.last
- raise StandardError, 'Failed to submit Flink job' unless old_job_id.length == 32
+ old_output = `#{FLINK_HOME}/bin/flink run -p 1 -c DataStreamJob --detached datastream-#{from_version}/target/datastream-job-#{from_version}.jar`
+ old_job_id = extract_job_id(old_output)
puts "Submitted job at #{from_version} as #{old_job_id}"
@@ -64,8 +70,8 @@ def test_migration_chore(from_version, to_version)
puts `#{FLINK_HOME}/bin/flink stop --savepointPath #{Dir.pwd}/savepoints #{old_job_id}`
savepoint_file = `ls savepoints`.split("\n").last
- new_job_id = `#{FLINK_HOME}/bin/flink run --fromSavepoint #{Dir.pwd}/savepoints/#{savepoint_file} -p 1 -c DataStreamJob --detached datastream-#{to_version}/target/datastream-job-#{to_version}-jar-with-dependencies.jar`.split.last
- raise StandardError, 'Failed to submit Flink job' unless new_job_id.length == 32
+ new_output = `#{FLINK_HOME}/bin/flink run --fromSavepoint #{Dir.pwd}/savepoints/#{savepoint_file} -p 1 -c DataStreamJob --detached datastream-#{to_version}/target/datastream-job-#{to_version}.jar`
+ new_job_id = extract_job_id(new_output)
puts "Submitted job at #{to_version} as #{new_job_id}"
random_string_2 = SecureRandom.hex(8)
diff --git a/tools/mig-test/run_migration_test.rb b/tools/mig-test/run_migration_test.rb
index bb111252cbf..4ee9e20d897 100644
--- a/tools/mig-test/run_migration_test.rb
+++ b/tools/mig-test/run_migration_test.rb
@@ -37,7 +37,7 @@ def put_mystery_data(mystery)
end
def ensure_mystery_data(mystery)
- throw StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery
+ raise StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery
end
def extract_job_id(output)