Skip to content

Commit

Permalink
HBASE-28064:Implement truncate_region command to truncate region dire…
Browse files Browse the repository at this point in the history
…ctly from FS

Implemented truncate_region command.
  • Loading branch information
vaijosh committed Oct 17, 2023
1 parent 391dfda commit 2a6ae93
Show file tree
Hide file tree
Showing 26 changed files with 857 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,20 @@ default void modifyTable(TableDescriptor td) throws IOException {
get(modifyTableAsync(td), getSyncWaitTimeout(), TimeUnit.MILLISECONDS);
}

/**
* Truncate an individual region.
* @param regionName region to truncate
* @throws IOException if a remote or network exception occurs
*/
void truncateRegion(byte[] regionName) throws IOException;

/**
* Truncate an individual region. Asynchronous operation.
* @param regionName region to truncate
* @throws IOException if a remote or network exception occurs
*/
Future<Void> truncateRegionAsync(byte[] regionName) throws IOException;

/**
* Modify an existing table, more IRB (ruby) friendly version. Asynchronous operation. This means
* that it may be a while before your schema change is updated across all of the table. You can
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,16 @@ public Future<Void> splitRegionAsync(byte[] regionName, byte[] splitPoint) throw
return admin.splitRegion(regionName, splitPoint);
}

@Override
public void truncateRegion(byte[] regionName) throws IOException {
get(admin.truncateRegion(regionName));
}

@Override
public Future<Void> truncateRegionAsync(byte[] regionName) {
return admin.truncateRegion(regionName);
}

@Override
public Future<Void> modifyTableAsync(TableDescriptor td) throws IOException {
return admin.modifyTable(td);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,12 @@ default CompletableFuture<Void> mergeRegions(byte[] nameOfRegionA, byte[] nameOf
*/
CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint);

/**
* Truncate an individual region.
* @param regionName region to truncate
*/
CompletableFuture<Void> truncateRegion(byte[] regionName);

/**
* Assign an individual region.
* @param regionName Encoded or full name of region to assign.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,11 @@ public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint)
return wrap(rawAdmin.splitRegion(regionName, splitPoint));
}

@Override
public CompletableFuture<Void> truncateRegion(byte[] regionName) {
return wrap(rawAdmin.truncateRegion(regionName));
}

@Override
public CompletableFuture<Void> assign(byte[] regionName) {
return wrap(rawAdmin.assign(regionName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1623,6 +1623,61 @@ private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
return future;
}

@Override
public CompletableFuture<Void> truncateRegion(byte[] regionName) {
CompletableFuture<Void> future = new CompletableFuture<>();
addListener(getRegionLocation(regionName), (location, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
RegionInfo regionInfo = location.getRegion();
if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
future.completeExceptionally(new IllegalArgumentException(
"Can't truncate replicas directly.Replicas are auto-truncated "
+ "when their primary is truncated."));
return;
}
ServerName serverName = location.getServerName();
if (serverName == null) {
future
.completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
return;
}
addListener(truncate_region(regionInfo), (ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
future.complete(ret);
}
});
});
return future;
}

private CompletableFuture<Void> truncate_region(final RegionInfo hri) {
CompletableFuture<Void> future = new CompletableFuture<>();
TableName tableName = hri.getTable();
final MasterProtos.TruncateRegionRequest request;
try {
request =
RequestConverter.buildSTruncateRegionRequest(hri, ng.getNonceGroup(), ng.newNonce());
} catch (DeserializationException e) {
future.completeExceptionally(e);
return future;
}
addListener(this.procedureCall(tableName, request, MasterService.Interface::truncateRegion,
MasterProtos.TruncateRegionResponse::getProcId,
new TruncateRegionProcedureBiConsumer(tableName)), (ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
future.complete(ret);
}
});
return future;
}

@Override
public CompletableFuture<Void> assign(byte[] regionName) {
CompletableFuture<Void> future = new CompletableFuture<>();
Expand Down Expand Up @@ -2882,6 +2937,18 @@ String getOperationType() {
}
}

private static class TruncateRegionProcedureBiConsumer extends TableProcedureBiConsumer {

TruncateRegionProcedureBiConsumer(TableName tableName) {
super(tableName);
}

@Override
String getOperationType() {
return "TRUNCATE_REGION";
}
}

private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer {
SnapshotProcedureBiConsumer(TableName tableName) {
super(tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,17 @@ public static SplitTableRegionRequest buildSplitTableRegionRequest(final RegionI
return builder.build();
}

public static MasterProtos.TruncateRegionRequest buildSTruncateRegionRequest(
final RegionInfo regionInfo, final long nonceGroup, final long nonce)
throws DeserializationException {
MasterProtos.TruncateRegionRequest.Builder builder =
MasterProtos.TruncateRegionRequest.newBuilder();
builder.setRegionInfo(ProtobufUtil.toRegionInfo(regionInfo));
builder.setNonceGroup(nonceGroup);
builder.setNonce(nonce);
return builder.build();
}

/**
* Create a protocol buffer AssignRegionRequest
* @return an AssignRegionRequest
Expand Down
16 changes: 16 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,16 @@ message SplitTableRegionResponse {
optional uint64 proc_id = 1;
}

message TruncateRegionRequest {
required RegionInfo region_info = 1;
optional uint64 nonce_group = 2 [default = 0];
optional uint64 nonce = 3 [default = 0];
}

message TruncateRegionResponse {
optional uint64 proc_id = 1;
}

message CreateTableRequest {
required TableSchema table_schema = 1;
repeated bytes split_keys = 2;
Expand Down Expand Up @@ -864,6 +874,12 @@ service MasterService {
rpc SplitRegion(SplitTableRegionRequest)
returns(SplitTableRegionResponse);

/**
* Truncate region
*/
rpc TruncateRegion(TruncateRegionRequest)
returns(TruncateRegionResponse);

/** Deletes a table */
rpc DeleteTable(DeleteTableRequest)
returns(DeleteTableResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,20 @@ message TruncateTableStateData {
repeated RegionInfo region_info = 5;
}

enum TruncateRegionState {
TRUNCATE_REGION_PRE_OPERATION = 1;
TRUNCATE_REGION_MAKE_OFFLINE = 2;
TRUNCATE_REGION_REMOVE = 3;
TRUNCATE_REGION_MAKE_ONLINE = 4;
TRUNCATE_REGION_POST_OPERATION = 5;
}

message TruncateRegionStateData {
required UserInformation user_info = 1;
required RegionInfo region = 2;
optional RegionInfo new_region = 3;
}

enum DeleteTableState {
DELETE_TABLE_PRE_OPERATION = 1;
DELETE_TABLE_REMOVE_FROM_META = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,26 @@ default void preSplitRegionAction(final ObserverContext<MasterCoprocessorEnviron
final TableName tableName, final byte[] splitRow) throws IOException {
}

/**
* Called before the region is truncated.
* @param c The environment to interact with the framework and master
* @param regionInfo The Region To be truncated
*/
@SuppressWarnings("unused")
default void preTruncateRegionAction(final ObserverContext<MasterCoprocessorEnvironment> c,
final RegionInfo regionInfo) {
}

/**
* Called post the region is truncated.
* @param c The environment to interact with the framework and master
* @param regionInfo The Region To be truncated
*/
@SuppressWarnings("unused")
default void postTruncateRegionAction(final ObserverContext<MasterCoprocessorEnvironment> c,
final RegionInfo regionInfo) {
}

/**
* Called after the region is split.
* @param c the environment to interact with the framework and master
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2275,6 +2275,29 @@ protected String getDescription() {
});
}

@Override
public long truncateRegion(final RegionInfo regionInfo, final long nonceGroup, final long nonce)
throws IOException {
checkInitialized();

return MasterProcedureUtil
.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
@Override
protected void run() throws IOException {
LOG.info(
getClientIdAuditPrefix() + " truncate region " + regionInfo.getRegionNameAsString());

// Execute the operation asynchronously
submitProcedure(getAssignmentManager().createTruncateRegionProcedure(regionInfo));
}

@Override
protected String getDescription() {
return "TruncateRegionProcedure";
}
});
}

private void warmUpRegion(ServerName server, RegionInfo region) {
FutureUtils.addListener(asyncClusterConnection.getRegionServerAdmin(server)
.warmupRegion(RequestConverter.buildWarmupRegionRequest(region)), (r, e) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,34 @@ public void call(MasterObserver observer) throws IOException {
});
}

/**
* Invoked just before calling the truncate region procedure
* @param region Region to be truncated
* @param user The user
*/
public void preTruncateRegionAction(final RegionInfo region, User user) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
public void call(MasterObserver observer) throws IOException {
observer.preTruncateRegionAction(this, region);
}
});
}

/**
* Invoked after calling the truncate region procedure
* @param region Region which was truncated
* @param user The user
*/
public void postTruncateRegionAction(final RegionInfo region, User user) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
public void call(MasterObserver observer) throws IOException {
observer.postTruncateRegionAction(this, region);
}
});
}

/**
* This will be called before update META step as part of split table region procedure.
* @param user the user
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,18 @@ public SplitTableRegionResponse splitRegion(final RpcController controller,
}
}

@Override
public MasterProtos.TruncateRegionResponse truncateRegion(RpcController controller,
final MasterProtos.TruncateRegionRequest request) throws ServiceException {
try {
long procId = server.truncateRegion(ProtobufUtil.toRegionInfo(request.getRegionInfo()),
request.getNonceGroup(), request.getNonce());
return MasterProtos.TruncateRegionResponse.newBuilder().setProcId(procId).build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
}

@Override
public ClientProtos.CoprocessorServiceResponse execMasterService(final RpcController controller,
final ClientProtos.CoprocessorServiceRequest request) throws ServiceException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,4 +488,13 @@ boolean normalizeRegions(final NormalizeTableFilterParams ntfp, final boolean is
*/
long flushTable(final TableName tableName, final List<byte[]> columnFamilies,
final long nonceGroup, final long nonce) throws IOException;

/**
* Truncate region
* @param regionInfo region to be truncated
* @param nonceGroup the nonce group
* @param nonce the nonce
* @return procedure Id
*/
long truncateRegion(RegionInfo regionInfo, long nonceGroup, long nonce) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.procedure.TruncateRegionProcedure;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
Expand Down Expand Up @@ -1082,6 +1083,11 @@ public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToS
return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
}

public TruncateRegionProcedure createTruncateRegionProcedure(final RegionInfo regionToTruncate)
throws IOException {
return new TruncateRegionProcedure(getProcedureEnvironment(), regionToTruncate);
}

public MergeTableRegionsProcedure createMergeProcedure(RegionInfo... ris) throws IOException {
return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public enum TableOperationType {
REGION_ASSIGN,
REGION_UNASSIGN,
REGION_GC,
MERGED_REGIONS_GC/* region operations */
MERGED_REGIONS_GC/* region operations */,
REGION_TRUNCATE
}

/** Returns the name of the table the procedure is operating on */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ private static boolean requireTableExclusiveLock(TableProcedureInterface proc) {
case REGION_GC:
case MERGED_REGIONS_GC:
case REGION_SNAPSHOT:
case REGION_TRUNCATE:
return false;
default:
break;
Expand Down
Loading

0 comments on commit 2a6ae93

Please sign in to comment.