diff --git a/Lib/os.py b/Lib/os.py index 598c9e502301f7..0b6b8f38fc242b 100644 --- a/Lib/os.py +++ b/Lib/os.py @@ -432,6 +432,79 @@ def walk(top, topdown=True, onerror=None, followlinks=False): __all__.append("walk") +def _get_dir_entry(entry_path): + # TODO: Create a single DirEntry properly + # This is an ugly hack and won't work for fs root dir + scandir_it = scandir(path.dirname(entry_path)) + with scandir_it: + for entry in scandir_it: + if entry.path == entry_path: + return entry + else: + raise FileNotFoundError(f"No such file or directory: {entry_path}") + +def scantree(top, top_down=True, on_error=None, follow_symlinks=False): + sys.audit("os.scantree", top, top_down, on_error, follow_symlinks) + + try: + top_entry = _get_dir_entry(top) + except OSError as error: + if on_error is not None: + on_error(error) + return + + paths = [top_entry] + while paths: + top_entry = paths.pop() + if isinstance(top_entry, tuple): + yield top_entry + continue + + try: + scandir_it = scandir(top_entry) + except OSError as error: + if on_error is not None: + on_error(error) + continue + + with scandir_it: + dirs = [] + nondirs = [] + for entry in scandir_it: + try: + is_dir = entry.is_dir(follow_symlinks=follow_symlinks) + except OSError: + is_dir = False + + if is_dir: + dirs.append(entry) + else: + nondirs.append(entry) + + if top_down: + dirnames = yield top_entry, dirs.copy(), nondirs + else: + paths.append((top_entry, dirs, nondirs)) + + if dirnames is not None: + dir_map = {e.name: e for e in dirs} + dirs = [] + for dirname in dirnames: + try: + dirs.append(dir_map[dirname]) + except KeyError: + try: + dirs.append(_get_dir_entry(path.join(top_entry.path, dirname))) + except OSError as error: + if on_error is not None: + on_error(error) + yield None + + for new_path in reversed(dirs): + paths.append(new_path) + +__all__.append("scantree") + if {open, stat} <= supports_dir_fd and {scandir, stat} <= supports_fd: def fwalk(top=".", topdown=True, onerror=None, *, follow_symlinks=False, dir_fd=None): @@ -540,6 +613,101 @@ def _fwalk(topfd, toppath, isbytes, topdown, onerror, follow_symlinks): __all__.append("fwalk") + class _WalkAction: + YIELD = object() + CLOSE = object() + WALK = object() + + def fscantree(top, top_down=True, on_error=None, *, follow_symlinks=False, dir_fd=None): + sys.audit("os.fscantree", top, top_down, on_error, follow_symlinks, dir_fd) + top = fspath(top) + isbytes = isinstance(top, bytes) + # Note: To guard against symlink races, we use the standard + # lstat()/open()/fstat() trick. + if not follow_symlinks: + orig_st = stat(top, follow_symlinks=False, dir_fd=dir_fd) + topfd = open(top, O_RDONLY, dir_fd=dir_fd) + stack = [(_WalkAction.CLOSE, topfd)] + try: + if (follow_symlinks or (st.S_ISDIR(orig_st.st_mode) and + path.samestat(orig_st, stat(topfd)))): + stack.append((_WalkAction.WALK, (topfd, top))) + + while stack: + action, value = stack.pop() + if action is _WalkAction.YIELD: + yield value + continue + elif action is _WalkAction.CLOSE: + close(value) + continue + elif action is _WalkAction.WALK: + topfd, top = value + else: + raise AssertionError(f"invalid walk action: {action!r}") + + scandir_it = scandir(topfd) + dirs = [] + nondirs = [] + entries = None if top_down or follow_symlinks else [] + for entry in scandir_it: + try: + if entry.is_dir(follow_symlinks=follow_symlinks): + dirs.append(entry) + if entries is not None: + entries.append(entry) + else: + nondirs.append(entry) + except OSError: + try: + # Add dangling symlinks, ignore disappeared files + if entry.is_symlink(): + nondirs.append(entry) + except OSError: + pass + + if top_down: + # Yield top immediately, before walking subdirs + yield top, dirs, nondirs, topfd + else: + # Yield top after walking subdirs + stack.append( + (_WalkAction.YIELD, (top, dirs, nondirs, topfd))) + + for name in (reversed(dirs) if entries is None + else zip(reversed(dirs), reversed(entries))): + try: + if not follow_symlinks: + if top_down: + orig_st = stat(name, dir_fd=topfd, + follow_symlinks=False) + else: + assert entries is not None + name, entry = name + orig_st = entry.stat(follow_symlinks=False) + dirfd = open(name, O_RDONLY, dir_fd=topfd) + except OSError as err: + if on_error is not None: + on_error(err) + continue + # Close dirfd right after all subdirs have been traversed. + # Note that we use a stack, so actions appended first are + # executed last. + stack.append((_WalkAction.CLOSE, dirfd)) + # Walk all subdirs + if follow_symlinks or path.samestat(orig_st, stat(dirfd)): + dirpath = path.join(top, name) + stack.append((_WalkAction.WALK, (dirfd, dirpath))) + finally: + for action, value in reversed(stack): + if action is _WalkAction.CLOSE: + try: + close(value) + except OSError: + pass + + __all__.append("fscantree") + def execl(file, *args): """execl(file, *args)