diff --git a/.github/workflows/conda.yml b/.github/workflows/conda.yml
index 7725bcfe7d..07630f9b67 100644
--- a/.github/workflows/conda.yml
+++ b/.github/workflows/conda.yml
@@ -13,7 +13,7 @@ on:
 # When this workflow is queued, automatically cancel any previous running
 # or pending jobs from the same branch
 concurrency:
-  group: conda-${{ github.head_ref }}
+  group: conda-${{ github.ref }}
   cancel-in-progress: true
 
 # Required shell entrypoint to have properly activated conda environments
diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index bd207e8e86..02709f0703 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -9,7 +9,7 @@ on:
 # When this workflow is queued, automatically cancel any previous running
 # or pending jobs from the same branch
 concurrency:
-  group: ${{ github.ref }}
+  group: tests-${{ github.ref }}
   cancel-in-progress: true
 
 jobs:
@@ -23,22 +23,26 @@ jobs:
       fail-fast: false
       matrix:
         os: [ubuntu-latest, windows-latest, macos-latest]
-        python-version: ["3.8", "3.9", "3.10", "3.11"]
+        environment: [mindeps, "3.8", "3.9", "3.10", "3.11"]
         queuing: [queue]
         # Cherry-pick test modules to split the overall runtime roughly in half
         partition: [ci1, not ci1]
         exclude:
           - os: macos-latest
-            python-version: 3.9
+            environment: "3.9"
           - os: macos-latest
-            python-version: 3.10
+            environment: "3.10"
+          - os: macos-latest
+            environment: mindeps
+          - os: windows-latest
+            environment: mindeps
         include:
           - os: ubuntu-latest
-            python-version: 3.9
+            environment: "3.9"
             queuing: no_queue
             partition: "ci1"
           - os: ubuntu-latest
-            python-version: 3.9
+            environment: "3.9"
             queuing: no_queue
             partition: "not ci1"
@@ -51,15 +55,15 @@ jobs:
     # run: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
 
     env:
-      CONDA_FILE: continuous_integration/environment-${{ matrix.python-version }}.yaml
+      CONDA_FILE: continuous_integration/environment-${{ matrix.environment }}.yaml
 
     steps:
       - name: Set $TEST_ID
         run: |
           export PARTITION_LABEL=$( echo "${{ matrix.partition }}" | sed "s/ //" )
-          export TEST_ID="${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.queuing }}-$PARTITION_LABEL"
+          export TEST_ID="${{ matrix.os }}-${{ matrix.environment }}-${{ matrix.queuing }}-$PARTITION_LABEL"
           # Switch to this version for stress-test:
-          # export TEST_ID="${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.queuing }}-$PARTITION_LABEL-${{ matrix.run }}"
+          # export TEST_ID="${{ matrix.os }}-${{ matrix.environment }}-${{ matrix.queuing }}-$PARTITION_LABEL-${{ matrix.run }}"
           echo "TEST_ID: $TEST_ID"
           echo "TEST_ID=$TEST_ID" >> $GITHUB_ENV
         shell: bash
@@ -101,7 +105,7 @@ jobs:
           path: ${{ env.CONDA }}/envs
           key: conda-${{ matrix.os }}-${{ steps.get-date.outputs.today }}-${{ hashFiles(env.CONDA_FILE) }}-${{ env.CACHE_NUMBER }}
         env:
-          # Increase this value to reset cache if continuous_integration/environment-${{ matrix.python-version }}.yaml has not changed
+          # Increase this value to reset cache if continuous_integration/environment-${{ matrix.environment }}.yaml has not changed
           CACHE_NUMBER: 0
         id: cache
@@ -150,6 +154,8 @@ jobs:
         run: echo "DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=inf" >> $GITHUB_ENV
 
       - name: Print host info
+        # host_info.py imports numpy, which isn't a direct dependency of distributed
+        if: matrix.environment != 'mindeps'
         shell: bash -l {0}
         run: |
           python continuous_integration/scripts/host_info.py
@@ -159,6 +165,7 @@ jobs:
         shell: bash -l {0}
         env:
           PYTHONFAULTHANDLER: 1
+          MINDEPS: ${{ matrix.environment == 'mindeps' }}
         run: |
           source continuous_integration/scripts/set_ulimit.sh
           set -o pipefail
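Note that the GitHub Actions expression `${{ matrix.environment == 'mindeps' }}` renders to the string "true" or "false", not a boolean, so anything reading the exported MINDEPS variable must compare strings. A minimal sketch of the consumption side, matching the test_nanny.py change further down:

    import os

    # "true" only for the mindeps matrix entry; unset or "false" otherwise
    IS_MINDEPS = os.environ.get("MINDEPS") == "true"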
diff --git a/continuous_integration/environment-3.10.yaml b/continuous_integration/environment-3.10.yaml
index be9939b81c..a88007dfd2 100644
--- a/continuous_integration/environment-3.10.yaml
+++ b/continuous_integration/environment-3.10.yaml
@@ -17,7 +17,7 @@ dependencies:
   - h5py
   - ipykernel
   - ipywidgets
-  - jinja2
+  - jinja2 >=2.10.3
   - locket >=1.0
   - msgpack-python
   - netcdf4
diff --git a/continuous_integration/environment-3.8.yaml b/continuous_integration/environment-3.8.yaml
index 24c3cddec1..b969f90395 100644
--- a/continuous_integration/environment-3.8.yaml
+++ b/continuous_integration/environment-3.8.yaml
@@ -18,7 +18,7 @@ dependencies:
   - h5py
   - ipykernel
   - ipywidgets
-  - jinja2
+  - jinja2 >=2.10.3
   - locket >=1.0
   - msgpack-python
   - netcdf4
diff --git a/continuous_integration/environment-3.9.yaml b/continuous_integration/environment-3.9.yaml
index 175dae3ec9..1c467ecbc4 100644
--- a/continuous_integration/environment-3.9.yaml
+++ b/continuous_integration/environment-3.9.yaml
@@ -18,7 +18,7 @@ dependencies:
   - h5py
   - ipykernel
   - ipywidgets
-  - jinja2
+  - jinja2 >=2.10.3
   - locket >=1.0
   - lz4 >=0.23.1  # Only tested here
   - msgpack-python
diff --git a/continuous_integration/environment-mindeps.yaml b/continuous_integration/environment-mindeps.yaml
new file mode 100644
index 0000000000..867385586b
--- /dev/null
+++ b/continuous_integration/environment-mindeps.yaml
@@ -0,0 +1,32 @@
+name: dask-distributed
+channels:
+  - conda-forge
+  - defaults
+dependencies:
+  - python=3.8
+  - click=7.0
+  - cloudpickle=1.5.0
+  - cytoolz=0.10.1
+  - jinja2=2.10.3
+  - locket=1.0.0
+  - msgpack-python=1.0.0
+  - packaging=20.0
+  - psutil=5.7.0
+  - pyyaml=5.3.1
+  - sortedcontainers=2.0.5
+  - tblib=1.6.0
+  - toolz=0.10.0
+  - tornado=6.0.3
+  - urllib3=1.24.3
+  - zict=2.1.0
+  # Distributed depends on the latest version of Dask
+  - pip
+  - pip:
+      - git+https://github.com/dask/dask
+      # test dependencies
+      - pytest
+      - pytest-cov
+      - pytest-faulthandler
+      - pytest-repeat
+      - pytest-rerunfailures
+      - pytest-timeout
diff --git a/continuous_integration/recipes/dask/meta.yaml b/continuous_integration/recipes/dask/meta.yaml
index 7b2f546547..c3038cf952 100644
--- a/continuous_integration/recipes/dask/meta.yaml
+++ b/continuous_integration/recipes/dask/meta.yaml
@@ -30,7 +30,7 @@ requirements:
     - numpy >=1.18
     - pandas >=1.0
     - bokeh >=2.4.2,<3
-    - jinja2
+    - jinja2 >=2.10.3
 
   run_constrained:
     - openssl !=1.1.1e
diff --git a/continuous_integration/recipes/distributed/meta.yaml b/continuous_integration/recipes/distributed/meta.yaml
index da1ece3989..19335fc222 100644
--- a/continuous_integration/recipes/distributed/meta.yaml
+++ b/continuous_integration/recipes/distributed/meta.yaml
@@ -31,20 +31,20 @@ requirements:
     - python >=3.8
     - click >=7.0
     - cloudpickle >=1.5.0
-    - cytoolz >=0.10.0
+    - cytoolz >=0.10.1
     - {{ pin_compatible('dask-core', max_pin='x.x.x.x') }}
-    - jinja2
+    - jinja2 >=2.10.3
     - locket >=1.0.0
-    - msgpack-python >=0.6.0
+    - msgpack-python >=1.0.0
     - packaging >=20.0
-    - psutil >=5.0
-    - pyyaml
-    - sortedcontainers !=2.0.0,!=2.0.1
+    - psutil >=5.7.0
+    - pyyaml >=5.3.1
+    - sortedcontainers >=2.0.5
     - tblib >=1.6.0
     - toolz >=0.10.0
     - tornado >=6.0.3
-    - urllib3
-    - zict >=0.1.3
+    - urllib3 >=1.24.3
+    - zict >=2.1.0
 
   run_constrained:
     - openssl !=1.1.1e
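The exact pins in environment-mindeps.yaml mirror the lower bounds declared in the recipes and in requirements.txt, so the mindeps job effectively exercises every floor. A quick sanity check one could run in any installed environment (a sketch; assumes the listed packages are present, and uses packaging, which is already a dependency):

    from importlib.metadata import version
    from packaging.specifiers import SpecifierSet

    # A few floors copied from the updated requirements.txt
    floors = {"msgpack": ">=1.0.0", "tornado": ">=6.0.3", "zict": ">=2.1.0"}

    for name, spec in floors.items():
        installed = version(name)
        assert installed in SpecifierSet(spec), f"{name} {installed} violates {spec}"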
--- a/distributed/http/scheduler/tests/test_scheduler_http.py
+++ b/distributed/http/scheduler/tests/test_scheduler_http.py
@@ -5,11 +5,7 @@
 import re
 
 from unittest import mock
 
-import aiohttp
 import pytest
-
-pytest.importorskip("bokeh")
-
 from tornado.escape import url_escape
 from tornado.httpclient import AsyncHTTPClient, HTTPClientError
@@ -84,6 +80,8 @@ async def test_worker_404(c, s):
 
 @gen_cluster(client=True, scheduler_kwargs={"http_prefix": "/foo", "dashboard": True})
 async def test_prefix(c, s, a, b):
+    pytest.importorskip("bokeh")
+
     http_client = AsyncHTTPClient()
     for suffix in ["foo/info/main/workers.html", "foo/json/index.html", "foo/system"]:
         response = await http_client.fetch(
@@ -295,6 +293,8 @@ async def test_task_page(c, s, a, b):
     },
 )
 async def test_allow_websocket_origin(c, s, a, b):
+    pytest.importorskip("bokeh")
+
     from tornado.httpclient import HTTPRequest
     from tornado.websocket import websocket_connect
@@ -335,6 +335,8 @@ def test_api_disabled_by_default():
     },
 )
 async def test_api(c, s, a, b):
+    aiohttp = pytest.importorskip("aiohttp")
+
     async with aiohttp.ClientSession() as session:
         async with session.get(
             "http://localhost:%d/api/v1" % s.http_server.port
@@ -353,6 +355,8 @@
     },
 )
 async def test_retire_workers(c, s, a, b):
+    aiohttp = pytest.importorskip("aiohttp")
+
     async with aiohttp.ClientSession() as session:
         params = {"workers": [a.address, b.address]}
         async with session.post(
@@ -374,6 +378,8 @@
     },
 )
 async def test_get_workers(c, s, a, b):
+    aiohttp = pytest.importorskip("aiohttp")
+
     async with aiohttp.ClientSession() as session:
         async with session.get(
             "http://localhost:%d/api/v1/get_workers" % s.http_server.port
@@ -394,6 +400,8 @@
     },
 )
 async def test_adaptive_target(c, s, a, b):
+    aiohttp = pytest.importorskip("aiohttp")
+
     async with aiohttp.ClientSession() as session:
         async with session.get(
             "http://localhost:%d/api/v1/adaptive_target" % s.http_server.port
diff --git a/distributed/protocol/tests/test_collection_cuda.py b/distributed/protocol/tests/test_collection_cuda.py
index bd2614f179..d0cb3224ed 100644
--- a/distributed/protocol/tests/test_collection_cuda.py
+++ b/distributed/protocol/tests/test_collection_cuda.py
@@ -3,9 +3,6 @@
 import pytest
 
 pytestmark = pytest.mark.gpu
-
-from dask.dataframe.utils import assert_eq
-
 from distributed.protocol import deserialize, serialize
 
 
@@ -46,6 +43,7 @@ def test_serialize_cupy(collection, y, y_serializer):
 def test_serialize_pandas_pandas(collection, df2, df2_serializer):
     cudf = pytest.importorskip("cudf")
     pd = pytest.importorskip("pandas")
+    dd = pytest.importorskip("dask.dataframe")
     df1 = cudf.DataFrame({"A": [1, 2, None], "B": [1.0, 2.0, None]})
     if df2 is not None:
         df2 = cudf.from_pandas(pd.DataFrame(df2))
@@ -63,8 +61,8 @@ def test_serialize_pandas_pandas(collection, df2, df2_serializer):
         assert sub_headers[1]["serializer"] == df2_serializer
     assert isinstance(t, collection)
 
-    assert_eq(t["df1"] if isinstance(t, dict) else t[0], df1)
+    dd.assert_eq(t["df1"] if isinstance(t, dict) else t[0], df1)
     if df2 is None:
         assert (t["df2"] if isinstance(t, dict) else t[1]) is None
     else:
-        assert_eq(t["df2"] if isinstance(t, dict) else t[1], df2)
+        dd.assert_eq(t["df2"] if isinstance(t, dict) else t[1], df2)
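The recurring change in these test modules is the same: module-level pytest.importorskip(...) calls, which skip collection of every test in the file, move into the bodies of the tests that actually need the optional package, so the remaining tests still run in the mindeps environment. The pattern in isolation (hypothetical test name):

    import pytest

    def test_needs_aiohttp():
        # Skips only this test when aiohttp is missing; the rest of the
        # module is collected and run as usual.
        aiohttp = pytest.importorskip("aiohttp")
        assert aiohttp.ClientSession is not None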
diff --git a/distributed/protocol/tests/test_highlevelgraph.py b/distributed/protocol/tests/test_highlevelgraph.py
index cd35088382..adb150bfbc 100644
--- a/distributed/protocol/tests/test_highlevelgraph.py
+++ b/distributed/protocol/tests/test_highlevelgraph.py
@@ -4,6 +4,11 @@
 
 import pytest
 
+np = pytest.importorskip("numpy")
+pd = pytest.importorskip("pandas")
+
+from numpy.testing import assert_array_equal
+
 import dask
 import dask.array as da
 import dask.dataframe as dd
@@ -11,11 +16,6 @@
 from distributed.diagnostics import SchedulerPlugin
 from distributed.utils_test import gen_cluster
 
-np = pytest.importorskip("numpy")
-pd = pytest.importorskip("pandas")
-
-from numpy.testing import assert_array_equal
-
 
 @gen_cluster(client=True)
 async def test_combo_of_layer_types(c, s, a, b):
diff --git a/distributed/protocol/tests/test_protocol_utils.py b/distributed/protocol/tests/test_protocol_utils.py
index d577350470..3d9b1b51df 100644
--- a/distributed/protocol/tests/test_protocol_utils.py
+++ b/distributed/protocol/tests/test_protocol_utils.py
@@ -66,6 +66,7 @@ def test_readonly_buffer(self):
         assert result == base
 
     def test_catch_non_memoryview(self):
+        pytest.importorskip("numpy")
         with pytest.raises(TypeError, match="Expected memoryview"):
             merge_memoryviews([b"1234", memoryview(b"4567")])
 
diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py
index e87413d44b..311cb04df0 100644
--- a/distributed/shuffle/tests/test_shuffle.py
+++ b/distributed/shuffle/tests/test_shuffle.py
@@ -10,10 +10,10 @@
 from typing import Any, Mapping
 from unittest import mock
 
-import numpy as np
-import pandas as pd
 import pytest
 
+pd = pytest.importorskip("pandas")
+
 import dask
 import dask.dataframe as dd
 from dask.distributed import Event, Nanny, Worker
@@ -39,8 +39,6 @@
 from distributed.utils_test import gen_cluster, gen_test, wait_for_state
 from distributed.worker_state_machine import TaskState as WorkerTaskState
 
-pa = pytest.importorskip("pyarrow")
-
 
 async def clean_worker(
     worker: Worker, interval: float = 0.01, timeout: int | None = None
@@ -657,6 +655,8 @@ def test_processing_chain():
     In practice this takes place on many different workers.
     Here we verify its accuracy in a single threaded situation.
     """
+    np = pytest.importorskip("numpy")
+    pa = pytest.importorskip("pyarrow")
 
     class Stub:
         def __init__(self, value: int) -> None:
@@ -1222,6 +1222,8 @@ async def test_basic_lowlevel_shuffle(
     npartitions,
     barrier_first_worker,
 ):
+    pa = pytest.importorskip("pyarrow")
+
     dfs = []
     rows_per_df = 10
     for ix in range(n_input_partitions):
@@ -1296,6 +1298,8 @@ def _done():
 
 @gen_test()
 async def test_error_offload(tmpdir, loop_in_thread):
+    pa = pytest.importorskip("pyarrow")
+
     dfs = []
     rows_per_df = 10
     n_input_partitions = 2
@@ -1347,6 +1351,8 @@ async def offload(self, func, *args):
 
 @gen_test()
 async def test_error_send(tmpdir, loop_in_thread):
+    pa = pytest.importorskip("pyarrow")
+
     dfs = []
     rows_per_df = 10
     n_input_partitions = 1
@@ -1397,6 +1403,8 @@ async def send(self, address: str, shards: list[bytes]) -> None:
 
 @gen_test()
 async def test_error_receive(tmpdir, loop_in_thread):
+    pa = pytest.importorskip("pyarrow")
+
     dfs = []
     rows_per_df = 10
     n_input_partitions = 1
diff --git a/distributed/system_monitor.py b/distributed/system_monitor.py
index d92dac6c11..2790338352 100644
--- a/distributed/system_monitor.py
+++ b/distributed/system_monitor.py
@@ -65,7 +65,8 @@ def __init__(
         try:
             disk_ioc = psutil.disk_io_counters()
         except Exception:
-            # FIXME is this possible?
+            # FIXME occurs when psutil version doesn't have handling for given platform / kernel;
+            # should we explicitly error in this case?
             monitor_disk_io = False  # pragma: nocover
         else:
             if disk_ioc is None:  # pragma: nocover
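For context on the comment rewritten above: depending on the psutil build and the platform/kernel, psutil.disk_io_counters() may raise (no handling for the platform) or return None (no disks found), and the monitor must tolerate both. A standalone sketch of the same probing logic:

    import psutil

    try:
        disk_ioc = psutil.disk_io_counters()
    except Exception:
        # psutil has no disk I/O support for this platform / kernel
        monitor_disk_io = False
    else:
        # psutil returns None when it finds no disks to report on
        monitor_disk_io = disk_ioc is not None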
diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py
index 0fe602c14a..7f8258bfa3 100644
--- a/distributed/tests/test_nanny.py
+++ b/distributed/tests/test_nanny.py
@@ -212,6 +212,10 @@ async def test_scheduler_file():
     s.stop()
 
 
+@pytest.mark.xfail(
+    os.environ.get("MINDEPS") == "true",
+    reason="Timeout errors with mindeps environment",
+)
 @gen_cluster(client=True, Worker=Nanny, nthreads=[("127.0.0.1", 2)])
 async def test_nanny_timeout(c, s, a):
     x = await c.scatter(123)
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index e627b2df95..38ced07531 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -3971,7 +3971,9 @@ async def test_TaskState__to_dict(c, s):
 
 
 def _verify_cluster_state(
-    state: dict, workers: Collection[Worker], allow_missing: bool = False
+    state: dict,
+    workers: Collection[Worker],
+    allow_missing: bool = False,
 ) -> None:
     addrs = {w.address for w in workers}
     assert state.keys() == {"scheduler", "workers", "versions"}
diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py
index f25dcf5775..bf726d1bcf 100644
--- a/distributed/tests/test_steal.py
+++ b/distributed/tests/test_steal.py
@@ -12,7 +12,6 @@
 from time import sleep
 from typing import Callable, Iterable, Mapping, Sequence
 
-import numpy as np
 import pytest
 from tlz import merge, sliding_window
 
@@ -75,6 +74,7 @@ async def test_work_stealing(c, s, a, b):
 @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2)
 async def test_dont_steal_expensive_data_fast_computation(c, s, a, b):
     np = pytest.importorskip("numpy")
+
     x = c.submit(np.arange, 1000000, workers=a.address)
     await wait([x])
     future = c.submit(np.sum, [1], workers=a.address)  # learn that sum is fast
@@ -1352,6 +1352,8 @@ def test_steal_worker_state(ws_with_running_task):
 @pytest.mark.slow()
 @gen_cluster(nthreads=[("", 1)] * 4, client=True)
 async def test_steal_very_fast_tasks(c, s, *workers):
+    np = pytest.importorskip("numpy")
+
     # Ensure that very fast tasks are allowed to be stolen
     root = dask.delayed(lambda n: "x" * n)(
         dask.utils.parse_bytes("1MiB"), dask_key_name="root"
diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index 08d33c9450..992206abfd 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -3622,6 +3622,8 @@ def get_data(self, comm, **kwargs):
 @pytest.mark.slow
 @gen_cluster(client=True, Worker=BreakingWorker)
 async def test_broken_comm(c, s, a, b):
+    pytest.importorskip("dask.dataframe")
+
     df = dask.datasets.timeseries(
         start="2000-01-01",
         end="2000-01-10",
diff --git a/requirements.txt b/requirements.txt
index e5294dd7c9..8378684d49 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,15 +1,15 @@
 click >= 7.0
 cloudpickle >= 1.5.0
 dask == 2022.12.1
-jinja2
+jinja2 >= 2.10.3
 locket >= 1.0.0
-msgpack >= 0.6.0
+msgpack >= 1.0.0
 packaging >= 20.0
-psutil >= 5.0
-sortedcontainers !=2.0.0, !=2.0.1
+psutil >= 5.7.0
+pyyaml >= 5.3.1
+sortedcontainers >= 2.0.5
 tblib >= 1.6.0
 toolz >= 0.10.0
 tornado >= 6.0.3
-urllib3
-zict >= 0.1.3
-pyyaml
+urllib3 >= 1.24.3
+zict >= 2.1.0
\ No newline at end of file
diff --git a/setup.cfg b/setup.cfg
index bad2bb8116..a1f85cda8a 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -80,7 +80,7 @@ filterwarnings =
     ignore:overflow encountered in long_scalars:RuntimeWarning
     ignore:Creating scratch directories is taking a surprisingly long time.*:UserWarning
     ignore:Scheduler already contains a plugin with name nonidempotentplugin. overwriting:UserWarning
-    ignore:Increasing number of chunks by factor of 20:dask.array.core.PerformanceWarning
+    ignore:Increasing number of chunks by factor of 20::dask.array.core.PerformanceWarning
     ignore::distributed.versions.VersionMismatchWarning
     ignore:(?s)Exception in thread.*old_ssh.*channel\.send\(b"\\x03"\).*Socket is closed:pytest.PytestUnhandledThreadExceptionWarning
     ignore:(?s)Exception in thread.*paramiko\.ssh_exception\.NoValidConnectionsError:pytest.PytestUnhandledThreadExceptionWarning
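For reference, pytest filterwarnings entries use Python's warning-filter syntax action:message:category:module:lineno. The added colon above moves dask.array.core.PerformanceWarning out of the category field, which pytest imports at collection time (and which would fail in the mindeps environment, where numpy and therefore dask.array are unavailable), into the module field, which is only matched as a regex. A sketch of the runtime equivalent of the new entry:

    import warnings

    # An empty category field defaults to Warning; the module argument is
    # matched against the module that issues the warning, so nothing is imported.
    warnings.filterwarnings(
        "ignore",
        message="Increasing number of chunks by factor of 20",
        module=r"dask.array.core.PerformanceWarning",
    )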