-
Notifications
You must be signed in to change notification settings - Fork 424
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TEZ-4514: Reduce Some FileSystem Calls. #309
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
package org.apache.tez.client; | ||
|
||
import java.io.File; | ||
import java.io.FileNotFoundException; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.net.InetSocketAddress; | ||
|
@@ -44,6 +45,8 @@ | |
import com.google.common.base.Strings; | ||
import org.apache.commons.codec.digest.DigestUtils; | ||
import org.apache.commons.lang.StringUtils; | ||
import org.apache.hadoop.fs.LocatedFileStatus; | ||
import org.apache.hadoop.fs.RemoteIterator; | ||
import org.apache.tez.common.JavaOptsChecker; | ||
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; | ||
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; | ||
|
@@ -118,34 +121,28 @@ | |
@Private | ||
public final class TezClientUtils { | ||
|
||
private static Logger LOG = LoggerFactory.getLogger(TezClientUtils.class); | ||
private static final Logger LOG = LoggerFactory.getLogger(TezClientUtils.class); | ||
private static final int UTF8_CHUNK_SIZE = 16 * 1024; | ||
|
||
private TezClientUtils() {} | ||
|
||
private static FileStatus[] getLRFileStatus(String fileName, Configuration conf) throws | ||
IOException { | ||
URI uri; | ||
private static RemoteIterator<LocatedFileStatus> getListFilesFileStatus(String configUri, Configuration conf) | ||
throws IOException { | ||
Path p = getPath(configUri); | ||
FileSystem fs = p.getFileSystem(conf); | ||
p = fs.resolvePath(p.makeQualified(fs.getUri(), fs.getWorkingDirectory())); | ||
FileSystem targetFS = p.getFileSystem(conf); | ||
return targetFS.listFiles(p, false); | ||
} | ||
|
||
private static Path getPath(String configUri) { | ||
try { | ||
uri = new URI(fileName); | ||
return new Path(new URI(configUri)); | ||
} catch (URISyntaxException e) { | ||
String message = "Invalid URI defined in configuration for" | ||
+ " location of TEZ jars. providedURI=" + fileName; | ||
String message = "Invalid URI defined in configuration for" + " location of TEZ jars. providedURI=" + configUri; | ||
LOG.error(message); | ||
throw new TezUncheckedException(message, e); | ||
} | ||
|
||
Path p = new Path(uri); | ||
FileSystem fs = p.getFileSystem(conf); | ||
p = fs.resolvePath(p.makeQualified(fs.getUri(), | ||
fs.getWorkingDirectory())); | ||
FileSystem targetFS = p.getFileSystem(conf); | ||
if (targetFS.isDirectory(p)) { | ||
return targetFS.listStatus(p); | ||
} else { | ||
FileStatus fStatus = targetFS.getFileStatus(p); | ||
return new FileStatus[]{fStatus}; | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -233,15 +230,11 @@ private static boolean addLocalResources(Configuration conf, | |
} else { | ||
type = LocalResourceType.FILE; | ||
} | ||
RemoteIterator<LocatedFileStatus> fileStatuses = getListFilesFileStatus(configUri, conf); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. getListFilesFileStatus receives a "String fileName" param, and here we pass a "configUri", can you unify and use whatever is closer to the truth? also I can see that getListFilesFileStatus creates an URI eventually, we can pass it here, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done thing, the name is URI, but it isn't a URI object but string, it is extracted from a conf which has name URI, so kept the name old as configURI |
||
|
||
FileStatus [] fileStatuses = getLRFileStatus(configUri, conf); | ||
|
||
for (FileStatus fStatus : fileStatuses) { | ||
while (fileStatuses.hasNext()) { | ||
LocatedFileStatus fStatus = fileStatuses.next(); | ||
String linkName; | ||
if (fStatus.isDirectory()) { | ||
// Skip directories - no recursive search support. | ||
continue; | ||
} | ||
// If the resource is an archive, we've already done this work | ||
if(type != LocalResourceType.ARCHIVE) { | ||
u = fStatus.getPath().toUri(); | ||
|
@@ -250,8 +243,7 @@ private static boolean addLocalResources(Configuration conf, | |
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), | ||
remoteFS.getWorkingDirectory())); | ||
if(null != u.getFragment()) { | ||
LOG.warn("Fragment set for link being interpreted as a file," + | ||
"URI: " + u.toString()); | ||
LOG.warn("Fragment set for link being interpreted as a file, URI: {}", u); | ||
} | ||
} | ||
|
||
|
@@ -336,8 +328,13 @@ public static FileSystem ensureStagingDirExists(Configuration conf, | |
UserGroupInformation ugi = UserGroupInformation.getLoginUser(); | ||
realUser = ugi.getShortUserName(); | ||
currentUser = UserGroupInformation.getCurrentUser().getShortUserName(); | ||
if (fs.exists(stagingArea)) { | ||
FileStatus fsStatus = fs.getFileStatus(stagingArea); | ||
FileStatus fsStatus = null; | ||
try { | ||
fsStatus = fs.getFileStatus(stagingArea); | ||
} catch (FileNotFoundException fnf) { | ||
// Ignore | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what about returning if
and having the rest of the method unindented There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can't return, there is an else block below if fsStatus is null
|
||
} | ||
if (fsStatus != null) { | ||
String owner = fsStatus.getOwner(); | ||
if (!(owner.equals(currentUser) || owner.equals(realUser))) { | ||
throw new IOException("The ownership on the staging directory " | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
package org.apache.tez.dag.app; | ||
|
||
import java.io.EOFException; | ||
import java.io.FileNotFoundException; | ||
import java.io.IOException; | ||
import java.net.URL; | ||
import java.util.ArrayList; | ||
|
@@ -383,18 +384,18 @@ public static List<HistoryEvent> readRecoveryEvents(TezConfiguration tezConf, Ap | |
new Path(currentAttemptRecoveryDataDir, appId.toString().replace( | ||
"application", "dag") | ||
+ "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX); | ||
if (fs.exists(recoveryFilePath)) { | ||
LOG.info("Read recovery file:" + recoveryFilePath); | ||
FSDataInputStream in = null; | ||
try { | ||
in = fs.open(recoveryFilePath); | ||
historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile(in)); | ||
} catch (IOException e) { | ||
throw e; | ||
} finally { | ||
if (in != null) { | ||
in.close(); | ||
} | ||
LOG.info("Read recovery file:" + recoveryFilePath); | ||
FSDataInputStream in = null; | ||
try { | ||
in = fs.open(recoveryFilePath); | ||
historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile(in)); | ||
} catch (FileNotFoundException fnf) { | ||
// Ignore, the file doesn't exist | ||
} catch (IOException e) { | ||
throw e; | ||
} finally { | ||
if (in != null) { | ||
in.close(); | ||
} | ||
} | ||
} | ||
|
@@ -429,12 +430,13 @@ private Path getSummaryPath(Path attemptRecoveryDataDir) { | |
return TezCommonUtils.getSummaryRecoveryPath(attemptRecoveryDataDir); | ||
} | ||
|
||
private FSDataInputStream getSummaryStream(Path summaryPath) | ||
throws IOException { | ||
if (!recoveryFS.exists(summaryPath)) { | ||
private FSDataInputStream getSummaryStream(Path summaryPath) throws IOException { | ||
try { | ||
return recoveryFS.open(summaryPath, recoveryBufferSize); | ||
} catch (FileNotFoundException fnf) { | ||
return null; | ||
|
||
} | ||
return recoveryFS.open(summaryPath, recoveryBufferSize); | ||
} | ||
|
||
private Path getDAGRecoveryFilePath(Path recoveryDataDir, | ||
|
@@ -741,12 +743,14 @@ public DAGRecoveryData parseRecoveryData() throws IOException { | |
+ lastRecoveryFile); | ||
break; | ||
} | ||
FileStatus fileStatus = recoveryFS.getFileStatus(dagRecoveryFile); | ||
lastRecoveryFile = dagRecoveryFile; | ||
LOG.info("Trying to recover dag from recovery file" | ||
+ ", dagId=" + lastInProgressDAG.toString() | ||
+ ", dagRecoveryFile=" + dagRecoveryFile | ||
+ ", len=" + fileStatus.getLen()); | ||
LOG.info("Trying to recover dag from recovery file, dagId={}, dagRecoveryFile={}", lastInProgressDAG, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed fileStatus.getLen() from the log message, is it intentional? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, it was shooting an RPC just for log and file length, so removed it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we about leaving the useful info on DEBUG level, but in that case, we can log the full FileStatus, like
|
||
dagRecoveryFile); | ||
if (LOG.isDebugEnabled()) { | ||
FileStatus fileStatus = recoveryFS.getFileStatus(dagRecoveryFile); | ||
LOG.debug("Recovery file details: {}", fileStatus); | ||
} | ||
|
||
FSDataInputStream dagRecoveryStream = recoveryFS.open(dagRecoveryFile, recoveryBufferSize); | ||
CodedInputStream codedInputStream = CodedInputStream.newInstance(dagRecoveryStream); | ||
codedInputStream.setSizeLimit(Integer.MAX_VALUE); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as far as I can understand, this single listFiles call can be used instead of a "directory or file" check, making this method simpler, looks good