Commit

clean up includefile a bit, remove from_env (not used)
jackie-ob committed Aug 2, 2022
1 parent 349982f commit e0dbcce
Showing 3 changed files with 99 additions and 105 deletions.
93 changes: 2 additions & 91 deletions metaflow/includefile.py
@@ -3,19 +3,17 @@
 import io
 import json
 import os
-import shutil
-import uuid

 from hashlib import sha1
-from tempfile import mkdtemp

 from metaflow._vendor import click

 from . import parameters
 from .current import current
-from .exception import MetaflowException, MetaflowInternalError
+from .exception import MetaflowException
 from .metaflow_config import DATATOOLS_LOCALROOT, DATATOOLS_SUFFIX
 from .parameters import DeployTimeField, Parameter
+from .plugins.azure.includefile_support import Azure
 from .util import to_unicode

 try:
@@ -83,93 +81,6 @@ def path(self):
         return self._path


-class Azure(object):
-    @classmethod
-    def get_root_from_config(cls, echo, create_on_absent=True):
-        from metaflow.metaflow_config import DATATOOLS_AZUREROOT
-
-        return DATATOOLS_AZUREROOT
-
-    def __init__(self):
-        # This local directory is used to house any downloaded blobs, for lifetime of
-        # this object as a context manager.
-        self._tmpdir = None
-
-    def _get_storage_backend(self, key):
-        """
-        Return an AzureDatastore, rooted at the container level, no prefix.
-        Key MUST be a fully qualified path. e.g. <container_name>/b/l/o/b/n/a/m/e
-        """
-        from .plugins.azure.azure_utils import parse_azure_full_path
-
-        # we parse out the container name only, and use that to root our storage implementation
-        container_name, _ = parse_azure_full_path(key)
-        # Import DATASTORES dynamically... otherwise, circular import
-        from .datastore import DATASTORES
-
-        storage_impl = DATASTORES["azure"]
-        return storage_impl(container_name)
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *args):
-        if self._tmpdir and os.path.exists(self._tmpdir):
-            shutil.rmtree(self._tmpdir)
-
-    def get(self, key=None, return_missing=False):
-        """Key MUST be a fully qualified path. <container_name>/b/l/o/b/n/a/m/e"""
-        if not return_missing:
-            raise MetaflowException("Azure object supports only return_missing=True")
-        # We fabricate a uri scheme to fit into existing includefile code (just like local://)
-        if not key.startswith("azure://"):
-            raise MetaflowInternalError(
-                msg="Expected Azure object key to start with 'azure://'"
-            )
-        uri_style_key = key
-        short_key = key[8:]
-        storage = self._get_storage_backend(short_key)
-        azure_object = None
-        with storage.load_bytes([short_key]) as load_result:
-            for _, tmpfile, _ in load_result:
-                if tmpfile is None:
-                    azure_object = AzureObject(uri_style_key, None, False)
-                else:
-                    if not self._tmpdir:
-                        self._tmpdir = mkdtemp(prefix="metaflow.includefile.azure.")
-                    output_file_path = os.path.join(self._tmpdir, str(uuid.uuid4()))
-                    shutil.move(tmpfile, output_file_path)
-                    azure_object = AzureObject(uri_style_key, output_file_path, True)
-                break
-        return azure_object
-
-    def put(self, key, obj, overwrite=True):
-        """Key MUST be a fully qualified path. <container_name>/b/l/o/b/n/a/m/e"""
-        storage = self._get_storage_backend(key)
-        storage.save_bytes([(key, io.BytesIO(obj))], overwrite=overwrite)
-        # We fabricate a uri scheme to fit into existing includefile code (just like local://)
-        return "azure://%s" % key
-
-
-class AzureObject(object):
-    def __init__(self, url, path, exists):
-        self._path = path
-        self._url = url
-        self._exists = exists
-
-    @property
-    def path(self):
-        return self._path
-
-    @property
-    def url(self):
-        return self._url
-
-    @property
-    def exists(self):
-        return self._exists
-
-
 class Local(object):
     """
     This class allows you to access the local filesystem in a way similar to the S3 datatools
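
One detail to note when comparing the block removed above with the new module added below: because the class no longer lives in the top-level metaflow package, the lazy imports inside _get_storage_backend switch from relative to absolute form. A minimal side-by-side sketch of just that difference:

# In metaflow/includefile.py (removed in this commit):
from .plugins.azure.azure_utils import parse_azure_full_path
from .datastore import DATASTORES

# In metaflow/plugins/azure/includefile_support.py (added in this commit):
from metaflow.plugins.azure.azure_utils import parse_azure_full_path
from metaflow.datastore import DATASTORES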
17 changes: 3 additions & 14 deletions metaflow/metaflow_config.py
@@ -36,13 +36,10 @@ def init_config():
 METAFLOW_CONFIG = init_config()


-def from_conf(name, default=None, from_env_only=False, validate_fn=None):
+def from_conf(name, default=None, validate_fn=None):
     """
     First try to pull value from environment, then from metaflow config JSON.
-    If from_env_only is True, we never use the value in metaflow config JSON.
-    We will raise an Error on seeing the value set in metaflow config JSON.
     Prior to a value being returned, we will validate using validate_fn (if provided).
     Only non-None values are validated.
@@ -51,18 +48,10 @@ def from_conf(name, default=None, from_env_only=False, validate_fn=None):
     """
     value_from_env = os.environ.get(name, None)
     value_from_config = METAFLOW_CONFIG.get(name, default)
-    if from_env_only:
-        if value_from_config is not None:
-            raise MetaflowException(
-                "%s may only be set from environment variable, NOT from metaflow JSON configs."
-                % name
-            )
+    if value_from_env is not None:
         value = value_from_env
     else:
-        if value_from_env is not None:
-            value = value_from_env
-        else:
-            value = value_from_config
+        value = value_from_config
     if validate_fn and value is not None:
         validate_fn(name, value)
     return value
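
With from_env_only gone, from_conf has a single resolution order: the environment variable wins, then the metaflow config JSON, then the supplied default, with validate_fn applied to any non-None result. A minimal usage sketch (the setting name below is hypothetical, shown only to illustrate the lookup order):

import os

from metaflow.metaflow_config import from_conf


def _require_nonempty(name, value):
    # validate_fn contract, per the code above: accept (name, value) and raise on a bad value
    if not value:
        raise ValueError("%s must be non-empty" % name)


os.environ["METAFLOW_EXAMPLE_SETTING"] = "from-env"  # hypothetical setting name
value = from_conf("METAFLOW_EXAMPLE_SETTING", default="fallback", validate_fn=_require_nonempty)
# -> "from-env": the environment variable takes precedence over config JSON and the default.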
94 changes: 94 additions & 0 deletions metaflow/plugins/azure/includefile_support.py
@@ -0,0 +1,94 @@
+import io
+import os
+import shutil
+import uuid
+from tempfile import mkdtemp
+
+from metaflow.exception import MetaflowException, MetaflowInternalError
+
+
+class Azure(object):
+    @classmethod
+    def get_root_from_config(cls, echo, create_on_absent=True):
+        from metaflow.metaflow_config import DATATOOLS_AZUREROOT
+
+        return DATATOOLS_AZUREROOT
+
+    def __init__(self):
+        # This local directory is used to house any downloaded blobs, for lifetime of
+        # this object as a context manager.
+        self._tmpdir = None
+
+    def _get_storage_backend(self, key):
+        """
+        Return an AzureDatastore, rooted at the container level, no prefix.
+        Key MUST be a fully qualified path. e.g. <container_name>/b/l/o/b/n/a/m/e
+        """
+        from metaflow.plugins.azure.azure_utils import parse_azure_full_path
+
+        # we parse out the container name only, and use that to root our storage implementation
+        container_name, _ = parse_azure_full_path(key)
+        # Import DATASTORES dynamically... otherwise, circular import
+        from metaflow.datastore import DATASTORES
+
+        storage_impl = DATASTORES["azure"]
+        return storage_impl(container_name)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        if self._tmpdir and os.path.exists(self._tmpdir):
+            shutil.rmtree(self._tmpdir)
+
+    def get(self, key=None, return_missing=False):
+        """Key MUST be a fully qualified path. <container_name>/b/l/o/b/n/a/m/e"""
+        if not return_missing:
+            raise MetaflowException("Azure object supports only return_missing=True")
+        # We fabricate a uri scheme to fit into existing includefile code (just like local://)
+        if not key.startswith("azure://"):
+            raise MetaflowInternalError(
+                msg="Expected Azure object key to start with 'azure://'"
+            )
+        uri_style_key = key
+        short_key = key[8:]
+        storage = self._get_storage_backend(short_key)
+        azure_object = None
+        with storage.load_bytes([short_key]) as load_result:
+            for _, tmpfile, _ in load_result:
+                if tmpfile is None:
+                    azure_object = AzureObject(uri_style_key, None, False)
+                else:
+                    if not self._tmpdir:
+                        self._tmpdir = mkdtemp(prefix="metaflow.includefile.azure.")
+                    output_file_path = os.path.join(self._tmpdir, str(uuid.uuid4()))
+                    shutil.move(tmpfile, output_file_path)
+                    azure_object = AzureObject(uri_style_key, output_file_path, True)
+                break
+        return azure_object
+
+    def put(self, key, obj, overwrite=True):
+        """Key MUST be a fully qualified path. <container_name>/b/l/o/b/n/a/m/e"""
+        storage = self._get_storage_backend(key)
+        storage.save_bytes([(key, io.BytesIO(obj))], overwrite=overwrite)
+        # We fabricate a uri scheme to fit into existing includefile code (just like local://)
+        return "azure://%s" % key
+
+
+class AzureObject(object):
+    def __init__(self, url, path, exists):
+        self._path = path
+        self._url = url
+        self._exists = exists
+
+    @property
+    def path(self):
+        return self._path
+
+    @property
+    def url(self):
+        return self._url
+
+    @property
+    def exists(self):
+        return self._exists
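
For context, a minimal usage sketch of the relocated helper as a context manager, based on the code above (the container and blob names are hypothetical, and a configured Azure datastore backend is assumed):

from metaflow.plugins.azure.includefile_support import Azure

with Azure() as azure_client:
    # put() takes a fully qualified <container_name>/<blob> key and returns an azure:// URL.
    url = azure_client.put("my-container/includes/config.json", b'{"retries": 3}')

    # get() accepts only the azure:// form and requires return_missing=True; the blob is
    # downloaded to a temporary file that lives as long as the context manager does.
    blob = azure_client.get(url, return_missing=True)
    if blob.exists:
        with open(blob.path, "rb") as f:
            data = f.read()
# On exit, the temporary download directory is removed.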
