Skip to content

Commit

Permalink
HBASE-26080 Implement a new mini cluster class for end users (#3470)
Browse files Browse the repository at this point in the history
Signed-off-by: Yulin Niu <niuyulin@apache.org>
  • Loading branch information
Apache9 committed Jul 13, 2021
1 parent 3294d32 commit fa8bc25
Show file tree
Hide file tree
Showing 4 changed files with 707 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.testing;

import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;

/**
* A mini hbase cluster used for testing.
* <p/>
* It will also start the necessary zookeeper cluster and dfs cluster. But we will not provide
* methods for controlling the zookeeper cluster and dfs cluster, as end users do not need to test
* the HBase behavior when these systems are broken.
* <p/>
* The implementation is not required to be thread safe, so do not call different methods
* concurrently.
*/
@InterfaceAudience.Public
public interface TestingHBaseCluster {

/**
* Get configuration of this cluster.
* <p/>
* You could use the returned {@link Configuration} to create
* {@link org.apache.hadoop.hbase.client.Connection} for accessing the testing cluster.
*/
Configuration getConf();

/**
* Start a new master with localhost and random port.
*/
void startMaster() throws Exception;

/**
* Start a new master bind on the given host and port.
*/
void startMaster(String hostname, int port) throws Exception;

/**
* Stop the given master.
* <p/>
* Wait on the returned {@link CompletableFuture} to wait on the master quit. The differences
* comparing to {@link org.apache.hadoop.hbase.client.Admin#stopMaster()} is that first, we could
* also stop backup masters here, second, this method does not always fail since we do not use rpc
* to stop the master.
*/
CompletableFuture<Void> stopMaster(ServerName serverName) throws Exception;

/**
* Start a new region server with localhost and random port.
*/
void startRegionServer() throws Exception;

/**
* Start a new region server bind on the given host and port.
*/
void startRegionServer(String hostname, int port) throws Exception;

/**
* Stop the given region server.
* <p/>
* Wait on the returned {@link CompletableFuture} to wait on the master quit. The difference
* comparing to {@link org.apache.hadoop.hbase.client.Admin#stopMaster()} is that this method does
* not always fail since we do not use rpc to stop the region server.
*/
CompletableFuture<Void> stopRegionServer(ServerName serverName) throws Exception;

/**
* Stop the hbase cluster.
* <p/>
* You need to call {@link #start()} first before calling this method, otherwise an
* {@link IllegalStateException} will be thrown. If the hbase is not running because you have
* already stopped the cluster, an {@link IllegalStateException} will be thrown too.
*/
void stopHBaseCluster() throws Exception;

/**
* Start the hbase cluster.
* <p/>
* This is used to start the hbase cluster again after you call {@link #stopHBaseCluster()}. If
* the cluster is already running or you have not called {@link #start()} yet, an
* {@link IllegalStateException} will be thrown.
*/
void startHBaseCluster() throws Exception;

/**
* Return whether the hbase cluster is running.
*/
boolean isHBaseClusterRunning();

/**
* Start the whole mini cluster, including zookeeper cluster, dfs cluster and hbase cluster.
* <p/>
* You can only call this method once at the beginning, unless you have called {@link #stop()} to
* shutdown the cluster completely, and then you can call this method to start the whole cluster
* again. An {@link IllegalStateException} will be thrown if you call this method incorrectly.
*/
void start() throws Exception;

/**
* Return whether the cluster is running.
* <p/>
* Notice that, this only means you have called {@link #start()} and have not called
* {@link #stop()} yet. If you want to make sure the hbase cluster is running, use
* {@link #isHBaseClusterRunning()}.
*/
boolean isClusterRunning();

/**
* Stop the whole mini cluster, including zookeeper cluster, dfs cluster and hbase cluster.
* <p/>
* You can only call this method after calling {@link #start()}, otherwise an
* {@link IllegalStateException} will be thrown.
*/
void stop() throws Exception;

/**
* Create a {@link TestingHBaseCluster}. You need to call {@link #start()} of the returned
* {@link TestingHBaseCluster} to actually start the mini testing cluster.
*/
static TestingHBaseCluster create(TestingHBaseClusterOption option) {
return new TestingHBaseClusterImpl(option);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.testing;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

@InterfaceAudience.Private
class TestingHBaseClusterImpl implements TestingHBaseCluster {

private final HBaseTestingUtility util = new HBaseTestingUtility();

private final StartMiniClusterOption option;

private final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
.setNameFormat(getClass().getSuperclass() + "-%d").setDaemon(true).build());

private boolean miniClusterRunning = false;

private boolean miniHBaseClusterRunning = false;

TestingHBaseClusterImpl(TestingHBaseClusterOption option) {
this.option = option.convert();
}

@Override
public Configuration getConf() {
return util.getConfiguration();
}

private int getRegionServerIndex(ServerName serverName) {
// we have a small number of region servers, this should be fine for now.
List<RegionServerThread> servers = util.getMiniHBaseCluster().getRegionServerThreads();
for (int i = 0; i < servers.size(); i++) {
if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
return i;
}
}
return -1;
}

private int getMasterIndex(ServerName serverName) {
List<MasterThread> masters = util.getMiniHBaseCluster().getMasterThreads();
for (int i = 0; i < masters.size(); i++) {
if (masters.get(i).getMaster().getServerName().equals(serverName)) {
return i;
}
}
return -1;
}

private void join(Thread thread, CompletableFuture<?> future) {
executor.execute(() -> {
try {
thread.join();
future.complete(null);
} catch (InterruptedException e) {
future.completeExceptionally(e);
}
});
}

@Override
public CompletableFuture<Void> stopMaster(ServerName serverName) throws Exception {
CompletableFuture<Void> future = new CompletableFuture<>();
int index = getMasterIndex(serverName);
if (index == -1) {
future.completeExceptionally(new IllegalArgumentException("Unknown master " + serverName));
}
join(util.getMiniHBaseCluster().stopMaster(index), future);
return future;
}

@Override
public CompletableFuture<Void> stopRegionServer(ServerName serverName) throws Exception {
CompletableFuture<Void> future = new CompletableFuture<>();
int index = getRegionServerIndex(serverName);
if (index == -1) {
future
.completeExceptionally(new IllegalArgumentException("Unknown region server " + serverName));
}
join(util.getMiniHBaseCluster().stopRegionServer(index), future);
return future;
}

@Override
public void stopHBaseCluster() throws Exception {
Preconditions.checkState(miniClusterRunning, "Cluster has already been stopped");
Preconditions.checkState(miniHBaseClusterRunning, "HBase cluster has already been started");
util.shutdownMiniHBaseCluster();
miniHBaseClusterRunning = false;
}

@Override
public void startHBaseCluster() throws Exception {
Preconditions.checkState(miniClusterRunning, "Cluster has already been stopped");
Preconditions.checkState(!miniHBaseClusterRunning, "HBase cluster has already been started");
util.startMiniHBaseCluster(option);
miniHBaseClusterRunning = true;
}

@Override
public void start() throws Exception {
Preconditions.checkState(!miniClusterRunning, "Cluster has already been started");
util.startMiniCluster(option);
miniClusterRunning = true;
miniHBaseClusterRunning = true;
}

@Override
public void stop() throws Exception {
Preconditions.checkState(miniClusterRunning, "Cluster has already been stopped");
util.shutdownMiniCluster();
miniClusterRunning = false;
miniHBaseClusterRunning = false;
}

@Override
public boolean isHBaseClusterRunning() {
return miniHBaseClusterRunning;
}

@Override
public boolean isClusterRunning() {
return miniClusterRunning;
}

@Override
public void startMaster() throws Exception {
util.getMiniHBaseCluster().startMaster();
}

@Override
public void startMaster(String hostname, int port) throws Exception {
util.getMiniHBaseCluster().startMaster(hostname, port);
}

@Override
public void startRegionServer() throws Exception {
util.getMiniHBaseCluster().startRegionServer();
}

@Override
public void startRegionServer(String hostname, int port) throws Exception {
util.getMiniHBaseCluster().startRegionServer(hostname, port);
}
}
Loading

0 comments on commit fa8bc25

Please sign in to comment.