Skip to content

Commit

Permalink
[Improve] [E2E] Change E2E To support ClusterFaultToleranceIT (#3976)
Browse files Browse the repository at this point in the history
* [Improve] [E2E] Change E2E To support ClusterFaultToleranceIT
  • Loading branch information
Hisoka-X authored Jan 30, 2023
1 parent 25d61c1 commit e538903
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 32 deletions.
28 changes: 26 additions & 2 deletions .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ jobs:
env:
MAVEN_OPTS: -Xmx2048m

engine-and-transform-v2-it:
engine-v2-it:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true'
runs-on: ${{ matrix.os }}
Expand All @@ -357,7 +357,31 @@ jobs:
- name: run some modules integration test
if: needs.changes.outputs.api == 'true'
run: |
./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"checkstyle.skip"=true -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :seatunnel-transforms-v2-e2e,:connector-seatunnel-e2e-base -am -Pci
./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"checkstyle.skip"=true -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-seatunnel-e2e-base -am -Pci
env:
MAVEN_OPTS: -Xmx4096m

transform-v2-it:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true'
runs-on: ${{ matrix.os }}
strategy:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 90
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v3
with:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: run some modules integration test
if: needs.changes.outputs.api == 'true'
run: |
./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"checkstyle.skip"=true -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :seatunnel-transforms-v2-e2e -am -Pci
env:
MAVEN_OPTS: -Xmx4096m

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>imap-storage-file</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;

Expand All @@ -54,7 +53,6 @@
* Cluster fault tolerance test. Test the job recovery capability and data consistency assurance capability in case of cluster node failure
*/
@Slf4j
@Disabled
public class ClusterFaultToleranceIT {

public static final String DYNAMIC_TEST_CASE_NAME = "dynamic_test_case_name";
Expand Down Expand Up @@ -108,7 +106,6 @@ public void testBatchJobRunOkIn3Node() throws ExecutionException, InterruptedExc
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);

Awaitility.await().atMost(200000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
Thread.sleep(2000);
Expand Down Expand Up @@ -613,24 +610,7 @@ public void testStreamJobRestoreInAllNodeDown() throws ExecutionException, Inter
SeaTunnelClient engineClient = null;

try {
String yaml = "#\n" +
"# Licensed to the Apache Software Foundation (ASF) under one or more\n" +
"# contributor license agreements. See the NOTICE file distributed with\n" +
"# this work for additional information regarding copyright ownership.\n" +
"# The ASF licenses this file to You under the Apache License, Version 2.0\n" +
"# (the \"License\"); you may not use this file except in compliance with\n" +
"# the License. You may obtain a copy of the License at\n" +
"#\n" +
"# http://www.apache.org/licenses/LICENSE-2.0\n" +
"#\n" +
"# Unless required by applicable law or agreed to in writing, software\n" +
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
"# See the License for the specific language governing permissions and\n" +
"# limitations under the License.\n" +
"#\n" +
"\n" +
"hazelcast:\n" +
String yaml = "hazelcast:\n" +
" cluster-name: seatunnel\n" +
" network:\n" +
" rest-api:\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;

Expand All @@ -53,7 +52,6 @@
* Cluster fault tolerance test. Test the job which have two pipelines can recovery capability and data consistency assurance capability in case of cluster node failure
*/
@Slf4j
@Disabled
public class ClusterFaultToleranceTwoPipelineIT {

public static final String TEST_TEMPLATE_FILE_NAME = "cluster_batch_fake_to_localfile_two_pipeline_template.conf";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,11 +258,12 @@ protected void readyToClose(TaskLocation taskLocation) {
}

protected void restoreCoordinator(boolean alreadyStarted) {
LOG.info("received restore CheckpointCoordinator with alreadyStarted= " + alreadyStarted);
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET);
shutdown = false;
if (alreadyStarted) {
tryTriggerPendingCheckpoint();
isAllTaskReady = true;
tryTriggerPendingCheckpoint();
} else {
isAllTaskReady = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public DeployTaskOperation(@NonNull SlotProfile slotProfile, @NonNull Data taskI
public void run() throws Exception {
SeaTunnelServer server = getService();
server.getSlotService().getSlotContext(slotProfile)
.getTaskExecutionService().deployTask(taskImmutableInformation).get();
.getTaskExecutionService().deployTask(taskImmutableInformation);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@
<artifactId>imap-storage-file</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>imap-storage-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>serializer-protobuf</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import org.apache.seatunnel.engine.imap.storage.api.IMapStorageFactory;
import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;

import com.google.auto.service.AutoService;
import org.apache.hadoop.conf.Configuration;

import java.util.Map;

@AutoService(IMapStorageFactory.class)
public class IMapFileStorageFactory implements IMapStorageFactory {
@Override
public String factoryIdentifier() {
Expand Down

0 comments on commit e538903

Please sign in to comment.