Add support for extended attributes (#2848)
tcibinan committed Oct 3, 2022
1 parent 711f7f0 commit 4bcbd49
Showing 16 changed files with 670 additions and 147 deletions.
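At a high level, the commit extends the decorator chain that pipe-fuse already uses for listing caches: the storage client gains an extended-attributes cache backed by cachetools' TTLCache, and the FUSE layer gains a wrapper that restricts or disables xattr operations. Below is a minimal sketch of the caching idea with simplified, assumed signatures; the real classes live in pipefuse/xattr.py, which is not rendered in this view.

    from cachetools import TTLCache


    class ExtendedAttributesCache:
        # Illustrative stand-in: keeps per-path xattr listings in a
        # TTL-bounded cache.
        def __init__(self, cache):
            self._cache = cache

        def get(self, path):
            return self._cache.get(path)

        def set(self, path, xattrs):
            self._cache[path] = xattrs


    class ExtendedAttributesCachingFileSystemClient:
        # Illustrative stand-in: serves repeated xattr reads from the cache
        # instead of hitting object storage on every call.
        def __init__(self, inner, cache):
            self._inner = inner
            self._cache = cache

        def listxattr(self, path):
            xattrs = self._cache.get(path)
            if xattrs is None:
                xattrs = self._inner.listxattr(path)
                self._cache.set(path, xattrs)
            return xattrs


    class _StubStorageClient:
        def listxattr(self, path):
            print('fetching xattrs for %s from storage' % path)
            return ['user.sample', 'user.owner']


    client = ExtendedAttributesCachingFileSystemClient(
        _StubStorageClient(), ExtendedAttributesCache(TTLCache(maxsize=1000, ttl=60)))
    client.listxattr('/bucket/file.txt')  # goes to storage once
    client.listxattr('/bucket/file.txt')  # served from cache for up to 60 seconds

The ThreadSafeExtendedAttributesCache wrapper seen in the diff below would add locking around these operations when pipe-fuse runs in multi-threaded mode, mirroring ThreadSafeListingCache.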
13 changes: 13 additions & 0 deletions pipe-cli/mount/__init__.py
@@ -0,0 +1,13 @@
# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
140 changes: 101 additions & 39 deletions pipe-cli/mount/pipe-fuse.py
@@ -1,4 +1,4 @@
# Copyright 2017-2020 EPAM Systems, Inc. (https://www.epam.com/)
# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
# limitations under the License.

import argparse
import ctypes
import errno
import logging
import os
@@ -21,6 +22,7 @@
import traceback

import future.utils
from cachetools import TTLCache


def is_windows():
@@ -43,56 +45,60 @@ def is_windows():
if os.path.exists(libfuse_path):
os.environ["FUSE_LIBRARY_PATH"] = libfuse_path

import fuse
from fuse import FUSE, fuse_operations, fuse_file_info, c_utimbuf

from pipefuse.fuseutils import MB, GB
from pipefuse.cache import CachingFileSystemClient, ListingCache, ThreadSafeListingCache
from pipefuse.api import CloudPipelineClient, CloudType
from pipefuse.buffread import BufferingReadAheadFileSystemClient
from pipefuse.buffwrite import BufferingWriteFileSystemClient
from pipefuse.cache import ListingCache, ThreadSafeListingCache, \
CachingListingFileSystemClient
from pipefuse.fslock import get_lock
from pipefuse.fuseutils import MB, GB
from pipefuse.gcp import GoogleStorageLowLevelFileSystemClient
from pipefuse.path import PathExpandingStorageFileSystemClient
from pipefuse.pipefs import PipeFS, SupportedOperationsFS
from pipefuse.record import RecordingFileSystemClient, RecordingFS
from pipefuse.s3 import S3StorageLowLevelClient
from pipefuse.storage import StorageHighLevelFileSystemClient
from pipefuse.trunc import CopyOnDownTruncateFileSystemClient, \
WriteNullsOnUpTruncateFileSystemClient, \
WriteLastNullOnUpTruncateFileSystemClient
from pipefuse.api import CloudPipelineClient, CloudType
from pipefuse.gcp import GoogleStorageLowLevelFileSystemClient
from pipefuse.webdav import CPWebDavClient
from pipefuse.s3 import S3StorageLowLevelClient
from pipefuse.storage import StorageHighLevelFileSystemClient
from pipefuse.pipefs import PipeFS
from pipefuse.record import RecordingFS, RecordingFileSystemClient
from pipefuse.path import PathExpandingStorageFileSystemClient
from pipefuse.fslock import get_lock
import ctypes
import fuse
from fuse import FUSE, fuse_operations, fuse_file_info, c_utimbuf
from cachetools import TTLCache
from pipefuse.xattr import ExtendedAttributesCache, ThreadSafeExtendedAttributesCache, \
ExtendedAttributesCachingFileSystemClient, RestrictingExtendedAttributesFS

_allowed_logging_level_names = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG', 'NOTSET']
_allowed_logging_levels = future.utils.lfilter(lambda name: isinstance(name, str), _allowed_logging_level_names)
_allowed_logging_levels_string = ', '.join(_allowed_logging_levels)
_default_logging_level = 'ERROR'
_debug_logging_level = 'DEBUG'
_info_logging_level = 'INFO'
_xattrs_operations = ['setxattr', 'getxattr', 'listxattr', 'removexattr']
_xattrs_include_prefix = 'user'


def start(mountpoint, webdav, bucket,
read_buffer_size, read_ahead_min_size, read_ahead_max_size, read_ahead_size_multiplier,
write_buffer_size, trunc_buffer_size, chunk_size,
cache_ttl, cache_size, default_mode,
mount_options=None, threads=False, monitoring_delay=600, recording=False):
if mount_options is None:
mount_options = {}
listing_cache_ttl, listing_cache_size,
xattrs_include_prefixes, xattrs_exclude_prefixes,
xattrs_cache_ttl, xattrs_cache_size,
disabled_operations, default_mode,
mount_options, threads=False, monitoring_delay=600, recording=False):
try:
os.makedirs(mountpoint)
except OSError as e:
if e.errno != errno.EEXIST:
raise

api = os.environ.get('API', '')
bearer = os.environ.get('API_TOKEN', '')
chunk_size = int(os.environ.get('CP_PIPE_FUSE_CHUNK_SIZE', chunk_size))
read_ahead_min_size = int(os.environ.get('CP_PIPE_FUSE_READ_AHEAD_MIN_SIZE', read_ahead_min_size))
read_ahead_max_size = int(os.environ.get('CP_PIPE_FUSE_READ_AHEAD_MAX_SIZE', read_ahead_max_size))
read_ahead_size_multiplier = int(os.environ.get('CP_PIPE_FUSE_READ_AHEAD_SIZE_MULTIPLIER',
read_ahead_size_multiplier))
api = os.getenv('API', '')
bearer = os.getenv('API_TOKEN', '')
chunk_size = int(os.getenv('CP_PIPE_FUSE_CHUNK_SIZE', chunk_size))
read_ahead_min_size = int(os.getenv('CP_PIPE_FUSE_READ_AHEAD_MIN_SIZE', read_ahead_min_size))
read_ahead_max_size = int(os.getenv('CP_PIPE_FUSE_READ_AHEAD_MAX_SIZE', read_ahead_max_size))
read_ahead_size_multiplier = int(os.getenv('CP_PIPE_FUSE_READ_AHEAD_SIZE_MULTIPLIER',
read_ahead_size_multiplier))
bucket_type = None
root_path = None
if not bearer:
@@ -120,14 +126,23 @@ def start(mountpoint, webdav, bucket,
client = RecordingFileSystemClient(client)
if bucket_type in [CloudType.S3, CloudType.GS]:
client = PathExpandingStorageFileSystemClient(client, root_path=root_path)
if cache_ttl > 0 and cache_size > 0:
cache_implementation = TTLCache(maxsize=cache_size, ttl=cache_ttl)
cache = ListingCache(cache_implementation)
if listing_cache_ttl > 0 and listing_cache_size > 0:
listing_cache_implementation = TTLCache(maxsize=listing_cache_size, ttl=listing_cache_ttl)
listing_cache = ListingCache(listing_cache_implementation)
if threads:
cache = ThreadSafeListingCache(cache)
client = CachingFileSystemClient(client, cache)
listing_cache = ThreadSafeListingCache(listing_cache)
client = CachingListingFileSystemClient(client, listing_cache)
else:
logging.info('Caching is disabled.')
logging.info('Listing caching is disabled.')
if bucket_type == CloudType.S3:
if xattrs_cache_ttl > 0 and xattrs_cache_size > 0:
xattrs_cache_implementation = TTLCache(maxsize=xattrs_cache_size, ttl=xattrs_cache_ttl)
xattrs_cache = ExtendedAttributesCache(xattrs_cache_implementation)
if threads:
xattrs_cache = ThreadSafeExtendedAttributesCache(xattrs_cache)
client = ExtendedAttributesCachingFileSystemClient(client, xattrs_cache)
else:
logging.info('Extended attributes caching is disabled.')
if read_buffer_size > 0:
client = BufferingReadAheadFileSystemClient(client,
read_ahead_min_size=read_ahead_min_size,
@@ -151,12 +166,30 @@ def start(mountpoint, webdav, bucket,
client = WriteNullsOnUpTruncateFileSystemClient(client, capacity=trunc_buffer_size)
else:
logging.info('Truncating support is disabled.')
logging.info('File system clients pipeline: %s', client.stats())

fs = PipeFS(client=client, lock=get_lock(threads, monitoring_delay=monitoring_delay), mode=int(default_mode, 8))
if bucket_type == CloudType.S3:
if xattrs_include_prefixes:
if xattrs_include_prefixes[0] == '*':
logging.info('All extended attributes will be processed.')
else:
fs = RestrictingExtendedAttributesFS(fs, include_prefixes=xattrs_include_prefixes)
if xattrs_exclude_prefixes:
if xattrs_exclude_prefixes[0] == '*':
logging.info('All extended attributes operations will be disabled.')
disabled_operations.extend(_xattrs_operations)
else:
fs = RestrictingExtendedAttributesFS(fs, exclude_prefixes=xattrs_exclude_prefixes)
else:
logging.info('All extended attributes operations will be disabled.')
disabled_operations.extend(_xattrs_operations)
fs = SupportedOperationsFS(fs, exclude=disabled_operations)
if recording:
fs = RecordingFS(fs)

logging.info('Initializing file system.')
logging.info('File system processing chain: \n%s', fs.summary())

logging.info('Initializing file system...')
enable_additional_operations()
ro = client.is_read_only() or mount_options.get('ro', False)
mount_options.pop('ro', None)
@@ -270,10 +303,31 @@ def parse_mount_options(options_string):
parser.add_argument("-c", "--chunk-size", type=int, required=False, default=10 * MB,
help="Multipart upload chunk size. Can be also specified via "
"CP_PIPE_FUSE_CHUNK_SIZE environment variable.")
parser.add_argument("-t", "--cache-ttl", type=int, required=False, default=60,
parser.add_argument("-t", "--cache-ttl", "--listing-cache-ttl", dest="listing_cache_ttl",
type=int, required=False, default=60,
help="Listing cache time to live, seconds")
parser.add_argument("-s", "--cache-size", type=int, required=False, default=100,
parser.add_argument("-s", "--cache-size", "--listing-cache-size", dest="listing_cache_size",
type=int, required=False, default=100,
help="Number of simultaneous listing caches")
parser.add_argument("--xattrs-include-prefix", dest="xattrs_include_prefixes",
type=str, required=False, action="append", default=[],
help="Extended attribute prefixes to be included in processing. "
"Use --xattrs-include-prefix=\"*\" to enable all extended attributes processing. "
"The argument can be specified multiple times. "
"Defaults to \"%s\"." % _xattrs_include_prefix)
parser.add_argument("--xattrs-exclude-prefix", dest="xattrs_exclude_prefixes",
type=str, required=False, action="append", default=[],
help="Extended attribute prefixes to be excluded from processing. "
"Use --xattrs-exclude-prefix=\"*\" to disable all extended attributes processing. "
"The argument can be specified multiple times.")
parser.add_argument("--xattrs-cache-ttl", type=int, required=False, default=60,
help="Extended attributes cache time to live, seconds.")
parser.add_argument("--xattrs-cache-size", type=int, required=False, default=1000,
help="Number of simultaneous extended attributes caches.")
parser.add_argument("--disabled-operation", dest="disabled_operations",
type=str, required=False, action="append", default=[],
help="Disabled file system operations. "
"The argument can be specified multiple times.")
parser.add_argument("-m", "--mode", type=str, required=False, default="700",
help="Default mode for files")
parser.add_argument("-o", "--options", type=str, required=False,
@@ -285,6 +339,10 @@ def parse_mount_options(options_string):
help="Delay between path lock monitoring cycles.")
args = parser.parse_args()

if args.xattrs_include_prefixes and args.xattrs_exclude_prefixes:
parser.error('Either --xattrs-include-prefix or --xattrs-exclude-prefix parameter should be specified.')
if not args.xattrs_include_prefixes and not args.xattrs_exclude_prefixes:
args.xattrs_include_prefixes = [_xattrs_include_prefix]
if not args.webdav and not args.bucket:
parser.error('Either --webdav or --bucket parameter should be specified.')
if args.bucket and (args.chunk_size < 5 * MB or args.chunk_size > 5 * GB):
@@ -308,10 +366,14 @@ def parse_mount_options(options_string):
read_ahead_size_multiplier=args.read_ahead_size_multiplier,
write_buffer_size=args.write_buffer_size, trunc_buffer_size=args.trunc_buffer_size,
chunk_size=args.chunk_size,
cache_ttl=args.cache_ttl, cache_size=args.cache_size,
listing_cache_ttl=args.listing_cache_ttl, listing_cache_size=args.listing_cache_size,
xattrs_include_prefixes=args.xattrs_include_prefixes,
xattrs_exclude_prefixes=args.xattrs_exclude_prefixes,
xattrs_cache_ttl=args.xattrs_cache_ttl, xattrs_cache_size=args.xattrs_cache_size,
disabled_operations=args.disabled_operations,
default_mode=args.mode, mount_options=parse_mount_options(args.options),
threads=args.threads, monitoring_delay=args.monitoring_delay, recording=recording)
except BaseException as e:
logging.error('Unhandled error: %s' % str(e))
except Exception:
logging.exception('Unhandled error')
traceback.print_exc()
sys.exit(1)
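For reference, the prefix filtering configured through --xattrs-include-prefix and --xattrs-exclude-prefix (defaulting to the single include prefix "user") can be pictured with a sketch like the following. The method bodies are assumptions, since pipefuse/xattr.py is not part of the rendered diff.

    import errno


    class RestrictingExtendedAttributesFS:
        # Illustrative stand-in for the class imported from pipefuse.xattr.
        # Assumption: attributes filtered out behave as if absent (ENODATA is
        # the usual getxattr error code for a missing attribute on Linux).
        def __init__(self, inner, include_prefixes=None, exclude_prefixes=None):
            self._inner = inner
            self._include_prefixes = include_prefixes or []
            self._exclude_prefixes = exclude_prefixes or []

        def _processed(self, name):
            if self._exclude_prefixes:
                return not any(name.startswith(p) for p in self._exclude_prefixes)
            return any(name.startswith(p) for p in self._include_prefixes)

        def listxattr(self, path):
            return [n for n in self._inner.listxattr(path) if self._processed(n)]

        def getxattr(self, path, name):
            if not self._processed(name):
                raise OSError(errno.ENODATA, 'Extended attribute is not processed: %s' % name)
            return self._inner.getxattr(path, name)

Note the wildcard handling in the diff above: "*" as an include prefix means no restriction wrapper is applied at all, while "*" as an exclude prefix (or an empty include list) adds the four xattr operations to disabled_operations so that SupportedOperationsFS drops them entirely.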
4 changes: 1 addition & 3 deletions pipe-cli/mount/pipefuse/__init__.py
@@ -1,4 +1,4 @@
# Copyright 2017-2019 EPAM Systems, Inc. (https://www.epam.com/)
# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -11,5 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__all__ = ['webdavfs.py', 'webdav.py']
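Besides the copyright bump, this removes an __all__ that was never effective: __all__ must list importable names, not filenames, so ['webdavfs.py', 'webdav.py'] could not match anything during a star-import. Had an export list been wanted, the correct form would have been something like:

    # In a package's __init__.py, __all__ names submodules or symbols, not files:
    __all__ = ['webdav', 'webdavfs']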
11 changes: 4 additions & 7 deletions pipe-cli/mount/pipefuse/buffread.py
@@ -1,4 +1,4 @@
# Copyright 2017-2020 EPAM Systems, Inc. (https://www.epam.com/)
# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,9 +18,6 @@
from pipefuse.fsclient import FileSystemClientDecorator


_ANY_ERROR = BaseException


class _ReadBuffer:

def __init__(self, offset, capacity, file_size):
@@ -122,10 +119,10 @@ def download_range(self, fh, buf, path, offset=0, length=0):
file_buf = self._new_read_buf(fh, path, file_buf.file_size, offset, length)
self._buffs[buf_key] = file_buf
buf.write(file_buf.view(offset, length))
except _ANY_ERROR:
except Exception:
logging.exception('Downloading has failed for %d:%s. '
'Removing the corresponding buffer.' % (fh, path))
self._remove_read_buf(fh, path)
self._remove_buf(fh, path)
raise

def _new_read_buf(self, fh, path, file_size, offset, length):
Expand All @@ -145,7 +142,7 @@ def flush(self, fh, path):
logging.info('Flushing the corresponding buffer for %d:%s' % (fh, path))
self._inner.flush(fh, path)
self._remove_buf(fh, path)
except _ANY_ERROR:
except Exception:
logging.exception('Flushing has failed for %d:%s. '
'Removing the corresponding buffer.' % (fh, path))
self._remove_buf(fh, path)
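The substantive change in this file, besides the copyright bump and the switch from _remove_read_buf to _remove_buf, is dropping the _ANY_ERROR = BaseException alias in favor of catching Exception. The difference matters because BaseException also matches KeyboardInterrupt and SystemExit, so interrupting a mount could previously be logged as a download or flush failure. A small self-contained illustration:

    import errno


    def cleanup_on_failure(action):
        # Mirrors the new handlers: log, drop the buffer, then re-raise.
        try:
            action()
        except Exception:
            print('buffer removed and failure logged')
            raise


    def simulated_download():
        raise IOError(errno.EIO, 'simulated download failure')


    def simulated_interrupt():
        raise KeyboardInterrupt()


    try:
        cleanup_on_failure(simulated_download)
    except IOError:
        pass  # ordinary errors still reach the caller after cleanup

    try:
        cleanup_on_failure(simulated_interrupt)
    except KeyboardInterrupt:
        # Under the old `except BaseException`, Ctrl+C would also have been
        # reported as a failed download before propagating.
        print('the interrupt bypasses the Exception handler')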
9 changes: 3 additions & 6 deletions pipe-cli/mount/pipefuse/buffwrite.py
@@ -1,4 +1,4 @@
# Copyright 2017-2020 EPAM Systems, Inc. (https://www.epam.com/)
# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,9 +17,6 @@
from pipefuse.fsclient import FileSystemClientDecorator


_ANY_ERROR = BaseException


class _WriteBuffer:

def __init__(self, offset, capacity, file_size=0):
@@ -114,7 +111,7 @@ def upload_range(self, fh, buf, path, offset=0):
self._remove_write_buf(fh, path)
file_buf = self._new_write_buf(self._capacity, file_buf.end, buf=None, old_write_buf=file_buf)
self._buffs[path] = file_buf
except _ANY_ERROR:
except Exception:
logging.exception('Uploading has failed for %d:%s. '
'Removing the corresponding buffer.' % (fh, path))
self._remove_write_buf(fh, path)
@@ -132,7 +129,7 @@ def flush(self, fh, path):
self._flush_write_buf(fh, path)
self._inner.flush(fh, path)
self._remove_write_buf(fh, path)
except _ANY_ERROR:
except Exception:
logging.exception('Flushing has failed for %d:%s. '
'Removing the corresponding buffer.' % (fh, path))
self._remove_write_buf(fh, path)
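Both the listing cache and the new extended-attributes cache in pipe-fuse.py are built on cachetools.TTLCache, which bounds entries by count (maxsize) and age (ttl). A quick demonstration of the expiry semantics that the --cache-ttl and --xattrs-cache-ttl options feed into; the one-second TTL here is purely illustrative:

    import time

    from cachetools import TTLCache

    # cf. TTLCache(maxsize=listing_cache_size, ttl=listing_cache_ttl) above
    cache = TTLCache(maxsize=100, ttl=1)
    cache['/bucket/dir'] = ['a.txt', 'b.txt']
    print(cache.get('/bucket/dir'))   # ['a.txt', 'b.txt'] while fresh
    time.sleep(1.1)
    print(cache.get('/bucket/dir'))   # None: the entry expired after ttl seconds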
(The remaining 11 changed files in this commit are not rendered in this view.)
