Skip to content

Commit

Permalink
[performance] Forward underlying archive block sizes to statfs and st…
Browse files Browse the repository at this point in the history
…at implementations

This is especially important for Lustre now that buffering has been disabled
for calls through FUSE in the earlier commit. Without this change, we would
issue 8 KiB reads to Lustre, which has huge latencies and performs much
better with 4 MiB reads.

See also the proposal to increase the default buffer size of 8K:
python/cpython#117151
  • Loading branch information
mxmlnkn committed Sep 14, 2024
1 parent 1457684 commit e84b180
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 18 deletions.
10 changes: 8 additions & 2 deletions core/ratarmountcore/AutoMountLayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
import traceback

from dataclasses import dataclass
from typing import Dict, IO, Iterable, List, Optional, Tuple, Union
from typing import Any, Dict, IO, Iterable, List, Optional, Tuple, Union

from .compressions import stripSuffixFromTarFile
from .factory import openMountSource
from .FolderMountSource import FolderMountSource
from .MountSource import FileInfo, MountSource
from .MountSource import FileInfo, MountSource, mergeStatfs
from .SQLiteIndexedTar import SQLiteIndexedTar, SQLiteIndexedTarUserData
from .utils import overrides

Expand Down Expand Up @@ -322,6 +322,12 @@ def getMountSource(self, fileInfo: FileInfo) -> Tuple[str, MountSource, FileInfo
deeperMountPoint, deeperMountSource, deeperFileInfo = mountSource.getMountSource(sourceFileInfo)
return os.path.join(mountPoint, deeperMountPoint.lstrip('/')), deeperMountSource, deeperFileInfo

@overrides(MountSource)
def statfs(self) -> Dict[str, Any]:
    """
    Returns the statfs dictionaries of all currently mounted sources combined into one via mergeStatfs.
    The debug level of this layer is forwarded so that merge conflicts can be reported.
    """
    perSourceStatistics = [mountInfo.mountSource.statfs() for mountInfo in self.mounted.values()]
    return mergeStatfs(perSourceStatistics, printDebug=self.printDebug)

@overrides(MountSource)
def __exit__(self, exception_type, exception_value, exception_traceback):
for _, mountInfo in self.mounted.items():
Expand Down
6 changes: 5 additions & 1 deletion core/ratarmountcore/FileVersionLayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import stat


from typing import Dict, IO, Iterable, Optional, Tuple, Union
from typing import Any, Dict, IO, Iterable, Optional, Tuple, Union

from .MountSource import FileInfo, MountSource
from .utils import overrides
Expand Down Expand Up @@ -236,3 +236,7 @@ def getMountSource(self, fileInfo: FileInfo) -> Tuple[str, MountSource, FileInfo
@overrides(MountSource)
def __exit__(self, exception_type, exception_value, exception_traceback):
    """Forwards the context manager exit to the wrapped mount source. Nothing is returned."""
    wrappedSource = self.mountSource
    wrappedSource.__exit__(exception_type, exception_value, exception_traceback)

@overrides(MountSource)
def statfs(self) -> Dict[str, Any]:
    """Returns the statfs dictionary of the wrapped mount source without modifying it."""
    wrappedStatistics = self.mountSource.statfs()
    return wrappedStatistics
23 changes: 22 additions & 1 deletion core/ratarmountcore/FolderMountSource.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import os
import stat
from typing import Dict, IO, Iterable, Optional, Union
from typing import Any, Dict, IO, Iterable, Optional, Union

from .MountSource import FileInfo, MountSource
from .utils import overrides
Expand Down Expand Up @@ -32,6 +32,7 @@ class FolderMountSource(MountSource):

def __init__(self, path: str) -> None:
    """
    path: The folder that is stored as the root of this mount source.
    """
    self.root: str = path
    # Query the file system statistics for the folder once and keep them for later statfs calls.
    self._statfs = FolderMountSource._getStatfsForFolder(path)

def setFolderDescriptor(self, fd: int) -> None:
"""
Expand All @@ -41,6 +42,22 @@ def setFolderDescriptor(self, fd: int) -> None:
"""
os.fchdir(fd)
self.root = '.'
self._statfs = FolderMountSource._getStatfsForFolder(self.root)

@staticmethod
def _getStatfsForFolder(path: str):
    """
    Calls os.statvfs for the given path and converts the result into a dictionary with statvfs-like keys.
    The block sizes, block count, file count, and maximum name length are taken over from the
    os.statvfs result, while all members counting free blocks or free file nodes are reported as 0.
    """
    folderStatistics = os.statvfs(path)
    reportedAsZero = ('f_bfree', 'f_bavail', 'f_ffree', 'f_favail')
    orderedKeys = (
        'f_bsize',
        'f_frsize',
        'f_blocks',
        'f_bfree',
        'f_bavail',
        'f_files',
        'f_ffree',
        'f_favail',
        'f_namemax',
    )
    return {key: 0 if key in reportedAsZero else getattr(folderStatistics, key) for key in orderedKeys}

def _realpath(self, path: str) -> str:
"""Path given relative to folder root. Leading '/' is acceptable"""
Expand Down Expand Up @@ -149,6 +166,10 @@ def open(self, fileInfo: FileInfo, buffering=-1) -> IO[bytes]:
except Exception as e:
raise ValueError(f"Specified path '{realpath}' is not a file that can be read!") from e

@overrides(MountSource)
def statfs(self) -> Dict[str, Any]:
    """Returns a shallow copy of the stored statfs dictionary so that callers may modify the result."""
    return dict(self._statfs)

@overrides(MountSource)
def __exit__(self, exception_type, exception_value, exception_traceback):
pass
Expand Down
3 changes: 0 additions & 3 deletions core/ratarmountcore/LibarchiveMountSource.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ def __init__(self, archive, entryIndex: int, encoding: str = 'utf-8'):
self.entryIndex = entryIndex
self._fileInfoRow: Optional[Tuple] = None

if self.eof:
return

def __del__(self):
laffi.entry_free(self._entry)

Expand Down
33 changes: 33 additions & 0 deletions core/ratarmountcore/MountSource.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ def open(self, fileInfo: FileInfo, buffering=-1) -> IO[bytes]:
a default buffer size equal to the file(system)'s block size or Python's io.DEFAULT_BUFFER_SIZE.
"""

def statfs(self) -> Dict[str, Any]:
    """
    Returns file system statistics as a dictionary whose keys are named like the members of the
    POSIX statvfs struct:
    https://pubs.opengroup.org/onlinepubs/009695399/basedefs/sys/statvfs.h.html

    Keys may be missing. Implementations should either return an empty dictionary or initialize at least
    f_bsize and f_namemax because those are the members that libfuse fills in by default when the statfs
    call is not implemented. In that case, or when the returned dictionary is empty, libfuse will return
    these default values:
        {'f_bsize': 512, 'f_namemax': 255}
    https://github.com/libfuse/libfuse/blob/373ddc7eae7b0c684fc4ab29d8addfa3b9e99e1e/lib/fuse.c#L1962-L1975

    This default implementation returns an empty dictionary.
    """
    return dict()

def fileVersions(self, path: str) -> int:
return 1 if self.exists(path) else 0

Expand Down Expand Up @@ -124,3 +136,24 @@ def createRootFileInfo(userdata: List[Any]):
userdata = userdata,
# fmt: on
)


def mergeStatfs(values: Iterable[Dict[str, Any]], printDebug: int = 0):
    """
    Combines multiple statfs dictionaries into a single one.

    values: The statfs dictionaries to merge. They are processed in the given order.
    printDebug: If at least 1, a warning is printed for each value that could not be merged.

    Merge rules per key:
     - The first value encountered for a key is taken over as is.
     - f_bsize, f_frsize: The largest value wins.
     - f_namemax: The smallest value wins.
     - All other keys: The first value is kept. A differing later value only triggers the warning.
    """
    result = {}
    for entry in values:
        for key, value in entry.items():
            if key not in result:
                result[key] = value
            elif key in ('f_bsize', 'f_frsize'):
                result[key] = max(result[key], value)
            elif key == 'f_namemax':
                result[key] = min(result[key], value)
            elif result[key] != value and printDebug >= 1:
                print(f"[Warning] Failed to merge statfs values ({value}, {result[key]}) for key: {key}.")
    return result
11 changes: 11 additions & 0 deletions core/ratarmountcore/SQLiteIndexedTar.py
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,17 @@ def read(self, fileInfo: FileInfo, size: int, offset: int) -> bytes:
self.tarFileObject.seek(tarFileInfo.offset + offset, os.SEEK_SET)
return self.tarFileObject.read(size)

@overrides(MountSource)
def statfs(self) -> Dict[str, Any]:
    """
    Returns a statfs dictionary that reports self.blockSize as block and fragment size and
    0 for all members counting free blocks or free file nodes. No f_namemax is reported.
    """
    blockSize = self.blockSize
    statistics: Dict[str, Any] = {'f_bsize': blockSize, 'f_frsize': blockSize}
    statistics.update(dict.fromkeys(('f_bfree', 'f_bavail', 'f_ffree', 'f_favail'), 0))
    return statistics

@staticmethod
def _getPastEndOffset(sqlConnection: sqlite3.Connection) -> Optional[int]:
"""
Expand Down
26 changes: 25 additions & 1 deletion core/ratarmountcore/SingleFileMountSource.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
# -*- coding: utf-8 -*-

import io
import os
import stat
import threading
import time
from typing import cast, Dict, IO, Iterable, Optional, Union
from typing import Any, cast, Dict, IO, Iterable, Optional, Union

from .MountSource import FileInfo, MountSource
from .StenciledFile import RawStenciledFile, StenciledFile
Expand All @@ -29,6 +30,25 @@ def __init__(self, path: str, fileobj: IO[bytes]):
self.mtime = int(time.time())
self.size: int = self.fileobj.seek(0, io.SEEK_END)

fileno = None
try:
fileno = self.fileobj.fileno()
except Exception:
pass

self._statfs = {}
if fileno is not None:
statfs = os.fstat(fileno)
self._statfs = {
'f_bsize': statfs.st_blksize,
'f_frsize': statfs.st_blksize,
'f_blocks': statfs.st_blocks,
'f_bfree': 0,
'f_bavail': 0,
'f_ffree': 0,
'f_favail': 0,
}

def _createFileInfo(self):
# This must be a function and cannot be cached into a member in order to avoid userdata being a shared list!
return FileInfo(
Expand Down Expand Up @@ -93,6 +113,10 @@ def open(self, fileInfo: FileInfo, buffering=-1) -> IO[bytes]:
def isImmutable(self) -> bool:
return True

@overrides(MountSource)
def statfs(self) -> Dict[str, Any]:
    """Returns a shallow copy of the stored statfs dictionary so that callers may modify the result."""
    return dict(self._statfs)

@overrides(MountSource)
def __exit__(self, exception_type, exception_value, exception_traceback):
self.fileobj.close()
Expand Down
19 changes: 19 additions & 0 deletions core/ratarmountcore/SquashFSMountSource.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,25 @@ def open(self, fileInfo: FileInfo, buffering=-1) -> IO[bytes]:
assert isinstance(extendedFileInfo, SQLiteIndexedTarUserData)
return self.image.open(self.image.read_inode(extendedFileInfo.offsetheader))

@overrides(MountSource)
def statfs(self) -> Dict[str, Any]:
    """
    Returns a statfs dictionary whose block and fragment size is the larger one of the block size
    reported by os.fstat for the raw file object and the block size stored in the image superblock.
    All members counting free blocks or free file nodes are 0 and f_namemax is 256.
    """
    # Best effort: If the raw file object has no usable file descriptor, fall back to 512 B.
    try:
        underlyingBlockSize = os.fstat(self.rawFileObject.fileno()).st_blksize
    except Exception:
        underlyingBlockSize = 512

    effectiveBlockSize = max(underlyingBlockSize, self.image._sblk.block_size)
    statistics: Dict[str, Any] = {'f_bsize': effectiveBlockSize, 'f_frsize': effectiveBlockSize}
    statistics.update(dict.fromkeys(('f_bfree', 'f_bavail', 'f_ffree', 'f_favail'), 0))
    statistics['f_namemax'] = 256
    return statistics

def _tryToOpenFirstFile(self):
# Get first row that has the regular file bit set in mode (stat.S_IFREG == 32768 == 1<<15).
result = self.index.getConnection().execute(
Expand Down
8 changes: 6 additions & 2 deletions core/ratarmountcore/UnionMountSource.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import stat
import time

from typing import Dict, Iterable, IO, List, Optional, Set, Tuple, Union
from typing import Any, Dict, Iterable, IO, List, Optional, Set, Tuple, Union

from .MountSource import FileInfo, MountSource, createRootFileInfo
from .MountSource import FileInfo, MountSource, createRootFileInfo, mergeStatfs
from .utils import overrides


Expand Down Expand Up @@ -221,6 +221,10 @@ def getMountSource(self, fileInfo: FileInfo) -> Tuple[str, MountSource, FileInfo
# the mount point path returned by getMountSource to the mount point '/'.
return mountSource.getMountSource(sourceFileInfo)

@overrides(MountSource)
def statfs(self) -> Dict[str, Any]:
    """
    Returns the statfs dictionaries of all union members combined into one via mergeStatfs.
    The debug level of this mount source is forwarded so that merge conflicts can be reported.
    """
    perSourceStatistics = [source.statfs() for source in self.mountSources]
    return mergeStatfs(perSourceStatistics, printDebug=self.printDebug)

@overrides(MountSource)
def __exit__(self, exception_type, exception_value, exception_traceback):
for mountSource in self.mountSources:
Expand Down
45 changes: 37 additions & 8 deletions ratarmount.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ def fsync(self, path, datasync, fh):

@overrides(fuse.Operations)
def statfs(self, path):
    """
    Returns the stored file system statistics. The path argument is not used.

    A copy is returned instead of the stored dictionary itself so that callers, which may update
    the result with further values, cannot alter the statistics kept by this object.
    """
    return self._statfs.copy()


class FuseMount(fuse.Operations):
Expand All @@ -512,6 +512,11 @@ class FuseMount(fuse.Operations):
This is why MountSource also should expect leading slashes in all paths.
"""

# Use a relatively large minimum block size of 256 KiB to get filesystem users to issue larger reads
# because each read has a relatively large overhead from passing through the fusepy, libfuse,
# kernel FUSE, SQLite, ratarmountcore, StenciledFile, and other layers.
MINIMUM_BLOCK_SIZE = 256 * 1024

def __init__(self, pathToMount: Union[str, List[str]], mountPoint: str, **options) -> None:
self.mountPoint = os.path.realpath(mountPoint)
# This check is important for the self-bind test below, which assumes a folder.
Expand Down Expand Up @@ -649,8 +654,6 @@ def createMultiMount() -> MountSource:
self.mknod = self.writeOverlay.mknod
self.truncate = self.writeOverlay.truncate

self.statfs = self.writeOverlay.statfs

# Create mount point if it does not exist
self.mountPointWasCreated = False
if mountPoint and not os.path.exists(mountPoint):
Expand Down Expand Up @@ -722,11 +725,13 @@ def _fileInfoToDict(fileInfo: FileInfo):
statDict['st_mtime'] = int(statDict['st_mtime'])
statDict['st_nlink'] = 1 # TODO: this is wrong for files with hardlinks

# du by default sums disk usage (the number of blocks used by a file)
# instead of file size directly. Tar files are usually a series of 512B
# blocks, so we report a 1-block header + ceil(filesize / 512).
statDict['st_blksize'] = 512
statDict['st_blocks'] = 1 + ((fileInfo.size + 511) // 512)
# `du` sums disk usage (the number of blocks used by a file) instead of the file sizes by default.
# So, we need to return some valid values. Tar files are usually a series of 512 B blocks, but this
# block size is also used by Python as the default read call size, so it should be something larger
# for better performance.
blockSize = FuseMount.MINIMUM_BLOCK_SIZE
statDict['st_blksize'] = blockSize
statDict['st_blocks'] = 1 + ((fileInfo.size + blockSize - 1) // blockSize)

return statDict

Expand Down Expand Up @@ -864,6 +869,30 @@ def fsync(self, path, datasync, fh):
self.writeOverlay.fsync(path, datasync, self._resolveFileHandle(fh))
return 0 # Nothing to flush, so return success

@overrides(fuse.Operations)
def statfs(self, path):
    """
    Returns the file system statistics for the mount point as a dictionary with statvfs-like keys.

    With a write overlay, the statistics of the overlay are returned with f_bsize and f_frsize replaced
    by those of the mount source. Without one, the statistics of the mount source are returned.
    In both cases, f_bsize and f_frsize are raised to at least FuseMount.MINIMUM_BLOCK_SIZE.
    """
    # The filesystem block size is used, e.g., by Python as the default buffer size and therefore the
    # default (p)read size when possible. For network file systems such as Lustre, or block compression
    # such as in SquashFS, this proved to be highly insufficient to reach optimal performance!
    # Note that there are some efforts to get rid of Python's behavior to use the block size and to
    # increase the fixed default buffer size:
    # https://github.com/python/cpython/issues/117151
    blockSizeKeys = ('f_bsize', 'f_frsize')
    if self.writeOverlay:
        # Merge the block size from other mount sources while throwing away f_bfree and similar members
        # that are set to 0 because those are read-only mount sources.
        result = self.writeOverlay.statfs(path).copy()
        result.update({key: value for key, value in self.mountSource.statfs().items() if key in blockSizeKeys})
    else:
        # This assignment must only happen without a write overlay. Doing it unconditionally would
        # discard the overlay statistics merged above. Copy so that the mount source's dictionary
        # is never modified by the adjustments below.
        result = dict(self.mountSource.statfs())

    # Use a relatively large minimum 256 KiB block size to direct filesystem users to use larger reads
    # because they have a relatively large overhead because of the fusepy, libfuse, kernel FUSE, SQLite,
    # ratarmountcore, StenciledFile, and other layers.
    for key in blockSizeKeys:
        result[key] = max(result.get(key, 0), FuseMount.MINIMUM_BLOCK_SIZE)
    return result


def checkInputFileType(
tarFile: str, encoding: str = tarfile.ENCODING, printDebug: int = 0
Expand Down

0 comments on commit e84b180

Please sign in to comment.