-
Notifications
You must be signed in to change notification settings - Fork 276
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Switching from using model Metadata -> TaskMetadata #298
Merged
Merged
Changes from 2 commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,7 @@ | ||
import collections | ||
import datetime | ||
from abc import abstractmethod | ||
from dataclasses import dataclass | ||
from typing import Any, Dict, Generic, Optional, Tuple, Type, TypeVar, Union | ||
|
||
from flytekit.annotated.context_manager import ( | ||
|
@@ -33,6 +35,60 @@ def kwtypes(**kwargs) -> Dict[str, Type]: | |
return d | ||
|
||
|
||
@dataclass | ||
class TaskMetadata(object): | ||
""" | ||
Create Metadata to be associated with this Task | ||
|
||
Args: | ||
cache: Boolean that indicates if caching should be enabled | ||
cache_version: Version string to be used for the cached value | ||
interruptable: Boolean that indicates that this task can be interrupted and/or scheduled on nodes | ||
with lower QoS guarantees. This will directly reduce the `$`/`execution cost` associated, | ||
at the cost of performance penalties due to potential interruptions | ||
deprecated: A string that can be used to provide a warning message for deprecated task. Absence / empty str | ||
indicates that the task is active and not deprecated | ||
retries: for retries=n; n > 0, on failures of this task, the task will be retried at-least n number of times. | ||
timeout: the max amount of time for which one execution of this task should be executed for. If the execution | ||
will be terminated if the runtime exceeds the given timeout (approximately) | ||
""" | ||
|
||
cache: bool = False | ||
cache_version: str = "" | ||
interruptable: bool = False | ||
deprecated: str = "" | ||
retries: int = 0 | ||
timeout: Optional[Union[datetime.timedelta, int]] = None | ||
|
||
def __post_init__(self): | ||
if self.timeout: | ||
if isinstance(self.timeout, int): | ||
self.timeout = datetime.timedelta(seconds=self.timeout) | ||
elif not isinstance(self.timeout, datetime.timedelta): | ||
raise ValueError("timeout should be duration represented as either a datetime.timedelta or int seconds") | ||
if self.cache and not self.cache_version: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what if cache version is set but cache isn't? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this happens a lot of times today (at lyft) |
||
raise ValueError("Caching is enabled ``cache=True`` but ``cache_version`` is not set.") | ||
|
||
@property | ||
def retry_strategy(self) -> _literal_models.RetryStrategy: | ||
return _literal_models.RetryStrategy(self.retries) | ||
|
||
def to_taskmetadata_model(self) -> _task_model.TaskMetadata: | ||
""" | ||
Converts to _task_model.TaskMetadata | ||
""" | ||
return _task_model.TaskMetadata( | ||
discoverable=self.cache, | ||
# TODO Fix the version circular dependency before beta | ||
runtime=_task_model.RuntimeMetadata(_task_model.RuntimeMetadata.RuntimeType.FLYTE_SDK, "0.16.0", "python"), | ||
timeout=self.timeout, | ||
retries=self.retry_strategy, | ||
interruptible=self.interruptable, | ||
discovery_version=self.cache_version, | ||
deprecated_error_message=self.deprecated, | ||
) | ||
|
||
|
||
# This is the least abstract task. It will have access to the loaded Python function | ||
# itself if run locally, so it will always be a Python task. | ||
# This is analogous to the current SdkRunnableTask. Need to analyze the benefits of duplicating the class versus | ||
|
@@ -48,15 +104,15 @@ def __init__( | |
self, | ||
task_type: str, | ||
name: str, | ||
interface: _interface_models.TypedInterface, | ||
metadata: _task_model.TaskMetadata, | ||
interface: Optional[_interface_models.TypedInterface] = None, | ||
metadata: Optional[TaskMetadata] = None, | ||
*args, | ||
**kwargs, | ||
): | ||
self._task_type = task_type | ||
self._name = name | ||
self._interface = interface | ||
self._metadata = metadata | ||
self._metadata = metadata if metadata else TaskMetadata() | ||
|
||
# This will get populated only at registration time, when we retrieve the rest of the environment variables like | ||
# project/domain/version/image and anything else we might need from the environment in the future. | ||
|
@@ -69,7 +125,7 @@ def interface(self) -> _interface_models.TypedInterface: | |
return self._interface | ||
|
||
@property | ||
def metadata(self) -> _task_model.TaskMetadata: | ||
def metadata(self) -> TaskMetadata: | ||
return self._metadata | ||
|
||
@property | ||
|
@@ -175,7 +231,7 @@ def get_task_structure(self) -> SdkTask: | |
settings = FlyteContext.current_context().registration_settings | ||
tk = SdkTask( | ||
type=self.task_type, | ||
metadata=self.metadata, | ||
metadata=self.metadata.to_taskmetadata_model(), | ||
interface=self.interface, | ||
custom=self.get_custom(settings), | ||
container=self.get_container(settings), | ||
|
@@ -233,9 +289,9 @@ def __init__( | |
self, | ||
task_type: str, | ||
name: str, | ||
interface: Interface, | ||
metadata: _task_model.TaskMetadata, | ||
task_config: T, | ||
interface: Optional[Interface] = None, | ||
metadata: Optional[TaskMetadata] = None, | ||
*args, | ||
**kwargs, | ||
): | ||
|
@@ -268,7 +324,7 @@ def compile(self, ctx: FlyteContext, *args, **kwargs): | |
entity=self, | ||
interface=self.python_interface, | ||
timeout=self.metadata.timeout, | ||
retry_strategy=self.metadata.retries, | ||
retry_strategy=self.metadata.retry_strategy, | ||
**kwargs, | ||
) | ||
|
||
|
@@ -289,7 +345,9 @@ def dispatch_execute( | |
|
||
# Create another execution context with the new user params, but let's keep the same working dir | ||
with ctx.new_execution_context( | ||
mode=ctx.execution_state.mode, execution_params=new_user_params, working_dir=ctx.execution_state.working_dir | ||
mode=ctx.execution_state.mode, | ||
execution_params=new_user_params, | ||
working_dir=ctx.execution_state.working_dir, | ||
) as exec_ctx: | ||
# TODO We could support default values here too - but not part of the plan right now | ||
# Translate the input literals to Python native | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you also add a check here if cache is specified but cache_version isn't (and vice-versa)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea