Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changed skip_if_completed_pre_run->task_completion_check_at_run #420

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions gokart/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def __init__(
check_unfulfilled_deps: bool = True,
check_complete_on_run: bool = False,
task_completion_cache: Optional[Dict[str, Any]] = None,
skip_if_completed_pre_run: bool = True,
task_completion_check_at_run: bool = True,
) -> None:
super(TaskProcess, self).__init__()
self.task = task
Expand All @@ -139,13 +139,13 @@ def __init__(
self.check_unfulfilled_deps = check_unfulfilled_deps
self.check_complete_on_run = check_complete_on_run
self.task_completion_cache = task_completion_cache
self.skip_if_completed_pre_run = skip_if_completed_pre_run
self.task_completion_check_at_run = task_completion_check_at_run

# completeness check using the cache
self.check_complete = functools.partial(luigi.worker.check_complete_cached, completion_cache=task_completion_cache)

def _run_task(self) -> Optional[collections.abc.Generator]:
if self.skip_if_completed_pre_run and self.check_complete(self.task):
if self.task_completion_check_at_run and self.check_complete(self.task):
logger.warning(f'{self.task} is skipped because the task is already completed.')
return None
return self.task.run()
Expand Down Expand Up @@ -375,8 +375,8 @@ class gokart_worker(luigi.Config):
'dynamic dependencies but assumes that the completion status does not change '
'after it was true the first time.',
)
skip_if_completed_pre_run: bool = ExplicitBoolParameter(
default=True, description='If true, skip running tasks that are already completed just before the Task is run.'
task_completion_check_at_run: bool = ExplicitBoolParameter(
default=True, description='If true, tasks completeness will be re-checked just before the run, in case they are finished elsewhere.'
)


Expand Down Expand Up @@ -915,7 +915,7 @@ def _create_task_process(self, task):
check_unfulfilled_deps=self._config.check_unfulfilled_deps,
check_complete_on_run=self._config.check_complete_on_run,
task_completion_cache=self._task_completion_cache,
skip_if_completed_pre_run=self._config.skip_if_completed_pre_run,
task_completion_check_at_run=self._config.task_completion_check_at_run,
)

def _purge_children(self) -> None:
Expand Down
14 changes: 7 additions & 7 deletions test/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ def complete(self) -> bool:

class TestWorkerSkipIfCompletedPreRun:
@pytest.mark.parametrize(
'skip_if_completed_pre_run,is_completed,expect_skipped',
'task_completion_check_at_run,is_completed,expect_skipped',
[
pytest.param(True, True, True, id='skipped when completed and skip_if_completed_pre_run is True'),
pytest.param(True, False, False, id='not skipped when not completed and skip_if_completed_pre_run is True'),
pytest.param(False, True, False, id='not skipped when completed and skip_if_completed_pre_run is False'),
pytest.param(False, False, False, id='not skipped when not completed and skip_if_completed_pre_run is False'),
pytest.param(True, True, True, id='skipped when completed and task_completion_check_at_run is True'),
pytest.param(True, False, False, id='not skipped when not completed and task_completion_check_at_run is True'),
pytest.param(False, True, False, id='not skipped when completed and task_completion_check_at_run is False'),
pytest.param(False, False, False, id='not skipped when not completed and task_completion_check_at_run is False'),
],
)
def test_skip_task(self, monkeypatch: pytest.MonkeyPatch, skip_if_completed_pre_run: bool, is_completed: bool, expect_skipped: bool):
def test_skip_task(self, monkeypatch: pytest.MonkeyPatch, task_completion_check_at_run: bool, is_completed: bool, expect_skipped: bool):
sch = scheduler.Scheduler()
worker = Worker(scheduler=sch, config=gokart_worker(skip_if_completed_pre_run=skip_if_completed_pre_run))
worker = Worker(scheduler=sch, config=gokart_worker(task_completion_check_at_run=task_completion_check_at_run))

mock_complete = Mock(return_value=is_completed)
# NOTE: set `complete_check_at_run=False` to avoid using deprecated skip logic.
Expand Down
Loading