Skip to content

Commit

Permalink
HBASE-25968 Request compact to compaction server (#3378)
Browse files Browse the repository at this point in the history
Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
nyl3532016 authored Jun 21, 2021
1 parent 6afca94 commit da0fa30
Show file tree
Hide file tree
Showing 23 changed files with 660 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ class AsyncConnectionImpl implements AsyncConnection {

final AsyncConnectionConfiguration connConf;

private final User user;
protected final User user;

final ConnectionRegistry registry;

private final int rpcTimeout;
protected final int rpcTimeout;

protected final RpcClient rpcClient;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
Expand Down Expand Up @@ -54,6 +55,8 @@ public class SecurityInfo {
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
infos.put(CompactionServerStatusProtos.CompactionServerStatusService.getDescriptor().getName(),
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
infos.put(CompactionProtos.CompactionService.getDescriptor().getName(),
new SecurityInfo(SecurityConstants.COMPACTION_SERVER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
// NOTE: IF ADDING A NEW SERVICE, BE SURE TO UPDATE HBasePolicyProvider ALSO ELSE
// new Service will not be found when all is Kerberized!!!!
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ComparatorProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FilterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
Expand Down Expand Up @@ -3801,4 +3802,16 @@ public static HBaseProtos.LogRequest toBalancerDecisionRequest(int limit) {
.build();
}

public static String toString(CompactionProtos.CompactRequest request) {
ServerName rsServerName = ProtobufUtil.toServerName(request.getServer());
org.apache.hadoop.hbase.client.RegionInfo regionInfo =
ProtobufUtil.toRegionInfo(request.getRegionInfo());
ColumnFamilyDescriptor cfd = ProtobufUtil.toColumnFamilyDescriptor(request.getFamily());
boolean major = request.getMajor();
int priority = request.getPriority();
return new StringBuilder("RS: ").append(rsServerName).append(", region: ")
.append(regionInfo.getRegionNameAsString()).append(", CF: ").append(cfd.getNameAsString())
.append(", major:").append(major).append(", priority:").append(priority).toString();
}

}
62 changes: 62 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/server/Compaction.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto2";

// This file contains protocol buffers that are used for CompactionManagerProtocol.
package hbase.pb;

option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
option java_outer_classname = "CompactionProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

import "HBase.proto";
import "server/ClusterStatus.proto";
import "server/ErrorHandling.proto";

message CompactRequest {
required ServerName server = 1;
required RegionInfo region_info = 2;
required ColumnFamilySchema family = 3;
required bool major = 4;
required int32 priority = 5;
repeated ServerName favored_nodes = 6;
}

message CompactResponse {
}

message CompleteCompactionRequest {
required RegionInfo region_info = 1;
required ColumnFamilySchema family = 2;
repeated string selected_files = 3;
repeated string new_files = 4;
required bool new_force_major = 5;
}

message CompleteCompactionResponse {
required bool success = 1;
}

service CompactionService {
/** Called when a region server request compact a column of a region. */
rpc RequestCompaction(CompactRequest)
returns(CompactResponse);

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import "server/Quota.proto";
import "server/ClusterStatus.proto";
import "server/region/WAL.proto";
import "server/region/TooSlowLog.proto";
import "server/Compaction.proto";

message GetRegionInfoRequest {
required RegionSpecifier region = 1;
Expand Down Expand Up @@ -393,6 +394,9 @@ service AdminService {
rpc ExecuteProcedures(ExecuteProceduresRequest)
returns(ExecuteProceduresResponse);

rpc CompleteCompaction(CompleteCompactionRequest)
returns(CompleteCompactionResponse);

rpc ClearSlowLogsResponses(ClearSlowLogResponseRequest)
returns(ClearSlowLogResponses);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public interface AsyncClusterConnection extends AsyncConnection {
*/
AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName);

/**
* Get the compaction service for the given compaction server.
*/
AsyncCompactionServerService getCompactionServerService(ServerName serverName);

/**
* Get the nonce generator for this connection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;

Expand All @@ -49,12 +50,16 @@
*/
@InterfaceAudience.Private
class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClusterConnection {

public AsyncClusterConnectionImpl(Configuration conf, ConnectionRegistry registry,
String clusterId, SocketAddress localAddress, User user) {
super(conf, registry, clusterId, localAddress, user);
}

CompactionProtos.CompactionService.Interface createCompactionServerStub(ServerName serverName) {
return CompactionProtos.CompactionService
.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
}

@Override
public NonceGenerator getNonceGenerator() {
return super.getNonceGenerator();
Expand All @@ -70,6 +75,11 @@ public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
return new AsyncRegionServerAdmin(serverName, this);
}

@Override
public AsyncCompactionServerService getCompactionServerService(ServerName serverName) {
return new AsyncCompactionServerService(serverName, this);
}

@Override
public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
boolean writeFlushWALMarker) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;

import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactionService;


/**
* A simple wrapper of the {@link CompactionService} for a compaction server, which returns a
* {@link CompletableFuture}. This is easier to use, as if you use the raw protobuf interface, you
* need to get the result from the {@link RpcCallback}, and if there is an exception, you need to
* get it from the {@link RpcController} passed in.
* <p/>
* Notice that there is no retry, and this is intentional. We have different retry for different
* usage for now, if later we want to unify them, we can move the retry logic into this class.
*/
@InterfaceAudience.Private
public class AsyncCompactionServerService {

private final ServerName server;

private final AsyncClusterConnectionImpl conn;

AsyncCompactionServerService(ServerName server, AsyncClusterConnectionImpl conn) {
this.server = server;
this.conn = conn;
}

@FunctionalInterface
private interface RpcCall<RESP> {
void call(CompactionService.Interface stub, HBaseRpcController controller,
RpcCallback<RESP> done);
}

// TODO: eliminate duplicate code in AsyncRegionServerAdmin and maybe we could also change the
// way on how to do regionServerReport
private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
CompletableFuture<RESP> future = new CompletableFuture<>();
HBaseRpcController controller = conn.rpcControllerFactory.newController();
try {
rpcCall.call(conn.createCompactionServerStub(server), controller, new RpcCallback<RESP>() {
@Override
public void run(RESP resp) {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
future.complete(resp);
}
}
});
} catch (Exception e) {
future.completeExceptionally(e);
}
return future;
}

public CompletableFuture<CompactResponse> requestCompaction(CompactRequest request) {
return call((stub, controller, done) -> stub.requestCompaction(controller, request, done));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;

Expand Down Expand Up @@ -133,6 +135,11 @@ public CompletableFuture<GetOnlineRegionResponse> getOnlineRegion(
return call((stub, controller, done) -> stub.getOnlineRegion(controller, request, done));
}

public CompletableFuture<CompleteCompactionResponse>
completeCompaction(CompleteCompactionRequest request) {
return call((stub, controller, done) -> stub.completeCompaction(controller, request, done));
}

public CompletableFuture<OpenRegionResponse> openRegion(OpenRegionRequest request) {
return call((stub, controller, done) -> stub.openRegion(controller, request, done));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AbstractRpcServices;
Expand All @@ -31,23 +32,35 @@
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactionService;

@InterfaceAudience.Private
public class CSRpcServices extends AbstractRpcServices {
public class CSRpcServices extends AbstractRpcServices
implements CompactionService.BlockingInterface {
protected static final Logger LOG = LoggerFactory.getLogger(CSRpcServices.class);

protected final HCompactionServer compactionServer;
private final HCompactionServer compactionServer;

// Request counter.
final LongAdder requestCount = new LongAdder();
/** RPC scheduler to use for the compaction server. */
public static final String COMPACTION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
"hbase.compaction.server.rpc.scheduler.factory.class";

/**
* @return immutable list of blocking services and the security info classes that this server
* supports
*/
protected List<RpcServer.BlockingServiceAndInterface> getServices(final Configuration conf) {
// now return empty, compaction server do not receive rpc request
List<RpcServer.BlockingServiceAndInterface> bssi = new ArrayList<>();
bssi.add(new RpcServer.BlockingServiceAndInterface(
CompactionService.newReflectiveBlockingService(this),
CompactionService.BlockingInterface.class));
return new ImmutableList.Builder<RpcServer.BlockingServiceAndInterface>().addAll(bssi).build();
}

Expand All @@ -65,4 +78,19 @@ protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
compactionServer = cs;
}


/**
* Request compaction on the compaction server.
* @param controller the RPC controller
* @param request the compaction request
*/
@Override
public CompactResponse requestCompaction(RpcController controller,
CompactionProtos.CompactRequest request) {
requestCount.increment();
LOG.info("Receive compaction request from {}", ProtobufUtil.toString(request));
compactionServer.compactionThreadManager.requestCompaction();
return CompactionProtos.CompactResponse.newBuilder().build();
}

}
Loading

0 comments on commit da0fa30

Please sign in to comment.