Skip to content

Commit 0fe0930

Browse files
authored
Merge pull request #1 from ezhiltech/feature_master_MVA8
Pull request from 'feature_master_MVA8' to 'master'
2 parents c767741 + 98f6f5d commit 0fe0930

File tree

3 files changed

+83
-11
lines changed

3 files changed

+83
-11
lines changed

libarchive/test_zip.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import pytest
2+
import os
3+
import tempfile
4+
from zip import sanitize_filename, ZipFile # Import from zip.py
5+
6+
def test_sanitize_filename_safe():
7+
assert sanitize_filename("test.txt") == "test.txt"
8+
9+
def test_sanitize_filename_traversal():
10+
with pytest.raises(ValueError, match="Potential directory traversal attempt detected"):
11+
sanitize_filename("../etc/passwd")
12+
13+
def test_sanitize_filename_absolute_path():
14+
with pytest.raises(ValueError, match="Potential directory traversal attempt detected"):
15+
sanitize_filename("/etc/passwd")
16+
17+
def create_test_zip(zip_path, filenames):
18+
"""Helper function to create a test ZIP file with given filenames."""
19+
import zipfile
20+
with zipfile.ZipFile(zip_path, 'w') as zf:
21+
for filename in filenames:
22+
zf.writestr(filename, "Test content")
23+
24+
def test_extract_safe():
25+
with tempfile.TemporaryDirectory() as temp_dir:
26+
zip_path = os.path.join(temp_dir, "test.zip")
27+
create_test_zip(zip_path, ["file1.txt", "subdir/file2.txt"])
28+
29+
with ZipFile(zip_path, 'r') as zip_ref:
30+
zip_ref.extract("file1.txt", temp_dir)
31+
32+
assert os.path.exists(os.path.join(temp_dir, "file1.txt"))
33+
34+
def test_extract_traversal_attack():
35+
with tempfile.TemporaryDirectory() as temp_dir:
36+
zip_path = os.path.join(temp_dir, "test.zip")
37+
create_test_zip(zip_path, ["../evil.txt"])
38+
39+
with ZipFile(zip_path, 'r') as zip_ref:
40+
with pytest.raises(ValueError, match="Potential directory traversal attempt detected"):
41+
zip_ref.extract("../evil.txt", temp_dir)
42+
43+
def test_extractall_safe():
44+
with tempfile.TemporaryDirectory() as temp_dir:
45+
zip_path = os.path.join(temp_dir, "test.zip")
46+
create_test_zip(zip_path, ["file1.txt", "subdir/file2.txt"])
47+
48+
with ZipFile(zip_path, 'r') as zip_ref:
49+
zip_ref.extractall(temp_dir)
50+
51+
assert os.path.exists(os.path.join(temp_dir, "file1.txt"))
52+
assert os.path.exists(os.path.join(temp_dir, "subdir", "file2.txt"))
53+
54+
def test_extractall_with_traversal_attack():
55+
with tempfile.TemporaryDirectory() as temp_dir:
56+
zip_path = os.path.join(temp_dir, "test.zip")
57+
create_test_zip(zip_path, ["file1.txt", "../evil.txt"])
58+
59+
with ZipFile(zip_path, 'r') as zip_ref:
60+
with pytest.raises(ValueError, match="Potential directory traversal attempt detected"):
61+
zip_ref.extractall(temp_dir)

libarchive/zip.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import os, time
1+
import os
2+
import time
23
from libarchive import is_archive, Entry, SeekableArchive, _libarchive
34
from zipfile import ZIP_STORED, ZIP_DEFLATED
45

@@ -7,6 +8,12 @@ def is_zipfile(filename):
78
return is_archive(filename, formats=('zip',))
89

910

11+
def sanitize_filename(filename, base_path=os.getcwd()):
12+
abs_path = os.path.abspath(os.path.join(base_path, filename))
13+
if not abs_path.startswith(os.path.abspath(base_path) + os.sep):
14+
raise ValueError("Invalid filename: Potential directory traversal attempt detected.")
15+
return os.path.basename(abs_path) # Ensures only filename is extracted
16+
1017
class ZipEntry(Entry):
1118
def __init__(self, *args, **kwargs):
1219
super(ZipEntry, self).__init__(*args, **kwargs)
@@ -60,30 +67,26 @@ def _set_missing(self, value):
6067
CRC = property(_get_missing, _set_missing)
6168
compress_size = property(_get_missing, _set_missing)
6269

63-
# encryption is one of (traditional = zipcrypt, aes128, aes256)
70+
6471
class ZipFile(SeekableArchive):
6572
def __init__(self, f, mode='r', compression=ZIP_DEFLATED, allowZip64=False, password=None,
66-
encryption=None):
73+
encryption=None):
6774
self.compression = compression
6875
self.encryption = encryption
6976
super(ZipFile, self).__init__(
7077
f, mode=mode, format='zip', entry_class=ZipEntry, encoding='CP437', password=password
7178
)
72-
7379

7480
getinfo = SeekableArchive.getentry
7581

7682
def set_initial_options(self):
7783
if self.mode == 'w' and self.compression == ZIP_STORED:
78-
# Disable compression for writing.
7984
_libarchive.archive_write_set_format_option(self._a, "zip", "compression", "store")
80-
85+
8186
if self.mode == 'w' and self.password:
8287
if not self.encryption:
8388
self.encryption = "traditional"
8489
_libarchive.archive_write_set_format_option(self._a, "zip", "encryption", self.encryption)
85-
86-
8790

8891
def namelist(self):
8992
return list(self.iterpaths())
@@ -104,7 +107,8 @@ def extract(self, name, path=None, pwd=None):
104107
self.add_passphrase(pwd)
105108
if not path:
106109
path = os.getcwd()
107-
return self.readpath(name, os.path.join(path, name))
110+
sanitized_name = sanitize_filename(name)
111+
return self.readpath(sanitized_name, os.path.join(path, sanitized_name))
108112

109113
def extractall(self, path, names=None, pwd=None):
110114
if pwd:
@@ -113,7 +117,8 @@ def extractall(self, path, names=None, pwd=None):
113117
names = self.namelist()
114118
if names:
115119
for name in names:
116-
self.extract(name, path)
120+
sanitized_name = sanitize_filename(name, path)
121+
self.extract(sanitized_name, path)
117122

118123
def read(self, name, pwd=None):
119124
if pwd:

tests.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,12 @@
2626
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
2727
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2828

29-
import os, unittest, tempfile, random, string, sys
29+
import os
30+
import unittest
31+
import tempfile
32+
import random
33+
import string
34+
import sys
3035
import zipfile
3136
import io
3237

@@ -309,6 +314,7 @@ def test_read_with_wrong_password(self):
309314
self.assertRaises(RuntimeError, z.read, ITEM_NAME)
310315
z.close()
311316

317+
312318
class TestProtectedWriting(unittest.TestCase):
313319
def setUp(self):
314320
create_protected_zip()

0 commit comments

Comments
 (0)