diff --git a/securedrop/debian/securedrop-app-code.postinst b/securedrop/debian/securedrop-app-code.postinst index 95026acc1cf..0d2da38772b 100644 --- a/securedrop/debian/securedrop-app-code.postinst +++ b/securedrop/debian/securedrop-app-code.postinst @@ -177,7 +177,9 @@ export_journalist_public_key() { # Based on sdconfig.py, this should work with very old config.py files. fingerprint=$(cd /var/www/securedrop; python3 -c "import config; print(config.JOURNALIST_KEY)") # Set up journalist.pub as root/www-data 640 before writing to it. - install --mode=640 --owner=root --group=www-data $journalist_pub + touch $journalist_pub + chown root:www-data $journalist_pub + chmod 640 $journalist_pub # FIXME: we should verify the exported key matches the fingerprint we wanted. # shellcheck disable=SC2024 sudo -u www-data gpg2 --homedir=/var/lib/securedrop/keys --export --armor "$fingerprint" > $journalist_pub diff --git a/securedrop/encryption.py b/securedrop/encryption.py index 71c82d5fe87..d5b309381ce 100644 --- a/securedrop/encryption.py +++ b/securedrop/encryption.py @@ -1,16 +1,18 @@ import os import re import typing -from datetime import date -from io import BytesIO, StringIO +from io import BytesIO from pathlib import Path -from typing import Dict, List, Optional +from typing import BinaryIO, Dict, Optional import pretty_bad_protocol as gnupg from redis import Redis from sdconfig import SecureDropConfig +import redwood + if typing.TYPE_CHECKING: + from models import Source from source_user import SourceUser # To fix https://github.com/freedomofpress/securedrop/issues/78 @@ -33,28 +35,20 @@ class GpgDecryptError(Exception): class EncryptionManager: - - GPG_KEY_TYPE = "RSA" - GPG_KEY_LENGTH = 4096 - - # All reply keypairs will be "created" on the same day SecureDrop (then - # Strongbox) was publicly released for the first time. - # https://www.newyorker.com/news/news-desk/strongbox-and-aaron-swartz - DEFAULT_KEY_CREATION_DATE = date(2013, 5, 14) - - # '0' is the magic value that tells GPG's batch key generation not - # to set an expiration date. - DEFAULT_KEY_EXPIRATION_DATE = "0" + """EncryptionManager provides a high-level interface for each PGP operation we do""" REDIS_FINGERPRINT_HASH = "sd/crypto-util/fingerprints" REDIS_KEY_HASH = "sd/crypto-util/keys" - SOURCE_KEY_NAME = "Source Key" SOURCE_KEY_UID_RE = re.compile(r"(Source|Autogenerated) Key <[-A-Za-z0-9+/=_]+>") - def __init__(self, gpg_key_dir: Path, journalist_key_fingerprint: str) -> None: + def __init__(self, gpg_key_dir: Path, journalist_pub_key: Path) -> None: self._gpg_key_dir = gpg_key_dir - self._journalist_key_fingerprint = journalist_key_fingerprint + self.journalist_pub_key = journalist_pub_key + if not self.journalist_pub_key.exists(): + raise RuntimeError( + f"The journalist public key does not exist at {self.journalist_pub_key}" + ) self._redis = Redis(decode_responses=True) # Instantiate the "main" GPG binary @@ -71,15 +65,6 @@ def __init__(self, gpg_key_dir: Path, journalist_key_fingerprint: str) -> None: binary="gpg2", homedir=str(self._gpg_key_dir), options=["--yes", "--trust-model direct"] ) - # Ensure that the journalist public key has been previously imported in GPG - try: - self.get_journalist_public_key() - except GpgKeyNotFoundError: - raise OSError( - f"The journalist public key with fingerprint {journalist_key_fingerprint}" - f" has not been imported into GPG." - ) - @classmethod def get_default(cls) -> "EncryptionManager": global _default_encryption_mgr @@ -87,27 +72,22 @@ def get_default(cls) -> "EncryptionManager": config = SecureDropConfig.get_current() _default_encryption_mgr = cls( gpg_key_dir=config.GPG_KEY_DIR, - journalist_key_fingerprint=config.JOURNALIST_KEY, + journalist_pub_key=(config.SECUREDROP_DATA_ROOT / "journalist.pub"), ) return _default_encryption_mgr - def generate_source_key_pair(self, source_user: "SourceUser") -> None: - gen_key_input = self._gpg.gen_key_input( - passphrase=source_user.gpg_secret, - name_email=source_user.filesystem_id, - key_type=self.GPG_KEY_TYPE, - key_length=self.GPG_KEY_LENGTH, - name_real=self.SOURCE_KEY_NAME, - creation_date=self.DEFAULT_KEY_CREATION_DATE.isoformat(), - expire_date=self.DEFAULT_KEY_EXPIRATION_DATE, - ) - new_key = self._gpg.gen_key(gen_key_input) - - # Store the newly-created key's fingerprint in Redis for faster lookups - self._save_key_fingerprint_to_redis(source_user.filesystem_id, str(new_key)) - def delete_source_key_pair(self, source_filesystem_id: str) -> None: - source_key_fingerprint = self.get_source_key_fingerprint(source_filesystem_id) + """ + Try to delete the source's key from the filesystem. If it's not found, either: + (a) it doesn't exist or + (b) the source is Sequoia-based and has its key stored in Source.pgp_public_key, + which will be deleted when the Source instance itself is deleted. + """ + try: + source_key_fingerprint = self.get_source_key_fingerprint(source_filesystem_id) + except GpgKeyNotFoundError: + # If the source is entirely Sequoia-based, there is nothing to delete + return # The subkeys keyword argument deletes both secret and public keys self._gpg_for_key_deletion.delete_keys(source_key_fingerprint, secret=True, subkeys=True) @@ -116,7 +96,7 @@ def delete_source_key_pair(self, source_filesystem_id: str) -> None: self._redis.hdel(self.REDIS_FINGERPRINT_HASH, source_filesystem_id) def get_journalist_public_key(self) -> str: - return self._get_public_key(self._journalist_key_fingerprint) + return self.journalist_pub_key.read_text() def get_source_public_key(self, source_filesystem_id: str) -> str: source_key_fingerprint = self.get_source_key_fingerprint(source_filesystem_id) @@ -134,35 +114,48 @@ def get_source_key_fingerprint(self, source_filesystem_id: str) -> str: return source_key_fingerprint def encrypt_source_message(self, message_in: str, encrypted_message_path_out: Path) -> None: - message_as_stream = StringIO(message_in) - self._encrypt( + redwood.encrypt_message( # A submission is only encrypted for the journalist key - using_keys_with_fingerprints=[self._journalist_key_fingerprint], - plaintext_in=message_as_stream, - ciphertext_path_out=encrypted_message_path_out, + recipients=[self.get_journalist_public_key()], + plaintext=message_in, + destination=encrypted_message_path_out, ) - def encrypt_source_file(self, file_in: typing.IO, encrypted_file_path_out: Path) -> None: - self._encrypt( + def encrypt_source_file(self, file_in: BinaryIO, encrypted_file_path_out: Path) -> None: + redwood.encrypt_stream( # A submission is only encrypted for the journalist key - using_keys_with_fingerprints=[self._journalist_key_fingerprint], - plaintext_in=file_in, - ciphertext_path_out=encrypted_file_path_out, + recipients=[self.get_journalist_public_key()], + plaintext=file_in, + destination=encrypted_file_path_out, ) def encrypt_journalist_reply( - self, for_source_with_filesystem_id: str, reply_in: str, encrypted_reply_path_out: Path + self, for_source: "Source", reply_in: str, encrypted_reply_path_out: Path ) -> None: - source_key_fingerprint = self.get_source_key_fingerprint(for_source_with_filesystem_id) - reply_as_stream = StringIO(reply_in) - self._encrypt( + redwood.encrypt_message( # A reply is encrypted for both the journalist key and the source key - using_keys_with_fingerprints=[source_key_fingerprint, self._journalist_key_fingerprint], - plaintext_in=reply_as_stream, - ciphertext_path_out=encrypted_reply_path_out, + recipients=[for_source.public_key, self.get_journalist_public_key()], + plaintext=reply_in, + destination=encrypted_reply_path_out, ) - def decrypt_journalist_reply(self, for_source_user: "SourceUser", ciphertext_in: bytes) -> str: + def decrypt_journalist_reply( + self, for_source_user: "SourceUser", for_source: "Source", ciphertext_in: bytes + ) -> str: + """ + Decrypt a reply sent by a journalist. + + Note that `for_source_user` and `for_source` must point to the same source + """ + if for_source_user.filesystem_id != for_source.filesystem_id: + raise ValueError("for_source_user and for_source filesystem_id's did not match!") + if for_source.pgp_secret_key is not None: + return redwood.decrypt( + ciphertext_in, + secret_key=for_source.pgp_secret_key, + passphrase=for_source_user.gpg_secret, + ).decode() + # TODO: Migrate gpg-stored secret keys to database storage for Sequoia ciphertext_as_stream = BytesIO(ciphertext_in) out = self._gpg.decrypt_file(ciphertext_as_stream, passphrase=for_source_user.gpg_secret) if not out.ok: @@ -170,27 +163,6 @@ def decrypt_journalist_reply(self, for_source_user: "SourceUser", ciphertext_in: return out.data.decode("utf-8") - def _encrypt( - self, - using_keys_with_fingerprints: List[str], - plaintext_in: typing.IO, - ciphertext_path_out: Path, - ) -> None: - # Remove any spaces from provided fingerprints GPG outputs fingerprints - # with spaces for readability, but requires the spaces to be removed - # when using fingerprints to specify recipients. - sanitized_key_fingerprints = [fpr.replace(" ", "") for fpr in using_keys_with_fingerprints] - - out = self._gpg.encrypt( - plaintext_in, - *sanitized_key_fingerprints, - output=str(ciphertext_path_out), - always_trust=True, - armor=False, - ) - if not out.ok: - raise GpgEncryptError(out.stderr) - def _get_source_key_details(self, source_filesystem_id: str) -> Dict[str, str]: for key in self._gpg.list_keys(): for uid in key["uids"]: diff --git a/securedrop/journalist_app/main.py b/securedrop/journalist_app/main.py index dca9a2d13d5..eefe4602ce1 100644 --- a/securedrop/journalist_app/main.py +++ b/securedrop/journalist_app/main.py @@ -143,7 +143,7 @@ def reply() -> werkzeug.Response: g.source.interaction_count, g.source.journalist_filename ) EncryptionManager.get_default().encrypt_journalist_reply( - for_source_with_filesystem_id=g.filesystem_id, + for_source=g.source, reply_in=form.message.data, encrypted_reply_path_out=Path(Storage.get_default().path(g.filesystem_id, filename)), ) diff --git a/securedrop/loaddata.py b/securedrop/loaddata.py index c44af15e545..43f7e2a2d8f 100755 --- a/securedrop/loaddata.py +++ b/securedrop/loaddata.py @@ -221,7 +221,7 @@ def add_reply( record_source_interaction(source) fname = f"{source.interaction_count}-{source.journalist_filename}-reply.gpg" EncryptionManager.get_default().encrypt_journalist_reply( - for_source_with_filesystem_id=source.filesystem_id, + for_source=source, reply_in=next(replies), encrypted_reply_path_out=Path(Storage.get_default().path(source.filesystem_id, fname)), ) @@ -252,9 +252,6 @@ def add_source() -> Tuple[Source, str]: source = source_user.get_db_record() db.session.commit() - # Generate source key - EncryptionManager.get_default().generate_source_key_pair(source_user) - return source, codename diff --git a/securedrop/source_app/main.py b/securedrop/source_app/main.py index 6573f5f6f4d..50ae26281be 100644 --- a/securedrop/source_app/main.py +++ b/securedrop/source_app/main.py @@ -7,7 +7,7 @@ import store import werkzeug from db import db -from encryption import EncryptionManager, GpgKeyNotFoundError +from encryption import EncryptionManager from flask import ( Blueprint, abort, @@ -43,6 +43,8 @@ ) from store import Storage +import redwood + def make_blueprint(config: SecureDropConfig) -> Blueprint: view = Blueprint("main", __name__) @@ -161,7 +163,9 @@ def lookup(logged_in_source: SourceUser) -> str: with open(reply_path, "rb") as f: contents = f.read() decrypted_reply = EncryptionManager.get_default().decrypt_journalist_reply( - for_source_user=logged_in_source, ciphertext_in=contents + for_source_user=logged_in_source, + for_source=logged_in_source_in_db, + ciphertext_in=contents, ) reply.decrypted = decrypted_reply except UnicodeDecodeError: @@ -175,13 +179,6 @@ def lookup(logged_in_source: SourceUser) -> str: # Sort the replies by date replies.sort(key=operator.attrgetter("date"), reverse=True) - # If not done yet, generate a keypair to encrypt replies from the journalist - encryption_mgr = EncryptionManager.get_default() - try: - encryption_mgr.get_source_public_key(logged_in_source.filesystem_id) - except GpgKeyNotFoundError: - encryption_mgr.generate_source_key_pair(logged_in_source) - return render_template( "lookup.html", is_user_logged_in=True, @@ -356,20 +353,33 @@ def batch_delete(logged_in_source: SourceUser) -> werkzeug.Response: @view.route("/login", methods=("GET", "POST")) def login() -> Union[str, werkzeug.Response]: form = LoginForm() - if form.validate_on_submit(): - try: - SessionManager.log_user_in( - db_session=db.session, - supplied_passphrase=DicewarePassphrase(request.form["codename"].strip()), - ) - except InvalidPassphraseError: - current_app.logger.info("Login failed for invalid codename") - flash_msg("error", None, gettext("Sorry, that is not a recognized codename.")) - else: - # Success: a valid passphrase was supplied - return redirect(url_for(".lookup", from_login="1")) - - return render_template("login.html", form=form) + if not form.validate_on_submit(): + return render_template("login.html", form=form) + try: + source_user = SessionManager.log_user_in( + db_session=db.session, + supplied_passphrase=DicewarePassphrase(request.form["codename"].strip()), + ) + except InvalidPassphraseError: + current_app.logger.info("Login failed for invalid codename") + flash_msg("error", None, gettext("Sorry, that is not a recognized codename.")) + return render_template("login.html", form=form) + # Success: a valid passphrase was supplied and the source was logged-in + source = source_user.get_db_record() + if source.fingerprint is None: + # This legacy source didn't have a PGP keypair generated yet, + # do it now. + public_key, secret_key, fingerprint = redwood.generate_source_key_pair( + source_user.gpg_secret, source_user.filesystem_id + ) + source.pgp_public_key = public_key + source.pgp_secret_key = secret_key + source.pgp_fingerprint = fingerprint + db.session.add(source) + db.session.commit() + # TODO: Migrate GPG secret key here too + + return redirect(url_for(".lookup", from_login="1")) @view.route("/logout") def logout() -> Union[str, werkzeug.Response]: diff --git a/securedrop/source_user.py b/securedrop/source_user.py index 7987d2bb8ab..e89330d4290 100644 --- a/securedrop/source_user.py +++ b/securedrop/source_user.py @@ -12,6 +12,8 @@ from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session +import redwood + if TYPE_CHECKING: from passphrases import DicewarePassphrase from store import Storage @@ -99,9 +101,18 @@ def create_source_user( # Could not generate a designation that is not already used raise SourceDesignationCollisionError() + # Generate PGP keys + public_key, secret_key, fingerprint = redwood.generate_source_key_pair( + gpg_secret, filesystem_id + ) + # Store the source in the DB source_db_record = models.Source( - filesystem_id=filesystem_id, journalist_designation=valid_designation + filesystem_id=filesystem_id, + journalist_designation=valid_designation, + public_key=public_key, + secret_key=secret_key, + fingerprint=fingerprint, ) db_session.add(source_db_record) try: diff --git a/securedrop/store.py b/securedrop/store.py index 30be59c66f2..9d8e514c3bd 100644 --- a/securedrop/store.py +++ b/securedrop/store.py @@ -8,7 +8,7 @@ from hashlib import sha256 from pathlib import Path from tempfile import _TemporaryFileWrapper -from typing import IO, List, Optional, Type, Union +from typing import BinaryIO, List, Optional, Type, Union import rm from encryption import EncryptionManager @@ -301,7 +301,7 @@ def save_file_submission( count: int, journalist_filename: str, filename: Optional[str], - stream: "IO[bytes]", + stream: BinaryIO, ) -> str: if filename is not None: diff --git a/securedrop/tests/conftest.py b/securedrop/tests/conftest.py index 378719af8b0..eef04b38190 100644 --- a/securedrop/tests/conftest.py +++ b/securedrop/tests/conftest.py @@ -9,7 +9,6 @@ from pathlib import Path from typing import Any, Dict, Generator, Tuple from unittest import mock -from unittest.mock import PropertyMock from uuid import uuid4 import pretty_bad_protocol as gnupg @@ -17,7 +16,6 @@ import pytest import sdconfig from db import db -from encryption import EncryptionManager from flask import Flask, url_for from journalist_app import create_app as create_journalist_app from passphrases import PassphraseGenerator @@ -91,14 +89,11 @@ def setup_journalist_key_and_gpg_folder() -> Generator[Tuple[str, Path], None, N gpg = gnupg.GPG("gpg2", homedir=str(tmp_gpg_dir)) journalist_public_key_path = Path(__file__).parent / "files" / "test_journalist_key.pub" journalist_public_key = journalist_public_key_path.read_text() - journalist_key_fingerprint = gpg.import_keys(journalist_public_key).fingerprints[0] + # TODO: we don't need the journalist pub key in the keyring anymore, but include + # it anyways to match a legacy prod keyring. + gpg.import_keys(journalist_public_key) - # Reduce source GPG key length to speed up tests at the expense of security - with mock.patch.object( - EncryptionManager, "GPG_KEY_LENGTH", PropertyMock(return_value=1024) - ): - - yield journalist_key_fingerprint, tmp_gpg_dir + yield "65A1B5FF195B56353CC63DFFCC40EF1228271441", tmp_gpg_dir finally: shutil.rmtree(tmp_gpg_dir, ignore_errors=True) @@ -230,7 +225,6 @@ def test_source(journalist_app: Flask, app_storage: Storage) -> Dict[str, Any]: source_passphrase=passphrase, source_app_storage=app_storage, ) - EncryptionManager.get_default().generate_source_key_pair(source_user) source = source_user.get_db_record() return { "source_user": source_user, diff --git a/securedrop/tests/factories.py b/securedrop/tests/factories.py index a595f0b840c..a4b0443ae95 100644 --- a/securedrop/tests/factories.py +++ b/securedrop/tests/factories.py @@ -106,5 +106,10 @@ def create( config.TEMP_DIR.mkdir(parents=True) config.STORE_DIR.mkdir(parents=True) + # Copy the journalist public key into DATA_ROOT + shutil.copy2( + Path(__file__).parent / "files/test_journalist_key.pub", + config.SECUREDROP_DATA_ROOT / "journalist.pub", + ) # All done return config diff --git a/securedrop/tests/functional/app_navigators/journalist_app_nav.py b/securedrop/tests/functional/app_navigators/journalist_app_nav.py index 5795280fb31..c5634e270ba 100644 --- a/securedrop/tests/functional/app_navigators/journalist_app_nav.py +++ b/securedrop/tests/functional/app_navigators/journalist_app_nav.py @@ -1,22 +1,20 @@ import base64 import gzip -import tempfile from binascii import unhexlify from random import randint from typing import Callable, Dict, Iterable, Optional, Tuple import requests import two_factor -from encryption import EncryptionManager from selenium.common.exceptions import NoSuchElementException from selenium.webdriver import ActionChains from selenium.webdriver.common.by import By from selenium.webdriver.firefox.webdriver import WebDriver from selenium.webdriver.remote.webelement import WebElement from selenium.webdriver.support import expected_conditions +from tests import utils from tests.functional.app_navigators._nav_helper import NavigationHelper from tests.functional.tor_utils import proxies_for_url -from tests.test_encryption import import_journalist_private_key class JournalistAppNavigator: @@ -98,20 +96,7 @@ def _download_content_at_url(url: str, cookies: Dict[str, str]) -> bytes: data += chunk return data - @staticmethod - def _unzip_content(raw_content: bytes) -> str: - with tempfile.TemporaryFile() as fp: - fp.write(raw_content) - fp.seek(0) - - gzf = gzip.GzipFile(mode="rb", fileobj=fp) - content = gzf.read() - - return content.decode("utf-8") - - def journalist_downloads_first_message( - self, encryption_mgr_to_use_for_decryption: EncryptionManager - ) -> str: + def journalist_downloads_first_message(self) -> str: # Select the first submission from the first source in the page self.journalist_selects_the_first_source() self.nav_helper.wait_for( @@ -136,15 +121,13 @@ def cookie_string_from_selenium_cookies( cks = cookie_string_from_selenium_cookies(self.driver.get_cookies()) raw_content = self._download_content_at_url(file_url, cks) - with import_journalist_private_key(encryption_mgr_to_use_for_decryption): - decryption_result = encryption_mgr_to_use_for_decryption._gpg.decrypt(raw_content) - + decryption_result = utils.decrypt_as_journalist(raw_content) if file_url.endswith(".gz.gpg"): - decrypted_message = self._unzip_content(decryption_result.data) + decrypted_message = gzip.decompress(decryption_result) else: - decrypted_message = decryption_result.data.decode("utf-8") + decrypted_message = decryption_result - return decrypted_message + return decrypted_message.decode() def journalist_selects_the_first_source(self) -> None: self.driver.find_element_by_css_selector("#un-starred-source-link-1").click() diff --git a/securedrop/tests/functional/conftest.py b/securedrop/tests/functional/conftest.py index 216d452f435..fbde7fb1f9f 100644 --- a/securedrop/tests/functional/conftest.py +++ b/securedrop/tests/functional/conftest.py @@ -275,7 +275,6 @@ def create_source_and_submission(config_in_use: SecureDropConfig) -> Tuple[Sourc """ # This function will be called in a separate Process that runs the app # Hence the late imports - from encryption import EncryptionManager from models import Submission from passphrases import PassphraseGenerator from source_user import create_source_user @@ -291,7 +290,6 @@ def create_source_and_submission(config_in_use: SecureDropConfig) -> Tuple[Sourc source_app_storage=Storage.get_default(), ) source_db_record = source_user.get_db_record() - EncryptionManager.get_default().generate_source_key_pair(source_user) # Create a file submission from this source source_db_record.interaction_count += 1 diff --git a/securedrop/tests/functional/pageslayout/test_submit_and_retrieve_file.py b/securedrop/tests/functional/pageslayout/test_submit_and_retrieve_file.py index e4ac6e6306e..c2d98ffc4e1 100644 --- a/securedrop/tests/functional/pageslayout/test_submit_and_retrieve_file.py +++ b/securedrop/tests/functional/pageslayout/test_submit_and_retrieve_file.py @@ -1,5 +1,4 @@ import pytest -from encryption import EncryptionManager from selenium.common.exceptions import NoSuchElementException from tests.functional.app_navigators.journalist_app_nav import JournalistAppNavigator from tests.functional.app_navigators.source_app_nav import SourceAppNavigator @@ -48,13 +47,7 @@ def test_submit_and_retrieve_happy_path( # And when they try to download the file # Then it succeeds and the journalist sees the correct content - apps_sd_config = sd_servers_with_clean_state.config_in_use - retrieved_message = journ_app_nav.journalist_downloads_first_message( - encryption_mgr_to_use_for_decryption=EncryptionManager( - gpg_key_dir=apps_sd_config.GPG_KEY_DIR, - journalist_key_fingerprint=apps_sd_config.JOURNALIST_KEY, - ) - ) + retrieved_message = journ_app_nav.journalist_downloads_first_message() assert retrieved_message == submitted_content # And when they reply to the source, it succeeds diff --git a/securedrop/tests/functional/test_submit_and_retrieve_message.py b/securedrop/tests/functional/test_submit_and_retrieve_message.py index 61c237d84d4..23a13e547b7 100644 --- a/securedrop/tests/functional/test_submit_and_retrieve_message.py +++ b/securedrop/tests/functional/test_submit_and_retrieve_message.py @@ -1,4 +1,3 @@ -from encryption import EncryptionManager from tests.functional.app_navigators.journalist_app_nav import JournalistAppNavigator from tests.functional.app_navigators.source_app_nav import SourceAppNavigator @@ -37,11 +36,5 @@ def test_submit_and_retrieve_happy_path( # And they try to download the message # Then it succeeds and the journalist sees correct message - servers_sd_config = sd_servers_with_clean_state.config_in_use - retrieved_message = journ_app_nav.journalist_downloads_first_message( - encryption_mgr_to_use_for_decryption=EncryptionManager( - gpg_key_dir=servers_sd_config.GPG_KEY_DIR, - journalist_key_fingerprint=servers_sd_config.JOURNALIST_KEY, - ) - ) + retrieved_message = journ_app_nav.journalist_downloads_first_message() assert retrieved_message == submitted_message diff --git a/securedrop/tests/test_encryption.py b/securedrop/tests/test_encryption.py index ea34678a326..daa2f6cfe3b 100644 --- a/securedrop/tests/test_encryption.py +++ b/securedrop/tests/test_encryption.py @@ -1,69 +1,30 @@ -import typing -from contextlib import contextmanager -from datetime import datetime from pathlib import Path import pytest from db import db -from encryption import EncryptionManager, GpgDecryptError, GpgEncryptError, GpgKeyNotFoundError +from encryption import EncryptionManager, GpgDecryptError, GpgKeyNotFoundError from passphrases import PassphraseGenerator from source_user import create_source_user +from tests import utils + +from redwood import RedwoodError class TestEncryptionManager: def test_get_default(self, config): + # Given an encryption manager encryption_mgr = EncryptionManager.get_default() assert encryption_mgr - assert encryption_mgr.get_journalist_public_key() - - def test_generate_source_key_pair( - self, setup_journalist_key_and_gpg_folder, source_app, app_storage - ): - # Given a source user - with source_app.app_context(): - source_user = create_source_user( - db_session=db.session, - source_passphrase=PassphraseGenerator.get_default().generate_passphrase(), - source_app_storage=app_storage, - ) - - # And an encryption manager - journalist_key_fingerprint, gpg_key_dir = setup_journalist_key_and_gpg_folder - encryption_mgr = EncryptionManager( - gpg_key_dir=gpg_key_dir, journalist_key_fingerprint=journalist_key_fingerprint - ) - - # When using the encryption manager to generate a key pair for this source user + # When using the encryption manager to fetch the journalist public key # It succeeds - encryption_mgr.generate_source_key_pair(source_user) - - # And the newly-created key's fingerprint was added to Redis - fingerprint_in_redis = encryption_mgr._redis.hget( - encryption_mgr.REDIS_FINGERPRINT_HASH, source_user.filesystem_id - ) - assert fingerprint_in_redis - source_key_fingerprint = encryption_mgr.get_source_key_fingerprint( - source_user.filesystem_id - ) - assert fingerprint_in_redis == source_key_fingerprint - - # And the user's newly-generated public key can be retrieved - assert encryption_mgr.get_source_public_key(source_user.filesystem_id) - - # And the key has a hardcoded creation date to avoid leaking information about when sources - # first created their account - source_key_details = encryption_mgr._get_source_key_details(source_user.filesystem_id) - assert source_key_details - creation_date = _parse_gpg_date_string(source_key_details["date"]) - assert creation_date.date() == EncryptionManager.DEFAULT_KEY_CREATION_DATE - - # And the user's key does not expire - assert source_key_details["expires"] == "" + journalist_pub_key = encryption_mgr.get_journalist_public_key() + assert journalist_pub_key.startswith("-----BEGIN PGP PUBLIC KEY BLOCK----") - def test_get_source_public_key(self, test_source): - # Given a source user with a key pair in the default encryption manager + def test_get_gpg_source_public_key(self, test_source): + # Given a source user with a key pair in the gpg keyring source_user = test_source["source_user"] encryption_mgr = EncryptionManager.get_default() + utils.create_legacy_gpg_key(encryption_mgr, source_user, test_source["source"]) # When using the encryption manager to fetch the source user's public key # It succeeds @@ -80,35 +41,21 @@ def test_get_source_public_key(self, test_source): # And the public key was saved to Redis assert encryption_mgr._redis.hget(encryption_mgr.REDIS_KEY_HASH, source_key_fingerprint) - def test_get_journalist_public_key(self, setup_journalist_key_and_gpg_folder): - # Given an encryption manager - journalist_key_fingerprint, gpg_key_dir = setup_journalist_key_and_gpg_folder - encryption_mgr = EncryptionManager( - gpg_key_dir=gpg_key_dir, journalist_key_fingerprint=journalist_key_fingerprint - ) - - # When using the encryption manager to fetch the journalist public key - # It succeeds - journalist_pub_key = encryption_mgr.get_journalist_public_key() - assert journalist_pub_key - assert journalist_pub_key.startswith("-----BEGIN PGP PUBLIC KEY BLOCK----") - - def test_get_source_public_key_wrong_id(self, setup_journalist_key_and_gpg_folder): + def test_get_gpg_source_public_key_wrong_id(self, test_source): # Given an encryption manager - journalist_key_fingerprint, gpg_key_dir = setup_journalist_key_and_gpg_folder - encryption_mgr = EncryptionManager( - gpg_key_dir=gpg_key_dir, journalist_key_fingerprint=journalist_key_fingerprint - ) + encryption_mgr = EncryptionManager.get_default() # When using the encryption manager to fetch a key for an invalid filesystem id # It fails with pytest.raises(GpgKeyNotFoundError): encryption_mgr.get_source_public_key("1234test") - def test_delete_source_key_pair(self, source_app, test_source): - # Given a source user with a key pair in the default encryption manager + def test_delete_gpg_source_key_pair(self, source_app, test_source): + # Given a source user with a key pair in the gpg keyring source_user = test_source["source_user"] encryption_mgr = EncryptionManager.get_default() + utils.create_legacy_gpg_key(encryption_mgr, source_user, test_source["source"]) + assert encryption_mgr.get_source_public_key(source_user.filesystem_id) # When using the encryption manager to delete this source user's key pair # It succeeds @@ -124,27 +71,17 @@ def test_delete_source_key_pair(self, source_app, test_source): with pytest.raises(GpgKeyNotFoundError): encryption_mgr._get_source_key_details(source_user.filesystem_id) - def test_delete_source_key_pair_on_journalist_key(self, setup_journalist_key_and_gpg_folder): - # Given an encryption manager - journalist_key_fingerprint, gpg_key_dir = setup_journalist_key_and_gpg_folder - encryption_mgr = EncryptionManager( - gpg_key_dir=gpg_key_dir, journalist_key_fingerprint=journalist_key_fingerprint - ) - - # When trying to delete the journalist key via the encryption manager - # It fails - with pytest.raises(GpgKeyNotFoundError): - encryption_mgr.delete_source_key_pair(journalist_key_fingerprint) - def test_delete_source_key_pair_pinentry_status_is_handled( self, source_app, test_source, mocker, capsys ): """ Regression test for https://github.com/freedomofpress/securedrop/issues/4294 """ - # Given a source user with a key pair in the default encryption manager + # Given a source user with a key pair in the gpg keyring source_user = test_source["source_user"] encryption_mgr = EncryptionManager.get_default() + utils.create_legacy_gpg_key(encryption_mgr, source_user, test_source["source"]) + assert encryption_mgr.get_source_public_key(source_user.filesystem_id) # And a gpg binary that will trigger the issue described in #4294 mocker.patch( @@ -164,12 +101,9 @@ def test_delete_source_key_pair_pinentry_status_is_handled( captured = capsys.readouterr() assert "ValueError: Unknown status message: 'PINENTRY_LAUNCHED'" not in captured.err - def test_encrypt_source_message(self, setup_journalist_key_and_gpg_folder, tmp_path): + def test_encrypt_source_message(self, config, tmp_path): # Given an encryption manager - journalist_key_fingerprint, gpg_key_dir = setup_journalist_key_and_gpg_folder - encryption_mgr = EncryptionManager( - gpg_key_dir=gpg_key_dir, journalist_key_fingerprint=journalist_key_fingerprint - ) + encryption_mgr = EncryptionManager.get_default() # And a message to be submitted by a source message = "s3cr3t message" @@ -183,92 +117,136 @@ def test_encrypt_source_message(self, setup_journalist_key_and_gpg_folder, tmp_p # And the output file contains the encrypted data encrypted_message = encrypted_message_path.read_bytes() - assert encrypted_message + assert encrypted_message.startswith(b"-----BEGIN PGP MESSAGE-----") # And the journalist is able to decrypt the message - with import_journalist_private_key(encryption_mgr): - decrypted_message = encryption_mgr._gpg.decrypt(encrypted_message).data - assert decrypted_message.decode() == message - - # And the source or anyone else is NOT able to decrypt the message - # For GPG 2.1+, a non-null passphrase _must_ be passed to decrypt() - assert not encryption_mgr._gpg.decrypt(encrypted_message, passphrase="test 123").ok + decrypted_message = utils.decrypt_as_journalist(encrypted_message).decode() + assert decrypted_message == message - def test_encrypt_source_file(self, setup_journalist_key_and_gpg_folder, tmp_path): + def test_encrypt_source_file(self, config, tmp_path): # Given an encryption manager - journalist_key_fingerprint, gpg_key_dir = setup_journalist_key_and_gpg_folder - encryption_mgr = EncryptionManager( - gpg_key_dir=gpg_key_dir, journalist_key_fingerprint=journalist_key_fingerprint - ) + encryption_mgr = EncryptionManager.get_default() # And a file to be submitted by a source - we use this python file file_to_encrypt_path = Path(__file__) - with file_to_encrypt_path.open() as file_to_encrypt: - # When the source tries to encrypt the file - # It succeeds - encrypted_file_path = tmp_path / "file.gpg" + # When the source tries to encrypt the file + # It succeeds + encrypted_file_path = tmp_path / "file.gpg" + with file_to_encrypt_path.open("rb") as fh: encryption_mgr.encrypt_source_file( - file_in=file_to_encrypt, + file_in=fh, encrypted_file_path_out=encrypted_file_path, ) - # And the output file contains the encrypted data - encrypted_file = encrypted_file_path.read_bytes() - assert encrypted_file + # And the output file contains the encrypted data + encrypted_file = encrypted_file_path.read_bytes() + assert encrypted_file.startswith(b"-----BEGIN PGP MESSAGE-----") # And the journalist is able to decrypt the file - with import_journalist_private_key(encryption_mgr): - decrypted_file = encryption_mgr._gpg.decrypt(encrypted_file).data - assert decrypted_file.decode() == file_to_encrypt_path.read_text() - - # And the source or anyone else is NOT able to decrypt the file - # For GPG 2.1+, a non-null passphrase _must_ be passed to decrypt() - assert not encryption_mgr._gpg.decrypt(encrypted_file, passphrase="test 123").ok + decrypted_file = utils.decrypt_as_journalist(encrypted_file) + assert decrypted_file == file_to_encrypt_path.read_bytes() def test_encrypt_and_decrypt_journalist_reply( self, source_app, test_source, tmp_path, app_storage ): - # Given a source user with a key pair in the default encryption manager + # Given a source user source_user1 = test_source["source_user"] + source1 = test_source["source"] encryption_mgr = EncryptionManager.get_default() - # And another source with a key pair in the default encryption manager + # And another source user with source_app.app_context(): source_user2 = create_source_user( db_session=db.session, source_passphrase=PassphraseGenerator.get_default().generate_passphrase(), source_app_storage=app_storage, ) - encryption_mgr.generate_source_key_pair(source_user2) + source2 = source_user2.get_db_record() # When the journalist tries to encrypt a reply to source1 # It succeeds journalist_reply = "s3cr3t message" encrypted_reply_path = tmp_path / "reply.gpg" encryption_mgr.encrypt_journalist_reply( - for_source_with_filesystem_id=source_user1.filesystem_id, + for_source=source1, reply_in=journalist_reply, encrypted_reply_path_out=encrypted_reply_path, ) # And the output file contains the encrypted data encrypted_reply = encrypted_reply_path.read_bytes() - assert encrypted_reply + assert encrypted_reply.startswith(b"-----BEGIN PGP MESSAGE-----") # And source1 is able to decrypt the reply decrypted_reply = encryption_mgr.decrypt_journalist_reply( - for_source_user=source_user1, ciphertext_in=encrypted_reply + for_source_user=source_user1, + for_source=source1, + ciphertext_in=encrypted_reply, ) - assert decrypted_reply assert decrypted_reply == journalist_reply # And source2 is NOT able to decrypt the reply - with pytest.raises(GpgDecryptError): + with pytest.raises(RedwoodError): encryption_mgr.decrypt_journalist_reply( - for_source_user=source_user2, ciphertext_in=encrypted_reply + for_source_user=source_user2, + for_source=source2, + ciphertext_in=encrypted_reply, + ) + + # And the journalist is able to decrypt their own reply + decrypted_reply_for_journalist = utils.decrypt_as_journalist(encrypted_reply) + assert decrypted_reply_for_journalist.decode() == journalist_reply + + def test_gpg_encrypt_and_decrypt_journalist_reply( + self, source_app, test_source, tmp_path, app_storage + ): + # Given a source user with a key pair in the gpg keyring + source_user1 = test_source["source_user"] + source1 = test_source["source"] + encryption_mgr = EncryptionManager.get_default() + utils.create_legacy_gpg_key(encryption_mgr, source_user1, source1) + + # And another source with a key pair in the gpg keyring + with source_app.app_context(): + source_user2 = create_source_user( + db_session=db.session, + source_passphrase=PassphraseGenerator.get_default().generate_passphrase(), + source_app_storage=app_storage, + ) + source2 = source_user2.get_db_record() + utils.create_legacy_gpg_key(encryption_mgr, source_user2, source2) + + # When the journalist tries to encrypt a reply to source1 + # It succeeds + journalist_reply = "s3cr3t message" + encrypted_reply_path = tmp_path / "reply.gpg" + encryption_mgr.encrypt_journalist_reply( + for_source=source1, + reply_in=journalist_reply, + encrypted_reply_path_out=encrypted_reply_path, ) + # And the output file contains the encrypted data + encrypted_reply = encrypted_reply_path.read_bytes() + assert encrypted_reply.startswith(b"-----BEGIN PGP MESSAGE-----") + + # And source1 is able to decrypt the reply + decrypted_reply = encryption_mgr.decrypt_journalist_reply( + for_source_user=source_user1, + for_source=source1, + ciphertext_in=encrypted_reply, + ) + assert decrypted_reply == journalist_reply + + # And source2 is NOT able to decrypt the reply + with pytest.raises(GpgDecryptError): + encryption_mgr.decrypt_journalist_reply( + for_source_user=source_user2, + for_source=source2, + ciphertext_in=encrypted_reply, + ) + # Amd the reply can't be decrypted without providing the source1's gpg secret result = encryption_mgr._gpg.decrypt( # For GPG 2.1+, a non-null passphrase _must_ be passed to decrypt() @@ -278,77 +256,5 @@ def test_encrypt_and_decrypt_journalist_reply( assert not result.ok # And the journalist is able to decrypt their reply - with import_journalist_private_key(encryption_mgr): - decrypted_reply_for_journalist = encryption_mgr._gpg.decrypt( - # For GPG 2.1+, a non-null passphrase _must_ be passed to decrypt() - encrypted_reply, - passphrase="test 123", - ).data - assert decrypted_reply_for_journalist.decode() == journalist_reply - - def test_encrypt_fails(self, setup_journalist_key_and_gpg_folder, tmp_path): - # Given an encryption manager - journalist_key_fingerprint, gpg_key_dir = setup_journalist_key_and_gpg_folder - encryption_mgr = EncryptionManager( - gpg_key_dir=gpg_key_dir, journalist_key_fingerprint=journalist_key_fingerprint - ) - - # When trying to encrypt some data without providing any recipient - # It fails and the right exception is raised - with pytest.raises(GpgEncryptError) as exc: - encryption_mgr._encrypt( - using_keys_with_fingerprints=[], - plaintext_in="test", - ciphertext_path_out=tmp_path / "encrypt_fails", - ) - assert "no terminal at all requested" in str(exc) - - -def _parse_gpg_date_string(date_string: str) -> datetime: - """Parse a date string returned from `gpg --with-colons --list-keys` into a datetime. - - The format of the date strings is complicated; see gnupg doc/DETAILS for a - full explanation. - - Key details: - - The creation date of the key is given in UTC. - - the date is usually printed in seconds since epoch, however, we are - migrating to an ISO 8601 format (e.g. "19660205T091500"). A simple - way to detect the new format is to scan for the 'T'. - """ - if "T" in date_string: - dt = datetime.strptime(date_string, "%Y%m%dT%H%M%S") - else: - dt = datetime.utcfromtimestamp(int(date_string)) - return dt - - -@contextmanager -def import_journalist_private_key( - encryption_mgr: EncryptionManager, -) -> typing.Generator[None, None, None]: - """Import the journalist secret key so the encryption_mgr can decrypt data for the journalist. - - The journalist secret key is removed at the end of this context manager in order to not impact - other decryption-related tests. - """ - # Import the journalist private key - journalist_private_key_path = Path(__file__).parent / "files" / "test_journalist_key.sec" - encryption_mgr._gpg.import_keys(journalist_private_key_path.read_text()) - journalist_secret_key_fingerprint = "C1C4E16BB24E4F4ABF37C3A6C3E7C4C0A2201B2A" - - yield - - # Make sure to remove the journalist private key to not impact the other tests - encryption_mgr._gpg_for_key_deletion.delete_keys( - fingerprints=journalist_secret_key_fingerprint, secret=True, subkeys=False - ) - - # Double check that the journalist private key was removed - is_journalist_secret_key_available = False - for key in encryption_mgr._gpg.list_keys(secret=True): - for uid in key["uids"]: - if "SecureDrop Test" in uid: - is_journalist_secret_key_available = True - break - assert not is_journalist_secret_key_available + decrypted_reply_for_journalist = utils.decrypt_as_journalist(encrypted_reply).decode() + assert decrypted_reply_for_journalist == journalist_reply diff --git a/securedrop/tests/test_integration.py b/securedrop/tests/test_integration.py index 7458878acbb..bc1ee845988 100644 --- a/securedrop/tests/test_integration.py +++ b/securedrop/tests/test_integration.py @@ -15,7 +15,6 @@ from source_app.session_manager import SessionManager from store import Storage from tests import utils -from tests.test_encryption import import_journalist_private_key from tests.utils import login_journalist from two_factor import TOTP @@ -78,11 +77,8 @@ def test_submit_message(journalist_app, source_app, test_journo, app_storage): resp = app.get(submission_url) assert resp.status_code == 200 - encryption_mgr = EncryptionManager.get_default() - with import_journalist_private_key(encryption_mgr): - decryption_result = encryption_mgr._gpg.decrypt(resp.data) - assert decryption_result.ok - assert decryption_result.data.decode("utf-8") == test_msg + decryption_result = utils.decrypt_as_journalist(resp.data).decode() + assert decryption_result == test_msg # delete submission resp = app.get(col_url) @@ -175,12 +171,8 @@ def test_submit_file(journalist_app, source_app, test_journo, app_storage): resp = app.get(submission_url) assert resp.status_code == 200 - encryption_mgr = EncryptionManager.get_default() - with import_journalist_private_key(encryption_mgr): - decrypted_data = encryption_mgr._gpg.decrypt(resp.data) - assert decrypted_data.ok - - sio = BytesIO(decrypted_data.data) + decrypted_data = utils.decrypt_as_journalist(resp.data) + sio = BytesIO(decrypted_data) with gzip.GzipFile(mode="rb", fileobj=sio) as gzip_file: unzipped_decrypted_data = gzip_file.read() mtime = gzip_file.mtime @@ -260,8 +252,6 @@ def _helper_test_reply(journalist_app, source_app, test_journo, test_reply): resp = app.get(col_url) assert resp.status_code == 200 - assert EncryptionManager.get_default().get_source_key_fingerprint(filesystem_id) - # Create 2 replies to test deleting on journalist and source interface with journalist_app.test_client() as app: login_journalist( @@ -301,8 +291,12 @@ def _helper_test_reply(journalist_app, source_app, test_journo, test_reply): zf = zipfile.ZipFile(BytesIO(resp.data), "r") data = zf.read(zf.namelist()[0]) - _can_decrypt_with_journalist_secret_key(data) - _can_decrypt_with_source_secret_key(data, source_user.gpg_secret) + journalist_decrypted = utils.decrypt_as_journalist(data).decode() + assert journalist_decrypted == test_reply + source_decrypted = EncryptionManager.get_default().decrypt_journalist_reply( + source_user, source_user.get_db_record(), data + ) + assert source_decrypted == test_reply # Test deleting reply on the journalist interface last_reply_number = len(soup.select('input[name="doc_names_selected"]')) - 1 @@ -360,26 +354,6 @@ def assertion(): utils.asynchronous.wait_for_assertion(assertion) -def _can_decrypt_with_journalist_secret_key(msg: bytes) -> None: - encryption_mgr = EncryptionManager.get_default() - with import_journalist_private_key(encryption_mgr): - # For GPG 2.1+, a non null passphrase _must_ be passed to decrypt() - decryption_result = encryption_mgr._gpg.decrypt(msg, passphrase="dummy passphrase") - - assert decryption_result.ok, "Could not decrypt msg with key, gpg says: {}".format( - decryption_result.stderr - ) - - -def _can_decrypt_with_source_secret_key(msg: bytes, source_gpg_secret: str) -> None: - encryption_mgr = EncryptionManager.get_default() - decryption_result = encryption_mgr._gpg.decrypt(msg, passphrase=source_gpg_secret) - - assert decryption_result.ok, "Could not decrypt msg with key, gpg says: {}".format( - decryption_result.stderr - ) - - def test_reply_normal(journalist_app, source_app, test_journo): """Test for regression on #1360 (failure to encode bytes before calling gpg functions). diff --git a/securedrop/tests/test_journalist.py b/securedrop/tests/test_journalist.py index a6b20c91a5a..02ddd9e91dc 100644 --- a/securedrop/tests/test_journalist.py +++ b/securedrop/tests/test_journalist.py @@ -41,7 +41,6 @@ from tests import utils from tests.factories import SecureDropConfigFactory from tests.functional.db_session import get_database_session -from tests.utils import login_journalist from tests.utils.i18n import ( get_plural_tests, get_test_locales, @@ -52,6 +51,8 @@ from tests.utils.instrument import InstrumentedApp from two_factor import TOTP +from .utils import create_legacy_gpg_key, login_journalist + # Smugly seed the RNG for deterministic testing random.seed(r"¯\_(ツ)_/¯") @@ -2867,10 +2868,15 @@ def test_delete_collection_updates_db(journalist_app, test_journo, test_source, assert not seen_reply -def test_delete_source_deletes_source_key(journalist_app, test_source, test_journo, app_storage): - """Verify that when a source is deleted, the PGP key that corresponds +def test_delete_source_deletes_gpg_source_key( + journalist_app, test_source, test_journo, app_storage +): + """Verify that when a legacy source is deleted, the GPG key that corresponds to them is also deleted.""" + encryption_mgr = EncryptionManager.get_default() + create_legacy_gpg_key(encryption_mgr, test_source["source_user"], test_source["source"]) + with journalist_app.app_context(): source = Source.query.get(test_source["id"]) journo = Journalist.query.get(test_journo["id"]) @@ -2879,7 +2885,6 @@ def test_delete_source_deletes_source_key(journalist_app, test_source, test_jour utils.db_helper.reply(app_storage, journo, source, 2) # Source key exists - encryption_mgr = EncryptionManager.get_default() assert encryption_mgr.get_source_key_fingerprint(test_source["filesystem_id"]) journalist_app_module.utils.delete_collection(test_source["filesystem_id"]) diff --git a/securedrop/tests/test_journalist_api.py b/securedrop/tests/test_journalist_api.py index 7b264d21483..2cdd88e457b 100644 --- a/securedrop/tests/test_journalist_api.py +++ b/securedrop/tests/test_journalist_api.py @@ -1,16 +1,16 @@ import json import random from datetime import datetime +from pathlib import Path from uuid import UUID, uuid4 from db import db from encryption import EncryptionManager from flask import url_for from models import Journalist, Reply, Source, SourceStar, Submission +from tests.utils.api_helper import get_api_headers from two_factor import TOTP -from .utils.api_helper import get_api_headers - random.seed("◔ ⌣ ◔") @@ -767,7 +767,7 @@ def test_unencrypted_replies_get_rejected( def test_authorized_user_can_add_reply( - journalist_app, journalist_api_token, test_source, test_journo, app_storage + journalist_app, journalist_api_token, test_source, test_journo, app_storage, tmp_path ): with journalist_app.test_client() as app: source_id = test_source["source"].id @@ -776,12 +776,15 @@ def test_authorized_user_can_add_reply( # First we must encrypt the reply, or it will get rejected # by the server. encryption_mgr = EncryptionManager.get_default() - source_key = encryption_mgr.get_source_key_fingerprint(test_source["source"].filesystem_id) - reply_content = encryption_mgr._gpg.encrypt("This is a plaintext reply", source_key).data + reply_path = tmp_path / "message.gpg" + encryption_mgr.encrypt_journalist_reply( + test_source["source"], "This is a plaintext reply", reply_path + ) + reply_content = reply_path.read_text() response = app.post( url_for("api.all_source_replies", source_uuid=uuid), - data=json.dumps({"reply": reply_content.decode("utf-8")}), + data=json.dumps({"reply": reply_content}), headers=get_api_headers(journalist_api_token), ) assert response.status_code == 201 @@ -809,10 +812,9 @@ def test_authorized_user_can_add_reply( source.interaction_count, source.journalist_filename ) - expected_filepath = app_storage.path(source.filesystem_id, expected_filename) + expected_filepath = Path(app_storage.path(source.filesystem_id, expected_filename)) - with open(expected_filepath, "rb") as fh: - saved_content = fh.read() + saved_content = expected_filepath.read_text() assert reply_content == saved_content diff --git a/securedrop/tests/utils/__init__.py b/securedrop/tests/utils/__init__.py index 0f9effe244e..6ec4f9745e7 100644 --- a/securedrop/tests/utils/__init__.py +++ b/securedrop/tests/utils/__init__.py @@ -1,10 +1,20 @@ +import datetime +from pathlib import Path + import flask import models from db import db +from encryption import EncryptionManager from flask.testing import FlaskClient +from source_user import SourceUser from tests.utils import asynchronous, db_helper # noqa: F401 from two_factor import TOTP +import redwood + +JOURNALIST_SECRET_KEY_PATH = Path(__file__).parent.parent / "files" / "test_journalist_key.sec" +JOURNALIST_SECRET_KEY = JOURNALIST_SECRET_KEY_PATH.read_text() + def flaky_filter_xfail(err, *args): """ @@ -56,3 +66,40 @@ def login_journalist( assert session.get_user() is not None _reset_journalist_last_token(username) + + +def decrypt_as_journalist(ciphertext: bytes) -> bytes: + return redwood.decrypt( + ciphertext=ciphertext, + secret_key=JOURNALIST_SECRET_KEY, + passphrase="correcthorsebatterystaple", + ) + + +def create_legacy_gpg_key( + manager: EncryptionManager, source_user: SourceUser, source: models.Source +) -> None: + """Create a GPG key for the source, so we can test pre-Sequoia behavior""" + # All reply keypairs will be "created" on the same day SecureDrop (then + # Strongbox) was publicly released for the first time. + # https://www.newyorker.com/news/news-desk/strongbox-and-aaron-swartz + default_key_creation_date = datetime.date(2013, 5, 14) + gen_key_input = manager._gpg.gen_key_input( + passphrase=source_user.gpg_secret, + name_email=source_user.filesystem_id, + key_type="RSA", + key_length=4096, + name_real="Source Key", + creation_date=default_key_creation_date.isoformat(), + # '0' is the magic value that tells GPG's batch key generation not + # to set an expiration date. + expire_date="0", + ) + manager._gpg.gen_key(gen_key_input) + + # Delete the Sequoia-generated keys + source.pgp_public_key = None + source.pgp_fingerprint = None + source.pgp_secret_key = None + db.session.add(source) + db.session.commit() diff --git a/securedrop/tests/utils/db_helper.py b/securedrop/tests/utils/db_helper.py index 55998f037df..d9161edb596 100644 --- a/securedrop/tests/utils/db_helper.py +++ b/securedrop/tests/utils/db_helper.py @@ -74,7 +74,7 @@ def reply(storage, journalist, source, num_replies): fname = f"{source.interaction_count}-{source.journalist_filename}-reply.gpg" EncryptionManager.get_default().encrypt_journalist_reply( - for_source_with_filesystem_id=source.filesystem_id, + for_source=source, reply_in=str(os.urandom(1)), encrypted_reply_path_out=storage.path(source.filesystem_id, fname), ) @@ -104,7 +104,6 @@ def init_source(storage): source_passphrase=passphrase, source_app_storage=storage, ) - EncryptionManager.get_default().generate_source_key_pair(source_user) return source_user.get_db_record(), passphrase