Skip to content

Commit

Permalink
[Improve][E2E] Modify the method of obtaining the JobId
Browse files Browse the repository at this point in the history
[Improve][E2E] Modify the method of obtaining the JobId
  • Loading branch information
hawk9821 committed Oct 22, 2024
1 parent 330b98b commit 9b4b8cd
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -328,12 +328,13 @@ public void testMultiTableWithRestore(TestContainer container)
clearTable(MYSQL_DATABASE2, SOURCE_TABLE_1);
clearTable(MYSQL_DATABASE2, SOURCE_TABLE_2);

String jobId = JobIdGenerator.newJobId();
Long jobId = JobIdGenerator.newJobId();
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
"/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf", jobId);
"/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf",
String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand Down Expand Up @@ -365,14 +366,15 @@ public void testMultiTableWithRestore(TestContainer container)
.pollInterval(1000, TimeUnit.MILLISECONDS)
.until(() -> getConnectionStatus("st_user_sink").size() == 1);

Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());
Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode());

// Restore job with add a new table
CompletableFuture.supplyAsync(
() -> {
try {
container.restoreJob(
"/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf", jobId);
"/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf",
String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,14 @@ public void testOpengaussCdcMultiTableE2e(TestContainer container) {
disabledReason = "Currently SPARK and FLINK do not support restore")
public void testMultiTableWithRestore(TestContainer container)
throws IOException, InterruptedException {
String jobId = JobIdGenerator.newJobId();
Long jobId = JobIdGenerator.newJobId();
try {
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
"/opengausscdc_to_opengauss_with_multi_table_mode_one_table.conf",
jobId);
String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand All @@ -322,15 +322,15 @@ public void testMultiTableWithRestore(TestContainer container)
OPENGAUSS_SCHEMA,
SINK_TABLE_1)))));

Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());
Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode());

// Restore job with add a new table
CompletableFuture.supplyAsync(
() -> {
try {
container.restoreJob(
"/opengausscdc_to_opengauss_with_multi_table_mode_two_table.conf",
jobId);
String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand Down Expand Up @@ -388,13 +388,14 @@ public void testMultiTableWithRestore(TestContainer container)
disabledReason = "Currently SPARK and FLINK do not support restore")
public void testAddFiledWithRestore(TestContainer container)
throws IOException, InterruptedException {
String jobId = JobIdGenerator.newJobId();
Long jobId = JobIdGenerator.newJobId();
try {
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
"/opengausscdc_to_opengauss_test_add_Filed.conf", jobId);
"/opengausscdc_to_opengauss_test_add_Filed.conf",
String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand All @@ -417,7 +418,7 @@ public void testAddFiledWithRestore(TestContainer container)
OPENGAUSS_SCHEMA,
SINK_TABLE_3)))));

Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());
Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode());

// add filed add insert source table data
addFieldsForTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_3);
Expand All @@ -429,7 +430,8 @@ public void testAddFiledWithRestore(TestContainer container)
() -> {
try {
container.restoreJob(
"/opengausscdc_to_opengauss_test_add_Filed.conf", jobId);
"/opengausscdc_to_opengauss_test_add_Filed.conf",
String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,12 +391,13 @@ public void testMultiTableWithRestore(TestContainer container)
insertSourceTable(DATABASE, SOURCE_TABLE1);
insertSourceTable(DATABASE, SOURCE_TABLE2);

String jobId = JobIdGenerator.newJobId();
Long jobId = JobIdGenerator.newJobId();
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
"/oraclecdc_to_oracle_with_multi_table_mode_one_table.conf", jobId);
"/oraclecdc_to_oracle_with_multi_table_mode_one_table.conf",
String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand Down Expand Up @@ -434,14 +435,15 @@ public void testMultiTableWithRestore(TestContainer container)
getSourceQuerySQL(
DATABASE, SINK_TABLE1)))));

Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());
Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode());

// Restore job with add a new table
CompletableFuture.supplyAsync(
() -> {
try {
container.restoreJob(
"/oraclecdc_to_oracle_with_multi_table_mode_two_table.conf", jobId);
"/oraclecdc_to_oracle_with_multi_table_mode_two_table.conf",
String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,14 @@ public void testPostgresCdcMultiTableE2e(TestContainer container) {
disabledReason = "Currently SPARK and FLINK do not support restore")
public void testMultiTableWithRestore(TestContainer container)
throws IOException, InterruptedException {
String jobId = JobIdGenerator.newJobId();
Long jobId = JobIdGenerator.newJobId();
try {
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
"/pgcdc_to_pg_with_multi_table_mode_one_table.conf", jobId);
"/pgcdc_to_pg_with_multi_table_mode_one_table.conf",
String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand All @@ -307,14 +308,15 @@ public void testMultiTableWithRestore(TestContainer container)
POSTGRESQL_SCHEMA,
SINK_TABLE_1)))));

Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());
Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode());

// Restore job with add a new table
CompletableFuture.supplyAsync(
() -> {
try {
container.restoreJob(
"/pgcdc_to_pg_with_multi_table_mode_two_table.conf", jobId);
"/pgcdc_to_pg_with_multi_table_mode_two_table.conf",
String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand Down Expand Up @@ -372,13 +374,14 @@ public void testMultiTableWithRestore(TestContainer container)
disabledReason = "Currently SPARK and FLINK do not support restore")
public void testAddFiledWithRestore(TestContainer container)
throws IOException, InterruptedException {
String jobId = JobIdGenerator.newJobId();
Long jobId = JobIdGenerator.newJobId();
try {
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
"/postgrescdc_to_postgres_test_add_Filed.conf", jobId);
"/postgrescdc_to_postgres_test_add_Filed.conf",
String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand All @@ -401,7 +404,7 @@ public void testAddFiledWithRestore(TestContainer container)
POSTGRESQL_SCHEMA,
SINK_TABLE_3)))));

Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());
Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode());

// add filed add insert source table data
addFieldsForTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_3);
Expand All @@ -413,7 +416,8 @@ public void testAddFiledWithRestore(TestContainer container)
() -> {
try {
container.restoreJob(
"/postgrescdc_to_postgres_test_add_Filed.conf", jobId);
"/postgrescdc_to_postgres_test_add_Filed.conf",
String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,11 @@ public void testMultiTableWithRestore(TestContainer container)
// Clear related content to ensure that multiple operations are not affected
clearTable(TIDB_DATABASE, SOURCE_TABLE);
clearTable(TIDB_DATABASE, SINK_TABLE);
String jobId = JobIdGenerator.newJobId();
Long jobId = JobIdGenerator.newJobId();
CompletableFuture.supplyAsync(
() -> {
try {
container.executeJob("/tidb/tidbcdc_to_tidb.conf", jobId);
container.executeJob("/tidb/tidbcdc_to_tidb.conf", String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand All @@ -191,13 +191,13 @@ public void testMultiTableWithRestore(TestContainer container)
query(getSinkQuerySQL(TIDB_DATABASE, SINK_TABLE))));
});

Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());
Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode());

// Restore job
CompletableFuture.supplyAsync(
() -> {
try {
container.restoreJob("/tidb/tidbcdc_to_tidb.conf", jobId);
container.restoreJob("/tidb/tidbcdc_to_tidb.conf", String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,9 @@ public void testChangelogLookup(TestContainer container) throws Exception {
TimeUnit.SECONDS.sleep(20);
String[] jobIds =
new String[] {
JobIdGenerator.newJobId(), JobIdGenerator.newJobId(), JobIdGenerator.newJobId()
String.valueOf(JobIdGenerator.newJobId()),
String.valueOf(JobIdGenerator.newJobId()),
String.valueOf(JobIdGenerator.newJobId())
};
log.info("jobIds: {}", Arrays.toString(jobIds));
List<CompletableFuture<Void>> futures = new ArrayList<>();
Expand Down Expand Up @@ -641,14 +643,15 @@ public void testChangelogLookup(TestContainer container) throws Exception {

@TestTemplate
public void testChangelogFullCompaction(TestContainer container) throws Exception {
String jobId = JobIdGenerator.newJobId();
Long jobId = JobIdGenerator.newJobId();
log.info("jobId: {}", jobId);
CompletableFuture<Void> voidCompletableFuture =
CompletableFuture.runAsync(
() -> {
try {
container.executeJob(
"/changelog_fake_cdc_sink_paimon_case2.conf", jobId);
"/changelog_fake_cdc_sink_paimon_case2.conf",
String.valueOf(jobId));
} catch (Exception e) {
throw new SeaTunnelException(e);
}
Expand All @@ -657,7 +660,7 @@ public void testChangelogFullCompaction(TestContainer container) throws Exceptio
TimeUnit.SECONDS.sleep(20);
changeLogEnabled = true;
// cancel stream job
container.cancelJob(jobId);
container.cancelJob(String.valueOf(jobId));
TimeUnit.SECONDS.sleep(5);
// copy paimon to local
container.executeExtraCommands(containerExtendedFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

/** Utility for generating random job ids used by the E2E test containers. */
public class JobIdGenerator {

    /**
     * Generates a new random, non-negative job id.
     *
     * <p>Uses a sign-bit mask instead of {@code Math.abs(...)} because
     * {@code Math.abs(Long.MIN_VALUE)} overflows and returns {@code Long.MIN_VALUE}
     * (a negative value), which would produce an invalid job id roughly once in
     * 2^64 calls.
     *
     * @return a uniformly random id in the range {@code [0, Long.MAX_VALUE]}
     */
    public static Long newJobId() {
        // Clearing the sign bit guarantees a non-negative result for every
        // possible random long, including Long.MIN_VALUE.
        return ThreadLocalRandom.current().nextLong() & Long.MAX_VALUE;
    }
}
Loading

0 comments on commit 9b4b8cd

Please sign in to comment.