diff --git a/pipe-cli/pipe.py b/pipe-cli/pipe.py index 15001c70c9..57ebdec49d 100644 --- a/pipe-cli/pipe.py +++ b/pipe-cli/pipe.py @@ -1074,8 +1074,9 @@ def mvtodir(name, directory): help="Option for configuring storage summary details listing mode. Possible values: " "compact - brief summary only (default); " "full - show extended details, works for the storage summary listing only") +@click.option('-g', '--show-archive', is_flag=True, help='Show archived files.') @common_options -def storage_list(path, show_details, show_versions, recursive, page, all, output): +def storage_list(path, show_details, show_versions, recursive, page, all, output, show_archive): """Lists storage contents """ show_extended = False @@ -1084,7 +1085,8 @@ def storage_list(path, show_details, show_versions, recursive, page, all, output click.echo('Extended output could be configured for the storage summary listing only!', err=True) sys.exit(1) show_extended = True - DataStorageOperations.storage_list(path, show_details, show_versions, recursive, page, all, show_extended) + DataStorageOperations.storage_list(path, show_details, show_versions, recursive, page, all, show_extended, + show_archive) @storage.command(name='mkdir') diff --git a/pipe-cli/src/api/datastorage_lifecycle.py b/pipe-cli/src/api/datastorage_lifecycle.py new file mode 100644 index 0000000000..d0f2292023 --- /dev/null +++ b/pipe-cli/src/api/datastorage_lifecycle.py @@ -0,0 +1,36 @@ +# Copyright 2023 EPAM Systems, Inc. (https://www.epam.com/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
from .base import API
from ..model.data_storage_lifecycle_model import DataStorageLifecycleModel


class DataStorageLifecycle(API):
    """API client for the data storage lifecycle (restore) endpoints."""

    def __init__(self):
        super(DataStorageLifecycle, self).__init__()

    @classmethod
    def load_hierarchy(cls, storage_id, path, is_file=False):
        """Fetch the effective restore hierarchy for a path inside a storage.

        :param storage_id: Identifier of the data storage.
        :param path: Storage-relative path to query.
        :param is_file: True when the path denotes a single file; otherwise the
                        folder hierarchy is requested recursively.
        :return: A list of DataStorageLifecycleModel items, or None when the
                 response carries no payload.
        """
        if is_file:
            path_type_query = 'FILE'
        else:
            # Folder queries additionally ask the server for the recursive
            # hierarchy; the extra flag rides along in the pathType parameter.
            path_type_query = 'FOLDER&recursive=true'
        request_url = '/datastorage/%s/lifecycle/restore/effectiveHierarchy?path=%s&pathType=%s' \
                      % (str(storage_id), path, path_type_query)
        response_data = cls.instance().call(request_url, None)
        if 'payload' not in response_data:
            return None
        return [DataStorageLifecycleModel.load(item_json) for item_json in response_data['payload']]
class DataStorageLifecycleModel:
    """Client-side view of a data storage lifecycle restore action.

    Built from the JSON payload returned by the lifecycle restore API.
    NOTE: date parsing relies on ``date_utilities`` being imported at the
    enclosing module's top level (``from src.utilities import date_utilities``).
    """

    def __init__(self):
        self.id = None                # server-side identifier of the restore action
        self.datastorage_id = None    # id of the data storage the action belongs to
        self.user_actor_id = None     # id of the user who initiated the action
        self.path = None              # storage path the action applies to
        self.type = None              # path type reported by the server (e.g. FILE/FOLDER)
        self.restore_versions = None  # whether non-latest versions are restored as well
        self.restore_mode = None      # restore mode reported by the server
        self.days = None              # number of days the restore lasts
        self.started = None           # when the action started (localized datetime)
        self.updated = None           # when the action was last updated (localized datetime)
        self.restored_till = None     # when the restored data expires (localized datetime)
        self.status = None            # action status, e.g. 'SUCCEEDED'

    @classmethod
    def load(cls, json):
        """Build a model from a lifecycle JSON dict.

        Every field is optional: missing keys leave the corresponding
        attribute as None. (Previously ``json['id']`` was the only unguarded
        access and raised KeyError on partial payloads — now it is guarded
        consistently with the other fields.)

        :param json: Dict parsed from the server response.
        :return: A populated DataStorageLifecycleModel instance.
        """
        model = cls()
        model.id = json.get('id')
        if 'datastorageId' in json:
            model.datastorage_id = int(json['datastorageId'])
        if 'userActorId' in json:
            model.user_actor_id = int(json['userActorId'])
        model.path = json.get('path')
        model.type = json.get('type')
        model.restore_versions = json.get('restoreVersions')
        model.restore_mode = json.get('restoreMode')
        if 'days' in json:
            model.days = int(json['days'])
        if 'started' in json:
            model.started = date_utilities.server_date_representation(json['started'])
        if 'updated' in json:
            model.updated = date_utilities.server_date_representation(json['updated'])
        if 'restoredTill' in json:
            model.restored_till = date_utilities.server_date_representation(json['restoredTill'])
        model.status = json.get('status')
        return model
class DataStorageLifecycleManager:
    """Lazily loads and caches lifecycle restore info for a storage path.

    NOTE: ``load_items`` relies on ``DataStorageLifecycle`` being imported at
    the enclosing module's top level
    (``from src.api.datastorage_lifecycle import DataStorageLifecycle``).
    """

    def __init__(self, storage_id, path, is_file):
        self.storage_id = storage_id
        self.path = path
        self.is_file = is_file
        self.items = None        # lazily populated {lifecycle path: lifecycle item}
        self.sorted_paths = []   # lifecycle paths, longest first, so the most specific match wins

    def find_lifecycle_status(self, file_path):
        """Return (status_label, restore_versions) for the given file path.

        The label is a human-readable ' (Restored...)' suffix when the
        covering lifecycle action SUCCEEDED, otherwise None. The second value
        is the action's restore_versions flag (None when no action covers the
        path).
        """
        if self.items is None:
            self.load_items()
        # Lifecycle paths are absolute; normalize the lookup key to match.
        if not file_path.startswith('/'):
            file_path = '/' + file_path
        for path in self.sorted_paths:
            if not self._covers(path, file_path):
                continue
            item = self.items.get(path)
            if not item:
                return None, None
            if item.status == 'SUCCEEDED':
                restored_till = ' till %s' % item.restored_till \
                    if item.restored_till else ''
                return ' (Restored%s)' % restored_till, item.restore_versions
            return None, item.restore_versions
        return None, None

    @staticmethod
    def _covers(path, file_path):
        # Exact match, or a path-component-aware prefix match: '/data' covers
        # '/data/file' but NOT '/database/file'. (The previous bare
        # startswith() check wrongly matched the latter.)
        if path == file_path:
            return True
        prefix = path if path.endswith('/') else path + '/'
        return file_path.startswith(prefix)

    def load_items(self):
        """Fetch the restore hierarchy once and index it by path."""
        items = DataStorageLifecycle.load_hierarchy(self.storage_id, self.path, self.is_file)
        self.items = {}
        if not items:
            return
        for item in items:
            self.items[item.path] = item
        self.sorted_paths = sorted([item.path for item in items], key=len, reverse=True)
+302,7 @@ def restore(cls, path, version, recursive, exclude, include): manager.restore_version(version, exclude, include, recursive=recursive) @classmethod - def storage_list(cls, path, show_details, show_versions, recursive, page, show_all, show_extended): + def storage_list(cls, path, show_details, show_versions, recursive, page, show_all, show_extended, show_archive): """Lists storage contents """ if path: @@ -313,10 +315,14 @@ def storage_list(cls, path, show_details, show_versions, recursive, page, show_a if root_bucket is None: click.echo('Storage path "{}" was not found'.format(path), err=True) sys.exit(1) + if show_archive and root_bucket.type != 'S3': + click.echo('Error: --show-archive option is not available for this provider.', err=True) + sys.exit(1) else: relative_path = original_path if original_path != '/' else '' cls.__print_data_storage_contents(root_bucket, relative_path, show_details, recursive, - page_size=page, show_versions=show_versions, show_all=show_all) + page_size=page, show_versions=show_versions, show_all=show_all, + show_archive=show_archive) else: # If no argument is specified - list brief details of all buckets cls.__print_data_storage_contents(None, None, show_details, recursive, show_all=show_all, @@ -427,14 +433,15 @@ def __load_storage_list(cls, extended=False): @classmethod def __print_data_storage_contents(cls, bucket_model, relative_path, show_details, recursive, page_size=None, - show_versions=False, show_all=False, show_extended=False): + show_versions=False, show_all=False, show_extended=False, show_archive=False): items = [] header = None if bucket_model is not None: wrapper = DataStorageWrapper.get_cloud_wrapper_for_bucket(bucket_model, relative_path) manager = wrapper.get_list_manager(show_versions=show_versions) - items = manager.list_items(relative_path, recursive=recursive, page_size=page_size, show_all=show_all) + items = manager.list_items(relative_path, recursive=recursive, page_size=page_size, show_all=show_all, 
+ show_archive=show_archive) else: hidden_object_manager = HiddenObjectManager() # If no argument is specified - list brief details of all buckets @@ -636,6 +643,12 @@ def _transfer_item(cls, item, manager, source_wrapper, destination_wrapper, tran transfer_results = cls._flush_transfer_results(source_wrapper, destination_wrapper, transfer_results, clean=clean) except Exception as e: + if isinstance(e, ClientError) \ + and e.message and 'InvalidObjectState' in e.message and 'storage class' in e.message: + if not quiet: + click.echo(u'File {} transferring has failed. Archived file shall be restored first.' + .format(full_path)) + return transfer_results, fail_after_exception if on_failures == AllowedFailuresValues.FAIL: err_msg = u'File transferring has failed {}. Exiting...'.format(full_path) logging.warn(err_msg) diff --git a/pipe-cli/src/utilities/storage/azure.py b/pipe-cli/src/utilities/storage/azure.py index 0481c85aea..087f7433a5 100644 --- a/pipe-cli/src/utilities/storage/azure.py +++ b/pipe-cli/src/utilities/storage/azure.py @@ -80,7 +80,7 @@ def __init__(self, blob_service, bucket): self.delimiter = StorageOperations.PATH_SEPARATOR def list_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE, - show_all=False): + show_all=False, show_archive=False): prefix = StorageOperations.get_prefix(relative_path) blobs_generator = self.service.list_blobs(self.bucket.path, prefix=prefix if relative_path else None, diff --git a/pipe-cli/src/utilities/storage/common.py b/pipe-cli/src/utilities/storage/common.py index c71ce53fe2..f6f44bc3f4 100644 --- a/pipe-cli/src/utilities/storage/common.py +++ b/pipe-cli/src/utilities/storage/common.py @@ -294,7 +294,7 @@ class AbstractListingManager: @abstractmethod def list_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE, - show_all=False): + show_all=False, show_archive=False): """ Lists files and folders by a relative path in the current 
storage. @@ -302,6 +302,7 @@ def list_items(self, relative_path=None, recursive=False, page_size=StorageOpera :param recursive: Specifies if the listing has to be recursive. :param page_size: Max number of items to return. The argument is ignored if show_all argument is specified. :param show_all: Specifies if all items have to be listed. + :param show_archive: Specifies if archived items have to be listed """ pass diff --git a/pipe-cli/src/utilities/storage/gs.py b/pipe-cli/src/utilities/storage/gs.py index f894035dba..3f058be052 100644 --- a/pipe-cli/src/utilities/storage/gs.py +++ b/pipe-cli/src/utilities/storage/gs.py @@ -394,7 +394,7 @@ def __init__(self, client, bucket, show_versions=False): self.show_versions = show_versions def list_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE, - show_all=False): + show_all=False, show_archive=False): prefix = StorageOperations.get_prefix(relative_path) bucket = self.client.bucket(self.bucket.path) blobs_iterator = bucket.list_blobs(prefix=prefix if relative_path else None, diff --git a/pipe-cli/src/utilities/storage/s3.py b/pipe-cli/src/utilities/storage/s3.py index a41325b3de..bb28fca830 100644 --- a/pipe-cli/src/utilities/storage/s3.py +++ b/pipe-cli/src/utilities/storage/s3.py @@ -15,6 +15,7 @@ from boto3.s3.transfer import TransferConfig from botocore.endpoint import BotocoreHTTPSession, MAX_POOL_CONNECTIONS +from src.utilities.datastorage_lifecycle_manager import DataStorageLifecycleManager from src.utilities.encoding_utilities import to_string, to_ascii, is_safe_chars from src.utilities.storage.s3_proxy_utils import AwsProxyConnectWithHeadersHTTPSAdapter from src.utilities.storage.storage_usage import StorageUsageAccumulator @@ -449,7 +450,7 @@ def __init__(self, bucket, session, show_versions=False, region_name=None): self.show_versions = show_versions def list_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE, - 
show_all=False): + show_all=False, show_archive=False): delimiter = S3BucketOperations.S3_PATH_SEPARATOR client = self._get_client() operation_parameters = { @@ -469,9 +470,9 @@ def list_items(self, relative_path=None, recursive=False, page_size=StorageOpera operation_parameters['Prefix'] = prefix if self.show_versions: - return self.list_versions(client, prefix, operation_parameters, recursive, page_size) + return self.list_versions(client, prefix, operation_parameters, recursive, page_size, show_archive) else: - return self.list_objects(client, prefix, operation_parameters, recursive, page_size) + return self.list_objects(client, prefix, operation_parameters, recursive, page_size, show_archive) def get_summary_with_depth(self, max_depth, relative_path=None): bucket_name = self.bucket.bucket.path @@ -525,12 +526,29 @@ def get_summary(self, relative_path=None): break return delimiter.join([self.bucket.bucket.path, relative_path]), total_objects, total_size - def list_versions(self, client, prefix, operation_parameters, recursive, page_size): + @classmethod + def prefix_match(cls, page_file, relative_path=None): + if not relative_path: + return True + if 'Key' not in page_file or not page_file['Key']: + return False + key = page_file['Key'] + if key == relative_path: + return True + if relative_path.endswith(S3BucketOperations.S3_PATH_SEPARATOR): + return True + if key.startswith("%s%s" % (relative_path, S3BucketOperations.S3_PATH_SEPARATOR)): + return True + return False + + def list_versions(self, client, prefix, operation_parameters, recursive, page_size, show_archive): paginator = client.get_paginator('list_object_versions') page_iterator = paginator.paginate(**operation_parameters) items = [] item_keys = collections.OrderedDict() items_count = 0 + lifecycle_manager = DataStorageLifecycleManager(self.bucket.bucket.identifier, prefix, self.bucket.is_file_flag) + for page in page_iterator: if 'CommonPrefixes' in page: for folder in page['CommonPrefixes']: @@ 
-540,7 +558,17 @@ def list_versions(self, client, prefix, operation_parameters, recursive, page_s if 'Versions' in page: for version in page['Versions']: name = self.get_file_name(version, prefix, recursive) - item = self.get_file_object(version, name, version=True) + restore_status = None + if version['StorageClass'] != 'STANDARD': + restore_status, versions_restored = lifecycle_manager.find_lifecycle_status(name) + version_not_restored = restore_status and not version['IsLatest'] and not versions_restored + if not show_archive: + if not restore_status or version_not_restored: + continue + else: + if version_not_restored: + restore_status = None + item = self.get_file_object(version, name, version=True, lifecycle_status=restore_status) self.process_version(item, item_keys, name) if 'DeleteMarkers' in page: for delete_marker in page['DeleteMarkers']: @@ -568,11 +596,13 @@ def process_version(self, item, item_keys, name): item.versions = versions item_keys[name] = item - def list_objects(self, client, prefix, operation_parameters, recursive, page_size): + def list_objects(self, client, prefix, operation_parameters, recursive, page_size, show_archive): paginator = client.get_paginator('list_objects_v2') page_iterator = paginator.paginate(**operation_parameters) items = [] items_count = 0 + lifecycle_manager = DataStorageLifecycleManager(self.bucket.bucket.identifier, prefix, self.bucket.is_file_flag) + for page in page_iterator: if 'CommonPrefixes' in page: for folder in page['CommonPrefixes']: @@ -582,14 +612,19 @@ def list_objects(self, client, prefix, operation_parameters, recursive, page_siz if 'Contents' in page: for file in page['Contents']: name = self.get_file_name(file, prefix, recursive) - item = self.get_file_object(file, name) + lifecycle_status = None + if file['StorageClass'] != 'STANDARD': + lifecycle_status, _ = lifecycle_manager.find_lifecycle_status(name) + if not show_archive and not lifecycle_status: + continue + item = self.get_file_object(file, 
name, lifecycle_status=lifecycle_status) items.append(item) items_count += 1 if self.need_to_stop_paging(page, page_size, items_count): break return items - def get_file_object(self, file, name, version=False, storage_class=True): + def get_file_object(self, file, name, version=False, storage_class=True, lifecycle_status=None): item = DataStorageItemModel() item.type = 'File' item.name = name @@ -598,7 +633,8 @@ def get_file_object(self, file, name, version=False, storage_class=True): item.path = name item.changed = file['LastModified'].astimezone(Config.instance().timezone()) if storage_class: - item.labels = [DataStorageItemLabelModel('StorageClass', file['StorageClass'])] + lifecycle_status = lifecycle_status if lifecycle_status else '' + item.labels = [DataStorageItemLabelModel('StorageClass', file['StorageClass'] + lifecycle_status)] if version: item.version = file['VersionId'] item.latest = file['IsLatest']