From 9b4b8cd36dcf5da2f21a7a46c793d388b51ba50d Mon Sep 17 00:00:00 2001 From: zhangdonghao Date: Tue, 22 Oct 2024 11:11:36 +0800 Subject: [PATCH] [Improve][E2E]modify the method of obtaining JobId [Improve][E2E]modify the method of obtaining JobId --- .../seatunnel/cdc/mysql/MysqlCDCIT.java | 10 ++++++---- .../cdc/postgres/OpengaussCDCIT.java | 18 +++++++++-------- .../seatunnel/cdc/oracle/OracleCDCIT.java | 10 ++++++---- .../seatunnel/cdc/postgres/PostgresCDCIT.java | 20 +++++++++++-------- .../e2e/connector/tidb/TiDBCDCIT.java | 8 ++++---- .../e2e/connector/paimon/PaimonSinkCDCIT.java | 11 ++++++---- .../e2e/common/util/JobIdGenerator.java | 4 ++-- .../engine/e2e/CheckpointEnableIT.java | 16 +++++++-------- 8 files changed, 55 insertions(+), 42 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java index bca1052b3d7..bf7e8d8fe7c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java @@ -328,12 +328,13 @@ public void testMultiTableWithRestore(TestContainer container) clearTable(MYSQL_DATABASE2, SOURCE_TABLE_1); clearTable(MYSQL_DATABASE2, SOURCE_TABLE_2); - String jobId = JobIdGenerator.newJobId(); + Long jobId = JobIdGenerator.newJobId(); CompletableFuture.supplyAsync( () -> { try { return container.executeJob( - "/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf", jobId); + "/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf", + String.valueOf(jobId)); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -365,14 +366,15 @@ public void testMultiTableWithRestore(TestContainer container) .pollInterval(1000, TimeUnit.MILLISECONDS) .until(() -> getConnectionStatus("st_user_sink").size() == 1); - Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); + Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode()); // Restore job with add a new table CompletableFuture.supplyAsync( () -> { try { container.restoreJob( - "/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf", jobId); + "/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf", + String.valueOf(jobId)); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java index 1094d140e53..ed3fdd74b40 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java @@ -289,14 +289,14 @@ public void testOpengaussCdcMultiTableE2e(TestContainer container) { disabledReason = "Currently SPARK and FLINK do not support restore") public void testMultiTableWithRestore(TestContainer container) throws IOException, InterruptedException { - String jobId = JobIdGenerator.newJobId(); + Long jobId = JobIdGenerator.newJobId(); try { CompletableFuture.supplyAsync( () -> { try { return container.executeJob( "/opengausscdc_to_opengauss_with_multi_table_mode_one_table.conf", - jobId); + String.valueOf(jobId)); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -322,7 +322,7 @@ public void testMultiTableWithRestore(TestContainer container) OPENGAUSS_SCHEMA, SINK_TABLE_1))))); - Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); + Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode()); // Restore job with add a new table CompletableFuture.supplyAsync( @@ -330,7 +330,7 @@ public void testMultiTableWithRestore(TestContainer container) try { container.restoreJob( "/opengausscdc_to_opengauss_with_multi_table_mode_two_table.conf", - jobId); + String.valueOf(jobId)); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -388,13 +388,14 @@ public void testMultiTableWithRestore(TestContainer container) disabledReason = "Currently SPARK and FLINK do not support restore") public void testAddFiledWithRestore(TestContainer container) throws IOException, InterruptedException { - String jobId = JobIdGenerator.newJobId(); + Long jobId = JobIdGenerator.newJobId(); try { CompletableFuture.supplyAsync( () -> { try { return container.executeJob( - "/opengausscdc_to_opengauss_test_add_Filed.conf", jobId); + "/opengausscdc_to_opengauss_test_add_Filed.conf", + String.valueOf(jobId)); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -417,7 +418,7 @@ public void testAddFiledWithRestore(TestContainer container) OPENGAUSS_SCHEMA, SINK_TABLE_3))))); - Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); + Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode()); // add filed add insert source table data addFieldsForTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_3); @@ -429,7 +430,8 @@ public void testAddFiledWithRestore(TestContainer container) () -> { try { container.restoreJob( - "/opengausscdc_to_opengauss_test_add_Filed.conf", jobId); + "/opengausscdc_to_opengauss_test_add_Filed.conf", + String.valueOf(jobId)); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java index 77e9e4c2e77..86282b7ff03 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java @@ -391,12 +391,13 @@ public void testMultiTableWithRestore(TestContainer container) insertSourceTable(DATABASE, SOURCE_TABLE1); insertSourceTable(DATABASE, SOURCE_TABLE2); - String jobId = JobIdGenerator.newJobId(); + Long jobId = JobIdGenerator.newJobId(); CompletableFuture.supplyAsync( () -> { try { return container.executeJob( - "/oraclecdc_to_oracle_with_multi_table_mode_one_table.conf", jobId); + "/oraclecdc_to_oracle_with_multi_table_mode_one_table.conf", + String.valueOf(jobId)); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -434,14 +435,15 @@ public void testMultiTableWithRestore(TestContainer container) getSourceQuerySQL( DATABASE, SINK_TABLE1))))); - Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); + Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode()); // Restore job with add a new table CompletableFuture.supplyAsync( () -> { try { container.restoreJob( - "/oraclecdc_to_oracle_with_multi_table_mode_two_table.conf", jobId); + "/oraclecdc_to_oracle_with_multi_table_mode_two_table.conf", + String.valueOf(jobId)); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java index ffb2742cd29..3abca057fb2 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java @@ -275,13 +275,14 @@ public void testPostgresCdcMultiTableE2e(TestContainer container) { disabledReason = "Currently SPARK and FLINK do not support restore") public void testMultiTableWithRestore(TestContainer container) throws IOException, InterruptedException { - String jobId = JobIdGenerator.newJobId(); + Long jobId = JobIdGenerator.newJobId(); try { CompletableFuture.supplyAsync( () -> { try { return container.executeJob( - "/pgcdc_to_pg_with_multi_table_mode_one_table.conf", jobId); + "/pgcdc_to_pg_with_multi_table_mode_one_table.conf", + String.valueOf(jobId)); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -307,14 +308,15 @@ public void testMultiTableWithRestore(TestContainer container) POSTGRESQL_SCHEMA, SINK_TABLE_1))))); - Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); + Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode()); // Restore job with add a new table CompletableFuture.supplyAsync( () -> { try { container.restoreJob( - "/pgcdc_to_pg_with_multi_table_mode_two_table.conf", jobId); + "/pgcdc_to_pg_with_multi_table_mode_two_table.conf", + String.valueOf(jobId)); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -372,13 +374,14 @@ public void testMultiTableWithRestore(TestContainer container) disabledReason = "Currently SPARK and FLINK do not support restore") public void testAddFiledWithRestore(TestContainer container) throws IOException, InterruptedException { - String jobId = JobIdGenerator.newJobId(); + Long jobId = JobIdGenerator.newJobId(); try { CompletableFuture.supplyAsync( () -> { try { return container.executeJob( - "/postgrescdc_to_postgres_test_add_Filed.conf", jobId); + "/postgrescdc_to_postgres_test_add_Filed.conf", + String.valueOf(jobId)); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -401,7 +404,7 @@ public void testAddFiledWithRestore(TestContainer container) POSTGRESQL_SCHEMA, SINK_TABLE_3))))); - Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); + Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode()); // add filed add insert source table data addFieldsForTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_3); @@ -413,7 +416,8 @@ public void testAddFiledWithRestore(TestContainer container) () -> { try { container.restoreJob( - "/postgrescdc_to_postgres_test_add_Filed.conf", jobId); + "/postgrescdc_to_postgres_test_add_Filed.conf", + String.valueOf(jobId)); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java index 02b34412077..78c7077940c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java @@ -166,11 +166,11 @@ public void testMultiTableWithRestore(TestContainer container) // Clear related content to ensure that multiple operations are not affected clearTable(TIDB_DATABASE, SOURCE_TABLE); clearTable(TIDB_DATABASE, SINK_TABLE); - String jobId = JobIdGenerator.newJobId(); + Long jobId = JobIdGenerator.newJobId(); CompletableFuture.supplyAsync( () -> { try { - container.executeJob("/tidb/tidbcdc_to_tidb.conf", jobId); + container.executeJob("/tidb/tidbcdc_to_tidb.conf", String.valueOf(jobId)); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -191,13 +191,13 @@ public void testMultiTableWithRestore(TestContainer container) query(getSinkQuerySQL(TIDB_DATABASE, SINK_TABLE)))); }); - Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); + Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode()); // Restore job CompletableFuture.supplyAsync( () -> { try { - container.restoreJob("/tidb/tidbcdc_to_tidb.conf", jobId); + container.restoreJob("/tidb/tidbcdc_to_tidb.conf", String.valueOf(jobId)); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java index 293cf6c76e5..05d64679317 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java @@ -559,7 +559,9 @@ public void testChangelogLookup(TestContainer container) throws Exception { TimeUnit.SECONDS.sleep(20); String[] jobIds = new String[] { - JobIdGenerator.newJobId(), JobIdGenerator.newJobId(), JobIdGenerator.newJobId() + String.valueOf(JobIdGenerator.newJobId()), + String.valueOf(JobIdGenerator.newJobId()), + String.valueOf(JobIdGenerator.newJobId()) }; log.info("jobIds: {}", Arrays.toString(jobIds)); List> futures = new ArrayList<>(); @@ -641,14 +643,15 @@ public void testChangelogLookup(TestContainer container) throws Exception { @TestTemplate public void testChangelogFullCompaction(TestContainer container) throws Exception { - String jobId = JobIdGenerator.newJobId(); + Long jobId = JobIdGenerator.newJobId(); log.info("jobId: {}", jobId); CompletableFuture voidCompletableFuture = CompletableFuture.runAsync( () -> { try { container.executeJob( - "/changelog_fake_cdc_sink_paimon_case2.conf", jobId); + "/changelog_fake_cdc_sink_paimon_case2.conf", + String.valueOf(jobId)); } catch (Exception e) { throw new SeaTunnelException(e); } @@ -657,7 +660,7 @@ public void testChangelogFullCompaction(TestContainer container) throws Exceptio TimeUnit.SECONDS.sleep(20); changeLogEnabled = true; // cancel stream job - container.cancelJob(jobId); + container.cancelJob(String.valueOf(jobId)); TimeUnit.SECONDS.sleep(5); // copy paimon to local container.executeExtraCommands(containerExtendedFactory); diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JobIdGenerator.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JobIdGenerator.java index 6904593b242..08fe26893ff 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JobIdGenerator.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JobIdGenerator.java @@ -21,7 +21,7 @@ public class JobIdGenerator { - public static String newJobId() { - return String.valueOf(Math.abs(ThreadLocalRandom.current().nextLong())); + public static Long newJobId() { + return Math.abs(ThreadLocalRandom.current().nextLong()); } } diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java index 35ca8d9413c..f4150829414 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java @@ -97,14 +97,14 @@ public void testZetaBatchCheckpointEnable(TestContainer container) public void testZetaStreamingCheckpointInterval(TestContainer container) throws IOException, InterruptedException, ExecutionException { // start job - String jobId = JobIdGenerator.newJobId(); + Long jobId = JobIdGenerator.newJobId(); CompletableFuture startFuture = CompletableFuture.supplyAsync( () -> { try { return container.executeJob( "/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf", - jobId); + String.valueOf(jobId)); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -114,7 +114,7 @@ public void testZetaStreamingCheckpointInterval(TestContainer container) // wait obtain job id Thread.sleep(15000); Assertions.assertTrue(container.getServerLogs().contains("checkpoint is enabled")); - Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); + Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode()); Assertions.assertEquals(0, startFuture.get().getExitCode()); // restore job CompletableFuture.supplyAsync( @@ -122,7 +122,7 @@ public void testZetaStreamingCheckpointInterval(TestContainer container) try { return container.restoreJob( "/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf", - jobId); + String.valueOf(jobId)); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -152,13 +152,13 @@ public void testZetaStreamingCheckpointInterval(TestContainer container) public void testZetaStreamingCheckpointNoInterval(TestContainer container) throws IOException, InterruptedException { // start job - String jobId = JobIdGenerator.newJobId(); + Long jobId = JobIdGenerator.newJobId(); CompletableFuture.supplyAsync( () -> { try { return container.executeJob( "/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf", - jobId); + String.valueOf(jobId)); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -167,7 +167,7 @@ public void testZetaStreamingCheckpointNoInterval(TestContainer container) Thread.sleep(15000); Assertions.assertTrue(container.getServerLogs().contains("checkpoint is enabled")); - Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); + Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode()); // restore job CompletableFuture.supplyAsync( @@ -176,7 +176,7 @@ public void testZetaStreamingCheckpointNoInterval(TestContainer container) return container .restoreJob( "/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf", - jobId) + String.valueOf(jobId)) .getExitCode(); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage());