Skip to content

Commit

Permalink
Drop comparison of versions against all clients (dask#6861)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait authored Aug 11, 2022
1 parent d234569 commit 99a2db1
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 19 deletions.
7 changes: 2 additions & 5 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3752,12 +3752,9 @@ async def add_worker(

version_warning = version_module.error_message(
version_module.get_versions(),
merge(
{w: ws.versions for w, ws in self.workers.items()},
{c: cs.versions for c, cs in self.clients.items() if cs.versions},
),
{w: ws.versions for w, ws in self.workers.items()},
versions,
client_name="This Worker",
source_name=str(ws.server_id),
)
msg.update(version_warning)

Expand Down
13 changes: 7 additions & 6 deletions distributed/tests/test_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ def kwargs_matching():
return dict(
scheduler=get_versions(),
workers={f"worker-{i}": get_versions() for i in range(3)},
client=get_versions(),
source=get_versions(),
)


def test_versions_match(kwargs_matching):
assert error_message(**kwargs_matching)["warning"] == ""


@pytest.fixture(params=["client", "scheduler", "worker-1"])
@pytest.fixture(params=["source", "scheduler", "worker-1"])
def node(request):
"""Node affected by version mismatch."""
return request.param
Expand Down Expand Up @@ -76,7 +76,7 @@ def pattern(effect):


def test_version_mismatch(node, effect, kwargs_not_matching, pattern):
column_matching = {"client": 1, "scheduler": 2, "workers": 3}
column_matching = {"source": 1, "scheduler": 2, "workers": 3}
msg = error_message(**kwargs_not_matching)
i = column_matching.get(node, 3)
assert "Mismatched versions found" in msg["warning"]
Expand All @@ -95,7 +95,7 @@ def test_version_mismatch(node, effect, kwargs_not_matching, pattern):
def test_scheduler_mismatched_irrelevant_package(kwargs_matching):
"""An irrelevant package on the scheduler can have any version."""
kwargs_matching["scheduler"]["packages"]["numpy"] = "0.0.0"
assert "numpy" in kwargs_matching["client"]["packages"]
assert "numpy" in kwargs_matching["source"]["packages"]

assert error_message(**kwargs_matching)["warning"] == ""

Expand All @@ -108,7 +108,7 @@ def test_scheduler_additional_irrelevant_package(kwargs_matching):


def test_python_mismatch(kwargs_matching):
kwargs_matching["client"]["packages"]["python"] = "0.0.0"
kwargs_matching["source"]["packages"]["python"] = "0.0.0"
msg = error_message(**kwargs_matching)
assert "Mismatched versions found" in msg["warning"]
assert "python" in msg["warning"]
Expand All @@ -134,7 +134,8 @@ async def test_version_warning_in_cluster(s, a, b):
assert any("0.0.0" in str(r.message) for r in record)

async with Worker(s.address) as w:
assert any("workers" in line.message for line in w.logs)
assert any(w.id in line.message for line in w.logs)
assert any("Workers" in line.message for line in w.logs)
assert any("dask" in line.message for line in w.logs)
assert any("0.0.0" in line.message in line.message for line in w.logs)

Expand Down
16 changes: 8 additions & 8 deletions distributed/versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,15 @@ def get_package_info(
return pversions


def error_message(scheduler, workers, client, client_name="client"):
def error_message(scheduler, workers, source, source_name="Client"):
from distributed.utils import asciitable

client = client.get("packages") if client else "UNKNOWN"
source = source.get("packages") if source else "UNKNOWN"
scheduler = scheduler.get("packages") if scheduler else "UNKNOWN"
workers = {k: v.get("packages") if v else "UNKNOWN" for k, v in workers.items()}

packages = set()
packages.update(client)
packages.update(source)
packages.update(scheduler)
for worker in workers:
packages.update(workers.get(worker))
Expand All @@ -128,10 +128,10 @@ def error_message(scheduler, workers, client, client_name="client"):
if pkg in scheduler_relevant_packages:
versions.add(scheduler_version)

client_version = (
client.get(pkg, "MISSING") if isinstance(client, dict) else client
source_version = (
source.get(pkg, "MISSING") if isinstance(source, dict) else source
)
versions.add(client_version)
versions.add(source_version)

worker_versions = {
workers[w].get(pkg, "MISSING")
Expand All @@ -148,14 +148,14 @@ def error_message(scheduler, workers, client, client_name="client"):
elif len(worker_versions) == 0:
worker_versions = None

errs.append((pkg, client_version, scheduler_version, worker_versions))
errs.append((pkg, source_version, scheduler_version, worker_versions))
if pkg in notes_mismatch_package.keys():
notes.append(f"- {pkg}: {notes_mismatch_package[pkg]}")

out = {"warning": "", "error": ""}

if errs:
err_table = asciitable(["Package", client_name, "scheduler", "workers"], errs)
err_table = asciitable(["Package", source_name, "Scheduler", "Workers"], errs)
err_msg = f"Mismatched versions found\n\n{err_table}"
if notes:
err_msg += "\nNotes: \n{}".format("\n".join(notes))
Expand Down

0 comments on commit 99a2db1

Please sign in to comment.