Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Archived S3 files shall be excluded from pipe storage & pipe mount operations #3035

Merged
Merged 4 commits
Jan 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pipe-cli/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1075,8 +1075,9 @@ def mvtodir(name, directory):
help="Option for configuring storage summary details listing mode. Possible values: "
"compact - brief summary only (default); "
"full - show extended details, works for the storage summary listing only")
@click.option('-g', '--show-archive', is_flag=True, help='Show archived files.')
@common_options
def storage_list(path, show_details, show_versions, recursive, page, all, output):
def storage_list(path, show_details, show_versions, recursive, page, all, output, show_archive):
"""Lists storage contents
"""
show_extended = False
Expand All @@ -1085,7 +1086,8 @@ def storage_list(path, show_details, show_versions, recursive, page, all, output
click.echo('Extended output could be configured for the storage summary listing only!', err=True)
sys.exit(1)
show_extended = True
DataStorageOperations.storage_list(path, show_details, show_versions, recursive, page, all, show_extended)
DataStorageOperations.storage_list(path, show_details, show_versions, recursive, page, all, show_extended,
show_archive)


@storage.command(name='mkdir')
Expand Down
36 changes: 36 additions & 0 deletions pipe-cli/src/api/datastorage_lifecycle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright 2023 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .base import API
from ..model.data_storage_lifecycle_model import DataStorageLifecycleModel


class DataStorageLifecycle(API):
    """API client for querying data storage lifecycle restore hierarchies."""

    def __init__(self):
        super(DataStorageLifecycle, self).__init__()

    @classmethod
    def load_hierarchy(cls, storage_id, path, is_file=False):
        """Load the effective lifecycle restore hierarchy for a storage path.

        :param storage_id: Identifier of the data storage.
        :param path: Path inside the storage to query.
        :param is_file: True when the path is a single file; otherwise the
                        folder subtree is queried recursively in one call.
        :return: List of DataStorageLifecycleModel items, or None when the
                 server response carries no payload.
        """
        # NOTE(review): the path is embedded unescaped in the query string —
        # presumably the server tolerates this; confirm for paths with '&'/'?'.
        path_type = 'FILE' if is_file else 'FOLDER&recursive=true'
        request_url = '/datastorage/%s/lifecycle/restore/effectiveHierarchy?path=%s&pathType=%s' \
                      % (str(storage_id), path, path_type)
        response_data = cls.instance().call(request_url, None)
        if 'payload' not in response_data:
            return None
        return [DataStorageLifecycleModel.load(lifecycle_json)
                for lifecycle_json in response_data['payload']]
60 changes: 60 additions & 0 deletions pipe-cli/src/model/data_storage_lifecycle_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright 2023 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from src.utilities import date_utilities


class DataStorageLifecycleModel:
    """Client-side representation of a single lifecycle restore action."""

    def __init__(self):
        self.id = None                # action identifier (required in server JSON)
        self.datastorage_id = None    # owning data storage id
        self.user_actor_id = None     # id of the user that requested the restore
        self.path = None              # storage path the action applies to
        self.type = None              # path type reported by the server
        self.restore_versions = None  # whether previous versions are restored too
        self.restore_mode = None      # restore mode reported by the server
        self.days = None              # restore duration in days
        self.started = None           # when the restore was started
        self.updated = None           # when the action was last updated
        self.restored_till = None     # restore expiration date
        self.status = None            # current action status (e.g. SUCCEEDED)

    @classmethod
    def load(cls, json):
        """Build a model from a server JSON dict.

        'id' is mandatory (KeyError if missing); every other field is optional
        and left as None when absent.
        """
        model = cls()
        model.id = json['id']
        # (json key, model attribute, converter); None keeps the raw value.
        # Date converters are lambdas so date_utilities is only touched when
        # a date field is actually present.
        field_map = (
            ('datastorageId', 'datastorage_id', int),
            ('userActorId', 'user_actor_id', int),
            ('path', 'path', None),
            ('type', 'type', None),
            ('restoreVersions', 'restore_versions', None),
            ('restoreMode', 'restore_mode', None),
            ('days', 'days', int),
            ('started', 'started',
             lambda value: date_utilities.server_date_representation(value)),
            ('updated', 'updated',
             lambda value: date_utilities.server_date_representation(value)),
            ('restoredTill', 'restored_till',
             lambda value: date_utilities.server_date_representation(value)),
            ('status', 'status', None),
        )
        for key, attribute, convert in field_map:
            if key in json:
                raw = json[key]
                setattr(model, attribute, convert(raw) if convert else raw)
        return model
52 changes: 52 additions & 0 deletions pipe-cli/src/utilities/datastorage_lifecycle_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright 2023 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from src.api.datastorage_lifecycle import DataStorageLifecycle


class DataStorageLifecycleManager:
    """Resolves lifecycle restore status for files under a storage path.

    Lazily loads the effective restore hierarchy from the API on first lookup
    and answers per-file status queries against it.
    """

    def __init__(self, storage_id, path, is_file):
        self.storage_id = storage_id  # data storage identifier
        self.path = path              # root path the hierarchy is loaded for
        self.is_file = is_file        # whether the root path is a single file
        self.items = None             # lazy: {path: lifecycle item}
        self.sorted_paths = []        # item paths, longest first (most specific wins)

    def find_lifecycle_status(self, file_path):
        """Return (status_label, restore_versions) for the given file path.

        status_label is a human-readable suffix such as ' (Restored till ...)'
        when the covering restore action SUCCEEDED, otherwise None.
        restore_versions echoes the covering action's flag, or None when no
        action covers the path.
        """
        if self.items is None:
            self.load_items()
        if not file_path.startswith('/'):
            file_path = '/' + file_path
        for path in self.sorted_paths:
            # Match the exact path or anything underneath it. Requiring the
            # trailing separator prevents a restore action on '/data' from
            # wrongly covering '/database/file.txt' (plain startswith did).
            if path == file_path or file_path.startswith(path.rstrip('/') + '/'):
                item = self.items.get(path)
                if not item:
                    return None, None
                if item.status == 'SUCCEEDED':
                    restored_till = ' till %s' % item.restored_till \
                        if item.restored_till else ''
                    return ' (Restored%s)' % restored_till, item.restore_versions
                else:
                    return None, item.restore_versions
        return None, None

    def load_items(self):
        """Fetch the restore hierarchy once and index it by path."""
        items = DataStorageLifecycle.load_hierarchy(self.storage_id, self.path, self.is_file)
        self.items = {}
        if not items:
            return
        for item in items:
            self.items[item.path] = item
        # Longest paths first so the most specific restore action is applied.
        self.sorted_paths = sorted(self.items, key=len, reverse=True)
21 changes: 17 additions & 4 deletions pipe-cli/src/utilities/datastorage_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import datetime
import prettytable
import sys

from botocore.exceptions import ClientError
from future.utils import iteritems
from operator import itemgetter

Expand Down Expand Up @@ -301,7 +303,7 @@ def restore(cls, path, version, recursive, exclude, include):
manager.restore_version(version, exclude, include, recursive=recursive)

@classmethod
def storage_list(cls, path, show_details, show_versions, recursive, page, show_all, show_extended):
def storage_list(cls, path, show_details, show_versions, recursive, page, show_all, show_extended, show_archive):
"""Lists storage contents
"""
if path:
Expand All @@ -314,10 +316,14 @@ def storage_list(cls, path, show_details, show_versions, recursive, page, show_a
if root_bucket is None:
click.echo('Storage path "{}" was not found'.format(path), err=True)
sys.exit(1)
if show_archive and root_bucket.type != 'S3':
click.echo('Error: --show-archive option is not available for this provider.', err=True)
sys.exit(1)
else:
relative_path = original_path if original_path != '/' else ''
cls.__print_data_storage_contents(root_bucket, relative_path, show_details, recursive,
page_size=page, show_versions=show_versions, show_all=show_all)
page_size=page, show_versions=show_versions, show_all=show_all,
show_archive=show_archive)
else:
# If no argument is specified - list brief details of all buckets
cls.__print_data_storage_contents(None, None, show_details, recursive, show_all=show_all,
Expand Down Expand Up @@ -428,14 +434,15 @@ def __load_storage_list(cls, extended=False):

@classmethod
def __print_data_storage_contents(cls, bucket_model, relative_path, show_details, recursive, page_size=None,
show_versions=False, show_all=False, show_extended=False):
show_versions=False, show_all=False, show_extended=False, show_archive=False):

items = []
header = None
if bucket_model is not None:
wrapper = DataStorageWrapper.get_cloud_wrapper_for_bucket(bucket_model, relative_path)
manager = wrapper.get_list_manager(show_versions=show_versions)
items = manager.list_items(relative_path, recursive=recursive, page_size=page_size, show_all=show_all)
items = manager.list_items(relative_path, recursive=recursive, page_size=page_size, show_all=show_all,
show_archive=show_archive)
else:
hidden_object_manager = HiddenObjectManager()
# If no argument is specified - list brief details of all buckets
Expand Down Expand Up @@ -637,6 +644,12 @@ def _transfer_item(cls, item, manager, source_wrapper, destination_wrapper, tran
transfer_results = cls._flush_transfer_results(source_wrapper, destination_wrapper,
transfer_results, clean=clean)
except Exception as e:
if isinstance(e, ClientError) \
and e.message and 'InvalidObjectState' in e.message and 'storage class' in e.message:
if not quiet:
click.echo(u'File {} transferring has failed. Archived file shall be restored first.'
.format(full_path))
return transfer_results, fail_after_exception
if on_failures == AllowedFailuresValues.FAIL:
err_msg = u'File transferring has failed {}. Exiting...'.format(full_path)
logging.warn(err_msg)
Expand Down
2 changes: 1 addition & 1 deletion pipe-cli/src/utilities/storage/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def __init__(self, blob_service, bucket):
self.delimiter = StorageOperations.PATH_SEPARATOR

def list_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE,
show_all=False):
show_all=False, show_archive=False):
prefix = StorageOperations.get_prefix(relative_path)
blobs_generator = self.service.list_blobs(self.bucket.path,
prefix=prefix if relative_path else None,
Expand Down
3 changes: 2 additions & 1 deletion pipe-cli/src/utilities/storage/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,15 @@ class AbstractListingManager:

@abstractmethod
def list_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE,
show_all=False):
show_all=False, show_archive=False):
"""
Lists files and folders by a relative path in the current storage.

:param relative_path: Storage relative path to be listed.
:param recursive: Specifies if the listing has to be recursive.
:param page_size: Max number of items to return. The argument is ignored if show_all argument is specified.
:param show_all: Specifies if all items have to be listed.
:param show_archive: Specifies if archived items have to be listed
"""
pass

Expand Down
2 changes: 1 addition & 1 deletion pipe-cli/src/utilities/storage/gs.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ def __init__(self, client, bucket, show_versions=False):
self.show_versions = show_versions

def list_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE,
show_all=False):
show_all=False, show_archive=False):
prefix = StorageOperations.get_prefix(relative_path)
bucket = self.client.bucket(self.bucket.path)
blobs_iterator = bucket.list_blobs(prefix=prefix if relative_path else None,
Expand Down
39 changes: 30 additions & 9 deletions pipe-cli/src/utilities/storage/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from boto3.s3.transfer import TransferConfig
from botocore.endpoint import BotocoreHTTPSession, MAX_POOL_CONNECTIONS

from src.utilities.datastorage_lifecycle_manager import DataStorageLifecycleManager
from src.utilities.encoding_utilities import to_string, to_ascii, is_safe_chars
from src.utilities.storage.s3_proxy_utils import AwsProxyConnectWithHeadersHTTPSAdapter
from src.utilities.storage.storage_usage import StorageUsageAccumulator
Expand Down Expand Up @@ -595,7 +596,7 @@ def __init__(self, bucket, session, show_versions=False, region_name=None):
self.show_versions = show_versions

def list_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE,
show_all=False):
show_all=False, show_archive=False):
delimiter = S3BucketOperations.S3_PATH_SEPARATOR
client = self._get_client()
operation_parameters = {
Expand All @@ -615,9 +616,9 @@ def list_items(self, relative_path=None, recursive=False, page_size=StorageOpera
operation_parameters['Prefix'] = prefix

if self.show_versions:
return self.list_versions(client, prefix, operation_parameters, recursive, page_size)
return self.list_versions(client, prefix, operation_parameters, recursive, page_size, show_archive)
else:
return self.list_objects(client, prefix, operation_parameters, recursive, page_size)
return self.list_objects(client, prefix, operation_parameters, recursive, page_size, show_archive)

def get_summary_with_depth(self, max_depth, relative_path=None):
bucket_name = self.bucket.bucket.path
Expand Down Expand Up @@ -687,12 +688,14 @@ def prefix_match(cls, page_file, relative_path=None):
return True
return False

def list_versions(self, client, prefix, operation_parameters, recursive, page_size):
def list_versions(self, client, prefix, operation_parameters, recursive, page_size, show_archive):
paginator = client.get_paginator('list_object_versions')
page_iterator = paginator.paginate(**operation_parameters)
items = []
item_keys = collections.OrderedDict()
items_count = 0
lifecycle_manager = DataStorageLifecycleManager(self.bucket.bucket.identifier, prefix, self.bucket.is_file_flag)

for page in page_iterator:
if 'CommonPrefixes' in page:
for folder in page['CommonPrefixes']:
Expand All @@ -702,7 +705,17 @@ def list_versions(self, client, prefix, operation_parameters, recursive, page_s
if 'Versions' in page:
for version in page['Versions']:
name = self.get_file_name(version, prefix, recursive)
item = self.get_file_object(version, name, version=True)
restore_status = None
if version['StorageClass'] != 'STANDARD':
restore_status, versions_restored = lifecycle_manager.find_lifecycle_status(name)
version_not_restored = restore_status and not version['IsLatest'] and not versions_restored
if not show_archive:
if not restore_status or version_not_restored:
continue
else:
if version_not_restored:
restore_status = None
item = self.get_file_object(version, name, version=True, lifecycle_status=restore_status)
self.process_version(item, item_keys, name)
if 'DeleteMarkers' in page:
for delete_marker in page['DeleteMarkers']:
Expand Down Expand Up @@ -730,11 +743,13 @@ def process_version(self, item, item_keys, name):
item.versions = versions
item_keys[name] = item

def list_objects(self, client, prefix, operation_parameters, recursive, page_size):
def list_objects(self, client, prefix, operation_parameters, recursive, page_size, show_archive):
paginator = client.get_paginator('list_objects_v2')
page_iterator = paginator.paginate(**operation_parameters)
items = []
items_count = 0
lifecycle_manager = DataStorageLifecycleManager(self.bucket.bucket.identifier, prefix, self.bucket.is_file_flag)

for page in page_iterator:
if 'CommonPrefixes' in page:
for folder in page['CommonPrefixes']:
Expand All @@ -744,14 +759,19 @@ def list_objects(self, client, prefix, operation_parameters, recursive, page_siz
if 'Contents' in page:
for file in page['Contents']:
name = self.get_file_name(file, prefix, recursive)
item = self.get_file_object(file, name)
lifecycle_status = None
if file['StorageClass'] != 'STANDARD':
lifecycle_status, _ = lifecycle_manager.find_lifecycle_status(name)
if not show_archive and not lifecycle_status:
continue
item = self.get_file_object(file, name, lifecycle_status=lifecycle_status)
items.append(item)
items_count += 1
if self.need_to_stop_paging(page, page_size, items_count):
break
return items

def get_file_object(self, file, name, version=False, storage_class=True):
def get_file_object(self, file, name, version=False, storage_class=True, lifecycle_status=None):
item = DataStorageItemModel()
item.type = 'File'
item.name = name
Expand All @@ -760,7 +780,8 @@ def get_file_object(self, file, name, version=False, storage_class=True):
item.path = name
item.changed = file['LastModified'].astimezone(Config.instance().timezone())
if storage_class:
item.labels = [DataStorageItemLabelModel('StorageClass', file['StorageClass'])]
lifecycle_status = lifecycle_status if lifecycle_status else ''
item.labels = [DataStorageItemLabelModel('StorageClass', file['StorageClass'] + lifecycle_status)]
if version:
item.version = file['VersionId']
item.latest = file['IsLatest']
Expand Down