1919package org .apache .hadoop .yarn .conf ;
2020
2121import java .net .InetSocketAddress ;
22+ import java .net .UnknownHostException ;
23+ import java .util .ArrayList ;
24+ import java .util .Arrays ;
2225import java .util .Collection ;
26+ import java .util .Collections ;
27+ import java .util .HashMap ;
28+ import java .util .HashSet ;
29+ import java .util .List ;
30+ import java .util .Map ;
31+ import java .util .Set ;
2332
2433import org .apache .hadoop .HadoopIllegalArgumentException ;
2534import org .apache .hadoop .classification .InterfaceAudience ;
2635import org .apache .hadoop .conf .Configuration ;
36+ import org .apache .hadoop .net .DomainNameResolver ;
37+ import org .apache .hadoop .net .DomainNameResolverFactory ;
2738import org .apache .hadoop .net .NetUtils ;
2839import org .apache .hadoop .yarn .exceptions .YarnRuntimeException ;
2940
@@ -39,8 +50,29 @@ public class HAUtil {
3950 public static final String BAD_CONFIG_MESSAGE_PREFIX =
4051 "Invalid configuration! " ;
4152
53+ private final static List <String > RM_ADDRESS_CONFIG_KEYS = Arrays .asList (
54+ YarnConfiguration .RM_ADDRESS ,
55+ YarnConfiguration .RM_SCHEDULER_ADDRESS ,
56+ YarnConfiguration .RM_RESOURCE_TRACKER_ADDRESS ,
57+ YarnConfiguration .RM_ADMIN_ADDRESS ,
58+ YarnConfiguration .RM_WEBAPP_ADDRESS ,
59+ YarnConfiguration .RM_WEBAPP_HTTPS_ADDRESS
60+ );
61+
62+ private static DomainNameResolver dnr ;
63+ private static Collection <String > originalRMIDs = null ;
64+
4265 private HAUtil () { /* Hidden constructor */ }
4366
67+ public static DomainNameResolver getDnr () {
68+ return dnr ;
69+ }
70+
71+ public static void setDnrByConfiguration (Configuration conf ) {
72+ HAUtil .dnr = DomainNameResolverFactory .newInstance (
73+ conf , YarnConfiguration .RESOLVE_RM_ADDRESS_KEY );
74+ }
75+
4476 private static void throwBadConfigurationException (String msg ) {
4577 throw new YarnRuntimeException (BAD_CONFIG_MESSAGE_PREFIX + msg );
4678 }
@@ -118,6 +150,12 @@ public static void verifyAndSetConfiguration(Configuration conf)
118150 * Then set the RM-ids.
119151 */
120152 private static void verifyAndSetRMHAIdsList (Configuration conf ) {
153+ boolean resolveNeeded = conf .getBoolean (
154+ YarnConfiguration .RESOLVE_RM_ADDRESS_NEEDED_KEY ,
155+ YarnConfiguration .RESOLVE_RM_ADDRESS_NEEDED_DEFAULT );
156+ if (resolveNeeded ) {
157+ getRMHAId (conf );
158+ }
121159 Collection <String > ids =
122160 conf .getTrimmedStringCollection (YarnConfiguration .RM_HA_IDS );
123161 if (ids .size () < 2 ) {
@@ -227,6 +265,16 @@ public static Collection<String> getRMHAIds(Configuration conf) {
227265 return conf .getStringCollection (YarnConfiguration .RM_HA_IDS );
228266 }
229267
268+ /**
269+ * Instead of returning RM_HA_IDS in current configurations, it
270+ * would return the originally preset one in case of DNS resolving
271+ * @param conf Configuration.
272+ * @return RM Ids from original xml file
273+ */
274+ public static Collection <String > getOriginalRMHAIds (Configuration conf ) {
275+ return originalRMIDs == null ? getRMHAIds (conf ) : originalRMIDs ;
276+ }
277+
230278 /**
231279 * @param conf Configuration. Please use verifyAndSetRMHAId to check.
232280 * @return RM Id on success
@@ -235,19 +283,10 @@ public static String getRMHAId(Configuration conf) {
235283 int found = 0 ;
236284 String currentRMId = conf .getTrimmed (YarnConfiguration .RM_HA_ID );
237285 if (currentRMId == null ) {
238- for (String rmId : getRMHAIds (conf )) {
239- String key = addSuffix (YarnConfiguration .RM_ADDRESS , rmId );
240- String addr = conf .get (key );
241- if (addr == null ) {
242- continue ;
243- }
244- InetSocketAddress s ;
245- try {
246- s = NetUtils .createSocketAddr (addr );
247- } catch (Exception e ) {
248- LOG .warn ("Exception in creating socket address " + addr , e );
249- continue ;
250- }
286+ Map <String , InetSocketAddress > idAddressPairs = getResolvedRMIdPairs (conf );
287+ for (Map .Entry <String , InetSocketAddress > entry : idAddressPairs .entrySet ()) {
288+ String rmId = entry .getKey ();
289+ InetSocketAddress s = entry .getValue ();
251290 if (!s .isUnresolved () && NetUtils .isLocalAddress (s .getAddress ())) {
252291 currentRMId = rmId .trim ();
253292 found ++;
@@ -262,6 +301,179 @@ public static String getRMHAId(Configuration conf) {
262301 return currentRMId ;
263302 }
264303
304+ /**
305+ * This function resolves all RMIds with their address. For multi-A DNS records,
306+ * it will resolve all of them, and generate a new Id for each of them.
307+ *
308+ * @param conf Configuration
309+ * @return Map key as RMId, value as its address
310+ */
311+ public static Map <String , InetSocketAddress > getResolvedRMIdPairs (
312+ Configuration conf ) {
313+ boolean resolveNeeded = conf .getBoolean (
314+ YarnConfiguration .RESOLVE_RM_ADDRESS_NEEDED_KEY ,
315+ YarnConfiguration .RESOLVE_RM_ADDRESS_NEEDED_DEFAULT );
316+ boolean requireFQDN = conf .getBoolean (
317+ YarnConfiguration .RESOLVE_RM_ADDRESS_TO_FQDN ,
318+ YarnConfiguration .RESOLVE_RM_ADDRESS_TO_FQDN_DEFAULT );
319+ // In case client using DIFFERENT addresses for each service address
320+ // need to categorize them first
321+ Map <List <String >, List <String >> addressesConfigKeysMap = new HashMap <>();
322+ Collection <String > rmIds = getOriginalRMHAIds (conf );
323+ for (String configKey : RM_ADDRESS_CONFIG_KEYS ) {
324+ List <String > addresses = new ArrayList <>();
325+ for (String rmId : rmIds ) {
326+ String keyToRead = addSuffix (configKey , rmId );
327+ InetSocketAddress address = getInetSocketAddressFromString (
328+ conf .get (keyToRead ));
329+ if (address != null ) {
330+ addresses .add (address .getHostName ());
331+ }
332+ }
333+ Collections .sort (addresses );
334+ List <String > configKeysOfTheseAddresses = addressesConfigKeysMap .get (addresses );
335+ if (configKeysOfTheseAddresses == null ) {
336+ configKeysOfTheseAddresses = new ArrayList <>();
337+ addressesConfigKeysMap .put (addresses , configKeysOfTheseAddresses );
338+ }
339+ configKeysOfTheseAddresses .add (configKey );
340+ }
341+ // We need to resolve and override by group (categorized by their input host)
342+ // But since the function is called from "getRMHAId",
343+ // this function would only return value which is corresponded to YarnConfiguration.RM_ADDRESS
344+ Map <String , InetSocketAddress > ret = null ;
345+ for (List <String > configKeys : addressesConfigKeysMap .values ()) {
346+ Map <String , InetSocketAddress > res = getResolvedIdPairs (conf , resolveNeeded , requireFQDN , getOriginalRMHAIds (conf ),
347+ configKeys .get (0 ), YarnConfiguration .RM_HA_IDS , configKeys );
348+ if (configKeys .contains (YarnConfiguration .RM_ADDRESS )) {
349+ ret = res ;
350+ }
351+ }
352+ return ret ;
353+ }
354+
355+ private static Map <String , InetSocketAddress > getResolvedIdPairs (
356+ Configuration conf , boolean resolveNeeded , boolean requireFQDN , Collection <String > ids ,
357+ String configKey , String configKeyToReplace , List <String > listOfConfigKeysToReplace ) {
358+ Map <String , InetSocketAddress > idAddressPairs = new HashMap <>();
359+ Map <String , String > generatedIdToOriginalId = new HashMap <>();
360+ for (String id : ids ) {
361+ String key = addSuffix (configKey , id );
362+ String addr = conf .get (key ); // string with port
363+ InetSocketAddress address = getInetSocketAddressFromString (addr );
364+ if (address == null ) {
365+ continue ;
366+ }
367+ if (resolveNeeded ) {
368+ if (dnr == null ) {
369+ setDnrByConfiguration (conf );
370+ }
371+ // If the address needs to be resolved, get all of the IP addresses
372+ // from this address and pass them into the map
373+ LOG .info ("Multi-A domain name " + addr +
374+ " will be resolved by " + dnr .getClass ().getName ());
375+ int port = address .getPort ();
376+ String [] resolvedHostNames ;
377+ try {
378+ resolvedHostNames = dnr .getAllResolvedHostnameByDomainName (
379+ address .getHostName (), requireFQDN );
380+ } catch (UnknownHostException e ) {
381+ LOG .warn ("Exception in resolving socket address "
382+ + address .getHostName (), e );
383+ continue ;
384+ }
385+ LOG .info ("Resolved addresses for " + addr +
386+ " is " + Arrays .toString (resolvedHostNames ));
387+ if (resolvedHostNames == null || resolvedHostNames .length < 1 ) {
388+ LOG .warn ("Cannot resolve from address " + address .getHostName ());
389+ } else {
390+ // If multiple address resolved, corresponding id needs to be created
391+ for (int i = 0 ; i < resolvedHostNames .length ; i ++) {
392+ String generatedRMId = id + "_resolved_" + (i + 1 );
393+ idAddressPairs .put (generatedRMId ,
394+ new InetSocketAddress (resolvedHostNames [i ], port ));
395+ generatedIdToOriginalId .put (generatedRMId , id );
396+ }
397+ }
398+ overrideIdsInConfiguration (
399+ idAddressPairs , generatedIdToOriginalId , configKeyToReplace ,
400+ listOfConfigKeysToReplace , conf );
401+ } else {
402+ idAddressPairs .put (id , address );
403+ }
404+ }
405+ return idAddressPairs ;
406+ }
407+
408+ /**
409+ * This function override all RMIds and their addresses by the input Map.
410+ *
411+ * @param idAddressPairs key as Id, value as its address
412+ * @param generatedIdToOriginalId key as generated rmId from multi-A,
413+ * value as its original input Id
414+ * @param configKeyToReplace key to replace
415+ * @param listOfConfigKeysToReplace list of keys to replace/add
416+ * @param conf Configuration
417+ */
418+ synchronized static void overrideIdsInConfiguration (
419+ Map <String , InetSocketAddress > idAddressPairs ,
420+ Map <String , String > generatedIdToOriginalId ,
421+ String configKeyToReplace , List <String > listOfConfigKeysToReplace ,
422+ Configuration conf ) {
423+ Collection <String > currentIds = getRMHAIds (conf );
424+ Set <String > resolvedIds = new HashSet <>(idAddressPairs .keySet ());
425+ // override rm-ids
426+ if (originalRMIDs == null ) {
427+ originalRMIDs = currentIds ;
428+ }
429+ // if it is already resolved, we need to form a superset
430+ resolvedIds .addAll ((currentIds ));
431+ resolvedIds .removeAll (generatedIdToOriginalId .values ());
432+ conf .setStrings (configKeyToReplace ,
433+ resolvedIds .toArray (new String [0 ]));
434+ // override/add address configuration entries for each rm-id
435+ for (Map .Entry <String , InetSocketAddress > entry : idAddressPairs .entrySet ()) {
436+ String rmId = entry .getKey ();
437+ String addr = entry .getValue ().getHostName ();
438+ String originalRMId = generatedIdToOriginalId .get (rmId );
439+ if (originalRMId != null ) {
440+ // for each required configKeys, get its port and then set it back
441+ for (String configKey : listOfConfigKeysToReplace ) {
442+ String keyToRead = addSuffix (configKey , originalRMId );
443+ InetSocketAddress originalAddress = getInetSocketAddressFromString (
444+ conf .get (keyToRead ));
445+ if (originalAddress == null ) {
446+ LOG .warn ("Missing configuration for key " + keyToRead );
447+ continue ;
448+ }
449+ int port = originalAddress .getPort ();
450+ String keyToWrite = addSuffix (configKey , rmId );
451+ conf .setSocketAddr (keyToWrite , new InetSocketAddress (addr , port ));
452+ }
453+ }
454+ }
455+ }
456+
457+ /**
458+ * Helper function to create InetsocketAddress from string address.
459+ *
460+ * @param addr string format of address accepted by NetUtils.createSocketAddr
461+ * @return InetSocketAddress of input, would return null upon any kinds of invalid inout
462+ */
463+ public static InetSocketAddress getInetSocketAddressFromString (String addr ) {
464+ if (addr == null ) {
465+ return null ;
466+ }
467+ InetSocketAddress address ;
468+ try {
469+ address = NetUtils .createSocketAddr (addr );
470+ } catch (Exception e ) {
471+ LOG .warn ("Exception in creating socket address " + addr , e );
472+ return null ;
473+ }
474+ return address ;
475+ }
476+
265477 @ VisibleForTesting
266478 static String getNeedToSetValueMessage (String confKey ) {
267479 return confKey + " needs to be set in an HA configuration." ;
0 commit comments