Skip to content

Commit e1bec48

Browse files
committed
YARN-11153. Make proxy server support yarn federation.
1 parent 36c4be8 commit e1bec48

File tree

11 files changed

+922
-98
lines changed

11 files changed

+922
-98
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,4 +609,9 @@ public boolean equals(Object obj) {
609609
protected interface Func<T, TResult> {
610610
TResult invoke(T input) throws Exception;
611611
}
612+
613+
@VisibleForTesting
614+
public FederationStateStore getStateStore() {
615+
return stateStore;
616+
}
612617
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.sun.jersey.spi.container.servlet.ServletContainer;
2424

2525
import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics;
26+
import org.apache.hadoop.yarn.server.webproxy.DefaultAppReportFetcher;
2627
import org.apache.hadoop.yarn.webapp.WebAppException;
2728
import org.slf4j.Logger;
2829
import org.slf4j.LoggerFactory;
@@ -1391,9 +1392,9 @@ protected void startWepApp() {
13911392
if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf).
13921393
equals(proxyHostAndPort)) {
13931394
if (HAUtil.isHAEnabled(conf)) {
1394-
fetcher = new AppReportFetcher(conf);
1395+
fetcher = new DefaultAppReportFetcher(conf);
13951396
} else {
1396-
fetcher = new AppReportFetcher(conf, getClientRMService());
1397+
fetcher = new DefaultAppReportFetcher(conf, getClientRMService());
13971398
}
13981399
builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME,
13991400
ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class);

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java

Lines changed: 54 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -19,89 +19,62 @@
1919
package org.apache.hadoop.yarn.server.webproxy;
2020

2121
import java.io.IOException;
22+
import org.apache.hadoop.classification.VisibleForTesting;
2223
import org.apache.hadoop.conf.Configuration;
23-
import org.apache.hadoop.ipc.RPC;
24-
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
2524
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
26-
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
2725
import org.apache.hadoop.yarn.api.records.ApplicationId;
2826
import org.apache.hadoop.yarn.api.records.ApplicationReport;
2927
import org.apache.hadoop.yarn.client.AHSProxy;
30-
import org.apache.hadoop.yarn.client.ClientRMProxy;
3128
import org.apache.hadoop.yarn.conf.YarnConfiguration;
32-
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
3329
import org.apache.hadoop.yarn.exceptions.YarnException;
3430
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
3531
import org.apache.hadoop.yarn.factories.RecordFactory;
3632
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
33+
import org.apache.hadoop.yarn.util.StringHelper;
34+
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
3735

3836
/**
3937
* This class abstracts away how ApplicationReports are fetched.
4038
*/
41-
public class AppReportFetcher {
42-
enum AppReportSource { RM, AHS }
39+
public abstract class AppReportFetcher {
40+
41+
protected enum AppReportSource {RM, AHS}
42+
4343
private final Configuration conf;
44-
private final ApplicationClientProtocol applicationsManager;
45-
private final ApplicationHistoryProtocol historyManager;
46-
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
44+
private ApplicationHistoryProtocol historyManager;
45+
private String ahsAppPageUrlBase;
46+
private final RecordFactory recordFactory = RecordFactoryProvider
47+
.getRecordFactory(null);
4748
private boolean isAHSEnabled;
4849

49-
/**
50-
* Create a new Connection to the RM/Application History Server
51-
* to fetch Application reports.
52-
* @param conf the conf to use to know where the RM is.
53-
*/
5450
public AppReportFetcher(Configuration conf) {
51+
this.conf = conf;
5552
if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
5653
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
57-
isAHSEnabled = true;
54+
this.isAHSEnabled = true;
55+
this.ahsAppPageUrlBase =
56+
StringHelper.pjoin(WebAppUtils.getHttpSchemePrefix(conf)
57+
+ WebAppUtils.getAHSWebAppURLWithoutScheme(conf),
58+
"applicationhistory", "app");
5859
}
59-
this.conf = conf;
6060
try {
61-
applicationsManager = ClientRMProxy.createRMProxy(conf,
62-
ApplicationClientProtocol.class);
63-
if (isAHSEnabled) {
64-
historyManager = getAHSProxy(conf);
61+
if (this.isAHSEnabled) {
62+
this.historyManager = getAHSProxy(conf);
6563
} else {
6664
this.historyManager = null;
6765
}
6866
} catch (IOException e) {
6967
throw new YarnRuntimeException(e);
7068
}
7169
}
72-
73-
/**
74-
* Create a direct connection to RM instead of a remote connection when
75-
* the proxy is running as part of the RM. Also create a remote connection to
76-
* Application History Server if it is enabled.
77-
* @param conf the configuration to use
78-
* @param applicationsManager what to use to get the RM reports.
79-
*/
80-
public AppReportFetcher(Configuration conf, ApplicationClientProtocol applicationsManager) {
81-
if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
82-
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
83-
isAHSEnabled = true;
84-
}
85-
this.conf = conf;
86-
this.applicationsManager = applicationsManager;
87-
if (isAHSEnabled) {
88-
try {
89-
historyManager = getAHSProxy(conf);
90-
} catch (IOException e) {
91-
throw new YarnRuntimeException(e);
92-
}
93-
} else {
94-
this.historyManager = null;
95-
}
96-
}
9770

9871
protected ApplicationHistoryProtocol getAHSProxy(Configuration configuration)
9972
throws IOException {
10073
return AHSProxy.createAHSProxy(configuration,
101-
ApplicationHistoryProtocol.class,
102-
configuration.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
103-
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
104-
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT));
74+
ApplicationHistoryProtocol.class,
75+
configuration.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
76+
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
77+
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT));
10578
}
10679

10780
/**
@@ -112,46 +85,46 @@ protected ApplicationHistoryProtocol getAHSProxy(Configuration configuration)
11285
* @throws YarnException on any error.
11386
* @throws IOException
11487
*/
115-
public FetchedAppReport getApplicationReport(ApplicationId appId)
116-
throws YarnException, IOException {
117-
GetApplicationReportRequest request = recordFactory
118-
.newRecordInstance(GetApplicationReportRequest.class);
119-
request.setApplicationId(appId);
120-
121-
ApplicationReport appReport;
122-
FetchedAppReport fetchedAppReport;
123-
try {
124-
appReport = applicationsManager.
125-
getApplicationReport(request).getApplicationReport();
126-
fetchedAppReport = new FetchedAppReport(appReport, AppReportSource.RM);
127-
} catch (ApplicationNotFoundException e) {
128-
if (!isAHSEnabled) {
129-
// Just throw it as usual if historyService is not enabled.
130-
throw e;
131-
}
132-
//Fetch the application report from AHS
133-
appReport = historyManager.
134-
getApplicationReport(request).getApplicationReport();
135-
fetchedAppReport = new FetchedAppReport(appReport, AppReportSource.AHS);
136-
}
137-
return fetchedAppReport;
88+
public abstract FetchedAppReport getApplicationReport(ApplicationId appId)
89+
throws YarnException, IOException;
90+
91+
public abstract String getRmAppPageUrlBase(ApplicationId appId)
92+
throws IOException, YarnException;
93+
94+
public String getAhsAppPageUrlBase() {
95+
return this.ahsAppPageUrlBase;
13896
}
13997

140-
public void stop() {
141-
if (this.applicationsManager != null) {
142-
RPC.stopProxy(this.applicationsManager);
143-
}
144-
if (this.historyManager != null) {
145-
RPC.stopProxy(this.historyManager);
146-
}
98+
public abstract void stop();
99+
100+
protected Configuration getConf() {
101+
return this.conf;
102+
}
103+
104+
protected ApplicationHistoryProtocol getHistoryManager() {
105+
return this.historyManager;
106+
}
107+
108+
protected RecordFactory getRecordFactory() {
109+
return this.recordFactory;
110+
}
111+
112+
protected boolean isAHSEnabled() {
113+
return this.isAHSEnabled;
114+
}
115+
116+
@VisibleForTesting
117+
public void setHistoryManager(
118+
ApplicationHistoryProtocol historyManager) {
119+
this.historyManager = historyManager;
147120
}
148121

149122
/*
150123
* This class creates a bundle of the application report and the source from
151124
* where the the report was fetched. This allows the WebAppProxyServlet
152125
* to make decisions for the application report based on the source.
153126
*/
154-
static class FetchedAppReport {
127+
protected static class FetchedAppReport {
155128
private ApplicationReport appReport;
156129
private AppReportSource appReportSource;
157130

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to you under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
* <p>
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* <p>
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations under
15+
* the License.
16+
*/
17+
18+
package org.apache.hadoop.yarn.server.webproxy;
19+
20+
import java.io.IOException;
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.ipc.RPC;
23+
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
24+
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
25+
import org.apache.hadoop.yarn.api.records.ApplicationId;
26+
import org.apache.hadoop.yarn.api.records.ApplicationReport;
27+
import org.apache.hadoop.yarn.client.ClientRMProxy;
28+
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
29+
import org.apache.hadoop.yarn.exceptions.YarnException;
30+
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
31+
import org.apache.hadoop.yarn.util.StringHelper;
32+
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
33+
34+
public class DefaultAppReportFetcher extends AppReportFetcher {
35+
36+
private final ApplicationClientProtocol applicationsManager;
37+
private String rmAppPageUrlBase;
38+
39+
/**
40+
* Create a new Connection to the RM/Application History Server
41+
* to fetch Application reports.
42+
* @param conf the conf to use to know where the RM is.
43+
*/
44+
public DefaultAppReportFetcher(Configuration conf) {
45+
super(conf);
46+
this.rmAppPageUrlBase = StringHelper
47+
.pjoin(WebAppUtils.getResolvedRMWebAppURLWithScheme(conf),
48+
"cluster", "app");
49+
try {
50+
this.applicationsManager = ClientRMProxy.createRMProxy(conf,
51+
ApplicationClientProtocol.class);
52+
} catch (IOException e) {
53+
throw new YarnRuntimeException(e);
54+
}
55+
}
56+
57+
/**
58+
* Create a direct connection to RM instead of a remote connection when
59+
* the proxy is running as part of the RM. Also create a remote connection to
60+
* Application History Server if it is enabled.
61+
* @param conf the configuration to use
62+
* @param applicationsManager what to use to get the RM reports.
63+
*/
64+
public DefaultAppReportFetcher(Configuration conf,
65+
ApplicationClientProtocol applicationsManager) {
66+
super(conf);
67+
this.rmAppPageUrlBase = StringHelper
68+
.pjoin(WebAppUtils.getResolvedRMWebAppURLWithScheme(conf),
69+
"cluster", "app");
70+
this.applicationsManager = applicationsManager;
71+
}
72+
73+
/**
74+
* Get an application report for the specified application id from the RM and
75+
* fall back to the Application History Server if not found in RM.
76+
* @param appId id of the application to get.
77+
* @return the ApplicationReport for the appId.
78+
* @throws YarnException on any error.
79+
* @throws IOException connection exception.
80+
*/
81+
public FetchedAppReport getApplicationReport(ApplicationId appId)
82+
throws YarnException, IOException {
83+
GetApplicationReportRequest request = getRecordFactory()
84+
.newRecordInstance(GetApplicationReportRequest.class);
85+
request.setApplicationId(appId);
86+
87+
ApplicationReport appReport;
88+
FetchedAppReport fetchedAppReport;
89+
try {
90+
appReport = applicationsManager.
91+
getApplicationReport(request).getApplicationReport();
92+
fetchedAppReport = new FetchedAppReport(appReport, AppReportSource.RM);
93+
} catch (ApplicationNotFoundException e) {
94+
if (!isAHSEnabled()) {
95+
// Just throw it as usual if historyService is not enabled.
96+
throw e;
97+
}
98+
//Fetch the application report from AHS
99+
appReport = getHistoryManager().getApplicationReport(request)
100+
.getApplicationReport();
101+
fetchedAppReport = new FetchedAppReport(appReport, AppReportSource.AHS);
102+
}
103+
return fetchedAppReport;
104+
}
105+
106+
public String getRmAppPageUrlBase(ApplicationId appId)
107+
throws YarnException, IOException {
108+
return this.rmAppPageUrlBase;
109+
}
110+
111+
public void stop() {
112+
if (this.applicationsManager != null) {
113+
RPC.stopProxy(this.applicationsManager);
114+
}
115+
if (this.getHistoryManager() != null) {
116+
RPC.stopProxy(this.getHistoryManager());
117+
}
118+
}
119+
}

0 commit comments

Comments
 (0)