From ad9a17176bf625af70016ee2039e6b73289c3954 Mon Sep 17 00:00:00 2001 From: Bubba Date: Fri, 26 Jul 2019 11:11:19 -0400 Subject: [PATCH] fix: Respect zipped symlinks (#1140) --- samcli/local/lambdafn/zip.py | 71 +++++++++++- tests/unit/local/lambdafn/test_zip.py | 154 ++++++++++++++++++++++---- 2 files changed, 197 insertions(+), 28 deletions(-) diff --git a/samcli/local/lambdafn/zip.py b/samcli/local/lambdafn/zip.py index cf4f07a8d0..7af870d7b6 100644 --- a/samcli/local/lambdafn/zip.py +++ b/samcli/local/lambdafn/zip.py @@ -19,6 +19,70 @@ LOG = logging.getLogger(__name__) +S_IFLNK = 0xA + + +def _is_symlink(file_info): + """ + Check the upper 4 bits of the external attribute for a symlink. + See: https://unix.stackexchange.com/questions/14705/the-zip-formats-external-file-attribute + + Parameters + ---------- + file_info : zipfile.ZipInfo + The ZipInfo for a ZipFile + + Returns + ------- + bool + A response regarding whether the ZipInfo defines a symlink or not. + """ + + return (file_info.external_attr >> 28) == 0xA + + +def _extract(file_info, output_dir, zip_ref): + """ + Unzip the given file into the given directory while preserving file permissions in the process. + + Parameters + ---------- + file_info : zipfile.ZipInfo + The ZipInfo for a ZipFile + + output_dir : str + Path to the directory where the it should be unzipped to + + zip_ref : zipfile.ZipFile + The ZipFile we are working with. + + Returns + ------- + string + Returns the target path the Zip Entry was extracted to. + """ + + # Handle any regular file/directory entries + if not _is_symlink(file_info): + return zip_ref.extract(file_info, output_dir) + + source = zip_ref.read(file_info.filename).decode('utf8') + link_name = os.path.normpath(os.path.join(output_dir, file_info.filename)) + + # make leading dirs if needed + leading_dirs = os.path.dirname(link_name) + if not os.path.exists(leading_dirs): + os.makedirs(leading_dirs) + + # If the link already exists, delete it or symlink() fails + if os.path.lexists(link_name): + os.remove(link_name) + + # Create a symbolic link pointing to source named link_name. + os.symlink(source, link_name) + + return link_name + def unzip(zip_file_path, output_dir, permission=None): """ @@ -40,10 +104,7 @@ def unzip(zip_file_path, output_dir, permission=None): # For each item in the zip file, extract the file and set permissions if available for file_info in zip_ref.infolist(): - name = file_info.filename - extracted_path = os.path.join(output_dir, name) - - zip_ref.extract(name, output_dir) + extracted_path = _extract(file_info, output_dir, zip_ref) _set_permissions(file_info, extracted_path) _override_permissions(extracted_path, permission) @@ -81,7 +142,7 @@ def _set_permissions(zip_file_info, extracted_path): """ # Permission information is stored in first two bytes. - permission = zip_file_info.external_attr >> 16 + permission = (zip_file_info.external_attr >> 16) & 511 if not permission: # Zips created on certain Windows machines, however, might not have any permission information on them. # Skip setting a permission on these files. diff --git a/tests/unit/local/lambdafn/test_zip.py b/tests/unit/local/lambdafn/test_zip.py index 2b8dfb355d..cd06c40625 100644 --- a/tests/unit/local/lambdafn/test_zip.py +++ b/tests/unit/local/lambdafn/test_zip.py @@ -14,51 +14,102 @@ class TestUnzipWithPermissions(TestCase): - files_with_permissions = { - "folder1/1.txt": 0o644, - "folder1/2.txt": 0o777, - "folder2/subdir/1.txt": 0o666, - "folder2/subdir/2.txt": 0o400 + """ + External Attribute Magic = type + permission + DOS is-dir flag? + + TTTTugsrwxrwxrwx0000000000ADVSHR + ^^^^____________________________ File Type [UPPER 4 bits, 29-32] + ^___________________________ setuid [bit 28] + ^__________________________ setgid [bit 27] + ^_________________________ sticky [bit 26] + ^^^^^^^^^________________ Permissions [bits 17-25] + ^^^^^^^^________ Other [bits 9-16] + ^^^^^^^^ DOS attribute bits: [LOWER 8 bits] + + Interesting File Types + S_IFDIR 0040000 /* directory */ + S_IFREG 0100000 /* regular */ + S_IFLNK 0120000 /* symbolic link */ + + See: https://unix.stackexchange.com/questions/14705/%20the-zip-formats-external-file-attribute + """ + + files_with_external_attr = { + "1.txt": { + "file_type": 0o10, + "contents": b'foo', + "permissions": 0o644, + }, + "folder1/2.txt": { + "file_type": 0o10, + "contents": b'bar', + "permissions": 0o777, + }, + "folder2/subdir/3.txt": { + "file_type": 0o10, + "contents": b'foo bar', + "permissions": 0o666, + }, + "folder2/subdir/4.txt": { + "file_type": 0o10, + "contents": b'bar foo', + "permissions": 0o400, + }, + "symlinkToF2": { + "file_type": 0o12, + "contents": b'1.txt', + "permissions": 0o644, + } } + expected_files = 0 + expected_symlinks = 0 + actual_files = 0 + actual_symlinks = 0 + @parameterized.expand([param(True), param(False)]) - def test_must_unzip(self, check_permissions): + def test_must_unzip(self, verify_external_attributes): + self._reset(verify_external_attributes) - with self._create_zip(self.files_with_permissions, check_permissions) as zip_file_name: + with self._create_zip(self.files_with_external_attr, verify_external_attributes) as zip_file_name: with self._temp_dir() as extract_dir: - unzip(zip_file_name, extract_dir) for root, dirs, files in os.walk(extract_dir): for file in files: - filepath = os.path.join(extract_dir, root, file) - perm = oct(stat.S_IMODE(os.stat(filepath).st_mode)) - key = os.path.relpath(filepath, extract_dir) - expected_permission = oct(self.files_with_permissions[key]) + self._verify_file(extract_dir, file, root, verify_external_attributes) - self.assertIn(key, self.files_with_permissions) + self._verify_file_count(verify_external_attributes) - if check_permissions: - self.assertEquals(expected_permission, - perm, - "File {} has wrong permission {}".format(key, perm)) + @contextmanager + def _reset(self, verify_external_attributes): + self.expected_files = 0 + self.expected_symlinks = 0 + self.actual_files = 0 + self.actual_symlinks = 0 + if verify_external_attributes: + for filename, data in self.files_with_external_attr.items(): + if data["file_type"] == 0o12: + self.expected_symlinks += 1 + elif data["file_type"] == 0o10: + self.expected_files += 1 @contextmanager - def _create_zip(self, files_with_permissions, add_permissions=True): + def _create_zip(self, file_dict, add_attributes=True): zipfilename = None - data = b'hello world' try: zipfilename = NamedTemporaryFile(mode="w+b").name zf = zipfile.ZipFile(zipfilename, "w", zipfile.ZIP_DEFLATED) - for filename, perm in files_with_permissions.items(): + for filename, data in file_dict.items(): + fileinfo = zipfile.ZipInfo(filename) - if add_permissions: - fileinfo.external_attr = perm << 16 + if add_attributes: + fileinfo.external_attr = (data["file_type"] << 28) | (data["permissions"] << 16) - zf.writestr(fileinfo, data) + zf.writestr(fileinfo, data["contents"]) zf.close() @@ -68,6 +119,63 @@ def _create_zip(self, files_with_permissions, add_permissions=True): if zipfilename: os.remove(zipfilename) + @contextmanager + def _verify_file(self, extract_dir, file, root, verify_external_attributes): + filepath = os.path.join(extract_dir, root, file) + key = os.path.relpath(filepath, extract_dir) + mode = os.lstat(filepath).st_mode + actual_permissions = oct(stat.S_IMODE(mode)) + expected_permission = oct(self.files_with_external_attr[key]["permissions"]) + + self.assertIn(key, self.files_with_external_attr) + if verify_external_attributes: + self._verify_external_attributes( + actual_permissions, + expected_permission, + key, + mode) + + @contextmanager + def _verify_external_attributes(self, actual_permissions, expected_permission, key, + mode): + if stat.S_ISREG(mode): + self.assertTrue( + self.files_with_external_attr[key]["file_type"] == 0o10, + "Expected a regular file." + ) + self.actual_files += 1 + elif stat.S_ISLNK(mode): + self.assertTrue( + self.files_with_external_attr[key]["file_type"] == 0o12, + "Expected a Symlink." + ) + self.actual_symlinks += 1 + return + + self.assertEquals( + expected_permission, + actual_permissions, + "File {} has wrong permission {}, expected {}.".format( + key, + actual_permissions, + expected_permission + ) + ) + + @contextmanager + def _verify_file_count(self, verify_external_attributes): + if verify_external_attributes: + self.assertEqual( + self.expected_files, + self.actual_files, + "Expected {} files but found {}.".format(self.expected_files, self.actual_files) + ) + self.assertEqual( + self.expected_symlinks, + self.actual_symlinks, + "Expected {} symlinks but found {}.".format(self.expected_symlinks, self.actual_symlinks) + ) + @contextmanager def _temp_dir(self): name = None