Skip to content

Commit

Permalink
refactor: Move GRACE usage to worker
Browse files Browse the repository at this point in the history
  • Loading branch information
eddiebergman committed Dec 9, 2024
1 parent b5a01f5 commit 65b2bf9
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 11 deletions.
10 changes: 7 additions & 3 deletions neps/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
from typing import (
TYPE_CHECKING,
Any,
ClassVar,
Generic,
Literal,
TypeVar,
)

from neps.env import (
FS_SYNC_GRACE_BASE,
FS_SYNC_GRACE_INC,
LINUX_FILELOCK_FUNCTION,
MAX_RETRIES_CREATE_LOAD_STATE,
Expand All @@ -33,7 +35,6 @@
WorkerRaiseError,
)
from neps.state._eval import evaluate_trial
from neps.state.filebased import FileLocker
from neps.state.neps_state import NePSState
from neps.state.optimizer import BudgetInfo, OptimizationState, OptimizerInfo
from neps.state.settings import DefaultReportValues, OnErrorPossibilities, WorkerSettings
Expand Down Expand Up @@ -156,6 +157,8 @@ class DefaultWorker(Generic[Loc]):
worker_cumulative_evaluation_time_seconds: float = 0.0
"""The time spent evaluating configurations by this worker."""

_GRACE: ClassVar = FS_SYNC_GRACE_BASE

@classmethod
def new(
cls,
Expand Down Expand Up @@ -353,6 +356,7 @@ def _get_next_trial(self) -> Trial | Literal["break"]:
# If there is no global stopping criterion, we can now just return early.
with self.state._optimizer_lock.lock():
with self.state._trial_lock.lock():
time.sleep(self._GRACE) # Give the filesystem some time to sync
trials = self.state._trials.latest()

if self._requires_global_stopping_criterion:
Expand Down Expand Up @@ -418,7 +422,7 @@ def _get_next_trial(self) -> Trial | Literal["break"]:
sampled_trial.id,
)
else:
_grace = FileLocker._GRACE
_grace = DefaultWorker._GRACE
_inc = FS_SYNC_GRACE_INC
logger.warning(
"The new sampled trial was given an id of '%s', which is not"
Expand All @@ -434,7 +438,7 @@ def _get_next_trial(self) -> Trial | Literal["break"]:
_grace,
_grace + _inc,
)
FileLocker._increse_grace(_inc)
DefaultWorker._GRACE = _grace + FS_SYNC_GRACE_INC
raise e

# Forgive me lord, for I have sinned, this function is atrocious but complicated
Expand Down
9 changes: 1 addition & 8 deletions neps/state/filebased.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import logging
import os
import pprint
import time
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import asdict, dataclass
Expand All @@ -14,7 +13,7 @@
import numpy as np
import portalocker as pl

from neps.env import ENV_VARS_USED, FS_SYNC_GRACE_BASE
from neps.env import ENV_VARS_USED
from neps.state.err_dump import ErrDump
from neps.state.optimizer import BudgetInfo, OptimizationState, OptimizerInfo
from neps.state.seed_snapshot import SeedSnapshot
Expand Down Expand Up @@ -308,15 +307,10 @@ class FileLocker:
lock_path: Path
poll: float
timeout: float | None
_GRACE: ClassVar = FS_SYNC_GRACE_BASE

def __post_init__(self) -> None:
self.lock_path = self.lock_path.resolve().absolute()

@classmethod
def _increse_grace(cls, grace: float) -> None:
cls._GRACE = grace + cls._GRACE

@contextmanager
def lock(
self,
Expand All @@ -334,7 +328,6 @@ def lock(
flags=FILELOCK_EXCLUSIVE_NONE_BLOCKING,
fail_when_locked=fail_if_locked,
) as fh:
time.sleep(self._GRACE) # Give the lock some time to
yield
fh.flush()
os.fsync(fh)
Expand Down

0 comments on commit 65b2bf9

Please sign in to comment.