Skip to content

Commit

Permalink
Enable Python profiling.
Browse files Browse the repository at this point in the history
  • Loading branch information
cmnbroad committed Jul 30, 2018
1 parent b6a630a commit d8414db
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ public class CNNScoreVariants extends VariantWalker {
// Optional flag (defaults to false) controlling whether the temporary file of Python-written scores is kept.
@Argument(fullName = "keep-temp-file", shortName = "keep-temp-file", doc = "Keep the temporary file that python writes scores to.", optional = true)
private boolean keepTempFile = false;

// Optional output file for Python profiling results. The value is handed to pythonExecutor.start()
// in onTraversalStart().
// NOTE(review): presumably this stays null when the argument is not supplied, which would leave
// profiling off - confirm against the argument parser and StreamingPythonScriptExecutor.
@Hidden
@Argument(fullName = "python-profile", shortName = "python-profile", doc = "Run the tool with the Python CProfiler on and write results to this file.", optional = true)
private File pythonProfileResults;

// Create the Python executor. This doesn't actually start the Python process, but verifies that
// the requested Python executable exists and can be located. The process itself is started later
// through pythonExecutor.start().
final StreamingPythonScriptExecutor<String> pythonExecutor = new StreamingPythonScriptExecutor<>(true);
Expand Down Expand Up @@ -230,7 +234,8 @@ public void onTraversalStart() {
}

// Start the Python process and initialize a stream writer for streaming data to the Python code
pythonExecutor.start(Collections.emptyList(), enableJournal);
pythonExecutor.start(Collections.emptyList(), enableJournal, pythonProfileResults);

pythonExecutor.initStreamWriter(AsynchronousStreamWriter.stringSerializer);
batchList = new ArrayList<>(transferBatchSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,18 @@ public class StreamingPythonScriptExecutor<T> extends PythonExecutorBase {
private FileOutputStream dataTransferFIFOWriter;
private AsynchronousStreamWriter<T> asyncWriter;

// File that receives the Python profiling results. Assigned by start(); when it is non-null,
// profiling is started at the end of initializeTool() and ended (writing the results to this
// file) at the beginning of terminate(). A null value means profiling is not requested.
private File profileResults;

// Python commands that are executed in the companion python process. The functions called
// here live in the {@code tool} module in {@code gatktool} Python package.
// Templates containing '%s' are completed with String.format before being sent.
private final static String PYTHON_IMPORT_GATK = "from gatktool import tool" + NL;
private final static String PYTHON_INITIALIZE_GATK = "tool.initializeGATK('%s')" + NL;
// Sent only when a profile results file was supplied to start().
private final static String PYTHON_START_PROFILING = "tool.startProfiling()" + NL;
private final static String PYTHON_TERMINATE_GATK = "tool.terminateGATK()" + NL;
private final static String PYTHON_INITIALIZE_DATA_FIFO = "tool.initializeDataFIFO('%s')" + NL;
private final static String PYTHON_CLOSE_DATA_FIFO = "tool.closeDataFIFO()" + NL;
private final static String PYTHON_SEND_ACK_REQUEST = "tool.sendAck()" + NL;
// The '%s' placeholder is filled with the absolute path of the profile results file.
private final static String PYTHON_END_PROFILING = "tool.endProfiling('%s')" + NL;

// keep track of when an ack request has been made and reject attempts to send another ack
// request until the previous one has been handled
Expand Down Expand Up @@ -93,7 +97,7 @@ public StreamingPythonScriptExecutor(final PythonExecutableName pythonExecutable
* @return true if the process is successfully started
*/
public boolean start(final List<String> pythonProcessArgs) {
return start(pythonProcessArgs, false);
return start(pythonProcessArgs, false, null);
}

/**
Expand All @@ -104,7 +108,8 @@ public boolean start(final List<String> pythonProcessArgs) {
* expensive and should only be used for debugging purposes.
* @return true if the process is successfully started
*/
public boolean start(final List<String> pythonProcessArgs, final boolean enableJournaling) {
public boolean start(final List<String> pythonProcessArgs, final boolean enableJournaling, final File profileResults) {
this.profileResults = profileResults;
final List<String> args = new ArrayList<>();
args.add(externalScriptExecutableName);
args.add("-u");
Expand Down Expand Up @@ -291,6 +296,11 @@ protected Process getProcess() {
* Terminate the remote process, closing the fifo if any.
*/
public void terminate() {
if (profileResults != null) {
spController.writeProcessInput(String.format(PYTHON_END_PROFILING, profileResults.getAbsolutePath()));
sendAckRequest();
waitForAck();
}
if (dataTransferFIFOWriter != null) {
if (asyncWriter != null) {
Assert.assertTrue(asyncWriter.terminate());
Expand Down Expand Up @@ -336,6 +346,11 @@ private void initializeTool(final File ackFIFOFile) {
// wait for the ack to be sent
spController.openAckFIFOForRead();
waitForAck();
if (profileResults != null) {
spController.writeProcessInput(PYTHON_START_PROFILING);
sendAckRequest();
waitForAck();
}
}

private void sendAckRequest() {
Expand Down
28 changes: 28 additions & 0 deletions src/main/python/org/broadinstitute/hellbender/gatktool/tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

import sys
import os
import cProfile, pstats, io

# Module-level state shared by the functions in this module.
# NOTE(review): _ackFIFO is presumably set by initializeGATK() - confirm.
_ackFIFO = None
# FIFO object that readDataFIFO() reads lines from.
_dataFIFO = None
# cProfile.Profile instance created by startProfiling() and consumed by endProfiling();
# None until profiling has been started.
_GATKProfiler = None

def initializeGATK(ackFIFOName: str):
"""
Expand Down Expand Up @@ -107,6 +109,32 @@ def readDataFIFO() -> str:
global _dataFIFO
return _dataFIFO.readLine()

def startProfiling():
    """
    Start Python cProfile profiling.

    A new cProfile.Profile is created, stored in the module-level
    _GATKProfiler variable, and enabled. If a profiler from an earlier call
    is still held in _GATKProfiler, it is disabled first so that two
    profilers are never active at the same time; the data it collected
    is discarded.
    """
    global _GATKProfiler
    # Look the global up defensively so this function also works before the
    # module-level variable has been assigned.
    previousProfiler = globals().get('_GATKProfiler')
    if previousProfiler is not None:
        # Newer Python versions refuse to enable a second cProfile profiler
        # while another one is active, so always stop the old one first.
        # Disabling an already-disabled profiler is harmless.
        previousProfiler.disable()
    _GATKProfiler = cProfile.Profile()
    _GATKProfiler.enable()

def endProfiling(profileName: str):
    """
    End Python cProfile profiling and write the results to a file. The
    startProfiling function must have been previously called. The results
    are ordered by cumulative time.

    The output file is created if it does not exist. If it already exists,
    its previous content is discarded so the file contains only the new
    profiling report.

    :param profileName: name of the file to which the profiling results should be written.
    """
    global _GATKProfiler
    _GATKProfiler.disable()
    # Mode 'w' truncates an existing file, so no stale bytes from an earlier,
    # longer file survive past the end of the new report. The with-block
    # guarantees the stream is closed even if formatting the stats fails.
    with open(profileName, 'w') as gatkProfileStream:
        gatkStats = pstats.Stats(_GATKProfiler, stream=gatkProfileStream).sort_stats('cumulative')
        gatkStats.print_stats()


class AckFIFO:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,34 @@ public void testAsyncWriteService(final PythonScriptExecutor.PythonExecutableNam
}
}

@Test(groups = "python", dataProvider="supportedPythonVersions")
public void testEnablePythonProfiling(final PythonScriptExecutor.PythonExecutableName executableName) throws IOException {
    // Temporary file that the Python side writes the profile report into.
    final File profileOutput = createTempFile("pythonProfileTest", "txt");
    // A trivial Python expression, just so the profiler has something to record.
    final String cpuBurningCommand = "list(2 * n for n in range(100))" + NL;

    final StreamingPythonScriptExecutor<String> executor =
            new StreamingPythonScriptExecutor<>(executableName, true);
    Assert.assertNotNull(executor);

    // Passing a profile file to start() is what switches profiling on.
    final boolean started = executor.start(Collections.emptyList(), false, profileOutput);
    Assert.assertTrue(started);

    try {
        executor.sendSynchronousCommand(cpuBurningCommand);
    } finally {
        // terminate() ends profiling before the Python process is shut down.
        executor.terminate();
        Assert.assertFalse(executor.getProcess().isAlive());
    }

    // The report only needs to exist and start with a non-empty line.
    try (final FileInputStream profileStream = new FileInputStream(profileOutput);
         final BufferedLineReader profileReader = new BufferedLineReader(profileStream)) {
        final String firstLine = profileReader.readLine();
        Assert.assertNotNull(firstLine);
        Assert.assertTrue(!firstLine.isEmpty());
    }
}

@Test(groups = "python", dataProvider="supportedPythonVersions", dependsOnMethods = "testPythonExists",
expectedExceptions = PythonScriptExecutorException.class)
public void testRaisePythonException(final PythonScriptExecutor.PythonExecutableName executableName) {
Expand All @@ -234,7 +262,7 @@ private void executeBadPythonCode(final PythonScriptExecutor.PythonExecutableNam
final StreamingPythonScriptExecutor<String> streamingPythonExecutor =
new StreamingPythonScriptExecutor<>(executableName,true);
Assert.assertNotNull(streamingPythonExecutor);
Assert.assertTrue(streamingPythonExecutor.start(Collections.emptyList(), true));
Assert.assertTrue(streamingPythonExecutor.start(Collections.emptyList(), true, null));

try {
streamingPythonExecutor.sendSynchronousCommand(errorCommand + NL);
Expand Down

0 comments on commit d8414db

Please sign in to comment.