Skip to content

Commit

Permalink
HBASE-27783 Addendum forward port the test improvement when backporti…
Browse files Browse the repository at this point in the history
…ng to branch-2
  • Loading branch information
Apache9 committed Apr 20, 2023
1 parent 398c5ef commit f5ee958
Showing 1 changed file with 40 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseClassTestRule;
Expand All @@ -40,11 +43,17 @@
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
@Category({ MasterTests.class, LargeTests.class })
public class TestDisablePeerModification {

Expand All @@ -54,9 +63,9 @@ public class TestDisablePeerModification {

private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

private static CountDownLatch ARRIVE = new CountDownLatch(1);
private static volatile CountDownLatch ARRIVE;

private static CountDownLatch RESUME = new CountDownLatch(1);
private static volatile CountDownLatch RESUME;

public static final class MockPeerStorage extends FSReplicationPeerStorage {

Expand All @@ -77,6 +86,14 @@ public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean ena
}
}

@Parameter
public boolean async;

@Parameters(name = "{index}: async={0}")
public static List<Object[]> params() {
return Arrays.asList(new Object[] { true }, new Object[] { false });
}

@BeforeClass
public static void setUp() throws Exception {
UTIL.getConfiguration().setClass(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
Expand All @@ -89,18 +106,37 @@ public static void tearDown() throws IOException {
UTIL.shutdownMiniCluster();
}

@Before
public void setUpBeforeTest() throws IOException {
UTIL.getAdmin().replicationPeerModificationSwitch(true, true);
}

@Test
public void testDrainProcs() throws Exception {
ARRIVE = new CountDownLatch(1);
RESUME = new CountDownLatch(1);
AsyncAdmin admin = UTIL.getAsyncConnection().getAdmin();
ReplicationPeerConfig rpc =
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test")
.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build();
CompletableFuture<Void> addFuture = admin.addReplicationPeer("test_peer", rpc);
CompletableFuture<Void> addFuture = admin.addReplicationPeer("test_peer_" + async, rpc);
ARRIVE.await();

// we have a pending add peer procedure which has already passed the first state, let's issue a
// peer modification switch request to disable peer modification and set drainProcs to true
CompletableFuture<Boolean> switchFuture = admin.replicationPeerModificationSwitch(false, true);
CompletableFuture<Boolean> switchFuture;
if (async) {
switchFuture = admin.replicationPeerModificationSwitch(false, true);
} else {
switchFuture = new CompletableFuture<>();
ForkJoinPool.commonPool().submit(() -> {
try {
switchFuture.complete(UTIL.getAdmin().replicationPeerModificationSwitch(false, true));
} catch (IOException e) {
switchFuture.completeExceptionally(e);
}
});
}

// sleep a while, the switchFuture should not finish yet
// the sleep is necessary as we can not join on the switchFuture, so there is no stable way to
Expand Down

0 comments on commit f5ee958

Please sign in to comment.