Skip to content

Commit

Permalink
HDFS-17544. [ARR] The router client rpc protocol PB supports asynchro…
Browse files Browse the repository at this point in the history
…ny. (apache#6870). Contributed by Jian Zhang.

Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
  • Loading branch information
KeeProMise committed Nov 21, 2024
1 parent ff2a574 commit 9217317
Show file tree
Hide file tree
Showing 14 changed files with 3,100 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -274,47 +274,47 @@ public class ClientNamenodeProtocolTranslatorPB implements
ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
final private ClientNamenodeProtocolPB rpcProxy;

static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
protected static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
GetServerDefaultsRequestProto.newBuilder().build();

private final static GetFsStatusRequestProto VOID_GET_FSSTATUS_REQUEST =
protected final static GetFsStatusRequestProto VOID_GET_FSSTATUS_REQUEST =
GetFsStatusRequestProto.newBuilder().build();

private final static GetFsReplicatedBlockStatsRequestProto
protected final static GetFsReplicatedBlockStatsRequestProto
VOID_GET_FS_REPLICATED_BLOCK_STATS_REQUEST =
GetFsReplicatedBlockStatsRequestProto.newBuilder().build();

private final static GetFsECBlockGroupStatsRequestProto
protected final static GetFsECBlockGroupStatsRequestProto
VOID_GET_FS_ECBLOCKGROUP_STATS_REQUEST =
GetFsECBlockGroupStatsRequestProto.newBuilder().build();

private final static RollEditsRequestProto VOID_ROLLEDITS_REQUEST =
protected final static RollEditsRequestProto VOID_ROLLEDITS_REQUEST =
RollEditsRequestProto.getDefaultInstance();

private final static RefreshNodesRequestProto VOID_REFRESH_NODES_REQUEST =
protected final static RefreshNodesRequestProto VOID_REFRESH_NODES_REQUEST =
RefreshNodesRequestProto.newBuilder().build();

private final static FinalizeUpgradeRequestProto
protected final static FinalizeUpgradeRequestProto
VOID_FINALIZE_UPGRADE_REQUEST =
FinalizeUpgradeRequestProto.newBuilder().build();

private final static UpgradeStatusRequestProto
protected final static UpgradeStatusRequestProto
VOID_UPGRADE_STATUS_REQUEST =
UpgradeStatusRequestProto.newBuilder().build();

private final static GetDataEncryptionKeyRequestProto
protected final static GetDataEncryptionKeyRequestProto
VOID_GET_DATA_ENCRYPTIONKEY_REQUEST =
GetDataEncryptionKeyRequestProto.newBuilder().build();

private final static GetStoragePoliciesRequestProto
protected final static GetStoragePoliciesRequestProto
VOID_GET_STORAGE_POLICIES_REQUEST =
GetStoragePoliciesRequestProto.newBuilder().build();

private final static GetErasureCodingPoliciesRequestProto
protected final static GetErasureCodingPoliciesRequestProto
VOID_GET_EC_POLICIES_REQUEST = GetErasureCodingPoliciesRequestProto
.newBuilder().build();

private final static GetErasureCodingCodecsRequestProto
protected final static GetErasureCodingCodecsRequestProto
VOID_GET_EC_CODEC_REQUEST = GetErasureCodingCodecsRequestProto
.newBuilder().build();

Expand Down Expand Up @@ -1137,7 +1137,7 @@ public void removeCacheDirective(long id)
setId(id).build()));
}

private static class BatchedCacheEntries
protected static class BatchedCacheEntries
implements BatchedEntries<CacheDirectiveEntry> {
private final ListCacheDirectivesResponseProto response;

Expand Down Expand Up @@ -1200,7 +1200,7 @@ public void removeCachePool(String cachePoolName) throws IOException {
setPoolName(cachePoolName).build()));
}

private static class BatchedCachePoolEntries
protected static class BatchedCachePoolEntries
implements BatchedEntries<CachePoolEntry> {
private final ListCachePoolsResponseProto proto;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.protocolPB;

import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

public final class AsyncRpcProtocolPBUtil {
public static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class);

private AsyncRpcProtocolPBUtil() {}

public static <T, R> R asyncIpcClient(
ShadedProtobufHelper.IpcCall<T> call, ApplyFunction<T, R> response,
Class<R> clazz) throws IOException {
ipc(call);
AsyncGet<T, Exception> asyncReqMessage =
(AsyncGet<T, Exception>) ProtobufRpcEngine2.getAsyncReturnMessage();
CompletableFuture<Writable> responseFuture = Client.getResponseFuture();
// transfer originCall & callerContext to worker threads of executor.
final Server.Call originCall = Server.getCurCall().get();
final CallerContext originContext = CallerContext.getCurrent();
asyncCompleteWith(responseFuture);
asyncApply(o -> {
try {
Server.getCurCall().set(originCall);
CallerContext.setCurrent(originContext);
T res = asyncReqMessage.get(-1, null);
return response.apply(res);
} catch (Exception e) {
throw warpCompletionException(e);
}
});
return asyncReturn(clazz);
}
}
Loading

0 comments on commit 9217317

Please sign in to comment.