Skip to content

Commit 6256a3c

Browse files
authored
Merge branch 'apache:trunk' into YARN-11592
2 parents 0836b2b + 6529085 commit 6256a3c

File tree

75 files changed

+2412
-437
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

75 files changed

+2412
-437
lines changed

LICENSE-binary

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -257,36 +257,36 @@ io.grpc:grpc-netty:1.26.0
257257
io.grpc:grpc-protobuf:1.26.0
258258
io.grpc:grpc-protobuf-lite:1.26.0
259259
io.grpc:grpc-stub:1.26.0
260-
io.netty:netty-all:4.1.94.Final
261-
io.netty:netty-buffer:4.1.94.Final
262-
io.netty:netty-codec:4.1.94.Final
263-
io.netty:netty-codec-dns:4.1.94.Final
264-
io.netty:netty-codec-haproxy:4.1.94.Final
265-
io.netty:netty-codec-http:4.1.94.Final
266-
io.netty:netty-codec-http2:4.1.94.Final
267-
io.netty:netty-codec-memcache:4.1.94.Final
268-
io.netty:netty-codec-mqtt:4.1.94.Final
269-
io.netty:netty-codec-redis:4.1.94.Final
270-
io.netty:netty-codec-smtp:4.1.94.Final
271-
io.netty:netty-codec-socks:4.1.94.Final
272-
io.netty:netty-codec-stomp:4.1.94.Final
273-
io.netty:netty-codec-xml:4.1.94.Final
274-
io.netty:netty-common:4.1.94.Final
275-
io.netty:netty-handler:4.1.94.Final
276-
io.netty:netty-handler-proxy:4.1.94.Final
277-
io.netty:netty-resolver:4.1.94.Final
278-
io.netty:netty-resolver-dns:4.1.94.Final
279-
io.netty:netty-transport:4.1.94.Final
280-
io.netty:netty-transport-rxtx:4.1.94.Final
281-
io.netty:netty-transport-sctp:4.1.94.Final
282-
io.netty:netty-transport-udt:4.1.94.Final
283-
io.netty:netty-transport-classes-epoll:4.1.94.Final
284-
io.netty:netty-transport-native-unix-common:4.1.94.Final
285-
io.netty:netty-transport-classes-kqueue:4.1.94.Final
286-
io.netty:netty-resolver-dns-classes-macos:4.1.94.Final
287-
io.netty:netty-transport-native-epoll:4.1.94.Final
288-
io.netty:netty-transport-native-kqueue:4.1.94.Final
289-
io.netty:netty-resolver-dns-native-macos:4.1.94.Final
260+
io.netty:netty-all:4.1.100.Final
261+
io.netty:netty-buffer:4.1.100.Final
262+
io.netty:netty-codec:4.1.100.Final
263+
io.netty:netty-codec-dns:4.1.100.Final
264+
io.netty:netty-codec-haproxy:4.1.100.Final
265+
io.netty:netty-codec-http:4.1.100.Final
266+
io.netty:netty-codec-http2:4.1.100.Final
267+
io.netty:netty-codec-memcache:4.1.100.Final
268+
io.netty:netty-codec-mqtt:4.1.100.Final
269+
io.netty:netty-codec-redis:4.1.100.Final
270+
io.netty:netty-codec-smtp:4.1.100.Final
271+
io.netty:netty-codec-socks:4.1.100.Final
272+
io.netty:netty-codec-stomp:4.1.100.Final
273+
io.netty:netty-codec-xml:4.1.100.Final
274+
io.netty:netty-common:4.1.100.Final
275+
io.netty:netty-handler:4.1.100.Final
276+
io.netty:netty-handler-proxy:4.1.100.Final
277+
io.netty:netty-resolver:4.1.100.Final
278+
io.netty:netty-resolver-dns:4.1.100.Final
279+
io.netty:netty-transport:4.1.100.Final
280+
io.netty:netty-transport-rxtx:4.1.100.Final
281+
io.netty:netty-transport-sctp:4.1.100.Final
282+
io.netty:netty-transport-udt:4.1.100.Final
283+
io.netty:netty-transport-classes-epoll:4.1.100.Final
284+
io.netty:netty-transport-native-unix-common:4.1.100.Final
285+
io.netty:netty-transport-classes-kqueue:4.1.100.Final
286+
io.netty:netty-resolver-dns-classes-macos:4.1.100.Final
287+
io.netty:netty-transport-native-epoll:4.1.100.Final
288+
io.netty:netty-transport-native-kqueue:4.1.100.Final
289+
io.netty:netty-resolver-dns-native-macos:4.1.100.Final
290290
io.opencensus:opencensus-api:0.12.3
291291
io.opencensus:opencensus-contrib-grpc-metrics:0.12.3
292292
io.reactivex:rxjava:1.3.8

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,10 @@ public class CommonConfigurationKeysPublic {
504504
"ipc.server.log.slow.rpc";
505505
public static final boolean IPC_SERVER_LOG_SLOW_RPC_DEFAULT = false;
506506

507+
public static final String IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY =
508+
"ipc.server.log.slow.rpc.threshold.ms";
509+
public static final long IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT = 0;
510+
507511
public static final String IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY =
508512
"ipc.server.purge.interval";
509513
public static final int IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT = 15;

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,13 @@ public final class StoreStatisticNames {
244244
public static final String OBJECT_MULTIPART_UPLOAD_ABORTED =
245245
"object_multipart_aborted";
246246

247+
/**
248+
* Object multipart list request.
249+
* Value :{@value}.
250+
*/
251+
public static final String OBJECT_MULTIPART_UPLOAD_LIST =
252+
"object_multipart_list";
253+
247254
/**
248255
* Object put/multipart upload count.
249256
* Value :{@value}.

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@
2929
import org.apache.hadoop.HadoopIllegalArgumentException;
3030
import org.apache.hadoop.classification.InterfaceAudience;
3131
import org.apache.hadoop.classification.InterfaceStability;
32+
import org.apache.hadoop.security.SecurityUtil;
3233
import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
3334
import org.apache.hadoop.util.StringUtils;
35+
import org.apache.zookeeper.client.ZKClientConfig;
3436
import org.apache.zookeeper.data.ACL;
3537
import org.apache.zookeeper.KeeperException;
3638
import org.apache.zookeeper.Watcher;
@@ -48,6 +50,10 @@
4850
import org.slf4j.Logger;
4951
import org.slf4j.LoggerFactory;
5052

53+
import javax.naming.ConfigurationException;
54+
55+
import org.apache.hadoop.security.SecurityUtil.TruststoreKeystore;
56+
5157
/**
5258
*
5359
* This class implements a simple library to perform leader election on top of
@@ -170,6 +176,7 @@ enum State {
170176
private final int zkSessionTimeout;
171177
private final List<ACL> zkAcl;
172178
private final List<ZKAuthInfo> zkAuthInfo;
179+
private TruststoreKeystore truststoreKeystore;
173180
private byte[] appData;
174181
private final String zkLockFilePath;
175182
private final String zkBreadCrumbPath;
@@ -209,6 +216,7 @@ enum State {
209216
* @param app
210217
* reference to callback interface object
211218
* @param maxRetryNum maxRetryNum.
219+
* @param truststoreKeystore truststore keystore, that we will use for ZK if SSL/TLS is enabled
212220
* @throws IOException raised on errors performing I/O.
213221
* @throws HadoopIllegalArgumentException
214222
* if valid data is not supplied.
@@ -218,10 +226,10 @@ enum State {
218226
public ActiveStandbyElector(String zookeeperHostPorts,
219227
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
220228
List<ZKAuthInfo> authInfo, ActiveStandbyElectorCallback app,
221-
int maxRetryNum) throws IOException, HadoopIllegalArgumentException,
222-
KeeperException {
229+
int maxRetryNum, TruststoreKeystore truststoreKeystore)
230+
throws IOException, HadoopIllegalArgumentException, KeeperException {
223231
this(zookeeperHostPorts, zookeeperSessionTimeout, parentZnodeName, acl,
224-
authInfo, app, maxRetryNum, true);
232+
authInfo, app, maxRetryNum, true, truststoreKeystore);
225233
}
226234

227235
/**
@@ -254,6 +262,7 @@ public ActiveStandbyElector(String zookeeperHostPorts,
254262
* @param failFast
255263
* whether need to add the retry when establishing ZK connection.
256264
* @param maxRetryNum max Retry Num
265+
* @param truststoreKeystore truststore keystore, that we will use for ZK if SSL/TLS is enabled
257266
* @throws IOException
258267
* raised on errors performing I/O.
259268
* @throws HadoopIllegalArgumentException
@@ -264,7 +273,7 @@ public ActiveStandbyElector(String zookeeperHostPorts,
264273
public ActiveStandbyElector(String zookeeperHostPorts,
265274
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
266275
List<ZKAuthInfo> authInfo, ActiveStandbyElectorCallback app,
267-
int maxRetryNum, boolean failFast) throws IOException,
276+
int maxRetryNum, boolean failFast, TruststoreKeystore truststoreKeystore) throws IOException,
268277
HadoopIllegalArgumentException, KeeperException {
269278
if (app == null || acl == null || parentZnodeName == null
270279
|| zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
@@ -279,6 +288,7 @@ public ActiveStandbyElector(String zookeeperHostPorts,
279288
zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
280289
zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
281290
this.maxRetryNum = maxRetryNum;
291+
this.truststoreKeystore = truststoreKeystore;
282292

283293
// establish the ZK Connection for future API calls
284294
if (failFast) {
@@ -740,7 +750,19 @@ protected synchronized ZooKeeper connectToZooKeeper() throws IOException,
740750
* @throws IOException raised on errors performing I/O.
741751
*/
742752
protected ZooKeeper createZooKeeper() throws IOException {
743-
return new ZooKeeper(zkHostPort, zkSessionTimeout, watcher);
753+
ZKClientConfig zkClientConfig = new ZKClientConfig();
754+
if (truststoreKeystore != null) {
755+
try {
756+
SecurityUtil.setSslConfiguration(zkClientConfig, truststoreKeystore);
757+
} catch (ConfigurationException ce) {
758+
throw new IOException(ce);
759+
}
760+
}
761+
return initiateZookeeper(zkClientConfig);
762+
}
763+
764+
protected ZooKeeper initiateZookeeper(ZKClientConfig zkClientConfig) throws IOException {
765+
return new ZooKeeper(zkHostPort, zkSessionTimeout, watcher, zkClientConfig);
744766
}
745767

746768
private void fatalError(String errorMessage) {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@
5959
import org.slf4j.Logger;
6060
import org.slf4j.LoggerFactory;
6161

62+
import org.apache.hadoop.security.SecurityUtil.TruststoreKeystore;
63+
6264
@InterfaceAudience.LimitedPrivate("HDFS")
6365
public abstract class ZKFailoverController {
6466

@@ -147,6 +149,7 @@ protected abstract void checkRpcAdminAccess()
147149
protected abstract InetSocketAddress getRpcAddressToBindTo();
148150
protected abstract PolicyProvider getPolicyProvider();
149151
protected abstract List<HAServiceTarget> getAllOtherNodes();
152+
protected abstract boolean isSSLEnabled();
150153

151154
/**
152155
* Return the name of a znode inside the configured parent znode in which
@@ -372,9 +375,10 @@ private void initZK() throws HadoopIllegalArgumentException, IOException,
372375
int maxRetryNum = conf.getInt(
373376
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
374377
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
378+
TruststoreKeystore truststoreKeystore = isSSLEnabled() ? new TruststoreKeystore(conf) : null;
375379
elector = new ActiveStandbyElector(zkQuorum,
376380
zkTimeout, getParentZnode(), zkAcls, zkAuths,
377-
new ElectorCallbacks(), maxRetryNum);
381+
new ElectorCallbacks(), maxRetryNum, truststoreKeystore);
378382
}
379383

380384
private String getParentZnode() {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java

Lines changed: 62 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
package org.apache.hadoop.ipc;
2020

21+
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE;
22+
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT;
23+
2124
import java.io.IOException;
2225
import java.lang.reflect.Constructor;
2326
import java.lang.reflect.InvocationTargetException;
@@ -63,7 +66,7 @@ static Class<? extends RpcScheduler> convertSchedulerClass(
6366
}
6467

6568
private volatile boolean clientBackOffEnabled;
66-
private boolean serverFailOverEnabled;
69+
private volatile boolean serverFailOverEnabled;
6770

6871
// Atomic refs point to active callQueue
6972
// We have two so we can better control swapping
@@ -81,18 +84,15 @@ public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass,
8184
namespace, conf);
8285
int[] capacityWeights = parseCapacityWeights(priorityLevels,
8386
namespace, conf);
87+
this.serverFailOverEnabled = getServerFailOverEnable(namespace, conf);
8488
BlockingQueue<E> bq = createCallQueueInstance(backingClass,
8589
priorityLevels, maxQueueSize, namespace, capacityWeights, conf);
8690
this.clientBackOffEnabled = clientBackOffEnabled;
87-
this.serverFailOverEnabled = conf.getBoolean(
88-
namespace + "." +
89-
CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE,
90-
CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT);
9191
this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
9292
this.takeRef = new AtomicReference<BlockingQueue<E>>(bq);
9393
LOG.info("Using callQueue: {}, queueCapacity: {}, " +
94-
"scheduler: {}, ipcBackoff: {}.",
95-
backingClass, maxQueueSize, schedulerClass, clientBackOffEnabled);
94+
"scheduler: {}, ipcBackoff: {}, ipcFailOver: {}.",
95+
backingClass, maxQueueSize, schedulerClass, clientBackOffEnabled, serverFailOverEnabled);
9696
}
9797

9898
@VisibleForTesting // only!
@@ -105,6 +105,41 @@ public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass,
105105
this.serverFailOverEnabled = serverFailOverEnabled;
106106
}
107107

108+
/**
109+
* Return boolean value configured by property 'ipc.<port>.callqueue.overflow.trigger.failover'
110+
* if it is present. If the config is not present, default config
111+
* (without port) is used to derive class i.e 'ipc.callqueue.overflow.trigger.failover',
112+
* and derived value is returned if configured. Otherwise, default value
113+
* {@link CommonConfigurationKeys#IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT} is returned.
114+
*
115+
* @param namespace Namespace "ipc" + "." + Server's listener port.
116+
* @param conf Configuration properties.
117+
* @return Value returned based on configuration.
118+
*/
119+
private boolean getServerFailOverEnable(String namespace, Configuration conf) {
120+
String propertyKey = namespace + "." +
121+
CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE;
122+
123+
if (conf.get(propertyKey) != null) {
124+
return conf.getBoolean(propertyKey,
125+
CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT);
126+
}
127+
128+
String[] nsPort = namespace.split("\\.");
129+
if (nsPort.length == 2) {
130+
// Only if ns is split with ".", we can separate namespace and port.
131+
// In the absence of "ipc.<port>.callqueue.overflow.trigger.failover" property,
132+
// we look up "ipc.callqueue.overflow.trigger.failover" property.
133+
return conf.getBoolean(nsPort[0] + "."
134+
+ IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE, IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT);
135+
}
136+
137+
// Otherwise return default value.
138+
LOG.info("{} not specified set default value is {}",
139+
IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE, IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT);
140+
return CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT;
141+
}
142+
108143
private static <T extends RpcScheduler> T createScheduler(
109144
Class<T> theClass, int priorityLevels, String ns, Configuration conf) {
110145
// Used for custom, configurable scheduler
@@ -155,9 +190,9 @@ private <T extends BlockingQueue<E>> T createCallQueueInstance(
155190
// Used for custom, configurable callqueues
156191
try {
157192
Constructor<T> ctor = theClass.getDeclaredConstructor(int.class,
158-
int.class, String.class, int[].class, Configuration.class);
159-
return ctor.newInstance(priorityLevels, maxLen, ns,
160-
capacityWeights, conf);
193+
int.class, String.class, int[].class, boolean.class, Configuration.class);
194+
return ctor.newInstance(priorityLevels, maxLen, ns, capacityWeights,
195+
this.serverFailOverEnabled, conf);
161196
} catch (RuntimeException e) {
162197
throw e;
163198
} catch (InvocationTargetException e) {
@@ -199,6 +234,20 @@ boolean isClientBackoffEnabled() {
199234
return clientBackOffEnabled;
200235
}
201236

237+
@VisibleForTesting
238+
public boolean isServerFailOverEnabled() {
239+
return serverFailOverEnabled;
240+
}
241+
242+
@VisibleForTesting
243+
public boolean isServerFailOverEnabledByQueue() {
244+
BlockingQueue<E> bq = putRef.get();
245+
if (bq instanceof FairCallQueue) {
246+
return ((FairCallQueue<E>) bq).isServerFailOverEnabled();
247+
}
248+
return false;
249+
}
250+
202251
// Based on policy to determine back off current call
203252
boolean shouldBackOff(Schedulable e) {
204253
return scheduler.shouldBackOff(e);
@@ -421,6 +470,9 @@ public synchronized void swapQueue(
421470
RpcScheduler newScheduler = createScheduler(schedulerClass, priorityLevels,
422471
ns, conf);
423472
int[] capacityWeights = parseCapacityWeights(priorityLevels, ns, conf);
473+
474+
// Update serverFailOverEnabled.
475+
this.serverFailOverEnabled = getServerFailOverEnable(ns, conf);
424476
BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse,
425477
priorityLevels, maxSize, ns, capacityWeights, conf);
426478

0 commit comments

Comments
 (0)