2222import java .net .InetSocketAddress ;
2323
2424import org .apache .hadoop .fs .UnsupportedFileSystemException ;
25+ import org .apache .hadoop .io .retry .UnreliableInterface ;
2526import org .apache .hadoop .security .SecurityUtil ;
2627import org .apache .hadoop .security .UserGroupInformation ;
2728import org .apache .hadoop .security .token .Token ;
@@ -58,8 +59,8 @@ public TestNMProxy() throws UnsupportedFileSystemException {
5859
5960 @ Before
6061 public void setUp () throws Exception {
61- conf . setLong ( YarnConfiguration . CLIENT_NM_CONNECT_MAX_WAIT_MS , 10000 );
62- conf . setLong ( YarnConfiguration . CLIENT_NM_CONNECT_RETRY_INTERVAL_MS , 100 );
62+ containerManager . start ( );
63+ containerManager . setBlockNewContainerRequests ( false );
6364 }
6465
6566 @ Override
@@ -77,7 +78,13 @@ public StartContainersResponse startContainers(
7778 // This causes super to throw an NMNotYetReadyException
7879 containerManager .setBlockNewContainerRequests (true );
7980 } else {
80- throw new java .net .ConnectException ("start container exception" );
81+ if (isRetryPolicyRetryForEver ()) {
82+ // Throw non network exception
83+ throw new IOException (
84+ new UnreliableInterface .UnreliableException ());
85+ } else {
86+ throw new java .net .ConnectException ("start container exception" );
87+ }
8188 }
8289 } else {
8390 // This stops super from throwing an NMNotYetReadyException
@@ -86,6 +93,11 @@ public StartContainersResponse startContainers(
8693 return super .startContainers (requests );
8794 }
8895
96+ private boolean isRetryPolicyRetryForEver () {
97+ return conf .getLong (
98+ YarnConfiguration .CLIENT_NM_CONNECT_MAX_WAIT_MS , 1000 ) == -1 ;
99+ }
100+
89101 @ Override
90102 public StopContainersResponse stopContainers (
91103 StopContainersRequest requests ) throws YarnException , IOException {
@@ -110,30 +122,13 @@ public GetContainerStatusesResponse getContainerStatuses(
110122 }
111123
112124 @ Test (timeout = 20000 )
113- public void testNMProxyRetry () throws Exception {
114- containerManager .start ();
115- containerManager .setBlockNewContainerRequests (false );
116- StartContainersRequest allRequests =
117- Records .newRecord (StartContainersRequest .class );
118- ApplicationId appId = ApplicationId .newInstance (1 , 1 );
119- ApplicationAttemptId attemptId = ApplicationAttemptId .newInstance (appId , 1 );
125+ public void testNMProxyRetry () throws Exception {
126+ conf .setLong (YarnConfiguration .CLIENT_NM_CONNECT_MAX_WAIT_MS , 10000 );
127+ conf .setLong (YarnConfiguration .CLIENT_NM_CONNECT_RETRY_INTERVAL_MS , 100 );
128+ StartContainersRequest allRequests =
129+ Records .newRecord (StartContainersRequest .class );
120130
121- org .apache .hadoop .yarn .api .records .Token nmToken =
122- context .getNMTokenSecretManager ().createNMToken (attemptId ,
123- context .getNodeId (), user );
124- final InetSocketAddress address =
125- conf .getSocketAddr (YarnConfiguration .NM_BIND_HOST ,
126- YarnConfiguration .NM_ADDRESS , YarnConfiguration .DEFAULT_NM_ADDRESS ,
127- YarnConfiguration .DEFAULT_NM_PORT );
128- Token <NMTokenIdentifier > token =
129- ConverterUtils .convertFromYarn (nmToken ,
130- SecurityUtil .buildTokenService (address ));
131- UserGroupInformation ugi = UserGroupInformation .createRemoteUser (user );
132- ugi .addToken (token );
133-
134- ContainerManagementProtocol proxy =
135- NMProxy .createNMProxy (conf , ContainerManagementProtocol .class , ugi ,
136- YarnRPC .create (conf ), address );
131+ ContainerManagementProtocol proxy = getNMProxy ();
137132
138133 retryCount = 0 ;
139134 shouldThrowNMNotYetReadyException = false ;
@@ -156,4 +151,38 @@ public void testNMProxyRetry() throws Exception {
156151 proxy .startContainers (allRequests );
157152 Assert .assertEquals (5 , retryCount );
158153 }
154+
155+ @ Test (timeout = 20000 , expected = IOException .class )
156+ public void testShouldNotRetryForeverForNonNetworkExceptionsOnNMConnections ()
157+ throws Exception {
158+ conf .setLong (YarnConfiguration .CLIENT_NM_CONNECT_MAX_WAIT_MS , -1 );
159+ StartContainersRequest allRequests =
160+ Records .newRecord (StartContainersRequest .class );
161+
162+ ContainerManagementProtocol proxy = getNMProxy ();
163+
164+ shouldThrowNMNotYetReadyException = false ;
165+ retryCount = 0 ;
166+ proxy .startContainers (allRequests );
167+ }
168+
169+ private ContainerManagementProtocol getNMProxy () {
170+ ApplicationId appId = ApplicationId .newInstance (1 , 1 );
171+ ApplicationAttemptId attemptId = ApplicationAttemptId .newInstance (appId , 1 );
172+
173+ org .apache .hadoop .yarn .api .records .Token nmToken =
174+ context .getNMTokenSecretManager ().createNMToken (attemptId ,
175+ context .getNodeId (), user );
176+ final InetSocketAddress address =
177+ conf .getSocketAddr (YarnConfiguration .NM_BIND_HOST ,
178+ YarnConfiguration .NM_ADDRESS , YarnConfiguration .DEFAULT_NM_ADDRESS ,
179+ YarnConfiguration .DEFAULT_NM_PORT );
180+ Token <NMTokenIdentifier > token =
181+ ConverterUtils .convertFromYarn (nmToken ,
182+ SecurityUtil .buildTokenService (address ));
183+ UserGroupInformation ugi = UserGroupInformation .createRemoteUser (user );
184+ ugi .addToken (token );
185+ return NMProxy .createNMProxy (conf , ContainerManagementProtocol .class , ugi ,
186+ YarnRPC .create (conf ), address );
187+ }
159188}
0 commit comments