Skip to content

Commit e4cfeeb

Browse files
Fred Ji and jagadish-northguard
authored and committed
SAMZA-1143; Universal config support for localized resource
More details in https://issues.apache.org/jira/browse/SAMZA-1143 Tests: ./gradlew clean check successful and all unit tests passed Author: Fred Ji <fji@linkedin.com> Reviewers: Jagadish <jagadish@apache.org> Closes apache#90 from fredji97/universalLocalizer
1 parent 9d52e99 commit e4cfeeb

File tree

14 files changed

+836
-59
lines changed

14 files changed

+836
-59
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.samza.job.yarn;
20+
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import org.apache.commons.lang.StringUtils;
24+
import org.apache.samza.config.Config;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
29+
/**
30+
* FileSystemImplConfig is intended to manage the Samza config for fs.&lt;scheme&gt;impl.
31+
* e.g. fs.http.impl
32+
*/
33+
public class FileSystemImplConfig {
34+
private static final Logger log = LoggerFactory.getLogger(FileSystemImplConfig.class);
35+
private static final String FS_IMPL_PREFIX = "fs.";
36+
private static final String FS_IMPL_SUFFIX = ".impl";
37+
private static final String FS_IMPL = "fs.%s.impl";
38+
39+
private final Config config;
40+
41+
public FileSystemImplConfig(final Config config) {
42+
if (null == config) {
43+
throw new IllegalArgumentException("config cannot be null");
44+
}
45+
this.config = config;
46+
}
47+
48+
/**
49+
* Get all schemes
50+
* @return List of schemes in strings
51+
*/
52+
public List<String> getSchemes() {
53+
Config subConfig = config.subset(FS_IMPL_PREFIX, true);
54+
List<String> schemes = new ArrayList<String>();
55+
for (String key : subConfig.keySet()) {
56+
if (key.endsWith(FS_IMPL_SUFFIX)) {
57+
schemes.add(key.substring(0, key.length() - FS_IMPL_SUFFIX.length()));
58+
}
59+
}
60+
return schemes;
61+
}
62+
63+
/**
64+
* Get the fs.&lt;scheme&gt;impl as the config key from scheme
65+
* @param scheme scheme name, such as http, hdfs, myscheme
66+
* @return fs.&lt;scheme&gt;impl
67+
*/
68+
public String getFsImplKey(final String scheme) {
69+
String fsImplKey = String.format(FS_IMPL, scheme);
70+
return fsImplKey;
71+
}
72+
73+
/**
74+
* Get the class name corresponding for the given scheme
75+
* @param scheme scheme name, such as http, hdfs, myscheme
76+
* @return full scoped class name for the file system for &lt;scheme&gt;
77+
*/
78+
public String getFsImplClassName(final String scheme) {
79+
String fsImplKey = getFsImplKey(scheme);
80+
String fsImplClassName = config.get(fsImplKey);
81+
if (StringUtils.isEmpty(fsImplClassName)) {
82+
throw new LocalizerResourceException(fsImplKey + " does not have configured class implementation");
83+
}
84+
return fsImplClassName;
85+
}
86+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.samza.job.yarn;
20+
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import org.apache.commons.lang.StringUtils;
24+
import org.apache.hadoop.fs.Path;
25+
import org.apache.hadoop.yarn.api.records.LocalResourceType;
26+
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
27+
import org.apache.samza.config.Config;
28+
29+
/**
30+
* LocalizerResourceConfig is intended to manage/fetch the config values
31+
* for the yarn localizer resource(s) from the configuration.
32+
*
33+
* There are 4 config values
34+
* yarn.resources.&lt;resourceName&gt;.path
35+
* (Required) The path for fetching the resource for localization,
36+
* e.g. http://hostname.com/test.
37+
* yarn.resources.&lt;resourceName&gt;.local.name
38+
* (Optional) The local name used for the localized resource.
39+
* If not set, the default one will be the &lt;resourceName&gt; from the config key.
40+
* yarn.resources.&lt;resourceName&gt;.local.type
41+
* (Optional) The value value is a string format of {@link LocalResourceType}:
42+
* ARCHIVE, FILE, PATTERN.
43+
* If not set, the default value is FILE.
44+
* yarn.resources.&lt;resourceName&gt;.local.visibility
45+
* (Optional) The valid value is a string format of {@link LocalResourceVisibility}:
46+
* PUBLIC, PRIVATE, or APPLICATION.
47+
* If not set, the default value is is APPLICATION.
48+
*/
49+
public class LocalizerResourceConfig {
50+
private static final String RESOURCE_PREFIX = "yarn.resources.";
51+
private static final String PATH_SUFFIX = ".path";
52+
private static final String RESOURCE_PATH = "yarn.resources.%s.path";
53+
private static final String RESOURCE_LOCAL_NAME = "yarn.resources.%s.local.name";
54+
private static final String RESOURCE_LOCAL_TYPE = "yarn.resources.%s.local.type";
55+
private static final String RESOURCE_LOCAL_VISIBILITY = "yarn.resources.%s.local.visibility";
56+
private static final String DEFAULT_RESOURCE_LOCAL_TYPE = "FILE";
57+
private static final String DEFAULT_RESOURCE_LOCAL_VISIBILITY = "APPLICATION";
58+
59+
private final Config config;
60+
61+
public LocalizerResourceConfig(final Config config) {
62+
if (null == config) {
63+
throw new IllegalArgumentException("config cannot be null");
64+
}
65+
this.config = config;
66+
}
67+
68+
public List<String> getResourceNames() {
69+
Config subConfig = config.subset(RESOURCE_PREFIX, true);
70+
List<String> resourceNames = new ArrayList<String>();
71+
for (String key : subConfig.keySet()) {
72+
if (key.endsWith(PATH_SUFFIX)) {
73+
resourceNames.add(key.substring(0, key.length() - PATH_SUFFIX.length()));
74+
}
75+
}
76+
return resourceNames;
77+
}
78+
79+
public Path getResourcePath(final String resourceName) {
80+
String pathStr = config.get(String.format(RESOURCE_PATH, resourceName));
81+
if (StringUtils.isEmpty(pathStr)) {
82+
throw new LocalizerResourceException("resource path is required but not defined in config for resource " + resourceName);
83+
}
84+
return new Path(pathStr);
85+
}
86+
87+
public LocalResourceType getResourceLocalType(final String resourceName) {
88+
String typeStr = config.get(String.format(RESOURCE_LOCAL_TYPE, resourceName), DEFAULT_RESOURCE_LOCAL_TYPE);
89+
return LocalResourceType.valueOf(StringUtils.upperCase(typeStr));
90+
}
91+
92+
public LocalResourceVisibility getResourceLocalVisibility(final String resourceName) {
93+
String visibilityStr = config.get(String.format(RESOURCE_LOCAL_VISIBILITY, resourceName), DEFAULT_RESOURCE_LOCAL_VISIBILITY);
94+
return LocalResourceVisibility.valueOf(StringUtils.upperCase(visibilityStr));
95+
}
96+
97+
public String getResourceLocalName(final String resourceName) {
98+
String name = config.get(String.format(RESOURCE_LOCAL_NAME, resourceName), resourceName);
99+
return name;
100+
}
101+
102+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.samza.job.yarn;
20+
21+
/**
 * Unchecked exception for failures related to localized resources and their configuration.
 */
public class LocalizerResourceException extends RuntimeException {

  // Explicit id: the class is Serializable through RuntimeException.
  private static final long serialVersionUID = 1L;

  /**
   * Constructs a {@code LocalizerResourceException} with {@code null}
   * as its error detail message.
   */
  public LocalizerResourceException() {
    super();
  }

  /**
   * Constructs a {@code LocalizerResourceException} with the specified detail message.
   *
   * @param message
   *        The detail message (which is saved for later retrieval
   *        by the {@link #getMessage()} method)
   */
  public LocalizerResourceException(String message) {
    super(message);
  }

  /**
   * Constructs a {@code LocalizerResourceException} with the specified detail message
   * and cause.
   *
   * @param message
   *        The detail message (which is saved for later retrieval
   *        by the {@link #getMessage()} method)
   *
   * @param cause
   *        The cause (which is saved for later retrieval by the
   *        {@link #getCause()} method). (A null value is permitted,
   *        and indicates that the cause is nonexistent or unknown.)
   */
  public LocalizerResourceException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * Constructs a {@code LocalizerResourceException} with the specified cause and a
   * detail message of {@code (cause==null ? null : cause.toString())}
   *
   * @param cause
   *        The cause (which is saved for later retrieval by the
   *        {@link #getCause()} method). (A null value is permitted,
   *        and indicates that the cause is nonexistent or unknown.)
   */
  public LocalizerResourceException(Throwable cause) {
    super(cause);
  }

}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.samza.job.yarn;
20+
21+
import com.google.common.collect.ImmutableMap;
22+
import java.io.IOException;
23+
import java.util.List;
24+
import java.util.Map;
25+
import org.apache.hadoop.fs.FileStatus;
26+
import org.apache.hadoop.fs.Path;
27+
import org.apache.hadoop.yarn.api.records.LocalResource;
28+
import org.apache.hadoop.yarn.api.records.LocalResourceType;
29+
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
30+
import org.apache.hadoop.yarn.api.records.URL;
31+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
32+
import org.apache.hadoop.yarn.util.ConverterUtils;
33+
import org.apache.hadoop.yarn.util.Records;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
36+
37+
38+
/**
39+
* A universal approach to generate local resource map which can be put in ContainerLaunchContext directly
40+
*/
41+
public class LocalizerResourceMapper {
42+
private static final Logger log = LoggerFactory.getLogger(LocalizerResourceMapper.class);
43+
44+
private final YarnConfiguration yarnConfiguration; //yarn configurations
45+
private final LocalizerResourceConfig resourceConfig;
46+
private final Map<String, LocalResource> localResourceMap;
47+
48+
public LocalizerResourceMapper(LocalizerResourceConfig resourceConfig, YarnConfiguration yarnConfiguration) {
49+
this.yarnConfiguration = yarnConfiguration;
50+
this.resourceConfig = resourceConfig;
51+
this.localResourceMap = buildResourceMapping();
52+
}
53+
54+
private Map<String, LocalResource> buildResourceMapping() {
55+
ImmutableMap.Builder<String, LocalResource> localResourceMapBuilder = ImmutableMap.builder();
56+
57+
List<String> resourceNames = resourceConfig.getResourceNames();
58+
for (String resourceName : resourceNames) {
59+
String resourceLocalName = resourceConfig.getResourceLocalName(resourceName);
60+
LocalResourceType resourceType = resourceConfig.getResourceLocalType(resourceName);
61+
LocalResourceVisibility resourceVisibility = resourceConfig.getResourceLocalVisibility(resourceName);
62+
Path resourcePath = resourceConfig.getResourcePath(resourceName);
63+
64+
LocalResource localResource = createLocalResource(resourcePath, resourceType, resourceVisibility);
65+
66+
localResourceMapBuilder.put(resourceLocalName, localResource);
67+
log.info("preparing local resource: {}", resourceLocalName);
68+
}
69+
70+
return localResourceMapBuilder.build();
71+
}
72+
73+
private LocalResource createLocalResource(Path resourcePath, LocalResourceType resourceType, LocalResourceVisibility resourceVisibility) {
74+
LocalResource localResource = Records.newRecord(LocalResource.class);
75+
URL resourceUrl = ConverterUtils.getYarnUrlFromPath(resourcePath);
76+
try {
77+
FileStatus resourceFileStatus = resourcePath.getFileSystem(yarnConfiguration).getFileStatus(resourcePath);
78+
79+
if (null == resourceFileStatus) {
80+
throw new LocalizerResourceException("Check getFileStatus implementation. getFileStatus gets unexpected null for resourcePath " + resourcePath);
81+
}
82+
83+
localResource.setResource(resourceUrl);
84+
log.info("setLocalizerResource for {}", resourceUrl);
85+
localResource.setSize(resourceFileStatus.getLen());
86+
localResource.setTimestamp(resourceFileStatus.getModificationTime());
87+
localResource.setType(resourceType);
88+
localResource.setVisibility(resourceVisibility);
89+
return localResource;
90+
} catch (IOException ioe) {
91+
log.error("IO Exception when accessing the resource file status from the filesystem: " + resourcePath, ioe);
92+
throw new LocalizerResourceException("IO Exception when accessing the resource file status from the filesystem: " + resourcePath);
93+
}
94+
95+
}
96+
97+
public Map<String, LocalResource> getResourceMap() {
98+
return ImmutableMap.copyOf(localResourceMap);
99+
}
100+
101+
}

samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,10 @@
3636
import org.apache.samza.config.YarnConfig;
3737
import org.apache.samza.coordinator.JobModelManager;
3838
import org.apache.samza.job.CommandBuilder;
39-
import org.apache.samza.job.yarn.YarnContainer;
4039
import org.apache.samza.metrics.MetricsRegistryMap;
4140
import org.apache.samza.util.hadoop.HttpFileSystem;
4241
import org.slf4j.Logger;
4342
import org.slf4j.LoggerFactory;
44-
4543
import java.io.IOException;
4644
import java.util.ArrayList;
4745
import java.util.List;
@@ -120,6 +118,12 @@ public YarnClusterResourceManager(Config config, JobModelManager jobModelManager
120118
hConfig = new YarnConfiguration();
121119
hConfig.set("fs.http.impl", HttpFileSystem.class.getName());
122120

121+
// Use the Samza job config "fs.<scheme>.impl" to override YarnConfiguration
122+
FileSystemImplConfig fsImplConfig = new FileSystemImplConfig(config);
123+
fsImplConfig.getSchemes().forEach(
124+
scheme -> hConfig.set(fsImplConfig.getFsImplKey(scheme), fsImplConfig.getFsImplClassName(scheme))
125+
);
126+
123127
MetricsRegistryMap registry = new MetricsRegistryMap();
124128
metrics = new SamzaAppMasterMetrics(config, samzaAppState, registry);
125129

0 commit comments

Comments
 (0)