Skip to content

Commit

Permalink
Issue #3020: Archived S3 files shall be excluded from pipe storage & …
Browse files Browse the repository at this point in the history
…pipe mount operations - pipe storage ls
  • Loading branch information
ekazachkova committed Jan 26, 2023
1 parent 884e8fc commit f092386
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 18 deletions.
6 changes: 4 additions & 2 deletions pipe-cli/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1075,8 +1075,9 @@ def mvtodir(name, directory):
help="Option for configuring storage summary details listing mode. Possible values: "
"compact - brief summary only (default); "
"full - show extended details, works for the storage summary listing only")
@click.option('-g', '--show-archive', is_flag=True, help='')
@common_options
def storage_list(path, show_details, show_versions, recursive, page, all, output):
def storage_list(path, show_details, show_versions, recursive, page, all, output, show_archive):
"""Lists storage contents
"""
show_extended = False
Expand All @@ -1085,7 +1086,8 @@ def storage_list(path, show_details, show_versions, recursive, page, all, output
click.echo('Extended output could be configured for the storage summary listing only!', err=True)
sys.exit(1)
show_extended = True
DataStorageOperations.storage_list(path, show_details, show_versions, recursive, page, all, show_extended)
DataStorageOperations.storage_list(path, show_details, show_versions, recursive, page, all, show_extended,
show_archive)


@storage.command(name='mkdir')
Expand Down
36 changes: 36 additions & 0 deletions pipe-cli/src/api/datastorage_lifecycle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright 2023 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .base import API
from ..model.data_storage_lifecycle_model import DataStorageLifecycleModel


class DataStorageLifecycle(API):
    """API client for data storage lifecycle restore operations."""

    def __init__(self):
        super(DataStorageLifecycle, self).__init__()

    @classmethod
    def load_hierarchy(cls, storage_id, path, is_file=False):
        """Loads the effective lifecycle restore hierarchy for a storage path.

        :param storage_id: Identifier of the data storage.
        :param path: Path inside the storage the hierarchy is requested for.
        :param is_file: Whether the path points to a single file; folder
            requests are performed recursively.
        :return: A list of DataStorageLifecycleModel items, or None when the
            response carries no payload.
        """
        # Function-scope import keeps Python 2/3 compatibility localized.
        try:
            from urllib.parse import quote
        except ImportError:
            from urllib import quote
        api = cls.instance()
        path_type = 'FILE' if is_file else 'FOLDER'
        # Percent-encode the path: raw spaces or '&'/'?' characters would
        # otherwise corrupt the query string. NOTE(review): the server is
        # expected to decode query parameters — confirm against the API.
        request_url = '/datastorage/%s/lifecycle/restore/effectiveHierarchy?path=%s&pathType=%s' \
                      % (str(storage_id), quote(path or ''), path_type)
        if not is_file:
            # 'recursive' is a separate query parameter, not part of pathType.
            request_url += '&recursive=true'
        response_data = api.call(request_url, None)
        if 'payload' not in response_data:
            return None
        items = []
        for lifecycles_json in response_data['payload']:
            items.append(DataStorageLifecycleModel.load(lifecycles_json))
        return items
60 changes: 60 additions & 0 deletions pipe-cli/src/model/data_storage_lifecycle_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright 2023 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from src.utilities import date_utilities


class DataStorageLifecycleModel:
    """Represents a single data storage lifecycle restore entry returned by
    the lifecycle restore API."""

    def __init__(self):
        # Identifier of the restore action
        self.id = None
        # Identifier of the owning data storage
        self.datastorage_id = None
        # Identifier of the user that initiated the restore
        self.user_actor_id = None
        # Storage path the restore applies to
        self.path = None
        # Entity type of the path (file/folder)
        self.type = None
        # Whether file versions are restored as well
        self.restore_versions = None
        # Restore mode reported by the server
        self.restore_mode = None
        # Number of days the restore lasts
        self.days = None
        # Timestamps of the restore lifecycle
        self.started = None
        self.updated = None
        self.restored_till = None
        # Current restore status (e.g. SUCCEEDED)
        self.status = None

    @classmethod
    def load(cls, json):
        """Builds a model from the API json payload.

        The 'id' field is required; all other fields are optional and are
        left as None when absent.
        """
        model = cls()
        model.id = json['id']
        # Fields copied verbatim from the payload.
        for key, attribute in (('path', 'path'),
                               ('type', 'type'),
                               ('restoreVersions', 'restore_versions'),
                               ('restoreMode', 'restore_mode'),
                               ('status', 'status')):
            if key in json:
                setattr(model, attribute, json[key])
        # Fields coerced to integers.
        for key, attribute in (('datastorageId', 'datastorage_id'),
                               ('userActorId', 'user_actor_id'),
                               ('days', 'days')):
            if key in json:
                setattr(model, attribute, int(json[key]))
        # Fields converted from the server date representation.
        for key, attribute in (('started', 'started'),
                               ('updated', 'updated'),
                               ('restoredTill', 'restored_till')):
            if key in json:
                setattr(model, attribute, date_utilities.server_date_representation(json[key]))
        return model
50 changes: 50 additions & 0 deletions pipe-cli/src/utilities/datastorage_lifecycle_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2023 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from src.api.datastorage_lifecycle import DataStorageLifecycle


class DataStorageLifecycleManager:
    """Resolves lifecycle restore status for files of a single storage.

    Lazily loads the effective restore hierarchy for the configured storage
    path and answers, for an individual file path, whether the file is
    restored from the archive and whether its versions were restored too.
    """

    def __init__(self, storage_id, path, is_file):
        self.storage_id = storage_id
        self.path = path
        self.is_file = is_file
        # path -> lifecycle model; None until the first lookup triggers a load
        self.items = None
        # restore paths sorted longest-first so the most specific rule wins
        self.sorted_paths = []

    def find_lifecycle_status(self, file_path):
        """Returns a (status suffix, versions restored) tuple for a file.

        The status suffix is a human-readable ' (Restored...)' marker when
        the covering restore succeeded, otherwise None. The second element
        reports whether versions were restored, or None when no restore
        covers the file at all.
        """
        if self.items is None:
            self.load_items()
        if not file_path.startswith('/'):
            file_path = '/' + file_path
        for path in self.sorted_paths:
            # NOTE(review): plain prefix match — '/data' also matches
            # '/database/file'; confirm whether matching should stop at a
            # folder boundary.
            if path == file_path or file_path.startswith(path):
                item = self.items.get(path)
                if not item:
                    return None, None
                if item.status != 'SUCCEEDED':
                    return None, item.restore_versions
                restored_till = ' till %s' % item.restored_till \
                    if item.restored_till else ''
                return ' (Restored%s)' % restored_till, item.restore_versions
        return None, None

    def load_items(self):
        """Loads the restore hierarchy from the API.

        load_hierarchy returns None when the response has no payload —
        treat that as an empty hierarchy instead of crashing on iteration.
        """
        items = DataStorageLifecycle.load_hierarchy(self.storage_id, self.path, self.is_file) or []
        self.items = {}
        for item in items:
            self.items[item.path] = item
        self.sorted_paths = sorted([item.path for item in items], key=len, reverse=True)
13 changes: 9 additions & 4 deletions pipe-cli/src/utilities/datastorage_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ def restore(cls, path, version, recursive, exclude, include):
manager.restore_version(version, exclude, include, recursive=recursive)

@classmethod
def storage_list(cls, path, show_details, show_versions, recursive, page, show_all, show_extended):
def storage_list(cls, path, show_details, show_versions, recursive, page, show_all, show_extended, show_archive):
"""Lists storage contents
"""
if path:
Expand All @@ -314,10 +314,14 @@ def storage_list(cls, path, show_details, show_versions, recursive, page, show_a
if root_bucket is None:
click.echo('Storage path "{}" was not found'.format(path), err=True)
sys.exit(1)
if show_archive and root_bucket.type != 'S3':
click.echo('Error: --show-archive option is not available for this provider.', err=True)
sys.exit(1)
else:
relative_path = original_path if original_path != '/' else ''
cls.__print_data_storage_contents(root_bucket, relative_path, show_details, recursive,
page_size=page, show_versions=show_versions, show_all=show_all)
page_size=page, show_versions=show_versions, show_all=show_all,
show_archive=show_archive)
else:
# If no argument is specified - list brief details of all buckets
cls.__print_data_storage_contents(None, None, show_details, recursive, show_all=show_all,
Expand Down Expand Up @@ -428,14 +432,15 @@ def __load_storage_list(cls, extended=False):

@classmethod
def __print_data_storage_contents(cls, bucket_model, relative_path, show_details, recursive, page_size=None,
show_versions=False, show_all=False, show_extended=False):
show_versions=False, show_all=False, show_extended=False, show_archive=False):

items = []
header = None
if bucket_model is not None:
wrapper = DataStorageWrapper.get_cloud_wrapper_for_bucket(bucket_model, relative_path)
manager = wrapper.get_list_manager(show_versions=show_versions)
items = manager.list_items(relative_path, recursive=recursive, page_size=page_size, show_all=show_all)
items = manager.list_items(relative_path, recursive=recursive, page_size=page_size, show_all=show_all,
show_archive=show_archive)
else:
hidden_object_manager = HiddenObjectManager()
# If no argument is specified - list brief details of all buckets
Expand Down
2 changes: 1 addition & 1 deletion pipe-cli/src/utilities/storage/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def __init__(self, blob_service, bucket):
self.delimiter = StorageOperations.PATH_SEPARATOR

def list_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE,
show_all=False):
show_all=False, show_archive=False):
prefix = StorageOperations.get_prefix(relative_path)
blobs_generator = self.service.list_blobs(self.bucket.path,
prefix=prefix if relative_path else None,
Expand Down
3 changes: 2 additions & 1 deletion pipe-cli/src/utilities/storage/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,15 @@ class AbstractListingManager:

@abstractmethod
def list_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE,
show_all=False):
show_all=False, show_archive=False):
"""
Lists files and folders by a relative path in the current storage.
:param relative_path: Storage relative path to be listed.
:param recursive: Specifies if the listing has to be recursive.
:param page_size: Max number of items to return. The argument is ignored if show_all argument is specified.
:param show_all: Specifies if all items have to be listed.
:param show_archive: Specifies if archived items have to be listed
"""
pass

Expand Down
2 changes: 1 addition & 1 deletion pipe-cli/src/utilities/storage/gs.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ def __init__(self, client, bucket, show_versions=False):
self.show_versions = show_versions

def list_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE,
show_all=False):
show_all=False, show_archive=False):
prefix = StorageOperations.get_prefix(relative_path)
bucket = self.client.bucket(self.bucket.path)
blobs_iterator = bucket.list_blobs(prefix=prefix if relative_path else None,
Expand Down
39 changes: 30 additions & 9 deletions pipe-cli/src/utilities/storage/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from boto3.s3.transfer import TransferConfig
from botocore.endpoint import BotocoreHTTPSession, MAX_POOL_CONNECTIONS

from src.utilities.datastorage_lifecycle_manager import DataStorageLifecycleManager
from src.utilities.encoding_utilities import to_string, to_ascii, is_safe_chars
from src.utilities.storage.s3_proxy_utils import AwsProxyConnectWithHeadersHTTPSAdapter
from src.utilities.storage.storage_usage import StorageUsageAccumulator
Expand Down Expand Up @@ -595,7 +596,7 @@ def __init__(self, bucket, session, show_versions=False, region_name=None):
self.show_versions = show_versions

def list_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE,
show_all=False):
show_all=False, show_archive=False):
delimiter = S3BucketOperations.S3_PATH_SEPARATOR
client = self._get_client()
operation_parameters = {
Expand All @@ -615,9 +616,9 @@ def list_items(self, relative_path=None, recursive=False, page_size=StorageOpera
operation_parameters['Prefix'] = prefix

if self.show_versions:
return self.list_versions(client, prefix, operation_parameters, recursive, page_size)
return self.list_versions(client, prefix, operation_parameters, recursive, page_size, show_archive)
else:
return self.list_objects(client, prefix, operation_parameters, recursive, page_size)
return self.list_objects(client, prefix, operation_parameters, recursive, page_size, show_archive)

def get_summary_with_depth(self, max_depth, relative_path=None):
bucket_name = self.bucket.bucket.path
Expand Down Expand Up @@ -687,12 +688,14 @@ def prefix_match(cls, page_file, relative_path=None):
return True
return False

def list_versions(self, client, prefix, operation_parameters, recursive, page_size):
def list_versions(self, client, prefix, operation_parameters, recursive, page_size, show_archive):
paginator = client.get_paginator('list_object_versions')
page_iterator = paginator.paginate(**operation_parameters)
items = []
item_keys = collections.OrderedDict()
items_count = 0
lifecycle_manager = DataStorageLifecycleManager(self.bucket.bucket.identifier, prefix, self.bucket.is_file_flag)

for page in page_iterator:
if 'CommonPrefixes' in page:
for folder in page['CommonPrefixes']:
Expand All @@ -702,7 +705,17 @@ def list_versions(self, client, prefix, operation_parameters, recursive, page_s
if 'Versions' in page:
for version in page['Versions']:
name = self.get_file_name(version, prefix, recursive)
item = self.get_file_object(version, name, version=True)
restore_status = None
if version['StorageClass'] != 'STANDARD':
restore_status, versions_restored = lifecycle_manager.find_lifecycle_status(name)
version_not_restored = restore_status and not version['IsLatest'] and not versions_restored
if not show_archive:
if not restore_status or version_not_restored:
continue
else:
if version_not_restored:
restore_status = None
item = self.get_file_object(version, name, version=True, lifecycle_status=restore_status)
self.process_version(item, item_keys, name)
if 'DeleteMarkers' in page:
for delete_marker in page['DeleteMarkers']:
Expand Down Expand Up @@ -730,11 +743,13 @@ def process_version(self, item, item_keys, name):
item.versions = versions
item_keys[name] = item

def list_objects(self, client, prefix, operation_parameters, recursive, page_size):
def list_objects(self, client, prefix, operation_parameters, recursive, page_size, show_archive):
paginator = client.get_paginator('list_objects_v2')
page_iterator = paginator.paginate(**operation_parameters)
items = []
items_count = 0
lifecycle_manager = DataStorageLifecycleManager(self.bucket.bucket.identifier, prefix, self.bucket.is_file_flag)

for page in page_iterator:
if 'CommonPrefixes' in page:
for folder in page['CommonPrefixes']:
Expand All @@ -744,14 +759,19 @@ def list_objects(self, client, prefix, operation_parameters, recursive, page_siz
if 'Contents' in page:
for file in page['Contents']:
name = self.get_file_name(file, prefix, recursive)
item = self.get_file_object(file, name)
lifecycle_status = None
if file['StorageClass'] != 'STANDARD':
lifecycle_status, _ = lifecycle_manager.find_lifecycle_status(name)
if not show_archive and not lifecycle_status:
continue
item = self.get_file_object(file, name, lifecycle_status=lifecycle_status)
items.append(item)
items_count += 1
if self.need_to_stop_paging(page, page_size, items_count):
break
return items

def get_file_object(self, file, name, version=False, storage_class=True):
def get_file_object(self, file, name, version=False, storage_class=True, lifecycle_status=None):
item = DataStorageItemModel()
item.type = 'File'
item.name = name
Expand All @@ -760,7 +780,8 @@ def get_file_object(self, file, name, version=False, storage_class=True):
item.path = name
item.changed = file['LastModified'].astimezone(Config.instance().timezone())
if storage_class:
item.labels = [DataStorageItemLabelModel('StorageClass', file['StorageClass'])]
lifecycle_status = lifecycle_status if lifecycle_status else ''
item.labels = [DataStorageItemLabelModel('StorageClass', file['StorageClass'] + lifecycle_status)]
if version:
item.version = file['VersionId']
item.latest = file['IsLatest']
Expand Down

0 comments on commit f092386

Please sign in to comment.