Skip to content

Commit

Permalink
Merge branch 'main' into transition_counter_max
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed May 11, 2022
2 parents e4f8d9c + 9f02e7a commit 35a5568
Show file tree
Hide file tree
Showing 10 changed files with 639 additions and 675 deletions.
2 changes: 1 addition & 1 deletion distributed/batched.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def _background_send(self):
self.stopped.set()
self.abort()

def send(self, *msgs: dict) -> None:
def send(self, *msgs) -> None:
"""Schedule a message for sending to the other side
This completes quickly and synchronously
Expand Down
8 changes: 1 addition & 7 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ async def handle_comm(self, comm):
"Failed while closing connection to %r: %s", address, e
)

async def handle_stream(self, comm, extra=None, every_cycle=()):
async def handle_stream(self, comm, extra=None):
extra = extra or {}
logger.info("Starting established connection")

Expand Down Expand Up @@ -653,12 +653,6 @@ async def handle_stream(self, comm, extra=None, every_cycle=()):
logger.error("odd message %s", msg)
await asyncio.sleep(0)

for func in every_cycle:
if is_coroutine_function(func):
self.loop.add_callback(func)
else:
func()

except OSError:
pass
except Exception as e:
Expand Down
6 changes: 5 additions & 1 deletion distributed/diagnostics/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ async def close(self) -> None:
"""

def update_graph(
self, scheduler: Scheduler, keys: set[str], restrictions: dict, **kwargs
self,
scheduler: Scheduler,
keys: set[str],
restrictions: dict[str, float],
**kwargs,
) -> None:
"""Run when a new graph / tasks enter the scheduler"""

Expand Down
Loading

0 comments on commit 35a5568

Please sign in to comment.