Exclude IPython code from computations #7788
Updated to ignore any frames in
In [1]: import dask
...: from distributed import Client
...: client = Client()
...: dask.config.set({"distributed.diagnostics.computations.nframes": 2})
...:
...: def foo(x): print(x); return x;
...: def zip(x): return foo(x);
...: N = client.map(zip, range(2))
In [2]: print(client.cluster.scheduler.computations[-1].code[0][0])
def run_cell(
    self,
    raw_cell,
    store_history=False,
    silent=False,
    shell_futures=True,
    cell_id=None,
):
    """Run a complete IPython cell.

    Parameters
    ----------
    raw_cell : str
        The code (including IPython code such as %magic functions) to run.
    store_history : bool
        If True, the raw and translated cell will be stored in IPython's
        history. For user code calling back into IPython's machinery, this
        should be set to False.
    silent : bool
        If True, avoid side-effects, such as implicit displayhooks and
        and logging. silent=True forces store_history=False.
    shell_futures : bool
        If True, the code will share future statements with the interactive
        shell. It will both be affected by previous __future__ imports, and
        any __future__ imports in the code will affect the shell. If False,
        __future__ imports are not shared in either direction.

    Returns
    -------
    result : :class:`ExecutionResult`
    """
    result = None
    try:
        result = self._run_cell(
            raw_cell, store_history, silent, shell_futures, cell_id
        )
    finally:
        self.events.trigger('post_execute')
        if not silent:
            self.events.trigger('post_run_cell', result)
    return result
Which is coming from a separate module in IPython.
Force-pushed from 1ab1e79 to 889c0dc.
Unit Test Results: See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 20 files ±0, 20 suites ±0, 11h 13m 8s ⏱️ (-42m 3s). For more details on these failures, see this check. Results for commit f4cc4d0. ± Comparison against base commit 6b04277. This comment has been updated with latest results.
Yeah, I briefly tried to address this by adding modules to the list at distributed/distributed/distributed.yaml (line 274 in b83c055), and new things kept showing up... including traitlets, so just doing
The goal here is just to collect code that's interesting and relevant to what the Dask cluster is going to be doing. I don't think anything in
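To illustrate the ignore-list approach discussed here, below is a minimal sketch of filtering stack frames by module name. It is not the code in distributed: `build_ignore_pattern`, `is_ignored`, and `caller_modules` are hypothetical helpers, and the module names are examples only.

```python
import re
import sys
import traceback


def build_ignore_pattern(modules):
    """Compile a regex matching each listed module and its submodules.

    Hypothetical helper: an empty list yields a pattern that never matches.
    """
    if not modules:
        return re.compile(r"(?!)")
    return re.compile("|".join(rf"(?:{re.escape(m)}(?:\.|$))" for m in modules))


def is_ignored(module_name, pattern):
    """True if module_name is one of the ignored modules or a submodule of one."""
    return pattern.match(module_name) is not None


def caller_modules(pattern, nframes):
    """Collect up to nframes module names from the call stack, innermost first.

    Frames whose module matches the ignore pattern are skipped.
    """
    found = []
    for frame, _lineno in traceback.walk_stack(sys._getframe().f_back):
        name = frame.f_globals.get("__name__", "")
        if pattern.match(name):
            continue
        found.append(name)
        if len(found) >= nframes:
            break
    return found
```

The drawback mentioned in the comment shows up directly in such a design: every wrapping library (IPython, traitlets, and so on) has to be listed by hand.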
Went with this suspicion (b1b8585), and it seems to work well enough for me in my trials. For example, the original issue now behaves as expected:
In [1]: import dask
...: from distributed import Client
...: client = Client()
...: dask.config.set({"distributed.diagnostics.computations.nframes": 2})
...:
...: def foo(x): print(x); return x;
...: def zip(x): return foo(x);
...: N = client.map(zip, range(2))
...: import time
...: time.sleep(2)
In [2]: print(client.cluster.scheduler.computations[-1].code[0][0])
1
0
import dask
from distributed import Client
client = Client()
dask.config.set({"distributed.diagnostics.computations.nframes": 2})
def foo(x): print(x); return x;
def zip(x): return foo(x);
N = client.map(zip, range(2))
import time
time.sleep(2)
print(client.cluster.scheduler.computations[-1].code[0][0])
Are you assuming there's only a single frame of user code? (I gave your code a quick read and that's what it looks like.) If so, that's not correct. Here's another example where you can see this:
import dask
from distributed import Client
client = Client()
dask.config.set({"distributed.diagnostics.computations.nframes": 3})
def foo(x): print(x); return x;
def bar(x): return client.map(foo, range(x))
N = bar(3)
Force-pushed from b1b8585 to bc660ac.
Sorry for the delay, and you were (unsurprisingly) quite right. I've updated it to cut everything from the first IPython-related frame encountered onwards, so the example now works without taking just the last frame of user code.
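As a rough illustration of "cut from the first IPython-related frame onwards", here is a minimal sketch that works on plain module names rather than real frame objects. The function name `trim_at_first_ipython_frame` and the example stack are hypothetical and are not taken from the PR.

```python
def trim_at_first_ipython_frame(frame_modules, nframes):
    """Keep user-code frames only.

    frame_modules lists the module name of each stack frame, innermost first.
    Collection stops at nframes entries or at the first IPython frame,
    whichever comes first, so nothing beyond user code is kept.
    """
    kept = []
    for name in frame_modules:
        if len(kept) >= nframes:
            break
        if name == "IPython" or name.startswith("IPython."):
            break
        kept.append(name)
    return kept


# Made-up stack: two frames of user code, then IPython machinery.
stack = [
    "__main__",
    "__main__",
    "IPython.core.interactiveshell",
    "IPython.core.async_helpers",
    "traitlets.config.application",
]
print(trim_at_first_ipython_frame(stack, nframes=3))  # ['__main__', '__main__']
```

Even with `nframes=3`, only the two user frames survive, which is the behaviour the multi-frame example above asks for.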
@ntabris when you have time to give this another look, I think it may be what we're after now.
@milesgranger can you say more about what testing you've done / are expecting someone else to do? I have a few nits, but if this has been tested and works, I'd be happy to approve.
@milesgranger: Is there a way we can automatically test this? What additional packages would we need to install on CI?
@hendrikmakait @ntabris There are no tests for the current IPython logic in this same function (AFAICT), so I was kind of going off that being 'okay'. Given that it is testing specifically if a frame is part of
I always get nervous around untested code, in particular if I don't know whether this is more of a fringe case to handle or something that happens frequently.
Something we could explore would be executing code via https://ipython.readthedocs.io/en/stable/api/generated/IPython.core.interactiveshell.html#IPython.core.interactiveshell.InteractiveShell.run_cell. It's less e2e compared to actually running an interactive shell session, but it might get us most of the way without having to fiddle with subprocesses or something like that.
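A minimal sketch of the `run_cell` idea, assuming IPython is installed. The helper `run_source_in_ipython` is hypothetical and simply returns None when IPython is missing. It only shows how a test could push source code through IPython's machinery; it does not involve a Dask client or the event-loop concerns of a real test.

```python
def run_source_in_ipython(source):
    """Run source as an IPython cell; return None if IPython is unavailable."""
    try:
        from IPython.core.interactiveshell import InteractiveShell
    except ImportError:
        return None
    # Reuse (or create) the singleton shell and execute the cell without
    # recording it in IPython's history.
    shell = InteractiveShell.instance()
    return shell.run_cell(source, store_history=False)


result = run_source_in_ipython("answer = 6 * 7")
if result is not None:
    # ExecutionResult.success is True when the cell raised no error.
    print(result.success)
```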
Force-pushed from e90d56b to bb7f77a.
Got something together for testing that IPython frames are ignored (bb7f77a), but it wasn't as nice as I'd hoped for. There seems to be some conflict with the current event loop due to trying to use a client inside of this IPython InteractiveShell. I tried a number of things, including but not limited to:
I will try a few more things later, but I'm putting it here for now in case anyone has suggestions or wants to try their hand at it.
Maybe @graingert can help with event-loop-related problems?
Force-pushed from 611b9f9 to 35956fd.
I've also been trying my hand at creating a test, and this is the best version I could come up with:
@gen_cluster(client=False)
async def test_ignore_ipython(s, a, b):
    pytest.importorskip("IPython")
    from IPython.core.interactiveshell import InteractiveShell

    source_code = """
    import time
    import dask
    from distributed import Client
    dask.config.set({{"distributed.diagnostics.computations.nframes": 3}})
    with Client("{}") as client:
        def foo(x): print(x); return x;
        def bar(x): return client.map(foo, range(x))
        N = client.gather(bar(3))
    """.format(s.address)

    with concurrent.futures.ProcessPoolExecutor() as executor:
        result = await asyncio.get_running_loop().run_in_executor(
            executor,
            run_in_ipython,
            source_code,
        )
    assert result.success
    computations = s.computations
    assert len(computations) == 1  # 1 computation
    computation = computations[0]
    assert len(computation.code) == 1  # 1 code
    code = computation.code[0]
    assert len(code) == 2  # 2 frames, even with nframes 3 other is ipython code

    def normalize(s):
        return re.sub(r"\s+", " ", s).strip()

    assert normalize(code[0]) == normalize(source_code)
    assert normalize(code[1]) == "def bar(x): return client.map(foo, range(x))"
Not running IPython in a separate process kept leaking a thread for me, so I couldn't use the
Co-authored-by: Hendrik Makait <hendrik@makait.com>
distributed/tests/test_client.py (outdated)
    # directly in InteractiveShell (and does) but requires `--reruns=1`
    # due to some underlying lag in the asyncio if ran by itself, but will
    # otherwise run fine in the suite of tests.
    with concurrent.futures.ProcessPoolExecutor() as executor:
I think it's a good idea to pass mp_context=multiprocessing.get_context("spawn"), as this defaults to fork on Linux, which I'm surprised is working in asyncio.
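The suggestion can be sketched as follows, using only the standard library. This is a minimal sketch rather than the code that ended up in the test; the helper names `spawn_context` and `run_in_spawned_process` are hypothetical.

```python
import asyncio
import concurrent.futures
import multiprocessing


def spawn_context():
    """Return a multiprocessing context that always uses the 'spawn' start method."""
    return multiprocessing.get_context("spawn")


async def run_in_spawned_process(func, *args):
    """Run func(*args) in a freshly spawned process and await its result.

    func and its arguments must be picklable, because 'spawn' starts a new
    interpreter instead of forking the current one.
    """
    with concurrent.futures.ProcessPoolExecutor(
        max_workers=1, mp_context=spawn_context()
    ) as executor:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(executor, func, *args)
```

With `spawn`, a script would normally call this behind an `if __name__ == "__main__":` guard, for example `asyncio.run(run_in_spawned_process(pow, 2, 5))`, since child processes re-import the main module.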
Ready to merge after CI finishes
Closes #7777
pre-commit run --all-files
@ntabris wasn't sure how pedantic one should be to verify that the run_code is indeed the IPython run_code we want to ignore, so feel free to recommend adjustments. :)