______________________________ test_queue_in_task ______________________________
fut = <Task cancelled name='Task-98242' coro=<TCP.write() done, defined at /Users/runner/work/distributed/distributed/distributed/comm/tcp.py:264>>
timeout = 0
    async def wait_for(fut, timeout, *, loop=None):
        """Wait for the single Future or coroutine to complete, with timeout.
        Coroutine will be wrapped in Task.
        Returns result of the Future or coroutine. When a timeout occurs,
        it cancels the task and raises TimeoutError. To avoid the task
        cancellation, wrap it in shield().
        If the wait is cancelled, the task is also cancelled.
        This function is a coroutine.
        """
        if loop is None:
            loop = events.get_running_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        if timeout is None:
            return await fut
        if timeout <= 0:
            fut = ensure_future(fut, loop=loop)
            if fut.done():
                return fut.result()
            await _cancel_and_wait(fut, loop=loop)
            try:
>               fut.result()
E               asyncio.exceptions.CancelledError
../../../miniconda3/envs/dask-distributed/lib/python3.8/asyncio/tasks.py:465: CancelledError
The above exception was the direct cause of the following exception:
addr = 'tcp://127.0.0.1:61650', timeout = 5, deserialize = True
handshake_overrides = None
connection_args = {'extra_conn_args': {}, 'require_encryption': False, 'ssl_context': None}
scheme = 'tcp', loc = '127.0.0.1:61650'
backend = <distributed.comm.tcp.TCPBackend object at 0x7fac198f1a00>
connector = <distributed.comm.tcp.TCPConnector object at 0x7fac09cd62e0>
comm = <TCP (closed) local=tcp://127.0.0.1:61672 remote=tcp://127.0.0.1:61650>
time_left = <function connect.<locals>.time_left at 0x7fac072290d0>
backoff_base = 0.01
    async def connect(
        addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args
    ):
        """
        Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
        and yield a ``Comm`` object. If the connection attempt fails, it is
        retried until the *timeout* is expired.
        """
        if timeout is None:
            timeout = dask.config.get("distributed.comm.timeouts.connect")
        timeout = parse_timedelta(timeout, default="seconds")
        scheme, loc = parse_address(addr)
        backend = registry.get_backend(scheme)
        connector = backend.get_connector()
        comm = None
        start = time()

        def time_left():
            deadline = start + timeout
            return max(0, deadline - time())

        backoff_base = 0.01
        attempt = 0
        # Prefer multiple small attempts over one long attempt. This should protect
        # primarily from DNS race conditions
        # gh3104, gh4176, gh4167
        intermediate_cap = timeout / 5
        active_exception = None
        while time_left() > 0:
            try:
                comm = await asyncio.wait_for(
                    connector.connect(loc, deserialize=deserialize, **connection_args),
                    timeout=min(intermediate_cap, time_left()),
                )
                break
            except FatalCommClosedError:
                raise
            # Note: CommClosed inherits from OSError
            except (asyncio.TimeoutError, OSError) as exc:
                active_exception = exc
                # As described above, the intermediate timeout is used to distribute
                # initial, bulk connect attempts homogeneously. In particular with
                # the jitter upon retries we should not be worried about overloading
                # any more DNS servers
                intermediate_cap = timeout
                # FullJitter see https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
                upper_cap = min(time_left(), backoff_base * (2**attempt))
                backoff = random.uniform(0, upper_cap)
                attempt += 1
                logger.debug(
                    "Could not connect to %s, waiting for %s before retrying", loc, backoff
                )
                await asyncio.sleep(backoff)
        else:
            raise OSError(
                f"Timed out trying to connect to {addr} after {timeout} s"
            ) from active_exception

        local_info = {
            **comm.handshake_info(),
            **(handshake_overrides or {}),
        }
        try:
            # This would be better, but connections leak if worker is closed quickly
            # write, handshake = await asyncio.gather(comm.write(local_info), comm.read())
            handshake = await asyncio.wait_for(comm.read(), time_left())
>           await asyncio.wait_for(comm.write(local_info), time_left())
distributed/comm/core.py:329:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fut = <Task cancelled name='Task-98242' coro=<TCP.write() done, defined at /Users/runner/work/distributed/distributed/distributed/comm/tcp.py:264>>
timeout = 0
    async def wait_for(fut, timeout, *, loop=None):
        """Wait for the single Future or coroutine to complete, with timeout.
        Coroutine will be wrapped in Task.
        Returns result of the Future or coroutine. When a timeout occurs,
        it cancels the task and raises TimeoutError. To avoid the task
        cancellation, wrap it in shield().
        If the wait is cancelled, the task is also cancelled.
        This function is a coroutine.
        """
        if loop is None:
            loop = events.get_running_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        if timeout is None:
            return await fut
        if timeout <= 0:
            fut = ensure_future(fut, loop=loop)
            if fut.done():
                return fut.result()
            await _cancel_and_wait(fut, loop=loop)
            try:
                fut.result()
            except exceptions.CancelledError as exc:
>               raise exceptions.TimeoutError() from exc
E               asyncio.exceptions.TimeoutError

../../../miniconda3/envs/dask-distributed/lib/python3.8/asyncio/tasks.py:467: TimeoutError

[Repeated CoverageWarning messages from coverage/data.py:129 appeared here in the CI output: many '.coverage.Mac-1657605455732.local.*' data files either "don't seem to be a coverage data file: cannot unpack non-iterable NoneType object" or could not be used because the "database disk image is malformed".]
The above exception was the direct cause of the following exception:
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x7fac06f830d0>
    def test_queue_in_task(loop):
        port = open_port()
        # Ensure that we can create a Queue inside a task on a
        # worker in a separate Python process from the client
        with popen(
            [
                "dask-scheduler",
                "--no-dashboard",
                f"--port={port}",
            ]
        ):
            with popen(["dask-worker", f"127.0.0.1:{port}"]):
>               with Client(f"tcp://127.0.0.1:{port}", loop=loop) as c:
distributed/tests/test_queues.py:295:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/client.py:940: in __init__
self.start(timeout=timeout)
distributed/client.py:1098: in start
sync(self.loop, self._start, **kwargs)
distributed/utils.py:405: in sync
raise exc.with_traceback(tb)
distributed/utils.py:378: in f
result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.8/site-packages/tornado/gen.py:762: in run
value = future.result()
distributed/client.py:1178: in _start
await self._ensure_connected(timeout=timeout)
distributed/client.py:1241: in _ensure_connected
comm = await connect(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
addr = 'tcp://127.0.0.1:61650', timeout = 5, deserialize = True
handshake_overrides = None
connection_args = {'extra_conn_args': {}, 'require_encryption': False, 'ssl_context': None}
scheme = 'tcp', loc = '127.0.0.1:61650'
backend = <distributed.comm.tcp.TCPBackend object at 0x7fac198f1a00>
connector = <distributed.comm.tcp.TCPConnector object at 0x7fac09cd62e0>
comm = <TCP (closed) local=tcp://127.0.0.1:61672 remote=tcp://127.0.0.1:61650>
time_left = <function connect.<locals>.time_left at 0x7fac072290d0>
backoff_base = 0.01
    async def connect(
        addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args
    ):
        """
        Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
        and yield a ``Comm`` object. If the connection attempt fails, it is
        retried until the *timeout* is expired.
        """
        if timeout is None:
            timeout = dask.config.get("distributed.comm.timeouts.connect")
        timeout = parse_timedelta(timeout, default="seconds")
        scheme, loc = parse_address(addr)
        backend = registry.get_backend(scheme)
        connector = backend.get_connector()
        comm = None
        start = time()

        def time_left():
            deadline = start + timeout
            return max(0, deadline - time())

        backoff_base = 0.01
        attempt = 0
        # Prefer multiple small attempts over one long attempt. This should protect
        # primarily from DNS race conditions
        # gh3104, gh4176, gh4167
        intermediate_cap = timeout / 5
        active_exception = None
        while time_left() > 0:
            try:
                comm = await asyncio.wait_for(
                    connector.connect(loc, deserialize=deserialize, **connection_args),
                    timeout=min(intermediate_cap, time_left()),
                )
                break
            except FatalCommClosedError:
                raise
            # Note: CommClosed inherits from OSError
            except (asyncio.TimeoutError, OSError) as exc:
                active_exception = exc
                # As described above, the intermediate timeout is used to distribute
                # initial, bulk connect attempts homogeneously. In particular with
                # the jitter upon retries we should not be worried about overloading
                # any more DNS servers
                intermediate_cap = timeout
                # FullJitter see https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
                upper_cap = min(time_left(), backoff_base * (2**attempt))
                backoff = random.uniform(0, upper_cap)
                attempt += 1
                logger.debug(
                    "Could not connect to %s, waiting for %s before retrying", loc, backoff
                )
                await asyncio.sleep(backoff)
        else:
            raise OSError(
                f"Timed out trying to connect to {addr} after {timeout} s"
            ) from active_exception

        local_info = {
            **comm.handshake_info(),
            **(handshake_overrides or {}),
        }
        try:
            # This would be better, but connections leak if worker is closed quickly
            # write, handshake = await asyncio.gather(comm.write(local_info), comm.read())
            handshake = await asyncio.wait_for(comm.read(), time_left())
            await asyncio.wait_for(comm.write(local_info), time_left())
        except Exception as exc:
            with suppress(Exception):
                await comm.close()
>           raise OSError(
                f"Timed out during handshake while connecting to {addr} after {timeout} s"
            ) from exc
E OSError: Timed out during handshake while connecting to tcp://127.0.0.1:61650 after 5 s
distributed/comm/core.py:333: OSError
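
For context on the chained exceptions above: once `time_left()` reaches 0, Python 3.8's `asyncio.wait_for` cancels the pending `comm.write()` task immediately, converts the resulting `CancelledError` into `asyncio.TimeoutError`, and `connect()` then wraps that into the `OSError` shown here. A minimal standalone example of that conversion (my own sketch, not distributed's code):

```python
import asyncio

async def slow_write():
    # Stand-in for TCP.write(): anything that cannot finish instantly.
    await asyncio.sleep(10)

async def main():
    try:
        # timeout=0 mimics an exhausted time_left(): wait_for cancels the
        # task right away and raises TimeoutError from the CancelledError.
        await asyncio.wait_for(slow_write(), timeout=0)
    except asyncio.TimeoutError:
        print("TimeoutError raised because the deadline was already exhausted")

asyncio.run(main())
```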
----------------------------- Captured stderr call -----------------------------
2022-07-12 06:38:00,933 - distributed.scheduler - INFO - -----------------------------------------------
2022-07-12 06:38:00,947 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2022-07-12 06:38:00,956 - distributed.scheduler - INFO - State start
2022-07-12 06:38:00,967 - distributed.scheduler - INFO - -----------------------------------------------
2022-07-12 06:38:00,968 - distributed.scheduler - INFO - Clear task state
2022-07-12 06:38:00,968 - distributed.scheduler - INFO - Scheduler at: tcp://10.213.5.250:61650
2022-07-12 06:38:00,968 - distributed.scheduler - INFO - dashboard at: :8787
2022-07-12 06:38:01,012 - distributed.nanny - INFO - Start Nanny at: 'tcp://127.0.0.1:61663'
2022-07-12 06:38:03,930 - distributed.worker - INFO - Start worker at: tcp://127.0.0.1:61669
2022-07-12 06:38:03,930 - distributed.worker - INFO - Listening to: tcp://127.0.0.1:61669
2022-07-12 06:38:03,930 - distributed.worker - INFO - dashboard at: 127.0.0.1:61670
2022-07-12 06:38:03,930 - distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:61650
2022-07-12 06:38:03,931 - distributed.worker - INFO - -------------------------------------------------
2022-07-12 06:38:03,931 - distributed.worker - INFO - Threads: 3
2022-07-12 06:38:03,931 - distributed.worker - INFO - Memory: 14.00 GiB
2022-07-12 06:38:03,931 - distributed.worker - INFO - Local Directory: /var/folders/24/8k48jl6d249_n_qfxwsl6xvm0000gn/T/dask-worker-space/worker-mwp91vu7
2022-07-12 06:38:03,931 - distributed.worker - INFO - -------------------------------------------------
2022-07-12 06:38:03,950 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:61669', status: init, memory: 0, processing: 0>
2022-07-12 06:38:04,605 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:61669
2022-07-12 06:38:04,605 - distributed.core - INFO - Starting established connection
2022-07-12 06:38:04,606 - distributed.worker - INFO - Registered to: tcp://127.0.0.1:61650
2022-07-12 06:38:04,606 - distributed.worker - INFO - -------------------------------------------------
2022-07-12 06:38:04,608 - distributed.core - INFO - Starting established connection
2022-07-12 06:38:04,610 - distributed.comm.tcp - INFO - Connection from tcp://127.0.0.1:61672 closed before handshake completed
2022-07-12 06:38:04,612 - distributed._signals - INFO - Received signal SIGINT (2)
2022-07-12 06:38:04,612 - distributed.nanny - INFO - Closing Nanny at 'tcp://127.0.0.1:61663'.
2022-07-12 06:38:04,613 - distributed.nanny - INFO - Nanny asking worker to close
2022-07-12 06:38:04,615 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:61669
2022-07-12 06:38:04,618 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-a940ed52-6789-4717-9630-4bf5106a7444 Address tcp://127.0.0.1:61669 Status: Status.closing
2022-07-12 06:38:04,619 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:61669', status: closing, memory: 0, processing: 0>
2022-07-12 06:38:04,619 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:61669
2022-07-12 06:38:04,619 - distributed.scheduler - INFO - Lost all workers
2022-07-12 06:38:04,897 - distributed.dask_worker - INFO - End worker
2022-07-12 06:38:05,243 - distributed._signals - INFO - Received signal SIGINT (2)
2022-07-12 06:38:05,244 - distributed.scheduler - INFO - Scheduler closing...
2022-07-12 06:38:05,245 - distributed.scheduler - INFO - Scheduler closing all comms
2022-07-12 06:38:05,245 - distributed.scheduler - INFO - Stopped scheduler at 'tcp://10.213.5.250:61650'
2022-07-12 06:38:05,246 - distributed.scheduler - INFO - End scheduler
Full CI log: https://github.com/dask/distributed/runs/7295961305?check_suite_focus=true#step:11:2282
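
For readers skimming the quoted `connect()` source: the retry loop uses capped exponential backoff with full jitter (the AWS article linked in the code). A standalone sketch of the schedule it produces, assuming `time_left()` is not the binding cap; the numbers are illustrative only:

```python
import random

backoff_base = 0.01  # same constant as in connect()

# The upper bound grows as backoff_base * 2**attempt; the actual sleep is a
# uniform draw between 0 and that bound ("full jitter"). In connect() the
# bound is additionally capped by time_left().
for attempt in range(6):
    upper_cap = backoff_base * (2 ** attempt)
    backoff = random.uniform(0, upper_cap)
    print(f"attempt {attempt}: cap {upper_cap:.3f}s, drew {backoff:.3f}s")
```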
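If this turns out to be CI slowness rather than a real regression, one possible mitigation is a larger connect deadline, either via the `Client`'s `timeout` argument or via the `distributed.comm.timeouts.connect` config key that the quoted `connect()` falls back to. A minimal sketch of setting the key; the 30 s value is an arbitrary example, not a maintainer recommendation:

```python
import dask

# Temporarily raise the comm connect timeout; any Client or worker created
# inside this block would pick up the larger value via dask.config.get().
with dask.config.set({"distributed.comm.timeouts.connect": "30s"}):
    print(dask.config.get("distributed.comm.timeouts.connect"))  # -> 30s
```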