Skip to content

Commit

Permalink
Fix incorrect output from prints originating from different processes (
Browse files Browse the repository at this point in the history
…#604)

* Fix incorrect output from prints originating from different processes

In the PipeFunc documentation I got:
```
```
  {
   "cell_type": "code",
   "execution_count": 47,
   "id": "92",
   "metadata": {
    "execution": {
     "iopub.execute_input": "2024-05-31T05:42:39.297713Z",
     "iopub.status.busy": "2024-05-31T05:42:39.297474Z",
     "iopub.status.idle": "2024-05-31T05:42:40.477462Z",
     "shell.execute_reply": "2024-05-31T05:42:40.475729Z"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.410279 - Running double_it for x=3"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.408318 - Running double_it for x=0"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.410888 - Running double_it for x=1"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.416024 - Running double_it for x=2"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.431485 - Running half_it for x=0"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.434285 - Running half_it for x=1"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.433559 - Running half_it for x=2"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.439223 - Running half_it for x=3"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:40.459668 - Running take_sum"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "14\n"
     ]
    }
   ],
   "source": [
    "from concurrent.futures import ProcessPoolExecutor\n",
    "import datetime\n",
    "import numpy as np\n",
    "import time\n",
    "from pipefunc import Pipeline, pipefunc\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"double\", mapspec=\"x[i] -> double[i]\")\n",
    "def double_it(x: int) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running double_it for x={x}\")\n",
    "    time.sleep(1)\n",
    "    return 2 * x\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"half\", mapspec=\"x[i] -> half[i]\")\n",
    "def half_it(x: int) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running half_it for x={x}\")\n",
    "    time.sleep(1)\n",
    "    return x // 2\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"sum\")\n",
    "def take_sum(half: np.ndarray, double: np.ndarray) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running take_sum\")\n",
    "    return sum(half + double)\n",
    "\n",
    "\n",
    "pipeline = Pipeline([double_it, half_it, take_sum])\n",
    "inputs = {\"x\": [0, 1, 2, 3]}\n",
    "run_folder = \"my_run_folder\"\n",
    "executor = ProcessPoolExecutor(max_workers=8)  # use 8 processes\n",
    "results = pipeline.map(\n",
    "    inputs,\n",
    "    run_folder=run_folder,\n",
    "    parallel=True,\n",
    "    executor=executor,\n",
    "    storage=\"shared_memory_dict\",\n",
    ")\n",
    "print(results[\"sum\"].output)"
   ]
  },
```

* more line up

* Add parallel streams test

---------

Co-authored-by: Brigitta Sipőcz <bsipocz@gmail.com>
  • Loading branch information
basnijholt and bsipocz authored Sep 19, 2024
1 parent db8cedb commit 25fa81d
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 1 deletion.
5 changes: 4 additions & 1 deletion myst_nb/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ def coalesce_streams(outputs: list[NotebookNode]) -> list[NotebookNode]:
for output in outputs:
if output["output_type"] == "stream":
if output["name"] in streams:
streams[output["name"]]["text"] += output["text"]
out = output["text"].strip()
if out:
streams[output["name"]]["text"] += f"{out}\n"
else:
output["text"] = output["text"].strip() + "\n"
new_outputs.append(output)
streams[output["name"]] = output
else:
Expand Down
186 changes: 186 additions & 0 deletions tests/notebooks/merge_streams_parallel.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"execution": {
"iopub.execute_input": "2024-09-19T21:44:29.809012Z",
"iopub.status.busy": "2024-09-19T21:44:29.808809Z",
"iopub.status.idle": "2024-09-19T21:44:29.978481Z",
"shell.execute_reply": "2024-09-19T21:44:29.977891Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"0"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"0"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"0"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"0"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"0"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"0"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"0"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"0"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"0"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"0"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
}
],
"source": [
"from concurrent.futures import ProcessPoolExecutor\n",
"\n",
"with ProcessPoolExecutor() as executor:\n",
" for i in executor.map(print, [0] * 10):\n",
" pass"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
13 changes: 13 additions & 0 deletions tests/test_render_outputs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Tests for rendering code cell outputs."""

import pytest

from myst_nb.core.render import EntryPointError, load_renderer
Expand Down Expand Up @@ -103,6 +104,18 @@ def test_merge_streams(sphinx_run, file_regression):
file_regression.check(doctree.pformat(), extension=".xml", encoding="utf-8")


@pytest.mark.sphinx_params(
"merge_streams_parallel.ipynb",
conf={"nb_execution_mode": "off", "nb_merge_streams": True},
)
def test_merge_streams_parallel(sphinx_run, file_regression):
"""Test configuring multiple concurrent stdout/stderr outputs to be merged."""
sphinx_run.build()
assert sphinx_run.warnings() == ""
doctree = sphinx_run.get_resolved_doctree("merge_streams_parallel")
file_regression.check(doctree.pformat(), extension=".xml", encoding="utf-8")


@pytest.mark.sphinx_params(
"metadata_image.ipynb",
conf={"nb_execution_mode": "off", "nb_cell_metadata_key": "myst"},
Expand Down
21 changes: 21 additions & 0 deletions tests/test_render_outputs/test_merge_streams_parallel.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<document source="merge_streams_parallel">
<container cell_index="0" cell_metadata="{'execution': {'iopub.execute_input': '2024-09-19T21:44:29.809012Z', 'iopub.status.busy': '2024-09-19T21:44:29.808809Z', 'iopub.status.idle': '2024-09-19T21:44:29.978481Z', 'shell.execute_reply': '2024-09-19T21:44:29.977891Z'}}" classes="cell" exec_count="1" nb_element="cell_code">
<container classes="cell_input" nb_element="cell_code_source">
<literal_block language="ipython3" linenos="False" xml:space="preserve">
from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
for i in executor.map(print, [0] * 10):
pass
<container classes="cell_output" nb_element="cell_code_output">
<literal_block classes="output stream" language="myst-ansi" linenos="False" xml:space="preserve">
0
0
0
0
0
0
0
0
0
0

0 comments on commit 25fa81d

Please sign in to comment.