From bcb5df2d0f36a751eeba4cd52a73681e7708da42 Mon Sep 17 00:00:00 2001 From: h3rrr <81402797+h3rrr@users.noreply.github.com> Date: Wed, 27 Aug 2025 10:52:06 +0800 Subject: [PATCH 01/21] Create SECURITY.md Signed-off-by: h3rrr <81402797+h3rrr@users.noreply.github.com> --- SECURITY.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 SECURITY.md diff --git a/SECURITY.md b/SECURITY.md new file mode 100644 index 0000000000..dbacdeda04 --- /dev/null +++ b/SECURITY.md @@ -0,0 +1,18 @@ +# Security Policy + +# Reporting a Vulnerability +At MONAI, we take security seriously and appreciate your efforts to responsibly disclose vulnerabilities. If you discover a security issue, please report it as soon as possible. + +Please do not create public issues for security-related reports. + +* To report a security issue, please use the GitHub Security Advisories tab to "Open a draft security advisory". +* Include a detailed description of the issue, steps to reproduce, potential impact, and any possible mitigations. +* If applicable, please also attach proof-of-concept code or screenshots. +* We will acknowledge your report within 72 hours and provide a status update as we investigate. + +# Disclosure Policy +* We follow a coordinated disclosure approach. +* We will not publicly disclose vulnerabilities until a fix has been developed and released. +* Credit will be given to researchers who responsibly disclose vulnerabilities, if requested. +# Acknowledgements +We greatly appreciate contributions from the security community and strive to recognize all researchers who help keep Cherry Studio safe. From 929f0b721530b8e741f82aefc0e39c0d4bd43cb4 Mon Sep 17 00:00:00 2001 From: h3rrr <81402797+h3rrr@users.noreply.github.com> Date: Wed, 27 Aug 2025 11:03:03 +0800 Subject: [PATCH 02/21] Update SECURITY.md Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> Signed-off-by: h3rrr <81402797+h3rrr@users.noreply.github.com> --- SECURITY.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/SECURITY.md b/SECURITY.md index dbacdeda04..b1ab1540ac 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -15,4 +15,4 @@ Please do not create public issues for security-related reports. * We will not publicly disclose vulnerabilities until a fix has been developed and released. * Credit will be given to researchers who responsibly disclose vulnerabilities, if requested. # Acknowledgements -We greatly appreciate contributions from the security community and strive to recognize all researchers who help keep Cherry Studio safe. +We greatly appreciate contributions from the security community and strive to recognize all researchers who help keep MONAI safe. From 7160338285ed9942499a4523777a14c992c083b3 Mon Sep 17 00:00:00 2001 From: h3rrr <81402797+h3rrr@users.noreply.github.com> Date: Mon, 8 Sep 2025 12:48:38 +0800 Subject: [PATCH 03/21] Update SECURITY.md Co-authored-by: Eric Kerfoot <17726042+ericspod@users.noreply.github.com> Signed-off-by: h3rrr <81402797+h3rrr@users.noreply.github.com> --- SECURITY.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/SECURITY.md b/SECURITY.md index b1ab1540ac..b9254888ba 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -1,5 +1,23 @@ # Security Policy +## Reporting a Vulnerability +MONAI takes security seriously and appreciate your efforts to responsibly disclose vulnerabilities. If you discover a security issue, please report it as soon as possible. + +To report a security issue: +* please use the GitHub Security Advisories tab to "[Open a draft security advisory](https://github.com/Project-MONAI/MONAI/security/advisories/new)". +* Include a detailed description of the issue, steps to reproduce, potential impact, and any possible mitigations. +* If applicable, please also attach proof-of-concept code or screenshots. +* We aim to acknowledge your report within 72 hours and provide a status update as we investigate. +* Please do not create public issues for security-related reports. + +## Disclosure Policy +* We follow a coordinated disclosure approach. +* We will not publicly disclose vulnerabilities until a fix has been developed and released. +* Credit will be given to researchers who responsibly disclose vulnerabilities, if requested. + +## Acknowledgements +We greatly appreciate contributions from the security community and strive to recognize all researchers who help keep MONAI safe. + # Reporting a Vulnerability At MONAI, we take security seriously and appreciate your efforts to responsibly disclose vulnerabilities. If you discover a security issue, please report it as soon as possible. From 2c72dbc089d0249124c94604745baf0c3805a825 Mon Sep 17 00:00:00 2001 From: h3rrr <81402797+h3rrr@users.noreply.github.com> Date: Thu, 11 Sep 2025 22:54:32 +0800 Subject: [PATCH 04/21] Update utils.py Path traversal security issue fix Zip Slip Signed-off-by: h3rrr <81402797+h3rrr@users.noreply.github.com> --- monai/apps/utils.py | 56 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 48 insertions(+), 8 deletions(-) diff --git a/monai/apps/utils.py b/monai/apps/utils.py index 95c1450f2a..d871fb255a 100644 --- a/monai/apps/utils.py +++ b/monai/apps/utils.py @@ -11,6 +11,8 @@ from __future__ import annotations +import os +import shutil import hashlib import json import logging @@ -80,7 +82,6 @@ def get_logger( logger = get_logger("monai.apps") __all__.append("logger") - def _basename(p: PathLike) -> str: """get the last part of the path (removing the trailing slash if it exists)""" sep = os.path.sep + (os.path.altsep or "") + "/ " @@ -121,7 +122,32 @@ def update_to(self, b: int = 1, bsize: int = 1, tsize: int | None = None) -> Non logger.error(f"Download failed from {url} to {filepath}.") raise e - +def safe_extract_member(member, extract_to): + """Securely verify compressed package member paths to prevent path traversal attacks""" + # Get member path (handle different compression formats) + if hasattr(member, 'filename'): + member_path = member.filename # zipfile + elif hasattr(member, 'name'): + member_path = member.name # tarfile + else: + member_path = str(member) + + member_path = os.path.normpath(member_path) + + if os.path.isabs(member_path) or '..' in member_path.split(os.sep): + raise ValueError(f"Unsafe path detected in archive: {member_path}") + + full_path = os.path.join(extract_to, member_path) + full_path = os.path.normpath(full_path) + + extract_to_abs = os.path.abspath(extract_to) + full_path_abs = os.path.abspath(full_path) + + if not (full_path_abs == extract_to_abs or full_path_abs.startswith(extract_to_abs + os.sep)): + raise ValueError(f"Path traversal attack detected: {member_path}") + + return full_path + def check_hash(filepath: PathLike, val: str | None = None, hash_type: str = "md5") -> bool: """ Verify hash signature of specified file. @@ -287,14 +313,28 @@ def extractall( logger.info(f"Writing into directory: {output_dir}.") _file_type = file_type.lower().strip() if filepath.name.endswith("zip") or _file_type == "zip": - zip_file = zipfile.ZipFile(filepath) - zip_file.extractall(output_dir) - zip_file.close() + with zipfile.ZipFile(filepath, 'r') as zip_file: + for member in zip_file.infolist(): + if member.is_dir(): + continue + safe_path = safe_extract_member(member, output_dir) + os.makedirs(os.path.dirname(safe_path), exist_ok=True) + with zip_file.open(member) as source: + with open(safe_path, 'wb') as target: + shutil.copyfileobj(source, target) return if filepath.name.endswith("tar") or filepath.name.endswith("tar.gz") or "tar" in _file_type: - tar_file = tarfile.open(filepath) - tar_file.extractall(output_dir) - tar_file.close() + with tarfile.open(filepath, 'r') as tar_file: + for member in tar_file.getmembers(): + if not member.isfile(): + continue + + safe_path = safe_extract_member(member, output_dir) + os.makedirs(os.path.dirname(safe_path), exist_ok=True) + with tar_file.extractfile(member) as source: + if source: + with open(safe_path, 'wb') as target: + shutil.copyfileobj(source, target) return raise NotImplementedError( f'Unsupported file type, available options are: ["zip", "tar.gz", "tar"]. name={filepath} type={file_type}.' From bad190702b54a0d702ac39b9a0aa17a4bf491108 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 11 Sep 2025 14:55:02 +0000 Subject: [PATCH 05/21] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- monai/apps/utils.py | 90 ++++++++++++++++++++++----------------------- 1 file changed, 44 insertions(+), 46 deletions(-) diff --git a/monai/apps/utils.py b/monai/apps/utils.py index d871fb255a..7e75d66650 100644 --- a/monai/apps/utils.py +++ b/monai/apps/utils.py @@ -11,14 +11,12 @@ from __future__ import annotations -import os +import os import shutil import hashlib import json import logging -import os import re -import shutil import sys import tarfile import tempfile @@ -122,32 +120,32 @@ def update_to(self, b: int = 1, bsize: int = 1, tsize: int | None = None) -> Non logger.error(f"Download failed from {url} to {filepath}.") raise e -def safe_extract_member(member, extract_to): - """Securely verify compressed package member paths to prevent path traversal attacks""" - # Get member path (handle different compression formats) - if hasattr(member, 'filename'): - member_path = member.filename # zipfile - elif hasattr(member, 'name'): - member_path = member.name # tarfile - else: - member_path = str(member) - - member_path = os.path.normpath(member_path) - - if os.path.isabs(member_path) or '..' in member_path.split(os.sep): - raise ValueError(f"Unsafe path detected in archive: {member_path}") - - full_path = os.path.join(extract_to, member_path) - full_path = os.path.normpath(full_path) - - extract_to_abs = os.path.abspath(extract_to) - full_path_abs = os.path.abspath(full_path) - - if not (full_path_abs == extract_to_abs or full_path_abs.startswith(extract_to_abs + os.sep)): - raise ValueError(f"Path traversal attack detected: {member_path}") - +def safe_extract_member(member, extract_to): + """Securely verify compressed package member paths to prevent path traversal attacks""" + # Get member path (handle different compression formats) + if hasattr(member, 'filename'): + member_path = member.filename # zipfile + elif hasattr(member, 'name'): + member_path = member.name # tarfile + else: + member_path = str(member) + + member_path = os.path.normpath(member_path) + + if os.path.isabs(member_path) or '..' in member_path.split(os.sep): + raise ValueError(f"Unsafe path detected in archive: {member_path}") + + full_path = os.path.join(extract_to, member_path) + full_path = os.path.normpath(full_path) + + extract_to_abs = os.path.abspath(extract_to) + full_path_abs = os.path.abspath(full_path) + + if not (full_path_abs == extract_to_abs or full_path_abs.startswith(extract_to_abs + os.sep)): + raise ValueError(f"Path traversal attack detected: {member_path}") + return full_path - + def check_hash(filepath: PathLike, val: str | None = None, hash_type: str = "md5") -> bool: """ Verify hash signature of specified file. @@ -313,27 +311,27 @@ def extractall( logger.info(f"Writing into directory: {output_dir}.") _file_type = file_type.lower().strip() if filepath.name.endswith("zip") or _file_type == "zip": - with zipfile.ZipFile(filepath, 'r') as zip_file: - for member in zip_file.infolist(): - if member.is_dir(): - continue - safe_path = safe_extract_member(member, output_dir) - os.makedirs(os.path.dirname(safe_path), exist_ok=True) - with zip_file.open(member) as source: - with open(safe_path, 'wb') as target: + with zipfile.ZipFile(filepath, 'r') as zip_file: + for member in zip_file.infolist(): + if member.is_dir(): + continue + safe_path = safe_extract_member(member, output_dir) + os.makedirs(os.path.dirname(safe_path), exist_ok=True) + with zip_file.open(member) as source: + with open(safe_path, 'wb') as target: shutil.copyfileobj(source, target) return if filepath.name.endswith("tar") or filepath.name.endswith("tar.gz") or "tar" in _file_type: - with tarfile.open(filepath, 'r') as tar_file: - for member in tar_file.getmembers(): - if not member.isfile(): - continue - - safe_path = safe_extract_member(member, output_dir) - os.makedirs(os.path.dirname(safe_path), exist_ok=True) - with tar_file.extractfile(member) as source: - if source: - with open(safe_path, 'wb') as target: + with tarfile.open(filepath, 'r') as tar_file: + for member in tar_file.getmembers(): + if not member.isfile(): + continue + + safe_path = safe_extract_member(member, output_dir) + os.makedirs(os.path.dirname(safe_path), exist_ok=True) + with tar_file.extractfile(member) as source: + if source: + with open(safe_path, 'wb') as target: shutil.copyfileobj(source, target) return raise NotImplementedError( From a8ed1dfad1e38ddafd3f15cecf90906161aa32f9 Mon Sep 17 00:00:00 2001 From: h3rrr <81402797+h3rrr@users.noreply.github.com> Date: Fri, 12 Sep 2025 01:04:26 +0800 Subject: [PATCH 06/21] Update SECURITY.md Signed-off-by: h3rrr <81402797+h3rrr@users.noreply.github.com> --- SECURITY.md | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/SECURITY.md b/SECURITY.md index b9254888ba..8b4158eb73 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -14,23 +14,5 @@ To report a security issue: * We follow a coordinated disclosure approach. * We will not publicly disclose vulnerabilities until a fix has been developed and released. * Credit will be given to researchers who responsibly disclose vulnerabilities, if requested. - ## Acknowledgements We greatly appreciate contributions from the security community and strive to recognize all researchers who help keep MONAI safe. - -# Reporting a Vulnerability -At MONAI, we take security seriously and appreciate your efforts to responsibly disclose vulnerabilities. If you discover a security issue, please report it as soon as possible. - -Please do not create public issues for security-related reports. - -* To report a security issue, please use the GitHub Security Advisories tab to "Open a draft security advisory". -* Include a detailed description of the issue, steps to reproduce, potential impact, and any possible mitigations. -* If applicable, please also attach proof-of-concept code or screenshots. -* We will acknowledge your report within 72 hours and provide a status update as we investigate. - -# Disclosure Policy -* We follow a coordinated disclosure approach. -* We will not publicly disclose vulnerabilities until a fix has been developed and released. -* Credit will be given to researchers who responsibly disclose vulnerabilities, if requested. -# Acknowledgements -We greatly appreciate contributions from the security community and strive to recognize all researchers who help keep MONAI safe. From d3c711b0c1b5d653f0cd41071b07e7e3ffd40bb6 Mon Sep 17 00:00:00 2001 From: h3rrr <81402797+h3rrr@users.noreply.github.com> Date: Fri, 12 Sep 2025 14:13:46 +0800 Subject: [PATCH 07/21] Update utils.py Changed to previous content, the fix will be filed in a new PR Signed-off-by: h3rrr <81402797+h3rrr@users.noreply.github.com> --- monai/apps/utils.py | 56 ++++++++------------------------------------- 1 file changed, 9 insertions(+), 47 deletions(-) diff --git a/monai/apps/utils.py b/monai/apps/utils.py index 7e75d66650..95c1450f2a 100644 --- a/monai/apps/utils.py +++ b/monai/apps/utils.py @@ -11,12 +11,12 @@ from __future__ import annotations -import os -import shutil import hashlib import json import logging +import os import re +import shutil import sys import tarfile import tempfile @@ -80,6 +80,7 @@ def get_logger( logger = get_logger("monai.apps") __all__.append("logger") + def _basename(p: PathLike) -> str: """get the last part of the path (removing the trailing slash if it exists)""" sep = os.path.sep + (os.path.altsep or "") + "/ " @@ -120,31 +121,6 @@ def update_to(self, b: int = 1, bsize: int = 1, tsize: int | None = None) -> Non logger.error(f"Download failed from {url} to {filepath}.") raise e -def safe_extract_member(member, extract_to): - """Securely verify compressed package member paths to prevent path traversal attacks""" - # Get member path (handle different compression formats) - if hasattr(member, 'filename'): - member_path = member.filename # zipfile - elif hasattr(member, 'name'): - member_path = member.name # tarfile - else: - member_path = str(member) - - member_path = os.path.normpath(member_path) - - if os.path.isabs(member_path) or '..' in member_path.split(os.sep): - raise ValueError(f"Unsafe path detected in archive: {member_path}") - - full_path = os.path.join(extract_to, member_path) - full_path = os.path.normpath(full_path) - - extract_to_abs = os.path.abspath(extract_to) - full_path_abs = os.path.abspath(full_path) - - if not (full_path_abs == extract_to_abs or full_path_abs.startswith(extract_to_abs + os.sep)): - raise ValueError(f"Path traversal attack detected: {member_path}") - - return full_path def check_hash(filepath: PathLike, val: str | None = None, hash_type: str = "md5") -> bool: """ @@ -311,28 +287,14 @@ def extractall( logger.info(f"Writing into directory: {output_dir}.") _file_type = file_type.lower().strip() if filepath.name.endswith("zip") or _file_type == "zip": - with zipfile.ZipFile(filepath, 'r') as zip_file: - for member in zip_file.infolist(): - if member.is_dir(): - continue - safe_path = safe_extract_member(member, output_dir) - os.makedirs(os.path.dirname(safe_path), exist_ok=True) - with zip_file.open(member) as source: - with open(safe_path, 'wb') as target: - shutil.copyfileobj(source, target) + zip_file = zipfile.ZipFile(filepath) + zip_file.extractall(output_dir) + zip_file.close() return if filepath.name.endswith("tar") or filepath.name.endswith("tar.gz") or "tar" in _file_type: - with tarfile.open(filepath, 'r') as tar_file: - for member in tar_file.getmembers(): - if not member.isfile(): - continue - - safe_path = safe_extract_member(member, output_dir) - os.makedirs(os.path.dirname(safe_path), exist_ok=True) - with tar_file.extractfile(member) as source: - if source: - with open(safe_path, 'wb') as target: - shutil.copyfileobj(source, target) + tar_file = tarfile.open(filepath) + tar_file.extractall(output_dir) + tar_file.close() return raise NotImplementedError( f'Unsupported file type, available options are: ["zip", "tar.gz", "tar"]. name={filepath} type={file_type}.' From f7937403c23228d5971b57b78156c33cecef94f9 Mon Sep 17 00:00:00 2001 From: h3rrr <81402797+h3rrr@users.noreply.github.com> Date: Fri, 12 Sep 2025 14:56:13 +0800 Subject: [PATCH 08/21] Path traversal issue fix The path traversal issue in the previously submitted vulnerability report has been fixed and has been verified and fixed locally. Signed-off-by: h3rrr <81402797+h3rrr@users.noreply.github.com> --- monai/apps/utils.py | 56 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 47 insertions(+), 9 deletions(-) diff --git a/monai/apps/utils.py b/monai/apps/utils.py index 95c1450f2a..fb534cd9e8 100644 --- a/monai/apps/utils.py +++ b/monai/apps/utils.py @@ -11,12 +11,12 @@ from __future__ import annotations +import os +import shutil import hashlib import json import logging -import os import re -import shutil import sys import tarfile import tempfile @@ -80,7 +80,6 @@ def get_logger( logger = get_logger("monai.apps") __all__.append("logger") - def _basename(p: PathLike) -> str: """get the last part of the path (removing the trailing slash if it exists)""" sep = os.path.sep + (os.path.altsep or "") + "/ " @@ -121,6 +120,31 @@ def update_to(self, b: int = 1, bsize: int = 1, tsize: int | None = None) -> Non logger.error(f"Download failed from {url} to {filepath}.") raise e +def safe_extract_member(member, extract_to): + """Securely verify compressed package member paths to prevent path traversal attacks""" + # Get member path (handle different compression formats) + if hasattr(member, 'filename'): + member_path = member.filename # zipfile + elif hasattr(member, 'name'): + member_path = member.name # tarfile + else: + member_path = str(member) + + member_path = os.path.normpath(member_path) + + if os.path.isabs(member_path) or '..' in member_path.split(os.sep): + raise ValueError(f"Unsafe path detected in archive: {member_path}") + + full_path = os.path.join(extract_to, member_path) + full_path = os.path.normpath(full_path) + + extract_to_abs = os.path.abspath(extract_to) + full_path_abs = os.path.abspath(full_path) + + if not (full_path_abs == extract_to_abs or full_path_abs.startswith(extract_to_abs + os.sep)): + raise ValueError(f"Path traversal attack detected: {member_path}") + + return full_path def check_hash(filepath: PathLike, val: str | None = None, hash_type: str = "md5") -> bool: """ @@ -287,14 +311,28 @@ def extractall( logger.info(f"Writing into directory: {output_dir}.") _file_type = file_type.lower().strip() if filepath.name.endswith("zip") or _file_type == "zip": - zip_file = zipfile.ZipFile(filepath) - zip_file.extractall(output_dir) - zip_file.close() + with zipfile.ZipFile(filepath, 'r') as zip_file: + for member in zip_file.infolist(): + if member.is_dir(): + continue + safe_path = safe_extract_member(member, output_dir) + os.makedirs(os.path.dirname(safe_path), exist_ok=True) + with zip_file.open(member) as source: + with open(safe_path, 'wb') as target: + shutil.copyfileobj(source, target) return if filepath.name.endswith("tar") or filepath.name.endswith("tar.gz") or "tar" in _file_type: - tar_file = tarfile.open(filepath) - tar_file.extractall(output_dir) - tar_file.close() + with tarfile.open(filepath, 'r') as tar_file: + for member in tar_file.getmembers(): + if not member.isfile(): + continue + safe_path = safe_extract_member(member, output_dir) + os.makedirs(os.path.dirname(safe_path), exist_ok=True) + source = tar_file.extractfile(member) + if source is not None: + with source: + with open(safe_path, 'wb') as target: + shutil.copyfileobj(source, target) return raise NotImplementedError( f'Unsupported file type, available options are: ["zip", "tar.gz", "tar"]. name={filepath} type={file_type}.' From 292d7d86c363389620ef6eda985123816d119f4a Mon Sep 17 00:00:00 2001 From: h3rrr <81402797+h3rrr@users.noreply.github.com> Date: Fri, 12 Sep 2025 22:57:14 +0800 Subject: [PATCH 09/21] Update utils.py Sorry... I chose the wrong branch before. I added some anti-bypass measures to make the decompression operation safer. Signed-off-by: h3rrr <81402797+h3rrr@users.noreply.github.com> --- monai/apps/utils.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/monai/apps/utils.py b/monai/apps/utils.py index fb534cd9e8..2478550919 100644 --- a/monai/apps/utils.py +++ b/monai/apps/utils.py @@ -130,6 +130,11 @@ def safe_extract_member(member, extract_to): else: member_path = str(member) + if hasattr(member, 'issym') and member.issym(): + raise ValueError(f"Symbolic link detected in archive: {member_path}") + if hasattr(member, 'islnk') and member.islnk(): + raise ValueError(f"Hard link detected in archive: {member_path}") + member_path = os.path.normpath(member_path) if os.path.isabs(member_path) or '..' in member_path.split(os.sep): @@ -313,9 +318,10 @@ def extractall( if filepath.name.endswith("zip") or _file_type == "zip": with zipfile.ZipFile(filepath, 'r') as zip_file: for member in zip_file.infolist(): + safe_path = safe_extract_member(member, output_dir) if member.is_dir(): continue - safe_path = safe_extract_member(member, output_dir) + os.makedirs(os.path.dirname(safe_path), exist_ok=True) with zip_file.open(member) as source: with open(safe_path, 'wb') as target: @@ -324,9 +330,10 @@ def extractall( if filepath.name.endswith("tar") or filepath.name.endswith("tar.gz") or "tar" in _file_type: with tarfile.open(filepath, 'r') as tar_file: for member in tar_file.getmembers(): + safe_path = safe_extract_member(member, output_dir) if not member.isfile(): continue - safe_path = safe_extract_member(member, output_dir) + os.makedirs(os.path.dirname(safe_path), exist_ok=True) source = tar_file.extractfile(member) if source is not None: From aaecf777784c0d82eafe7ee8bcf11cade50566f6 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 12 Sep 2025 14:57:37 +0000 Subject: [PATCH 10/21] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- monai/apps/utils.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/monai/apps/utils.py b/monai/apps/utils.py index 2478550919..7d6f492412 100644 --- a/monai/apps/utils.py +++ b/monai/apps/utils.py @@ -130,11 +130,11 @@ def safe_extract_member(member, extract_to): else: member_path = str(member) - if hasattr(member, 'issym') and member.issym(): - raise ValueError(f"Symbolic link detected in archive: {member_path}") - if hasattr(member, 'islnk') and member.islnk(): + if hasattr(member, 'issym') and member.issym(): + raise ValueError(f"Symbolic link detected in archive: {member_path}") + if hasattr(member, 'islnk') and member.islnk(): raise ValueError(f"Hard link detected in archive: {member_path}") - + member_path = os.path.normpath(member_path) if os.path.isabs(member_path) or '..' in member_path.split(os.sep): @@ -321,7 +321,7 @@ def extractall( safe_path = safe_extract_member(member, output_dir) if member.is_dir(): continue - + os.makedirs(os.path.dirname(safe_path), exist_ok=True) with zip_file.open(member) as source: with open(safe_path, 'wb') as target: @@ -333,7 +333,7 @@ def extractall( safe_path = safe_extract_member(member, output_dir) if not member.isfile(): continue - + os.makedirs(os.path.dirname(safe_path), exist_ok=True) source = tar_file.extractfile(member) if source is not None: From 8d8d4a4fc01d4b246e4e87cf604f5feb28fb54b9 Mon Sep 17 00:00:00 2001 From: h3rrr <81402797+h3rrr@users.noreply.github.com> Date: Fri, 12 Sep 2025 22:58:17 +0800 Subject: [PATCH 11/21] Update test_download_and_extract.py New test cases added as required Signed-off-by: h3rrr <81402797+h3rrr@users.noreply.github.com> --- tests/apps/test_download_and_extract.py | 185 ++++++++++++++++++++++++ 1 file changed, 185 insertions(+) diff --git a/tests/apps/test_download_and_extract.py b/tests/apps/test_download_and_extract.py index 190e32fc79..40d1fc8c06 100644 --- a/tests/apps/test_download_and_extract.py +++ b/tests/apps/test_download_and_extract.py @@ -13,6 +13,9 @@ import tempfile import unittest +import os +import zipfile +import tarfile from pathlib import Path from urllib.error import ContentTooShortError, HTTPError @@ -66,5 +69,187 @@ def test_default(self, key, file_type): ) +class TestPathTraversalProtection(unittest.TestCase): + """Test cases for path traversal attack protection in extractall function.""" + + def test_valid_zip_extraction(self): + """Test that valid zip files extract successfully without raising exceptions.""" + with tempfile.TemporaryDirectory() as tmp_dir: + # Create a valid zip file + zip_path = Path(tmp_dir) / "valid_test.zip" + extract_dir = Path(tmp_dir) / "extract" + extract_dir.mkdir() + + # Create zip with normal file structure + with zipfile.ZipFile(zip_path, 'w') as zf: + zf.writestr("normal_file.txt", "This is a normal file") + zf.writestr("subdir/nested_file.txt", "This is a nested file") + zf.writestr("another_file.json", '{"key": "value"}') + + # This should not raise any exception + try: + extractall(str(zip_path), str(extract_dir)) + + # Verify files were extracted correctly + self.assertTrue((extract_dir / "normal_file.txt").exists()) + self.assertTrue((extract_dir / "subdir" / "nested_file.txt").exists()) + self.assertTrue((extract_dir / "another_file.json").exists()) + + # Verify content + with open(extract_dir / "normal_file.txt", 'r') as f: + self.assertEqual(f.read(), "This is a normal file") + + except Exception as e: + self.fail(f"Valid zip extraction should not raise exception: {e}") + + def test_malicious_zip_path_traversal(self): + """Test that malicious zip files with path traversal attempts raise ValueError.""" + with tempfile.TemporaryDirectory() as tmp_dir: + # Create malicious zip file with path traversal + zip_path = Path(tmp_dir) / "malicious_test.zip" + extract_dir = Path(tmp_dir) / "extract" + extract_dir.mkdir() + + # Create zip with malicious paths + with zipfile.ZipFile(zip_path, 'w') as zf: + # Try to write outside extraction directory + zf.writestr("../../../etc/malicious.txt", "malicious content") + zf.writestr("normal_file.txt", "normal content") + + # This should raise ValueError due to path traversal detection + with self.assertRaises(ValueError) as context: + extractall(str(zip_path), str(extract_dir)) + + self.assertIn("unsafe path", str(context.exception).lower()) + + def test_valid_tar_extraction(self): + """Test that valid tar files extract successfully without raising exceptions.""" + with tempfile.TemporaryDirectory() as tmp_dir: + # Create a valid tar file + tar_path = Path(tmp_dir) / "valid_test.tar.gz" + extract_dir = Path(tmp_dir) / "extract" + extract_dir.mkdir() + + # Create tar with normal file structure + with tarfile.open(tar_path, 'w:gz') as tf: + # Create temporary files to add to tar + temp_file1 = Path(tmp_dir) / "temp1.txt" + temp_file1.write_text("This is a normal file") + tf.add(temp_file1, arcname="normal_file.txt") + + temp_file2 = Path(tmp_dir) / "temp2.txt" + temp_file2.write_text("This is a nested file") + tf.add(temp_file2, arcname="subdir/nested_file.txt") + + # This should not raise any exception + try: + extractall(str(tar_path), str(extract_dir)) + + # Verify files were extracted correctly + self.assertTrue((extract_dir / "normal_file.txt").exists()) + self.assertTrue((extract_dir / "subdir" / "nested_file.txt").exists()) + + # Verify content + with open(extract_dir / "normal_file.txt", 'r') as f: + self.assertEqual(f.read(), "This is a normal file") + + except Exception as e: + self.fail(f"Valid tar extraction should not raise exception: {e}") + + def test_malicious_tar_path_traversal(self): + """Test that malicious tar files with path traversal attempts raise ValueError.""" + with tempfile.TemporaryDirectory() as tmp_dir: + # Create malicious tar file with path traversal + tar_path = Path(tmp_dir) / "malicious_test.tar.gz" + extract_dir = Path(tmp_dir) / "extract" + extract_dir.mkdir() + + # Create tar with malicious paths + with tarfile.open(tar_path, 'w:gz') as tf: + # Create a temporary file + temp_file = Path(tmp_dir) / "temp.txt" + temp_file.write_text("malicious content") + + # Add with malicious path (trying to write outside extraction directory) + tf.add(temp_file, arcname="../../../etc/malicious.txt") + + # This should raise ValueError due to path traversal detection + with self.assertRaises(ValueError) as context: + extractall(str(tar_path), str(extract_dir)) + + self.assertIn("unsafe path", str(context.exception).lower()) + + def test_absolute_path_protection(self): + """Test protection against absolute paths in archives.""" + with tempfile.TemporaryDirectory() as tmp_dir: + # Create zip with absolute path + zip_path = Path(tmp_dir) / "absolute_path_test.zip" + extract_dir = Path(tmp_dir) / "extract" + extract_dir.mkdir() + + with zipfile.ZipFile(zip_path, 'w') as zf: + # Try to use absolute path + zf.writestr("/etc/passwd", "malicious content") + + # This should raise ValueError due to absolute path detection + with self.assertRaises(ValueError) as context: + extractall(str(zip_path), str(extract_dir)) + + self.assertIn("unsafe path", str(context.exception).lower()) + + def test_malicious_symlink_protection(self): + """Test protection against malicious symlinks in tar archives.""" + with tempfile.TemporaryDirectory() as tmp_dir: + # Create malicious tar file with symlink + tar_path = Path(tmp_dir) / "malicious_symlink_test.tar.gz" + extract_dir = Path(tmp_dir) / "extract" + extract_dir.mkdir() + + # Create tar with malicious symlink + with tarfile.open(tar_path, 'w:gz') as tf: + temp_file = Path(tmp_dir) / "normal.txt" + temp_file.write_text("normal content") + tf.add(temp_file, arcname="normal.txt") + + symlink_info = tarfile.TarInfo(name="malicious_symlink.txt") + symlink_info.type = tarfile.SYMTYPE + symlink_info.linkname = "../../../etc/passwd" + symlink_info.size = 0 + tf.addfile(symlink_info) + + with self.assertRaises(ValueError) as context: + extractall(str(tar_path), str(extract_dir)) + + error_msg = str(context.exception).lower() + self.assertTrue("unsafe path" in error_msg or "symlink" in error_msg) + + def test_malicious_hardlink_protection(self): + """Test protection against malicious hard links in tar archives.""" + with tempfile.TemporaryDirectory() as tmp_dir: + # Create malicious tar file with hard link + tar_path = Path(tmp_dir) / "malicious_hardlink_test.tar.gz" + extract_dir = Path(tmp_dir) / "extract" + extract_dir.mkdir() + + # Create tar with malicious hard link + with tarfile.open(tar_path, 'w:gz') as tf: + temp_file = Path(tmp_dir) / "normal.txt" + temp_file.write_text("normal content") + tf.add(temp_file, arcname="normal.txt") + + hardlink_info = tarfile.TarInfo(name="malicious_hardlink.txt") + hardlink_info.type = tarfile.LNKTYPE + hardlink_info.linkname = "/etc/passwd" + hardlink_info.size = 0 + tf.addfile(hardlink_info) + + with self.assertRaises(ValueError) as context: + extractall(str(tar_path), str(extract_dir)) + + error_msg = str(context.exception).lower() + self.assertTrue("unsafe path" in error_msg or "hardlink" in error_msg) + + + if __name__ == "__main__": unittest.main() From 7a9f479120fd43deb76cabc80f8116eb2fb76756 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 12 Sep 2025 14:58:40 +0000 Subject: [PATCH 12/21] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/apps/test_download_and_extract.py | 151 ++++++++++++------------ 1 file changed, 75 insertions(+), 76 deletions(-) diff --git a/tests/apps/test_download_and_extract.py b/tests/apps/test_download_and_extract.py index 40d1fc8c06..9f31239952 100644 --- a/tests/apps/test_download_and_extract.py +++ b/tests/apps/test_download_and_extract.py @@ -13,7 +13,6 @@ import tempfile import unittest -import os import zipfile import tarfile from pathlib import Path @@ -71,7 +70,7 @@ def test_default(self, key, file_type): class TestPathTraversalProtection(unittest.TestCase): """Test cases for path traversal attack protection in extractall function.""" - + def test_valid_zip_extraction(self): """Test that valid zip files extract successfully without raising exceptions.""" with tempfile.TemporaryDirectory() as tmp_dir: @@ -79,29 +78,29 @@ def test_valid_zip_extraction(self): zip_path = Path(tmp_dir) / "valid_test.zip" extract_dir = Path(tmp_dir) / "extract" extract_dir.mkdir() - + # Create zip with normal file structure with zipfile.ZipFile(zip_path, 'w') as zf: zf.writestr("normal_file.txt", "This is a normal file") zf.writestr("subdir/nested_file.txt", "This is a nested file") zf.writestr("another_file.json", '{"key": "value"}') - + # This should not raise any exception try: extractall(str(zip_path), str(extract_dir)) - + # Verify files were extracted correctly self.assertTrue((extract_dir / "normal_file.txt").exists()) self.assertTrue((extract_dir / "subdir" / "nested_file.txt").exists()) self.assertTrue((extract_dir / "another_file.json").exists()) - + # Verify content - with open(extract_dir / "normal_file.txt", 'r') as f: + with open(extract_dir / "normal_file.txt") as f: self.assertEqual(f.read(), "This is a normal file") - + except Exception as e: self.fail(f"Valid zip extraction should not raise exception: {e}") - + def test_malicious_zip_path_traversal(self): """Test that malicious zip files with path traversal attempts raise ValueError.""" with tempfile.TemporaryDirectory() as tmp_dir: @@ -109,19 +108,19 @@ def test_malicious_zip_path_traversal(self): zip_path = Path(tmp_dir) / "malicious_test.zip" extract_dir = Path(tmp_dir) / "extract" extract_dir.mkdir() - + # Create zip with malicious paths with zipfile.ZipFile(zip_path, 'w') as zf: # Try to write outside extraction directory zf.writestr("../../../etc/malicious.txt", "malicious content") zf.writestr("normal_file.txt", "normal content") - + # This should raise ValueError due to path traversal detection with self.assertRaises(ValueError) as context: extractall(str(zip_path), str(extract_dir)) - + self.assertIn("unsafe path", str(context.exception).lower()) - + def test_valid_tar_extraction(self): """Test that valid tar files extract successfully without raising exceptions.""" with tempfile.TemporaryDirectory() as tmp_dir: @@ -129,33 +128,33 @@ def test_valid_tar_extraction(self): tar_path = Path(tmp_dir) / "valid_test.tar.gz" extract_dir = Path(tmp_dir) / "extract" extract_dir.mkdir() - + # Create tar with normal file structure with tarfile.open(tar_path, 'w:gz') as tf: # Create temporary files to add to tar temp_file1 = Path(tmp_dir) / "temp1.txt" temp_file1.write_text("This is a normal file") tf.add(temp_file1, arcname="normal_file.txt") - + temp_file2 = Path(tmp_dir) / "temp2.txt" temp_file2.write_text("This is a nested file") tf.add(temp_file2, arcname="subdir/nested_file.txt") - + # This should not raise any exception try: extractall(str(tar_path), str(extract_dir)) - + # Verify files were extracted correctly self.assertTrue((extract_dir / "normal_file.txt").exists()) self.assertTrue((extract_dir / "subdir" / "nested_file.txt").exists()) - + # Verify content - with open(extract_dir / "normal_file.txt", 'r') as f: + with open(extract_dir / "normal_file.txt") as f: self.assertEqual(f.read(), "This is a normal file") - + except Exception as e: self.fail(f"Valid tar extraction should not raise exception: {e}") - + def test_malicious_tar_path_traversal(self): """Test that malicious tar files with path traversal attempts raise ValueError.""" with tempfile.TemporaryDirectory() as tmp_dir: @@ -163,22 +162,22 @@ def test_malicious_tar_path_traversal(self): tar_path = Path(tmp_dir) / "malicious_test.tar.gz" extract_dir = Path(tmp_dir) / "extract" extract_dir.mkdir() - + # Create tar with malicious paths with tarfile.open(tar_path, 'w:gz') as tf: # Create a temporary file temp_file = Path(tmp_dir) / "temp.txt" temp_file.write_text("malicious content") - + # Add with malicious path (trying to write outside extraction directory) tf.add(temp_file, arcname="../../../etc/malicious.txt") - + # This should raise ValueError due to path traversal detection with self.assertRaises(ValueError) as context: extractall(str(tar_path), str(extract_dir)) - + self.assertIn("unsafe path", str(context.exception).lower()) - + def test_absolute_path_protection(self): """Test protection against absolute paths in archives.""" with tempfile.TemporaryDirectory() as tmp_dir: @@ -186,66 +185,66 @@ def test_absolute_path_protection(self): zip_path = Path(tmp_dir) / "absolute_path_test.zip" extract_dir = Path(tmp_dir) / "extract" extract_dir.mkdir() - + with zipfile.ZipFile(zip_path, 'w') as zf: # Try to use absolute path zf.writestr("/etc/passwd", "malicious content") - + # This should raise ValueError due to absolute path detection with self.assertRaises(ValueError) as context: extractall(str(zip_path), str(extract_dir)) - + self.assertIn("unsafe path", str(context.exception).lower()) - def test_malicious_symlink_protection(self): - """Test protection against malicious symlinks in tar archives.""" - with tempfile.TemporaryDirectory() as tmp_dir: - # Create malicious tar file with symlink - tar_path = Path(tmp_dir) / "malicious_symlink_test.tar.gz" - extract_dir = Path(tmp_dir) / "extract" - extract_dir.mkdir() - - # Create tar with malicious symlink - with tarfile.open(tar_path, 'w:gz') as tf: - temp_file = Path(tmp_dir) / "normal.txt" - temp_file.write_text("normal content") - tf.add(temp_file, arcname="normal.txt") - - symlink_info = tarfile.TarInfo(name="malicious_symlink.txt") - symlink_info.type = tarfile.SYMTYPE - symlink_info.linkname = "../../../etc/passwd" - symlink_info.size = 0 - tf.addfile(symlink_info) - - with self.assertRaises(ValueError) as context: - extractall(str(tar_path), str(extract_dir)) - + def test_malicious_symlink_protection(self): + """Test protection against malicious symlinks in tar archives.""" + with tempfile.TemporaryDirectory() as tmp_dir: + # Create malicious tar file with symlink + tar_path = Path(tmp_dir) / "malicious_symlink_test.tar.gz" + extract_dir = Path(tmp_dir) / "extract" + extract_dir.mkdir() + + # Create tar with malicious symlink + with tarfile.open(tar_path, 'w:gz') as tf: + temp_file = Path(tmp_dir) / "normal.txt" + temp_file.write_text("normal content") + tf.add(temp_file, arcname="normal.txt") + + symlink_info = tarfile.TarInfo(name="malicious_symlink.txt") + symlink_info.type = tarfile.SYMTYPE + symlink_info.linkname = "../../../etc/passwd" + symlink_info.size = 0 + tf.addfile(symlink_info) + + with self.assertRaises(ValueError) as context: + extractall(str(tar_path), str(extract_dir)) + error_msg = str(context.exception).lower() self.assertTrue("unsafe path" in error_msg or "symlink" in error_msg) - - def test_malicious_hardlink_protection(self): - """Test protection against malicious hard links in tar archives.""" - with tempfile.TemporaryDirectory() as tmp_dir: - # Create malicious tar file with hard link - tar_path = Path(tmp_dir) / "malicious_hardlink_test.tar.gz" - extract_dir = Path(tmp_dir) / "extract" - extract_dir.mkdir() - - # Create tar with malicious hard link - with tarfile.open(tar_path, 'w:gz') as tf: - temp_file = Path(tmp_dir) / "normal.txt" - temp_file.write_text("normal content") - tf.add(temp_file, arcname="normal.txt") - - hardlink_info = tarfile.TarInfo(name="malicious_hardlink.txt") - hardlink_info.type = tarfile.LNKTYPE - hardlink_info.linkname = "/etc/passwd" - hardlink_info.size = 0 - tf.addfile(hardlink_info) - - with self.assertRaises(ValueError) as context: - extractall(str(tar_path), str(extract_dir)) - + + def test_malicious_hardlink_protection(self): + """Test protection against malicious hard links in tar archives.""" + with tempfile.TemporaryDirectory() as tmp_dir: + # Create malicious tar file with hard link + tar_path = Path(tmp_dir) / "malicious_hardlink_test.tar.gz" + extract_dir = Path(tmp_dir) / "extract" + extract_dir.mkdir() + + # Create tar with malicious hard link + with tarfile.open(tar_path, 'w:gz') as tf: + temp_file = Path(tmp_dir) / "normal.txt" + temp_file.write_text("normal content") + tf.add(temp_file, arcname="normal.txt") + + hardlink_info = tarfile.TarInfo(name="malicious_hardlink.txt") + hardlink_info.type = tarfile.LNKTYPE + hardlink_info.linkname = "/etc/passwd" + hardlink_info.size = 0 + tf.addfile(hardlink_info) + + with self.assertRaises(ValueError) as context: + extractall(str(tar_path), str(extract_dir)) + error_msg = str(context.exception).lower() self.assertTrue("unsafe path" in error_msg or "hardlink" in error_msg) From d07a2692a6bab5f3779bbd75a81f539519663b19 Mon Sep 17 00:00:00 2001 From: h3rrr <81402797+h3rrr@users.noreply.github.com> Date: Fri, 12 Sep 2025 23:13:42 +0800 Subject: [PATCH 13/21] Update monai/apps/utils.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> Signed-off-by: h3rrr <81402797+h3rrr@users.noreply.github.com> --- monai/apps/utils.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/monai/apps/utils.py b/monai/apps/utils.py index 7d6f492412..b6a63bc204 100644 --- a/monai/apps/utils.py +++ b/monai/apps/utils.py @@ -131,10 +131,9 @@ def safe_extract_member(member, extract_to): member_path = str(member) if hasattr(member, 'issym') and member.issym(): - raise ValueError(f"Symbolic link detected in archive: {member_path}") + raise ValueError(f"Unsafe path: symlink {member_path}") if hasattr(member, 'islnk') and member.islnk(): - raise ValueError(f"Hard link detected in archive: {member_path}") - + raise ValueError(f"Unsafe path: hardlink {member_path}") member_path = os.path.normpath(member_path) if os.path.isabs(member_path) or '..' in member_path.split(os.sep): From a460e1f5f425c86ab74e1a1bfb15d7eb060c6a38 Mon Sep 17 00:00:00 2001 From: h3rrr <81402797+h3rrr@users.noreply.github.com> Date: Sat, 13 Sep 2025 12:24:35 +0800 Subject: [PATCH 14/21] Update utils.py Signed-off-by: h3rrr <81402797+h3rrr@users.noreply.github.com> --- monai/apps/utils.py | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/monai/apps/utils.py b/monai/apps/utils.py index b6a63bc204..e665332d7c 100644 --- a/monai/apps/utils.py +++ b/monai/apps/utils.py @@ -11,12 +11,12 @@ from __future__ import annotations -import os -import shutil import hashlib import json import logging +import os import re +import shutil import sys import tarfile import tempfile @@ -80,6 +80,7 @@ def get_logger( logger = get_logger("monai.apps") __all__.append("logger") + def _basename(p: PathLike) -> str: """get the last part of the path (removing the trailing slash if it exists)""" sep = os.path.sep + (os.path.altsep or "") + "/ " @@ -120,23 +121,25 @@ def update_to(self, b: int = 1, bsize: int = 1, tsize: int | None = None) -> Non logger.error(f"Download failed from {url} to {filepath}.") raise e + def safe_extract_member(member, extract_to): """Securely verify compressed package member paths to prevent path traversal attacks""" # Get member path (handle different compression formats) - if hasattr(member, 'filename'): + if hasattr(member, "filename"): member_path = member.filename # zipfile - elif hasattr(member, 'name'): + elif hasattr(member, "name"): member_path = member.name # tarfile else: member_path = str(member) - if hasattr(member, 'issym') and member.issym(): - raise ValueError(f"Unsafe path: symlink {member_path}") - if hasattr(member, 'islnk') and member.islnk(): - raise ValueError(f"Unsafe path: hardlink {member_path}") + if hasattr(member, "issym") and member.issym(): + raise ValueError(f"Symbolic link detected in archive: {member_path}") + if hasattr(member, "islnk") and member.islnk(): + raise ValueError(f"Hard link detected in archive: {member_path}") + member_path = os.path.normpath(member_path) - if os.path.isabs(member_path) or '..' in member_path.split(os.sep): + if os.path.isabs(member_path) or ".." in member_path.split(os.sep): raise ValueError(f"Unsafe path detected in archive: {member_path}") full_path = os.path.join(extract_to, member_path) @@ -150,6 +153,7 @@ def safe_extract_member(member, extract_to): return full_path + def check_hash(filepath: PathLike, val: str | None = None, hash_type: str = "md5") -> bool: """ Verify hash signature of specified file. @@ -315,7 +319,7 @@ def extractall( logger.info(f"Writing into directory: {output_dir}.") _file_type = file_type.lower().strip() if filepath.name.endswith("zip") or _file_type == "zip": - with zipfile.ZipFile(filepath, 'r') as zip_file: + with zipfile.ZipFile(filepath, "r") as zip_file: for member in zip_file.infolist(): safe_path = safe_extract_member(member, output_dir) if member.is_dir(): @@ -323,11 +327,11 @@ def extractall( os.makedirs(os.path.dirname(safe_path), exist_ok=True) with zip_file.open(member) as source: - with open(safe_path, 'wb') as target: + with open(safe_path, "wb") as target: shutil.copyfileobj(source, target) return if filepath.name.endswith("tar") or filepath.name.endswith("tar.gz") or "tar" in _file_type: - with tarfile.open(filepath, 'r') as tar_file: + with tarfile.open(filepath, "r") as tar_file: for member in tar_file.getmembers(): safe_path = safe_extract_member(member, output_dir) if not member.isfile(): @@ -337,7 +341,7 @@ def extractall( source = tar_file.extractfile(member) if source is not None: with source: - with open(safe_path, 'wb') as target: + with open(safe_path, "wb") as target: shutil.copyfileobj(source, target) return raise NotImplementedError( From 74b603a1b3f3b3a36c9f0a7f3090eb914af5a913 Mon Sep 17 00:00:00 2001 From: h3rrr <81402797+h3rrr@users.noreply.github.com> Date: Sat, 13 Sep 2025 12:25:46 +0800 Subject: [PATCH 15/21] Update test_download_and_extract.py Signed-off-by: h3rrr <81402797+h3rrr@users.noreply.github.com> --- tests/apps/test_download_and_extract.py | 28 ++++++++++++------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/apps/test_download_and_extract.py b/tests/apps/test_download_and_extract.py index 9f31239952..02526fd9a4 100644 --- a/tests/apps/test_download_and_extract.py +++ b/tests/apps/test_download_and_extract.py @@ -11,10 +11,11 @@ from __future__ import annotations +import os +import tarfile import tempfile import unittest import zipfile -import tarfile from pathlib import Path from urllib.error import ContentTooShortError, HTTPError @@ -80,7 +81,7 @@ def test_valid_zip_extraction(self): extract_dir.mkdir() # Create zip with normal file structure - with zipfile.ZipFile(zip_path, 'w') as zf: + with zipfile.ZipFile(zip_path, "w") as zf: zf.writestr("normal_file.txt", "This is a normal file") zf.writestr("subdir/nested_file.txt", "This is a nested file") zf.writestr("another_file.json", '{"key": "value"}') @@ -95,7 +96,7 @@ def test_valid_zip_extraction(self): self.assertTrue((extract_dir / "another_file.json").exists()) # Verify content - with open(extract_dir / "normal_file.txt") as f: + with open(extract_dir / "normal_file.txt", "r") as f: self.assertEqual(f.read(), "This is a normal file") except Exception as e: @@ -110,7 +111,7 @@ def test_malicious_zip_path_traversal(self): extract_dir.mkdir() # Create zip with malicious paths - with zipfile.ZipFile(zip_path, 'w') as zf: + with zipfile.ZipFile(zip_path, "w") as zf: # Try to write outside extraction directory zf.writestr("../../../etc/malicious.txt", "malicious content") zf.writestr("normal_file.txt", "normal content") @@ -130,7 +131,7 @@ def test_valid_tar_extraction(self): extract_dir.mkdir() # Create tar with normal file structure - with tarfile.open(tar_path, 'w:gz') as tf: + with tarfile.open(tar_path, "w:gz") as tf: # Create temporary files to add to tar temp_file1 = Path(tmp_dir) / "temp1.txt" temp_file1.write_text("This is a normal file") @@ -149,7 +150,7 @@ def test_valid_tar_extraction(self): self.assertTrue((extract_dir / "subdir" / "nested_file.txt").exists()) # Verify content - with open(extract_dir / "normal_file.txt") as f: + with open(extract_dir / "normal_file.txt", "r") as f: self.assertEqual(f.read(), "This is a normal file") except Exception as e: @@ -164,7 +165,7 @@ def test_malicious_tar_path_traversal(self): extract_dir.mkdir() # Create tar with malicious paths - with tarfile.open(tar_path, 'w:gz') as tf: + with tarfile.open(tar_path, "w:gz") as tf: # Create a temporary file temp_file = Path(tmp_dir) / "temp.txt" temp_file.write_text("malicious content") @@ -186,9 +187,9 @@ def test_absolute_path_protection(self): extract_dir = Path(tmp_dir) / "extract" extract_dir.mkdir() - with zipfile.ZipFile(zip_path, 'w') as zf: + with zipfile.ZipFile(zip_path, "w") as zf: # Try to use absolute path - zf.writestr("/etc/passwd", "malicious content") + zf.writestr("/etc/passwd_bad", "malicious content") # This should raise ValueError due to absolute path detection with self.assertRaises(ValueError) as context: @@ -205,14 +206,14 @@ def test_malicious_symlink_protection(self): extract_dir.mkdir() # Create tar with malicious symlink - with tarfile.open(tar_path, 'w:gz') as tf: + with tarfile.open(tar_path, "w:gz") as tf: temp_file = Path(tmp_dir) / "normal.txt" temp_file.write_text("normal content") tf.add(temp_file, arcname="normal.txt") symlink_info = tarfile.TarInfo(name="malicious_symlink.txt") symlink_info.type = tarfile.SYMTYPE - symlink_info.linkname = "../../../etc/passwd" + symlink_info.linkname = "../../../etc/passwd_bad" symlink_info.size = 0 tf.addfile(symlink_info) @@ -231,14 +232,14 @@ def test_malicious_hardlink_protection(self): extract_dir.mkdir() # Create tar with malicious hard link - with tarfile.open(tar_path, 'w:gz') as tf: + with tarfile.open(tar_path, "w:gz") as tf: temp_file = Path(tmp_dir) / "normal.txt" temp_file.write_text("normal content") tf.add(temp_file, arcname="normal.txt") hardlink_info = tarfile.TarInfo(name="malicious_hardlink.txt") hardlink_info.type = tarfile.LNKTYPE - hardlink_info.linkname = "/etc/passwd" + hardlink_info.linkname = "/etc/passwd_bad" hardlink_info.size = 0 tf.addfile(hardlink_info) @@ -249,6 +250,5 @@ def test_malicious_hardlink_protection(self): self.assertTrue("unsafe path" in error_msg or "hardlink" in error_msg) - if __name__ == "__main__": unittest.main() From e204524d05a12027f4fd4fbf85ed1a455b02c051 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 13 Sep 2025 04:26:15 +0000 Subject: [PATCH 16/21] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/apps/test_download_and_extract.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/apps/test_download_and_extract.py b/tests/apps/test_download_and_extract.py index 02526fd9a4..6d16a72735 100644 --- a/tests/apps/test_download_and_extract.py +++ b/tests/apps/test_download_and_extract.py @@ -11,7 +11,6 @@ from __future__ import annotations -import os import tarfile import tempfile import unittest @@ -96,7 +95,7 @@ def test_valid_zip_extraction(self): self.assertTrue((extract_dir / "another_file.json").exists()) # Verify content - with open(extract_dir / "normal_file.txt", "r") as f: + with open(extract_dir / "normal_file.txt") as f: self.assertEqual(f.read(), "This is a normal file") except Exception as e: @@ -150,7 +149,7 @@ def test_valid_tar_extraction(self): self.assertTrue((extract_dir / "subdir" / "nested_file.txt").exists()) # Verify content - with open(extract_dir / "normal_file.txt", "r") as f: + with open(extract_dir / "normal_file.txt") as f: self.assertEqual(f.read(), "This is a normal file") except Exception as e: From 4431852ff3bf278d40467a82573709be18387821 Mon Sep 17 00:00:00 2001 From: h3rrr <81402797+h3rrr@users.noreply.github.com> Date: Sat, 13 Sep 2025 12:37:41 +0800 Subject: [PATCH 17/21] Update monai/apps/utils.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> Signed-off-by: h3rrr <81402797+h3rrr@users.noreply.github.com> --- monai/apps/utils.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/monai/apps/utils.py b/monai/apps/utils.py index e665332d7c..cd779ef18c 100644 --- a/monai/apps/utils.py +++ b/monai/apps/utils.py @@ -145,15 +145,14 @@ def safe_extract_member(member, extract_to): full_path = os.path.join(extract_to, member_path) full_path = os.path.normpath(full_path) - extract_to_abs = os.path.abspath(extract_to) - full_path_abs = os.path.abspath(full_path) - - if not (full_path_abs == extract_to_abs or full_path_abs.startswith(extract_to_abs + os.sep)): - raise ValueError(f"Path traversal attack detected: {member_path}") + extract_root = os.path.realpath(extract_to) + target_real = os.path.realpath(full_path) + # Ensure the resolved path stays within the extraction root + if os.path.commonpath([extract_root, target_real]) != extract_root: + raise ValueError(f"Unsafe path: path traversal {member_path}") # noqa: TRY003 return full_path - def check_hash(filepath: PathLike, val: str | None = None, hash_type: str = "md5") -> bool: """ Verify hash signature of specified file. From e4326096df4ed1167c2ba947bc7ee0ff5364849c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 13 Sep 2025 04:38:04 +0000 Subject: [PATCH 18/21] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- monai/apps/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monai/apps/utils.py b/monai/apps/utils.py index cd779ef18c..bdba84139c 100644 --- a/monai/apps/utils.py +++ b/monai/apps/utils.py @@ -149,7 +149,7 @@ def safe_extract_member(member, extract_to): target_real = os.path.realpath(full_path) # Ensure the resolved path stays within the extraction root if os.path.commonpath([extract_root, target_real]) != extract_root: - raise ValueError(f"Unsafe path: path traversal {member_path}") # noqa: TRY003 + raise ValueError(f"Unsafe path: path traversal {member_path}") return full_path From 6b7ebe6a9b79a871d045ccb0307abaf57460fa50 Mon Sep 17 00:00:00 2001 From: h3rrr <81402797+h3rrr@users.noreply.github.com> Date: Sat, 13 Sep 2025 13:30:59 +0800 Subject: [PATCH 19/21] Update utils.py Signed-off-by: h3rrr <81402797+h3rrr@users.noreply.github.com> From fcfa5613d4b5dde0239a839aecaab777368a0805 Mon Sep 17 00:00:00 2001 From: h3rrr <81402797+h3rrr@users.noreply.github.com> Date: Sat, 13 Sep 2025 13:41:14 +0800 Subject: [PATCH 20/21] Update utils.py Signed-off-by: h3rrr <81402797+h3rrr@users.noreply.github.com> --- monai/apps/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/monai/apps/utils.py b/monai/apps/utils.py index bdba84139c..71330e0f11 100644 --- a/monai/apps/utils.py +++ b/monai/apps/utils.py @@ -153,6 +153,7 @@ def safe_extract_member(member, extract_to): return full_path + def check_hash(filepath: PathLike, val: str | None = None, hash_type: str = "md5") -> bool: """ Verify hash signature of specified file. From 98e5a0e94174a0eae90e0b8585ba27c58b0d9095 Mon Sep 17 00:00:00 2001 From: h3rrr <81402797+h3rrr@users.noreply.github.com> Date: Sat, 13 Sep 2025 14:01:35 +0800 Subject: [PATCH 21/21] Update utils.py Signed-off-by: h3rrr <81402797+h3rrr@users.noreply.github.com> --- monai/apps/utils.py | 50 +++++++++++++++++++++++++-------------------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/monai/apps/utils.py b/monai/apps/utils.py index 71330e0f11..856bc64c9e 100644 --- a/monai/apps/utils.py +++ b/monai/apps/utils.py @@ -274,6 +274,32 @@ def download_url( ) +def _extract_zip(filepath, output_dir): + with zipfile.ZipFile(filepath, "r") as zip_file: + for member in zip_file.infolist(): + safe_path = safe_extract_member(member, output_dir) + if member.is_dir(): + continue + os.makedirs(os.path.dirname(safe_path), exist_ok=True) + with zip_file.open(member) as source: + with open(safe_path, "wb") as target: + shutil.copyfileobj(source, target) + + +def _extract_tar(filepath, output_dir): + with tarfile.open(filepath, "r") as tar_file: + for member in tar_file.getmembers(): + safe_path = safe_extract_member(member, output_dir) + if not member.isfile(): + continue + os.makedirs(os.path.dirname(safe_path), exist_ok=True) + source = tar_file.extractfile(member) + if source is not None: + with source: + with open(safe_path, "wb") as target: + shutil.copyfileobj(source, target) + + def extractall( filepath: PathLike, output_dir: PathLike = ".", @@ -319,30 +345,10 @@ def extractall( logger.info(f"Writing into directory: {output_dir}.") _file_type = file_type.lower().strip() if filepath.name.endswith("zip") or _file_type == "zip": - with zipfile.ZipFile(filepath, "r") as zip_file: - for member in zip_file.infolist(): - safe_path = safe_extract_member(member, output_dir) - if member.is_dir(): - continue - - os.makedirs(os.path.dirname(safe_path), exist_ok=True) - with zip_file.open(member) as source: - with open(safe_path, "wb") as target: - shutil.copyfileobj(source, target) + _extract_zip(filepath, output_dir) return if filepath.name.endswith("tar") or filepath.name.endswith("tar.gz") or "tar" in _file_type: - with tarfile.open(filepath, "r") as tar_file: - for member in tar_file.getmembers(): - safe_path = safe_extract_member(member, output_dir) - if not member.isfile(): - continue - - os.makedirs(os.path.dirname(safe_path), exist_ok=True) - source = tar_file.extractfile(member) - if source is not None: - with source: - with open(safe_path, "wb") as target: - shutil.copyfileobj(source, target) + _extract_tar(filepath, output_dir) return raise NotImplementedError( f'Unsupported file type, available options are: ["zip", "tar.gz", "tar"]. name={filepath} type={file_type}.'