Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ls shell-like wildcards #302

Closed
wants to merge 15 commits into from
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]


### Added
* The `ls` command now supports shell-like wildcard patterns in addition to glob-like patterns.

## [1.23.0] - 2023-08-10

### Added
Expand Down
55 changes: 17 additions & 38 deletions b2sdk/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
######################################################################
from __future__ import annotations

import fnmatch
import logging
import pathlib
from contextlib import suppress

from b2sdk.session import B2Session

from .encryption.setting import EncryptionSetting, EncryptionSettingFactory
from .encryption.types import EncryptionMode
from .exception import (
Expand Down Expand Up @@ -49,6 +49,7 @@
limit_trace_arguments,
validate_b2_file_name,
)
from .utils.wildcards import WildcardStyle, get_solid_prefix, get_wildcard_matcher

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -330,6 +331,7 @@ def ls(
recursive: bool = False,
fetch_count: int | None = 10000,
with_wildcard: bool = False,
wildcard_style: WildcardStyle = WildcardStyle.GLOB,
):
"""
Pretend that folders exist and yields the information about the files in a folder.
Expand All @@ -348,9 +350,10 @@ def ls(
when ``True``, just returns info about the most recent versions
:param recursive: if ``True``, list folders recursively
:param fetch_count: how many entries to return or ``None`` to use the default. Acceptable values: 1 - 10000
:param with_wildcard: Accepts "*", "?", "[]" and "[!]" in folder_to_list, similarly to what shell does.
:param with_wildcard: Accepts "*", "**", "?", "[]", "[!]", and "{}" in `folder_to_list`, similarly to shell.
As of 1.19.0 it can only be enabled when recursive is also enabled.
Also, in this mode, folder_to_list is considered to be a filename or a pattern.
:param wildcard_style: Style of wildcard to use. Default is WildcardStyle.GLOB ("glob")
:rtype: generator[tuple[b2sdk.v2.FileVersion, str]]
:returns: generator of (file_version, folder_name) tuples

Expand All @@ -364,39 +367,14 @@ def ls(
# Every file returned must have a name that starts with the
# folder name and a "/".
prefix = folder_to_list
# In case of wildcards, we don't assume that this is folder that we're searching through.
# It could be an exact file, e.g. 'a/b.txt' that we're trying to locate.
if prefix != '' and not prefix.endswith('/') and not with_wildcard:
prefix += '/'

# If we're running with wildcard-matching, we could get
# a different prefix from it. We search for the first
# occurrence of the special characters and fetch
# parent path from that place.
# Examples:
# 'b/c/*.txt' –> 'b/c/'
# '*.txt' –> ''
# 'a/*/result.[ct]sv' –> 'a/'
if with_wildcard:
for wildcard_character in '*?[':
try:
starter_index = folder_to_list.index(wildcard_character)
except ValueError:
continue

# +1 to include the starter character. Using posix path to
# ensure consistent behaviour on Windows (e.g. case sensitivity).
path = pathlib.PurePosixPath(folder_to_list[:starter_index + 1])
parent_path = str(path.parent)
# Path considers dot to be the empty path.
# There's no shorter path than that.
if parent_path == '.':
prefix = ''
break
# We could receive paths in different stage, e.g. 'a/*/result.[ct]sv' has two
# possible parent paths: 'a/' and 'a/*/', with the first one being the correct one
if len(parent_path) < len(prefix):
prefix = parent_path
prefix = get_solid_prefix(prefix, folder_to_list, wildcard_style)
wildcard_matcher = get_wildcard_matcher(folder_to_list, wildcard_style)
elif prefix != '' and not prefix.endswith('/'):
# we don't assume that this is folder that we're searching through.
# It could be an exact file, e.g. 'a/b.txt' that we're trying to locate.
prefix += '/'

# Loop until all files in the named directory have been listed.
# The starting point of the first list_file_names request is the
Expand All @@ -409,24 +387,25 @@ def ls(
current_dir = None
start_file_name = prefix
start_file_id = None
session = self.api.session
session: B2Session = self.api.session
while True:
if latest_only:
response = session.list_file_names(self.id_, start_file_name, fetch_count, prefix)
else:
response = session.list_file_versions(
self.id_, start_file_name, start_file_id, fetch_count, prefix
)

for entry in response['files']:
file_version = self.api.file_version_factory.from_api_response(entry)
if not file_version.file_name.startswith(prefix):
# We're past the files we care about
return
if with_wildcard and not fnmatch.fnmatchcase(
file_version.file_name, folder_to_list
):

if with_wildcard and not wildcard_matcher(file_version.file_name):
# File doesn't match our wildcard rules
continue

after_prefix = file_version.file_name[len(prefix):]
# In case of wildcards, we don't care about folders at all, and it's recursive by default.
if '/' not in after_prefix or recursive:
Expand Down
122 changes: 122 additions & 0 deletions b2sdk/utils/wildcards.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
######################################################################
#
# File: b2sdk/utils/wildcards.py
#
# Copyright 2023 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

import fnmatch
import logging
import pathlib
from enum import Enum
from functools import partial
from typing import Callable

from wcmatch import glob as wcglob

logger = logging.getLogger(__name__)


class WildcardStyle(str, Enum):
    """Wildcard dialects selectable for pattern matching.

    Members are also ``str`` instances, so each one compares equal to its
    raw value (``WildcardStyle.GLOB == 'glob'``).
    """

    # Understands *, ?, [] and [!]; there is no way to escape a wildcard.
    GLOB = 'glob'

    # Understands *, **, ?, [], [!] and {}; wildcards can be escaped.
    SHELL = 'shell'


def _find_unescaped_char(
folder_to_list: str, wildcard_character: str, offset: int = 0
) -> int | None:
"""Find the first occurrence of a character in a string, ignoring escaped characters.

:raises ValueError: no unescaped character is found
"""
max_index = len(folder_to_list)
while offset < max_index:
starter_index = folder_to_list.index(wildcard_character, offset)
if starter_index is None:
return None
elif starter_index > 0 and folder_to_list[starter_index - 1] == '\\':
# the character is escaped, ignore it
offset = starter_index + 1
continue
return starter_index
raise ValueError("no unescaped character found")


def get_solid_prefix(
    current_prefix: str, folder_to_list: str, wildcard_style: WildcardStyle
) -> str:
    """Find the longest usable listing prefix that contains no wildcard syntax.

    The pattern is cut at its first special character and the parent path of
    that cut is taken, so the result never ends with ``/``. The shorter of
    that parent path and ``current_prefix`` is returned.

    >>> get_solid_prefix('b/c/*.txt', 'b/c/*.txt', WildcardStyle.SHELL)
    'b/c'
    >>> get_solid_prefix('*.txt', '*.txt', WildcardStyle.SHELL)
    ''
    >>> get_solid_prefix('a/*/result.[ct]sv', 'a/*/result.[ct]sv', WildcardStyle.GLOB)
    'a'
    >>> get_solid_prefix('a/b.txt', 'a/b.txt', WildcardStyle.GLOB)
    'a/b.txt'

    :param current_prefix: prefix established so far; the result is never longer than it
    :param folder_to_list: file name or wildcard pattern being listed
    :param wildcard_style: wildcard dialect used by ``folder_to_list``
    :return: prefix free of wildcard syntax
    :raises ValueError: ``wildcard_style`` is not a known style
    """
    boundary_finders = {
        # wildcard style: (finder of the first special char, special chars)
        WildcardStyle.SHELL.value: (
            _find_unescaped_char,
            # ** is matched via *. A backslash is a boundary too: everything
            # behind it is escape syntax rather than literal file-name text,
            # so it must not leak into the prefix.
            ('*', '?', '[', '{', '\\'),
        ),
        WildcardStyle.GLOB.value: (
            str.index,
            ('*', '?', '['),
        ),
    }

    try:
        finder, boundary_chars = boundary_finders[wildcard_style]
    except KeyError:
        raise ValueError(f'Unknown wildcard style: {wildcard_style!r}') from None

    pattern_length = len(folder_to_list)
    solid_length = pattern_length
    for boundary_char in boundary_chars:
        try:
            char_index = finder(folder_to_list, boundary_char)
        except ValueError:
            logger.debug('character %r not found in %r', boundary_char, folder_to_list)
            continue
        solid_length = min(char_index, solid_length)

    # A GLOB pattern without any wildcard only matches the identical file
    # name, so the pattern itself is the tightest prefix. SHELL keeps the
    # conservative parent-path result below.
    if solid_length == pattern_length and wildcard_style == WildcardStyle.GLOB:
        return min(folder_to_list, current_prefix, key=len)

    # +1 to include the starter character. Using posix path to
    # ensure consistent behaviour on Windows (e.g. case sensitivity).
    path = pathlib.PurePosixPath(folder_to_list[:solid_length + 1])
    parent_path = str(path.parent)

    # Path considers dot to be the empty path.
    # There's no shorter path than that.
    if parent_path == '.':
        return ''

    # Never widen beyond what the caller already established: keep the
    # shorter of the two candidates.
    return min(parent_path, current_prefix, key=len)


def get_wildcard_matcher(match_pattern: str,
                         wildcard_style: WildcardStyle) -> Callable[[str], bool]:
    """Return a wildcard matcher for chosen style and pattern.

    :param match_pattern: pattern every candidate file name is tested against
    :param wildcard_style: wildcard dialect used to interpret ``match_pattern``
    :return: callable taking a file name and telling whether it matches
    :raises ValueError: ``wildcard_style`` is not a known style
    """
    if wildcard_style == WildcardStyle.SHELL:
        # NEGATE is deliberately not set: [!...] is part of the base syntax,
        # whereas NEGATE would turn a leading "!" into an exclusion pattern.
        wc_flags = (
            wcglob.CASE  # case sensitive
            | wcglob.BRACE  # support {} for multiple options
            | wcglob.GLOBSTAR  # support ** for recursive matching
        )

        def shell_matcher(file_name: str) -> bool:
            return wcglob.globmatch(file_name, match_pattern, flags=wc_flags)

        return shell_matcher

    if wildcard_style == WildcardStyle.GLOB:

        def glob_matcher(file_name: str) -> bool:
            return fnmatch.fnmatchcase(file_name, match_pattern)

        return glob_matcher

    raise ValueError(f"Unknown wildcard style: {wildcard_style}")
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ logfury>=1.0.1,<2.0.0
requests>=2.9.1,<3.0.0
tqdm>=4.5.0,<5.0.0
typing-extensions>=4.7.1; python_version < '3.12'
wcmatch>=8.4.1,<9.0.0
2 changes: 0 additions & 2 deletions test/integration/fixtures/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
######################################################################
from __future__ import annotations

import os

import pytest
from .. import get_b2_auth_data

Expand Down
Loading