Skip to content

Commit

Permalink
IncludeFile now returns the included file in the client (#607)
Browse files Browse the repository at this point in the history
* DRAFT: IncludeFile now returns the included file in the client and CLI

THIS IS NOT FINISHED; DO NOT MERGE AS IS.

* Fix the tests

* Forgot to update the type check for multiple encodings
  • Loading branch information
romain-intel authored Jul 29, 2021
1 parent aa7c73b commit 0575f8d
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 41 deletions.
3 changes: 3 additions & 0 deletions metaflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
MetaflowNamespaceMismatch,\
MetaflowInternalError

from metaflow.includefile import IncludedFile
from metaflow.metaflow_config import DEFAULT_METADATA
from metaflow.plugins import ENVIRONMENTS, METADATA_PROVIDERS
from metaflow.unbounded_foreach import CONTROL_TASK_TAG
Expand Down Expand Up @@ -711,6 +712,8 @@ def data(self):
sha = self._object['sha']
with filecache.get_data(ds_type, self.path_components[0], sha) as f:
obj = pickle.load(f)
if isinstance(obj, IncludedFile):
return obj.decode(self.id)
return obj

# TODO add
Expand Down
47 changes: 36 additions & 11 deletions metaflow/includefile.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,27 @@ def put(self, key, obj, overwrite=True):
DATACLIENTS = {'local': Local,
's3': S3}

class IncludedFile(object):
    """Thin marker wrapper telling the Metaflow client that this artifact is
    an included file whose actual content must be fetched when accessed."""

    def __init__(self, descriptor):
        # Serialize eagerly; the JSON string is the canonical stored form.
        self._descriptor = json.dumps(descriptor)

    @property
    def descriptor(self):
        # JSON-encoded descriptor describing where/how the file is stored.
        return self._descriptor

    def decode(self, name, var_type='Artifact'):
        """Resolve this descriptor and load the included file's content.

        Raises MetaflowException when the descriptor cannot be handled or
        was never converted to a remote (uploaded) handler.
        """
        handled, handler, err = LocalFile.is_file_handled(self._descriptor)
        if not handled:
            raise MetaflowException("%s '%s' could not be loaded: %s" % (var_type, name, err))
        if handler is None or isinstance(handler, LocalFile):
            raise MetaflowException("%s '%s' was not properly converted" % (var_type, name))
        return handler.load(self._descriptor)


class LocalFile():
def __init__(self, is_text, encoding, path):
self._is_text = is_text
Expand All @@ -173,7 +194,16 @@ def __init__(self, is_text, encoding, path):

@classmethod
def is_file_handled(cls, path):
# This returns a tuple:
# - True/False indicating whether the file is handled
# - None if we need to create a handler for the file, a LocalFile if
# we already know what to do with the file or a Uploader if the file
# is already present remotely (either starting with s3:// or local://)
# - An error message if file is not handled
if path:
if isinstance(path, IncludedFile):
path = path.descriptor

decoded_value = Uploader.decode_value(to_unicode(path))
if decoded_value['type'] == 'self':
return True, LocalFile(
Expand Down Expand Up @@ -295,15 +325,10 @@ def __init__(
name, required=required, help=help,
type=FilePathClass(is_text, encoding), **kwargs)

def load_parameter(self, val):
if val is None:
return val
ok, file_type, err = LocalFile.is_file_handled(val)
if not ok:
raise MetaflowException("Parameter '%s' could not be loaded: %s" % (self.name, err))
if file_type is None or isinstance(file_type, LocalFile):
raise MetaflowException("Parameter '%s' was not properly converted" % self.name)
return file_type.load(val)
def load_parameter(self, v):
    """Return the loaded content of an IncludedFile parameter value.

    A None value (parameter not supplied) is passed through unchanged;
    otherwise *v* is decoded as an IncludedFile, reporting errors under
    this parameter's name.
    """
    if v is not None:
        return v.decode(self.name, var_type='Parameter')
    return None


class Uploader():
Expand All @@ -316,11 +341,11 @@ def __init__(self, client_class):
@staticmethod
def encode_url(url_type, url, **kwargs):
# Avoid encoding twice (default -> URL -> _convert method of FilePath for example)
if url is None or len(url) == 0 or url[0] == '{':
if isinstance(url, IncludedFile):
return url
return_value = {'type': url_type, 'url': url}
return_value.update(kwargs)
return json.dumps(return_value)
return IncludedFile(return_value)

@staticmethod
def decode_value(value):
Expand Down
5 changes: 4 additions & 1 deletion test/core/metaflow_test/cli_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import json
from tempfile import NamedTemporaryFile

from metaflow.includefile import IncludedFile
from metaflow.util import is_stringish

from . import MetaflowCheck, AssertArtifactFailed, AssertLogFailed, truncate
Expand Down Expand Up @@ -39,6 +40,8 @@ def assert_artifact(self, step, name, value, fields=None):
for field, v in fields.items():
if is_stringish(artifact):
data = json.loads(artifact)
elif isinstance(artifact, IncludedFile):
data = json.loads(artifact.descriptor)
else:
data = artifact
if not isinstance(data, dict):
Expand Down Expand Up @@ -92,4 +95,4 @@ def get_log(self, step, logtype):
'logs',
'--%s' % logtype,
'%s/%s' % (self.run_id, step)]
return self.run_cli(cmd, capture_output=True).decode('utf-8')
return self.run_cli(cmd, capture_output=True).decode('utf-8')
1 change: 1 addition & 0 deletions test/core/metaflow_test/metadata_check.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json

from metaflow.util import is_stringish

from . import MetaflowCheck, AssertArtifactFailed, AssertLogFailed, assert_equals, assert_exception, truncate
Expand Down
74 changes: 45 additions & 29 deletions test/core/tests/basic_include.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,33 +41,49 @@ def step_all(self):
pass

def check_results(self, flow, checker):
for step in flow:
checker.assert_artifact(
step.name,
'myfile_txt',
None,
fields={'type': 'uploader-v1',
'is_text': True,
'encoding': None})
checker.assert_artifact(
step.name,
'myfile_utf8',
None,
fields={'type': 'uploader-v1',
'is_text': True,
'encoding': 'utf8'})
checker.assert_artifact(
step.name,
'myfile_binary',
None,
fields={'type': 'uploader-v1',
'is_text': False,
'encoding': None})
checker.assert_artifact(
step.name,
'myfile_overriden',
None,
fields={'type': 'uploader-v1',
'is_text': True,
'encoding': None})
run = checker.get_run()
if run is None:
# CliChecker does not return a run object; we check to make sure
# the returned value is the blob describing the artifact
# (this may be improved in the future)
for step in flow:
checker.assert_artifact(
step.name,
'myfile_txt',
None,
fields={'type': 'uploader-v1',
'is_text': True,
'encoding': None})
checker.assert_artifact(
step.name,
'myfile_utf8',
None,
fields={'type': 'uploader-v1',
'is_text': True,
'encoding': 'utf8'})
checker.assert_artifact(
step.name,
'myfile_binary',
None,
fields={'type': 'uploader-v1',
'is_text': False,
'encoding': None})
checker.assert_artifact(
step.name,
'myfile_overriden',
None,
fields={'type': 'uploader-v1',
'is_text': True,
'encoding': None})
else:
# In the case of the client, we check the value.
for step in flow:
checker.assert_artifact(step.name, 'myfile_txt',
"Regular Text File")
checker.assert_artifact(step.name, 'myfile_utf8',
u"UTF Text File \u5e74")
checker.assert_artifact(step.name, 'myfile_binary',
u"UTF Text File \u5e74".encode(encoding='utf8'))
checker.assert_artifact(step.name, 'myfile_overriden',
"Override Text File")

0 comments on commit 0575f8d

Please sign in to comment.