Cleanup extensibility #202
Codecov Report
@@ Coverage Diff @@
## annotations #202 +/- ##
==============================================
Coverage ? 80.50%
==============================================
Files ? 236
Lines ? 15590
Branches ? 1343
==============================================
Hits ? 12550
Misses ? 2715
Partials ? 325
Continue to review full report at Codecov.
# logger.debug(f"Var name {output_name} wf output name {outputs[i]} type: {output_literal_type}")
binding_data = _literal_models.BindingData(promise=out)
bindings.append(_literal_models.Binding(var=output_name, binding=binding_data))
if len(output_names) > 0:
Why this change? Do we not trust workflow_outputs?
from flytekit.annotated.promise import Promise, create_task_output
from flytekit.common import nodes as _nodes, interface as _common_interface
from flytekit.common.exceptions import user as _user_exceptions
from flytekit.common.promise import NodeOutput as _NodeOutput
from flytekit.models import task as _task_model, literals as _literal_models
from flytekit.models.core import workflow as _workflow_model, identifier as _identifier_model
import inspect
from flytekit.annotated.interface import transform_signature_to_typed_interface


# This is the least abstract task. It will have access to the loaded Python function
Is this comment correct? Do you mean, "most abstract task"?
@@ -155,53 +184,82 @@ def name(self) -> str:

class PythonFunctionTask(Task):

def __init__(self, task_function: Callable, metadata: _task_model.TaskMetadata, *args, **kwargs):
interface = transform_signature_to_typed_interface(inspect.signature(task_function))
def __init__(self, task_function: Callable, metadata: _task_model.TaskMetadata, ignore_input_vars: List[str] = None,
What is ignore_input_vars for? Can we document the use case?
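One possible reading of the use case, judging from the Spark subclass in this PR: some parameters of the task function are injected by the runtime rather than supplied as task inputs, so they need to be dropped before the typed interface is built. A minimal sketch of that idea using only the standard library; `typed_inputs` and `my_spark_task` are hypothetical names for illustration, not flytekit APIs:

```python
import inspect
from typing import Callable, Dict, List, Optional


def typed_inputs(fn: Callable, ignore_input_vars: Optional[List[str]] = None) -> Dict[str, type]:
    """Map parameter names to their annotations, skipping runtime-injected names."""
    ignored = set(ignore_input_vars or [])
    return {
        name: param.annotation
        for name, param in inspect.signature(fn).parameters.items()
        if name not in ignored
    }


# Hypothetical task function: spark_session would be provided by the runtime,
# so it should not appear in the task's declared inputs.
def my_spark_task(spark_session, a: int, b: str) -> int:
    return a


inputs = typed_inputs(my_spark_task, ignore_input_vars=["spark_session", "spark_context"])
```

Under these assumptions `inputs` contains only `a` and `b`. If this matches the intent, a docstring on `ignore_input_vars` saying so would answer the question.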
from flytekit import logger
from flytekit.annotated import type_engine
from flytekit.common import interface as _common_interface
from flytekit.models import interface as _interface_models


def transform_signature_to_typed_interface(signature: inspect.Signature) -> _common_interface.TypedInterface:
class Interface(object):
Can we talk about this next week?
class PysparkFunctionTask(PythonFunctionTask):
def __init__(self, task_function: Callable, metadata: _task_model.TaskMetadata, *args, **kwargs):
super(PysparkFunctionTask, self).__init__(task_function, metadata,
ignore_input_vars=["spark_session", "spark_context"], *args, **kwargs)
I feel like I'd rather add more stuff to PysparkFunctionTask and subclass Task directly, instead of adding functionality at the base layer to remove/add inputs. Or do you foresee other tasks needing this?
Should spark_session and spark_context be valid inputs for data catalog?
dt = t1()
sql(ds=dt)

my_wf()
did you want to test for something here?
if vars is None:
return self
new_inputs = copy.copy(self._inputs)
for v in vars:
what happens if none of vars are in inputs? should we log or error?
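A rough sketch of the "log" option, assuming the inputs are a plain dict keyed by variable name. `remove_inputs` is a hypothetical standalone function, not the method in this diff, and it uses the standard `logging` module rather than flytekit's logger:

```python
import copy
import logging
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)


def remove_inputs(inputs: Dict[str, type], vars: Optional[List[str]]) -> Dict[str, type]:
    """Return a copy of `inputs` without `vars`, warning about names that were never present."""
    if vars is None:
        return inputs
    new_inputs = copy.copy(inputs)
    missing = []
    for v in vars:
        if v in new_inputs:
            del new_inputs[v]
        else:
            missing.append(v)
    if missing:
        # Warn instead of raising: a caller may pass a superset of names to ignore.
        logger.warning("Vars not present in inputs, nothing removed for: %s", missing)
    return new_inputs
```

Warning rather than raising seems the safer default if callers pass a fixed list such as `["spark_session", "spark_context"]` and a given function declares only one of them; raising would be reasonable if the list is expected to match the signature exactly.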
from flytekit import logger
from flytekit.annotated import type_engine
from flytekit.common import interface as _common_interface
from flytekit.models import interface as _interface_models


def transform_signature_to_typed_interface(signature: inspect.Signature) -> _common_interface.TypedInterface:
class Interface(object):
is this a general purpose interface? can we name it something more specific?
else:
# Question: How do you know you're going to enumerate them in the correct order? Even if autonamed, will
# output2 come before output100 if there's a hundred outputs? We don't! We'll have to circle back to
# the Python task instance and inspect annotations again. Or we change the Python model representation
do the annotations reference the correct/original order?
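To make the ordering concern in the code comment concrete: if auto-generated names of the form `outputN` (an assumption about the naming scheme) are ever sorted as plain strings, `output100` lands before `output2`. A small sketch of the problem and one possible workaround; `natural_key` is a hypothetical helper, not part of flytekit:

```python
import re
from typing import List


def natural_key(name: str) -> List[object]:
    """Split a name into text and integer chunks so numeric suffixes compare as numbers."""
    return [int(chunk) if chunk.isdigit() else chunk for chunk in re.split(r"(\d+)", name)]


names = ["output100", "output2", "output0", "output10"]

# Plain string sort compares character by character, so "output100" < "output2".
lexicographic = sorted(names)

# Sorting on the numeric suffix restores the intended order.
natural = sorted(names, key=natural_key)
```

This only helps for auto-named outputs. For user-named outputs the order would still have to come from the function's annotations or from an ordered representation in the model.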
enumerate(native_outputs)}

# We manually construct a LiteralMap here because task inputs and outputs actually violate the assumption
# built into the IDL that all the values of a literal map are of the same type.
can we change the IDL then?
k: flytekit_engine.python_value_to_idl_literal(ctx, v, self.interface.outputs[k].type) for k, v in
native_outputs_as_map.items()
})
print("Outputs!")
nit: rm?
Implement 2 extensions
Generic SQL task base and extended Spark Task