You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
2021-02-08T21:33:58.6228723Z ______________________________ test_idle_timeout ______________________________
2021-02-08T21:33:58.6229193Z
2021-02-08T21:33:58.6229763Z def test_func():
2021-02-08T21:33:58.6230524Z result = None
2021-02-08T21:33:58.6231092Z workers = []
2021-02-08T21:33:58.6231819Z with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:
2021-02-08T21:33:58.6232442Z
2021-02-08T21:33:58.6232952Z async def coro():
2021-02-08T21:33:58.6233671Z with dask.config.set(config):
2021-02-08T21:33:58.6278267Z s = False
2021-02-08T21:33:58.6278895Z for i in range(5):
2021-02-08T21:33:58.6279521Z try:
2021-02-08T21:33:58.6280066Z s, ws = await start_cluster(
2021-02-08T21:33:58.6280690Z nthreads,
2021-02-08T21:33:58.6281251Z scheduler,
2021-02-08T21:33:58.6281838Z loop,
2021-02-08T21:33:58.6282375Z security=security,
2021-02-08T21:33:58.6283003Z Worker=Worker,
2021-02-08T21:33:58.6283786Z scheduler_kwargs=scheduler_kwargs,
2021-02-08T21:33:58.6284524Z worker_kwargs=worker_kwargs,
2021-02-08T21:33:58.6285057Z )
2021-02-08T21:33:58.6285649Z except Exception as e:
2021-02-08T21:33:58.6286225Z logger.error(
2021-02-08T21:33:58.6287545Z "Failed to start gen_cluster, retrying",
2021-02-08T21:33:58.6288199Z exc_info=True,
2021-02-08T21:33:58.6288731Z )
2021-02-08T21:33:58.6289384Z await asyncio.sleep(1)
2021-02-08T21:33:58.6289897Z else:
2021-02-08T21:33:58.6290325Z workers[:] = ws
2021-02-08T21:33:58.6290808Z args = [s] + workers
2021-02-08T21:33:58.6291322Z break
2021-02-08T21:33:58.6291862Z if s is False:
2021-02-08T21:33:58.6292484Z raise Exception("Could not start cluster")
2021-02-08T21:33:58.6293207Z if client:
2021-02-08T21:33:58.6294100Z c = await Client(
2021-02-08T21:33:58.6294588Z s.address,
2021-02-08T21:33:58.6295005Z loop=loop,
2021-02-08T21:33:58.6295466Z security=security,
2021-02-08T21:33:58.6296157Z asynchronous=True,
2021-02-08T21:33:58.6296725Z **client_kwargs,
2021-02-08T21:33:58.6297253Z )
2021-02-08T21:33:58.6297723Z args = [c] + args
2021-02-08T21:33:58.6298668Z try:
2021-02-08T21:33:58.6299164Z future = func(*args)
2021-02-08T21:33:58.6299742Z if timeout:
2021-02-08T21:33:58.6300376Z future = asyncio.wait_for(future, timeout)
2021-02-08T21:33:58.6301080Z result = await future
2021-02-08T21:33:58.6301626Z if s.validate:
2021-02-08T21:33:58.6302228Z s.validate_state()
2021-02-08T21:33:58.6302746Z finally:
2021-02-08T21:33:58.6304747Z if client and c.status not in ("closing", "closed"):
2021-02-08T21:33:58.6305455Z await c._close(fast=s.status == Status.closed)
2021-02-08T21:33:58.6306113Z await end_cluster(s, workers)
2021-02-08T21:33:58.6306746Z await asyncio.wait_for(cleanup_global_workers(), 1)
2021-02-08T21:33:58.6307296Z
2021-02-08T21:33:58.6307881Z try:
2021-02-08T21:33:58.6308395Z c = await default_client()
2021-02-08T21:33:58.6308901Z except ValueError:
2021-02-08T21:33:58.6309389Z pass
2021-02-08T21:33:58.6309765Z else:
2021-02-08T21:33:58.6310206Z await c._close(fast=True)
2021-02-08T21:33:58.6310645Z
2021-02-08T21:33:58.6311027Z def get_unclosed():
2021-02-08T21:33:58.6311645Z return [c for c in Comm._instances if not c.closed()] + [
2021-02-08T21:33:58.6312153Z c
2021-02-08T21:33:58.6312682Z for c in _global_clients.values()
2021-02-08T21:33:58.6313330Z if c.status != "closed"
2021-02-08T21:33:58.6313748Z ]
2021-02-08T21:33:58.6314115Z
2021-02-08T21:33:58.6314442Z try:
2021-02-08T21:33:58.6314874Z start = time()
2021-02-08T21:33:58.6315321Z while time() < start + 5:
2021-02-08T21:33:58.6315817Z gc.collect()
2021-02-08T21:33:58.6316280Z if not get_unclosed():
2021-02-08T21:33:58.6316711Z break
2021-02-08T21:33:58.6317222Z await asyncio.sleep(0.05)
2021-02-08T21:33:58.6317681Z else:
2021-02-08T21:33:58.6318153Z if allow_unclosed:
2021-02-08T21:33:58.6318710Z print(f"Unclosed Comms: {get_unclosed()}")
2021-02-08T21:33:58.6319263Z else:
2021-02-08T21:33:58.6320929Z raise RuntimeError("Unclosed Comms", get_unclosed())
2021-02-08T21:33:58.6321600Z finally:
2021-02-08T21:33:58.6322085Z Comm._instances.clear()
2021-02-08T21:33:58.6322643Z _global_clients.clear()
2021-02-08T21:33:58.6323224Z
2021-02-08T21:33:58.6323609Z return result
2021-02-08T21:33:58.6324021Z
2021-02-08T21:33:58.6324414Z result = loop.run_sync(
2021-02-08T21:33:58.6324995Z > coro, timeout=timeout * 2 if timeout else timeout
2021-02-08T21:33:58.6325529Z )
2021-02-08T21:33:58.6325869Z
2021-02-08T21:33:58.6326289Z distributed\utils_test.py:954:
2021-02-08T21:33:58.6326814Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
2021-02-08T21:33:58.6327064Z
2021-02-08T21:33:58.6328098Z self = <tornado.platform.asyncio.AsyncIOLoop object at 0x00000194C8190608>
2021-02-08T21:33:58.6329351Z func = <function gen_cluster.<locals>._.<locals>.test_func.<locals>.coro at 0x00000194C8214B88>
2021-02-08T21:33:58.6330159Z timeout = 20
2021-02-08T21:33:58.6330549Z
2021-02-08T21:33:58.6331158Z def run_sync(self, func, timeout=None):
2021-02-08T21:33:58.6332007Z """Starts the `IOLoop`, runs the given function, and stops the loop.
2021-02-08T21:33:58.6332846Z
2021-02-08T21:33:59.5394622Z The function must return either an awaitable object or
2021-02-08T21:33:59.5405824Z ``None``. If the function returns an awaitable object, the
2021-02-08T21:33:59.5406838Z `IOLoop` will run until the awaitable is resolved (and
2021-02-08T21:33:59.5407735Z `run_sync()` will return the awaitable's result). If it raises
2021-02-08T21:33:59.5408810Z an exception, the `IOLoop` will stop and the exception will be
2021-02-08T21:33:59.5409684Z re-raised to the caller.
2021-02-08T21:33:59.5410221Z
2021-02-08T21:33:59.5411125Z The keyword-only argument ``timeout`` may be used to set
2021-02-08T21:33:59.5412010Z a maximum duration for the function. If the timeout expires,
2021-02-08T21:33:59.5412974Z a `tornado.util.TimeoutError` is raised.
2021-02-08T21:33:59.5413662Z
2021-02-08T21:33:59.5415602Z This method is useful to allow asynchronous calls in a
2021-02-08T21:33:59.5416534Z ``main()`` function::
2021-02-08T21:33:59.5417109Z
2021-02-08T21:33:59.5417614Z async def main():
2021-02-08T21:33:59.5418302Z # do stuff...
2021-02-08T21:33:59.5419219Z
2021-02-08T21:33:59.5419872Z if __name__ == '__main__':
2021-02-08T21:33:59.5420571Z IOLoop.current().run_sync(main)
2021-02-08T21:33:59.5421220Z
2021-02-08T21:33:59.5421799Z .. versionchanged:: 4.3
2021-02-08T21:33:59.5422558Z Returning a non-``None``, non-awaitable value is now an error.
2021-02-08T21:33:59.5423258Z
2021-02-08T21:33:59.5423768Z .. versionchanged:: 5.0
2021-02-08T21:33:59.5425166Z If a timeout occurs, the ``func`` coroutine will be cancelled.
2021-02-08T21:33:59.5425804Z
2021-02-08T21:33:59.5426425Z """
2021-02-08T21:33:59.5426891Z future_cell = [None]
2021-02-08T21:33:59.5427574Z
2021-02-08T21:33:59.5428039Z def run():
2021-02-08T21:33:59.5429861Z try:
2021-02-08T21:33:59.5430281Z result = func()
2021-02-08T21:33:59.5430773Z if result is not None:
2021-02-08T21:33:59.5431342Z from tornado.gen import convert_yielded
2021-02-08T21:33:59.5432007Z result = convert_yielded(result)
2021-02-08T21:33:59.5432535Z except Exception:
2021-02-08T21:33:59.5433081Z future_cell[0] = Future()
2021-02-08T21:33:59.5433650Z future_set_exc_info(future_cell[0], sys.exc_info())
2021-02-08T21:33:59.5434154Z else:
2021-02-08T21:33:59.5434645Z if is_future(result):
2021-02-08T21:33:59.5435118Z future_cell[0] = result
2021-02-08T21:33:59.5435587Z else:
2021-02-08T21:33:59.5436016Z future_cell[0] = Future()
2021-02-08T21:33:59.5436586Z future_cell[0].set_result(result)
2021-02-08T21:33:59.5437256Z self.add_future(future_cell[0], lambda future: self.stop())
2021-02-08T21:33:59.5437906Z self.add_callback(run)
2021-02-08T21:33:59.5438727Z if timeout is not None:
2021-02-08T21:33:59.5439882Z def timeout_callback():
2021-02-08T21:33:59.5440548Z # If we can cancel the future, do so and wait on it. If not,
2021-02-08T21:33:59.5441252Z # Just stop the loop and return with the task still pending.
2021-02-08T21:33:59.5441993Z # (If we neither cancel nor wait for the task, a warning
2021-02-08T21:33:59.5442541Z # will be logged).
2021-02-08T21:33:59.5443054Z if not future_cell[0].cancel():
2021-02-08T21:33:59.5443514Z self.stop()
2021-02-08T21:33:59.5444226Z timeout_handle = self.add_timeout(self.time() + timeout, timeout_callback)
2021-02-08T21:33:59.5444872Z self.start()
2021-02-08T21:33:59.5445305Z if timeout is not None:
2021-02-08T21:33:59.5446133Z self.remove_timeout(timeout_handle)
2021-02-08T21:33:59.5446789Z if future_cell[0].cancelled() or not future_cell[0].done():
2021-02-08T21:33:59.5447598Z > raise TimeoutError('Operation timed out after %s seconds' % timeout)
2021-02-08T21:33:59.5448521Z E tornado.util.TimeoutError: Operation timed out after 20 seconds
2021-02-08T21:33:59.5449180Z
2021-02-08T21:33:59.5449928Z C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\ioloop.py:575: TimeoutError
The text was updated successfully, but these errors were encountered:
Personally, I have seen fewer flaky test failures with the change in PR #4526. I'm not sure whether I've just been lucky with the runs on that PR (though I have seen failures on other PRs without that change), so I don't want to claim that it has actually fixed things. I did want to mention it, though, in case things continue to work reliably going forward.
https://github.com/dask/distributed/pull/4490/checks?check_run_id=1858104683
Details:
The text was updated successfully, but these errors were encountered: