Skip to content

Commit

Permalink
HBASE-21526 Use AsyncClusterConnection in ServerManager for getRsAdmin
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Jun 18, 2019
1 parent 2446f00 commit 5d872d3
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.yetus.audience.InterfaceAudience;

Expand All @@ -26,6 +27,11 @@
@InterfaceAudience.Private
public interface AsyncClusterConnection extends AsyncConnection {

/**
* Get the admin service for the given region server.
*/
AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName);

/**
* Get the nonce generator for this connection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,4 +379,9 @@ public void clearRegionLocationCache() {
Optional<MetricsConnection> getConnectionMetrics() {
return metrics;
}

@Override
public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
return new AsyncRegionServerAdmin(serverName, this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;

/**
* A simple wrapper of the {@link AdminService} for a region server, which returns a
* {@link CompletableFuture}. This is easier to use, as if you use the raw protobuf interface, you
* need to get the result from the {@link RpcCallback}, and if there is an exception, you need to
* get it from the {@link RpcController} passed in.
* <p/>
* Notice that there is no retry, and this is intentional. We have different retry for different
* usage for now, if later we want to unify them, we can move the retry logic into this class.
*/
@InterfaceAudience.Private
public class AsyncRegionServerAdmin {

private final ServerName server;

private final AsyncConnectionImpl conn;

AsyncRegionServerAdmin(ServerName server, AsyncConnectionImpl conn) {
this.server = server;
this.conn = conn;
}

@FunctionalInterface
private interface RpcCall<RESP> {
void call(AdminService.Interface stub, HBaseRpcController controller, RpcCallback<RESP> done);
}

private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
CompletableFuture<RESP> future = new CompletableFuture<>();
HBaseRpcController controller = conn.rpcControllerFactory.newController();
try {
rpcCall.call(conn.getAdminStub(server), controller, new RpcCallback<RESP>() {

@Override
public void run(RESP resp) {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
future.complete(resp);
}
}
});
} catch (IOException e) {
future.completeExceptionally(e);
}
return future;
}

public CompletableFuture<GetRegionInfoResponse> getRegionInfo(GetRegionInfoRequest request) {
return call((stub, controller, done) -> stub.getRegionInfo(controller, request, done));
}

public CompletableFuture<GetStoreFileResponse> getStoreFile(GetStoreFileRequest request) {
return call((stub, controller, done) -> stub.getStoreFile(controller, request, done));
}

public CompletableFuture<GetOnlineRegionResponse> getOnlineRegion(
GetOnlineRegionRequest request) {
return call((stub, controller, done) -> stub.getOnlineRegion(controller, request, done));
}

public CompletableFuture<OpenRegionResponse> openRegion(OpenRegionRequest request) {
return call((stub, controller, done) -> stub.openRegion(controller, request, done));
}

public CompletableFuture<WarmupRegionResponse> warmupRegion(WarmupRegionRequest request) {
return call((stub, controller, done) -> stub.warmupRegion(controller, request, done));
}

public CompletableFuture<CloseRegionResponse> closeRegion(CloseRegionRequest request) {
return call((stub, controller, done) -> stub.closeRegion(controller, request, done));
}

public CompletableFuture<FlushRegionResponse> flushRegion(FlushRegionRequest request) {
return call((stub, controller, done) -> stub.flushRegion(controller, request, done));
}

public CompletableFuture<CompactionSwitchResponse> compactionSwitch(
CompactionSwitchRequest request) {
return call((stub, controller, done) -> stub.compactionSwitch(controller, request, done));
}

public CompletableFuture<CompactRegionResponse> compactRegion(CompactRegionRequest request) {
return call((stub, controller, done) -> stub.compactRegion(controller, request, done));
}

public CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
ReplicateWALEntryRequest request) {
return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done));
}

public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request) {
return call((stub, controller, done) -> stub.replay(controller, request, done));
}

public CompletableFuture<RollWALWriterResponse> rollWALWriter(RollWALWriterRequest request) {
return call((stub, controller, done) -> stub.rollWALWriter(controller, request, done));
}

public CompletableFuture<GetServerInfoResponse> getServerInfo(GetServerInfoRequest request) {
return call((stub, controller, done) -> stub.getServerInfo(controller, request, done));
}

public CompletableFuture<StopServerResponse> stopServer(StopServerRequest request) {
return call((stub, controller, done) -> stub.stopServer(controller, request, done));
}

public CompletableFuture<UpdateFavoredNodesResponse> updateFavoredNodes(
UpdateFavoredNodesRequest request) {
return call((stub, controller, done) -> stub.updateFavoredNodes(controller, request, done));
}

public CompletableFuture<UpdateConfigurationResponse> updateConfiguration(
UpdateConfigurationRequest request) {
return call((stub, controller, done) -> stub.updateConfiguration(controller, request, done));
}

public CompletableFuture<GetRegionLoadResponse> getRegionLoad(GetRegionLoadRequest request) {
return call((stub, controller, done) -> stub.getRegionLoad(controller, request, done));
}

public CompletableFuture<ClearCompactionQueuesResponse> clearCompactionQueues(
ClearCompactionQueuesRequest request) {
return call((stub, controller, done) -> stub.clearCompactionQueues(controller, request, done));
}

public CompletableFuture<ClearRegionBlockCacheResponse> clearRegionBlockCache(
ClearRegionBlockCacheRequest request) {
return call((stub, controller, done) -> stub.clearRegionBlockCache(controller, request, done));
}

public CompletableFuture<GetSpaceQuotaSnapshotsResponse> getSpaceQuotaSnapshots(
GetSpaceQuotaSnapshotsRequest request) {
return call((stub, controller, done) -> stub.getSpaceQuotaSnapshots(controller, request, done));
}

public CompletableFuture<ExecuteProceduresResponse> executeProcedures(
ExecuteProceduresRequest request) {
return call((stub, controller, done) -> stub.executeProcedures(controller, request, done));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,4 @@ public static <T> CompletableFuture<T> failedFuture(Throwable e) {
future.completeExceptionally(e);
return future;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EncryptionTest;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.HasThread;
Expand Down Expand Up @@ -234,6 +235,7 @@
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
Expand Down Expand Up @@ -1961,6 +1963,15 @@ protected String getDescription() {
});
}

private void warmUpRegion(ServerName server, RegionInfo region) {
FutureUtils.addListener(asyncClusterConnection.getRegionServerAdmin(server)
.warmupRegion(RequestConverter.buildWarmupRegionRequest(region)), (r, e) -> {
if (e != null) {
LOG.warn("Failed to warm up region {} on server {}", region, server, e);
}
});
}

// Public so can be accessed by tests. Blocks until move is done.
// Replace with an async implementation from which you can get
// a success/failure result.
Expand Down Expand Up @@ -2031,11 +2042,12 @@ public void move(final byte[] encodedRegionName, byte[] destServerName) throws H
}

TransitRegionStateProcedure proc =
this.assignmentManager.createMoveRegionProcedure(rp.getRegionInfo(), rp.getDestination());
// Warmup the region on the destination before initiating the move. this call
// is synchronous and takes some time. doing it before the source region gets
// closed
serverManager.sendRegionWarmup(rp.getDestination(), hri);
this.assignmentManager.createMoveRegionProcedure(rp.getRegionInfo(), rp.getDestination());
// Warmup the region on the destination before initiating the move.
// A region server could reject the close request because it either does not
// have the specified region or the region is being split.
warmUpRegion(rp.getDestination(), hri);

LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
Future<byte[]> future = ProcedureSyncWait.submitProcedure(this.procedureExecutor, proc);
try {
Expand Down
Loading

0 comments on commit 5d872d3

Please sign in to comment.