Skip to content

Commit

Permalink
More dead code
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Aug 3, 2023
1 parent 4ded2d7 commit b67a624
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 45 deletions.
18 changes: 0 additions & 18 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from dask.utils import parse_timedelta

from distributed import profile, protocol
from distributed.collections import LRU
from distributed.comm import (
Comm,
CommClosedError,
Expand All @@ -40,7 +39,6 @@
from distributed.counter import Counter
from distributed.diskutils import WorkDir, WorkSpace
from distributed.metrics import context_meter, time
from distributed.protocol import pickle
from distributed.system_monitor import SystemMonitor
from distributed.utils import (
NoOpAwaitable,
Expand All @@ -64,21 +62,6 @@
Coro = Coroutine[Any, Any, T]


cache_loads: LRU[bytes, Callable[..., Any]] = LRU(maxsize=100)


def loads_function(bytes_object):
"""Load a function from bytes, cache bytes"""
if len(bytes_object) < 100000:
try:
result = cache_loads[bytes_object]
except KeyError:
result = pickle.loads(bytes_object)
cache_loads[bytes_object] = result
return result
return pickle.loads(bytes_object)


class Status(Enum):
"""
This Enum contains the various states a cluster, worker, scheduler and nanny can be
Expand Down Expand Up @@ -519,7 +502,6 @@ def func(data):
if load:
try:
import_file(out_filename)
cache_loads.data.clear()
except Exception as e:
logger.exception(e)
raise e
Expand Down
25 changes: 0 additions & 25 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
coerce_to_address,
context_meter_to_server_digest,
error_message,
loads_function,
pingpong,
)
from distributed.core import rpc as RPCType
Expand Down Expand Up @@ -123,7 +122,6 @@
WorkerMemoryManager,
)
from distributed.worker_state_machine import (
NO_VALUE,
AcquireReplicasEvent,
BaseWorker,
CancelComputeEvent,
Expand Down Expand Up @@ -2902,29 +2900,6 @@ async def get_data_from_worker(
rpc.reuse(worker, comm)


job_counter = [0]


@context_meter.meter("deserialize")
def _deserialize(function=None, args=None, kwargs=None, task=NO_VALUE):
"""Deserialize task inputs and regularize to func, args, kwargs"""
# Some objects require threadlocal state during deserialization, e.g. to
# detect the current worker
if function is not None:
function = loads_function(function)
if args and isinstance(args, bytes):
args = pickle.loads(args)
if kwargs and isinstance(kwargs, bytes):
kwargs = pickle.loads(kwargs)

if task is not NO_VALUE:
assert not function and not args and not kwargs
function = execute_task
args = (task,)

return function, args or (), kwargs or {}


def execute_task(task):
"""Evaluate a nested task
Expand Down
2 changes: 0 additions & 2 deletions distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@
"waiting",
}

NO_VALUE = "--no-value-sentinel--"

RUN_ID_SENTINEL = -1

T_runspec = tuple[Callable, tuple, dict[str, Any]]
Expand Down

0 comments on commit b67a624

Please sign in to comment.