Skip to content

Commit

Permalink
[FlyteClient][FlyteDeck] Get Downloaded Artifact Signed URL via Data …
Browse files Browse the repository at this point in the history
…Proxy (flyteorg#2777)
  • Loading branch information
Future-Outlier authored and otarabai committed Oct 15, 2024
1 parent 8382853 commit 49582ed
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 1 deletion.
41 changes: 41 additions & 0 deletions flytekit/clients/friendly.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
from flyteidl.admin import task_pb2 as _task_pb2
from flyteidl.admin import workflow_attributes_pb2 as _workflow_attributes_pb2
from flyteidl.admin import workflow_pb2 as _workflow_pb2
from flyteidl.core import identifier_pb2 as _identifier_pb2
from flyteidl.service import dataproxy_pb2 as _data_proxy_pb2
from flyteidl.service.dataproxy_pb2 import ARTIFACT_TYPE_DECK
from google.protobuf.duration_pb2 import Duration

from flytekit.clients.raw import RawSynchronousFlyteClient as _RawSynchronousFlyteClient
Expand Down Expand Up @@ -1046,3 +1048,42 @@ def get_data(self, flyte_uri: str) -> _data_proxy_pb2.GetDataResponse:

resp = self._dataproxy_stub.GetData(req, metadata=self._metadata)
return resp

def get_download_artifact_signed_url(
self,
node_id: str,
project: str,
domain: str,
name: str,
artifact_type: _data_proxy_pb2.ArtifactType = ARTIFACT_TYPE_DECK,
expires_in: datetime.timedelta = None,
) -> _data_proxy_pb2.CreateDownloadLinkResponse:
"""
Get a signed url for an artifact.
:param node_id: Node id associated with artifact
:param project: Name of the project the resource belongs to
:param domain: Name of the domain the resource belongs to
:param name: User or system provided value for the resource
:param artifact_type: ArtifactType of the artifact requested
:param expires_in: If provided this defines a requested expiration duration for the generated url
:rtype: flyteidl.service.dataproxy_pb2.CreateDownloadLinkResponse
"""
expires_in_pb = None
if expires_in:
expires_in_pb = Duration()
expires_in_pb.FromTimedelta(expires_in)
return super(SynchronousFlyteClient, self).create_download_link(
_data_proxy_pb2.CreateDownloadLinkRequest(
artifact_type=artifact_type,
node_execution_id=_identifier_pb2.NodeExecutionIdentifier(
node_id=node_id,
execution_id=_identifier_pb2.WorkflowExecutionIdentifier(
project=project,
domain=domain,
name=name,
),
),
expires_in=expires_in_pb,
)
)
23 changes: 23 additions & 0 deletions tests/flytekit/integration/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
from flytekit.exceptions.user import FlyteAssertion, FlyteEntityNotExistException
from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task
from flytekit.remote.remote import FlyteRemote
from flyteidl.service import dataproxy_pb2 as _data_proxy_pb2
from flytekit.types.schema import FlyteSchema
from flytekit.clients.friendly import SynchronousFlyteClient as _SynchronousFlyteClient
from flytekit.configuration import PlatformConfig

MODULE_PATH = pathlib.Path(__file__).parent / "workflows/basic"
CONFIG = os.environ.get("FLYTECTL_CONFIG", str(pathlib.Path.home() / ".flyte" / "config-sandbox.yaml"))
Expand Down Expand Up @@ -99,6 +102,26 @@ def test_fetch_execute_launch_plan(register):
assert execution.outputs["o0"] == "hello world"


def test_get_download_artifact_signed_url(register):
remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
flyte_launch_plan = remote.fetch_launch_plan(name="basic.basic_workflow.my_wf", version=VERSION)
execution = remote.execute(flyte_launch_plan, inputs={"a": 10, "b": "foobar"}, wait=True)
project, domain, name = execution.id.project, execution.id.domain, execution.id.name

# Fetch the download deck signed URL for the execution
client = _SynchronousFlyteClient(PlatformConfig.for_endpoint("localhost:30080", True))
download_link_response = client.get_download_artifact_signed_url(
node_id="n0", # Assuming node_id is "n0"
project=project,
domain=domain,
name=name,
artifact_type=_data_proxy_pb2.ARTIFACT_TYPE_DECK,
)

# Check if the signed URL is valid and starts with the expected prefix
signed_url = download_link_response.signed_url[0]
assert signed_url.startswith(f"http://localhost:30002/my-s3-bucket/metadata/propeller/{project}-{domain}-{name}/n0/data/0/deck.html")

def test_fetch_execute_launch_plan_with_args(register):
remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
flyte_launch_plan = remote.fetch_launch_plan(name="basic.basic_workflow.my_wf", version=VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from flytekit import task, workflow


@task
@task(enable_deck=True)
def t1(a: int) -> typing.NamedTuple("OutputsBC", t1_int_output=int, c=str):
return a + 2, "world"

Expand Down

0 comments on commit 49582ed

Please sign in to comment.