Skip to content

Commit 50f7f6d

Browse files
authored
YARN-10210. Add a RMFailoverProxyProvider that does DNS resolution on failover.
1 parent 3d5ade1 commit 50f7f6d

File tree

4 files changed

+726
-0
lines changed

4 files changed

+726
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to you under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
* <p>
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* <p>
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations under
15+
* the License.
16+
*/
17+
package org.apache.hadoop.yarn.client;
18+
19+
import org.apache.hadoop.conf.Configuration;
20+
import org.apache.hadoop.io.retry.FailoverProxyProvider;
21+
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
22+
import org.apache.hadoop.yarn.api.records.NodeReport;
23+
import org.apache.hadoop.yarn.client.api.YarnClient;
24+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
25+
import org.apache.hadoop.yarn.exceptions.YarnException;
26+
import org.apache.hadoop.yarn.server.MiniYARNCluster;
27+
import org.junit.Before;
28+
import org.junit.Test;
29+
30+
import java.io.Closeable;
31+
import java.io.IOException;
32+
import java.lang.reflect.InvocationHandler;
33+
import java.lang.reflect.Proxy;
34+
import java.net.InetSocketAddress;
35+
import java.util.List;
36+
37+
import static org.junit.Assert.*;
38+
import static org.mockito.Mockito.any;
39+
import static org.mockito.Mockito.eq;
40+
import static org.mockito.Mockito.times;
41+
import static org.mockito.Mockito.mock;
42+
import static org.mockito.Mockito.verify;
43+
import static org.mockito.Mockito.when;
44+
45+
/**
46+
* Unit tests for {@link DefaultNoHARMFailoverProxyProvider} and
47+
* {@link AutoRefreshNoHARMFailoverProxyProvider}.
48+
*/
49+
public class TestNoHaRMFailoverProxyProvider {
50+
51+
// Default port of yarn RM
52+
private static final int RM1_PORT = 8032;
53+
private static final int RM2_PORT = 8031;
54+
55+
private static final int NUMNODEMANAGERS = 1;
56+
private Configuration conf;
57+
58+
private class TestProxy extends Proxy implements Closeable {
59+
protected TestProxy(InvocationHandler h) {
60+
super(h);
61+
}
62+
63+
@Override
64+
public void close() throws IOException {
65+
}
66+
}
67+
68+
@Before
69+
public void setUp() throws IOException, YarnException {
70+
conf = new YarnConfiguration();
71+
}
72+
73+
/**
74+
* Tests the proxy generated by {@link DefaultNoHAFailoverProxyProvider}
75+
* will connect to RM.
76+
*/
77+
@Test
78+
public void testRestartedRM() throws Exception {
79+
MiniYARNCluster cluster =
80+
new MiniYARNCluster("testRestartedRMNegative", NUMNODEMANAGERS, 1, 1);
81+
YarnClient rmClient = YarnClient.createYarnClient();
82+
try {
83+
cluster.init(conf);
84+
cluster.start();
85+
final Configuration yarnConf = cluster.getConfig();
86+
rmClient = YarnClient.createYarnClient();
87+
rmClient.init(yarnConf);
88+
rmClient.start();
89+
List <NodeReport> nodeReports = rmClient.getNodeReports();
90+
assertEquals(
91+
"The proxy didn't get expected number of node reports",
92+
NUMNODEMANAGERS, nodeReports.size());
93+
} finally {
94+
if (rmClient != null) {
95+
rmClient.stop();
96+
}
97+
cluster.stop();
98+
}
99+
}
100+
101+
/**
102+
* Tests the proxy generated by
103+
* {@link AutoRefreshNoHARMFailoverProxyProvider} will connect to RM.
104+
*/
105+
@Test
106+
public void testConnectingToRM() throws Exception {
107+
conf.setClass(YarnConfiguration.CLIENT_FAILOVER_NO_HA_PROXY_PROVIDER,
108+
AutoRefreshNoHARMFailoverProxyProvider.class,
109+
RMFailoverProxyProvider.class);
110+
MiniYARNCluster cluster =
111+
new MiniYARNCluster("testRestartedRMNegative", NUMNODEMANAGERS, 1, 1);
112+
YarnClient rmClient = null;
113+
try {
114+
cluster.init(conf);
115+
cluster.start();
116+
final Configuration yarnConf = cluster.getConfig();
117+
rmClient = YarnClient.createYarnClient();
118+
rmClient.init(yarnConf);
119+
rmClient.start();
120+
List <NodeReport> nodeReports = rmClient.getNodeReports();
121+
assertEquals(
122+
"The proxy didn't get expected number of node reports",
123+
NUMNODEMANAGERS, nodeReports.size());
124+
} finally {
125+
if (rmClient != null) {
126+
rmClient.stop();
127+
}
128+
cluster.stop();
129+
}
130+
}
131+
132+
/**
133+
* Test that the {@link DefaultNoHARMFailoverProxyProvider}
134+
* will generate different proxies after RM IP changed
135+
* and {@link DefaultNoHARMFailoverProxyProvider#performFailover(Object)}
136+
* get called.
137+
*/
138+
@Test
139+
public void testDefaultFPPGetOneProxy() throws Exception {
140+
// Create a proxy and mock a RMProxy
141+
Proxy mockProxy1 = new TestProxy((proxy, method, args) -> null);
142+
Class protocol = ApplicationClientProtocol.class;
143+
RMProxy mockRMProxy = mock(RMProxy.class);
144+
DefaultNoHARMFailoverProxyProvider <RMProxy> fpp =
145+
new DefaultNoHARMFailoverProxyProvider<RMProxy>();
146+
147+
InetSocketAddress mockAdd1 = new InetSocketAddress(RM1_PORT);
148+
149+
// Mock RMProxy methods
150+
when(mockRMProxy.getRMAddress(any(YarnConfiguration.class),
151+
any(Class.class))).thenReturn(mockAdd1);
152+
when(mockRMProxy.getProxy(any(YarnConfiguration.class),
153+
any(Class.class), eq(mockAdd1))).thenReturn(mockProxy1);
154+
155+
// Initialize failover proxy provider and get proxy from it.
156+
fpp.init(conf, mockRMProxy, protocol);
157+
FailoverProxyProvider.ProxyInfo<RMProxy> actualProxy1 = fpp.getProxy();
158+
assertEquals(
159+
"AutoRefreshRMFailoverProxyProvider doesn't generate " +
160+
"expected proxy",
161+
mockProxy1, actualProxy1.proxy);
162+
163+
// Invoke fpp.getProxy() multiple times and
164+
// validate the returned proxy is always mockProxy1
165+
actualProxy1 = fpp.getProxy();
166+
assertEquals(
167+
"AutoRefreshRMFailoverProxyProvider doesn't generate " +
168+
"expected proxy",
169+
mockProxy1, actualProxy1.proxy);
170+
actualProxy1 = fpp.getProxy();
171+
assertEquals(
172+
"AutoRefreshRMFailoverProxyProvider doesn't generate " +
173+
"expected proxy",
174+
mockProxy1, actualProxy1.proxy);
175+
176+
// verify that mockRMProxy.getProxy() is invoked once only.
177+
verify(mockRMProxy, times(1))
178+
.getProxy(any(YarnConfiguration.class), any(Class.class),
179+
eq(mockAdd1));
180+
181+
// Perform Failover and get proxy again from failover proxy provider
182+
fpp.performFailover(actualProxy1.proxy);
183+
FailoverProxyProvider.ProxyInfo<RMProxy> actualProxy2 = fpp.getProxy();
184+
assertEquals("AutoRefreshRMFailoverProxyProvider " +
185+
"doesn't generate expected proxy after failover",
186+
mockProxy1, actualProxy2.proxy);
187+
188+
// verify that mockRMProxy.getProxy() didn't get invoked again after
189+
// performFailover()
190+
verify(mockRMProxy, times(1))
191+
.getProxy(any(YarnConfiguration.class), any(Class.class),
192+
eq(mockAdd1));
193+
}
194+
195+
/**
196+
* Test that the {@link AutoRefreshNoHARMFailoverProxyProvider}
197+
* will generate different proxies after RM IP changed
198+
* and {@link AutoRefreshNoHARMFailoverProxyProvider#performFailover(Object)}
199+
* get called.
200+
*/
201+
@Test
202+
public void testAutoRefreshIPChange() throws Exception {
203+
conf.setClass(YarnConfiguration.CLIENT_FAILOVER_NO_HA_PROXY_PROVIDER,
204+
AutoRefreshNoHARMFailoverProxyProvider.class,
205+
RMFailoverProxyProvider.class);
206+
207+
// Create two proxies and mock a RMProxy
208+
Proxy mockProxy1 = new TestProxy((proxy, method, args) -> null);
209+
Proxy mockProxy2 = new TestProxy((proxy, method, args) -> null);
210+
Class protocol = ApplicationClientProtocol.class;
211+
RMProxy mockRMProxy = mock(RMProxy.class);
212+
AutoRefreshNoHARMFailoverProxyProvider<RMProxy> fpp =
213+
new AutoRefreshNoHARMFailoverProxyProvider<RMProxy>();
214+
215+
// generate two address with different ports.
216+
InetSocketAddress mockAdd1 = new InetSocketAddress(RM1_PORT);
217+
InetSocketAddress mockAdd2 = new InetSocketAddress(RM2_PORT);
218+
219+
// Mock RMProxy methods
220+
when(mockRMProxy.getRMAddress(any(YarnConfiguration.class),
221+
any(Class.class))).thenReturn(mockAdd1);
222+
when(mockRMProxy.getProxy(any(YarnConfiguration.class),
223+
any(Class.class), eq(mockAdd1))).thenReturn(mockProxy1);
224+
225+
// Initialize proxy provider and get proxy from it.
226+
fpp.init(conf, mockRMProxy, protocol);
227+
FailoverProxyProvider.ProxyInfo <RMProxy> actualProxy1 = fpp.getProxy();
228+
assertEquals(
229+
"AutoRefreshRMFailoverProxyProvider doesn't generate " +
230+
"expected proxy",
231+
mockProxy1, actualProxy1.proxy);
232+
233+
// Invoke fpp.getProxy() multiple times and
234+
// validate the returned proxy is always mockProxy1
235+
actualProxy1 = fpp.getProxy();
236+
assertEquals(
237+
"AutoRefreshRMFailoverProxyProvider doesn't generate " +
238+
"expected proxy",
239+
mockProxy1, actualProxy1.proxy);
240+
actualProxy1 = fpp.getProxy();
241+
assertEquals(
242+
"AutoRefreshRMFailoverProxyProvider doesn't generate " +
243+
"expected proxy",
244+
mockProxy1, actualProxy1.proxy);
245+
246+
// verify that mockRMProxy.getProxy() is invoked once only.
247+
verify(mockRMProxy, times(1))
248+
.getProxy(any(YarnConfiguration.class), any(Class.class),
249+
eq(mockAdd1));
250+
251+
// Mock RMProxy methods to generate different proxy
252+
// based on different IP address.
253+
when(mockRMProxy.getRMAddress(
254+
any(YarnConfiguration.class),
255+
any(Class.class))).thenReturn(mockAdd2);
256+
when(mockRMProxy.getProxy(
257+
any(YarnConfiguration.class),
258+
any(Class.class), eq(mockAdd2))).thenReturn(mockProxy2);
259+
260+
// Perform Failover and get proxy again from failover proxy provider
261+
fpp.performFailover(actualProxy1.proxy);
262+
FailoverProxyProvider.ProxyInfo <RMProxy> actualProxy2 = fpp.getProxy();
263+
assertEquals("AutoRefreshNoHARMFailoverProxyProvider " +
264+
"doesn't generate expected proxy after failover",
265+
mockProxy2, actualProxy2.proxy);
266+
267+
// check the proxy is different with the one we created before.
268+
assertNotEquals("AutoRefreshNoHARMFailoverProxyProvider " +
269+
"shouldn't generate same proxy after failover",
270+
actualProxy1.proxy, actualProxy2.proxy);
271+
}
272+
}

0 commit comments

Comments
 (0)