diff --git a/README.md b/README.md index c85fa6b3..3edd0148 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,7 @@ try: from gssapi.raw import inquire_sec_context_by_oid print("python-gssapi extension is available") except ImportError as exc: - print("python-gssapi extension is not available: %s" % str(exc)) + print(f"python-gssapi extension is not available: {exc}") ``` If it isn't available, then either a newer version of the system's gssapi implementation needs to be setup and diff --git a/build_helpers/check-smb.py b/build_helpers/check-smb.py index 9d1db9d5..17e52a71 100755 --- a/build_helpers/check-smb.py +++ b/build_helpers/check-smb.py @@ -1,13 +1,17 @@ +import logging import os import time import uuid from smbprotocol.connection import Connection +log = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + def test_connection(server, port): conn = Connection(uuid.uuid4(), server, port=port) - print("Opening connection to %s:%d" % (server, port)) + log.info("Opening connection to %s:%d", server, port) conn.connect(timeout=5) conn.disconnect(True) @@ -15,22 +19,22 @@ def test_connection(server, port): if __name__ == "__main__": server = os.environ.get("SMB_SERVER", "127.0.0.1") port = int(os.environ.get("SMB_PORT", 445)) - print("Waiting for SMB server to be online") + log.info("Waiting for SMB server to be online") attempt = 1 total_attempts = 20 while attempt < total_attempts: - print("Starting attempt %d" % attempt) + log.info("Starting attempt %d", attempt) try: test_connection(server, port) break except Exception as e: - print("Connection attempt %d failed: %s" % (attempt, str(e))) + log.info("Connection attempt %d failed: %s", attempt, e) attempt += 1 if attempt == total_attempts: - raise Exception("Timeout while waiting for SMB server to come online") + raise Exception("Timeout while waiting for SMB server to come online") from e - print("Sleeping for 5 seconds before next attempt") + log.info("Sleeping for 5 seconds before next attempt") time.sleep(5) - print("Connection successful") + log.info("Connection successful") diff --git a/examples/high-level/directory-management.py b/examples/high-level/directory-management.py index 3eef9396..e63f234e 100644 --- a/examples/high-level/directory-management.py +++ b/examples/high-level/directory-management.py @@ -1,6 +1,11 @@ +import logging + from smbclient import listdir, mkdir, register_session, rmdir, scandir from smbclient.path import isdir +log = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + # Optional - register the server with explicit credentials register_session("server", username="admin", password="pass") @@ -12,18 +17,18 @@ # Checking whether a file is a directory d_filename = r"\\server\share\directory" -print("Is file {} dir?: {}".format(d_filename, isdir(d_filename))) +log.info("Is file %s dir?: %b", d_filename, isdir(d_filename)) # List the files/directories inside a dir for filename in listdir(r"\\server\share\directory"): - print(filename) + log.info(filename) # Use scandir as a more efficient directory listing as it already contains info like stat and attributes. for file_info in scandir(r"\\server\share\directory"): file_inode = file_info.inode() if file_info.is_file(): - print("File: %s %d" % (file_info.name, file_inode)) + log.info("File: %s %d", file_info.name, file_inode) elif file_info.is_dir(): - print("Dir: %s %d" % (file_info.name, file_inode)) + log.info("Dir: %s %d", file_info.name, file_inode) else: - print("Symlink: %s %d" % (file_info.name, file_inode)) + log.info("Symlink: %s %d", file_info.name, file_inode) diff --git a/examples/low-level/directory-management.py b/examples/low-level/directory-management.py index 18ac9cf0..74a2b2d1 100644 --- a/examples/low-level/directory-management.py +++ b/examples/low-level/directory-management.py @@ -1,3 +1,4 @@ +import logging import uuid from smbprotocol.connection import Connection @@ -15,11 +16,14 @@ from smbprotocol.session import Session from smbprotocol.tree import TreeConnect +log = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + server = "127.0.0.1" port = 445 username = "smbuser" password = "smbpassword" -share = r"\\%s\share" % server +share = rf"\\{server}\share" dir_name = "directory" connection = Connection(uuid.uuid4(), server, port) @@ -55,7 +59,7 @@ ) compound_messages = [ - directory_file.write("Hello World".encode("utf-8"), 0, send=False), + directory_file.write(b"Hello World", 0, send=False), dir_open.query_directory("*", FileInformationClass.FILE_NAMES_INFORMATION, send=False), directory_file.close(False, send=False), dir_open.close(False, send=False), @@ -70,7 +74,7 @@ for dir_file in responses[1]: dir_files.append(dir_file["file_name"].get_value().decode("utf-16-le")) - print("Directory '%s\\%s' contains the files: '%s'" % (share, dir_name, "', '".join(dir_files))) + log.info("Directory '%s\\%s' contains the files: %s", share, dir_name, ", ".join(repr(file) for file in dir_files)) # delete a directory (note the dir needs to be empty to delete on close) dir_open = Open(tree, dir_name) diff --git a/examples/low-level/file-management.py b/examples/low-level/file-management.py index a2e39032..beb5ec8e 100644 --- a/examples/low-level/file-management.py +++ b/examples/low-level/file-management.py @@ -1,3 +1,4 @@ +import logging import uuid from smbprotocol.connection import Connection @@ -27,11 +28,14 @@ from smbprotocol.structure import FlagField from smbprotocol.tree import TreeConnect +log = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + server = "127.0.0.1" port = 445 username = "smbuser" password = "smbpassword" -share = r"\\%s\share" % server +share = rf"\\{server}\share" file_name = "file-test.txt" connection = Connection(uuid.uuid4(), server, port) @@ -83,7 +87,7 @@ # flag field, set the value and get the human readble string max_access = FlagField(size=4, flag_type=FilePipePrinterAccessMask, flag_strict=False) max_access.set_value(open_info[0]["maximal_access"].get_value()) - print("Maximum access mask for file %s\\%s: %s" % (share, file_name, str(max_access))) + log.info("Maximum access mask for file %s\\%s: %s", share, file_name, max_access) # write to a file text = "Hello World, what a nice day to play with SMB" @@ -91,7 +95,7 @@ # read from a file file_text = file_open.read(0, 1024) - print("Text of file %s\\%s: %s" % (share, file_name, file_text.decode("utf-8"))) + log.info("Text of file %s\\%s: %s", share, file_name, file_text.decode("utf-8")) file_open.close(False) # read and delete a file in a single SMB packet instead of 3 @@ -116,6 +120,6 @@ for i, request in enumerate(requests): response = delete_msgs[i][1](request) responses.append(response) - print("Text of file when reading/deleting in 1 request: %s" % responses[1].decode("utf-8")) + log.info("Text of file when reading/deleting in 1 request: %s", responses[1].decode("utf-8")) finally: connection.disconnect(True) diff --git a/setup.py b/setup.py index 1939b8e2..dc8676d5 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,4 @@ #!/usr/bin/env python -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/src/smbclient/__init__.py b/src/smbclient/__init__.py index c7b3f553..a2b71370 100644 --- a/src/smbclient/__init__.py +++ b/src/smbclient/__init__.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/src/smbclient/_io.py b/src/smbclient/_io.py index b2032ae9..f836c9d5 100644 --- a/src/smbclient/_io.py +++ b/src/smbclient/_io.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) @@ -52,9 +51,8 @@ def _parse_share_access(raw, mode): for c in raw: access_val = share_access_map.get(c) if access_val is None: - raise ValueError( - "Invalid share_access char %s, can only be %s" % (c, ", ".join(sorted(share_access_map.keys()))) - ) + chars = ", ".join(sorted(share_access_map.keys())) + raise ValueError(f"Invalid share_access char {c}, can only be {chars}") share_access |= access_val if "r" in mode: @@ -82,12 +80,15 @@ def _parse_mode(raw, invalid=""): for c in raw: dispo_val = disposition_map.get(c) if dispo_val is None: - raise ValueError("Invalid mode char %s, can only be %s" % (c, ", ".join(sorted(disposition_map.keys())))) + chars = ", ".join(sorted(disposition_map.keys())) + raise ValueError( + f"Invalid mode char {c}, can only be {chars}", + ) create_disposition |= dispo_val if create_disposition == 0: - raise ValueError("Invalid mode value %s, must contain at least r, w, x, or a" % raw) + raise ValueError(f"Invalid mode value {raw}, must contain at least r, w, x, or a") return create_disposition @@ -102,7 +103,7 @@ def _chunk_size(connection, length, operation): :param operation: The operation the chunk is for: 'read', 'write', 'transact' :return: The size of the chunk we can use and the number of credits to request for the next operation. """ - max_size = getattr(connection, "max_%s_size" % operation) + max_size = getattr(connection, f"max_{operation}_size") # Determine the maximum data length we can send for the operation. We do this by checking the available credits and # calculating whatever is the smallest; length, negotiated operation size, available credit size). @@ -141,7 +142,7 @@ def _resolve_dfs(raw_io): if not info: raise ObjectPathNotFound() - connection_kwargs = getattr(raw_io, "_%s__kwargs" % SMBRawIO.__name__, {}) + connection_kwargs = getattr(raw_io, f"_{SMBRawIO.__name__}__kwargs", {}) for target in info: new_path = raw_path.replace(info.dfs_path, target.target_path, 1) @@ -150,7 +151,7 @@ def _resolve_dfs(raw_io): tree, fd_path = get_smb_tree(new_path, **connection_kwargs) except SMBResponseException as link_exc: - log.warning("Failed to connect to DFS link target %s: %s", str(target), link_exc) + log.warning("Failed to connect to DFS link target %s", target, exc_info=link_exc) continue # Record the target that worked for future reference. @@ -612,8 +613,7 @@ def query_directory(self, pattern, info_class): break query_flags = 0 # Only the first request should have set SMB2_RESTART_SCANS - for entry in entries: - yield entry + yield from entries def readable(self): return False diff --git a/src/smbclient/_os.py b/src/smbclient/_os.py index 63d17741..859fa161 100644 --- a/src/smbclient/_os.py +++ b/src/smbclient/_os.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) @@ -102,7 +101,7 @@ ) -def is_remote_path(path): # type: (str) -> bool +def is_remote_path(path: str) -> bool: """ Returns True iff the given path is a remote SMB path (rather than a local path). @@ -182,9 +181,8 @@ def copyfile(src, dst, **kwargs): copychunk_response = SMB2SrvCopyChunkResponse() copychunk_response.unpack(result) if copychunk_response["chunks_written"].get_value() != len(batch): - raise IOError( - "Failed to copy all the chunks in a server side copyfile: '%s' -> '%s'" - % (norm_src, norm_dst) + raise OSError( + f"Failed to copy all the chunks in a server side copyfile: '{norm_src}' -> '{norm_dst}'" ) @@ -433,7 +431,7 @@ def readlink(path, **kwargs): reparse_buffer = _get_reparse_point(norm_path, **kwargs) reparse_tag = reparse_buffer["reparse_tag"] if reparse_tag.get_value() != ReparseTags.IO_REPARSE_TAG_SYMLINK: - raise ValueError("Cannot read link of reparse point with tag %s at '%s'" % (str(reparse_tag), norm_path)) + raise ValueError(f"Cannot read link of reparse point with tag {reparse_tag} at '{norm_path}'") symlink_buffer = SymbolicLinkReparseDataBuffer() symlink_buffer.unpack(reparse_buffer["data_buffer"].get_value()) @@ -549,7 +547,7 @@ def scandir(path, search_pattern="*", **kwargs): continue dir_entry = SMBDirEntry( - SMBRawIO("%s\\%s" % (path, filename), **kwargs), dir_info, connection_cache=connection_cache + SMBRawIO(rf"{path}\{filename}", **kwargs), dir_info, connection_cache=connection_cache ) yield dir_entry @@ -696,12 +694,12 @@ def symlink(src, dst, target_is_directory=False, **kwargs): norm_src = ntpath.abspath(ntpath.join(dst_dir, norm_src)) else: flags = SymbolicLinkFlags.SYMLINK_FLAG_ABSOLUTE - substitute_name = "\\??\\UNC\\%s" % norm_src[2:] + substitute_name = "\\??\\UNC\\" + norm_src[2:] src_drive = ntpath.splitdrive(norm_src)[0] dst_drive = ntpath.splitdrive(norm_dst)[0] if src_drive.lower() != dst_drive.lower(): - raise ValueError("Resolved link src root '%s' must be the same as the dst root '%s'" % (src_drive, dst_drive)) + raise ValueError(f"Resolved link src root '{src_drive}' must be the same as the dst root '{dst_drive}'") try: src_stat = stat(norm_src, **kwargs) @@ -901,13 +899,11 @@ def walk(top, topdown=True, onerror=None, follow_symlinks=False, **kwargs): if not follow_symlinks and py_stat.S_ISLNK(lstat(dirpath, **kwargs).st_mode): continue - for dir_top, dir_dirs, dir_files in walk(dirpath, **walk_kwargs): - yield dir_top, dir_dirs, dir_files + yield from walk(dirpath, **walk_kwargs) else: # On a bottom up approach we yield the sub directories before the top path. for dirpath in bottom_up_dirs: - for dir_top, dir_dirs, dir_files in walk(dirpath, **walk_kwargs): - yield dir_top, dir_dirs, dir_files + yield from walk(dirpath, **walk_kwargs) yield top, dirs, files @@ -1095,7 +1091,7 @@ def _get_reparse_point(path, **kwargs): def _rename_information(src, dst, replace_if_exists=False, **kwargs): verb = "replace" if replace_if_exists else "rename" if not is_remote_path(ntpath.normpath(dst)): - raise ValueError("dst must be an absolute path to where the file or directory should be %sd." % verb) + raise ValueError(f"dst must be an absolute path to where the file or directory should be {verb}d.") raw_args = dict(kwargs) raw_args.update( @@ -1126,7 +1122,7 @@ def _rename_information(src, dst, replace_if_exists=False, **kwargs): dst_share = ntpath.normpath(dst_raw.fd.tree_connect.share_name).split("\\")[-1] if src_guid != dst_guid or src_share.lower() != dst_share.lower(): - raise ValueError("Cannot %s a file to a different root than the src." % verb) + raise ValueError(f"Cannot {verb} a file to a different root than the src.") dst_tree = dst_raw.fd.tree_connect dst_path = dst_raw.fd.file_name @@ -1181,7 +1177,7 @@ def __init__(self, raw, dir_info, connection_cache=None): self._connection_cache = connection_cache def __str__(self): - return "<{0}: {1!r}>".format(self.__class__.__name__, self.name) + return f"<{self.__class__.__name__}: {self.name!r}>" @property def name(self): diff --git a/src/smbclient/_pool.py b/src/smbclient/_pool.py index 14e80395..c5c8cf1f 100644 --- a/src/smbclient/_pool.py +++ b/src/smbclient/_pool.py @@ -1,13 +1,12 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) +from __future__ import annotations + import atexit import logging import ntpath -import typing import uuid -import warnings from smbprotocol._text import to_text from smbprotocol.connection import Capabilities, Connection @@ -59,7 +58,7 @@ def __call__(cls, *args, **kwargs): return config -class ClientConfig(object, metaclass=_ConfigSingleton): +class ClientConfig(metaclass=_ConfigSingleton): """SMB Client global settings This class defines global settings for the client that affects all connections that the client makes. When setting @@ -99,9 +98,9 @@ def __init__( self.skip_dfs = skip_dfs self.auth_protocol = auth_protocol self.require_secure_negotiate = require_secure_negotiate - self._domain_controller = domain_controller # type: Optional[str] - self._domain_cache = [] # type: List[DomainEntry] - self._referral_cache = [] # type: List[ReferralEntry] + self._domain_controller: str | None = domain_controller + self._domain_cache: list[DomainEntry] = [] + self._referral_cache: list[ReferralEntry] = [] @property def domain_controller(self): @@ -119,13 +118,14 @@ def domain_controller(self, value): if not value or self.skip_dfs: return - ipc_tree = get_smb_tree("\\\\%s\\IPC$" % value)[0] + ipc_tree = get_smb_tree(rf"\\{value}\IPC$")[0] try: domain_referral_response = dfs_request(ipc_tree, "") except InvalidParameter: log.warning( "Specified domain controller %s return STATUS_INVALID_PARAMETER, cannot use as DFS domain " - "cache source" % value + "cache source", + value, ) return @@ -139,13 +139,13 @@ def cache_referral(self, referral): if referral["number_of_referrals"].get_value() > 0: self._referral_cache.append(ReferralEntry(referral)) - def lookup_domain(self, domain_name): # type: (str) -> Optional[DomainEntry] + def lookup_domain(self, domain_name: str) -> DomainEntry | None: # TODO: Check domain referral expiry and resend request if expired for domain in self._domain_cache: if domain.domain_name.lower() == ("\\" + domain_name.lower()): return domain - def lookup_referral(self, path_components: typing.List[str]) -> typing.Optional[ReferralEntry]: + def lookup_referral(self, path_components: list[str]) -> ReferralEntry | None: """Checks if the path exists in the DFS referral cache.""" # A lookup in ReferralCache involves searching for an entry with DFSPathPrefix that is a complete prefix of the # path being looked up. @@ -169,7 +169,7 @@ def set(self, **config): domain_controller = False for key, value in config.items(): if key.startswith("_"): - raise ValueError("Cannot set private attribute %s" % key) + raise ValueError(f"Cannot set private attribute {key}") elif key == "domain_controller": # This must be set last in case we are setting any username/password used for a domain referral lookup. @@ -186,7 +186,7 @@ def _clear_expired_cache(self) -> None: self._referral_cache = [refferal for refferal in self._referral_cache if not refferal.is_expired] -def dfs_request(tree, path): # type: (TreeConnect, str) -> DFSReferralResponse +def dfs_request(tree: TreeConnect, path: str) -> DFSReferralResponse: """Send a DFS Referral request to the IPC tree and return the referrals.""" dfs_referral = DFSReferralRequest() dfs_referral["request_file_name"] = to_text(path) @@ -219,7 +219,7 @@ def delete_session(server, port=445, connection_cache=None): :param port: The port used for the server. :param connection_cache: Connection cache to be used with """ - connection_key = "%s:%s" % (server.lower(), port) + connection_key = f"{server.lower()}:{port}" if connection_cache is None: connection_cache = _SMB_CONNECTIONS @@ -284,14 +284,14 @@ def get_smb_tree( if domain_referral and not domain_referral.is_valid: # If the path is a valid domain name but our domain referral is not currently valid, issue a DC referral # to our known domain controller. - ipc_tree = get_smb_tree("\\\\%s\\IPC$" % client_config.domain_controller, **get_kwargs)[0] + ipc_tree = get_smb_tree(rf"\\{client_config.domain_controller}\IPC$", **get_kwargs)[0] referral_response = dfs_request(ipc_tree, domain_referral.domain_name) domain_referral.process_dc_referral(referral_response) if domain_referral: # Use the dc hint as the source for the root referral request - ipc_tree = get_smb_tree("\\%s\\IPC$" % domain_referral.dc_hint, **get_kwargs)[0] - referral_response = dfs_request(ipc_tree, "\\%s\\%s" % (path_split[0], path_split[1])) + ipc_tree = get_smb_tree(rf"\{domain_referral.dc_hint}\IPC$", **get_kwargs)[0] + referral_response = dfs_request(ipc_tree, rf"\{path_split[0]}\{path_split[1]}") client_config.cache_referral(referral_response) referral = client_config.lookup_referral(path_split) if not referral: @@ -312,7 +312,7 @@ def get_smb_tree( auth_protocol=auth_protocol, ) - share_path = "\\\\%s\\%s" % (server, path_split[1]) + share_path = rf"\\{server}\{path_split[1]}" tree = next((t for t in session.tree_connect_table.values() if t.share_name == share_path), None) if not tree: tree = TreeConnect(session, share_path) @@ -324,14 +324,14 @@ def get_smb_tree( if not session.connection.server_capabilities.has_flag(Capabilities.SMB2_GLOBAL_CAP_DFS): raise - ipc_path = "\\\\%s\\IPC$" % server + ipc_path = rf"\\{server}\IPC$" if path == ipc_path: # In case we already tried connecting to IPC$ but that failed. raise # The share could be a DFS root, issue a root referral request to the hostname and cache the result. ipc_tree = get_smb_tree(ipc_path, **get_kwargs)[0] try: - referral = dfs_request(ipc_tree, "\\%s\\%s" % (path_split[0], path_split[1])) + referral = dfs_request(ipc_tree, rf"\{path_split[0]}\{path_split[1]}") except FSDriverRequired: # If the DFS Request fails with STATUS_FS_DRIVER_REQUIRED then # the server doesn't support DFS requests and the original @@ -396,7 +396,7 @@ def register_session( :param require_signing: Whether signing is required on SMB messages sent over this session. Defaults to True. :return: The Session that was registered or already existed in the pool. """ - connection_key = "%s:%s" % (server.lower(), port) + connection_key = f"{server.lower()}:{port}" if connection_cache is None: connection_cache = _SMB_CONNECTIONS @@ -448,7 +448,7 @@ def reset_connection_cache(fail_on_error=True, connection_cache=None): if fail_on_error: raise else: - warnings.warn("Failed to close connection %s: %s" % (name, str(e))) + log.warning("Failed to close connection %s", name, exc_info=e) atexit.register(reset_connection_cache, fail_on_error=False) diff --git a/src/smbclient/path.py b/src/smbclient/path.py index b6027edd..c3b4d44a 100644 --- a/src/smbclient/path.py +++ b/src/smbclient/path.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/src/smbclient/shutil.py b/src/smbclient/shutil.py index c9d7aefe..30146bc7 100644 --- a/src/smbclient/shutil.py +++ b/src/smbclient/shutil.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) and other contributors # MIT License (see LICENSE or https://opensource.org/licenses/MIT) @@ -173,7 +172,7 @@ def copyfile(src, dst, follow_symlinks=True, **kwargs): raise if is_same: - raise shutil.Error("'%s' and '%s' are the same file, cannot copy" % (src, dst)) + raise shutil.Error(f"'{src}' and '{dst}' are the same file, cannot copy") smbclient_copyfile(src, dst, **kwargs) return dst diff --git a/src/smbprotocol/__init__.py b/src/smbprotocol/__init__.py index 237787e0..7c2c6329 100644 --- a/src/smbprotocol/__init__.py +++ b/src/smbprotocol/__init__.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/src/smbprotocol/_text.py b/src/smbprotocol/_text.py index 71eb8abd..683386ae 100644 --- a/src/smbprotocol/_text.py +++ b/src/smbprotocol/_text.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/src/smbprotocol/change_notify.py b/src/smbprotocol/change_notify.py index c7ecb31a..1565b473 100644 --- a/src/smbprotocol/change_notify.py +++ b/src/smbprotocol/change_notify.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) @@ -280,8 +279,10 @@ def start(self, completion_filter, flags=0, output_buffer_length=65536, send=Tru change_notify["completion_filter"] = completion_filter log.info( - "Session: %s, Tree Connect: %s , Open: %s - sending SMB2 Change Notify request" - % (self.open.tree_connect.session.username, self.open.tree_connect.share_name, self.open.file_name) + "Session: %s, Tree Connect: %s , Open: %s - sending SMB2 Change Notify request", + self.open.tree_connect.session.username, + self.open.tree_connect.share_name, + self.open.file_name, ) log.debug(change_notify) if send: @@ -326,5 +327,5 @@ def _on_response(self): self._t_exc = exc finally: - log.debug("Firing response event for %s change notify" % self.open.file_name) + log.debug("Firing response event for %s change notify", self.open.file_name) self.response_event.set() diff --git a/src/smbprotocol/connection.py b/src/smbprotocol/connection.py index ce663cb6..be06a403 100644 --- a/src/smbprotocol/connection.py +++ b/src/smbprotocol/connection.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) @@ -740,8 +739,11 @@ def __init__(self, guid, server_name, port=445, require_signing=True): sent over this connection """ log.info( - "Initialising connection, guid: %s, require_signing: %s, " - "server_name: %s, port: %d" % (guid, require_signing, server_name, port) + "Initialising connection, guid: %s, require_signing: %s, server_name: %s, port: %d", + guid, + require_signing, + server_name, + port, ) self.server_name = server_name self.port = port @@ -860,7 +862,7 @@ def connect(self, dialect=None, timeout=60, preferred_encryption_algos=None, pre self.transport = Tcp(self.server_name, self.port, timeout) self.transport.connect() t_worker = threading.Thread( - target=self._process_message_thread, name="msg_worker-%s:%s" % (self.server_name, self.port) + target=self._process_message_thread, name=f"msg_worker-{self.server_name}:{self.port}" ) t_worker.daemon = True t_worker.start() @@ -878,7 +880,7 @@ def connect(self, dialect=None, timeout=60, preferred_encryption_algos=None, pre SigningAlgorithms.HMAC_SHA256, ] smb_response = self._send_smb2_negotiate(dialect, timeout, enc_algos, sign_algos) - log.info("Negotiated dialect: %s" % str(smb_response["dialect_revision"])) + log.info("Negotiated dialect: %s", smb_response["dialect_revision"]) self.dialect = smb_response["dialect_revision"].get_value() self.max_transact_size = smb_response["max_transact_size"].get_value() self.max_read_size = smb_response["max_read_size"].get_value() @@ -890,7 +892,7 @@ def connect(self, dialect=None, timeout=60, preferred_encryption_algos=None, pre SecurityMode.SMB2_NEGOTIATE_SIGNING_REQUIRED ): self.require_signing = True - log.info("Connection require signing: %s" % self.require_signing) + log.info("Connection require signing: %s", self.require_signing) capabilities = smb_response["capabilities"] self.server_capabilities = capabilities @@ -1006,9 +1008,10 @@ def receive(self, request, wait=True, timeout=None, resolve_symlinks=True): while True: iter_timeout = int(max(timeout - (time.time() - start_time), 1)) if timeout is not None else None if not request.response_event.wait(timeout=iter_timeout): + value = request.message["message_id"].get_value() raise SMBException( - "Connection timeout of %d seconds exceeded while waiting for a message id %s " - "response from the server" % (timeout, request.message["message_id"].get_value()) + f"Connection timeout of {timeout} seconds exceeded while waiting for a message id {value} " + "response from the server" ) # Use a lock on the request so that in the case of a pending response we have exclusive lock on the event @@ -1044,9 +1047,7 @@ def receive(self, request, wait=True, timeout=None, resolve_symlinks=True): ) exp = SMBResponseException(response) - reparse_buffer = next( - (e for e in exp.error_details if isinstance(e, SMB2SymbolicLinkErrorResponse)) - ) + reparse_buffer = next(e for e in exp.error_details if isinstance(e, SMB2SymbolicLinkErrorResponse)) new_path = reparse_buffer.resolve_path(original_path)[len(tree_share_name) :] new_open = Open(tree, new_path) @@ -1151,7 +1152,7 @@ def verify_signature(self, header, session_id, force=False): session = self.session_table.get(session_id, None) if session is None: - raise SMBException("Failed to find session %s for message verification" % session_id) + raise SMBException(f"Failed to find session {session_id} for message verification") expected = self._generate_signature( header.pack(), @@ -1162,9 +1163,10 @@ def verify_signature(self, header, session_id, force=False): ) actual = header["signature"].get_value() if actual != expected: + actual_signature = binascii.hexlify(actual).decode() + expected_signature = binascii.hexlify(expected).decode() raise SMBException( - "Server message signature could not be verified: %s != %s" - % (binascii.hexlify(actual).decode(), binascii.hexlify(expected).decode()) + f"Server message signature could not be verified: {actual_signature} != {expected_signature}", ) def _check_worker_running(self): @@ -1221,8 +1223,8 @@ def _send( credits_available = sequence_window_high - sequence_window_low if credit_charge > credits_available: raise SMBException( - "Request requires %d credits but only %d credits are available" - % (credit_charge, credits_available) + f"Request requires {credit_charge} credits but only {credits_available} " + "credits are available" ) current_id = message_id or sequence_window_low @@ -1471,17 +1473,15 @@ def _encrypt(self, b_data, session): return header def _decrypt(self, message): - if message["flags"].get_value() != 0x0001: - error_msg = "Expecting flag of 0x0001 but got %s in the SMB Transform Header Response" % format( - message["flags"].get_value(), "x" - ) + value = message["flags"].get_value() + if value != 0x0001: + error_msg = f"Expecting flag of 0x0001 but got {value:x} in the SMB Transform Header Response" raise SMBException(error_msg) session_id = message["session_id"].get_value() session = self.session_table.get(session_id, None) if session is None: - error_msg = "Failed to find valid session %s for message decryption" % session_id - raise SMBException(error_msg) + raise SMBException(f"Failed to find valid session {session_id} for message decryption") if self.dialect >= Dialects.SMB_3_1_1: cipher_id = self.cipher_id @@ -1537,14 +1537,14 @@ def _send_smb2_negotiate(self, dialect, timeout, encryption_algorithms, signing_ highest_dialect = sorted(negotiated_dialects)[-1] self.negotiated_dialects = neg_req["dialects"] = negotiated_dialects log.info( - "Negotiating with SMB2 protocol with highest client dialect " - "of: %s" % [dialect for dialect, v in vars(Dialects).items() if v == highest_dialect][0] + "Negotiating with SMB2 protocol with highest client dialect of: %s", + [dialect for dialect, v in vars(Dialects).items() if v == highest_dialect][0], ) neg_req["security_mode"] = self.client_security_mode if highest_dialect >= Dialects.SMB_2_1_0: - log.debug("Adding client guid %s to negotiate request" % self.client_guid) + log.debug("Adding client guid %s to negotiate request", self.client_guid) neg_req["client_guid"] = self.client_guid else: @@ -1552,7 +1552,7 @@ def _send_smb2_negotiate(self, dialect, timeout, encryption_algorithms, signing_ self.client_guid = None if highest_dialect >= Dialects.SMB_3_0_0: - log.debug("Adding client capabilities %d to negotiate request" % self.client_capabilities) + log.debug("Adding client capabilities %d to negotiate request", self.client_capabilities) neg_req["capabilities"] = self.client_capabilities else: @@ -1566,7 +1566,8 @@ def _send_smb2_negotiate(self, dialect, timeout, encryption_algorithms, signing_ int_cap["data"]["hash_algorithms"] = [HashAlgorithms.SHA_512] int_cap["data"]["salt"] = self.salt log.debug( - "Adding preauth integrity capabilities of hash SHA512 and salt %s to negotiate request" % self.salt + "Adding preauth integrity capabilities of hash SHA512 and salt %s to negotiate request", + self.salt, ) enc_cap = SMB2NegotiateContextRequest() @@ -1699,7 +1700,7 @@ def cancel(self): return message_id = self.message["message_id"].get_value() - log.info("Cancelling message %s" % message_id) + log.info("Cancelling message %s", message_id) self._connection.send( SMB2CancelRequest(), sid=self.session_id, credit_request=0, message_id=message_id, async_id=self.async_id ) diff --git a/src/smbprotocol/create_contexts.py b/src/smbprotocol/create_contexts.py index 6b0bb1a9..d6426edd 100644 --- a/src/smbprotocol/create_contexts.py +++ b/src/smbprotocol/create_contexts.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/src/smbprotocol/dfs.py b/src/smbprotocol/dfs.py index 6e46f1d1..37cccbe0 100644 --- a/src/smbprotocol/dfs.py +++ b/src/smbprotocol/dfs.py @@ -1,8 +1,11 @@ # Copyright: (c) 2020, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) +from __future__ import annotations + import time from collections import OrderedDict, namedtuple +from typing import Iterator from smbprotocol.structure import ( BytesField, @@ -41,24 +44,24 @@ class DomainEntry: domain_list (List[str]): A list of known domain controller hostnames for the domain. """ - def __init__(self, referral): # type: (DFSReferralEntryV3) -> None + def __init__(self, referral: DFSReferralEntryV3): self.domain_list = [] - self._referral = referral # type: DFSReferralEntryV3 - self._start_time = time.time() # type: float - self._domain_hint_idx = None # type: Optional[int] + self._referral = referral + self._start_time: float = time.time() + self._domain_hint_idx: int | None = None @property - def domain_name(self): # type: () -> str + def domain_name(self) -> str: """The domain DFS path.""" return self._referral.dfs_path @property - def dc_hint(self): # type: () -> str + def dc_hint(self) -> str: """The last known good domain hostname in domain_list.""" return self.domain_list[self._domain_hint_idx] @dc_hint.setter - def dc_hint(self, value): # type: (str) -> None + def dc_hint(self, value: str) -> None: for idx, target in enumerate(self.domain_list): if target == value: self._domain_hint_idx = idx @@ -68,16 +71,16 @@ def dc_hint(self, value): # type: (str) -> None raise ValueError("The specific domain hint does not exist in this domain cache entry") @property - def is_expired(self): # type: () -> bool + def is_expired(self) -> bool: """Whether the hint has expired or not.""" return ((time.time() - self._start_time) - self._referral["time_to_live"].get_value()) >= 0 @property - def is_valid(self): # type: () -> bool + def is_valid(self) -> bool: """Whether the domain entry has had a DC referral response or not.""" return self._domain_hint_idx is not None and not self.is_expired - def process_dc_referral(self, referral): # type: (DFSReferralResponse) -> None + def process_dc_referral(self, referral: DFSReferralResponse) -> None: if self._domain_hint_idx is None: self._domain_hint_idx = 0 @@ -96,54 +99,53 @@ class ReferralEntry: referral: The DFSReferralResponse to cache. """ - def __init__(self, referral): # type: (DFSReferralResponse) -> None + def __init__(self, referral: DFSReferralResponse): referrals = referral["referral_entries"].get_value() self._referral_header_flags = referral["referral_header_flags"] - self._referrals = referrals # type: List[Union[DFSReferralEntryV1, DFSReferralEntryV2, DFSReferralEntryV3]] - self._start_time = time.time() # type: float - self._target_hint_idx = 0 # type: int + self._referrals: list[DFSReferralEntryV1 | DFSReferralEntryV2 | DFSReferralEntryV3] = referrals + self._start_time: float = time.time() + self._target_hint_idx: int = 0 @property - def dfs_path(self): # type: () -> str + def dfs_path(self) -> str: return self._referrals[self._target_hint_idx].dfs_path @property - def is_root(self): # type: () -> bool + def is_root(self) -> bool: return self._referrals[self._target_hint_idx]["server_type"].has_flag(DFSServerTypes.ROOT_TARGETS) @property - def is_link(self): # type: () -> bool + def is_link(self) -> bool: return not self.is_root # @property - # def is_interlink(self): # type: () -> bool + # def is_interlink(self) -> bool: # return False @property - def is_expired(self): # type: () -> bool + def is_expired(self) -> bool: referral = self._referrals[self._target_hint_idx] return ((time.time() - self._start_time) - referral["time_to_live"].get_value()) >= 0 @property - def target_failback(self): # type: () -> bool + def target_failback(self) -> bool: return self._referral_header_flags.has_flag(DFSReferralHeaderFlags.TARGET_FAIL_BACK) @property - def target_hint(self): # type: () -> DFSTarget + def target_hint(self) -> DFSTarget: return self.target_list[self._target_hint_idx] @target_hint.setter - def target_hint(self, value): # type: (DFSTarget) -> None + def target_hint(self, value: DFSTarget) -> None: for idx, target in enumerate(self.target_list): if target == value: self._target_hint_idx = idx break - else: raise ValueError("The specific target hint does not exist in this referral entry") @property - def target_list(self): # type: () -> List[DFSTarget] + def target_list(self) -> list[DFSTarget]: return [ DFSTarget( target_path=e.network_address, @@ -152,7 +154,7 @@ def target_list(self): # type: () -> List[DFSTarget] for e in self._referrals ] - def __iter__(self): # type: () -> Iterator[DFSTarget] + def __iter__(self) -> Iterator[DFSTarget]: """Iterates through the target_list with a priority being the hinted value.""" yield self.target_list[self._target_hint_idx] @@ -387,7 +389,7 @@ def process_string_buffer(self, buffer, entry_offset): buffer_fields = ["dfs_path", "dfs_alternate_path", "network_address"] for field_name in buffer_fields: - field_offset = self["%s_offset" % field_name].get_value() + field_offset = self[f"{field_name}_offset"].get_value() if field_offset == 0: continue @@ -433,7 +435,7 @@ def process_string_buffer(self, buffer, entry_offset): buffer_fields.insert(1, "dfs_alternate_path") for field_name in buffer_fields: - field_offset = self["%s_offset" % field_name].get_value() + field_offset = self[f"{field_name}_offset"].get_value() if field_offset == 0: continue diff --git a/src/smbprotocol/exceptions.py b/src/smbprotocol/exceptions.py index a9ec482f..8508eed0 100644 --- a/src/smbprotocol/exceptions.py +++ b/src/smbprotocol/exceptions.py @@ -1,13 +1,15 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) +from __future__ import annotations + import binascii import errno import ntpath import os import socket from collections import OrderedDict +from typing import Any from smbprotocol import Dialects from smbprotocol._text import to_bytes, to_text @@ -47,7 +49,7 @@ class SMBOSError(OSError, SMBException): def __init__(self, ntstatus, filename, filename2=None): self.ntstatus = ntstatus - self.filename2 = str(filename2) if filename2 else None + self.filename2 = os.fspath(filename2) if filename2 else None ntstatus_name = "STATUS_UNKNOWN" for name, val in vars(NtStatus).items(): @@ -70,19 +72,18 @@ def __init__(self, ntstatus, filename, filename2=None): NtStatus.STATUS_NOT_A_DIRECTORY: errno.ENOTDIR, NtStatus.STATUS_DIRECTORY_NOT_EMPTY: errno.ENOTEMPTY, NtStatus.STATUS_END_OF_FILE: getattr(errno, "ENODATA", 120), # Not present on py2 for Windows. - }.get(ntstatus, (0, "Unknown NtStatus error returned '%s'" % ntstatus_name)) + }.get(ntstatus, (0, f"Unknown NtStatus error returned '{ntstatus_name}'")) if not isinstance(error_details, tuple): error_details = (error_details, os.strerror(error_details)) - super().__init__(error_details[0], error_details[1], str(filename)) + super().__init__(error_details[0], error_details[1], os.fspath(filename)) def __str__(self): - msg = "[Error {0}] [NtStatus 0x{1}] {2}: '{3}'".format( - self.errno, format(self.ntstatus, "x").zfill(8), self.strerror, self.filename - ) + status = format(self.ntstatus, "x").zfill(8) + msg = f"[Error {self.errno}] [NtStatus 0x{status}] {self.strerror}: '{self.filename}'" if self.filename2: - msg += " -> '%s'" % self.filename2 + msg += f" -> '{self.filename2}'" return msg @@ -90,10 +91,9 @@ def __str__(self): class SMBLinkRedirectionError(SMBException): @property def message(self): - msg = "Encountered symlink at '%s' that points to '%s' which cannot be redirected: %s" % ( - str(self.path), - str(self.target), - str(self.args[0]), + msg = ( + f"Encountered symlink at '{self.path}' that points to " + f"'{self.target}' which cannot be redirected: {self.args[0]}" ) return msg @@ -141,11 +141,9 @@ def message(self): required_dialect = self._get_dialect_name(self.required_dialect) negotiated_dialect = self._get_dialect_name(self.negotiated_dialect) - msg = "%s is not available on the negotiated dialect %s, requires dialect %s%s" % ( - self.feature_name, - negotiated_dialect, - required_dialect, - msg_suffix, + msg = ( + f"{self.feature_name} is not available on the negotiated dialect {negotiated_dialect}, " + f"requires dialect {required_dialect}{msg_suffix}" ) return msg @@ -169,9 +167,7 @@ def __init__(cls, name, bases, attributes): return if not hasattr(cls, "_STATUS_CODE"): - raise ValueError( - "%s.%s does not have the _STATUS_CODE class attribute set" % (cls.__module__, cls.__name__) - ) + raise ValueError(f"{cls.__module__}.{cls.__name__} does not have the _STATUS_CODE class attribute set") cls.__registry[cls._STATUS_CODE] = cls @@ -201,11 +197,11 @@ class SMBResponseException(SMBException, metaclass=_SMBErrorRegistry): _BASE_MESSAGE = "Unknown error." - def __init__(self, header): # type: (SMB2HeaderResponse) -> None + def __init__(self, header: SMB2HeaderResponse): self.header = header @property - def error_details(self): # type: () -> List[any] + def error_details(self) -> list[Any]: # list of error_details returned by the server, currently used in # the SMB 3.1.1 error response for certain situations error = SMB2ErrorResponse() @@ -235,7 +231,7 @@ def error_details(self): # type: () -> List[any] return error_details @property - def message(self): # type: () -> str + def message(self) -> str: error_details = [] for detail in self.error_details: @@ -243,7 +239,7 @@ def message(self): # type: () -> str flag = str(detail["flags"]) print_name = detail.get_print_name() sub_name = detail.get_substitute_name() - error_details.append("Flag: %s, Print Name: %s, Substitute Name: %s" % (flag, print_name, sub_name)) + error_details.append(f"Flag: {flag}, Print Name: {print_name}, Substitute Name: {sub_name}") elif isinstance(detail, SMB2ShareRedirectErrorContext): ip_addresses = [] @@ -251,19 +247,19 @@ def message(self): # type: () -> str ip_addresses.append(ip_addr.get_ipaddress()) resource_name = to_text(detail["resource_name"].get_value(), encoding="utf-16-le") - error_details.append( - "IP Addresses: '%s', Resource Name: %s" % ("', '".join(ip_addresses), resource_name) - ) + addresses = "', '".join(ip_addresses) + error_details.append(f"IP Addresses: '{addresses}', Resource Name: {resource_name}") else: # unknown error details in response, output raw bytes - error_details.append("Raw: %s" % to_text(binascii.hexlify(detail))) + error_details.append("Raw: " + to_text(binascii.hexlify(detail))) - error_msg = "%s %s: 0x%s" % (self._BASE_MESSAGE, str(self.header["status"]), format(self.status, "x").zfill(8)) + status = format(self.status, "x").zfill(8) + error_msg = f"{self._BASE_MESSAGE} {self.header['status']}: 0x{status}" if error_details: - error_msg += " - %s" % ", ".join(error_details) + error_msg += " - " + (", ".join(error_details)) - return "Received unexpected status from the server: %s" % error_msg + return f"Received unexpected status from the server: {error_msg}" @property def status(self): diff --git a/src/smbprotocol/file_info.py b/src/smbprotocol/file_info.py index 0fc4df71..b54f6090 100644 --- a/src/smbprotocol/file_info.py +++ b/src/smbprotocol/file_info.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/src/smbprotocol/header.py b/src/smbprotocol/header.py index 8142d3d8..fb3a0301 100644 --- a/src/smbprotocol/header.py +++ b/src/smbprotocol/header.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2020, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/src/smbprotocol/ioctl.py b/src/smbprotocol/ioctl.py index 662f5bf2..cf2c5876 100644 --- a/src/smbprotocol/ioctl.py +++ b/src/smbprotocol/ioctl.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/src/smbprotocol/open.py b/src/smbprotocol/open.py index 6885b489..14467d4c 100644 --- a/src/smbprotocol/open.py +++ b/src/smbprotocol/open.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) @@ -1150,8 +1149,10 @@ def create( return create, self._create_response log.info( - "Session: %s, Tree Connect: %s - sending SMB2 Create Request " - "for file %s" % (self.tree_connect.session.username, self.tree_connect.share_name, self.file_name) + "Session: %s, Tree Connect: %s - sending SMB2 Create Request for file %s", + self.tree_connect.session.username, + self.tree_connect.share_name, + self.file_name, ) log.debug(create) @@ -1160,8 +1161,9 @@ def create( def _create_response(self, request): log.info( - "Session: %s, Tree Connect: %s - receiving SMB2 Create " - "Response" % (self.tree_connect.session.username, self.tree_connect.share_name) + "Session: %s, Tree Connect: %s - receiving SMB2 Create Response", + self.tree_connect.session.username, + self.tree_connect.share_name, ) response = self.connection.receive(request) create_response = SMB2CreateResponse() @@ -1222,8 +1224,8 @@ def read(self, offset, length, min_length=0, unbuffered=False, wait=True, send=T """ if length > self.connection.max_read_size: raise SMBException( - "The requested read length %d is greater than " - "the maximum negotiated read size %d" % (length, self.connection.max_read_size) + f"The requested read length {length} is greater than " + f"the maximum negotiated read size {self.connection.max_read_size}" ) read = SMB2ReadRequest() @@ -1244,8 +1246,10 @@ def read(self, offset, length, min_length=0, unbuffered=False, wait=True, send=T return read, self._read_response log.info( - "Session: %s, Tree Connect ID: %s - sending SMB2 Read " - "Request for file %s" % (self.tree_connect.session.username, self.tree_connect.share_name, self.file_name) + "Session: %s, Tree Connect ID: %s - sending SMB2 Read Request for file %s", + self.tree_connect.session.username, + self.tree_connect.share_name, + self.file_name, ) log.debug(read) request = self.connection.send(read, self.tree_connect.session.session_id, self.tree_connect.tree_connect_id) @@ -1253,8 +1257,9 @@ def read(self, offset, length, min_length=0, unbuffered=False, wait=True, send=T def _read_response(self, request, wait=True): log.info( - "Session: %s, Tree Connect ID: %s - receiving SMB2 Read " - "Response" % (self.tree_connect.session.username, self.tree_connect.share_name) + "Session: %s, Tree Connect ID: %s - receiving SMB2 Read Response", + self.tree_connect.session.username, + self.tree_connect.share_name, ) response = self.connection.receive(request, wait=wait) read_response = SMB2ReadResponse() @@ -1288,8 +1293,8 @@ def write(self, data, offset=0, write_through=False, unbuffered=False, wait=True data_len = len(data) if data_len > self.connection.max_write_size: raise SMBException( - "The requested write length %d is greater than " - "the maximum negotiated write size %d" % (data_len, self.connection.max_write_size) + f"The requested write length {data_len} is greater than " + f"the maximum negotiated write size {self.connection.max_write_size}" ) write = SMB2WriteRequest() @@ -1316,8 +1321,10 @@ def write(self, data, offset=0, write_through=False, unbuffered=False, wait=True return write, self._write_response log.info( - "Session: %s, Tree Connect: %s - sending SMB2 Write Request " - "for file %s" % (self.tree_connect.session.username, self.tree_connect.share_name, self.file_name) + "Session: %s, Tree Connect: %s - sending SMB2 Write Request for file %s", + self.tree_connect.session.username, + self.tree_connect.share_name, + self.file_name, ) log.debug(write) request = self.connection.send(write, self.tree_connect.session.session_id, self.tree_connect.tree_connect_id) @@ -1325,8 +1332,9 @@ def write(self, data, offset=0, write_through=False, unbuffered=False, wait=True def _write_response(self, request, wait=True): log.info( - "Session: %s, Tree Connect: %s - receiving SMB2 Write " - "Response" % (self.tree_connect.session.username, self.tree_connect.share_name) + "Session: %s, Tree Connect: %s - receiving SMB2 Write Response", + self.tree_connect.session.username, + self.tree_connect.share_name, ) response = self.connection.receive(request, wait=wait) write_response = SMB2WriteResponse() @@ -1357,8 +1365,10 @@ def flush(self, send=True): return flush, self._flush_response log.info( - "Session: %s, Tree Connect: %s - sending SMB2 Flush Request " - "for file %s" % (self.tree_connect.session.username, self.tree_connect.share_name, self.file_name) + "Session: %s, Tree Connect: %s - sending SMB2 Flush Request for file %s", + self.tree_connect.session.username, + self.tree_connect.share_name, + self.file_name, ) log.debug(flush) request = self.connection.send(flush, self.tree_connect.session.session_id, self.tree_connect.tree_connect_id) @@ -1366,8 +1376,9 @@ def flush(self, send=True): def _flush_response(self, request): log.info( - "Session: %s, Tree Connect: %s - receiving SMB2 Flush " - "Response" % (self.tree_connect.session.username, self.tree_connect.share_name) + "Session: %s, Tree Connect: %s - receiving SMB2 Flush Response", + self.tree_connect.session.username, + self.tree_connect.share_name, ) response = self.connection.receive(request) flush_response = SMB2FlushResponse() @@ -1415,9 +1426,10 @@ def query_directory( return query, self._query_directory_response log.info( - "Session: %s, Tree Connect: %s - sending SMB2 Query " - "Directory Request for directory %s" - % (self.tree_connect.session.username, self.tree_connect.share_name, self.file_name) + "Session: %s, Tree Connect: %s - sending SMB2 Query Directory Request for directory %s", + self.tree_connect.session.username, + self.tree_connect.share_name, + self.file_name, ) log.debug(query) request = self.connection.send(query, self.tree_connect.session.session_id, self.tree_connect.tree_connect_id) @@ -1425,8 +1437,9 @@ def query_directory( def _query_directory_response(self, request): log.info( - "Session: %s, Tree Connect: %s - receiving SMB2 Query " - "Response" % (self.tree_connect.session.username, self.tree_connect.share_name) + "Session: %s, Tree Connect: %s - receiving SMB2 Query Response", + self.tree_connect.session.username, + self.tree_connect.share_name, ) response = self.connection.receive(request) query_response = SMB2QueryDirectoryResponse() @@ -1470,8 +1483,10 @@ def close(self, get_attributes=False, send=True): return close, self._close_response log.info( - "Session: %s, Tree Connect: %s - sending SMB2 Close Request " - "for file %s" % (self.tree_connect.session.username, self.tree_connect.share_name, self.file_name) + "Session: %s, Tree Connect: %s - sending SMB2 Close Request for file %s", + self.tree_connect.session.username, + self.tree_connect.share_name, + self.file_name, ) log.debug(close) request = self.connection.send(close, self.tree_connect.session.session_id, self.tree_connect.tree_connect_id) @@ -1479,8 +1494,9 @@ def close(self, get_attributes=False, send=True): def _close_response(self, request): log.info( - "Session: %s, Tree Connect: %s - receiving SMB2 Close " - "Response" % (self.tree_connect.session.username, self.tree_connect.share_name) + "Session: %s, Tree Connect: %s - receiving SMB2 Close Response", + self.tree_connect.session.username, + self.tree_connect.share_name, ) try: response = self.connection.receive(request) diff --git a/src/smbprotocol/query_info.py b/src/smbprotocol/query_info.py index ef56ccad..88c20227 100644 --- a/src/smbprotocol/query_info.py +++ b/src/smbprotocol/query_info.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/src/smbprotocol/reparse_point.py b/src/smbprotocol/reparse_point.py index d8ed4126..70829dad 100644 --- a/src/smbprotocol/reparse_point.py +++ b/src/smbprotocol/reparse_point.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) @@ -201,7 +200,7 @@ def resolve_link(self, path): return ntpath.abspath(link_target) def _get_name(self, prefix): - offset = self["%s_name_offset" % prefix].get_value() - length = self["%s_name_length" % prefix].get_value() + offset = self[f"{prefix}_name_offset"].get_value() + length = self[f"{prefix}_name_length"].get_value() b_name = self["buffer"].get_value()[offset : offset + length] return to_text(b_name, encoding="utf-16-le") diff --git a/src/smbprotocol/security_descriptor.py b/src/smbprotocol/security_descriptor.py index 0d7636db..bbfc7ca7 100644 --- a/src/smbprotocol/security_descriptor.py +++ b/src/smbprotocol/security_descriptor.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) @@ -158,7 +157,8 @@ def __str__(self): revision = self["revision"].get_value() id_authority = self["identifier_authority"].get_value() sub_authorities = self["sub_authorities"].get_value() - sid_string = "S-%d-%d-%s" % (revision, id_authority, "-".join(str(x) for x in sub_authorities)) + authorities_string = "-".join(str(x) for x in sub_authorities) + sid_string = f"S-{revision}-{id_authority}-{authorities_string}" return sid_string def from_string(self, sid_string): @@ -389,7 +389,7 @@ def _rebuild_buffer(self): for field, value in self._buffer.items(): if not value: continue - offset_field = "offset_%s" % field + offset_field = f"offset_{field}" field_bytes = value.pack() buffer += field_bytes self[offset_field].set_value(offset_count) diff --git a/src/smbprotocol/session.py b/src/smbprotocol/session.py index 56c36fde..c2a69753 100644 --- a/src/smbprotocol/session.py +++ b/src/smbprotocol/session.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) @@ -216,7 +215,7 @@ def __init__(self, connection, username=None, password=None, require_encryption= values are 'negotiate', 'ntlm' or 'kerberos'. Defaults to 'negotiate'. """ - log.info("Initialising session with username: %s" % username) + log.info("Initialising session with username: %s", username) self._connected = False self.session_id = 0 self.require_encryption = require_encryption @@ -272,7 +271,7 @@ def connect(self): protocol=self.auth_protocol, ) except spnego.exceptions.SpnegoError as err: - raise SMBAuthenticationError("Failed to authenticate with server: %s" % str(err.message)) + raise SMBAuthenticationError("Failed to authenticate with server") from err in_token = self.connection.gss_negotiate_token if self.auth_protocol != "negotiate": @@ -282,7 +281,7 @@ def connect(self): try: out_token = context.step(in_token) except spnego.exceptions.SpnegoError as err: - raise SMBAuthenticationError("Failed to authenticate with server: %s" % str(err.message)) + raise SMBAuthenticationError("Failed to authenticate with server") from err if not out_token: break @@ -320,7 +319,7 @@ def connect(self): if status == NtStatus.STATUS_MORE_PROCESSING_REQUIRED: log.info("More processing is required for SMB2_SESSION_SETUP") - log.info("Setting session id to %s" % self.session_id) + log.info("Setting session id to %s", self.session_id) self._connected = True # Move the session from the preauth table to the actual session table. @@ -412,13 +411,13 @@ def disconnect(self, close=True): for tree in list(self.tree_connect_table.values()): tree.disconnect() - log.info("Session: %s - Logging off of SMB Session" % self.username) + log.info("Session: %s - Logging off of SMB Session", self.username) logoff = SMB2Logoff() - log.info("Session: %s - Sending Logoff message" % self.username) + log.info("Session: %s - Sending Logoff message", self.username) log.debug(logoff) request = self.connection.send(logoff, sid=self.session_id) - log.info("Session: %s - Receiving Logoff response" % self.username) + log.info("Session: %s - Receiving Logoff response", self.username) res = self.connection.receive(request) res_logoff = SMB2Logoff() res_logoff.unpack(res["data"].get_value()) diff --git a/src/smbprotocol/structure.py b/src/smbprotocol/structure.py index d9726551..7f20d8ec 100644 --- a/src/smbprotocol/structure.py +++ b/src/smbprotocol/structure.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) @@ -71,11 +70,10 @@ def __str__(self): # the field header is slightly different for a StructureField # remove the leading space and put the value on the next line if isinstance(field, StructureField): - field_header = "%s =\n%s" + field_string = f"{field.name} =\n{field}" else: - field_header = "%s = %s" + field_string = f"{field.name} = {field}" - field_string = field_header % (field.name, str(field)) field_strings.append(_indent_lines(field_string, TAB)) field_strings.append("") @@ -87,8 +85,9 @@ def __str__(self): ) field_strings.append(hex_wrapper.fill(raw_hex)) - string = "%s:\n%s" % (to_text(struct_name), "\n".join([to_text(s) for s in field_strings])) - + struct_name = to_text(struct_name) + field_names = "\n".join([to_text(s) for s in field_strings]) + string = f"{struct_name}:\n{field_names}" return string def __setitem__(self, key, value): @@ -125,7 +124,7 @@ def unpack(self, data): def _get_field(self, key): field = self.fields.get(key, None) if field is None: - raise ValueError("Structure does not contain field %s" % key) + raise ValueError(f"Structure does not contain field {key}") return field @@ -150,7 +149,7 @@ def __init__(self, little_endian=True, default=None, size=None): self.little_endian = little_endian if not (size is None or isinstance(size, int) or isinstance(size, types.LambdaType)): - raise InvalidFieldDefinition("%s size for field must be an int or None for a variable length" % field_type) + raise InvalidFieldDefinition(f"{field_type} size for field must be an int or None for a variable length") self.size = size self.default = default self.value = None @@ -161,6 +160,10 @@ def __str__(self): def __len__(self): return self._get_packed_size() + @property + def _endian_prefix(self): + return "<" if self.little_endian else ">" + def pack(self): """ Packs the field value into a byte string so it can be sent to the @@ -172,10 +175,11 @@ def pack(self): value = self._get_calculated_value(self.value) packed_value = self._pack_value(value) size = self._get_calculated_size(self.size, packed_value) - if len(packed_value) != size: + packed_value_len = len(packed_value) + if packed_value_len != size: raise ValueError( - "Invalid packed data length for field %s of %d " - "does not fit field size of %d" % (self.name, len(packed_value), size) + f"Invalid packed data length for field {self.name} of {packed_value_len} " + f"does not fit field size of {size}" ) return packed_value @@ -310,7 +314,7 @@ def _get_struct_format(self, size, unsigned=True): struct_format = {1: "B", 2: "H", 4: "L", 8: "Q"} if size not in struct_format.keys(): - raise InvalidFieldDefinition("Cannot struct format of size %s" % size) + raise InvalidFieldDefinition(f"Cannot struct format of size {size}") format_char = struct_format[size] if not unsigned: format_char = format_char.lower() @@ -329,13 +333,12 @@ def __init__(self, size, unsigned=True, **kwargs): :param kwargs: Any other kwarg to be sent to Field() """ if size not in [1, 2, 4, 8]: - raise InvalidFieldDefinition("IntField size must have a value of 1, 2, 4, or 8 not %s" % str(size)) + raise InvalidFieldDefinition(f"IntField size must have a value of 1, 2, 4, or 8 not {size}") self.unsigned = unsigned super().__init__(size=size, **kwargs) def _pack_value(self, value): - format = self._get_struct_format(self.size, self.unsigned) - struct_string = "%s%s" % ("<" if self.little_endian else ">", format) + struct_string = self._endian_prefix + self._get_struct_format(self.size, self.unsigned) packed_int = struct.pack(struct_string, value) return packed_int @@ -345,13 +348,12 @@ def _parse_value(self, value): elif isinstance(value, types.LambdaType): int_value = value elif isinstance(value, bytes): - format = self._get_struct_format(self.size, self.unsigned) - struct_string = "%s%s" % ("<" if self.little_endian else ">", format) + struct_string = self._endian_prefix + self._get_struct_format(self.size, self.unsigned) int_value = struct.unpack(struct_string, value)[0] elif isinstance(value, int): int_value = value else: - raise TypeError("Cannot parse value for field %s of type %s to an int" % (self.name, type(value).__name__)) + raise TypeError(f"Cannot parse value for field {self.name} of type {type(value).__name__} to an int") return int_value def _get_packed_size(self): @@ -377,17 +379,14 @@ def _parse_value(self, value): elif isinstance(value, types.LambdaType): bytes_value = value elif isinstance(value, int): - format = self._get_struct_format(self.size) - struct_string = "%s%s" % ("<" if self.little_endian else ">", format) + struct_string = self._endian_prefix + self._get_struct_format(self.size) bytes_value = struct.pack(struct_string, value) elif isinstance(value, Structure): bytes_value = value.pack() elif isinstance(value, bytes): bytes_value = value else: - raise TypeError( - "Cannot parse value for field %s of type %s to a byte string" % (self.name, type(value).__name__) - ) + raise TypeError(f"Cannot parse value for field {self.name} of type {type(value).__name__} to a byte string") return bytes_value def _get_packed_size(self): @@ -487,7 +486,7 @@ def _parse_value(self, value): # manually parse each list entry to the field type specified list_value = value else: - raise TypeError("Cannot parse value for field %s of type %s to a list" % (self.name, type(value).__name__)) + raise TypeError(f"Cannot parse value for field {self.name} of type {type(value).__name__} to a list") list_value = [self._parse_sub_value(v) for v in list_value] return list_value @@ -500,12 +499,12 @@ def _parse_sub_value(self, value): structure_type=type(value), default=value, ) - new_field.name = "%s list entry" % self.name + new_field.name = f"{self.name} list entry" new_field.structure = value new_field.set_value(new_field.default) else: new_field = copy.deepcopy(self.list_type) - new_field.name = "%s list entry" % self.name + new_field.name = f"{self.name} list entry" new_field.set_value(value) return new_field @@ -522,7 +521,8 @@ def _to_string(self): if len(list_string) == 0: string = "[]" else: - string = "[\n%s\n]" % ",\n".join(list_string) + list_str = ",\n".join(list_string) + string = f"[\n{list_str}\n]" return string def _create_list_from_bytes(self, list_count, list_type, value): @@ -583,9 +583,7 @@ def _parse_value(self, value): elif isinstance(value, Structure): structure_value = value else: - raise TypeError( - "Cannot parse value for field %s of type %s to a structure" % (self.name, type(value).__name__) - ) + raise TypeError(f"Cannot parse value for field {self.name} of type {type(value).__name__} to a structure") if isinstance(structure_value, bytes) and self.structure_type and structure_value != b"": if isinstance(self.structure_type, types.LambdaType): @@ -608,7 +606,7 @@ def _to_string(self): def _get_field(self, key): structure_value = self._get_calculated_value(self.value) if isinstance(structure_value, bytes): - raise ValueError("Cannot get field %s when structure is defined as a byte string" % key) + raise ValueError(f"Cannot get field {key} when structure is defined as a byte string") field = structure_value._get_field(key) return field @@ -634,7 +632,7 @@ def __init__(self, size=None, **kwargs): :param kwargs: Any other kwarg to be sent to Field() """ if not (size is None or size == 8): - raise InvalidFieldDefinition("DateTimeField type must have a size of 8 not %d" % size) + raise InvalidFieldDefinition(f"DateTimeField type must have a size of 8 not {size}") super().__init__(size=8, **kwargs) def _pack_value(self, value): @@ -644,8 +642,7 @@ def _pack_value(self, value): epoch_time_ms = td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6 ns100 = self.EPOCH_FILETIME + (epoch_time_ms * 10) - format = self._get_struct_format(8) - struct_string = "%s%s" % ("<" if self.little_endian else ">", format) + struct_string = self._endian_prefix + self._get_struct_format(8) bytes_value = struct.pack(struct_string, ns100) return bytes_value @@ -656,8 +653,7 @@ def _parse_value(self, value): elif isinstance(value, types.LambdaType): datetime_value = value elif isinstance(value, bytes): - format = self._get_struct_format(8) - struct_string = "%s%s" % ("<" if self.little_endian else ">", format) + struct_string = self._endian_prefix + self._get_struct_format(8) int_value = struct.unpack(struct_string, value)[0] return self._parse_value(int_value) # just parse the value again elif isinstance(value, int): @@ -671,9 +667,7 @@ def _parse_value(self, value): elif isinstance(value, datetime.datetime): datetime_value = value else: - raise TypeError( - "Cannot parse value for field %s of type %s to a datetime" % (self.name, type(value).__name__) - ) + raise TypeError(f"Cannot parse value for field {self.name} of type {type(value).__name__} to a datetime") return datetime_value def _get_packed_size(self): @@ -694,7 +688,7 @@ def __init__(self, size=None, **kwargs): :param kwargs: Any other kwarg to be sent to Field() """ if not (size is None or size == 16): - raise InvalidFieldDefinition("UuidField type must have a size of 16 not %d" % size) + raise InvalidFieldDefinition(f"UuidField type must have a size of 16 not {size}") super().__init__(size=16, **kwargs) def _pack_value(self, value): @@ -717,7 +711,7 @@ def _parse_value(self, value): elif isinstance(value, types.LambdaType): uuid_value = value else: - raise TypeError("Cannot parse value for field %s of type %s to a uuid" % (self.name, type(value).__name__)) + raise TypeError(f"Cannot parse value for field {self.name} of type {type(value).__name__} to a uuid") return uuid_value def _get_packed_size(self): @@ -743,7 +737,7 @@ def _parse_value(self, value): break if not valid and int_value != 0 and self.enum_strict: - raise ValueError("Enum value %d does not exist in enum type %s" % (int_value, self.enum_type)) + raise ValueError(f"Enum value {int_value} does not exist in enum type {self.enum_type}") return int_value def _to_string(self): @@ -754,9 +748,9 @@ def _to_string(self): enum_name = enum break if enum_name is None: - return "(%d) UNKNOWN_ENUM" % value + return f"({value}) UNKNOWN_ENUM" else: - return "(%d) %s" % (value, enum_name) + return f"({value}) {enum_name}" class FlagField(IntField): @@ -773,7 +767,7 @@ def set_flag(self, flag): break if not valid and self.flag_strict: - raise ValueError("Flag value does not exist in flag type %s" % self.flag_type) + raise ValueError(f"Flag value does not exist in flag type {self.flag_type}") self.set_value(self.value | flag) def has_flag(self, flag): @@ -786,7 +780,7 @@ def _parse_value(self, value): if isinstance(value, int): current_val &= ~value if current_val != 0 and self.flag_strict: - raise ValueError("Invalid flag for field %s value set %d" % (self.name, current_val)) + raise ValueError(f"Invalid flag for field {self.name} value set {current_val}") return int_value @@ -799,7 +793,7 @@ def _to_string(self): if isinstance(value, int) and self.has_flag(value): flags.append(flag) flags.sort() - return "(%d) %s" % (field_value, ", ".join(flags)) + return f"({field_value}) " + (", ".join(flags)) class BoolField(Field): @@ -811,7 +805,7 @@ def __init__(self, size=1, **kwargs): :param kwargs: Any other kwargs to be sent to Field() """ if size != 1: - raise InvalidFieldDefinition("BoolField size must have a value of 1, not %d" % size) + raise InvalidFieldDefinition(f"BoolField size must have a value of 1, not {size}") super().__init__(size=size, **kwargs) def _pack_value(self, value): @@ -827,7 +821,7 @@ def _parse_value(self, value): elif isinstance(value, types.LambdaType): bool_value = value else: - raise TypeError("Cannot parse value for field %s of type %s to a bool" % (self.name, type(value).__name__)) + raise TypeError(f"Cannot parse value for field {self.name} of type {type(value).__name__} to a bool") return bool_value def _get_packed_size(self): @@ -858,9 +852,7 @@ def _parse_value(self, value): elif isinstance(value, types.LambdaType): text_value = value else: - raise TypeError( - "Cannot parse value for field %s of type %s to a text string" % (self.name, type(value).__name__) - ) + raise TypeError(f"Cannot parse value for field {self.name} of type {type(value).__name__} to a text string") return text_value diff --git a/src/smbprotocol/transport.py b/src/smbprotocol/transport.py index 3ba05b35..052b43a4 100644 --- a/src/smbprotocol/transport.py +++ b/src/smbprotocol/transport.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) @@ -66,7 +65,7 @@ def connect(self): try: self._sock = socket.create_connection((self.server, self.port), timeout=self.timeout) except (OSError, socket.gaierror) as err: - raise ValueError("Failed to connect to '%s:%s': %s" % (self.server, self.port, str(err))) from err + raise ValueError(f"Failed to connect to '{self.server}:{self.port}'") from err self._sock.settimeout(None) # Make sure the socket is in blocking mode. self.connected = True @@ -79,7 +78,7 @@ def close(self): # which returns b'' meaning it was closed. try: self._sock.shutdown(socket.SHUT_RDWR) - except socket.error as e: # pragma: no cover + except OSError as e: # pragma: no cover # Avoid collecting coverage here to avoid CI failing due to race condition differences if e.errno != errno.ENOTCONN: raise @@ -98,8 +97,7 @@ def send(self, header): data_length = len(b_msg) if data_length > self.MAX_SIZE: raise ValueError( - "Data to be sent over Direct TCP size %d exceeds the max length allowed %d" - % (data_length, self.MAX_SIZE) + f"Data to be sent over Direct TCP size {data_length} exceeds the max length allowed {self.MAX_SIZE}" ) tcp_packet = DirectTCPPacket() @@ -127,7 +125,7 @@ def _recv(self, length, timeout): offset = 0 while offset < length: read_len = length - offset - log.debug("Socket recv(%s) (total %s)" % (read_len, length)) + log.debug(f"Socket recv({read_len}) (total {length})") start_time = timeit.default_timer() @@ -144,7 +142,7 @@ def _recv(self, length, timeout): try: b_data = self._sock.recv(read_len) - except socket.error as e: + except OSError as e: # Windows will raise this error if the socket has been shutdown, Linux return returns an empty byte # string so we just replicate that. if e.errno not in [errno.ESHUTDOWN, errno.ECONNRESET]: @@ -153,7 +151,7 @@ def _recv(self, length, timeout): b_data = b"" read_len = len(b_data) - log.debug("Socket recv() returned %s bytes (total %s)" % (read_len, length)) + log.debug(f"Socket recv() returned {read_len} bytes (total {length})") if read_len == 0: self.close() diff --git a/src/smbprotocol/tree.py b/src/smbprotocol/tree.py index 94f3e37d..7383aeb9 100644 --- a/src/smbprotocol/tree.py +++ b/src/smbprotocol/tree.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) @@ -236,16 +235,16 @@ def connect(self, require_secure_negotiate=True): verify the negotiation parameters with the server to prevent SMB downgrade attacks """ - log.info("Session: %s - Creating connection to share %s" % (self.session.username, self.share_name)) + log.info(f"Session: {self.session.username} - Creating connection to share {self.share_name}") utf_share_name = self.share_name.encode("utf-16-le") connect = SMB2TreeConnectRequest() connect["buffer"] = utf_share_name - log.info("Session: %s - Sending Tree Connect message" % self.session.username) + log.info("Session: %s - Sending Tree Connect message", self.session.username) log.debug(connect) request = self.session.connection.send(connect, sid=self.session.session_id) - log.info("Session: %s - Receiving Tree Connect response" % self.session.username) + log.info("Session: %s - Receiving Tree Connect response", self.session.username) response = self.session.connection.receive(request) tree_response = SMB2TreeConnectResponse() tree_response.unpack(response["data"].get_value()) @@ -253,7 +252,7 @@ def connect(self, require_secure_negotiate=True): # https://msdn.microsoft.com/en-us/library/cc246687.aspx self.tree_connect_id = response["tree_id"].get_value() - log.info("Session: %s - Created tree connection with ID %d" % (self.session.username, self.tree_connect_id)) + log.info("Session: %s - Created tree connection with ID %d", self.session.username, self.tree_connect_id) self._connected = True self.session.tree_connect_table[self.tree_connect_id] = self @@ -277,16 +276,14 @@ def disconnect(self): if not self._connected: return - log.info("Session: %s, Tree: %s - Disconnecting from Tree Connect" % (self.session.username, self.share_name)) + log.info(f"Session: {self.session.username}, Tree: {self.share_name} - Disconnecting from Tree Connect") req = SMB2TreeDisconnect() - log.info("Session: %s, Tree: %s - Sending Tree Disconnect message" % (self.session.username, self.share_name)) + log.info(f"Session: {self.session.username}, Tree: {self.share_name} - Sending Tree Disconnect message") log.debug(req) request = self.session.connection.send(req, sid=self.session.session_id, tid=self.tree_connect_id) - log.info( - "Session: %s, Tree: %s - Receiving Tree Disconnect response" % (self.session.username, self.share_name) - ) + log.info(f"Session: {self.session.username}, Tree: {self.share_name} - Receiving Tree Disconnect response") res = self.session.connection.receive(request) res_disconnect = SMB2TreeDisconnect() res_disconnect.unpack(res["data"].get_value()) @@ -295,8 +292,8 @@ def disconnect(self): del self.session.tree_connect_table[self.tree_connect_id] def _verify_dialect_negotiate(self): - log_header = "Session: %s, Tree: %s" % (self.session.username, self.share_name) - log.info("%s - Running secure negotiate process" % log_header) + log_header = f"Session: {self.session.username}, Tree: {self.share_name}" + log.info("%s - Running secure negotiate process", log_header) if not self.session.signing_key: # This will only happen if we authenticated with the guest or anonymous user. @@ -324,13 +321,13 @@ def _verify_dialect_negotiate(self): ioctl_request["buffer"] = val_neg ioctl_request["max_output_response"] = len(val_neg) ioctl_request["flags"] = IOCTLFlags.SMB2_0_IOCTL_IS_FSCTL - log.info("%s - Sending Secure Negotiate Validation message" % log_header) + log.info("%s - Sending Secure Negotiate Validation message", log_header) log.debug(ioctl_request) request = self.session.connection.send( ioctl_request, sid=self.session.session_id, tid=self.tree_connect_id, force_signature=True ) - log.info("%s - Receiving secure negotiation response" % log_header) + log.info("%s - Receiving secure negotiation response", log_header) try: response = self.session.connection.receive(request) @@ -349,7 +346,7 @@ def _verify_dialect_negotiate(self): ioctl_resp.unpack(response["data"].get_value()) log.debug(ioctl_resp) - log.info("%s - Unpacking secure negotiate response info" % log_header) + log.info("%s - Unpacking secure negotiate response info", log_header) val_resp = SMB2ValidateNegotiateInfoResponse() val_resp.unpack(ioctl_resp["buffer"].get_value()) log.debug(val_resp) @@ -364,12 +361,11 @@ def _verify_dialect_negotiate(self): "server security mode", val_resp["security_mode"].get_value(), self.session.connection.server_security_mode ) self._verify("server dialect", val_resp["dialect"].get_value(), self.session.connection.dialect) - log.info("Session: %d, Tree: %d - Secure negotiate complete" % (self.session.session_id, self.tree_connect_id)) + log.info("Session: %d, Tree: %d - Secure negotiate complete", self.session.session_id, self.tree_connect_id) def _verify(self, check, actual, expected): - log_header = "Session: %d, Tree: %d" % (self.session.session_id, self.tree_connect_id) if actual != expected: + log_header = f"Session: {self.session.session_id}, Tree: {self.tree_connect_id}" raise SMBException( - "%s - Secure negotiate failed to verify %s, " - "Actual: %s, Expected: %s" % (log_header, check, actual, expected) + f"{log_header} - Secure negotiate failed to verify {check}, Actual: {actual}, Expected: {expected}", ) diff --git a/tests/__init__.py b/tests/__init__.py index b6f1b33a..a687ba1a 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,3 +1,2 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/tests/conftest.py b/tests/conftest.py index ae47c8d2..ffa2f34b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) @@ -167,8 +166,8 @@ def smb_real(): share = os.environ.get("SMB_SHARE", "share") if server: - share = r"\\%s\%s" % (server, share) - encrypted_share = "%s-encrypted" % share + share = rf"\\{server}\{share}" + encrypted_share = f"{share}-encrypted" return username, password, server, int(port), share, encrypted_share else: pytest.skip("The SMB_SHARE env var was not set, integration tests will be skipped") @@ -183,7 +182,8 @@ def smb_real(): ) def smb_share(request, smb_real): # Use some non ASCII chars to test out edge cases by default. - share_path = "%s\\%s" % (smb_real[request.param[1]], "Pýtæs†-[%s] 💩" % time.time()) + test_folder = f"Pýtæs†-[{time.time()}] 💩" + share_path = rf"{smb_real[request.param[1]]}\{test_folder}" delete_session(smb_real[2]) # Test out forward slashes also work with the share-encrypted test @@ -206,14 +206,14 @@ def smb_share(request, smb_real): ids=["dfs-root", "dfs-single-target", "dfs-broken-target"], ) def smb_dfs_share(request, smb_real): - test_folder = "Pýtæs†-[%s] 💩" % time.time() + test_folder = f"Pýtæs†-[{time.time()}] 💩" if request.param[1]: - target_share_path = "%s\\%s" % (smb_real[request.param[1]], test_folder) - dfs_path = "\\\\%s\\dfs\\%s\\%s" % (smb_real[2], request.param[0], test_folder) + target_share_path = rf"{smb_real[request.param[1]]}\{test_folder}" + dfs_path = rf"\\{smb_real[2]}\dfs\{request.param[0]}\{test_folder}" else: - target_share_path = "\\\\%s\\dfs\\%s" % (smb_real[2], test_folder) + target_share_path = rf"\\{smb_real[2]}\dfs\{test_folder}" dfs_path = target_share_path mkdir(target_share_path, username=smb_real[0], password=smb_real[1], port=smb_real[3]) diff --git a/tests/test_change_notify.py b/tests/test_change_notify.py index 422a5ed7..1ffd0594 100644 --- a/tests/test_change_notify.py +++ b/tests/test_change_notify.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/tests/test_connection.py b/tests/test_connection.py index 67885c3d..644e5d0e 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/tests/test_create_contexts.py b/tests/test_create_contexts.py index 70e0745f..a95e5d1a 100644 --- a/tests/test_create_contexts.py +++ b/tests/test_create_contexts.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) @@ -59,11 +58,11 @@ class TestSMB2CreateContextName: def test_create_message(self): ea_buffer1 = SMB2CreateEABuffer() ea_buffer1["ea_name"] = "Authors\x00".encode("ascii") - ea_buffer1["ea_value"] = "Jordan Borean".encode("utf-8") + ea_buffer1["ea_value"] = b"Jordan Borean" ea_buffer2 = SMB2CreateEABuffer() ea_buffer2["ea_name"] = "Title\x00".encode("ascii") - ea_buffer2["ea_value"] = "Jordan Borean Title".encode("utf-8") + ea_buffer2["ea_value"] = b"Jordan Borean Title" ea_buffers = SMB2CreateContextRequest() ea_buffers["buffer_name"] = CreateContextName.SMB2_CREATE_EA_BUFFER diff --git a/tests/test_dfs.py b/tests/test_dfs.py index 9479a03f..1476130a 100644 --- a/tests/test_dfs.py +++ b/tests/test_dfs.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/tests/test_exceptions.py b/tests/test_exceptions.py index 1f4551fb..f4795b6b 100644 --- a/tests/test_exceptions.py +++ b/tests/test_exceptions.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- -# Copyright: (c) 2019, Jordan Borean# -*- coding: utf-8 -*- (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) import re diff --git a/tests/test_file_info.py b/tests/test_file_info.py index 3cb3d3c5..ce5535cc 100644 --- a/tests/test_file_info.py +++ b/tests/test_file_info.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/tests/test_header.py b/tests/test_header.py index 51cb4864..1175c8b0 100644 --- a/tests/test_header.py +++ b/tests/test_header.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2020, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/tests/test_ioctl.py b/tests/test_ioctl.py index 8e8fca33..7f94ee34 100644 --- a/tests/test_ioctl.py +++ b/tests/test_ioctl.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/tests/test_open.py b/tests/test_open.py index d9f596cf..86d2c1d0 100644 --- a/tests/test_open.py +++ b/tests/test_open.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/tests/test_query_info.py b/tests/test_query_info.py index 2ae9284a..a6e48120 100644 --- a/tests/test_query_info.py +++ b/tests/test_query_info.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/tests/test_reparse_point.py b/tests/test_reparse_point.py index 3b9255e3..97650428 100644 --- a/tests/test_reparse_point.py +++ b/tests/test_reparse_point.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/tests/test_security_descriptor.py b/tests/test_security_descriptor.py index 385918bb..38f1cdfe 100644 --- a/tests/test_security_descriptor.py +++ b/tests/test_security_descriptor.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/tests/test_session.py b/tests/test_session.py index 3ecb177d..5efd6d45 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/tests/test_smbclient_os.py b/tests/test_smbclient_os.py index 37a3493c..da91c20a 100644 --- a/tests/test_smbclient_os.py +++ b/tests/test_smbclient_os.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) @@ -283,7 +282,7 @@ def test_listdir(dirname, smb_share): def test_listdir_with_pattern(smb_share): for filename in ["file.txt", "file-test1.txt", "file-test1a.txt"]: - with smbclient.open_file("%s\\%s" % (smb_share, filename), mode="w") as fd: + with smbclient.open_file(rf"{smb_share}\{filename}", mode="w") as fd: fd.write("content") actual = smbclient.listdir(smb_share, search_pattern="file-test*.txt") @@ -430,7 +429,7 @@ def test_makedirs_file_as_parent(smb_share): def test_read_text_file(smb_share): - file_path = "%s\\%s" % (smb_share, "file.txt") + file_path = rf"{smb_share}\file.txt" file_contents = "File Contents\nNewline" expected = "[NtStatus 0xc0000034] No such file or directory" @@ -471,7 +470,7 @@ def test_read_text_file(smb_share): def test_read_byte_file(smb_share): - file_path = "%s\\%s" % (smb_share, "file.txt") + file_path = rf"{smb_share}\file.txt" file_contents = b"\x00\x01\x02\x03" expected = "[NtStatus 0xc0000034] No such file or directory" @@ -502,7 +501,7 @@ def test_read_byte_file(smb_share): def test_write_text_file(smb_share): - file_path = "%s\\%s" % (smb_share, "file.txt") + file_path = rf"{smb_share}\file.txt" file_contents = "File Contents\nNewline" with smbclient.open_file(file_path, mode="w") as fd: @@ -531,7 +530,7 @@ def test_write_text_file(smb_share): def test_write_byte_file(smb_share): - file_path = "%s\\%s" % (smb_share, "file.txt") + file_path = rf"{smb_share}\file.txt" file_contents = b"File Contents\nNewline" with smbclient.open_file(file_path, mode="wb") as fd: @@ -562,7 +561,7 @@ def test_write_byte_file(smb_share): # https://github.com/jborean93/smbprotocol/issues/20 def test_read_large_text_file(smb_share): - file_path = "%s\\%s" % (smb_share, "file.txt") + file_path = rf"{smb_share}\file.txt" file_contents = "a" * 131074 with smbclient.open_file(file_path, mode="w") as fd: @@ -574,7 +573,7 @@ def test_read_large_text_file(smb_share): def test_write_exclusive_text_file(smb_share): - file_path = "%s\\%s" % (smb_share, "file.txt") + file_path = rf"{smb_share}\file.txt" file_contents = "File Contents\nNewline" with smbclient.open_file(file_path, mode="x") as fd: @@ -600,7 +599,7 @@ def test_write_exclusive_text_file(smb_share): def test_write_exclusive_byte_file(smb_share): - file_path = "%s\\%s" % (smb_share, "file.txt") + file_path = rf"{smb_share}\file.txt" file_contents = b"File Contents\nNewline" with smbclient.open_file(file_path, mode="xb") as fd: @@ -626,7 +625,7 @@ def test_write_exclusive_byte_file(smb_share): def test_append_text_file(smb_share): - file_path = "%s\\%s" % (smb_share, "file.txt") + file_path = rf"{smb_share}\file.txt" with smbclient.open_file(file_path, mode="a") as fd: assert isinstance(fd, io.TextIOWrapper) @@ -647,7 +646,7 @@ def test_append_text_file(smb_share): def test_append_byte_file(smb_share): - file_path = "%s\\%s" % (smb_share, "file.txt") + file_path = rf"{smb_share}\file.txt" with smbclient.open_file(file_path, mode="ab") as fd: assert isinstance(fd, io.BufferedWriter) @@ -668,7 +667,7 @@ def test_append_byte_file(smb_share): def test_read_write_text_file(smb_share): - file_path = "%s\\%s" % (smb_share, "file.txt") + file_path = rf"{smb_share}\file.txt" with smbclient.open_file(file_path, mode="w+") as fd: fd.write("abc") @@ -679,7 +678,7 @@ def test_read_write_text_file(smb_share): def test_read_write_byte_file(smb_share): - file_path = "%s\\%s" % (smb_share, "file.txt") + file_path = rf"{smb_share}\file.txt" with smbclient.open_file(file_path, mode="bw+") as fd: fd.write(b"abc") @@ -690,7 +689,7 @@ def test_read_write_byte_file(smb_share): def test_open_directory_fail(smb_share): - dir_path = "%s\\%s" % (smb_share, "dir") + dir_path = rf"{smb_share}\dir" smbclient.mkdir(dir_path) with pytest.raises(OSError, match=re.escape("[NtStatus 0xc00000ba] Is a directory: ")): @@ -709,14 +708,14 @@ def test_open_directory_with_correct_file_type(smb_share): def test_open_file_in_missing_dir(smb_share): - file_path = "%s\\dir\\%s" % (smb_share, "file.txt") + file_path = rf"{smb_share}\dir\file.txt" with pytest.raises(OSError, match=re.escape("[NtStatus 0xc000003a] No such file or directory: ")): smbclient.open_file(file_path) def test_open_file_with_read_share_access(smb_share): - file_path = "%s\\%s" % (smb_share, "file.txt") + file_path = rf"{smb_share}\file.txt" with smbclient.open_file(file_path, mode="w") as fd: fd.write("contents") @@ -738,7 +737,7 @@ def test_open_file_with_read_share_access(smb_share): def test_open_file_with_write_share_access(smb_share): - file_path = "%s\\%s" % (smb_share, "file.txt") + file_path = rf"{smb_share}\file.txt" with smbclient.open_file(file_path, mode="w") as fd: expected = ( @@ -762,7 +761,7 @@ def test_open_file_with_write_share_access(smb_share): def test_open_file_with_read_write_access(smb_share): - file_path = "%s\\%s" % (smb_share, "file.txt") + file_path = rf"{smb_share}\file.txt" with smbclient.open_file(file_path, mode="w", share_access="rw") as fd: fd.write("content") @@ -1262,7 +1261,7 @@ def test_scandir(smb_share): def test_scamdir_with_pattern(smb_share): for filename in ["file.txt", "file-test1.txt", "file-test1a.txt"]: - with smbclient.open_file("%s\\%s" % (smb_share, filename), mode="w") as fd: + with smbclient.open_file(rf"{smb_share}\{filename}", mode="w") as fd: fd.write("content") count = 0 @@ -1326,7 +1325,9 @@ def test_scandir_with_broken_symlink(smb_share): def test_scandir_with_cache(smb_real): - share_path = "%s\\%s" % (smb_real[4], "Pýtæs†-[%s] 💩" % time.time()) + share = smb_real[4] + test_folder = f"Pýtæs†-[{time.time()}] 💩" + share_path = rf"{share}\{test_folder}" cache = {} smbclient.mkdir(share_path, username=smb_real[0], password=smb_real[1], port=smb_real[3], connection_cache=cache) @@ -1840,7 +1841,7 @@ def test_walk_topdown(smb_share): smbclient.makedirs("%s\\dir1\\dir2\\dir3" % smb_share) for name in ["file1.txt", "dir1\\file2.txt", "dir1\\dir2\\file3.txt", "dir1\\dir2\\dir3\\file4.txt"]: - with smbclient.open_file("%s\\%s" % (smb_share, name), mode="w") as fd: + with smbclient.open_file(rf"{smb_share}\{name}", mode="w") as fd: fd.write("content") scanned_files = [] @@ -1861,7 +1862,7 @@ def test_walk_bottomup(smb_share): smbclient.makedirs("%s\\dir1\\dir2\\dir3" % smb_share) for name in ["file1.txt", "dir1\\file2.txt", "dir1\\dir2\\file3.txt", "dir1\\dir2\\dir3\\file4.txt"]: - with smbclient.open_file("%s\\%s" % (smb_share, name), mode="w") as fd: + with smbclient.open_file(rf"{smb_share}\{name}", mode="w") as fd: fd.write("content") scanned_files = [] diff --git a/tests/test_smbclient_path.py b/tests/test_smbclient_path.py index 9f22a6a8..edf45ea1 100644 --- a/tests/test_smbclient_path.py +++ b/tests/test_smbclient_path.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/tests/test_smbclient_pool.py b/tests/test_smbclient_pool.py index eaeaed37..3aef68e8 100644 --- a/tests/test_smbclient_pool.py +++ b/tests/test_smbclient_pool.py @@ -1,7 +1,8 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2020, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) +import logging + import pytest import smbclient._io as io @@ -89,16 +90,14 @@ def test_reset_connection_error_fail(mocker): pool.reset_connection_cache(connection_cache={"conn": connection_mock}) -def test_reset_connection_error_warning(monkeypatch, mocker): +def test_reset_connection_error_warning(mocker, caplog): connection_mock = mocker.MagicMock() connection_mock.disconnect.side_effect = Exception("exception") - warning_mock = mocker.MagicMock() - monkeypatch.setattr(pool.warnings, "warn", warning_mock) - pool.reset_connection_cache(fail_on_error=False, connection_cache={"conn": connection_mock}) + with caplog.at_level(logging.ERROR): + pool.reset_connection_cache(fail_on_error=False, connection_cache={"conn": connection_mock}) - assert warning_mock.call_count == 1 - assert warning_mock.call_args[0][0] == "Failed to close connection conn: exception" + assert "Failed to close connection conn" in caplog.text def test_dfs_referral_no_links_no_domain(reset_config, monkeypatch, mocker): @@ -134,7 +133,7 @@ def test_dfs_referral_no_links_from_domain(reset_config, monkeypatch, mocker): config.domain_controller = DOMAIN_NAME with pytest.raises(ObjectPathNotFound): - actual_get_smb_tree(f"\\\\{DOMAIN_NAME}\\dfs") + actual_get_smb_tree(rf"\\{DOMAIN_NAME}\dfs") def test_resolve_dfs_referral_no_links(reset_config, monkeypatch, mocker): diff --git a/tests/test_smbclient_shutil.py b/tests/test_smbclient_shutil.py index 5b8fd56a..36258ffa 100644 --- a/tests/test_smbclient_shutil.py +++ b/tests/test_smbclient_shutil.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) @@ -109,7 +108,7 @@ def test_copy_from_local(smb_share, tmp_path): ctypes.windll.kernel32.SetFileTime(handle, ref_time, ref_time, ref_time) ctypes.windll.kernel32.CloseHandle(handle) - dst_filename = "{}\\target.txt".format(smb_share) + dst_filename = f"{smb_share}\\target.txt" copy_from_to(str(src_filename), dst_filename) diff --git a/tests/test_structure.py b/tests/test_structure.py index 47f54637..f262778a 100644 --- a/tests/test_structure.py +++ b/tests/test_structure.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/tests/test_text.py b/tests/test_text.py index 87edb6be..232178de 100644 --- a/tests/test_text.py +++ b/tests/test_text.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) diff --git a/tests/test_transport.py b/tests/test_transport.py index 5fa89000..960350ca 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) @@ -74,7 +73,7 @@ def test_normal_fail_message_too_big(self): def test_invalid_host(self): tcp = Tcp("fake-host", 445) - with pytest.raises(ValueError, match=re.escape("Failed to connect to 'fake-host:445': ")): + with pytest.raises(ValueError, match=re.escape("Failed to connect to 'fake-host:445'")): tcp.connect() diff --git a/tests/test_tree.py b/tests/test_tree.py index f2a53ec4..92206530 100644 --- a/tests/test_tree.py +++ b/tests/test_tree.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT)