From 92173176713e05214f30850355fc0eb4daee251a Mon Sep 17 00:00:00 2001 From: Jian Zhang <1361320460@qq.com> Date: Fri, 2 Aug 2024 18:22:21 +0800 Subject: [PATCH] HDFS-17544. [ARR] The router client rpc protocol PB supports asynchrony. (#6870). Contributed by Jian Zhang. Signed-off-by: He Xiaoqiao --- .../ClientNamenodeProtocolTranslatorPB.java | 28 +- .../protocolPB/AsyncRpcProtocolPBUtil.java | 69 + .../RouterClientProtocolTranslatorPB.java | 2025 +++++++++++++++++ ...erGetUserMappingsProtocolTranslatorPB.java | 52 + .../RouterNamenodeProtocolTranslatorPB.java | 270 +++ ...freshUserMappingsProtocolTranslatorPB.java | 64 + .../hadoop/hdfs/protocolPB/package-info.java | 29 + .../federation/router/ConnectionPool.java | 16 +- .../federation/router/async/AsyncUtil.java | 7 + .../TestAsyncRpcProtocolPBUtil.java | 121 + .../hdfs/protocolPB/TestClientProtocol.java | 31 + ...tClientProtocolServerSideTranslatorPB.java | 88 + .../TestClientProtocolTranslatorPB.java | 80 + .../TestRouterClientSideTranslatorPB.java | 242 ++ 14 files changed, 3100 insertions(+), 22 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterClientProtocolTranslatorPB.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterGetUserMappingsProtocolTranslatorPB.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterNamenodeProtocolTranslatorPB.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterRefreshUserMappingsProtocolTranslatorPB.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/package-info.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestClientProtocol.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestClientProtocolServerSideTranslatorPB.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestClientProtocolTranslatorPB.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 543f0a58e6ec6..78d2b312b4f56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -274,47 +274,47 @@ public class ClientNamenodeProtocolTranslatorPB implements ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator { final private ClientNamenodeProtocolPB rpcProxy; - static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST = + protected static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST = GetServerDefaultsRequestProto.newBuilder().build(); - private final static GetFsStatusRequestProto VOID_GET_FSSTATUS_REQUEST = + protected final static GetFsStatusRequestProto VOID_GET_FSSTATUS_REQUEST = GetFsStatusRequestProto.newBuilder().build(); - private final static GetFsReplicatedBlockStatsRequestProto + protected final static GetFsReplicatedBlockStatsRequestProto VOID_GET_FS_REPLICATED_BLOCK_STATS_REQUEST = GetFsReplicatedBlockStatsRequestProto.newBuilder().build(); - private final static GetFsECBlockGroupStatsRequestProto + protected final static GetFsECBlockGroupStatsRequestProto VOID_GET_FS_ECBLOCKGROUP_STATS_REQUEST = GetFsECBlockGroupStatsRequestProto.newBuilder().build(); - private final static RollEditsRequestProto VOID_ROLLEDITS_REQUEST = + protected final static RollEditsRequestProto VOID_ROLLEDITS_REQUEST = RollEditsRequestProto.getDefaultInstance(); - private final static RefreshNodesRequestProto VOID_REFRESH_NODES_REQUEST = + protected final static RefreshNodesRequestProto VOID_REFRESH_NODES_REQUEST = RefreshNodesRequestProto.newBuilder().build(); - private final static FinalizeUpgradeRequestProto + protected final static FinalizeUpgradeRequestProto VOID_FINALIZE_UPGRADE_REQUEST = FinalizeUpgradeRequestProto.newBuilder().build(); - private final static UpgradeStatusRequestProto + protected final static UpgradeStatusRequestProto VOID_UPGRADE_STATUS_REQUEST = UpgradeStatusRequestProto.newBuilder().build(); - private final static GetDataEncryptionKeyRequestProto + protected final static GetDataEncryptionKeyRequestProto VOID_GET_DATA_ENCRYPTIONKEY_REQUEST = GetDataEncryptionKeyRequestProto.newBuilder().build(); - private final static GetStoragePoliciesRequestProto + protected final static GetStoragePoliciesRequestProto VOID_GET_STORAGE_POLICIES_REQUEST = GetStoragePoliciesRequestProto.newBuilder().build(); - private final static GetErasureCodingPoliciesRequestProto + protected final static GetErasureCodingPoliciesRequestProto VOID_GET_EC_POLICIES_REQUEST = GetErasureCodingPoliciesRequestProto .newBuilder().build(); - private final static GetErasureCodingCodecsRequestProto + protected final static GetErasureCodingCodecsRequestProto VOID_GET_EC_CODEC_REQUEST = GetErasureCodingCodecsRequestProto .newBuilder().build(); @@ -1137,7 +1137,7 @@ public void removeCacheDirective(long id) setId(id).build())); } - private static class BatchedCacheEntries + protected static class BatchedCacheEntries implements BatchedEntries { private final ListCacheDirectivesResponseProto response; @@ -1200,7 +1200,7 @@ public void removeCachePool(String cachePoolName) throws IOException { setPoolName(cachePoolName).build())); } - private static class BatchedCachePoolEntries + protected static class BatchedCachePoolEntries implements BatchedEntries { private final ListCachePoolsResponseProto proto; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java new file mode 100644 index 0000000000000..f65742a5df4e0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocolPB; + +import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.CallerContext; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.ProtobufRpcEngine2; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ipc.internal.ShadedProtobufHelper; +import org.apache.hadoop.util.concurrent.AsyncGet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; +import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc; + +public final class AsyncRpcProtocolPBUtil { + public static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class); + + private AsyncRpcProtocolPBUtil() {} + + public static R asyncIpcClient( + ShadedProtobufHelper.IpcCall call, ApplyFunction response, + Class clazz) throws IOException { + ipc(call); + AsyncGet asyncReqMessage = + (AsyncGet) ProtobufRpcEngine2.getAsyncReturnMessage(); + CompletableFuture responseFuture = Client.getResponseFuture(); + // transfer originCall & callerContext to worker threads of executor. + final Server.Call originCall = Server.getCurCall().get(); + final CallerContext originContext = CallerContext.getCurrent(); + asyncCompleteWith(responseFuture); + asyncApply(o -> { + try { + Server.getCurCall().set(originCall); + CallerContext.setCurrent(originContext); + T res = asyncReqMessage.get(-1, null); + return response.apply(res); + } catch (Exception e) { + throw warpCompletionException(e); + } + }); + return asyncReturn(clazz); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterClientProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterClientProtocolTranslatorPB.java new file mode 100644 index 0000000000000..93cbae4ec61ee --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterClientProtocolTranslatorPB.java @@ -0,0 +1,2025 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.IOException; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; + +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.crypto.CryptoProtocolVersion; +import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; +import org.apache.hadoop.fs.CacheFlag; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.Options.Rename; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.QuotaUsage; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.fs.XAttr; +import org.apache.hadoop.fs.XAttrSetFlag; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.AclStatus; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.hdfs.AddBlockFlag; +import org.apache.hadoop.hdfs.inotify.EventBatchList; +import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; +import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; +import org.apache.hadoop.hdfs.protocol.CachePoolEntry; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; +import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.HdfsPartialListing; +import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats; +import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult; +import org.apache.hadoop.hdfs.protocol.EncryptionZone; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; +import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; +import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; +import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing; +import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; +import org.apache.hadoop.hdfs.protocol.SnapshotStatus; +import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.AclProtos.ModifyAclEntriesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclEntriesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveDefaultAclRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.AclProtos.SetAclRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCacheDirectiveRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCachePoolRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSnapshotRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSymlinkRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteSnapshotRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DisallowSnapshotRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBatchedListingRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetCurrentEditLogTxidRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEnclosingRootRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLinkTargetRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLocatedFileInfoRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuotaUsageRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotListingRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCacheDirectiveRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MsyncRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameSnapshotRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewLeaseRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ReportBadBlocksRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetOwnerRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPermissionRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SatisfyStoragePolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto; +import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListReencryptionStatusRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ZoneReencryptionStatusProto; +import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ReencryptEncryptionZoneRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.RemoveErasureCodingPolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.EnableErasureCodingPolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.DisableErasureCodingPolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.SetErasureCodingPolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.UnsetErasureCodingPolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.CodecProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BatchedDirectoryListingProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetECTopologyResultForPoliciesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ErasureCodingPolicyProto; +import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.SetXAttrRequestProto; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto; +import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto; +import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.thirdparty.protobuf.ByteString; +import org.apache.hadoop.util.Lists; + +import static org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil.asyncIpcClient; + +/** + * This class forwards NN's ClientProtocol calls as RPC calls to the NN server + * while translating from the parameter types used in ClientProtocol to the + * new PB types. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class RouterClientProtocolTranslatorPB extends ClientNamenodeProtocolTranslatorPB { + private final ClientNamenodeProtocolPB rpcProxy; + + public RouterClientProtocolTranslatorPB(ClientNamenodeProtocolPB proxy) { + super(proxy); + rpcProxy = proxy; + } + + @Override + public void close() { + super.close(); + } + + @Override + public LocatedBlocks getBlockLocations(String src, long offset, long length) + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getBlockLocations(src, offset, length); + } + GetBlockLocationsRequestProto req = GetBlockLocationsRequestProto + .newBuilder() + .setSrc(src) + .setOffset(offset) + .setLength(length) + .build(); + + return asyncIpcClient(() -> rpcProxy.getBlockLocations(null, req), + res -> res.hasLocations() ? PBHelperClient.convert(res.getLocations()) : null, + LocatedBlocks.class); + } + + @Override + public FsServerDefaults getServerDefaults() throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getServerDefaults(); + } + GetServerDefaultsRequestProto req = VOID_GET_SERVER_DEFAULT_REQUEST; + + return asyncIpcClient(() -> rpcProxy.getServerDefaults(null, req), + res -> PBHelperClient.convert(res.getServerDefaults()), + FsServerDefaults.class); + } + + @Override + public HdfsFileStatus create( + String src, FsPermission masked, + String clientName, EnumSetWritable flag, + boolean createParent, short replication, long blockSize, + CryptoProtocolVersion[] supportedVersions, String ecPolicyName, + String storagePolicy) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.create( + src, masked, clientName, flag, createParent, replication, + blockSize, supportedVersions, ecPolicyName, storagePolicy); + } + + CreateRequestProto.Builder builder = CreateRequestProto.newBuilder() + .setSrc(src) + .setMasked(PBHelperClient.convert(masked)) + .setClientName(clientName) + .setCreateFlag(PBHelperClient.convertCreateFlag(flag)) + .setCreateParent(createParent) + .setReplication(replication) + .setBlockSize(blockSize); + if (ecPolicyName != null) { + builder.setEcPolicyName(ecPolicyName); + } + if (storagePolicy != null) { + builder.setStoragePolicy(storagePolicy); + } + FsPermission unmasked = masked.getUnmasked(); + if (unmasked != null) { + builder.setUnmasked(PBHelperClient.convert(unmasked)); + } + builder.addAllCryptoProtocolVersion( + PBHelperClient.convert(supportedVersions)); + CreateRequestProto req = builder.build(); + + return asyncIpcClient(() -> rpcProxy.create(null, req), + res -> res.hasFs() ? PBHelperClient.convert(res.getFs()) : null, + HdfsFileStatus.class); + } + + @Override + public boolean truncate(String src, long newLength, String clientName) + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.truncate(src, newLength, clientName); + } + + TruncateRequestProto req = TruncateRequestProto.newBuilder() + .setSrc(src) + .setNewLength(newLength) + .setClientName(clientName) + .build(); + + return asyncIpcClient(() -> rpcProxy.truncate(null, req), + res -> res.getResult(), Boolean.class); + } + + @Override + public LastBlockWithStatus append(String src, String clientName, + EnumSetWritable flag) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.append(src, clientName, flag); + } + + AppendRequestProto req = AppendRequestProto.newBuilder().setSrc(src) + .setClientName(clientName).setFlag( + PBHelperClient.convertCreateFlag(flag)) + .build(); + + return asyncIpcClient(() -> rpcProxy.append(null, req), + res -> { + LocatedBlock lastBlock = res.hasBlock() ? PBHelperClient + .convertLocatedBlockProto(res.getBlock()) : null; + HdfsFileStatus stat = (res.hasStat()) ? + PBHelperClient.convert(res.getStat()) : null; + return new LastBlockWithStatus(lastBlock, stat); + }, LastBlockWithStatus.class); + } + + @Override + public boolean setReplication(String src, short replication) + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.setReplication(src, replication); + } + SetReplicationRequestProto req = SetReplicationRequestProto.newBuilder() + .setSrc(src) + .setReplication(replication) + .build(); + + return asyncIpcClient(() -> rpcProxy.setReplication(null, req), + res -> res.getResult(), Boolean.class); + } + + @Override + public void setPermission(String src, FsPermission permission) + throws IOException { + if (!Client.isAsynchronousMode()) { + super.setPermission(src, permission); + return; + } + SetPermissionRequestProto req = SetPermissionRequestProto.newBuilder() + .setSrc(src) + .setPermission(PBHelperClient.convert(permission)) + .build(); + + asyncIpcClient(() -> rpcProxy.setPermission(null, req), + res -> null, null); + } + + @Override + public void setOwner(String src, String username, String groupname) + throws IOException { + if (!Client.isAsynchronousMode()) { + super.setOwner(src, username, groupname); + return; + } + SetOwnerRequestProto.Builder req = SetOwnerRequestProto.newBuilder() + .setSrc(src); + if (username != null) { + req.setUsername(username); + } + if (groupname != null) { + req.setGroupname(groupname); + } + + asyncIpcClient(() -> rpcProxy.setOwner(null, req.build()), + res -> null, null); + } + + @Override + public void abandonBlock(ExtendedBlock b, long fileId, String src, + String holder) throws IOException { + if (!Client.isAsynchronousMode()) { + super.abandonBlock(b, fileId, src, holder); + return; + } + AbandonBlockRequestProto req = AbandonBlockRequestProto.newBuilder() + .setB(PBHelperClient.convert(b)).setSrc(src).setHolder(holder) + .setFileId(fileId).build(); + asyncIpcClient(() -> rpcProxy.abandonBlock(null, req), + res -> null, null); + } + + @Override + public LocatedBlock addBlock( + String src, String clientName, + ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, + String[] favoredNodes, EnumSet addBlockFlags) + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.addBlock(src, clientName, previous, excludeNodes, + fileId, favoredNodes, addBlockFlags); + } + AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder() + .setSrc(src).setClientName(clientName).setFileId(fileId); + if (previous != null) { + req.setPrevious(PBHelperClient.convert(previous)); + } + if (excludeNodes != null) { + req.addAllExcludeNodes(PBHelperClient.convert(excludeNodes)); + } + if (favoredNodes != null) { + req.addAllFavoredNodes(Arrays.asList(favoredNodes)); + } + if (addBlockFlags != null) { + req.addAllFlags(PBHelperClient.convertAddBlockFlags( + addBlockFlags)); + } + + return asyncIpcClient(() -> rpcProxy.addBlock(null, req.build()), + res -> PBHelperClient.convertLocatedBlockProto(res.getBlock()), + LocatedBlock.class); + } + + @Override + public LocatedBlock getAdditionalDatanode( + String src, long fileId, + ExtendedBlock blk, DatanodeInfo[] existings, String[] existingStorageIDs, + DatanodeInfo[] excludes, int numAdditionalNodes, String clientName) + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getAdditionalDatanode(src, fileId, blk, existings, + existingStorageIDs, excludes, numAdditionalNodes, clientName); + } + GetAdditionalDatanodeRequestProto req = GetAdditionalDatanodeRequestProto + .newBuilder() + .setSrc(src) + .setFileId(fileId) + .setBlk(PBHelperClient.convert(blk)) + .addAllExistings(PBHelperClient.convert(existings)) + .addAllExistingStorageUuids(Arrays.asList(existingStorageIDs)) + .addAllExcludes(PBHelperClient.convert(excludes)) + .setNumAdditionalNodes(numAdditionalNodes) + .setClientName(clientName) + .build(); + + return asyncIpcClient(() -> rpcProxy.getAdditionalDatanode(null, req), + res -> PBHelperClient.convertLocatedBlockProto(res.getBlock()), + LocatedBlock.class); + } + + @Override + public boolean complete(String src, String clientName, + ExtendedBlock last, long fileId) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.complete(src, clientName, last, fileId); + } + CompleteRequestProto.Builder req = CompleteRequestProto.newBuilder() + .setSrc(src) + .setClientName(clientName) + .setFileId(fileId); + if (last != null) { + req.setLast(PBHelperClient.convert(last)); + } + + return asyncIpcClient(() -> rpcProxy.complete(null, req.build()), + res -> res.getResult(), + Boolean.class); + } + + @Override + public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { + if (!Client.isAsynchronousMode()) { + super.reportBadBlocks(blocks); + return; + } + ReportBadBlocksRequestProto req = ReportBadBlocksRequestProto.newBuilder() + .addAllBlocks(Arrays.asList( + PBHelperClient.convertLocatedBlocks(blocks))) + .build(); + + asyncIpcClient(() -> rpcProxy.reportBadBlocks(null, req), + res -> null, Void.class); + } + + @Override + public boolean rename(String src, String dst) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.rename(src, dst); + } + RenameRequestProto req = RenameRequestProto.newBuilder() + .setSrc(src) + .setDst(dst).build(); + + return asyncIpcClient(() -> rpcProxy.rename(null, req), + res -> res.getResult(), + Boolean.class); + } + + + @Override + public void rename2(String src, String dst, Rename... options) + throws IOException { + if (!Client.isAsynchronousMode()) { + super.rename2(src, dst, options); + return; + } + boolean overwrite = false; + boolean toTrash = false; + if (options != null) { + for (Rename option : options) { + if (option == Rename.OVERWRITE) { + overwrite = true; + } + if (option == Rename.TO_TRASH) { + toTrash = true; + } + } + } + Rename2RequestProto req = Rename2RequestProto.newBuilder(). + setSrc(src). + setDst(dst). + setOverwriteDest(overwrite). + setMoveToTrash(toTrash). + build(); + + asyncIpcClient(() -> rpcProxy.rename2(null, req), + res -> null, Void.class); + } + + @Override + public void concat(String trg, String[] srcs) throws IOException { + if (!Client.isAsynchronousMode()) { + super.concat(trg, srcs); + return; + } + ConcatRequestProto req = ConcatRequestProto.newBuilder(). + setTrg(trg). + addAllSrcs(Arrays.asList(srcs)).build(); + + asyncIpcClient(() -> rpcProxy.concat(null, req), + res -> null, Void.class); + } + + + @Override + public boolean delete(String src, boolean recursive) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.delete(src, recursive); + } + DeleteRequestProto req = DeleteRequestProto.newBuilder().setSrc(src) + .setRecursive(recursive).build(); + + return asyncIpcClient(() -> rpcProxy.delete(null, req), + res -> res.getResult(), Boolean.class); + } + + @Override + public boolean mkdirs(String src, FsPermission masked, boolean createParent) + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.mkdirs(src, masked, createParent); + } + MkdirsRequestProto.Builder builder = MkdirsRequestProto.newBuilder() + .setSrc(src) + .setMasked(PBHelperClient.convert(masked)) + .setCreateParent(createParent); + FsPermission unmasked = masked.getUnmasked(); + if (unmasked != null) { + builder.setUnmasked(PBHelperClient.convert(unmasked)); + } + MkdirsRequestProto req = builder.build(); + + return asyncIpcClient(() -> rpcProxy.mkdirs(null, req), + res -> res.getResult(), Boolean.class); + } + + @Override + public DirectoryListing getListing( + String src, byte[] startAfter, boolean needLocation) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getListing(src, startAfter, needLocation); + } + GetListingRequestProto req = GetListingRequestProto.newBuilder() + .setSrc(src) + .setStartAfter(ByteString.copyFrom(startAfter)) + .setNeedLocation(needLocation).build(); + + return asyncIpcClient(() -> rpcProxy.getListing(null, req), + res -> { + if (res.hasDirList()) { + return PBHelperClient.convert(res.getDirList()); + } + return null; + }, DirectoryListing.class); + } + + @Override + public BatchedDirectoryListing getBatchedListing( + String[] srcs, byte[] startAfter, boolean needLocation) + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getBatchedListing(srcs, startAfter, needLocation); + } + GetBatchedListingRequestProto req = GetBatchedListingRequestProto + .newBuilder() + .addAllPaths(Arrays.asList(srcs)) + .setStartAfter(ByteString.copyFrom(startAfter)) + .setNeedLocation(needLocation).build(); + + return asyncIpcClient(() -> rpcProxy.getBatchedListing(null, req), + res -> { + if (res.getListingsCount() > 0) { + HdfsPartialListing[] listingArray = + new HdfsPartialListing[res.getListingsCount()]; + int listingIdx = 0; + for (BatchedDirectoryListingProto proto : res.getListingsList()) { + HdfsPartialListing listing; + if (proto.hasException()) { + HdfsProtos.RemoteExceptionProto reProto = proto.getException(); + RemoteException ex = new RemoteException( + reProto.getClassName(), reProto.getMessage()); + listing = new HdfsPartialListing(proto.getParentIdx(), ex); + } else { + List statuses = + PBHelperClient.convertHdfsFileStatus( + proto.getPartialListingList()); + listing = new HdfsPartialListing(proto.getParentIdx(), statuses); + } + listingArray[listingIdx++] = listing; + } + BatchedDirectoryListing batchedListing = + new BatchedDirectoryListing(listingArray, res.getHasMore(), + res.getStartAfter().toByteArray()); + return batchedListing; + } + return null; + }, BatchedDirectoryListing.class); + } + + + @Override + public void renewLease(String clientName, List namespaces) + throws IOException { + if (!Client.isAsynchronousMode()) { + super.renewLease(clientName, namespaces); + return; + } + RenewLeaseRequestProto.Builder builder = RenewLeaseRequestProto + .newBuilder().setClientName(clientName); + if (namespaces != null && !namespaces.isEmpty()) { + builder.addAllNamespaces(namespaces); + } + + asyncIpcClient(() -> rpcProxy.renewLease(null, builder.build()), + res -> null, Void.class); + } + + @Override + public boolean recoverLease(String src, String clientName) + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.recoverLease(src, clientName); + } + RecoverLeaseRequestProto req = RecoverLeaseRequestProto.newBuilder() + .setSrc(src) + .setClientName(clientName).build(); + + return asyncIpcClient(() -> rpcProxy.recoverLease(null, req), + res -> res.getResult(), Boolean.class); + } + + @Override + public long[] getStats() throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getStats(); + } + + return asyncIpcClient(() -> rpcProxy.getFsStats(null, VOID_GET_FSSTATUS_REQUEST), + res -> PBHelperClient.convert(res), long[].class); + } + + @Override + public ReplicatedBlockStats getReplicatedBlockStats() throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getReplicatedBlockStats(); + } + + return asyncIpcClient(() -> rpcProxy.getFsReplicatedBlockStats(null, + VOID_GET_FS_REPLICATED_BLOCK_STATS_REQUEST), + res -> PBHelperClient.convert(res), ReplicatedBlockStats.class); + } + + @Override + public ECBlockGroupStats getECBlockGroupStats() throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getECBlockGroupStats(); + } + + return asyncIpcClient(() -> rpcProxy.getFsECBlockGroupStats(null, + VOID_GET_FS_ECBLOCKGROUP_STATS_REQUEST), + res -> PBHelperClient.convert(res), ECBlockGroupStats.class); + } + + @Override + public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getDatanodeReport(type); + } + GetDatanodeReportRequestProto req = GetDatanodeReportRequestProto + .newBuilder() + .setType(PBHelperClient.convert(type)).build(); + + return asyncIpcClient(() -> rpcProxy.getDatanodeReport(null, req), + res -> PBHelperClient.convert(res.getDiList()), DatanodeInfo[].class); + } + + @Override + public DatanodeStorageReport[] getDatanodeStorageReport( + DatanodeReportType type) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getDatanodeStorageReport(type); + } + final GetDatanodeStorageReportRequestProto req + = GetDatanodeStorageReportRequestProto.newBuilder() + .setType(PBHelperClient.convert(type)).build(); + + return asyncIpcClient(() -> rpcProxy.getDatanodeStorageReport(null, req), + res -> PBHelperClient.convertDatanodeStorageReports( + res.getDatanodeStorageReportsList()), DatanodeStorageReport[].class); + } + + @Override + public long getPreferredBlockSize(String filename) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getPreferredBlockSize(filename); + } + GetPreferredBlockSizeRequestProto req = GetPreferredBlockSizeRequestProto + .newBuilder() + .setFilename(filename) + .build(); + + return asyncIpcClient(() -> rpcProxy.getPreferredBlockSize(null, req), + res -> res.getBsize(), Long.class); + } + + @Override + public boolean setSafeMode(SafeModeAction action, boolean isChecked) + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.setSafeMode(action, isChecked); + } + SetSafeModeRequestProto req = SetSafeModeRequestProto.newBuilder() + .setAction(PBHelperClient.convert(action)) + .setChecked(isChecked).build(); + + return asyncIpcClient(() -> rpcProxy.setSafeMode(null, req), + res -> res.getResult(), Boolean.class); + } + + @Override + public boolean saveNamespace(long timeWindow, long txGap) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.saveNamespace(timeWindow, txGap); + } + SaveNamespaceRequestProto req = SaveNamespaceRequestProto.newBuilder() + .setTimeWindow(timeWindow).setTxGap(txGap).build(); + + return asyncIpcClient(() -> rpcProxy.saveNamespace(null, req), + res -> res.getSaved(), Boolean.class); + } + + @Override + public long rollEdits() throws IOException { + if (!Client.isAsynchronousMode()) { + return super.rollEdits(); + } + return asyncIpcClient(() -> rpcProxy.rollEdits(null, VOID_ROLLEDITS_REQUEST), + res -> res.getNewSegmentTxId(), Long.class); + } + + @Override + public boolean restoreFailedStorage(String arg) throws IOException{ + if (!Client.isAsynchronousMode()) { + return super.restoreFailedStorage(arg); + } + RestoreFailedStorageRequestProto req = RestoreFailedStorageRequestProto + .newBuilder() + .setArg(arg).build(); + + return asyncIpcClient(() -> rpcProxy.restoreFailedStorage(null, req), + res -> res.getResult(), Boolean.class); + } + + @Override + public void refreshNodes() throws IOException { + if (!Client.isAsynchronousMode()) { + super.refreshNodes(); + return; + } + asyncIpcClient(() -> rpcProxy.refreshNodes(null, VOID_REFRESH_NODES_REQUEST), + res -> null, Void.class); + } + + @Override + public void finalizeUpgrade() throws IOException { + if (!Client.isAsynchronousMode()) { + super.finalizeUpgrade(); + return; + } + asyncIpcClient(() -> rpcProxy.finalizeUpgrade(null, VOID_FINALIZE_UPGRADE_REQUEST), + res -> null, Void.class); + } + + @Override + public boolean upgradeStatus() throws IOException { + if (!Client.isAsynchronousMode()) { + return super.upgradeStatus(); + } + return asyncIpcClient(() -> rpcProxy.upgradeStatus(null, VOID_UPGRADE_STATUS_REQUEST), + res -> res.getUpgradeFinalized(), Boolean.class); + } + + @Override + public RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action) + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.rollingUpgrade(action); + } + final RollingUpgradeRequestProto r = RollingUpgradeRequestProto.newBuilder() + .setAction(PBHelperClient.convert(action)).build(); + + return asyncIpcClient(() -> rpcProxy.rollingUpgrade(null, r), + res -> PBHelperClient.convert(res.getRollingUpgradeInfo()), + RollingUpgradeInfo.class); + } + + @Override + public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.listCorruptFileBlocks(path, cookie); + } + ListCorruptFileBlocksRequestProto.Builder req = + ListCorruptFileBlocksRequestProto.newBuilder().setPath(path); + if (cookie != null) { + req.setCookie(cookie); + } + + return asyncIpcClient(() -> rpcProxy.listCorruptFileBlocks(null, req.build()), + res ->PBHelperClient.convert(res.getCorrupt()), CorruptFileBlocks.class); + } + + @Override + public void metaSave(String filename) throws IOException { + if (!Client.isAsynchronousMode()) { + super.metaSave(filename); + return; + } + MetaSaveRequestProto req = MetaSaveRequestProto.newBuilder() + .setFilename(filename).build(); + + asyncIpcClient(() -> rpcProxy.metaSave(null, req), + res -> null, Void.class); + } + + @Override + public HdfsFileStatus getFileInfo(String src) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getFileInfo(src); + } + GetFileInfoRequestProto req = GetFileInfoRequestProto.newBuilder() + .setSrc(src) + .build(); + + return asyncIpcClient(() -> rpcProxy.getFileInfo(null, req), + res -> res.hasFs() ? PBHelperClient.convert(res.getFs()) : null, + HdfsFileStatus.class); + } + + @Override + public HdfsLocatedFileStatus getLocatedFileInfo( + String src, boolean needBlockToken) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getLocatedFileInfo(src, needBlockToken); + } + GetLocatedFileInfoRequestProto req = + GetLocatedFileInfoRequestProto.newBuilder() + .setSrc(src) + .setNeedBlockToken(needBlockToken) + .build(); + + return asyncIpcClient(() -> rpcProxy.getLocatedFileInfo(null, req), + res -> (HdfsLocatedFileStatus) (res.hasFs() ? PBHelperClient.convert(res.getFs()) : null), + HdfsLocatedFileStatus.class); + } + + @Override + public HdfsFileStatus getFileLinkInfo(String src) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getFileLinkInfo(src); + } + GetFileLinkInfoRequestProto req = GetFileLinkInfoRequestProto.newBuilder() + .setSrc(src).build(); + + return asyncIpcClient(() -> rpcProxy.getFileLinkInfo(null, req), + res -> res.hasFs() ? PBHelperClient.convert(res.getFs()) : null, + HdfsFileStatus.class); + } + + @Override + public ContentSummary getContentSummary(String path) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getContentSummary(path); + } + GetContentSummaryRequestProto req = GetContentSummaryRequestProto + .newBuilder() + .setPath(path) + .build(); + + return asyncIpcClient(() -> rpcProxy.getContentSummary(null, req), + res -> PBHelperClient.convert(res.getSummary()), ContentSummary.class); + } + + @Override + public void setQuota( + String path, long namespaceQuota, long storagespaceQuota, + StorageType type) throws IOException { + if (!Client.isAsynchronousMode()) { + super.setQuota(path, namespaceQuota, storagespaceQuota, type); + return; + } + final SetQuotaRequestProto.Builder builder + = SetQuotaRequestProto.newBuilder() + .setPath(path) + .setNamespaceQuota(namespaceQuota) + .setStoragespaceQuota(storagespaceQuota); + if (type != null) { + builder.setStorageType(PBHelperClient.convertStorageType(type)); + } + final SetQuotaRequestProto req = builder.build(); + + asyncIpcClient(() -> rpcProxy.setQuota(null, req), + res -> null, Void.class); + } + + @Override + public void fsync( + String src, long fileId, String client, long lastBlockLength) throws IOException { + if (!Client.isAsynchronousMode()) { + super.fsync(src, fileId, client, lastBlockLength); + return; + } + FsyncRequestProto req = FsyncRequestProto.newBuilder().setSrc(src) + .setClient(client).setLastBlockLength(lastBlockLength) + .setFileId(fileId).build(); + + asyncIpcClient(() -> rpcProxy.fsync(null, req), + res -> null, Void.class); + } + + @Override + public void setTimes(String src, long mtime, long atime) throws IOException { + if (!Client.isAsynchronousMode()) { + super.setTimes(src, mtime, atime); + return; + } + SetTimesRequestProto req = SetTimesRequestProto.newBuilder() + .setSrc(src) + .setMtime(mtime) + .setAtime(atime) + .build(); + + asyncIpcClient(() -> rpcProxy.setTimes(null, req), + res -> null, Void.class); + } + + @Override + public void createSymlink( + String target, String link, FsPermission dirPerm, + boolean createParent) throws IOException { + if (!Client.isAsynchronousMode()) { + super.createSymlink(target, link, dirPerm, createParent); + return; + } + CreateSymlinkRequestProto req = CreateSymlinkRequestProto.newBuilder() + .setTarget(target) + .setLink(link) + .setDirPerm(PBHelperClient.convert(dirPerm)) + .setCreateParent(createParent) + .build(); + + asyncIpcClient(() -> rpcProxy.createSymlink(null, req), + res -> null, Void.class); + } + + @Override + public String getLinkTarget(String path) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getLinkTarget(path); + } + GetLinkTargetRequestProto req = GetLinkTargetRequestProto.newBuilder() + .setPath(path).build(); + + return asyncIpcClient(() -> rpcProxy.getLinkTarget(null, req), + res -> res.hasTargetPath() ? res.getTargetPath() : null, + String.class); + } + + @Override + public LocatedBlock updateBlockForPipeline( + ExtendedBlock block, String clientName) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.updateBlockForPipeline(block, clientName); + } + UpdateBlockForPipelineRequestProto req = UpdateBlockForPipelineRequestProto + .newBuilder() + .setBlock(PBHelperClient.convert(block)) + .setClientName(clientName) + .build(); + + return asyncIpcClient(() -> rpcProxy.updateBlockForPipeline(null, req), + res -> PBHelperClient.convertLocatedBlockProto(res.getBlock()), + LocatedBlock.class); + } + + @Override + public void updatePipeline( + String clientName, ExtendedBlock oldBlock, ExtendedBlock newBlock, + DatanodeID[] newNodes, String[] storageIDs) throws IOException { + if (!Client.isAsynchronousMode()) { + super.updatePipeline(clientName, oldBlock, newBlock, newNodes, storageIDs); + return; + } + UpdatePipelineRequestProto req = UpdatePipelineRequestProto.newBuilder() + .setClientName(clientName) + .setOldBlock(PBHelperClient.convert(oldBlock)) + .setNewBlock(PBHelperClient.convert(newBlock)) + .addAllNewNodes(Arrays.asList(PBHelperClient.convert(newNodes))) + .addAllStorageIDs(storageIDs == null ? null : Arrays.asList(storageIDs)) + .build(); + + asyncIpcClient(() -> rpcProxy.updatePipeline(null, req), + res -> null, Void.class); + } + + @Override + public Token getDelegationToken( + Text renewer) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getDelegationToken(renewer); + } + GetDelegationTokenRequestProto req = GetDelegationTokenRequestProto + .newBuilder() + .setRenewer(renewer == null ? "" : renewer.toString()) + .build(); + + return asyncIpcClient(() -> rpcProxy.getDelegationToken(null, req), + res -> res.hasToken() ? + PBHelperClient.convertDelegationToken(res.getToken()) : null, Token.class); + } + + @Override + public long renewDelegationToken( + Token token) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.renewDelegationToken(token); + } + RenewDelegationTokenRequestProto req = + RenewDelegationTokenRequestProto.newBuilder(). + setToken(PBHelperClient.convert(token)). + build(); + + return asyncIpcClient(() -> rpcProxy.renewDelegationToken(null, req), + res -> res.getNewExpiryTime(), Long.class); + } + + @Override + public void cancelDelegationToken( + Token token) throws IOException { + if (!Client.isAsynchronousMode()) { + super.cancelDelegationToken(token); + return; + } + CancelDelegationTokenRequestProto req = CancelDelegationTokenRequestProto + .newBuilder() + .setToken(PBHelperClient.convert(token)) + .build(); + + asyncIpcClient(() -> rpcProxy.cancelDelegationToken(null, req), + res -> null, Void.class); + } + + @Override + public void setBalancerBandwidth(long bandwidth) throws IOException { + if (!Client.isAsynchronousMode()) { + super.setBalancerBandwidth(bandwidth); + return; + } + SetBalancerBandwidthRequestProto req = + SetBalancerBandwidthRequestProto.newBuilder() + .setBandwidth(bandwidth) + .build(); + + asyncIpcClient(() -> rpcProxy.setBalancerBandwidth(null, req), + res -> null, Void.class); + } + + @Override + public DataEncryptionKey getDataEncryptionKey() throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getDataEncryptionKey(); + } + return asyncIpcClient(() -> rpcProxy.getDataEncryptionKey(null, + VOID_GET_DATA_ENCRYPTIONKEY_REQUEST), + res -> res.hasDataEncryptionKey() ? + PBHelperClient.convert(res.getDataEncryptionKey()) : null, + DataEncryptionKey.class); + } + + + @Override + public boolean isFileClosed(String src) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.isFileClosed(src); + } + IsFileClosedRequestProto req = IsFileClosedRequestProto.newBuilder() + .setSrc(src).build(); + + return asyncIpcClient(() -> rpcProxy.isFileClosed(null, req), + res -> res.getResult(), Boolean.class); + } + + @Override + public String createSnapshot( + String snapshotRoot, String snapshotName) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.createSnapshot(snapshotRoot, snapshotName); + } + final CreateSnapshotRequestProto.Builder builder + = CreateSnapshotRequestProto.newBuilder().setSnapshotRoot(snapshotRoot); + if (snapshotName != null) { + builder.setSnapshotName(snapshotName); + } + final CreateSnapshotRequestProto req = builder.build(); + + return asyncIpcClient(() -> rpcProxy.createSnapshot(null, req), + res -> res.getSnapshotPath(), String.class); + } + + @Override + public void deleteSnapshot(String snapshotRoot, String snapshotName) + throws IOException { + if (!Client.isAsynchronousMode()) { + super.deleteSnapshot(snapshotRoot, snapshotName); + return; + } + DeleteSnapshotRequestProto req = DeleteSnapshotRequestProto.newBuilder() + .setSnapshotRoot(snapshotRoot).setSnapshotName(snapshotName).build(); + + asyncIpcClient(() -> rpcProxy.deleteSnapshot(null, req), + res -> null, Void.class); + } + + @Override + public void allowSnapshot(String snapshotRoot) throws IOException { + if (!Client.isAsynchronousMode()) { + super.allowSnapshot(snapshotRoot); + return; + } + AllowSnapshotRequestProto req = AllowSnapshotRequestProto.newBuilder() + .setSnapshotRoot(snapshotRoot).build(); + + asyncIpcClient(() -> rpcProxy.allowSnapshot(null, req), + res -> null, Void.class); + } + + @Override + public void disallowSnapshot(String snapshotRoot) throws IOException { + if (!Client.isAsynchronousMode()) { + super.disallowSnapshot(snapshotRoot); + return; + } + DisallowSnapshotRequestProto req = DisallowSnapshotRequestProto + .newBuilder().setSnapshotRoot(snapshotRoot).build(); + + asyncIpcClient(() -> rpcProxy.disallowSnapshot(null, req), + res -> null, Void.class); + } + + @Override + public void renameSnapshot( + String snapshotRoot, String snapshotOldName, + String snapshotNewName) throws IOException { + if (!Client.isAsynchronousMode()) { + super.renameSnapshot(snapshotRoot, snapshotOldName, snapshotNewName); + return; + } + RenameSnapshotRequestProto req = RenameSnapshotRequestProto.newBuilder() + .setSnapshotRoot(snapshotRoot).setSnapshotOldName(snapshotOldName) + .setSnapshotNewName(snapshotNewName).build(); + + asyncIpcClient(() -> rpcProxy.renameSnapshot(null, req), + res -> null, Void.class); + } + + @Override + public SnapshottableDirectoryStatus[] getSnapshottableDirListing() throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getSnapshottableDirListing(); + } + GetSnapshottableDirListingRequestProto req = + GetSnapshottableDirListingRequestProto.newBuilder().build(); + + return asyncIpcClient(() -> rpcProxy.getSnapshottableDirListing(null, req), + res -> { + if (res.hasSnapshottableDirList()) { + return PBHelperClient.convert(res.getSnapshottableDirList()); + } + return null; + }, SnapshottableDirectoryStatus[].class); + } + + @Override + public SnapshotStatus[] getSnapshotListing(String path) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getSnapshotListing(path); + } + GetSnapshotListingRequestProto req = + GetSnapshotListingRequestProto.newBuilder() + .setSnapshotRoot(path).build(); + + return asyncIpcClient(() -> rpcProxy.getSnapshotListing(null, req), + res -> { + if (res.hasSnapshotList()) { + return PBHelperClient.convert(res.getSnapshotList()); + } + return null; + }, SnapshotStatus[].class); + } + + @Override + public SnapshotDiffReport getSnapshotDiffReport( + String snapshotRoot, String fromSnapshot, String toSnapshot) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getSnapshotDiffReport(snapshotRoot, fromSnapshot, toSnapshot); + } + GetSnapshotDiffReportRequestProto req = GetSnapshotDiffReportRequestProto + .newBuilder().setSnapshotRoot(snapshotRoot) + .setFromSnapshot(fromSnapshot).setToSnapshot(toSnapshot).build(); + + return asyncIpcClient(() -> rpcProxy.getSnapshotDiffReport(null, req), + res -> PBHelperClient.convert(res.getDiffReport()), SnapshotDiffReport.class); + } + + @Override + public SnapshotDiffReportListing getSnapshotDiffReportListing( + String snapshotRoot, String fromSnapshot, String toSnapshot, + byte[] startPath, int index) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getSnapshotDiffReportListing(snapshotRoot, fromSnapshot, + toSnapshot, startPath, index); + } + GetSnapshotDiffReportListingRequestProto req = + GetSnapshotDiffReportListingRequestProto.newBuilder() + .setSnapshotRoot(snapshotRoot).setFromSnapshot(fromSnapshot) + .setToSnapshot(toSnapshot).setCursor( + HdfsProtos.SnapshotDiffReportCursorProto.newBuilder() + .setStartPath(PBHelperClient.getByteString(startPath)) + .setIndex(index).build()).build(); + + return asyncIpcClient(() -> rpcProxy.getSnapshotDiffReportListing(null, req), + res -> PBHelperClient.convert(res.getDiffReport()), + SnapshotDiffReportListing.class); + } + + @Override + public long addCacheDirective( + CacheDirectiveInfo directive, EnumSet flags) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.addCacheDirective(directive, flags); + } + AddCacheDirectiveRequestProto.Builder builder = + AddCacheDirectiveRequestProto.newBuilder(). + setInfo(PBHelperClient.convert(directive)); + if (!flags.isEmpty()) { + builder.setCacheFlags(PBHelperClient.convertCacheFlags(flags)); + } + + return asyncIpcClient(() -> rpcProxy.addCacheDirective(null, builder.build()), + res -> res.getId(), Long.class); + } + + @Override + public void modifyCacheDirective( + CacheDirectiveInfo directive, EnumSet flags) throws IOException { + if (!Client.isAsynchronousMode()) { + super.modifyCacheDirective(directive, flags); + return; + } + ModifyCacheDirectiveRequestProto.Builder builder = + ModifyCacheDirectiveRequestProto.newBuilder(). + setInfo(PBHelperClient.convert(directive)); + if (!flags.isEmpty()) { + builder.setCacheFlags(PBHelperClient.convertCacheFlags(flags)); + } + + asyncIpcClient(() -> rpcProxy.modifyCacheDirective(null, builder.build()), + res -> null, Void.class); + } + + @Override + public void removeCacheDirective(long id) throws IOException { + if (!Client.isAsynchronousMode()) { + super.removeCacheDirective(id); + return; + } + + asyncIpcClient(() -> rpcProxy.removeCacheDirective(null, + RemoveCacheDirectiveRequestProto.newBuilder(). + setId(id).build()), + res -> null, Void.class); + } + + @Override + public BatchedEntries listCacheDirectives( + long prevId, CacheDirectiveInfo filter) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.listCacheDirectives(prevId, filter); + } + if (filter == null) { + filter = new CacheDirectiveInfo.Builder().build(); + } + CacheDirectiveInfo f = filter; + + return asyncIpcClient(() -> rpcProxy.listCacheDirectives(null, + ListCacheDirectivesRequestProto.newBuilder(). + setPrevId(prevId). + setFilter(PBHelperClient.convert(f)). + build()), + res -> new BatchedCacheEntries(res), BatchedEntries.class); + } + + @Override + public void addCachePool(CachePoolInfo info) throws IOException { + if (!Client.isAsynchronousMode()) { + super.addCachePool(info); + return; + } + AddCachePoolRequestProto.Builder builder = + AddCachePoolRequestProto.newBuilder(); + builder.setInfo(PBHelperClient.convert(info)); + + asyncIpcClient(() -> rpcProxy.addCachePool(null, builder.build()), + res -> null, Void.class); + } + + @Override + public void modifyCachePool(CachePoolInfo req) throws IOException { + if (!Client.isAsynchronousMode()) { + super.modifyCachePool(req); + return; + } + ModifyCachePoolRequestProto.Builder builder = + ModifyCachePoolRequestProto.newBuilder(); + builder.setInfo(PBHelperClient.convert(req)); + + asyncIpcClient(() -> rpcProxy.modifyCachePool(null, builder.build()), + res -> null, Void.class); + } + + @Override + public void removeCachePool(String cachePoolName) throws IOException { + if (!Client.isAsynchronousMode()) { + super.removeCachePool(cachePoolName); + return; + } + + asyncIpcClient(() -> rpcProxy.removeCachePool(null, + RemoveCachePoolRequestProto.newBuilder(). + setPoolName(cachePoolName).build()), + res -> null, Void.class); + } + + @Override + public BatchedEntries listCachePools(String prevKey) + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.listCachePools(prevKey); + } + + return asyncIpcClient(() -> rpcProxy.listCachePools(null, + ListCachePoolsRequestProto.newBuilder().setPrevPoolName(prevKey).build()), + res -> new BatchedCachePoolEntries(res), BatchedEntries.class); + } + + @Override + public void modifyAclEntries(String src, List aclSpec) + throws IOException { + if (!Client.isAsynchronousMode()) { + super.modifyAclEntries(src, aclSpec); + return; + } + ModifyAclEntriesRequestProto req = ModifyAclEntriesRequestProto + .newBuilder().setSrc(src) + .addAllAclSpec(PBHelperClient.convertAclEntryProto(aclSpec)).build(); + + asyncIpcClient(() -> rpcProxy.modifyAclEntries(null, req), + res -> null, Void.class); + } + + @Override + public void removeAclEntries(String src, List aclSpec) + throws IOException { + if (!Client.isAsynchronousMode()) { + super.removeAclEntries(src, aclSpec); + return; + } + RemoveAclEntriesRequestProto req = RemoveAclEntriesRequestProto + .newBuilder().setSrc(src) + .addAllAclSpec(PBHelperClient.convertAclEntryProto(aclSpec)).build(); + + asyncIpcClient(() -> rpcProxy.removeAclEntries(null, req), + res -> null, Void.class); + } + + @Override + public void removeDefaultAcl(String src) throws IOException { + if (!Client.isAsynchronousMode()) { + super.removeDefaultAcl(src); + return; + } + RemoveDefaultAclRequestProto req = RemoveDefaultAclRequestProto + .newBuilder().setSrc(src).build(); + + asyncIpcClient(() -> rpcProxy.removeDefaultAcl(null, req), + res -> null, Void.class); + } + + @Override + public void removeAcl(String src) throws IOException { + if (!Client.isAsynchronousMode()) { + super.removeAcl(src); + return; + } + RemoveAclRequestProto req = RemoveAclRequestProto.newBuilder() + .setSrc(src).build(); + + asyncIpcClient(() -> rpcProxy.removeAcl(null, req), + res -> null, Void.class); + } + + @Override + public void setAcl(String src, List aclSpec) throws IOException { + if (!Client.isAsynchronousMode()) { + super.setAcl(src, aclSpec); + return; + } + SetAclRequestProto req = SetAclRequestProto.newBuilder() + .setSrc(src) + .addAllAclSpec(PBHelperClient.convertAclEntryProto(aclSpec)) + .build(); + + asyncIpcClient(() -> rpcProxy.setAcl(null, req), + res -> null, Void.class); + } + + @Override + public AclStatus getAclStatus(String src) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getAclStatus(src); + } + GetAclStatusRequestProto req = GetAclStatusRequestProto.newBuilder() + .setSrc(src).build(); + + return asyncIpcClient(() -> rpcProxy.getAclStatus(null, req), + res -> PBHelperClient.convert(res), AclStatus.class); + } + + @Override + public void createEncryptionZone(String src, String keyName) + throws IOException { + if (!Client.isAsynchronousMode()) { + super.createEncryptionZone(src, keyName); + return; + } + final CreateEncryptionZoneRequestProto.Builder builder = + CreateEncryptionZoneRequestProto.newBuilder(); + builder.setSrc(src); + if (keyName != null && !keyName.isEmpty()) { + builder.setKeyName(keyName); + } + CreateEncryptionZoneRequestProto req = builder.build(); + + asyncIpcClient(() -> rpcProxy.createEncryptionZone(null, req), + res -> null, Void.class); + } + + @Override + public EncryptionZone getEZForPath(String src) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getEZForPath(src); + } + final GetEZForPathRequestProto.Builder builder = + GetEZForPathRequestProto.newBuilder(); + builder.setSrc(src); + final GetEZForPathRequestProto req = builder.build(); + + return asyncIpcClient(() -> rpcProxy.getEZForPath(null, req), + res -> { + if (res.hasZone()) { + return PBHelperClient.convert(res.getZone()); + } else { + return null; + } + }, EncryptionZone.class); + } + + @Override + public BatchedEntries listEncryptionZones(long id) + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.listEncryptionZones(id); + } + final ListEncryptionZonesRequestProto req = + ListEncryptionZonesRequestProto.newBuilder() + .setId(id) + .build(); + + return asyncIpcClient(() -> rpcProxy.listEncryptionZones(null, req), + res -> { + List elements = + Lists.newArrayListWithCapacity(res.getZonesCount()); + for (EncryptionZoneProto p : res.getZonesList()) { + elements.add(PBHelperClient.convert(p)); + } + return new BatchedListEntries<>(elements, res.getHasMore()); + }, BatchedEntries.class); + } + + @Override + public void setErasureCodingPolicy(String src, String ecPolicyName) + throws IOException { + if (!Client.isAsynchronousMode()) { + super.setErasureCodingPolicy(src, ecPolicyName); + return; + } + final SetErasureCodingPolicyRequestProto.Builder builder = + SetErasureCodingPolicyRequestProto.newBuilder(); + builder.setSrc(src); + if (ecPolicyName != null) { + builder.setEcPolicyName(ecPolicyName); + } + SetErasureCodingPolicyRequestProto req = builder.build(); + + asyncIpcClient(() -> rpcProxy.setErasureCodingPolicy(null, req), + res -> null, Void.class); + } + + @Override + public void unsetErasureCodingPolicy(String src) throws IOException { + if (!Client.isAsynchronousMode()) { + super.unsetErasureCodingPolicy(src); + return; + } + final UnsetErasureCodingPolicyRequestProto.Builder builder = + UnsetErasureCodingPolicyRequestProto.newBuilder(); + builder.setSrc(src); + UnsetErasureCodingPolicyRequestProto req = builder.build(); + + asyncIpcClient(() -> rpcProxy.unsetErasureCodingPolicy(null, req), + res -> null, Void.class); + } + + @Override + public ECTopologyVerifierResult getECTopologyResultForPolicies( + final String... policyNames) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getECTopologyResultForPolicies(policyNames); + } + final GetECTopologyResultForPoliciesRequestProto.Builder builder = + GetECTopologyResultForPoliciesRequestProto.newBuilder(); + builder.addAllPolicies(Arrays.asList(policyNames)); + GetECTopologyResultForPoliciesRequestProto req = builder.build(); + + return asyncIpcClient(() -> rpcProxy.getECTopologyResultForPolicies(null, req), + res -> PBHelperClient.convertECTopologyVerifierResultProto(res.getResponse()), + ECTopologyVerifierResult.class); + } + + @Override + public void reencryptEncryptionZone(String zone, ReencryptAction action) + throws IOException { + if (!Client.isAsynchronousMode()) { + super.reencryptEncryptionZone(zone, action); + return; + } + final ReencryptEncryptionZoneRequestProto.Builder builder = + ReencryptEncryptionZoneRequestProto.newBuilder(); + builder.setZone(zone).setAction(PBHelperClient.convert(action)); + ReencryptEncryptionZoneRequestProto req = builder.build(); + + asyncIpcClient(() -> rpcProxy.reencryptEncryptionZone(null, req), + res -> null, Void.class); + } + + @Override + public BatchedEntries listReencryptionStatus(long id) + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.listReencryptionStatus(id); + } + final ListReencryptionStatusRequestProto req = + ListReencryptionStatusRequestProto.newBuilder().setId(id).build(); + + return asyncIpcClient(() -> rpcProxy.listReencryptionStatus(null, req), + res -> { + List elements = + Lists.newArrayListWithCapacity(res.getStatusesCount()); + for (ZoneReencryptionStatusProto p : res.getStatusesList()) { + elements.add(PBHelperClient.convert(p)); + } + return new BatchedListEntries<>(elements, res.getHasMore()); + }, BatchedEntries.class); + } + + @Override + public void setXAttr(String src, XAttr xAttr, EnumSet flag) + throws IOException { + if (!Client.isAsynchronousMode()) { + super.setXAttr(src, xAttr, flag); + return; + } + SetXAttrRequestProto req = SetXAttrRequestProto.newBuilder() + .setSrc(src) + .setXAttr(PBHelperClient.convertXAttrProto(xAttr)) + .setFlag(PBHelperClient.convert(flag)) + .build(); + + asyncIpcClient(() -> rpcProxy.setXAttr(null, req), + res -> null, Void.class); + } + + @Override + public List getXAttrs(String src, List xAttrs) + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getXAttrs(src, xAttrs); + } + GetXAttrsRequestProto.Builder builder = GetXAttrsRequestProto.newBuilder(); + builder.setSrc(src); + if (xAttrs != null) { + builder.addAllXAttrs(PBHelperClient.convertXAttrProto(xAttrs)); + } + GetXAttrsRequestProto req = builder.build(); + + return asyncIpcClient(() -> rpcProxy.getXAttrs(null, req), + res -> PBHelperClient.convert(res), List.class); + } + + @Override + public List listXAttrs(String src) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.listXAttrs(src); + } + ListXAttrsRequestProto.Builder builder = + ListXAttrsRequestProto.newBuilder(); + builder.setSrc(src); + ListXAttrsRequestProto req = builder.build(); + + return asyncIpcClient(() -> rpcProxy.listXAttrs(null, req), + res -> PBHelperClient.convert(res), List.class); + } + + @Override + public void removeXAttr(String src, XAttr xAttr) throws IOException { + if (!Client.isAsynchronousMode()) { + super.removeXAttr(src, xAttr); + return; + } + RemoveXAttrRequestProto req = RemoveXAttrRequestProto + .newBuilder().setSrc(src) + .setXAttr(PBHelperClient.convertXAttrProto(xAttr)).build(); + + asyncIpcClient(() -> rpcProxy.removeXAttr(null, req), + res -> null, Void.class); + } + + @Override + public void checkAccess(String path, FsAction mode) throws IOException { + if (!Client.isAsynchronousMode()) { + super.checkAccess(path, mode); + return; + } + CheckAccessRequestProto req = CheckAccessRequestProto.newBuilder() + .setPath(path).setMode(PBHelperClient.convert(mode)).build(); + + asyncIpcClient(() -> rpcProxy.checkAccess(null, req), + res -> null, Void.class); + } + + @Override + public void setStoragePolicy(String src, String policyName) + throws IOException { + if (!Client.isAsynchronousMode()) { + super.setStoragePolicy(src, policyName); + return; + } + SetStoragePolicyRequestProto req = SetStoragePolicyRequestProto + .newBuilder().setSrc(src).setPolicyName(policyName).build(); + + asyncIpcClient(() -> rpcProxy.setStoragePolicy(null, req), + res -> null, Void.class); + } + + @Override + public void unsetStoragePolicy(String src) throws IOException { + if (!Client.isAsynchronousMode()) { + super.unsetStoragePolicy(src); + return; + } + UnsetStoragePolicyRequestProto req = UnsetStoragePolicyRequestProto + .newBuilder().setSrc(src).build(); + + asyncIpcClient(() -> rpcProxy.unsetStoragePolicy(null, req), + res -> null, Void.class); + } + + @Override + public BlockStoragePolicy getStoragePolicy(String path) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getStoragePolicy(path); + } + GetStoragePolicyRequestProto request = GetStoragePolicyRequestProto + .newBuilder().setPath(path).build(); + + return asyncIpcClient(() -> rpcProxy.getStoragePolicy(null, request), + res -> PBHelperClient.convert(res.getStoragePolicy()), + BlockStoragePolicy.class); + } + + @Override + public BlockStoragePolicy[] getStoragePolicies() throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getStoragePolicies(); + } + + return asyncIpcClient(() -> rpcProxy.getStoragePolicies(null, + VOID_GET_STORAGE_POLICIES_REQUEST), + res -> PBHelperClient.convertStoragePolicies(res.getPoliciesList()), + BlockStoragePolicy[].class); + } + + public long getCurrentEditLogTxid() throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getCurrentEditLogTxid(); + } + GetCurrentEditLogTxidRequestProto req = GetCurrentEditLogTxidRequestProto + .getDefaultInstance(); + + return asyncIpcClient(() -> rpcProxy.getCurrentEditLogTxid(null, req), + res -> res.getTxid(), Long.class); + } + + @Override + public EventBatchList getEditsFromTxid(long txid) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getEditsFromTxid(txid); + } + GetEditsFromTxidRequestProto req = GetEditsFromTxidRequestProto.newBuilder() + .setTxid(txid).build(); + + return asyncIpcClient(() -> rpcProxy.getEditsFromTxid(null, req), + res -> PBHelperClient.convert(res), EventBatchList.class); + } + + @Override + public AddErasureCodingPolicyResponse[] addErasureCodingPolicies( + ErasureCodingPolicy[] policies) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.addErasureCodingPolicies(policies); + } + List protos = Arrays.stream(policies) + .map(PBHelperClient::convertErasureCodingPolicy) + .collect(Collectors.toList()); + AddErasureCodingPoliciesRequestProto req = + AddErasureCodingPoliciesRequestProto.newBuilder() + .addAllEcPolicies(protos).build(); + + return asyncIpcClient(() -> rpcProxy.addErasureCodingPolicies(null, req), + res -> res.getResponsesList().stream() + .map(PBHelperClient::convertAddErasureCodingPolicyResponse) + .toArray(AddErasureCodingPolicyResponse[]::new), + AddErasureCodingPolicyResponse[].class); + } + + @Override + public void removeErasureCodingPolicy(String ecPolicyName) + throws IOException { + if (!Client.isAsynchronousMode()) { + super.removeErasureCodingPolicy(ecPolicyName); + return; + } + RemoveErasureCodingPolicyRequestProto.Builder builder = + RemoveErasureCodingPolicyRequestProto.newBuilder(); + builder.setEcPolicyName(ecPolicyName); + RemoveErasureCodingPolicyRequestProto req = builder.build(); + + asyncIpcClient(() -> rpcProxy.removeErasureCodingPolicy(null, req), + res -> null, Void.class); + } + + @Override + public void enableErasureCodingPolicy(String ecPolicyName) + throws IOException { + if (!Client.isAsynchronousMode()) { + super.enableErasureCodingPolicy(ecPolicyName); + return; + } + EnableErasureCodingPolicyRequestProto.Builder builder = + EnableErasureCodingPolicyRequestProto.newBuilder(); + builder.setEcPolicyName(ecPolicyName); + EnableErasureCodingPolicyRequestProto req = builder.build(); + + asyncIpcClient(() -> rpcProxy.enableErasureCodingPolicy(null, req), + res -> null, Void.class); + } + + @Override + public void disableErasureCodingPolicy(String ecPolicyName) + throws IOException { + if (!Client.isAsynchronousMode()) { + super.disableErasureCodingPolicy(ecPolicyName); + return; + } + DisableErasureCodingPolicyRequestProto.Builder builder = + DisableErasureCodingPolicyRequestProto.newBuilder(); + builder.setEcPolicyName(ecPolicyName); + DisableErasureCodingPolicyRequestProto req = builder.build(); + + asyncIpcClient(() -> rpcProxy.disableErasureCodingPolicy(null, req), + res -> null, Void.class); + } + + @Override + public ErasureCodingPolicyInfo[] getErasureCodingPolicies() + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getErasureCodingPolicies(); + } + + return asyncIpcClient(() -> rpcProxy.getErasureCodingPolicies( + null, VOID_GET_EC_POLICIES_REQUEST), + res -> { + ErasureCodingPolicyInfo[] ecPolicies = + new ErasureCodingPolicyInfo[res.getEcPoliciesCount()]; + int i = 0; + for (ErasureCodingPolicyProto proto : res.getEcPoliciesList()) { + ecPolicies[i++] = + PBHelperClient.convertErasureCodingPolicyInfo(proto); + } + return ecPolicies; + }, ErasureCodingPolicyInfo[].class); + } + + @Override + public Map getErasureCodingCodecs() throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getErasureCodingCodecs(); + } + + return asyncIpcClient(() -> rpcProxy + .getErasureCodingCodecs(null, VOID_GET_EC_CODEC_REQUEST), + res -> { + Map ecCodecs = new HashMap<>(); + for (CodecProto codec : res.getCodecList()) { + ecCodecs.put(codec.getCodec(), codec.getCoders()); + } + return ecCodecs; + }, Map.class); + } + + @Override + public ErasureCodingPolicy getErasureCodingPolicy(String src) + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getErasureCodingPolicy(src); + } + GetErasureCodingPolicyRequestProto req = + GetErasureCodingPolicyRequestProto.newBuilder().setSrc(src).build(); + + return asyncIpcClient(() -> rpcProxy.getErasureCodingPolicy(null, req), + res -> { + if (res.hasEcPolicy()) { + return PBHelperClient.convertErasureCodingPolicy( + res.getEcPolicy()); + } + return null; + }, ErasureCodingPolicy.class); + } + + @Override + public QuotaUsage getQuotaUsage(String path) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getQuotaUsage(path); + } + GetQuotaUsageRequestProto req = + GetQuotaUsageRequestProto.newBuilder().setPath(path).build(); + + return asyncIpcClient(() -> rpcProxy.getQuotaUsage(null, req), + res -> PBHelperClient.convert(res.getUsage()), QuotaUsage.class); + } + + @Override + public BatchedEntries listOpenFiles( + long prevId, EnumSet openFilesTypes, + String path) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.listOpenFiles(prevId, openFilesTypes, path); + } + ListOpenFilesRequestProto.Builder req = + ListOpenFilesRequestProto.newBuilder().setId(prevId); + if (openFilesTypes != null) { + req.addAllTypes(PBHelperClient.convertOpenFileTypes(openFilesTypes)); + } + req.setPath(path); + + return asyncIpcClient(() -> rpcProxy.listOpenFiles(null, req.build()), + res -> { + List openFileEntries = + Lists.newArrayListWithCapacity(res.getEntriesCount()); + for (OpenFilesBatchResponseProto p : res.getEntriesList()) { + openFileEntries.add(PBHelperClient.convert(p)); + } + return new BatchedListEntries<>(openFileEntries, res.getHasMore()); + }, BatchedEntries.class); + } + + @Override + public void msync() throws IOException { + if (!Client.isAsynchronousMode()) { + super.msync(); + return; + } + MsyncRequestProto.Builder req = MsyncRequestProto.newBuilder(); + + asyncIpcClient(() -> rpcProxy.msync(null, req.build()), + res -> null, Void.class); + } + + @Override + public void satisfyStoragePolicy(String src) throws IOException { + if (!Client.isAsynchronousMode()) { + super.satisfyStoragePolicy(src); + return; + } + SatisfyStoragePolicyRequestProto req = + SatisfyStoragePolicyRequestProto.newBuilder().setSrc(src).build(); + + asyncIpcClient(() -> rpcProxy.satisfyStoragePolicy(null, req), + res -> null, Void.class); + } + + @Override + public DatanodeInfo[] getSlowDatanodeReport() throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getSlowDatanodeReport(); + } + GetSlowDatanodeReportRequestProto req = + GetSlowDatanodeReportRequestProto.newBuilder().build(); + + return asyncIpcClient(() -> rpcProxy.getSlowDatanodeReport(null, req), + res -> PBHelperClient.convert(res.getDatanodeInfoProtoList()), + DatanodeInfo[].class); + } + + @Override + public HAServiceProtocol.HAServiceState getHAServiceState() + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getHAServiceState(); + } + HAServiceStateRequestProto req = + HAServiceStateRequestProto.newBuilder().build(); + + return asyncIpcClient(() -> rpcProxy.getHAServiceState(null, req), + res -> { + switch(res.getState()) { + case ACTIVE: + return HAServiceProtocol.HAServiceState.ACTIVE; + case STANDBY: + return HAServiceProtocol.HAServiceState.STANDBY; + case OBSERVER: + return HAServiceProtocol.HAServiceState.OBSERVER; + case INITIALIZING: + default: + return HAServiceProtocol.HAServiceState.INITIALIZING; + } + }, HAServiceProtocol.HAServiceState.class); + } + + @Override + public Path getEnclosingRoot(String filename) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getEnclosingRoot(filename); + } + final GetEnclosingRootRequestProto.Builder builder = + GetEnclosingRootRequestProto.newBuilder(); + builder.setFilename(filename); + final GetEnclosingRootRequestProto req = builder.build(); + + return asyncIpcClient(() -> rpcProxy.getEnclosingRoot(null, req), + res -> new Path(res.getEnclosingRootPath()), + Path.class); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterGetUserMappingsProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterGetUserMappingsProtocolTranslatorPB.java new file mode 100644 index 0000000000000..4d8dd7164ade0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterGetUserMappingsProtocolTranslatorPB.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocolPB; + +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos; +import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB; +import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB; + +import java.io.IOException; + +import static org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil.asyncIpcClient; + +public class RouterGetUserMappingsProtocolTranslatorPB + extends GetUserMappingsProtocolClientSideTranslatorPB { + private final GetUserMappingsProtocolPB rpcProxy; + + public RouterGetUserMappingsProtocolTranslatorPB(GetUserMappingsProtocolPB rpcProxy) { + super(rpcProxy); + this.rpcProxy = rpcProxy; + } + + @Override + public String[] getGroupsForUser(String user) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getGroupsForUser(user); + } + GetUserMappingsProtocolProtos.GetGroupsForUserRequestProto request = + GetUserMappingsProtocolProtos.GetGroupsForUserRequestProto + .newBuilder().setUser(user).build(); + + return asyncIpcClient(() -> rpcProxy.getGroupsForUser(null, request), + res -> res.getGroupsList().toArray(new String[res.getGroupsCount()]), + String[].class); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterNamenodeProtocolTranslatorPB.java new file mode 100644 index 0000000000000..54c099581423d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterNamenodeProtocolTranslatorPB.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; +import org.apache.hadoop.hdfs.server.namenode.NNStorage; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; +import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; +import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; +import org.apache.hadoop.ipc.Client; + +import java.io.IOException; +import static org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil.asyncIpcClient; + +public class RouterNamenodeProtocolTranslatorPB extends NamenodeProtocolTranslatorPB { + /* + * Protobuf requests with no parameters instantiated only once + */ + private static final GetBlockKeysRequestProto VOID_GET_BLOCKKEYS_REQUEST = + GetBlockKeysRequestProto.newBuilder().build(); + private static final GetTransactionIdRequestProto VOID_GET_TRANSACTIONID_REQUEST = + GetTransactionIdRequestProto.newBuilder().build(); + private static final RollEditLogRequestProto VOID_ROLL_EDITLOG_REQUEST = + RollEditLogRequestProto.newBuilder().build(); + private static final VersionRequestProto VOID_VERSION_REQUEST = + VersionRequestProto.newBuilder().build(); + private final NamenodeProtocolPB rpcProxy; + + public RouterNamenodeProtocolTranslatorPB(NamenodeProtocolPB rpcProxy) { + super(rpcProxy); + this.rpcProxy = rpcProxy; + } + + @Override + public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long + minBlockSize, long timeInterval, StorageType storageType) + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getBlocks(datanode, size, minBlockSize, timeInterval, storageType); + } + NamenodeProtocolProtos.GetBlocksRequestProto.Builder builder = + NamenodeProtocolProtos.GetBlocksRequestProto.newBuilder() + .setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size) + .setMinBlockSize(minBlockSize).setTimeInterval(timeInterval); + if (storageType != null) { + builder.setStorageType(PBHelperClient.convertStorageType(storageType)); + } + NamenodeProtocolProtos.GetBlocksRequestProto req = builder.build(); + + return asyncIpcClient(() -> rpcProxy.getBlocks(null, req), + res -> PBHelper.convert(res.getBlocks()), + BlocksWithLocations.class); + } + + @Override + public ExportedBlockKeys getBlockKeys() throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getBlockKeys(); + } + + return asyncIpcClient(() -> rpcProxy.getBlockKeys(null, + VOID_GET_BLOCKKEYS_REQUEST), + res -> res.hasKeys() ? PBHelper.convert(res.getKeys()) : null, + ExportedBlockKeys.class); + } + + @Override + public long getTransactionID() throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getTransactionID(); + } + + return asyncIpcClient(() -> rpcProxy.getTransactionId(null, + VOID_GET_TRANSACTIONID_REQUEST), + res -> res.getTxId(), Long.class); + } + + @Override + public long getMostRecentCheckpointTxId() throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getMostRecentCheckpointTxId(); + } + + return asyncIpcClient(() -> rpcProxy.getMostRecentCheckpointTxId(null, + NamenodeProtocolProtos + .GetMostRecentCheckpointTxIdRequestProto + .getDefaultInstance()), + res -> res.getTxId(), Long.class); + } + + @Override + public long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getMostRecentNameNodeFileTxId(nnf); + } + + return asyncIpcClient(() -> rpcProxy.getMostRecentNameNodeFileTxId(null, + NamenodeProtocolProtos + .GetMostRecentNameNodeFileTxIdRequestProto + .newBuilder() + .setNameNodeFile(nnf.toString()) + .build()), + res -> res.getTxId(), Long.class); + } + + @Override + public CheckpointSignature rollEditLog() throws IOException { + if (!Client.isAsynchronousMode()) { + return super.rollEditLog(); + } + + return asyncIpcClient(() -> rpcProxy.rollEditLog(null, + VOID_ROLL_EDITLOG_REQUEST), + res -> PBHelper.convert(res.getSignature()), CheckpointSignature.class); + } + + @Override + public NamespaceInfo versionRequest() throws IOException { + if (!Client.isAsynchronousMode()) { + return super.versionRequest(); + } + return asyncIpcClient(() -> rpcProxy.versionRequest(null, + VOID_VERSION_REQUEST), + res -> PBHelper.convert(res.getInfo()), + NamespaceInfo.class); + } + + @Override + public void errorReport(NamenodeRegistration registration, int errorCode, + String msg) throws IOException { + if (!Client.isAsynchronousMode()) { + super.errorReport(registration, errorCode, msg); + return; + } + NamenodeProtocolProtos.ErrorReportRequestProto req = + NamenodeProtocolProtos.ErrorReportRequestProto.newBuilder() + .setErrorCode(errorCode).setMsg(msg) + .setRegistration(PBHelper.convert(registration)).build(); + + asyncIpcClient(() -> rpcProxy.errorReport(null, req), + res -> null, Void.class); + } + + @Override + public NamenodeRegistration registerSubordinateNamenode( + NamenodeRegistration registration) throws IOException { + if (!Client.isAsynchronousMode()) { + return super.registerSubordinateNamenode(registration); + } + NamenodeProtocolProtos.RegisterRequestProto req = + NamenodeProtocolProtos.RegisterRequestProto.newBuilder() + .setRegistration(PBHelper.convert(registration)).build(); + + return asyncIpcClient(() -> rpcProxy.registerSubordinateNamenode(null, req), + res -> PBHelper.convert(res.getRegistration()), + NamenodeRegistration.class); + } + + @Override + public NamenodeCommand startCheckpoint(NamenodeRegistration registration) + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.startCheckpoint(registration); + } + NamenodeProtocolProtos.StartCheckpointRequestProto req = + NamenodeProtocolProtos.StartCheckpointRequestProto.newBuilder() + .setRegistration(PBHelper.convert(registration)).build(); + + return asyncIpcClient(() -> rpcProxy.startCheckpoint(null, req), + res -> { + HdfsServerProtos.NamenodeCommandProto cmd = res.getCommand(); + return PBHelper.convert(cmd); + }, NamenodeCommand.class); + } + + @Override + public void endCheckpoint(NamenodeRegistration registration, + CheckpointSignature sig) throws IOException { + if (!Client.isAsynchronousMode()) { + super.endCheckpoint(registration, sig); + return; + } + NamenodeProtocolProtos.EndCheckpointRequestProto req = + NamenodeProtocolProtos.EndCheckpointRequestProto.newBuilder() + .setRegistration(PBHelper.convert(registration)) + .setSignature(PBHelper.convert(sig)).build(); + + asyncIpcClient(() -> rpcProxy.endCheckpoint(null, req), + res -> null, Void.class); + } + + @Override + public RemoteEditLogManifest getEditLogManifest(long sinceTxId) + throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getEditLogManifest(sinceTxId); + } + NamenodeProtocolProtos.GetEditLogManifestRequestProto req = + NamenodeProtocolProtos.GetEditLogManifestRequestProto + .newBuilder().setSinceTxId(sinceTxId).build(); + + return asyncIpcClient(() -> rpcProxy.getEditLogManifest(null, req), + res -> PBHelper.convert(res.getManifest()), RemoteEditLogManifest.class); + } + + @Override + public boolean isUpgradeFinalized() throws IOException { + if (!Client.isAsynchronousMode()) { + return super.isUpgradeFinalized(); + } + NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto req = + NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto + .newBuilder().build(); + + return asyncIpcClient(() -> rpcProxy.isUpgradeFinalized(null, req), + res -> res.getIsUpgradeFinalized(), Boolean.class); + } + + @Override + public boolean isRollingUpgrade() throws IOException { + if (!Client.isAsynchronousMode()) { + return super.isRollingUpgrade(); + } + NamenodeProtocolProtos.IsRollingUpgradeRequestProto req = + NamenodeProtocolProtos.IsRollingUpgradeRequestProto + .newBuilder().build(); + + return asyncIpcClient(() -> rpcProxy.isRollingUpgrade(null, req), + res -> res.getIsRollingUpgrade(), Boolean.class); + } + + @Override + public Long getNextSPSPath() throws IOException { + if (!Client.isAsynchronousMode()) { + return super.getNextSPSPath(); + } + NamenodeProtocolProtos.GetNextSPSPathRequestProto req = + NamenodeProtocolProtos.GetNextSPSPathRequestProto.newBuilder().build(); + + return asyncIpcClient(() -> rpcProxy.getNextSPSPath(null, req), + res -> res.hasSpsPath() ? res.getSpsPath() : null, Long.class); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterRefreshUserMappingsProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterRefreshUserMappingsProtocolTranslatorPB.java new file mode 100644 index 0000000000000..78728e04f295d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterRefreshUserMappingsProtocolTranslatorPB.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocolPB; + +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos; +import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB; +import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB; +import java.io.IOException; + +import static org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil.asyncIpcClient; + +public class RouterRefreshUserMappingsProtocolTranslatorPB + extends RefreshUserMappingsProtocolClientSideTranslatorPB { + private final RefreshUserMappingsProtocolPB rpcProxy; + public RouterRefreshUserMappingsProtocolTranslatorPB(RefreshUserMappingsProtocolPB rpcProxy) { + super(rpcProxy); + this.rpcProxy = rpcProxy; + } + + @Override + public void refreshUserToGroupsMappings() throws IOException { + if (!Client.isAsynchronousMode()) { + super.refreshUserToGroupsMappings(); + return; + } + + asyncIpcClient(() -> rpcProxy.refreshUserToGroupsMappings(null, + RefreshUserMappingsProtocolProtos + .RefreshUserToGroupsMappingsRequestProto + .newBuilder().build()), + res -> null, Void.class); + } + + @Override + public void refreshSuperUserGroupsConfiguration() throws IOException { + if (!Client.isAsynchronousMode()) { + super.refreshSuperUserGroupsConfiguration(); + return; + } + + asyncIpcClient(() -> rpcProxy.refreshSuperUserGroupsConfiguration(null, + RefreshUserMappingsProtocolProtos + .RefreshSuperUserGroupsConfigurationRequestProto + .newBuilder().build()), + res -> null, Void.class); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/package-info.java new file mode 100644 index 0000000000000..f17eab74865d0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/package-info.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains the implementation of the Protocol Buffers + * protocols related to HDFS Router. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving + +package org.apache.hadoop.hdfs.protocolPB; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java index c13debf571c7d..5c3f95b2c8286 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java @@ -32,6 +32,10 @@ import javax.net.SocketFactory; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.hdfs.protocolPB.RouterClientProtocolTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.RouterGetUserMappingsProtocolTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.RouterNamenodeProtocolTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.RouterRefreshUserMappingsProtocolTranslatorPB; import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -41,9 +45,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB; -import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; -import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.RetryPolicy; @@ -55,10 +57,8 @@ import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB; import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB; import org.apache.hadoop.tools.GetUserMappingsProtocol; -import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB; import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB; import org.apache.hadoop.util.Time; import org.eclipse.jetty.util.ajax.JSON; @@ -117,15 +117,15 @@ public class ConnectionPool { static { PROTO_MAP.put(ClientProtocol.class, new ProtoImpl(ClientNamenodeProtocolPB.class, - ClientNamenodeProtocolTranslatorPB.class)); + RouterClientProtocolTranslatorPB.class)); PROTO_MAP.put(NamenodeProtocol.class, new ProtoImpl( - NamenodeProtocolPB.class, NamenodeProtocolTranslatorPB.class)); + NamenodeProtocolPB.class, RouterNamenodeProtocolTranslatorPB.class)); PROTO_MAP.put(RefreshUserMappingsProtocol.class, new ProtoImpl(RefreshUserMappingsProtocolPB.class, - RefreshUserMappingsProtocolClientSideTranslatorPB.class)); + RouterRefreshUserMappingsProtocolTranslatorPB.class)); PROTO_MAP.put(GetUserMappingsProtocol.class, new ProtoImpl(GetUserMappingsProtocolPB.class, - GetUserMappingsProtocolClientSideTranslatorPB.class)); + RouterGetUserMappingsProtocolTranslatorPB.class)); } /** Class to store the protocol implementation. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java index ec2f7360b3092..d7ed04b08e368 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java @@ -58,6 +58,7 @@ public final class AsyncUtil { private static final Boolean BOOLEAN_RESULT = false; private static final Long LONG_RESULT = -1L; + private static final Integer INT_RESULT = -1; private static final Object NULL_RESULT = null; private AsyncUtil(){} @@ -83,6 +84,8 @@ public static R asyncReturn(Class clazz) { return (R) BOOLEAN_RESULT; } else if (clazz.equals(Long.class)) { return (R) LONG_RESULT; + } else if (clazz.equals(Integer.class)) { + return (R) INT_RESULT; } return (R) NULL_RESULT; } @@ -137,6 +140,10 @@ public static void asyncComplete(R value) { CompletableFuture.completedFuture(value)); } + public static void asyncCompleteWith(CompletableFuture completableFuture) { + CUR_COMPLETABLE_FUTURE.set((CompletableFuture) completableFuture); + } + /** * Completes the current asynchronous operation with an exception. * This method sets the result of the current thread's {@link CompletableFuture} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java new file mode 100644 index 0000000000000..c0cd4c898e828 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocolPB; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.ProtobufRpcEngine2; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ipc.TestRPC; +import org.apache.hadoop.ipc.TestRpcBase; +import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.thirdparty.protobuf.BlockingService; +import org.apache.hadoop.util.Time; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestAsyncRpcProtocolPBUtil { + private static final Logger LOG = LoggerFactory.getLogger(TestAsyncRpcProtocolPBUtil.class); + private static final int SERVER_PROCESS_COST_MS = 100; + private TestClientProtocolTranslatorPB clientPB; + private Server rpcServer; + + @Before + public void setUp() throws IOException { + Configuration conf = new Configuration(); + RPC.setProtocolEngine(conf, TestRpcBase.TestRpcService.class, + ProtobufRpcEngine2.class); + + // Create server side implementation + TestClientProtocolServerSideTranslatorPB serverImpl = + new TestClientProtocolServerSideTranslatorPB(SERVER_PROCESS_COST_MS); + BlockingService service = TestRpcServiceProtos.TestProtobufRpcProto + .newReflectiveBlockingService(serverImpl); + + // start the IPC server + rpcServer = new RPC.Builder(conf) + .setProtocol(TestRpcBase.TestRpcService.class) + .setInstance(service).setBindAddress("0.0.0.0") + .setPort(0).setNumHandlers(1).setVerbose(true).build(); + + rpcServer.start(); + + InetSocketAddress addr = NetUtils.getConnectAddress(rpcServer); + + TestRpcBase.TestRpcService proxy = RPC.getProxy(TestRpcBase.TestRpcService.class, + TestRPC.TestProtocol.versionID, addr, conf); + clientPB = new TestClientProtocolTranslatorPB(proxy); + Client.setAsynchronousMode(true); + clientPB.ping(); + } + + @After + public void clear() { + if (clientPB != null) { + clientPB.close(); + } + if (rpcServer != null) { + rpcServer.stop(); + } + } + + @Test + public void testAsyncIpcClient() throws Exception { + Client.setAsynchronousMode(true); + long start = Time.monotonicNow(); + clientPB.add(1, 2); + long cost = Time.monotonicNow() - start; + LOG.info("rpc client add {} {}, cost: {}ms", 1, 2, cost); + Integer res = syncReturn(Integer.class); + checkResult(3, res, cost); + + start = Time.monotonicNow(); + clientPB.echo("test echo!"); + cost = Time.monotonicNow() - start; + LOG.info("rpc client echo {}, cost: {}ms", "test echo!", cost); + String value = syncReturn(String.class); + checkResult("test echo!", value, cost); + + start = Time.monotonicNow(); + clientPB.error(); + LOG.info("rpc client error, cost: {}ms", Time.monotonicNow() - start); + LambdaTestUtils.intercept(RemoteException.class, "test!", + () -> AsyncUtil.syncReturn(String.class)); + } + + private void checkResult(Object expected, Object actual, long cost) { + assertTrue(cost < SERVER_PROCESS_COST_MS); + assertEquals(expected, actual); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestClientProtocol.java new file mode 100644 index 0000000000000..fee964a529270 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestClientProtocol.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.IOException; + +public interface TestClientProtocol { + void ping() throws IOException; + + String echo(String echoMessage) throws IOException; + + void error() throws IOException; + + int add(int num1, int num2) throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestClientProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestClientProtocolServerSideTranslatorPB.java new file mode 100644 index 0000000000000..4e64a3af9dddb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestClientProtocolServerSideTranslatorPB.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocolPB; + +import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.ipc.TestRpcBase; +import org.apache.hadoop.ipc.protobuf.TestProtos; +import org.apache.hadoop.thirdparty.protobuf.RpcController; +import org.apache.hadoop.thirdparty.protobuf.ServiceException; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestClientProtocolServerSideTranslatorPB extends TestRpcBase.PBServerImpl { + private static final Logger LOG = + LoggerFactory.getLogger(TestClientProtocolServerSideTranslatorPB.class); + private final int processTime; + + public TestClientProtocolServerSideTranslatorPB(int processTime) { + this.processTime = processTime; + } + + @Override + public TestProtos.EmptyResponseProto error( + RpcController unused, TestProtos.EmptyRequestProto request) + throws ServiceException { + long start = Time.monotonicNow(); + try { + Thread.sleep(processTime); + throw new ServiceException("error", new StandbyException("test!")); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + LOG.info("rpc server error cost: {}ms", Time.monotonicNow() - start); + } + return null; + } + + @Override + public TestProtos.EchoResponseProto echo( + RpcController unused, TestProtos.EchoRequestProto request) throws ServiceException { + TestProtos.EchoResponseProto res = null; + long start = Time.monotonicNow(); + try { + Thread.sleep(processTime); + res = super.echo(unused, request); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + LOG.info("rpc server echo: {}, result: {}, cost: {}ms", request.getMessage(), + res.getMessage(), Time.monotonicNow() - start); + } + return res; + } + + @Override + public TestProtos.AddResponseProto add( + RpcController controller, TestProtos.AddRequestProto request) throws ServiceException { + TestProtos.AddResponseProto res = null; + long start = Time.monotonicNow(); + try { + Thread.sleep(processTime); + res = super.add(controller, request); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + LOG.info("rpc server add: {} {}, result: {}, cost: {}ms", + request.getParam1(), request.getParam2(), res.getResult(), Time.monotonicNow() - start); + } + return res; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestClientProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestClientProtocolTranslatorPB.java new file mode 100644 index 0000000000000..3fd9a2c4ea0b6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestClientProtocolTranslatorPB.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocolPB; + +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.TestRpcBase.TestRpcService; +import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto; +import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto; +import org.apache.hadoop.ipc.protobuf.TestProtos.AddRequestProto; + +import java.io.Closeable; +import java.io.IOException; + +public class TestClientProtocolTranslatorPB implements TestClientProtocol, Closeable { + final private TestRpcService rpcProxy; + + public TestClientProtocolTranslatorPB(TestRpcService rpcProxy) { + this.rpcProxy = rpcProxy; + } + + @Override + public void ping() throws IOException { + EmptyRequestProto req = EmptyRequestProto.newBuilder() + .build(); + + AsyncRpcProtocolPBUtil.asyncIpcClient(() -> rpcProxy.ping(null, req), + res -> null, Void.class); + } + + @Override + public String echo(String echoMessage) throws IOException { + EchoRequestProto req = EchoRequestProto.newBuilder() + .setMessage(echoMessage) + .build(); + + return AsyncRpcProtocolPBUtil.asyncIpcClient(() -> rpcProxy.echo(null, req), + res -> res.getMessage(), String.class); + } + + @Override + public void error() throws IOException { + EmptyRequestProto req = EmptyRequestProto.newBuilder() + .build(); + + AsyncRpcProtocolPBUtil.asyncIpcClient(() -> rpcProxy.error(null, req), + res -> null, Void.class); + } + + @Override + public int add(int num1, int num2) throws IOException { + AddRequestProto req = AddRequestProto.newBuilder() + .setParam1(num1) + .setParam2(num2) + .build(); + + return AsyncRpcProtocolPBUtil.asyncIpcClient(() -> rpcProxy.add(null, req), + res -> res.getResult(), Integer.class); + } + + @Override + public void close() { + RPC.stopProxy(rpcProxy); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java new file mode 100644 index 0000000000000..d107bbe3bf7c3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocolPB; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.CryptoProtocolVersion; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; +import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.ProtobufRpcEngine2; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB; +import org.apache.hadoop.util.Lists; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.EnumSet; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.hadoop.crypto.CryptoProtocolVersion.ENCRYPTION_ZONES; +import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT; +import static org.apache.hadoop.fs.permission.AclEntryType.USER; +import static org.apache.hadoop.fs.permission.FsAction.ALL; +import static org.apache.hadoop.fs.permission.FsAction.NONE; +import static org.apache.hadoop.fs.permission.FsAction.READ; +import static org.apache.hadoop.fs.permission.FsAction.READ_WRITE; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; +import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TestRouterClientSideTranslatorPB { + private static MiniDFSCluster cluster = null; + private static InetSocketAddress nnAddress = null; + private static Configuration conf = null; + private static RouterClientProtocolTranslatorPB clientProtocolTranslatorPB; + private static RouterGetUserMappingsProtocolTranslatorPB getUserMappingsProtocolTranslatorPB; + private static RouterNamenodeProtocolTranslatorPB namenodeProtocolTranslatorPB; + private static RouterRefreshUserMappingsProtocolTranslatorPB + refreshUserMappingsProtocolTranslatorPB; + private static final String TEST_DIR_PATH = "/test"; + private boolean mode; + + @BeforeClass + public static void setUp() throws Exception { + conf = new HdfsConfiguration(); + cluster = (new MiniDFSCluster.Builder(conf)) + .numDataNodes(1).build(); + cluster.waitClusterUp(); + nnAddress = cluster.getNameNode().getNameNodeAddress(); + clientProtocolTranslatorPB = new RouterClientProtocolTranslatorPB( + createProxy(ClientNamenodeProtocolPB.class)); + getUserMappingsProtocolTranslatorPB = new RouterGetUserMappingsProtocolTranslatorPB( + createProxy(GetUserMappingsProtocolPB.class)); + namenodeProtocolTranslatorPB = new RouterNamenodeProtocolTranslatorPB( + createProxy(NamenodeProtocolPB.class)); + refreshUserMappingsProtocolTranslatorPB = new RouterRefreshUserMappingsProtocolTranslatorPB( + createProxy(RefreshUserMappingsProtocolPB.class)); + } + + @AfterClass + public static void tearDown() throws Exception { + if (clientProtocolTranslatorPB != null) { + clientProtocolTranslatorPB.close(); + } + if (getUserMappingsProtocolTranslatorPB != null) { + getUserMappingsProtocolTranslatorPB.close(); + } + if (namenodeProtocolTranslatorPB != null) { + namenodeProtocolTranslatorPB.close(); + } + if (refreshUserMappingsProtocolTranslatorPB != null) { + refreshUserMappingsProtocolTranslatorPB.close(); + } + if (cluster != null) { + cluster.shutdown(); + } + } + + @Before + public void setAsync() { + mode = Client.isAsynchronousMode(); + Client.setAsynchronousMode(true); + } + + @After + public void unsetAsync() { + Client.setAsynchronousMode(mode); + } + + @Test + public void testRouterClientProtocolTranslatorPB() throws Exception { + clientProtocolTranslatorPB.mkdirs(TEST_DIR_PATH, new FsPermission(ALL, ALL, ALL), false); + Boolean success = syncReturn(Boolean.class); + assertTrue(success); + + clientProtocolTranslatorPB.setPermission(TEST_DIR_PATH, + new FsPermission(READ_WRITE, READ, NONE)); + syncReturn(Void.class); + + clientProtocolTranslatorPB.getFileInfo(TEST_DIR_PATH); + HdfsFileStatus hdfsFileStatus = syncReturn(HdfsFileStatus.class); + assertEquals(hdfsFileStatus.getPermission(), new FsPermission(READ_WRITE, READ, NONE)); + + List aclSpec = Lists.newArrayList(aclEntry(DEFAULT, USER, "tmpUser", ALL)); + clientProtocolTranslatorPB.setAcl(TEST_DIR_PATH, aclSpec); + syncReturn(Void.class); + clientProtocolTranslatorPB.setOwner(TEST_DIR_PATH, "tmpUser", "tmpUserGroup"); + syncReturn(Void.class); + + clientProtocolTranslatorPB.getFileInfo(TEST_DIR_PATH); + hdfsFileStatus = syncReturn(HdfsFileStatus.class); + assertEquals("tmpUser", hdfsFileStatus.getOwner()); + assertEquals("tmpUserGroup", hdfsFileStatus.getGroup()); + + clientProtocolTranslatorPB.create(TEST_DIR_PATH + "/testCreate.file", + new FsPermission(ALL, ALL, ALL), "testAsyncClient", + new EnumSetWritable<>(EnumSet.of(CreateFlag.CREATE)), + false, (short) 1, 128 * 1024 * 1024L, new CryptoProtocolVersion[]{ENCRYPTION_ZONES}, + null, null); + hdfsFileStatus = syncReturn(HdfsFileStatus.class); + assertTrue(hdfsFileStatus.isFile()); + assertEquals(128 * 1024 * 1024, hdfsFileStatus.getBlockSize()); + + clientProtocolTranslatorPB.getListing(TEST_DIR_PATH, new byte[1], true); + DirectoryListing directoryListing = syncReturn(DirectoryListing.class); + assertEquals(1, directoryListing.getPartialListing().length); + + clientProtocolTranslatorPB.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL); + DatanodeInfo[] datanodeInfos = syncReturn(DatanodeInfo[].class); + assertEquals(1, datanodeInfos.length); + + clientProtocolTranslatorPB.createSymlink(TEST_DIR_PATH + "/testCreate.file", + "/link/link.file", new FsPermission(ALL, ALL, ALL), true); + syncReturn(Void.class); + + clientProtocolTranslatorPB.getFileLinkInfo("/link/link.file"); + hdfsFileStatus = syncReturn(HdfsFileStatus.class); + assertEquals("testCreate.file", hdfsFileStatus.getSymlink().getName()); + + clientProtocolTranslatorPB.rename(TEST_DIR_PATH + "/testCreate.file", + TEST_DIR_PATH + "/testRename.file"); + success = syncReturn(boolean.class); + assertTrue(success); + + clientProtocolTranslatorPB.delete(TEST_DIR_PATH, true); + success = syncReturn(boolean.class); + assertTrue(success); + + LambdaTestUtils.intercept(RemoteException.class, "Parent directory doesn't exist: /test", + () -> { + clientProtocolTranslatorPB.mkdirs(TEST_DIR_PATH + "/testCreate.file", + new FsPermission(ALL, ALL, ALL), false); + syncReturn(boolean.class); + }); + } + + @Test + public void testRouterGetUserMappingsProtocolTranslatorPB() throws Exception { + getUserMappingsProtocolTranslatorPB.getGroupsForUser("root"); + String[] strings = syncReturn(String[].class); + assertTrue(strings.length != 0); + + getUserMappingsProtocolTranslatorPB.getGroupsForUser("tmp"); + strings = syncReturn(String[].class); + assertEquals(0, strings.length); + } + + @Test + public void testRouterNamenodeProtocolTranslatorPB() throws Exception { + namenodeProtocolTranslatorPB.getTransactionID(); + Long id = syncReturn(Long.class); + assertTrue(id > 0); + + namenodeProtocolTranslatorPB.getBlockKeys(); + ExportedBlockKeys exportedBlockKeys = syncReturn(ExportedBlockKeys.class); + assertNotNull(exportedBlockKeys); + + namenodeProtocolTranslatorPB.rollEditLog(); + CheckpointSignature checkpointSignature = syncReturn(CheckpointSignature.class); + assertNotNull(checkpointSignature); + } + + @Test + public void testRouterRefreshUserMappingsProtocolTranslatorPB() throws Exception { + refreshUserMappingsProtocolTranslatorPB.refreshUserToGroupsMappings(); + syncReturn(Void.class); + + refreshUserMappingsProtocolTranslatorPB.refreshSuperUserGroupsConfiguration(); + syncReturn(Void.class); + } + + public static

P createProxy(Class

protocol) throws IOException { + RPC.setProtocolEngine( + conf, protocol, ProtobufRpcEngine2.class); + final long version = RPC.getProtocolVersion(protocol); + return RPC.getProtocolProxy( + protocol, version, nnAddress, UserGroupInformation.getCurrentUser(), + conf, + NetUtils.getDefaultSocketFactory(conf), + RPC.getRpcTimeout(conf), null, + new AtomicBoolean(false)).getProxy(); + } +}