Skip to content

Commit b3d99b9

Browse files
committed
HDFS-16708. RBF: Support transmit state id from client in router.
1 parent 2f49eec commit b3d99b9

File tree

46 files changed

+1896
-164
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1896
-164
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public interface AlignmentContext {
4646
void updateResponseState(RpcResponseHeaderProto.Builder header);
4747

4848
/**
49-
* This is the intended client method call to implement to recieve state info
49+
* This is the intended client method call to implement to receive state info
5050
* during RPC response processing.
5151
*
5252
* @param header The RPC response header.
@@ -73,7 +73,7 @@ public interface AlignmentContext {
7373
* @return state id required for the server to execute the call.
7474
* @throws IOException raised on errors performing I/O.
7575
*/
76-
long receiveRequestState(RpcRequestHeaderProto header, long threshold)
76+
long receiveRequestState(RpcRequestHeaderProto header, long threshold, boolean isCoordinatedCall)
7777
throws IOException;
7878

7979
/**
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.ipc;
20+
21+
public enum NameServiceStateIdMode {
22+
DISABLE("DISABLE"),
23+
TRANSMISSION("TRANSMISSION"),
24+
PROXY("PROXY");
25+
26+
private String name;
27+
28+
NameServiceStateIdMode(String name) {
29+
this.name = name;
30+
}
31+
32+
@Override
33+
public String toString() {
34+
return name;
35+
}
36+
37+
public boolean isDisable() {
38+
return this == NameServiceStateIdMode.DISABLE;
39+
}
40+
41+
public boolean isTransmission() {
42+
return this == NameServiceStateIdMode.TRANSMISSION;
43+
}
44+
45+
public boolean isProxy() {
46+
return this == NameServiceStateIdMode.PROXY;
47+
}
48+
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
3131
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
3232
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
33+
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
34+
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.NameserviceStateIdContextProto.NameServiceStateIdModeProto;
3335
import org.apache.hadoop.net.NetUtils;
3436

3537
import org.apache.hadoop.thirdparty.protobuf.RpcController;
@@ -238,4 +240,29 @@ public static String toTraceName(String fullName) {
238240
return fullName.substring(secondLastPeriod + 1, lastPeriod) + "#" +
239241
fullName.substring(lastPeriod + 1);
240242
}
243+
244+
public static NameServiceStateIdMode toStateIdMode(String mode) {
245+
return NameServiceStateIdMode.valueOf(mode.toUpperCase());
246+
}
247+
248+
public static NameServiceStateIdMode toStateIdMode(RpcRequestHeaderProto proto) {
249+
if (proto.hasNameserviceStateIdsContext()) {
250+
return NameServiceStateIdMode.valueOf(proto.getNameserviceStateIdsContext().getMode().name());
251+
}
252+
return null;
253+
}
254+
255+
public static NameServiceStateIdModeProto toNameServiceStateIdModeProto(
256+
NameServiceStateIdMode mode) {
257+
switch(mode) {
258+
case DISABLE:
259+
return NameServiceStateIdModeProto.DISABLE;
260+
case TRANSMISSION:
261+
return NameServiceStateIdModeProto.TRANSMISSION;
262+
case PROXY:
263+
return NameServiceStateIdModeProto.PROXY;
264+
default:
265+
return null;
266+
}
267+
}
241268
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -928,7 +928,7 @@ public static class Call implements Schedulable,
928928
private volatile String detailedMetricsName = "";
929929
final int callId; // the client's call id
930930
final int retryCount; // the retry count of the call
931-
long timestampNanos; // time the call was received
931+
private final long timestampNanos; // time the call was received
932932
long responseTimestampNanos; // time the call was served
933933
private AtomicInteger responseWaitCount = new AtomicInteger(1);
934934
final RPC.RpcKind rpcKind;
@@ -1110,6 +1110,18 @@ public void setDeferredResponse(Writable response) {
11101110

11111111
public void setDeferredError(Throwable t) {
11121112
}
1113+
1114+
public long getTimestampNanos() {
1115+
return timestampNanos;
1116+
}
1117+
1118+
public int getCallId() {
1119+
return callId;
1120+
}
1121+
1122+
public byte[] getClientId() {
1123+
return clientId;
1124+
}
11131125
}
11141126

11151127
/** A RPC extended call queued for handling. */
@@ -1190,8 +1202,7 @@ public Void run() throws Exception {
11901202
ResponseParams responseParams = new ResponseParams();
11911203

11921204
try {
1193-
value = call(
1194-
rpcKind, connection.protocolName, rpcRequest, timestampNanos);
1205+
value = call(rpcKind, connection.protocolName, rpcRequest, getTimestampNanos());
11951206
} catch (Throwable e) {
11961207
populateResponseParamsOnError(e, responseParams);
11971208
}
@@ -2884,11 +2895,10 @@ private void processRpcRequest(RpcRequestHeaderProto header,
28842895
protoName = req.getRequestHeader().getDeclaringClassProtocolName();
28852896
if (alignmentContext.isCoordinatedCall(protoName, methodName)) {
28862897
call.markCallCoordinated(true);
2887-
long stateId;
2888-
stateId = alignmentContext.receiveRequestState(
2889-
header, getMaxIdleTime());
2890-
call.setClientStateId(stateId);
28912898
}
2899+
long stateId = alignmentContext.receiveRequestState(header, getMaxIdleTime(),
2900+
call.isCallCoordinated());
2901+
call.setClientStateId(stateId);
28922902
} catch (IOException ioe) {
28932903
throw new RpcServerException("Processing RPC request caught ", ioe);
28942904
}

hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,21 @@ message RPCCallerContextProto {
7474
optional bytes signature = 2;
7575
}
7676

77+
message NameserviceStateIdContextProto {
78+
enum NameServiceStateIdModeProto {
79+
DISABLE = 0; // NameserviceStateIdContextProto will be ignored.
80+
TRANSMISSION = 1; // NameserviceStateIdContextProto will transparent transmission by router.
81+
PROXY = 2; // State id is proxy by router, NameserviceStateIdContextProto will be ignore.
82+
}
83+
required NameServiceStateIdModeProto mode = 1 [default = DISABLE];
84+
repeated NameserviceStateIdProto nameserviceStateIds = 2; // Last seen state IDs for multiple nameservices.
85+
}
86+
87+
message NameserviceStateIdProto {
88+
required string nsId = 1;
89+
required int64 stateId = 2;
90+
}
91+
7792
message RpcRequestHeaderProto { // the header for the RpcRequest
7893
enum OperationProto {
7994
RPC_FINAL_PACKET = 0; // The final RPC Packet
@@ -91,6 +106,7 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
91106
optional RPCTraceInfoProto traceInfo = 6; // tracing info
92107
optional RPCCallerContextProto callerContext = 7; // call context
93108
optional int64 stateId = 8; // The last seen Global State ID
109+
optional NameserviceStateIdContextProto nameserviceStateIdsContext = 9;
94110
}
95111

96112

@@ -157,6 +173,7 @@ message RpcResponseHeaderProto {
157173
optional bytes clientId = 7; // Globally unique client ID
158174
optional sint32 retryCount = 8 [default = -1];
159175
optional int64 stateId = 9; // The last written Global State ID
176+
optional NameserviceStateIdContextProto nameserviceStateIdsContext = 10;
160177
}
161178

162179
message RpcSaslProto {

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.hadoop.classification.InterfaceAudience;
2222
import org.apache.hadoop.classification.InterfaceStability;
2323
import org.apache.hadoop.ipc.AlignmentContext;
24+
import org.apache.hadoop.ipc.NameServiceStateIdMode;
2425
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
2526
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
2627

@@ -37,18 +38,28 @@
3738
@InterfaceStability.Evolving
3839
public class ClientGSIContext implements AlignmentContext {
3940

40-
private final LongAccumulator lastSeenStateId =
41-
new LongAccumulator(Math::max, Long.MIN_VALUE);
41+
public final static String DEFAULT_NS = "";
42+
private final FederatedNamespaceIds federatedNamespaceIds;
43+
private final String nsId;
44+
45+
public ClientGSIContext(NameServiceStateIdMode mode) {
46+
this(mode, DEFAULT_NS);
47+
}
48+
49+
public ClientGSIContext(NameServiceStateIdMode mode, String nsId) {
50+
this.federatedNamespaceIds = new FederatedNamespaceIds(mode);
51+
this.nsId = nsId;
52+
}
53+
4254

4355
@Override
4456
public long getLastSeenStateId() {
45-
return lastSeenStateId.get();
57+
return federatedNamespaceIds.getNamespaceId(nsId, true).get();
4658
}
4759

4860
@Override
4961
public boolean isCoordinatedCall(String protocolName, String method) {
50-
throw new UnsupportedOperationException(
51-
"Client should not be checking uncoordinated call");
62+
throw new UnsupportedOperationException("Client should not be checking uncoordinated call");
5263
}
5364

5465
/**
@@ -66,24 +77,33 @@ public void updateResponseState(RpcResponseHeaderProto.Builder header) {
6677
*/
6778
@Override
6879
public void receiveResponseState(RpcResponseHeaderProto header) {
69-
lastSeenStateId.accumulate(header.getStateId());
80+
if (federatedNamespaceIds.isDisable()) {
81+
federatedNamespaceIds.updateNameserviceState(this.nsId, header.getStateId());
82+
} else {
83+
federatedNamespaceIds.updateStateUsingResponseHeader(header);
84+
}
7085
}
7186

7287
/**
7388
* Client side implementation for providing state alignment info in requests.
7489
*/
7590
@Override
7691
public void updateRequestState(RpcRequestHeaderProto.Builder header) {
77-
header.setStateId(lastSeenStateId.longValue());
92+
if (federatedNamespaceIds.isDisable()) {
93+
header.setStateId(federatedNamespaceIds.getNamespaceId(this.nsId, true).get());
94+
header.clearNameserviceStateIdsContext();
95+
} else {
96+
federatedNamespaceIds.setRequestHeaderState(header);
97+
}
7898
}
7999

80100
/**
81101
* Client side implementation only provides state alignment info in requests.
82102
* Client does not receive RPC requests therefore this does nothing.
83103
*/
84104
@Override
85-
public long receiveRequestState(RpcRequestHeaderProto header, long threshold)
86-
throws IOException {
105+
public long receiveRequestState(RpcRequestHeaderProto header, long threshold,
106+
boolean isCoordinatedCall) throws IOException {
87107
// Do nothing.
88108
return 0;
89109
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hdfs;
20+
21+
import java.util.Map;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
24+
import org.apache.hadoop.ipc.RpcClientUtil;
25+
import org.apache.hadoop.ipc.NameServiceStateIdMode;
26+
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
27+
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
28+
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.NameserviceStateIdProto;
29+
30+
31+
/** Collection of last-seen namespace state Ids for a set of namespaces. */
32+
public class FederatedNamespaceIds {
33+
34+
private final Map<String, NamespaceStateId> namespaceIdMap = new ConcurrentHashMap<>();
35+
private NameServiceStateIdMode mode;
36+
37+
public FederatedNamespaceIds(NameServiceStateIdMode mode) {
38+
this.mode = mode;
39+
}
40+
41+
public void updateStateUsingRequestHeader(RpcRequestHeaderProto header) {
42+
mode = RpcClientUtil.toStateIdMode(header);
43+
header.getNameserviceStateIdsContext().getNameserviceStateIdsList()
44+
.forEach(this::updateNameserviceState);
45+
}
46+
47+
public void updateStateUsingResponseHeader(RpcResponseHeaderProto header) {
48+
header.getNameserviceStateIdsContext().getNameserviceStateIdsList()
49+
.forEach(this::updateNameserviceState);
50+
}
51+
52+
public void updateNameserviceState(NameserviceStateIdProto proto) {
53+
namespaceIdMap.computeIfAbsent(proto.getNsId(), n -> new NamespaceStateId());
54+
namespaceIdMap.get(proto.getNsId()).update(proto.getStateId());
55+
}
56+
57+
public void updateNameserviceState(String nsId, long stateId) {
58+
namespaceIdMap.computeIfAbsent(nsId, n -> new NamespaceStateId());
59+
namespaceIdMap.get(nsId).update(stateId);
60+
}
61+
62+
public void setRequestHeaderState(RpcRequestHeaderProto.Builder headerBuilder) {
63+
headerBuilder.getNameserviceStateIdsContextBuilder()
64+
.setMode(RpcClientUtil.toNameServiceStateIdModeProto(mode));
65+
namespaceIdMap.forEach((k, v) -> headerBuilder.getNameserviceStateIdsContextBuilder()
66+
.addNameserviceStateIds(
67+
NameserviceStateIdProto.newBuilder()
68+
.setNsId(k)
69+
.setStateId(v.get())
70+
.build())
71+
);
72+
}
73+
74+
public void setRequestHeaderState(RpcRequestHeaderProto.Builder headerBuilder, String nsId) {
75+
NamespaceStateId namespaceStateId = namespaceIdMap.get(nsId);
76+
long stateId = (namespaceStateId == null) ? NamespaceStateId.DEFAULT : namespaceStateId.get();
77+
headerBuilder.setStateId(stateId);
78+
}
79+
80+
public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder) {
81+
headerBuilder.getNameserviceStateIdsContextBuilder()
82+
.setMode(RpcClientUtil.toNameServiceStateIdModeProto(mode));
83+
namespaceIdMap.forEach((k, v) -> headerBuilder.getNameserviceStateIdsContextBuilder()
84+
.addNameserviceStateIds(
85+
NameserviceStateIdProto.newBuilder()
86+
.setNsId(k)
87+
.setStateId(v.get())
88+
.build())
89+
);
90+
}
91+
92+
public NamespaceStateId getNamespaceId(String nsId, boolean useDefault) {
93+
if (useDefault) {
94+
namespaceIdMap.computeIfAbsent(nsId, n -> new NamespaceStateId());
95+
}
96+
return namespaceIdMap.get(nsId);
97+
}
98+
99+
public boolean isProxyMode() {
100+
return mode == NameServiceStateIdMode.PROXY;
101+
}
102+
103+
public boolean isTransmissionMode() {
104+
return mode == NameServiceStateIdMode.TRANSMISSION;
105+
}
106+
107+
public boolean isDisable() {
108+
return mode == NameServiceStateIdMode.DISABLE;
109+
}
110+
111+
public NameServiceStateIdMode getMode() {
112+
return mode;
113+
}
114+
115+
public boolean contains(String nsId) {
116+
return this.namespaceIdMap.containsKey(nsId);
117+
}
118+
}

0 commit comments

Comments
 (0)