diff --git a/test/test_remote.py b/test/test_remote.py index 3a47afab5..9636ca486 100644 --- a/test/test_remote.py +++ b/test/test_remote.py @@ -694,259 +694,279 @@ def test_push_error(self, repo): @with_rw_repo("HEAD") def test_set_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - remote.set_url(url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.set_url(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_set_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - remote.set_url(url, allow_unsafe_protocols=True) - assert list(remote.urls)[-1] == url - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + remote.set_url(url, allow_unsafe_protocols=True) + assert list(remote.urls)[-1] == url + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_add_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - remote.add_url(url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.add_url(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_add_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - remote.add_url(url, allow_unsafe_protocols=True) - assert list(remote.urls)[-1] == url - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + remote.add_url(url, allow_unsafe_protocols=True) + assert list(remote.urls)[-1] == url + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_create_remote_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - Remote.create(rw_repo, "origin", url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + Remote.create(rw_repo, "origin", url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_create_remote_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for i, url in enumerate(urls): - remote = Remote.create(rw_repo, f"origin{i}", url, allow_unsafe_protocols=True) - assert remote.url == url - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for i, url in enumerate(urls): + remote = Remote.create(rw_repo, f"origin{i}", url, allow_unsafe_protocols=True) + assert remote.url == url + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_fetch_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - remote.fetch(url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.fetch(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_fetch_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - remote.fetch(url, allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + remote.fetch(url, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_fetch_unsafe_options(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - remote.fetch(**unsafe_option) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + remote.fetch(**unsafe_option) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_fetch_unsafe_options_allowed(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] - for unsafe_option in unsafe_options: - # The options will be allowed, but the command will fail. - assert not tmp_file.exists() - with self.assertRaises(GitCommandError): - remote.fetch(**unsafe_option, allow_unsafe_options=True) - assert tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + assert not tmp_file.exists() + with self.assertRaises(GitCommandError): + remote.fetch(**unsafe_option, allow_unsafe_options=True) + assert tmp_file.exists() + tmp_file.unlink() @with_rw_repo("HEAD") def test_pull_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - remote.pull(url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.pull(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_pull_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - remote.pull(url, allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + remote.pull(url, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_pull_unsafe_options(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - remote.pull(**unsafe_option) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + remote.pull(**unsafe_option) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_pull_unsafe_options_allowed(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] - for unsafe_option in unsafe_options: - # The options will be allowed, but the command will fail. - assert not tmp_file.exists() - with self.assertRaises(GitCommandError): - remote.pull(**unsafe_option, allow_unsafe_options=True) - assert tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + assert not tmp_file.exists() + with self.assertRaises(GitCommandError): + remote.pull(**unsafe_option, allow_unsafe_options=True) + assert tmp_file.exists() + tmp_file.unlink() @with_rw_repo("HEAD") def test_push_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - remote.push(url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.push(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_push_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - remote.push(url, allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + remote.push(url, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_push_unsafe_options(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - { - "receive-pack": f"touch {tmp_file}", - "exec": f"touch {tmp_file}", - } - ] - for unsafe_option in unsafe_options: - assert not tmp_file.exists() - with self.assertRaises(UnsafeOptionError): - remote.push(**unsafe_option) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + { + "receive-pack": f"touch {tmp_file}", + "exec": f"touch {tmp_file}", + } + ] + for unsafe_option in unsafe_options: + assert not tmp_file.exists() + with self.assertRaises(UnsafeOptionError): + remote.push(**unsafe_option) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_push_unsafe_options_allowed(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - { - "receive-pack": f"touch {tmp_file}", - "exec": f"touch {tmp_file}", - } - ] - for unsafe_option in unsafe_options: - # The options will be allowed, but the command will fail. - assert not tmp_file.exists() - with self.assertRaises(GitCommandError): - remote.push(**unsafe_option, allow_unsafe_options=True) - assert tmp_file.exists() - tmp_file.unlink() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + { + "receive-pack": f"touch {tmp_file}", + "exec": f"touch {tmp_file}", + } + ] + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + assert not tmp_file.exists() + with self.assertRaises(GitCommandError): + remote.push(**unsafe_option, allow_unsafe_options=True) + assert tmp_file.exists() + tmp_file.unlink() class TestTimeouts(TestBase): diff --git a/test/test_repo.py b/test/test_repo.py index 5874dbe6a..d5474353f 100644 --- a/test/test_repo.py +++ b/test/test_repo.py @@ -268,143 +268,176 @@ def test_leaking_password_in_clone_logs(self, rw_dir): @with_rw_repo("HEAD") def test_clone_unsafe_options(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - rw_repo.clone(tmp_dir, multi_options=[unsafe_option]) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + rw_repo.clone(tmp_dir, multi_options=[unsafe_option]) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_clone_unsafe_options_allowed(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - ] - for i, unsafe_option in enumerate(unsafe_options): - destination = tmp_dir / str(i) - assert not tmp_file.exists() - # The options will be allowed, but the command will fail. - with self.assertRaises(GitCommandError): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + ] + for i, unsafe_option in enumerate(unsafe_options): + destination = tmp_dir / str(i) + assert not tmp_file.exists() + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True) + assert tmp_file.exists() + tmp_file.unlink() + + unsafe_options = [ + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for i, unsafe_option in enumerate(unsafe_options): + destination = tmp_dir / str(i) + assert not destination.exists() rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True) - assert tmp_file.exists() - tmp_file.unlink() - - unsafe_options = [ - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for i, unsafe_option in enumerate(unsafe_options): - destination = tmp_dir / str(i) - assert not destination.exists() - rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True) - assert destination.exists() + assert destination.exists() @with_rw_repo("HEAD") def test_clone_safe_options(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - options = [ - "--depth=1", - "--single-branch", - "-q", - ] - for option in options: - destination = tmp_dir / option - assert not destination.exists() - rw_repo.clone(destination, multi_options=[option]) - assert destination.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + options = [ + "--depth=1", + "--single-branch", + "-q", + ] + for option in options: + destination = tmp_dir / option + assert not destination.exists() + rw_repo.clone(destination, multi_options=[option]) + assert destination.exists() @with_rw_repo("HEAD") def test_clone_from_unsafe_options(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - Repo.clone_from(rw_repo.working_dir, tmp_dir, multi_options=[unsafe_option]) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + Repo.clone_from(rw_repo.working_dir, tmp_dir, multi_options=[unsafe_option]) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_clone_from_unsafe_options_allowed(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - ] - for i, unsafe_option in enumerate(unsafe_options): - destination = tmp_dir / str(i) - assert not tmp_file.exists() - # The options will be allowed, but the command will fail. - with self.assertRaises(GitCommandError): - Repo.clone_from( - rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True - ) - assert tmp_file.exists() - tmp_file.unlink() - - unsafe_options = [ - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for i, unsafe_option in enumerate(unsafe_options): - destination = tmp_dir / str(i) - assert not destination.exists() - Repo.clone_from(rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True) - assert destination.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + ] + for i, unsafe_option in enumerate(unsafe_options): + destination = tmp_dir / str(i) + assert not tmp_file.exists() + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + Repo.clone_from( + rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True + ) + assert tmp_file.exists() + tmp_file.unlink() + + unsafe_options = [ + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for i, unsafe_option in enumerate(unsafe_options): + destination = tmp_dir / str(i) + assert not destination.exists() + Repo.clone_from(rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True) + assert destination.exists() @with_rw_repo("HEAD") def test_clone_from_safe_options(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - options = [ - "--depth=1", - "--single-branch", - "-q", - ] - for option in options: - destination = tmp_dir / option - assert not destination.exists() - Repo.clone_from(rw_repo.common_dir, destination, multi_options=[option]) - assert destination.exists() - - def test_clone_from_unsafe_procol(self): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - Repo.clone_from(url, tmp_dir) - assert not tmp_file.exists() - - def test_clone_from_unsafe_procol_allowed(self): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - "ext::sh -c touch% /tmp/pwn", - "fd::/foo", - ] - for url in urls: - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - Repo.clone_from(url, tmp_dir, allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + options = [ + "--depth=1", + "--single-branch", + "-q", + ] + for option in options: + destination = tmp_dir / option + assert not destination.exists() + Repo.clone_from(rw_repo.common_dir, destination, multi_options=[option]) + assert destination.exists() + + def test_clone_from_unsafe_protocol(self): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + Repo.clone_from(url, tmp_dir / "repo") + assert not tmp_file.exists() + + def test_clone_from_unsafe_protocol_allowed(self): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + Repo.clone_from(url, tmp_dir / "repo", allow_unsafe_protocols=True) + assert not tmp_file.exists() + + def test_clone_from_unsafe_protocol_allowed_and_enabled(self): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + ] + allow_ext = [ + "--config=protocol.ext.allow=always", + ] + for url in urls: + # The URL will be allowed into the command, and the protocol is enabled, + # but the command will fail since it can't read from the remote repo. + assert not tmp_file.exists() + with self.assertRaises(GitCommandError): + Repo.clone_from( + url, + tmp_dir / "repo", + multi_options=allow_ext, + allow_unsafe_protocols=True, + allow_unsafe_options=True, + ) + assert tmp_file.exists() + tmp_file.unlink() @with_rw_repo("HEAD") def test_max_chunk_size(self, repo): @@ -1326,26 +1359,28 @@ def test_do_not_strip_newline_in_stdout(self, rw_dir): @with_rw_repo("HEAD") def test_clone_command_injection(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - unexpected_file = tmp_dir / "pwn" - assert not unexpected_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + unexpected_file = tmp_dir / "pwn" + assert not unexpected_file.exists() - payload = f"--upload-pack=touch {unexpected_file}" - rw_repo.clone(payload) + payload = f"--upload-pack=touch {unexpected_file}" + rw_repo.clone(payload) - assert not unexpected_file.exists() - # A repo was cloned with the payload as name - assert pathlib.Path(payload).exists() + assert not unexpected_file.exists() + # A repo was cloned with the payload as name + assert pathlib.Path(payload).exists() @with_rw_repo("HEAD") def test_clone_from_command_injection(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - temp_repo = Repo.init(tmp_dir / "repo") - unexpected_file = tmp_dir / "pwn" + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + temp_repo = Repo.init(tmp_dir / "repo") + unexpected_file = tmp_dir / "pwn" - assert not unexpected_file.exists() - payload = f"--upload-pack=touch {unexpected_file}" - with self.assertRaises(GitCommandError): - rw_repo.clone_from(payload, temp_repo.common_dir) + assert not unexpected_file.exists() + payload = f"--upload-pack=touch {unexpected_file}" + with self.assertRaises(GitCommandError): + rw_repo.clone_from(payload, temp_repo.common_dir) - assert not unexpected_file.exists() + assert not unexpected_file.exists() diff --git a/test/test_submodule.py b/test/test_submodule.py index 13878df28..982226411 100644 --- a/test/test_submodule.py +++ b/test/test_submodule.py @@ -1101,139 +1101,147 @@ def test_add_no_clone_multi_options_argument(self, rwdir): @with_rw_repo("HEAD") def test_submodule_add_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - Submodule.add(rw_repo, "new", "new", url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + Submodule.add(rw_repo, "new", "new", url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_add_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::/foo", - ] - for url in urls: - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - Submodule.add(rw_repo, "new", "new", url, allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + Submodule.add(rw_repo, "new", "new", url, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_add_unsafe_options(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - Submodule.add(rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option]) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + Submodule.add(rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option]) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_add_unsafe_options_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - ] - for unsafe_option in unsafe_options: - # The options will be allowed, but the command will fail. - with self.assertRaises(GitCommandError): - Submodule.add( - rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True - ) - assert not tmp_file.exists() - - unsafe_options = [ - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for unsafe_option in unsafe_options: - with self.assertRaises(GitCommandError): - Submodule.add( - rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True - ) + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + ] + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + Submodule.add( + rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True + ) + assert not tmp_file.exists() + + unsafe_options = [ + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for unsafe_option in unsafe_options: + with self.assertRaises(GitCommandError): + Submodule.add( + rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True + ) @with_rw_repo("HEAD") def test_submodule_update_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::/foo", - ] - for url in urls: - submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) - with self.assertRaises(UnsafeProtocolError): - submodule.update() - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::/foo", + ] + for url in urls: + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) + with self.assertRaises(UnsafeProtocolError): + submodule.update() + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_update_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::/foo", - ] - for url in urls: - submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - submodule.update(allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::/foo", + ] + for url in urls: + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + submodule.update(allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_update_unsafe_options(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - submodule.update(clone_multi_options=[unsafe_option]) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + submodule.update(clone_multi_options=[unsafe_option]) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_update_unsafe_options_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - ] - submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) - for unsafe_option in unsafe_options: - # The options will be allowed, but the command will fail. - with self.assertRaises(GitCommandError): - submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True) - assert not tmp_file.exists() - - unsafe_options = [ - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) - for unsafe_option in unsafe_options: - with self.assertRaises(GitCommandError): - submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True) + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + ] + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True) + assert not tmp_file.exists() + + unsafe_options = [ + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) + for unsafe_option in unsafe_options: + with self.assertRaises(GitCommandError): + submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True)