From 00cb1a2468eee13edd2f20610e9d027ec3cad51d Mon Sep 17 00:00:00 2001 From: Chris Norman Date: Wed, 20 Jun 2018 17:05:44 -0400 Subject: [PATCH] Enable Python profiling. --- .../tools/walkers/vqsr/CNNScoreVariants.java | 7 ++++- .../python/StreamingPythonScriptExecutor.java | 19 ++++++++++-- .../hellbender/gatktool/tool.py | 28 +++++++++++++++++ ...StreamingPythonScriptExecutorUnitTest.java | 30 ++++++++++++++++++- 4 files changed, 80 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/broadinstitute/hellbender/tools/walkers/vqsr/CNNScoreVariants.java b/src/main/java/org/broadinstitute/hellbender/tools/walkers/vqsr/CNNScoreVariants.java index 366c906dc9d..b4641c6fb5c 100644 --- a/src/main/java/org/broadinstitute/hellbender/tools/walkers/vqsr/CNNScoreVariants.java +++ b/src/main/java/org/broadinstitute/hellbender/tools/walkers/vqsr/CNNScoreVariants.java @@ -166,6 +166,10 @@ public class CNNScoreVariants extends VariantWalker { @Argument(fullName = "keep-temp-file", shortName = "keep-temp-file", doc = "Keep the temporary file that python writes scores to.", optional = true) private boolean keepTempFile = false; + @Hidden + @Argument(fullName = "python-profile", shortName = "python-profile", doc = "Run the tool with the Python CProfiler on and write results to this file.", optional = true) + private File pythonProfileResults; + // Create the Python executor. This doesn't actually start the Python process, but verifies that // the requestedPython executable exists and can be located. final StreamingPythonScriptExecutor pythonExecutor = new StreamingPythonScriptExecutor<>(true); @@ -230,7 +234,8 @@ public void onTraversalStart() { } // Start the Python process and initialize a stream writer for streaming data to the Python code - pythonExecutor.start(Collections.emptyList(), enableJournal); + pythonExecutor.start(Collections.emptyList(), enableJournal, pythonProfileResults); + pythonExecutor.initStreamWriter(AsynchronousStreamWriter.stringSerializer); batchList = new ArrayList<>(transferBatchSize); diff --git a/src/main/java/org/broadinstitute/hellbender/utils/python/StreamingPythonScriptExecutor.java b/src/main/java/org/broadinstitute/hellbender/utils/python/StreamingPythonScriptExecutor.java index f67b8332483..ab33a9dbb3c 100644 --- a/src/main/java/org/broadinstitute/hellbender/utils/python/StreamingPythonScriptExecutor.java +++ b/src/main/java/org/broadinstitute/hellbender/utils/python/StreamingPythonScriptExecutor.java @@ -54,14 +54,18 @@ public class StreamingPythonScriptExecutor extends PythonExecutorBase { private FileOutputStream dataTransferFIFOWriter; private AsynchronousStreamWriter asyncWriter; + private File profileResults; + // Python commands that are executed in the companion python process. The functions called // here live in the {@code tool} module in {@code gatktool} Python package. private final static String PYTHON_IMPORT_GATK = "from gatktool import tool" + NL; private final static String PYTHON_INITIALIZE_GATK = "tool.initializeGATK('%s')" + NL; + private final static String PYTHON_START_PROFILING = "tool.startProfiling()" + NL; private final static String PYTHON_TERMINATE_GATK = "tool.terminateGATK()" + NL; private final static String PYTHON_INITIALIZE_DATA_FIFO = "tool.initializeDataFIFO('%s')" + NL; private final static String PYTHON_CLOSE_DATA_FIFO = "tool.closeDataFIFO()" + NL; private final static String PYTHON_SEND_ACK_REQUEST = "tool.sendAck()" + NL; + private final static String PYTHON_END_PROFILING = "tool.endProfiling('%s')" + NL; // keep track of when an ack request has been made and reject attempts to send another ack // request until the previous one has been handled @@ -93,7 +97,7 @@ public StreamingPythonScriptExecutor(final PythonExecutableName pythonExecutable * @return true if the process is successfully started */ public boolean start(final List pythonProcessArgs) { - return start(pythonProcessArgs, false); + return start(pythonProcessArgs, false, null); } /** @@ -104,7 +108,8 @@ public boolean start(final List pythonProcessArgs) { * expensive and should only be used for debugging purposes. * @return true if the process is successfully started */ - public boolean start(final List pythonProcessArgs, final boolean enableJournaling) { + public boolean start(final List pythonProcessArgs, final boolean enableJournaling, final File profileResults) { + this.profileResults = profileResults; final List args = new ArrayList<>(); args.add(externalScriptExecutableName); args.add("-u"); @@ -291,6 +296,11 @@ protected Process getProcess() { * Terminate the remote process, closing the fifo if any. */ public void terminate() { + if (profileResults != null) { + spController.writeProcessInput(String.format(PYTHON_END_PROFILING, profileResults.getAbsolutePath())); + sendAckRequest(); + waitForAck(); + } if (dataTransferFIFOWriter != null) { if (asyncWriter != null) { Assert.assertTrue(asyncWriter.terminate()); @@ -336,6 +346,11 @@ private void initializeTool(final File ackFIFOFile) { // wait for the ack to be sent spController.openAckFIFOForRead(); waitForAck(); + if (profileResults != null) { + spController.writeProcessInput(PYTHON_START_PROFILING); + sendAckRequest(); + waitForAck(); + } } private void sendAckRequest() { diff --git a/src/main/python/org/broadinstitute/hellbender/gatktool/tool.py b/src/main/python/org/broadinstitute/hellbender/gatktool/tool.py index 1c78813a636..5d3259f5134 100644 --- a/src/main/python/org/broadinstitute/hellbender/gatktool/tool.py +++ b/src/main/python/org/broadinstitute/hellbender/gatktool/tool.py @@ -14,9 +14,11 @@ import sys import os +import cProfile, pstats, io _ackFIFO = None _dataFIFO = None +_GATKProfiler = None def initializeGATK(ackFIFOName: str): """ @@ -107,6 +109,32 @@ def readDataFIFO() -> str: global _dataFIFO return _dataFIFO.readLine() +def startProfiling(): + """ + Start Python CProfile profiling. + """ + global _GATKProfiler + _GATKProfiler = cProfile.Profile() + _GATKProfiler.enable() + +def endProfiling(profileName: str): + """ + End Python CProfile profiling and write results to a file. The + startProfile function must have been previously called. The results + are ordered by cummulative time. + :param profileName: name of the file to which the profiling results should be written. + """ + global _GATKProfiler + _GATKProfiler.disable() + gatkProfilerDescriptor = os.open(profileName, os.O_WRONLY | os.O_CREAT) + gatkProfileStream = os.fdopen(gatkProfilerDescriptor, 'w') + gatkStats = pstats.Stats(_GATKProfiler, stream=gatkProfileStream).sort_stats('cumulative') + gatkStats.print_stats() + gatkProfileStream.close() + del gatkProfileStream + del gatkProfilerDescriptor + del gatkStats + class AckFIFO: """ diff --git a/src/test/java/org/broadinstitute/hellbender/utils/python/StreamingPythonScriptExecutorUnitTest.java b/src/test/java/org/broadinstitute/hellbender/utils/python/StreamingPythonScriptExecutorUnitTest.java index cf8fdd4b84b..9a58d57d796 100644 --- a/src/test/java/org/broadinstitute/hellbender/utils/python/StreamingPythonScriptExecutorUnitTest.java +++ b/src/test/java/org/broadinstitute/hellbender/utils/python/StreamingPythonScriptExecutorUnitTest.java @@ -218,6 +218,34 @@ public void testAsyncWriteService(final PythonScriptExecutor.PythonExecutableNam } } + @Test(groups = "python", dataProvider="supportedPythonVersions") + public void testEnablePythonProfiling(final PythonScriptExecutor.PythonExecutableName executableName) throws IOException { + // create a temporary output file for the profile results + final File profileFile = createTempFile("pythonProfileTest", "txt"); + final String CONSUME_SOME_CPU_SCRIPT = "list(2 * n for n in range(100))" + NL; + + final StreamingPythonScriptExecutor streamingPythonExecutor = + new StreamingPythonScriptExecutor<>(executableName,true); + Assert.assertNotNull(streamingPythonExecutor); + + Assert.assertTrue(streamingPythonExecutor.start(Collections.emptyList(), false, profileFile)); + + try { + streamingPythonExecutor.sendSynchronousCommand(CONSUME_SOME_CPU_SCRIPT); + } + finally { + streamingPythonExecutor.terminate(); + Assert.assertFalse(streamingPythonExecutor.getProcess().isAlive()); + } + // read the temp file in and validate + try (final FileInputStream fis= new FileInputStream(profileFile); + final BufferedLineReader br = new BufferedLineReader(fis)) { + final String profileLine1 = br.readLine(); + Assert.assertNotNull(profileLine1); + Assert.assertTrue(profileLine1.length() != 0); + } + } + @Test(groups = "python", dataProvider="supportedPythonVersions", dependsOnMethods = "testPythonExists", expectedExceptions = PythonScriptExecutorException.class) public void testRaisePythonException(final PythonScriptExecutor.PythonExecutableName executableName) { @@ -234,7 +262,7 @@ private void executeBadPythonCode(final PythonScriptExecutor.PythonExecutableNam final StreamingPythonScriptExecutor streamingPythonExecutor = new StreamingPythonScriptExecutor<>(executableName,true); Assert.assertNotNull(streamingPythonExecutor); - Assert.assertTrue(streamingPythonExecutor.start(Collections.emptyList(), true)); + Assert.assertTrue(streamingPythonExecutor.start(Collections.emptyList(), true, null)); try { streamingPythonExecutor.sendSynchronousCommand(errorCommand + NL);