diff --git a/doc/users/aws.rst b/doc/users/aws.rst
new file mode 100644
index 0000000000..832072ba62
--- /dev/null
+++ b/doc/users/aws.rst
@@ -0,0 +1,102 @@
+.. _aws:
+
+============================================
+Using Nipype with Amazon Web Services (AWS)
+============================================
+Several groups have been successfully using Nipype on AWS. This procedure
+involves setting up a temporary cluster using StarCluster and potentially
+transferring files to/from S3. The latter is supported by Nipype through
+DataSink and S3DataGrabber.
+
+
+Using DataSink with S3
+======================
+The DataSink class now supports sending output data directly to an AWS S3
+bucket. It does this through the introduction of several input attributes to
+the DataSink interface and by parsing the ``base_directory`` attribute. This
+class uses the ``boto3`` and ``botocore`` Python packages to interact with
+AWS. To configure the DataSink to write data to S3, the user must set the
+``base_directory`` property to an S3-style filepath. For example:
+
+::
+
+ import nipype.interfaces.io as nio
+ ds = nio.DataSink()
+ ds.inputs.base_directory = 's3://mybucket/path/to/output/dir'
+
+With the "s3://" prefix in the path, the DataSink knows that the output
+directory to send files is on S3 in the bucket "mybucket". "path/to/output/dir"
+is the relative directory path within the bucket "mybucket" where output data
+will be uploaded to (NOTE: if the relative path specified contains folders that
+don’t exist in the bucket, the DataSink will create them). The DataSink treats
+the S3 base directory exactly as it would a local directory, maintaining support
+for containers, substitutions, subfolders, "." notation, etc to route output
+data appropriately.
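+
+For example, a minimal sketch (the container value and the substitution pair
+below are only placeholders) showing that these options work exactly as they
+would with a local base directory:
+
+::
+
+ import nipype.interfaces.io as nio
+ ds = nio.DataSink()
+ ds.inputs.base_directory = 's3://mybucket/path/to/output/dir'
+ # the container and substitution values here are placeholders
+ ds.inputs.container = 'sub001'
+ ds.inputs.substitutions = [('_run_1', 'run1')]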
+
+There are four new attributes introduced with S3 compatibility: ``creds_path``,
+``encrypt_bucket_keys``, ``local_copy``, and ``bucket``.
+
+::
+
+ ds.inputs.creds_path = '/home/user/aws_creds/credentials.csv'
+ ds.inputs.encrypt_bucket_keys = True
+ ds.inputs.local_copy = '/home/user/workflow_outputs/local_backup'
+
+``creds_path`` is the file path to the user's AWS credentials file (typically
+a CSV). This credentials file should contain the AWS access key ID and secret
+access key, formatted as one of the following (these are the formats in which
+Amazon provides the credentials file when it is first downloaded):
+
+Root-account user:
+
+::
+
+ AWSAccessKeyId=ABCDEFGHIJKLMNOP
+ AWSSecretKey=zyx123wvu456/ABC890+gHiJk
+
+IAM-user:
+
+::
+
+ User Name,Access Key Id,Secret Access Key
+ "username",ABCDEFGHIJKLMNOP,zyx123wvu456/ABC890+gHiJk
+
+The ``creds_path`` is necessary when writing files to a bucket that has
+restricted access (almost no buckets are publicly writable). If ``creds_path``
+is not specified, the DataSink will check the ``AWS_ACCESS_KEY_ID`` and
+``AWS_SECRET_ACCESS_KEY`` environment variables and use those values for bucket
+access.
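+
+For instance, a minimal sketch of supplying the credentials through the
+environment instead of a credentials file (the key values below are
+placeholders):
+
+::
+
+ import os
+ # placeholder key values; real keys come from the AWS console
+ os.environ['AWS_ACCESS_KEY_ID'] = 'ABCDEFGHIJKLMNOP'
+ os.environ['AWS_SECRET_ACCESS_KEY'] = 'zyx123wvu456/ABC890+gHiJk'
+ # with creds_path unset, the DataSink falls back to these variables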
+
+``encrypt_bucket_keys`` is a boolean flag that indicates whether to encrypt the
+output data on S3, using server-side AES-256 encryption. This is useful if the
+data being output is sensitive and one desires an extra layer of security on the
+data. By default, this is turned off.
+
+``local_copy`` is the path to a local directory where copies of the output
+data are stored in addition to those sent to S3. This is useful if one wants
+to keep a backup of the data on their local machine. By default, this is
+turned off.
+
+``bucket`` is a boto3 Bucket object that the user can use to override the
+bucket specified in their ``base_directory``. This can be useful if one has to
+manually create a bucket instance using special credentials (or a mock S3
+server such as ``fakes3``). It is typically used by developers unit-testing
+the DataSink class; most users do not need this attribute for actual
+workflows. This is an optional argument.
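+
+As a rough sketch, a bucket object pointing at a local fakes3 server could be
+built with boto3 and passed in as follows (the endpoint URL and credentials
+below are placeholders):
+
+::
+
+ import boto3
+ # placeholder credentials and endpoint for a local fakes3 server
+ resource = boto3.resource('s3', aws_access_key_id='mykey',
+                           aws_secret_access_key='mysecret',
+                           endpoint_url='http://localhost:4567',
+                           use_ssl=False)
+ ds.inputs.bucket = resource.Bucket('mybucket')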
+
+Finally, the user need only specify the input attributes for any incoming
+data to the node, and the outputs will be written to their S3 bucket:
+
+::
+
+ workflow.connect(inputnode, 'subject_id', ds, 'container')
+ workflow.connect(realigner, 'realigned_files', ds, 'motion')
+
+So, for example, the realigned file ``realigned_file1.nii.gz`` for subject
+"sub001" will be written to
+``s3://mybucket/path/to/output/dir/sub001/motion/realigned_file1.nii.gz``.
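+
+To spot-check what was uploaded, one can list the keys under that prefix with
+boto3 (a verification sketch only, assuming valid credentials for "mybucket";
+this is not part of the DataSink itself):
+
+::
+
+ import boto3
+ s3 = boto3.resource('s3')
+ # list everything the DataSink wrote for this subject
+ for obj in s3.Bucket('mybucket').objects.filter(Prefix='path/to/output/dir/sub001'):
+     print(obj.key)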
+
+
+Using S3DataGrabber
+===================
+Coming soon...
\ No newline at end of file
diff --git a/doc/users/index.rst b/doc/users/index.rst
index 3a432135a6..13c1487ae0 100644
--- a/doc/users/index.rst
+++ b/doc/users/index.rst
@@ -38,6 +38,7 @@
spmmcr
mipav
nipypecmd
+ aws
diff --git a/nipype/interfaces/io.py b/nipype/interfaces/io.py
index 5909843c34..6f0ad3bc32 100644
--- a/nipype/interfaces/io.py
+++ b/nipype/interfaces/io.py
@@ -134,7 +134,54 @@ def _add_output_traits(self, base):
return base
+# Class to track percentage of S3 file upload
+class ProgressPercentage(object):
+    '''
+    Callable class instance (via the __call__ method) that displays the
+    upload percentage of a file to S3
+    '''
+
+ def __init__(self, filename):
+        '''
+        Store the filename and its total size for later progress reporting
+        '''
+
+ # Import packages
+ import threading
+
+ # Initialize data attributes
+ self._filename = filename
+ self._size = float(os.path.getsize(filename))
+ self._seen_so_far = 0
+ self._lock = threading.Lock()
+
+ def __call__(self, bytes_amount):
+        '''
+        Accumulate transferred bytes and print the running upload percentage
+        '''
+
+ # Import packages
+ import sys
+
+ # With the lock on, print upload status
+ with self._lock:
+ self._seen_so_far += bytes_amount
+ if self._size != 0:
+ percentage = (self._seen_so_far / self._size) * 100
+ else:
+ percentage = 0
+ progress_str = '%d / %d (%.2f%%)\r'\
+ % (self._seen_so_far, self._size, percentage)
+
+ # Write to stdout
+ sys.stdout.write(progress_str)
+ sys.stdout.flush()
+
+
+# DataSink inputs
class DataSinkInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
+    '''
+    Input specification for the DataSink, including the attributes that enable
+    writing outputs to an AWS S3 bucket
+    '''
+
+ # Init inputspec data attributes
base_directory = Directory(
desc='Path to the base directory for storing data.')
container = traits.Str(
@@ -146,17 +193,32 @@ class DataSinkInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
desc=('List of 2-tuples reflecting string '
'to substitute and string to replace '
'it with'))
- regexp_substitutions = InputMultiPath(traits.Tuple(traits.Str, traits.Str),
- desc=('List of 2-tuples reflecting a pair '
- 'of a Python regexp pattern and a '
- 'replacement string. Invoked after '
- 'string `substitutions`'))
+ regexp_substitutions = \
+ InputMultiPath(traits.Tuple(traits.Str, traits.Str),
+ desc=('List of 2-tuples reflecting a pair of a '\
+ 'Python regexp pattern and a replacement '\
+ 'string. Invoked after string `substitutions`'))
_outputs = traits.Dict(traits.Str, value={}, usedefault=True)
remove_dest_dir = traits.Bool(False, usedefault=True,
desc='remove dest directory when copying dirs')
+ # AWS S3 data attributes
+ creds_path = traits.Str(desc='Filepath to AWS credentials file for S3 bucket '\
+ 'access; if not specified, the credentials will '\
+ 'be taken from the AWS_ACCESS_KEY_ID and '\
+ 'AWS_SECRET_ACCESS_KEY environment variables')
+ encrypt_bucket_keys = traits.Bool(desc='Flag indicating whether to use S3 '\
+ 'server-side AES-256 encryption')
+ # Set this if user wishes to override the bucket with their own
+ bucket = traits.Generic(mandatory=False,
+ desc='Boto3 S3 bucket for manual override of bucket')
+ # Set this if user wishes to have local copy of files as well
+ local_copy = traits.Str(desc='Copy files locally as well as to S3 bucket')
+
+ # Set call-able inputs attributes
def __setattr__(self, key, value):
+
if key not in self.copyable_trait_names():
if not isdefined(value):
super(DataSinkInputSpec, self).__setattr__(key, value)
@@ -167,11 +229,14 @@ def __setattr__(self, key, value):
super(DataSinkInputSpec, self).__setattr__(key, value)
+# DataSink outputs
class DataSinkOutputSpec(TraitedSpec):
+ # Init out file
out_file = traits.Any(desc='datasink output')
+# Custom DataSink class
class DataSink(IOBase):
""" Generic datasink module to store structured outputs
@@ -233,9 +298,12 @@ class DataSink(IOBase):
>>> ds.run() # doctest: +SKIP
"""
+
+ # Give obj .inputs and .outputs
input_spec = DataSinkInputSpec
output_spec = DataSinkOutputSpec
+ # Initialization method to set up datasink
def __init__(self, infields=None, force_run=True, **kwargs):
"""
Parameters
@@ -257,6 +325,7 @@ def __init__(self, infields=None, force_run=True, **kwargs):
if force_run:
self._always_run = True
+ # Get destination paths
def _get_dst(self, src):
# If path is directory with trailing os.path.sep,
# then remove that for a more robust behavior
@@ -280,6 +349,7 @@ def _get_dst(self, src):
dst = dst[1:]
return dst
+ # Substitute paths in substitutions dictionary parameter
def _substitute(self, pathstr):
pathstr_ = pathstr
if isdefined(self.inputs.substitutions):
@@ -300,67 +370,377 @@ def _substitute(self, pathstr):
iflogger.info('sub: %s -> %s' % (pathstr_, pathstr))
return pathstr
+ # Check for s3 in base directory
+ def _check_s3_base_dir(self):
+ '''
+ Method to see if the datasink's base directory specifies an
+ S3 bucket path; if it does, it parses the path for the bucket
+ name in the form 's3://bucket_name/...' and returns it
+
+ Parameters
+ ----------
+
+ Returns
+ -------
+ s3_flag : boolean
+ flag indicating whether the base_directory contained an
+ S3 bucket path
+ bucket_name : string
+ name of the S3 bucket to connect to; if the base directory
+ is not a valid S3 path, defaults to ''
+ '''
+
+ # Init variables
+ s3_str = 's3://'
+ bucket_name = ''
+ base_directory = self.inputs.base_directory
+
+ if not isdefined(base_directory):
+ s3_flag = False
+ return s3_flag, bucket_name
+
+ # Explicitly lower-case the "s3"
+ if base_directory.lower().startswith(s3_str):
+ base_dir_sp = base_directory.split('/')
+ base_dir_sp[0] = base_dir_sp[0].lower()
+ base_directory = '/'.join(base_dir_sp)
+
+ # Check if 's3://' in base dir
+ if base_directory.startswith(s3_str):
+ # Expects bucket name to be 's3://bucket_name/base_dir/..'
+ bucket_name = base_directory.split(s3_str)[1].split('/')[0]
+ s3_flag = True
+ # Otherwise it's just a normal datasink
+ else:
+ s3_flag = False
+
+ # Return s3_flag
+ return s3_flag, bucket_name
+
+ # Function to return AWS secure environment variables
+ def _return_aws_keys(self):
+ '''
+ Method to return AWS access key id and secret access key using
+ credentials found in a local file.
+
+ Parameters
+ ----------
+ self : nipype.interfaces.io.DataSink
+ self for instance method
+
+ Returns
+ -------
+ aws_access_key_id : string
+ string of the AWS access key ID
+ aws_secret_access_key : string
+ string of the AWS secret access key
+ '''
+
+ # Import packages
+ import os
+
+ # Init variables
+ creds_path = self.inputs.creds_path
+
+ # Check if creds exist
+ if creds_path and os.path.exists(creds_path):
+ with open(creds_path, 'r') as creds_in:
+ # Grab csv rows
+ row1 = creds_in.readline()
+ row2 = creds_in.readline()
+
+ # Are they root or user keys
+ if 'User Name' in row1:
+ # And split out for keys
+ aws_access_key_id = row2.split(',')[1]
+ aws_secret_access_key = row2.split(',')[2]
+ elif 'AWSAccessKeyId' in row1:
+ # And split out for keys
+ aws_access_key_id = row1.split('=')[1]
+ aws_secret_access_key = row2.split('=')[1]
+ else:
+ err_msg = 'Credentials file not recognized, check file is correct'
+ raise Exception(err_msg)
+
+ # Strip any carriage return/line feeds
+ aws_access_key_id = aws_access_key_id.replace('\r', '').replace('\n', '')
+ aws_secret_access_key = aws_secret_access_key.replace('\r', '').replace('\n', '')
+ else:
+ aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
+ aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
+
+ # Return keys
+ return aws_access_key_id, aws_secret_access_key
+
+ # Fetch bucket object
+ def _fetch_bucket(self, bucket_name):
+ '''
+ Method to return a bucket object which can be used to interact
+ with an AWS S3 bucket using credentials found in a local file.
+
+ Parameters
+ ----------
+ self : nipype.interfaces.io.DataSink
+ self for instance method
+ bucket_name : string
+ string corresponding to the name of the bucket on S3
+
+ Returns
+ -------
+ bucket : boto3.resources.factory.s3.Bucket
+ boto3 s3 Bucket object which is used to interact with files
+ in an S3 bucket on AWS
+ '''
+
+ # Import packages
+ import logging
+
+ try:
+ import boto3
+ import botocore
+ except ImportError as exc:
+ err_msg = 'Boto3 package is not installed - install boto3 and '\
+ 'try again.'
+ raise Exception(err_msg)
+
+ # Init variables
+ creds_path = self.inputs.creds_path
+ iflogger = logging.getLogger('interface')
+
+ # Get AWS credentials
+ try:
+ aws_access_key_id, aws_secret_access_key = \
+ self._return_aws_keys()
+ except Exception as exc:
+ err_msg = 'There was a problem extracting the AWS credentials '\
+ 'from the credentials file provided: %s. Error:\n%s'\
+ % (creds_path, exc)
+ raise Exception(err_msg)
+
+ # Try and get AWS credentials if a creds_path is specified
+ if aws_access_key_id and aws_secret_access_key:
+ # Init connection
+ iflogger.info('Connecting to S3 bucket: %s with credentials...'\
+ % bucket_name)
+ # Use individual session for each instance of DataSink
+ # Better when datasinks are being used in multi-threading, see:
+ # http://boto3.readthedocs.org/en/latest/guide/resources.html#multithreading
+ session = boto3.session.Session(aws_access_key_id=aws_access_key_id,
+ aws_secret_access_key=aws_secret_access_key)
+ s3_resource = session.resource('s3', use_ssl=True)
+
+ # Otherwise, connect anonymously
+ else:
+ iflogger.info('Connecting to AWS: %s anonymously...'\
+ % bucket_name)
+ session = boto3.session.Session()
+ s3_resource = session.resource('s3', use_ssl=True)
+ s3_resource.meta.client.meta.events.register('choose-signer.s3.*',
+ botocore.handlers.disable_signing)
+
+        # Get the Bucket object from the S3 resource (SSL was set on the session above)
+ bucket = s3_resource.Bucket(bucket_name)
+
+ # And try fetch the bucket with the name argument
+ try:
+ s3_resource.meta.client.head_bucket(Bucket=bucket_name)
+ except botocore.exceptions.ClientError as exc:
+ error_code = int(exc.response['Error']['Code'])
+ if error_code == 403:
+ err_msg = 'Access to bucket: %s is denied; check credentials'\
+ % bucket_name
+ raise Exception(err_msg)
+ elif error_code == 404:
+ err_msg = 'Bucket: %s does not exist; check spelling and try '\
+ 'again' % bucket_name
+ raise Exception(err_msg)
+            else:
+                err_msg = 'Unable to connect to bucket: %s. Error message:\n%s'\
+                          % (bucket_name, exc)
+                raise Exception(err_msg)
+ except Exception as exc:
+ err_msg = 'Unable to connect to bucket: %s. Error message:\n%s'\
+ % (bucket_name, exc)
+ raise Exception(err_msg)
+
+ # Return the bucket
+ return bucket
+
+ # Send up to S3 method
+ def _upload_to_s3(self, bucket, src, dst):
+ '''
+ Method to upload outputs to S3 bucket instead of on local disk
+ '''
+
+ # Import packages
+ import hashlib
+ import logging
+ import os
+
+ from botocore.exceptions import ClientError
+
+ # Init variables
+ iflogger = logging.getLogger('interface')
+ s3_str = 's3://'
+ s3_prefix = s3_str + bucket.name
+
+ # Explicitly lower-case the "s3"
+ if dst.lower().startswith(s3_str):
+ dst_sp = dst.split('/')
+ dst_sp[0] = dst_sp[0].lower()
+ dst = '/'.join(dst_sp)
+
+ # If src is a directory, collect files (this assumes dst is a dir too)
+ if os.path.isdir(src):
+ src_files = []
+ for root, dirs, files in os.walk(src):
+ src_files.extend([os.path.join(root, fil) for fil in files])
+ # Make the dst files have the dst folder as base dir
+ dst_files = [os.path.join(dst, src_f.split(src)[1]) \
+ for src_f in src_files]
+ else:
+ src_files = [src]
+ dst_files = [dst]
+
+ # Iterate over src and copy to dst
+ for src_idx, src_f in enumerate(src_files):
+ # Get destination filename/keyname
+ dst_f = dst_files[src_idx]
+ dst_k = dst_f.replace(s3_prefix, '').lstrip('/')
+
+ # See if same file is already up there
+ try:
+ dst_obj = bucket.Object(key=dst_k)
+ dst_md5 = dst_obj.e_tag.strip('"')
+
+ # See if same file is already there
+                with open(src_f, 'rb') as src_fd:
+                    src_md5 = hashlib.md5(src_fd.read()).hexdigest()
+ # Move to next loop iteration
+ if dst_md5 == src_md5:
+ iflogger.info('File %s already exists on S3, skipping...' % dst_f)
+ continue
+ else:
+ iflogger.info('Overwriting previous S3 file...')
+
+ except ClientError:
+ iflogger.info('New file to S3')
+
+ # Copy file up to S3 (either encrypted or not)
+ iflogger.info('Uploading %s to S3 bucket, %s, as %s...'\
+ % (src_f, bucket.name, dst_f))
+ if self.inputs.encrypt_bucket_keys:
+ extra_args = {'ServerSideEncryption' : 'AES256'}
+ else:
+ extra_args = {}
+ bucket.upload_file(src_f, dst_k, ExtraArgs=extra_args,
+ Callback=ProgressPercentage(src_f))
+
+ # List outputs, main run routine
def _list_outputs(self):
"""Execute this module.
"""
+
+ # Init variables
+ iflogger = logging.getLogger('interface')
outputs = self.output_spec().get()
out_files = []
- outdir = self.inputs.base_directory
- if not isdefined(outdir):
- outdir = '.'
- outdir = os.path.abspath(outdir)
+ # Use hardlink
+ use_hardlink = str2bool(config.get('execution', 'try_hard_link_datasink'))
+
+ # Set local output directory if specified
+ if isdefined(self.inputs.local_copy):
+ outdir = self.inputs.local_copy
+ else:
+ outdir = self.inputs.base_directory
+ # If base directory isn't given, assume current directory
+ if not isdefined(outdir):
+ outdir = '.'
+
+ # Check if base directory reflects S3 bucket upload
+ s3_flag, bucket_name = self._check_s3_base_dir()
+ if s3_flag:
+ s3dir = self.inputs.base_directory
+ # If user overrides bucket object, use that
+ if self.inputs.bucket:
+ bucket = self.inputs.bucket
+ # Otherwise fetch bucket object using name
+ else:
+ try:
+ bucket = self._fetch_bucket(bucket_name)
+ # If encountering an exception during bucket access, set output
+ # base directory to a local folder
+                except Exception as exc:
+                    s3dir = ''
+                    # Fall back to local output if the bucket cannot be reached
+                    s3_flag = False
+                    if not isdefined(self.inputs.local_copy):
+                        local_out_exception = os.path.join(os.path.expanduser('~'),
+                                                           's3_datasink_' + bucket_name)
+                        outdir = local_out_exception
+                    # Log local copying directory
+                    iflogger.info('Access to S3 failed! Storing outputs locally at: '\
+                                  '%s\nError: %s' % (outdir, exc))
+ else:
+ s3dir = ''
+
+ # If container input is given, append that to outdir
if isdefined(self.inputs.container):
outdir = os.path.join(outdir, self.inputs.container)
- if not os.path.exists(outdir):
- try:
- os.makedirs(outdir)
- except OSError as inst:
- if 'File exists' in inst:
- pass
- else:
- raise(inst)
- use_hardlink = str2bool(config.get('execution',
- 'try_hard_link_datasink'))
- for key, files in list(self.inputs._outputs.items()):
+ s3dir = os.path.join(s3dir, self.inputs.container)
+
+ # If sinking to local folder
+ if outdir != s3dir:
+ outdir = os.path.abspath(outdir)
+ # Create the directory if it doesn't exist
+ if not os.path.exists(outdir):
+ try:
+ os.makedirs(outdir)
+ except OSError as inst:
+ if 'File exists' in inst:
+ pass
+ else:
+ raise(inst)
+
+ # Iterate through outputs attributes {key : path(s)}
+ for key, files in self.inputs._outputs.items():
if not isdefined(files):
continue
iflogger.debug("key: %s files: %s" % (key, str(files)))
files = filename_to_list(files)
tempoutdir = outdir
+ if s3_flag:
+ s3tempoutdir = s3dir
for d in key.split('.'):
if d[0] == '@':
continue
tempoutdir = os.path.join(tempoutdir, d)
+ if s3_flag:
+ s3tempoutdir = os.path.join(s3tempoutdir, d)
# flattening list
if isinstance(files, list):
if isinstance(files[0], list):
files = [item for sublist in files for item in sublist]
+ # Iterate through passed-in source files
for src in filename_to_list(files):
+ # Format src and dst files
src = os.path.abspath(src)
- if os.path.isfile(src):
- dst = self._get_dst(src)
- dst = os.path.join(tempoutdir, dst)
- dst = self._substitute(dst)
- path, _ = os.path.split(dst)
- if not os.path.exists(path):
- try:
- os.makedirs(path)
- except OSError as inst:
- if 'File exists' in inst:
- pass
- else:
- raise(inst)
- iflogger.debug("copyfile: %s %s" % (src, dst))
- copyfile(src, dst, copy=True, hashmethod='content',
- use_hardlink=use_hardlink)
- out_files.append(dst)
- elif os.path.isdir(src):
- dst = self._get_dst(os.path.join(src, ''))
- dst = os.path.join(tempoutdir, dst)
- dst = self._substitute(dst)
- path, _ = os.path.split(dst)
+ if not os.path.isfile(src):
+ src = os.path.join(src, '')
+ dst = self._get_dst(src)
+ if s3_flag:
+ s3dst = os.path.join(s3tempoutdir, dst)
+ s3dst = self._substitute(s3dst)
+ dst = os.path.join(tempoutdir, dst)
+ dst = self._substitute(dst)
+ path, _ = os.path.split(dst)
+
+ # If we're uploading to S3
+ if s3_flag:
+ self._upload_to_s3(bucket, src, s3dst)
+ out_files.append(s3dst)
+ # Otherwise, copy locally src -> dst
+ if not s3_flag or isdefined(self.inputs.local_copy):
+ # Create output directory if it doesnt exist
if not os.path.exists(path):
try:
os.makedirs(path)
@@ -369,110 +749,27 @@ def _list_outputs(self):
pass
else:
raise(inst)
- if os.path.exists(dst) and self.inputs.remove_dest_dir:
- iflogger.debug("removing: %s" % dst)
- shutil.rmtree(dst)
- iflogger.debug("copydir: %s %s" % (src, dst))
- copytree(src, dst)
- out_files.append(dst)
+ # If src is a file, copy it to dst
+ if os.path.isfile(src):
+ iflogger.debug('copyfile: %s %s' % (src, dst))
+ copyfile(src, dst, copy=True, hashmethod='content',
+ use_hardlink=use_hardlink)
+ out_files.append(dst)
+ # If src is a directory, copy entire contents to dst dir
+ elif os.path.isdir(src):
+ if os.path.exists(dst) and self.inputs.remove_dest_dir:
+ iflogger.debug('removing: %s' % dst)
+ shutil.rmtree(dst)
+ iflogger.debug('copydir: %s %s' % (src, dst))
+ copytree(src, dst)
+ out_files.append(dst)
+
+ # Return outputs dictionary
outputs['out_file'] = out_files
return outputs
-class S3DataSinkInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
- testing = traits.Bool(False, usedefault=True,
- desc='Flag for using local fakes3 server.'
- ' (for testing purposes only)')
- anon = traits.Bool(False, usedefault=True,
- desc='Use anonymous connection to s3')
- bucket = traits.Str(mandatory=True,
- desc='Amazon S3 bucket where your data is stored')
- bucket_path = traits.Str('', usedefault=True,
- desc='Location within your bucket to store '
- 'data.')
- base_directory = Directory(
- desc='Path to the base directory for storing data.')
- container = traits.Str(
- desc='Folder within base directory in which to store output')
- parameterization = traits.Bool(True, usedefault=True,
- desc='store output in parametrized structure')
- strip_dir = Directory(desc='path to strip out of filename')
- substitutions = InputMultiPath(traits.Tuple(traits.Str, traits.Str),
- desc=('List of 2-tuples reflecting string '
- 'to substitute and string to replace '
- 'it with'))
- regexp_substitutions = InputMultiPath(traits.Tuple(traits.Str, traits.Str),
- desc=('List of 2-tuples reflecting a pair '
- 'of a Python regexp pattern and a '
- 'replacement string. Invoked after '
- 'string `substitutions`'))
-
- _outputs = traits.Dict(traits.Str, value={}, usedefault=True)
- remove_dest_dir = traits.Bool(False, usedefault=True,
- desc='remove dest directory when copying dirs')
-
- def __setattr__(self, key, value):
- if key not in self.copyable_trait_names():
- if not isdefined(value):
- super(S3DataSinkInputSpec, self).__setattr__(key, value)
- self._outputs[key] = value
- else:
- if key in self._outputs:
- self._outputs[key] = value
- super(S3DataSinkInputSpec, self).__setattr__(key, value)
-
-
-class S3DataSink(DataSink):
- """ Works exactly like DataSink, except the specified files will
- also be uploaded to Amazon S3 storage in the specified bucket
- and location. 'bucket_path' is the s3 analog for
- 'base_directory'.
-
- """
- input_spec = S3DataSinkInputSpec
-
- def _list_outputs(self):
- """Execute this module.
- """
- outputs = super(S3DataSink, self)._list_outputs()
-
- self.localtos3(outputs['out_file'])
-
- return outputs
-
- def localtos3(self, paths):
- if self.inputs.testing:
- conn = S3Connection(anon=True, is_secure=False, port=4567,
- host='localhost',
- calling_format=OrdinaryCallingFormat())
-
- else:
- conn = S3Connection(anon=self.inputs.anon)
- bkt = conn.get_bucket(self.inputs.bucket)
- s3paths = []
-
- for path in paths:
- # convert local path to s3 path
- bd_index = path.find(self.inputs.base_directory)
- if bd_index != -1: # base_directory is in path, maintain directory structure
- s3path = path[bd_index + len(self.inputs.base_directory):] # cut out base directory
- if s3path[0] == os.path.sep:
- s3path = s3path[1:]
- else: # base_directory isn't in path, simply place all files in bucket_path folder
- s3path = os.path.split(path)[1] # take filename from path
- s3path = os.path.join(self.inputs.bucket_path, s3path)
- if s3path[-1] == os.path.sep:
- s3path = s3path[:-1]
- s3paths.append(s3path)
-
- k = boto.s3.key.Key(bkt)
- k.key = s3path
- k.set_contents_from_filename(path)
-
- return s3paths
-
-
class S3DataGrabberInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
anon = traits.Bool(False, usedefault=True,
desc='Use anonymous connection to s3. If this is set to True, boto may print' +
diff --git a/nipype/interfaces/tests/test_auto_S3DataSink.py b/nipype/interfaces/tests/test_auto_S3DataSink.py
deleted file mode 100644
index 9ef342defb..0000000000
--- a/nipype/interfaces/tests/test_auto_S3DataSink.py
+++ /dev/null
@@ -1,44 +0,0 @@
-# AUTO-GENERATED by tools/checkspecs.py - DO NOT EDIT
-from ...testing import assert_equal
-from ..io import S3DataSink
-
-
-def test_S3DataSink_inputs():
- input_map = dict(_outputs=dict(usedefault=True,
- ),
- anon=dict(usedefault=True,
- ),
- base_directory=dict(),
- bucket=dict(mandatory=True,
- ),
- bucket_path=dict(usedefault=True,
- ),
- container=dict(),
- ignore_exception=dict(nohash=True,
- usedefault=True,
- ),
- parameterization=dict(usedefault=True,
- ),
- regexp_substitutions=dict(),
- remove_dest_dir=dict(usedefault=True,
- ),
- strip_dir=dict(),
- substitutions=dict(),
- testing=dict(usedefault=True,
- ),
- )
- inputs = S3DataSink.input_spec()
-
- for key, metadata in list(input_map.items()):
- for metakey, value in list(metadata.items()):
- yield assert_equal, getattr(inputs.traits()[key], metakey), value
-
-
-def test_S3DataSink_outputs():
- output_map = dict(out_file=dict(),
- )
- outputs = S3DataSink.output_spec()
-
- for key, metadata in list(output_map.items()):
- for metakey, value in list(metadata.items()):
- yield assert_equal, getattr(outputs.traits()[key], metakey), value
diff --git a/nipype/interfaces/tests/test_io.py b/nipype/interfaces/tests/test_io.py
index 37ed6eae43..c1f4ec35f5 100644
--- a/nipype/interfaces/tests/test_io.py
+++ b/nipype/interfaces/tests/test_io.py
@@ -18,13 +18,32 @@
import nipype.interfaces.io as nio
from nipype.interfaces.base import Undefined
+# Check for boto
noboto = False
try:
import boto
from boto.s3.connection import S3Connection, OrdinaryCallingFormat
-except:
+except ImportError:
noboto = True
+# Check for boto3
+noboto3 = False
+try:
+ import boto3
+ from botocore.utils import fix_s3_host
+except ImportError:
+ noboto3 = True
+
+# Check for fakes3
+import subprocess
+try:
+    # check_call raises CalledProcessError if fakes3 is not on the PATH
+    subprocess.check_call(['which', 'fakes3'],
+                          stdout=open(os.devnull, 'wb'))
+    fakes3 = True
+except subprocess.CalledProcessError:
+    fakes3 = False
def test_datagrabber():
dg = nio.DataGrabber()
@@ -166,50 +185,164 @@ def test_datasink():
yield assert_true, 'test' in ds.inputs.copyable_trait_names()
-@skipif(noboto)
-def test_s3datasink():
- ds = nio.S3DataSink()
- yield assert_true, ds.inputs.parameterization
- yield assert_equal, ds.inputs.base_directory, Undefined
- yield assert_equal, ds.inputs.strip_dir, Undefined
- yield assert_equal, ds.inputs._outputs, {}
- ds = nio.S3DataSink(base_directory='foo')
- yield assert_equal, ds.inputs.base_directory, 'foo'
- ds = nio.S3DataSink(infields=['test'])
- yield assert_true, 'test' in ds.inputs.copyable_trait_names()
+# Make dummy input file
+def _make_dummy_input():
+ '''
+ Function to create a dummy file
+ '''
+ # Import packages
+ import tempfile
-def test_datasink_substitutions():
- indir = mkdtemp(prefix='-Tmp-nipype_ds_subs_in')
- outdir = mkdtemp(prefix='-Tmp-nipype_ds_subs_out')
- files = []
- for n in ['ababab.n', 'xabababyz.n']:
- f = os.path.join(indir, n)
- files.append(f)
- open(f, 'w')
- ds = nio.DataSink(
- parametrization=False,
- base_directory=outdir,
- substitutions=[('ababab', 'ABABAB')],
- # end archoring ($) is used to assure operation on the filename
- # instead of possible temporary directories names matches
- # Patterns should be more comprehendable in the real-world usage
- # cases since paths would be quite more sensible
- regexp_substitutions=[(r'xABABAB(\w*)\.n$', r'a-\1-b.n'),
- ('(.*%s)[-a]([^%s]*)$' % ((os.path.sep,) * 2),
- r'\1!\2')])
- setattr(ds.inputs, '@outdir', files)
+
+ # Init variables
+ input_dir = tempfile.mkdtemp()
+ input_path = os.path.join(input_dir, 'datasink_test_s3.txt')
+
+ # Create input file
+ with open(input_path, 'wb') as f:
+ f.write(b'ABCD1234')
+
+ # Return path
+ return input_path
+
+
+# Test datasink writes to s3 properly
+@skipif(noboto3 or not fakes3)
+def test_datasink_to_s3():
+ '''
+ This function tests to see if the S3 functionality of a DataSink
+ works properly
+ '''
+
+ # Import packages
+ import hashlib
+ import tempfile
+
+ # Init variables
+ ds = nio.DataSink()
+ bucket_name = 'test'
+ container = 'outputs'
+ attr_folder = 'text_file'
+ output_dir = 's3://' + bucket_name
+ # Local temporary filepaths for testing
+ fakes3_dir = tempfile.mkdtemp()
+ input_path = _make_dummy_input()
+
+ # Start up fake-S3 server
+ proc = Popen(['fakes3', '-r', fakes3_dir, '-p', '4567'], stdout=open(os.devnull, 'wb'))
+
+ # Init boto3 s3 resource to talk with fakes3
+ resource = boto3.resource(aws_access_key_id='mykey',
+ aws_secret_access_key='mysecret',
+ service_name='s3',
+ endpoint_url='http://localhost:4567',
+ use_ssl=False)
+ resource.meta.client.meta.events.unregister('before-sign.s3', fix_s3_host)
+
+ # Create bucket
+ bucket = resource.create_bucket(Bucket=bucket_name)
+
+ # Prep datasink
+ ds.inputs.base_directory = output_dir
+ ds.inputs.container = container
+ ds.inputs.bucket = bucket
+ setattr(ds.inputs, attr_folder, input_path)
+
+ # Run datasink
ds.run()
- yield assert_equal, \
- sorted([os.path.basename(x) for
- x in glob.glob(os.path.join(outdir, '*'))]), \
- ['!-yz-b.n', 'ABABAB.n'] # so we got re used 2nd and both patterns
- shutil.rmtree(indir)
- shutil.rmtree(outdir)
+ # Get MD5sums and compare
+ key = '/'.join([container, attr_folder, os.path.basename(input_path)])
+ obj = bucket.Object(key=key)
+ dst_md5 = obj.e_tag.replace('"', '')
+ src_md5 = hashlib.md5(open(input_path, 'rb').read()).hexdigest()
-@skipif(noboto)
-def test_s3datasink_substitutions():
+ # Kill fakes3
+ proc.kill()
+
+ # Delete fakes3 folder and input file
+ shutil.rmtree(fakes3_dir)
+ shutil.rmtree(os.path.dirname(input_path))
+
+ # Make sure md5sums match
+ yield assert_equal, src_md5, dst_md5
+
+
+# Test AWS creds read from env vars
+@skipif(noboto3 or not fakes3)
+def test_aws_keys_from_env():
+ '''
+ Function to ensure the DataSink can successfully read in AWS
+ credentials from the environment variables
+ '''
+
+ # Import packages
+ import os
+ import nipype.interfaces.io as nio
+
+ # Init variables
+ ds = nio.DataSink()
+ aws_access_key_id = 'ABCDACCESS'
+ aws_secret_access_key = 'DEFGSECRET'
+
+ # Set env vars
+ os.environ['AWS_ACCESS_KEY_ID'] = aws_access_key_id
+ os.environ['AWS_SECRET_ACCESS_KEY'] = aws_secret_access_key
+
+ # Call function to return creds
+ access_key_test, secret_key_test = ds._return_aws_keys()
+
+ # Assert match
+ yield assert_equal, aws_access_key_id, access_key_test
+ yield assert_equal, aws_secret_access_key, secret_key_test
+
+
+# Test the local copy attribute
+def test_datasink_localcopy():
+ '''
+ Function to validate DataSink will make local copy via local_copy
+ attribute
+ '''
+
+ # Import packages
+ import hashlib
+ import tempfile
+
+ # Init variables
+ local_dir = tempfile.mkdtemp()
+ container = 'outputs'
+ attr_folder = 'text_file'
+
+ # Make dummy input file and datasink
+ input_path = _make_dummy_input()
+ ds = nio.DataSink()
+
+ # Set up datasink
+ ds.inputs.container = container
+ ds.inputs.local_copy = local_dir
+ setattr(ds.inputs, attr_folder, input_path)
+
+ # Expected local copy path
+ local_copy = os.path.join(local_dir, container, attr_folder,
+ os.path.basename(input_path))
+
+ # Run the datasink
+ ds.run()
+
+ # Check md5sums of both
+ src_md5 = hashlib.md5(open(input_path, 'rb').read()).hexdigest()
+ dst_md5 = hashlib.md5(open(local_copy, 'rb').read()).hexdigest()
+
+    # Delete temp directories
+ shutil.rmtree(os.path.dirname(input_path))
+ shutil.rmtree(local_dir)
+
+ # Perform test
+ yield assert_equal, src_md5, dst_md5
+
+
+def test_datasink_substitutions():
indir = mkdtemp(prefix='-Tmp-nipype_ds_subs_in')
outdir = mkdtemp(prefix='-Tmp-nipype_ds_subs_out')
files = []
@@ -217,27 +350,7 @@ def test_s3datasink_substitutions():
f = os.path.join(indir, n)
files.append(f)
open(f, 'w')
-
- # run fakes3 server and set up bucket
- fakes3dir = op.expanduser('~/fakes3')
- try:
- proc = Popen(
- ['fakes3', '-r', fakes3dir, '-p', '4567'], stdout=open(os.devnull, 'wb'))
- except OSError as ose:
- if 'No such file or directory' in str(ose):
- return # fakes3 not installed. OK!
- raise ose
-
- conn = S3Connection(anon=True, is_secure=False, port=4567,
- host='localhost',
- calling_format=OrdinaryCallingFormat())
- conn.create_bucket('test')
-
- ds = nio.S3DataSink(
- testing=True,
- anon=True,
- bucket='test',
- bucket_path='output/',
+ ds = nio.DataSink(
parametrization=False,
base_directory=outdir,
substitutions=[('ababab', 'ABABAB')],
@@ -254,34 +367,6 @@ def test_s3datasink_substitutions():
sorted([os.path.basename(x) for
x in glob.glob(os.path.join(outdir, '*'))]), \
['!-yz-b.n', 'ABABAB.n'] # so we got re used 2nd and both patterns
-
- bkt = conn.get_bucket(ds.inputs.bucket)
- bkt_files = list(k for k in bkt.list())
-
- found = [False, False]
- failed_deletes = 0
- for k in bkt_files:
- if '!-yz-b.n' in k.key:
- found[0] = True
- try:
- bkt.delete_key(k)
- except:
- failed_deletes += 1
- elif 'ABABAB.n' in k.key:
- found[1] = True
- try:
- bkt.delete_key(k)
- except:
- failed_deletes += 1
-
- # ensure delete requests were successful
- yield assert_equal, failed_deletes, 0
-
- # ensure both keys are found in bucket
- yield assert_equal, found.count(True), 2
-
- proc.kill()
- shutil.rmtree(fakes3dir)
shutil.rmtree(indir)
shutil.rmtree(outdir)