From 25fa81d71c14b554b5a18bc2f0f67dba099afbe8 Mon Sep 17 00:00:00 2001
From: Bas Nijholt
Date: Thu, 19 Sep 2024 15:34:36 -0700
Subject: [PATCH] Fix incorrect output from prints originating from different
 processes (#604)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Fix incorrect output from prints originating from different processes

In the PipeFunc documentation I got the following cell:

```
{
  "cell_type": "code",
  "execution_count": 47,
  "id": "92",
  "metadata": {
    "execution": {
      "iopub.execute_input": "2024-05-31T05:42:39.297713Z",
      "iopub.status.busy": "2024-05-31T05:42:39.297474Z",
      "iopub.status.idle": "2024-05-31T05:42:40.477462Z",
      "shell.execute_reply": "2024-05-31T05:42:40.475729Z"
    }
  },
  "outputs": [
    {"name": "stdout", "output_type": "stream", "text": ["2024-05-30 22:42:39.410279 - Running double_it for x=3"]},
    {"name": "stdout", "output_type": "stream", "text": ["2024-05-30 22:42:39.408318 - Running double_it for x=0"]},
    {"name": "stdout", "output_type": "stream", "text": ["2024-05-30 22:42:39.410888 - Running double_it for x=1"]},
    {"name": "stdout", "output_type": "stream", "text": ["2024-05-30 22:42:39.416024 - Running double_it for x=2"]},
    {"name": "stdout", "output_type": "stream", "text": ["\n"]},
    {"name": "stdout", "output_type": "stream", "text": ["\n"]},
    {"name": "stdout", "output_type": "stream", "text": ["\n"]},
    {"name": "stdout", "output_type": "stream", "text": ["2024-05-30 22:42:39.431485 - Running half_it for x=0"]},
    {"name": "stdout", "output_type": "stream", "text": ["\n"]},
    {"name": "stdout", "output_type": "stream", "text": ["2024-05-30 22:42:39.434285 - Running half_it for x=1"]},
    {"name": "stdout", "output_type": "stream", "text": ["2024-05-30 22:42:39.433559 - Running half_it for x=2"]},
    {"name": "stdout", "output_type": "stream", "text": ["2024-05-30 22:42:39.439223 - Running half_it for x=3"]},
    {"name": "stdout", "output_type": "stream", "text": ["\n"]},
    {"name": "stdout", "output_type": "stream", "text": ["\n"]},
    {"name": "stdout", "output_type": "stream", "text": ["\n"]},
    {"name": "stdout", "output_type": "stream", "text": ["\n"]},
    {"name": "stdout", "output_type": "stream", "text": ["2024-05-30 22:42:40.459668 - Running take_sum"]},
    {"name": "stdout", "output_type": "stream", "text": ["\n"]},
    {"name": "stdout", "output_type": "stream", "text": ["14\n"]}
  ],
  "source": [
    "from concurrent.futures import ProcessPoolExecutor\n",
    "import datetime\n",
    "import numpy as np\n",
    "import time\n",
    "from pipefunc import Pipeline, pipefunc\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"double\", mapspec=\"x[i] -> double[i]\")\n",
    "def double_it(x: int) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running double_it for x={x}\")\n",
    "    time.sleep(1)\n",
    "    return 2 * x\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"half\", mapspec=\"x[i] -> half[i]\")\n",
    "def half_it(x: int) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running half_it for x={x}\")\n",
    "    time.sleep(1)\n",
    "    return x // 2\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"sum\")\n",
    "def take_sum(half: np.ndarray, double: np.ndarray) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running take_sum\")\n",
    "    return sum(half + double)\n",
    "\n",
    "\n",
    "pipeline = Pipeline([double_it, half_it, take_sum])\n",
    "inputs = {\"x\": [0, 1, 2, 3]}\n",
    "run_folder = \"my_run_folder\"\n",
    "executor = ProcessPoolExecutor(max_workers=8)  # use 8 processes\n",
    "results = pipeline.map(\n",
    "    inputs,\n",
    "    run_folder=run_folder,\n",
    "    parallel=True,\n",
    "    executor=executor,\n",
    "    storage=\"shared_memory_dict\",\n",
    ")\n",
    "print(results[\"sum\"].output)"
  ]
},
```
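The printed text and its trailing newline arrive as separate `stream` chunks, and chunks from different worker processes interleave, so the blank lines end up detached from the messages they belong to. The new regression notebook `tests/notebooks/merge_streams_parallel.ipynb` reproduces this with nothing more than `ProcessPoolExecutor` and `print`. The change to `coalesce_streams` below strips each chunk and re-terminates it with exactly one newline before merging. Roughly, the merging rule behaves like this sketch (illustrative only; the real function operates on `NotebookNode` outputs, not plain strings):

```python
# Illustrative sketch of the new merging rule (not the actual coalesce_streams).
def merge_stream_texts(chunks: list[str]) -> str:
    if not chunks:
        return ""
    # The first chunk of a stream is always kept and gets exactly one newline.
    merged = chunks[0].strip() + "\n"
    for chunk in chunks[1:]:
        text = chunk.strip()
        if text:  # whitespace-only chunks (the stray "\n" outputs) are dropped
            merged += f"{text}\n"
    return merged


print(merge_stream_texts(["0", "0", "\n", "0", "\n"]), end="")
# 0
# 0
# 0
```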
* more line up

* Add parallel streams test

---------

Co-authored-by: Brigitta Sipőcz

---
 myst_nb/core/utils.py                        |   5 +-
 tests/notebooks/merge_streams_parallel.ipynb | 186 ++++++++++++++++++
 tests/test_render_outputs.py                 |  13 ++
 .../test_merge_streams_parallel.xml          |  21 ++
 4 files changed, 224 insertions(+), 1 deletion(-)
 create mode 100644 tests/notebooks/merge_streams_parallel.ipynb
 create mode 100644 tests/test_render_outputs/test_merge_streams_parallel.xml

diff --git a/myst_nb/core/utils.py b/myst_nb/core/utils.py
index d1165712..b3f428f5 100644
--- a/myst_nb/core/utils.py
+++ b/myst_nb/core/utils.py
@@ -25,8 +25,11 @@ def coalesce_streams(outputs: list[NotebookNode]) -> list[NotebookNode]:
     for output in outputs:
         if output["output_type"] == "stream":
             if output["name"] in streams:
-                streams[output["name"]]["text"] += output["text"]
+                out = output["text"].strip()
+                if out:
+                    streams[output["name"]]["text"] += f"{out}\n"
             else:
+                output["text"] = output["text"].strip() + "\n"
                 new_outputs.append(output)
                 streams[output["name"]] = output
         else:
diff --git a/tests/notebooks/merge_streams_parallel.ipynb b/tests/notebooks/merge_streams_parallel.ipynb
new file mode 100644
index 00000000..9a9e5adc
--- /dev/null
+++ b/tests/notebooks/merge_streams_parallel.ipynb
@@ -0,0 +1,186 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {
+    "execution": {
+     "iopub.execute_input": "2024-09-19T21:44:29.809012Z",
+     "iopub.status.busy": "2024-09-19T21:44:29.808809Z",
+     "iopub.status.idle": "2024-09-19T21:44:29.978481Z",
+     "shell.execute_reply": "2024-09-19T21:44:29.977891Z"
+    }
+   },
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "0"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "0"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "0"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "0"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "0"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "0"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "0"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "0"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "0"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "\n"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "0"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "\n"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "\n"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "\n"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "\n"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "\n"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "\n"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "\n"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "\n"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "from concurrent.futures import ProcessPoolExecutor\n",
+    "\n",
+    "with ProcessPoolExecutor() as executor:\n",
+    "    for i in executor.map(print, [0] * 10):\n",
+    "        pass"
+   ]
+  }
+ ],
+ "metadata": {
"kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.3" + } + }, + "nbformat": 4, + "nbformat_minor": 4 + } diff --git a/tests/test_render_outputs.py b/tests/test_render_outputs.py index 1b58e181..d24d7586 100644 --- a/tests/test_render_outputs.py +++ b/tests/test_render_outputs.py @@ -1,4 +1,5 @@ """Tests for rendering code cell outputs.""" + import pytest from myst_nb.core.render import EntryPointError, load_renderer @@ -103,6 +104,18 @@ def test_merge_streams(sphinx_run, file_regression): file_regression.check(doctree.pformat(), extension=".xml", encoding="utf-8") +@pytest.mark.sphinx_params( + "merge_streams_parallel.ipynb", + conf={"nb_execution_mode": "off", "nb_merge_streams": True}, +) +def test_merge_streams_parallel(sphinx_run, file_regression): + """Test configuring multiple concurrent stdout/stderr outputs to be merged.""" + sphinx_run.build() + assert sphinx_run.warnings() == "" + doctree = sphinx_run.get_resolved_doctree("merge_streams_parallel") + file_regression.check(doctree.pformat(), extension=".xml", encoding="utf-8") + + @pytest.mark.sphinx_params( "metadata_image.ipynb", conf={"nb_execution_mode": "off", "nb_cell_metadata_key": "myst"}, diff --git a/tests/test_render_outputs/test_merge_streams_parallel.xml b/tests/test_render_outputs/test_merge_streams_parallel.xml new file mode 100644 index 00000000..17254962 --- /dev/null +++ b/tests/test_render_outputs/test_merge_streams_parallel.xml @@ -0,0 +1,21 @@ + + + + + from concurrent.futures import ProcessPoolExecutor + + with ProcessPoolExecutor() as executor: + for i in executor.map(print, [0] * 10): + pass + + + 0 + 0 + 0 + 0 + 0 + 0 + 0 + 0 + 0 + 0