Skip to content

Commit 30497ad

Browse files
committed
Avoiding denial of service between clients.
1 parent a243ca8 commit 30497ad

File tree

14 files changed

+189
-74
lines changed

14 files changed

+189
-74
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -937,6 +937,9 @@ public static class Call implements Schedulable,
937937
// the priority level assigned by scheduler, 0 by default
938938
private long clientStateId;
939939
private boolean isCallCoordinated;
940+
// Serialized RouterFederatedStateProto message to
941+
// store last seen states for multiple namespaces.
942+
private ByteString federatedNamespaceState;
940943

941944
Call() {
942945
this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT,
@@ -994,6 +997,14 @@ public ProcessingDetails getProcessingDetails() {
994997
return processingDetails;
995998
}
996999

1000+
public void setFederatedNamespaceState(ByteString federatedNamespaceState) {
1001+
this.federatedNamespaceState = federatedNamespaceState;
1002+
}
1003+
1004+
public ByteString getFederatedNamespaceState() {
1005+
return this.federatedNamespaceState;
1006+
}
1007+
9971008
@Override
9981009
public String toString() {
9991010
return "Call#" + callId + " Retry#" + retryCount;
@@ -2868,6 +2879,9 @@ private void processRpcRequest(RpcRequestHeaderProto header,
28682879
stateId = alignmentContext.receiveRequestState(
28692880
header, getMaxIdleTime());
28702881
call.setClientStateId(stateId);
2882+
if (header.hasRouterFederatedState()) {
2883+
call.setFederatedNamespaceState(header.getRouterFederatedState());
2884+
}
28712885
}
28722886
} catch (IOException ioe) {
28732887
throw new RpcServerException("Processing RPC request caught ", ioe);

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public void updateResponseState(RpcResponseHeaderProto.Builder header) {
7474
* in responses.
7575
*/
7676
@Override
77-
public void receiveResponseState(RpcResponseHeaderProto header) {
77+
public synchronized void receiveResponseState(RpcResponseHeaderProto header) {
7878
if (header.hasRouterFederatedState()) {
7979
routerFederatedState = header.getRouterFederatedState();
8080
} else {
@@ -86,7 +86,7 @@ public void receiveResponseState(RpcResponseHeaderProto header) {
8686
* Client side implementation for providing state alignment info in requests.
8787
*/
8888
@Override
89-
public void updateRequestState(RpcRequestHeaderProto.Builder header) {
89+
public synchronized void updateRequestState(RpcRequestHeaderProto.Builder header) {
9090
if (lastSeenStateId.get() != NamespaceStateId.DEFAULT) {
9191
header.setStateId(lastSeenStateId.get());
9292
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.util.concurrent.locks.ReentrantReadWriteLock;
3434

3535
import org.apache.hadoop.classification.VisibleForTesting;
36-
import org.apache.hadoop.hdfs.ClientGSIContext;
3736
import org.apache.hadoop.conf.Configuration;
3837
import org.apache.hadoop.security.UserGroupInformation;
3938
import org.apache.hadoop.util.Time;
@@ -194,7 +193,8 @@ public void close() {
194193
* @throws IOException If the connection cannot be obtained.
195194
*/
196195
public ConnectionContext getConnection(UserGroupInformation ugi,
197-
String nnAddress, Class<?> protocol, String nsId) throws IOException {
196+
String nnAddress, Class<?> protocol, String nsId,
197+
Long clientStateId) throws IOException {
198198

199199
// Check if the manager is shutdown
200200
if (!this.running) {
@@ -224,10 +224,11 @@ public ConnectionContext getConnection(UserGroupInformation ugi,
224224
pool = new ConnectionPool(
225225
this.conf, nnAddress, ugi, this.minSize, this.maxSize,
226226
this.minActiveRatio, protocol,
227-
new ClientGSIContext(this.federatedNamespaceIds.getNamespaceId(nsId)));
227+
new PoolAlignmentContext(this.federatedNamespaceIds.getNamespaceId(nsId)));
228228
this.pools.put(connectionId, pool);
229229
this.connectionPoolToNamespaceMap.put(connectionId, nsId);
230230
}
231+
pool.getPoolAlignmentContext().advanceClientStateId(clientStateId);
231232
} finally {
232233
writeLock.unlock();
233234
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public class ConnectionPool {
110110
/** Enable using multiple physical socket or not. **/
111111
private final boolean enableMultiSocket;
112112
/** StateID alignment context. */
113-
private final AlignmentContext alignmentContext;
113+
private final PoolAlignmentContext alignmentContext;
114114

115115
/** Map for the protocols and their protobuf implementations. */
116116
private final static Map<Class<?>, ProtoImpl> PROTO_MAP = new HashMap<>();
@@ -141,7 +141,7 @@ private static class ProtoImpl {
141141

142142
protected ConnectionPool(Configuration config, String address,
143143
UserGroupInformation user, int minPoolSize, int maxPoolSize,
144-
float minActiveRatio, Class<?> proto, AlignmentContext alignmentContext)
144+
float minActiveRatio, Class<?> proto, PoolAlignmentContext alignmentContext)
145145
throws IOException {
146146

147147
this.conf = config;
@@ -217,6 +217,14 @@ public AtomicInteger getClientIndex() {
217217
return this.clientIndex;
218218
}
219219

220+
/**
221+
* Get the alignment context for this pool
222+
* @return Alignment context
223+
*/
224+
public PoolAlignmentContext getPoolAlignmentContext() {
225+
return this.alignmentContext;
226+
}
227+
220228
/**
221229
* Return the next connection round-robin.
222230
*

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederatedNamespaceIds.java

Lines changed: 13 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import java.util.Collections;
2222
import java.util.Map;
2323
import java.util.concurrent.ConcurrentHashMap;
24-
import java.util.concurrent.locks.ReentrantLock;
25-
import org.apache.hadoop.classification.VisibleForTesting;
2624
import org.apache.hadoop.hdfs.NamespaceStateId;
2725
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
2826
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterFederatedStateProto;
@@ -36,67 +34,30 @@
3634
* Router clients share and query the entire collection.
3735
*/
3836
public class FederatedNamespaceIds {
39-
private final Map<String, NamespaceStateId> namespaceIdMap = new ConcurrentHashMap<>();
40-
private final ReentrantLock lock = new ReentrantLock();
41-
42-
public void updateStateUsingRequestHeader(RpcHeaderProtos.RpcRequestHeaderProto header) {
43-
if (header.hasRouterFederatedState()) {
44-
RouterFederatedStateProto federatedState;
45-
try {
46-
federatedState = RouterFederatedStateProto.parseFrom(header.getRouterFederatedState());
47-
} catch (InvalidProtocolBufferException e) {
48-
throw new RuntimeException(e);
49-
}
50-
lock.lock();
51-
try {
52-
federatedState.getNamespaceStateIdsMap().forEach((nsId, stateId) -> {
53-
if (!namespaceIdMap.containsKey(nsId)) {
54-
namespaceIdMap.putIfAbsent(nsId, new NamespaceStateId());
55-
}
56-
namespaceIdMap.get(nsId).update(stateId);
57-
});
58-
} finally {
59-
lock.unlock();
60-
}
61-
62-
}
63-
}
37+
private final ConcurrentHashMap<String, NamespaceStateId> namespaceIdMap =
38+
new ConcurrentHashMap<>();
6439

6540
public void setResponseHeaderState(RpcHeaderProtos.RpcResponseHeaderProto.Builder headerBuilder) {
41+
if (namespaceIdMap.isEmpty()) {
42+
return;
43+
}
6644
RouterFederatedStateProto.Builder federatedStateBuilder =
6745
RouterFederatedStateProto.newBuilder();
68-
lock.lock();
69-
try {
70-
namespaceIdMap.forEach((k, v) -> federatedStateBuilder.putNamespaceStateIds(k, v.get()));
71-
} finally {
72-
lock.unlock();
73-
}
46+
namespaceIdMap.forEach((k, v) -> federatedStateBuilder.putNamespaceStateIds(k, v.get()));
7447
headerBuilder.setRouterFederatedState(federatedStateBuilder.build().toByteString());
7548
}
7649

7750
public NamespaceStateId getNamespaceId(String nsId) {
78-
lock.lock();
79-
try {
80-
namespaceIdMap.putIfAbsent(nsId, new NamespaceStateId());
81-
} finally {
82-
lock.unlock();
83-
}
84-
return namespaceIdMap.get(nsId);
51+
return namespaceIdMap.computeIfAbsent(nsId, key -> new NamespaceStateId());
8552
}
8653

8754
public void removeNamespaceId(String nsId) {
88-
lock.lock();
89-
try {
90-
namespaceIdMap.remove(nsId);
91-
} finally {
92-
lock.unlock();
93-
}
55+
namespaceIdMap.remove(nsId);
9456
}
9557

9658
/**
97-
* Utility function to view state of routerFederatedState field in RPC headers.
59+
* Utility function to parse routerFederatedState field in RPC headers.
9860
*/
99-
@VisibleForTesting
10061
public static Map<String, Long> getRouterFederatedStateMap(ByteString byteString) {
10162
if (byteString != null) {
10263
RouterFederatedStateProto federatedState;
@@ -110,4 +71,8 @@ public static Map<String, Long> getRouterFederatedStateMap(ByteString byteString
11071
return Collections.emptyMap();
11172
}
11273
}
74+
75+
public int size() {
76+
return namespaceIdMap.size();
77+
}
11378
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hdfs.server.federation.router;
20+
21+
import java.io.IOException;
22+
import org.apache.hadoop.hdfs.NamespaceStateId;
23+
import org.apache.hadoop.ipc.AlignmentContext;
24+
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
25+
26+
27+
public class PoolAlignmentContext implements AlignmentContext {
28+
private NamespaceStateId sharedGlobalStateId;
29+
private NamespaceStateId poolLocalStateId;
30+
31+
PoolAlignmentContext(NamespaceStateId namespaceStateId) {
32+
sharedGlobalStateId = namespaceStateId;
33+
poolLocalStateId = new NamespaceStateId();
34+
}
35+
36+
/**
37+
* Client side implementation only receives state alignment info.
38+
* It does not provide state alignment info therefore this does nothing.
39+
*/
40+
@Override
41+
public void updateResponseState(RpcHeaderProtos.RpcResponseHeaderProto.Builder header) {
42+
// Do nothing.
43+
}
44+
45+
/**
46+
* Router update globally shared namespaceStateId value using response from
47+
* namenodes.
48+
*/
49+
@Override
50+
public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) {
51+
sharedGlobalStateId.update(header.getStateId());
52+
}
53+
54+
/**
55+
* Client side implementation for routers to provide state info in requests to
56+
* namenodes.
57+
*/
58+
@Override
59+
public void updateRequestState(RpcHeaderProtos.RpcRequestHeaderProto.Builder header) {
60+
long maxStateId = Long.max(poolLocalStateId.get(), sharedGlobalStateId.get());
61+
header.setStateId(maxStateId);
62+
}
63+
64+
/**
65+
* Client side implementation only provides state alignment info in requests.
66+
* Client does not receive RPC requests therefore this does nothing.
67+
*/
68+
@Override
69+
public long receiveRequestState(RpcHeaderProtos.RpcRequestHeaderProto header, long threshold)
70+
throws IOException {
71+
// Do nothing.
72+
return 0;
73+
}
74+
75+
@Override
76+
public long getLastSeenStateId() {
77+
return sharedGlobalStateId.get();
78+
}
79+
80+
@Override
81+
public boolean isCoordinatedCall(String protocolName, String method) {
82+
throw new UnsupportedOperationException(
83+
"Client should not be checking uncoordinated call");
84+
}
85+
86+
public void advanceClientStateId(Long clientStateId) {
87+
poolLocalStateId.update(clientStateId);
88+
}
89+
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
191191
FEDERATION_STORE_PREFIX + "enable";
192192
public static final boolean DFS_ROUTER_STORE_ENABLE_DEFAULT = true;
193193

194+
public static final String DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE =
195+
FEDERATION_ROUTER_PREFIX + "observer.federated.state.propagation.maxsize";
196+
public static final int DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT = 5;
197+
194198
public static final String FEDERATION_STORE_SERIALIZER_CLASS =
195199
FEDERATION_STORE_PREFIX + "serializer";
196200
public static final Class<StateStoreSerializerPBImpl>

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import org.apache.hadoop.ipc.StandbyException;
8181
import org.apache.hadoop.net.ConnectTimeoutException;
8282
import org.apache.hadoop.security.UserGroupInformation;
83+
import org.apache.hadoop.thirdparty.protobuf.ByteString;
8384
import org.apache.hadoop.util.StringUtils;
8485
import org.eclipse.jetty.util.ajax.JSON;
8586
import org.slf4j.Logger;
@@ -369,8 +370,20 @@ private ConnectionContext getConnection(UserGroupInformation ugi, String nsId,
369370
connUGI = UserGroupInformation.createProxyUser(
370371
ugi.getUserName(), routerUser);
371372
}
373+
374+
Long clientStateID = Long.MIN_VALUE;
375+
Call call = Server.getCurCall().get();
376+
if (call != null) {
377+
ByteString callFederatedNamespaceState = call.getFederatedNamespaceState();
378+
if (callFederatedNamespaceState != null) {
379+
Map<String, Long> clientFederatedStateIds =
380+
FederatedNamespaceIds.getRouterFederatedStateMap(callFederatedNamespaceState);
381+
clientStateID = clientFederatedStateIds.getOrDefault(nsId, Long.MIN_VALUE);
382+
}
383+
}
384+
372385
connection = this.connectionManager.getConnection(
373-
connUGI, rpcAddress, proto, nsId);
386+
connUGI, rpcAddress, proto, nsId, clientStateID);
374387
LOG.debug("User {} NN {} is using connection {}",
375388
ugi.getUserName(), rpcAddress, connection);
376389
} catch (Exception ex) {

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -255,18 +255,18 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
255255
* @param conf HDFS Configuration.
256256
* @param router A router using this RPC server.
257257
* @param nnResolver The NN resolver instance to determine active NNs in HA.
258-
* @param fileResolver File resolver to resolve file paths to subclusters.
258+
* @param fResolver File resolver to resolve file paths to subclusters.
259259
* @throws IOException If the RPC server could not be created.
260260
*/
261261
public RouterRpcServer(Configuration conf, Router router,
262-
ActiveNamenodeResolver nnResolver, FileSubclusterResolver fileResolver)
262+
ActiveNamenodeResolver nnResolver, FileSubclusterResolver fResolver)
263263
throws IOException {
264264
super(RouterRpcServer.class.getName());
265265

266266
this.conf = conf;
267267
this.router = router;
268268
this.namenodeResolver = nnResolver;
269-
this.subclusterResolver = fileResolver;
269+
this.subclusterResolver = fResolver;
270270

271271
// RPC server settings
272272
int handlerCount = this.conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY,
@@ -332,7 +332,7 @@ public RouterRpcServer(Configuration conf, Router router,
332332
.setnumReaders(readerCount)
333333
.setQueueSizePerHandler(handlerQueueSize)
334334
.setVerbose(false)
335-
.setAlignmentContext(new RouterStateIdContext(federatedNamespaceIds))
335+
.setAlignmentContext(new RouterStateIdContext(conf, federatedNamespaceIds))
336336
.setSecretManager(this.securityManager.getSecretManager())
337337
.build();
338338

0 commit comments

Comments
 (0)