[Core Feature/Bug] [flytekit] Make FlyteRemote work with nested executions #1482

Closed
wild-endeavor opened this issue Sep 15, 2021 · 2 comments
Labels
bug Something isn't working enhancement New feature or request flytekit FlyteKit Python related issue

Comments

@wild-endeavor
Contributor

wild-endeavor commented Sep 15, 2021

Motivation: Why do you think this is important?
FlyteRemote and its associated immutable objects (FlyteTask, FlyteWorkflow, etc.) currently do not support dynamic tasks that run subworkflows or launch plans.

Launch plan case

For example, running

from flytekit.configuration import set_flyte_config_file
from flytekit.remote import FlyteRemote
set_flyte_config_file("/Users/ytong/.flyte/local_sandbox")
remote = FlyteRemote.from_config(default_project="flytesnacks", default_domain="development")

e = remote.fetch_workflow_execution(name="yhprd5of0t")
e2 = remote.sync(e)

(where yhprd5of0t is an execution of this toy example) results in the following stack trace:

Traceback (most recent call last):
  File "c", line 7, in <module>
    e2 = remote.sync(e)
  File "/usr/local/Cellar/python@3.8/3.8.9/Frameworks/Python.framework/Versions/3.8/lib/python3.8/functools.py", line 912, in _method
    return method.__get__(obj, cls)(*args, **kwargs)
  File "/Users/ytong/envs/cookbook/lib/python3.8/site-packages/flytekit/remote/remote.py", line 990, in _
    synced_execution._node_executions = {
  File "/Users/ytong/envs/cookbook/lib/python3.8/site-packages/flytekit/remote/remote.py", line 991, in <dictcomp>
    node.id.node_id: self.sync(FlyteNodeExecution.promote_from_model(node), flyte_entity)
  File "/usr/local/Cellar/python@3.8/3.8.9/Frameworks/Python.framework/Versions/3.8/lib/python3.8/functools.py", line 912, in _method
    return method.__get__(obj, cls)(*args, **kwargs)
  File "/Users/ytong/envs/cookbook/lib/python3.8/site-packages/flytekit/remote/remote.py", line 1013, in _
    synced_execution._subworkflow_node_executions = [
  File "/Users/ytong/envs/cookbook/lib/python3.8/site-packages/flytekit/remote/remote.py", line 1014, in <listcomp>
    self.sync(FlyteNodeExecution.promote_from_model(node), entity_definition)
  File "/usr/local/Cellar/python@3.8/3.8.9/Frameworks/Python.framework/Versions/3.8/lib/python3.8/functools.py", line 912, in _method
    return method.__get__(obj, cls)(*args, **kwargs)
  File "/Users/ytong/envs/cookbook/lib/python3.8/site-packages/flytekit/remote/remote.py", line 1027, in _
    return self._assign_inputs_and_outputs(
  File "/Users/ytong/envs/cookbook/lib/python3.8/site-packages/flytekit/remote/remote.py", line 1071, in _assign_inputs_and_outputs
    python_types=TypeEngine.guess_python_types(interface.inputs),
AttributeError: 'NoneType' object has no attribute 'inputs'

Subworkflow case

The subworkflow execution used was this toy example.

Calls to inspect this run:

Running FLYTE_AWS_ENDPOINT=http://localhost:30084 FLYTE_AWS_ACCESS_KEY_ID=minio FLYTE_AWS_SECRET_ACCESS_KEY=miniostorage python c, where c contains

from flytekit.configuration import set_flyte_config_file
from flytekit.remote import FlyteRemote
set_flyte_config_file("/Users/ytong/.flyte/local_sandbox")
remote = FlyteRemote.from_config(default_project="flytesnacks", default_domain="development")

e = remote.fetch_workflow_execution(name="ihd00cvlfm")
e2 = remote.sync(e)
print(e2)

produces the following stack trace:

Traceback (most recent call last):
  File "c", line 7, in <module>
    e2 = remote.sync(e)
  File "/usr/local/Cellar/python@3.8/3.8.9/Frameworks/Python.framework/Versions/3.8/lib/python3.8/functools.py", line 912, in _method
    return method.__get__(obj, cls)(*args, **kwargs)
  File "/Users/ytong/envs/cookbook/lib/python3.8/site-packages/flytekit/remote/remote.py", line 990, in _
    synced_execution._node_executions = {
  File "/Users/ytong/envs/cookbook/lib/python3.8/site-packages/flytekit/remote/remote.py", line 991, in <dictcomp>
    node.id.node_id: self.sync(FlyteNodeExecution.promote_from_model(node), flyte_entity)
  File "/usr/local/Cellar/python@3.8/3.8.9/Frameworks/Python.framework/Versions/3.8/lib/python3.8/functools.py", line 912, in _method
    return method.__get__(obj, cls)(*args, **kwargs)
  File "/Users/ytong/envs/cookbook/lib/python3.8/site-packages/flytekit/remote/remote.py", line 1013, in _
    synced_execution._subworkflow_node_executions = [
  File "/Users/ytong/envs/cookbook/lib/python3.8/site-packages/flytekit/remote/remote.py", line 1014, in <listcomp>
    self.sync(FlyteNodeExecution.promote_from_model(node), entity_definition)
  File "/usr/local/Cellar/python@3.8/3.8.9/Frameworks/Python.framework/Versions/3.8/lib/python3.8/functools.py", line 912, in _method
    return method.__get__(obj, cls)(*args, **kwargs)
  File "/Users/ytong/envs/cookbook/lib/python3.8/site-packages/flytekit/remote/remote.py", line 1013, in _
    synced_execution._subworkflow_node_executions = [
  File "/Users/ytong/envs/cookbook/lib/python3.8/site-packages/flytekit/remote/remote.py", line 1014, in <listcomp>
    self.sync(FlyteNodeExecution.promote_from_model(node), entity_definition)
  File "/usr/local/Cellar/python@3.8/3.8.9/Frameworks/Python.framework/Versions/3.8/lib/python3.8/functools.py", line 912, in _method
    return method.__get__(obj, cls)(*args, **kwargs)
  File "/Users/ytong/envs/cookbook/lib/python3.8/site-packages/flytekit/remote/remote.py", line 1029, in _
    self.client.get_node_execution_data(execution.id),
  File "/Users/ytong/envs/cookbook/lib/python3.8/site-packages/flytekit/clients/friendly.py", line 687, in get_node_execution_data
    super(SynchronousFlyteClient, self).get_node_execution_data(
  File "/Users/ytong/envs/cookbook/lib/python3.8/site-packages/flytekit/clients/raw.py", line 131, in handler
    return fn(*args, **kwargs)
  File "/Users/ytong/envs/cookbook/lib/python3.8/site-packages/flytekit/clients/raw.py", line 601, in get_node_execution_data
    return self._stub.GetNodeExecutionData(get_node_execution_data_request, metadata=self._metadata)
  File "/Users/ytong/envs/cookbook/lib/python3.8/site-packages/grpc/_channel.py", line 923, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/Users/ytong/envs/cookbook/lib/python3.8/site-packages/grpc/_channel.py", line 826, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.INTERNAL
        details = "failed to get object size for s3://my-s3-bucket/metadata/propeller/flytesnacks-development-ihd00cvlfm/n0/data/0/dynamic-run-subwfs-n1/0/start-node/inputs.pb with NotFound: Not Found

The error comes from the node execution data call (get_node_execution_data).

The file is indeed missing:

AWS_ACCESS_KEY_ID=minio AWS_SECRET_ACCESS_KEY=miniostorage aws --endpoint http://localhost:30084 s3 ls s3://my-s3-bucket/metadata/propeller/flytesnacks-development-ihd00cvlfm/n0/data/0/dynamic-run-subwfs-n1/0/start-node/

However, s3://my-s3-bucket/metadata/propeller/flytesnacks-development-ihd00cvlfm/n0/data/0/dynamic-run-subwfs-n1/0/start-node/0/outputs.pb does exist; it is not clear whether this is the file that should be read instead.

The issue from the launch plan case is also present here: the top-level nodes under n0 are not assigned an interface, because _get_node_execution_interface tries to match an auto-generated node ID against the original workflow definition.
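A minimal sketch of this failure mode, assuming a simplified model in which the workflow definition is just a dict from node ID to interface. The Interface class, lookup_interface and input_names are hypothetical illustrations, not flytekit APIs. A lookup keyed on node ID only succeeds for nodes present in the static definition; for a node generated at run time it returns None, and dereferencing .inputs on that result is what raises the AttributeError seen in the launch plan trace. An explicit None check makes the failure clearer:

```python
from typing import Dict, List, Optional


class Interface:
    """Hypothetical stand-in for a node's typed interface (not flytekit's class)."""

    def __init__(self, inputs: Dict[str, type]):
        self.inputs = inputs


def lookup_interface(node_id: str, static_nodes: Dict[str, Interface]) -> Optional[Interface]:
    # Matching by node ID only works for nodes that exist in the static
    # workflow definition; auto-generated IDs from a dynamic task miss.
    return static_nodes.get(node_id)


def input_names(node_id: str, static_nodes: Dict[str, Interface]) -> List[str]:
    interface = lookup_interface(node_id, static_nodes)
    if interface is None:
        # Without this check, `interface.inputs` raises
        # AttributeError: 'NoneType' object has no attribute 'inputs'.
        raise LookupError(
            f"no static interface for node {node_id!r}; it may have been generated at run time"
        )
    return sorted(interface.inputs)


static_nodes = {"n0": Interface({"a": int, "b": str})}
print(input_names("n0", static_nodes))  # ['a', 'b']
try:
    input_names("dynamic-run-subwfs-n1", static_nodes)
except LookupError as err:
    print(err)
```

The check only turns a confusing AttributeError into an explicit error; presumably a real fix would need to resolve the interface from the dynamically compiled workflow rather than from the original definition.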

@wild-endeavor wild-endeavor added enhancement New feature or request bug Something isn't working flytekit FlyteKit Python related issue labels Sep 15, 2021
@wild-endeavor
Contributor Author

Looking further, the problem seems to lie in how a node execution's interface is retrieved, which is done here

In the former case (the toy launch-plan-running example),

@wild-endeavor
Contributor Author

The data-loading issue in the subworkflow case is likely caused by the subworkflow's start node being misnamed somehow. For instance, querying http://localhost:30081/api/v1/node_executions/flytesnacks/development/ihd00cvlfm?limit=100&uniqueParentId=fo5klsgq returns three node executions, one of which has

  "id": {
    "node_id": "f4r6jcaq",
    "execution_id": {
      "project": "flytesnacks",
      "domain": "development",
      "name": "ihd00cvlfm"
    }
  }

but has

"metadata": {
  "retry_group": "0",
  "spec_node_id": "start-node"
}

which means the node should be skipped, but the skipping logic checks the node ID (f4r6jcaq here), which does not match start-node.
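A minimal sketch of a skip check that keys on metadata.spec_node_id rather than on the execution's node ID, using the JSON shapes shown above. Assumptions: all function names are hypothetical, the list endpoint is assumed to return its results under a "node_executions" key, and "end-node" is assumed to be the reserved ID of a workflow's end node alongside "start-node".

```python
import json
from typing import Any, Dict, List
from urllib.parse import urlencode
from urllib.request import urlopen

# "start-node" comes from the metadata above; "end-node" is an assumption.
SKIPPED_SPEC_IDS = {"start-node", "end-node"}


def child_node_executions_url(base: str, project: str, domain: str, name: str,
                              parent_id: str, limit: int = 100) -> str:
    # Same query as the URL above: node executions of one workflow execution,
    # filtered by their unique parent ID.
    query = urlencode({"limit": limit, "uniqueParentId": parent_id})
    return f"{base}/api/v1/node_executions/{project}/{domain}/{name}?{query}"


def fetch_node_executions(url: str) -> List[Dict[str, Any]]:
    # Assumes the response body is a JSON object with a "node_executions" list.
    with urlopen(url) as resp:
        return json.load(resp).get("node_executions", [])


def should_skip(node_exec: Dict[str, Any]) -> bool:
    # Prefer the spec node ID: the execution node ID (e.g. "f4r6jcaq") can be
    # auto-generated and will never equal "start-node".
    spec_id = (node_exec.get("metadata") or {}).get("spec_node_id")
    return (spec_id or node_exec["id"]["node_id"]) in SKIPPED_SPEC_IDS


start = {
    "id": {
        "node_id": "f4r6jcaq",
        "execution_id": {"project": "flytesnacks", "domain": "development", "name": "ihd00cvlfm"},
    },
    "metadata": {"retry_group": "0", "spec_node_id": "start-node"},
}
print(should_skip(start))  # True
```

Falling back to the execution node ID when spec_node_id is absent keeps the check working for top-level nodes, whose IDs are not rewritten.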
