Skip to content

Commit

Permalink
MAPREDUCE-7225: Fix broken current folder expansion during MR job start. Contributed by Peter Bacsko.
Browse files Browse the repository at this point in the history
  • Loading branch information
szilard-nemeth committed Aug 1, 2019
1 parent 89b102f commit a7371a7
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
class JobResourceUploader {
protected static final Logger LOG =
LoggerFactory.getLogger(JobResourceUploader.class);
private static final String ROOT_PATH = "/";

private final boolean useWildcard;
private final FileSystem jtFs;
private SharedCacheClient scClient = null;
Expand Down Expand Up @@ -674,9 +676,30 @@ Path copyRemoteFiles(Path parentDir, Path originalPath,
if (FileUtil.compareFs(remoteFs, jtFs)) {
return originalPath;
}

boolean root = false;
if (ROOT_PATH.equals(originalPath.toUri().getPath())) {
// "/" needs special treatment
root = true;
} else {
// If originalPath ends in a "/", then remove it so
// that originalPath.getName() does not return an empty string
String uriString = originalPath.toUri().toString();
if (uriString.endsWith("/")) {
try {
URI strippedURI =
new URI(uriString.substring(0, uriString.length() - 1));
originalPath = new Path(strippedURI);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Error processing URI", e);
}
}
}

// this might have name collisions. copy will throw an exception
// parse the original path to create new path
Path newPath = new Path(parentDir, originalPath.getName());
Path newPath = root ?
parentDir : new Path(parentDir, originalPath.getName());
FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, conf);
jtFs.setReplication(newPath, replication);
jtFs.makeQualified(newPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@

package org.apache.hadoop.mapreduce;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.spy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -46,8 +49,10 @@
import org.apache.hadoop.mapred.JobConf;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.verification.VerificationMode;


/**
* A class for unit testing JobResourceUploader.
*/
Expand Down Expand Up @@ -375,6 +380,50 @@ public void testErasureCodingDisabled() throws IOException {
testErasureCodingSetting(false);
}

/**
 * Uploads a local directory whose URI ends in a slash and expects the
 * path passed to makeQualified() to be the destination directory plus
 * the last component of the source ("test").
 */
@Test
public void testOriginalPathEndsInSlash()
    throws IOException, URISyntaxException {
  Path localDirWithSlash = new Path(new URI("file:/local/mapred/test/"));
  Path expectedUpload = new Path("hdfs://localhost:1234/home/hadoop/test/");

  testOriginalPathWithTrailingSlash(localDirWithSlash, expectedUpload);
}

/**
 * Uploads the local root ("file:/") and expects the path passed to
 * makeQualified() to be the destination directory itself.
 */
@Test
public void testOriginalPathIsRoot() throws IOException, URISyntaxException {
  Path localRoot = new Path(new URI("file:/"));
  Path expectedUpload = new Path("hdfs://localhost:1234/home/hadoop/");

  testOriginalPathWithTrailingSlash(localRoot, expectedUpload);
}

/**
 * Calls the real copyRemoteFiles() with mocked source and destination
 * file systems and checks which path is handed to makeQualified() on the
 * destination file system.
 *
 * @param path source path; a spy of it is passed to copyRemoteFiles()
 * @param expectedRemotePath path expected as the makeQualified() argument
 */
private void testOriginalPathWithTrailingSlash(Path path,
    Path expectedRemotePath) throws IOException, URISyntaxException {
  final Path dstPath = new Path("hdfs://localhost:1234/home/hadoop/");

  // File system handed to the uploader (the copy destination).
  DistributedFileSystem dstFs = mock(DistributedFileSystem.class);
  when(dstFs.getUri()).thenReturn(dstPath.toUri());
  // mkdirs() reports failure; this is intended to keep FileUtil.copy()
  // from actually trying to copy anything.
  when(dstFs.mkdirs(any(Path.class))).thenReturn(false);

  // Source side: a spied path that resolves to a mocked file system,
  // which in turn reports the path as a directory.
  Path spiedSource = spy(path);
  FileSystem srcFs = mock(FileSystem.class);
  FileStatus srcStatus = mock(FileStatus.class);
  when(srcStatus.isDirectory()).thenReturn(true);
  when(srcStatus.getPath()).thenReturn(spiedSource);
  when(srcFs.getFileStatus(any(Path.class))).thenReturn(srcStatus);
  when(srcFs.getUri()).thenReturn(path.toUri());
  doReturn(srcFs).when(spiedSource)
      .getFileSystem(any(Configuration.class));

  // The third constructor argument makes the stub delegate to the real
  // copyRemoteFiles() implementation.
  JobResourceUploader uploader = new StubedUploader(dstFs, true, true);
  JobConf jobConf = new JobConf();
  uploader.copyRemoteFiles(dstPath, spiedSource, jobConf, (short) 1);

  ArgumentCaptor<Path> qualifiedArg = ArgumentCaptor.forClass(Path.class);
  verify(dstFs).makeQualified(qualifiedArg.capture());
  Assert.assertEquals("Path", expectedRemotePath, qualifiedArg.getValue());
}

private void testErasureCodingSetting(boolean defaultBehavior)
throws IOException {
JobConf jConf = new JobConf();
Expand All @@ -387,7 +436,7 @@ private void testErasureCodingSetting(boolean defaultBehavior)
DistributedFileSystem fs = mock(DistributedFileSystem.class);
Path path = new Path("/");
when(fs.makeQualified(any(Path.class))).thenReturn(path);
JobResourceUploader uploader = new StubedUploader(fs, true);
JobResourceUploader uploader = new StubedUploader(fs, true, false);
Job job = Job.getInstance(jConf);

uploader.uploadResources(job, new Path("/test"));
Expand Down Expand Up @@ -728,6 +777,8 @@ private String buildPathStringSub(String pathPrefix, String processedPath,
}

private class StubedUploader extends JobResourceUploader {
private boolean callOriginalCopy = false;

StubedUploader(JobConf conf) throws IOException {
this(conf, false);
}
Expand All @@ -736,8 +787,10 @@ private class StubedUploader extends JobResourceUploader {
super(FileSystem.getLocal(conf), useWildcard);
}

StubedUploader(FileSystem fs, boolean useWildcard) throws IOException {
StubedUploader(FileSystem fs, boolean useWildcard,
boolean callOriginalCopy) throws IOException {
super(fs, useWildcard);
this.callOriginalCopy = callOriginalCopy;
}

@Override
Expand All @@ -757,7 +810,12 @@ boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
@Override
Path copyRemoteFiles(Path parentDir, Path originalPath, Configuration conf,
short replication) throws IOException {
return new Path(destinationPathPrefix + originalPath.getName());
if (callOriginalCopy) {
return super.copyRemoteFiles(
parentDir, originalPath, conf, replication);
} else {
return new Path(destinationPathPrefix + originalPath.getName());
}
}

@Override
Expand Down

0 comments on commit a7371a7

Please sign in to comment.