diff --git a/pipe-cli/mount/__init__.py b/pipe-cli/mount/__init__.py index e69de29bb2..2791f20f06 100644 --- a/pipe-cli/mount/__init__.py +++ b/pipe-cli/mount/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/pipe-cli/mount/pipe-fuse.py b/pipe-cli/mount/pipe-fuse.py index 9528a87685..34ff1c186e 100644 --- a/pipe-cli/mount/pipe-fuse.py +++ b/pipe-cli/mount/pipe-fuse.py @@ -1,4 +1,4 @@ -# Copyright 2017-2020 EPAM Systems, Inc. (https://www.epam.com/) +# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ # limitations under the License. import argparse +import ctypes import errno import logging import os @@ -21,6 +22,7 @@ import traceback import future.utils +from cachetools import TTLCache def is_windows(): @@ -43,27 +45,28 @@ def is_windows(): if os.path.exists(libfuse_path): os.environ["FUSE_LIBRARY_PATH"] = libfuse_path +import fuse +from fuse import FUSE, fuse_operations, fuse_file_info, c_utimbuf -from pipefuse.fuseutils import MB, GB -from pipefuse.cache import CachingFileSystemClient, ListingCache, ThreadSafeListingCache +from pipefuse.api import CloudPipelineClient, CloudType from pipefuse.buffread import BufferingReadAheadFileSystemClient from pipefuse.buffwrite import BufferingWriteFileSystemClient +from pipefuse.cache import ListingCache, ThreadSafeListingCache, \ + CachingListingFileSystemClient +from pipefuse.fslock import get_lock +from pipefuse.fuseutils import MB, GB +from pipefuse.gcp import GoogleStorageLowLevelFileSystemClient +from pipefuse.path import PathExpandingStorageFileSystemClient +from pipefuse.pipefs import PipeFS, SupportedOperationsFS +from pipefuse.record import RecordingFileSystemClient, RecordingFS +from pipefuse.s3 import S3StorageLowLevelClient +from pipefuse.storage import StorageHighLevelFileSystemClient from pipefuse.trunc import CopyOnDownTruncateFileSystemClient, \ WriteNullsOnUpTruncateFileSystemClient, \ WriteLastNullOnUpTruncateFileSystemClient -from pipefuse.api import CloudPipelineClient, CloudType -from pipefuse.gcp import GoogleStorageLowLevelFileSystemClient from pipefuse.webdav import CPWebDavClient -from pipefuse.s3 import S3StorageLowLevelClient -from pipefuse.storage import StorageHighLevelFileSystemClient -from pipefuse.pipefs import PipeFS -from pipefuse.record import RecordingFS, RecordingFileSystemClient -from pipefuse.path import PathExpandingStorageFileSystemClient -from pipefuse.fslock import get_lock -import ctypes -import fuse -from fuse import FUSE, fuse_operations, fuse_file_info, c_utimbuf -from cachetools import TTLCache +from pipefuse.xattr import ExtendedAttributesCache, ThreadSafeExtendedAttributesCache, \ + ExtendedAttributesCachingFileSystemClient, RestrictingExtendedAttributesFS _allowed_logging_level_names = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG', 'NOTSET'] _allowed_logging_levels = future.utils.lfilter(lambda name: isinstance(name, str), _allowed_logging_level_names) @@ -71,28 +74,31 @@ def is_windows(): _default_logging_level = 'ERROR' _debug_logging_level = 'DEBUG' _info_logging_level = 'INFO' +_xattrs_operations = ['setxattr', 'getxattr', 'listxattr', 'removexattr'] +_xattrs_include_prefix = 'user' def start(mountpoint, webdav, bucket, read_buffer_size, read_ahead_min_size, read_ahead_max_size, read_ahead_size_multiplier, write_buffer_size, trunc_buffer_size, chunk_size, - cache_ttl, cache_size, default_mode, - mount_options=None, threads=False, monitoring_delay=600, recording=False): - if mount_options is None: - mount_options = {} + listing_cache_ttl, listing_cache_size, + xattrs_include_prefixes, xattrs_exclude_prefixes, + xattrs_cache_ttl, xattrs_cache_size, + disabled_operations, default_mode, + mount_options, threads=False, monitoring_delay=600, recording=False): try: os.makedirs(mountpoint) except OSError as e: if e.errno != errno.EEXIST: raise - api = os.environ.get('API', '') - bearer = os.environ.get('API_TOKEN', '') - chunk_size = int(os.environ.get('CP_PIPE_FUSE_CHUNK_SIZE', chunk_size)) - read_ahead_min_size = int(os.environ.get('CP_PIPE_FUSE_READ_AHEAD_MIN_SIZE', read_ahead_min_size)) - read_ahead_max_size = int(os.environ.get('CP_PIPE_FUSE_READ_AHEAD_MAX_SIZE', read_ahead_max_size)) - read_ahead_size_multiplier = int(os.environ.get('CP_PIPE_FUSE_READ_AHEAD_SIZE_MULTIPLIER', - read_ahead_size_multiplier)) + api = os.getenv('API', '') + bearer = os.getenv('API_TOKEN', '') + chunk_size = int(os.getenv('CP_PIPE_FUSE_CHUNK_SIZE', chunk_size)) + read_ahead_min_size = int(os.getenv('CP_PIPE_FUSE_READ_AHEAD_MIN_SIZE', read_ahead_min_size)) + read_ahead_max_size = int(os.getenv('CP_PIPE_FUSE_READ_AHEAD_MAX_SIZE', read_ahead_max_size)) + read_ahead_size_multiplier = int(os.getenv('CP_PIPE_FUSE_READ_AHEAD_SIZE_MULTIPLIER', + read_ahead_size_multiplier)) bucket_type = None root_path = None if not bearer: @@ -120,14 +126,23 @@ def start(mountpoint, webdav, bucket, client = RecordingFileSystemClient(client) if bucket_type in [CloudType.S3, CloudType.GS]: client = PathExpandingStorageFileSystemClient(client, root_path=root_path) - if cache_ttl > 0 and cache_size > 0: - cache_implementation = TTLCache(maxsize=cache_size, ttl=cache_ttl) - cache = ListingCache(cache_implementation) + if listing_cache_ttl > 0 and listing_cache_size > 0: + listing_cache_implementation = TTLCache(maxsize=listing_cache_size, ttl=listing_cache_ttl) + listing_cache = ListingCache(listing_cache_implementation) if threads: - cache = ThreadSafeListingCache(cache) - client = CachingFileSystemClient(client, cache) + listing_cache = ThreadSafeListingCache(listing_cache) + client = CachingListingFileSystemClient(client, listing_cache) else: - logging.info('Caching is disabled.') + logging.info('Listing caching is disabled.') + if bucket_type == CloudType.S3: + if xattrs_cache_ttl > 0 and xattrs_cache_size > 0: + xattrs_cache_implementation = TTLCache(maxsize=xattrs_cache_size, ttl=xattrs_cache_ttl) + xattrs_cache = ExtendedAttributesCache(xattrs_cache_implementation) + if threads: + xattrs_cache = ThreadSafeExtendedAttributesCache(xattrs_cache) + client = ExtendedAttributesCachingFileSystemClient(client, xattrs_cache) + else: + logging.info('Extended attributes caching is disabled.') if read_buffer_size > 0: client = BufferingReadAheadFileSystemClient(client, read_ahead_min_size=read_ahead_min_size, @@ -151,12 +166,30 @@ def start(mountpoint, webdav, bucket, client = WriteNullsOnUpTruncateFileSystemClient(client, capacity=trunc_buffer_size) else: logging.info('Truncating support is disabled.') - logging.info('File system clients pipeline: %s', client.stats()) + fs = PipeFS(client=client, lock=get_lock(threads, monitoring_delay=monitoring_delay), mode=int(default_mode, 8)) + if bucket_type == CloudType.S3: + if xattrs_include_prefixes: + if xattrs_include_prefixes[0] == '*': + logging.info('All extended attributes will be processed.') + else: + fs = RestrictingExtendedAttributesFS(fs, include_prefixes=xattrs_include_prefixes) + if xattrs_exclude_prefixes: + if xattrs_exclude_prefixes[0] == '*': + logging.info('All extended attributes operations will be disabled.') + disabled_operations.extend(_xattrs_operations) + else: + fs = RestrictingExtendedAttributesFS(fs, exclude_prefixes=xattrs_exclude_prefixes) + else: + logging.info('All extended attributes operations will be disabled.') + disabled_operations.extend(_xattrs_operations) + fs = SupportedOperationsFS(fs, exclude=disabled_operations) if recording: fs = RecordingFS(fs) - logging.info('Initializing file system.') + logging.info('File system processing chain: \n%s', fs.summary()) + + logging.info('Initializing file system...') enable_additional_operations() ro = client.is_read_only() or mount_options.get('ro', False) mount_options.pop('ro', None) @@ -270,10 +303,31 @@ def parse_mount_options(options_string): parser.add_argument("-c", "--chunk-size", type=int, required=False, default=10 * MB, help="Multipart upload chunk size. Can be also specified via " "CP_PIPE_FUSE_CHUNK_SIZE environment variable.") - parser.add_argument("-t", "--cache-ttl", type=int, required=False, default=60, + parser.add_argument("-t", "--cache-ttl", "--listing-cache-ttl", dest="listing_cache_ttl", + type=int, required=False, default=60, help="Listing cache time to live, seconds") - parser.add_argument("-s", "--cache-size", type=int, required=False, default=100, + parser.add_argument("-s", "--cache-size", "--listing-cache-size", dest="listing_cache_size", + type=int, required=False, default=100, help="Number of simultaneous listing caches") + parser.add_argument("--xattrs-include-prefix", dest="xattrs_include_prefixes", + type=str, required=False, action="append", default=[], + help="Extended attribute prefixes to be included in processing. " + "Use --xattrs-include-prefix=\"*\" to enable all extended attributes processing. " + "The argument can be specified multiple times. " + "Defaults to \"%s\"." % _xattrs_include_prefix) + parser.add_argument("--xattrs-exclude-prefix", dest="xattrs_exclude_prefixes", + type=str, required=False, action="append", default=[], + help="Extended attribute prefixes to be excluded from processing. " + "Use --xattrs-exclude-prefix=\"*\" to disable all extended attributes processing. " + "The argument can be specified multiple times.") + parser.add_argument("--xattrs-cache-ttl", type=int, required=False, default=60, + help="Extended attributes cache time to live, seconds.") + parser.add_argument("--xattrs-cache-size", type=int, required=False, default=1000, + help="Number of simultaneous extended attributes caches.") + parser.add_argument("--disabled-operation", dest="disabled_operations", + type=str, required=False, action="append", default=[], + help="Disabled file system operations. " + "The argument can be specified multiple times.") parser.add_argument("-m", "--mode", type=str, required=False, default="700", help="Default mode for files") parser.add_argument("-o", "--options", type=str, required=False, @@ -285,6 +339,10 @@ def parse_mount_options(options_string): help="Delay between path lock monitoring cycles.") args = parser.parse_args() + if args.xattrs_include_prefixes and args.xattrs_exclude_prefixes: + parser.error('Either --xattrs-include-prefix or --xattrs-exclude-prefix parameter should be specified.') + if not args.xattrs_include_prefixes and not args.xattrs_exclude_prefixes: + args.xattrs_include_prefixes = [_xattrs_include_prefix] if not args.webdav and not args.bucket: parser.error('Either --webdav or --bucket parameter should be specified.') if args.bucket and (args.chunk_size < 5 * MB or args.chunk_size > 5 * GB): @@ -308,10 +366,14 @@ def parse_mount_options(options_string): read_ahead_size_multiplier=args.read_ahead_size_multiplier, write_buffer_size=args.write_buffer_size, trunc_buffer_size=args.trunc_buffer_size, chunk_size=args.chunk_size, - cache_ttl=args.cache_ttl, cache_size=args.cache_size, + listing_cache_ttl=args.listing_cache_ttl, listing_cache_size=args.listing_cache_size, + xattrs_include_prefixes=args.xattrs_include_prefixes, + xattrs_exclude_prefixes=args.xattrs_exclude_prefixes, + xattrs_cache_ttl=args.xattrs_cache_ttl, xattrs_cache_size=args.xattrs_cache_size, + disabled_operations=args.disabled_operations, default_mode=args.mode, mount_options=parse_mount_options(args.options), threads=args.threads, monitoring_delay=args.monitoring_delay, recording=recording) - except BaseException as e: - logging.error('Unhandled error: %s' % str(e)) + except Exception: + logging.exception('Unhandled error') traceback.print_exc() sys.exit(1) diff --git a/pipe-cli/mount/pipefuse/__init__.py b/pipe-cli/mount/pipefuse/__init__.py index cb38079087..2791f20f06 100644 --- a/pipe-cli/mount/pipefuse/__init__.py +++ b/pipe-cli/mount/pipefuse/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2017-2019 EPAM Systems, Inc. (https://www.epam.com/) +# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,5 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -__all__ = ['webdavfs.py', 'webdav.py'] diff --git a/pipe-cli/mount/pipefuse/buffread.py b/pipe-cli/mount/pipefuse/buffread.py index 1bed858d1f..4d3b994035 100644 --- a/pipe-cli/mount/pipefuse/buffread.py +++ b/pipe-cli/mount/pipefuse/buffread.py @@ -1,4 +1,4 @@ -# Copyright 2017-2020 EPAM Systems, Inc. (https://www.epam.com/) +# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,9 +18,6 @@ from pipefuse.fsclient import FileSystemClientDecorator -_ANY_ERROR = BaseException - - class _ReadBuffer: def __init__(self, offset, capacity, file_size): @@ -122,10 +119,10 @@ def download_range(self, fh, buf, path, offset=0, length=0): file_buf = self._new_read_buf(fh, path, file_buf.file_size, offset, length) self._buffs[buf_key] = file_buf buf.write(file_buf.view(offset, length)) - except _ANY_ERROR: + except Exception: logging.exception('Downloading has failed for %d:%s. ' 'Removing the corresponding buffer.' % (fh, path)) - self._remove_read_buf(fh, path) + self._remove_buf(fh, path) raise def _new_read_buf(self, fh, path, file_size, offset, length): @@ -145,7 +142,7 @@ def flush(self, fh, path): logging.info('Flushing the corresponding buffer for %d:%s' % (fh, path)) self._inner.flush(fh, path) self._remove_buf(fh, path) - except _ANY_ERROR: + except Exception: logging.exception('Flushing has failed for %d:%s. ' 'Removing the corresponding buffer.' % (fh, path)) self._remove_buf(fh, path) diff --git a/pipe-cli/mount/pipefuse/buffwrite.py b/pipe-cli/mount/pipefuse/buffwrite.py index 3354b4ca46..a7d18002d8 100644 --- a/pipe-cli/mount/pipefuse/buffwrite.py +++ b/pipe-cli/mount/pipefuse/buffwrite.py @@ -1,4 +1,4 @@ -# Copyright 2017-2020 EPAM Systems, Inc. (https://www.epam.com/) +# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,9 +17,6 @@ from pipefuse.fsclient import FileSystemClientDecorator -_ANY_ERROR = BaseException - - class _WriteBuffer: def __init__(self, offset, capacity, file_size=0): @@ -114,7 +111,7 @@ def upload_range(self, fh, buf, path, offset=0): self._remove_write_buf(fh, path) file_buf = self._new_write_buf(self._capacity, file_buf.end, buf=None, old_write_buf=file_buf) self._buffs[path] = file_buf - except _ANY_ERROR: + except Exception: logging.exception('Uploading has failed for %d:%s. ' 'Removing the corresponding buffer.' % (fh, path)) self._remove_write_buf(fh, path) @@ -132,7 +129,7 @@ def flush(self, fh, path): self._flush_write_buf(fh, path) self._inner.flush(fh, path) self._remove_write_buf(fh, path) - except _ANY_ERROR: + except Exception: logging.exception('Flushing has failed for %d:%s. ' 'Removing the corresponding buffer.' % (fh, path)) self._remove_write_buf(fh, path) diff --git a/pipe-cli/mount/pipefuse/cache.py b/pipe-cli/mount/pipefuse/cache.py index be5142e7e2..251471ab65 100644 --- a/pipe-cli/mount/pipefuse/cache.py +++ b/pipe-cli/mount/pipefuse/cache.py @@ -1,4 +1,4 @@ -# Copyright 2017-2020 EPAM Systems, Inc. (https://www.epam.com/) +# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,17 +14,14 @@ import logging import time - from datetime import datetime from threading import RLock from dateutil.tz import tzlocal -from pipefuse.fsclient import File, FileSystemClientDecorator from pipefuse import fuseutils - - -_ANY_ERROR = BaseException +from pipefuse.fsclient import File, FileSystemClientDecorator +from pipefuse.lock import synchronized class ListingCache: @@ -98,28 +95,17 @@ def _is_relative(self, cache_path, path): return False -def synchronized(func): - def wrapper(*args, **kwargs): - lock = args[0]._lock - try: - lock.acquire() - return_value = func(*args, **kwargs) - return return_value - finally: - lock.release() - return wrapper - - class ThreadSafeListingCache: - def __init__(self, inner): + def __init__(self, inner, lock=None): """ Thread safe listing cache. :param inner: Not thread safe listing cache. + :param lock: Reentrant lock. """ self._inner = inner - self._lock = RLock() + self._lock = lock or RLock() @synchronized def get(self, path): @@ -150,18 +136,18 @@ def invalidate_cache(self, path): self._inner.invalidate_cache(path) -class CachingFileSystemClient(FileSystemClientDecorator): +class CachingListingFileSystemClient(FileSystemClientDecorator): def __init__(self, inner, cache): """ - Caching file system client decorator. + Caching listing file system client decorator. - It caches listing calls to reduce number of calls to an inner file system client. + It caches listing calls to reduce a number of calls to an inner file system client. :param inner: Decorating file system client. :param cache: Listing cache. """ - super(CachingFileSystemClient, self).__init__(inner) + super(CachingListingFileSystemClient, self).__init__(inner) self._inner = inner self._cache = cache self._delimiter = '/' @@ -170,7 +156,7 @@ def exists(self, path): return self.attrs(path) is not None def attrs(self, path): - logging.info('Getting attributes for %s' % path) + logging.info('Getting attributes for %s...' % path) parent_path, file_name = fuseutils.split_path(path) if not file_name: return self._root() @@ -216,7 +202,7 @@ def upload(self, buf, path): try: self._inner.upload(buf, path) self._cache.replace_in_parent_cache(path, self._inner.attrs(path)) - except _ANY_ERROR: + except Exception: logging.exception('Standalone uploading has failed for %s' % path) self._cache.invalidate_parent_cache(path) raise @@ -225,7 +211,7 @@ def delete(self, path): try: self._inner.delete(path) self._cache.replace_in_parent_cache(path) - except _ANY_ERROR: + except Exception: logging.exception('Deleting has failed for %s' % path) self._cache.invalidate_parent_cache(path) raise @@ -235,7 +221,7 @@ def mv(self, old_path, path): self._inner.mv(old_path, path) self._cache.move_from_parent_cache(old_path, path) self._cache.invalidate_cache_recursively(old_path) - except _ANY_ERROR: + except Exception: logging.exception('Moving from %s to %s has failed' % (old_path, path)) self._cache.invalidate_parent_cache(old_path) self._cache.invalidate_cache_recursively(old_path) @@ -246,7 +232,7 @@ def mkdir(self, path): try: self._inner.mkdir(path) self._cache.replace_in_parent_cache(path, self._inner.attrs(path)) - except _ANY_ERROR: + except Exception: logging.exception('Mkdir has failed for %s' % path) self._cache.invalidate_parent_cache(path) raise @@ -256,7 +242,7 @@ def rmdir(self, path): self._inner.rmdir(path) self._cache.replace_in_parent_cache(path) self._cache.invalidate_cache_recursively(path) - except _ANY_ERROR: + except Exception: logging.exception('Rmdir has failed for %s' % path) self._cache.invalidate_parent_cache(path) self._cache.invalidate_cache_recursively(path) @@ -266,7 +252,7 @@ def flush(self, fh, path): try: self._inner.flush(fh, path) self._cache.replace_in_parent_cache(path, self._inner.attrs(path)) - except _ANY_ERROR: + except Exception: logging.exception('Flushing has failed for %s' % path) self._cache.invalidate_parent_cache(path) raise diff --git a/pipe-cli/mount/pipefuse/chain.py b/pipe-cli/mount/pipefuse/chain.py new file mode 100644 index 0000000000..b4a6a9ed3d --- /dev/null +++ b/pipe-cli/mount/pipefuse/chain.py @@ -0,0 +1,33 @@ +# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABCMeta + + +class ChainingService: + __metaclass__ = ABCMeta + + def parameters(self): + return {} + + def summary(self): + out = '-> ' + str(type(self).__name__) + params = self.parameters() + if params: + out += '[' + ''.join(key + '=' + value for key, value in params.items()) + ']' + inner = getattr(self, '_inner', None) or getattr(self, '_client', None) + if inner: + out += ' ->\n' + inner.summary() + return out + diff --git a/pipe-cli/mount/pipefuse/fsclient.py b/pipe-cli/mount/pipefuse/fsclient.py index c8def37dbf..72d9502421 100644 --- a/pipe-cli/mount/pipefuse/fsclient.py +++ b/pipe-cli/mount/pipefuse/fsclient.py @@ -1,4 +1,4 @@ -# Copyright 2017-2020 EPAM Systems, Inc. (https://www.epam.com/) +# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,14 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. +from abc import ABCMeta, abstractmethod from collections import namedtuple -from abc import ABCMeta, abstractmethod +from pipefuse.chain import ChainingService File = namedtuple('File', ['name', 'size', 'mtime', 'ctime', 'contenttype', 'is_dir']) -class FileSystemClient: +class UnsupportedOperationException(RuntimeError): + pass + + +class FileSystemClient(ChainingService): __metaclass__ = ABCMeta @abstractmethod @@ -125,8 +130,26 @@ def truncate(self, fh, path, length): """ pass - def stats(self): - return str(type(self).__name__) + def download_xattrs(self, path): + """ + Returns extended attributes of a single file or folder by the given path. + + :param path: Relative path to a single file or folder. + :return: Extended attributes or None if the given path doesn't exist. + """ + pass + + def upload_xattrs(self, path, xattrs): + pass + + def upload_xattr(self, path, name, value): + pass + + def remove_xattrs(self, path): + pass + + def remove_xattr(self, path, name): + pass class FileSystemClientDecorator(FileSystemClient): @@ -181,8 +204,20 @@ def flush(self, fh, path): def truncate(self, fh, path, length): self._inner.truncate(fh, path, length) - def stats(self): - return ' -> '.join([str(type(self).__name__), self._inner.stats()]) + def download_xattrs(self, path): + return self._inner.download_xattrs(path) + + def upload_xattrs(self, path, xattrs): + self._inner.upload_xattrs(path, xattrs) + + def upload_xattr(self, path, name, value): + self._inner.upload_xattr(path, name, value) + + def remove_xattrs(self, path): + self._inner.remove_xattrs(path) + + def remove_xattr(self, path, name): + self._inner.remove_xattr(path, name) def __getattr__(self, name): if hasattr(self._inner, name): diff --git a/pipe-cli/mount/pipefuse/lock.py b/pipe-cli/mount/pipefuse/lock.py new file mode 100644 index 0000000000..6249fbbc2e --- /dev/null +++ b/pipe-cli/mount/pipefuse/lock.py @@ -0,0 +1,25 @@ +# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def synchronized(func): + def wrapper(*args, **kwargs): + lock = args[0]._lock + try: + lock.acquire() + return_value = func(*args, **kwargs) + return return_value + finally: + lock.release() + return wrapper diff --git a/pipe-cli/mount/pipefuse/path.py b/pipe-cli/mount/pipefuse/path.py index 9ac3723c15..63836cb917 100644 --- a/pipe-cli/mount/pipefuse/path.py +++ b/pipe-cli/mount/pipefuse/path.py @@ -1,3 +1,17 @@ +# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from pipefuse import fuseutils from pipefuse.fsclient import FileSystemClientDecorator @@ -68,6 +82,26 @@ def truncate(self, fh, path, length): expanded_path = self._expand_path(path) self._inner.truncate(fh, expanded_path, length) + def download_xattrs(self, path): + expanded_path = self._expand_path(path) + return self._inner.download_xattrs(expanded_path) + + def upload_xattrs(self, path, xattrs): + expanded_path = self._expand_path(path) + self._inner.upload_xattrs(expanded_path, xattrs) + + def upload_xattr(self, path, name, value): + expanded_path = self._expand_path(path) + self._inner.upload_xattr(expanded_path, name, value) + + def remove_xattrs(self, path): + expanded_path = self._expand_path(path) + self._inner.remove_xattrs(expanded_path) + + def remove_xattr(self, path, name): + expanded_path = self._expand_path(path) + self._inner.remove_xattr(expanded_path, name) + def _expand_path(self, path): return fuseutils.join_path_with_delimiter(self._root_path, path) \ if self._root_path else path diff --git a/pipe-cli/mount/pipefuse/pipefs.py b/pipe-cli/mount/pipefuse/pipefs.py index 2fc9d68773..893984bfef 100644 --- a/pipe-cli/mount/pipefuse/pipefs.py +++ b/pipe-cli/mount/pipefuse/pipefs.py @@ -1,4 +1,4 @@ -# Copyright 2017-2020 EPAM Systems, Inc. (https://www.epam.com/) +# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,23 +14,22 @@ import datetime import errno +import functools import io import logging import os import platform import stat import time +from threading import RLock import easywebdav from dateutil.tz import tzlocal -from fuse import FuseOSError, Operations -from threading import RLock +from fuse import FuseOSError, Operations, ENOTSUP from pipefuse import fuseutils - - -class UnsupportedOperationException(Exception): - pass +from pipefuse.chain import ChainingService +from pipefuse.fsclient import UnsupportedOperationException class FileHandleContainer(object): @@ -87,7 +86,7 @@ def wrapper(*args, **kwargs): return wrapper -class PipeFS(Operations): +class PipeFS(Operations, ChainingService): def __init__(self, client, lock, mode=0o755): self._client = client @@ -179,10 +178,10 @@ def readdir(self, path, fh): yield f def readlink(self, path): - raise UnsupportedOperationException("readlink") + raise FuseOSError(ENOTSUP) def mknod(self, path, mode, dev): - raise UnsupportedOperationException("mknod") + raise FuseOSError(ENOTSUP) @syncronized @errorlogged @@ -222,7 +221,7 @@ def unlink(self, path): self._client.delete(path) def symlink(self, name, target): - raise UnsupportedOperationException("symlink") + raise FuseOSError(ENOTSUP) @syncronized @errorlogged @@ -230,7 +229,7 @@ def rename(self, old, new): self._client.mv(old, new) def link(self, target, name): - raise UnsupportedOperationException("link") + raise FuseOSError(ENOTSUP) @errorlogged def utimens(self, path, times=None): @@ -297,3 +296,76 @@ def fallocate(self, path, mode, offset, length, fh): logging.warn('Fallocate mode (%s) is not supported yet.' % mode) if offset + length >= props.size: self._client.truncate(fh, path, offset + length) + + @syncronized + @errorlogged + def setxattr(self, path, name, value, options, *args): + self._client.upload_xattr(path, name, value) + return 0 + + @errorlogged + def getxattr(self, path, name, *args): + xattrs = self._client.download_xattrs(path) or {} + xattr = xattrs.get(name) + if xattr is None: + raise FuseOSError(errno.ENODATA) + return xattr + + @errorlogged + def listxattr(self, path): + xattrs = self._client.download_xattrs(path) or {} + return xattrs.keys() + + @syncronized + @errorlogged + def removexattr(self, path, name): + self._client.remove_xattr(path, name) + return 0 + + +class SupportedOperationsFS(ChainingService): + + def __init__(self, inner, exclude): + """ + Supported operations File System. + + It allows only certain operations processing. + + :param inner: Decorating file system. + :param exclude: Unsupported operations. + """ + self._inner = inner + self._exclude = exclude + + def __getattr__(self, name): + if not hasattr(self._inner, name): + return None + attr = getattr(self._inner, name) + if not callable(attr): + return attr + return self._wrap(attr, name=name) + + def __call__(self, name, *args, **kwargs): + if not hasattr(self._inner, name): + return getattr(self, name)(*args, **kwargs) + attr = getattr(self._inner, name) + return self._wrap(attr, name=name)(*args, **kwargs) + + def _wrap(self, attr, name=None): + @functools.wraps(attr) + def _wrapped_attr(*args, **kwargs): + method_name = name or args[0] + if method_name in self._exclude: + logging.debug('Aborting excluded operation %s processing...', method_name) + raise FuseOSError(ENOTSUP) + try: + return attr(*args, **kwargs) + except UnsupportedOperationException: + raise FuseOSError(ENOTSUP) + return _wrapped_attr + + def parameters(self): + params = {} + if self._exclude: + params['exclude'] = ','.join(self._exclude) + return params diff --git a/pipe-cli/mount/pipefuse/record.py b/pipe-cli/mount/pipefuse/record.py index d3db64dc09..1ed7a534b9 100644 --- a/pipe-cli/mount/pipefuse/record.py +++ b/pipe-cli/mount/pipefuse/record.py @@ -1,8 +1,26 @@ +# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import functools import io import logging import sys -_DEBUG_OPERATIONS = ['read', 'write', 'getxattr'] +from pipefuse.chain import ChainingService + +_DEBUG_INPUT_OPERATIONS = ['read', 'write', 'getxattr'] +_DEBUG_OUTPUT_OPERATIONS = ['getxattr', 'listxattr'] if sys.version_info >= (3, 0): _BYTE_TYPES = (bytearray, bytes) @@ -39,7 +57,11 @@ def _trimmed(value): return str(value) -class RecordingFS: +def _merge_outputs(outputs): + return str(outputs) + + +class RecordingFS(ChainingService): def __init__(self, inner): """ @@ -50,44 +72,40 @@ def __init__(self, inner): :param inner: Recording file system. """ self._inner = inner - self._tag = type(inner).__name__ + ' Recorder' + self._tag = type(inner).__name__ def __getattr__(self, name): - if hasattr(self._inner, name): - attr = getattr(self._inner, name) - if callable(attr): - def _wrapped_attr(method_name, *args, **kwargs): - complete_args_string = _merge_arguments(args, kwargs) - if method_name in _DEBUG_OPERATIONS: - logging.debug('[%s] %s (%s)' % (self._tag, method_name, complete_args_string)) - else: - logging.info('[%s] %s (%s)' % (self._tag, method_name, complete_args_string)) - return attr(method_name, *args, **kwargs) - return _wrapped_attr - else: - return attr - else: - return getattr(self._inner, name) - - def __call__(self, name, *args): - if hasattr(self._inner, name): - attr = getattr(self._inner, name) - if callable(attr): - def _wrapped_attr(*args, **kwargs): - complete_args_string = _merge_arguments(args, kwargs) - if name in _DEBUG_OPERATIONS: - logging.debug('[%s] %s (%s)' % (self._tag, name, complete_args_string)) - else: - logging.info('[%s] %s (%s)' % (self._tag, name, complete_args_string)) - return attr(*args, **kwargs) - return _wrapped_attr(*args) + if not hasattr(self._inner, name): + return None + attr = getattr(self._inner, name) + if not callable(attr): + return attr + return self._wrap(attr, name=name) + + def __call__(self, name, *args, **kwargs): + if not hasattr(self._inner, name): + return getattr(self, name)(*args, **kwargs) + attr = getattr(self._inner, name) + return self._wrap(attr, name=name)(*args, **kwargs) + + def _wrap(self, attr, name=None): + @functools.wraps(attr) + def _wrapped_attr(*args, **kwargs): + method_name = name or args[0] + complete_args_string = _merge_arguments(args, kwargs) + if method_name in _DEBUG_INPUT_OPERATIONS: + logging.debug('[%s Input Recorder] %s (%s)' % (self._tag, method_name, complete_args_string)) else: - return attr(*args) - else: - return getattr(self._inner, name)(*args) + logging.info('[%s Input Recorder] %s (%s)' % (self._tag, method_name, complete_args_string)) + outputs = attr(*args, **kwargs) + if method_name in _DEBUG_OUTPUT_OPERATIONS: + logging.debug('[%s Output Recorder] %s (%s) -> (%s)' % (self._tag, method_name, complete_args_string, + _merge_outputs(outputs))) + return outputs + return _wrapped_attr -class RecordingFileSystemClient: +class RecordingFileSystemClient(ChainingService): def __init__(self, inner): """ @@ -98,7 +116,7 @@ def __init__(self, inner): :param inner: Recording file system client. """ self._inner = inner - self._tag = type(inner).__name__ + ' Recorder' + self._tag = type(inner).__name__ def __getattr__(self, name): if hasattr(self._inner, name): @@ -106,7 +124,7 @@ def __getattr__(self, name): if callable(attr): def _wrapped_attr(*args, **kwargs): complete_args_string = _merge_arguments(args, kwargs) - logging.info('[%s] %s (%s)' % (self._tag, name, complete_args_string)) + logging.info('[%s Input Recorder] %s (%s)' % (self._tag, name, complete_args_string)) return attr(*args, **kwargs) return _wrapped_attr else: diff --git a/pipe-cli/mount/pipefuse/s3.py b/pipe-cli/mount/pipefuse/s3.py index 26bccc372e..a46f1a7e0a 100644 --- a/pipe-cli/mount/pipefuse/s3.py +++ b/pipe-cli/mount/pipefuse/s3.py @@ -1,4 +1,4 @@ -# Copyright 2017-2020 EPAM Systems, Inc. (https://www.epam.com/) +# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -292,3 +292,36 @@ def _new_truncating_mpu(self, source_path, length): mpu = SplittingMultipartCopyUpload(mpu, min_part_size=self._min_part_size, max_part_size=self._max_part_size) mpu = TruncatingMultipartCopyUpload(mpu, length=length, min_part_number=self._min_chunk) return mpu + + def download_xattrs(self, path): + logging.info('Downloading tags for %s...' % path) + try: + response = self._s3.get_object_tagging(Bucket=self.bucket, Key=path) or {} + return {tag.get('Key'): tag.get('Value') for tag in response.get('TagSet', [])} + except Exception: + logging.debug('No tags have been found for %s' % path, exc_info=True) + return {} + + def upload_xattrs(self, path, xattrs): + logging.info('Uploading tags for %s...' % path) + self._s3.put_object_tagging(Bucket=self.bucket, Key=path, Tagging={ + 'TagSet': [{'Key': name, 'Value': value} for name, value in xattrs.items()] + }) + + def upload_xattr(self, path, name, value): + logging.info('Uploading tag %s for %s...' % (name, path)) + tags = self.download_xattrs(path) + tags[name] = value + self.upload_xattrs(path, tags) + + def remove_xattrs(self, path): + logging.info('Removing tags for %s...' % path) + self._s3.delete_object_tagging(Bucket=self.bucket, Key=path) + + def remove_xattr(self, path, name): + logging.info('Removing tag %s for %s...' % (name, path)) + tags = self.download_xattrs(path) + if name in tags: + del tags[name] + self.remove_xattrs(path) + self.upload_xattrs(path, tags) diff --git a/pipe-cli/mount/pipefuse/storage.py b/pipe-cli/mount/pipefuse/storage.py index de990f4973..87a00e094b 100644 --- a/pipe-cli/mount/pipefuse/storage.py +++ b/pipe-cli/mount/pipefuse/storage.py @@ -200,3 +200,33 @@ def flush(self, fh, path): def truncate(self, fh, path, length): source_path = path.lstrip(self._delimiter) self._inner.truncate(fh, source_path, length) + + def download_xattrs(self, path): + source_path = path.lstrip(self._delimiter) + if not source_path: + return {} + return self._inner.download_xattrs(source_path) + + def upload_xattrs(self, path, xattrs): + source_path = path.lstrip(self._delimiter) + if not source_path: + return + self._inner.upload_xattrs(source_path, xattrs) + + def upload_xattr(self, path, name, value): + source_path = path.lstrip(self._delimiter) + if not source_path: + return + self._inner.upload_xattr(source_path, name, value) + + def remove_xattrs(self, path): + source_path = path.lstrip(self._delimiter) + if not source_path: + return + self._inner.remove_xattrs(source_path) + + def remove_xattr(self, path, name): + source_path = path.lstrip(self._delimiter) + if not source_path: + return + self._inner.remove_xattr(source_path, name) diff --git a/pipe-cli/mount/pipefuse/webdav.py b/pipe-cli/mount/pipefuse/webdav.py index 83428c6721..00870951e8 100644 --- a/pipe-cli/mount/pipefuse/webdav.py +++ b/pipe-cli/mount/pipefuse/webdav.py @@ -1,4 +1,4 @@ -# Copyright 2017-2020 EPAM Systems, Inc. (https://www.epam.com/) +# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -87,10 +87,10 @@ def is_available(self): self._send('OPTIONS', '/', 200, allow_redirects=False) return True except easywebdav.OperationFailed as e: - logging.error('WevDav is not available: %s' % str(e.reason)) + logging.exception('WevDav is not available: %s' % str(e.reason)) return False - except BaseException as e: - logging.error('WevDav is not available: %s' % str(e)) + except Exception: + logging.exception('WevDav is not available') return False def is_read_only(self): diff --git a/pipe-cli/mount/pipefuse/xattr.py b/pipe-cli/mount/pipefuse/xattr.py new file mode 100644 index 0000000000..419ed2df6e --- /dev/null +++ b/pipe-cli/mount/pipefuse/xattr.py @@ -0,0 +1,190 @@ +# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import functools +import logging +from threading import RLock + +from pipefuse.chain import ChainingService +from pipefuse.fsclient import FileSystemClientDecorator, UnsupportedOperationException +from pipefuse.lock import synchronized + + +class ExtendedAttributesCache: + + def __init__(self, cache): + """ + Extended attributes cache. + + :param cache: Cache implementation. + """ + self._cache = cache + + def get(self, path): + xattrs = self._cache.get(path, None) + if xattrs: + return dict(xattrs) + + def set(self, path, xattrs): + self._cache[path] = xattrs + + def invalidate(self, path): + logging.info('Invalidating extended attributes cache for %s' % path) + self._cache.pop(path, None) + + +class ThreadSafeExtendedAttributesCache: + + def __init__(self, inner, lock=None): + """ + Thread safe extended attributes cache. + + :param inner: Not thread safe extended attributes cache. + :param lock: Reentrant lock. + """ + self._inner = inner + self._lock = lock or RLock() + + @synchronized + def get(self, path): + return self._inner.get(path) + + @synchronized + def set(self, path, xattrs): + self._inner.set(path, xattrs) + + @synchronized + def invalidate(self, path): + self._inner.invalidate(path) + + +class ExtendedAttributesCachingFileSystemClient(FileSystemClientDecorator): + + def __init__(self, inner, cache): + """ + Extended attributes caching file system client decorator. + + It caches extended attribute calls to reduce a number of calls to an inner file system client. + + :param inner: Decorating file system client. + :param cache: Extended attributes cache. + """ + super(ExtendedAttributesCachingFileSystemClient, self).__init__(inner) + self._inner = inner + self._cache = cache + + def download_xattrs(self, path): + logging.info('Getting extended attributes for %s...' % path) + xattrs = self._cache.get(path) + if xattrs is not None: + logging.info('Cached extended attributes found for %s' % path) + else: + logging.info('Cached extended attributes not found for %s' % path) + xattrs = self._uncached_download_xattrs(path) + return xattrs + + def _uncached_download_xattrs(self, path): + logging.info('Downloading extended attributes for %s...' % path) + xattrs = self._inner.download_xattrs(path) or {} + self._cache.set(path, xattrs) + return xattrs + + def upload_xattrs(self, path, xattrs): + self._inner.upload_xattrs(path, xattrs) + self._cache.invalidate(path) + + def upload_xattr(self, path, name, value): + self._inner.upload_xattr(path, name, value) + self._cache.invalidate(path) + + def remove_xattrs(self, path): + self._inner.remove_xattrs(path) + self._cache.invalidate(path) + + def remove_xattr(self, path, name): + self._inner.remove_xattr(path, name) + self._cache.invalidate(path) + + +class RestrictingExtendedAttributesFS(ChainingService): + + def __init__(self, inner, include_prefixes=None, exclude_prefixes=None): + """ + Restricting extended attributes File System. + + It allows only certain extended attributes processing. + + :param inner: Decorating file system. + :param include_prefixes: Including extended attribute prefixes. + :param exclude_prefixes: Excluding extended attribute prefixes. + """ + self._inner = inner + self._include_prefixes = include_prefixes or [] + self._exclude_prefixes = exclude_prefixes or [] + self._operations = ['setxattr', 'getxattr', 'removexattr'] + + def __getattr__(self, name): + if not hasattr(self._inner, name): + return None + attr = getattr(self._inner, name) + if not callable(attr): + return attr + return self._wrap(attr, name=name) + + def __call__(self, name, *args, **kwargs): + if not hasattr(self._inner, name): + return getattr(self, name)(*args, **kwargs) + attr = getattr(self._inner, name) + return self._wrap(attr, name=name)(*args, **kwargs) + + def _wrap(self, attr, name=None): + @functools.wraps(attr) + def _wrapped_attr(*args, **kwargs): + method_name = name or args[0] + if method_name in self._operations: + xattr_name = kwargs.get('name') or args[1] + if self._include_prefixes: + if not any(xattr_name.startswith(prefix) for prefix in self._include_prefixes): + logging.debug('Aborting unincluded extended attribute %s processing...', xattr_name) + raise UnsupportedOperationException() + if self._exclude_prefixes: + if any(xattr_name.startswith(prefix) for prefix in self._exclude_prefixes): + logging.debug('Aborting excluded extended attribute %s processing...', xattr_name) + raise UnsupportedOperationException() + return attr(*args, **kwargs) + elif method_name == 'listxattr': + xattrs = attr(*args, **kwargs) or [] + filtered_xattrs = [] + for xattr_name in xattrs: + if self._include_prefixes and not any(xattr_name.startswith(prefix) + for prefix in self._include_prefixes): + logging.debug('Filtering out unincluded extended attribute %s...', xattr_name) + continue + if self._exclude_prefixes and any(xattr_name.startswith(prefix) + for prefix in self._exclude_prefixes): + logging.debug('Filtering out excluded extended attribute %s...', xattr_name) + continue + filtered_xattrs.append(xattr_name) + return filtered_xattrs + else: + return attr(*args, **kwargs) + return _wrapped_attr + + def parameters(self): + params = {} + if self._include_prefixes: + params['include'] = ','.join(self._include_prefixes) + if self._exclude_prefixes: + params['exclude'] = ','.join(self._exclude_prefixes) + return params