From 5d872d34225157bfbc18a6600c958c93d96636fd Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 6 Dec 2018 21:25:34 +0800 Subject: [PATCH] HBASE-21526 Use AsyncClusterConnection in ServerManager for getRsAdmin --- .../hbase/client/AsyncClusterConnection.java | 6 + .../hbase/client/AsyncConnectionImpl.java | 5 + .../hbase/client/AsyncRegionServerAdmin.java | 210 ++++++++++++++++++ .../apache/hadoop/hbase/util/FutureUtils.java | 2 +- .../apache/hadoop/hbase/master/HMaster.java | 22 +- .../hadoop/hbase/master/ServerManager.java | 67 ------ .../procedure/RSProcedureDispatcher.java | 19 +- 7 files changed, 244 insertions(+), 87 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java index c7dea25b6701..1327fd779b8f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.yetus.audience.InterfaceAudience; @@ -26,6 +27,11 @@ @InterfaceAudience.Private public interface AsyncClusterConnection extends AsyncConnection { + /** + * Get the admin service for the given region server. + */ + AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName); + /** * Get the nonce generator for this connection. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 4a5d0c398d69..62b9d8ba7ac8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -379,4 +379,9 @@ public void clearRegionLocationCache() { Optional getConnectionMetrics() { return metrics; } + + @Override + public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) { + return new AsyncRegionServerAdmin(serverName, this); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java new file mode 100644 index 000000000000..9accd8977ff3 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; + +/** + * A simple wrapper of the {@link AdminService} for a region server, which returns a + * {@link CompletableFuture}. This is easier to use, as if you use the raw protobuf interface, you + * need to get the result from the {@link RpcCallback}, and if there is an exception, you need to + * get it from the {@link RpcController} passed in. + *

+ * Notice that there is no retry, and this is intentional. We have different retry for different + * usage for now, if later we want to unify them, we can move the retry logic into this class. + */ +@InterfaceAudience.Private +public class AsyncRegionServerAdmin { + + private final ServerName server; + + private final AsyncConnectionImpl conn; + + AsyncRegionServerAdmin(ServerName server, AsyncConnectionImpl conn) { + this.server = server; + this.conn = conn; + } + + @FunctionalInterface + private interface RpcCall { + void call(AdminService.Interface stub, HBaseRpcController controller, RpcCallback done); + } + + private CompletableFuture call(RpcCall rpcCall) { + CompletableFuture future = new CompletableFuture<>(); + HBaseRpcController controller = conn.rpcControllerFactory.newController(); + try { + rpcCall.call(conn.getAdminStub(server), controller, new RpcCallback() { + + @Override + public void run(RESP resp) { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + future.complete(resp); + } + } + }); + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + } + + public CompletableFuture getRegionInfo(GetRegionInfoRequest request) { + return call((stub, controller, done) -> stub.getRegionInfo(controller, request, done)); + } + + public CompletableFuture getStoreFile(GetStoreFileRequest request) { + return call((stub, controller, done) -> stub.getStoreFile(controller, request, done)); + } + + public CompletableFuture getOnlineRegion( + GetOnlineRegionRequest request) { + return call((stub, controller, done) -> stub.getOnlineRegion(controller, request, done)); + } + + public CompletableFuture openRegion(OpenRegionRequest request) { + return call((stub, controller, done) -> stub.openRegion(controller, request, done)); + } + + public CompletableFuture warmupRegion(WarmupRegionRequest request) { + return call((stub, controller, done) -> stub.warmupRegion(controller, request, done)); + } + + public CompletableFuture closeRegion(CloseRegionRequest request) { + return call((stub, controller, done) -> stub.closeRegion(controller, request, done)); + } + + public CompletableFuture flushRegion(FlushRegionRequest request) { + return call((stub, controller, done) -> stub.flushRegion(controller, request, done)); + } + + public CompletableFuture compactionSwitch( + CompactionSwitchRequest request) { + return call((stub, controller, done) -> stub.compactionSwitch(controller, request, done)); + } + + public CompletableFuture compactRegion(CompactRegionRequest request) { + return call((stub, controller, done) -> stub.compactRegion(controller, request, done)); + } + + public CompletableFuture replicateWALEntry( + ReplicateWALEntryRequest request) { + return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done)); + } + + public CompletableFuture replay(ReplicateWALEntryRequest request) { + return call((stub, controller, done) -> stub.replay(controller, request, done)); + } + + public CompletableFuture rollWALWriter(RollWALWriterRequest request) { + return call((stub, controller, done) -> stub.rollWALWriter(controller, request, done)); + } + + public CompletableFuture getServerInfo(GetServerInfoRequest request) { + return call((stub, controller, done) -> stub.getServerInfo(controller, request, done)); + } + + public CompletableFuture stopServer(StopServerRequest request) { + return call((stub, controller, done) -> stub.stopServer(controller, request, done)); + } + + public CompletableFuture updateFavoredNodes( + UpdateFavoredNodesRequest request) { + return call((stub, controller, done) -> stub.updateFavoredNodes(controller, request, done)); + } + + public CompletableFuture updateConfiguration( + UpdateConfigurationRequest request) { + return call((stub, controller, done) -> stub.updateConfiguration(controller, request, done)); + } + + public CompletableFuture getRegionLoad(GetRegionLoadRequest request) { + return call((stub, controller, done) -> stub.getRegionLoad(controller, request, done)); + } + + public CompletableFuture clearCompactionQueues( + ClearCompactionQueuesRequest request) { + return call((stub, controller, done) -> stub.clearCompactionQueues(controller, request, done)); + } + + public CompletableFuture clearRegionBlockCache( + ClearRegionBlockCacheRequest request) { + return call((stub, controller, done) -> stub.clearRegionBlockCache(controller, request, done)); + } + + public CompletableFuture getSpaceQuotaSnapshots( + GetSpaceQuotaSnapshotsRequest request) { + return call((stub, controller, done) -> stub.getSpaceQuotaSnapshots(controller, request, done)); + } + + public CompletableFuture executeProcedures( + ExecuteProceduresRequest request) { + return call((stub, controller, done) -> stub.executeProcedures(controller, request, done)); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java index 067f2769af4d..dfd9ead27854 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java @@ -193,4 +193,4 @@ public static CompletableFuture failedFuture(Throwable e) { future.completeExceptionally(e); return future; } -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 8ea7b2b41cc4..58cb7800f5cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -202,6 +202,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.EncryptionTest; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.HBaseFsck; import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.HasThread; @@ -234,6 +235,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; @@ -1961,6 +1963,15 @@ protected String getDescription() { }); } + private void warmUpRegion(ServerName server, RegionInfo region) { + FutureUtils.addListener(asyncClusterConnection.getRegionServerAdmin(server) + .warmupRegion(RequestConverter.buildWarmupRegionRequest(region)), (r, e) -> { + if (e != null) { + LOG.warn("Failed to warm up region {} on server {}", region, server, e); + } + }); + } + // Public so can be accessed by tests. Blocks until move is done. // Replace with an async implementation from which you can get // a success/failure result. @@ -2031,11 +2042,12 @@ public void move(final byte[] encodedRegionName, byte[] destServerName) throws H } TransitRegionStateProcedure proc = - this.assignmentManager.createMoveRegionProcedure(rp.getRegionInfo(), rp.getDestination()); - // Warmup the region on the destination before initiating the move. this call - // is synchronous and takes some time. doing it before the source region gets - // closed - serverManager.sendRegionWarmup(rp.getDestination(), hri); + this.assignmentManager.createMoveRegionProcedure(rp.getRegionInfo(), rp.getDestination()); + // Warmup the region on the destination before initiating the move. + // A region server could reject the close request because it either does not + // have the specified region or the region is being split. + warmUpRegion(rp.getDestination(), hri); + LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer"); Future future = ProcedureSyncWait.submitProcedure(this.procedureExecutor, proc); try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 02a27f8dd4c4..2943e56b04a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -24,7 +24,6 @@ import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -51,12 +50,9 @@ import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.monitoring.MonitoredTask; -import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -159,25 +155,16 @@ public class ServerManager { private final ConcurrentNavigableMap onlineServers = new ConcurrentSkipListMap<>(); - /** - * Map of admin interfaces per registered regionserver; these interfaces we use to control - * regionservers out on the cluster - */ - private final Map rsAdmins = new HashMap<>(); - /** List of region servers that should not get any more new regions. */ private final ArrayList drainingServers = new ArrayList<>(); private final MasterServices master; - private final ClusterConnection connection; private final DeadServer deadservers = new DeadServer(); private final long maxSkew; private final long warningSkew; - private final RpcControllerFactory rpcControllerFactory; - /** Listeners that are called on server events. */ private List listeners = new CopyOnWriteArrayList<>(); @@ -189,8 +176,6 @@ public ServerManager(final MasterServices master) { Configuration c = master.getConfiguration(); maxSkew = c.getLong("hbase.master.maxclockskew", 30000); warningSkew = c.getLong("hbase.master.warningclockskew", 10000); - this.connection = master.getClusterConnection(); - this.rpcControllerFactory = this.connection == null? null: connection.getRpcControllerFactory(); persistFlushedSequenceId = c.getBoolean(PERSIST_FLUSHEDSEQUENCEID, PERSIST_FLUSHEDSEQUENCEID_DEFAULT); } @@ -438,7 +423,6 @@ private ServerName findServerWithSameHostnamePortWithLock( void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) { LOG.info("Registering regionserver=" + serverName); this.onlineServers.put(serverName, sl); - this.rsAdmins.remove(serverName); } @VisibleForTesting @@ -634,7 +618,6 @@ public synchronized void moveFromOnlineToDeadServers(final ServerName sn) { this.onlineServers.remove(sn); onlineServers.notifyAll(); } - this.rsAdmins.remove(sn); } /* @@ -677,34 +660,6 @@ public synchronized boolean addServerToDrainList(final ServerName sn) { return this.drainingServers.add(sn); } - // RPC methods to region servers - - private HBaseRpcController newRpcController() { - return rpcControllerFactory == null ? null : rpcControllerFactory.newController(); - } - - /** - * Sends a WARMUP RPC to the specified server to warmup the specified region. - *

- * A region server could reject the close request because it either does not - * have the specified region or the region is being split. - * @param server server to warmup a region - * @param region region to warmup - */ - public void sendRegionWarmup(ServerName server, - RegionInfo region) { - if (server == null) return; - try { - AdminService.BlockingInterface admin = getRsAdmin(server); - HBaseRpcController controller = newRpcController(); - ProtobufUtil.warmupRegion(controller, admin, region); - } catch (IOException e) { - LOG.error("Received exception in RPC for warmup server:" + - server + "region: " + region + - "exception: " + e); - } - } - /** * Contacts a region server and waits up to timeout ms * to close the region. This bypasses the active hmaster. @@ -737,28 +692,6 @@ public static void closeRegionSilentlyAndWait(ClusterConnection connection, + " timeout " + timeout); } - /** - * @param sn - * @return Admin interface for the remote regionserver named sn - * @throws IOException - * @throws RetriesExhaustedException wrapping a ConnectException if failed - */ - public AdminService.BlockingInterface getRsAdmin(final ServerName sn) - throws IOException { - AdminService.BlockingInterface admin = this.rsAdmins.get(sn); - if (admin == null) { - LOG.debug("New admin connection to " + sn.toString()); - if (sn.equals(master.getServerName()) && master instanceof HRegionServer) { - // A master is also a region server now, see HBASE-10569 for details - admin = ((HRegionServer)master).getRSRpcServices(); - } else { - admin = this.connection.getAdmin(sn); - } - this.rsAdmins.put(sn, admin); - } - return admin; - } - /** * Calculate min necessary to start. This is not an absolute. It is just * a friction that will cause us hang around a bit longer waiting on diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java index b469cb86e631..cb1e12c395e7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.MasterServices; @@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -41,11 +43,9 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; -import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; @@ -219,13 +219,8 @@ public ExecuteProceduresRemoteCall(final ServerName serverName, this.remoteProcedures = remoteProcedures; } - private AdminService.BlockingInterface getRsAdmin() throws IOException { - final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName); - if (admin == null) { - throw new IOException("Attempting to send OPEN RPC to server " + getServerName() + - " failed because no RPC connection found to this server"); - } - return admin; + private AsyncRegionServerAdmin getRsAdmin() throws IOException { + return master.getAsyncClusterConnection().getRegionServerAdmin(serverName); } protected final ServerName getServerName() { @@ -345,11 +340,7 @@ public void dispatchServerOperations(MasterProcedureEnv env, List