Skip to content

Commit e5e9139

Browse files
JohnZZGithubliuml07
authored andcommitted
MAPREDUCE-7294. Only application master should upload resource to Yarn Shared Cache (#2223)
Contributed by Zhenzhao Wang <zhenzhaowang@gmail.com> Signed-off-by: Mingliang Liu <liuml07@apache.org>
1 parent 75bc54a commit e5e9139

File tree

3 files changed

+39
-20
lines changed
  • hadoop-mapreduce-project/hadoop-mapreduce-client
    • hadoop-mapreduce-client-app/src
    • hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce

3 files changed

+39
-20
lines changed

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1423,7 +1423,8 @@ public static String escapeString(String data) {
14231423
* be set up to false. In that way, the NMs that host the task containers
14241424
* won't try to upload the resources to shared cache.
14251425
*/
1426-
private static void cleanupSharedCacheUploadPolicies(Configuration conf) {
1426+
@VisibleForTesting
1427+
static void cleanupSharedCacheUploadPolicies(Configuration conf) {
14271428
Job.setArchiveSharedCacheUploadPolicies(conf, Collections.emptyMap());
14281429
Job.setFileSharedCacheUploadPolicies(conf, Collections.emptyMap());
14291430
}

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
import org.apache.commons.io.FileUtils;
4141
import org.apache.hadoop.conf.Configuration;
42+
import org.apache.hadoop.mapreduce.Job;
4243
import org.apache.hadoop.mapreduce.JobACL;
4344
import org.apache.hadoop.mapreduce.JobContext;
4445
import org.apache.hadoop.mapreduce.JobID;
@@ -991,6 +992,28 @@ public void testJobPriorityUpdate() throws Exception {
991992
Assert.assertEquals(updatedPriority, jobPriority);
992993
}
993994

995+
@Test
996+
public void testCleanupSharedCacheUploadPolicies() {
997+
Configuration config = new Configuration();
998+
Map<String, Boolean> archivePolicies = new HashMap<>();
999+
archivePolicies.put("archive1", true);
1000+
archivePolicies.put("archive2", true);
1001+
Job.setArchiveSharedCacheUploadPolicies(config, archivePolicies);
1002+
Map<String, Boolean> filePolicies = new HashMap<>();
1003+
filePolicies.put("file1", true);
1004+
filePolicies.put("jar1", true);
1005+
Job.setFileSharedCacheUploadPolicies(config, filePolicies);
1006+
Assert.assertEquals(
1007+
2, Job.getArchiveSharedCacheUploadPolicies(config).size());
1008+
Assert.assertEquals(
1009+
2, Job.getFileSharedCacheUploadPolicies(config).size());
1010+
JobImpl.cleanupSharedCacheUploadPolicies(config);
1011+
Assert.assertEquals(
1012+
0, Job.getArchiveSharedCacheUploadPolicies(config).size());
1013+
Assert.assertEquals(
1014+
0, Job.getFileSharedCacheUploadPolicies(config).size());
1015+
}
1016+
9941017
private static CommitterEventHandler createCommitterEventHandler(
9951018
Dispatcher dispatcher, OutputCommitter committer) {
9961019
final SystemClock clock = SystemClock.getInstance();

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1450,26 +1450,21 @@ public static void setArchiveSharedCacheUploadPolicies(Configuration conf,
14501450
*/
14511451
private static void setSharedCacheUploadPolicies(Configuration conf,
14521452
Map<String, Boolean> policies, boolean areFiles) {
1453-
if (policies != null) {
1454-
StringBuilder sb = new StringBuilder();
1455-
Iterator<Map.Entry<String, Boolean>> it = policies.entrySet().iterator();
1456-
Map.Entry<String, Boolean> e;
1457-
if (it.hasNext()) {
1458-
e = it.next();
1459-
sb.append(e.getKey() + DELIM + e.getValue());
1460-
} else {
1461-
// policies is an empty map, just skip setting the parameter
1462-
return;
1463-
}
1464-
while (it.hasNext()) {
1465-
e = it.next();
1466-
sb.append("," + e.getKey() + DELIM + e.getValue());
1467-
}
1468-
String confParam =
1469-
areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES
1470-
: MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
1471-
conf.set(confParam, sb.toString());
1453+
String confParam = areFiles ?
1454+
MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES :
1455+
MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
1456+
// If no policy is provided, we will reset the config by setting an empty
1457+
// string value. In other words, cleaning up existing policies. This is
1458+
// useful when we try to clean up shared cache upload policies for
1459+
// non-application master tasks. See MAPREDUCE-7294 for details.
1460+
if (policies == null || policies.size() == 0) {
1461+
conf.set(confParam, "");
1462+
return;
14721463
}
1464+
StringBuilder sb = new StringBuilder();
1465+
policies.forEach((k,v) -> sb.append(k).append(DELIM).append(v).append(","));
1466+
sb.deleteCharAt(sb.length() - 1);
1467+
conf.set(confParam, sb.toString());
14731468
}
14741469

14751470
/**

0 commit comments

Comments
 (0)