Optimize S3 re-upload #2289

Open
wants to merge 4 commits into base: dev
225 changes: 212 additions & 13 deletions physionet-django/project/cloud/s3.py
@@ -1,12 +1,16 @@
import hashlib
import json
import os
import re
import typing

import boto3
import botocore
import re
import os
import json
from django.conf import settings

from project.models import PublishedProject, AccessPolicy, AWS
from user.models import User
from project.authorization.access import can_view_project_files
from user.models import User


# Manage AWS buckets and objects
@@ -104,6 +108,162 @@ def create_s3_resource():
raise botocore.exceptions.NoCredentialsError("S3 credentials are undefined.")


class S3ObjectInfo(typing.NamedTuple):
    """
    Metadata about an object (file) stored in an S3 bucket.

    Attributes:
        size (int): Size of the file in bytes.
        etag (str): HTTP entity tag.
    """
    size: int
    etag: str

    def match_path(self, path):
        """
        Check whether the contents of the object match a local file.

        Note that because S3 uses the MD5 algorithm, it is possible
        for someone to deliberately construct two different files that
        have the same ETag, in which case this function will
        incorrectly return True.

        Also note that the hash depends on the parameters
        (multipart_threshold, multipart_chunksize) used when the file
        was uploaded to S3, so it is also possible for this function
        to return False when the two files are in fact identical.

        Args:
            path (str): Filesystem path of the local file.

        Returns:
            bool: True if the local file and remote file contents are
            identical (barring MD5 collisions).
        """
        if self.size != os.path.getsize(path):
            return False
        with open(path, 'rb') as fileobj:
            return self.etag == s3_file_etag(fileobj)
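
As an illustration of how `match_path` is meant to be used (this sketch is not part of the diff), the snippet below compares a local file against an object's `head_object` metadata. The bucket name, key, and local path are hypothetical, and a plain `boto3` client is used in place of the module's `create_s3_client()` helper.

```python
import boto3

# Hypothetical bucket/key/path, for illustration only.
s3 = boto3.client('s3')
response = s3.head_object(Bucket='my-example-bucket', Key='demo/1.0/subject-info.csv')
info = S3ObjectInfo(size=response['ContentLength'], etag=response['ETag'])

if info.match_path('/data/demo/1.0/subject-info.csv'):
    print('Remote object already matches the local file; skip the upload.')
else:
    print('Contents differ (or the ETag was computed with other chunk settings).')
```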


def s3_file_etag(fileobj, chunksize=(8 * 1024 * 1024)):
    """
    Calculate the ETag that S3 would use for a file.

    The ETag used by S3 is dependent on whether the file was uploaded
    as a single part or as multiple parts, and if multiple parts are
    used, depends on the chunk size. The default behavior of boto3 is
    to use a chunk size of 8 MiB.

    Args:
        fileobj (io.BufferedIOBase): Readable binary file object.
        chunksize (int): Chunk size for multi-part uploads.

    Returns:
        str: Expected value of the ETag, if the given file were
        uploaded to S3.
    """
    # https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums
    meta_hash = hashlib.md5()
    part_hash = hashlib.md5()
    remaining = chunksize
    n_parts = 0
    while True:
        data = fileobj.read(min(remaining, 1024 * 1024))
        if not data:
            break
        part_hash.update(data)
        remaining -= len(data)
        if remaining == 0:
            meta_hash.update(part_hash.digest())
            part_hash = hashlib.md5()
            remaining = chunksize
            n_parts += 1
    if n_parts > 0 and remaining < chunksize:
        meta_hash.update(part_hash.digest())
        n_parts += 1

    # Quotation marks are part of the ETag value.
    if n_parts == 0:
        return '"{}"'.format(part_hash.hexdigest())
    else:
        return '"{}-{}"'.format(meta_hash.hexdigest(), n_parts)
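
As a rough sanity check of the ETag arithmetic (not part of this PR), the in-memory example below exercises both return shapes: a plain quoted MD5 for a single-part file, and an MD5-of-MD5s with a part count for a file larger than one chunk. It assumes `s3_file_etag` is importable from `project.cloud.s3` in a configured Django environment; the 9 MiB size is chosen only so the file spans two default-sized parts.

```python
import hashlib
import io

# from project.cloud.s3 import s3_file_etag  # assumes the project's Django environment

chunk = 8 * 1024 * 1024

# Single part: below the chunk size, the ETag is just the quoted MD5.
small = b'x' * 1024
assert s3_file_etag(io.BytesIO(small)) == '"%s"' % hashlib.md5(small).hexdigest()

# Multi-part: 9 MiB splits into an 8 MiB part and a 1 MiB part; the ETag is
# the MD5 of the concatenated per-part digests, suffixed with the part count.
big = b'x' * (9 * 1024 * 1024)
digests = hashlib.md5(big[:chunk]).digest() + hashlib.md5(big[chunk:]).digest()
assert s3_file_etag(io.BytesIO(big)) == '"%s-2"' % hashlib.md5(digests).hexdigest()
```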


def get_s3_object_info(s3, bucket_name, key):
    """
    Retrieve metadata of an object (file) stored in an S3 bucket.

    Args:
        s3 (boto3.client.S3): An initialized AWS S3 client object.
        bucket_name (str): Name of the bucket.
        key (str): Key of the object in the bucket.

    Returns:
        S3ObjectInfo: Metadata of the object.

    Raises:
        botocore.exceptions.ClientError: If the object does not exist
            or the client does not have permission to access it.

    Example:
        >>> get_s3_object_info(s3, "physionet-open", "mitdb/1.0.0/100.dat")
        S3ObjectInfo(size=1950000, etag='"4968ff1f99b15820d4a7a1d274d13d66"')
    """
    response = s3.head_object(Bucket=bucket_name, Key=key)
    return S3ObjectInfo(
        size=response['ContentLength'],
        etag=response['ETag'],
    )


def list_s3_subdir_object_info(s3, bucket_name, prefix):
    """
    Retrieve metadata of all files in a subdirectory of an S3 bucket.

    Args:
        s3 (boto3.client.S3): An initialized AWS S3 client object.
        bucket_name (str): Name of the bucket.
        prefix (str): Prefix of the directory within the bucket.

    Returns:
        files (dict): A dictionary mapping file paths (keys) to
            S3ObjectInfo.
        subdirs (list): A list of prefixes representing subdirectories
            of the given directory.

    Raises:
        botocore.exceptions.ClientError: If the bucket does not exist
            or the client does not have permission to access it.

    Example:
        >>> f, d = list_s3_subdir_object_info(s3, "physionet-open", "mitdb/1.0.0/")
        >>> f['mitdb/1.0.0/100.atr']
        S3ObjectInfo(size=4558, etag='"566dc5a685c634deba5a081b4ceec345"')
        >>> f['mitdb/1.0.0/100.dat']
        S3ObjectInfo(size=1950000, etag='"4968ff1f99b15820d4a7a1d274d13d66"')
        >>> d
        ['mitdb/1.0.0/mitdbdir/', 'mitdb/1.0.0/x_mitdb/']
    """
    pages = s3.get_paginator('list_objects_v2').paginate(
        Bucket=bucket_name,
        Prefix=prefix,
        Delimiter='/',
    )

    files = {}
    subdirs = []
    for page in pages:
        for file_data in page.get('Contents', []):
            files[file_data['Key']] = S3ObjectInfo(
                size=file_data['Size'],
                etag=file_data['ETag'],
            )
        # CommonPrefixes entries are dicts of the form {'Prefix': '...'};
        # keep only the prefix strings, as documented above.
        subdirs += [p['Prefix'] for p in page.get('CommonPrefixes', [])]

    return files, subdirs
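
Because the `Delimiter='/'` listing only descends one level, a caller that wants the whole tree has to recurse into `subdirs`. A hypothetical helper (not part of this diff) could look like the sketch below, reusing `create_s3_client` from this module and an illustrative bucket and prefix.

```python
def walk_s3_prefix(s3, bucket_name, prefix):
    """Collect S3ObjectInfo for every object under a prefix, recursively."""
    files, subdirs = list_s3_subdir_object_info(s3, bucket_name, prefix)
    for subdir in subdirs:
        files.update(walk_s3_prefix(s3, bucket_name, subdir))
    return files

# e.g. walk_s3_prefix(create_s3_client(), "physionet-open", "mitdb/1.0.0/")
```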


def get_bucket_name(project):
    """
    Determine and return the S3 bucket name associated with
@@ -369,15 +529,26 @@ def send_files_to_s3(folder_path, s3_prefix, bucket_name, project):

    s3 = create_s3_client()
    for root, _, files in os.walk(folder_path):
        if root == folder_path:
            dir_prefix = s3_prefix
        else:
            dir_prefix = os.path.join(
                s3_prefix, os.path.relpath(root, folder_path), ''
            )
        existing_files, existing_subdirs = list_s3_subdir_object_info(
            s3, bucket_name, dir_prefix
        )

        for file_name in files:
            local_file_path = os.path.join(root, file_name)
            s3_key = os.path.join(
                s3_prefix, os.path.relpath(local_file_path, folder_path)
            )
            s3.upload_file(
                Filename=local_file_path,
                Bucket=bucket_name,
                Key=s3_key,

            # Upload file if not already up-to-date
            send_file_to_s3(
                s3, bucket_name, s3_key, local_file_path,
                existing_files.get(s3_key)
            )

    # If project has a ZIP file, upload it as well
@@ -389,11 +560,39 @@ def send_files_to_s3(folder_path, s3_prefix, bucket_name, project):
        else:
            s3_key = zip_name

        s3.upload_file(
            Filename=zip_file_path,
            Bucket=bucket_name,
            Key=s3_key,
        )
        # Upload file if not already up-to-date
        try:
            existing_file = get_s3_object_info(s3, bucket_name, s3_key)
        except s3.exceptions.ClientError:
            existing_file = None
        send_file_to_s3(s3, bucket_name, s3_key, zip_file_path, existing_file)


def send_file_to_s3(s3, bucket_name, key, file_path, existing_file=None):
    """
    Upload a local file to an AWS S3 bucket.

    If the remote file already exists and matches the contents of the
    local file, the remote file is not modified.

    Args:
        s3 (boto3.client.S3): An initialized AWS S3 client object.
        bucket_name (str): Destination bucket name.
        key (str): Destination key within the bucket.
        file_path (str): Filesystem path of the local file.
        existing_file (S3ObjectInfo or None): Metadata of the existing
            object, if any.

    Returns:
        None
    """
    if existing_file is not None and existing_file.match_path(file_path):
        return
    s3.upload_file(
        Filename=file_path,
        Bucket=bucket_name,
        Key=key,
    )
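
For a one-off upload outside of `send_files_to_s3`, the same skip-if-unchanged pattern used for the ZIP file above applies. The sketch below is illustrative only; the bucket, key, and local path are hypothetical.

```python
import botocore

s3 = create_s3_client()
bucket, key, path = 'my-example-bucket', 'demo/1.0/RECORDS', '/data/demo/1.0/RECORDS'

try:
    existing = get_s3_object_info(s3, bucket, key)
except botocore.exceptions.ClientError:
    existing = None  # object does not exist yet (or is not accessible)

# Uploads only if the size/ETag comparison says the contents differ.
send_file_to_s3(s3, bucket, key, path, existing)
```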


def get_aws_accounts_for_dataset(dataset_name):
53 changes: 53 additions & 0 deletions physionet-django/project/test_s3.py
@@ -116,6 +116,59 @@ def test_upload_open_projects(self):

        self.assertEqual(bucket_files, expected_files)

    def test_reupload_open_project(self):
        """
        Test re-uploading a project after modifying its published content.
        """
        create_s3_server_access_log_bucket()

        project = PublishedProject.objects.get(slug='demobsn', version='1.0')

        # Create files of various sizes to test multi-part uploads.
        os.chmod(project.file_root(), 0o755)
        for size_mb in [0, 8, 16, 17.1]:
            path = os.path.join(project.file_root(), str(size_mb))
            with open(path, 'wb') as f:
                f.write(b'x' * int(size_mb * 1024 * 1024))

        # Upload the project.
        upload_project_to_S3(project)

        # List the objects that were uploaded, and add a custom tag to each.
        s3 = create_s3_client()
        bucket = get_bucket_name(project)
        objects = s3.list_objects_v2(Bucket=bucket)
        custom_tagset = [{'Key': 'test-reupload', 'Value': '1'}]
        for object_info in objects['Contents']:
            s3.put_object_tagging(
                Bucket=bucket, Key=object_info['Key'],
                Tagging={'TagSet': custom_tagset},
            )

        # Modify some existing files.
        alter_paths = ['data1.txt', 'scripts/lib.py']
        for path in alter_paths:
            os.chmod(os.path.join(project.file_root(), path), 0o644)
            with open(os.path.join(project.file_root(), path), 'a') as f:
                f.write('# additional content\n')

        # Re-upload the project. This should update only the files
        # that were modified above.
        project = PublishedProject.objects.get(slug='demobsn', version='1.0')
        upload_project_to_S3(project)

        # All of the objects that were not modified should still have
        # the custom tag; modified objects should have been replaced
        # and their tags should be empty.
        project_prefix = project.slug + '/' + project.version + '/'
        for object_info in objects['Contents']:
            key = object_info['Key']
            tags = s3.get_object_tagging(Bucket=bucket, Key=key)
            if key.removeprefix(project_prefix) in alter_paths:
                self.assertEqual(tags['TagSet'], [], key)
            else:
                self.assertEqual(tags['TagSet'], custom_tagset, key)

    def assert_bucket_is_public(self, bucket_name):
        """
        Check that a bucket exists and allows some form of public access.