Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix attribute forwarding for tasks with dynamic dependencies #2478

Merged
merged 4 commits into from
Aug 8, 2018

Conversation

riga
Copy link
Contributor

@riga riga commented Jul 30, 2018

Description

This PR fixes the forwarding of attributes (mostly callbacks such as set_status_message) from TaskProcesses to running Tasks that yield dynamic dependencies within their run() method.

Motivation and Context

Currently, the attributes are forwarded and reset like this:

luigi/luigi/worker.py

Lines 137 to 146 in 7d2c557

def _run_get_new_deps(self):
# forward some attributes before running
for reporter_attr, task_attr in six.iteritems(self.forward_reporter_attributes):
setattr(self.task, task_attr, getattr(self.status_reporter, reporter_attr))
task_gen = self.task.run()
# reset attributes again
for reporter_attr, task_attr in six.iteritems(self.forward_reporter_attributes):
setattr(self.task, task_attr, None)

However, when the run method returns a generator, attributes are reset again before the yielded dependencies are handled (right below the referenced code). Actually, this should happen right before _run_get_new_deps returns. I added a context-managed method that handles the attributes so I could place the (only) call to _run_get_new_deps in that context.

Have you tested this?

I added a new test case that checks if attributes are forwarded for tasks that don't yield dependencies and for those who do. The TaskForwardedAttributesTest.test_yielding_task test would actually fail in the current implementation, but passes with the proposed change.

def test_non_yielding_task(self):
task = self.run_task(NonYieldingTask())

self.assertEqual(task.attributes_while_running, forwarded_attributes)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am i correct that this test passes before AND after the code changes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's correct.

task = self.run_task(YieldingTask())

self.assertEqual(task.attributes_before_yield, forwarded_attributes)
self.assertEqual(task.attributes_after_yield, forwarded_attributes)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this one would fail before this code change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly.

Copy link
Collaborator

@dlstadther dlstadther left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this makes sense to me.

Could we get at least one more person to sign off on this?


self.attributes_after_yield = self.gather_forwarded_attributes()

RunOnceTask.run(self)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is quite unintuitive, maybe add a comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✔︎

import luigi.worker


forwarded_attributes = set(luigi.worker.TaskProcess.forward_reporter_attributes.values())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make this constant in capital letters?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✔︎

luigi/worker.py Outdated

try:
yield self

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove the white-space here and the love above I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✔︎

@riga
Copy link
Contributor Author

riga commented Aug 8, 2018

Anything else I can do here? =)

@dlstadther
Copy link
Collaborator

Nah, LGTM. All reviewer comments have been addressed.

@dlstadther dlstadther merged commit de4629a into spotify:master Aug 8, 2018
dlstadther added a commit to dlstadther/luigi that referenced this pull request Aug 14, 2018
* upstream-master: (82 commits)
  S3 client refactor (spotify#2482)
  Rename to rpc_log_retries, and make it apply to all the logging involved
  Factor log_exceptions into a configuration parameter
  Fix attribute forwarding for tasks with dynamic dependencies (spotify#2478)
  Add a visiblity level for luigi.Parameters (spotify#2278)
  Add support for multiple requires and inherits arguments (spotify#2475)
  Add metadata columns to the RDBMS contrib (spotify#2440)
  Fix race condition in luigi.lock.acquire_for (spotify#2357) (spotify#2477)
  tests: Use RunOnceTask where possible (spotify#2476)
  Optional TOML configs support (spotify#2457)
  Added default port behaviour for Redshift (spotify#2474)
  Add codeowners file with default and specific example (spotify#2465)
  Add Data Revenue to the `blogged` list (spotify#2472)
  Fix Scheduler.add_task to overwrite accepts_messages attribute. (spotify#2469)
  Use task_id comparison in Task.__eq__. (spotify#2462)
  Add stale config
  Move github templates to .github dir
  Fix transfer config import (spotify#2458)
  Additions to provide support for the Load Sharing Facility (LSF) job scheduler (spotify#2373)
  Version 2.7.6
  ...
dlstadther added a commit to dlstadther/luigi that referenced this pull request Aug 14, 2018
* upstream-master:
  S3 client refactor (spotify#2482)
  Rename to rpc_log_retries, and make it apply to all the logging involved
  Factor log_exceptions into a configuration parameter
  Fix attribute forwarding for tasks with dynamic dependencies (spotify#2478)
  Add a visiblity level for luigi.Parameters (spotify#2278)
  Add support for multiple requires and inherits arguments (spotify#2475)
  Add metadata columns to the RDBMS contrib (spotify#2440)
  Fix race condition in luigi.lock.acquire_for (spotify#2357) (spotify#2477)
  tests: Use RunOnceTask where possible (spotify#2476)
  Optional TOML configs support (spotify#2457)
  Added default port behaviour for Redshift (spotify#2474)
  Add codeowners file with default and specific example (spotify#2465)
  Add Data Revenue to the `blogged` list (spotify#2472)
dlstadther added a commit to dlstadther/luigi that referenced this pull request Aug 16, 2018
* upstream-master:
  Remove long-deprecated scheduler config variable alternatives (spotify#2491)
  Bump tornado milestone version (spotify#2490)
  Update moto to 1.x milestone version (spotify#2471)
  Use passed password when create a redis connection (spotify#2489)
  S3 client refactor (spotify#2482)
  Rename to rpc_log_retries, and make it apply to all the logging involved
  Factor log_exceptions into a configuration parameter
  Fix attribute forwarding for tasks with dynamic dependencies (spotify#2478)
  Add a visiblity level for luigi.Parameters (spotify#2278)
  Add support for multiple requires and inherits arguments (spotify#2475)
  Add metadata columns to the RDBMS contrib (spotify#2440)
  Fix race condition in luigi.lock.acquire_for (spotify#2357) (spotify#2477)
  tests: Use RunOnceTask where possible (spotify#2476)
  Optional TOML configs support (spotify#2457)
  Added default port behaviour for Redshift (spotify#2474)
  Add codeowners file with default and specific example (spotify#2465)
  Add Data Revenue to the `blogged` list (spotify#2472)
  Fix Scheduler.add_task to overwrite accepts_messages attribute. (spotify#2469)
  Use task_id comparison in Task.__eq__. (spotify#2462)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants