Merged

Changes from all commits (24 commits)

14 changes: 1 addition & 13 deletions setup.py
@@ -10,19 +10,7 @@
 CURRENT_VERSION = __version__
 FRAMEWORKS = ["tensorflow", "pytorch", "mxnet", "xgboost"]
 TESTS_PACKAGES = ["pytest", "torchvision", "pandas"]
-INSTALL_REQUIRES = [
-    # aiboto3 implicitly depends on aiobotocore
-    "aioboto3==6.4.1", # no version deps
-    "aiobotocore==0.11.0", # pinned to a specific botocore & boto3
-    "aiohttp>=3.6.0,<4.0", # aiobotocore breaks with 4.0
-    # boto3 explicitly depends on botocore
-    "boto3>=1.10.32", # Sagemaker requires >= 1.9.213
-    "botocore>=1.13.32",
-    "nest_asyncio",
-    "protobuf>=3.6.0",
-    "numpy",
-    "packaging",
-]
+INSTALL_REQUIRES = ["protobuf>=3.6.0", "numpy", "packaging", "boto3>=1.10.32"]
Contributor:
Can we pin all these dependencies to specific versions?

Contributor Author @rahul003 (Dec 7, 2019):

Python dependency management works differently from other languages because of how PyPI dependency resolution works. I did some research on this; here's a reference: https://stackoverflow.com/a/44938662. As library developers, we should not pin to any specific version. The maintainers of environments that include this library, or users of the library, can pin the dependency versions they use. If the library itself pins a dependency to a specific version, then any other package in the same environment that needs a different version of that dependency can't be installed, due to version conflicts.
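
To make the conflict concrete, here is a hypothetical sketch (the package names and exact versions below are illustrative, not taken from this PR): if two libraries in one environment pin the same dependency to different exact versions, pip cannot install both; a lower-bound range leaves the final pin to the environment or the end user.

# Hypothetical install_requires fragments, for illustration only.
#
# Library A pins an exact version of a shared dependency:
#     install_requires=["botocore==1.13.32"]
# Library B, installed into the same environment, pins a different one:
#     install_requires=["botocore==1.14.0"]
# pip cannot satisfy both pins at once, so installing A and B together fails.
#
# With a lower-bound range the library states only what it needs, and the
# environment or end user can still pin an exact version on top:
install_requires = ["botocore>=1.13.32"]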

Contributor:

We should not block this PR. Let's talk about pinning, the goals behind it, and how we can achieve those goals.



def compile_summary_protobuf():
381 changes: 190 additions & 191 deletions smdebug/core/access_layer/s3handler.py

Large diffs are not rendered by default.

31 changes: 8 additions & 23 deletions smdebug/core/access_layer/utils.py
@@ -1,16 +1,14 @@
 # Standard Library
-import asyncio
 import os
 
 # Third Party
-import aioboto3
 from botocore.exceptions import ClientError
 
 # First Party
-from smdebug.core.access_layer.s3handler import ListRequest, S3Handler
+from smdebug.core.access_layer.s3handler import DeleteRequest, ListRequest, S3Handler
 from smdebug.core.logger import get_logger
 from smdebug.core.sagemaker_utils import is_sagemaker_job
-from smdebug.core.utils import get_region, is_s3
+from smdebug.core.utils import is_s3
 
 # Local
 from .file import TSAccessFile
@@ -55,9 +53,8 @@ def has_training_ended(trial_prefix):
     s3, bucket_name, key_name = is_s3(file_path)
     if s3:
         try:
-            s3_handler = S3Handler()
             request = ListRequest(bucket_name, key_name)
-            file_available = s3_handler.list_prefixes([request])[0]
+            file_available = S3Handler.list_prefixes([request])[0]
             if len(file_available) > 0:
                 return True
             else:
@@ -74,23 +71,12 @@ def has_training_ended(trial_prefix):


 def delete_s3_prefixes(bucket, keys):
-    s3_handler = S3Handler()
     if not isinstance(keys, list):
         keys = [keys]
-    list_prefixes = s3_handler.list_prefixes(
-        [ListRequest(Bucket=bucket, Prefix=key) for key in keys]
-    )
-    prefixes = [item for sublist in list_prefixes for item in sublist]
-    loop = asyncio.get_event_loop()
-
-    async def del_folder(bucket, keys):
-        loop = asyncio.get_event_loop()
-        client = aioboto3.client("s3", loop=loop, region_name=get_region())
-        await asyncio.gather(*[client.delete_object(Bucket=bucket, Key=key) for key in keys])
-        await client.close()
-
-    task = loop.create_task(del_folder(bucket, prefixes))
-    loop.run_until_complete(task)
+    delreqs = []
+    for key in keys:
+        delreqs.append(DeleteRequest(bucket, key))
+    S3Handler.delete_prefixes(delreqs)
 
 
 def check_dir_exists(path):
@@ -99,9 +85,8 @@ def check_dir_exists(path):
     s3, bucket_name, key_name = is_s3(path)
     if s3:
         try:
-            s3_handler = S3Handler()
             request = ListRequest(bucket_name, key_name)
-            folder = s3_handler.list_prefixes([request])[0]
+            folder = S3Handler.list_prefixes([request])[0]
             if len(folder) > 0 and has_training_ended(folder[-1]):
                 raise RuntimeError(
                     "The path:{} already exists on s3. "
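
The same pattern repeats across this refactor: call sites stop constructing an S3Handler instance and driving aioboto3 through an asyncio event loop, and instead pass request objects to S3Handler class methods. A minimal usage sketch of the list/delete path (the bucket and prefix names here are placeholders):

from smdebug.core.access_layer.s3handler import DeleteRequest, ListRequest, S3Handler

# list_prefixes takes a list of ListRequest objects and returns one list of
# matching keys per request; [0] selects the result of the single request.
keys = S3Handler.list_prefixes([ListRequest(Bucket="my-bucket", Prefix="run-1/")])[0]

# delete_prefixes takes a list of DeleteRequest objects and deletes the
# objects under each prefix, with no event loop or client to manage.
S3Handler.delete_prefixes([DeleteRequest("my-bucket", "run-1/")])
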
6 changes: 2 additions & 4 deletions smdebug/core/index_reader.py
@@ -256,8 +256,6 @@ def __init__(self, path):
         super().__init__(path)
         self.path = path
         _, self.bucket_name, self.prefix_name = is_s3(path)
-        self.s3_handler = S3Handler()
-
         self.index_file_cache = ReadIndexFilesCache()
 
     def _is_event_file_present(self, file):
@@ -274,7 +272,7 @@ def fetch_tensor_value(self, tensor_location: TensorLocation) -> np.ndarray:
         start = tensor_location.start_idx
         length = tensor_location.length
         request = [ReadObjectRequest(event_file_name, int(start), int(length))]
-        res = self.s3_handler.get_objects(request)
+        res = S3Handler.get_objects(request)
         tr = TensorReader(res[0])  # Access the only element in res
         tensor_tuple = list(tr.read_tensors())[0]  # Access the only element in the list
         tensor_name, step, tensor_data, mode, mode_step = tensor_tuple
@@ -323,7 +321,7 @@ def read_index_files(
                     )
                     self.index_file_cache.add(index_file, start_after_key)
 
-        responses = self.s3_handler.get_objects(object_requests)
+        responses = S3Handler.get_objects(object_requests)
         return responses, steps, start_after_key, workers
 
     def list_index_files(self, start_after_key=None):
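
Reads follow the same request-object pattern: a ReadObjectRequest describes the object (optionally with a byte offset and length), and S3Handler.get_objects returns the raw bytes for each request in order. A minimal sketch (the object paths and byte range are placeholders):

from smdebug.core.access_layer.s3handler import ReadObjectRequest, S3Handler

requests = [
    # read only the first 1024 bytes of an event file (path, offset, length)
    ReadObjectRequest("s3://my-bucket/run-1/events-file", 0, 1024),
    # read an entire object
    ReadObjectRequest("s3://my-bucket/run-1/collections.json"),
]
payloads = S3Handler.get_objects(requests)  # list of bytes, one per request
text = payloads[1].decode("utf-8")
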
2 changes: 1 addition & 1 deletion smdebug/core/s3_utils.py
@@ -4,7 +4,7 @@

 # list_info will be a list of ListRequest objects. Returns list of lists of files for each request
 def _list_s3_prefixes(list_info):
-    files = S3Handler().list_prefixes(list_info)
+    files = S3Handler.list_prefixes(list_info)
     if len(files) == 1:
         files = files[0]
     return files
3 changes: 1 addition & 2 deletions smdebug/trials/s3_trial.py
@@ -44,7 +44,6 @@ def __init__(
         self.prefix_name = os.path.join(prefix_name, "")
         self.path = "s3://" + os.path.join(self.bucket_name, self.prefix_name)
         self.index_reader = S3IndexReader(self.path)
-        self.s3_handler = S3Handler()
         self._load_collections()
         self._load_tensors()

@@ -67,7 +66,7 @@ def _read_collections(self, collection_files):
         first_collection_file = collection_files[0]  # First Collection File
         key = os.path.join(first_collection_file)
         collections_req = ReadObjectRequest(self._get_s3_location(key))
-        obj_data = self.s3_handler.get_objects([collections_req])[0]
+        obj_data = S3Handler.get_objects([collections_req])[0]
         obj_data = obj_data.decode("utf-8")
         self.collection_manager = CollectionManager.load_from_string(obj_data)
         self.num_workers = self.collection_manager.get_num_workers()
19 changes: 2 additions & 17 deletions tests/analysis/utils.py
@@ -1,13 +1,11 @@
 # Standard Library
-import asyncio
 import os
 
 # Third Party
-import aioboto3
 import numpy as np
 
 # First Party
-from smdebug.core.access_layer.s3handler import ListRequest, S3Handler
+from smdebug.core.access_layer.s3handler import DeleteRequest, S3Handler
 from smdebug.core.collection_manager import CollectionManager
 from smdebug.core.config_constants import DEFAULT_COLLECTIONS_FILE_NAME
 from smdebug.core.writer import FileWriter
@@ -51,18 +49,5 @@ def check_trial(trial_obj, num_steps, num_tensors):
             assert v is not None
 
 
-async def del_prefix_helper(bucket, keys):
-    loop = asyncio.get_event_loop()
-    client = aioboto3.client("s3", loop=loop)
-    await asyncio.gather(*[client.delete_object(Bucket=bucket, Key=key) for key in keys])
-    await client.close()
-
-
 def delete_s3_prefix(bucket, prefix):
-    s3_handler = S3Handler()
-    list_req = [ListRequest(Bucket=bucket, Prefix=prefix)]
-    keys = s3_handler.list_prefixes(list_req)[0]
-
-    loop = asyncio.get_event_loop()
-    task = loop.create_task(del_prefix_helper(bucket, keys))
-    loop.run_until_complete(task)
+    S3Handler.delete_prefix(delete_request=DeleteRequest(Bucket=bucket, Prefix=prefix))