Skip to content

Commit

Permalink
HBASE-22271 Implement grant/revoke/delete table acls/delete namespace…
Browse files Browse the repository at this point in the history
… acls in Procedure
  • Loading branch information
mymeiyi committed Jun 4, 2019
1 parent aacacc5 commit a018dfa
Show file tree
Hide file tree
Showing 36 changed files with 1,219 additions and 855 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
* whether the hfile was recorded by preCommitStoreFile hook (true)
*/
public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
readBulkloadRows(List<TableName> tableList) throws IOException {
readBulkloadRowsTestMetaTableMetrics(List<TableName> tableList) throws IOException {

Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
List<byte[]> rows = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2138,14 +2138,47 @@ void cloneTableSchema(TableName tableName, TableName newTableName, boolean prese
* permissions.
* @throws IOException if a remote or network exception occurs
*/
void grant(UserPermission userPermission, boolean mergeExistingPermissions) throws IOException;
default void grant(UserPermission userPermission, boolean mergeExistingPermissions)
throws IOException {
get(grantAsync(userPermission, mergeExistingPermissions));
}

/**
* Grants user specific permissions but does not block. You can use Future.get(long, TimeUnit) to
* wait on the operation to complete. It may throw ExecutionException if there was an error while
* executing the operation or TimeoutException in case the wait timeout was not long enough to
* allow the operation to complete.
* @param userPermission user name and the specific permission
* @param mergeExistingPermissions If set to false, later granted permissions will override
* previous granted permissions. otherwise, it'll merge with previous granted
* permissions.
* @throws IOException if a remote or network exception occurs
* @return the result of the async creation. You can use Future.get(long, TimeUnit) to wait on the
* operation to complete.
*/
Future<Void> grantAsync(UserPermission userPermission, boolean mergeExistingPermissions)
throws IOException;

/**
* Revokes user specific permissions
* @param userPermission user name and the specific permission
* @throws IOException if a remote or network exception occurs
*/
void revoke(UserPermission userPermission) throws IOException;
default void revoke(UserPermission userPermission) throws IOException {
get(revokeAsync(userPermission));
}

/**
* Revokes user specific permissions but does not block. You can use Future.get(long, TimeUnit) to
* wait on the operation to complete. It may throw ExecutionException if there was an error while
* executing the operation or TimeoutException in case the wait timeout was not long enough to
* allow the operation to complete.
* @param userPermission user name and the specific permission
* @throws IOException if a remote or network exception occurs
* @return the result of the async creation. You can use Future.get(long, TimeUnit) to wait on the
* operation to complete.
*/
Future<Void> revokeAsync(UserPermission userPermission) throws IOException;

/**
* Get the global/namespace/table permissions for user
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3837,30 +3837,71 @@ protected SpaceQuotaSnapshot rpcCall() throws Exception {
});
}

@InterfaceAudience.Private
@InterfaceStability.Evolving
private class UpdatePermissionFuture extends ProcedureFuture<Void> {
private final UserPermission userPermission;
private final Supplier<String> getOperation;

public UpdatePermissionFuture(HBaseAdmin admin, UserPermission userPermission, Long procId,
Supplier<String> getOperation) {
super(admin, procId);
this.userPermission = userPermission;
this.getOperation = getOperation;
}

@Override
public String toString() {
return "Operation: " + getOperation.get() + ", userPermission: " + userPermission;
}
}

@InterfaceAudience.Private
@InterfaceStability.Evolving
private class GrantFuture extends UpdatePermissionFuture {
private final boolean mergeExistingPermissions;

public GrantFuture(HBaseAdmin admin, UserPermission userPermission,
boolean mergeExistingPermissions, Long procId, Supplier<String> getOperation) {
super(admin, userPermission, procId, getOperation);
this.mergeExistingPermissions = mergeExistingPermissions;
}

@Override
public String toString() {
return super.toString() + ", mergeExistingPermissions: " + mergeExistingPermissions;
}
}

@Override
public void grant(UserPermission userPermission, boolean mergeExistingPermissions)
public Future<Void> grantAsync(UserPermission userPermission, boolean mergeExistingPermissions)
throws IOException {
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
protected Void rpcCall() throws Exception {
GrantRequest req =
ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions);
this.master.grant(getRpcController(), req);
return null;
}
});
AccessControlProtos.GrantResponse response =
executeCallable(new MasterCallable<AccessControlProtos.GrantResponse>(getConnection(),
getRpcControllerFactory()) {
@Override
protected AccessControlProtos.GrantResponse rpcCall() throws Exception {
GrantRequest req =
ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions);
return this.master.grant(getRpcController(), req);
}
});
return new GrantFuture(this, userPermission, mergeExistingPermissions, response.getProcId(),
() -> "GRANT");
}

@Override
public void revoke(UserPermission userPermission) throws IOException {
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
protected Void rpcCall() throws Exception {
RevokeRequest req = ShadedAccessControlUtil.buildRevokeRequest(userPermission);
this.master.revoke(getRpcController(), req);
return null;
}
});
public Future<Void> revokeAsync(UserPermission userPermission) throws IOException {
AccessControlProtos.RevokeResponse response =
executeCallable(new MasterCallable<AccessControlProtos.RevokeResponse>(getConnection(),
getRpcControllerFactory()) {
@Override
protected AccessControlProtos.RevokeResponse rpcCall() throws Exception {
RevokeRequest req = ShadedAccessControlUtil.buildRevokeRequest(userPermission);
return this.master.revoke(getRpcController(), req);
}
});
return new UpdatePermissionFuture(this, userPermission, response.getProcId(), () -> "REVOKE");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3832,20 +3832,18 @@ public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableN
@Override
public CompletableFuture<Void> grant(UserPermission userPermission,
boolean mergeExistingPermissions) {
return this.<Void> newMasterCaller()
.action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller,
stub, ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
(s, c, req, done) -> s.grant(c, req, done), resp -> null))
.call();
return this.<GrantRequest, GrantResponse> procedureCall(
ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
(s, c, req, done) -> s.grant(c, req, done), (resp) -> resp.getProcId(),
new GrantProcedureBiConsumer(userPermission, mergeExistingPermissions, () -> "GRANT"));
}

@Override
public CompletableFuture<Void> revoke(UserPermission userPermission) {
return this.<Void> newMasterCaller()
.action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
(s, c, req, done) -> s.revoke(c, req, done), resp -> null))
.call();
return this.<RevokeRequest, RevokeResponse> procedureCall(
ShadedAccessControlUtil.buildRevokeRequest(userPermission),
(s, c, req, done) -> s.revoke(c, req, done), (resp) -> resp.getProcId(),
new UpdatePermissionProcedureBiConsumer(userPermission, () -> "REVOKE"));
}

@Override
Expand Down Expand Up @@ -3873,4 +3871,53 @@ public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
resp -> resp.getHasUserPermissionList()))
.call();
}

private static class UpdatePermissionProcedureBiConsumer extends ProcedureBiConsumer {
protected final UserPermission userPermission;
private final Supplier<String> getOperation;

UpdatePermissionProcedureBiConsumer(UserPermission userPermission,
Supplier<String> getOperation) {
this.userPermission = userPermission;
this.getOperation = getOperation;
}

String getOperationType() {
return getOperation.get();
}

String getDescription() {
return "Operation: " + getOperationType() + ", UserPermission: " + userPermission;
}

@Override
void onFinished() {
LOG.info(getDescription() + " completed");
}

@Override
void onError(Throwable error) {
LOG.info(getDescription() + " failed with " + error.getMessage());
}
}

private static class GrantProcedureBiConsumer extends UpdatePermissionProcedureBiConsumer {
protected final boolean mergeExistingPermissions;

GrantProcedureBiConsumer(UserPermission userPermission, boolean mergeExistingPermissions,
Supplier<String> getOperation) {
super(userPermission, getOperation);
this.mergeExistingPermissions = mergeExistingPermissions;
}

@Override
String getDescription() {
return super.getDescription() + ", mergeExistingPermissions: " + mergeExistingPermissions;
}

@Override
String getOperationType() {
return "GRANT";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@

@InterfaceAudience.Private
public enum LockedResourceType {
SERVER, NAMESPACE, TABLE, REGION, PEER, META
SERVER, NAMESPACE, TABLE, REGION, PEER, META, ACL
}
2 changes: 2 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/AccessControl.proto
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,15 @@ message GrantRequest {
}

message GrantResponse {
optional uint64 proc_id = 1;
}

message RevokeRequest {
required UserPermission user_permission = 1;
}

message RevokeResponse {
optional uint64 proc_id = 1;
}

message GetUserPermissionsRequest {
Expand Down
19 changes: 18 additions & 1 deletion hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

import "AccessControl.proto";
import "HBase.proto";
import "RPC.proto";
import "Snapshot.proto";
Expand Down Expand Up @@ -607,4 +608,20 @@ enum SplitWALState{
ACQUIRE_SPLIT_WAL_WORKER = 1;
DISPATCH_WAL_TO_WORKER = 2;
RELEASE_SPLIT_WORKER = 3;
}
}

enum UpdatePermissionState {
UPDATE_PERMISSION_STORAGE = 1;
UPDATE_PERMISSION_CACHE_ON_RS = 2;
POST_UPDATE_PERMISSION = 3;
}

message UpdatePermissionStateData {
required ServerName target_server = 1;
required string entry = 2;
}

message UpdatePermissionRemoteStateData {
required ServerName target_server = 1;
required string entry = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,14 @@ public enum EventType {
*
* RS_REPLAY_SYNC_REPLICATION_WAL
*/
RS_REPLAY_SYNC_REPLICATION_WAL(85, ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL);
RS_REPLAY_SYNC_REPLICATION_WAL(85, ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL),

/**
* RS refresh permission cache.<br>
*
* RS_REFRESH_PERMISSION_CACHE
*/
RS_REFRESH_PERMISSION_CACHE(86, ExecutorType.RS_REFRESH_PERMISSION_CACHE);

private final int code;
private final ExecutorType executor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public enum ExecutorType {
RS_REFRESH_PEER(31),
RS_REPLAY_SYNC_REPLICATION_WAL(32),
RS_SWITCH_RPC_THROTTLE(33),
RS_IN_MEMORY_COMPACTION(34);
RS_IN_MEMORY_COMPACTION(34),
RS_REFRESH_PERMISSION_CACHE(35);

ExecutorType(int value) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -52,7 +53,6 @@
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
Expand Down Expand Up @@ -107,6 +107,7 @@
import org.apache.hadoop.hbase.security.access.Permission.Action;
import org.apache.hadoop.hbase.security.access.PermissionStorage;
import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
import org.apache.hadoop.hbase.security.access.UpdatePermissionProcedure;
import org.apache.hadoop.hbase.security.access.UserPermission;
import org.apache.hadoop.hbase.security.visibility.VisibilityController;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
Expand Down Expand Up @@ -2727,14 +2728,11 @@ public GrantResponse grant(RpcController controller, GrantRequest request)
if (master.cpHost != null) {
master.cpHost.preGrant(perm, mergeExistingPermissions);
}
try (Table table = master.getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
PermissionStorage.addUserPermission(getConfiguration(), perm, table,
mergeExistingPermissions);
}
if (master.cpHost != null) {
master.cpHost.postGrant(perm, mergeExistingPermissions);
}
return GrantResponse.getDefaultInstance();
UpdatePermissionProcedure procedure = new UpdatePermissionProcedure(
UpdatePermissionProcedure.UpdatePermissionType.GRANT, master.getServerName(),
Optional.of(perm), Optional.of(mergeExistingPermissions), Optional.empty());
long procId = master.getMasterProcedureExecutor().submitProcedure(procedure);
return GrantResponse.newBuilder().setProcId(procId).build();
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
Expand All @@ -2749,13 +2747,11 @@ public RevokeResponse revoke(RpcController controller, RevokeRequest request)
if (master.cpHost != null) {
master.cpHost.preRevoke(userPermission);
}
try (Table table = master.getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
PermissionStorage.removeUserPermission(master.getConfiguration(), userPermission, table);
}
if (master.cpHost != null) {
master.cpHost.postRevoke(userPermission);
}
return RevokeResponse.getDefaultInstance();
UpdatePermissionProcedure procedure = new UpdatePermissionProcedure(
UpdatePermissionProcedure.UpdatePermissionType.REVOKE, master.getServerName(),
Optional.of(userPermission), Optional.empty(), Optional.empty());
long procId = master.getMasterProcedureExecutor().submitProcedure(procedure);
return RevokeResponse.newBuilder().setProcId(procId).build();
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
import org.apache.hadoop.hbase.security.access.ZKPermissionStorage;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -526,7 +526,7 @@ default SplitWALManager getSplitWALManager(){
AccessChecker getAccessChecker();

/**
* @return the {@link ZKPermissionWatcher}
* @return the {@link ZKPermissionStorage}
*/
ZKPermissionWatcher getZKPermissionWatcher();
ZKPermissionStorage getZKPermissionStorage();
}
Loading

0 comments on commit a018dfa

Please sign in to comment.