Skip to content

Commit

Permalink
Review fixes for S3Datastore
Browse files Browse the repository at this point in the history
Hardcoded Butler.get from S3 storage for simplest of Datasets with no
- rebased to include newest changes to daf_butler (namely, ButlerURI)
- removed parsePathToUriElements, S3Location and S3LocationFactory,
  and replaced all path handling with ButlerURI and its test cases.
- added transaction checks back into S3Datastore. Unsure on proper
  usage.
- added more path manipulation methods/properties to LocationFactory
  and Location.
  * bucketName returns the name of the bucket in the Location/
    LocationFactory
  * Location has a pathInBucket property that will convert posix
    style paths to S3 protocol style paths. The main difference is
    with respect to leading and trailing separators. S3 interprets
    `/path/`, `/path`, `path/` and `path` keys differently, even
    though some of them are equivalent on a POSIX compliant system.
    So what `/path/to/file.ext` would be on a POSIX system, on S3
    it would read `path/to/file.ext` and the bucket is referenced
    separately with boto3.
- For saving Config as file, moved all the URI handling logic into
  Config and out of Butler makeRepo. The only logic there is root
  directory creation.
  * The call to save config as a file at the butler root directory
    is now done through dumpToUri which then resolves the
    appropriate backend method to call.
- Improved(?) on the proposed scheme for checking if we are
  dealing with a file or a directory in absence of the trailing
  path separator.
  * Noted some differences between the requested generality of code
    in the review for writing to files and inits of config classes.
    For inits it seems as if there's an 2+ year old commit limiting
    the Config files to `yaml` type files only. However, the review
    implied that `dumpToFile` on Config class should be file format
    independent. Then, for `dumpTo*` methods, to check whether we
    have a dir or a file I only inspect whether the path ends on a
    'something.something' style. However, since I can count on
    files having to be `yaml` type and having `yaml` extensions in
    inits I use a simplified logic to determine if we have a dir or
    file. It is possible to generalize inits to other filetypes, as
    long as they have an extension added to them.
  * I assume we do not want to force users to be mindfull of
    trailing separators.
- Now raising errors on unrecognized schemes on all `if scheme ==`
  patterns.
- Closed the StreamIO in Config.dumpToFile
- fixed up the Formatters to return bytes instead of strings. The
  fromBytes methods now expect bytes as well. JSON and YAML were
  main culprints. Fixed the docs for them. At this point I am
  confident I just overwrite the fixes when rewinding changes on
  rebase by accident because I have done that before, twice.
- Added a different way to check if files exist, cheaper but can
  be slower. From boto/botocore#1248 it
  is my understanding that this should not be an issue anymore.
  But the newer boto3 versions are slow to hit package managers.
  • Loading branch information
DinoBektesevic authored and EC2 Default User committed Jul 25, 2019
1 parent 4177d9c commit a71c219
Show file tree
Hide file tree
Showing 16 changed files with 448 additions and 441 deletions.
26 changes: 13 additions & 13 deletions python/lsst/daf/butler/butler.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

from lsst.utils import doImport
from .core.utils import transactional
from .core.s3utils import parsePathToUriElements
from .core.datasets import DatasetRef, DatasetType
from .core.datastore import Datastore
from .core.registry import Registry
Expand All @@ -50,6 +49,7 @@
from .core.exceptions import ValidationError
from .core.repoRelocation import BUTLER_ROOT_TAG
from .core.safeFileIo import safeMakeDir
from . core.location import ButlerURI

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -180,16 +180,18 @@ def makeRepo(root, config=None, standalone=False, createRegistry=True, searchPat
if isinstance(config, (ButlerConfig, ConfigSubset)):
raise ValueError("makeRepo must be passed a regular Config without defaults applied.")

scheme, rootpath, relpath = parsePathToUriElements(root)
if scheme == 'file://':
uri = ButlerURI(root)
if uri.scheme == 'file':
root = os.path.abspath(root)
if not os.path.isdir(root):
os.makedirs(root)
elif scheme == 's3://':
safeMakeDir(root)
elif uri.scheme == 's3':
s3 = boto3.resource('s3')
# implies bucket exists, if not another level of checks
bucket = s3.Bucket(rootpath)
bucket.put_object(Bucket=rootpath, Key=(relpath))
bucket = s3.Bucket(uri.netloc)
bucket.put_object(Bucket=uri.netloc, Key=(uri.path.lstrip('/')))
else:
raise ValueError(f'Unrecognized scheme: {uri.scheme}')
config = Config(config)

# If we are creating a new repo from scratch with relative roots,
Expand All @@ -202,19 +204,17 @@ def makeRepo(root, config=None, standalone=False, createRegistry=True, searchPat
datastoreClass.setConfigRoot(BUTLER_ROOT_TAG, config, full, overwrite=forceConfigRoot)
registryClass = doImport(full["registry", "cls"])
registryClass.setConfigRoot(BUTLER_ROOT_TAG, config, full, overwrite=forceConfigRoot)

if standalone:
config.merge(full)

if scheme == 'file://':
config.dumpToFile(os.path.join(root, "butler.yaml"))
elif scheme == 's3://':
config.dumpToS3(rootpath, os.path.join(relpath, 'butler.yaml'))
config.dumpToUri(uri)

# Create Registry and populate tables
registryClass.fromConfig(config, create=createRegistry, butlerRoot=root)
return config

def __init__(self, config=None, collection=None, run=None):
def __init__(self, config=None, butler=None, collection=None, run=None, searchPaths=None):
# save arguments for pickling
self._args = (config, butler, collection, run, searchPaths)
if butler is not None:
if config is not None or searchPaths is not None:
Expand Down
18 changes: 12 additions & 6 deletions python/lsst/daf/butler/core/butlerConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import os.path

from .s3utils import parsePathToUriElements
from .location import ButlerURI
from .config import Config
from .datastore import DatastoreConfig
from .schema import SchemaConfig
Expand Down Expand Up @@ -77,12 +77,18 @@ def __init__(self, other=None, searchPaths=None):
return

if isinstance(other, str):
if other.startswith('s3://') and not other.endswith('.yaml'):
scheme, root, relpath = parsePathToUriElements(other)
other = scheme + os.path.join(root, relpath, "butler.yaml")
else:
if os.path.isdir(other):
uri = ButlerURI(other)
if uri.scheme == 'file':
if os.path.isdir(uri.path):
other = os.path.join(other, "butler.yaml")
elif uri.scheme == 's3':
if not uri.path.endswith('yaml'):
if not uri.path.endswith('/'):
uri = ButlerURI(uri.geturl()+'/')
uri.updateFile('butler.yaml')
other = uri.geturl()
else:
raise ValueError(f'Unrecognized URI scheme: {uri.scheme}')

# Create an empty config for us to populate
super().__init__()
Expand Down
49 changes: 42 additions & 7 deletions python/lsst/daf/butler/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
import yaml
import sys
import io
import re
from yaml.representer import Representer

from .s3utils import parsePathToUriElements
from lsst.daf.butler.core.location import ButlerURI

try:
import boto3
Expand Down Expand Up @@ -244,10 +244,10 @@ def __initFromS3File(self, path):
raise ModuleNotFoundError(('boto3 not found.'
'Are you sure it is installed?'))

scheme, bucket, key = parsePathToUriElements(path)
uri = ButlerURI(path)
s3 = boto3.client('s3')
try:
response = s3.get_object(Bucket=bucket, Key=key)
response = s3.get_object(Bucket=uri.netloc, Key=uri.path.lstrip('/'))
except (s3.exceptions.NoSuchKey, s3.exceptions.NoSuchBucket) as err:
raise FileNotFoundError(f'No such file or directory: {path}') from err
byteStr = response['Body'].read()
Expand Down Expand Up @@ -773,16 +773,51 @@ def dumpToS3(self, bucket, key):
key : `str`
Path to the file to use for output, relative to the bucket.
"""
if boto3 is None:
raise ModuleNotFoundError(("Could not find boto3. "
"Are you sure it is installed?"))
stream = io.StringIO()
self.dump(stream)
stream.seek(0)

s3 = boto3.client('s3')
s3.put_object(Bucket=bucket, Key=key, Body=stream.read())

stream.close()

def dumpToUri(self, uri):
"""Writes the config to location pointed to by given URI.
If URI does not specify the filename, by default `butler.yaml` will be
used. Currently supports 's3' and 'file' URI schemes.
Parameters
----------
uri: `str` or `ButlerURI`
URI of location where the Config will be written.
"""
if boto3 is None:
raise ModuleNotFoundError(("Could not find boto3. "
"Are you sure it is installed?"))

if isinstance(uri, str):
uri = ButlerURI(uri)

if uri.scheme == 'file':
if os.path.isdir(uri.path):
uri = ButlerURI(os.path.join(uri.path, 'butler.yaml'))
self.dumpToFile(uri.path)
elif uri.scheme == 's3':
# we guesstimate if dir or a file. Paths that *end* on strings with
# dots `[A-Za-z0-9].[A-Za-z0-9]` are assumed to be files.
isDirExpression = re.compile("\w*\.\w*$")
if isDirExpression.search(uri.path) is None:
# trailing '/' is mandatory for dirs, otherwise ButlerURI
# has a hard time updating files in URIs.
if not uri.path.endswith('/'):
uri = ButlerURI(uri.geturl()+'/')
uri.updateFile('butler.yaml')
self.dumpToS3(uri.netloc, uri.path.lstrip('/'))
else:
raise ValueError(f'Unrecognized URI scheme: {uri.scheme}')

@staticmethod
def updateParameters(configType, config, full, toUpdate=None, toCopy=None, overwrite=True):
"""Generic helper function for updating specific config parameters.
Expand Down
143 changes: 88 additions & 55 deletions python/lsst/daf/butler/core/location.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,23 +175,11 @@ def replace(self, **kwargs):
new : `ButlerURI`
New `ButlerURI` object with updated values.
"""

def __init__(self, datastoreRoot):
"""Constructor
Parameters
----------
datastoreRoot : `str`
Root location of the `Datastore` in the filesystem.
"""
return self.__class__(self._uri._replace(**kwargs))

def updateFile(self, newfile):
"""Update in place the final component of the path with the supplied
file name.
"""

def fromUri(self, uri):
"""Factory function to create a `Location` from a URI.
Parameters
----------
Expand Down Expand Up @@ -220,6 +208,21 @@ def __str__(self):
def _fixupFileUri(parsed, root=None, forceAbsolute=False):
"""Fix up relative paths in file URI instances.
Parameters
----------
parsed : `~urllib.parse.ParseResult`
The result from parsing a URI using `urllib.parse`.
root : `str`, optional
Path to use as root when converting relative to absolute.
If `None`, it will be the current working directory. This
is a local file system path, not a URI.
forceAbsolute : `bool`
If `True`, scheme-less relative URI will be converted to an
absolute path using a ``file`` scheme. If `False` scheme-less URI
will remain scheme-less and will not be updated to ``file`` or
absolute path. URIs with a defined scheme will not be affected
by this parameter.
Returns
-------
modified : `~urllib.parse.ParseResult`
Expand Down Expand Up @@ -306,8 +309,10 @@ class Location:
Relative path within datastore. Assumed to be using the local
path separator if a ``file`` scheme is being used for the URI,
else a POSIX separator.
"""

__slots__ = ("_datastoreRootUri", "_path")

def __init__(self, datastoreRootUri, path):
if isinstance(datastoreRootUri, str):
datastoreRootUri = ButlerURI(datastoreRootUri)
Expand Down Expand Up @@ -342,15 +347,44 @@ def uri(self):
@property
def path(self):
"""Path corresponding to location.
This path includes the root of the `Datastore`.
This path includes the root of the `Datastore`, but does not include
non-path components of the root URI. If a file URI scheme is being
used the path will be returned with the local OS path separator.
"""
return os.path.join(self._datastoreRoot, self._uri.path.lstrip("/"))
if not self._datastoreRootUri.scheme:
# Entirely local file system
return os.path.normpath(os.path.join(self._datastoreRootUri.path, self.pathInStore))
elif self._datastoreRootUri.scheme == "file":
return os.path.normpath(os.path.join(posix2os(self._datastoreRootUri.path), self.pathInStore))
else:
return posixpath.join(self._datastoreRootUri.path, self.pathInStore)

@property
def pathInStore(self):
"""Path corresponding to S3Location relative to `S3Datastore` root.
"""Path corresponding to location relative to `Datastore` root.
Uses the same path separator as supplied to the object constructor.
"""
return self._path

@property
def bucketName(self):
"""If Location is an S3 protocol, returns the bucket name."""
if self._datastoreRootUri.scheme == 's3':
return self._datastoreRootUri.netloc
else:
raise AttributeError(f'Path {self} is not a valid S3 protocol.')

@property
def pathInBucket(self):
"""Returns the path in an S3 Bucket, normalized so that directory
structure in the bucket matches that of a local filesystem.
Effectively, this is the path property with posix separator stipped
from the left hand side of the path.
"""
return self._uri.path.lstrip("/")
return self.path.lstrip('/')

def updateExtension(self, ext):
"""Update the file extension associated with this `Location`.
Expand All @@ -363,68 +397,67 @@ def updateExtension(self, ext):
"""
if ext is None:
return
path, _ = os.path.splitext(self._uri.path)

path, _ = os.path.splitext(self.pathInStore)

# Ensure that we have a leading "." on file extension (and we do not
# try to modify the empty string)
if ext and not ext.startswith("."):
ext = "." + ext

parts = list(self._uri)
parts[2] = path + ext
self._uri = urllib.parse.urlparse(urllib.parse.urlunparse(parts))
self._path = path + ext


class LocationFactory:
"""Factory for `Location` instances.
The factory is constructed from the root location of the datastore.
This location can be a path on the file system (absolute or relative)
or as a URI.
Parameters
----------
datastoreRoot : `str`
Root location of the `Datastore` either as a path in the local
filesystem or as a URI. File scheme URIs can be used. If a local
filesystem path is used without URI scheme, it will be converted
to an absolute path and any home directory indicators expanded.
If a file scheme is used with a relative path, the path will
be treated as a posixpath but then converted to an absolute path.
"""

def __init__(self, datastoreRoot):
"""Constructor
Parameters
----------
bucket : `str`
Name of the Bucket that is used.
datastoreRoot : `str`
Root location of the `S3Datastore` in the Bucket.
"""
self._datastoreRoot = datastoreRoot
self._datastoreRootUri = ButlerURI(datastoreRoot, forceAbsolute=True)

def fromUri(self, uri):
"""Factory function to create a `S3Location` from a URI.
def __str__(self):
return f"{self.__class__.__name__}@{self._datastoreRootUri}"

Parameters
----------
uri : `str`
A valid Universal Resource Identifier.
@property
def bucketName(self):
"""If Location is an S3 protocol, returns the bucket name."""
if self._datastoreRootUri.scheme == 's3':
return self._datastoreRootUri.netloc
else:
raise AttributeError(f'Path {self} is not a valid S3 protocol.')

Returns
-------
location : `S3Location`
The equivalent `S3Location`.
"""
if uri is None or not isinstance(uri, str):
raise ValueError("URI must be a string and not {}".format(uri))
return Location(self._datastoreRoot, uri)
@property
def datastoreRootName(self):
"""Returns the name of the directory used as datastore's root."""
return os.path.basename(self._datastoreRootUri.path)

def fromPath(self, path):
"""Factory function to create a `S3Location` from a POSIX-like path.
"""Factory function to create a `Location` from a POSIX path.
Parameters
----------
path : `str`
A POSIX-like path, relative to the `S3Datastore` root.
A standard POSIX path, relative to the `Datastore` root.
Returns
-------
location : `S3Location`
The equivalent `S3Location`.
location : `Location`
The equivalent `Location`.
"""
uri = urllib.parse.urljoin("file://", path)
return self.fromUri(uri)
if os.path.isabs(path):
raise ValueError(('A path whose absolute location is in an S3 bucket '
'can not have an absolute path: {}').format(path))

return self.fromUri('s3://' + os.path.join(self._bucket, self._datastoreRoot, path))
raise ValueError("LocationFactory path must be relative to datastore, not absolute.")
return Location(self._datastoreRootUri, path)
Loading

0 comments on commit a71c219

Please sign in to comment.